2005-04-30 jrandom

* Reduced some SimpleTimer churn
* add hooks for per-peer choking in the outbound message queue - if/when a
  peer reaches their cwin, no further messages will enter the 'active' pool
  until there are more bytes available.  other messages waiting (either later
  on in the same priority queue, or in the queues for other priorities) may
  take that slot.
* when we have a message acked, release the acked size to the congestion
  window (duh), rather than waiting for the second to expire and refill the
  capacity.
* send packets in a volley explicitly, waiting until we can allocate the full
  cwin size for that message
This commit is contained in:
jrandom
2005-04-30 23:26:18 +00:00
committed by zzz
parent 8063889d23
commit 0fbe84e9f0
12 changed files with 255 additions and 120 deletions

View File

@ -65,6 +65,7 @@ public class Connection {
private Object _connectLock; private Object _connectLock;
/** how many messages have been resent and not yet ACKed? */ /** how many messages have been resent and not yet ACKed? */
private int _activeResends; private int _activeResends;
private ConEvent _connectionEvent;
private long _lifetimeBytesSent; private long _lifetimeBytesSent;
private long _lifetimeBytesReceived; private long _lifetimeBytesReceived;
@ -116,6 +117,7 @@ public class Connection {
_connectLock = new Object(); _connectLock = new Object();
_activeResends = 0; _activeResends = 0;
_resetSentOn = -1; _resetSentOn = -1;
_connectionEvent = new ConEvent();
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -807,6 +809,24 @@ public class Connection {
return buf.toString(); return buf.toString();
} }
public SimpleTimer.TimedEvent getConnectionEvent() { return _connectionEvent; }
/**
* fired to reschedule event notification
*/
class ConEvent implements SimpleTimer.TimedEvent {
private Exception _addedBy;
public ConEvent() {
//_addedBy = new Exception("added by");
}
public void timeReached() {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("firing event on " + _connection, _addedBy);
eventOccurred();
}
public String toString() { return "event on " + Connection.this.toString(); }
}
/** /**
* Coordinate the resends of a given packet * Coordinate the resends of a given packet
*/ */

View File

@ -17,21 +17,6 @@ abstract class SchedulerImpl implements TaskScheduler {
} }
protected void reschedule(long msToWait, Connection con) { protected void reschedule(long msToWait, Connection con) {
SimpleTimer.getInstance().addEvent(new ConEvent(con), msToWait); SimpleTimer.getInstance().addEvent(con.getConnectionEvent(), msToWait);
}
private class ConEvent implements SimpleTimer.TimedEvent {
private Connection _connection;
private Exception _addedBy;
public ConEvent(Connection con) {
_connection = con;
//_addedBy = new Exception("added by");
}
public void timeReached() {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("firing event on " + _connection, _addedBy);
_connection.eventOccurred();
}
public String toString() { return "event on " + _connection; }
} }
} }

View File

