diff --git a/history.txt b/history.txt index 2ebb05a575..0c7d06ff81 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,9 @@ -$Id: history.txt,v 1.589 2007-09-19 20:44:05 zzz Exp $ +$Id: history.txt,v 1.590 2007-09-22 21:44:36 zzz Exp $ + +2007-09-27 zzz + * Implement pushback of NTCP transport backlog to the outbound tunnel selection code + * Clean up the NTCP and UDP tables on peers.jsp to be consistent, + fix some of the sorting 2007-09-22 zzz * Send messages for the same destination out the same outbound diff --git a/router/java/src/net/i2p/router/CommSystemFacade.java b/router/java/src/net/i2p/router/CommSystemFacade.java index 62c8cdd384..4f0a532e18 100644 --- a/router/java/src/net/i2p/router/CommSystemFacade.java +++ b/router/java/src/net/i2p/router/CommSystemFacade.java @@ -14,6 +14,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import net.i2p.data.Hash; import net.i2p.data.RouterAddress; /** @@ -52,6 +53,7 @@ public abstract class CommSystemFacade implements Service { */ public short getReachabilityStatus() { return STATUS_OK; } public void recheckReachability() {} + public boolean isBacklogged(Hash dest) { return false; } /** * Tell other transports our address changed diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index a68d3a5c3d..7d62fb54ec 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.525 $ $Date: 2007-09-19 20:44:02 $"; + public final static String ID = "$Revision: 1.526 $ $Date: 2007-09-22 21:44:34 $"; public final static String VERSION = "0.6.1.29"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index a3252fdaf6..cc73c0c8bc 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -463,7 +463,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } tunnel = (TunnelInfo) _tunnelCache.get(to); if (tunnel != null) { - if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) + if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel) && + (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1)))) return(tunnel); else _tunnelCache.remove(to); diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java index ce33d349a2..c41d1dc910 100644 --- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java +++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.Vector; import java.util.Collections; +import net.i2p.data.Hash; import net.i2p.data.RouterAddress; import net.i2p.router.CommSystemFacade; import net.i2p.router.OutNetMessage; @@ -120,6 +121,10 @@ public class CommSystemFacadeImpl extends CommSystemFacade { GetBidsJob.getBids(_context, this, msg); } + public boolean isBacklogged(Hash dest) { + return _manager.isBacklogged(dest); + } + public List getMostRecentErrorMessages() { return _manager.getMostRecentErrorMessages(); } diff --git a/router/java/src/net/i2p/router/transport/Transport.java b/router/java/src/net/i2p/router/transport/Transport.java index a9e28eda81..d67c452944 100644 --- a/router/java/src/net/i2p/router/transport/Transport.java +++ b/router/java/src/net/i2p/router/transport/Transport.java @@ -47,6 +47,7 @@ public interface Transport { public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException; public short getReachabilityStatus(); public void recheckReachability(); + public boolean isBacklogged(Hash dest); public boolean isUnreachable(Hash peer); } diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index c694711001..02451e8fde 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -382,6 +382,7 @@ public abstract class TransportImpl implements Transport { public RouterContext getContext() { return _context; } public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; } public void recheckReachability() {} + public boolean isBacklogged(Hash dest) { return false; } private static final long UNREACHABLE_PERIOD = 5*60*1000; public boolean isUnreachable(Hash peer) { diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java index bbb6388484..0d29bb87bb 100644 --- a/router/java/src/net/i2p/router/transport/TransportManager.java +++ b/router/java/src/net/i2p/router/transport/TransportManager.java @@ -193,7 +193,14 @@ public class TransportManager implements TransportEventListener { ((Transport)_transports.get(i)).recheckReachability(); } - + public boolean isBacklogged(Hash dest) { + for (int i = 0; i < _transports.size(); i++) { + Transport t = (Transport)_transports.get(i); + if (t.isBacklogged(dest)) + return true; + } + return false; + } Map getAddresses() { Map rv = new HashMap(_transports.size()); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 50629ef17b..c1365fd51b 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -304,7 +304,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } return queueTime; } - private boolean tooBacklogged() { + public boolean tooBacklogged() { long queueTime = queueTime(); if (queueTime <= 0) return false; int size = 0; diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 127340b18f..06a0d04a78 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -311,6 +311,13 @@ public class NTCPTransport extends TransportImpl { } } + public boolean isBacklogged(Hash dest) { + synchronized (_conLock) { + NTCPConnection con = (NTCPConnection)_conByIdent.get(dest); + return (con != null) && con.isEstablished() && con.tooBacklogged(); + } + } + void removeCon(NTCPConnection con) { NTCPConnection removed = null; synchronized (_conLock) { @@ -541,82 +548,85 @@ public class NTCPTransport extends TransportImpl { StringBuffer buf = new StringBuffer(512); buf.append("NTCP connections: ").append(peers.size()).append("
\n"); buf.append("\n"); - buf.append(" "); + buf.append(" "); buf.append(" "); - buf.append(" "); - buf.append(" "); - buf.append(" "); - buf.append(" "); - buf.append(" "); + buf.append(" "); + buf.append(" "); + buf.append(" "); + buf.append(" "); + buf.append(" "); + buf.append(" "); buf.append(" "); buf.append(" "); buf.append(" "); - buf.append(" "); buf.append(" \n"); out.write(buf.toString()); buf.setLength(0); for (Iterator iter = peers.iterator(); iter.hasNext(); ) { NTCPConnection con = (NTCPConnection)iter.next(); - buf.append("\n"); + buf.append("\n"); out.write(buf.toString()); buf.setLength(0); } if (peers.size() > 0) { buf.append("\n"); - buf.append("\n"); } @@ -627,7 +637,7 @@ public class NTCPTransport extends TransportImpl { buf.setLength(0); } - private static NumberFormat _rateFmt = new DecimalFormat("#,#00.00"); + private static NumberFormat _rateFmt = new DecimalFormat("#,#0.00"); private static String formatRate(float rate) { synchronized (_rateFmt) { return _rateFmt.format(rate); } } @@ -661,7 +671,7 @@ public class NTCPTransport extends TransportImpl { } protected int compare(NTCPConnection l, NTCPConnection r) { // base64 retains binary ordering - return DataHelper.compareTo(l.getRemotePeer().calculateHash().getData(), r.getRemotePeer().calculateHash().getData()); + return l.getRemotePeer().calculateHash().toBase64().compareTo(r.getRemotePeer().calculateHash().toBase64()); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 8ad8c1b8af..a6d95288cd 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1356,7 +1356,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final IdleInComparator _instance = new IdleInComparator(); public static final IdleInComparator instance() { return _instance; } protected int compare(PeerState l, PeerState r) { - long rv = l.getLastReceiveTime() - r.getLastReceiveTime(); + long rv = r.getLastReceiveTime() - l.getLastReceiveTime(); if (rv == 0) // fallback on alpha return super.compare(l, r); else @@ -1367,7 +1367,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final IdleOutComparator _instance = new IdleOutComparator(); public static final IdleOutComparator instance() { return _instance; } protected int compare(PeerState l, PeerState r) { - long rv = l.getLastSendTime() - r.getLastSendTime(); + long rv = r.getLastSendTime() - l.getLastSendTime(); if (rv == 0) // fallback on alpha return super.compare(l, r); else @@ -1400,7 +1400,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final UptimeComparator _instance = new UptimeComparator(); public static final UptimeComparator instance() { return _instance; } protected int compare(PeerState l, PeerState r) { - long rv = l.getKeyEstablishedTime() - r.getKeyEstablishedTime(); + long rv = r.getKeyEstablishedTime() - l.getKeyEstablishedTime(); if (rv == 0) // fallback on alpha return super.compare(l, r); else @@ -1537,7 +1537,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } protected int compare(PeerState l, PeerState r) { // base64 retains binary ordering - return DataHelper.compareTo(l.getRemotePeer().getData(), r.getRemotePeer().getData()); + return l.getRemotePeer().toBase64().compareTo(r.getRemotePeer().toBase64()); } } private static class InverseComparator implements Comparator { @@ -1646,7 +1646,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(""); - buf.append(name).append("@"); + buf.append(name); +/* + buf.append("@"); byte ip[] = peer.getRemoteIP(); for (int j = 0; j < ip.length; j++) { int num = ip[j] & 0xFF; @@ -1669,7 +1671,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority else if (port < 10000) buf.append("0"); buf.append(port); - buf.append(""); +*/ + buf.append(" "); if (peer.getWeRelayToThemAs() > 0) buf.append(">"); else @@ -1702,7 +1705,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (idleIn < 0) idleIn = 0; if (idleOut < 0) idleOut = 0; - buf.append(""); - buf.append(""); offsetTotal = offsetTotal + peer.getClockSkew(); long sendWindow = peer.getSendWindowBytes(); - buf.append(""); - buf.append(""); int rtt = peer.getRTT(); int rto = peer.getRTO(); - buf.append(""); - buf.append(""); - buf.append(""); - buf.append(""); - buf.append(""); @@ -1789,14 +1792,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long resent = peer.getPacketsRetransmitted(); long dupRecv = peer.getPacketsReceivedDuplicate(); - buf.append(""); double recvDupPct = (double)peer.getPacketsReceivedDuplicate()/(double)peer.getPacketsReceived(); - buf.append(""); @@ -1822,22 +1825,22 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append("\n"); buf.append(" "); - buf.append(" "); - buf.append(" \n"); - buf.append(" \n"); + buf.append(" \n"); - buf.append(" \n \n"); - buf.append(" \n"); + buf.append("\n \n"); + buf.append(" \n"); buf.append(" \n"); buf.append("
peer
peerdiruptimeidlesentreceivedout/inidlein/outupskewsendrecvout queuebacklogged?reading?skew
").append(con.getRemotePeer().calculateHash().toBase64().substring(0,8)); - buf.append(""); + String name = con.getRemotePeer().calculateHash().toBase64().substring(0,6); + buf.append("
").append(name); + buf.append(""); if (con.getIsInbound()) buf.append("in"); else buf.append("out"); - buf.append("").append(DataHelper.formatDuration(con.getUptime())); - totalUptime += con.getUptime(); - buf.append("").append(con.getTimeSinceSend()/1000); - buf.append("s/").append(con.getTimeSinceReceive()/1000); - buf.append("s").append(con.getMessagesSent()); - totalSend += con.getMessagesSent(); - buf.append("").append(con.getMessagesReceived()); - totalRecv += con.getMessagesReceived(); - buf.append(""); - if (con.getTimeSinceSend() < 10*1000) { - buf.append(formatRate(con.getSendRate()/1024)); - bpsSend += con.getSendRate(); - } else { - buf.append(formatRate(0)); - } - buf.append("/"); + buf.append(""); + buf.append(con.getTimeSinceReceive()/1000); + buf.append("s/").append(con.getTimeSinceSend()/1000); + buf.append("s"); if (con.getTimeSinceReceive() < 10*1000) { buf.append(formatRate(con.getRecvRate()/1024)); bpsRecv += con.getRecvRate(); } else { buf.append(formatRate(0)); } + buf.append("/"); + if (con.getTimeSinceSend() < 10*1000) { + buf.append(formatRate(con.getSendRate()/1024)); + bpsSend += con.getSendRate(); + } else { + buf.append(formatRate(0)); + } buf.append("KBps"); + buf.append("").append(DataHelper.formatDuration(con.getUptime())); + totalUptime += con.getUptime(); + offsetTotal = offsetTotal + con.getClockSkew(); + buf.append("").append(con.getClockSkew()); + buf.append("s").append(con.getMessagesSent()); + totalSend += con.getMessagesSent(); + buf.append("").append(con.getMessagesReceived()); + totalRecv += con.getMessagesReceived(); long outQueue = con.getOutboundQueueSize(); if (outQueue <= 0) { - buf.append("No messages"); + buf.append("No messages"); } else { - buf.append("").append(outQueue).append(" message"); + buf.append("").append(outQueue).append(" message"); if (outQueue > 1) buf.append("s"); writingPeers++; } - buf.append("").append(con.getConsecutiveBacklog() > 0 ? "true" : "false"); + buf.append("").append(con.getConsecutiveBacklog() > 0 ? "true" : "false"); long readTime = con.getReadTime(); if (readTime <= 0) { - buf.append("No"); + buf.append("No"); } else { - buf.append("For ").append(DataHelper.formatDuration(readTime)); + buf.append("For ").append(DataHelper.formatDuration(readTime)); readingPeers++; } - offsetTotal = offsetTotal + con.getClockSkew(); - buf.append("").append(con.getClockSkew()); - buf.append("s

").append(peers.size()).append(" peers ").append(DataHelper.formatDuration(totalUptime/peers.size())); - buf.append(" ").append(totalSend).append("").append(totalRecv); - buf.append("").append(formatRate(bpsSend/1024)).append("/").append(formatRate(bpsRecv/1024)).append("KBps"); + buf.append("
").append(peers.size()).append(" peers  "); + buf.append("").append(formatRate(bpsRecv/1024)).append("/").append(formatRate(bpsSend/1024)).append("KBps"); + buf.append("").append(DataHelper.formatDuration(totalUptime/peers.size())); + buf.append("").append(peers.size() > 0 ? DataHelper.formatDuration(offsetTotal*1000/peers.size()) : "0ms"); + buf.append("").append(totalSend).append("").append(totalRecv); buf.append("   "); - buf.append("").append(peers.size() > 0 ? DataHelper.formatDuration(offsetTotal*1000/peers.size()) : "0ms"); buf.append("
"); + buf.append(""); buf.append(idleIn); buf.append("s/"); buf.append(idleOut); @@ -1711,9 +1714,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority int recvBps = (idleIn > 2 ? 0 : peer.getReceiveBps()); int sendBps = (idleOut > 2 ? 0 : peer.getSendBps()); - buf.append(""); + buf.append(""); buf.append(formatKBps(recvBps)); - buf.append("KBps/"); + buf.append("/"); buf.append(formatKBps(sendBps)); buf.append("KBps "); //buf.append(formatKBps(peer.getReceiveACKBps())); @@ -1724,18 +1727,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long uptime = now - peer.getKeyEstablishedTime(); - buf.append(""); + buf.append(""); buf.append(DataHelper.formatDuration(uptime)); buf.append(""); + buf.append(""); buf.append(peer.getClockSkew()); buf.append("s"); + buf.append(""); buf.append(sendWindow/1024); buf.append("K"); buf.append("/").append(peer.getConcurrentSends()); @@ -1743,26 +1746,26 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append("/").append(peer.getConsecutiveSendRejections()); buf.append(""); + buf.append(""); buf.append(peer.getSlowStartThreshold()/1024); buf.append("K"); + buf.append(""); buf.append(rtt); buf.append(""); + buf.append(""); buf.append(peer.getRTTDeviation()); buf.append(""); + buf.append(""); buf.append(rto); buf.append(""); + buf.append(""); buf.append(peer.getMTU()).append("/").append(peer.getReceiveMTU()); //.append('/'); @@ -1773,11 +1776,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long sent = peer.getPacketsTransmitted(); long recv = peer.getPacketsReceived(); - buf.append(""); + buf.append(""); buf.append(sent); buf.append(""); + buf.append(""); buf.append(recv); buf.append(""); + buf.append(""); //buf.append(formatPct(sendLostPct)); buf.append(resent); // + "/" + peer.getPacketsPeriodRetransmitted() + "/" + sent); //buf.append(peer.getPacketRetransmissionRate()); buf.append(""); + buf.append(""); buf.append(dupRecv); //formatPct(recvDupPct)); buf.append("

Total"); - buf.append(formatKBps(bpsIn)).append("KBps/").append(formatKBps(bpsOut)); + buf.append(" "); + buf.append(formatKBps(bpsIn)).append("/").append(formatKBps(bpsOut)); buf.append("KBps").append(numPeers > 0 ? DataHelper.formatDuration(uptimeMsTotal/numPeers) : "0s"); - buf.append("").append(numPeers > 0 ? DataHelper.formatDuration(offsetTotal*1000/numPeers) : "0ms").append(""); + buf.append(" ").append(numPeers > 0 ? DataHelper.formatDuration(uptimeMsTotal/numPeers) : "0s"); + buf.append("").append(numPeers > 0 ? DataHelper.formatDuration(offsetTotal*1000/numPeers) : "0ms").append(""); buf.append(numPeers > 0 ? cwinTotal/(numPeers*1024) + "K" : "0K"); buf.append(" "); + buf.append(" "); buf.append(numPeers > 0 ? rttTotal/numPeers : 0); - buf.append(" "); + buf.append(" "); buf.append(numPeers > 0 ? rtoTotal/numPeers : 0); - buf.append(" "); - buf.append(sendTotal).append("").append(recvTotal).append("").append(resentTotal); - buf.append("").append(dupRecvTotal).append(" "); + buf.append(sendTotal).append("").append(recvTotal).append("").append(resentTotal); + buf.append("").append(dupRecvTotal).append("
"); long bytesTransmitted = _context.bandwidthLimiter().getTotalAllocatedOutboundBytes();