udp updates:

* more stats. including per-peer KBps (updated every second)
* improved blocking/timeout situations on the send queue
* added drop simulation hook
* provide logical RTO limits
This commit is contained in:
jrandom
2005-04-30 03:14:09 +00:00
committed by zzz
parent 6e1ac8e173
commit 8063889d23
10 changed files with 109 additions and 29 deletions

View File

@ -22,7 +22,7 @@ public class ACKSender implements Runnable {
private boolean _alive;
/** how frequently do we want to send ACKs to a peer? */
private static final int ACK_FREQUENCY = 400;
static final int ACK_FREQUENCY = 400;
public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx;

View File

@ -42,6 +42,10 @@ public class EstablishmentManager {
_inboundStates = new HashMap(32);
_outboundStates = new HashMap(32);
_activityLock = new Object();
_context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@ -221,6 +225,7 @@ public class EstablishmentManager {
_transport.addRemotePeerState(peer);
_context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer);
}
@ -250,6 +255,7 @@ public class EstablishmentManager {
_transport.addRemotePeerState(peer);
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer);
while (true) {
@ -343,6 +349,7 @@ public class EstablishmentManager {
} else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
// took too long, fuck 'em
iter.remove();
_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing expired inbound state");
} else {
@ -430,6 +437,7 @@ public class EstablishmentManager {
// took too long, fuck 'em
iter.remove();
outboundState = cur;
_context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing expired outbound: " + cur);
break;

View File

@ -36,7 +36,6 @@ public class InboundMessageFragments {
private MessageReceiver _messageReceiver;
private boolean _alive;
private static final int RECENTLY_COMPLETED_SIZE = 100;
/** decay the recently completed every 2 minutes */
private static final int DECAY_PERIOD = 120*1000;
@ -113,7 +112,7 @@ public class InboundMessageFragments {
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
from.messageFullyReceived(messageId);
from.messageFullyReceived(messageId, -1);
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: "
@ -143,7 +142,7 @@ public class InboundMessageFragments {
_messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId);
from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))

View File

@ -25,6 +25,7 @@ public class InboundMessageState {
*/
private int _lastFragment;
private long _receiveBegin;
private int _completeSize;
/** expire after 30s */
private static final long MAX_RECEIVE_TIME = 10*1000;
@ -39,6 +40,7 @@ public class InboundMessageState {
_from = from;
_fragments = new ByteArray[MAX_FRAGMENTS];
_lastFragment = -1;
_completeSize = -1;
_receiveBegin = ctx.clock().now();
}
@ -86,10 +88,13 @@ public class InboundMessageState {
public Hash getFrom() { return _from; }
public long getMessageId() { return _messageId; }
public synchronized int getCompleteSize() {
int size = 0;
for (int i = 0; i <= _lastFragment; i++)
size += _fragments[i].getValid();
return size;
if (_completeSize < 0) {
int size = 0;
for (int i = 0; i <= _lastFragment; i++)
size += _fragments[i].getValid();
_completeSize = size;
}
return _completeSize;
}
public void releaseResources() {

View File

@ -83,7 +83,7 @@ public class OutboundMessageFragments {
else if (_allowExcess)
return true;
else
_activeMessages.wait();
_activeMessages.wait(1000);
}
} catch (InterruptedException ie) {}
}
@ -177,9 +177,10 @@ public class OutboundMessageFragments {
_nextPacketMessage = 0;
}
i--;
}
}
}
} // end (pushCount > maxVolleys)
} // end iterating over active
_activeMessages.notifyAll();
} // end synchronized
}
private static final long SECOND_MASK = 1023l;
@ -286,6 +287,7 @@ public class OutboundMessageFragments {
if (currentFragment < 0) {
if (nextSend <= 0) {
try {
_activeMessages.notifyAll();
_activeMessages.wait();
} catch (InterruptedException ie) {}
} else {
@ -301,6 +303,8 @@ public class OutboundMessageFragments {
_activeMessages.wait(delay);
} catch (InterruptedException ie) {}
}
} else {
_activeMessages.notifyAll();
}
_allowExcess = false;
} // end of the synchronized block
@ -344,6 +348,7 @@ public class OutboundMessageFragments {
Hash expectedBy = msg.getTarget().getIdentity().getHash();
if (!expectedBy.equals(ackedBy)) {
state = null;
_activeMessages.notifyAll();
return 0;
}
}
@ -355,12 +360,12 @@ public class OutboundMessageFragments {
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
_activeMessages.notifyAll();
break;
} else {
state = null;
}
}
_activeMessages.notifyAll();
}
if (state != null) {

View File

@ -37,7 +37,7 @@ public class PacketPusher implements Runnable {
while (_alive) {
UDPPacket packet = _fragments.getNextPacket();
if (packet != null)
_sender.add(packet, true); // blocks
_sender.add(packet, 1000); // blocks for up to a second
}
}
}

