- Try to pick better introducers by checking shitlist,
        wasUnreachable list, failing list, and idle times
      - To keep introducer connections up and valid,
        periodically send a "ping" (a data packet with no data and no acks)
        to everybody that has been an introducer in the last two hours
      - Add a stat udp.receiveRelayRequestBadTag, make udp.receiveRelayRequest only for good ones
      - Remove some 60s and 5m stats, leave only the 10m ones
      - Narrow the range for the retransmit time after an allocation fail
      - Adjust some logging
This commit is contained in:
zzz
2008-07-07 14:18:38 +00:00
parent e173a47e01
commit 5228543236
9 changed files with 110 additions and 27 deletions

View File

@ -1,3 +1,31 @@
2008-07-07 zzz
* i2psnark:
- Repair corrupted files with wrong length rather than die
- Register shutdown hook to properly shutdown torrents when
the router shuts down, hopefully will reduce corruption
- Add Galen tracker
- Add a note about how to chane directory
* HTTP Proxy: Don't show jump links for unknown jump hosts
* KeyManager:
- Don't write router key backup when leaseSet keys are updated
- Synchronize to prevent concurrent writes (thanks Galen!)
- Backup keys every 7 days instead of every 5 minutes
* LoadTestManager: Don't instantiate, it's disabled
* Router console: Flag placeholder pages as noncacheable
* Streaming lib:
- Change some logging from WARN to INFO
- Clean up toString()
* SSU:
- Try to pick better introducers by checking shitlist,
wasUnreachable list, failing list, and idle times
- To keep introducer connections up and valid,
periodically send a "ping" (a data packet with no data and no acks)
to everybody that has been an introducer in the last two hours
- Add a stat udp.receiveRelayRequestBadTag, make udp.receiveRelayRequest only for good ones
- Remove some 60s and 5m stats, leave only the 10m ones
- Narrow the range for the retransmit time after an allocation fail
- Adjust some logging
2008-06-30 zzz 2008-06-30 zzz
* configstats.jsp: Fix NPE when no stats checked (thanks nothome27!) * configstats.jsp: Fix NPE when no stats checked (thanks nothome27!)
* i2psnark: * i2psnark:

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = "0.6.2"; public final static String VERSION = "0.6.2";
public final static long BUILD = 7; public final static long BUILD = 8;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -90,8 +90,8 @@ public class EstablishmentManager {
InboundEstablishState getInboundState(RemoteHostId from) { InboundEstablishState getInboundState(RemoteHostId from) {
synchronized (_inboundStates) { synchronized (_inboundStates) {
InboundEstablishState state = (InboundEstablishState)_inboundStates.get(from); InboundEstablishState state = (InboundEstablishState)_inboundStates.get(from);
if ( (state == null) && (_log.shouldLog(Log.DEBUG)) ) // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("No inbound states for " + from + ", with remaining: " + _inboundStates); // _log.debug("No inbound states for " + from + ", with remaining: " + _inboundStates);
return state; return state;
} }
} }
@ -99,8 +99,8 @@ public class EstablishmentManager {
OutboundEstablishState getOutboundState(RemoteHostId from) { OutboundEstablishState getOutboundState(RemoteHostId from) {
synchronized (_outboundStates) { synchronized (_outboundStates) {
OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(from); OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(from);
if ( (state == null) && (_log.shouldLog(Log.DEBUG)) ) // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates); // _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates);
return state; return state;
} }
} }
@ -654,6 +654,8 @@ public class EstablishmentManager {
} }
} }
if (removed != null) { if (removed != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + _state.getRemoteHostId().toString() + " timed out");
_context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0); _context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0);
notifyActivity(); notifyActivity();
} }
@ -759,8 +761,8 @@ public class EstablishmentManager {
if (cur.getNextSendTime() <= now) { if (cur.getNextSendTime() <= now) {
// our turn... // our turn...
inboundState = cur; inboundState = cur;
if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing inbound that wanted activity"); // _log.debug("Processing inbound that wanted activity");
break; break;
} else { } else {
// nothin to do but wait for them to send us // nothin to do but wait for them to send us
@ -856,8 +858,8 @@ public class EstablishmentManager {
if (cur.getNextSendTime() <= now) { if (cur.getNextSendTime() <= now) {
// our turn... // our turn...
outboundState = cur; outboundState = cur;
if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound wants activity: " + cur); // _log.debug("Outbound wants activity: " + cur);
break; break;
} else { } else {
// nothin to do but wait for them to send us // nothin to do but wait for them to send us
@ -871,8 +873,8 @@ public class EstablishmentManager {
} }
if ( (nextSendTime <= 0) || (when < nextSendTime) ) if ( (nextSendTime <= 0) || (when < nextSendTime) )
nextSendTime = when; nextSendTime = when;
if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound doesn't want activity: " + cur + " (next=" + (when-now) + ")"); // _log.debug("Outbound doesn't want activity: " + cur + " (next=" + (when-now) + ")");
} }
} }
} }
@ -1017,9 +1019,9 @@ public class EstablishmentManager {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
interrupted = true; interrupted = true;
} }
if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
_log.debug("After waiting w/ nextSend=" + nextSendTime // _log.debug("After waiting w/ nextSend=" + nextSendTime
+ " and delay=" + delay + " and interrupted=" + interrupted); // + " and delay=" + delay + " and interrupted=" + interrupted);
} }
} }
} }

