From 612fab1b2a18eda0988b984b7dc9a2db85a38e3b Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 21 Aug 2012 19:53:08 +0000 Subject: [PATCH] * SSU: - Use external, not internal port to sign SessionCreated message. Together with previous fix to allow external port change, this should fix session establish fails when NAT changes our port - Track outbound establishments by both Hash and IP/port, to improve lookups of establishments in progress - Fix expiration of outbound establishments - Validate address/port in RelayResponse messages - Change RemoteHostID to store Hash instead of byte[] for the peer hash - Log tweaks --- history.txt | 13 + .../src/net/i2p/router/RouterVersion.java | 2 +- .../transport/udp/EstablishmentManager.java | 344 ++++++++++++------ .../transport/udp/InboundEstablishState.java | 4 + .../transport/udp/IntroductionManager.java | 10 +- .../transport/udp/OutboundEstablishState.java | 74 +++- .../router/transport/udp/RemoteHostId.java | 22 +- .../i2p/router/transport/udp/UDPAddress.java | 47 ++- .../router/transport/udp/UDPPacketReader.java | 17 +- .../router/transport/udp/UDPTransport.java | 8 +- 10 files changed, 355 insertions(+), 186 deletions(-) diff --git a/history.txt b/history.txt index a737500c74..fb575400a7 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,16 @@ +2012-08-21 zzz + * NetDB: Decrease stat publish probability + * SSU: + - Use external, not internal port to sign SessionCreated message. + Together with previous fix to allow external port change, this + should fix session establish fails when NAT changes our port + - Track outbound establishments by both Hash and IP/port, + to improve lookups of establishments in progress + - Fix expiration of outbound establishments + - Validate address/port in RelayResponse messages + - Change RemoteHostID to store Hash instead of byte[] for the peer hash + - Log tweaks + 2012-08-20 zzz * I2CP: MessageStatus cleanup * i2psnark: Add minimum tracker and DHT announce intervals diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 7f4edb9752..31c35f8c3e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 13; + public final static long BUILD = 14; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index a31b4d38c0..c1d472915f 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -39,14 +39,47 @@ class EstablishmentManager { private final Log _log; private final UDPTransport _transport; private final PacketBuilder _builder; + /** map of RemoteHostId to InboundEstablishState */ private final ConcurrentHashMap _inboundStates; - /** map of RemoteHostId to OutboundEstablishState */ + + /** + * Map of RemoteHostId to OutboundEstablishState. + * The key could be either an IP/Port (for direct) or + * a Hash (for indirect, before the RelayResponse is received). + * Once the RelayResponse is received we change the key. + */ private final ConcurrentHashMap _outboundStates; + /** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */ private final ConcurrentHashMap> _queuedOutbound; - /** map of nonce (Long) to OutboundEstablishState */ + + /** + * Map of nonce (Long) to OutboundEstablishState. + * Only for indirect, before we receive the RelayResponse. + * This is so we can lookup state for the RelayResponse. + * After we receive the relay response, _outboundStates is keyed by actual IP. + */ private final ConcurrentHashMap _liveIntroductions; + + /** + * Map of claimed IP/port to OutboundEstablishState. + * Only for indirect, before we receive the RelayResponse. + * This is so we can lookup a pending introduction by IP + * even before we know the "real" IP, so we can match an inbound packet. + * After we receive the relay response, _outboundStates is keyed by actual IP. + */ + private final ConcurrentHashMap _outboundByClaimedAddress; + + /** + * Map of router hash to OutboundEstablishState. + * Only for indirect, after we receive the RelayResponse. + * This is so we can lookup a pending connection by Hash + * even after we've got the IP/port, so we can match a subsequent outbound packet. + * Before we receive the relay response, _outboundStates is keyed by hash. + */ + private final ConcurrentHashMap _outboundByHash; + private volatile boolean _alive; private final Object _activityLock; private int _activity; @@ -81,7 +114,7 @@ class EstablishmentManager { private static final int MAX_IB_ESTABLISH_TIME = 20*1000; /** max before receiving a response to a single message during outbound establishment */ - private static final int OB_MESSAGE_TIMEOUT = 15*1000; + public static final int OB_MESSAGE_TIMEOUT = 15*1000; /** for the DSM and or netdb store */ private static final int DATA_MESSAGE_TIMEOUT = 10*1000; @@ -95,6 +128,8 @@ class EstablishmentManager { _outboundStates = new ConcurrentHashMap(); _queuedOutbound = new ConcurrentHashMap(); _liveIntroductions = new ConcurrentHashMap(); + _outboundByClaimedAddress = new ConcurrentHashMap(); + _outboundByHash = new ConcurrentHashMap(); _activityLock = new Object(); _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES); @@ -149,6 +184,11 @@ class EstablishmentManager { */ OutboundEstablishState getOutboundState(RemoteHostId from) { OutboundEstablishState state = _outboundStates.get(from); + if (state == null) { + state = _outboundByClaimedAddress.get(from); + if (state != null && _log.shouldLog(Log.INFO)) + _log.info("Found by claimed address: " + state); + } // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) ) // _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates); return state; @@ -166,34 +206,49 @@ class EstablishmentManager { * with them and sending it off. This call does not block, and on failure, * the message is failed. * + * Note - if we go back to multiple PacketHandler threads, this may need more locking. */ public void establish(OutNetMessage msg) { - RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle()); + establish(msg, true); + } + + /** + * @param queueIfMaxExceeded true normally, false if called from locked_admit so we don't loop + * @since 0.9.2 + */ + private void establish(OutNetMessage msg, boolean queueIfMaxExceeded) { + RouterInfo toRouterInfo = msg.getTarget(); + RouterAddress ra = toRouterInfo.getTargetAddress(_transport.getStyle()); if (ra == null) { _transport.failed(msg, "Remote peer has no address, cannot establish"); return; } - if (msg.getTarget().getNetworkId() != Router.NETWORK_ID) { - _context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash()); - _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash()); + RouterIdentity toIdentity = toRouterInfo.getIdentity(); + Hash toHash = toIdentity.calculateHash(); + if (toRouterInfo.getNetworkId() != Router.NETWORK_ID) { + _context.shitlist().shitlistRouter(toHash); + _transport.markUnreachable(toHash); _transport.failed(msg, "Remote peer is on the wrong network, cannot establish"); return; } UDPAddress addr = new UDPAddress(ra); - RemoteHostId to = null; + RemoteHostId maybeTo = null; InetAddress remAddr = addr.getHostAddress(); int port = addr.getPort(); - if (remAddr != null && port > 0 && port <= 65535) { - to = new RemoteHostId(remAddr.getAddress(), port); - if (!_transport.isValid(to.getIP())) { + // check for validity and existing inbound state, using the + // claimed address (which we won't be using if indirect) + if (remAddr != null && port > 0 && port <= 65535) { + maybeTo = new RemoteHostId(remAddr.getAddress(), port); + + if (!_transport.isValid(maybeTo.getIP())) { _transport.failed(msg, "Remote peer's IP isn't valid"); - _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash()); + _transport.markUnreachable(toHash); //_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE); return; } - InboundEstablishState inState = _inboundStates.get(to); + InboundEstablishState inState = _inboundStates.get(maybeTo); if (inState != null) { // we have an inbound establishment in progress, queue it there instead synchronized (inState) { @@ -222,13 +277,14 @@ class EstablishmentManager { } return; } + } - - + RemoteHostId to; + boolean isIndirect = addr.getIntroducerCount() > 0 || maybeTo == null; + if (isIndirect) { + to = new RemoteHostId(toHash); } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add indirect outbound establish state to: " + addr); - to = new RemoteHostId(msg.getTarget().getIdentity().calculateHash().getData()); + to = maybeTo; } OutboundEstablishState state = null; @@ -238,7 +294,12 @@ class EstablishmentManager { state = _outboundStates.get(to); if (state == null) { - if (_outboundStates.size() >= getMaxConcurrentEstablish()) { + state = _outboundByHash.get(toHash); + if (state != null && _log.shouldLog(Log.INFO)) + _log.info("Found by hash: " + state); + } + if (state == null) { + if (queueIfMaxExceeded && _outboundStates.size() >= getMaxConcurrentEstablish()) { if (_queuedOutbound.size() >= MAX_QUEUED_OUTBOUND) { rejected = true; } else { @@ -265,7 +326,7 @@ class EstablishmentManager { // must have a valid session key byte[] keyBytes = addr.getIntroKey(); if (keyBytes == null) { - _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash()); + _transport.markUnreachable(toHash); _transport.failed(msg, "Peer has no key, cannot establish"); return; } @@ -273,16 +334,18 @@ class EstablishmentManager { try { sessionKey = new SessionKey(keyBytes); } catch (IllegalArgumentException iae) { - _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash()); + _transport.markUnreachable(toHash); _transport.failed(msg, "Peer has bad key, cannot establish"); return; } - state = new OutboundEstablishState(_context, remAddr, port, - msg.getTarget().getIdentity(), + state = new OutboundEstablishState(_context, maybeTo, to, + toIdentity, sessionKey, addr, _transport.getDHBuilder()); OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state); boolean isNew = oldState == null; if (isNew) { + if (isIndirect && maybeTo != null) + _outboundByClaimedAddress.put(maybeTo, state); if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding new " + state); } else { @@ -364,7 +427,7 @@ class EstablishmentManager { } if (!_transport.allowConnection()) return; // drop the packet - state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort(), + state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getExternalPort(), _transport.getDHBuilder()); state.receiveSessionRequest(reader.getSessionRequestReader()); InboundEstablishState oldState = _inboundStates.putIfAbsent(from, state); @@ -387,12 +450,13 @@ class EstablishmentManager { if (_log.shouldLog(Log.INFO)) _log.info("Received NEW session request from " + from + ", sending relay tag " + tag); } else { + // we got an IB even though we were firewalled, hidden, not high cap, etc. if (_log.shouldLog(Log.INFO)) _log.info("Received session request, but our status is " + _transport.getReachabilityStatus()); } } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Receive DUP session request from: " + state.getRemoteHostId()); + _log.debug("Receive DUP session request from: " + state); } notifyActivity(); @@ -408,7 +472,7 @@ class EstablishmentManager { state.receiveSessionConfirmed(reader.getSessionConfirmedReader()); notifyActivity(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Receive session confirmed from: " + state.getRemoteHostId().toString()); + _log.debug("Receive session confirmed from: " + state); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Receive (DUP?) session confirmed from: " + from); @@ -425,7 +489,7 @@ class EstablishmentManager { state.receiveSessionCreated(reader.getSessionCreatedReader()); notifyActivity(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Receive session created from: " + state.getRemoteHostId().toString()); + _log.debug("Receive session created from: " + state); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Receive (DUP?) session created from: " + from); @@ -476,6 +540,7 @@ class EstablishmentManager { * A data packet arrived on an outbound connection being established, which * means its complete (yay!). This is a blocking call, more than I'd like... * + * @return the new PeerState */ PeerState receiveData(OutboundEstablishState state) { state.dataReceived(); @@ -541,29 +606,9 @@ class EstablishmentManager { if (queued.isEmpty()) continue; - OutNetMessage msg = queued.get(0); - RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle()); - if (ra == null) { - for (int i = 0; i < queued.size(); i++) - _transport.failed(queued.get(i), "Cannot admit to the queue, as it has no address"); - continue; - } - UDPAddress addr = new UDPAddress(ra); - InetAddress remAddr = addr.getHostAddress(); - int port = addr.getPort(); - - OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port, - msg.getTarget().getIdentity(), - new SessionKey(addr.getIntroKey()), addr, - _transport.getDHBuilder()); - OutboundEstablishState old = _outboundStates.putIfAbsent(to, qstate); - if (old != null) - qstate = old; - - for (int i = 0; i < queued.size(); i++) { - OutNetMessage m = queued.get(i); + for (OutNetMessage m : queued) { m.timestamp("no longer deferred... establishing"); - qstate.addMessage(m); + establish(m, false); } admitted++; } @@ -611,7 +656,7 @@ class EstablishmentManager { //peer.setTheyRelayToUsAs(0); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString() + _log.debug("Handle completely established (inbound): " + state + " - " + peer.getRemotePeer()); //if (true) // for now, only support direct @@ -681,6 +726,7 @@ class EstablishmentManager { * ok, fully received, add it to the established cons and send any * queued messages * + * @return the new PeerState */ private PeerState handleCompletelyEstablished(OutboundEstablishState state) { if (state.complete()) { @@ -691,6 +737,11 @@ class EstablishmentManager { long now = _context.clock().now(); RouterIdentity remote = state.getRemoteIdentity(); + // only if == state + RemoteHostId claimed = state.getClaimedAddress(); + if (claimed != null) + _outboundByClaimedAddress.remove(claimed, state); + _outboundByHash.remove(remote.calculateHash(), state); PeerState peer = new PeerState(_context, _transport, state.getSentIP(), state.getSentPort(), remote.calculateHash(), false); peer.setCurrentCipherKey(state.getCipherKey()); @@ -703,7 +754,7 @@ class EstablishmentManager { //peer.setWeRelayToThemAs(0); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString() + _log.debug("Handle completely established (outbound): " + state + " - " + peer.getRemotePeer()); @@ -764,13 +815,13 @@ class EstablishmentManager { } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send created to: " + state.getRemoteHostId().toString()); + _log.debug("Send created to: " + state); try { state.generateSessionKey(); } catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) { if (_log.shouldLog(Log.ERROR)) - _log.error("Peer " + state.getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe); + _log.error("Peer " + state + " sent us an invalid DH parameter (or were spoofed)", ippe); _inboundStates.remove(state.getRemoteHostId()); return; } @@ -783,13 +834,13 @@ class EstablishmentManager { */ private void sendRequest(OutboundEstablishState state) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send SessionRequest to: " + state.getRemoteHostId()); + _log.debug("Send SessionRequest to: " + state); UDPPacket packet = _builder.buildSessionRequestPacket(state); if (packet != null) { _transport.send(packet); } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to build a session request packet for " + state.getRemoteHostId()); + _log.warn("Unable to build a session request packet for " + state); } state.requestSent(); } @@ -816,50 +867,61 @@ class EstablishmentManager { _transport.send(requests[i]); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send intro for " + state.getRemoteHostId() + " with our intro key as " + _transport.getIntroKey()); + _log.debug("Send intro for " + state + " with our intro key as " + _transport.getIntroKey()); state.introSent(); } void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) { long nonce = reader.getRelayResponseReader().readNonce(); OutboundEstablishState state = _liveIntroductions.remove(Long.valueOf(nonce)); - if (state == null) + if (state == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Dup or unknown RelayResponse: " + nonce); return; // already established + } int sz = reader.getRelayResponseReader().readCharlieIPSize(); byte ip[] = new byte[sz]; reader.getRelayResponseReader().readCharlieIP(ip, 0); + int port = reader.getRelayResponseReader().readCharliePort(); InetAddress addr = null; try { + if (!_transport.isValid(ip)) + throw new UnknownHostException("non-public IP"); + if (port <= 0 || port > 65535) + throw new UnknownHostException("bad port " + port); addr = InetAddress.getByAddress(ip); } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.WARN)) - _log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid IP for our target: " + Addresses.toString(ip), uhe); + _log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid address for our target: " + Addresses.toString(ip, port), uhe); // these two cause this peer to requeue for a new intro peer state.introductionFailed(); notifyActivity(); return; } _context.statManager().addRateData("udp.receiveIntroRelayResponse", state.getLifetime(), 0); - int port = reader.getRelayResponseReader().readCharliePort(); if (_log.shouldLog(Log.INFO)) _log.info("Received RelayResponse for " + state.getRemoteIdentity().calculateHash() + " - they are on " - + addr.toString() + ":" + port + " (according to " + bob + ")"); + + addr.toString() + ":" + port + " (according to " + bob + ") nonce=" + nonce); synchronized (state) { RemoteHostId oldId = state.getRemoteHostId(); - state.introduced(addr, ip, port); + state.introduced(ip, port); RemoteHostId newId = state.getRemoteHostId(); - // Swap out the RemoteHostId the state is indexed under - // TODO only if !oldId.equals(newId) ? synch? - // FIXME if the RemoteHostIDs aren't the same we have problems - // FIXME if the RemoteHostIDs aren't the same the SessionCreated signature is probably going to fail - // Common occurrence - port changes + // Swap out the RemoteHostId the state is indexed under. + // It was a Hash, change it to a IP/port. + // Remove the entry in the byClaimedAddress map as it's now in main map. + // Add an entry in the byHash map so additional OB pkts can find it. + _outboundByHash.put(state.getRemoteIdentity().calculateHash(), state); + RemoteHostId claimed = state.getClaimedAddress(); if (!oldId.equals(newId)) { _outboundStates.remove(oldId); _outboundStates.put(newId, state); if (_log.shouldLog(Log.WARN)) - _log.warn("RR replaced " + oldId + " with " + newId + " -> " + state); + _log.warn("RR replaced " + oldId + " with " + newId + ", claimed address was " + claimed); } + // + if (claimed != null) + _outboundByClaimedAddress.remove(oldId, state); // only if == state } notifyActivity(); } @@ -872,8 +934,13 @@ class EstablishmentManager { */ private void sendConfirmation(OutboundEstablishState state) { boolean valid = state.validateSessionCreated(); - if (!valid) // validate clears fields on failure + if (!valid) { + // validate clears fields on failure + // TODO - send destroy? shitlist? + if (_log.shouldLog(Log.WARN)) + _log.warn("SessionCreated validate failed: " + state); return; + } if (!_transport.isValid(state.getReceivedIP()) || !_transport.isValid(state.getRemoteHostId().getIP())) { state.fail(); @@ -936,7 +1003,7 @@ class EstablishmentManager { /** * Drive through the inbound establishment states, adjusting one of them - * as necessary + * as necessary. Called from Establisher thread only. * @return next requested time or -1 */ private long handleInbound() { @@ -1053,7 +1120,7 @@ class EstablishmentManager { /** * Drive through the outbound establishment states, adjusting one of them - * as necessary + * as necessary. Called from Establisher thread only. * @return next requested time or -1 */ private long handleOutbound() { @@ -1073,7 +1140,7 @@ class EstablishmentManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing confirmed outbound: " + cur); break; - } else if (cur.getLifetime() > MAX_OB_ESTABLISH_TIME) { + } else if (cur.getLifetime() >= MAX_OB_ESTABLISH_TIME) { // took too long iter.remove(); outboundState = cur; @@ -1116,7 +1183,7 @@ class EstablishmentManager { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Processing for outbound: " + outboundState); synchronized (outboundState) { - boolean expired = outboundState.getLifetime() > MAX_OB_ESTABLISH_TIME; + boolean expired = outboundState.getLifetime() >= MAX_OB_ESTABLISH_TIME; switch (outboundState.getState()) { case OB_STATE_UNKNOWN: // fall thru case OB_STATE_INTRODUCED: @@ -1129,7 +1196,7 @@ class EstablishmentManager { case OB_STATE_REQUEST_SENT: // no response yet (or it was invalid), lets retry long rtime = outboundState.getRequestSentTime(); - if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT < now)) + if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT <= now)) processExpired(outboundState); else if (outboundState.getNextSendTime() <= now) sendRequest(outboundState); @@ -1144,7 +1211,7 @@ class EstablishmentManager { case OB_STATE_CONFIRMED_PARTIALLY: long ctime = outboundState.getConfirmedSentTime(); - if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT < now)) { + if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT <= now)) { sendDestroy(outboundState); processExpired(outboundState); } else if (outboundState.getNextSendTime() <= now) { @@ -1161,7 +1228,7 @@ class EstablishmentManager { case OB_STATE_PENDING_INTRO: long itime = outboundState.getIntroSentTime(); - if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT < now)) + if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT <= now)) processExpired(outboundState); else if (outboundState.getNextSendTime() <= now) handlePendingIntro(outboundState); @@ -1186,16 +1253,21 @@ class EstablishmentManager { boolean removed = _liveIntroductions.remove(Long.valueOf(nonce), outboundState); if (removed) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send intro for " + outboundState.getRemoteHostId() + " timed out"); + _log.debug("Send intro for " + outboundState + " timed out"); _context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0); } } + // only if == state + RemoteHostId claimed = outboundState.getClaimedAddress(); + if (claimed != null) + _outboundByClaimedAddress.remove(claimed, outboundState); + _outboundByHash.remove(outboundState.getRemoteIdentity().calculateHash(), outboundState); // should have already been removed in handleOutbound() above // remove only if value == state boolean removed = _outboundStates.remove(outboundState.getRemoteHostId(), outboundState); if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) { if (_log.shouldLog(Log.INFO)) - _log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime()); + _log.info("Expired: " + outboundState + " Lifetime: " + outboundState.getLifetime()); OutNetMessage msg; while ((msg = outboundState.getNextQueuedMessage()) != null) { _transport.failed(msg, "Expired during failed establish"); @@ -1244,54 +1316,92 @@ class EstablishmentManager { _inboundStates.clear(); _outboundStates.clear(); _queuedOutbound.clear(); - _liveIntroductions.clear(); + _outboundByClaimedAddress.clear(); + _outboundByHash.clear(); } - } - // Debugging - private long _lastPrinted; - private static final long PRINT_INTERVAL = 5*1000; + private long _lastFailsafe; + private static final long FAILSAFE_INTERVAL = 3*60*1000; + // Debugging + private long _lastPrinted; + private static final long PRINT_INTERVAL = 5*1000; - private void doPass() { - if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) { - _lastPrinted = _context.clock().now(); - int iactive = _inboundStates.size(); - int oactive = _outboundStates.size(); - if (iactive > 0 || oactive > 0) { - int queued = _queuedOutbound.size(); - int live = _liveIntroductions.size(); - _log.debug("OB states: " + oactive + " IB states: " + iactive + - " OB queued: " + queued + " intros: " + live); + private void doPass() { + if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) { + _lastPrinted = _context.clock().now(); + int iactive = _inboundStates.size(); + int oactive = _outboundStates.size(); + if (iactive > 0 || oactive > 0) { + int queued = _queuedOutbound.size(); + int live = _liveIntroductions.size(); + int claimed = _outboundByClaimedAddress.size(); + int hash = _outboundByHash.size(); + _log.debug("OB states: " + oactive + " IB states: " + iactive + + " OB queued: " + queued + " intros: " + live + + " OB claimed: " + claimed + " hash: " + hash); + } + } + _activity = 0; + long now = _context.clock().now(); + if (_lastFailsafe + FAILSAFE_INTERVAL < _context.clock().now()) { + _lastFailsafe = _context.clock().now(); + doFailsafe(); + } + long nextSendTime = -1; + long nextSendInbound = handleInbound(); + long nextSendOutbound = handleOutbound(); + if (nextSendInbound > 0) + nextSendTime = nextSendInbound; + if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) ) + nextSendTime = nextSendOutbound; + + long delay = nextSendTime - now; + if ( (nextSendTime == -1) || (delay > 0) ) { + if (delay > 1000) + delay = 1000; + try { + synchronized (_activityLock) { + if (_activity > 0) + return; + if (nextSendTime == -1) + _activityLock.wait(1000); + else + _activityLock.wait(delay); + } + } catch (InterruptedException ie) { + } + // if (_log.shouldLog(Log.DEBUG)) + // _log.debug("After waiting w/ nextSend=" + nextSendTime + // + " and delay=" + delay + " and interrupted=" + interrupted); } } - _activity = 0; - long now = _context.clock().now(); - long nextSendTime = -1; - long nextSendInbound = handleInbound(); - long nextSendOutbound = handleOutbound(); - if (nextSendInbound > 0) - nextSendTime = nextSendInbound; - if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) ) - nextSendTime = nextSendOutbound; - long delay = nextSendTime - now; - if ( (nextSendTime == -1) || (delay > 0) ) { - if (delay > 1000) - delay = 1000; - try { - synchronized (_activityLock) { - if (_activity > 0) - return; - if (nextSendTime == -1) - _activityLock.wait(1000); - else - _activityLock.wait(delay); + /** @since 0.9.2 */ + private void doFailsafe() { + for (Iterator iter = _liveIntroductions.values().iterator(); iter.hasNext(); ) { + OutboundEstablishState state = iter.next(); + if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) { + iter.remove(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Failsafe remove LI " + state); + } + } + for (Iterator iter = _outboundByClaimedAddress.values().iterator(); iter.hasNext(); ) { + OutboundEstablishState state = iter.next(); + if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) { + iter.remove(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Failsafe remove OBBCA " + state); + } + } + for (Iterator iter = _outboundByHash.values().iterator(); iter.hasNext(); ) { + OutboundEstablishState state = iter.next(); + if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) { + iter.remove(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Failsafe remove OBBH " + state); } - } catch (InterruptedException ie) { } - // if (_log.shouldLog(Log.DEBUG)) - // _log.debug("After waiting w/ nextSend=" + nextSendTime - // + " and delay=" + delay + " and interrupted=" + interrupted); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java index 7412562f83..6540293d68 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java @@ -83,6 +83,10 @@ class InboundEstablishState { /** max delay including backoff */ private static final long MAX_DELAY = 15*1000; + /** + * @param localPort Must be our external port, otherwise the signature of the + & SessionCreated message will be bad if the external port != the internal port. + */ public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort, DHSessionKeyBuilder dh) { _context = ctx; diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index af3d28f446..c539af49d2 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -12,6 +12,8 @@ import net.i2p.data.RouterAddress; import net.i2p.data.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.router.RouterContext; +import net.i2p.router.transport.TransportImpl; +import net.i2p.util.Addresses; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; @@ -141,12 +143,16 @@ class IntroductionManager { _log.info("Peer is idle too long: " + cur); continue; } + byte[] ip = cur.getRemoteIP(); + int port = cur.getRemotePort(); + if (ip == null || !TransportImpl.isPubliclyRoutable(ip) || port <= 0 || port > 65535) + continue; if (_log.shouldLog(Log.INFO)) _log.info("Picking introducer: " + cur); cur.setIntroducerTime(); UDPAddress ura = new UDPAddress(ra); - ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString()); - ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort())); + ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, Addresses.toString(ip)); + ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(port)); ssuOptions.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + found, Base64.encode(ura.getIntroKey())); ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs())); found++; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index c67a7e3bad..6e94173c5f 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -50,6 +50,7 @@ class OutboundEstablishState { private long _lastSend; private long _nextSend; private RemoteHostId _remoteHostId; + private final RemoteHostId _claimedAddress; private final RouterIdentity _remotePeer; private final SessionKey _introKey; private final Queue _queuedMessages; @@ -91,22 +92,25 @@ class OutboundEstablishState { private static final long MAX_DELAY = 15*1000; /** + * @param claimedAddress an IP/port based RemoteHostId, or null if unknown + * @param remoteHostId non-null, == claimedAddress if direct, or a hash-based one if indirect * @param addr non-null */ - public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort, + public OutboundEstablishState(RouterContext ctx, RemoteHostId claimedAddress, + RemoteHostId remoteHostId, RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr, DHSessionKeyBuilder dh) { _context = ctx; _log = ctx.logManager().getLog(OutboundEstablishState.class); - if ( (remoteHost != null) && (remotePort > 0) ) { - _bobIP = remoteHost.getAddress(); - _bobPort = remotePort; - _remoteHostId = new RemoteHostId(_bobIP, _bobPort); + if (claimedAddress != null) { + _bobIP = claimedAddress.getIP(); + _bobPort = claimedAddress.getPort(); } else { - _bobIP = null; + //_bobIP = null; _bobPort = -1; - _remoteHostId = new RemoteHostId(remotePeer.calculateHash().getData()); } + _claimedAddress = claimedAddress; + _remoteHostId = remoteHostId; _remotePeer = remotePeer; _introKey = introKey; _queuedMessages = new LinkedBlockingQueue(); @@ -173,9 +177,17 @@ class OutboundEstablishState { } public byte[] getSentX() { return _sentX; } - /** the remote side (Bob) */ + + /** + * The remote side (Bob) - note that in some places he's called Charlie. + * Warning - may change after introduction. May be null before introduction. + */ public synchronized byte[] getSentIP() { return _bobIP; } - /** the remote side (Bob) */ + + /** + * The remote side (Bob) - note that in some places he's called Charlie. + * Warning - may change after introduction. May be -1 before introduction. + */ public synchronized int getSentPort() { return _bobPort; } public synchronized void receiveSessionCreated(UDPPacketReader.SessionCreatedReader reader) { @@ -409,7 +421,8 @@ class OutboundEstablishState { delay = RETRANSMIT_DELAY; _confirmedSentTime = _lastSend; } else { - delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount, MAX_DELAY); + delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount, + _confirmedSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend); } _confirmedSentCount++; _nextSend = _lastSend + delay; @@ -437,7 +450,8 @@ class OutboundEstablishState { delay = RETRANSMIT_DELAY; _requestSentTime = _lastSend; } else { - delay = Math.min(RETRANSMIT_DELAY << _requestSentCount, MAX_DELAY); + delay = Math.min(RETRANSMIT_DELAY << _requestSentCount, + _requestSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend); } _requestSentCount++; _nextSend = _lastSend + delay; @@ -463,7 +477,8 @@ class OutboundEstablishState { delay = RETRANSMIT_DELAY; _introSentTime = _lastSend; } else { - delay = Math.min(RETRANSMIT_DELAY << _introSentCount, MAX_DELAY); + delay = Math.min(RETRANSMIT_DELAY << _introSentCount, + _introSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend); } _introSentCount++; _nextSend = _lastSend + delay; @@ -484,17 +499,24 @@ class OutboundEstablishState { } /** - * This changes the remoteHostId from a hash-based one to a IP/Port one, - * OR the IP or port could change. + * This changes the remoteHostId from a hash-based one or possibly + * incorrect IP/port to what the introducer told us. + * All params are for the remote end (NOT the introducer) and must have been validated already. */ - public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) { + public synchronized void introduced(byte bobIP[], int bobPort) { if (_currentState != OutboundState.OB_STATE_PENDING_INTRO) return; // we've already successfully been introduced, so don't overwrite old settings _nextSend = _context.clock().now() + 500; // wait briefly for the hole punching _currentState = OutboundState.OB_STATE_INTRODUCED; - _bobIP = bobIP; - _bobPort = bobPort; - _remoteHostId = new RemoteHostId(bobIP, bobPort); + if (_claimedAddress != null && bobPort == _bobPort && DataHelper.eq(bobIP, _bobIP)) { + // he's who he said he was + _remoteHostId = _claimedAddress; + } else { + // no IP/port or wrong IP/port in RI + _bobIP = bobIP; + _bobPort = bobPort; + _remoteHostId = new RemoteHostId(bobIP, bobPort); + } if (_log.shouldLog(Log.INFO)) _log.info("Introduced to " + _remoteHostId + ", now lets get on with establishing"); } @@ -504,9 +526,23 @@ class OutboundEstablishState { public long getEstablishBeginTime() { return _establishBegin; } public synchronized long getNextSendTime() { return _nextSend; } - /** uniquely identifies an attempt */ + /** + * This should be what the state is currently indexed by in the _outboundStates table. + * Beware - + * During introduction, this is a router hash. + * After introduced() is called, this is set to the IP/port the introducer told us. + * @return non-null + */ RemoteHostId getRemoteHostId() { return _remoteHostId; } + /** + * This will never be a hash-based address. + * This is the 'claimed' (unverified) address from the netdb, or null. + * It is not changed after introduction. Use getRemoteHostId() for the verified address. + * @return may be null + */ + RemoteHostId getClaimedAddress() { return _claimedAddress; } + /** we have received a real data packet, so we're done establishing */ public synchronized void dataReceived() { packetReceived(); diff --git a/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java b/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java index 399b82eec8..7f4b2fa4b2 100644 --- a/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java +++ b/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java @@ -2,6 +2,7 @@ package net.i2p.router.transport.udp; import net.i2p.data.Base64; import net.i2p.data.DataHelper; +import net.i2p.data.Hash; import net.i2p.util.Addresses; /** @@ -13,7 +14,7 @@ import net.i2p.util.Addresses; final class RemoteHostId { private final byte _ip[]; private final int _port; - private final byte _peerHash[]; + private final Hash _peerHash; private final int _hashCode; /** direct */ @@ -22,11 +23,11 @@ final class RemoteHostId { } /** indirect */ - public RemoteHostId(byte peerHash[]) { + public RemoteHostId(Hash peerHash) { this(null, 0, peerHash); } - private RemoteHostId(byte ip[], int port, byte peerHash[]) { + private RemoteHostId(byte ip[], int port, Hash peerHash) { _ip = ip; _port = port; _peerHash = peerHash; @@ -40,7 +41,7 @@ final class RemoteHostId { public int getPort() { return _port; } /** @return null if direct */ - public byte[] getPeerHash() { return _peerHash; } + public Hash getPeerHash() { return _peerHash; } @Override public int hashCode() { @@ -58,18 +59,11 @@ final class RemoteHostId { } @Override - public String toString() { return toString(true); } - - private String toString(boolean includePort) { + public String toString() { if (_ip != null) { - if (includePort) - return Addresses.toString(_ip, _port); - else - return Addresses.toString(_ip); + return Addresses.toString(_ip, _port); } else { - return Base64.encode(_peerHash); + return _peerHash.toString(); } } - - public String toHostString() { return toString(false); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPAddress.java b/router/java/src/net/i2p/router/transport/udp/UDPAddress.java index 07a8447831..0305b0516a 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPAddress.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPAddress.java @@ -39,31 +39,7 @@ public class UDPAddress { static final int MAX_INTRODUCERS = 3; public UDPAddress(RouterAddress addr) { - parse(addr); - } - - @Override - public String toString() { - StringBuilder rv = new StringBuilder(64); - if (_introHosts != null) { - for (int i = 0; i < _introHosts.length; i++) { - rv.append("ssu://"); - rv.append(_introTags[i]).append('@'); - rv.append(_introHosts[i]).append(':').append(_introPorts[i]); - //rv.append('/').append(Base64.encode(_introKeys[i])); - if (i + 1 < _introKeys.length) - rv.append(", "); - } - } else { - if ( (_host != null) && (_port > 0) ) - rv.append("ssu://").append(_host).append(':').append(_port);//.append('/').append(Base64.encode(_introKey)); - else - rv.append("ssu://autodetect.not.yet.complete:").append(_port); - } - return rv.toString(); - } - - private void parse(RouterAddress addr) { + // TODO make everything final if (addr == null) return; _host = addr.getOption(PROP_HOST); if (_host != null) _host = _host.trim(); @@ -198,4 +174,25 @@ public class UDPAddress { int getMTU() { return _mtu; } + + @Override + public String toString() { + StringBuilder rv = new StringBuilder(64); + if (_introHosts != null) { + for (int i = 0; i < _introHosts.length; i++) { + rv.append("ssu://"); + rv.append(_introTags[i]).append('@'); + rv.append(_introHosts[i]).append(':').append(_introPorts[i]); + //rv.append('/').append(Base64.encode(_introKeys[i])); + if (i + 1 < _introKeys.length) + rv.append(", "); + } + } else { + if ( (_host != null) && (_port > 0) ) + rv.append("ssu://").append(_host).append(':').append(_port);//.append('/').append(Base64.encode(_introKey)); + else + rv.append("ssu://autodetect.not.yet.complete:").append(_port); + } + return rv.toString(); + } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index 32c4b6c6c8..e8c138a1c5 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -490,22 +490,26 @@ class UDPPacketReader { * Helper class to fetch the particular bitfields from the raw packet */ private class PacketACKBitfield implements ACKBitfield { - private int _start; - private int _bitfieldStart; - private int _bitfieldSize; + private final int _start; + private final int _bitfieldStart; + private final int _bitfieldSize; + public PacketACKBitfield(int start) { _start = start; _bitfieldStart = start + 4; - _bitfieldSize = 1; + int bfsz = 1; // bitfield is an array of bytes where the high bit is 1 if // further bytes in the bitfield follow - while ((_message[_bitfieldStart + _bitfieldSize - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0) - _bitfieldSize++; + while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0) + bfsz++; + _bitfieldSize = bfsz; } + public long getMessageId() { return DataHelper.fromLong(_message, _start, 4); } public int getByteLength() { return 4 + _bitfieldSize; } public int fragmentCount() { return _bitfieldSize * 7; } public boolean receivedComplete() { return false; } + public boolean received(int fragmentNum) { if ( (fragmentNum < 0) || (fragmentNum >= _bitfieldSize*7) ) return false; @@ -514,6 +518,7 @@ class UDPPacketReader { int flagNum = fragmentNum % 7; return (_message[byteNum] & (1 << flagNum)) != 0x0; } + @Override public String toString() { StringBuilder buf = new StringBuilder(64); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 68d1b9fb30..91e239807d 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -689,6 +689,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority * Was true before 0.9.2 * Now false if we need introducers (as perhaps that's why we need them, * our firewall is changing our port), unless overridden by the property. + * We must have an accurate external port when firewalled, or else + * our signature of the SessionCreated packet will be invalid. */ private boolean getIsPortFixed() { String prop = _context.getProperty(PROP_FIXED_PORT); @@ -797,6 +799,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority RemoteHostId remoteId = peer.getRemoteHostId(); if (remoteId == null) return false; + // Should always be direct... except maybe for hidden mode? + // or do we always know the IP by now? if (remoteId.getIP() == null && _log.shouldLog(Log.WARN)) _log.warn("Add indirect: " + peer); @@ -1114,8 +1118,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (peer.getCurrentCipherKey() == null) return; UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer); - if (_log.shouldLog(Log.WARN)) - _log.warn("Sending destroy to : " + peer); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending destroy to : " + peer); send(pkt); }