2005-10-12 jrandom

* Choke SSU retransmissions to a peer while there is already a
      retransmission in flight to them.  This currently lets other initial
      transmissions through, since packet loss is often sporadic, but maybe
      this should block initial transmissions as well?
    * Display the retransmission bytes stat on peers.jsp (thanks bar!)
    * Filter QUIT messages in the I2PTunnelIRCClient proxy
This commit is contained in:
jrandom
2005-10-13 03:54:54 +00:00
committed by zzz
parent c4d785667a
commit 3516701272
5 changed files with 79 additions and 4 deletions

View File

@ -330,7 +330,7 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable
"LIST",
"NAMES",
"NICK",
"QUIT",
// "QUIT", // replace with a filtered QUIT to hide client quit messages
"SILENCE",
"PART",
"OPER",
@ -379,6 +379,8 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable
String realname = field[2].substring(idx+1);
String ret = "USER "+field[1]+" hostname localhost :"+realname;
return ret;
} else if ("QUIT".equals(command)) {
return "QUIT :leaving";
}
// Block the rest

View File

@ -1,4 +1,12 @@
$Id: history.txt,v 1.290 2005/10/11 02:07:40 jrandom Exp $
$Id: history.txt,v 1.291 2005/10/11 16:05:15 jrandom Exp $
2005-10-12 jrandom
* Choke SSU retransmissions to a peer while there is already a
retransmission in flight to them. This currently lets other initial
transmissions through, since packet loss is often sporadic, but maybe
this should block initial transmissions as well?
* Display the retransmission bytes stat on peers.jsp (thanks bar!)
* Filter QUIT messages in the I2PTunnelIRCClient proxy
2005-10-11 jrandom
* Piggyback the SSU partial ACKs with data packets. This is backwards

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.265 $ $Date: 2005/10/11 02:07:40 $";
public final static String ID = "$Revision: 1.266 $ $Date: 2005/10/11 16:05:14 $";
public final static String VERSION = "0.6.1.2";
public final static long BUILD = 6;
public final static long BUILD = 7;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -1,7 +1,10 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
@ -36,6 +39,13 @@ public class OutboundMessageFragments {
private PacketBuilder _builder;
/** if we can handle more messages explicitly, set this to true */
private boolean _allowExcess;
private volatile long _packetsRetransmitted;
/**
* Map of peer to OutboundMessageState for messages being retransmitted, to
* keep bad peers from bursting too much due to congestion/outage. This
* should only be accessed when holding the lock on _activeMessages.
*/
private Map _retransmitters;
private static final int MAX_ACTIVE = 64;
// don't send a packet more than 10 times
@ -47,6 +57,7 @@ public class OutboundMessageFragments {
_transport = transport;
_throttle = throttle;
_activeMessages = new ArrayList(MAX_ACTIVE);
_retransmitters = new HashMap(MAX_ACTIVE);
_nextPacketMessage = 0;
_builder = new PacketBuilder(ctx, transport);
_alive = true;
@ -64,6 +75,9 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendPiggyback", "How many acks were piggybacked on a data packet (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendPiggybackPartial", "How many partial acks were piggybacked on a data packet (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.activeDelay", "How often we wait blocking on the active queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.packetsRetransmitted", "How many packets have been retransmitted (lifetime) when a burst of packets are retransmitted (period == packets transmitted, lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.peerPacketsRetransmitted", "How many packets have been retransmitted to the peer (lifetime) when a burst of packets are retransmitted (period == packets transmitted, lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.blockedRetransmissions", "How packets have been transmitted to the peer when we blocked a retransmission to them?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() { _alive = true; }
@ -157,6 +171,7 @@ public class OutboundMessageFragments {
PeerState peer = state.getPeer();
if (state.isComplete()) {
_activeMessages.remove(i);
locked_removeRetransmitter(state);
_transport.succeeded(state.getMessage());
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
@ -169,6 +184,7 @@ public class OutboundMessageFragments {
i--;
} else if (state.isExpired()) {
_activeMessages.remove(i);
locked_removeRetransmitter(state);
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
if (state.getMessage() != null) {
@ -190,6 +206,7 @@ public class OutboundMessageFragments {
i--;
} else if (state.getPushCount() > MAX_VOLLEYS) {
_activeMessages.remove(i);
locked_removeRetransmitter(state);
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
//if (state.getPeer() != null)
// state.getPeer().congestionOccurred();
@ -217,6 +234,29 @@ public class OutboundMessageFragments {
} // end synchronized
}
/**
* Remove the block on retransmissions to the peer if and only if the given
* message is the current "retransmitter" for it.
*
*/
private void locked_removeRetransmitter(OutboundMessageState state) {
PeerState curPeer = state.getPeer();
if (curPeer == null) {
for (Iterator iter = _retransmitters.keySet().iterator(); iter.hasNext(); ) {
PeerState cpeer = (PeerState)iter.next();
OutboundMessageState cstate = (OutboundMessageState)_retransmitters.get(cpeer);
if (cstate == state) {
iter.remove();
break;
}
}
} else {
OutboundMessageState remState = (OutboundMessageState)_retransmitters.get(curPeer);
if (remState == state)
_retransmitters.remove(curPeer);
}
}
private static final long SECOND_MASK = 1023l;
/**
@ -250,6 +290,7 @@ public class OutboundMessageFragments {
if (peer == null) {
// peer disconnected
_activeMessages.remove(cur);
locked_removeRetransmitter(state);
_transport.failed(state);
if (_log.shouldLog(Log.WARN))
_log.warn("Peer disconnected for " + state);
@ -313,6 +354,17 @@ public class OutboundMessageFragments {
+ " for message " + state.getMessageId() + ": " + state);
if (state.getPushCount() > 0) {
OutboundMessageState curRetransMsg = (OutboundMessageState)_retransmitters.get(peer);
if ( (curRetransMsg != null) && (curRetransMsg == state) ) {
// choke it, there's already another message to this peer in flight
// perhaps we should release the sendingBytes? or maybe not, since we
// *do* want to choke the peer...
_context.statManager().addRateData("udp.blockedRetransmissions", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
return false;
} else {
_retransmitters.put(peer, state);
}
int fragments = state.getFragmentCount();
int toSend = 0;
for (int i = 0; i < fragments; i++) {
@ -321,6 +373,9 @@ public class OutboundMessageFragments {
}
peer.messageRetransmitted(toSend);
_packetsRetransmitted += toSend; // lifetime for the transport
_context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
_context.statManager().addRateData("udp.packetsRetransmitted", _packetsRetransmitted, peer.getPacketsTransmitted());
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend);
@ -436,6 +491,7 @@ public class OutboundMessageFragments {
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
locked_removeRetransmitter(state);
break;
} else {
state = null;
@ -508,6 +564,7 @@ public class OutboundMessageFragments {
_nextPacketMessage = 0;
}
}
locked_removeRetransmitter(state);
break;
} else {
state = null;

View File

@ -1108,6 +1108,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(" <td>").append(resentTotal);
buf.append("</td><td>").append(dupRecvTotal).append("</td>\n");
buf.append(" </tr>\n");
buf.append("<tr><td colspan=\"14\" valign=\"top\" align=\"left\">");
long bytesTransmitted = _context.bandwidthLimiter().getTotalAllocatedOutboundBytes();
double averagePacketSize = _context.statManager().getRate("udp.sendPacketSize").getLifetimeAverageValue();
double nondupSent = ((double)bytesTransmitted - ((double)resentTotal)*averagePacketSize);
double bwResent = (nondupSent <= 0 ? 0d : ((((double)resentTotal)*averagePacketSize) / nondupSent));
buf.append("Percentage of bytes retransmitted (lifetime): ").append(formatPct(bwResent));
buf.append(" <i>(includes retransmission required by packet loss)</i><br />\n");
buf.append("</td></tr>\n");
out.write(buf.toString());
buf.setLength(0);