From 1b0bb5ea19e1dcff735b9d90de335eb9790f6e58 Mon Sep 17 00:00:00 2001 From: jrandom Date: Fri, 29 Apr 2005 06:24:12 +0000 Subject: [PATCH] 2005-04-29 jrandom * Reduce the peer profile stat coallesce overhead by inlining it with the reorganize. * Limit each transport to at most one address (any transport that requires multiple entry points can include those alternatives in the address). udp stuff: * change the UDP transport's style from "udp" to "SSUv1" * keep track of each peer's skew * properly handle session reestablishment over an existing session, rather than requiring both sides to expire first --- history.txt | 8 +++- .../net/i2p/router/RouterThrottleImpl.java | 2 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../peermanager/EvaluateProfilesJob.java | 16 +------ .../router/peermanager/ProfileOrganizer.java | 15 ++++--- .../transport/CommSystemFacadeImpl.java | 21 ++++++--- .../net/i2p/router/transport/Transport.java | 2 +- .../i2p/router/transport/TransportImpl.java | 29 +++--------- .../router/transport/TransportManager.java | 15 ++++--- .../router/transport/udp/PacketHandler.java | 44 ++++++++++++++----- .../i2p/router/transport/udp/PeerState.java | 4 +- .../router/transport/udp/UDPTransport.java | 30 +++++++------ 12 files changed, 100 insertions(+), 90 deletions(-) diff --git a/history.txt b/history.txt index f1a4144e4a..e79dc3f269 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.199 2005/04/25 21:59:23 smeghead Exp $ +$Id: history.txt,v 1.200 2005/04/28 16:54:27 jrandom Exp $ + +2005-04-29 jrandom + * Reduce the peer profile stat coallesce overhead by inlining it with the + reorganize. + * Limit each transport to at most one address (any transport that requires + multiple entry points can include those alternatives in the address). 2005-04-28 jrandom * More fixes for the I2PTunnel "other" interface handling (thanks nelgin!) diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index 1aad451621..25ffbaa5ce 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -97,7 +97,7 @@ class RouterThrottleImpl implements RouterThrottle { if (rs != null) r = rs.getRate(10*60*1000); double processTime = (r != null ? r.getAverageValue() : 0); - if (processTime > 1000) { + if (processTime > 2000) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Refusing tunnel request with the job lag of " + lag + "since the 10 minute message processing time is too slow (" + processTime + ")"); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 6d7b27da3d..60d318e5e4 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.190 $ $Date: 2005/04/24 13:42:04 $"; + public final static String ID = "$Revision: 1.191 $ $Date: 2005/04/28 16:54:28 $"; public final static String VERSION = "0.5.0.7"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java b/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java index 97c911b39f..1a5efc1d4c 100644 --- a/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java +++ b/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java @@ -27,21 +27,7 @@ class EvaluateProfilesJob extends JobImpl { public String getName() { return "Evaluate peer profiles"; } public void runJob() { try { - long start = getContext().clock().now(); - Set allPeers = getContext().profileOrganizer().selectAllPeers(); - long afterSelect = getContext().clock().now(); - for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - PeerProfile profile = getContext().profileOrganizer().getProfile(peer); - if (profile != null) - profile.coalesceStats(); - } - long afterCoalesce = getContext().clock().now(); - getContext().profileOrganizer().reorganize(); - long afterReorganize = getContext().clock().now(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Profiles coalesced and reorganized. total: " + allPeers.size() + ", selectAll: " + (afterSelect-start) + "ms, coalesce: " + (afterCoalesce-afterSelect) + "ms, reorganize: " + (afterReorganize-afterSelect)); + getContext().profileOrganizer().reorganize(true); } catch (Throwable t) { _log.log(Log.CRIT, "Error evaluating profiles", t); } finally { diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 3cc6374a08..414aa95f5c 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -409,17 +409,20 @@ public class ProfileOrganizer { * this method, but the averages are recalculated. * */ - public void reorganize() { + public void reorganize() { reorganize(false); } + public void reorganize(boolean shouldCoalesce) { synchronized (_reorganizeLock) { - Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); - allPeers.addAll(_failingPeers.values()); - allPeers.addAll(_notFailingPeers.values()); - allPeers.addAll(_highCapacityPeers.values()); - allPeers.addAll(_fastPeers.values()); + Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); + //allPeers.addAll(_failingPeers.values()); + //allPeers.addAll(_notFailingPeers.values()); + //allPeers.addAll(_highCapacityPeers.values()); + //allPeers.addAll(_fastPeers.values()); Set reordered = new TreeSet(_comp); for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) { PeerProfile prof = (PeerProfile)iter.next(); + if (shouldCoalesce) + prof.coalesceStats(); reordered.add(prof); } _strictCapacityOrder = reordered; diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java index af1dfcde44..f9ff4a89c9 100644 --- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java +++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java @@ -10,8 +10,10 @@ package net.i2p.router.transport; import java.io.IOException; import java.io.Writer; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; @@ -75,17 +77,22 @@ public class CommSystemFacadeImpl extends CommSystemFacade { } public Set createAddresses() { - Set addresses = new HashSet(); - RouterAddress addr = createTCPAddress(); - if (addr != null) - addresses.add(addr); + Map addresses = null; - if (_manager != null) - addresses.addAll(_manager.getAddresses()); + if (_manager != null) + addresses = _manager.getAddresses(); + else + addresses = new HashMap(1); + + if (!addresses.containsKey(TCPTransport.STYLE)) { + RouterAddress addr = createTCPAddress(); + if (addr != null) + addresses.put(TCPTransport.STYLE, addr); + } if (_log.shouldLog(Log.INFO)) _log.info("Creating addresses: " + addresses); - return addresses; + return new HashSet(addresses.values()); } private final static String PROP_I2NP_TCP_HOSTNAME = "i2np.tcp.hostname"; diff --git a/router/java/src/net/i2p/router/transport/Transport.java b/router/java/src/net/i2p/router/transport/Transport.java index 9f3ee4e535..c60c03ce4e 100644 --- a/router/java/src/net/i2p/router/transport/Transport.java +++ b/router/java/src/net/i2p/router/transport/Transport.java @@ -33,7 +33,7 @@ public interface Transport { public void send(OutNetMessage msg); public RouterAddress startListening(); public void stopListening(); - public Set getCurrentAddresses(); + public RouterAddress getCurrentAddress(); public void setListener(TransportEventListener listener); public String getStyle(); diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 0da951571a..c24abd3a00 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -35,7 +35,7 @@ import net.i2p.util.Log; public abstract class TransportImpl implements Transport { private Log _log; private TransportEventListener _listener; - private Set _currentAddresses; + private RouterAddress _currentAddress; private List _sendPool; protected RouterContext _context; @@ -55,7 +55,7 @@ public abstract class TransportImpl implements Transport { _context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); _sendPool = new ArrayList(16); - _currentAddresses = new HashSet(); + _currentAddress = null; } /** @@ -334,34 +334,15 @@ public abstract class TransportImpl implements Transport { } /** What addresses are we currently listening to? */ - public Set getCurrentAddresses() { - synchronized (_currentAddresses) { - return new HashSet(_currentAddresses); - } + public RouterAddress getCurrentAddress() { + return _currentAddress; } /** * Replace any existing addresses for the current transport with the given * one. */ protected void replaceAddress(RouterAddress address) { - synchronized (_currentAddresses) { - Set addresses = _currentAddresses; - List toRemove = null; - for (Iterator iter = addresses.iterator(); iter.hasNext(); ) { - RouterAddress cur = (RouterAddress)iter.next(); - if (getStyle().equals(cur.getTransportStyle())) { - if (toRemove == null) - toRemove = new ArrayList(1); - toRemove.add(cur); - } - } - if (toRemove != null) { - for (int i = 0; i < toRemove.size(); i++) { - addresses.remove(toRemove.get(i)); - } - } - _currentAddresses.add(address); - } + _currentAddress = address; } /** Who to notify on message availability */ diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java index 744aaf60c6..76c2080a50 100644 --- a/router/java/src/net/i2p/router/transport/TransportManager.java +++ b/router/java/src/net/i2p/router/transport/TransportManager.java @@ -12,7 +12,9 @@ import java.io.IOException; import java.io.Writer; import java.util.ArrayList; import java.util.Iterator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import net.i2p.data.Hash; import net.i2p.data.RouterAddress; @@ -109,11 +111,12 @@ public class TransportManager implements TransportEventListener { return peers; } - List getAddresses() { - List rv = new ArrayList(_transports.size()); + Map getAddresses() { + Map rv = new HashMap(_transports.size()); for (int i = 0; i < _transports.size(); i++) { Transport t = (Transport)_transports.get(i); - rv.addAll(t.getCurrentAddresses()); + if (t.getCurrentAddress() != null) + rv.put(t.getStyle(), t.getCurrentAddress()); } return rv; } @@ -178,10 +181,8 @@ public class TransportManager implements TransportEventListener { buf.append("Listening on:
\n");
         for (int i = 0; i < _transports.size(); i++) {
             Transport t = (Transport)_transports.get(i);
-            for (Iterator iter = t.getCurrentAddresses().iterator(); iter.hasNext(); ) {
-                RouterAddress addr = (RouterAddress)iter.next();
-                buf.append(addr.toString()).append("\n\n");
-            }   
+            if (t.getCurrentAddress() != null)
+                buf.append(t.getCurrentAddress()).append("\n\n");
         }
         buf.append("
\n"); out.write(buf.toString()); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index d6c3eda8c9..0380df72dd 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -141,8 +141,17 @@ public class PacketHandler { _log.info("Validation with existing con failed, but validation as reestablish/stray passed"); packet.decrypt(_transport.getIntroKey()); } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); + InetAddress remAddr = packet.getPacket().getAddress(); + int remPort = packet.getPacket().getPort(); + InboundEstablishState est = _establisher.getInboundState(remAddr, remPort); + if (est != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet from an existing peer IS for an inbound establishment"); + receivePacket(reader, packet, est, false); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); + } return; } } else { @@ -171,6 +180,12 @@ public class PacketHandler { } private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) { + receivePacket(reader, packet, state, true); + } + /** + * @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet + */ + private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) { if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { StringBuffer buf = new StringBuffer(128); buf.append("Attempting to receive a packet on a known inbound state: "); @@ -195,9 +210,11 @@ public class PacketHandler { } } - // ok, we couldn't handle it with the established stuff, so fall back - // on earlier state packets - receivePacket(reader, packet); + if (allowFallback) { + // ok, we couldn't handle it with the established stuff, so fall back + // on earlier state packets + receivePacket(reader, packet); + } } private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) { @@ -241,27 +258,30 @@ public class PacketHandler { receivePacket(reader, packet); } - /** let packets be up to 30s slow */ - private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; + /** let packets be up to 5s slow */ + private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 5*1000; /** * Parse out the interesting bits and honor what it says */ private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { reader.initialize(packet); - long now = _context.clock().now(); - long when = reader.readTimestamp() * 1000; - long skew = now - when; + long recvOn = packet.getBegin(); + long sendOn = reader.readTimestamp() * 1000; + long skew = recvOn - sendOn; if (skew > GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) - _log.warn("Packet too far in the future: " + new Date(when) + ": " + packet); + _log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet); return; } else if (skew < 0 - GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) - _log.warn("Packet too far in the past: " + new Date(when) + ": " + packet); + _log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet); return; } + if (state != null) + state.adjustClockSkew((short)skew); + _context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime()); InetAddress fromHost = packet.getPacket().getAddress(); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 984f6d0cb5..5c4e55b574 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -300,7 +300,9 @@ public class PeerState { /** when were the current cipher and MAC keys established/rekeyed? */ public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; } /** how far off is the remote peer from our clock, in seconds? */ - public void setClockSkew(short skew) { _clockSkew = skew; } + public void adjustClockSkew(short skew) { + _clockSkew = (short)(0.9*(float)_clockSkew + 0.1*(float)skew); + } /** what is the current receive second, for congestion control? */ public void setCurrentReceiveSecond(long sec) { _currentReceiveSecond = sec; } /** when did we last send them a packet? */ diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 2ebff7471f..6536a23755 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -68,7 +68,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** shared slow bid for unconnected peers */ private TransportBid _slowBid; - public static final String STYLE = "udp"; + public static final String STYLE = "SSUv1"; public static final String PROP_INTERNAL_PORT = "i2np.udp.internalPort"; /** define this to explicitly set an external IP address */ @@ -531,11 +531,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority StringBuffer buf = new StringBuffer(512); buf.append("UDP connections: ").append(peers.size()).append("
\n"); buf.append("\n"); - buf.append(" \n"); - buf.append(" \n"); - buf.append(" \n"); + buf.append(" \n"); + buf.append(" \n"); + buf.append(" \n"); buf.append(" \n"); - buf.append(" \n"); + buf.append(" \n"); buf.append(" \n"); out.write(buf.toString()); buf.setLength(0); @@ -547,11 +547,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(""); - buf.append(""); - buf.append(""); - buf.append(""); buf.append(""); + + buf.append(""); buf.append("
PeerLocationLast sendLast recvLifetimecwndssthresh
peeractivity (in/out)uptimeskewcwndssthreshrttdevrtoSentReceivedsendrecv
"); + buf.append(""); + buf.append(""); - - buf.append(""); + buf.append("\">"); byte ip[] = peer.getRemoteIP(); for (int j = 0; j < ip.length; j++) { if (ip[j] < 0) @@ -562,19 +561,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append('.'); } buf.append(':').append(peer.getRemotePort()); + buf.append(""); + if (peer.getConsecutiveFailedSends() > 0) + buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]"); buf.append(""); - buf.append(DataHelper.formatDuration(now-peer.getLastSendTime())); - buf.append(""); buf.append(DataHelper.formatDuration(now-peer.getLastReceiveTime())); + buf.append("/"); + buf.append(DataHelper.formatDuration(now-peer.getLastSendTime())); buf.append(""); buf.append(DataHelper.formatDuration(now-peer.getKeyEstablishedTime())); buf.append(""); + buf.append(peer.getClockSkew()/1000); + buf.append("s"); buf.append(peer.getSendWindowBytes()/1024);