2005-04-28 jrandom

* More fixes for the I2PTunnel "other" interface handling (thanks nelgin!)
    * Add back the code to handle bids from multiple transports (though there
      is still only one transport enabled by default)
    * Adjust the router's queueing of outbound client messages when under
      heavy load by running the preparatory job in the client's I2CP handler
      thread, thereby blocking additional outbound messages when the router is
      hosed.
    * No need to validate or persist a netDb entry if we already have it
And for some udp stuff:
* only bid on what we know (duh)
* reduceed the queue size in the UDPSender itself, so that ACKs go
  through more quickly, leaving the payload messages to queue up in
  the outbound fragment scheduler
* rather than /= 2 on congestion, /= 2/3 (still AIMD, but less drastic)
* adjust the fragment selector so a wsiz throttle won't force extra
  volleys
* mark congestion when it occurs, not after the message has been
  ACKed
* when doing a round robin over the active messages, move on to the
  next after a full volley, not after each packet (causing less "fair"
  performance but better latency)
* reduced the lock contention in the inboundMessageFragments by
  moving the ack and complete queues to the ACKSender and
  MessageReceiver respectively (each of which have their own
  threads)
* prefer new and existing UDP sessions to new TCP sessions, but
  prefer existing TCP sessions to new UDP sessions
This commit is contained in:
jrandom
2005-04-28 21:54:27 +00:00
committed by zzz
parent 6e34d9b73e
commit 4ce51261f1
22 changed files with 371 additions and 205 deletions

View File

@ -96,7 +96,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
_log.error("Error while closing the received i2p con", ex);
}
} catch (IOException ex) {
_log.error("Error while waiting for I2PConnections", ex);
_log.error("Error while handling for I2PConnections", ex);
}
long afterHandle = I2PAppContext.getGlobalContext().clock().now();

View File

