diff --git a/history.txt b/history.txt
index f1a4144e4a..e79dc3f269 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,10 @@
-$Id: history.txt,v 1.199 2005/04/25 21:59:23 smeghead Exp $
+$Id: history.txt,v 1.200 2005/04/28 16:54:27 jrandom Exp $
+
+2005-04-29 jrandom
+ * Reduce the peer profile stat coallesce overhead by inlining it with the
+ reorganize.
+ * Limit each transport to at most one address (any transport that requires
+ multiple entry points can include those alternatives in the address).
2005-04-28 jrandom
* More fixes for the I2PTunnel "other" interface handling (thanks nelgin!)
diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java
index 1aad451621..25ffbaa5ce 100644
--- a/router/java/src/net/i2p/router/RouterThrottleImpl.java
+++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java
@@ -97,7 +97,7 @@ class RouterThrottleImpl implements RouterThrottle {
if (rs != null)
r = rs.getRate(10*60*1000);
double processTime = (r != null ? r.getAverageValue() : 0);
- if (processTime > 1000) {
+ if (processTime > 2000) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Refusing tunnel request with the job lag of " + lag
+ "since the 10 minute message processing time is too slow (" + processTime + ")");
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 6d7b27da3d..60d318e5e4 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.190 $ $Date: 2005/04/24 13:42:04 $";
+ public final static String ID = "$Revision: 1.191 $ $Date: 2005/04/28 16:54:28 $";
public final static String VERSION = "0.5.0.7";
- public final static long BUILD = 2;
+ public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java b/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java
index 97c911b39f..1a5efc1d4c 100644
--- a/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java
+++ b/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java
@@ -27,21 +27,7 @@ class EvaluateProfilesJob extends JobImpl {
public String getName() { return "Evaluate peer profiles"; }
public void runJob() {
try {
- long start = getContext().clock().now();
- Set allPeers = getContext().profileOrganizer().selectAllPeers();
- long afterSelect = getContext().clock().now();
- for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
- Hash peer = (Hash)iter.next();
- PeerProfile profile = getContext().profileOrganizer().getProfile(peer);
- if (profile != null)
- profile.coalesceStats();
- }
- long afterCoalesce = getContext().clock().now();
- getContext().profileOrganizer().reorganize();
- long afterReorganize = getContext().clock().now();
-
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Profiles coalesced and reorganized. total: " + allPeers.size() + ", selectAll: " + (afterSelect-start) + "ms, coalesce: " + (afterCoalesce-afterSelect) + "ms, reorganize: " + (afterReorganize-afterSelect));
+ getContext().profileOrganizer().reorganize(true);
} catch (Throwable t) {
_log.log(Log.CRIT, "Error evaluating profiles", t);
} finally {
diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
index 3cc6374a08..414aa95f5c 100644
--- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
+++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
@@ -409,17 +409,20 @@ public class ProfileOrganizer {
* this method, but the averages are recalculated.
*
*/
- public void reorganize() {
+ public void reorganize() { reorganize(false); }
+ public void reorganize(boolean shouldCoalesce) {
synchronized (_reorganizeLock) {
- Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
- allPeers.addAll(_failingPeers.values());
- allPeers.addAll(_notFailingPeers.values());
- allPeers.addAll(_highCapacityPeers.values());
- allPeers.addAll(_fastPeers.values());
+ Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
+ //allPeers.addAll(_failingPeers.values());
+ //allPeers.addAll(_notFailingPeers.values());
+ //allPeers.addAll(_highCapacityPeers.values());
+ //allPeers.addAll(_fastPeers.values());
Set reordered = new TreeSet(_comp);
for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next();
+ if (shouldCoalesce)
+ prof.coalesceStats();
reordered.add(prof);
}
_strictCapacityOrder = reordered;
diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
index af1dfcde44..f9ff4a89c9 100644
--- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
+++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
@@ -10,8 +10,10 @@ package net.i2p.router.transport;
import java.io.IOException;
import java.io.Writer;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -75,17 +77,22 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
}
public Set createAddresses() {
- Set addresses = new HashSet();
- RouterAddress addr = createTCPAddress();
- if (addr != null)
- addresses.add(addr);
+ Map addresses = null;
- if (_manager != null)
- addresses.addAll(_manager.getAddresses());
+ if (_manager != null)
+ addresses = _manager.getAddresses();
+ else
+ addresses = new HashMap(1);
+
+ if (!addresses.containsKey(TCPTransport.STYLE)) {
+ RouterAddress addr = createTCPAddress();
+ if (addr != null)
+ addresses.put(TCPTransport.STYLE, addr);
+ }
if (_log.shouldLog(Log.INFO))
_log.info("Creating addresses: " + addresses);
- return addresses;
+ return new HashSet(addresses.values());
}
private final static String PROP_I2NP_TCP_HOSTNAME = "i2np.tcp.hostname";
diff --git a/router/java/src/net/i2p/router/transport/Transport.java b/router/java/src/net/i2p/router/transport/Transport.java
index 9f3ee4e535..c60c03ce4e 100644
--- a/router/java/src/net/i2p/router/transport/Transport.java
+++ b/router/java/src/net/i2p/router/transport/Transport.java
@@ -33,7 +33,7 @@ public interface Transport {
public void send(OutNetMessage msg);
public RouterAddress startListening();
public void stopListening();
- public Set getCurrentAddresses();
+ public RouterAddress getCurrentAddress();
public void setListener(TransportEventListener listener);
public String getStyle();
diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java
index 0da951571a..c24abd3a00 100644
--- a/router/java/src/net/i2p/router/transport/TransportImpl.java
+++ b/router/java/src/net/i2p/router/transport/TransportImpl.java
@@ -35,7 +35,7 @@ import net.i2p.util.Log;
public abstract class TransportImpl implements Transport {
private Log _log;
private TransportEventListener _listener;
- private Set _currentAddresses;
+ private RouterAddress _currentAddress;
private List _sendPool;
protected RouterContext _context;
@@ -55,7 +55,7 @@ public abstract class TransportImpl implements Transport {
_context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_sendPool = new ArrayList(16);
- _currentAddresses = new HashSet();
+ _currentAddress = null;
}
/**
@@ -334,34 +334,15 @@ public abstract class TransportImpl implements Transport {
}
/** What addresses are we currently listening to? */
- public Set getCurrentAddresses() {
- synchronized (_currentAddresses) {
- return new HashSet(_currentAddresses);
- }
+ public RouterAddress getCurrentAddress() {
+ return _currentAddress;
}
/**
* Replace any existing addresses for the current transport with the given
* one.
*/
protected void replaceAddress(RouterAddress address) {
- synchronized (_currentAddresses) {
- Set addresses = _currentAddresses;
- List toRemove = null;
- for (Iterator iter = addresses.iterator(); iter.hasNext(); ) {
- RouterAddress cur = (RouterAddress)iter.next();
- if (getStyle().equals(cur.getTransportStyle())) {
- if (toRemove == null)
- toRemove = new ArrayList(1);
- toRemove.add(cur);
- }
- }
- if (toRemove != null) {
- for (int i = 0; i < toRemove.size(); i++) {
- addresses.remove(toRemove.get(i));
- }
- }
- _currentAddresses.add(address);
- }
+ _currentAddress = address;
}
/** Who to notify on message availability */
diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java
index 744aaf60c6..76c2080a50 100644
--- a/router/java/src/net/i2p/router/transport/TransportManager.java
+++ b/router/java/src/net/i2p/router/transport/TransportManager.java
@@ -12,7 +12,9 @@ import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
@@ -109,11 +111,12 @@ public class TransportManager implements TransportEventListener {
return peers;
}
- List getAddresses() {
- List rv = new ArrayList(_transports.size());
+ Map getAddresses() {
+ Map rv = new HashMap(_transports.size());
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
- rv.addAll(t.getCurrentAddresses());
+ if (t.getCurrentAddress() != null)
+ rv.put(t.getStyle(), t.getCurrentAddress());
}
return rv;
}
@@ -178,10 +181,8 @@ public class TransportManager implements TransportEventListener {
buf.append("Listening on:
\n"); for (int i = 0; i < _transports.size(); i++) { Transport t = (Transport)_transports.get(i); - for (Iterator iter = t.getCurrentAddresses().iterator(); iter.hasNext(); ) { - RouterAddress addr = (RouterAddress)iter.next(); - buf.append(addr.toString()).append("\n\n"); - } + if (t.getCurrentAddress() != null) + buf.append(t.getCurrentAddress()).append("\n\n"); } buf.append("\n"); out.write(buf.toString()); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index d6c3eda8c9..0380df72dd 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -141,8 +141,17 @@ public class PacketHandler { _log.info("Validation with existing con failed, but validation as reestablish/stray passed"); packet.decrypt(_transport.getIntroKey()); } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); + InetAddress remAddr = packet.getPacket().getAddress(); + int remPort = packet.getPacket().getPort(); + InboundEstablishState est = _establisher.getInboundState(remAddr, remPort); + if (est != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet from an existing peer IS for an inbound establishment"); + receivePacket(reader, packet, est, false); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); + } return; } } else { @@ -171,6 +180,12 @@ public class PacketHandler { } private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) { + receivePacket(reader, packet, state, true); + } + /** + * @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet + */ + private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) { if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { StringBuffer buf = new StringBuffer(128); buf.append("Attempting to receive a packet on a known inbound state: "); @@ -195,9 +210,11 @@ public class PacketHandler { } } - // ok, we couldn't handle it with the established stuff, so fall back - // on earlier state packets - receivePacket(reader, packet); + if (allowFallback) { + // ok, we couldn't handle it with the established stuff, so fall back + // on earlier state packets + receivePacket(reader, packet); + } } private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) { @@ -241,27 +258,30 @@ public class PacketHandler { receivePacket(reader, packet); } - /** let packets be up to 30s slow */ - private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; + /** let packets be up to 5s slow */ + private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 5*1000; /** * Parse out the interesting bits and honor what it says */ private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { reader.initialize(packet); - long now = _context.clock().now(); - long when = reader.readTimestamp() * 1000; - long skew = now - when; + long recvOn = packet.getBegin(); + long sendOn = reader.readTimestamp() * 1000; + long skew = recvOn - sendOn; if (skew > GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) - _log.warn("Packet too far in the future: " + new Date(when) + ": " + packet); + _log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet); return; } else if (skew < 0 - GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) - _log.warn("Packet too far in the past: " + new Date(when) + ": " + packet); + _log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet); return; } + if (state != null) + state.adjustClockSkew((short)skew); + _context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime()); InetAddress fromHost = packet.getPacket().getAddress(); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 984f6d0cb5..5c4e55b574 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -300,7 +300,9 @@ public class PeerState { /** when were the current cipher and MAC keys established/rekeyed? */ public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; } /** how far off is the remote peer from our clock, in seconds? */ - public void setClockSkew(short skew) { _clockSkew = skew; } + public void adjustClockSkew(short skew) { + _clockSkew = (short)(0.9*(float)_clockSkew + 0.1*(float)skew); + } /** what is the current receive second, for congestion control? */ public void setCurrentReceiveSecond(long sec) { _currentReceiveSecond = sec; } /** when did we last send them a packet? */ diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 2ebff7471f..6536a23755 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -68,7 +68,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** shared slow bid for unconnected peers */ private TransportBid _slowBid; - public static final String STYLE = "udp"; + public static final String STYLE = "SSUv1"; public static final String PROP_INTERNAL_PORT = "i2np.udp.internalPort"; /** define this to explicitly set an external IP address */ @@ -531,11 +531,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority StringBuffer buf = new StringBuffer(512); buf.append("UDP connections: ").append(peers.size()).append("
Peer | Location | \n"); - buf.append("Last send | Last recv | \n"); - buf.append("Lifetime | cwnd | ssthresh | \n"); + buf.append("||||||
peer | activity (in/out) | \n"); + buf.append("uptime | skew | \n"); + buf.append("cwnd | ssthresh | \n"); buf.append("rtt | dev | rto | \n"); - buf.append("Sent | Received | \n"); + buf.append("send | recv | \n"); buf.append("
"); + buf.append(" | "); + buf.append(""); - - buf.append(" | "); + buf.append("\">"); byte ip[] = peer.getRemoteIP(); for (int j = 0; j < ip.length; j++) { if (ip[j] < 0) @@ -562,19 +561,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append('.'); } buf.append(':').append(peer.getRemotePort()); + buf.append(""); + if (peer.getConsecutiveFailedSends() > 0) + buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]"); buf.append(" | "); - buf.append(""); - buf.append(DataHelper.formatDuration(now-peer.getLastSendTime())); - buf.append(" | "); - buf.append(""); buf.append(DataHelper.formatDuration(now-peer.getLastReceiveTime())); + buf.append("/"); + buf.append(DataHelper.formatDuration(now-peer.getLastSendTime())); buf.append(" | "); buf.append(""); buf.append(DataHelper.formatDuration(now-peer.getKeyEstablishedTime())); buf.append(" | "); + + buf.append(""); + buf.append(peer.getClockSkew()/1000); + buf.append("s | "); buf.append(""); buf.append(peer.getSendWindowBytes()/1024); |