* NTCP:
  - First cut at improving EventPumper performance (ticket #551)
  - Establishment timeout cleanup/concurrent
  - Remove some logging and stats
  - Switch some Queues from LBQ to CLQ
  - Static ByteBuffer cache
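A note on the LBQ-to-CLQ switch above: both classes are thread-safe, but LinkedBlockingQueue takes a lock on every operation while ConcurrentLinkedQueue is lock-free. The pumper's delayed-event queues are only ever offer()ed to by producer threads and drained with poll() on the pumper thread, and nothing ever blocks waiting on them, so blocking semantics (and their lock traffic) buy nothing here. A minimal sketch of the pattern, with illustrative names rather than the router's actual API:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch of the producer/consumer pattern in EventPumper.
// A lock-free CLQ is enough because producers only offer() and the single
// pumper thread only poll()s; nobody blocks on the queue, and size() is
// never called (it is O(n) on CLQ).
class DelayedEventQueueSketch {
    private final Queue<Runnable> _events = new ConcurrentLinkedQueue<Runnable>();

    /** any thread: enqueue and (in the real pumper) wake the selector */
    void offerEvent(Runnable r) {
        _events.offer(r);
    }

    /** pumper thread: drain everything queued since the last pass */
    void runDelayedEvents() {
        Runnable r;
        while ((r = _events.poll()) != null)
            r.run();
    }
}

The one thing to watch is CLQ's non-constant size(), which is why the NTCPConnection diff below keeps _outbound on a LinkedBlockingQueue (see its "beware non-constant size()" TODO).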
@@ -1,3 +1,11 @@
2011-11-18 zzz
 * NTCP:
   - First cut at improving EventPumper performance (ticket #551)
   - Establishment timeout cleanup/concurrent
   - Remove some logging and stats
   - Switch some Queues from LBQ to CLQ
   - Static ByteBuffer cache

2011-11-16 zzz
 * Console: Add Jetty version to logs page
 * NTCP: Reduce log level for race (ticket #392)

@@ -18,7 +18,7 @@ public class RouterVersion {
    /** deprecated */
    public final static String ID = "Monotone";
    public final static String VERSION = CoreVersion.VERSION;
    public final static long BUILD = 4;
    public final static long BUILD = 5;

    /** for example "-test" */
    public final static String EXTRA = "";

@@ -13,34 +13,46 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;

/**
 *
 * The main NTCP NIO thread.
 */
class EventPumper implements Runnable {
    private final RouterContext _context;
    private final Log _log;
    private volatile boolean _alive;
    private Selector _selector;
    private final LinkedBlockingQueue<ByteBuffer> _bufCache;
    private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue<NTCPConnection>();
    private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue<NTCPConnection>();
    private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue<ServerSocketChannel>();
    private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue<NTCPConnection>();
    private final Set<NTCPConnection> _wantsWrite = new ConcurrentHashSet<NTCPConnection>(32);
    /**
     * The following 3 are unbounded and lockless for performance in runDelayedEvents()
     */
    private final Queue<NTCPConnection> _wantsRead = new ConcurrentLinkedQueue<NTCPConnection>();
    private final Queue<ServerSocketChannel> _wantsRegister = new ConcurrentLinkedQueue<ServerSocketChannel>();
    private final Queue<NTCPConnection> _wantsConRegister = new ConcurrentLinkedQueue<NTCPConnection>();
    private final NTCPTransport _transport;
    private long _expireIdleWriteTime;

    private static final int BUF_SIZE = 8*1024;
    private static final int MAX_CACHE_SIZE = 64;

    /**
     * Shared if there are multiple routers in the JVM
     */
    private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);

    /**
     * every few seconds, iterate across all ntcp connections just to make sure
     * we have their interestOps set properly (and to expire any looong idle cons).
@@ -49,6 +61,8 @@ class EventPumper implements Runnable {
     * the time to iterate across them to check a few flags shouldn't be a problem.
     */
    private static final long FAILSAFE_ITERATION_FREQ = 2*1000l;
    private static final long SELECTOR_LOOP_DELAY = 200;

    /** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */
    private static final long MIN_EXPIRE_IDLE_TIME = 135*1000l;
    private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l;
@@ -57,8 +71,6 @@ class EventPumper implements Runnable {
        _context = ctx;
        _log = ctx.logManager().getLog(getClass());
        _transport = transport;
        _alive = false;
        _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
        _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
    }

@@ -90,18 +102,33 @@ class EventPumper implements Runnable {
        return _alive || (_selector != null && _selector.isOpen());
    }

    /**
     * Register the acceptor.
     * This is only called from NTCPTransport.bindAddress(), so it isn't clear
     * why this needs a queue.
     */
    public void register(ServerSocketChannel chan) {
        if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering server socket channel");
        _wantsRegister.offer(chan);
        _selector.wakeup();
    }

    /**
     * Outbound
     */
    public void registerConnect(NTCPConnection con) {
        if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection");
        _context.statManager().addRateData("ntcp.registerConnect", 1, 0);
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("Registering " + con);
        _context.statManager().addRateData("ntcp.registerConnect", 1);
        _wantsConRegister.offer(con);
        _selector.wakeup();
    }

    /**
     * The selector loop.
     * On high-bandwidth routers, this is the thread with the highest CPU usage, so
     * take care to minimize overhead and unnecessary debugging stuff.
     */
    public void run() {
        long lastFailsafeIteration = System.currentTimeMillis();
        while (_alive && _selector.isOpen()) {
@@ -111,17 +138,17 @@ class EventPumper implements Runnable {
            try {
                //if (_log.shouldLog(Log.DEBUG))
                //    _log.debug("before select...");
                count = _selector.select(200);
                count = _selector.select(SELECTOR_LOOP_DELAY);
            } catch (IOException ioe) {
                if (_log.shouldLog(Log.WARN))
                    _log.warn("Error selecting", ioe);
            }
            if (count <= 0)
                continue;
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("select returned " + count);
            //if (_log.shouldLog(Log.DEBUG))
            //    _log.debug("select returned " + count);

            Set<SelectionKey> selected = null;
            Set<SelectionKey> selected;
            try {
                selected = _selector.selectedKeys();
            } catch (ClosedSelectorException cse) {
@@ -175,15 +202,15 @@ class EventPumper implements Runnable {
                        if ((!key.isValid()) &&
                            (!((SocketChannel)key.channel()).isConnectionPending()) &&
                            con.getTimeSinceCreated() > 2 * NTCPTransport.ESTABLISH_TIMEOUT) {
                            if (_log.shouldLog(Log.WARN))
                                _log.warn("Invalid key " + con);
                            if (_log.shouldLog(Log.INFO))
                                _log.info("Invalid key " + con);
                            // this will cancel the key, and it will then be removed from the keyset
                            con.close();
                            failsafeInvalid++;
                            continue;
                        }

                        if ( (con.getWriteBufCount() > 0) &&
                        if ( (!con.isWriteBufEmpty()) &&
                             ((key.interestOps() & SelectionKey.OP_WRITE) == 0) ) {
                            // the data queued to be sent has already passed through
                            // the bw limiter and really just wants to get shoved
@@ -203,11 +230,11 @@ class EventPumper implements Runnable {
                        }
                    }
                    if (failsafeWrites > 0)
                        _context.statManager().addRateData("ntcp.failsafeWrites", failsafeWrites, 0);
                        _context.statManager().addRateData("ntcp.failsafeWrites", failsafeWrites);
                    if (failsafeCloses > 0)
                        _context.statManager().addRateData("ntcp.failsafeCloses", failsafeCloses, 0);
                        _context.statManager().addRateData("ntcp.failsafeCloses", failsafeCloses);
                    if (failsafeInvalid > 0)
                        _context.statManager().addRateData("ntcp.failsafeInvalid", failsafeInvalid, 0);
                        _context.statManager().addRateData("ntcp.failsafeInvalid", failsafeInvalid);
                } catch (ClosedSelectorException cse) {
                    continue;
                }
@@ -249,8 +276,13 @@ class EventPumper implements Runnable {
        _wantsRead.clear();
        _wantsRegister.clear();
        _wantsWrite.clear();
        _bufCache.clear();
    }

    /**
     * Process all keys from the last select.
     * High-frequency path in thread.
     */
    private void processKeys(Set<SelectionKey> selected) {
        for (SelectionKey key : selected) {
            try {
@@ -259,17 +291,17 @@ class EventPumper implements Runnable {
                boolean connect = (ops & SelectionKey.OP_CONNECT) != 0;
                boolean read = (ops & SelectionKey.OP_READ) != 0;
                boolean write = (ops & SelectionKey.OP_WRITE) != 0;
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("ready ops for : " + key
                               + " accept? " + accept + " connect? " + connect
                               + " read? " + read
                               + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0)
                               + " write? " + write
                               + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0)
                               + " on " + key.attachment()
                               );
                //if (_log.shouldLog(Log.DEBUG))
                //    _log.debug("ready ops for : " + key
                //               + " accept? " + accept + " connect? " + connect
                //               + " read? " + read
                //               + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0)
                //               + " write? " + write
                //               + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0)
                //               + " on " + key.attachment()
                //               );
                if (accept) {
                    _context.statManager().addRateData("ntcp.accept", 1, 0);
                    _context.statManager().addRateData("ntcp.accept", 1);
                    processAccept(key);
                }
                if (connect) {
@@ -277,12 +309,12 @@ class EventPumper implements Runnable {
                    processConnect(key);
                }
                if (read) {
                    _context.statManager().addRateData("ntcp.read", 1, 0);
                    //_context.statManager().addRateData("ntcp.read", 1, 0);
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                    processRead(key);
                }
                if (write) {
                    _context.statManager().addRateData("ntcp.write", 1, 0);
                    //_context.statManager().addRateData("ntcp.write", 1, 0);
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                    processWrite(key);
                }
@@ -293,93 +325,105 @@ class EventPumper implements Runnable {
            }
        }

    /**
     * Called by the connection when it has data ready to write.
     * If we have bandwidth, calls con.write() which calls wantsWrite(con).
     * If no bandwidth, calls con.queuedWrite().
     */
    public void wantsWrite(NTCPConnection con, byte data[]) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf);
        if (req.getPendingOutboundRequested() > 0) {
            if (_log.shouldLog(Log.INFO))
                _log.info("queued write on " + con + " for " + data.length);
            _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1, 0);
            _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1);
            con.queuedWrite(buf, req);
        } else {
            // fully allocated
            if (_log.shouldLog(Log.INFO))
                _log.info("fully allocated write on " + con + " for " + data.length);
            //if (_log.shouldLog(Log.INFO))
            //    _log.info("fully allocated write on " + con + " for " + data.length);
            con.write(buf);
        }
    }
    /** called by the connection when it has data ready to write (after bw allocation) */

    /**
     * Called by the connection when it has data ready to write (after bw allocation).
     * Only wakeup if new.
     */
    public void wantsWrite(NTCPConnection con) {
        if (_log.shouldLog(Log.INFO))
            _log.info("Before adding wants to write on " + con);
        if (!_wantsWrite.contains(con))
            _wantsWrite.offer(con);
        if (_log.shouldLog(Log.INFO))
            _log.info("Wants to write on " + con);
        _selector.wakeup();
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("selector awoken for write");
        if (_wantsWrite.add(con)) {
            _selector.wakeup();
        }
    }

    /**
     * This is only called from NTCPConnection.complete()
     * if there is more data, which is rare (never?)
     * so we don't need to check for dups or make _wantsRead a Set.
     */
    public void wantsRead(NTCPConnection con) {
        if (!_wantsRead.contains(con))
            _wantsRead.offer(con);
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("wants to read on " + con);
        _wantsRead.offer(con);
        _selector.wakeup();
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("selector awoken for read");
    }

    private static final int MIN_BUFS = 5;

    /**
     * There's only one pumper, so static is fine, unless multi router
     * Is there a better way to do this?
     * How many to keep in reserve.
     * Shared if there are multiple routers in the JVM
     */
    private static int NUM_BUFS = 5;
    private static int _numBufs = MIN_BUFS;
    private static int __liveBufs = 0;
    private static int __consecutiveExtra;
    ByteBuffer acquireBuf() {

    /**
     * High-frequency path in thread.
     */
    private static ByteBuffer acquireBuf() {
        ByteBuffer rv = _bufCache.poll();
        if (rv == null) {
            rv = ByteBuffer.allocate(BUF_SIZE);
            NUM_BUFS = ++__liveBufs;
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
            _context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0);
            _numBufs = ++__liveBufs;
            //if (_log.shouldLog(Log.DEBUG))
            //    _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
            //_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0);
        } else {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
            //if (_log.shouldLog(Log.DEBUG))
            //    _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
        }
        return rv;
    }

    void releaseBuf(ByteBuffer buf) {
    /**
     * High-frequency path in thread.
     */
    private static void releaseBuf(ByteBuffer buf) {
        //if (false) return;
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);
        //if (_log.shouldLog(Log.DEBUG))
        //    _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);
        buf.clear();
        int extra = _bufCache.size();
        boolean cached = extra < NUM_BUFS;
        boolean cached = extra < _numBufs;

        if (cached) {
            _bufCache.offer(buf);
            if (extra > 5) {
                __consecutiveExtra++;
                if (__consecutiveExtra >= 20) {
                    NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS);
                    _numBufs = Math.max(_numBufs - 1, MIN_BUFS);
                    __consecutiveExtra = 0;
                }
            }
        } else {
            __liveBufs--;
        }
        if (cached && _log.shouldLog(Log.DEBUG))
            _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live");
        //if (cached && _log.shouldLog(Log.DEBUG))
        //    _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live");
    }

    private void processAccept(SelectionKey key) {
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("processing accept");
        //if (_log.shouldLog(Log.DEBUG))
        //    _log.debug("processing accept");
        ServerSocketChannel servChan = (ServerSocketChannel)key.attachment();
        try {
            SocketChannel chan = servChan.accept();
@@ -408,10 +452,10 @@ class EventPumper implements Runnable {

            SelectionKey ckey = chan.register(_selector, SelectionKey.OP_READ);
            NTCPConnection con = new NTCPConnection(_context, _transport, chan, ckey);
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("new NTCP connection established: " +con);
            //if (_log.shouldLog(Log.DEBUG))
            //    _log.debug("new NTCP connection established: " +con);
        } catch (IOException ioe) {
            if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ioe);
            _log.error("Error accepting", ioe);
        }
    }

@@ -427,11 +471,11 @@ class EventPumper implements Runnable {
                chan.socket().setKeepAlive(true);
                con.setKey(key);
                con.outboundConnected();
                _context.statManager().addRateData("ntcp.connectSuccessful", 1, 0);
                _context.statManager().addRateData("ntcp.connectSuccessful", 1);
            } else {
                con.close();
                _transport.markUnreachable(con.getRemotePeer().calculateHash());
                _context.statManager().addRateData("ntcp.connectFailedTimeout", 1, 0);
                _context.statManager().addRateData("ntcp.connectFailedTimeout", 1);
            }
        } catch (IOException ioe) { // this is the usual failure path for a timeout or connect refused
            if (_log.shouldLog(Log.WARN))
@@ -439,25 +483,28 @@ class EventPumper implements Runnable {
            con.close();
            //_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE);
            _transport.markUnreachable(con.getRemotePeer().calculateHash());
            _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1, 0);
            _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1);
        } catch (NoConnectionPendingException ncpe) {
            // ignore
        }
    }

    /**
     * High-frequency path in thread.
     */
    private void processRead(SelectionKey key) {
        NTCPConnection con = (NTCPConnection)key.attachment();
        ByteBuffer buf = acquireBuf();
        try {
            int read = con.getChannel().read(buf);
            if (read == -1) {
                if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
                _context.statManager().addRateData("ntcp.readEOF", 1, 0);
                //if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
                _context.statManager().addRateData("ntcp.readEOF", 1);
                con.close();
                releaseBuf(buf);
            } else if (read == 0) {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("nothing to read for " + con + ", but stay interested");
                //if (_log.shouldLog(Log.DEBUG))
                //    _log.debug("nothing to read for " + con + ", but stay interested");
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                releaseBuf(buf);
            } else if (read > 0) {
@@ -465,19 +512,20 @@ class EventPumper implements Runnable {
                buf.flip();
                buf.get(data);
                releaseBuf(buf);
                buf=null;
                buf = null;
                ByteBuffer rbuf = ByteBuffer.wrap(data);
                FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
                if (req.getPendingInboundRequested() > 0) {
                    // rare since we generally don't throttle inbound
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
                    _context.statManager().addRateData("ntcp.queuedRecv", read, 0);
                    //if (_log.shouldLog(Log.DEBUG))
                    //    _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
                    _context.statManager().addRateData("ntcp.queuedRecv", read);
                    con.queuedRecv(rbuf, req);
                } else {
                    // fully allocated
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("not bw throttled reading for " + con);
                    //if (_log.shouldLog(Log.DEBUG))
                    //    _log.debug("not bw throttled reading for " + con);
                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                    con.recv(rbuf);
                }
@@ -485,63 +533,64 @@ class EventPumper implements Runnable {
        } catch (CancelledKeyException cke) {
            if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke);
            con.close();
            _context.statManager().addRateData("ntcp.readError", 1, 0);
            _context.statManager().addRateData("ntcp.readError", 1);
            if (buf != null) releaseBuf(buf);
        } catch (IOException ioe) {
            if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe);
            con.close();
            _context.statManager().addRateData("ntcp.readError", 1, 0);
            _context.statManager().addRateData("ntcp.readError", 1);
            if (buf != null) releaseBuf(buf);
        } catch (NotYetConnectedException nyce) {
            // ???
        }
    }

    /**
     * High-frequency path in thread.
     */
    private void processWrite(SelectionKey key) {
        int totalWritten = 0;
        int buffers = 0;
        long before = System.currentTimeMillis();
        //int totalWritten = 0;
        //int buffers = 0;
        //long before = System.currentTimeMillis();
        NTCPConnection con = (NTCPConnection)key.attachment();
        try {
            while (true) {
                ByteBuffer buf = con.getNextWriteBuf();
                if (buf != null) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("writing " + buf.remaining()+"...");
                    //if (_log.shouldLog(Log.DEBUG))
                    //    _log.debug("writing " + buf.remaining()+"...");
                    if (buf.remaining() <= 0) {
                        long beforeRem = System.currentTimeMillis();
                        //long beforeRem = System.currentTimeMillis();
                        con.removeWriteBuf(buf);
                        long afterRem = System.currentTimeMillis();
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "...");
                        buf = null;
                        buffers++;
                        //long afterRem = System.currentTimeMillis();
                        //if (_log.shouldLog(Log.DEBUG))
                        //    _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "...");
                        //buffers++;
                        continue;
                    }
                    int written = con.getChannel().write(buf);
                    totalWritten += written;
                    //totalWritten += written;
                    if (written == 0) {
                        if ( (buf.remaining() > 0) || (con.getWriteBufCount() >= 1) ) {
                            if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
                        if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
                            //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                        } else {
                            if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains...");
                            //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains...");
                        }
                        break;
                    } else if (buf.remaining() > 0) {
                        if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining...");
                        //if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining...");
                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                        break;
                    } else {
                        long beforeRem = System.currentTimeMillis();
                        //long beforeRem = System.currentTimeMillis();
                        con.removeWriteBuf(buf);
                        long afterRem = System.currentTimeMillis();
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("buffer "+ buffers+"/"+written+"/"+totalWritten+" fully written after " +
                                       (beforeRem-before) + ", then removed after " + (afterRem-beforeRem) + "...");
                        //long afterRem = System.currentTimeMillis();
                        //if (_log.shouldLog(Log.DEBUG))
                        //    _log.debug("buffer "+ buffers+"/"+written+"/"+totalWritten+" fully written after " +
                        //               (beforeRem-before) + ", then removed after " + (afterRem-beforeRem) + "...");
                        //releaseBuf(buf);
                        buf = null;
                        buffers++;
                        //buffers++;
                        //if (buffer time is too much, add OP_WRITE to the interest ops and break?)
                    }
                } else {
@@ -550,19 +599,24 @@ class EventPumper implements Runnable {
            }
        } catch (CancelledKeyException cke) {
            if (_log.shouldLog(Log.WARN)) _log.warn("error writing", cke);
            _context.statManager().addRateData("ntcp.writeError", 1, 0);
            _context.statManager().addRateData("ntcp.writeError", 1);
            con.close();
        } catch (IOException ioe) {
            if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
            _context.statManager().addRateData("ntcp.writeError", 1, 0);
            _context.statManager().addRateData("ntcp.writeError", 1);
            con.close();
        }
        long after = System.currentTimeMillis();
        if (_log.shouldLog(Log.INFO))
            _log.info("Wrote " + totalWritten + " in " + buffers + " buffers on " + con
                      + " after " + (after-before));
        //long after = System.currentTimeMillis();
        //if (_log.shouldLog(Log.INFO))
        //    _log.info("Wrote " + totalWritten + " in " + buffers + " buffers on " + con
        //              + " after " + (after-before));
    }

    /**
     * Pull off the 4 _wants* queues and update the interest ops,
     * which may, according to the javadocs, be a "naive" implementation and block.
     * High-frequency path in thread.
     */
    private void runDelayedEvents() {
        NTCPConnection con;
        while ((con = _wantsRead.poll()) != null) {
@@ -590,19 +644,25 @@ class EventPumper implements Runnable {
            }
        }

        while ((con = _wantsWrite.poll()) != null) {
            SelectionKey key = con.getKey();
            try {
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            } catch (CancelledKeyException cke) {
                // ignore
            } catch (IllegalArgumentException iae) {
                // see above
                if (_log.shouldLog(Log.WARN))
                    _log.warn("gnu?", iae);
        // check before instantiating iterator for speed
        if (!_wantsWrite.isEmpty()) {
            for (Iterator<NTCPConnection> iter = _wantsWrite.iterator(); iter.hasNext(); ) {
                con = iter.next();
                iter.remove();
                SelectionKey key = con.getKey();
                try {
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                } catch (CancelledKeyException cke) {
                    // ignore
                } catch (IllegalArgumentException iae) {
                    // see above
                    if (_log.shouldLog(Log.WARN))
                        _log.warn("gnu?", iae);
                }
            }
        }

        // only when address changes
        ServerSocketChannel chan;
        while ((chan = _wantsRegister.poll()) != null) {
            try {
@@ -625,13 +685,13 @@ class EventPumper implements Runnable {
                InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort());
                boolean connected = con.getChannel().connect(saddr);
                if (connected) {
                    _context.statManager().addRateData("ntcp.connectImmediate", 1, 0);
                    _context.statManager().addRateData("ntcp.connectImmediate", 1);
                    key.interestOps(SelectionKey.OP_READ);
                    processConnect(key);
                }
            } catch (IOException ioe) {
                if (_log.shouldLog(Log.WARN)) _log.warn("error connecting", ioe);
                _context.statManager().addRateData("ntcp.connectFailedIOE", 1, 0);
                _context.statManager().addRateData("ntcp.connectFailedIOE", 1);
                _transport.markUnreachable(con.getRemotePeer().calculateHash());
                //if (ntcpOnly(con)) {
                //    _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
@@ -642,7 +702,7 @@ class EventPumper implements Runnable {
                //}
            } catch (UnresolvedAddressException uae) {
                if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae);
                _context.statManager().addRateData("ntcp.connectFailedUnresolved", 1, 0);
                _context.statManager().addRateData("ntcp.connectFailedUnresolved", 1);
                _transport.markUnreachable(con.getRemotePeer().calculateHash());
                //if (ntcpOnly(con)) {
                //    _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage());
@@ -671,6 +731,7 @@ class EventPumper implements Runnable {
     * but if they support other transports (eg ssu) we should allow those transports to be
     * tried as well.
     */
    /****
    private boolean ntcpOnly(NTCPConnection con) {
        RouterIdentity ident = con.getRemotePeer();
        if (ident == null) return true;
@@ -678,10 +739,13 @@ class EventPumper implements Runnable {
        if (info == null) return true;
        return info.getAddresses().size() == 1;
    }
    ****/

    private long _lastExpired;

    private void expireTimedOut() {
        _transport.expireTimedOut();
    }

    public long getIdleTimeout() { return _expireIdleWriteTime; }
}

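For readers skimming the EventPumper diff above: the read-buffer cache is now a static, JVM-wide pool fed by a bounded LinkedBlockingQueue. A rough standalone sketch of the acquire/release idea, leaving out the adaptive trimming (_numBufs / __consecutiveExtra) and using illustrative names rather than the router's actual API:

import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch of the static read-buffer cache: a bounded LBQ holds
// idle 8 KB buffers; acquire() reuses one if available, release() returns it
// unless the cache is full. The real EventPumper adds adaptive sizing on top.
class BufCacheSketch {
    private static final int BUF_SIZE = 8 * 1024;
    private static final int MAX_CACHE_SIZE = 64;
    private static final LinkedBlockingQueue<ByteBuffer> CACHE =
        new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);

    static ByteBuffer acquire() {
        ByteBuffer buf = CACHE.poll();          // non-blocking; null if empty
        return (buf != null) ? buf : ByteBuffer.allocate(BUF_SIZE);
    }

    static void release(ByteBuffer buf) {
        buf.clear();                            // reset position/limit for reuse
        CACHE.offer(buf);                       // silently dropped if cache full
    }
}

Making the pool static means it is shared if multiple routers run in one JVM, which the diff's own comment calls out.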
@@ -8,7 +8,9 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32;

@@ -58,13 +60,17 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
    private final Log _log;
    private SocketChannel _chan;
    private SelectionKey _conKey;
    /** list of ByteBuffer containing data we have read and are ready to process, oldest first */
    private final LinkedBlockingQueue<ByteBuffer> _readBufs;
    /**
     * queue of ByteBuffer containing data we have read and are ready to process, oldest first
     * unbounded and lockless
     */
    private final Queue<ByteBuffer> _readBufs;
    /**
     * list of ByteBuffers containing fully populated and encrypted data, ready to write,
     * and already cleared through the bandwidth limiter.
     * unbounded and lockless
     */
    private final LinkedBlockingQueue<ByteBuffer> _writeBufs;
    private final Queue<ByteBuffer> _writeBufs;
    /** Requests that were not granted immediately */
    private final Set<FIFOBandwidthLimiter.Request> _bwRequests;
    private boolean _established;
@@ -79,7 +85,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
    /**
     * pending unprepared OutNetMessage instances
     */
    private final LinkedBlockingQueue<OutNetMessage> _outbound;
    private final Queue<OutNetMessage> _outbound;
    /**
     * current prepared OutNetMessage, or null - synchronize on _outbound to modify
     * FIXME why do we need this???
@@ -132,9 +138,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        _created = System.currentTimeMillis();
        _transport = transport;
        _chan = chan;
        _readBufs = new LinkedBlockingQueue();
        _writeBufs = new LinkedBlockingQueue();
        _readBufs = new ConcurrentLinkedQueue();
        _writeBufs = new ConcurrentLinkedQueue();
        _bwRequests = new ConcurrentHashSet(2);
        // TODO possible switch to CLQ but beware non-constant size() - see below
        _outbound = new LinkedBlockingQueue();
        _isInbound = true;
        _decryptBlockBuf = new byte[16];
@@ -156,9 +163,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        _transport = transport;
        _remotePeer = remotePeer;
        _remAddr = remAddr;
        _readBufs = new LinkedBlockingQueue();
        _writeBufs = new LinkedBlockingQueue();
        _readBufs = new ConcurrentLinkedQueue();
        _writeBufs = new ConcurrentLinkedQueue();
        _bwRequests = new ConcurrentHashSet(2);
        // TODO possible switch to CLQ but beware non-constant size() - see below
        _outbound = new LinkedBlockingQueue();
        _isInbound = false;
        _decryptBlockBuf = new byte[16];
@@ -199,7 +207,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        _establishedOn = System.currentTimeMillis();
        _transport.inboundEstablished(this);
        _establishState = null;
        _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
        _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
        _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
    }

@@ -283,7 +291,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
            try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {}
            if (_log.shouldLog(Log.WARN)) {
                int blocks = _writeBufs.size();
                _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64());
                _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash());
            }
            _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime());
            close();
@@ -359,7 +367,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
            if (target != null) {
                infoMsg.setTarget(target);
                infoMsg.beginSend();
                _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1, 0);
                _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1);
                send(infoMsg);

                // See comment below
@@ -435,7 +443,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        _transport.markReachable(getRemotePeer().calculateHash(), false);
        //_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
        boolean msgs = !_outbound.isEmpty();
        _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
        _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
        _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
        if (msgs)
            _transport.getWriter().wantsWrite(this, "outbound established");
@@ -471,6 +479,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        //else
        //    prepareNextWriteSmall();
    }

    /********** nobody's tried this one in years
    private void prepareNextWriteSmall() {
        if (_log.shouldLog(Log.DEBUG))
@@ -593,7 +602,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {

        if (_nextMetaTime <= System.currentTimeMillis()) {
            sendMeta();
            _nextMetaTime = System.currentTimeMillis() + META_FREQUENCY + _context.random().nextInt(META_FREQUENCY);
            _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
        }

        OutNetMessage msg = null;
@@ -784,7 +793,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        removeRequest(req);
        ByteBuffer buf = (ByteBuffer)req.attachment();
        if (req.getTotalInboundRequested() > 0) {
            _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()), 0);
            _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()));
            recv(buf);
            // our reads used to be bw throttled (during which time we were no
            // longer interested in reading from the network), but we aren't
@@ -792,7 +801,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
            _transport.getPumper().wantsRead(this);
            //_transport.getReader().wantsRead(this);
        } else if (req.getTotalOutboundRequested() > 0) {
            _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()), 0);
            _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()));
            write(buf);
        }
    }
@@ -836,14 +845,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        _transport.getReader().wantsRead(this);
        updateStats();
    }

    /**
     * The contents of the buffer have been encrypted / padded / etc and have
     * been fully allocated for the bandwidth limiter.
     */
    public void write(ByteBuffer buf) {
        if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)");
        //if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)");
        _writeBufs.offer(buf);
        if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
        //if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
        _transport.getPumper().wantsWrite(this);
    }

@@ -852,23 +862,25 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        return _readBufs.poll();
    }

    /** since getNextReadBuf() removes, this should not be necessary */
    public void removeReadBuf(ByteBuffer buf) {
        _readBufs.remove(buf);
        //_transport.getPumper().releaseBuf(buf);
    /**
     * Replaces getWriteBufCount()
     * @since 0.8.12
     */
    public boolean isWriteBufEmpty() {
        return _writeBufs.isEmpty();
    }

    public int getWriteBufCount() { return _writeBufs.size(); }

    /** @return null if none available */
    public ByteBuffer getNextWriteBuf() {
        return _writeBufs.peek(); // not remove! we removeWriteBuf afterwards
    }

    /**
     * Remove the buffer, which _should_ be the one at the head of _writeBufs
     */
    public void removeWriteBuf(ByteBuffer buf) {
        _bytesSent += buf.capacity();
        OutNetMessage msg = null;
        boolean bufsRemain = false;
        boolean clearMessage = false;
        if (_sendingMeta && (buf.capacity() == _meta.length)) {
            _sendingMeta = false;
@@ -876,7 +888,6 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
            clearMessage = true;
        }
        _writeBufs.remove(buf);
        bufsRemain = !_writeBufs.isEmpty();
        if (clearMessage) {
            // see synchronization comments in prepareNextWriteFast()
            synchronized (_outbound) {
@@ -904,8 +915,13 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null));
        if (msgs) // push through the bw limiter to reach _writeBufs
            _transport.getWriter().wantsWrite(this, "write completed");
        if (bufsRemain) // send asap
            _transport.getPumper().wantsWrite(this);

        // this is not necessary, EventPumper.processWrite() handles this
        // and it just causes unnecessary selector.wakeup() and looping
        //boolean bufsRemain = !_writeBufs.isEmpty();
        //if (bufsRemain) // send asap
        //    _transport.getPumper().wantsWrite(this);

        updateStats();
    }

@@ -982,8 +998,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        //    _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")");
        boolean ok = recvUnencryptedI2NP();
        if (!ok) {
            if (_log.shouldLog(Log.ERROR))
                _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
            _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
            _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1, getUptime());
            return;
        }
@@ -999,8 +1014,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
    private boolean recvUnencryptedI2NP() {
        _curReadState.receiveBlock(_decryptBlockBuf);
        if (_curReadState.getSize() > BUFFER_SIZE) {
            if (_log.shouldLog(Log.ERROR))
                _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString());
            _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString());
            _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime());
            close();
            return false;
@@ -1114,7 +1128,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
        _dataReadBufs.offer(buf);
    }

    /** @since 0.8.8 */
    /**
     * Call at transport shutdown
     * @since 0.8.8
     */
    static void releaseResources() {
        _i2npHandlers.clear();
        _dataReadBufs.clear();
@@ -1278,9 +1295,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {

    @Override
    public String toString() {
        return "NTCP Connection to " +
               (_remotePeer == null ? "unknown " : _remotePeer.calculateHash().toBase64().substring(0,6)) +
               " inbound? " + _isInbound + " established? " + _established +
        return "NTCP conn " +
               (_isInbound ? "from " : "to ") +
               (_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) +
               (_established ? "" : " not established") +
               " created " + DataHelper.formatDuration(getTimeSinceCreated()) + " ago";
    }
}

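The _nextMetaTime/_nextInfoTime changes in the NTCPConnection diff above adjust the jitter window: now + random.nextInt(FREQ) is uniform over [0, FREQ) with mean FREQ/2 and can fire almost immediately, while now + (FREQ / 2) + random.nextInt(FREQ) is uniform over [FREQ/2, 3*FREQ/2) with mean FREQ and a guaranteed minimum spacing. A tiny illustrative helper, not router code:

import java.util.Random;

// Illustrative sketch of the rescheduling change for the periodic meta/info
// messages. The old form could fire almost immediately after establishment;
// the new form keeps the mean interval at freq while guaranteeing at least
// freq/2 of spacing between events.
class JitterSketch {
    static long nextEvent(long now, long freq, Random rnd) {
        // old: now + rnd.nextInt((int) freq)            -> [0, freq), mean freq/2
        // new: uniform in [freq/2, 3*freq/2), mean freq
        return now + (freq / 2) + rnd.nextInt((int) freq);
    }
}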
@@ -14,6 +14,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;

@@ -29,6 +30,7 @@ import net.i2p.router.transport.CommSystemFacadeImpl;
import net.i2p.router.transport.Transport;
import net.i2p.router.transport.TransportBid;
import net.i2p.router.transport.TransportImpl;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
import net.i2p.util.Translate;

@@ -54,7 +56,7 @@ public class NTCPTransport extends TransportImpl {
     * list of NTCPConnection of connections not yet established that we
     * want to remove on establishment or close on timeout
     */
    private final List<NTCPConnection> _establishing;
    private final Set<NTCPConnection> _establishing;

    /** this is rarely if ever used, default is to bind to wildcard address */
    public static final String PROP_BIND_INTERFACE = "i2np.ntcp.bindInterface";
@@ -115,14 +117,14 @@ public class NTCPTransport extends TransportImpl {
        _context.statManager().createRateStat("ntcp.invalidInboundSize", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.invalidInboundSkew", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.invalidSignature", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", RATES);
        //_context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.multipleCloseOnRemove", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.outboundEstablishFailed", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
        //_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.readError", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", RATES);
@@ -131,9 +133,9 @@ public class NTCPTransport extends TransportImpl {
        _context.statManager().createRateStat("ntcp.throttledReadComplete", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.throttledWriteComplete", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.wantsQueuedWrite", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.write", "", "ntcp", RATES);
        //_context.statManager().createRateStat("ntcp.write", "", "ntcp", RATES);
        _context.statManager().createRateStat("ntcp.writeError", "", "ntcp", RATES);
        _establishing = new ArrayList(4);
        _establishing = new ConcurrentHashSet(16);
        _conLock = new Object();
        _conByIdent = new HashMap(64);

@@ -152,7 +154,7 @@ public class NTCPTransport extends TransportImpl {
    }

    void inboundEstablished(NTCPConnection con) {
        _context.statManager().addRateData("ntcp.inboundEstablished", 1, 0);
        _context.statManager().addRateData("ntcp.inboundEstablished", 1);
        markReachable(con.getRemotePeer().calculateHash(), true);
        //_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash());
        NTCPConnection old = null;
@@ -162,7 +164,7 @@ public class NTCPTransport extends TransportImpl {
        if (old != null) {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("Old connection closed: " + old + " replaced by " + con);
            _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime(), 0);
            _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime());
            old.close();
        }
    }
@@ -203,7 +205,7 @@ public class NTCPTransport extends TransportImpl {
            } catch (IOException ioe) {
                if (_log.shouldLog(Log.ERROR))
                    _log.error("Error opening a channel", ioe);
                _context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1, 0);
                _context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1);
                con.close();
            }
        } else {
@@ -263,17 +265,17 @@ public class NTCPTransport extends TransportImpl {
            return null;
        if (dataSize > NTCPConnection.MAX_MSG_SIZE) {
            // let SSU deal with it
            _context.statManager().addRateData("ntcp.noBidTooLargeI2NP", dataSize, 0);
            _context.statManager().addRateData("ntcp.noBidTooLargeI2NP", dataSize);
            return null;
        }
        Hash peer = toAddress.getIdentity().calculateHash();
        if (_context.shitlist().isShitlisted(peer, STYLE)) {
            // we aren't shitlisted in general (since we are trying to get a bid), but we have
            // recently shitlisted the peer on the NTCP transport, so don't try it
            _context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1, 0);
            _context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1);
            return null;
        } else if (isUnreachable(peer)) {
            _context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0);
            _context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1);
            return null;
        }

@@ -287,7 +289,7 @@ public class NTCPTransport extends TransportImpl {

        if (addr == null) {
            markUnreachable(peer);
            _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1, 0);
            _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);
            //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE);
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("no bid when trying to send to " + peer.toBase64() + " as they don't have an ntcp address");
@@ -295,7 +297,7 @@ public class NTCPTransport extends TransportImpl {
        }
        NTCPAddress naddr = new NTCPAddress(addr);
        if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) {
            _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1, 0);
            _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1);
            markUnreachable(peer);
            //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
            if (_log.shouldLog(Log.DEBUG))
@@ -304,7 +306,7 @@ public class NTCPTransport extends TransportImpl {
        }
        if (!naddr.isPubliclyRoutable()) {
            if (! _context.getProperty("i2np.ntcp.allowLocal", "false").equals("true")) {
                _context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1, 0);
                _context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1);
                markUnreachable(peer);
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("no bid when trying to send to " + peer.toBase64() + " as they have a private ntcp address");
@@ -373,7 +375,7 @@ public class NTCPTransport extends TransportImpl {
        if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both
            if (_log.shouldLog(Log.WARN))
                _log.warn("Multiple connections on remove, closing " + removed + " (already closed " + con + ")");
            _context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime(), 0);
            _context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime());
            removed.close();
        }
    }
@@ -566,40 +568,31 @@ public class NTCPTransport extends TransportImpl {
     * the con must be established to avoid premature close()ing
     */
    public static final int ESTABLISH_TIMEOUT = 10*1000;

    /** add us to the establishment timeout process */
    void establishing(NTCPConnection con) {
        synchronized (_establishing) {
            _establishing.add(con);
        }
    }
    /**
     * called in the EventPumper no more than once a second or so, closing
     * any unconnected/unestablished connections
     */
    void expireTimedOut() {
        List expired = null;
        synchronized (_establishing) {
            for (int i = 0; i < _establishing.size(); i++) {
                NTCPConnection con = (NTCPConnection)_establishing.get(i);
                if (con.isClosed()) {
                    _establishing.remove(i);
                    i--;
                } else if (con.isEstablished()) {
                    _establishing.remove(i);
                    i--;
        int expired = 0;

        for (Iterator<NTCPConnection> iter = _establishing.iterator(); iter.hasNext(); ) {
            NTCPConnection con = iter.next();
            if (con.isClosed() || con.isEstablished()) {
                iter.remove();
            } else if (con.getTimeSinceCreated() > ESTABLISH_TIMEOUT) {
                    _establishing.remove(i);
                    i--;
                    if (expired == null)
                        expired = new ArrayList(2);
                    expired.add(con);
                iter.remove();
                con.close();
                expired++;
            }
        }
        }
        for (int i = 0; expired != null && i < expired.size(); i++)
            ((NTCPConnection)expired.get(i)).close();
        if ( (expired != null) && (!expired.isEmpty()) )
            _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0);

        if (expired > 0)
            _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired);
    }

    //private boolean bindAllInterfaces() { return true; }

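The expireTimedOut() rewrite in the NTCPTransport diff above drops the synchronized List with its remove(i)/i-- bookkeeping in favor of a ConcurrentHashSet, whose weakly consistent iterator tolerates concurrent add()s and supports iter.remove() without locking. A self-contained sketch of the same loop shape, with an illustrative connection interface standing in for NTCPConnection (this is not the router's actual API):

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the lockless expiry loop. A set backed by a
// ConcurrentHashMap has a weakly consistent iterator, so iter.remove() is
// safe while other threads keep add()ing -- no synchronized block and no
// index bookkeeping, unlike the old ArrayList version.
class ExpirySketch {
    interface SketchCon {
        boolean isDone();      // closed or established: just drop it
        boolean isTimedOut();  // past the establish timeout: close it too
        void close();
    }

    private final Set<SketchCon> _establishing =
        Collections.newSetFromMap(new ConcurrentHashMap<SketchCon, Boolean>());

    /** any thread: add to the establishment timeout process */
    void establishing(SketchCon con) {
        _establishing.add(con);
    }

    /** pumper thread: prune finished cons and close timed-out ones */
    int expireTimedOut() {
        int expired = 0;
        for (Iterator<SketchCon> iter = _establishing.iterator(); iter.hasNext(); ) {
            SketchCon con = iter.next();
            if (con.isDone()) {
                iter.remove();
            } else if (con.isTimedOut()) {
                iter.remove();
                con.close();
                expired++;
            }
        }
        return expired;
    }
}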
@@ -67,6 +67,7 @@ class Reader {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("wantsRead: " + con + " already live? " + already);
    }

    public void connectionClosed(NTCPConnection con) {
        synchronized (_pendingConnections) {
            _readAfterLive.remove(con);
@@ -135,14 +136,12 @@ class Reader {
                } else {
                    // hmm, there shouldn't be a race here - only one reader should
                    // be running on a con at a time...
                    if (_log.shouldLog(Log.ERROR))
                        _log.error("no establishment state but " + con + " is established... race?");
                    _log.error("no establishment state but " + con + " is established... race?");
                    break;
                }
            }
            if (est.isComplete()) {
                // why is it complete yet !con.isEstablished?
                if (_log.shouldLog(Log.ERROR))
                    _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
                               + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
                break;
@@ -152,7 +151,7 @@ class Reader {
                if (_log.shouldLog(Log.WARN))
                    _log.warn("closing connection on establishment because: " +est.getError(), est.getException());
                if (!est.getFailedBySkew())
                    _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0);
                    _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
                con.close();
                return;
            } else if (buf.remaining() <= 0) {