@ -93,7 +93,7 @@ if (curTunnel >= 0) {
</select>
&nbsp;&nbsp;
<b>others:</b>
<input type="text" name="reachablyByOther" size="20" />
<input type="text" name="reachableByOther" size="20" />
<% } else if ("0.0.0.0".equals(clientInterface)) { %>
<option value="127.0.0.1">Locally (127.0.0.1)</option>
<option value="0.0.0.0" selected="true">Everyone (0.0.0.0)</option>
@ -102,7 +102,7 @@ if (curTunnel >= 0) {
</select>
&nbsp;&nbsp;
<b>others:</b>
<input type="text" name="reachablyByOther" size="20" />
<input type="text" name="reachableByOther" size="20" />
<% } else { %>
<option value="127.0.0.1">Locally (127.0.0.1)</option>
<option value="0.0.0.0">Everyone (0.0.0.0)</option>

View File

@ -1,4 +1,14 @@
$Id: history.txt,v 1.198 2005/04/24 13:42:05 jrandom Exp $
$Id: history.txt,v 1.199 2005/04/25 21:59:23 smeghead Exp $
2005-04-28 jrandom
* More fixes for the I2PTunnel "other" interface handling (thanks nelgin!)
* Add back the code to handle bids from multiple transports (though there
is still only one transport enabled by default)
* Adjust the router's queueing of outbound client messages when under
heavy load by running the preparatory job in the client's I2CP handler
thread, thereby blocking additional outbound messages when the router is
hosed.
* No need to validate or persist a netDb entry if we already have it
2005-04-25 smeghead
* Added button to router console for manual update checks.

View File

@ -1,15 +1,18 @@
<i2p.news date="$Date: 2005/04/06 10:43:25 $">
<i2p.news date="$Date: 2005/04/20 15:14:18 $">
<i2p.release version="0.5.0.7" date="2005/04/20" minVersion="0.5.0.4"
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/i2p/i2pupdate.sud"
publicurl="http://dev.i2p.net/i2p/i2pupdate.sud"
anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/April-2005/000709.html"
publicannouncement="http://dev.i2p.net/pipermail/i2p/April-2005/000709.html" />
anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-April/000709.html"
publicannouncement="http://dev.i2p.net/pipermail/i2p/2005-April/000709.html" />
<i2p.notes date="2005/04/19"
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/April-2005/000708.html"
publicurl="http://dev.i2p.net/pipermail/i2p/April-2005/000708.html"
anonlogs="http://i2p/Nf3ab-ZFkmI-LyMt7GjgT-jfvZ3zKDl0L96pmGQXF1B82W2Bfjf0n7~288vafocjFLnQnVcmZd~-p0-Oolfo9aW2Rm-AhyqxnxyLlPBqGxsJBXjPhm1JBT4Ia8FB-VXt0BuY0fMKdAfWwN61-tj4zIcQWRxv3DFquwEf035K~Ra4SWOqiuJgTRJu7~o~DzHVljVgWIzwf8Z84cz0X33pv-mdG~~y0Bsc2qJVnYwjjR178YMcRSmNE0FVMcs6f17c6zqhMw-11qjKpY~EJfHYCx4lBWF37CD0obbWqTNUIbL~78vxqZRT3dgAgnLixog9nqTO-0Rh~NpVUZnoUi7fNR~awW5U3Cf7rU7nNEKKobLue78hjvRcWn7upHUF45QqTDuaM3yZa7OsjbcH-I909DOub2Q0Dno6vIwuA7yrysccN1sbnkwZbKlf4T6~iDdhaSLJd97QCyPOlbyUfYy9QLNExlRqKgNVJcMJRrIual~Lb1CLbnzt0uvobM57UpqSAAAA/meeting138"
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-April/000723.html"
publicurl="http://dev.i2p.net/pipermail/i2p/2005-April/000723.html"
anonlogs="http://i2p/Nf3ab-ZFkmI-LyMt7GjgT-jfvZ3zKDl0L96pmGQXF1B82W2Bfjf0n7~288vafocjFLnQnVcmZd~-p0-Oolfo9aW2Rm-AhyqxnxyLlPBqGxsJBXjPhm1JBT4Ia8FB-VXt0BuY0fMKdAfWwN61-tj4zIcQWRxv3DFquwEf035K~Ra4SWOqiuJgTRJu7~o~DzHVljVgWIzwf8Z84cz0X33pv-mdG~~y0Bsc2qJVnYwjjR178YMcRSmNE0FVMcs6f17c6zqhMw-11qjKpY~EJfHYCx4lBWF37CD0obbWqTNUIbL~78vxqZRT3dgAgnLixog9nqTO-0Rh~NpVUZnoUi7fNR~awW5U3Cf7rU7nNEKKobLue78hjvRcWn7upHUF45QqTDuaM3yZa7OsjbcH-I909DOub2Q0Dno6vIwuA7yrysccN1sbnkwZbKlf4T6~iDdhaSLJd97QCyPOlbyUfYy9QLNExlRqKgNVJcMJRrIual~Lb1CLbnzt0uvobM57UpqSAAAA/meeting139"
publiclogs="http://www.i2p.net/meeting138" />
<b>0.5.0.7 release available!</b> It is backwards compatible, but should
help eepsite authors keep their sites reachable, so give it a whirl! <br />
No news to report, beyond the usual
<a href="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-April/000723.html">status notes</a>
(<a href="http://dev.i2p.net/pipermail/i2p/2005-April/000723.html">non anon</a>)
and <a href="http://i2p/Nf3ab-ZFkmI-LyMt7GjgT-jfvZ3zKDl0L96pmGQXF1B82W2Bfjf0n7~288vafocjFLnQnVcmZd~-p0-Oolfo9aW2Rm-AhyqxnxyLlPBqGxsJBXjPhm1JBT4Ia8FB-VXt0BuY0fMKdAfWwN61-tj4zIcQWRxv3DFquwEf035K~Ra4SWOqiuJgTRJu7~o~DzHVljVgWIzwf8Z84cz0X33pv-mdG~~y0Bsc2qJVnYwjjR178YMcRSmNE0FVMcs6f17c6zqhMw-11qjKpY~EJfHYCx4lBWF37CD0obbWqTNUIbL~78vxqZRT3dgAgnLixog9nqTO-0Rh~NpVUZnoUi7fNR~awW5U3Cf7rU7nNEKKobLue78hjvRcWn7upHUF45QqTDuaM3yZa7OsjbcH-I909DOub2Q0Dno6vIwuA7yrysccN1sbnkwZbKlf4T6~iDdhaSLJd97QCyPOlbyUfYy9QLNExlRqKgNVJcMJRrIual~Lb1CLbnzt0uvobM57UpqSAAAA/meeting139">meeting logs</a>
(<a href="http://www.i2p.net/meeting139">non anon</a>)<br />
</i2p.news>

View File

@ -58,7 +58,11 @@ public class ClientMessagePool {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding message for remote delivery");
_context.jobQueue().addJob(new OutboundClientMessageOneShotJob(_context, msg));
OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, msg);
if (true) // blocks the I2CP reader for a nontrivial period of time
j.runJob();
else
_context.jobQueue().addJob(j);
}
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.189 $ $Date: 2005/04/20 15:14:19 $";
public final static String ID = "$Revision: 1.190 $ $Date: 2005/04/24 13:42:04 $";
public final static String VERSION = "0.5.0.7";
public final static long BUILD = 1;
public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -42,6 +42,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_context = context;
_log = _context.logManager().getLog(ClientMessageEventListener.class);
_runner = runner;
_context.statManager().createRateStat("client.distributeTime", "How long it took to inject the client message into the router", "ClientMessages", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
/**
@ -162,6 +163,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
MessageId id = _runner.distributeMessage(message);
long timeToDistribute = _context.clock().now() - beforeDistribute;
_runner.ackSendMessage(id, message.getNonce());
_context.statManager().addRateData("client.distributeTime", timeToDistribute, timeToDistribute);
if ( (timeToDistribute > 50) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took too long to distribute the message (which holds up the ack): " + timeToDistribute);
}

View File

@ -545,13 +545,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
public LeaseSet store(Hash key, LeaseSet leaseSet) throws IllegalArgumentException {
if (!_initialized) return null;
LeaseSet rv = (LeaseSet)_ds.get(key);
if ( (rv != null) && (rv.equals(leaseSet)) ) {
// if it hasn't changed, no need to do anything
return rv;
}
String err = validate(key, leaseSet);
if (err != null)
throw new IllegalArgumentException("Invalid store attempt - " + err);
LeaseSet rv = null;
if (_ds.isKnown(key))
rv = (LeaseSet)_ds.get(key);
_ds.put(key, leaseSet);
synchronized (_lastSent) {
if (!_lastSent.containsKey(key))
@ -629,14 +633,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
public RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException {
if (!_initialized) return null;
RouterInfo rv = (RouterInfo)_ds.get(key);
if ( (rv != null) && (rv.equals(routerInfo)) ) {
// no need to validate
return rv;
}
String err = validate(key, routerInfo);
if (err != null)
throw new IllegalArgumentException("Invalid store attempt - " + err);
RouterInfo rv = null;
if (_ds.isKnown(key))
rv = (RouterInfo)_ds.get(key);
if (_log.shouldLog(Log.INFO))
_log.info("RouterInfo " + key.toBase64() + " is stored with "
+ routerInfo.getOptions().size() + " options on "
@ -648,7 +655,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (!_lastSent.containsKey(key))
_lastSent.put(key, new Long(0));
}
_kb.add(key);
if (rv == null)
_kb.add(key);
return rv;
}

View File

@ -8,6 +8,7 @@ package net.i2p.router.transport;
*
*/
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.router.JobImpl;
import net.i2p.router.MessageSelector;
@ -58,18 +59,28 @@ public class GetBidsJob extends JobImpl {
return;
}
TransportBid bid = facade.getBid(msg);
if (bid == null) {
// only shitlist them if we couldnt even try a single transport
if (msg.getFailedTransports().size() <= 0) {
if (log.shouldLog(Log.WARN))
log.warn("No bids available for the message " + msg);
context.shitlist().shitlistRouter(to, "No bids");
context.netDb().fail(to);
}
List bids = facade.getBids(msg);
if ( (bids == null) || (bids.size() <= 0) ) {
context.shitlist().shitlistRouter(to, "No bids after " + (bids != null ? bids.size() + " tries" : "0 tries"));
context.netDb().fail(to);
fail(context, msg);
} else {
bid.getTransport().send(msg);
int lowestCost = -1;
TransportBid winner = null;
for (int i = 0; i < bids.size(); i++) {
TransportBid bid = (TransportBid)bids.get(i);
if ( (lowestCost < 0) || (bid.getLatencyMs() < lowestCost) ) {
winner = bid;
lowestCost = bid.getLatencyMs();
}
}
if (winner != null) {
if (log.shouldLog(Log.INFO))
log.info("Winning bid: " + winner + " out of " + bids);
winner.getTransport().send(msg);
}
}
}

View File

@ -30,7 +30,7 @@ public class TransportManager implements TransportEventListener {
private RouterContext _context;
private final static String PROP_DISABLE_TCP = "i2np.tcp.disable";
private static final boolean ENABLE_UDP = false;
private final static String PROP_ENABLE_UDP = "i2np.udp.enable";
public TransportManager(RouterContext context) {
_context = context;
@ -59,7 +59,8 @@ public class TransportManager implements TransportEventListener {
t.setListener(this);
_transports.add(t);
}
if (ENABLE_UDP) {
String enableUDP = _context.router().getConfigSetting(PROP_ENABLE_UDP);
if ( (enableUDP != null) && (Boolean.valueOf(enableUDP).booleanValue())) {
UDPTransport udp = new UDPTransport(_context);
udp.setListener(this);
_transports.add(udp);
@ -75,6 +76,7 @@ public class TransportManager implements TransportEventListener {
_log.debug("Transport " + i + " (" + t.getStyle() + ") started");
}
_log.debug("Done start listening on transports");
_context.router().rebuildRouterInfo();
}
public void restart() {
@ -116,17 +118,20 @@ public class TransportManager implements TransportEventListener {
return rv;
}
public List getBids(OutNetMessage msg) {
List rv = new ArrayList(1);
rv.add(getBid(msg));
return rv;
}
public TransportBid getBid(OutNetMessage msg) {
List bids = getBids(msg);
if ( (bids == null) || (bids.size() <= 0) )
return null;
else
return (TransportBid)bids.get(0);
}
public List getBids(OutNetMessage msg) {
if (msg == null)
throw new IllegalArgumentException("Null message? no bidding on a null outNetMessage!");
if (_context.router().getRouterInfo().equals(msg.getTarget()))
throw new IllegalArgumentException("WTF, bids for a message bound to ourselves?");
List rv = new ArrayList(_transports.size());
Set failedTransports = msg.getFailedTransports();
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
@ -141,13 +146,13 @@ public class TransportManager implements TransportEventListener {
if (bid != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport " + t.getStyle() + " bid: " + bid);
return bid;
rv.add(bid);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport " + t.getStyle() + " did not produce a bid");
}
}
return null;
return rv;
}
public void messageReceived(I2NPMessage message, RouterIdentity fromRouter, Hash fromRouterHash) {

View File

@ -137,16 +137,18 @@ public class TCPTransport extends TransportImpl {
public TransportBid bid(RouterInfo toAddress, long dataSize) {
RouterAddress addr = toAddress.getTargetAddress(STYLE);
if (addr == null)
return null;
if ( (_myAddress != null) && (_myAddress.equals(addr)) )
return null; // dont talk to yourself
if (getIsConnected(toAddress.getIdentity()))
if (getIsConnected(toAddress.getIdentity())) {
return _fastBid;
else
} else {
if (addr == null)
return null;
return _slowBid;
}
}
private boolean getIsConnected(RouterIdentity ident) {
@ -381,7 +383,10 @@ public class TCPTransport extends TransportImpl {
configureLocalAddress();
_listener.startListening();
if (_myAddress != null) {
return _myAddress.toRouterAddress();
RouterAddress rv = _myAddress.toRouterAddress();
if (rv != null)
replaceAddress(rv);
return rv;
} else {
return null;
}
@ -826,5 +831,6 @@ public class TCPTransport extends TransportImpl {
public SharedBid(int ms) { _ms = ms; }
public int getLatency() { return _ms; }
public Transport getTransport() { return TCPTransport.this; }
public String toString() { return "TCP bid @ " + _ms; }
}
}

View File

@ -1,41 +1,104 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.List;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Blocking thread that pulls peers off the inboundFragment pool and
* sends them any outstanding ACKs. The logic of what peers get ACKed when
* is determined by the {@link InboundMessageFragments#getNextPeerToACK }
* Blocking thread that is given peers by the inboundFragment pool, sending out
* any outstanding ACKs.
*
*/
public class ACKSender implements Runnable {
private RouterContext _context;
private Log _log;
private InboundMessageFragments _fragments;
private UDPTransport _transport;
private PacketBuilder _builder;
/** list of peers (PeerState) who we have received data from but not yet ACKed to */
private List _peersToACK;
private boolean _alive;
public ACKSender(RouterContext ctx, InboundMessageFragments fragments, UDPTransport transport) {
/** how frequently do we want to send ACKs to a peer? */
private static final int ACK_FREQUENCY = 400;
public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(ACKSender.class);
_fragments = fragments;
_transport = transport;
_peersToACK = new ArrayList(4);
_builder = new PacketBuilder(_context);
_alive = true;
_context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.ackFrequency", "how long ago did we send an ACK to this peer?", "udp", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendACKRemaining", "when we ack a peer, how many peers are left waiting to ack?", "udp", new long[] { 60*1000, 60*60*1000 });
}
public void ackPeer(PeerState peer) {
synchronized (_peersToACK) {
if (!_peersToACK.contains(peer))
_peersToACK.add(peer);
_peersToACK.notifyAll();
}
}
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP ACK sender");
t.setDaemon(true);
t.start();
}
public void shutdown() {
_alive = false;
synchronized (_peersToACK) {
_peersToACK.clear();
_peersToACK.notifyAll();
}
}
public void run() {
while (_fragments.isAlive()) {
PeerState peer = _fragments.getNextPeerToACK();
while (_alive) {
PeerState peer = null;
long now = _context.clock().now();
long remaining = -1;
try {
synchronized (_peersToACK) {
for (int i = 0; i < _peersToACK.size(); i++) {
PeerState cur = (PeerState)_peersToACK.get(i);
long delta = cur.getWantedACKSendSince() + ACK_FREQUENCY - now;
if ( (delta < 0) || (cur.unsentACKThresholdReached()) ) {
_peersToACK.remove(i);
peer = cur;
break;
}
}
if (peer == null) {
if (_peersToACK.size() <= 0)
_peersToACK.wait();
else
_peersToACK.wait(50);
} else {
remaining = _peersToACK.size();
}
}
} catch (InterruptedException ie) {}
if (peer != null) {
long lastSend = peer.getLastACKSend();
long wanted = peer.getWantedACKSendSince();
List acks = peer.retrieveACKs();
if ( (acks != null) && (acks.size() > 0) ) {
_context.statManager().addRateData("udp.sendACKCount", acks.size(), 0);
_context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
now = _context.clock().now();
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
_context.statManager().getStatLog().addData(peer.getRemoteHostString(), "udp.peer.sendACKCount", acks.size(), 0);
UDPPacket ack = _builder.buildACK(peer, acks);
ack.markType(1);
if (_log.shouldLog(Log.INFO))
_log.info("Sending ACK for " + acks);
_transport.send(ack);

View File

@ -11,13 +11,16 @@ import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Organize the received data message fragments, allowing its
* {@link MessageReceiver} to pull off completed messages and its
* {@link ACKSender} to pull off peers who need to receive an ACK for
* these messages. In addition, it drops failed fragments and keeps a
* Organize the received data message fragments, feeding completed messages
* to the {@link MessageReceiver} and telling the {@link ACKSender} of new
* peers to ACK. In addition, it drops failed fragments and keeps a
* minimal list of the most recently completed messages (even though higher
* up in the router we have full blown replay detection, its nice to have a
* basic line of defense here)
* basic line of defense here).
*
* TODO: add in some sensible code to drop expired fragments from peers we
* don't hear from again (either a periodic culling for expired peers, or
* a scheduled event)
*
*/
public class InboundMessageFragments {
@ -25,21 +28,15 @@ public class InboundMessageFragments {
private Log _log;
/** Map of peer (Hash) to a Map of messageId (Long) to InboundMessageState objects */
private Map _inboundMessages;
/** list of peers (PeerState) who we have received data from but not yet ACKed to */
private List _unsentACKs;
/** list of messages (InboundMessageState) fully received but not interpreted yet */
private List _completeMessages;
/** list of message IDs recently received, so we can ignore in flight dups */
private DecayingBloomFilter _recentlyCompletedMessages;
private OutboundMessageFragments _outbound;
private UDPTransport _transport;
/** this can be broken down further, but to start, OneBigLock does the trick */
private Object _stateLock;
private ACKSender _ackSender;
private MessageReceiver _messageReceiver;
private boolean _alive;
private static final int RECENTLY_COMPLETED_SIZE = 100;
/** how frequently do we want to send ACKs to a peer? */
private static final int ACK_FREQUENCY = 200;
/** decay the recently completed every 2 minutes */
private static final int DECAY_PERIOD = 120*1000;
@ -47,17 +44,16 @@ public class InboundMessageFragments {
_context = ctx;
_log = ctx.logManager().getLog(InboundMessageFragments.class);
_inboundMessages = new HashMap(64);
_unsentACKs = new ArrayList(64);
_completeMessages = new ArrayList(64);
_outbound = outbound;
_transport = transport;
_ackSender = new ACKSender(_context, _transport);
_messageReceiver = new MessageReceiver(_context, _transport);
_context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receivedACKs", "How many messages were ACKed at a time", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.ignoreRecentDuplicate", "Take note that we received a packet for a recently completed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_stateLock = this;
}
public void startup() {
@ -66,25 +62,18 @@ public class InboundMessageFragments {
// array size (currently its tuned for 10 minute rates for the
// messageValidator)
_recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 8);
I2PThread t = new I2PThread(new ACKSender(_context, this, _transport), "UDP ACK sender");
t.setDaemon(true);
t.start();
t = new I2PThread(new MessageReceiver(_context, this, _transport), "UDP message receiver");
t.setDaemon(true);
t.start();
_ackSender.startup();
_messageReceiver.startup();
}
public void shutdown() {
_alive = false;
if (_recentlyCompletedMessages != null)
_recentlyCompletedMessages.stopDecaying();
_recentlyCompletedMessages = null;
synchronized (_stateLock) {
_completeMessages.clear();
_unsentACKs.clear();
_ackSender.shutdown();
_messageReceiver.shutdown();
synchronized (_inboundMessages) {
_inboundMessages.clear();
_stateLock.notifyAll();
}
}
public boolean isAlive() { return _alive; }
@ -112,7 +101,7 @@ public class InboundMessageFragments {
private void receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
int fragments = data.readFragmentCount();
if (fragments <= 0) return;
synchronized (_stateLock) {
synchronized (_inboundMessages) {
Map messages = (Map)_inboundMessages.get(from.getRemotePeer());
if (messages == null) {
messages = new HashMap(fragments);
@ -125,8 +114,7 @@ public class InboundMessageFragments {
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
from.messageFullyReceived(messageId);
if (!_unsentACKs.contains(from))
_unsentACKs.add(from);
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: "
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
@ -148,14 +136,15 @@ public class InboundMessageFragments {
if (state.isComplete()) {
messageComplete = true;
messages.remove(messageId);
if (messages.size() <= 0)
_inboundMessages.remove(from.getRemotePeer());
_recentlyCompletedMessages.add(messageId.longValue());
_completeMessages.add(state);
_messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId);
if (!_unsentACKs.contains(from))
_unsentACKs.add(from);
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))
_log.info("Message received completely! " + state);
@ -165,6 +154,8 @@ public class InboundMessageFragments {
} else if (state.isExpired()) {
messageExpired = true;
messages.remove(messageId);
if (messages.size() <= 0)
_inboundMessages.remove(from.getRemotePeer());
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
state.releaseResources();
@ -173,8 +164,6 @@ public class InboundMessageFragments {
if (!fragmentOK)
break;
}
_stateLock.notifyAll();
}
}
@ -200,50 +189,4 @@ public class InboundMessageFragments {
else
from.dataReceived();
}
/**
* Blocking call to pull off the next fully received message
*
*/
public InboundMessageState receiveNextMessage() {
while (_alive) {
try {
synchronized (_stateLock) {
if (_completeMessages.size() > 0)
return (InboundMessageState)_completeMessages.remove(0);
_stateLock.wait();
}
} catch (InterruptedException ie) {}
}
return null;
}
/**
* Pull off the peer who we next want to send ACKs/NACKs to.
* This call blocks, and only returns null on shutdown.
*
*/
public PeerState getNextPeerToACK() {
while (_alive) {
try {
long now = _context.clock().now();
synchronized (_stateLock) {
for (int i = 0; i < _unsentACKs.size(); i++) {
PeerState peer = (PeerState)_unsentACKs.get(i);
if ( (peer.getLastACKSend() + ACK_FREQUENCY <= now) ||
(peer.unsentACKThresholdReached()) ) {
_unsentACKs.remove(i);
peer.setLastACKSend(now);
return peer;
}
}
if (_unsentACKs.size() > 0)
_stateLock.wait(_context.random().nextInt(100));
else
_stateLock.wait();
}
} catch (InterruptedException ie) {}
}
return null;
}
}

View File

@ -96,7 +96,7 @@ public class InboundMessageState {
if (_fragments != null)
for (int i = 0; i < _fragments.length; i++)
_fragmentCache.release(_fragments[i]);
_fragments = null;
//_fragments = null;
}
public ByteArray[] getFragments() {
@ -107,10 +107,11 @@ public class InboundMessageState {
public String toString() {
StringBuffer buf = new StringBuffer(32);
buf.append("Message: ").append(_messageId);
if (isComplete()) {
buf.append(" completely received with ");
buf.append(getCompleteSize()).append(" bytes");
}
//if (isComplete()) {
// buf.append(" completely received with ");
// buf.append(getCompleteSize()).append(" bytes");
//}
buf.append(" lifetime: ").append(getLifetime());
return buf.toString();
}
}

View File

@ -1,5 +1,8 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
@ -7,6 +10,7 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
@ -17,28 +21,62 @@ import net.i2p.util.Log;
public class MessageReceiver implements Runnable {
private RouterContext _context;
private Log _log;
private InboundMessageFragments _fragments;
private UDPTransport _transport;
/** list of messages (InboundMessageState) fully received but not interpreted yet */
private List _completeMessages;
private boolean _alive;
public MessageReceiver(RouterContext ctx, InboundMessageFragments frag, UDPTransport transport) {
public MessageReceiver(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(MessageReceiver.class);
_fragments = frag;
_transport = transport;
_completeMessages = new ArrayList(16);
_alive = true;
}
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP message receiver");
t.setDaemon(true);
t.start();
}
public void shutdown() {
_alive = false;
synchronized (_completeMessages) {
_completeMessages.clear();
_completeMessages.notifyAll();
}
}
public void receiveMessage(InboundMessageState state) {
synchronized (_completeMessages) {
_completeMessages.add(state);
_completeMessages.notifyAll();
}
}
public void run() {
while (_fragments.isAlive()) {
InboundMessageState message = _fragments.receiveNextMessage();
if (message == null) continue;
InboundMessageState message = null;
while (_alive) {
try {
synchronized (_completeMessages) {
if (_completeMessages.size() > 0)
message = (InboundMessageState)_completeMessages.remove(0);
else
_completeMessages.wait();
}
} catch (InterruptedException ie) {}
int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()
+ "... todo: parse and plop it onto InNetMessagePool");
I2NPMessage msg = readMessage(message);
if (msg != null)
_transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
if (message != null) {
int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()
+ "... todo: parse and plop it onto InNetMessagePool");
I2NPMessage msg = readMessage(message);
if (msg != null)
_transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
message = null;
}
}
}

View File

@ -34,9 +34,9 @@ public class OutboundMessageFragments {
/** if we can handle more messages explicitly, set this to true */
private boolean _allowExcess;
private static final int MAX_ACTIVE = 64;
private static final int MAX_ACTIVE = 16;
// don't send a packet more than 10 times
private static final int MAX_VOLLEYS = 10;
static final int MAX_VOLLEYS = 10;
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport) {
_context = ctx;
@ -54,6 +54,7 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendFailed", "How many fragments were in a message that couldn't be delivered", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() { _alive = true; }
@ -130,6 +131,11 @@ public class OutboundMessageFragments {
_activeMessages.remove(i);
_transport.succeeded(state.getMessage());
state.releaseResources();
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
i--;
} else if (state.isExpired()) {
_activeMessages.remove(i);
@ -144,6 +150,11 @@ public class OutboundMessageFragments {
_log.warn("Unable to send an expired direct message: " + state);
}
state.releaseResources();
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
i--;
} else if (state.getPushCount() > MAX_VOLLEYS) {
_activeMessages.remove(i);
@ -160,6 +171,11 @@ public class OutboundMessageFragments {
_log.warn("Unable to send a direct message after too many volleys: " + state);
}
state.releaseResources();
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
i--;
}
}
@ -214,6 +230,7 @@ public class OutboundMessageFragments {
int fragmentSize = state.fragmentSize(currentFragment);
if (peer.allocateSendingBytes(fragmentSize)) {
state.incrementCurrentFragment();
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + fragmentSize + " allowed with "
+ peer.getSendWindowBytesRemaining()
@ -223,24 +240,26 @@ public class OutboundMessageFragments {
if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) {
peer.messageRetransmitted();
if (_log.shouldLog(Log.ERROR))
_log.error("Retransmitting " + state + " to " + peer);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmitting " + state + " to " + peer);
}
// for fairness, we move on in a round robin
_nextPacketMessage = i + 1;
//_nextPacketMessage = i + 1;
if (currentFragment >= state.getFragmentCount() - 1) {
// this is the last fragment
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
if (state.getPeer() != null) {
int rto = state.getPeer().getRTO() * state.getPushCount();
//_log.error("changed volley, rto=" + rto + " volley="+ state.getPushCount());
state.setNextSendTime(now + rto);
} else {
_log.error("changed volley, unknown peer");
if (_log.shouldLog(Log.ERROR))
_log.error("changed volley, unknown peer");
state.setNextSendTime(now + 1000 + _context.random().nextInt(2000));
}
// only move on in round robin after sending a full volley
_nextPacketMessage = (i + 1) % _activeMessages.size();
} else {
if (peer.getSendWindowBytesRemaining() > 0)
state.setNextSendTime(now);
@ -249,6 +268,7 @@ public class OutboundMessageFragments {
}
break;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes()
+ " available=" + peer.getSendWindowBytesRemaining()
@ -330,6 +350,11 @@ public class OutboundMessageFragments {
// either the message was a short circuit after establishment,
// or it was received from who we sent it to. yay!
_activeMessages.remove(i);
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
_activeMessages.notifyAll();
break;
} else {
@ -346,8 +371,6 @@ public class OutboundMessageFragments {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
if ( (numSends > 1) && (state.getPeer() != null) )
state.getPeer().congestionOccurred();
_transport.succeeded(state.getMessage());
int numFragments = state.getFragmentCount();
if (state.getPeer() != null) {
@ -359,8 +382,8 @@ public class OutboundMessageFragments {
state.releaseResources();
return numFragments;
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("Received an ACK for a message not pending: " + messageId);
if (_log.shouldLog(Log.WARN))
_log.warn("Received an ACK for a message not pending: " + messageId);
return 0;
}
}
@ -386,6 +409,11 @@ public class OutboundMessageFragments {
state.acked(ackedFragments);
if (state.isComplete()) {
_activeMessages.remove(i);
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
_activeMessages.notifyAll();
}
break;

View File

@ -174,6 +174,17 @@ public class OutboundMessageState {
return _fragmentSize;
}
public void incrementCurrentFragment() {
int cur = _nextSendFragment;
_fragmentSends[cur]++;
_maxSends = _fragmentSends[cur];
_nextSendFragment++;
if (_nextSendFragment >= _fragmentSends.length) {
_nextSendFragment = 0;
_pushCount++;
}
}
/**
* Pick a fragment that we still need to send. Current implementation
* picks the fragment which has been sent the least (randomly choosing
@ -183,15 +194,7 @@ public class OutboundMessageState {
*/
public int pickNextFragment() {
if (true) {
int rv = _nextSendFragment;
_fragmentSends[rv]++;
_maxSends = _fragmentSends[rv];
_nextSendFragment++;
if (_nextSendFragment >= _fragmentSends.length) {
_nextSendFragment = 0;
_pushCount++;
}
return rv;
return _nextSendFragment;
}
short minValue = -1;
int minIndex = -1;

View File

@ -29,7 +29,7 @@ public class PacketHandler {
private InboundMessageFragments _inbound;
private boolean _keepReading;
private static final int NUM_HANDLERS = 1;
private static final int NUM_HANDLERS = 3;
public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound) {
_context = ctx;

View File

@ -65,12 +65,14 @@ public class PeerState {
private long _lastReceiveTime;
/** how many consecutive messages have we sent and not received an ACK to */
private int _consecutiveFailedSends;
/** when did we last have a failed send */
private long _lastFailedSendMinute;
/** when did we last have a failed send (beginning of period) */
private long _lastFailedSendPeriod;
/** list of messageIds (Long) that we have received but not yet sent */
private List _currentACKs;
/** when did we last send ACKs to the peer? */
private long _lastACKSend;
private volatile long _lastACKSend;
/** when did we decide we need to ACK to this peer? */
private volatile long _wantACKSendSince;
/** have we received a packet with the ECN bit set in the current second? */
private boolean _currentSecondECNReceived;
/**
@ -79,17 +81,17 @@ public class PeerState {
*/
private boolean _remoteWantsPreviousACKs;
/** how many bytes should we send to the peer in a second */
private int _sendWindowBytes;
private volatile int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private int _sendWindowBytesRemaining;
private volatile int _sendWindowBytesRemaining;
private long _lastSendRefill;
private long _lastCongestionOccurred;
private volatile long _lastCongestionOccurred;
/**
* when sendWindowBytes is below this, grow the window size quickly,
* but after we reach it, grow it slowly
*
*/
private int _slowStartThreshold;
private volatile int _slowStartThreshold;
/** what IP is the peer sending and receiving packets on? */
private byte[] _remoteIP;
/** cached IP address */
@ -116,19 +118,19 @@ public class PeerState {
/** when did we last check the MTU? */
private long _mtuLastChecked;
/** current round trip time estimate */
private int _rtt;
private volatile int _rtt;
/** smoothed mean deviation in the rtt */
private int _rttDeviation;
private volatile int _rttDeviation;
/** current retransmission timeout */
private int _rto;
private volatile int _rto;
private long _messagesReceived;
private long _messagesSent;
private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024;
private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
private static final int DEFAULT_MTU = 1492;
private static final int DEFAULT_MTU = 1472;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@ -306,11 +308,11 @@ public class PeerState {
/** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
public int incrementConsecutiveFailedSends() {
long now = _context.clock().now()/60*1000;
if (_lastFailedSendMinute == now) {
long now = _context.clock().now()/(10*1000);
if (_lastFailedSendPeriod >= now) {
// ignore... too fast
} else {
_lastFailedSendMinute = now;
_lastFailedSendPeriod = now;
_consecutiveFailedSends++;
}
return _consecutiveFailedSends;
@ -372,6 +374,8 @@ public class PeerState {
/** we received the message specified completely */
public void messageFullyReceived(Long messageId) {
synchronized (_currentACKs) {
if (_wantACKSendSince <= 0)
_wantACKSendSince = _context.clock().now();
if (!_currentACKs.contains(messageId))
_currentACKs.add(messageId);
}
@ -383,17 +387,21 @@ public class PeerState {
* the data through.
*
*/
public void congestionOccurred() {
private boolean congestionOccurred() {
long now = _context.clock().now();
if (_lastCongestionOccurred + 2000 > now)
return; // only shrink once every other second
if (_lastCongestionOccurred + 10*1000 > now)
return false; // only shrink once every 10 seconds
_lastCongestionOccurred = now;
_sendWindowBytes /= 2;
//if (true)
// _sendWindowBytes -= 10000;
//else
_sendWindowBytes = (_sendWindowBytes*2) / 3;
if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
_sendWindowBytes = MINIMUM_WINDOW_BYTES;
if (_sendWindowBytes < _slowStartThreshold)
_slowStartThreshold = _sendWindowBytes;
return true;
}
/** pull off the ACKs (Long) to send to the peer */
@ -404,19 +412,21 @@ public class PeerState {
if (_currentACKs.size() < threshold) {
rv = new ArrayList(_currentACKs);
_currentACKs.clear();
_wantACKSendSince = -1;
} else {
rv = new ArrayList(threshold);
for (int i = 0; i < threshold; i++)
rv.add(_currentACKs.remove(0));
}
}
_lastACKSend = _context.clock().now();
return rv;
}
/** we sent a message which was ACKed containing the given # of bytes */
public void messageACKed(int bytesACKed, long lifetime, int numSends) {
_consecutiveFailedSends = 0;
_lastFailedSendMinute = -1;
_lastFailedSendPeriod = -1;
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
} else {
@ -449,6 +459,7 @@ public class PeerState {
}
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted() {
congestionOccurred();
//_rto *= 2;
}
/** how long does it usually take to get a message ACKed? */
@ -477,6 +488,7 @@ public class PeerState {
/** when did we last send an ACK to the peer? */
public long getLastACKSend() { return _lastACKSend; }
public void setLastACKSend(long when) { _lastACKSend = when; }
public long getWantedACKSendSince() { return _wantACKSendSince; }
public boolean unsentACKThresholdReached() {
int threshold = countMaxACKs();
synchronized (_currentACKs) {

View File

@ -31,6 +31,7 @@ public class UDPPacket {
private long _expiration;
private byte[] _data;
private ByteArray _dataBuf;
private int _markedType;
private static final List _packetCache;
static {
@ -72,6 +73,7 @@ public class UDPPacket {
_data = _dataBuf.getData();
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
_initializeTime = _context.clock().now();
_markedType = -1;
}
public void initialize(short priority, long expiration, InetAddress host, int port) {
@ -92,8 +94,12 @@ public class UDPPacket {
public DatagramPacket getPacket() { return _packet; }
public short getPriority() { return _priority; }
public long getExpiration() { return _expiration; }
public long getBegin() { return _initializeTime; }
public long getLifetime() { return _context.clock().now() - _initializeTime; }
public void resetBegin() { _initializeTime = _context.clock().now(); }
/** flag this packet as a particular type for accounting purposes */
public void markType(int type) { _markedType = type; }
public int getMarkedType() { return _markedType; }
/**
* Validate the packet against the MAC specified, returning true if the
@ -173,6 +179,7 @@ public class UDPPacket {
rv._log = ctx.logManager().getLog(UDPPacket.class);
rv.resetBegin();
Arrays.fill(rv._data, (byte)0x00);
rv._markedType = -1;
return rv;
}
}

View File

@ -25,7 +25,7 @@ public class UDPSender {
private boolean _keepRunning;
private Runner _runner;
private static final int MAX_QUEUED = 64;
private static final int MAX_QUEUED = 4;
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
_context = ctx;
@ -35,7 +35,11 @@ public class UDPSender {
_runner = new Runner();
_name = name;
_context.statManager().createRateStat("udp.pushTime", "How long a UDP packet takes to get pushed out", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendQueueSize", "How many packets are queued on the UDP sender", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendPacketSize", "How large packets sent are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendACKTime", "How long an ACK packet is blocked for (duration == lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
public void startup() {
@ -83,6 +87,7 @@ public class UDPSender {
}
} catch (InterruptedException ie) {}
}
_context.statManager().addRateData("udp.sendQueueSize", remaining, packet.getLifetime());
return remaining;
}
@ -97,6 +102,7 @@ public class UDPSender {
size = _outboundQueue.size();
_outboundQueue.notifyAll();
}
_context.statManager().addRateData("udp.sendQueueSize", size, packet.getLifetime());
return size;
}
@ -112,6 +118,7 @@ public class UDPSender {
UDPPacket packet = getNextPacket();
if (packet != null) {
long acquireTime = _context.clock().now();
int size = packet.getPacket().getLength();
if (size > 0) {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");
@ -119,6 +126,8 @@ public class UDPSender {
req.waitForNextAllocation();
}
long afterBW = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) {
int len = packet.getPacket().getLength();
//if (len > 128)
@ -127,10 +136,16 @@ public class UDPSender {
}
try {
long before = _context.clock().now();
synchronized (Runner.this) {
// synchronization lets us update safely
_socket.send(packet.getPacket());
}
long sendTime = _context.clock().now() - before;
_context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime());
_context.statManager().addRateData("udp.sendBWThrottleTime", afterBW - acquireTime, acquireTime - packet.getBegin());
if (packet.getMarkedType() == 1)
_context.statManager().addRateData("udp.sendACKTime", afterBW - acquireTime, packet.getLifetime());
_context.statManager().addRateData("udp.pushTime", packet.getLifetime(), packet.getLifetime());
_context.statManager().addRateData("udp.sendPacketSize", packet.getPacket().getLength(), packet.getLifetime());
} catch (IOException ioe) {

View File

@ -86,9 +86,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */
private static final boolean SHOULD_FLOOD_PEERS = true;
private static final boolean SHOULD_FLOOD_PEERS = false;
private static final int MAX_CONSECUTIVE_FAILED = 2;
private static final int MAX_CONSECUTIVE_FAILED = 5;
public UDPTransport(RouterContext ctx) {
super(ctx);
@ -103,7 +103,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_relayPeers = new ArrayList(1);
_fastBid = new SharedBid(50);
_slowBid = new SharedBid(100);
_slowBid = new SharedBid(1000);
_fragments = new OutboundMessageFragments(_context, this);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
@ -353,6 +353,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.debug("bidding on a message to an established peer: " + peer);
return _fastBid;
} else {
if (null == toAddress.getTargetAddress(STYLE))
return null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("bidding on a message to an unestablished peer: " + to.toBase64());
return _slowBid;
@ -478,12 +481,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void failed(OutboundMessageState msg) {
if (msg == null) return;
int consecutive = 0;
if (msg.getPeer() != null)
if ( (msg.getPeer() != null) &&
( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
(msg.isExpired())) ) {
consecutive = msg.getPeer().incrementConsecutiveFailedSends();
if (_log.shouldLog(Log.WARN))
_log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());
if (consecutive > MAX_CONSECUTIVE_FAILED)
dropPeer(msg.getPeer());
if (_log.shouldLog(Log.WARN))
_log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());
if (consecutive > MAX_CONSECUTIVE_FAILED)
dropPeer(msg.getPeer());
}
failed(msg.getMessage());
}
@ -614,5 +620,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public SharedBid(int ms) { _ms = ms; }
public int getLatency() { return _ms; }
public Transport getTransport() { return UDPTransport.this; }
public String toString() { return "UDP bid @ " + _ms; }
}
}