diff --git a/history.txt b/history.txt index 4390b3ba2a..fd059f0e7e 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,11 @@ +2011-11-18 zzz + * NTCP: + - First cut at improving EventPumper performance (ticket #551) + - Establishment timeout cleanup/concurrent + - Remove some logging and stats + - Switch some Queues from LBQ to CLQ + - Static ByteBuffer cache + 2011-11-16 zzz * Console: Add Jetty version to logs page * NTCP: Reduce log level for race (ticket #392) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f6e6df3a5c..4e28a2c8d2 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 4; + public final static long BUILD = 5; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 0512e79152..124c48a219 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -13,34 +13,46 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; +import java.util.Iterator; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.RouterIdentity; import net.i2p.data.RouterInfo; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.I2PThread; import net.i2p.util.Log; /** - * + * The main NTCP NIO thread. */ class EventPumper implements Runnable { private final RouterContext _context; private final Log _log; private volatile boolean _alive; private Selector _selector; - private final LinkedBlockingQueue _bufCache; - private final LinkedBlockingQueue _wantsRead = new LinkedBlockingQueue(); - private final LinkedBlockingQueue _wantsWrite = new LinkedBlockingQueue(); - private final LinkedBlockingQueue _wantsRegister = new LinkedBlockingQueue(); - private final LinkedBlockingQueue _wantsConRegister = new LinkedBlockingQueue(); + private final Set _wantsWrite = new ConcurrentHashSet(32); + /** + * The following 3 are unbounded and lockless for performance in runDelayedEvents() + */ + private final Queue _wantsRead = new ConcurrentLinkedQueue(); + private final Queue _wantsRegister = new ConcurrentLinkedQueue(); + private final Queue _wantsConRegister = new ConcurrentLinkedQueue(); private final NTCPTransport _transport; private long _expireIdleWriteTime; private static final int BUF_SIZE = 8*1024; private static final int MAX_CACHE_SIZE = 64; + + /** + * Shared if there are multiple routers in the JVM + */ + private static final LinkedBlockingQueue _bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE); + /** * every few seconds, iterate across all ntcp connections just to make sure * we have their interestOps set properly (and to expire any looong idle cons). @@ -49,6 +61,8 @@ class EventPumper implements Runnable { * the time to iterate across them to check a few flags shouldn't be a problem. */ private static final long FAILSAFE_ITERATION_FREQ = 2*1000l; + private static final long SELECTOR_LOOP_DELAY = 200; + /** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */ private static final long MIN_EXPIRE_IDLE_TIME = 135*1000l; private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l; @@ -57,8 +71,6 @@ class EventPumper implements Runnable { _context = ctx; _log = ctx.logManager().getLog(getClass()); _transport = transport; - _alive = false; - _bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE); _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; } @@ -90,18 +102,33 @@ class EventPumper implements Runnable { return _alive || (_selector != null && _selector.isOpen()); } + /** + * Register the acceptor. + * This is only called from NTCPTransport.bindAddress(), so it isn't clear + * why this needs a queue. + */ public void register(ServerSocketChannel chan) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering server socket channel"); _wantsRegister.offer(chan); _selector.wakeup(); } + + /** + * Outbound + */ public void registerConnect(NTCPConnection con) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection"); - _context.statManager().addRateData("ntcp.registerConnect", 1, 0); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Registering " + con); + _context.statManager().addRateData("ntcp.registerConnect", 1); _wantsConRegister.offer(con); _selector.wakeup(); } + /** + * The selector loop. + * On high-bandwidth routers, this is the thread with the highest CPU usage, so + * take care to minimize overhead and unnecessary debugging stuff. + */ public void run() { long lastFailsafeIteration = System.currentTimeMillis(); while (_alive && _selector.isOpen()) { @@ -111,17 +138,17 @@ class EventPumper implements Runnable { try { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("before select..."); - count = _selector.select(200); + count = _selector.select(SELECTOR_LOOP_DELAY); } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error selecting", ioe); } if (count <= 0) continue; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("select returned " + count); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("select returned " + count); - Set selected = null; + Set selected; try { selected = _selector.selectedKeys(); } catch (ClosedSelectorException cse) { @@ -175,15 +202,15 @@ class EventPumper implements Runnable { if ((!key.isValid()) && (!((SocketChannel)key.channel()).isConnectionPending()) && con.getTimeSinceCreated() > 2 * NTCPTransport.ESTABLISH_TIMEOUT) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid key " + con); + if (_log.shouldLog(Log.INFO)) + _log.info("Invalid key " + con); // this will cancel the key, and it will then be removed from the keyset con.close(); failsafeInvalid++; continue; } - if ( (con.getWriteBufCount() > 0) && + if ( (!con.isWriteBufEmpty()) && ((key.interestOps() & SelectionKey.OP_WRITE) == 0) ) { // the data queued to be sent has already passed through // the bw limiter and really just wants to get shoved @@ -203,11 +230,11 @@ class EventPumper implements Runnable { } } if (failsafeWrites > 0) - _context.statManager().addRateData("ntcp.failsafeWrites", failsafeWrites, 0); + _context.statManager().addRateData("ntcp.failsafeWrites", failsafeWrites); if (failsafeCloses > 0) - _context.statManager().addRateData("ntcp.failsafeCloses", failsafeCloses, 0); + _context.statManager().addRateData("ntcp.failsafeCloses", failsafeCloses); if (failsafeInvalid > 0) - _context.statManager().addRateData("ntcp.failsafeInvalid", failsafeInvalid, 0); + _context.statManager().addRateData("ntcp.failsafeInvalid", failsafeInvalid); } catch (ClosedSelectorException cse) { continue; } @@ -249,8 +276,13 @@ class EventPumper implements Runnable { _wantsRead.clear(); _wantsRegister.clear(); _wantsWrite.clear(); + _bufCache.clear(); } + /** + * Process all keys from the last select. + * High-frequency path in thread. + */ private void processKeys(Set selected) { for (SelectionKey key : selected) { try { @@ -259,17 +291,17 @@ class EventPumper implements Runnable { boolean connect = (ops & SelectionKey.OP_CONNECT) != 0; boolean read = (ops & SelectionKey.OP_READ) != 0; boolean write = (ops & SelectionKey.OP_WRITE) != 0; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ready ops for : " + key - + " accept? " + accept + " connect? " + connect - + " read? " + read - + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0) - + " write? " + write - + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0) - + " on " + key.attachment() - ); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("ready ops for : " + key + // + " accept? " + accept + " connect? " + connect + // + " read? " + read + // + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0) + // + " write? " + write + // + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0) + // + " on " + key.attachment() + // ); if (accept) { - _context.statManager().addRateData("ntcp.accept", 1, 0); + _context.statManager().addRateData("ntcp.accept", 1); processAccept(key); } if (connect) { @@ -277,12 +309,12 @@ class EventPumper implements Runnable { processConnect(key); } if (read) { - _context.statManager().addRateData("ntcp.read", 1, 0); + //_context.statManager().addRateData("ntcp.read", 1, 0); key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); processRead(key); } if (write) { - _context.statManager().addRateData("ntcp.write", 1, 0); + //_context.statManager().addRateData("ntcp.write", 1, 0); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); processWrite(key); } @@ -293,93 +325,105 @@ class EventPumper implements Runnable { } } + /** + * Called by the connection when it has data ready to write. + * If we have bandwidth, calls con.Write() which calls wantsWrite(con). + * If no bandwidth, calls con.queuedWrite(). + */ public void wantsWrite(NTCPConnection con, byte data[]) { ByteBuffer buf = ByteBuffer.wrap(data); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf); if (req.getPendingOutboundRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("queued write on " + con + " for " + data.length); - _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1, 0); + _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1); con.queuedWrite(buf, req); } else { // fully allocated - if (_log.shouldLog(Log.INFO)) - _log.info("fully allocated write on " + con + " for " + data.length); + //if (_log.shouldLog(Log.INFO)) + // _log.info("fully allocated write on " + con + " for " + data.length); con.write(buf); } } - /** called by the connection when it has data ready to write (after bw allocation) */ + + /** + * Called by the connection when it has data ready to write (after bw allocation). + * Only wakeup if new. + */ public void wantsWrite(NTCPConnection con) { - if (_log.shouldLog(Log.INFO)) - _log.info("Before adding wants to write on " + con); - if (!_wantsWrite.contains(con)) - _wantsWrite.offer(con); - if (_log.shouldLog(Log.INFO)) - _log.info("Wants to write on " + con); - _selector.wakeup(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("selector awoken for write"); + if (_wantsWrite.add(con)) { + _selector.wakeup(); + } } + + /** + * This is only called from NTCPConnection.complete() + * if there is more data, which is rare (never?) + * so we don't need to check for dups or make _wantsRead a Set. + */ public void wantsRead(NTCPConnection con) { - if (!_wantsRead.contains(con)) - _wantsRead.offer(con); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("wants to read on " + con); + _wantsRead.offer(con); _selector.wakeup(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("selector awoken for read"); } private static final int MIN_BUFS = 5; + /** - * There's only one pumper, so static is fine, unless multi router - * Is there a better way to do this? + * How many to keep in reserve. + * Shared if there are multiple routers in the JVM */ - private static int NUM_BUFS = 5; + private static int _numBufs = MIN_BUFS; private static int __liveBufs = 0; private static int __consecutiveExtra; - ByteBuffer acquireBuf() { + + /** + * High-frequency path in thread. + */ + private static ByteBuffer acquireBuf() { ByteBuffer rv = _bufCache.poll(); if (rv == null) { rv = ByteBuffer.allocate(BUF_SIZE); - NUM_BUFS = ++__liveBufs; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); - _context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0); + _numBufs = ++__liveBufs; + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); + //_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0); } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); } return rv; } - void releaseBuf(ByteBuffer buf) { + /** + * High-frequency path in thread. + */ + private static void releaseBuf(ByteBuffer buf) { //if (false) return; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf); buf.clear(); int extra = _bufCache.size(); - boolean cached = extra < NUM_BUFS; + boolean cached = extra < _numBufs; if (cached) { _bufCache.offer(buf); if (extra > 5) { __consecutiveExtra++; if (__consecutiveExtra >= 20) { - NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS); + _numBufs = Math.max(_numBufs - 1, MIN_BUFS); __consecutiveExtra = 0; } } } else { __liveBufs--; } - if (cached && _log.shouldLog(Log.DEBUG)) - _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live"); + //if (cached && _log.shouldLog(Log.DEBUG)) + // _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live"); } private void processAccept(SelectionKey key) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("processing accept"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("processing accept"); ServerSocketChannel servChan = (ServerSocketChannel)key.attachment(); try { SocketChannel chan = servChan.accept(); @@ -408,10 +452,10 @@ class EventPumper implements Runnable { SelectionKey ckey = chan.register(_selector, SelectionKey.OP_READ); NTCPConnection con = new NTCPConnection(_context, _transport, chan, ckey); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("new NTCP connection established: " +con); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("new NTCP connection established: " +con); } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ioe); + _log.error("Error accepting", ioe); } } @@ -427,11 +471,11 @@ class EventPumper implements Runnable { chan.socket().setKeepAlive(true); con.setKey(key); con.outboundConnected(); - _context.statManager().addRateData("ntcp.connectSuccessful", 1, 0); + _context.statManager().addRateData("ntcp.connectSuccessful", 1); } else { con.close(); _transport.markUnreachable(con.getRemotePeer().calculateHash()); - _context.statManager().addRateData("ntcp.connectFailedTimeout", 1, 0); + _context.statManager().addRateData("ntcp.connectFailedTimeout", 1); } } catch (IOException ioe) { // this is the usual failure path for a timeout or connect refused if (_log.shouldLog(Log.WARN)) @@ -439,25 +483,28 @@ class EventPumper implements Runnable { con.close(); //_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE); _transport.markUnreachable(con.getRemotePeer().calculateHash()); - _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1, 0); + _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1); } catch (NoConnectionPendingException ncpe) { // ignore } } + /** + * High-frequency path in thread. + */ private void processRead(SelectionKey key) { NTCPConnection con = (NTCPConnection)key.attachment(); ByteBuffer buf = acquireBuf(); try { int read = con.getChannel().read(buf); if (read == -1) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con); - _context.statManager().addRateData("ntcp.readEOF", 1, 0); + //if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con); + _context.statManager().addRateData("ntcp.readEOF", 1); con.close(); releaseBuf(buf); } else if (read == 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("nothing to read for " + con + ", but stay interested"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("nothing to read for " + con + ", but stay interested"); key.interestOps(key.interestOps() | SelectionKey.OP_READ); releaseBuf(buf); } else if (read > 0) { @@ -465,19 +512,20 @@ class EventPumper implements Runnable { buf.flip(); buf.get(data); releaseBuf(buf); - buf=null; + buf = null; ByteBuffer rbuf = ByteBuffer.wrap(data); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); if (req.getPendingInboundRequested() > 0) { + // rare since we generally don't throttle inbound key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore"); - _context.statManager().addRateData("ntcp.queuedRecv", read, 0); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore"); + _context.statManager().addRateData("ntcp.queuedRecv", read); con.queuedRecv(rbuf, req); } else { // fully allocated - if (_log.shouldLog(Log.DEBUG)) - _log.debug("not bw throttled reading for " + con); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("not bw throttled reading for " + con); key.interestOps(key.interestOps() | SelectionKey.OP_READ); con.recv(rbuf); } @@ -485,63 +533,64 @@ class EventPumper implements Runnable { } catch (CancelledKeyException cke) { if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke); con.close(); - _context.statManager().addRateData("ntcp.readError", 1, 0); + _context.statManager().addRateData("ntcp.readError", 1); if (buf != null) releaseBuf(buf); } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe); con.close(); - _context.statManager().addRateData("ntcp.readError", 1, 0); + _context.statManager().addRateData("ntcp.readError", 1); if (buf != null) releaseBuf(buf); } catch (NotYetConnectedException nyce) { // ??? } } + /** + * High-frequency path in thread. + */ private void processWrite(SelectionKey key) { - int totalWritten = 0; - int buffers = 0; - long before = System.currentTimeMillis(); + //int totalWritten = 0; + //int buffers = 0; + //long before = System.currentTimeMillis(); NTCPConnection con = (NTCPConnection)key.attachment(); try { while (true) { ByteBuffer buf = con.getNextWriteBuf(); if (buf != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("writing " + buf.remaining()+"..."); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("writing " + buf.remaining()+"..."); if (buf.remaining() <= 0) { - long beforeRem = System.currentTimeMillis(); + //long beforeRem = System.currentTimeMillis(); con.removeWriteBuf(buf); - long afterRem = System.currentTimeMillis(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "..."); - buf = null; - buffers++; + //long afterRem = System.currentTimeMillis(); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "..."); + //buffers++; continue; } int written = con.getChannel().write(buf); - totalWritten += written; + //totalWritten += written; if (written == 0) { - if ( (buf.remaining() > 0) || (con.getWriteBufCount() >= 1) ) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains..."); + if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) { + //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains..."); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } else { - if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains..."); + //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains..."); } break; } else if (buf.remaining() > 0) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining..."); + //if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining..."); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); break; } else { - long beforeRem = System.currentTimeMillis(); + //long beforeRem = System.currentTimeMillis(); con.removeWriteBuf(buf); - long afterRem = System.currentTimeMillis(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("buffer "+ buffers+"/"+written+"/"+totalWritten+" fully written after " + - (beforeRem-before) + ", then removed after " + (afterRem-beforeRem) + "..."); + //long afterRem = System.currentTimeMillis(); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("buffer "+ buffers+"/"+written+"/"+totalWritten+" fully written after " + + // (beforeRem-before) + ", then removed after " + (afterRem-beforeRem) + "..."); //releaseBuf(buf); - buf = null; - buffers++; + //buffers++; //if (buffer time is too much, add OP_WRITe to the interest ops and break?) } } else { @@ -550,19 +599,24 @@ class EventPumper implements Runnable { } } catch (CancelledKeyException cke) { if (_log.shouldLog(Log.WARN)) _log.warn("error writing", cke); - _context.statManager().addRateData("ntcp.writeError", 1, 0); + _context.statManager().addRateData("ntcp.writeError", 1); con.close(); } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe); - _context.statManager().addRateData("ntcp.writeError", 1, 0); + _context.statManager().addRateData("ntcp.writeError", 1); con.close(); } - long after = System.currentTimeMillis(); - if (_log.shouldLog(Log.INFO)) - _log.info("Wrote " + totalWritten + " in " + buffers + " buffers on " + con - + " after " + (after-before)); + //long after = System.currentTimeMillis(); + //if (_log.shouldLog(Log.INFO)) + // _log.info("Wrote " + totalWritten + " in " + buffers + " buffers on " + con + // + " after " + (after-before)); } + /** + * Pull off the 4 _wants* queues and update the interest ops, + * which may, according to the javadocs, be a "naive" implementation and block. + * High-frequency path in thread. + */ private void runDelayedEvents() { NTCPConnection con; while ((con = _wantsRead.poll()) != null) { @@ -590,19 +644,25 @@ class EventPumper implements Runnable { } } - while ((con = _wantsWrite.poll()) != null) { - SelectionKey key = con.getKey(); - try { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } catch (CancelledKeyException cke) { - // ignore - } catch (IllegalArgumentException iae) { - // see above - if (_log.shouldLog(Log.WARN)) - _log.warn("gnu?", iae); + // check before instantiating iterator for speed + if (!_wantsWrite.isEmpty()) { + for (Iterator iter = _wantsWrite.iterator(); iter.hasNext(); ) { + con = iter.next(); + iter.remove(); + SelectionKey key = con.getKey(); + try { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } catch (CancelledKeyException cke) { + // ignore + } catch (IllegalArgumentException iae) { + // see above + if (_log.shouldLog(Log.WARN)) + _log.warn("gnu?", iae); + } } } + // only when address changes ServerSocketChannel chan; while ((chan = _wantsRegister.poll()) != null) { try { @@ -625,13 +685,13 @@ class EventPumper implements Runnable { InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort()); boolean connected = con.getChannel().connect(saddr); if (connected) { - _context.statManager().addRateData("ntcp.connectImmediate", 1, 0); + _context.statManager().addRateData("ntcp.connectImmediate", 1); key.interestOps(SelectionKey.OP_READ); processConnect(key); } } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error connecting", ioe); - _context.statManager().addRateData("ntcp.connectFailedIOE", 1, 0); + _context.statManager().addRateData("ntcp.connectFailedIOE", 1); _transport.markUnreachable(con.getRemotePeer().calculateHash()); //if (ntcpOnly(con)) { // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage()); @@ -642,7 +702,7 @@ class EventPumper implements Runnable { //} } catch (UnresolvedAddressException uae) { if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae); - _context.statManager().addRateData("ntcp.connectFailedUnresolved", 1, 0); + _context.statManager().addRateData("ntcp.connectFailedUnresolved", 1); _transport.markUnreachable(con.getRemotePeer().calculateHash()); //if (ntcpOnly(con)) { // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage()); @@ -671,6 +731,7 @@ class EventPumper implements Runnable { * but if they support other transports (eg ssu) we should allow those transports to be * tried as well. */ +/**** private boolean ntcpOnly(NTCPConnection con) { RouterIdentity ident = con.getRemotePeer(); if (ident == null) return true; @@ -678,10 +739,13 @@ class EventPumper implements Runnable { if (info == null) return true; return info.getAddresses().size() == 1; } +****/ private long _lastExpired; + private void expireTimedOut() { _transport.expireTimedOut(); } + public long getIdleTimeout() { return _expireIdleWriteTime; } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 6736637619..507cde2cc7 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -8,7 +8,9 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.Adler32; @@ -58,13 +60,17 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private final Log _log; private SocketChannel _chan; private SelectionKey _conKey; - /** list of ByteBuffer containing data we have read and are ready to process, oldest first */ - private final LinkedBlockingQueue _readBufs; + /** + * queue of ByteBuffer containing data we have read and are ready to process, oldest first + * unbounded and lockless + */ + private final Queue _readBufs; /** * list of ByteBuffers containing fully populated and encrypted data, ready to write, * and already cleared through the bandwidth limiter. + * unbounded and lockless */ - private final LinkedBlockingQueue _writeBufs; + private final Queue _writeBufs; /** Requests that were not granted immediately */ private final Set _bwRequests; private boolean _established; @@ -79,7 +85,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { /** * pending unprepared OutNetMessage instances */ - private final LinkedBlockingQueue _outbound; + private final Queue _outbound; /** * current prepared OutNetMessage, or null - synchronize on _outbound to modify * FIXME why do we need this??? @@ -132,9 +138,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _created = System.currentTimeMillis(); _transport = transport; _chan = chan; - _readBufs = new LinkedBlockingQueue(); - _writeBufs = new LinkedBlockingQueue(); + _readBufs = new ConcurrentLinkedQueue(); + _writeBufs = new ConcurrentLinkedQueue(); _bwRequests = new ConcurrentHashSet(2); + // TODO possible switch to CLQ but beware non-constant size() - see below _outbound = new LinkedBlockingQueue(); _isInbound = true; _decryptBlockBuf = new byte[16]; @@ -156,9 +163,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport = transport; _remotePeer = remotePeer; _remAddr = remAddr; - _readBufs = new LinkedBlockingQueue(); - _writeBufs = new LinkedBlockingQueue(); + _readBufs = new ConcurrentLinkedQueue(); + _writeBufs = new ConcurrentLinkedQueue(); _bwRequests = new ConcurrentHashSet(2); + // TODO possible switch to CLQ but beware non-constant size() - see below _outbound = new LinkedBlockingQueue(); _isInbound = false; _decryptBlockBuf = new byte[16]; @@ -199,7 +207,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _establishedOn = System.currentTimeMillis(); _transport.inboundEstablished(this); _establishState = null; - _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); + _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); } @@ -283,7 +291,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {} if (_log.shouldLog(Log.WARN)) { int blocks = _writeBufs.size(); - _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64()); + _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash()); } _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime()); close(); @@ -359,7 +367,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (target != null) { infoMsg.setTarget(target); infoMsg.beginSend(); - _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1, 0); + _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1); send(infoMsg); // See comment below @@ -435,7 +443,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.markReachable(getRemotePeer().calculateHash(), false); //_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); boolean msgs = !_outbound.isEmpty(); - _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); + _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); if (msgs) _transport.getWriter().wantsWrite(this, "outbound established"); @@ -471,6 +479,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { //else // prepareNextWriteSmall(); } + /********** nobody's tried this one in years private void prepareNextWriteSmall() { if (_log.shouldLog(Log.DEBUG)) @@ -593,7 +602,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (_nextMetaTime <= System.currentTimeMillis()) { sendMeta(); - _nextMetaTime = System.currentTimeMillis() + META_FREQUENCY + _context.random().nextInt(META_FREQUENCY); + _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); } OutNetMessage msg = null; @@ -784,7 +793,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { removeRequest(req); ByteBuffer buf = (ByteBuffer)req.attachment(); if (req.getTotalInboundRequested() > 0) { - _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()), 0); + _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime())); recv(buf); // our reads used to be bw throttled (during which time we were no // longer interested in reading from the network), but we aren't @@ -792,7 +801,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.getPumper().wantsRead(this); //_transport.getReader().wantsRead(this); } else if (req.getTotalOutboundRequested() > 0) { - _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()), 0); + _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime())); write(buf); } } @@ -836,14 +845,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.getReader().wantsRead(this); updateStats(); } + /** * The contents of the buffer have been encrypted / padded / etc and have * been fully allocated for the bandwidth limiter. */ public void write(ByteBuffer buf) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)"); + //if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)"); _writeBufs.offer(buf); - if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)"); + //if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)"); _transport.getPumper().wantsWrite(this); } @@ -852,23 +862,25 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { return _readBufs.poll(); } - /** since getNextReadBuf() removes, this should not be necessary */ - public void removeReadBuf(ByteBuffer buf) { - _readBufs.remove(buf); - //_transport.getPumper().releaseBuf(buf); + /** + * Replaces getWriteBufCount() + * @since 0.8.12 + */ + public boolean isWriteBufEmpty() { + return _writeBufs.isEmpty(); } - - public int getWriteBufCount() { return _writeBufs.size(); } /** @return null if none available */ public ByteBuffer getNextWriteBuf() { return _writeBufs.peek(); // not remove! we removeWriteBuf afterwards } + /** + * Remove the buffer, which _should_ be the one at the head of _writeBufs + */ public void removeWriteBuf(ByteBuffer buf) { _bytesSent += buf.capacity(); OutNetMessage msg = null; - boolean bufsRemain = false; boolean clearMessage = false; if (_sendingMeta && (buf.capacity() == _meta.length)) { _sendingMeta = false; @@ -876,7 +888,6 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { clearMessage = true; } _writeBufs.remove(buf); - bufsRemain = !_writeBufs.isEmpty(); if (clearMessage) { // see synchronization comments in prepareNextWriteFast() synchronized (_outbound) { @@ -904,8 +915,13 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null)); if (msgs) // push through the bw limiter to reach _writeBufs _transport.getWriter().wantsWrite(this, "write completed"); - if (bufsRemain) // send asap - _transport.getPumper().wantsWrite(this); + + // this is not necessary, EventPumper.processWrite() handles this + // and it just causes unnecessary selector.wakeup() and looping + //boolean bufsRemain = !_writeBufs.isEmpty(); + //if (bufsRemain) // send asap + // _transport.getPumper().wantsWrite(this); + updateStats(); } @@ -982,8 +998,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")"); boolean ok = recvUnencryptedI2NP(); if (!ok) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data"); + _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data"); _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1, getUptime()); return; } @@ -999,8 +1014,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private boolean recvUnencryptedI2NP() { _curReadState.receiveBlock(_decryptBlockBuf); if (_curReadState.getSize() > BUFFER_SIZE) { - if (_log.shouldLog(Log.ERROR)) - _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString()); + _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString()); _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime()); close(); return false; @@ -1114,7 +1128,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _dataReadBufs.offer(buf); } - /** @since 0.8.8 */ + /** + * Call at transport shutdown + * @since 0.8.8 + */ static void releaseResources() { _i2npHandlers.clear(); _dataReadBufs.clear(); @@ -1278,9 +1295,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { @Override public String toString() { - return "NTCP Connection to " + - (_remotePeer == null ? "unknown " : _remotePeer.calculateHash().toBase64().substring(0,6)) + - " inbound? " + _isInbound + " established? " + _established + + return "NTCP conn " + + (_isInbound ? "from " : "to ") + + (_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) + + (_established ? "" : " not established") + " created " + DataHelper.formatDuration(getTimeSinceCreated()) + " ago"; } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 8ad1ea8c32..5db11954d8 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -14,6 +14,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import java.util.Vector; @@ -29,6 +30,7 @@ import net.i2p.router.transport.CommSystemFacadeImpl; import net.i2p.router.transport.Transport; import net.i2p.router.transport.TransportBid; import net.i2p.router.transport.TransportImpl; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; import net.i2p.util.Translate; @@ -54,7 +56,7 @@ public class NTCPTransport extends TransportImpl { * list of NTCPConnection of connections not yet established that we * want to remove on establishment or close on timeout */ - private final List _establishing; + private final Set _establishing; /** this is rarely if ever used, default is to bind to wildcard address */ public static final String PROP_BIND_INTERFACE = "i2np.ntcp.bindInterface"; @@ -115,14 +117,14 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.invalidInboundSize", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidInboundSkew", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidSignature", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.multipleCloseOnRemove", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.outboundEstablishFailed", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES); _context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.readError", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", RATES); @@ -131,9 +133,9 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.throttledReadComplete", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.throttledWriteComplete", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.wantsQueuedWrite", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.write", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.write", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.writeError", "", "ntcp", RATES); - _establishing = new ArrayList(4); + _establishing = new ConcurrentHashSet(16); _conLock = new Object(); _conByIdent = new HashMap(64); @@ -152,7 +154,7 @@ public class NTCPTransport extends TransportImpl { } void inboundEstablished(NTCPConnection con) { - _context.statManager().addRateData("ntcp.inboundEstablished", 1, 0); + _context.statManager().addRateData("ntcp.inboundEstablished", 1); markReachable(con.getRemotePeer().calculateHash(), true); //_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash()); NTCPConnection old = null; @@ -162,7 +164,7 @@ public class NTCPTransport extends TransportImpl { if (old != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Old connection closed: " + old + " replaced by " + con); - _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime(), 0); + _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime()); old.close(); } } @@ -203,7 +205,7 @@ public class NTCPTransport extends TransportImpl { } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Error opening a channel", ioe); - _context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1, 0); + _context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1); con.close(); } } else { @@ -263,17 +265,17 @@ public class NTCPTransport extends TransportImpl { return null; if (dataSize > NTCPConnection.MAX_MSG_SIZE) { // let SSU deal with it - _context.statManager().addRateData("ntcp.noBidTooLargeI2NP", dataSize, 0); + _context.statManager().addRateData("ntcp.noBidTooLargeI2NP", dataSize); return null; } Hash peer = toAddress.getIdentity().calculateHash(); if (_context.shitlist().isShitlisted(peer, STYLE)) { // we aren't shitlisted in general (since we are trying to get a bid), but we have // recently shitlisted the peer on the NTCP transport, so don't try it - _context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1, 0); + _context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1); return null; } else if (isUnreachable(peer)) { - _context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0); + _context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1); return null; } @@ -287,7 +289,7 @@ public class NTCPTransport extends TransportImpl { if (addr == null) { markUnreachable(peer); - _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1, 0); + _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1); //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE); if (_log.shouldLog(Log.DEBUG)) _log.debug("no bid when trying to send to " + peer.toBase64() + " as they don't have an ntcp address"); @@ -295,7 +297,7 @@ public class NTCPTransport extends TransportImpl { } NTCPAddress naddr = new NTCPAddress(addr); if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) { - _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1, 0); + _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1); markUnreachable(peer); //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE); if (_log.shouldLog(Log.DEBUG)) @@ -304,7 +306,7 @@ public class NTCPTransport extends TransportImpl { } if (!naddr.isPubliclyRoutable()) { if (! _context.getProperty("i2np.ntcp.allowLocal", "false").equals("true")) { - _context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1, 0); + _context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1); markUnreachable(peer); if (_log.shouldLog(Log.DEBUG)) _log.debug("no bid when trying to send to " + peer.toBase64() + " as they have a private ntcp address"); @@ -373,7 +375,7 @@ public class NTCPTransport extends TransportImpl { if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both if (_log.shouldLog(Log.WARN)) _log.warn("Multiple connections on remove, closing " + removed + " (already closed " + con + ")"); - _context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime(), 0); + _context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime()); removed.close(); } } @@ -566,40 +568,31 @@ public class NTCPTransport extends TransportImpl { * the con must be established to avoid premature close()ing */ public static final int ESTABLISH_TIMEOUT = 10*1000; + /** add us to the establishment timeout process */ void establishing(NTCPConnection con) { - synchronized (_establishing) { _establishing.add(con); - } } /** * called in the EventPumper no more than once a second or so, closing * any unconnected/unestablished connections */ void expireTimedOut() { - List expired = null; - synchronized (_establishing) { - for (int i = 0; i < _establishing.size(); i++) { - NTCPConnection con = (NTCPConnection)_establishing.get(i); - if (con.isClosed()) { - _establishing.remove(i); - i--; - } else if (con.isEstablished()) { - _establishing.remove(i); - i--; + int expired = 0; + + for (Iterator iter = _establishing.iterator(); iter.hasNext(); ) { + NTCPConnection con = iter.next(); + if (con.isClosed() || con.isEstablished()) { + iter.remove(); } else if (con.getTimeSinceCreated() > ESTABLISH_TIMEOUT) { - _establishing.remove(i); - i--; - if (expired == null) - expired = new ArrayList(2); - expired.add(con); + iter.remove(); + con.close(); + expired++; } } - } - for (int i = 0; expired != null && i < expired.size(); i++) - ((NTCPConnection)expired.get(i)).close(); - if ( (expired != null) && (!expired.isEmpty()) ) - _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0); + + if (expired > 0) + _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired); } //private boolean bindAllInterfaces() { return true; } diff --git a/router/java/src/net/i2p/router/transport/ntcp/Reader.java b/router/java/src/net/i2p/router/transport/ntcp/Reader.java index f98cf7d40a..55ea09f288 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Reader.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Reader.java @@ -67,6 +67,7 @@ class Reader { if (_log.shouldLog(Log.DEBUG)) _log.debug("wantsRead: " + con + " already live? " + already); } + public void connectionClosed(NTCPConnection con) { synchronized (_pendingConnections) { _readAfterLive.remove(con); @@ -135,14 +136,12 @@ class Reader { } else { // hmm, there shouldn't be a race here - only one reader should // be running on a con at a time... - if (_log.shouldLog(Log.ERROR)) - _log.error("no establishment state but " + con + " is established... race?"); + _log.error("no establishment state but " + con + " is established... race?"); break; } } if (est.isComplete()) { // why is it complete yet !con.isEstablished? - if (_log.shouldLog(Log.ERROR)) _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? " + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")"); break; @@ -152,7 +151,7 @@ class Reader { if (_log.shouldLog(Log.WARN)) _log.warn("closing connection on establishment because: " +est.getError(), est.getException()); if (!est.getFailedBySkew()) - _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0); + _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1); con.close(); return; } else if (buf.remaining() <= 0) {