View File

@ -85,6 +85,11 @@ public class PeerState {
/** how many bytes can we send to the peer in the current second */
private volatile int _sendWindowBytesRemaining;
private long _lastSendRefill;
private int _sendBps;
private int _sendBytes;
private int _receiveBps;
private int _receiveBytes;
private long _receivePeriodBegin;
private volatile long _lastCongestionOccurred;
/**
* when sendWindowBytes is below this, grow the window size quickly,
@ -131,6 +136,8 @@ public class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
private static final int DEFAULT_MTU = 1472;
private static final int MIN_RTO = ACKSender.ACK_FREQUENCY + 100;
private static final int MAX_RTO = 5000;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@ -154,6 +161,10 @@ public class PeerState {
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_lastSendRefill = _context.clock().now();
_receivePeriodBegin = _lastSendRefill;
_sendBps = 0;
_sendBytes = 0;
_receiveBps = 0;
_lastCongestionOccurred = -1;
_remoteIP = null;
_remotePort = -1;
@ -309,6 +320,9 @@ public class PeerState {
public void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
/** return the smoothed send transfer rate */
public int getSendBps() { return _sendBps; }
public int getReceiveBps() { return _receiveBps; }
public int incrementConsecutiveFailedSends() {
long now = _context.clock().now()/(10*1000);
if (_lastFailedSendPeriod >= now) {
@ -333,13 +347,18 @@ public class PeerState {
*/
public boolean allocateSendingBytes(int size) {
long now = _context.clock().now();
if (_lastSendRefill + 1000 <= now) {
long duration = now - _lastSendRefill;
if (duration >= 1000) {
_sendWindowBytesRemaining = _sendWindowBytes;
_sendBytes += size;
_sendBps = (int)(0.9f*(float)_sendBps + 0.1f*((float)_sendBytes * (1000f/(float)duration)));
_sendBytes = 0;
_lastSendRefill = now;
}
//if (true) return true;
if (size <= _sendWindowBytesRemaining) {
_sendWindowBytesRemaining -= size;
_sendBytes += size;
_lastSendTime = now;
return true;
} else {
@ -374,10 +393,21 @@ public class PeerState {
public int getSlowStartThreshold() { return _slowStartThreshold; }
/** we received the message specified completely */
public void messageFullyReceived(Long messageId) {
public void messageFullyReceived(Long messageId, int bytes) {
if (bytes > 0)
_receiveBytes += bytes;
long now = _context.clock().now();
long duration = now - _receivePeriodBegin;
if (duration >= 1000) {
_receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration)));
_receiveBytes = 0;
_receivePeriodBegin = now;
}
synchronized (_currentACKs) {
if (_wantACKSendSince <= 0)
_wantACKSendSince = _context.clock().now();
_wantACKSendSince = now;
if (!_currentACKs.contains(messageId))
_currentACKs.add(messageId);
}
@ -454,10 +484,10 @@ public class PeerState {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
+ " rttDev=" + _rttDeviation + " rto=" + _rto);
if (_rto < 1000)
_rto = 1000;
if (_rto > 5000)
_rto = 5000;
if (_rto < MIN_RTO)
_rto = MIN_RTO;
if (_rto > MAX_RTO)
_rto = MAX_RTO;
}
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted() {

View File

@ -66,7 +66,16 @@ public class UDPReceiver {
/** if a packet been sitting in the queue for 2 seconds, drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 2*1000;
private static final float ARTIFICIAL_DROP_PROBABILITY = 0f; //0.02f;
private void receive(UDPPacket packet) {
if (ARTIFICIAL_DROP_PROBABILITY > 0) {
// the first check is to let the compiler optimize away this
// random block on the live system when the probability is == 0
if (_context.random().nextFloat() <= ARTIFICIAL_DROP_PROBABILITY)
return;
}
synchronized (_inboundQueue) {
int queueSize = _inboundQueue.size();
if (queueSize > 0) {

View File

@ -66,9 +66,11 @@ public class UDPSender {
* Add the packet to the queue. This may block until there is space
* available, if requested, otherwise it returns immediately
*
* @param blockTime how long to block
* @return number of packets queued
*/
public int add(UDPPacket packet, boolean blocking) {
public int add(UDPPacket packet, int blockTime) {
long expiration = _context.clock().now() + blockTime;
int remaining = -1;
while ( (_keepRunning) && (remaining < 0) ) {
try {
@ -78,10 +80,12 @@ public class UDPSender {
remaining = _outboundQueue.size();
_outboundQueue.notifyAll();
} else {
if (blocking) {
_outboundQueue.wait();
long remainingTime = expiration - _context.clock().now();
if (remainingTime > 0) {
_outboundQueue.wait(remainingTime);
} else {
remaining = _outboundQueue.size();
_outboundQueue.notifyAll();
}
}
}

View File

@ -7,6 +7,8 @@ import java.net.UnknownHostException;
import java.io.IOException;
import java.io.Writer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -108,6 +110,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this);
_context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@ -322,6 +326,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.WARN))
_log.debug("Dropping remote peer: " + peer);
if (peer.getRemotePeer() != null) {
long now = _context.clock().now();
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries");
synchronized (_peersByIdent) {
_peersByIdent.remove(peer.getRemotePeer());
@ -531,7 +537,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
StringBuffer buf = new StringBuffer(512);
buf.append("<b>UDP connections: ").append(peers.size()).append("</b><br />\n");
buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b>peer</b></td><td><b>activity (in/out)</b></td>\n");
buf.append(" <tr><td><b>peer</b></td><td><b>activity (in/out)</b></td>");
buf.append(" <td><b>transfer (in/out)</b></td>\n");
buf.append(" <td><b>uptime</b></td><td><b>skew</b></td>\n");
buf.append(" <td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
buf.append(" <td><b>rtt</b></td><td><b>dev</b></td><td><b>rto</b></td>\n");
@ -567,10 +574,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("</td>");
buf.append("<td>");
buf.append(DataHelper.formatDuration(now-peer.getLastReceiveTime()));
buf.append("/");
buf.append(DataHelper.formatDuration(now-peer.getLastSendTime()));
buf.append("</td>");
buf.append((now-peer.getLastReceiveTime())/1000);
buf.append("s/");
buf.append((now-peer.getLastSendTime())/1000);
buf.append("s</td>");
buf.append("<td>");
buf.append(formatKBps(peer.getReceiveBps()));
buf.append("KBps/");
buf.append(formatKBps(peer.getSendBps()));
buf.append("KBps</td>");
buf.append("<td>");
buf.append(DataHelper.formatDuration(now-peer.getKeyEstablishedTime()));
@ -616,6 +629,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
out.write("</table>\n");
}
private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00");
private static final String formatKBps(int bps) {
synchronized (_fmt) {
return _fmt.format((float)bps/1024);
}
}
/**
* Cache the bid to reduce object churn
*/