forked from I2P_Developers/i2p.i2p
NTCP: Rare EventPumper 100% CPU fix (ticket #2476)
This commit is contained in:
@ -18,10 +18,10 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 8;
|
||||
public final static long BUILD = 9;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
public final static String EXTRA = "-rc";
|
||||
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + FULL_VERSION);
|
||||
|
@ -268,6 +268,7 @@ class EventPumper implements Runnable {
|
||||
_log.info("Removing invalid key for " + con);
|
||||
// this will cancel the key, and it will then be removed from the keyset
|
||||
con.close();
|
||||
key.cancel();
|
||||
failsafeInvalid++;
|
||||
continue;
|
||||
}
|
||||
@ -298,6 +299,7 @@ class EventPumper implements Runnable {
|
||||
con.getTimeSinceReceive(now) > expire) {
|
||||
// we haven't sent or received anything in a really long time, so lets just close 'er up
|
||||
con.sendTerminationAndClose();
|
||||
key.cancel();
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Failsafe or expire close for " + con);
|
||||
failsafeCloses++;
|
||||
@ -536,9 +538,9 @@ class EventPumper implements Runnable {
|
||||
}
|
||||
|
||||
private void processConnect(SelectionKey key) {
|
||||
NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
final NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
final SocketChannel chan = con.getChannel();
|
||||
try {
|
||||
SocketChannel chan = con.getChannel();
|
||||
boolean connected = chan.finishConnect();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("processing connect for " + con + ": connected? " + connected);
|
||||
@ -587,7 +589,8 @@ class EventPumper implements Runnable {
|
||||
* High-frequency path in thread.
|
||||
*/
|
||||
private void processRead(SelectionKey key) {
|
||||
NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
final NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
final SocketChannel chan = con.getChannel();
|
||||
ByteBuffer buf = null;
|
||||
try {
|
||||
while (true) {
|
||||
@ -595,7 +598,7 @@ class EventPumper implements Runnable {
|
||||
int read = 0;
|
||||
int readThisTime;
|
||||
int readCount = 0;
|
||||
while ((readThisTime = con.getChannel().read(buf)) > 0) {
|
||||
while ((readThisTime = chan.read(buf)) > 0) {
|
||||
read += readThisTime;
|
||||
readCount++;
|
||||
}
|
||||
@ -605,7 +608,7 @@ class EventPumper implements Runnable {
|
||||
_log.debug("Read " + read + " bytes total in " + readCount + " times from " + con);
|
||||
if (read < 0) {
|
||||
if (con.isInbound() && con.getMessagesReceived() <= 0) {
|
||||
InetAddress addr = con.getChannel().socket().getInetAddress();
|
||||
InetAddress addr = chan.socket().getInetAddress();
|
||||
int count;
|
||||
if (addr != null) {
|
||||
byte[] ip = addr.getAddress();
|
||||
@ -685,7 +688,7 @@ class EventPumper implements Runnable {
|
||||
if (buf != null)
|
||||
releaseBuf(buf);
|
||||
if (con.isInbound() && con.getMessagesReceived() <= 0) {
|
||||
InetAddress addr = con.getChannel().socket().getInetAddress();
|
||||
InetAddress addr = chan.socket().getInetAddress();
|
||||
int count;
|
||||
if (addr != null) {
|
||||
byte[] ip = addr.getAddress();
|
||||
@ -731,7 +734,8 @@ class EventPumper implements Runnable {
|
||||
* High-frequency path in thread.
|
||||
*/
|
||||
private void processWrite(SelectionKey key) {
|
||||
NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
final NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
final SocketChannel chan = con.getChannel();
|
||||
try {
|
||||
while (true) {
|
||||
ByteBuffer buf = con.getNextWriteBuf();
|
||||
@ -740,7 +744,7 @@ class EventPumper implements Runnable {
|
||||
con.removeWriteBuf(buf);
|
||||
continue;
|
||||
}
|
||||
int written = con.getChannel().write(buf);
|
||||
int written = chan.write(buf);
|
||||
//totalWritten += written;
|
||||
if (written == 0) {
|
||||
if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
|
||||
@ -845,8 +849,9 @@ class EventPumper implements Runnable {
|
||||
}
|
||||
|
||||
while ((con = _wantsConRegister.poll()) != null) {
|
||||
final SocketChannel schan = con.getChannel();
|
||||
try {
|
||||
SelectionKey key = con.getChannel().register(_selector, SelectionKey.OP_CONNECT);
|
||||
SelectionKey key = schan.register(_selector, SelectionKey.OP_CONNECT);
|
||||
key.attach(con);
|
||||
con.setKey(key);
|
||||
RouterAddress naddr = con.getRemoteAddress();
|
||||
@ -857,7 +862,7 @@ class EventPumper implements Runnable {
|
||||
if (port <= 0 || ip == null)
|
||||
throw new IOException("Invalid NTCP address: " + naddr);
|
||||
InetSocketAddress saddr = new InetSocketAddress(InetAddress.getByAddress(ip), port);
|
||||
boolean connected = con.getChannel().connect(saddr);
|
||||
boolean connected = schan.connect(saddr);
|
||||
if (connected) {
|
||||
// Never happens, we use nonblocking
|
||||
key.interestOps(SelectionKey.OP_READ);
|
||||
|
@ -288,14 +288,14 @@ public class NTCPConnection implements Closeable {
|
||||
/**
|
||||
* Valid for inbound; valid for outbound shortly after creation
|
||||
*/
|
||||
public SocketChannel getChannel() { return _chan; }
|
||||
public synchronized SocketChannel getChannel() { return _chan; }
|
||||
|
||||
/**
|
||||
* Valid for inbound; valid for outbound shortly after creation
|
||||
*/
|
||||
public SelectionKey getKey() { return _conKey; }
|
||||
public void setChannel(SocketChannel chan) { _chan = chan; }
|
||||
public void setKey(SelectionKey key) { _conKey = key; }
|
||||
public synchronized SelectionKey getKey() { return _conKey; }
|
||||
public synchronized void setChannel(SocketChannel chan) { _chan = chan; }
|
||||
public synchronized void setKey(SelectionKey key) { _conKey = key; }
|
||||
|
||||
public boolean isInbound() { return _isInbound; }
|
||||
public boolean isEstablished() { return _establishState.isComplete(); }
|
||||
@ -1125,7 +1125,11 @@ public class NTCPConnection implements Closeable {
|
||||
* async callback after the outbound connection was completed (this should NOT block,
|
||||
* as it occurs in the selector thread)
|
||||
*/
|
||||
void outboundConnected() {
|
||||
synchronized void outboundConnected() {
|
||||
if (_establishState == EstablishBase.FAILED) {
|
||||
_conKey.cancel();
|
||||
return;
|
||||
}
|
||||
_conKey.interestOps(_conKey.interestOps() | SelectionKey.OP_READ);
|
||||
// schedule up the beginning of our handshaking by calling prepareNextWrite on the
|
||||
// writer thread pool
|
||||
|
Reference in New Issue
Block a user