- Fix memory leak in _peersByRemoteHost map caused by not
     removing peers that change IP or port
   - Send keepalives if firewalled
   - Handle peers that change ports on an established session
   - Synchronize adds and drops
   - Don't use peers with high RTTs in clock skew calculation
   - Reduce initial RTT/RTO
This commit is contained in:
zzz
2012-10-02 18:36:06 +00:00
parent e130264254
commit 4dc90ef5da
5 changed files with 253 additions and 56 deletions

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 4;
public final static long BUILD = 5;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -1,11 +1,14 @@
package net.i2p.router.transport.udp;
import java.util.Date;
import java.util.List;
import java.util.Map;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.data.DataHelper;
import net.i2p.util.I2PThread;
import net.i2p.util.LHMCache;
import net.i2p.util.Log;
/**
@ -30,6 +33,8 @@ class PacketHandler {
private final IntroductionManager _introManager;
private volatile boolean _keepReading;
private final Handler[] _handlers;
private final Map<RemoteHostId, Object> _failCache;
private static final Object DUMMY = new Object();
private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB
private static final int MAX_NUM_HANDLERS = 1;
@ -46,6 +51,7 @@ class PacketHandler {
_inbound = inbound;
_testManager = testManager;
_introManager = introManager;
_failCache = new LHMCache(24);
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
@ -143,8 +149,8 @@ class PacketHandler {
if (packet == null) break; // keepReading is probably false, or bind failed...
packet.received();
if (_log.shouldLog(Log.INFO))
_log.info("Received: " + packet);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received: " + packet);
_state = 4;
long queueTime = packet.getLifetime();
long handleStart = _context.clock().now();
@ -294,7 +300,7 @@ class PacketHandler {
receivePacket(reader, packet, est, false);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP");
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP " + packet);
_context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration());
}
return;
@ -327,8 +333,70 @@ class PacketHandler {
if (!isValid) {
// Note that the vast majority of these are NOT corrupted packets, but
// packets for which we don't have the PeerState (i.e. SessionKey)
// Case 1: 48 byte destroy packet, we already closed
// Case 2: 369 byte session created packet, re-tx of one that failed validation
// (peer probably doesn't know his correct external port, esp. on <= 0.9.1)
// Case 3:
// For peers that change ports, look for an existing session with the same IP
// If we find it, and the packet validates with its mac key, tell the transport
// to change the port, and handle the packet.
// All this since 0.9.3.
RemoteHostId remoteHost = packet.getRemoteHost();
boolean alreadyFailed;
synchronized(_failCache) {
alreadyFailed = _failCache.get(remoteHost) != null;
}
if (!alreadyFailed) {
// this is slow, that's why we cache it above.
List<PeerState> peers = _transport.getPeerStatesByIP(remoteHost);
if (!peers.isEmpty()) {
StringBuilder buf = new StringBuilder(256);
buf.append("Established peers with this IP: ");
boolean foundSamePort = false;
PeerState state = null;
int newPort = remoteHost.getPort();
for (PeerState ps : peers) {
boolean valid = false;
long now = _context.clock().now();
if (_log.shouldLog(Log.WARN))
buf.append(ps.getRemoteHostId().toString())
.append(" last sent: ").append(now - ps.getLastSendTime())
.append(" last rcvd: ").append(now - ps.getLastReceiveTime());
if (ps.getRemotePort() == newPort) {
foundSamePort = true;
} else if (packet.validate(ps.getCurrentMACKey())) {
packet.decrypt(ps.getCurrentCipherKey());
reader.initialize(packet);
if (_log.shouldLog(Log.WARN))
buf.append(" VALID type ").append(reader.readPayloadType()).append("; ");
valid = true;
if (state == null)
state = ps;
} else {
if (_log.shouldLog(Log.WARN))
buf.append(" INVALID; ");
}
}
if (state != null && !foundSamePort) {
_transport.changePeerPort(state, newPort);
if (_log.shouldLog(Log.WARN)) {
buf.append(" CHANGED PORT TO ").append(newPort).append(" AND HANDLED");
_log.warn(buf.toString());
}
handlePacket(reader, packet, state, null, null, true);
return;
}
if (_log.shouldLog(Log.WARN))
_log.warn(buf.toString());
}
synchronized(_failCache) {
_failCache.put(remoteHost, DUMMY);
}
}
if (_log.shouldLog(Log.WARN))
_log.warn("Cannot validate rcvd pkt (path): " + packet);
_log.warn("Cannot validate rcvd pkt (path) wasCached? " + alreadyFailed + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration());
switch (peerType) {
case INBOUND_FALLBACK:

View File

@ -137,11 +137,11 @@ class PeerState {
/** what IP is the peer sending and receiving packets on? */
private final byte[] _remoteIP;
/** cached IP address */
private transient InetAddress _remoteIPAddress;
private volatile InetAddress _remoteIPAddress;
/** what port is the peer sending and receiving packets on? */
private final int _remotePort;
private volatile int _remotePort;
/** cached RemoteHostId, used to find the peerState by remote info */
private final RemoteHostId _remoteHostId;
private volatile RemoteHostId _remoteHostId;
/** if we need to contact them, do we need to talk to an introducer? */
//private boolean _remoteRequiresIntroduction;
@ -284,8 +284,10 @@ class PeerState {
*/
public static final int LARGE_MTU = 1484;
/** 600 */
private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY;
private static final int INIT_RTO = 4*1000;
private static final int INIT_RTO = 3*1000;
public static final int INIT_RTT = INIT_RTO / 2;
private static final int MAX_RTO = 15*1000;
public PeerState(RouterContext ctx, UDPTransport transport,
@ -313,7 +315,7 @@ class PeerState {
//_mtuLastChecked = -1;
_lastACKSend = -1;
_rto = INIT_RTO;
_rtt = INIT_RTO / 2;
_rtt = INIT_RTT;
_rttDeviation = _rtt;
_inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32);
@ -325,6 +327,17 @@ class PeerState {
_remoteHostId = new RemoteHostId(remoteIP, remotePort);
}
/**
 * Switches this peer to a different remote port and replaces the cached
 * RemoteHostId accordingly. Does nothing when newPort equals the current port.
 *
 * Caller should sync; UDPTransport must remove and add to peersByRemoteHost map,
 * since the RemoteHostId used as the key there changes.
 *
 * @param newPort the new remote port for this peer
 * @since 0.9.3
 */
public void changePort(int newPort) {
    if (newPort == _remotePort) {
        return;
    }
    // same write order as before: host id first, then the port
    _remoteHostId = new RemoteHostId(_remoteIP, newPort);
    _remotePort = newPort;
}
/**
* The peer we are talking to. This should be set as soon as this
* state is created if we are initiating a connection, but if we are
@ -342,9 +355,12 @@ class PeerState {
* connection is established.
*/
public SessionKey getCurrentCipherKey() { return _currentCipherKey; }
/**
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
*
* @return null always, rekeying unimplemented
*/
public SessionKey getNextMACKey() { return _nextMACKey; }
@ -352,6 +368,8 @@ class PeerState {
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
* of rekeying.
*
* @return null always, rekeying unimplemented
*/
public SessionKey getNextCipherKey() { return _nextCipherKey; }
@ -1213,6 +1231,7 @@ class PeerState {
// return MAX_RTO;
}
/** @return non-null */
RemoteHostId getRemoteHostId() { return _remoteHostId; }
/**
@ -1875,6 +1894,9 @@ class PeerState {
buf.append(" consecFail: ").append(_consecutiveFailedSends);
buf.append(" recv OK/Dup: ").append(_packetsReceived).append('/').append(_packetsReceivedDuplicate);
buf.append(" send OK/Dup: ").append(_packetsTransmitted).append('/').append(_packetsRetransmitted);
buf.append(" IBM: ").append(_inboundMessages.size());
buf.append(" OBQ: ").append(_outboundQueue.size());
buf.append(" OBL: ").append(_outboundMessages.size());
return buf.toString();
}
}

View File

@ -51,6 +51,7 @@ import net.i2p.util.Translate;
public class UDPTransport extends TransportImpl implements TimedWeightedPriorityMessageQueue.FailedListener {
private final Log _log;
private UDPEndpoint _endpoint;
private final Object _addDropLock = new Object();
/** Peer (Hash) to PeerState */
private final Map<Hash, PeerState> _peersByIdent;
/** RemoteHostId to PeerState */
@ -404,7 +405,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
SessionKey getIntroKey() { return _introKey; }
public int getLocalPort() { return _externalListenPort; }
/**
 * Returns the listen port reported by the UDP endpoint.
 *
 * @return the endpoint's listen port, or -1 if there is no endpoint
 * @deprecated unused
 */
@Deprecated
public int getLocalPort() {
    return _endpoint != null ? _endpoint.getListenPort() : -1;
}
public InetAddress getLocalAddress() { return _externalListenHost; }
public int getExternalPort() { return _externalListenPort; }
@ -720,6 +725,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
PeerState getPeerState(RemoteHostId hostInfo) {
return _peersByRemoteHost.get(hostInfo);
}
/**
 * Get the states for all peers at the given remote host, ignoring port.
 * Used for a last-chance search for a peer that changed port, by PacketHandler.
 * Every entry of the by-ident map is examined on each call.
 *
 * @param hostInfo the remote host whose IP is matched; its port is not consulted
 * @return a newly allocated list of the matching peers; empty (never null)
 *         when hostInfo has no IP or no peer matches
 * @since 0.9.3
 */
List<PeerState> getPeerStatesByIP(RemoteHostId hostInfo) {
    // typed list rather than a raw ArrayList, so the assignment is checked
    List<PeerState> rv = new ArrayList<PeerState>(4);
    byte[] ip = hostInfo.getIP();
    if (ip != null) {
        for (PeerState ps : _peersByIdent.values()) {
            if (DataHelper.eq(ip, ps.getRemoteIP()))
                rv.add(ps);
        }
    }
    return rv;
}
/**
* get the state for the peer with the given ident, or null
@ -729,6 +751,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return _peersByIdent.get(remotePeer);
}
/**
 * Moves a peer to a new remote port and re-keys its entry in the
 * peersByRemoteHost map: the mapping under the old RemoteHostId is removed,
 * the peer is told its new port, and it is put back under the new
 * RemoteHostId. Nothing is changed when the peer already uses newPort.
 *
 * @param peer the peer whose port changed
 * @param newPort the new remote port
 * @since 0.9.3
 */
public void changePeerPort(PeerState peer, int newPort) {
    final int previousPort;
    final boolean portChanged;
    synchronized (_addDropLock) {
        previousPort = peer.getRemotePort();
        portChanged = previousPort != newPort;
        if (portChanged) {
            // remove under the old key before the peer's host id is replaced
            _peersByRemoteHost.remove(peer.getRemoteHostId());
            peer.changePort(newPort);
            _peersByRemoteHost.put(peer.getRemoteHostId(), peer);
        }
    }
    // logging happens after the lock is released
    if (!_log.shouldLog(Log.WARN) || !portChanged)
        return;
    _log.warn("Changed port from " + previousPort + " to " + newPort + " for " + peer);
}
/**
* For IntroductionManager
* @return may be null if not started
@ -799,47 +839,69 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
boolean addRemotePeerState(PeerState peer) {
    if (_log.shouldLog(Log.INFO)) {
        _log.info("Add remote peer state: " + peer);
    }
    // the actual map updates run under the shared add/drop lock
    final boolean added;
    synchronized (_addDropLock) {
        added = locked_addRemotePeerState(peer);
    }
    return added;
}
private boolean locked_addRemotePeerState(PeerState peer) {
Hash remotePeer = peer.getRemotePeer();
long oldEstablishedOn = -1;
PeerState oldPeer = null;
if (remotePeer != null) {
oldPeer = _peersByIdent.put(remotePeer, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
oldEstablishedOn = oldPeer.getKeyEstablishedTime();
}
oldPeer = _peersByIdent.put(remotePeer, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer already connected (PBID): old=" + oldPeer + " new=" + peer);
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
oldEstablishedOn = oldPeer.getKeyEstablishedTime();
}
}
RemoteHostId remoteId = peer.getRemoteHostId();
if (oldPeer != null) {
    // the previous state registered under this ident is being replaced
    oldPeer.dropOutbound();
    _introManager.remove(oldPeer);
    _expireEvent.remove(oldPeer);
    RemoteHostId oldID = oldPeer.getRemoteHostId();
    if (!remoteId.equals(oldID)) {
        // leak fix, remove old address
        if (_log.shouldLog(Log.WARN))
            _log.warn(remotePeer + " changed address FROM " + oldID + " TO " + remoteId);
        PeerState oldPeer2 = _peersByRemoteHost.remove(oldID);
        // different ones in the two maps? shouldn't happen.
        // remove() returns null when nothing was mapped under the old address,
        // so check for null before touching oldPeer2.
        if (oldPeer2 != null && oldPeer2 != oldPeer) {
            oldPeer2.dropOutbound();
            _introManager.remove(oldPeer2);
            _expireEvent.remove(oldPeer2);
        }
    }
}
oldPeer = null;
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId == null) return false;
// Should always be direct... except maybe for hidden mode?
// or do we always know the IP by now?
if (remoteId.getIP() == null && _log.shouldLog(Log.WARN))
_log.warn("Add indirect: " + peer);
oldPeer = _peersByRemoteHost.put(remoteId, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// don't do this twice
PeerState oldPeer2 = _peersByRemoteHost.put(remoteId, peer);
if (oldPeer2 != null && oldPeer2 != peer && oldPeer2 != oldPeer) {
// this shouldn't happen, should have been removed above
if (_log.shouldLog(Log.WARN))
_log.warn("Peer already connected (PBRH): old=" + oldPeer2 + " new=" + peer);
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
oldEstablishedOn = oldPeer.getKeyEstablishedTime();
oldPeer2.dropOutbound();
_introManager.remove(oldPeer2);
_expireEvent.remove(oldPeer2);
}
if (oldPeer != null) {
oldPeer.dropOutbound();
_introManager.remove(oldPeer);
_expireEvent.remove(oldPeer);
}
if ( (oldPeer != null) && (_log.shouldLog(Log.WARN)) )
_log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup"));
if (_log.shouldLog(Log.WARN) && _peersByIdent.size() != _peersByRemoteHost.size())
_log.warn("Size Mismatch after add: " + peer
+ " byIDsz = " + _peersByIdent.size()
+ " byHostsz = " + _peersByRemoteHost.size());
_activeThrottle.unchoke(peer.getRemotePeer());
markReachable(peer.getRemotePeer(), peer.isInbound());
@ -996,15 +1058,20 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
_log.info(buf.toString(), new Exception("Dropped by"));
}
synchronized(_addDropLock) {
locked_dropPeer(peer, shouldShitlist, why);
}
if (needsRebuild())
rebuildExternalAddress();
}
private void locked_dropPeer(PeerState peer, boolean shouldShitlist, String why) {
peer.dropOutbound();
peer.expireInboundMessages();
_introManager.remove(peer);
_fragments.dropPeer(peer);
PeerState altByIdent = null;
PeerState altByHost = null;
if (peer.getRemotePeer() != null) {
dropPeerCapacities(peer);
@ -1018,9 +1085,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId != null) {
altByHost = _peersByRemoteHost.remove(remoteId);
}
PeerState altByHost = _peersByRemoteHost.remove(remoteId);
if (altByIdent != altByHost && _log.shouldLog(Log.WARN))
_log.warn("Mismatch on remove, RHID = " + remoteId
+ " byID = " + altByIdent
+ " byHost = " + altByHost
+ " byIDsz = " + _peersByIdent.size()
+ " byHostsz = " + _peersByRemoteHost.size());
// unchoke 'em, but just because we'll never talk again...
_activeThrottle.unchoke(peer.getRemotePeer());
@ -1029,12 +1101,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// _flooder.removePeer(peer);
_expireEvent.remove(peer);
if (needsRebuild())
rebuildExternalAddress();
// deal with races to make sure we drop the peers fully
if ( (altByIdent != null) && (peer != altByIdent) ) dropPeer(altByIdent, shouldShitlist, "recurse");
if ( (altByHost != null) && (peer != altByHost) ) dropPeer(altByHost, shouldShitlist, "recurse");
if ( (altByIdent != null) && (peer != altByIdent) ) locked_dropPeer(altByIdent, shouldShitlist, "recurse");
if ( (altByHost != null) && (peer != altByHost) ) locked_dropPeer(altByHost, shouldShitlist, "recurse");
}
private boolean needsRebuild() {
@ -1688,7 +1757,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
public boolean allowConnection() {
return _peersByIdent.size() < getMaxConnections();
}
@ -1698,20 +1766,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
@Override
public Vector<Long> getClockSkews() {
Vector<Long> skews = new Vector();
Vector<PeerState> peers = new Vector();
peers.addAll(_peersByIdent.values());
// If our clock is way off, we may not have many (or any) successful connections,
// so try hard in that case to return good data
boolean includeEverybody = _context.router().getUptime() < 10*60*1000 || peers.size() < 10;
boolean includeEverybody = _context.router().getUptime() < 10*60*1000 || _peersByIdent.size() < 10;
long now = _context.clock().now();
for (Iterator<PeerState> iter = peers.iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
if ((!includeEverybody) && now - peer.getLastReceiveTime() > 15*60*1000)
for (PeerState peer : _peersByIdent.values()) {
if ((!includeEverybody) && now - peer.getLastReceiveTime() > 5*60*1000)
continue; // skip old peers
if (peer.getRTT() > PeerState.INIT_RTT - 250)
continue; // Big RTT makes for a poor calculation
skews.addElement(Long.valueOf(peer.getClockSkew() / 1000));
}
if (_log.shouldLog(Log.DEBUG))
@ -2371,6 +2436,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
private class ExpirePeerEvent extends SimpleTimer2.TimedEvent {
// TODO why have separate Set, just use _peersByIdent.values()
private final Set<PeerState> _expirePeers;
private final List<PeerState> _expireBuffer;
private volatile boolean _alive;
@ -2387,9 +2453,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_expireTimeout = Math.min(_expireTimeout + 15*1000, EXPIRE_TIMEOUT);
else
_expireTimeout = Math.max(_expireTimeout - 45*1000, MIN_EXPIRE_TIMEOUT);
long shortInactivityCutoff = _context.clock().now() - _expireTimeout;
long longInactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT;
long pingCutoff = _context.clock().now() - (2 * 60*60*1000);
long now = _context.clock().now();
long shortInactivityCutoff = now - _expireTimeout;
long longInactivityCutoff = now - EXPIRE_TIMEOUT;
long pingCutoff = now - (2 * 60*60*1000);
long pingFirewallCutoff = now - (60 * 1000);
boolean shouldPingFirewall = _reachabilityStatus != CommSystemFacade.STATUS_OK;
boolean pingOneOnly = shouldPingFirewall && _externalListenPort == _endpoint.getListenPort();
_expireBuffer.clear();
for (Iterator<PeerState> iter = _expirePeers.iterator(); iter.hasNext(); ) {
@ -2403,7 +2473,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
_expireBuffer.add(peer);
iter.remove();
}
} else if (shouldPingFirewall &&
peer.getLastSendTime() < pingFirewallCutoff &&
peer.getLastReceiveTime() < pingFirewallCutoff) {
// ping if firewall is mapping the port to keep port the same...
// if the port changes we are screwed
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pinging for firewall: " + peer);
// don't update or idle time won't be right and peer won't get dropped
// TODO if both sides are firewalled should only one ping
// or else session will stay open forever?
//peer.setLastSendTime(now);
send(_destroyBuilder.buildPing(peer));
// If external port is different, it may be changing the port for every
// session, so ping all of them. Otherwise only one.
if (pingOneOnly)
shouldPingFirewall = false;
}
}
for (PeerState peer : _expireBuffer) {
@ -2415,12 +2501,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_alive)
schedule(30*1000);
}
public void add(PeerState peer) {
_expirePeers.add(peer);
}
public void remove(PeerState peer) {
_expirePeers.remove(peer);
}
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {