2005-04-29 jrandom

* Reduce the peer profile stat coallesce overhead by inlining it with the
      reorganize.
    * Limit each transport to at most one address (any transport that requires
      multiple entry points can include those alternatives in the address).
udp stuff:
* change the UDP transport's style from "udp" to "SSUv1"
* keep track of each peer's skew
* properly handle session reestablishment over an existing session, rather
  than requiring both sides to expire first
This commit is contained in:
jrandom
2005-04-29 06:24:12 +00:00
committed by zzz
parent 4ce51261f1
commit 1b0bb5ea19
12 changed files with 100 additions and 90 deletions

View File

@ -1,4 +1,10 @@
$Id: history.txt,v 1.199 2005/04/25 21:59:23 smeghead Exp $
$Id: history.txt,v 1.200 2005/04/28 16:54:27 jrandom Exp $
2005-04-29 jrandom
* Reduce the peer profile stat coallesce overhead by inlining it with the
reorganize.
* Limit each transport to at most one address (any transport that requires
multiple entry points can include those alternatives in the address).
2005-04-28 jrandom
* More fixes for the I2PTunnel "other" interface handling (thanks nelgin!)

View File

@ -97,7 +97,7 @@ class RouterThrottleImpl implements RouterThrottle {
if (rs != null)
r = rs.getRate(10*60*1000);
double processTime = (r != null ? r.getAverageValue() : 0);
if (processTime > 1000) {
if (processTime > 2000) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Refusing tunnel request with the job lag of " + lag
+ "since the 10 minute message processing time is too slow (" + processTime + ")");

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.190 $ $Date: 2005/04/24 13:42:04 $";
public final static String ID = "$Revision: 1.191 $ $Date: 2005/04/28 16:54:28 $";
public final static String VERSION = "0.5.0.7";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -27,21 +27,7 @@ class EvaluateProfilesJob extends JobImpl {
public String getName() { return "Evaluate peer profiles"; }
public void runJob() {
try {
long start = getContext().clock().now();
Set allPeers = getContext().profileOrganizer().selectAllPeers();
long afterSelect = getContext().clock().now();
for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
PeerProfile profile = getContext().profileOrganizer().getProfile(peer);
if (profile != null)
profile.coalesceStats();
}
long afterCoalesce = getContext().clock().now();
getContext().profileOrganizer().reorganize();
long afterReorganize = getContext().clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Profiles coalesced and reorganized. total: " + allPeers.size() + ", selectAll: " + (afterSelect-start) + "ms, coalesce: " + (afterCoalesce-afterSelect) + "ms, reorganize: " + (afterReorganize-afterSelect));
getContext().profileOrganizer().reorganize(true);
} catch (Throwable t) {
_log.log(Log.CRIT, "Error evaluating profiles", t);
} finally {

View File

@ -409,17 +409,20 @@ public class ProfileOrganizer {
* this method, but the averages are recalculated.
*
*/
public void reorganize() {
public void reorganize() { reorganize(false); }
public void reorganize(boolean shouldCoalesce) {
synchronized (_reorganizeLock) {
Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.values());
allPeers.addAll(_notFailingPeers.values());
allPeers.addAll(_highCapacityPeers.values());
allPeers.addAll(_fastPeers.values());
Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
//allPeers.addAll(_failingPeers.values());
//allPeers.addAll(_notFailingPeers.values());
//allPeers.addAll(_highCapacityPeers.values());
//allPeers.addAll(_fastPeers.values());
Set reordered = new TreeSet(_comp);
for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next();
if (shouldCoalesce)
prof.coalesceStats();
reordered.add(prof);
}
_strictCapacityOrder = reordered;

View File

@ -10,8 +10,10 @@ package net.i2p.router.transport;
import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@ -75,17 +77,22 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
}
public Set createAddresses() {
Set addresses = new HashSet();
RouterAddress addr = createTCPAddress();
if (addr != null)
addresses.add(addr);
Map addresses = null;
if (_manager != null)
addresses.addAll(_manager.getAddresses());
if (_manager != null)
addresses = _manager.getAddresses();
else
addresses = new HashMap(1);
if (!addresses.containsKey(TCPTransport.STYLE)) {
RouterAddress addr = createTCPAddress();
if (addr != null)
addresses.put(TCPTransport.STYLE, addr);
}
if (_log.shouldLog(Log.INFO))
_log.info("Creating addresses: " + addresses);
return addresses;
return new HashSet(addresses.values());
}
private final static String PROP_I2NP_TCP_HOSTNAME = "i2np.tcp.hostname";

View File

@ -33,7 +33,7 @@ public interface Transport {
public void send(OutNetMessage msg);
public RouterAddress startListening();
public void stopListening();
public Set getCurrentAddresses();
public RouterAddress getCurrentAddress();
public void setListener(TransportEventListener listener);
public String getStyle();

View File

@ -35,7 +35,7 @@ import net.i2p.util.Log;
public abstract class TransportImpl implements Transport {
private Log _log;
private TransportEventListener _listener;
private Set _currentAddresses;
private RouterAddress _currentAddress;
private List _sendPool;
protected RouterContext _context;
@ -55,7 +55,7 @@ public abstract class TransportImpl implements Transport {
_context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_sendPool = new ArrayList(16);
_currentAddresses = new HashSet();
_currentAddress = null;
}
/**
@ -334,34 +334,15 @@ public abstract class TransportImpl implements Transport {
}
/** What addresses are we currently listening to? */
public Set getCurrentAddresses() {
synchronized (_currentAddresses) {
return new HashSet(_currentAddresses);
}
public RouterAddress getCurrentAddress() {
return _currentAddress;
}
/**
* Replace any existing addresses for the current transport with the given
* one.
*/
protected void replaceAddress(RouterAddress address) {
synchronized (_currentAddresses) {
Set addresses = _currentAddresses;
List toRemove = null;
for (Iterator iter = addresses.iterator(); iter.hasNext(); ) {
RouterAddress cur = (RouterAddress)iter.next();
if (getStyle().equals(cur.getTransportStyle())) {
if (toRemove == null)
toRemove = new ArrayList(1);
toRemove.add(cur);
}
}
if (toRemove != null) {
for (int i = 0; i < toRemove.size(); i++) {
addresses.remove(toRemove.get(i));
}
}
_currentAddresses.add(address);
}
_currentAddress = address;
}
/** Who to notify on message availability */

View File

@ -12,7 +12,9 @@ import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
@ -109,11 +111,12 @@ public class TransportManager implements TransportEventListener {
return peers;
}
List getAddresses() {
List rv = new ArrayList(_transports.size());
Map getAddresses() {
Map rv = new HashMap(_transports.size());
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
rv.addAll(t.getCurrentAddresses());
if (t.getCurrentAddress() != null)
rv.put(t.getStyle(), t.getCurrentAddress());
}
return rv;
}
@ -178,10 +181,8 @@ public class TransportManager implements TransportEventListener {
buf.append("Listening on: <br /><pre>\n");
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
for (Iterator iter = t.getCurrentAddresses().iterator(); iter.hasNext(); ) {
RouterAddress addr = (RouterAddress)iter.next();
buf.append(addr.toString()).append("\n\n");
}
if (t.getCurrentAddress() != null)
buf.append(t.getCurrentAddress()).append("\n\n");
}
buf.append("</pre>\n");
out.write(buf.toString());

View File

@ -141,8 +141,17 @@ public class PacketHandler {
_log.info("Validation with existing con failed, but validation as reestablish/stray passed");
packet.decrypt(_transport.getIntroKey());
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP");
InetAddress remAddr = packet.getPacket().getAddress();
int remPort = packet.getPacket().getPort();
InboundEstablishState est = _establisher.getInboundState(remAddr, remPort);
if (est != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet from an existing peer IS for an inbound establishment");
receivePacket(reader, packet, est, false);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP");
}
return;
}
} else {
@ -171,6 +180,12 @@ public class PacketHandler {
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) {
receivePacket(reader, packet, state, true);
}
/**
* @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) {
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known inbound state: ");
@ -195,9 +210,11 @@ public class PacketHandler {
}
}
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
receivePacket(reader, packet);
if (allowFallback) {
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
receivePacket(reader, packet);
}
}
private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) {
@ -241,27 +258,30 @@ public class PacketHandler {
receivePacket(reader, packet);
}
/** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
/** let packets be up to 5s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 5*1000;
/**
* Parse out the interesting bits and honor what it says
*/
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) {
reader.initialize(packet);
long now = _context.clock().now();
long when = reader.readTimestamp() * 1000;
long skew = now - when;
long recvOn = packet.getBegin();
long sendOn = reader.readTimestamp() * 1000;
long skew = recvOn - sendOn;
if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the future: " + new Date(when) + ": " + packet);
_log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet);
return;
} else if (skew < 0 - GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the past: " + new Date(when) + ": " + packet);
_log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet);
return;
}
if (state != null)
state.adjustClockSkew((short)skew);
_context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime());
InetAddress fromHost = packet.getPacket().getAddress();

View File

@ -300,7 +300,9 @@ public class PeerState {
/** when were the current cipher and MAC keys established/rekeyed? */
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/** how far off is the remote peer from our clock, in seconds? */
public void setClockSkew(short skew) { _clockSkew = skew; }
public void adjustClockSkew(short skew) {
_clockSkew = (short)(0.9*(float)_clockSkew + 0.1*(float)skew);
}
/** what is the current receive second, for congestion control? */
public void setCurrentReceiveSecond(long sec) { _currentReceiveSecond = sec; }
/** when did we last send them a packet? */

View File

@ -68,7 +68,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** shared slow bid for unconnected peers */
private TransportBid _slowBid;
public static final String STYLE = "udp";
public static final String STYLE = "SSUv1";
public static final String PROP_INTERNAL_PORT = "i2np.udp.internalPort";
/** define this to explicitly set an external IP address */
@ -531,11 +531,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
StringBuffer buf = new StringBuffer(512);
buf.append("<b>UDP connections: ").append(peers.size()).append("</b><br />\n");
buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b>Peer</b></td><td><b>Location</b></td>\n");
buf.append(" <td><b>Last send</b></td><td><b>Last recv</b></td>\n");
buf.append(" <td><b>Lifetime</b></td><td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
buf.append(" <tr><td><b>peer</b></td><td><b>activity (in/out)</b></td>\n");
buf.append(" <td><b>uptime</b></td><td><b>skew</b></td>\n");
buf.append(" <td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
buf.append(" <td><b>rtt</b></td><td><b>dev</b></td><td><b>rto</b></td>\n");
buf.append(" <td><b>Sent</b></td><td><b>Received</b></td>\n");
buf.append(" <td><b>send</b></td><td><b>recv</b></td>\n");
buf.append(" </tr>\n");
out.write(buf.toString());
buf.setLength(0);
@ -547,11 +547,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("<tr>");
buf.append("<td>");
buf.append("<td nowrap>");
buf.append("<a href=\"#");
buf.append(peer.getRemotePeer().toBase64().substring(0,6));
buf.append("</td>");
buf.append("<td>");
buf.append("\">");
byte ip[] = peer.getRemoteIP();
for (int j = 0; j < ip.length; j++) {
if (ip[j] < 0)
@ -562,19 +561,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append('.');
}
buf.append(':').append(peer.getRemotePort());
buf.append("</a>");
if (peer.getConsecutiveFailedSends() > 0)
buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]");
buf.append("</td>");
buf.append("<td>");
buf.append(DataHelper.formatDuration(now-peer.getLastSendTime()));
buf.append("</td>");
buf.append("<td>");
buf.append(DataHelper.formatDuration(now-peer.getLastReceiveTime()));
buf.append("/");
buf.append(DataHelper.formatDuration(now-peer.getLastSendTime()));
buf.append("</td>");
buf.append("<td>");
buf.append(DataHelper.formatDuration(now-peer.getKeyEstablishedTime()));
buf.append("</td>");
buf.append("<td>");
buf.append(peer.getClockSkew()/1000);
buf.append("s</td>");
buf.append("<td>");
buf.append(peer.getSendWindowBytes()/1024);