diff --git a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java index d284a3bcdd..26be53d92f 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java @@ -189,6 +189,7 @@ public class SummaryHelper { return "0.0"; RateStat receiveRate = _context.statManager().getRate("transport.receiveMessageSize"); + if (receiveRate == null) return "0.0"; Rate rate = receiveRate.getRate(60*1000); double bytes = rate.getLastTotalValue(); double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d); @@ -206,6 +207,7 @@ public class SummaryHelper { return "0.0"; RateStat receiveRate = _context.statManager().getRate("transport.sendMessageSize"); + if (receiveRate == null) return "0.0"; Rate rate = receiveRate.getRate(60*1000); double bytes = rate.getLastTotalValue(); double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d); @@ -224,6 +226,7 @@ public class SummaryHelper { return "0.0"; RateStat receiveRate = _context.statManager().getRate("transport.receiveMessageSize"); + if (receiveRate == null) return "0.0"; Rate rate = receiveRate.getRate(5*60*1000); double bytes = rate.getLastTotalValue(); double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d); @@ -242,6 +245,7 @@ public class SummaryHelper { return "0.0"; RateStat receiveRate = _context.statManager().getRate("transport.sendMessageSize"); + if (receiveRate == null) return "0.0"; Rate rate = receiveRate.getRate(5*60*1000); double bytes = rate.getLastTotalValue(); double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 1fb61338af..128e2f991a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -272,10 +272,13 @@ public class Connection { SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), timeout); } + _context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); + _lastSendTime = _context.clock().now(); _outboundQueue.enqueue(packet); resetActivityTimer(); + /* if (ackOnly) { // ACK only, don't schedule this packet for retries // however, if we are running low on sessionTags we want to send @@ -286,6 +289,7 @@ public class Connection { _connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent(), new PingNotifier()); } } + */ } private class PingNotifier implements ConnectionManager.PingNotifier { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 07a5cdb798..a239a62605 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -13,6 +13,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { private int _receiveWindow; private int _profile; private int _rtt; + private int _trend[]; private int _resendDelay; private int _sendAckDelay; private int _maxMessageSize; @@ -50,6 +51,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = "i2p.streaming.congestionAvoidanceGrowthRateFactor"; public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor"; + private static final int TREND_COUNT = 3; + public ConnectionOptions() { super(); } @@ -85,6 +88,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { protected void init(Properties opts) { super.init(opts); + _trend = new int[TREND_COUNT]; + setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 4*1024)); @@ -186,11 +191,36 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { */ public int getRTT() { return _rtt; } public void setRTT(int ms) { + synchronized (_trend) { + _trend[0] = _trend[1]; + _trend[1] = _trend[2]; + if (ms > _rtt) + _trend[2] = 1; + else if (ms < _rtt) + _trend[2] = -1; + else + _trend[2] = 0; + } _rtt = ms; if (_rtt > 60*1000) _rtt = 60*1000; } + /** + * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have + * 3 consecutive rtt decreases, we are trending downwards (-1), else we're stable. + * + */ + public int getRTTTrend() { + synchronized (_trend) { + for (int i = 0; i < TREND_COUNT - 1; i++) { + if (_trend[i] != _trend[i+1]) + return 0; + } + return _trend[0]; + } + } + /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ private static final double RTT_DAMPENING = 0.9; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 710798b875..479b464836 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -26,6 +26,7 @@ public class ConnectionPacketHandler { _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 }); } /** distribute a packet to the connection specified */ @@ -177,7 +178,19 @@ public class ConnectionPacketHandler { // con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); int numResends = 0; - List acked = con.ackPackets(ackThrough, nacks); + List acked = null; + // if we don't know the streamIds for both sides of the connection, there's no way we + // could actually be acking data (this fixes the buggered up ack of packet 0 problem). + // this is called after packet verification, which places the stream IDs as necessary if + // the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet) + if ( (packet.getSendStreamId() != null) && (packet.getReceiveStreamId() != null) && + (con.getSendStreamId() != null) && (con.getReceiveStreamId() != null) && + (!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) && + (!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) && + (!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) && + (!DataHelper.eq(con.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) ) + acked = con.ackPackets(ackThrough, nacks); + if ( (acked != null) && (acked.size() > 0) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug(acked.size() + " of our packets acked with " + packet); @@ -247,8 +260,13 @@ public class ConnectionPacketHandler { int oldWindow = con.getOptions().getWindowSize(); int newWindowSize = oldWindow; + int trend = con.getOptions().getRTTTrend(); + + _context.statManager().addRateData("stream.trend", trend, newWindowSize); + if ( (!congested) && (acked > 0) && (numResends <= 0) ) { - if (newWindowSize > con.getLastCongestionSeenAt() / 2) { + if ( (newWindowSize > con.getLastCongestionSeenAt() / 2) || + (trend > 0) ) { // tcp vegas: avoidance if rtt is increasing, even if we arent at ssthresh/2 yet // congestion avoidance // we can't use newWindowSize += 1/newWindowSize, since we're diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index a55c2e36f9..b967a8aff4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -578,7 +578,7 @@ public class Packet { return buf; } - private static final String toId(byte id[]) { + static final String toId(byte id[]) { if (id == null) return Base64.encode(STREAM_ID_UNKNOWN); else diff --git a/history.txt b/history.txt index 6e75abf083..411cdfc025 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,20 @@ -$Id: history.txt,v 1.221 2005/08/01 22:26:51 duck Exp $ +$Id: history.txt,v 1.222 2005/08/03 13:58:13 jrandom Exp $ + +2005-08-07 Complication + * Display the average clock skew for both SSU and TCP connections + +2005-08-07 jrandom + * Fixed the long standing streaming lib bug where we could lose the first + packet on retransmission. + * Avoid an NPE when a message expires on the SSU queue. + * Adjust the streaming lib's window growth factor with an additional + Vegas-esque congestion detection algorithm. + * Removed an unnecessary SSU session drop + * Reduced the MTU (until we get a working PMTU lib) + * Deferr tunnel acceptance until we know how to reach the next hop, + rejecting it if we can't find them in time. + * If our netDb store of our leaseSet fails, give it a few seconds before + republishing. * 2005-08-03 0.6.0.1 released diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 54046c1f29..e39e3e2e0b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.210 $ $Date: 2005/07/31 16:35:27 $"; + public final static String ID = "$Revision: 1.211 $ $Date: 2005/08/03 13:58:13 $"; public final static String VERSION = "0.6.0.1"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java index 90ae2b0dfa..7504121be1 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java @@ -84,7 +84,7 @@ public class RepublishLeaseSetJob extends JobImpl { public void runJob() { if (_log.shouldLog(Log.WARN)) _log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64()); - RepublishLeaseSetJob.this.requeue(5*1000); + RepublishLeaseSetJob.this.requeue(30*1000); } } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 0c690fdc92..7b90d2d844 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -334,16 +334,19 @@ class SearchJob extends JobImpl { } TunnelId inTunnelId = inTunnel.getReceiveTunnelId(0); - RouterInfo inGateway = getContext().netDb().lookupRouterInfoLocally(inTunnel.getPeer(0)); - if (inGateway == null) { - _log.error("We can't find the gateway to our inbound tunnel?! wtf"); - getContext().jobQueue().addJob(new FailedJob(getContext(), router)); - return; - } + // this will fail if we've shitlisted our inbound gateway, but the gw may not necessarily + // be shitlisted by whomever needs to contact them, so we don't need to check this + + //RouterInfo inGateway = getContext().netDb().lookupRouterInfoLocally(inTunnel.getPeer(0)); + //if (inGateway == null) { + // _log.error("We can't find the gateway to our inbound tunnel?! wtf"); + // getContext().jobQueue().addJob(new FailedJob(getContext(), router)); + // return; + //} long expiration = getContext().clock().now() + getPerPeerTimeoutMs(); - DatabaseLookupMessage msg = buildMessage(inTunnelId, inGateway, expiration); + DatabaseLookupMessage msg = buildMessage(inTunnelId, inTunnel.getPeer(0), expiration); TunnelInfo outTunnel = getOutboundTunnelId(); if (outTunnel == null) { @@ -409,10 +412,11 @@ class SearchJob extends JobImpl { * @param replyGateway gateway for the reply tunnel * @param expiration when the search should stop */ - protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) { + protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, Hash replyGateway, long expiration) { DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true); msg.setSearchKey(_state.getTarget()); - msg.setFrom(replyGateway.getIdentity().getHash()); + //msg.setFrom(replyGateway.getIdentity().getHash()); + msg.setFrom(replyGateway); msg.setDontIncludePeers(_state.getClosestAttempted(MAX_CLOSEST)); msg.setMessageExpiration(expiration); msg.setReplyTunnel(replyTunnelId); @@ -504,6 +508,8 @@ class SearchJob extends JobImpl { boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer); if (!sendsBadInfo) { + // we don't need to search for everthing we're given here - only ones that + // are next in our search path... getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs); _repliesPendingVerification++; } else { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index ffdf5e9045..5b9309c129 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -39,7 +39,7 @@ class StoreJob extends JobImpl { private PeerSelector _peerSelector; private final static int PARALLELIZATION = 3; // how many sent at a time - private final static int REDUNDANCY = 10; // we want the data sent to 10 peers + private final static int REDUNDANCY = 6; // we want the data sent to 6 peers /** * additionally send to 1 outlier(s), in case all of the routers chosen in our * REDUNDANCY set are attacking us by accepting DbStore messages but dropping diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index d2ca97c075..5543606254 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -802,7 +802,7 @@ public class TCPTransport extends TransportImpl { } buf.append("\n"); - buf.append("Average clock skew: "); + buf.append("Average clock skew, TCP peers: "); if (_connectionsByIdent.size() > 0) buf.append(offsetTotal / _connectionsByIdent.size()).append("ms
\n"); else diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index e788213f9a..3dd2386c16 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -22,7 +22,7 @@ public class ACKSender implements Runnable { private boolean _alive; /** how frequently do we want to send ACKs to a peer? */ - static final int ACK_FREQUENCY = 200; + static final int ACK_FREQUENCY = 100; public ACKSender(RouterContext ctx, UDPTransport transport) { _context = ctx; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index b20fccd5f3..8b3d5bb01a 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -4,6 +4,8 @@ import java.util.ArrayList; import java.util.List; import net.i2p.data.Hash; +import net.i2p.data.RouterInfo; +import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; @@ -35,7 +37,7 @@ public class OutboundMessageFragments { /** if we can handle more messages explicitly, set this to true */ private boolean _allowExcess; - private static final int MAX_ACTIVE = 32; + private static final int MAX_ACTIVE = 64; // don't send a packet more than 10 times static final int MAX_VOLLEYS = 10; @@ -83,6 +85,7 @@ public class OutboundMessageFragments { long start = _context.clock().now(); int numActive = 0; + int maxActive = Math.max(_transport.countActivePeers(), MAX_ACTIVE); while (_alive) { finishMessages(); try { @@ -90,7 +93,7 @@ public class OutboundMessageFragments { numActive = _activeMessages.size(); if (!_alive) return false; - else if (numActive < MAX_ACTIVE) + else if (numActive < maxActive) return true; else if (_allowExcess) return true; @@ -108,9 +111,18 @@ public class OutboundMessageFragments { * */ public void add(OutNetMessage msg) { + I2NPMessage msgBody = msg.getMessage(); + RouterInfo target = msg.getTarget(); + if ( (msgBody == null) || (target == null) ) { + synchronized (_activeMessages) { + _activeMessages.notifyAll(); + } + return; + } + OutboundMessageState state = new OutboundMessageState(_context); - boolean ok = state.initialize(msg); - state.setPeer(_transport.getPeerState(msg.getTarget().getIdentity().calculateHash())); + boolean ok = state.initialize(msg, msgBody); + state.setPeer(_transport.getPeerState(target.getIdentity().calculateHash())); finishMessages(); int active = 0; synchronized (_activeMessages) { @@ -337,7 +349,7 @@ public class OutboundMessageFragments { } private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { - if (state != null) { + if ( (state != null) && (peer != null) ) { int fragments = state.getFragmentCount(); if (fragments < 0) return null; @@ -420,14 +432,16 @@ public class OutboundMessageFragments { _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); _transport.succeeded(state.getMessage()); int numFragments = state.getFragmentCount(); - if (state.getPeer() != null) { + PeerState peer = state.getPeer(); + if (peer != null) { // this adjusts the rtt/rto/window/etc - state.getPeer().messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends()); + peer.messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends()); + if (peer.getSendWindowBytesRemaining() > 0) + _throttle.unchoke(peer.getRemotePeer()); } else { - _log.warn("message acked, but no peer attacked: " + state); + if (_log.shouldLog(Log.WARN)) + _log.warn("message acked, but no peer attacked: " + state); } - if (state.getPeer().getSendWindowBytesRemaining() > 0) - _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); return numFragments; } else { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index b955904e0b..167ce023c1 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -45,6 +45,7 @@ public class OutboundMessageState { } public boolean initialize(OutNetMessage msg) { + if (msg == null) return false; try { initialize(msg, msg.getMessage(), null); return true; @@ -57,6 +58,9 @@ public class OutboundMessageState { } public boolean initialize(I2NPMessage msg, PeerState peer) { + if (msg == null) + return false; + try { initialize(null, msg, peer); return true; @@ -68,6 +72,21 @@ public class OutboundMessageState { } } + public boolean initialize(OutNetMessage m, I2NPMessage msg) { + if ( (m == null) || (msg == null) ) + return false; + + try { + initialize(m, msg, null); + return true; + } catch (OutOfMemoryError oom) { + throw oom; + } catch (Exception e) { + _log.log(Log.CRIT, "Error initializing " + msg, e); + return false; + } + } + private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { _message = m; _peer = peer; @@ -200,7 +219,7 @@ public class OutboundMessageState { public void fragment(int fragmentSize) { int totalSize = _messageBuf.getValid(); int numFragments = totalSize / fragmentSize; - if (numFragments * fragmentSize != totalSize) + if (numFragments * fragmentSize < totalSize) numFragments++; if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 276aec24f5..5eacf87369 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -1,7 +1,9 @@ package net.i2p.router.transport.udp; import java.net.InetAddress; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import net.i2p.data.Base64; import net.i2p.router.Router; @@ -29,8 +31,12 @@ public class PacketHandler { private InboundMessageFragments _inbound; private PeerTestManager _testManager; private boolean _keepReading; + private List _handlers; private static final int NUM_HANDLERS = 3; + /** let packets be up to 30s slow */ + private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; + public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager) { _context = ctx; @@ -40,6 +46,10 @@ public class PacketHandler { _establisher = establisher; _inbound = inbound; _testManager = testManager; + _handlers = new ArrayList(NUM_HANDLERS); + for (int i = 0; i < NUM_HANDLERS; i++) { + _handlers.add(new Handler()); + } _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 }); @@ -52,8 +62,8 @@ public class PacketHandler { public void startup() { _keepReading = true; - for (int i = 0; i < NUM_HANDLERS; i++) { - I2PThread t = new I2PThread(new Handler(), "Packet handler " + i + ": " + _endpoint.getListenPort()); + for (int i = 0; i < _handlers.size(); i++) { + I2PThread t = new I2PThread((Handler)_handlers.get(i), "Packet handler " + i + ": " + _endpoint.getListenPort()); t.setDaemon(true); t.start(); } @@ -62,30 +72,51 @@ public class PacketHandler { public void shutdown() { _keepReading = false; } + + String getHandlerStatus() { + StringBuffer rv = new StringBuffer(); + int size = _handlers.size(); + rv.append("Handlers: ").append(size); + for (int i = 0; i < size; i++) { + Handler handler = (Handler)_handlers.get(i); + rv.append(" handler ").append(i).append(" state: ").append(handler._state); + } + return rv.toString(); + } private class Handler implements Runnable { private UDPPacketReader _reader; + public volatile int _state; public Handler() { _reader = new UDPPacketReader(_context); + _state = 0; } public void run() { + _state = 1; while (_keepReading) { + _state = 2; UDPPacket packet = _endpoint.receive(); + _state = 3; if (packet == null) continue; // keepReading is probably false... if (_log.shouldLog(Log.DEBUG)) _log.debug("Received the packet " + packet); + _state = 4; long queueTime = packet.getLifetime(); long handleStart = _context.clock().now(); try { + _state = 5; handlePacket(_reader, packet); + _state = 6; } catch (Exception e) { + _state = 7; if (_log.shouldLog(Log.ERROR)) _log.error("Crazy error handling a packet: " + packet, e); } long handleTime = _context.clock().now() - handleStart; _context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime()); _context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime()); + _state = 8; if (handleTime > 1000) { if (_log.shouldLog(Log.WARN)) @@ -95,244 +126,287 @@ public class PacketHandler { // back to the cache with thee! packet.release(); + _state = 9; } } - } - - private void handlePacket(UDPPacketReader reader, UDPPacket packet) { - if (packet == null) return; - - RemoteHostId rem = packet.getRemoteHost(); - PeerState state = _transport.getPeerState(rem); - if (state == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received is not for a connected peer"); - InboundEstablishState est = _establisher.getInboundState(rem); - if (est != null) { + //} + + private void handlePacket(UDPPacketReader reader, UDPPacket packet) { + if (packet == null) return; + + _state = 10; + + RemoteHostId rem = packet.getRemoteHost(); + PeerState state = _transport.getPeerState(rem); + if (state == null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received IS for an inbound establishment"); - receivePacket(reader, packet, est); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received is not for an inbound establishment"); - OutboundEstablishState oest = _establisher.getOutboundState(rem); - if (oest != null) { + _log.debug("Packet received is not for a connected peer"); + _state = 11; + InboundEstablishState est = _establisher.getInboundState(rem); + if (est != null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received IS for an outbound establishment"); - receivePacket(reader, packet, oest); + _log.debug("Packet received IS for an inbound establishment"); + _state = 12; + receivePacket(reader, packet, est); } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received is not for an inbound or outbound establishment"); - // ok, not already known establishment, try as a new one - receivePacket(reader, packet); + _log.debug("Packet received is not for an inbound establishment"); + _state = 13; + OutboundEstablishState oest = _establisher.getOutboundState(rem); + if (oest != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet received IS for an outbound establishment"); + _state = 14; + receivePacket(reader, packet, oest); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet received is not for an inbound or outbound establishment"); + // ok, not already known establishment, try as a new one + _state = 15; + receivePacket(reader, packet); + } } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet received IS for an existing peer"); + _state = 16; + receivePacket(reader, packet, state); } - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received IS for an existing peer"); - receivePacket(reader, packet, state); } - } - - private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) { - boolean isValid = packet.validate(state.getCurrentMACKey()); - if (!isValid) { - if (state.getNextMACKey() != null) - isValid = packet.validate(state.getNextMACKey()); + + private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) { + _state = 17; + boolean isValid = packet.validate(state.getCurrentMACKey()); + if (!isValid) { + _state = 18; + if (state.getNextMACKey() != null) + isValid = packet.validate(state.getNextMACKey()); + if (!isValid) { + _state = 19; + if (_log.shouldLog(Log.WARN)) + _log.warn("Failed validation with existing con, trying as new con: " + packet); + + isValid = packet.validate(_transport.getIntroKey()); + if (isValid) { + _state = 20; + // this is a stray packet from an inbound establishment + // process, so try our intro key + // (after an outbound establishment process, there wouldn't + // be any stray packets) + if (_log.shouldLog(Log.INFO)) + _log.info("Validation with existing con failed, but validation as reestablish/stray passed"); + packet.decrypt(_transport.getIntroKey()); + } else { + _state = 21; + InboundEstablishState est = _establisher.getInboundState(packet.getRemoteHost()); + if (est != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet from an existing peer IS for an inbound establishment"); + _state = 22; + receivePacket(reader, packet, est, false); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); + _context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration()); + } + return; + } + } else { + _state = 23; + packet.decrypt(state.getNextCipherKey()); + } + } else { + _state = 24; + packet.decrypt(state.getCurrentCipherKey()); + } + + _state = 25; + handlePacket(reader, packet, state, null, null); + _state = 26; + } + + private void receivePacket(UDPPacketReader reader, UDPPacket packet) { + _state = 27; + boolean isValid = packet.validate(_transport.getIntroKey()); if (!isValid) { if (_log.shouldLog(Log.WARN)) - _log.warn("Failed validation with existing con, trying as new con: " + packet); - - isValid = packet.validate(_transport.getIntroKey()); + _log.warn("Invalid introduction packet received: " + packet, new Exception("path")); + _context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration()); + _state = 28; + return; + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received: " + packet); + } + + _state = 29; + packet.decrypt(_transport.getIntroKey()); + handlePacket(reader, packet, null, null, null); + _state = 30; + } + + private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) { + receivePacket(reader, packet, state, true); + } + /** + * @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet + */ + private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) { + _state = 31; + if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { + StringBuffer buf = new StringBuffer(128); + buf.append("Attempting to receive a packet on a known inbound state: "); + buf.append(state); + buf.append(" MAC key: ").append(state.getMACKey()); + buf.append(" intro key: ").append(_transport.getIntroKey()); + _log.debug(buf.toString()); + } + boolean isValid = false; + if (state.getMACKey() != null) { + isValid = packet.validate(state.getMACKey()); if (isValid) { - // this is a stray packet from an inbound establishment - // process, so try our intro key - // (after an outbound establishment process, there wouldn't - // be any stray packets) - if (_log.shouldLog(Log.INFO)) - _log.info("Validation with existing con failed, but validation as reestablish/stray passed"); - packet.decrypt(_transport.getIntroKey()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Valid introduction packet received for inbound con: " + packet); + + _state = 32; + packet.decrypt(state.getCipherKey()); + handlePacket(reader, packet, null, null, null); + return; } else { - InboundEstablishState est = _establisher.getInboundState(packet.getRemoteHost()); - if (est != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet from an existing peer IS for an inbound establishment"); - receivePacket(reader, packet, est, false); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); - _context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration()); - } + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid introduction packet received for inbound con, falling back: " + packet); + + _state = 33; + } + } + if (allowFallback) { + // ok, we couldn't handle it with the established stuff, so fall back + // on earlier state packets + _state = 34; + receivePacket(reader, packet); + } else { + _context.statManager().addRateData("udp.droppedInvalidInboundEstablish", packet.getLifetime(), packet.getExpiration()); + } + } + + private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) { + _state = 35; + if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { + StringBuffer buf = new StringBuffer(128); + buf.append("Attempting to receive a packet on a known outbound state: "); + buf.append(state); + buf.append(" MAC key: ").append(state.getMACKey()); + buf.append(" intro key: ").append(state.getIntroKey()); + _log.debug(buf.toString()); + } + + boolean isValid = false; + if (state.getMACKey() != null) { + _state = 36; + isValid = packet.validate(state.getMACKey()); + if (isValid) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Valid introduction packet received for outbound established con: " + packet); + + _state = 37; + packet.decrypt(state.getCipherKey()); + handlePacket(reader, packet, null, state, null); + _state = 38; return; } - } else { - packet.decrypt(state.getNextCipherKey()); } - } else { - packet.decrypt(state.getCurrentCipherKey()); - } - - handlePacket(reader, packet, state, null, null); - } - - private void receivePacket(UDPPacketReader reader, UDPPacket packet) { - boolean isValid = packet.validate(_transport.getIntroKey()); - if (!isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid introduction packet received: " + packet, new Exception("path")); - _context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration()); - return; - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Valid introduction packet received: " + packet); - } - - packet.decrypt(_transport.getIntroKey()); - handlePacket(reader, packet, null, null, null); - } - private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) { - receivePacket(reader, packet, state, true); - } - /** - * @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet - */ - private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) { - if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { - StringBuffer buf = new StringBuffer(128); - buf.append("Attempting to receive a packet on a known inbound state: "); - buf.append(state); - buf.append(" MAC key: ").append(state.getMACKey()); - buf.append(" intro key: ").append(_transport.getIntroKey()); - _log.debug(buf.toString()); - } - boolean isValid = false; - if (state.getMACKey() != null) { - isValid = packet.validate(state.getMACKey()); + // keys not yet exchanged, lets try it with the peer's intro key + isValid = packet.validate(state.getIntroKey()); if (isValid) { if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for inbound con: " + packet); - - packet.decrypt(state.getCipherKey()); - handlePacket(reader, packet, null, null, null); + _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); + _state = 39; + packet.decrypt(state.getIntroKey()); + handlePacket(reader, packet, null, state, null); + _state = 40; return; } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid introduction packet received for inbound con, falling back: " + packet); - + _log.warn("Invalid introduction packet received for outbound established con with old intro key, falling back: " + packet); } - } - if (allowFallback) { + // ok, we couldn't handle it with the established stuff, so fall back // on earlier state packets + _state = 41; receivePacket(reader, packet); - } else { - _context.statManager().addRateData("udp.droppedInvalidInboundEstablish", packet.getLifetime(), packet.getExpiration()); + _state = 42; } - } - private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) { - if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { - StringBuffer buf = new StringBuffer(128); - buf.append("Attempting to receive a packet on a known outbound state: "); - buf.append(state); - buf.append(" MAC key: ").append(state.getMACKey()); - buf.append(" intro key: ").append(state.getIntroKey()); - _log.debug(buf.toString()); - } - - boolean isValid = false; - if (state.getMACKey() != null) { - isValid = packet.validate(state.getMACKey()); - if (isValid) { + /** + * Parse out the interesting bits and honor what it says + */ + private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { + _state = 43; + reader.initialize(packet); + _state = 44; + long recvOn = packet.getBegin(); + long sendOn = reader.readTimestamp() * 1000; + long skew = recvOn - sendOn; + if (skew > GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for outbound established con: " + packet); - - packet.decrypt(state.getCipherKey()); - handlePacket(reader, packet, null, state, null); + _log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet); + _context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration()); + return; + } else if (skew < 0 - GRACE_PERIOD) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet); + _context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration()); return; } - } - - // keys not yet exchanged, lets try it with the peer's intro key - isValid = packet.validate(state.getIntroKey()); - if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); - packet.decrypt(state.getIntroKey()); - handlePacket(reader, packet, null, state, null); - return; - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid introduction packet received for outbound established con with old intro key, falling back: " + packet); - } - - // ok, we couldn't handle it with the established stuff, so fall back - // on earlier state packets - receivePacket(reader, packet); - } - /** let packets be up to 30s slow */ - private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; - - /** - * Parse out the interesting bits and honor what it says - */ - private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { - reader.initialize(packet); - long recvOn = packet.getBegin(); - long sendOn = reader.readTimestamp() * 1000; - long skew = recvOn - sendOn; - if (skew > GRACE_PERIOD) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet); - _context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration()); - return; - } else if (skew < 0 - GRACE_PERIOD) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet); - _context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration()); - return; - } - - if (state != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew); - state.adjustClockSkew((short)skew); - } - - _context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime()); - - //InetAddress fromHost = packet.getPacket().getAddress(); - //int fromPort = packet.getPacket().getPort(); - //RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort); - RemoteHostId from = packet.getRemoteHost(); - - switch (reader.readPayloadType()) { - case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST: - _establisher.receiveSessionRequest(from, reader); - break; - case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED: - _establisher.receiveSessionConfirmed(from, reader); - break; - case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED: - _establisher.receiveSessionCreated(from, reader); - break; - case UDPPacket.PAYLOAD_TYPE_DATA: - if (outState != null) - state = _establisher.receiveData(outState); - if (_log.shouldLog(Log.INFO)) - _log.info("Received new DATA packet from " + state + ": " + packet); - _inbound.receiveData(state, reader.getDataReader()); - break; - case UDPPacket.PAYLOAD_TYPE_TEST: - _testManager.receiveTest(from, reader); - break; - default: - if (_log.shouldLog(Log.WARN)) - _log.warn("Unknown payload type: " + reader.readPayloadType()); - _context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime(), packet.getExpiration()); - return; + if (state != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew); + state.adjustClockSkew((short)skew); + } + + _context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime()); + + //InetAddress fromHost = packet.getPacket().getAddress(); + //int fromPort = packet.getPacket().getPort(); + //RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort); + _state = 45; + RemoteHostId from = packet.getRemoteHost(); + _state = 46; + + switch (reader.readPayloadType()) { + case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST: + _state = 47; + _establisher.receiveSessionRequest(from, reader); + break; + case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED: + _state = 48; + _establisher.receiveSessionConfirmed(from, reader); + break; + case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED: + _state = 49; + _establisher.receiveSessionCreated(from, reader); + break; + case UDPPacket.PAYLOAD_TYPE_DATA: + _state = 50; + if (outState != null) + state = _establisher.receiveData(outState); + if (_log.shouldLog(Log.INFO)) + _log.info("Received new DATA packet from " + state + ": " + packet); + _inbound.receiveData(state, reader.getDataReader()); + break; + case UDPPacket.PAYLOAD_TYPE_TEST: + _state = 51; + _testManager.receiveTest(from, reader); + break; + default: + _state = 52; + if (_log.shouldLog(Log.WARN)) + _log.warn("Unknown payload type: " + reader.readPayloadType()); + _context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime(), packet.getExpiration()); + return; + } } } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 0b36f8af81..4e3765e3e2 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -155,12 +155,15 @@ public class PeerState { private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; /* - * 576 gives us 568 IP byes, 548 UDP bytes, and with an SSU data message, - * 502 fragment bytes, which is enough to send a tunnel data message in 2 - * packets. + * 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message, + * 522 fragment bytes, which is enough to send a tunnel data message in 2 + * packets. A tunnel data message sent over the wire is 1044 bytes, meaning + * we need 522 fragment bytes to fit it in 2 packets - add 46 for SSU, 20 + * for UDP, and 8 for IP, giving us 596. round up to mod 16, giving a total + * of 608 */ - private static final int DEFAULT_MTU = 1500; - private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY; + private static final int DEFAULT_MTU = 608;//600; //1500; + private static final int MIN_RTO = 1000 + ACKSender.ACK_FREQUENCY; private static final int MAX_RTO = 2000; // 5000; public PeerState(I2PAppContext ctx) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 21fe37b9d3..2d6e236350 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -15,12 +15,14 @@ public class UDPEndpoint { private RouterContext _context; private Log _log; private int _listenPort; + private UDPTransport _transport; private UDPSender _sender; private UDPReceiver _receiver; - public UDPEndpoint(RouterContext ctx, int listenPort) throws SocketException { + public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort) throws SocketException { _context = ctx; _log = ctx.logManager().getLog(UDPEndpoint.class); + _transport = transport; _listenPort = listenPort; } @@ -32,7 +34,7 @@ public class UDPEndpoint { try { DatagramSocket socket = new DatagramSocket(_listenPort); _sender = new UDPSender(_context, socket, "UDPSend on " + _listenPort); - _receiver = new UDPReceiver(_context, socket, "UDPReceive on " + _listenPort); + _receiver = new UDPReceiver(_context, _transport, socket, "UDPReceive on " + _listenPort); _sender.startup(); _receiver.startup(); } catch (SocketException se) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java index 1dde784e8c..bf73e1cb6b 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java @@ -36,7 +36,7 @@ public class UDPEndpointTest { int base = 2000 + _context.random().nextInt(10000); for (int i = 0; i < numPeers; i++) { _log.debug("Building " + i); - UDPEndpoint endpoint = new UDPEndpoint(_context, base + i); + UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i); _endpoints[i] = endpoint; endpoint.startup(); I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index df137136d6..3cb5d1db0c 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -28,13 +28,15 @@ public class UDPReceiver { private List _inboundQueue; private boolean _keepRunning; private Runner _runner; + private UDPTransport _transport; - public UDPReceiver(RouterContext ctx, DatagramSocket socket, String name) { + public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPReceiver.class); _name = name; _inboundQueue = new ArrayList(128); _socket = socket; + _transport = transport; _runner = new Runner(); _context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -64,8 +66,8 @@ public class UDPReceiver { return _runner.updateListeningPort(socket, newPort); } - /** if a packet been sitting in the queue for 2 seconds, drop subsequent packets */ - private static final long MAX_QUEUE_PERIOD = 2*1000; + /** if a packet been sitting in the queue for a full second (meaning the handlers are overwhelmed), drop subsequent packets */ + private static final long MAX_QUEUE_PERIOD = 1*1000; private static final float ARTIFICIAL_DROP_PROBABILITY = 0.0f; // 0.02f; // 0.0f; @@ -90,22 +92,38 @@ public class UDPReceiver { if (_log.shouldLog(Log.DEBUG)) _log.debug("Received: " + packet); + boolean rejected = false; + int queueSize = 0; + long headPeriod = 0; synchronized (_inboundQueue) { - int queueSize = _inboundQueue.size(); + queueSize = _inboundQueue.size(); if (queueSize > 0) { - long headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime(); + headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime(); if (headPeriod > MAX_QUEUE_PERIOD) { - _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod); - if (_log.shouldLog(Log.ERROR)) - _log.error("Dropping inbound packet with " + queueSize + " queued for " + headPeriod); + rejected = true; _inboundQueue.notifyAll(); - return queueSize; } } - _inboundQueue.add(packet); - _inboundQueue.notifyAll(); - return queueSize + 1; + if (!rejected) { + _inboundQueue.add(packet); + _inboundQueue.notifyAll(); + return queueSize + 1; + } } + + // rejected + _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod); + if (_log.shouldLog(Log.ERROR)) { + StringBuffer msg = new StringBuffer(); + msg.append("Dropping inbound packet with "); + msg.append(queueSize); + msg.append(" queued for "); + msg.append(headPeriod); + if (_transport != null) + msg.append(" packet handlers: ").append(_transport.getPacketHandlerStatus()); + _log.error(msg.toString()); + } + return queueSize; } private class ArtificiallyDelayedReceive implements SimpleTimer.TimedEvent { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index f7baab32d4..dd71a3eafe 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -199,7 +199,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } if (_endpoint == null) { try { - _endpoint = new UDPEndpoint(_context, port); + _endpoint = new UDPEndpoint(_context, this, port); } catch (SocketException se) { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Unable to listen on the UDP port (" + port + ")", se); @@ -450,8 +450,46 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority dropPeer(peer, true); } private void dropPeer(PeerState peer, boolean shouldShitlist) { - if (_log.shouldLog(Log.INFO)) - _log.info("Dropping remote peer: " + peer + " shitlist? " + shouldShitlist, new Exception("Dropped by")); + if (_log.shouldLog(Log.INFO)) { + long now = _context.clock().now(); + StringBuffer buf = new StringBuffer(4096); + long timeSinceSend = now - peer.getLastSendTime(); + long timeSinceRecv = now - peer.getLastReceiveTime(); + long timeSinceAck = now - peer.getLastACKSend(); + buf.append("Dropping remote peer: ").append(peer.toString()).append(" shitlist? ").append(shouldShitlist); + buf.append(" lifetime: ").append(now - peer.getKeyEstablishedTime()); + buf.append(" time since send/recv/ack: ").append(timeSinceSend).append(" / "); + buf.append(timeSinceRecv).append(" / ").append(timeSinceAck); + + buf.append("Existing peers: \n"); + synchronized (_peersByIdent) { + for (Iterator iter = _peersByIdent.keySet().iterator(); iter.hasNext(); ) { + Hash c = (Hash)iter.next(); + PeerState p = (PeerState)_peersByIdent.get(c); + if (c.equals(peer.getRemotePeer())) { + if (p != peer) { + buf.append(" SAME PEER, DIFFERENT STATE "); + } else { + buf.append(" same peer, same state "); + } + } else { + buf.append("Peer ").append(p.toString()).append(" "); + } + + buf.append(" lifetime: ").append(now - p.getKeyEstablishedTime()); + + timeSinceSend = now - p.getLastSendTime(); + timeSinceRecv = now - p.getLastReceiveTime(); + timeSinceAck = now - p.getLastACKSend(); + + buf.append(" time since send/recv/ack: ").append(timeSinceSend).append(" / "); + buf.append(timeSinceRecv).append(" / ").append(timeSinceAck); + buf.append("\n"); + } + } + _log.info(buf.toString(), new Exception("Dropped by")); + } + if (peer.getRemotePeer() != null) { dropPeerCapacities(peer); @@ -659,6 +697,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority replaceAddress(addr); } + String getPacketHandlerStatus() { + PacketHandler handler = _handler; + if (handler != null) + return handler.getHandlerStatus(); + else + return ""; + } + public void failed(OutboundMessageState msg) { if (msg == null) return; int consecutive = 0; @@ -708,6 +754,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority synchronized (_peersByIdent) { peers = new ArrayList(_peersByIdent.values()); } + long offsetTotal = 0; StringBuffer buf = new StringBuffer(512); buf.append("UDP connections: ").append(peers.size()).append("
\n"); @@ -748,6 +795,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(" [choked]"); if (peer.getConsecutiveFailedSends() > 0) buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]"); + if (_context.shitlist().isShitlisted(peer.getRemotePeer())) + buf.append(" [shitlisted]"); buf.append(""); buf.append(""); @@ -769,6 +818,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(""); buf.append(peer.getClockSkew()/1000); buf.append("s"); + offsetTotal = offsetTotal + peer.getClockSkew(); buf.append(""); buf.append(peer.getSendWindowBytes()/1024); @@ -815,6 +865,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } out.write("\n"); + + buf.append("Average clock skew, UDP peers:"); + if (peers.size() > 0) + buf.append(offsetTotal / peers.size()).append("ms

\n"); + else + buf.append("n/a


\n"); + + out.write(buf.toString()); + buf.setLength(0); } private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00"); @@ -839,6 +898,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public String toString() { return "UDP bid @ " + getLatencyMs(); } } + private static final int EXPIRE_TIMEOUT = 10*60*1000; + private class ExpirePeerEvent implements SimpleTimer.TimedEvent { private List _peers; // toAdd and toRemove are kept separate from _peers so that add and @@ -853,10 +914,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _toRemove = new ArrayList(4); } public void timeReached() { - long inactivityCutoff = _context.clock().now() - 10*60*1000; + long inactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT; for (int i = 0; i < _peers.size(); i++) { PeerState peer = (PeerState)_peers.get(i); - if (peer.getLastReceiveTime() < inactivityCutoff) { + if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) { dropPeer(peer, false); _peers.remove(i); i--; @@ -865,8 +926,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority synchronized (_toAdd) { for (int i = 0; i < _toAdd.size(); i++) { PeerState peer = (PeerState)_toAdd.get(i); - if (!_peers.contains(peer)) - _peers.add(peer); + _peers.remove(peer); // in case we are switching peers + _peers.add(peer); } _toAdd.clear(); } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index aa7792f6b5..133bfbbc5a 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -75,7 +75,6 @@ public class TunnelParticipant { } if ( (_config != null) && (_config.getSendTo() != null) ) { - _config.incrementProcessedMessages(); RouterInfo ri = _nextHopCache; if (ri == null) ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); @@ -83,6 +82,7 @@ public class TunnelParticipant { if (_log.shouldLog(Log.DEBUG)) _log.debug("Send off to nextHop directly (" + _config.getSendTo().toBase64().substring(0,4) + " for " + msg); + _config.incrementProcessedMessages(); send(_config, msg, ri); } else { if (_log.shouldLog(Log.WARN)) diff --git a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java index e7b2e7034c..ed458314b3 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java @@ -3,6 +3,7 @@ package net.i2p.router.tunnel.pool; import net.i2p.data.Certificate; import net.i2p.data.DataHelper; import net.i2p.data.Hash; +import net.i2p.data.RouterInfo; import net.i2p.data.RouterIdentity; import net.i2p.data.TunnelId; import net.i2p.data.i2np.DeliveryInstructions; @@ -19,6 +20,7 @@ import net.i2p.router.message.GarlicMessageBuilder; import net.i2p.router.message.PayloadGarlicConfig; import net.i2p.router.message.SendMessageDirectJob; import net.i2p.router.tunnel.HopConfig; +import net.i2p.router.peermanager.TunnelHistory; import net.i2p.util.Log; /** @@ -31,6 +33,7 @@ import net.i2p.util.Log; public class HandleTunnelCreateMessageJob extends JobImpl { private Log _log; private TunnelCreateMessage _request; + private boolean _alreadySearched; /** job builder to redirect all tunnelCreateMessages through this job type */ static class Builder implements HandlerJobBuilder { @@ -46,14 +49,19 @@ public class HandleTunnelCreateMessageJob extends JobImpl { super(ctx); _log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class); _request = msg; + _alreadySearched = false; } + private static final int STATUS_DEFERRED = 10000; + public String getName() { return "Handle tunnel join request"; } public void runJob() { if (_log.shouldLog(Log.DEBUG)) _log.debug("handle join request: " + _request); int status = shouldAccept(); - if (status > 0) { + if (status == STATUS_DEFERRED) { + return; + } else if (status > 0) { if (_log.shouldLog(Log.WARN)) _log.warn("reject(" + status + ") join request: " + _request); sendRejection(status); @@ -64,7 +72,34 @@ public class HandleTunnelCreateMessageJob extends JobImpl { } } - private int shouldAccept() { return getContext().throttle().acceptTunnelRequest(_request); } + private int shouldAccept() { + Hash nextRouter = _request.getNextRouter(); + if (nextRouter != null) { + RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(nextRouter); + if (ri == null) { + if (_alreadySearched) // only search once + return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD; + getContext().netDb().lookupRouterInfo(nextRouter, new DeferredAccept(getContext(), true), new DeferredAccept(getContext(), false), 5*1000); + _alreadySearched = true; + return STATUS_DEFERRED; + } + } + return getContext().throttle().acceptTunnelRequest(_request); + } + + private class DeferredAccept extends JobImpl { + private boolean _shouldAccept; + public DeferredAccept(RouterContext ctx, boolean shouldAccept) { + super(ctx); + _shouldAccept = shouldAccept; + } + public void runJob() { + HandleTunnelCreateMessageJob.this.runJob(); + } + private static final String NAME_OK = "Deferred netDb accept"; + private static final String NAME_REJECT = "Deferred netDb reject"; + public String getName() { return _shouldAccept ? NAME_OK : NAME_REJECT; } + } private void accept() { byte recvId[] = new byte[4]; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 457848b4dc..16f3d06ea0 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -163,7 +163,15 @@ public class TunnelPool { * when selecting tunnels, stick with the same one for a brief * period to allow batching if we can. */ - private static final long SELECTION_PERIOD = 500; + private long curPeriod() { + long period = _context.clock().now(); + long ms = period % 1000; + if (ms > 500) + period = period - ms + 500; + else + period = period - ms; + return period; + } /** * Pull a random tunnel out of the pool. If there are none available but @@ -173,8 +181,7 @@ public class TunnelPool { */ public TunnelInfo selectTunnel() { return selectTunnel(true); } private TunnelInfo selectTunnel(boolean allowRecurseOnFail) { - long period = _context.clock().now(); - period -= period % SELECTION_PERIOD; + long period = curPeriod(); synchronized (_tunnels) { if (_lastSelectionPeriod == period) { if ( (_lastSelected != null) &&