2006-02-18 jrandom
* Add a new AIMD throttle in SSU to control the number of concurrent messages being sent to a given peer, in addition to the throttle on the number of concurrent bytes to that peer. * Adjust the existing SSU outbound queue to throttle based on the queue's lag, not an arbitrary number of packets.
This commit is contained in:
@ -121,7 +121,7 @@ class RouterThrottleImpl implements RouterThrottle {
|
||||
if (rs != null)
|
||||
r = rs.getRate(60*1000);
|
||||
double processTime = (r != null ? r.getAverageValue() : 0);
|
||||
if (processTime > 2000) {
|
||||
if (processTime > 5000) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Refusing tunnel request with the job lag of " + lag
|
||||
+ "since the 1 minute message processing time is too slow (" + processTime + ")");
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.346 $ $Date: 2006/02/17 04:07:53 $";
|
||||
public final static String ID = "$Revision: 1.347 $ $Date: 2006/02/17 17:29:32 $";
|
||||
public final static String VERSION = "0.6.1.10";
|
||||
public final static long BUILD = 2;
|
||||
public final static long BUILD = 3;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -78,6 +78,8 @@ public class OutboundMessageFragments {
|
||||
_context.statManager().createRateStat("udp.packetsRetransmitted", "Lifetime of packets during their retransmission (period == packets transmitted, lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.peerPacketsRetransmitted", "How many packets have been retransmitted to the peer (lifetime) when a burst of packets are retransmitted (period == packets transmitted, lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.blockedRetransmissions", "How packets have been transmitted to the peer when we blocked a retransmission to them?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendCycleTime", "How long it takes to cycle through all of the active messages?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendCycleTimeSlow", "How long it takes to cycle through all of the active messages, when its going slowly?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
}
|
||||
|
||||
public void startup() { _alive = true; }
|
||||
@ -145,6 +147,8 @@ public class OutboundMessageFragments {
|
||||
if (ok)
|
||||
_activeMessages.add(state);
|
||||
active = _activeMessages.size();
|
||||
if (active == 1)
|
||||
_lastCycleTime = System.currentTimeMillis();
|
||||
_activeMessages.notifyAll();
|
||||
}
|
||||
msg.timestamp("made active along with: " + active);
|
||||
@ -158,6 +162,8 @@ public class OutboundMessageFragments {
|
||||
public void add(OutboundMessageState state) {
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.add(state);
|
||||
if (_activeMessages.size() == 1)
|
||||
_lastCycleTime = System.currentTimeMillis();
|
||||
_activeMessages.notifyAll();
|
||||
}
|
||||
}
|
||||
@ -264,6 +270,8 @@ public class OutboundMessageFragments {
|
||||
|
||||
private static final long SECOND_MASK = 1023l;
|
||||
|
||||
private long _lastCycleTime = System.currentTimeMillis();
|
||||
|
||||
/**
|
||||
* Fetch all the packets for a message volley, blocking until there is a
|
||||
* message which can be fully transmitted (or the transport is shut down).
|
||||
@ -282,6 +290,14 @@ public class OutboundMessageFragments {
|
||||
synchronized (_activeMessages) {
|
||||
for (int i = 0; i < _activeMessages.size(); i++) {
|
||||
int cur = (i + _nextPacketMessage) % _activeMessages.size();
|
||||
if (cur == 0) {
|
||||
long ts = System.currentTimeMillis();
|
||||
long cycleTime = ts - _lastCycleTime;
|
||||
_lastCycleTime = ts;
|
||||
_context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activeMessages.size());
|
||||
if (cycleTime > 1000)
|
||||
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activeMessages.size());
|
||||
}
|
||||
state = (OutboundMessageState)_activeMessages.get(cur);
|
||||
peer = state.getPeer(); // known if this is immediately after establish
|
||||
if (peer == null)
|
||||
|
@ -39,7 +39,7 @@ public class PacketPusher implements Runnable {
|
||||
if (packets != null) {
|
||||
for (int i = 0; i < packets.length; i++) {
|
||||
if (packets[i] != null) // null for ACKed fragments
|
||||
_sender.add(packets[i], 100); // blocks for up to 100ms
|
||||
_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -171,6 +171,15 @@ public class PeerState {
|
||||
|
||||
/** have we migrated away from this peer to another newer one? */
|
||||
private volatile boolean _dead;
|
||||
|
||||
/** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
|
||||
private volatile int _concurrentMessagesAllowed = 8;
|
||||
/**
|
||||
* how many outbound messages are currently being transmitted. Not thread safe, as we're not strict
|
||||
*/
|
||||
private volatile int _concurrentMessagesActive = 0;
|
||||
/** how many concurrency rejections have we had in a row */
|
||||
private volatile int _consecutiveRejections = 0;
|
||||
|
||||
private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
|
||||
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
|
||||
@ -253,6 +262,9 @@ public class PeerState {
|
||||
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||
}
|
||||
|
||||
private int getDefaultMTU() {
|
||||
@ -414,6 +426,10 @@ public class PeerState {
|
||||
public int getSendBps() { return _sendBps; }
|
||||
public int getReceiveBps() { return _receiveBps; }
|
||||
public int incrementConsecutiveFailedSends() {
|
||||
_concurrentMessagesActive--;
|
||||
if (_concurrentMessagesActive < 0)
|
||||
_concurrentMessagesActive = 0;
|
||||
|
||||
long now = _context.clock().now()/(10*1000);
|
||||
if (_lastFailedSendPeriod >= now) {
|
||||
// ignore... too fast
|
||||
@ -469,6 +485,17 @@ public class PeerState {
|
||||
}
|
||||
//if (true) return true;
|
||||
if (IGNORE_CWIN || size <= _sendWindowBytesRemaining || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
|
||||
if ( (messagePushCount == 0) && (_concurrentMessagesActive > _concurrentMessagesAllowed) ) {
|
||||
_consecutiveRejections++;
|
||||
_context.statManager().addRateData("udp.rejectConcurrentActive", _concurrentMessagesActive, _consecutiveRejections);
|
||||
return false;
|
||||
} else if (messagePushCount == 0) {
|
||||
_context.statManager().addRateData("udp.allowConcurrentActive", _concurrentMessagesActive, _concurrentMessagesAllowed);
|
||||
_concurrentMessagesActive++;
|
||||
if (_consecutiveRejections > 0)
|
||||
_context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _concurrentMessagesActive);
|
||||
_consecutiveRejections = 0;
|
||||
}
|
||||
_sendWindowBytesRemaining -= size;
|
||||
_sendBytes += size;
|
||||
_lastSendTime = now;
|
||||
@ -479,6 +506,7 @@ public class PeerState {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/** what IP+port is the peer sending and receiving packets on? */
|
||||
public void setRemoteAddress(byte ip[], int port) {
|
||||
_remoteIP = ip;
|
||||
@ -505,6 +533,9 @@ public class PeerState {
|
||||
_mtuLastChecked = _context.clock().now();
|
||||
}
|
||||
public int getSlowStartThreshold() { return _slowStartThreshold; }
|
||||
public int getConcurrentSends() { return _concurrentMessagesActive; }
|
||||
public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
|
||||
public int getConsecutiveSendRejections() { return _consecutiveRejections; }
|
||||
|
||||
/** we received the message specified completely */
|
||||
public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); }
|
||||
@ -745,9 +776,16 @@ public class PeerState {
|
||||
|
||||
/** we sent a message which was ACKed containing the given # of bytes */
|
||||
public void messageACKed(int bytesACKed, long lifetime, int numSends) {
|
||||
_concurrentMessagesActive--;
|
||||
if (_concurrentMessagesActive < 0)
|
||||
_concurrentMessagesActive = 0;
|
||||
|
||||
_consecutiveFailedSends = 0;
|
||||
_lastFailedSendPeriod = -1;
|
||||
if (numSends < 2) {
|
||||
if (_context.random().nextInt(_concurrentMessagesAllowed) <= 0)
|
||||
_concurrentMessagesAllowed++;
|
||||
|
||||
if (_sendWindowBytes <= _slowStartThreshold) {
|
||||
_sendWindowBytes += bytesACKed;
|
||||
} else {
|
||||
@ -761,6 +799,11 @@ public class PeerState {
|
||||
_sendWindowBytes += bytesACKed; //512; // bytesACKed;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int allow = _concurrentMessagesAllowed - 1;
|
||||
if (allow < 8)
|
||||
allow = 8;
|
||||
_concurrentMessagesAllowed = allow;
|
||||
}
|
||||
if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
|
||||
_sendWindowBytes = MAX_SEND_WINDOW_BYTES;
|
||||
|
@ -36,6 +36,8 @@ public class UDPSender {
|
||||
_name = name;
|
||||
_context.statManager().createRateStat("udp.pushTime", "How long a UDP packet takes to get pushed out", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendQueueSize", "How many packets are queued on the UDP sender", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendQueueFailed", "How often it was unable to add a new packet to the queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendQueueTrimmed", "How many packets were removed from the queue for being too old (duration == remaining)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendPacketSize", "How large packets sent are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
@ -89,12 +91,29 @@ public class UDPSender {
|
||||
long expiration = _context.clock().now() + blockTime;
|
||||
int remaining = -1;
|
||||
long lifetime = -1;
|
||||
boolean added = false;
|
||||
int removed = 0;
|
||||
while ( (_keepRunning) && (remaining < 0) ) {
|
||||
try {
|
||||
synchronized (_outboundQueue) {
|
||||
if (_outboundQueue.size() < MAX_QUEUED) {
|
||||
// clear out any too-old packets
|
||||
UDPPacket head = null;
|
||||
if (_outboundQueue.size() > 0) {
|
||||
head = (UDPPacket)_outboundQueue.get(0);
|
||||
while (head.getLifetime() > MAX_HEAD_LIFETIME) {
|
||||
_outboundQueue.remove(0);
|
||||
removed++;
|
||||
if (_outboundQueue.size() > 0)
|
||||
head = (UDPPacket)_outboundQueue.get(0);
|
||||
else
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (true || (_outboundQueue.size() < MAX_QUEUED)) {
|
||||
lifetime = packet.getLifetime();
|
||||
_outboundQueue.add(packet);
|
||||
added = true;
|
||||
remaining = _outboundQueue.size();
|
||||
_outboundQueue.notifyAll();
|
||||
} else {
|
||||
@ -105,16 +124,23 @@ public class UDPSender {
|
||||
remaining = _outboundQueue.size();
|
||||
_outboundQueue.notifyAll();
|
||||
}
|
||||
lifetime = packet.getLifetime();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
_context.statManager().addRateData("udp.sendQueueSize", remaining, lifetime);
|
||||
if (!added)
|
||||
_context.statManager().addRateData("udp.sendQueueFailed", remaining, lifetime);
|
||||
if (removed > 0)
|
||||
_context.statManager().addRateData("udp.sendQueueTrimmed", removed, remaining);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime);
|
||||
return remaining;
|
||||
}
|
||||
|
||||
private static final int MAX_HEAD_LIFETIME = 1000;
|
||||
|
||||
/**
|
||||
*
|
||||
* @return number of packets in the queue
|
||||
@ -123,13 +149,28 @@ public class UDPSender {
|
||||
if (packet == null) return 0;
|
||||
int size = 0;
|
||||
long lifetime = -1;
|
||||
int removed = 0;
|
||||
synchronized (_outboundQueue) {
|
||||
lifetime = packet.getLifetime();
|
||||
UDPPacket head = null;
|
||||
if (_outboundQueue.size() > 0) {
|
||||
head = (UDPPacket)_outboundQueue.get(0);
|
||||
while (head.getLifetime() > MAX_HEAD_LIFETIME) {
|
||||
_outboundQueue.remove(0);
|
||||
removed++;
|
||||
if (_outboundQueue.size() > 0)
|
||||
head = (UDPPacket)_outboundQueue.get(0);
|
||||
else
|
||||
break;
|
||||
}
|
||||
}
|
||||
_outboundQueue.add(packet);
|
||||
size = _outboundQueue.size();
|
||||
_outboundQueue.notifyAll();
|
||||
}
|
||||
_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
|
||||
if (removed > 0)
|
||||
_context.statManager().addRateData("udp.sendQueueTrimmed", removed, size);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + lifetime);
|
||||
return size;
|
||||
|
@ -955,6 +955,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
+ " to " + msg.getPeer());
|
||||
if ( (consecutive > MAX_CONSECUTIVE_FAILED) && (msg.getPeer().getInactivityTime() > DROP_INACTIVITY_TIME))
|
||||
dropPeer(msg.getPeer(), false);
|
||||
else if (consecutive > 2 * MAX_CONSECUTIVE_FAILED) // they're sending us data, but we cant reply?
|
||||
dropPeer(msg.getPeer(), false);
|
||||
}
|
||||
noteSend(msg, false);
|
||||
super.afterSend(msg.getMessage(), false);
|
||||
@ -1198,7 +1200,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
|
||||
buf.append("<td valign=\"top\" ><code>");
|
||||
buf.append(sendWindow/1024);
|
||||
buf.append("K</code></td>");
|
||||
buf.append("K");
|
||||
buf.append("/").append(peer.getConcurrentSends());
|
||||
buf.append("/").append(peer.getConcurrentSendWindow());
|
||||
buf.append("/").append(peer.getConsecutiveSendRejections());
|
||||
buf.append("</code></td>");
|
||||
|
||||
buf.append("<td valign=\"top\" ><code>");
|
||||
buf.append(peer.getSlowStartThreshold()/1024);
|
||||
@ -1329,7 +1335,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
"<b id=\"def.rate\">in/out</b>: the rates show a smoothed inbound and outbound transfer rate (KBytes per second)<br />\n" +
|
||||
"<b id=\"def.up\">up</b>: the uptime is how long ago this session was established<br />\n" +
|
||||
"<b id=\"def.skew\">skew</b>: the skew says how far off the other user's clock is, relative to your own<br />\n" +
|
||||
"<b id=\"def.cwnd\">cwnd</b>: the congestion window is how many bytes in 'in flight' you can send without an acknowledgement<br />\n" +
|
||||
"<b id=\"def.cwnd\">cwnd</b>: the congestion window is how many bytes in 'in flight' you can send without an acknowledgement / <br />\n" +
|
||||
" the number of currently active messages being sent /<br />\n the maximum number of concurrent messages to send /<br />\n"+
|
||||
" the number of consecutive sends which were blocked due to throws message window size<br />\n" +
|
||||
"<b id=\"def.ssthresh\">ssthresh</b>: the slow start threshold help make sure the cwnd doesn't grow too fast<br />\n" +
|
||||
"<b id=\"def.rtt\">rtt</b>: the round trip time is how long it takes to get an acknowledgement of a packet<br />\n" +
|
||||
"<b id=\"def.dev\">dev</b>: the standard deviation of the round trip time, to help control the retransmit timeout<br />\n" +
|
||||
|
@ -73,8 +73,8 @@ class BuildHandler {
|
||||
if (toHandle > MAX_HANDLE_AT_ONCE)
|
||||
toHandle = MAX_HANDLE_AT_ONCE;
|
||||
handled = new ArrayList(toHandle);
|
||||
for (int i = 0; i < toHandle; i++)
|
||||
handled.add(_inboundBuildMessages.remove(0));
|
||||
for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?)
|
||||
handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1));
|
||||
}
|
||||
}
|
||||
if (handled != null) {
|
||||
|
Reference in New Issue
Block a user