2005-08-23 jrandom

* Removed the concept of "no bandwidth limit" - if none is specified, its
      16KBps in/out.
    * Include ack packets in the per-peer cwin throttle (they were part of the
      bandwidth limit though).
    * Tweak the SSU cwin operation to get more accurrate estimates under
      congestions.
    * SSU improvements to resend more efficiently.
    * Added a basic scheduler to eepget to fetch multiple files sequentially.
This commit is contained in:
jrandom
2005-08-23 21:25:49 +00:00
committed by zzz
parent c7b75df390
commit 1a6b49cfb8
37 changed files with 1634 additions and 273 deletions

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.216 $ $Date: 2005/08/17 15:05:03 $";
public final static String ID = "$Revision: 1.217 $ $Date: 2005/08/21 13:39:05 $";
public final static String VERSION = "0.6.0.3";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -119,7 +119,9 @@ public class FIFOBandwidthLimiter {
*/
final void refillBandwidthQueues(long bytesInbound, long bytesOutbound) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound);
_log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ", available " +
_availableInboundBytes + '/' + _availableOutboundBytes + ", max " +
_maxInboundBytes + '/' + _maxOutboundBytes);
_availableInboundBytes += bytesInbound;
_availableOutboundBytes += bytesOutbound;
if (_availableInboundBytes > _maxInboundBytes) {

View File

@ -25,15 +25,21 @@ class FIFOBandwidthRefiller implements Runnable {
public static final String PROP_INBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.inboundBurstKBytes";
public static final String PROP_OUTBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.outboundBurstKBytes";
//public static final String PROP_REPLENISH_FREQUENCY = "i2np.bandwidth.replenishFrequencyMs";
// no longer allow unlimited bandwidth - the user must specify a value, and if they do not, it is 16KBps
public static final int DEFAULT_INBOUND_BANDWIDTH = 16;
public static final int DEFAULT_OUTBOUND_BANDWIDTH = 16;
public static final int DEFAULT_BURST_SECONDS = 60;
/** For now, until there is some tuning and safe throttling, we set the floor at 6KBps inbound */
public static final int MIN_INBOUND_BANDWIDTH = 1;
public static final int MIN_INBOUND_BANDWIDTH = 5;
/** For now, until there is some tuning and safe throttling, we set the floor at 6KBps outbound */
public static final int MIN_OUTBOUND_BANDWIDTH = 1;
public static final int MIN_OUTBOUND_BANDWIDTH = 5;
/** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
public static final int MIN_INBOUND_BANDWIDTH_PEAK = 1;
public static final int MIN_INBOUND_BANDWIDTH_PEAK = 10;
/** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
public static final int MIN_OUTBOUND_BANDWIDTH_PEAK = 1;
public static final int MIN_OUTBOUND_BANDWIDTH_PEAK = 10;
/** Updating the bandwidth more than once a second is silly. once every 2 or 5 seconds is less so. */
public static final long MIN_REPLENISH_FREQUENCY = 100;
@ -146,6 +152,8 @@ class FIFOBandwidthRefiller implements Runnable {
_inboundKBytesPerSecond = in;
else
_inboundKBytesPerSecond = MIN_INBOUND_BANDWIDTH;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updating inbound rate to " + _inboundKBytesPerSecond);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid inbound bandwidth limit [" + inBwStr
@ -155,6 +163,9 @@ class FIFOBandwidthRefiller implements Runnable {
if ( (inBwStr == null) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Inbound bandwidth limits not specified in the config via " + PROP_INBOUND_BANDWIDTH);
}
if (_inboundKBytesPerSecond <= 0)
_inboundKBytesPerSecond = DEFAULT_INBOUND_BANDWIDTH;
}
private void updateOutboundRate() {
String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH);
@ -169,6 +180,8 @@ class FIFOBandwidthRefiller implements Runnable {
_outboundKBytesPerSecond = out;
else
_outboundKBytesPerSecond = MIN_OUTBOUND_BANDWIDTH;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updating outbound rate to " + _outboundKBytesPerSecond);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid outbound bandwidth limit [" + outBwStr
@ -178,6 +191,9 @@ class FIFOBandwidthRefiller implements Runnable {
if ( (outBwStr == null) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Outbound bandwidth limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH);
}
if (_outboundKBytesPerSecond <= 0)
_outboundKBytesPerSecond = DEFAULT_OUTBOUND_BANDWIDTH;
}
private void updateInboundPeak() {
@ -203,11 +219,13 @@ class FIFOBandwidthRefiller implements Runnable {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid inbound bandwidth burst limit [" + inBwStr
+ "]");
_limiter.setMaxInboundBytes(DEFAULT_BURST_SECONDS * _inboundKBytesPerSecond * 1024);
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Inbound bandwidth burst limits not specified in the config via "
+ PROP_INBOUND_BANDWIDTH_PEAK);
_limiter.setMaxInboundBytes(DEFAULT_BURST_SECONDS * _inboundKBytesPerSecond * 1024);
}
}
private void updateOutboundPeak() {
@ -233,11 +251,13 @@ class FIFOBandwidthRefiller implements Runnable {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid outbound bandwidth burst limit [" + outBwStr
+ "]");
_limiter.setMaxOutboundBytes(DEFAULT_BURST_SECONDS * _outboundKBytesPerSecond * 1024);
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound bandwidth burst limits not specified in the config via "
+ PROP_OUTBOUND_BANDWIDTH_PEAK);
_limiter.setMaxOutboundBytes(DEFAULT_BURST_SECONDS * _outboundKBytesPerSecond * 1024);
}
}

View File

@ -575,6 +575,8 @@ public class EstablishmentManager {
if (outboundState != null) {
if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) {
if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.WARN))
_log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime());
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)