View File

@ -29,8 +29,9 @@ public class IntroductionManager {
_builder = new PacketBuilder(ctx, transport); _builder = new PacketBuilder(ctx, transport);
_outbound = Collections.synchronizedMap(new HashMap(128)); _outbound = Collections.synchronizedMap(new HashMap(128));
_inbound = new ArrayList(128); _inbound = new ArrayList(128);
ctx.statManager().createRateStat("udp.receiveRelayIntro", "How often we get a relayed request for us to talk to someone?", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000 }); ctx.statManager().createRateStat("udp.receiveRelayIntro", "How often we get a relayed request for us to talk to someone?", "udp", new long[] { 10*60*1000 });
ctx.statManager().createRateStat("udp.receiveRelayRequest", "How often we receive a request to relay to someone else?", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000 }); ctx.statManager().createRateStat("udp.receiveRelayRequest", "How often we receive a good request to relay to someone else?", "udp", new long[] { 10*60*1000 });
ctx.statManager().createRateStat("udp.receiveRelayRequestBadTag", "Received relay requests with bad/expired tag", "udp", new long[] { 10*60*1000 });
} }
public void reset() { public void reset() {
@ -77,19 +78,25 @@ public class IntroductionManager {
* The picked peers have their info tacked on to the ssuOptions parameter for * The picked peers have their info tacked on to the ssuOptions parameter for
* use in the SSU RouterAddress. * use in the SSU RouterAddress.
* *
* Try to use "good" peers (i.e. reachable, active)
*
* Also, ping all idle peers that were introducers in the last 2 hours,
* to keep the connection up, since the netDb can have quite stale information,
* and we want to keep our introducers valid.
*/ */
public int pickInbound(Properties ssuOptions, int howMany) { public int pickInbound(Properties ssuOptions, int howMany) {
List peers = null; List peers = null;
int start = _context.random().nextInt(Integer.MAX_VALUE); int start = _context.random().nextInt(Integer.MAX_VALUE);
synchronized (_inbound) { synchronized (_inbound) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Picking inbound out of " + _inbound); _log.debug("Picking inbound out of " + _inbound.size());
if (_inbound.size() <= 0) return 0; if (_inbound.size() <= 0) return 0;
peers = new ArrayList(_inbound); peers = new ArrayList(_inbound);
} }
int sz = peers.size(); int sz = peers.size();
start = start % sz; start = start % sz;
int found = 0; int found = 0;
long inactivityCutoff = _context.clock().now() - (UDPTransport.EXPIRE_TIMEOUT / 2);
for (int i = 0; i < sz && found < howMany; i++) { for (int i = 0; i < sz && found < howMany; i++) {
PeerState cur = (PeerState)peers.get((start + i) % sz); PeerState cur = (PeerState)peers.get((start + i) % sz);
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(cur.getRemotePeer()); RouterInfo ri = _context.netDb().lookupRouterInfoLocally(cur.getRemotePeer());
@ -104,6 +111,22 @@ public class IntroductionManager {
_log.info("Picked peer has no SSU address: " + ri); _log.info("Picked peer has no SSU address: " + ri);
continue; continue;
} }
if (_context.profileOrganizer().isFailing(cur.getRemotePeer()) ||
_context.shitlist().isShitlisted(cur.getRemotePeer()) ||
_transport.wasUnreachable(cur.getRemotePeer())) {
if (_log.shouldLog(Log.INFO))
_log.info("Peer is failing, shistlisted or was unreachable: " + cur);
continue;
}
// Try to pick active peers...
if (cur.getLastReceiveTime() < inactivityCutoff || cur.getLastSendTime() < inactivityCutoff) {
if (_log.shouldLog(Log.INFO))
_log.info("Peer is idle too long: " + cur);
continue;
}
if (_log.shouldLog(Log.INFO))
_log.info("Picking introducer: " + cur);
cur.setIntroducerTime();
UDPAddress ura = new UDPAddress(ra); UDPAddress ura = new UDPAddress(ra);
ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString()); ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString());
ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort())); ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort()));
@ -111,6 +134,21 @@ public class IntroductionManager {
ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs())); ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs()));
found++; found++;
} }
// Try to keep the connection up for two hours after we made anybody an introducer
long pingCutoff = _context.clock().now() - (2 * 60 * 60 * 1000);
inactivityCutoff = _context.clock().now() - (UDPTransport.EXPIRE_TIMEOUT / 4);
for (int i = 0; i < sz; i++) {
PeerState cur = (PeerState)peers.get(i);
if (cur.getIntroducerTime() > pingCutoff &&
cur.getLastSendTime() < inactivityCutoff) {
if (_log.shouldLog(Log.INFO))
_log.info("Pinging introducer: " + cur);
cur.setLastSendTime(_context.clock().now());
_transport.send(_builder.buildPing(cur));
}
}
return found; return found;
} }
@ -124,7 +162,6 @@ public class IntroductionManager {
} }
public void receiveRelayRequest(RemoteHostId alice, UDPPacketReader reader) { public void receiveRelayRequest(RemoteHostId alice, UDPPacketReader reader) {
_context.statManager().addRateData("udp.receiveRelayRequest", 1, 0);
if (_context.router().isHidden()) if (_context.router().isHidden())
return; return;
long tag = reader.getRelayRequestReader().readTag(); long tag = reader.getRelayRequestReader().readTag();
@ -133,8 +170,11 @@ public class IntroductionManager {
_log.info("Receive relay request from " + alice _log.info("Receive relay request from " + alice
+ " for tag " + tag + " for tag " + tag
+ " and relaying with " + charlie); + " and relaying with " + charlie);
if (charlie == null) if (charlie == null) {
_context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0);
return; return;
}
_context.statManager().addRateData("udp.receiveRelayRequest", 1, 0);
byte key[] = new byte[SessionKey.KEYSIZE_BYTES]; byte key[] = new byte[SessionKey.KEYSIZE_BYTES];
reader.getRelayRequestReader().readAliceIntroKey(key, 0); reader.getRelayRequestReader().readAliceIntroKey(key, 0);
SessionKey aliceIntroKey = new SessionKey(key); SessionKey aliceIntroKey = new SessionKey(key);

View File

@ -384,8 +384,8 @@ public class OutboundMessageFragments {
_packetsRetransmitted += toSend; // lifetime for the transport _packetsRetransmitted += toSend; // lifetime for the transport
_context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted()); _context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
_context.statManager().addRateData("udp.packetsRetransmitted", state.getLifetime(), peer.getPacketsTransmitted()); _context.statManager().addRateData("udp.packetsRetransmitted", state.getLifetime(), peer.getPacketsTransmitted());
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Retransmitting " + state + " to " + peer); _log.info("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend); _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend);
} }
return rv; return rv;