@ -45,7 +45,10 @@ public class SimpleTimer {
} }
/** /**
* Queue up the given event to be fired no sooner than timeoutMs from now * Queue up the given event to be fired no sooner than timeoutMs from now.
* However, if this event is already scheduled, the event will be scheduled
* for the earlier of the two timeouts, which may be before this stated
* timeout. If this is not the desired behavior, call removeEvent first.
* *
*/ */
public void addEvent(TimedEvent event, long timeoutMs) { public void addEvent(TimedEvent event, long timeoutMs) {
@ -55,8 +58,15 @@ public class SimpleTimer {
Long time = new Long(eventTime); Long time = new Long(eventTime);
synchronized (_events) { synchronized (_events) {
// remove the old scheduled position, then reinsert it // remove the old scheduled position, then reinsert it
if (_eventTimes.containsKey(event)) Long oldTime = (Long)_eventTimes.get(event);
_events.remove(_eventTimes.get(event)); if (oldTime != null) {
if (oldTime.longValue() < eventTime) {
_events.notifyAll();
return; // already scheduled for sooner than requested
} else {
_events.remove(oldTime);
}
}
while (_events.containsKey(time)) while (_events.containsKey(time))
time = new Long(time.longValue() + 1); time = new Long(time.longValue() + 1);
_events.put(time, event); _events.put(time, event);

View File

@ -1,4 +1,7 @@
$Id: history.txt,v 1.200 2005/04/28 16:54:27 jrandom Exp $ $Id: history.txt,v 1.201 2005/04/29 01:24:15 jrandom Exp $
2005-04-30 jrandom
* Reduced some SimpleTimer churn
2005-04-29 jrandom 2005-04-29 jrandom
* Reduce the peer profile stat coallesce overhead by inlining it with the * Reduce the peer profile stat coallesce overhead by inlining it with the

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.191 $ $Date: 2005/04/28 16:54:28 $"; public final static String ID = "$Revision: 1.192 $ $Date: 2005/04/29 01:24:15 $";
public final static String VERSION = "0.5.0.7"; public final static String VERSION = "0.5.0.7";
public final static long BUILD = 3; public final static long BUILD = 4;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION); System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -22,7 +22,7 @@ public class ACKSender implements Runnable {
private boolean _alive; private boolean _alive;
/** how frequently do we want to send ACKs to a peer? */ /** how frequently do we want to send ACKs to a peer? */
static final int ACK_FREQUENCY = 400; static final int ACK_FREQUENCY = 200;
public ACKSender(RouterContext ctx, UDPTransport transport) { public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx; _context = ctx;

View File

@ -25,6 +25,7 @@ public class OutboundMessageFragments {
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
private UDPTransport _transport; private UDPTransport _transport;
private ActiveThrottle _throttle;
/** OutboundMessageState for messages being sent */ /** OutboundMessageState for messages being sent */
private List _activeMessages; private List _activeMessages;
private boolean _alive; private boolean _alive;
@ -38,10 +39,11 @@ public class OutboundMessageFragments {
// don't send a packet more than 10 times // don't send a packet more than 10 times
static final int MAX_VOLLEYS = 10; static final int MAX_VOLLEYS = 10;
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport) { public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(OutboundMessageFragments.class); _log = ctx.logManager().getLog(OutboundMessageFragments.class);
_transport = transport; _transport = transport;
_throttle = throttle;
_activeMessages = new ArrayList(MAX_ACTIVE); _activeMessages = new ArrayList(MAX_ACTIVE);
_nextPacketMessage = 0; _nextPacketMessage = 0;
_builder = new PacketBuilder(ctx); _builder = new PacketBuilder(ctx);
@ -130,6 +132,8 @@ public class OutboundMessageFragments {
if (state.isComplete()) { if (state.isComplete()) {
_activeMessages.remove(i); _activeMessages.remove(i);
_transport.succeeded(state.getMessage()); _transport.succeeded(state.getMessage());
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources(); state.releaseResources();
if (i < _nextPacketMessage) { if (i < _nextPacketMessage) {
_nextPacketMessage--; _nextPacketMessage--;
@ -149,6 +153,8 @@ public class OutboundMessageFragments {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send an expired direct message: " + state); _log.warn("Unable to send an expired direct message: " + state);
} }
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources(); state.releaseResources();
if (i < _nextPacketMessage) { if (i < _nextPacketMessage) {
_nextPacketMessage--; _nextPacketMessage--;
@ -170,6 +176,8 @@ public class OutboundMessageFragments {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message after too many volleys: " + state); _log.warn("Unable to send a direct message after too many volleys: " + state);
} }
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources(); state.releaseResources();
if (i < _nextPacketMessage) { if (i < _nextPacketMessage) {
_nextPacketMessage--; _nextPacketMessage--;
@ -185,17 +193,17 @@ public class OutboundMessageFragments {
private static final long SECOND_MASK = 1023l; private static final long SECOND_MASK = 1023l;
/** /**
* Grab the next packet that we want to send, blocking until one is ready. * Fetch all the packets for a message volley, blocking until there is a
* This is the main driver for the packet scheduler * message which can be fully transmitted (or the transport is shut down).
* The returned array may be sparse, with null packets taking the place of
* already ACKed fragments.
* *
*/ */
public UDPPacket getNextPacket() { public UDPPacket[] getNextVolley() {
PeerState peer = null; PeerState peer = null;
OutboundMessageState state = null; OutboundMessageState state = null;
int currentFragment = -1; while (_alive && (state == null) ) {
while (_alive && (currentFragment < 0) ) {
long now = _context.clock().now(); long now = _context.clock().now();
long nextSend = -1; long nextSend = -1;
finishMessages(); finishMessages();
@ -203,88 +211,36 @@ public class OutboundMessageFragments {
for (int i = 0; i < _activeMessages.size(); i++) { for (int i = 0; i < _activeMessages.size(); i++) {
int cur = (i + _nextPacketMessage) % _activeMessages.size(); int cur = (i + _nextPacketMessage) % _activeMessages.size();
state = (OutboundMessageState)_activeMessages.get(cur); state = (OutboundMessageState)_activeMessages.get(cur);
if (state.getNextSendTime() <= now) {
peer = state.getPeer(); // known if this is immediately after establish peer = state.getPeer(); // known if this is immediately after establish
if (peer == null) if (peer == null)
peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash()); peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash());
if ((peer != null) && locked_shouldSend(state, peer)) {
// for fairness, we move on in a round robin
_nextPacketMessage = i + 1;
break;
} else {
if (peer == null) { if (peer == null) {
// peer disconnected // peer disconnected
_activeMessages.remove(cur); _activeMessages.remove(cur);
_transport.failed(state); _transport.failed(state);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Peer disconnected for " + state); _log.warn("Peer disconnected for " + state);
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources(); state.releaseResources();
i--; i--;
} else {
if (!state.isFragmented()) {
state.fragment(fragmentSize(peer.getMTU()));
if (_log.shouldLog(Log.INFO))
_log.info("Fragmenting " + state);
} }
int oldVolley = state.getPushCount();
// pickNextFragment increments the pushCount every
// time we cycle through all of the packets
currentFragment = state.pickNextFragment();
int fragmentSize = state.fragmentSize(currentFragment);
if (peer.allocateSendingBytes(fragmentSize)) {
state.incrementCurrentFragment();
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + fragmentSize + " allowed with "
+ peer.getSendWindowBytesRemaining()
+ "/" + peer.getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) {
peer.messageRetransmitted();
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmitting " + state + " to " + peer);
}
// for fairness, we move on in a round robin
//_nextPacketMessage = i + 1;
if (currentFragment >= state.getFragmentCount() - 1) {
// this is the last fragment
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
if (state.getPeer() != null) {
int rto = state.getPeer().getRTO() * state.getPushCount();
state.setNextSendTime(now + rto);
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("changed volley, unknown peer");
state.setNextSendTime(now + 1000 + _context.random().nextInt(2000));
}
// only move on in round robin after sending a full volley
_nextPacketMessage = (i + 1) % _activeMessages.size();
} else {
if (peer.getSendWindowBytesRemaining() > 0)
state.setNextSendTime(now);
else
state.setNextSendTime((now + 1024) & ~SECOND_MASK);
}
break;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes()
+ " available=" + peer.getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime((now + 1024) & ~SECOND_MASK);
currentFragment = -1;
}
}
}
long time = state.getNextSendTime(); long time = state.getNextSendTime();
if ( (nextSend < 0) || (time < nextSend) ) if ( (nextSend < 0) || (time < nextSend) )
nextSend = time; nextSend = time;
state = null;
peer = null;
}
} // end of the for(activeMessages) } // end of the for(activeMessages)
if (currentFragment < 0) { if (state == null) {
if (nextSend <= 0) { if (nextSend <= 0) {
try { try {
_activeMessages.notifyAll(); _activeMessages.notifyAll();
@ -310,11 +266,72 @@ public class OutboundMessageFragments {
} // end of the synchronized block } // end of the synchronized block
} // end of the while (alive && !found) } // end of the while (alive && !found)
if (currentFragment >= 0) { return preparePackets(state, peer);
}
private boolean locked_shouldSend(OutboundMessageState state, PeerState peer) {
long now = _context.clock().now();
if (state.getNextSendTime() <= now) {
if (!state.isFragmented()) {
state.fragment(fragmentSize(peer.getMTU()));
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Building packet for fragment " + currentFragment _log.info("Fragmenting " + state);
+ " of " + state + " to " + peer); }
UDPPacket rv = _builder.buildPacket(state, currentFragment, peer);
int size = state.getUnackedSize();
if (peer.allocateSendingBytes(size)) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " allowed with "
+ peer.getSendWindowBytesRemaining()
+ "/" + peer.getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
if (state.getPushCount() > 0) {
peer.messageRetransmitted();
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
}
state.push();
int rto = peer.getRTO() * state.getPushCount();
state.setNextSendTime(now + rto);
if (peer.getSendWindowBytesRemaining() > 0)
_throttle.unchoke(peer.getRemotePeer());
return true;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + size + " rejected w/ wsize=" + peer.getSendWindowBytes()
+ " available=" + peer.getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime((now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
_throttle.choke(peer.getRemotePeer());
}
} // nextTime <= now
return false;
}
private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
if (state != null) {
int fragments = state.getFragmentCount();
if (fragments < 0)
return null;
if (_log.shouldLog(Log.INFO))
_log.info("Building packet for " + state + " to " + peer);
UDPPacket rv[] = new UDPPacket[fragments]; //sparse
for (int i = 0; i < fragments; i++) {
if (state.needsSending(i))
rv[i] = _builder.buildPacket(state, i, peer);
}
return rv; return rv;
} else { } else {
// !alive // !alive
@ -384,6 +401,8 @@ public class OutboundMessageFragments {
} else { } else {
_log.warn("message acked, but no peer attacked: " + state); _log.warn("message acked, but no peer attacked: " + state);
} }
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources(); state.releaseResources();
return numFragments; return numFragments;
} else { } else {
@ -434,7 +453,15 @@ public class OutboundMessageFragments {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
_transport.succeeded(state.getMessage()); _transport.succeeded(state.getMessage());
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources(); state.releaseResources();
} }
} }
public interface ActiveThrottle {
public void choke(Hash peer);
public void unchoke(Hash peer);
public boolean isChoked(Hash peer);
}
} }

View File

@ -114,6 +114,29 @@ public class OutboundMessageState {
// nothing else pending ack // nothing else pending ack
return true; return true;
} }
public synchronized int getUnackedSize() {
int rv = 0;
if ( (_messageBuf != null) && (_fragmentSends != null) ) {
int totalSize = _messageBuf.getValid();
int lastSize = totalSize % _fragmentSize;
if (lastSize == 0)
lastSize = _fragmentSize;
for (int i = 0; i < _fragmentSends.length; i++) {
if (_fragmentSends[i] >= (short)0) {
if (i + 1 == _fragmentSends.length)
rv += lastSize;
else
rv += _fragmentSize;
}
}
}
return rv;
}
public synchronized boolean needsSending(int fragment) {
if ( (_fragmentSends == null) || (fragment >= _fragmentSends.length) || (fragment < 0) )
return false;
return (_fragmentSends[fragment] >= (short)0);
}
public long getLifetime() { return _context.clock().now() - _startedOn; } public long getLifetime() { return _context.clock().now() - _startedOn; }
/** /**
@ -133,7 +156,16 @@ public class OutboundMessageState {
public int getMaxSends() { return _maxSends; } public int getMaxSends() { return _maxSends; }
public int getPushCount() { return _pushCount; } public int getPushCount() { return _pushCount; }
/** note that we have pushed the message fragments */ /** note that we have pushed the message fragments */
public void push() { _pushCount++; } public synchronized void push() {
_pushCount++;
if (_pushCount > _maxSends)
_maxSends = (short)_pushCount;
if (_fragmentSends != null)
for (int i = 0; i < _fragmentSends.length; i++)
if (_fragmentSends[i] >= (short)0)
_fragmentSends[i] = (short)(1 + _fragmentSends[i]);
}
public boolean isFragmented() { return _fragmentSends != null; } public boolean isFragmented() { return _fragmentSends != null; }
/** /**
* Prepare the message for fragmented delivery, using no more than * Prepare the message for fragmented delivery, using no more than

View File

@ -35,9 +35,13 @@ public class PacketPusher implements Runnable {
public void run() { public void run() {
while (_alive) { while (_alive) {
UDPPacket packet = _fragments.getNextPacket(); UDPPacket packets[] = _fragments.getNextVolley();
if (packet != null) if (packets != null) {
_sender.add(packet, 1000); // blocks for up to a second for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
_sender.add(packets[i], 1000); // blocks for up to a second
}
}
} }
} }
} }

View File

@ -136,7 +136,7 @@ public class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
private static final int DEFAULT_MTU = 1472; private static final int DEFAULT_MTU = 1472;
private static final int MIN_RTO = ACKSender.ACK_FREQUENCY + 100; private static final int MIN_RTO = 600;
private static final int MAX_RTO = 5000; private static final int MAX_RTO = 5000;
public PeerState(I2PAppContext ctx) { public PeerState(I2PAppContext ctx) {
@ -179,6 +179,8 @@ public class PeerState {
_rto = 6000; _rto = 6000;
_messagesReceived = 0; _messagesReceived = 0;
_messagesSent = 0; _messagesSent = 0;
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
} }
/** /**
@ -425,6 +427,8 @@ public class PeerState {
return false; // only shrink once every 10 seconds return false; // only shrink once every 10 seconds
_lastCongestionOccurred = now; _lastCongestionOccurred = now;
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
//if (true) //if (true)
// _sendWindowBytes -= 10000; // _sendWindowBytes -= 10000;
//else //else
@ -469,6 +473,12 @@ public class PeerState {
if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES) if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
_sendWindowBytes = MAX_SEND_WINDOW_BYTES; _sendWindowBytes = MAX_SEND_WINDOW_BYTES;
_lastReceiveTime = _context.clock().now(); _lastReceiveTime = _context.clock().now();
if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
_sendWindowBytesRemaining += bytesACKed;
else
_sendWindowBytesRemaining = _sendWindowBytes;
_messagesSent++; _messagesSent++;
if (numSends <= 2) if (numSends <= 2)
recalculateTimeouts(lifetime); recalculateTimeouts(lifetime);
@ -492,6 +502,7 @@ public class PeerState {
/** we are resending a packet, so lets jack up the rto */ /** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted() { public void messageRetransmitted() {
congestionOccurred(); congestionOccurred();
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
//_rto *= 2; //_rto *= 2;
} }
/** how long does it usually take to get a message ACKed? */ /** how long does it usually take to get a message ACKed? */

View File

@ -2,8 +2,11 @@ package net.i2p.router.transport.udp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.router.OutNetMessage; import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
@ -14,7 +17,7 @@ import net.i2p.util.Log;
* with code to fail messages that expire. * with code to fail messages that expire.
* *
*/ */
public class TimedWeightedPriorityMessageQueue implements MessageQueue { public class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle {
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
/** FIFO queue of messages in a particular priority */ /** FIFO queue of messages in a particular priority */
@ -39,6 +42,8 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue {
private volatile boolean _addedSincePassBegan; private volatile boolean _addedSincePassBegan;
private Expirer _expirer; private Expirer _expirer;
private FailedListener _listener; private FailedListener _listener;
/** set of peers (Hash) whose congestion window is exceeded in the active queue */
private Set _chokedPeers;
/** /**
* Build up a new queue * Build up a new queue
@ -69,6 +74,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue {
_alive = true; _alive = true;
_nextLock = this; _nextLock = this;
_nextQueue = 0; _nextQueue = 0;
_chokedPeers = new HashSet(16);
_listener = lsnr; _listener = lsnr;
_context.statManager().createRateStat("udp.timeToEntrance", "Message lifetime until it reaches the UDP system", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.timeToEntrance", "Message lifetime until it reaches the UDP system", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.messageQueueSize", "How many messages are on the current class queue at removal", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.messageQueueSize", "How many messages are on the current class queue at removal", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -114,8 +120,16 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue {
for (int i = 0; i < _queue.length; i++) { for (int i = 0; i < _queue.length; i++) {
int currentQueue = (_nextQueue + i) % _queue.length; int currentQueue = (_nextQueue + i) % _queue.length;
synchronized (_queue[currentQueue]) { synchronized (_queue[currentQueue]) {
if (_queue[currentQueue].size() > 0) { for (int j = 0; j < _queue[currentQueue].size(); j++) {
OutNetMessage msg = (OutNetMessage)_queue[currentQueue].remove(0); OutNetMessage msg = (OutNetMessage)_queue[currentQueue].get(j);
Hash to = msg.getTarget().getIdentity().getHash();
synchronized (_nextLock) { // yikes!
if (_chokedPeers.contains(to))
continue;
}
// not choked, lets push it to active
_queue[currentQueue].remove(j);
long size = msg.getMessageSize(); long size = msg.getMessageSize();
_bytesQueued[currentQueue] -= size; _bytesQueued[currentQueue] -= size;
_bytesTransferred[currentQueue] += size; _bytesTransferred[currentQueue] += size;
@ -129,12 +143,12 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue {
_log.debug("Pulling a message off queue " + currentQueue + " with " _log.debug("Pulling a message off queue " + currentQueue + " with "
+ _queue[currentQueue].size() + " remaining"); + _queue[currentQueue].size() + " remaining");
return msg; return msg;
} else { }
// nothing waiting
// nothing waiting, or only choked peers
_messagesFlushed[currentQueue] = 0; _messagesFlushed[currentQueue] = 0;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing on queue " + currentQueue); _log.debug("Nothing available on queue " + currentQueue);
}
} }
} }
@ -173,6 +187,26 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue {
} }
} }
public void choke(Hash peer) {
if (true) return;
synchronized (_nextLock) {
_chokedPeers.add(peer);
_nextLock.notifyAll();
}
}
public void unchoke(Hash peer) {
if (true) return;
synchronized (_nextLock) {
_chokedPeers.remove(peer);
_nextLock.notifyAll();
}
}
public boolean isChoked(Hash peer) {
synchronized (_nextLock) {
return _chokedPeers.contains(peer);
}
}
private int pickQueue(OutNetMessage message) { private int pickQueue(OutNetMessage message) {
int target = message.getPriority(); int target = message.getPriority();
for (int i = 0; i < _priorityLimits.length; i++) { for (int i = 0; i < _priorityLimits.length; i++) {

View File

@ -48,6 +48,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private EstablishmentManager _establisher; private EstablishmentManager _establisher;
private MessageQueue _outboundMessages; private MessageQueue _outboundMessages;
private OutboundMessageFragments _fragments; private OutboundMessageFragments _fragments;
private OutboundMessageFragments.ActiveThrottle _activeThrottle;
private OutboundRefiller _refiller; private OutboundRefiller _refiller;
private PacketPusher _pusher; private PacketPusher _pusher;
private InboundMessageFragments _inboundFragments; private InboundMessageFragments _inboundFragments;
@ -101,13 +102,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_peersByRelayTag = new HashMap(128); _peersByRelayTag = new HashMap(128);
_endpoint = null; _endpoint = null;
_outboundMessages = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
_outboundMessages = mq;
_activeThrottle = mq;
_relayPeers = new ArrayList(1); _relayPeers = new ArrayList(1);
_fastBid = new SharedBid(50); _fastBid = new SharedBid(50);
_slowBid = new SharedBid(1000); _slowBid = new SharedBid(1000);
_fragments = new OutboundMessageFragments(_context, this); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this); _flooder = new UDPFlooder(_context, this);
@ -314,6 +317,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
} }
_activeThrottle.unchoke(peer.getRemotePeer());
_context.shitlist().unshitlistRouter(peer.getRemotePeer()); _context.shitlist().unshitlistRouter(peer.getRemotePeer());
if (SHOULD_FLOOD_PEERS) if (SHOULD_FLOOD_PEERS)
@ -341,6 +345,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
} }
// unchoke 'em, but just because we'll never talk again...
_activeThrottle.unchoke(peer.getRemotePeer());
if (SHOULD_FLOOD_PEERS) if (SHOULD_FLOOD_PEERS)
_flooder.removePeer(peer); _flooder.removePeer(peer);
} }
@ -569,6 +576,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
buf.append(':').append(peer.getRemotePort()); buf.append(':').append(peer.getRemotePort());
buf.append("</a>"); buf.append("</a>");
if (_activeThrottle.isChoked(peer.getRemotePeer()))
buf.append(" [choked]");
if (peer.getConsecutiveFailedSends() > 0) if (peer.getConsecutiveFailedSends() > 0)
buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]"); buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]");
buf.append("</td>"); buf.append("</td>");