2005-08-07 Complication

* Display the average clock skew for both SSU and TCP connections
2005-08-07  jrandom
    * Fixed the long standing streaming lib bug where we could lose the first
      packet on retransmission.
    * Avoid an NPE when a message expires on the SSU queue.
    * Adjust the streaming lib's window growth factor with an additional
      Vegas-esque congestion detection algorithm.
    * Removed an unnecessary SSU session drop
    * Reduced the MTU (until we get a working PMTU lib)
    * Deferr tunnel acceptance until we know how to reach the next hop,
      rejecting it if we can't find them in time.
    * If our netDb store of our leaseSet fails, give it a few seconds before
      republishing.
This commit is contained in:
jrandom
2005-08-07 19:31:58 +00:00
committed by zzz
parent a375e4b2ce
commit ba30b56c5f
23 changed files with 582 additions and 271 deletions

View File

@ -189,6 +189,7 @@ public class SummaryHelper {
return "0.0";
RateStat receiveRate = _context.statManager().getRate("transport.receiveMessageSize");
if (receiveRate == null) return "0.0";
Rate rate = receiveRate.getRate(60*1000);
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);
@ -206,6 +207,7 @@ public class SummaryHelper {
return "0.0";
RateStat receiveRate = _context.statManager().getRate("transport.sendMessageSize");
if (receiveRate == null) return "0.0";
Rate rate = receiveRate.getRate(60*1000);
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);
@ -224,6 +226,7 @@ public class SummaryHelper {
return "0.0";
RateStat receiveRate = _context.statManager().getRate("transport.receiveMessageSize");
if (receiveRate == null) return "0.0";
Rate rate = receiveRate.getRate(5*60*1000);
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);
@ -242,6 +245,7 @@ public class SummaryHelper {
return "0.0";
RateStat receiveRate = _context.statManager().getRate("transport.sendMessageSize");
if (receiveRate == null) return "0.0";
Rate rate = receiveRate.getRate(5*60*1000);
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);

View File

@ -272,10 +272,13 @@ public class Connection {
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), timeout);
}
_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
_lastSendTime = _context.clock().now();
_outboundQueue.enqueue(packet);
resetActivityTimer();
/*
if (ackOnly) {
// ACK only, don't schedule this packet for retries
// however, if we are running low on sessionTags we want to send
@ -286,6 +289,7 @@ public class Connection {
_connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent(), new PingNotifier());
}
}
*/
}
private class PingNotifier implements ConnectionManager.PingNotifier {

View File

@ -13,6 +13,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
private int _receiveWindow;
private int _profile;
private int _rtt;
private int _trend[];
private int _resendDelay;
private int _sendAckDelay;
private int _maxMessageSize;
@ -50,6 +51,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = "i2p.streaming.congestionAvoidanceGrowthRateFactor";
public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
private static final int TREND_COUNT = 3;
public ConnectionOptions() {
super();
}
@ -85,6 +88,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
protected void init(Properties opts) {
super.init(opts);
_trend = new int[TREND_COUNT];
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 4*1024));
@ -186,11 +191,36 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
*/
public int getRTT() { return _rtt; }
public void setRTT(int ms) {
synchronized (_trend) {
_trend[0] = _trend[1];
_trend[1] = _trend[2];
if (ms > _rtt)
_trend[2] = 1;
else if (ms < _rtt)
_trend[2] = -1;
else
_trend[2] = 0;
}
_rtt = ms;
if (_rtt > 60*1000)
_rtt = 60*1000;
}
/**
* If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
* 3 consecutive rtt decreases, we are trending downwards (-1), else we're stable.
*
*/
public int getRTTTrend() {
synchronized (_trend) {
for (int i = 0; i < TREND_COUNT - 1; i++) {
if (_trend[i] != _trend[i+1])
return 0;
}
return _trend[0];
}
}
/** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */
private static final double RTT_DAMPENING = 0.9;

View File

