From 66aa29e3d4b2f3419efb8a8b38a2616c8de518de Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 16 Dec 2004 02:45:55 +0000 Subject: [PATCH] 2004-12-15 jrandom * Handle hard disconnects more gracefully within the streaming lib, and log unmonitored events more aggressively. * If we drop a peer after connection due to clock skew, log it to the /logs.jsp#connectionlogs with relevent info. In addition, toss it in the stat 'tcp.disconnectAfterSkew'. * Fixed the formatting in the skew display * Added an ERROR message that is fired once after we run out of routerInfo files (thanks susi!) * Set the connect timeout equal to the streaming lib's disconnect timeout if not already specified (the I2PTunnel httpclient already enforces a 60s connect timeout) * Fix for another connection startup problem in the streaming lib. * Fix for a stupid error in the probabalistic drop (rand <= P, not > P) * Adjust the capacity calculations so that tunnel failures alone in the last 10m will not trigger a 0 capacity rank. --- .../net/i2p/client/streaming/Connection.java | 25 +++++++++++ .../client/streaming/ConnectionOptions.java | 2 + .../streaming/ConnectionPacketHandler.java | 21 ++++++++- .../i2p/client/streaming/I2PSocketFull.java | 2 + .../i2p/client/streaming/PacketHandler.java | 14 +++++- .../client/streaming/SchedulerChooser.java | 4 +- .../client/streaming/SchedulerConnecting.java | 3 +- .../streaming/SchedulerHardDisconnected.java | 45 +++++++++++++++++++ history.txt | 19 +++++++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../src/net/i2p/router/StatisticsManager.java | 1 + .../kademlia/PersistentDataStore.java | 11 +++++ .../peermanager/CapacityCalculator.java | 5 ++- .../i2p/router/transport/TransportImpl.java | 2 +- .../transport/tcp/ConnectionHandler.java | 4 +- .../router/transport/tcp/MessageHandler.java | 22 +++++---- .../router/transport/tcp/TCPConnection.java | 2 +- 17 files changed, 164 insertions(+), 22 deletions(-) create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/SchedulerHardDisconnected.java diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index e8b3d877f..054113047 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -33,6 +33,7 @@ public class Connection { private long _lastSendId; private boolean _resetReceived; private boolean _connected; + private boolean _hardDisconnected; private MessageInputStream _inputStream; private MessageOutputStream _outputStream; private SchedulerChooser _chooser; @@ -171,6 +172,22 @@ public class Connection { void ackImmediately() { _receiver.send(null, 0, 0); } + + /** + * got a packet we shouldn't have, send 'em a reset + * + */ + void sendReset() { + if ( (_remotePeer == null) || (_sendStreamId == null) ) return; + PacketLocal reply = new PacketLocal(_context, _remotePeer); + reply.setFlag(Packet.FLAG_RESET); + reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + reply.setSendStreamId(_sendStreamId); + reply.setReceiveStreamId(_receiveStreamId); + reply.setOptionalFrom(_connectionManager.getSession().getMyDestination()); + // this just sends the packet - no retries or whatnot + _outboundQueue.enqueue(reply); + } /** * Flush any data that we can @@ -362,6 +379,7 @@ public class Connection { public boolean getResetReceived() { return _resetReceived; } public boolean getIsConnected() { return _connected; } + public boolean getHardDisconnected() { return _hardDisconnected; } void disconnect(boolean cleanDisconnect) { disconnect(cleanDisconnect, true); @@ -371,6 +389,13 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Disconnecting " + toString(), new Exception("discon")); + if (!cleanDisconnect) { + _hardDisconnected = true; + if (_log.shouldLog(Log.WARN)) + _log.warn("Hard disconnecting and sending a reset on " + toString(), new Exception("cause")); + sendReset(); + } + if (cleanDisconnect && _connected) { // send close packets and schedule stuff... _outputStream.closeInternal(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index c7acaac5b..df28a660b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -89,6 +89,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE); + + setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 3f303dd16..36b1d1e98 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -25,6 +25,7 @@ public class ConnectionPacketHandler { _context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); } /** distribute a packet to the connection specified */ @@ -35,6 +36,21 @@ public class ConnectionPacketHandler { _log.error("Packet does NOT verify: " + packet); return; } + + if (con.getHardDisconnected()) { + if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) || + (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) || (packet.isFlagSet(Packet.FLAG_CLOSE)) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received a data packet after hard disconnect: " + packet + " on " + con); + con.sendReset(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received a packet after hard disconnect, ignoring: " + packet + " on " + con); + } + return; + } + + con.packetReceived(); long ready = con.getInputStream().getHighestReadyBockId(); @@ -84,7 +100,8 @@ public class ConnectionPacketHandler { _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet); } } else { - if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) { + if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) || + (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { _context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0); con.incrementDupMessagesReceived(1); @@ -267,6 +284,8 @@ public class ConnectionPacketHandler { con.resetReceived(); con.eventOccurred(); + _context.statManager().addRateData("stream.resetReceived", con.getHighestAckedThrough(), con.getLifetime()); + // no further processing return; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 7ffbf5502..e97a86f52 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -32,6 +32,8 @@ public class I2PSocketFull implements I2PSocket { destroy(); } + Connection getConnection() { return _connection; } + public InputStream getInputStream() { return _connection.getInputStream(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 2a17c7a69..913bd967b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -152,7 +152,8 @@ public class PacketHandler { } } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Receive a syn packet with the wrong IDs: " + packet); + _log.warn("Receive a syn packet with the wrong IDs, sending reset: " + packet); + sendReset(packet); } } else { // someone is sending us a packet on the wrong stream @@ -162,6 +163,17 @@ public class PacketHandler { } } + private void sendReset(Packet packet) { + PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom()); + reply.setFlag(Packet.FLAG_RESET); + reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + reply.setSendStreamId(packet.getReceiveStreamId()); + reply.setReceiveStreamId(packet.getSendStreamId()); + reply.setOptionalFrom(_manager.getSession().getMyDestination()); + // this just sends the packet - no retries or whatnot + _manager.getPacketQueue().enqueue(reply); + } + private void receiveUnknownCon(Packet packet, byte sendId[]) { if (packet.isFlagSet(Packet.FLAG_ECHO)) { if (packet.getSendStreamId() != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java index 215a9e5e2..3ed8327dd 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java @@ -38,6 +38,7 @@ class SchedulerChooser { private List createSchedulers() { List rv = new ArrayList(8); + rv.add(new SchedulerHardDisconnected(_context)); rv.add(new SchedulerPreconnect(_context)); rv.add(new SchedulerConnecting(_context)); rv.add(new SchedulerReceived(_context)); @@ -54,8 +55,7 @@ class SchedulerChooser { } public void eventOccurred(Connection con) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Event occurred on " + con, new Exception("source")); + _log.log(Log.CRIT, "Yell at jrandom: Event occurred on " + con, new Exception("source")); } public boolean accept(Connection con) { return true; } }; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java index 9701310a6..f5f6b50ac 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java @@ -37,7 +37,7 @@ class SchedulerConnecting extends SchedulerImpl { public boolean accept(Connection con) { if (con == null) return false; boolean notYetConnected = (con.getIsConnected()) && - (con.getSendStreamId() == null) && + //(con.getSendStreamId() == null) && // not null on recv (con.getLastSendId() >= 0) && (con.getAckedPackets() <= 0) && (!con.getResetReceived()); @@ -55,6 +55,7 @@ class SchedulerConnecting extends SchedulerImpl { _log.debug("waited too long: " + waited); return; } else { + // should we be doing a con.sendAvailable here? if (con.getOptions().getConnectTimeout() > 0) reschedule(con.getOptions().getConnectTimeout(), con); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerHardDisconnected.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerHardDisconnected.java new file mode 100644 index 000000000..d20489567 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerHardDisconnected.java @@ -0,0 +1,45 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

