2006-02-23 jrandom

* Increase the SSU retransmit ceiling (for slow links)
    * Estimate the sender's SSU MTU (to help see if we agree)
This commit is contained in:
jrandom
2006-02-23 14:38:39 +00:00
committed by zzz
parent 8b7958cff2
commit 3d8d21e543
12 changed files with 113 additions and 49 deletions

View File

@ -1,4 +1,8 @@
$Id: history.txt,v 1.416 2006/02/22 09:54:22 jrandom Exp $
$Id: history.txt,v 1.417 2006/02/23 03:08:37 jrandom Exp $
2006-02-23 jrandom
* Increase the SSU retransmit ceiling (for slow links)
* Estimate the sender's SSU MTU (to help see if we agree)
2006-02-22 jrandom
* Fix to properly profile tunnel joins (thanks Ragnarok, frosk, et al!)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.357 $ $Date: 2006/02/22 09:54:23 $";
public final static String ID = "$Revision: 1.358 $ $Date: 2006/02/22 20:48:47 $";
public final static String VERSION = "0.6.1.11";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -81,7 +81,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
int acksIncluded = receiveACKs(from, data);
long afterACKs = _context.clock().now();
from.packetReceived();
from.packetReceived(data.getPacketSize());
_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
if ( (fragmentsIncluded > 0) && (acksIncluded > 0) )

View File