View File

@ -3,6 +3,7 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Arrays; import java.util.Arrays;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -204,6 +205,12 @@ public class PacketBuilder {
return packet; return packet;
} }
// We use this for keepalive purposes.
// It doesn't generate a reply, but that's ok.
public UDPPacket buildPing(PeerState peer) {
return buildACK(peer, new ArrayList(0));
}
private static final int ACK_PRIORITY = 1; private static final int ACK_PRIORITY = 1;
/** /**

View File

@ -422,12 +422,12 @@ public class PacketHandler {
long skew = recvOn - sendOn; long skew = recvOn - sendOn;
if (skew > GRACE_PERIOD) { if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet); _log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration()); _context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration());
return; return;
} else if (skew < 0 - GRACE_PERIOD) { } else if (skew < 0 - GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet); _log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration()); _context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration());
return; return;
} }

View File

@ -192,6 +192,8 @@ public class PeerState {
private volatile int _consecutiveRejections = 0; private volatile int _consecutiveRejections = 0;
/** is it inbound? **/ /** is it inbound? **/
private boolean _isInbound; private boolean _isInbound;
/** Last time it was made an introducer **/
private long _lastIntroducerTime;
private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024; private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
@ -271,6 +273,7 @@ public class PeerState {
_outboundMessages = new ArrayList(32); _outboundMessages = new ArrayList(32);
_dead = false; _dead = false;
_isInbound = false; _isInbound = false;
_lastIntroducerTime = 0;
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
@ -558,6 +561,8 @@ public class PeerState {
public int getConsecutiveSendRejections() { return _consecutiveRejections; } public int getConsecutiveSendRejections() { return _consecutiveRejections; }
public boolean isInbound() { return _isInbound; } public boolean isInbound() { return _isInbound; }
public void setInbound() { _isInbound = true; } public void setInbound() { _isInbound = true; }
public long getIntroducerTime() { return _lastIntroducerTime; }
public void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); }
/** we received the message specified completely */ /** we received the message specified completely */
public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); } public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); }
@ -850,8 +855,8 @@ public class PeerState {
recalculateTimeouts(lifetime); recalculateTimeouts(lifetime);
adjustMTU(); adjustMTU();
} }
else if (_log.shouldLog(Log.WARN)) else if (_log.shouldLog(Log.INFO))
_log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps, lifetime); _context.statManager().addRateData("udp.sendBps", _sendBps, lifetime);
} }
@ -1327,7 +1332,8 @@ public class PeerState {
_log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+ " available=" + getSendWindowBytesRemaining() + " available=" + getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state); + " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now+(_context.random().nextInt(2*ACKSender.ACK_FREQUENCY))); //(now + 1024) & ~SECOND_MASK); state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
_context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
//_throttle.choke(peer.getRemotePeer()); //_throttle.choke(peer.getRemotePeer());

View File

@ -1934,7 +1934,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public String toString() { return "UDP bid @ " + getLatencyMs(); } public String toString() { return "UDP bid @ " + getLatencyMs(); }
} }
private static final int EXPIRE_TIMEOUT = 30*60*1000; public static final int EXPIRE_TIMEOUT = 30*60*1000;
private class ExpirePeerEvent implements SimpleTimer.TimedEvent { private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private List _expirePeers; private List _expirePeers;