@ -26,6 +26,7 @@ public class ConnectionPacketHandler {
_context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 });
}
/** distribute a packet to the connection specified */
@ -177,7 +178,19 @@ public class ConnectionPacketHandler {
// con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000);
int numResends = 0;
List acked = con.ackPackets(ackThrough, nacks);
List acked = null;
// if we don't know the streamIds for both sides of the connection, there's no way we
// could actually be acking data (this fixes the buggered up ack of packet 0 problem).
// this is called after packet verification, which places the stream IDs as necessary if
// the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet)
if ( (packet.getSendStreamId() != null) && (packet.getReceiveStreamId() != null) &&
(con.getSendStreamId() != null) && (con.getReceiveStreamId() != null) &&
(!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) )
acked = con.ackPackets(ackThrough, nacks);
if ( (acked != null) && (acked.size() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(acked.size() + " of our packets acked with " + packet);
@ -247,8 +260,13 @@ public class ConnectionPacketHandler {
int oldWindow = con.getOptions().getWindowSize();
int newWindowSize = oldWindow;
int trend = con.getOptions().getRTTTrend();
_context.statManager().addRateData("stream.trend", trend, newWindowSize);
if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
if ( (newWindowSize > con.getLastCongestionSeenAt() / 2) ||
(trend > 0) ) { // tcp vegas: avoidance if rtt is increasing, even if we arent at ssthresh/2 yet
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're

View File

@ -578,7 +578,7 @@ public class Packet {
return buf;
}
private static final String toId(byte id[]) {
static final String toId(byte id[]) {
if (id == null)
return Base64.encode(STREAM_ID_UNKNOWN);
else

View File

@ -1,4 +1,20 @@
$Id: history.txt,v 1.221 2005/08/01 22:26:51 duck Exp $
$Id: history.txt,v 1.222 2005/08/03 13:58:13 jrandom Exp $
2005-08-07 Complication
* Display the average clock skew for both SSU and TCP connections
2005-08-07 jrandom
* Fixed the long standing streaming lib bug where we could lose the first
packet on retransmission.
* Avoid an NPE when a message expires on the SSU queue.
* Adjust the streaming lib's window growth factor with an additional
Vegas-esque congestion detection algorithm.
* Removed an unnecessary SSU session drop
* Reduced the MTU (until we get a working PMTU lib)
* Deferr tunnel acceptance until we know how to reach the next hop,
rejecting it if we can't find them in time.
* If our netDb store of our leaseSet fails, give it a few seconds before
republishing.
* 2005-08-03 0.6.0.1 released

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.210 $ $Date: 2005/07/31 16:35:27 $";
public final static String ID = "$Revision: 1.211 $ $Date: 2005/08/03 13:58:13 $";
public final static String VERSION = "0.6.0.1";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -84,7 +84,7 @@ public class RepublishLeaseSetJob extends JobImpl {
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64());
RepublishLeaseSetJob.this.requeue(5*1000);
RepublishLeaseSetJob.this.requeue(30*1000);
}
}
}

View File

@ -334,16 +334,19 @@ class SearchJob extends JobImpl {
}
TunnelId inTunnelId = inTunnel.getReceiveTunnelId(0);
RouterInfo inGateway = getContext().netDb().lookupRouterInfoLocally(inTunnel.getPeer(0));
if (inGateway == null) {
_log.error("We can't find the gateway to our inbound tunnel?! wtf");
getContext().jobQueue().addJob(new FailedJob(getContext(), router));
return;
}
// this will fail if we've shitlisted our inbound gateway, but the gw may not necessarily
// be shitlisted by whomever needs to contact them, so we don't need to check this
//RouterInfo inGateway = getContext().netDb().lookupRouterInfoLocally(inTunnel.getPeer(0));
//if (inGateway == null) {
// _log.error("We can't find the gateway to our inbound tunnel?! wtf");
// getContext().jobQueue().addJob(new FailedJob(getContext(), router));
// return;
//}
long expiration = getContext().clock().now() + getPerPeerTimeoutMs();
DatabaseLookupMessage msg = buildMessage(inTunnelId, inGateway, expiration);
DatabaseLookupMessage msg = buildMessage(inTunnelId, inTunnel.getPeer(0), expiration);
TunnelInfo outTunnel = getOutboundTunnelId();
if (outTunnel == null) {
@ -409,10 +412,11 @@ class SearchJob extends JobImpl {
* @param replyGateway gateway for the reply tunnel
* @param expiration when the search should stop
*/
protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) {
protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, Hash replyGateway, long expiration) {
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true);
msg.setSearchKey(_state.getTarget());
msg.setFrom(replyGateway.getIdentity().getHash());
//msg.setFrom(replyGateway.getIdentity().getHash());
msg.setFrom(replyGateway);
msg.setDontIncludePeers(_state.getClosestAttempted(MAX_CLOSEST));
msg.setMessageExpiration(expiration);
msg.setReplyTunnel(replyTunnelId);
@ -504,6 +508,8 @@ class SearchJob extends JobImpl {
boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer);
if (!sendsBadInfo) {
// we don't need to search for everthing we're given here - only ones that
// are next in our search path...
getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
_repliesPendingVerification++;
} else {

View File

@ -39,7 +39,7 @@ class StoreJob extends JobImpl {
private PeerSelector _peerSelector;
private final static int PARALLELIZATION = 3; // how many sent at a time
private final static int REDUNDANCY = 10; // we want the data sent to 10 peers
private final static int REDUNDANCY = 6; // we want the data sent to 6 peers
/**
* additionally send to 1 outlier(s), in case all of the routers chosen in our
* REDUNDANCY set are attacking us by accepting DbStore messages but dropping

View File

@ -802,7 +802,7 @@ public class TCPTransport extends TransportImpl {
}
buf.append("</ul>\n");
buf.append("<b>Average clock skew: ");
buf.append("<b>Average clock skew, TCP peers: ");
if (_connectionsByIdent.size() > 0)
buf.append(offsetTotal / _connectionsByIdent.size()).append("ms</b><br />\n");
else

View File

@ -22,7 +22,7 @@ public class ACKSender implements Runnable {
private boolean _alive;
/** how frequently do we want to send ACKs to a peer? */
static final int ACK_FREQUENCY = 200;
static final int ACK_FREQUENCY = 100;
public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx;

View File

@ -4,6 +4,8 @@ import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
@ -35,7 +37,7 @@ public class OutboundMessageFragments {
/** if we can handle more messages explicitly, set this to true */
private boolean _allowExcess;
private static final int MAX_ACTIVE = 32;
private static final int MAX_ACTIVE = 64;
// don't send a packet more than 10 times
static final int MAX_VOLLEYS = 10;
@ -83,6 +85,7 @@ public class OutboundMessageFragments {
long start = _context.clock().now();
int numActive = 0;
int maxActive = Math.max(_transport.countActivePeers(), MAX_ACTIVE);
while (_alive) {
finishMessages();
try {
@ -90,7 +93,7 @@ public class OutboundMessageFragments {
numActive = _activeMessages.size();
if (!_alive)
return false;
else if (numActive < MAX_ACTIVE)
else if (numActive < maxActive)
return true;
else if (_allowExcess)
return true;
@ -108,9 +111,18 @@ public class OutboundMessageFragments {
*
*/
public void add(OutNetMessage msg) {
I2NPMessage msgBody = msg.getMessage();
RouterInfo target = msg.getTarget();
if ( (msgBody == null) || (target == null) ) {
synchronized (_activeMessages) {
_activeMessages.notifyAll();
}
return;
}
OutboundMessageState state = new OutboundMessageState(_context);
boolean ok = state.initialize(msg);
state.setPeer(_transport.getPeerState(msg.getTarget().getIdentity().calculateHash()));
boolean ok = state.initialize(msg, msgBody);
state.setPeer(_transport.getPeerState(target.getIdentity().calculateHash()));
finishMessages();
int active = 0;
synchronized (_activeMessages) {
@ -337,7 +349,7 @@ public class OutboundMessageFragments {
}
private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
if (state != null) {
if ( (state != null) && (peer != null) ) {
int fragments = state.getFragmentCount();
if (fragments < 0)
return null;
@ -420,14 +432,16 @@ public class OutboundMessageFragments {
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
_transport.succeeded(state.getMessage());
int numFragments = state.getFragmentCount();
if (state.getPeer() != null) {
PeerState peer = state.getPeer();
if (peer != null) {
// this adjusts the rtt/rto/window/etc
state.getPeer().messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends());
peer.messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends());
if (peer.getSendWindowBytesRemaining() > 0)
_throttle.unchoke(peer.getRemotePeer());
} else {
_log.warn("message acked, but no peer attacked: " + state);
if (_log.shouldLog(Log.WARN))
_log.warn("message acked, but no peer attacked: " + state);
}
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources();
return numFragments;
} else {

View File

@ -45,6 +45,7 @@ public class OutboundMessageState {
}
public boolean initialize(OutNetMessage msg) {
if (msg == null) return false;
try {
initialize(msg, msg.getMessage(), null);
return true;
@ -57,6 +58,9 @@ public class OutboundMessageState {
}
public boolean initialize(I2NPMessage msg, PeerState peer) {
if (msg == null)
return false;
try {
initialize(null, msg, peer);
return true;
@ -68,6 +72,21 @@ public class OutboundMessageState {
}
}
public boolean initialize(OutNetMessage m, I2NPMessage msg) {
if ( (m == null) || (msg == null) )
return false;
try {
initialize(m, msg, null);
return true;
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Error initializing " + msg, e);
return false;
}
}
private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
_message = m;
_peer = peer;
@ -200,7 +219,7 @@ public class OutboundMessageState {
public void fragment(int fragmentSize) {
int totalSize = _messageBuf.getValid();
int numFragments = totalSize / fragmentSize;
if (numFragments * fragmentSize != totalSize)
if (numFragments * fragmentSize < totalSize)
numFragments++;
if (_log.shouldLog(Log.DEBUG))

View File

@ -1,7 +1,9 @@
package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import net.i2p.data.Base64;
import net.i2p.router.Router;
@ -29,8 +31,12 @@ public class PacketHandler {
private InboundMessageFragments _inbound;
private PeerTestManager _testManager;
private boolean _keepReading;
private List _handlers;
private static final int NUM_HANDLERS = 3;
/** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager) {
_context = ctx;
@ -40,6 +46,10 @@ public class PacketHandler {
_establisher = establisher;
_inbound = inbound;
_testManager = testManager;
_handlers = new ArrayList(NUM_HANDLERS);
for (int i = 0; i < NUM_HANDLERS; i++) {
_handlers.add(new Handler());
}
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 });
@ -52,8 +62,8 @@ public class PacketHandler {
public void startup() {
_keepReading = true;
for (int i = 0; i < NUM_HANDLERS; i++) {
I2PThread t = new I2PThread(new Handler(), "Packet handler " + i + ": " + _endpoint.getListenPort());
for (int i = 0; i < _handlers.size(); i++) {
I2PThread t = new I2PThread((Handler)_handlers.get(i), "Packet handler " + i + ": " + _endpoint.getListenPort());
t.setDaemon(true);
t.start();
}
@ -62,30 +72,51 @@ public class PacketHandler {
public void shutdown() {
_keepReading = false;
}
String getHandlerStatus() {
StringBuffer rv = new StringBuffer();
int size = _handlers.size();
rv.append("Handlers: ").append(size);
for (int i = 0; i < size; i++) {
Handler handler = (Handler)_handlers.get(i);
rv.append(" handler ").append(i).append(" state: ").append(handler._state);
}
return rv.toString();
}
private class Handler implements Runnable {
private UDPPacketReader _reader;
public volatile int _state;
public Handler() {
_reader = new UDPPacketReader(_context);
_state = 0;
}
public void run() {
_state = 1;
while (_keepReading) {
_state = 2;
UDPPacket packet = _endpoint.receive();
_state = 3;
if (packet == null) continue; // keepReading is probably false...
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received the packet " + packet);
_state = 4;
long queueTime = packet.getLifetime();
long handleStart = _context.clock().now();
try {
_state = 5;
handlePacket(_reader, packet);
_state = 6;
} catch (Exception e) {
_state = 7;
if (_log.shouldLog(Log.ERROR))
_log.error("Crazy error handling a packet: " + packet, e);
}
long handleTime = _context.clock().now() - handleStart;
_context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime());
_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
_state = 8;
if (handleTime > 1000) {
if (_log.shouldLog(Log.WARN))
@ -95,244 +126,287 @@ public class PacketHandler {
// back to the cache with thee!
packet.release();
_state = 9;
}
}
}
private void handlePacket(UDPPacketReader reader, UDPPacket packet) {
if (packet == null) return;
RemoteHostId rem = packet.getRemoteHost();
PeerState state = _transport.getPeerState(rem);
if (state == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for a connected peer");
InboundEstablishState est = _establisher.getInboundState(rem);
if (est != null) {
//}
private void handlePacket(UDPPacketReader reader, UDPPacket packet) {
if (packet == null) return;
_state = 10;
RemoteHostId rem = packet.getRemoteHost();
PeerState state = _transport.getPeerState(rem);
if (state == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an inbound establishment");
receivePacket(reader, packet, est);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound establishment");
OutboundEstablishState oest = _establisher.getOutboundState(rem);
if (oest != null) {
_log.debug("Packet received is not for a connected peer");
_state = 11;
InboundEstablishState est = _establisher.getInboundState(rem);
if (est != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an outbound establishment");
receivePacket(reader, packet, oest);
_log.debug("Packet received IS for an inbound establishment");
_state = 12;
receivePacket(reader, packet, est);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound or outbound establishment");
// ok, not already known establishment, try as a new one
receivePacket(reader, packet);
_log.debug("Packet received is not for an inbound establishment");
_state = 13;
OutboundEstablishState oest = _establisher.getOutboundState(rem);
if (oest != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an outbound establishment");
_state = 14;
receivePacket(reader, packet, oest);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound or outbound establishment");
// ok, not already known establishment, try as a new one
_state = 15;
receivePacket(reader, packet);
}
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an existing peer");
_state = 16;
receivePacket(reader, packet, state);
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an existing peer");
receivePacket(reader, packet, state);
}
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) {
boolean isValid = packet.validate(state.getCurrentMACKey());
if (!isValid) {
if (state.getNextMACKey() != null)
isValid = packet.validate(state.getNextMACKey());
private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) {
_state = 17;
boolean isValid = packet.validate(state.getCurrentMACKey());
if (!isValid) {
_state = 18;
if (state.getNextMACKey() != null)
isValid = packet.validate(state.getNextMACKey());
if (!isValid) {
_state = 19;
if (_log.shouldLog(Log.WARN))
_log.warn("Failed validation with existing con, trying as new con: " + packet);
isValid = packet.validate(_transport.getIntroKey());
if (isValid) {
_state = 20;
// this is a stray packet from an inbound establishment
// process, so try our intro key
// (after an outbound establishment process, there wouldn't
// be any stray packets)
if (_log.shouldLog(Log.INFO))
_log.info("Validation with existing con failed, but validation as reestablish/stray passed");
packet.decrypt(_transport.getIntroKey());
} else {
_state = 21;
InboundEstablishState est = _establisher.getInboundState(packet.getRemoteHost());
if (est != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet from an existing peer IS for an inbound establishment");
_state = 22;
receivePacket(reader, packet, est, false);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP");
_context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration());
}
return;
}
} else {
_state = 23;
packet.decrypt(state.getNextCipherKey());
}
} else {
_state = 24;
packet.decrypt(state.getCurrentCipherKey());
}
_state = 25;
handlePacket(reader, packet, state, null, null);
_state = 26;
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet) {
_state = 27;
boolean isValid = packet.validate(_transport.getIntroKey());
if (!isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Failed validation with existing con, trying as new con: " + packet);
isValid = packet.validate(_transport.getIntroKey());
_log.warn("Invalid introduction packet received: " + packet, new Exception("path"));
_context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration());
_state = 28;
return;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received: " + packet);
}
_state = 29;
packet.decrypt(_transport.getIntroKey());
handlePacket(reader, packet, null, null, null);
_state = 30;
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) {
receivePacket(reader, packet, state, true);
}
/**
* @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) {
_state = 31;
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known inbound state: ");
buf.append(state);
buf.append(" MAC key: ").append(state.getMACKey());
buf.append(" intro key: ").append(_transport.getIntroKey());
_log.debug(buf.toString());
}
boolean isValid = false;
if (state.getMACKey() != null) {
isValid = packet.validate(state.getMACKey());
if (isValid) {
// this is a stray packet from an inbound establishment
// process, so try our intro key
// (after an outbound establishment process, there wouldn't
// be any stray packets)
if (_log.shouldLog(Log.INFO))
_log.info("Validation with existing con failed, but validation as reestablish/stray passed");
packet.decrypt(_transport.getIntroKey());
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for inbound con: " + packet);
_state = 32;
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, null, null);
return;
} else {
InboundEstablishState est = _establisher.getInboundState(packet.getRemoteHost());
if (est != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet from an existing peer IS for an inbound establishment");
receivePacket(reader, packet, est, false);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP");
_context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration());
}
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received for inbound con, falling back: " + packet);
_state = 33;
}
}
if (allowFallback) {
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
_state = 34;
receivePacket(reader, packet);
} else {
_context.statManager().addRateData("udp.droppedInvalidInboundEstablish", packet.getLifetime(), packet.getExpiration());
}
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) {
_state = 35;
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known outbound state: ");
buf.append(state);
buf.append(" MAC key: ").append(state.getMACKey());
buf.append(" intro key: ").append(state.getIntroKey());
_log.debug(buf.toString());
}
boolean isValid = false;
if (state.getMACKey() != null) {
_state = 36;
isValid = packet.validate(state.getMACKey());
if (isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for outbound established con: " + packet);
_state = 37;
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, state, null);
_state = 38;
return;
}
} else {
packet.decrypt(state.getNextCipherKey());
}
} else {
packet.decrypt(state.getCurrentCipherKey());
}
handlePacket(reader, packet, state, null, null);
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet) {
boolean isValid = packet.validate(_transport.getIntroKey());
if (!isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received: " + packet, new Exception("path"));
_context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration());
return;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received: " + packet);
}
packet.decrypt(_transport.getIntroKey());
handlePacket(reader, packet, null, null, null);
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) {
receivePacket(reader, packet, state, true);
}
/**
* @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) {
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known inbound state: ");
buf.append(state);
buf.append(" MAC key: ").append(state.getMACKey());
buf.append(" intro key: ").append(_transport.getIntroKey());
_log.debug(buf.toString());
}
boolean isValid = false;
if (state.getMACKey() != null) {
isValid = packet.validate(state.getMACKey());
// keys not yet exchanged, lets try it with the peer's intro key
isValid = packet.validate(state.getIntroKey());
if (isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for inbound con: " + packet);
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, null, null);
_log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet);
_state = 39;
packet.decrypt(state.getIntroKey());
handlePacket(reader, packet, null, state, null);
_state = 40;
return;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received for inbound con, falling back: " + packet);
_log.warn("Invalid introduction packet received for outbound established con with old intro key, falling back: " + packet);
}
}
if (allowFallback) {
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
_state = 41;
receivePacket(reader, packet);
} else {
_context.statManager().addRateData("udp.droppedInvalidInboundEstablish", packet.getLifetime(), packet.getExpiration());
_state = 42;
}
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) {
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known outbound state: ");
buf.append(state);
buf.append(" MAC key: ").append(state.getMACKey());
buf.append(" intro key: ").append(state.getIntroKey());
_log.debug(buf.toString());
}
boolean isValid = false;
if (state.getMACKey() != null) {
isValid = packet.validate(state.getMACKey());
if (isValid) {
/**
* Parse out the interesting bits and honor what it says
*/
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) {
_state = 43;
reader.initialize(packet);
_state = 44;
long recvOn = packet.getBegin();
long sendOn = reader.readTimestamp() * 1000;
long skew = recvOn - sendOn;
if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for outbound established con: " + packet);
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, state, null);
_log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration());
return;
} else if (skew < 0 - GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration());
return;
}
}
// keys not yet exchanged, lets try it with the peer's intro key
isValid = packet.validate(state.getIntroKey());
if (isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet);
packet.decrypt(state.getIntroKey());
handlePacket(reader, packet, null, state, null);
return;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received for outbound established con with old intro key, falling back: " + packet);
}
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
receivePacket(reader, packet);
}
/** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
/**
* Parse out the interesting bits and honor what it says
*/
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) {
reader.initialize(packet);
long recvOn = packet.getBegin();
long sendOn = reader.readTimestamp() * 1000;
long skew = recvOn - sendOn;
if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration());
return;
} else if (skew < 0 - GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration());
return;
}
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew);
state.adjustClockSkew((short)skew);
}
_context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime());
//InetAddress fromHost = packet.getPacket().getAddress();
//int fromPort = packet.getPacket().getPort();
//RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);
RemoteHostId from = packet.getRemoteHost();
switch (reader.readPayloadType()) {
case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST:
_establisher.receiveSessionRequest(from, reader);
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED:
_establisher.receiveSessionConfirmed(from, reader);
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED:
_establisher.receiveSessionCreated(from, reader);
break;
case UDPPacket.PAYLOAD_TYPE_DATA:
if (outState != null)
state = _establisher.receiveData(outState);
if (_log.shouldLog(Log.INFO))
_log.info("Received new DATA packet from " + state + ": " + packet);
_inbound.receiveData(state, reader.getDataReader());
break;
case UDPPacket.PAYLOAD_TYPE_TEST:
_testManager.receiveTest(from, reader);
break;
default:
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown payload type: " + reader.readPayloadType());
_context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime(), packet.getExpiration());
return;
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew);
state.adjustClockSkew((short)skew);
}
_context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime());
//InetAddress fromHost = packet.getPacket().getAddress();
//int fromPort = packet.getPacket().getPort();
//RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);
_state = 45;
RemoteHostId from = packet.getRemoteHost();
_state = 46;
switch (reader.readPayloadType()) {
case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST:
_state = 47;
_establisher.receiveSessionRequest(from, reader);
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED:
_state = 48;
_establisher.receiveSessionConfirmed(from, reader);
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED:
_state = 49;
_establisher.receiveSessionCreated(from, reader);
break;
case UDPPacket.PAYLOAD_TYPE_DATA:
_state = 50;
if (outState != null)
state = _establisher.receiveData(outState);
if (_log.shouldLog(Log.INFO))
_log.info("Received new DATA packet from " + state + ": " + packet);
_inbound.receiveData(state, reader.getDataReader());
break;
case UDPPacket.PAYLOAD_TYPE_TEST:
_state = 51;
_testManager.receiveTest(from, reader);
break;
default:
_state = 52;
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown payload type: " + reader.readPayloadType());
_context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime(), packet.getExpiration());
return;
}
}
}
}

View File

@ -155,12 +155,15 @@ public class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
/*
* 576 gives us 568 IP byes, 548 UDP bytes, and with an SSU data message,
* 502 fragment bytes, which is enough to send a tunnel data message in 2
* packets.
* 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message,
* 522 fragment bytes, which is enough to send a tunnel data message in 2
* packets. A tunnel data message sent over the wire is 1044 bytes, meaning
* we need 522 fragment bytes to fit it in 2 packets - add 46 for SSU, 20
* for UDP, and 8 for IP, giving us 596. round up to mod 16, giving a total
* of 608
*/
private static final int DEFAULT_MTU = 1500;
private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY;
private static final int DEFAULT_MTU = 608;//600; //1500;
private static final int MIN_RTO = 1000 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 2000; // 5000;
public PeerState(I2PAppContext ctx) {

View File

@ -15,12 +15,14 @@ public class UDPEndpoint {
private RouterContext _context;
private Log _log;
private int _listenPort;
private UDPTransport _transport;
private UDPSender _sender;
private UDPReceiver _receiver;
public UDPEndpoint(RouterContext ctx, int listenPort) throws SocketException {
public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort) throws SocketException {
_context = ctx;
_log = ctx.logManager().getLog(UDPEndpoint.class);
_transport = transport;
_listenPort = listenPort;
}
@ -32,7 +34,7 @@ public class UDPEndpoint {
try {
DatagramSocket socket = new DatagramSocket(_listenPort);
_sender = new UDPSender(_context, socket, "UDPSend on " + _listenPort);
_receiver = new UDPReceiver(_context, socket, "UDPReceive on " + _listenPort);
_receiver = new UDPReceiver(_context, _transport, socket, "UDPReceive on " + _listenPort);
_sender.startup();
_receiver.startup();
} catch (SocketException se) {

View File

@ -36,7 +36,7 @@ public class UDPEndpointTest {
int base = 2000 + _context.random().nextInt(10000);
for (int i = 0; i < numPeers; i++) {
_log.debug("Building " + i);
UDPEndpoint endpoint = new UDPEndpoint(_context, base + i);
UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i);
_endpoints[i] = endpoint;
endpoint.startup();
I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i);

View File

@ -28,13 +28,15 @@ public class UDPReceiver {
private List _inboundQueue;
private boolean _keepRunning;
private Runner _runner;
private UDPTransport _transport;
public UDPReceiver(RouterContext ctx, DatagramSocket socket, String name) {
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPReceiver.class);
_name = name;
_inboundQueue = new ArrayList(128);
_socket = socket;
_transport = transport;
_runner = new Runner();
_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -64,8 +66,8 @@ public class UDPReceiver {
return _runner.updateListeningPort(socket, newPort);
}
/** if a packet been sitting in the queue for 2 seconds, drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 2*1000;
/** if a packet been sitting in the queue for a full second (meaning the handlers are overwhelmed), drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 1*1000;
private static final float ARTIFICIAL_DROP_PROBABILITY = 0.0f; // 0.02f; // 0.0f;
@ -90,22 +92,38 @@ public class UDPReceiver {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received: " + packet);
boolean rejected = false;
int queueSize = 0;
long headPeriod = 0;
synchronized (_inboundQueue) {
int queueSize = _inboundQueue.size();
queueSize = _inboundQueue.size();
if (queueSize > 0) {
long headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime();
headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime();
if (headPeriod > MAX_QUEUE_PERIOD) {
_context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
if (_log.shouldLog(Log.ERROR))
_log.error("Dropping inbound packet with " + queueSize + " queued for " + headPeriod);
rejected = true;
_inboundQueue.notifyAll();
return queueSize;
}
}
_inboundQueue.add(packet);
_inboundQueue.notifyAll();
return queueSize + 1;
if (!rejected) {
_inboundQueue.add(packet);
_inboundQueue.notifyAll();
return queueSize + 1;
}
}
// rejected
_context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
if (_log.shouldLog(Log.ERROR)) {
StringBuffer msg = new StringBuffer();
msg.append("Dropping inbound packet with ");
msg.append(queueSize);
msg.append(" queued for ");
msg.append(headPeriod);
if (_transport != null)
msg.append(" packet handlers: ").append(_transport.getPacketHandlerStatus());
_log.error(msg.toString());
}
return queueSize;
}
private class ArtificiallyDelayedReceive implements SimpleTimer.TimedEvent {

View File

@ -199,7 +199,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
if (_endpoint == null) {
try {
_endpoint = new UDPEndpoint(_context, port);
_endpoint = new UDPEndpoint(_context, this, port);
} catch (SocketException se) {
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Unable to listen on the UDP port (" + port + ")", se);
@ -450,8 +450,46 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
dropPeer(peer, true);
}
private void dropPeer(PeerState peer, boolean shouldShitlist) {
if (_log.shouldLog(Log.INFO))
_log.info("Dropping remote peer: " + peer + " shitlist? " + shouldShitlist, new Exception("Dropped by"));
if (_log.shouldLog(Log.INFO)) {
long now = _context.clock().now();
StringBuffer buf = new StringBuffer(4096);
long timeSinceSend = now - peer.getLastSendTime();
long timeSinceRecv = now - peer.getLastReceiveTime();
long timeSinceAck = now - peer.getLastACKSend();
buf.append("Dropping remote peer: ").append(peer.toString()).append(" shitlist? ").append(shouldShitlist);
buf.append(" lifetime: ").append(now - peer.getKeyEstablishedTime());
buf.append(" time since send/recv/ack: ").append(timeSinceSend).append(" / ");
buf.append(timeSinceRecv).append(" / ").append(timeSinceAck);
buf.append("Existing peers: \n");
synchronized (_peersByIdent) {
for (Iterator iter = _peersByIdent.keySet().iterator(); iter.hasNext(); ) {
Hash c = (Hash)iter.next();
PeerState p = (PeerState)_peersByIdent.get(c);
if (c.equals(peer.getRemotePeer())) {
if (p != peer) {
buf.append(" SAME PEER, DIFFERENT STATE ");
} else {
buf.append(" same peer, same state ");
}
} else {
buf.append("Peer ").append(p.toString()).append(" ");
}
buf.append(" lifetime: ").append(now - p.getKeyEstablishedTime());
timeSinceSend = now - p.getLastSendTime();
timeSinceRecv = now - p.getLastReceiveTime();
timeSinceAck = now - p.getLastACKSend();
buf.append(" time since send/recv/ack: ").append(timeSinceSend).append(" / ");
buf.append(timeSinceRecv).append(" / ").append(timeSinceAck);
buf.append("\n");
}
}
_log.info(buf.toString(), new Exception("Dropped by"));
}
if (peer.getRemotePeer() != null) {
dropPeerCapacities(peer);
@ -659,6 +697,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
replaceAddress(addr);
}
String getPacketHandlerStatus() {
PacketHandler handler = _handler;
if (handler != null)
return handler.getHandlerStatus();
else
return "";
}
public void failed(OutboundMessageState msg) {
if (msg == null) return;
int consecutive = 0;
@ -708,6 +754,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
synchronized (_peersByIdent) {
peers = new ArrayList(_peersByIdent.values());
}
long offsetTotal = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("<b>UDP connections: ").append(peers.size()).append("</b><br />\n");
@ -748,6 +795,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(" [choked]");
if (peer.getConsecutiveFailedSends() > 0)
buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]");
if (_context.shitlist().isShitlisted(peer.getRemotePeer()))
buf.append(" [shitlisted]");
buf.append("</td>");
buf.append("<td>");
@ -769,6 +818,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("<td>");
buf.append(peer.getClockSkew()/1000);
buf.append("s</td>");
offsetTotal = offsetTotal + peer.getClockSkew();
buf.append("<td>");
buf.append(peer.getSendWindowBytes()/1024);
@ -815,6 +865,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
out.write("</table>\n");
buf.append("<b>Average clock skew, UDP peers:");
if (peers.size() > 0)
buf.append(offsetTotal / peers.size()).append("ms</b><br><br>\n");
else
buf.append("n/a</b><br><br>\n");
out.write(buf.toString());
buf.setLength(0);
}
private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00");
@ -839,6 +898,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public String toString() { return "UDP bid @ " + getLatencyMs(); }
}
private static final int EXPIRE_TIMEOUT = 10*60*1000;
private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private List _peers;
// toAdd and toRemove are kept separate from _peers so that add and
@ -853,10 +914,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_toRemove = new ArrayList(4);
}
public void timeReached() {
long inactivityCutoff = _context.clock().now() - 10*60*1000;
long inactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT;
for (int i = 0; i < _peers.size(); i++) {
PeerState peer = (PeerState)_peers.get(i);
if (peer.getLastReceiveTime() < inactivityCutoff) {
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
dropPeer(peer, false);
_peers.remove(i);
i--;
@ -865,8 +926,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
synchronized (_toAdd) {
for (int i = 0; i < _toAdd.size(); i++) {
PeerState peer = (PeerState)_toAdd.get(i);
if (!_peers.contains(peer))
_peers.add(peer);
_peers.remove(peer); // in case we are switching peers
_peers.add(peer);
}
_toAdd.clear();
}

View File

@ -75,7 +75,6 @@ public class TunnelParticipant {
}
if ( (_config != null) && (_config.getSendTo() != null) ) {
_config.incrementProcessedMessages();
RouterInfo ri = _nextHopCache;
if (ri == null)
ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
@ -83,6 +82,7 @@ public class TunnelParticipant {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send off to nextHop directly (" + _config.getSendTo().toBase64().substring(0,4)
+ " for " + msg);
_config.incrementProcessedMessages();
send(_config, msg, ri);
} else {
if (_log.shouldLog(Log.WARN))

View File

@ -3,6 +3,7 @@ package net.i2p.router.tunnel.pool;
import net.i2p.data.Certificate;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.RouterIdentity;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DeliveryInstructions;
@ -19,6 +20,7 @@ import net.i2p.router.message.GarlicMessageBuilder;
import net.i2p.router.message.PayloadGarlicConfig;
import net.i2p.router.message.SendMessageDirectJob;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.peermanager.TunnelHistory;
import net.i2p.util.Log;
/**
@ -31,6 +33,7 @@ import net.i2p.util.Log;
public class HandleTunnelCreateMessageJob extends JobImpl {
private Log _log;
private TunnelCreateMessage _request;
private boolean _alreadySearched;
/** job builder to redirect all tunnelCreateMessages through this job type */
static class Builder implements HandlerJobBuilder {
@ -46,14 +49,19 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
super(ctx);
_log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class);
_request = msg;
_alreadySearched = false;
}
private static final int STATUS_DEFERRED = 10000;
public String getName() { return "Handle tunnel join request"; }
public void runJob() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("handle join request: " + _request);
int status = shouldAccept();
if (status > 0) {
if (status == STATUS_DEFERRED) {
return;
} else if (status > 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("reject(" + status + ") join request: " + _request);
sendRejection(status);
@ -64,7 +72,34 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
}
}
private int shouldAccept() { return getContext().throttle().acceptTunnelRequest(_request); }
private int shouldAccept() {
Hash nextRouter = _request.getNextRouter();
if (nextRouter != null) {
RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(nextRouter);
if (ri == null) {
if (_alreadySearched) // only search once
return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
getContext().netDb().lookupRouterInfo(nextRouter, new DeferredAccept(getContext(), true), new DeferredAccept(getContext(), false), 5*1000);
_alreadySearched = true;
return STATUS_DEFERRED;
}
}
return getContext().throttle().acceptTunnelRequest(_request);
}
private class DeferredAccept extends JobImpl {
private boolean _shouldAccept;
public DeferredAccept(RouterContext ctx, boolean shouldAccept) {
super(ctx);
_shouldAccept = shouldAccept;
}
public void runJob() {
HandleTunnelCreateMessageJob.this.runJob();
}
private static final String NAME_OK = "Deferred netDb accept";
private static final String NAME_REJECT = "Deferred netDb reject";
public String getName() { return _shouldAccept ? NAME_OK : NAME_REJECT; }
}
private void accept() {
byte recvId[] = new byte[4];

View File

@ -163,7 +163,15 @@ public class TunnelPool {
* when selecting tunnels, stick with the same one for a brief
* period to allow batching if we can.
*/
private static final long SELECTION_PERIOD = 500;
private long curPeriod() {
long period = _context.clock().now();
long ms = period % 1000;
if (ms > 500)
period = period - ms + 500;
else
period = period - ms;
return period;
}
/**
* Pull a random tunnel out of the pool. If there are none available but
@ -173,8 +181,7 @@ public class TunnelPool {
*/
public TunnelInfo selectTunnel() { return selectTunnel(true); }
private TunnelInfo selectTunnel(boolean allowRecurseOnFail) {
long period = _context.clock().now();
period -= period % SELECTION_PERIOD;
long period = curPeriod();
synchronized (_tunnels) {
if (_lastSelectionPeriod == period) {
if ( (_lastSelected != null) &&