@ -138,6 +138,7 @@ public class OutboundMessageFragments {
if ( (msgBody == null) || (target == null) )
return;
// todo: make sure the outNetMessage is initialzed once and only once
OutboundMessageState state = new OutboundMessageState(_context);
boolean ok = state.initialize(msg, msgBody);
if (ok) {

View File

@ -63,6 +63,8 @@ public class PacketHandler {
_context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
public void startup() {
@ -138,6 +140,7 @@ public class PacketHandler {
long recvTime = packet.getReceivedTime();
long beforeValidateTime = packet.getBeforeValidate();
long afterValidateTime = packet.getAfterValidate();
int validateCount = packet.getValidateCount();
long timeToDequeue = recvTime - enqueueTime;
long timeToValidate = 0;
@ -150,11 +153,15 @@ public class PacketHandler {
_context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue);
if (authTime > 50)
_context.statManager().addRateData("udp.packetAuthRecvTime", authTime, beforeValidateTime-recvTime);
if (timeToValidate > 0) {
if (afterValidateTime > 0) {
_context.statManager().addRateData("udp.packetVerifyTime", timeToValidate, authTime);
if (timeToValidate > 50)
_context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToValidate, authTime);
}
if (validateCount > 1)
_context.statManager().addRateData("udp.packetValidateMultipleCount", validateCount, timeToValidate);
else if (validateCount <= 0)
_context.statManager().addRateData("udp.packetNoValidationLifetime", packet.getLifetime(), 0);
// back to the cache with thee!
packet.release();

View File

@ -138,6 +138,9 @@ public class PeerState {
private long _theyRelayToUsAs;
/** what is the largest packet we can send to the peer? */
private int _mtu;
private int _mtuReceive;
/* how many consecutive packets at or under the min MTU have been received */
private long _consecutiveSmall;
/** when did we last check the MTU? */
private long _mtuLastChecked;
private long _mtuIncreases;
@ -209,7 +212,7 @@ public class PeerState {
private static final int LARGE_MTU = 1350;
private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 1200; // 5000;
private static final int MAX_RTO = 2000; // 5000;
/** override the default MTU */
private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu";
@ -248,6 +251,7 @@ public class PeerState {
_weRelayToThemAs = 0;
_theyRelayToUsAs = 0;
_mtu = getDefaultMTU();
_mtuReceive = _mtu;
_mtuLastChecked = -1;
_lastACKSend = -1;
_rtt = 1000;
@ -378,6 +382,8 @@ public class PeerState {
public long getTheyRelayToUsAs() { return _theyRelayToUsAs; }
/** what is the largest packet we can send to the peer? */
public int getMTU() { return _mtu; }
/** estimate how large the other side is sending packets */
public int getReceiveMTU() { return _mtuReceive; }
/** when did we last check the MTU? */
public long getMTULastChecked() { return _mtuLastChecked; }
public long getMTUIncreases() { return _mtuIncreases; }
@ -866,7 +872,7 @@ public class PeerState {
private void adjustMTU() {
double retransPct = 0;
if (_packetsTransmitted > 0) {
if (_packetsTransmitted > 10) {
retransPct = (double)_packetsRetransmitted/(double)_packetsTransmitted;
boolean wantLarge = retransPct < .25d; // heuristic to allow fairly lossy links to use large MTUs
if (wantLarge && _mtu != LARGE_MTU) {
@ -930,7 +936,18 @@ public class PeerState {
public long getPacketRetransmissionRate() { return _packetRetransmissionRate; }
public long getPacketsReceived() { return _packetsReceived; }
public long getPacketsReceivedDuplicate() { return _packetsReceivedDuplicate; }
public void packetReceived() { _packetsReceived++; }
public void packetReceived(int size) {
_packetsReceived++;
if (size <= MIN_MTU)
_consecutiveSmall++;
else
_consecutiveSmall = 0;
if ( (_consecutiveSmall < 50) && (_packetsReceived > 50) )
_mtuReceive = LARGE_MTU;
else
_mtuReceive = MIN_MTU;
}
/**
* we received a backoff request, so cut our send window

View File

@ -42,6 +42,7 @@ public class UDPPacket {
private long _afterValidate;
private long _beforeReceiveFragments;
private long _afterHandlingTime;
private int _validateCount;
private boolean _isInbound;
private static final List _packetCache;
@ -211,6 +212,7 @@ public class UDPPacket {
_validateCache.release(buf);
_afterValidate = _context.clock().now();
_validateCount++;
return eq;
}
@ -254,6 +256,8 @@ public class UDPPacket {
long getBeforeValidate() { return _beforeValidate; }
/** when we finished validate() */
long getAfterValidate() { return _afterValidate; }
/** how many times we tried to validate the packet */
int getValidateCount() { return _validateCount; }
public String toString() {
verifyNotReleased();

View File

@ -252,6 +252,7 @@ public class UDPPacketReader {
/** parse out the data message */
public class DataReader {
public int getPacketSize() { return _payloadLength; }
public boolean readACKsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_EXPLICIT_ACK);
}

View File

@ -1316,9 +1316,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("</code></td>");
buf.append("<td valign=\"top\" ><code>");
buf.append(peer.getMTU()).append('/');
buf.append(peer.getMTUIncreases()).append('/');
buf.append(peer.getMTUDecreases());
buf.append(peer.getMTU()).append("/").append(peer.getReceiveMTU());
//.append('/');
//buf.append(peer.getMTUIncreases()).append('/');
//buf.append(peer.getMTUDecreases());
buf.append("</code></td>");
long sent = peer.getPacketsTransmitted();
@ -1433,7 +1435,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
"<b id=\"def.rtt\">rtt</b>: the round trip time is how long it takes to get an acknowledgement of a packet<br />\n" +
"<b id=\"def.dev\">dev</b>: the standard deviation of the round trip time, to help control the retransmit timeout<br />\n" +
"<b id=\"def.rto\">rto</b>: the retransmit timeout controls how frequently an unacknowledged packet will be retransmitted<br />\n" +
"<b id=\"def.mtu\">mtu</b>: current sending packet size/number of times it increased/number of times it decreased<br />\n" +
"<b id=\"def.mtu\">mtu</b>: current sending packet size / estimated receiving packet size<br />\n" +
"<b id=\"def.send\">send</b>: the number of packets sent to the peer<br />\n" +
"<b id=\"def.recv\">recv</b>: the number of packets received from the peer<br />\n" +
"<b id=\"def.resent\">resent</b>: the number of packets retransmitted to the peer<br />\n" +

View File

@ -28,6 +28,8 @@ public class TunnelCreatorConfig implements TunnelInfo {
private boolean _isInbound;
private long _messagesProcessed;
private volatile long _verifiedBytesTransferred;
private boolean _failed;
private int _failures;
public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound) {
this(ctx, length, isInbound, null);
@ -45,6 +47,8 @@ public class TunnelCreatorConfig implements TunnelInfo {
_destination = destination;
_messagesProcessed = 0;
_verifiedBytesTransferred = 0;
_failed = false;
_failures = 0;
}
/** how many hops are there in the tunnel? */
@ -90,8 +94,6 @@ public class TunnelCreatorConfig implements TunnelInfo {
public long getReplyMessageId() { return _replyMessageId; }
public void setReplyMessageId(long id) { _replyMessageId = id; }
public void testSuccessful(int ms) {}
/** take note of a message being pumped through this tunnel */
public void incrementProcessedMessages() { _messagesProcessed++; }
public long getProcessedMessagesCount() { return _messagesProcessed; }
@ -135,6 +137,30 @@ public class TunnelCreatorConfig implements TunnelInfo {
}
private static final int MAX_CONSECUTIVE_TEST_FAILURES = 2;
/**
* The tunnel failed, so stop using it
*/
public boolean tunnelFailed() {
_failures++;
if (_failures > MAX_CONSECUTIVE_TEST_FAILURES) {
_failed = true;
return false;
} else {
return true;
}
}
public boolean getTunnelFailed() { return _failed; }
public int getTunnelFailures() { return _failures; }
public void testSuccessful(int ms) {
int failures = _failures - 1;
if (failures < 0)
_failures = 0;
else
_failures = failures;
}
public String toString() {
// H0:1235-->H1:2345-->H2:2345
@ -168,6 +194,8 @@ public class TunnelCreatorConfig implements TunnelInfo {
if (_replyMessageId > 0)
buf.append(" replyMessageId ").append(_replyMessageId);
buf.append(" with ").append(_messagesProcessed).append("/").append(_verifiedBytesTransferred).append(" msgs/bytes");
buf.append(" with ").append(_failures).append(" failures");
return buf.toString();
}

View File

@ -105,6 +105,15 @@ public class TunnelDispatcher implements Service {
ctx.statManager().createRateStat("tunnel.participatingMessageCount",
"How many messages are sent through a participating tunnel?", "Tunnels",
new long[] { 60*10*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.ownedMessageCount",
"How many messages are sent through a tunnel we created (period == failures)?", "Tunnels",
new long[] { 60*1000l, 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.failedCompletelyMessages",
"How many messages are sent through a tunnel that failed prematurely (period == failures)?", "Tunnels",
new long[] { 60*1000l, 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.failedPartially",
"How many messages are sent through a tunnel that only failed partially (period == failures)?", "Tunnels",
new long[] { 60*1000l, 10*60*1000l, 60*60*1000l });
}
private TunnelGateway.QueuePreprocessor createPreprocessor(HopConfig cfg) {
@ -301,6 +310,15 @@ public class TunnelDispatcher implements Service {
// update stats based on gw.getMessagesSent()
}
}
long msgs = cfg.getProcessedMessagesCount();
int failures = cfg.getTunnelFailures();
boolean failed = cfg.getTunnelFailed();
_context.statManager().addRateData("tunnel.ownedMessageCount", msgs, failures);
if (failed) {
_context.statManager().addRateData("tunnel.failedCompletelyMessages", msgs, failures);
} else if (failures > 0) {
_context.statManager().addRateData("tunnel.failedPartiallyMessages", msgs, failures);
}
}
/**

View File

@ -13,10 +13,8 @@ import net.i2p.util.Log;
*/
public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
private TunnelPool _pool;
private boolean _failed;
private TestJob _testJob;
private Job _expireJob;
private int _failures;
private TunnelInfo _pairedTunnel;
/** Creates a new instance of PooledTunnelCreatorConfig */
@ -26,20 +24,29 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
}
public PooledTunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) {
super(ctx, length, isInbound, destination);
_failed = false;
_pool = null;
_failures = 0;
}
public void testSuccessful(int ms) {
if (_testJob != null) {
if (_testJob != null)
_testJob.testSuccessful(ms);
super.testSuccessful(ms);
}
/**
* The tunnel failed, so stop using it
*/
public boolean tunnelFailed() {
boolean rv = super.tunnelFailed();
if (!rv) {
// remove us from the pool (but not the dispatcher) so that we aren't
// selected again. _expireJob is left to do its thing, in case there
// are any straggling messages coming down the tunnel
_pool.tunnelFailed(this);
if (_testJob != null) // just in case...
_context.jobQueue().removeJob(_testJob);
}
int failures = _failures - 1;
if (failures < 0)
_failures = 0;
else
_failures = failures;
return rv;
}
public Properties getOptions() {
@ -47,31 +54,6 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
return _pool.getSettings().getUnknownOptions();
}
public String toString() {
return super.toString() + " with " + _failures + " failures";
}
private static final int MAX_CONSECUTIVE_TEST_FAILURES = 2;
/**
* The tunnel failed, so stop using it
*/
public boolean tunnelFailed() {
_failures++;
if (_failures > MAX_CONSECUTIVE_TEST_FAILURES) {
_failed = true;
// remove us from the pool (but not the dispatcher) so that we aren't
// selected again. _expireJob is left to do its thing, in case there
// are any straggling messages coming down the tunnel
_pool.tunnelFailed(this);
if (_testJob != null) // just in case...
_context.jobQueue().removeJob(_testJob);
return false;
} else {
return true;
}
}
public boolean getTunnelFailed() { return _failed; }
public void setTunnelPool(TunnelPool pool) {
if (pool != null) {
_pool = pool;