2006-07-27 jrandom

* Further NTCP write status cleanup
    * Handle more oddly-timed NTCP disconnections (thanks bar!)
This commit is contained in:
jrandom
2006-07-27 06:20:25 +00:00
committed by zzz
parent c48c419d74
commit 84b741ac98
4 changed files with 41 additions and 9 deletions

View File

@ -1,4 +1,8 @@
$Id: history.txt,v 1.496 2006-07-26 01:36:18 jrandom Exp $
$Id: history.txt,v 1.497 2006-07-26 20:04:59 jrandom Exp $
2006-07-27 jrandom
* Further NTCP write status cleanup
* Handle more oddly-timed NTCP disconnections (thanks bar!)
2006-07-26 jrandom
* When dropping a netDb router reference, only accept newer

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.435 $ $Date: 2006-07-26 01:36:30 $";
public final static String ID = "$Revision: 1.436 $ $Date: 2006-07-26 19:56:52 $";
public final static String VERSION = "0.6.1.22";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -33,10 +33,13 @@ public class EventPumper implements Runnable {
private static final int BUF_SIZE = 8*1024;
private static final int MAX_CACHE_SIZE = 64;
/**
* every 30s or so, iterate across all ntcp connections just to make sure
* we have their interestOps set properly (and to expire any looong idle cons)
* every few seconds, iterate across all ntcp connections just to make sure
* we have their interestOps set properly (and to expire any looong idle cons).
* as the number of connections grows, we should try to make this happen
* less frequently (or not at all), but while the connection count is small,
* the time to iterate across them to check a few flags shouldn't be a problem.
*/
private static final long FAILSAFE_ITERATION_FREQ = 30*1000l;
private static final long FAILSAFE_ITERATION_FREQ = 2*1000l;
public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx;
@ -408,6 +411,11 @@ public class EventPumper implements Runnable {
con.recv(rbuf);
}
}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke);
con.close();
_context.statManager().addRateData("ntcp.readError", 1, 0);
if (buf != null) releaseBuf(buf);
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe);
con.close();
@ -442,7 +450,7 @@ public class EventPumper implements Runnable {
int written = con.getChannel().write(buf);
totalWritten += written;
if (written == 0) {
if ( (buf.remaining() > 0) || (con.getWriteBufCount() > 1) ) {
if ( (buf.remaining() > 0) || (con.getWriteBufCount() >= 1) ) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
@ -469,6 +477,10 @@ public class EventPumper implements Runnable {
break;
}
}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
_context.statManager().addRateData("ntcp.writeError", 1, 0);
con.close();
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
_context.statManager().addRateData("ntcp.writeError", 1, 0);
@ -490,7 +502,11 @@ public class EventPumper implements Runnable {
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
SelectionKey key = con.getKey();
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
try {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} catch (CancelledKeyException cke) {
// ignore, we remove/etc elsewhere
}
}
synchronized (_wantsWrite) {
@ -569,6 +585,8 @@ public class EventPumper implements Runnable {
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE);
con.close(true);
//}
} catch (CancelledKeyException cke) {
con.close(false);
}
} catch (ClosedChannelException cce) {
if (_log.shouldLog(Log.WARN)) _log.warn("Error registering", cce);

View File

@ -236,6 +236,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage msg = (OutNetMessage)msgs.get(i);
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
OutNetMessage msg = _currentOutbound;
if (msg != null) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
}
@ -806,7 +816,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public ByteBuffer getNextWriteBuf() {
synchronized (_writeBufs) {
if (_writeBufs.size() > 0)
return (ByteBuffer)_writeBufs.remove(0);
return (ByteBuffer)_writeBufs.get(0); // not remove! we removeWriteBuf afterwards
}
return null;
}