2005-09-26 jrandom

* I2PTunnel bugfix (thanks Complication!)
    * Increase the SSU cwin slower during congestion avoidance (at k/cwin^2
      instead of k/cwin)
    * Limit the number of inbound SSU sessions being built at once (using
      half of the i2np.udp.maxConcurrentEstablish config prop)
    * Don't shitlist on a message send failure alone (unless there aren't any
      common transports).
    * More careful bandwidth bursting
This commit is contained in:
jrandom
2005-09-27 07:17:40 +00:00
committed by zzz
parent f6d8200bc8
commit 24bad8e4bb
12 changed files with 67 additions and 36 deletions

View File

@ -234,12 +234,15 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_log.info("Decompressed: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded());
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded(), ioe);
_log.warn("Error decompressing: " + written + ", " + (_in != null ? _in.getTotalRead() + "/" + _in.getTotalExpanded() : ""), ioe);
} finally {
if (_log.shouldLog(Log.WARN) && (_in != null))
_log.warn("After decompression, written=" + written + " read=" + _in.getTotalRead()
+ ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
+ ", finished=" + _in.getFinished());
_log.warn("After decompression, written=" + written +
(_in != null ?
" read=" + _in.getTotalRead()
+ ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
+ ", finished=" + _in.getFinished()
: ""));
if (_out != null) try {
_out.close();
} catch (IOException ioe) {}

View File

@ -417,8 +417,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
long before = System.currentTimeMillis();
_sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue());
long duration = System.currentTimeMillis() - before;
if ((duration > 100) && _log.shouldLog(Log.ERROR))
_log.error("Message availability notification for " + msgId.intValue() + " took "
if ((duration > 100) && _log.shouldLog(Log.WARN))
_log.warn("Message availability notification for " + msgId.intValue() + " took "
+ duration + " to " + _sessionListener);
} catch (Exception e) {
_log.log(Log.CRIT, "Error notifying app of message availability", e);

View File

@ -1,4 +1,14 @@
$Id: history.txt,v 1.265 2005/09/25 18:52:58 jrandom Exp $
$Id: history.txt,v 1.266 2005/09/26 18:45:53 jrandom Exp $
2005-09-27 jrandom
* I2PTunnel bugfix (thanks Complication!)
* Increase the SSU cwin slower during congestion avoidance (at k/cwin^2
instead of k/cwin)
* Limit the number of inbound SSU sessions being built at once (using
half of the i2np.udp.maxConcurrentEstablish config prop)
* Don't shitlist on a message send failure alone (unless there aren't any
common transports).
* More careful bandwidth bursting
2005-09-26 jrandom
* Reworded the SSU introductions config section (thanks duck!)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.245 $ $Date: 2005/09/25 18:52:58 $";
public final static String ID = "$Revision: 1.246 $ $Date: 2005/09/26 18:45:52 $";
public final static String VERSION = "0.6.0.6";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -75,7 +75,8 @@ public class Shitlist {
Date oldDate = (Date)_shitlist.put(peer, new Date(_context.clock().now() + period));
wasAlready = (null == oldDate);
if (reason != null) {
_shitlistCause.put(peer, reason);
if (!wasAlready)
_shitlistCause.put(peer, reason);
} else {
_shitlistCause.remove(peer);
}

View File

@ -180,8 +180,11 @@ public class FIFOBandwidthLimiter {
/**
* More bytes are available - add them to the queue and satisfy any requests
* we can
*
* @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative)
* @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative)
*/
final void refillBandwidthQueues(long bytesInbound, long bytesOutbound) {
final void refillBandwidthQueues(long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ": " + getStatus().toString());
_availableInbound += bytesInbound;
@ -197,9 +200,8 @@ public class FIFOBandwidthLimiter {
_unavailableInboundBurst = _maxInboundBurst;
}
} else {
// try to pull in up to 1/10th of the max inbound rate (aka burst rate), since
// we refill every 100ms
int want = _maxInbound/10;
// try to pull in up to 1/10th of the burst rate, since we refill every 100ms
int want = (int)maxBurstIn;
if (want > (_maxInbound - _availableInbound))
want = _maxInbound - _availableInbound;
if (_log.shouldLog(Log.DEBUG))
@ -226,9 +228,8 @@ public class FIFOBandwidthLimiter {
_unavailableOutboundBurst = _maxOutboundBurst;
}
} else {
// try to pull in up to 1/10th of the max outbound rate (aka burst rate), since
// we refill every 100ms
int want = _maxOutbound/10;
// try to pull in up to 1/10th of the burst rate, since we refill every 100ms
int want = (int)maxBurstOut;
if (want > (_maxOutbound - _availableOutbound))
want = _maxOutbound - _availableOutbound;
if (_log.shouldLog(Log.DEBUG))

View File

@ -112,7 +112,9 @@ class FIFOBandwidthRefiller implements Runnable {
_limiter.setOutboundUnlimited(false);
}
_limiter.refillBandwidthQueues(inboundToAdd, outboundToAdd);
long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000;
long maxBurstOut = ((_outboundBurstKBytesPerSecond-_outboundKBytesPerSecond)*1024*numMs)/1000;
_limiter.refillBandwidthQueues(inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");

View File

@ -61,8 +61,10 @@ public class GetBidsJob extends JobImpl {
TransportBid bid = facade.getNextBid(msg);
if (bid == null) {
context.shitlist().shitlistRouter(to, "No more bids available");
context.netDb().fail(to);
if (msg.getFailedTransports().size() == 0) {
context.shitlist().shitlistRouter(to, "We share no common transports with them");
context.netDb().fail(to);
}
fail(context, msg);
} else {
if (log.shouldLog(Log.INFO))

View File

@ -165,6 +165,13 @@ public class EstablishmentManager {
notifyActivity();
}
/**
* How many concurrent inbound sessions to deal with
*/
private int getMaxInboundEstablishers() {
return getMaxConcurrentEstablish()/2;
}
/**
* Got a SessionRequest (initiates an inbound establishment)
*
@ -173,9 +180,14 @@ public class EstablishmentManager {
if (!_transport.isValid(from.getIP()))
return;
int maxInbound = getMaxInboundEstablishers();
boolean isNew = false;
InboundEstablishState state = null;
synchronized (_inboundStates) {
if (_inboundStates.size() >= maxInbound)
return; // drop the packet
state = (InboundEstablishState)_inboundStates.get(from);
if (state == null) {
state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort());

View File

@ -55,7 +55,7 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendConfirmFragments", "How many fragments are included in a fully ACKed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendConfirmVolley", "How many times did fragments need to be sent before ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendFailed", "How many fragments were in a message that couldn't be delivered", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
@ -167,7 +167,7 @@ public class OutboundMessageFragments {
i--;
} else if (state.isExpired()) {
_activeMessages.remove(i);
_context.statManager().addRateData("udp.sendFailed", state.getFragmentCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
if (state.getMessage() != null) {
_transport.failed(state);

View File

@ -171,7 +171,7 @@ public class PeerState {
*/
private static final int DEFAULT_MTU = 608;//600; //1500;
private static final int MIN_RTO = 800 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 3000; // 5000;
private static final int MAX_RTO = 2000; // 5000;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@ -220,11 +220,11 @@ public class PeerState {
_packetsReceived = 0;
_packetsReceivedDuplicate = 0;
_inboundMessages = new HashMap(8);
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
/**
@ -458,7 +458,7 @@ public class PeerState {
if (isForACK)
_receiveACKBytes += bytes;
} else {
if (_retransmissionPeriodStart + 1000 < _context.clock().now()) {
if (true || _retransmissionPeriodStart + 1000 < _context.clock().now()) {
_packetsReceivedDuplicate++;
} else {
_retransmissionPeriodStart = _context.clock().now();
@ -505,8 +505,8 @@ public class PeerState {
*/
private boolean congestionOccurred() {
long now = _context.clock().now();
if (_lastCongestionOccurred + 5*1000 > now)
return false; // only shrink once every 5 seconds
if (_lastCongestionOccurred + _rto > now)
return false; // only shrink once every few seconds
_lastCongestionOccurred = now;
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
@ -628,11 +628,11 @@ public class PeerState {
if (false) {
_sendWindowBytes += 16; // why 16?
} else {
float prob = ((float)bytesACKed) / ((float)_sendWindowBytes);
float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1));
float v = _context.random().nextFloat();
if (v < 0) v = 0-v;
if (v <= prob)
_sendWindowBytes += bytesACKed;
_sendWindowBytes += 512; // bytesACKed;
}
}
}
@ -680,7 +680,7 @@ public class PeerState {
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted(int packets) {
long now = _context.clock().now();
if (_retransmissionPeriodStart + 1000 <= now) {
if (true || _retransmissionPeriodStart + 1000 <= now) {
_packetsRetransmitted += packets;
} else {
_packetRetransmissionRate = (int)((float)(0.9f*_packetRetransmissionRate) + (float)(0.1f*_packetsRetransmitted));
@ -697,7 +697,7 @@ public class PeerState {
long now = _context.clock().now();
_packetsTransmitted += packets;
//_packetsPeriodTransmitted += packets;
if (_retransmissionPeriodStart + 1000 <= now) {
if (false && _retransmissionPeriodStart + 1000 <= now) {
_packetRetransmissionRate = (int)((float)(0.9f*_packetRetransmissionRate) + (float)(0.1f*_packetsRetransmitted));
_retransmissionPeriodStart = 0;
_packetsPeriodRetransmitted = (int)_packetsRetransmitted;

View File

@ -1006,7 +1006,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
double recvDupPct = (double)peer.getPacketsReceivedDuplicate()/(double)peer.getPacketsReceived();
buf.append("<td valign=\"top\" ><code>");
buf.append(formatPct(recvDupPct));
buf.append(peer.getPacketsReceivedDuplicate()); //formatPct(recvDupPct));
buf.append("</code></td>");
buf.append("</tr>");