* 2006-07-18 0.6.1.22 released

2006-07-18  jrandom
    * Add a failsafe to the NTCP transport to make sure we keep
      pumping writes when we should.
    * Properly reallow 16-32KBps routers in the default config
      (thanks Complication!)
This commit is contained in:
jrandom
2006-07-18 20:08:00 +00:00
committed by zzz
parent 65138357d3
commit a52dd57215
12 changed files with 141 additions and 60 deletions

View File

@ -60,6 +60,10 @@ public class StatSummarizer implements Runnable {
",tunnel.buildExploratoryExpire.60000" +
",client.sendAckTime.60000" +
",client.dispatchNoACK.60000" +
",ntcp.sendTime.60000" +
",ntcp.transmitTime.60000" +
",ntcp.sendBacklogTime.60000" +
",ntcp.receiveTime.60000" +
",transport.sendMessageFailureLifetime.60000" +
",transport.sendProcessingTime.60000";

View File

@ -14,8 +14,8 @@ package net.i2p;
*
*/
public class CoreVersion {
public final static String ID = "$Revision: 1.63 $ $Date: 2006-06-04 17:25:14 $";
public final static String VERSION = "0.6.1.21";
public final static String ID = "$Revision: 1.64 $ $Date: 2006-06-13 21:17:43 $";
public final static String VERSION = "0.6.1.22";
public static void main(String args[]) {
System.out.println("I2P Core version: " + VERSION);

View File

@ -1,4 +1,12 @@
$Id: history.txt,v 1.493 2006-07-14 13:08:44 jrandom Exp $
$Id: history.txt,v 1.494 2006-07-16 12:20:46 complication Exp $
* 2006-07-18 0.6.1.22 released
2006-07-18 jrandom
* Add a failsafe to the NTCP transport to make sure we keep
pumping writes when we should.
* Properly reallow 16-32KBps routers in the default config
(thanks Complication!)
2006-07-16 Complication
* Collect tunnel build agree/reject/expire statistics

View File

@ -1,5 +1,5 @@
<i2p.news date="$Date: 2006-06-04 17:25:08 $">
<i2p.release version="0.6.1.21" date="2006/06/13" minVersion="0.6"
<i2p.news date="$Date: 2006-06-13 21:17:40 $">
<i2p.release version="0.6.1.22" date="2006/06/13" minVersion="0.6"
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/i2p/i2pupdate.sud"
publicurl="http://dev.i2p.net/i2p/i2pupdate.sud"
anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-September/000878.html"

View File

@ -4,7 +4,7 @@
<info>
<appname>i2p</appname>
<appversion>0.6.1.21</appversion>
<appversion>0.6.1.22</appversion>
<authors>
<author name="I2P" email="support@i2p.net"/>
</authors>

View File

@ -1,5 +1,5 @@
<i2p.news date="$Date: 2006-06-13 21:17:40 $">
<i2p.release version="0.6.1.21" date="2006/06/13" minVersion="0.6"
<i2p.news date="$Date: 2006-06-15 17:15:09 $">
<i2p.release version="0.6.1.22" date="2006/06/13" minVersion="0.6"
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/i2p/i2pupdate.sud"
publicurl="http://dev.i2p.net/i2p/i2pupdate.sud"
anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-September/000878.html"
@ -10,7 +10,7 @@
anonlogs="http://i2p/Nf3ab-ZFkmI-LyMt7GjgT-jfvZ3zKDl0L96pmGQXF1B82W2Bfjf0n7~288vafocjFLnQnVcmZd~-p0-Oolfo9aW2Rm-AhyqxnxyLlPBqGxsJBXjPhm1JBT4Ia8FB-VXt0BuY0fMKdAfWwN61-tj4zIcQWRxv3DFquwEf035K~Ra4SWOqiuJgTRJu7~o~DzHVljVgWIzwf8Z84cz0X33pv-mdG~~y0Bsc2qJVnYwjjR178YMcRSmNE0FVMcs6f17c6zqhMw-11qjKpY~EJfHYCx4lBWF37CD0obbWqTNUIbL~78vxqZRT3dgAgnLixog9nqTO-0Rh~NpVUZnoUi7fNR~awW5U3Cf7rU7nNEKKobLue78hjvRcWn7upHUF45QqTDuaM3yZa7OsjbcH-I909DOub2Q0Dno6vIwuA7yrysccN1sbnkwZbKlf4T6~iDdhaSLJd97QCyPOlbyUfYy9QLNExlRqKgNVJcMJRrIual~Lb1CLbnzt0uvobM57UpqSAAAA/meeting141"
publiclogs="http://www.i2p.net/meeting141" />
&#149;
2006-06-14: 0.6.1.21 <a href="http://dev.i2p/pipermail/i2p/2006-June/001294.html">released</a>
2006-07-14: 0.6.1.22 <a href="http://dev.i2p/pipermail/i2p/2006-June/001294.html">released</a>
<br />
&#149;
2006-06-13:

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.432 $ $Date: 2006-07-14 13:08:51 $";
public final static String VERSION = "0.6.1.21";
public final static long BUILD = 4;
public final static String ID = "$Revision: 1.433 $ $Date: 2006-07-16 12:20:47 $";
public final static String VERSION = "0.6.1.22";
public final static long BUILD = 0;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -3,14 +3,7 @@ package net.i2p.router.transport.ntcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -39,6 +32,11 @@ public class EventPumper implements Runnable {
private static final int BUF_SIZE = 8*1024;
private static final int MAX_CACHE_SIZE = 64;
/**
* every 30s or so, iterate across all ntcp connections just to make sure
* we have their interestOps set properly (and to expire any looong idle cons)
*/
private static final long FAILSAFE_ITERATION_FREQ = 60*1000l;
public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx;
@ -82,6 +80,7 @@ public class EventPumper implements Runnable {
}
public void run() {
long lastFailsafeIteration = System.currentTimeMillis();
List bufList = new ArrayList(16);
while (_alive && _selector.isOpen()) {
try {
@ -107,43 +106,56 @@ public class EventPumper implements Runnable {
continue;
}
for (Iterator iter = selected.iterator(); iter.hasNext(); ) {
processKeys(selected);
selected.clear();
if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) {
// in the *cough* unthinkable possibility that there are bugs in
// the code, lets periodically pass over all NTCP connections and
// make sure that anything which should be able to write has been
// properly marked as such, etc
lastFailsafeIteration = System.currentTimeMillis();
try {
SelectionKey key = (SelectionKey)iter.next();
int ops = key.readyOps();
boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0;
boolean connect = (ops & SelectionKey.OP_CONNECT) != 0;
boolean read = (ops & SelectionKey.OP_READ) != 0;
boolean write = (ops & SelectionKey.OP_WRITE) != 0;
if (_log.shouldLog(Log.DEBUG))
_log.debug("ready ops for : " + key
+ " accept? " + accept + " connect? " + connect
+ " read? " + read
+ "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0)
+ " write? " + write
+ "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0)
);
if (accept) {
processAccept(key);
Set all = _selector.keys();
int failsafeWrites = 0;
int failsafeCloses = 0;
long expireIdleWriteTime = 60*60*1000l + _context.random().nextLong(60*60*1000l);
for (Iterator iter = all.iterator(); iter.hasNext(); ) {
try {
SelectionKey key = (SelectionKey)iter.next();
Object att = key.attachment();
if (!(att instanceof NTCPConnection))
continue; // to the next con
NTCPConnection con = (NTCPConnection)att;
if ( (con.getWriteBufCount() > 0) &&
((key.interestOps() & SelectionKey.OP_WRITE) == 0) ) {
// the data queued to be sent has already passed through
// the bw limiter and really just wants to get shoved
// out the door asap.
key.interestOps(SelectionKey.OP_WRITE | key.interestOps());
failsafeWrites++;
}
if ( (con.getTimeSinceSend() > expireIdleWriteTime) && (con.getMessagesSent() > 0) ) {
// we haven't sent anything in a really long time, so lets just close 'er up
con.close();
failsafeCloses++;
}
} catch (CancelledKeyException cke) {
// cancelled while updating the interest ops. ah well
}
if (failsafeWrites > 0)
_context.statManager().addRateData("ntcp.failsafeWrites", failsafeWrites, 0);
if (failsafeCloses > 0)
_context.statManager().addRateData("ntcp.failsafeCloses", failsafeCloses, 0);
}
if (connect) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
processConnect(key);
}
if (read) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
processRead(key);
}
if (write) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
processWrite(key);
}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("key cancelled");
} catch (ClosedSelectorException cse) {
continue;
}
}
selected.clear();
} catch (RuntimeException re) {
_log.log(Log.CRIT, "Error in the event pumper", re);
}
@ -184,6 +196,45 @@ public class EventPumper implements Runnable {
synchronized (_wantsWrite) { _wantsWrite.clear(); }
}
private void processKeys(Set selected) {
for (Iterator iter = selected.iterator(); iter.hasNext(); ) {
try {
SelectionKey key = (SelectionKey)iter.next();
int ops = key.readyOps();
boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0;
boolean connect = (ops & SelectionKey.OP_CONNECT) != 0;
boolean read = (ops & SelectionKey.OP_READ) != 0;
boolean write = (ops & SelectionKey.OP_WRITE) != 0;
if (_log.shouldLog(Log.DEBUG))
_log.debug("ready ops for : " + key
+ " accept? " + accept + " connect? " + connect
+ " read? " + read
+ "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0)
+ " write? " + write
+ "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0)
);
if (accept) {
processAccept(key);
}
if (connect) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
processConnect(key);
}
if (read) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
processRead(key);
}
if (write) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
processWrite(key);
}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("key cancelled");
}
}
}
public void wantsWrite(NTCPConnection con, byte data[]) {
ByteBuffer buf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write", null, null);//con, buf);
@ -305,7 +356,9 @@ public class EventPumper implements Runnable {
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Error processing connection", ioe);
}
} catch (NoConnectionPendingException ncpe) {
// ignore
}
}
private void processRead(SelectionKey key) {
@ -455,6 +508,7 @@ public class EventPumper implements Runnable {
con.setKey(key);
try {
NTCPAddress naddr = con.getRemoteAddress();
if (naddr.getPort() <= 0) throw new IOException("Invalid NTCP address: " + naddr);
InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort());
boolean connected = con.getChannel().connect(saddr);
if (connected) {

View File

@ -64,7 +64,7 @@ public class NTCPAddress {
} else {
_host = host.trim();
String port = addr.getOptions().getProperty(PROP_PORT);
if ( (port != null) && (port.trim().length() > 0) ) {
if ( (port != null) && (port.trim().length() > 0) && !("null".equals(port)) ) {
try {
_port = Integer.parseInt(port.trim());
} catch (NumberFormatException nfe) {

View File

@ -246,9 +246,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
boolean successful = false;
_consecutiveBacklog++;
_transport.afterSend(msg, successful, allowRequeue, msg.getLifetime());
if (_consecutiveBacklog > 50) { // waaay too backlogged
if (_consecutiveBacklog > 10) { // waaay too backlogged
boolean wantsWrite = false;
try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {}
int blocks = 0;
synchronized (_writeBufs) { blocks = _writeBufs.size(); }
if (_log.shouldLog(Log.ERROR))
_log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ") sending to " + _remotePeer.calculateHash().toBase64());
_log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64());
close();
}
return;

View File

@ -60,6 +60,8 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.receiveTime", "How long it takes to receive an inbound message", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.receiveSize", "How large the received message was", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.sendBacklogTime", "How long the head of the send queue has been waiting when we fail to add a new one to the queue (period is the number of messages queued)", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.failsafeWrites", "How many times do we need to proactively add in an extra nio write to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.failsafeCloses", "How many times do we need to proactively close an idle connection to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 });
_establishing = new ArrayList(4);
_conLock = new Object();

View File

@ -156,8 +156,7 @@ abstract class TunnelPeerSelector {
} else if (filterSlow(ctx, isInbound, isExploratory)) {
Log log = ctx.logManager().getLog(TunnelPeerSelector.class);
String excludeCaps = ctx.getProperty("router.excludePeerCaps",
String.valueOf(Router.CAPABILITY_BW16) +
String.valueOf(Router.CAPABILITY_BW32));
String.valueOf(Router.CAPABILITY_BW16));
Set peers = new HashSet();
if (excludeCaps != null) {
char excl[] = excludeCaps.toCharArray();
@ -224,7 +223,17 @@ abstract class TunnelPeerSelector {
if (infoAge < 0) {
infoAge = 0;
} else if (infoAge > 24*60*60*1000) {
peers.add(peer.getIdentity().calculateHash());
// Only exclude long-unseen peers if we haven't just started up
long DONT_EXCLUDE_PERIOD = 15*60*1000;
if (ctx.router().getUptime() < DONT_EXCLUDE_PERIOD) {
if (log.shouldLog(Log.DEBUG))
log.debug("Not excluding a long-unseen peer, since we just started up.");
} else {
if (log.shouldLog(Log.DEBUG))
log.debug("Excluding a long-unseen peer.");
peers.add(peer.getIdentity().calculateHash());
}
//peers.add(peer.getIdentity().calculateHash());
continue;
} else {
if (infoAge + uptimeMs < 2*60*60*1000) {