Scheduler used after we've locally done a hard disconnect, + * but the final timeout hasn't passed.

+ * + *

Entry conditions:

+ * + *

Events:

+ * + *

Next states:

+ *
  • {@link SchedulerDead dead} - after the final timeout passes
  • + * + * + * + */ +class SchedulerHardDisconnected extends SchedulerImpl { + private Log _log; + public SchedulerHardDisconnected(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerHardDisconnected.class); + } + + public boolean accept(Connection con) { + if (con == null) return false; + long timeSinceClose = _context.clock().now() - con.getCloseSentOn(); + boolean ok = (con.getHardDisconnected()) && + (timeSinceClose < Connection.DISCONNECT_TIMEOUT); + return ok; + } + + public void eventOccurred(Connection con) { + // noop. we do the timeout through the simpleTimer anyway + } +} diff --git a/history.txt b/history.txt index fa8b91323..3750d09a7 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,21 @@ -$Id: history.txt,v 1.106 2004/12/14 06:54:39 jrandom Exp $ +$Id: history.txt,v 1.107 2004/12/14 11:42:35 jrandom Exp $ + +2004-12-15 jrandom + * Handle hard disconnects more gracefully within the streaming lib, and + log unmonitored events more aggressively. + * If we drop a peer after connection due to clock skew, log it to the + /logs.jsp#connectionlogs with relevent info. In addition, toss it in + the stat 'tcp.disconnectAfterSkew'. + * Fixed the formatting in the skew display + * Added an ERROR message that is fired once after we run out of + routerInfo files (thanks susi!) + * Set the connect timeout equal to the streaming lib's disconnect timeout + if not already specified (the I2PTunnel httpclient already enforces a + 60s connect timeout) + * Fix for another connection startup problem in the streaming lib. + * Fix for a stupid error in the probabalistic drop (rand <= P, not > P) + * Adjust the capacity calculations so that tunnel failures alone in the + last 10m will not trigger a 0 capacity rank. 2004-12-14 jrandom * Periodically send a message along all I2NP connections with the router's diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 69a9a3bae..914b1a82a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.111 $ $Date: 2004/12/14 06:54:39 $"; + public final static String ID = "$Revision: 1.112 $ $Date: 2004/12/14 11:42:35 $"; public final static String VERSION = "0.4.2.3"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index b05c2fd98..0e632c145 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -103,6 +103,7 @@ public class StatisticsManager implements Service { includeThroughput(stats); includeRate("transport.sendProcessingTime", stats, new long[] { 60*60*1000 }); + includeRate("tcp.probabalisticDropQueueSize", stats, new long[] { 60*1000l, 60*60*1000l }); //includeRate("tcp.queueSize", stats); //includeRate("jobQueue.jobLag", stats, new long[] { 60*1000, 60*60*1000 }); //includeRate("jobQueue.jobRun", stats, new long[] { 60*1000, 60*60*1000 }); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java index d6be8bbc5..83716e975 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java @@ -147,8 +147,10 @@ class PersistentDataStore extends TransientDataStore { } private class ReadJob extends JobImpl { + private boolean _alreadyWarned; public ReadJob() { super(PersistentDataStore.this._context); + _alreadyWarned = false; } public String getName() { return "DB Read Job"; } public void runJob() { @@ -158,6 +160,7 @@ class PersistentDataStore extends TransientDataStore { } private void readFiles() { + int routerCount = 0; try { File dbDir = getDbDir(); File leaseSetFiles[] = dbDir.listFiles(LeaseSetFilter.getInstance()); @@ -170,6 +173,9 @@ class PersistentDataStore extends TransientDataStore { } File routerInfoFiles[] = dbDir.listFiles(RouterInfoFilter.getInstance()); if (routerInfoFiles != null) { + routerCount += routerInfoFiles.length; + if (routerInfoFiles.length > 5) + _alreadyWarned = false; for (int i = 0; i < routerInfoFiles.length; i++) { Hash key = getRouterInfoHash(routerInfoFiles[i].getName()); if ( (key != null) && (!isKnown(key)) ) @@ -179,6 +185,11 @@ class PersistentDataStore extends TransientDataStore { } catch (IOException ioe) { _log.error("Error reading files in the db dir", ioe); } + + if ( (routerCount <= 5) && (!_alreadyWarned) ) { + _log.error("Very few routerInfo files remaining - please reseed"); + _alreadyWarned = true; + } } } diff --git a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java index 18efe371a..35e36043b 100644 --- a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java @@ -107,10 +107,11 @@ public class CapacityCalculator extends Calculator { } else val -= stretch * (curRejected.getCurrentEventCount() + curRejected.getLastEventCount()); + val += GROWTH_FACTOR; + if (val >= 0) { - return (val + GROWTH_FACTOR); + return val; } else { - // failed too much, don't grow return 0.0d; } } diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 0b2fe2603..adf6cfe03 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -364,5 +364,5 @@ public abstract class TransportImpl implements Transport { /** Make this stuff pretty (only used in the old console) */ public String renderStatusHTML() { return null; } - protected RouterContext getContext() { return _context; } + public RouterContext getContext() { return _context; } } diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java index d30a35862..1e8da436b 100644 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java +++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java @@ -440,7 +440,7 @@ public class ConnectionHandler { } else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR) || (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) { status = STATUS_SKEWED; - SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS"); + SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS"); props.setProperty("SKEW", fmt.format(new Date(_context.clock().now()))); } else { try { @@ -603,7 +603,7 @@ public class ConnectionHandler { } else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR) || (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) { status = STATUS_SKEWED; - SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS"); + SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS"); props.setProperty("SKEW", fmt.format(new Date(_context.clock().now()))); } else if (!sigOk) { status = STATUS_SIGNATURE_FAILED; diff --git a/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java b/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java index bbf913c92..cbe1ee7a5 100644 --- a/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java +++ b/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java @@ -25,7 +25,8 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene _con = con; _ident = con.getRemoteRouterIdentity(); _identHash = _ident.calculateHash(); - _log = con.getRouterContext().logManager().getLog(MessageHandler.class); + _log = con.getRouterContext().logManager().getLog(MessageHandler.class); + transport.getContext().statManager().createRateStat("tcp.disconnectAfterSkew", "How skewed a connection became before we killed it?", "TCP", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); } public void disconnected(I2NPMessageReader reader) { @@ -52,15 +53,20 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene private void timeMessageReceived(long remoteTime) { long delta = _con.getRouterContext().clock().now() - remoteTime; if ( (delta > Router.CLOCK_FUDGE_FACTOR) || (delta < 0 - Router.CLOCK_FUDGE_FACTOR) ) { - _log.error("Peer " + _identHash.toBase64().substring(0,6) + " is too far skewed (" - + DataHelper.formatDuration(delta) + ") after uptime of " - + DataHelper.formatDuration(_con.getLifetime()) ); _con.closeConnection(); + _transport.addConnectionErrorMessage("Peer " + _identHash.toBase64().substring(0,6) + + " is too far skewed (" + + DataHelper.formatDuration(delta) + ") after uptime of " + + DataHelper.formatDuration(_con.getLifetime())); + _transport.getContext().statManager().addRateData("tcp.disconnectAfterSkew", delta, _con.getLifetime()); } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Peer " + _identHash.toBase64().substring(0,6) + " is only skewed by (" - + DataHelper.formatDuration(delta) + ") after uptime of " - + DataHelper.formatDuration(_con.getLifetime()) ); + int level = Log.DEBUG; + if ( (delta > Router.CLOCK_FUDGE_FACTOR/2) || (delta < 0 - Router.CLOCK_FUDGE_FACTOR/2) ) + level = Log.WARN; + if (_log.shouldLog(level)) + _log.log(level, "Peer " + _identHash.toBase64().substring(0,6) + " is only skewed by (" + + DataHelper.formatDuration(delta) + ") after uptime of " + + DataHelper.formatDuration(_con.getLifetime()) ); } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 3840bbd05..089f70cc5 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -268,7 +268,7 @@ public class TCPConnection { for (int i = 0; i < _pendingMessages.size() && excessBytesQueued > 0; i++) { OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); int p = getDropProbability(msg.getMessageSize(), excessBytesQueued); - if (_context.random().nextInt(100) > p) { + if (_context.random().nextInt(100) < p) { _pendingMessages.remove(i); i--; msg.timestamp("Probabalistically dropped due to queue size " + excessBytesQueued);