View File

@ -326,7 +326,7 @@ public class OutboundMessageFragments {
state.push();
int rto = peer.getRTO() * state.getPushCount();
int rto = peer.getRTO();// * state.getPushCount();
state.setNextSendTime(now + rto);
if (peer.getSendWindowBytesRemaining() > 0)
@ -338,7 +338,7 @@ public class OutboundMessageFragments {
_log.warn("Allocation of " + size + " rejected w/ wsize=" + peer.getSendWindowBytes()
+ " available=" + peer.getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime((now + 1024) & ~SECOND_MASK);
state.setNextSendTime(now+(_context.random().nextInt(2*ACKSender.ACK_FREQUENCY))); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
_throttle.choke(peer.getRemotePeer());
@ -435,7 +435,7 @@ public class OutboundMessageFragments {
PeerState peer = state.getPeer();
if (peer != null) {
// this adjusts the rtt/rto/window/etc
peer.messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends());
peer.messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends);
if (peer.getSendWindowBytesRemaining() > 0)
_throttle.unchoke(peer.getRemotePeer());
} else {

View File

@ -93,6 +93,10 @@ public class PeerState {
private int _sendBytes;
private int _receiveBps;
private int _receiveBytes;
private int _sendACKBps;
private int _sendACKBytes;
private int _receiveACKBps;
private int _receiveACKBytes;
private long _receivePeriodBegin;
private volatile long _lastCongestionOccurred;
/**
@ -141,8 +145,11 @@ public class PeerState {
private long _packetsTransmitted;
/** how many packets were retransmitted within the last RETRANSMISSION_PERIOD_WIDTH packets */
private long _packetsRetransmitted;
/** how many packets were transmitted within the last RETRANSMISSION_PERIOD_WIDTH packets */
private long _packetsPeriodTransmitted;
private int _packetsPeriodRetransmitted;
private int _packetRetransmissionRate;
/** what was the $packetsTransmitted when the current RETRANSMISSION_PERIOD_WIDTH began */
/** at what time did we last break off the retransmission counter period */
private long _retransmissionPeriodStart;
/** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */
private long _packetsReceivedDuplicate;
@ -163,7 +170,7 @@ public class PeerState {
* of 608
*/
private static final int DEFAULT_MTU = 608;//600; //1500;
private static final int MIN_RTO = 1000 + ACKSender.ACK_FREQUENCY;
private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 3000; // 5000;
public PeerState(I2PAppContext ctx) {
@ -373,6 +380,10 @@ public class PeerState {
return _consecutiveFailedSends;
}
/** how fast we are sending *ack* packets */
public int getSendACKBps() { return _sendACKBps; }
public int getReceiveACKBps() { return _receiveACKBps; }
/**
* have all of the packets received in the current second requested that
* the previous second's ACKs be sent?
@ -384,14 +395,20 @@ public class PeerState {
* cannot. If it is not decremented, the window size remaining is
* not adjusted at all.
*/
public boolean allocateSendingBytes(int size) {
public boolean allocateSendingBytes(int size) { return allocateSendingBytes(size, false); }
public boolean allocateSendingBytes(int size, boolean isForACK) {
long now = _context.clock().now();
long duration = now - _lastSendRefill;
if (duration >= 1000) {
_sendWindowBytesRemaining = _sendWindowBytes;
_sendBytes += size;
_sendBps = (int)(0.9f*(float)_sendBps + 0.1f*((float)_sendBytes * (1000f/(float)duration)));
if (isForACK) {
_sendACKBytes += size;
_sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
}
_sendBytes = 0;
_sendACKBytes = 0;
_lastSendRefill = now;
}
//if (true) return true;
@ -399,6 +416,8 @@ public class PeerState {
_sendWindowBytesRemaining -= size;
_sendBytes += size;
_lastSendTime = now;
if (isForACK)
_sendACKBytes += size;
return true;
} else {
return false;
@ -432,14 +451,17 @@ public class PeerState {
public int getSlowStartThreshold() { return _slowStartThreshold; }
/** we received the message specified completely */
public void messageFullyReceived(Long messageId, int bytes) {
if (bytes > 0)
public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); }
public void messageFullyReceived(Long messageId, int bytes, boolean isForACK) {
if (bytes > 0) {
_receiveBytes += bytes;
else {
if (_retransmissionPeriodStart + RETRANSMISSION_PERIOD_WIDTH < _packetsReceived) {
if (isForACK)
_receiveACKBytes += bytes;
} else {
if (_retransmissionPeriodStart + 1000 < _context.clock().now()) {
_packetsReceivedDuplicate++;
} else {
_retransmissionPeriodStart = _packetsReceived;
_retransmissionPeriodStart = _context.clock().now();
_packetsReceivedDuplicate = 1;
}
}
@ -448,6 +470,9 @@ public class PeerState {
long duration = now - _receivePeriodBegin;
if (duration >= 1000) {
_receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration)));
if (isForACK)
_receiveACKBps = (int)(0.9f*(float)_receiveACKBps + 0.1f*((float)_receiveACKBytes * (1000f/(float)duration)));
_receiveACKBytes = 0;
_receiveBytes = 0;
_receivePeriodBegin = now;
_context.statManager().addRateData("udp.receiveBps", _receiveBps, 0);
@ -480,20 +505,21 @@ public class PeerState {
*/
private boolean congestionOccurred() {
long now = _context.clock().now();
if (_lastCongestionOccurred + 10*1000 > now)
return false; // only shrink once every 10 seconds
if (_lastCongestionOccurred + 5*1000 > now)
return false; // only shrink once every 5 seconds
_lastCongestionOccurred = now;
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
int congestionAt = _sendWindowBytes;
//if (true)
// _sendWindowBytes -= 10000;
//else
_sendWindowBytes = (_sendWindowBytes*2) / 3;
_sendWindowBytes = _sendWindowBytes/4; //(_sendWindowBytes*2) / 3;
if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
_sendWindowBytes = MINIMUM_WINDOW_BYTES;
if (_sendWindowBytes < _slowStartThreshold)
_slowStartThreshold = _sendWindowBytes;
//if (congestionAt/2 < _slowStartThreshold)
_slowStartThreshold = congestionAt/2;
return true;
}
@ -595,24 +621,34 @@ public class PeerState {
public void messageACKed(int bytesACKed, long lifetime, int numSends) {
_consecutiveFailedSends = 0;
_lastFailedSendPeriod = -1;
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
} else {
double prob = ((double)bytesACKed) / ((double)_sendWindowBytes);
if (_context.random().nextDouble() <= prob)
if (numSends < 2) {
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
} else {
if (false) {
_sendWindowBytes += 16; // why 16?
} else {
float prob = ((float)bytesACKed) / ((float)_sendWindowBytes);
float v = _context.random().nextFloat();
if (v < 0) v = 0-v;
if (v <= prob)
_sendWindowBytes += bytesACKed;
}
}
}
if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
_sendWindowBytes = MAX_SEND_WINDOW_BYTES;
_lastReceiveTime = _context.clock().now();
if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
_sendWindowBytesRemaining += bytesACKed;
else
_sendWindowBytesRemaining = _sendWindowBytes;
if (false) {
if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
_sendWindowBytesRemaining += bytesACKed;
else
_sendWindowBytesRemaining = _sendWindowBytes;
}
_messagesSent++;
if (numSends <= 2)
if (numSends < 2)
recalculateTimeouts(lifetime);
else
_log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
@ -643,11 +679,14 @@ public class PeerState {
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted(int packets) {
if (_retransmissionPeriodStart + RETRANSMISSION_PERIOD_WIDTH < _packetsTransmitted) {
long now = _context.clock().now();
if (_retransmissionPeriodStart + 1000 <= now) {
_packetsRetransmitted += packets;
} else {
_packetRetransmissionRate = (int)((float)(0.9f*_packetRetransmissionRate) + (float)(0.1f*_packetsRetransmitted));
_retransmissionPeriodStart = _packetsTransmitted;
//_packetsPeriodTransmitted = _packetsTransmitted - _retransmissionPeriodStart;
_packetsPeriodRetransmitted = (int)_packetsRetransmitted;
_retransmissionPeriodStart = now;
_packetsRetransmitted = packets;
}
congestionOccurred();
@ -655,10 +694,13 @@ public class PeerState {
//_rto *= 2;
}
public void packetsTransmitted(int packets) {
long now = _context.clock().now();
_packetsTransmitted += packets;
if (_retransmissionPeriodStart + RETRANSMISSION_PERIOD_WIDTH > _packetsTransmitted) {
//_packetsPeriodTransmitted += packets;
if (_retransmissionPeriodStart + 1000 <= now) {
_packetRetransmissionRate = (int)((float)(0.9f*_packetRetransmissionRate) + (float)(0.1f*_packetsRetransmitted));
_retransmissionPeriodStart = _packetsTransmitted;
_retransmissionPeriodStart = 0;
_packetsPeriodRetransmitted = (int)_packetsRetransmitted;
_packetsRetransmitted = 0;
}
}
@ -673,6 +715,8 @@ public class PeerState {
public long getMessagesReceived() { return _messagesReceived; }
public long getPacketsTransmitted() { return _packetsTransmitted; }
public long getPacketsRetransmitted() { return _packetsRetransmitted; }
public long getPacketsPeriodTransmitted() { return _packetsPeriodTransmitted; }
public int getPacketsPeriodRetransmitted() { return _packetsPeriodRetransmitted; }
/** avg number of packets retransmitted for every 100 packets */
public long getPacketRetransmissionRate() { return _packetRetransmissionRate; }
public long getPacketsReceived() { return _packetsReceived; }

View File

@ -132,6 +132,7 @@ class PeerTestManager {
*/
private void receiveTestReply(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo) {
PeerTestState test = _currentTest;
if (test == null) return;
if ( (DataHelper.eq(from.getIP(), test.getBobIP().getAddress())) && (from.getPort() == test.getBobPort()) ) {
byte ip[] = new byte[testInfo.readIPSize()];
testInfo.readIP(ip, 0);

View File

@ -40,9 +40,12 @@ public class UDPReceiver {
_runner = new Runner();
_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInboundProbabalistically", "How many packet we drop probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.acceptedInboundProbabalistically", "How many packet we accept probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
}
public void startup() {
adjustDropProbability();
_keepRunning = true;
I2PThread t = new I2PThread(_runner, _name);
t.setDaemon(true);
@ -57,6 +60,18 @@ public class UDPReceiver {
}
}
private void adjustDropProbability() {
String p = _context.getProperty("i2np.udp.dropProbability");
if (p != null) {
try {
ARTIFICIAL_DROP_PROBABILITY = Float.parseFloat(p);
} catch (NumberFormatException nfe) {}
if (ARTIFICIAL_DROP_PROBABILITY < 0) ARTIFICIAL_DROP_PROBABILITY = 0;
} else {
ARTIFICIAL_DROP_PROBABILITY = 0;
}
}
/**
* Replace the old listen port with the new one, returning the old.
* NOTE: this closes the old socket so that blocking calls unblock!
@ -69,17 +84,26 @@ public class UDPReceiver {
/** if a packet been sitting in the queue for a full second (meaning the handlers are overwhelmed), drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 1*1000;
private static final float ARTIFICIAL_DROP_PROBABILITY = 0.0f; // 0.02f; // 0.0f;
private static float ARTIFICIAL_DROP_PROBABILITY = 0.0f; // 0.02f; // 0.0f;
private static final int ARTIFICIAL_DELAY = 0; // 100;
private static final int ARTIFICIAL_DELAY_BASE = 0; //100;
private int receive(UDPPacket packet) {
//adjustDropProbability();
if (ARTIFICIAL_DROP_PROBABILITY > 0) {
// the first check is to let the compiler optimize away this
// random block on the live system when the probability is == 0
if (_context.random().nextFloat() <= ARTIFICIAL_DROP_PROBABILITY)
int v = _context.random().nextInt(1000);
if (v < ARTIFICIAL_DROP_PROBABILITY*1000) {
if (_log.shouldLog(Log.ERROR))
_log.error("Drop with v=" + v + " p=" + ARTIFICIAL_DROP_PROBABILITY + " packet size: " + packet.getPacket().getLength());
_context.statManager().addRateData("udp.droppedInboundProbabalistically", 1, 0);
return -1;
} else {
_context.statManager().addRateData("udp.acceptedInboundProbabalistically", 1, 0);
}
}
if ( (ARTIFICIAL_DELAY > 0) || (ARTIFICIAL_DELAY_BASE > 0) ) {