- Use external, not internal port to sign SessionCreated message.
     Together with previous fix to allow external port change, this
     should fix session establish fails when NAT changes our port
   - Track outbound establishments by both Hash and IP/port,
     to improve lookups of establishments in progress
   - Fix expiration of outbound establishments
   - Validate address/port in RelayResponse messages
   - Change RemoteHostID to store Hash instead of byte[] for the peer hash
   - Log tweaks
This commit is contained in:
zzz
2012-08-21 19:53:08 +00:00
parent fbd8c69eea
commit 612fab1b2a
10 changed files with 355 additions and 186 deletions

View File

@ -1,3 +1,16 @@
2012-08-21 zzz
* NetDB: Decrease stat publish probability
* SSU:
- Use external, not internal port to sign SessionCreated message.
Together with previous fix to allow external port change, this
should fix session establish fails when NAT changes our port
- Track outbound establishments by both Hash and IP/port,
to improve lookups of establishments in progress
- Fix expiration of outbound establishments
- Validate address/port in RelayResponse messages
- Change RemoteHostID to store Hash instead of byte[] for the peer hash
- Log tweaks
2012-08-20 zzz
* I2CP: MessageStatus cleanup
* i2psnark: Add minimum tracker and DHT announce intervals

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 13;
public final static long BUILD = 14;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -39,14 +39,47 @@ class EstablishmentManager {
private final Log _log;
private final UDPTransport _transport;
private final PacketBuilder _builder;
/** map of RemoteHostId to InboundEstablishState */
private final ConcurrentHashMap<RemoteHostId, InboundEstablishState> _inboundStates;
/** map of RemoteHostId to OutboundEstablishState */
/**
* Map of RemoteHostId to OutboundEstablishState.
* The key could be either an IP/Port (for direct) or
* a Hash (for indirect, before the RelayResponse is received).
* Once the RelayResponse is received we change the key.
*/
private final ConcurrentHashMap<RemoteHostId, OutboundEstablishState> _outboundStates;
/** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */
private final ConcurrentHashMap<RemoteHostId, List<OutNetMessage>> _queuedOutbound;
/** map of nonce (Long) to OutboundEstablishState */
/**
* Map of nonce (Long) to OutboundEstablishState.
* Only for indirect, before we receive the RelayResponse.
* This is so we can lookup state for the RelayResponse.
* After we receive the relay response, _outboundStates is keyed by actual IP.
*/
private final ConcurrentHashMap<Long, OutboundEstablishState> _liveIntroductions;
/**
* Map of claimed IP/port to OutboundEstablishState.
* Only for indirect, before we receive the RelayResponse.
* This is so we can lookup a pending introduction by IP
* even before we know the "real" IP, so we can match an inbound packet.
* After we receive the relay response, _outboundStates is keyed by actual IP.
*/
private final ConcurrentHashMap<RemoteHostId, OutboundEstablishState> _outboundByClaimedAddress;
/**
* Map of router hash to OutboundEstablishState.
* Only for indirect, after we receive the RelayResponse.
* This is so we can lookup a pending connection by Hash
* even after we've got the IP/port, so we can match a subsequent outbound packet.
* Before we receive the relay response, _outboundStates is keyed by hash.
*/
private final ConcurrentHashMap<Hash, OutboundEstablishState> _outboundByHash;
private volatile boolean _alive;
private final Object _activityLock;
private int _activity;
@ -81,7 +114,7 @@ class EstablishmentManager {
private static final int MAX_IB_ESTABLISH_TIME = 20*1000;
/** max before receiving a response to a single message during outbound establishment */
private static final int OB_MESSAGE_TIMEOUT = 15*1000;
public static final int OB_MESSAGE_TIMEOUT = 15*1000;
/** for the DSM and or netdb store */
private static final int DATA_MESSAGE_TIMEOUT = 10*1000;
@ -95,6 +128,8 @@ class EstablishmentManager {
_outboundStates = new ConcurrentHashMap();
_queuedOutbound = new ConcurrentHashMap();
_liveIntroductions = new ConcurrentHashMap();
_outboundByClaimedAddress = new ConcurrentHashMap();
_outboundByHash = new ConcurrentHashMap();
_activityLock = new Object();
_context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES);
@ -149,6 +184,11 @@ class EstablishmentManager {
*/
OutboundEstablishState getOutboundState(RemoteHostId from) {
OutboundEstablishState state = _outboundStates.get(from);
if (state == null) {
state = _outboundByClaimedAddress.get(from);
if (state != null && _log.shouldLog(Log.INFO))
_log.info("Found by claimed address: " + state);
}
// if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
// _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates);
return state;
@ -166,34 +206,49 @@ class EstablishmentManager {
* with them and sending it off. This call does not block, and on failure,
* the message is failed.
*
* Note - if we go back to multiple PacketHandler threads, this may need more locking.
*/
public void establish(OutNetMessage msg) {
RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
establish(msg, true);
}
/**
* @param queueIfMaxExceeded true normally, false if called from locked_admit so we don't loop
* @since 0.9.2
*/
private void establish(OutNetMessage msg, boolean queueIfMaxExceeded) {
RouterInfo toRouterInfo = msg.getTarget();
RouterAddress ra = toRouterInfo.getTargetAddress(_transport.getStyle());
if (ra == null) {
_transport.failed(msg, "Remote peer has no address, cannot establish");
return;
}
if (msg.getTarget().getNetworkId() != Router.NETWORK_ID) {
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash());
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
RouterIdentity toIdentity = toRouterInfo.getIdentity();
Hash toHash = toIdentity.calculateHash();
if (toRouterInfo.getNetworkId() != Router.NETWORK_ID) {
_context.shitlist().shitlistRouter(toHash);
_transport.markUnreachable(toHash);
_transport.failed(msg, "Remote peer is on the wrong network, cannot establish");
return;
}
UDPAddress addr = new UDPAddress(ra);
RemoteHostId to = null;
RemoteHostId maybeTo = null;
InetAddress remAddr = addr.getHostAddress();
int port = addr.getPort();
if (remAddr != null && port > 0 && port <= 65535) {
to = new RemoteHostId(remAddr.getAddress(), port);
if (!_transport.isValid(to.getIP())) {
// check for validity and existing inbound state, using the
// claimed address (which we won't be using if indirect)
if (remAddr != null && port > 0 && port <= 65535) {
maybeTo = new RemoteHostId(remAddr.getAddress(), port);
if (!_transport.isValid(maybeTo.getIP())) {
_transport.failed(msg, "Remote peer's IP isn't valid");
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
_transport.markUnreachable(toHash);
//_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
return;
}
InboundEstablishState inState = _inboundStates.get(to);
InboundEstablishState inState = _inboundStates.get(maybeTo);
if (inState != null) {
// we have an inbound establishment in progress, queue it there instead
synchronized (inState) {
@ -222,13 +277,14 @@ class EstablishmentManager {
}
return;
}
}
RemoteHostId to;
boolean isIndirect = addr.getIntroducerCount() > 0 || maybeTo == null;
if (isIndirect) {
to = new RemoteHostId(toHash);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add indirect outbound establish state to: " + addr);
to = new RemoteHostId(msg.getTarget().getIdentity().calculateHash().getData());
to = maybeTo;
}
OutboundEstablishState state = null;
@ -238,7 +294,12 @@ class EstablishmentManager {
state = _outboundStates.get(to);
if (state == null) {
if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
state = _outboundByHash.get(toHash);
if (state != null && _log.shouldLog(Log.INFO))
_log.info("Found by hash: " + state);
}
if (state == null) {
if (queueIfMaxExceeded && _outboundStates.size() >= getMaxConcurrentEstablish()) {
if (_queuedOutbound.size() >= MAX_QUEUED_OUTBOUND) {
rejected = true;
} else {
@ -265,7 +326,7 @@ class EstablishmentManager {
// must have a valid session key
byte[] keyBytes = addr.getIntroKey();
if (keyBytes == null) {
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
_transport.markUnreachable(toHash);
_transport.failed(msg, "Peer has no key, cannot establish");
return;
}
@ -273,16 +334,18 @@ class EstablishmentManager {
try {
sessionKey = new SessionKey(keyBytes);
} catch (IllegalArgumentException iae) {
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
_transport.markUnreachable(toHash);
_transport.failed(msg, "Peer has bad key, cannot establish");
return;
}
state = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
state = new OutboundEstablishState(_context, maybeTo, to,
toIdentity,
sessionKey, addr, _transport.getDHBuilder());
OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
boolean isNew = oldState == null;
if (isNew) {
if (isIndirect && maybeTo != null)
_outboundByClaimedAddress.put(maybeTo, state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding new " + state);
} else {
@ -364,7 +427,7 @@ class EstablishmentManager {
}
if (!_transport.allowConnection())
return; // drop the packet
state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort(),
state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getExternalPort(),
_transport.getDHBuilder());
state.receiveSessionRequest(reader.getSessionRequestReader());
InboundEstablishState oldState = _inboundStates.putIfAbsent(from, state);
@ -387,12 +450,13 @@ class EstablishmentManager {
if (_log.shouldLog(Log.INFO))
_log.info("Received NEW session request from " + from + ", sending relay tag " + tag);
} else {
// we got an IB even though we were firewalled, hidden, not high cap, etc.
if (_log.shouldLog(Log.INFO))
_log.info("Received session request, but our status is " + _transport.getReachabilityStatus());
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive DUP session request from: " + state.getRemoteHostId());
_log.debug("Receive DUP session request from: " + state);
}
notifyActivity();
@ -408,7 +472,7 @@ class EstablishmentManager {
state.receiveSessionConfirmed(reader.getSessionConfirmedReader());
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session confirmed from: " + state.getRemoteHostId().toString());
_log.debug("Receive session confirmed from: " + state);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive (DUP?) session confirmed from: " + from);
@ -425,7 +489,7 @@ class EstablishmentManager {
state.receiveSessionCreated(reader.getSessionCreatedReader());
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session created from: " + state.getRemoteHostId().toString());
_log.debug("Receive session created from: " + state);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive (DUP?) session created from: " + from);
@ -476,6 +540,7 @@ class EstablishmentManager {
* A data packet arrived on an outbound connection being established, which
* means its complete (yay!). This is a blocking call, more than I'd like...
*
* @return the new PeerState
*/
PeerState receiveData(OutboundEstablishState state) {
state.dataReceived();
@ -541,29 +606,9 @@ class EstablishmentManager {
if (queued.isEmpty())
continue;
OutNetMessage msg = queued.get(0);
RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
if (ra == null) {
for (int i = 0; i < queued.size(); i++)
_transport.failed(queued.get(i), "Cannot admit to the queue, as it has no address");
continue;
}
UDPAddress addr = new UDPAddress(ra);
InetAddress remAddr = addr.getHostAddress();
int port = addr.getPort();
OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()), addr,
_transport.getDHBuilder());
OutboundEstablishState old = _outboundStates.putIfAbsent(to, qstate);
if (old != null)
qstate = old;
for (int i = 0; i < queued.size(); i++) {
OutNetMessage m = queued.get(i);
for (OutNetMessage m : queued) {
m.timestamp("no longer deferred... establishing");
qstate.addMessage(m);
establish(m, false);
}
admitted++;
}
@ -611,7 +656,7 @@ class EstablishmentManager {
//peer.setTheyRelayToUsAs(0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString()
_log.debug("Handle completely established (inbound): " + state
+ " - " + peer.getRemotePeer());
//if (true) // for now, only support direct
@ -681,6 +726,7 @@ class EstablishmentManager {
* ok, fully received, add it to the established cons and send any
* queued messages
*
* @return the new PeerState
*/
private PeerState handleCompletelyEstablished(OutboundEstablishState state) {
if (state.complete()) {
@ -691,6 +737,11 @@ class EstablishmentManager {
long now = _context.clock().now();
RouterIdentity remote = state.getRemoteIdentity();
// only if == state
RemoteHostId claimed = state.getClaimedAddress();
if (claimed != null)
_outboundByClaimedAddress.remove(claimed, state);
_outboundByHash.remove(remote.calculateHash(), state);
PeerState peer = new PeerState(_context, _transport,
state.getSentIP(), state.getSentPort(), remote.calculateHash(), false);
peer.setCurrentCipherKey(state.getCipherKey());
@ -703,7 +754,7 @@ class EstablishmentManager {
//peer.setWeRelayToThemAs(0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString()
_log.debug("Handle completely established (outbound): " + state
+ " - " + peer.getRemotePeer());
@ -764,13 +815,13 @@ class EstablishmentManager {
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send created to: " + state.getRemoteHostId().toString());
_log.debug("Send created to: " + state);
try {
state.generateSessionKey();
} catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer " + state.getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe);
_log.error("Peer " + state + " sent us an invalid DH parameter (or were spoofed)", ippe);
_inboundStates.remove(state.getRemoteHostId());
return;
}
@ -783,13 +834,13 @@ class EstablishmentManager {
*/
private void sendRequest(OutboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send SessionRequest to: " + state.getRemoteHostId());
_log.debug("Send SessionRequest to: " + state);
UDPPacket packet = _builder.buildSessionRequestPacket(state);
if (packet != null) {
_transport.send(packet);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to build a session request packet for " + state.getRemoteHostId());
_log.warn("Unable to build a session request packet for " + state);
}
state.requestSent();
}
@ -816,50 +867,61 @@ class EstablishmentManager {
_transport.send(requests[i]);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + state.getRemoteHostId() + " with our intro key as " + _transport.getIntroKey());
_log.debug("Send intro for " + state + " with our intro key as " + _transport.getIntroKey());
state.introSent();
}
void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) {
long nonce = reader.getRelayResponseReader().readNonce();
OutboundEstablishState state = _liveIntroductions.remove(Long.valueOf(nonce));
if (state == null)
if (state == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Dup or unknown RelayResponse: " + nonce);
return; // already established
}
int sz = reader.getRelayResponseReader().readCharlieIPSize();
byte ip[] = new byte[sz];
reader.getRelayResponseReader().readCharlieIP(ip, 0);
int port = reader.getRelayResponseReader().readCharliePort();
InetAddress addr = null;
try {
if (!_transport.isValid(ip))
throw new UnknownHostException("non-public IP");
if (port <= 0 || port > 65535)
throw new UnknownHostException("bad port " + port);
addr = InetAddress.getByAddress(ip);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid IP for our target: " + Addresses.toString(ip), uhe);
_log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid address for our target: " + Addresses.toString(ip, port), uhe);
// these two cause this peer to requeue for a new intro peer
state.introductionFailed();
notifyActivity();
return;
}
_context.statManager().addRateData("udp.receiveIntroRelayResponse", state.getLifetime(), 0);
int port = reader.getRelayResponseReader().readCharliePort();
if (_log.shouldLog(Log.INFO))
_log.info("Received RelayResponse for " + state.getRemoteIdentity().calculateHash() + " - they are on "
+ addr.toString() + ":" + port + " (according to " + bob + ")");
+ addr.toString() + ":" + port + " (according to " + bob + ") nonce=" + nonce);
synchronized (state) {
RemoteHostId oldId = state.getRemoteHostId();
state.introduced(addr, ip, port);
state.introduced(ip, port);
RemoteHostId newId = state.getRemoteHostId();
// Swap out the RemoteHostId the state is indexed under
// TODO only if !oldId.equals(newId) ? synch?
// FIXME if the RemoteHostIDs aren't the same we have problems
// FIXME if the RemoteHostIDs aren't the same the SessionCreated signature is probably going to fail
// Common occurrence - port changes
// Swap out the RemoteHostId the state is indexed under.
// It was a Hash, change it to a IP/port.
// Remove the entry in the byClaimedAddress map as it's now in main map.
// Add an entry in the byHash map so additional OB pkts can find it.
_outboundByHash.put(state.getRemoteIdentity().calculateHash(), state);
RemoteHostId claimed = state.getClaimedAddress();
if (!oldId.equals(newId)) {
_outboundStates.remove(oldId);
_outboundStates.put(newId, state);
if (_log.shouldLog(Log.WARN))
_log.warn("RR replaced " + oldId + " with " + newId + " -> " + state);
_log.warn("RR replaced " + oldId + " with " + newId + ", claimed address was " + claimed);
}
//
if (claimed != null)
_outboundByClaimedAddress.remove(oldId, state); // only if == state
}
notifyActivity();
}
@ -872,8 +934,13 @@ class EstablishmentManager {
*/
private void sendConfirmation(OutboundEstablishState state) {
boolean valid = state.validateSessionCreated();
if (!valid) // validate clears fields on failure
if (!valid) {
// validate clears fields on failure
// TODO - send destroy? shitlist?
if (_log.shouldLog(Log.WARN))
_log.warn("SessionCreated validate failed: " + state);
return;
}
if (!_transport.isValid(state.getReceivedIP()) || !_transport.isValid(state.getRemoteHostId().getIP())) {
state.fail();
@ -936,7 +1003,7 @@ class EstablishmentManager {
/**
* Drive through the inbound establishment states, adjusting one of them
* as necessary
* as necessary. Called from Establisher thread only.
* @return next requested time or -1
*/
private long handleInbound() {
@ -1053,7 +1120,7 @@ class EstablishmentManager {
/**
* Drive through the outbound establishment states, adjusting one of them
* as necessary
* as necessary. Called from Establisher thread only.
* @return next requested time or -1
*/
private long handleOutbound() {
@ -1073,7 +1140,7 @@ class EstablishmentManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing confirmed outbound: " + cur);
break;
} else if (cur.getLifetime() > MAX_OB_ESTABLISH_TIME) {
} else if (cur.getLifetime() >= MAX_OB_ESTABLISH_TIME) {
// took too long
iter.remove();
outboundState = cur;
@ -1116,7 +1183,7 @@ class EstablishmentManager {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Processing for outbound: " + outboundState);
synchronized (outboundState) {
boolean expired = outboundState.getLifetime() > MAX_OB_ESTABLISH_TIME;
boolean expired = outboundState.getLifetime() >= MAX_OB_ESTABLISH_TIME;
switch (outboundState.getState()) {
case OB_STATE_UNKNOWN: // fall thru
case OB_STATE_INTRODUCED:
@ -1129,7 +1196,7 @@ class EstablishmentManager {
case OB_STATE_REQUEST_SENT:
// no response yet (or it was invalid), lets retry
long rtime = outboundState.getRequestSentTime();
if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT < now))
if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT <= now))
processExpired(outboundState);
else if (outboundState.getNextSendTime() <= now)
sendRequest(outboundState);
@ -1144,7 +1211,7 @@ class EstablishmentManager {
case OB_STATE_CONFIRMED_PARTIALLY:
long ctime = outboundState.getConfirmedSentTime();
if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT < now)) {
if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT <= now)) {
sendDestroy(outboundState);
processExpired(outboundState);
} else if (outboundState.getNextSendTime() <= now) {
@ -1161,7 +1228,7 @@ class EstablishmentManager {
case OB_STATE_PENDING_INTRO:
long itime = outboundState.getIntroSentTime();
if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT < now))
if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT <= now))
processExpired(outboundState);
else if (outboundState.getNextSendTime() <= now)
handlePendingIntro(outboundState);
@ -1186,16 +1253,21 @@ class EstablishmentManager {
boolean removed = _liveIntroductions.remove(Long.valueOf(nonce), outboundState);
if (removed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + outboundState.getRemoteHostId() + " timed out");
_log.debug("Send intro for " + outboundState + " timed out");
_context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0);
}
}
// only if == state
RemoteHostId claimed = outboundState.getClaimedAddress();
if (claimed != null)
_outboundByClaimedAddress.remove(claimed, outboundState);
_outboundByHash.remove(outboundState.getRemoteIdentity().calculateHash(), outboundState);
// should have already been removed in handleOutbound() above
// remove only if value == state
boolean removed = _outboundStates.remove(outboundState.getRemoteHostId(), outboundState);
if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.INFO))
_log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
_log.info("Expired: " + outboundState + " Lifetime: " + outboundState.getLifetime());
OutNetMessage msg;
while ((msg = outboundState.getNextQueuedMessage()) != null) {
_transport.failed(msg, "Expired during failed establish");
@ -1244,54 +1316,92 @@ class EstablishmentManager {
_inboundStates.clear();
_outboundStates.clear();
_queuedOutbound.clear();
_liveIntroductions.clear();
_outboundByClaimedAddress.clear();
_outboundByHash.clear();
}
}
// Debugging
private long _lastPrinted;
private static final long PRINT_INTERVAL = 5*1000;
private long _lastFailsafe;
private static final long FAILSAFE_INTERVAL = 3*60*1000;
// Debugging
private long _lastPrinted;
private static final long PRINT_INTERVAL = 5*1000;
private void doPass() {
if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) {
_lastPrinted = _context.clock().now();
int iactive = _inboundStates.size();
int oactive = _outboundStates.size();
if (iactive > 0 || oactive > 0) {
int queued = _queuedOutbound.size();
int live = _liveIntroductions.size();
_log.debug("OB states: " + oactive + " IB states: " + iactive +
" OB queued: " + queued + " intros: " + live);
private void doPass() {
if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) {
_lastPrinted = _context.clock().now();
int iactive = _inboundStates.size();
int oactive = _outboundStates.size();
if (iactive > 0 || oactive > 0) {
int queued = _queuedOutbound.size();
int live = _liveIntroductions.size();
int claimed = _outboundByClaimedAddress.size();
int hash = _outboundByHash.size();
_log.debug("OB states: " + oactive + " IB states: " + iactive +
" OB queued: " + queued + " intros: " + live +
" OB claimed: " + claimed + " hash: " + hash);
}
}
_activity = 0;
long now = _context.clock().now();
if (_lastFailsafe + FAILSAFE_INTERVAL < _context.clock().now()) {
_lastFailsafe = _context.clock().now();
doFailsafe();
}
long nextSendTime = -1;
long nextSendInbound = handleInbound();
long nextSendOutbound = handleOutbound();
if (nextSendInbound > 0)
nextSendTime = nextSendInbound;
if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) )
nextSendTime = nextSendOutbound;
long delay = nextSendTime - now;
if ( (nextSendTime == -1) || (delay > 0) ) {
if (delay > 1000)
delay = 1000;
try {
synchronized (_activityLock) {
if (_activity > 0)
return;
if (nextSendTime == -1)
_activityLock.wait(1000);
else
_activityLock.wait(delay);
}
} catch (InterruptedException ie) {
}
// if (_log.shouldLog(Log.DEBUG))
// _log.debug("After waiting w/ nextSend=" + nextSendTime
// + " and delay=" + delay + " and interrupted=" + interrupted);
}
}
_activity = 0;
long now = _context.clock().now();
long nextSendTime = -1;
long nextSendInbound = handleInbound();
long nextSendOutbound = handleOutbound();
if (nextSendInbound > 0)
nextSendTime = nextSendInbound;
if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) )
nextSendTime = nextSendOutbound;
long delay = nextSendTime - now;
if ( (nextSendTime == -1) || (delay > 0) ) {
if (delay > 1000)
delay = 1000;
try {
synchronized (_activityLock) {
if (_activity > 0)
return;
if (nextSendTime == -1)
_activityLock.wait(1000);
else
_activityLock.wait(delay);
/** @since 0.9.2 */
private void doFailsafe() {
for (Iterator<OutboundEstablishState> iter = _liveIntroductions.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState state = iter.next();
if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) {
iter.remove();
if (_log.shouldLog(Log.WARN))
_log.warn("Failsafe remove LI " + state);
}
}
for (Iterator<OutboundEstablishState> iter = _outboundByClaimedAddress.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState state = iter.next();
if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) {
iter.remove();
if (_log.shouldLog(Log.WARN))
_log.warn("Failsafe remove OBBCA " + state);
}
}
for (Iterator<OutboundEstablishState> iter = _outboundByHash.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState state = iter.next();
if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) {
iter.remove();
if (_log.shouldLog(Log.WARN))
_log.warn("Failsafe remove OBBH " + state);
}
} catch (InterruptedException ie) {
}
// if (_log.shouldLog(Log.DEBUG))
// _log.debug("After waiting w/ nextSend=" + nextSendTime
// + " and delay=" + delay + " and interrupted=" + interrupted);
}
}
}

View File

@ -83,6 +83,10 @@ class InboundEstablishState {
/** max delay including backoff */
private static final long MAX_DELAY = 15*1000;
/**
* @param localPort Must be our external port, otherwise the signature of the
& SessionCreated message will be bad if the external port != the internal port.
*/
public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort,
DHSessionKeyBuilder dh) {
_context = ctx;

View File

@ -12,6 +12,8 @@ import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportImpl;
import net.i2p.util.Addresses;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
@ -141,12 +143,16 @@ class IntroductionManager {
_log.info("Peer is idle too long: " + cur);
continue;
}
byte[] ip = cur.getRemoteIP();
int port = cur.getRemotePort();
if (ip == null || !TransportImpl.isPubliclyRoutable(ip) || port <= 0 || port > 65535)
continue;
if (_log.shouldLog(Log.INFO))
_log.info("Picking introducer: " + cur);
cur.setIntroducerTime();
UDPAddress ura = new UDPAddress(ra);
ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString());
ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort()));
ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, Addresses.toString(ip));
ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(port));
ssuOptions.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + found, Base64.encode(ura.getIntroKey()));
ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs()));
found++;

View File

@ -50,6 +50,7 @@ class OutboundEstablishState {
private long _lastSend;
private long _nextSend;
private RemoteHostId _remoteHostId;
private final RemoteHostId _claimedAddress;
private final RouterIdentity _remotePeer;
private final SessionKey _introKey;
private final Queue<OutNetMessage> _queuedMessages;
@ -91,22 +92,25 @@ class OutboundEstablishState {
private static final long MAX_DELAY = 15*1000;
/**
* @param claimedAddress an IP/port based RemoteHostId, or null if unknown
* @param remoteHostId non-null, == claimedAddress if direct, or a hash-based one if indirect
* @param addr non-null
*/
public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort,
public OutboundEstablishState(RouterContext ctx, RemoteHostId claimedAddress,
RemoteHostId remoteHostId,
RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr,
DHSessionKeyBuilder dh) {
_context = ctx;
_log = ctx.logManager().getLog(OutboundEstablishState.class);
if ( (remoteHost != null) && (remotePort > 0) ) {
_bobIP = remoteHost.getAddress();
_bobPort = remotePort;
_remoteHostId = new RemoteHostId(_bobIP, _bobPort);
if (claimedAddress != null) {
_bobIP = claimedAddress.getIP();
_bobPort = claimedAddress.getPort();
} else {
_bobIP = null;
//_bobIP = null;
_bobPort = -1;
_remoteHostId = new RemoteHostId(remotePeer.calculateHash().getData());
}
_claimedAddress = claimedAddress;
_remoteHostId = remoteHostId;
_remotePeer = remotePeer;
_introKey = introKey;
_queuedMessages = new LinkedBlockingQueue();
@ -173,9 +177,17 @@ class OutboundEstablishState {
}
public byte[] getSentX() { return _sentX; }
/** the remote side (Bob) */
/**
* The remote side (Bob) - note that in some places he's called Charlie.
* Warning - may change after introduction. May be null before introduction.
*/
public synchronized byte[] getSentIP() { return _bobIP; }
/** the remote side (Bob) */
/**
* The remote side (Bob) - note that in some places he's called Charlie.
* Warning - may change after introduction. May be -1 before introduction.
*/
public synchronized int getSentPort() { return _bobPort; }
public synchronized void receiveSessionCreated(UDPPacketReader.SessionCreatedReader reader) {
@ -409,7 +421,8 @@ class OutboundEstablishState {
delay = RETRANSMIT_DELAY;
_confirmedSentTime = _lastSend;
} else {
delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount, MAX_DELAY);
delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount,
_confirmedSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend);
}
_confirmedSentCount++;
_nextSend = _lastSend + delay;
@ -437,7 +450,8 @@ class OutboundEstablishState {
delay = RETRANSMIT_DELAY;
_requestSentTime = _lastSend;
} else {
delay = Math.min(RETRANSMIT_DELAY << _requestSentCount, MAX_DELAY);
delay = Math.min(RETRANSMIT_DELAY << _requestSentCount,
_requestSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend);
}
_requestSentCount++;
_nextSend = _lastSend + delay;
@ -463,7 +477,8 @@ class OutboundEstablishState {
delay = RETRANSMIT_DELAY;
_introSentTime = _lastSend;
} else {
delay = Math.min(RETRANSMIT_DELAY << _introSentCount, MAX_DELAY);
delay = Math.min(RETRANSMIT_DELAY << _introSentCount,
_introSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend);
}
_introSentCount++;
_nextSend = _lastSend + delay;
@ -484,17 +499,24 @@ class OutboundEstablishState {
}
/**
* This changes the remoteHostId from a hash-based one to a IP/Port one,
* OR the IP or port could change.
* This changes the remoteHostId from a hash-based one or possibly
* incorrect IP/port to what the introducer told us.
* All params are for the remote end (NOT the introducer) and must have been validated already.
*/
public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) {
public synchronized void introduced(byte bobIP[], int bobPort) {
if (_currentState != OutboundState.OB_STATE_PENDING_INTRO)
return; // we've already successfully been introduced, so don't overwrite old settings
_nextSend = _context.clock().now() + 500; // wait briefly for the hole punching
_currentState = OutboundState.OB_STATE_INTRODUCED;
_bobIP = bobIP;
_bobPort = bobPort;
_remoteHostId = new RemoteHostId(bobIP, bobPort);
if (_claimedAddress != null && bobPort == _bobPort && DataHelper.eq(bobIP, _bobIP)) {
// he's who he said he was
_remoteHostId = _claimedAddress;
} else {
// no IP/port or wrong IP/port in RI
_bobIP = bobIP;
_bobPort = bobPort;
_remoteHostId = new RemoteHostId(bobIP, bobPort);
}
if (_log.shouldLog(Log.INFO))
_log.info("Introduced to " + _remoteHostId + ", now lets get on with establishing");
}
@ -504,9 +526,23 @@ class OutboundEstablishState {
public long getEstablishBeginTime() { return _establishBegin; }
public synchronized long getNextSendTime() { return _nextSend; }
/** uniquely identifies an attempt */
/**
* This should be what the state is currently indexed by in the _outboundStates table.
* Beware -
* During introduction, this is a router hash.
* After introduced() is called, this is set to the IP/port the introducer told us.
* @return non-null
*/
RemoteHostId getRemoteHostId() { return _remoteHostId; }
/**
* This will never be a hash-based address.
* This is the 'claimed' (unverified) address from the netdb, or null.
* It is not changed after introduction. Use getRemoteHostId() for the verified address.
* @return may be null
*/
RemoteHostId getClaimedAddress() { return _claimedAddress; }
/** we have received a real data packet, so we're done establishing */
public synchronized void dataReceived() {
packetReceived();

View File

@ -2,6 +2,7 @@ package net.i2p.router.transport.udp;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.util.Addresses;
/**
@ -13,7 +14,7 @@ import net.i2p.util.Addresses;
final class RemoteHostId {
private final byte _ip[];
private final int _port;
private final byte _peerHash[];
private final Hash _peerHash;
private final int _hashCode;
/** direct */
@ -22,11 +23,11 @@ final class RemoteHostId {
}
/** indirect */
public RemoteHostId(byte peerHash[]) {
public RemoteHostId(Hash peerHash) {
this(null, 0, peerHash);
}
private RemoteHostId(byte ip[], int port, byte peerHash[]) {
private RemoteHostId(byte ip[], int port, Hash peerHash) {
_ip = ip;
_port = port;
_peerHash = peerHash;
@ -40,7 +41,7 @@ final class RemoteHostId {
public int getPort() { return _port; }
/** @return null if direct */
public byte[] getPeerHash() { return _peerHash; }
public Hash getPeerHash() { return _peerHash; }
@Override
public int hashCode() {
@ -58,18 +59,11 @@ final class RemoteHostId {
}
@Override
public String toString() { return toString(true); }
private String toString(boolean includePort) {
public String toString() {
if (_ip != null) {
if (includePort)
return Addresses.toString(_ip, _port);
else
return Addresses.toString(_ip);
return Addresses.toString(_ip, _port);
} else {
return Base64.encode(_peerHash);
return _peerHash.toString();
}
}
public String toHostString() { return toString(false); }
}

View File

@ -39,31 +39,7 @@ public class UDPAddress {
static final int MAX_INTRODUCERS = 3;
public UDPAddress(RouterAddress addr) {
parse(addr);
}
@Override
public String toString() {
StringBuilder rv = new StringBuilder(64);
if (_introHosts != null) {
for (int i = 0; i < _introHosts.length; i++) {
rv.append("ssu://");
rv.append(_introTags[i]).append('@');
rv.append(_introHosts[i]).append(':').append(_introPorts[i]);
//rv.append('/').append(Base64.encode(_introKeys[i]));
if (i + 1 < _introKeys.length)
rv.append(", ");
}
} else {
if ( (_host != null) && (_port > 0) )
rv.append("ssu://").append(_host).append(':').append(_port);//.append('/').append(Base64.encode(_introKey));
else
rv.append("ssu://autodetect.not.yet.complete:").append(_port);
}
return rv.toString();
}
private void parse(RouterAddress addr) {
// TODO make everything final
if (addr == null) return;
_host = addr.getOption(PROP_HOST);
if (_host != null) _host = _host.trim();
@ -198,4 +174,25 @@ public class UDPAddress {
int getMTU() {
return _mtu;
}
@Override
public String toString() {
StringBuilder rv = new StringBuilder(64);
if (_introHosts != null) {
for (int i = 0; i < _introHosts.length; i++) {
rv.append("ssu://");
rv.append(_introTags[i]).append('@');
rv.append(_introHosts[i]).append(':').append(_introPorts[i]);
//rv.append('/').append(Base64.encode(_introKeys[i]));
if (i + 1 < _introKeys.length)
rv.append(", ");
}
} else {
if ( (_host != null) && (_port > 0) )
rv.append("ssu://").append(_host).append(':').append(_port);//.append('/').append(Base64.encode(_introKey));
else
rv.append("ssu://autodetect.not.yet.complete:").append(_port);
}
return rv.toString();
}
}

View File

@ -490,22 +490,26 @@ class UDPPacketReader {
* Helper class to fetch the particular bitfields from the raw packet
*/
private class PacketACKBitfield implements ACKBitfield {
private int _start;
private int _bitfieldStart;
private int _bitfieldSize;
private final int _start;
private final int _bitfieldStart;
private final int _bitfieldSize;
public PacketACKBitfield(int start) {
_start = start;
_bitfieldStart = start + 4;
_bitfieldSize = 1;
int bfsz = 1;
// bitfield is an array of bytes where the high bit is 1 if
// further bytes in the bitfield follow
while ((_message[_bitfieldStart + _bitfieldSize - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0)
_bitfieldSize++;
while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0)
bfsz++;
_bitfieldSize = bfsz;
}
public long getMessageId() { return DataHelper.fromLong(_message, _start, 4); }
public int getByteLength() { return 4 + _bitfieldSize; }
public int fragmentCount() { return _bitfieldSize * 7; }
public boolean receivedComplete() { return false; }
public boolean received(int fragmentNum) {
if ( (fragmentNum < 0) || (fragmentNum >= _bitfieldSize*7) )
return false;
@ -514,6 +518,7 @@ class UDPPacketReader {
int flagNum = fragmentNum % 7;
return (_message[byteNum] & (1 << flagNum)) != 0x0;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);

View File

@ -689,6 +689,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* Was true before 0.9.2
* Now false if we need introducers (as perhaps that's why we need them,
* our firewall is changing our port), unless overridden by the property.
* We must have an accurate external port when firewalled, or else
* our signature of the SessionCreated packet will be invalid.
*/
private boolean getIsPortFixed() {
String prop = _context.getProperty(PROP_FIXED_PORT);
@ -797,6 +799,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId == null) return false;
// Should always be direct... except maybe for hidden mode?
// or do we always know the IP by now?
if (remoteId.getIP() == null && _log.shouldLog(Log.WARN))
_log.warn("Add indirect: " + peer);
@ -1114,8 +1118,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (peer.getCurrentCipherKey() == null)
return;
UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer);
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + peer);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending destroy to : " + peer);
send(pkt);
}