diff --git a/core/java/src/net/i2p/data/RouterInfo.java b/core/java/src/net/i2p/data/RouterInfo.java index aa586bc601..a0793c7522 100644 --- a/core/java/src/net/i2p/data/RouterInfo.java +++ b/core/java/src/net/i2p/data/RouterInfo.java @@ -182,6 +182,12 @@ public class RouterInfo extends DataStructureImpl { return (Properties) _options.clone(); } } + public String getOption(String opt) { + if (_options == null) return null; + synchronized (_options) { + return _options.getProperty(opt); + } + } /** * Configure a set of options or statistics that the router can expose @@ -347,7 +353,7 @@ public class RouterInfo extends DataStructureImpl { String capabilities = getCapabilities(); // Iterate through capabilities, searching for known bandwidth tier for (int i = 0; i < capabilities.length(); i++) { - if (bwTiers.indexOf(String.valueOf(capabilities.charAt(i))) >= 0) { + if (bwTiers.indexOf(String.valueOf(capabilities.charAt(i))) != -1) { bwTier = String.valueOf(capabilities.charAt(i)); break; } diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java new file mode 100644 index 0000000000..dfb72cb5fc --- /dev/null +++ b/core/java/src/net/i2p/util/Executor.java @@ -0,0 +1,51 @@ +package net.i2p.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import net.i2p.I2PAppContext; + +class Executor implements Runnable { + private I2PAppContext _context; + private Log _log; + private List _readyEvents; + public Executor(I2PAppContext ctx, Log log, List events) { + _context = ctx; + _readyEvents = events; + } + public void run() { + while (true) { + SimpleTimer.TimedEvent evt = null; + synchronized (_readyEvents) { + if (_readyEvents.size() <= 0) + try { _readyEvents.wait(); } catch (InterruptedException ie) {} + if (_readyEvents.size() > 0) + evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); + } + + if (evt != null) { + long before = _context.clock().now(); + try { + evt.timeReached(); + } catch (Throwable t) { + log("wtf, event borked: " + evt, t); + } + long time = _context.clock().now() - before; + if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) + _log.warn("wtf, event execution took " + time + ": " + evt); + } + } + } + + private void log(String msg, Throwable t) { + synchronized (this) { + if (_log == null) + _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); + } + _log.log(Log.CRIT, msg, t); + } +} diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index c86b1379b9..a228eb3aef 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -222,43 +222,3 @@ public class SimpleTimer { } } -class Executor implements Runnable { - private I2PAppContext _context; - private Log _log; - private List _readyEvents; - public Executor(I2PAppContext ctx, Log log, List events) { - _context = ctx; - _readyEvents = events; - } - public void run() { - while (true) { - SimpleTimer.TimedEvent evt = null; - synchronized (_readyEvents) { - if (_readyEvents.size() <= 0) - try { _readyEvents.wait(); } catch (InterruptedException ie) {} - if (_readyEvents.size() > 0) - evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); - } - - if (evt != null) { - long before = _context.clock().now(); - try { - evt.timeReached(); - } catch (Throwable t) { - log("wtf, event borked: " + evt, t); - } - long time = _context.clock().now() - before; - if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) - _log.warn("wtf, event execution took " + time + ": " + evt); - } - } - } - - private void log(String msg, Throwable t) { - synchronized (this) { - if (_log == null) - _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); - } - _log.log(Log.CRIT, msg, t); - } -} diff --git a/history.txt b/history.txt index b6f5dcddfb..b5911807b0 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,26 @@ -$Id: history.txt,v 1.494 2006-07-16 12:20:46 complication Exp $ +$Id: history.txt,v 1.495 2006-07-18 15:08:00 jrandom Exp $ + +2006-07-26 jrandom + * Every time we create a new router identity, add an entry to the + new "identlog.txt" text file in the I2P install directory. For + debugging purposes, publish the count of how many identities the + router has cycled through, though not the identities itself. + * Cleaned up the way the multitransport shitlisting worked, and + added per-transport shitlists + * When dropping a router reference locally, first fire a netDb + lookup for the entry + * Take the peer selection filters into account when organizing the + profiles (thanks Complication!) + * Avoid some obvious configuration errors for the NTCP transport + (invalid ports, "null" ip, etc) + * Deal with some small NTCP bugs found in the wild (unresolveable + hosts, strange network discons, etc) + * Send our netDb info to peers we have direct NTCP connections to + after each 6-12 hours of connection uptime + * Clean up the NTCP reading and writing queue logic to avoid some + potential delays + * Allow people to specify the IP that the SSU transport binds on + locally, via the advanced config "i2np.udp.bindInterface=1.2.3.4" * 2006-07-18 0.6.1.22 released diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java b/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java index 3ef157ec74..4243f83955 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java @@ -95,8 +95,18 @@ public class I2NPMessageHandler { cur++; _lastReadBegin = System.currentTimeMillis(); I2NPMessage msg = I2NPMessageImpl.createMessage(_context, type); - if (msg == null) - throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message"); + if (msg == null) { + int sz = data.length-offset; + boolean allZero = false; + for (int i = offset; i < data.length; i++) { + if (data[i] != 0) { + allZero = false; + break; + } + } + throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message (remaining sz=" + + sz + " all zeros? " + allZero + ")"); + } try { _lastSize = msg.readBytes(data, type, cur); cur += _lastSize; diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index d3da8b0b31..22d651969e 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -323,6 +323,7 @@ public class Router { ri.setPublished(_context.clock().now()); Properties stats = _context.statPublisher().publishStatistics(); stats.setProperty(RouterInfo.PROP_NETWORK_ID, NETWORK_ID+""); + ri.setOptions(stats); ri.setAddresses(_context.commSystem().createAddresses()); @@ -444,15 +445,32 @@ public class Router { "keyBackup/publicSigning.key", "sessionKeys.dat" }; + static final String IDENTLOG = "identlog.txt"; public static void killKeys() { + new Exception("Clearing identity files").printStackTrace(); + int remCount = 0; for (int i = 0; i < _rebuildFiles.length; i++) { File f = new File(_rebuildFiles[i]); if (f.exists()) { boolean removed = f.delete(); - if (removed) + if (removed) { System.out.println("INFO: Removing old identity file: " + _rebuildFiles[i]); - else + remCount++; + } else { System.out.println("ERROR: Could not remove old identity file: " + _rebuildFiles[i]); + } + } + } + if (remCount > 0) { + FileOutputStream log = null; + try { + log = new FileOutputStream(IDENTLOG, true); + log.write((new Date() + ": Old router identity keys cleared\n").getBytes()); + } catch (IOException ioe) { + // ignore + } finally { + if (log != null) + try { log.close(); } catch (IOException ioe) {} } } } diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index c925478535..e7160d576b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.433 $ $Date: 2006-07-16 12:20:47 $"; + public final static String ID = "$Revision: 1.434 $ $Date: 2006-07-18 15:08:02 $"; public final static String VERSION = "0.6.1.22"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/Shitlist.java b/router/java/src/net/i2p/router/Shitlist.java index 7f8e98ccf5..80febdeea7 100644 --- a/router/java/src/net/i2p/router/Shitlist.java +++ b/router/java/src/net/i2p/router/Shitlist.java @@ -18,10 +18,10 @@ import net.i2p.router.peermanager.PeerProfile; import net.i2p.util.Log; /** - * Manage in memory the routers we are oh so fond of. - * This needs to get a little bit more sophisticated... currently there is no - * way out of the shitlist - * + * Routers are shitlisted only if none of our transports can talk to them + * or their signed router info is completely screwy. Individual transports + * manage their own unreachable lists and do not generally add to the overall + * shitlist. */ public class Shitlist { private Log _log; @@ -43,6 +43,39 @@ public class Shitlist { _context = context; _log = context.logManager().getLog(Shitlist.class); _entries = new HashMap(32); + _context.jobQueue().addJob(new Cleanup(_context)); + } + + private class Cleanup extends JobImpl { + private List _toUnshitlist; + public Cleanup(RouterContext ctx) { + super(ctx); + _toUnshitlist = new ArrayList(4); + } + public String getName() { return "Cleanup shitlist"; } + public void runJob() { + _toUnshitlist.clear(); + long now = getContext().clock().now(); + synchronized (_entries) { + for (Iterator iter = _entries.keySet().iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + Entry entry = (Entry)_entries.get(peer); + if (entry.expireOn <= now) { + iter.remove(); + _toUnshitlist.add(peer); + } + } + } + for (int i = 0; i < _toUnshitlist.size(); i++) { + Hash peer = (Hash)_toUnshitlist.get(i); + PeerProfile prof = _context.profileOrganizer().getProfile(peer); + if (prof != null) + prof.unshitlist(); + _context.messageHistory().unshitlist(peer); + } + + requeue(30*1000); + } } public int getRouterCount() { @@ -130,7 +163,10 @@ public class Shitlist { fully = true; } else { e.transports.remove(transport); - _entries.put(peer, e); + if (e.transports.size() <= 0) + fully = true; + else + _entries.put(peer, e); } } if (fully) { @@ -188,9 +224,14 @@ public class Shitlist { } buf.append("\n"); + buf.append("Partial shitlisted peers (only blocked on some transports): "); + buf.append(partial); + buf.append("\n"); out.write(buf.toString()); out.flush(); } diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 7afca28313..c75db136e1 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -8,6 +8,8 @@ package net.i2p.router; * */ +import java.io.FileInputStream; +import java.io.IOException; import java.io.Writer; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; @@ -100,7 +102,28 @@ public class StatisticsManager implements Service { // No longer expose, to make build tracking more expensive // stats.setProperty("router.id", RouterVersion.ID); // stats.setProperty("core.id", CoreVersion.ID); - + + int newlines = 0; + FileInputStream in = null; + try { + in = new FileInputStream(Router.IDENTLOG); + int c = -1; + // perhaps later filter this to only include ident changes this + // day/week/month + while ( (c = in.read()) != -1) { + if (c == '\n') + newlines++; + } + } catch (IOException ioe) { + // ignore + } finally { + if (in != null) + try { in.close(); } catch (IOException ioe) {} + } + if (newlines > 0) + stats.setProperty("stat_identities", newlines+""); + + if (_includePeerRankings) { if (false) stats.putAll(_context.profileManager().summarizePeers(_publishedStats)); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index fc0c88a4c0..34cbe41181 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -320,7 +320,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } boolean wantACK = true; int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey()); - if (existingTags > 30) + if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) ) wantACK = false; long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java index 93f362717f..795bcd0693 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java @@ -231,6 +231,29 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector(); return sel.selectFloodfillParticipants(getKBuckets()); } + + protected void lookupBeforeDropping(Hash peer, RouterInfo info) { + // this sends out the search to the floodfill peers even if we already have the + // entry locally, firing no job if it gets a reply with an updated value (meaning + // we shouldn't drop them but instead use the new data), or if they all time out, + // firing the dropLookupFailedJob, which actually removes out local reference + search(peer, null, new DropLookupFailedJob(_context, peer, info), 10*1000, false); + } + + private class DropLookupFailedJob extends JobImpl { + private Hash _peer; + private RouterInfo _info; + + public DropLookupFailedJob(RouterContext ctx, Hash peer, RouterInfo info) { + super(ctx); + _peer = peer; + _info = info; + } + public String getName() { return "Lookup on failure of netDb peer timed out"; } + public void runJob() { + dropAfterLookupFailed(_peer, _info); + } + } } /** diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index 6ffe4f7708..0c885b2ee8 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -742,12 +742,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } } - _context.peerManager().removeCapabilities(dbEntry); - boolean removed = _kb.remove(dbEntry); - if (removed) { - if (_log.shouldLog(Log.INFO)) - _log.info("Removed kbucket entry for " + dbEntry); - } + lookupBeforeDropping(dbEntry, (RouterInfo)o); + return; } else { // we always drop leaseSets that are failed [timed out], // regardless of how many routers we have. this is called on a lease if @@ -775,6 +771,30 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } } + protected void lookupBeforeDropping(Hash peer, RouterInfo info) { + //bah, humbug. + dropAfterLookupFailed(peer, info); + } + protected void dropAfterLookupFailed(Hash peer, RouterInfo info) { + _context.peerManager().removeCapabilities(peer); + boolean removed = _kb.remove(peer); + if (removed) { + if (_log.shouldLog(Log.INFO)) + _log.info("Removed kbucket entry for " + peer); + } + + _ds.remove(peer); + synchronized (_lastSent) { + _lastSent.remove(peer); + } + synchronized (_explicitSendKeys) { + _explicitSendKeys.remove(peer); + } + synchronized (_passiveSendKeys) { + _passiveSendKeys.remove(peer); + } + } + public void unpublish(LeaseSet localLeaseSet) { if (!_initialized) return; Hash h = localLeaseSet.getDestination().calculateHash(); @@ -935,8 +955,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { renderRouterInfo(buf, ri, false); out.write(buf.toString()); buf.setLength(0); - String coreVersion = ri.getOptions().getProperty("coreVersion"); - String routerVersion = ri.getOptions().getProperty("router.version"); + String coreVersion = ri.getOption("coreVersion"); + String routerVersion = ri.getOption("router.version"); if ( (coreVersion != null) && (routerVersion != null) ) { Map routerVersions = (Map)versions.get(coreVersion); if (routerVersions == null) { @@ -1001,7 +1021,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { buf.append("Stats:
\n"); for (Iterator iter = info.getOptions().keySet().iterator(); iter.hasNext(); ) { String key = (String)iter.next(); - String val = info.getOptions().getProperty(key); + String val = info.getOption(key); buf.append(DataHelper.stripHTML(key)).append(" = ").append(DataHelper.stripHTML(val)).append("
\n"); } buf.append("

\n"); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 075b3bd758..64f1a7a545 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -841,15 +841,17 @@ class SearchReplyJob extends JobImpl { if (!sendsBadInfo) { // we don't need to search for everthing we're given here - only ones that // are next in our search path... - if (getContext().shitlist().isShitlisted(peer)) { - if (_log.shouldLog(Log.INFO)) - _log.info("Not looking for a shitlisted peer..."); - getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); - } else { + // note: no need to think about shitlisted targets in the netdb search, given + // the floodfill's behavior + //if (getContext().shitlist().isShitlisted(peer)) { + // if (_log.shouldLog(Log.INFO)) + // _log.info("Not looking for a shitlisted peer..."); + // getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); + //} else { //getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs); //_repliesPendingVerification++; shouldAdd = true; - } + //} } else { if (_log.shouldLog(Log.INFO)) _log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64()); diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 002ddfd739..f96a71b851 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -21,6 +21,7 @@ import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.router.RouterContext; import net.i2p.router.NetworkDatabaseFacade; +import net.i2p.router.tunnel.pool.TunnelPeerSelector; import net.i2p.stat.Rate; import net.i2p.stat.RateStat; import net.i2p.util.Log; @@ -813,11 +814,16 @@ public class ProfileOrganizer { _log.warn("Peer " + peer.toBase64() + " is marked as hidden, disallowing its use"); return false; } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Peer " + peer.toBase64() + " is locally known, allowing its use"); - // perhaps check to see if they are active? - - return true; + boolean exclude = TunnelPeerSelector.shouldExclude(_context, info); + if (exclude) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer " + peer.toBase64() + " has capabilities or other stats suggesting we avoid it"); + return false; + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Peer " + peer.toBase64() + " is locally known, allowing its use"); + return true; + } } } else { if (_log.shouldLog(Log.WARN)) diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java index cb7f411f94..aba6ecd743 100644 --- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java +++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java @@ -170,8 +170,15 @@ public class CommSystemFacadeImpl extends CommSystemFacade { isNew = true; } */ - if ( (name == null) || (port == null) ) + if ( (name == null) || (port == null) || (name.trim().length() <= 0) || ("null".equals(name)) ) return null; + try { + int p = Integer.parseInt(port); + if ( (p <= 0) || (p > 64*1024) ) + return null; + } catch (NumberFormatException nfe) { + return null; + } props.setProperty(NTCPAddress.PROP_HOST, name); props.setProperty(NTCPAddress.PROP_PORT, port); addr.setOptions(props); diff --git a/router/java/src/net/i2p/router/transport/Transport.java b/router/java/src/net/i2p/router/transport/Transport.java index fe6dee4377..9c86dc0dd7 100644 --- a/router/java/src/net/i2p/router/transport/Transport.java +++ b/router/java/src/net/i2p/router/transport/Transport.java @@ -12,6 +12,7 @@ import java.io.IOException; import java.io.Writer; import java.util.List; import java.util.Set; +import net.i2p.data.Hash; import net.i2p.data.RouterAddress; import net.i2p.data.RouterInfo; @@ -44,4 +45,6 @@ public interface Transport { public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException; public short getReachabilityStatus(); public void recheckReachability(); + + public boolean isUnreachable(Hash peer); } diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index ee33a4e774..e82571320c 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -10,13 +10,7 @@ package net.i2p.router.transport; import java.io.IOException; import java.io.Writer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import net.i2p.data.Hash; import net.i2p.data.RouterAddress; @@ -24,6 +18,7 @@ import net.i2p.data.RouterIdentity; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.CommSystemFacade; import net.i2p.router.Job; +import net.i2p.router.JobImpl; import net.i2p.router.MessageSelector; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; @@ -39,6 +34,8 @@ public abstract class TransportImpl implements Transport { private RouterAddress _currentAddress; private List _sendPool; protected RouterContext _context; + /** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */ + private Map _unreachableEntries; /** * Initialize the new transport @@ -56,6 +53,7 @@ public abstract class TransportImpl implements Transport { _context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); _sendPool = new ArrayList(16); + _unreachableEntries = new HashMap(16); _currentAddress = null; } @@ -374,6 +372,54 @@ public abstract class TransportImpl implements Transport { public RouterContext getContext() { return _context; } public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; } public void recheckReachability() {} + + private static final long UNREACHABLE_PERIOD = 5*60*1000; + public boolean isUnreachable(Hash peer) { + long now = _context.clock().now(); + synchronized (_unreachableEntries) { + Long when = (Long)_unreachableEntries.get(peer); + if (when == null) return false; + if (when.longValue() + UNREACHABLE_PERIOD < now) { + _unreachableEntries.remove(peer); + return false; + } else { + return true; + } + } + } + /** called when we can't reach a peer */ + public void markUnreachable(Hash peer) { + long now = _context.clock().now(); + synchronized (_unreachableEntries) { + _unreachableEntries.put(peer, new Long(now)); + } + } + /** called when we establish a peer connection (outbound or inbound) */ + public void markReachable(Hash peer) { + // if *some* transport can reach them, then we shouldn't shitlist 'em + _context.shitlist().unshitlistRouter(peer); + synchronized (_unreachableEntries) { + _unreachableEntries.remove(peer); + } + } + private class CleanupUnreachable extends JobImpl { + public CleanupUnreachable(RouterContext ctx) { + super(ctx); + } + public String getName() { return "Cleanup " + getStyle() + " unreachable list"; } + public void runJob() { + long now = getContext().clock().now(); + synchronized (_unreachableEntries) { + for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + Long when = (Long)_unreachableEntries.get(peer); + if (when.longValue() + UNREACHABLE_PERIOD < now) + iter.remove(); + } + } + requeue(60*1000); + } + } public static boolean isPubliclyRoutable(byte addr[]) { if (addr.length == 4) { diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java index a53518b9d4..bb750293cf 100644 --- a/router/java/src/net/i2p/router/transport/TransportManager.java +++ b/router/java/src/net/i2p/router/transport/TransportManager.java @@ -211,10 +211,16 @@ public class TransportManager implements TransportEventListener { } public TransportBid getNextBid(OutNetMessage msg) { + int unreachableTransports = 0; + Hash peer = msg.getTarget().getIdentity().calculateHash(); Set failedTransports = msg.getFailedTransports(); TransportBid rv = null; for (int i = 0; i < _transports.size(); i++) { Transport t = (Transport)_transports.get(i); + if (t.isUnreachable(peer)) { + unreachableTransports++; + continue; + } if (failedTransports.contains(t.getStyle())) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Skipping transport " + t.getStyle() + " as it already failed"); @@ -235,6 +241,8 @@ public class TransportManager implements TransportEventListener { _log.debug("Transport " + t.getStyle() + " did not produce a bid"); } } + if (unreachableTransports >= _transports.size()) + _context.shitlist().shitlistRouter(peer, "Unreachable on any transport"); return rv; } diff --git a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java index 8e5b8dce83..84355fed0e 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java @@ -141,6 +141,7 @@ public class EstablishState { //if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received); if (_received >= _X.length) { if (isCheckInfo(_context, _context.routerHash(), _X)) { + _context.statManager().addRateData("ntcp.inboundCheckConnection", 1, 0); fail("Incoming connection was a check connection"); return; } @@ -170,6 +171,7 @@ public class EstablishState { _log.debug(prefix()+"xor=" + Base64.encode(realXor)); } if (!DataHelper.eq(realXor, _hX_xor_bobIdentHash)) { + _context.statManager().addRateData("ntcp.invalidHXxorBIH", 1, 0); fail("Invalid hX_xor"); return; } @@ -217,6 +219,7 @@ public class EstablishState { _transport.getPumper().wantsWrite(_con, write); if (!src.hasRemaining()) return; } catch (DHSessionKeyBuilder.InvalidPublicParameterException e) { + _context.statManager().addRateData("ntcp.invalidDH", 1, 0); fail("Invalid X", e); return; } @@ -306,6 +309,7 @@ public class EstablishState { _log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")"); _e_hXY_tsB = new byte[Hash.HASH_LENGTH+4+12]; } catch (DHSessionKeyBuilder.InvalidPublicParameterException e) { + _context.statManager().addRateData("ntcp.invalidDH", 1, 0); fail("Invalid X", e); return; } @@ -328,6 +332,7 @@ public class EstablishState { Hash h = _context.sha().calculateHash(XY); if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "h(XY)=" + h.toBase64()); if (!DataHelper.eq(h.getData(), 0, hXY_tsB, 0, Hash.HASH_LENGTH)) { + _context.statManager().addRateData("ntcp.invalidHXY", 1, 0); fail("Invalid H(X+Y) - mitm attack attempted?"); return; } @@ -421,6 +426,7 @@ public class EstablishState { _verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey()); if (!_verified) { + _context.statManager().addRateData("ntcp.invalidSignature", 1, 0); fail("Signature was invalid - attempt to spoof " + _con.getRemotePeer().calculateHash().toBase64() + "?"); return; } else { @@ -478,6 +484,7 @@ public class EstablishState { RouterIdentity alice = new RouterIdentity(); int sz = (int)DataHelper.fromLong(b, 0, 2); if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) { + _context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0); fail("size is invalid", new Exception("size is " + sz)); return; } @@ -488,6 +495,7 @@ public class EstablishState { long diff = 1000*Math.abs(tsA-_tsB); if (diff >= Router.CLOCK_FUDGE_FACTOR) { + _context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0); fail("Clocks too skewed (" + diff + ")"); return; } else if (_log.shouldLog(Log.DEBUG)) { @@ -525,11 +533,14 @@ public class EstablishState { if (_log.shouldLog(Log.INFO)) _log.info(prefix()+"Verified remote peer as " + alice.calculateHash().toBase64()); } else { + _context.statManager().addRateData("ntcp.invalidInboundSignature", 1, 0); fail("Peer verification failed - spoof of " + alice.calculateHash().toBase64() + "?"); } } catch (IOException ioe) { + _context.statManager().addRateData("ntcp.invalidInboundIOE", 1, 0); fail("Error verifying peer", ioe); } catch (DataFormatException dfe) { + _context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0); fail("Error verifying peer", dfe); } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 0e5bb8d113..4ff75822e2 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -36,7 +36,7 @@ public class EventPumper implements Runnable { * every 30s or so, iterate across all ntcp connections just to make sure * we have their interestOps set properly (and to expire any looong idle cons) */ - private static final long FAILSAFE_ITERATION_FREQ = 60*1000l; + private static final long FAILSAFE_ITERATION_FREQ = 30*1000l; public EventPumper(RouterContext ctx, NTCPTransport transport) { _context = ctx; @@ -75,6 +75,7 @@ public class EventPumper implements Runnable { } public void registerConnect(NTCPConnection con) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection"); + _context.statManager().addRateData("ntcp.registerConnect", 1, 0); synchronized (_wantsConRegister) { _wantsConRegister.add(con); } _selector.wakeup(); } @@ -212,8 +213,10 @@ public class EventPumper implements Runnable { + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0) + " write? " + write + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0) + + " on " + key.attachment() ); if (accept) { + _context.statManager().addRateData("ntcp.accept", 1, 0); processAccept(key); } if (connect) { @@ -221,10 +224,12 @@ public class EventPumper implements Runnable { processConnect(key); } if (read) { + _context.statManager().addRateData("ntcp.read", 1, 0); key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); processRead(key); } if (write) { + _context.statManager().addRateData("ntcp.write", 1, 0); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); processWrite(key); } @@ -241,6 +246,7 @@ public class EventPumper implements Runnable { if (req.getPendingOutboundRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("queued write on " + con + " for " + data.length); + _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1, 0); con.queuedWrite(buf, req); } else { // fully allocated @@ -290,7 +296,8 @@ public class EventPumper implements Runnable { rv = ByteBuffer.allocate(BUF_SIZE); NUM_BUFS = ++__liveBufs; if (_log.shouldLog(Log.DEBUG)) - _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); + _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); + _context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); @@ -351,11 +358,15 @@ public class EventPumper implements Runnable { if (connected) { con.setKey(key); con.outboundConnected(); + _context.statManager().addRateData("ntcp.connectSuccessful", 1, 0); } else { con.close(); + _context.statManager().addRateData("ntcp.connectFailedTimeout", 1, 0); } } catch (IOException ioe) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Error processing connection", ioe); + con.close(); + _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1, 0); } catch (NoConnectionPendingException ncpe) { // ignore } @@ -368,9 +379,12 @@ public class EventPumper implements Runnable { int read = con.getChannel().read(buf); if (read == -1) { if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con); + _context.statManager().addRateData("ntcp.readEOF", 1, 0); con.close(); releaseBuf(buf); } else if (read == 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("nothing to read for " + con + ", but stay interested"); key.interestOps(key.interestOps() | SelectionKey.OP_READ); releaseBuf(buf); } else if (read > 0) { @@ -382,9 +396,14 @@ public class EventPumper implements Runnable { FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read", null, null); //con, buf); if (req.getPendingInboundRequested() > 0) { key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore"); + _context.statManager().addRateData("ntcp.queuedRecv", read, 0); con.queuedRecv(rbuf, req); } else { // fully allocated + if (_log.shouldLog(Log.DEBUG)) + _log.debug("not bw throttled reading for " + con); key.interestOps(key.interestOps() | SelectionKey.OP_READ); con.recv(rbuf); } @@ -392,6 +411,7 @@ public class EventPumper implements Runnable { } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe); con.close(); + _context.statManager().addRateData("ntcp.readError", 1, 0); if (buf != null) releaseBuf(buf); } catch (NotYetConnectedException nyce) { // ??? @@ -406,9 +426,19 @@ public class EventPumper implements Runnable { try { while (true) { ByteBuffer buf = con.getNextWriteBuf(); - if ( (buf != null) && (buf.remaining() > 0) ) { + if (buf != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("writing " + buf.remaining()+"..."); + if (buf.remaining() <= 0) { + long beforeRem = System.currentTimeMillis(); + con.removeWriteBuf(buf); + long afterRem = System.currentTimeMillis(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "..."); + buf = null; + buffers++; + continue; + } int written = con.getChannel().write(buf); totalWritten += written; if (written == 0) { @@ -441,6 +471,7 @@ public class EventPumper implements Runnable { } } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe); + _context.statManager().addRateData("ntcp.writeError", 1, 0); con.close(); } long after = System.currentTimeMillis(); @@ -512,18 +543,32 @@ public class EventPumper implements Runnable { InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort()); boolean connected = con.getChannel().connect(saddr); if (connected) { + _context.statManager().addRateData("ntcp.connectImmediate", 1, 0); key.interestOps(SelectionKey.OP_READ); processConnect(key); } } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error connecting", ioe); - if (ntcpOnly(con)) { - _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage()); - con.close(false); - } else { - _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE); + _context.statManager().addRateData("ntcp.connectFailedIOE", 1, 0); + _transport.markUnreachable(con.getRemotePeer().calculateHash()); + //if (ntcpOnly(con)) { + // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage()); + // con.close(false); + //} else { + // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE); con.close(true); - } + //} + } catch (UnresolvedAddressException uae) { + if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae); + _context.statManager().addRateData("ntcp.connectFailedUnresolved", 1, 0); + _transport.markUnreachable(con.getRemotePeer().calculateHash()); + //if (ntcpOnly(con)) { + // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage()); + // con.close(false); + //} else { + // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE); + con.close(true); + //} } } catch (ClosedChannelException cce) { if (_log.shouldLog(Log.WARN)) _log.warn("Error registering", cce); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 179df146ae..dd29ef1638 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -96,8 +96,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private boolean _sendingMeta; /** how many consecutive sends were failed due to (estimated) send queue time */ private int _consecutiveBacklog; + private long _nextInfoTime; private static final int META_FREQUENCY = 10*60*1000; + private static final int INFO_FREQUENCY = 6*60*60*1000; /** * Create an inbound connected (though not established) NTCP connection @@ -181,6 +183,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.inboundEstablished(this); _establishState = null; _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); + _nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY); } public long getClockSkew() { return _clockSkew; } public long getUptime() { @@ -199,8 +202,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { return queued; } } - public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; } - public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; } + public long getTimeSinceSend() { return _lastSendTime <= 0 ? 0 : System.currentTimeMillis()-_lastSendTime; } + public long getTimeSinceReceive() { return _lastReceiveTime <= 0 ? 0 : System.currentTimeMillis()-_lastReceiveTime; } public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; } public int getConsecutiveBacklog() { return _consecutiveBacklog; } @@ -253,22 +256,26 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { synchronized (_writeBufs) { blocks = _writeBufs.size(); } if (_log.shouldLog(Log.ERROR)) _log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64()); + _context.statManager().addRateData("ntcp.closeOnBacklog", _consecutiveBacklog, getUptime()); close(); } + _context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog, msg.getLifetime()); return; } _consecutiveBacklog = 0; int enqueued = 0; if (FAST_LARGE) bufferedPrepare(msg); + boolean noOutbound = false; synchronized (_outbound) { _outbound.add(msg); enqueued = _outbound.size(); msg.setQueueSize(enqueued); + noOutbound = (_currentOutbound == null); } if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); - if (_established && _currentOutbound == null) - _transport.getWriter().wantsWrite(this); + if (_established && noOutbound) + _transport.getWriter().wantsWrite(this, "enqueued"); } private long queueTime() { @@ -289,8 +296,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { long queueTime = queueTime(); if (queueTime <= 0) return false; int size = 0; + boolean currentOutboundSet = false; synchronized (_outbound) { size = _outbound.size(); + currentOutboundSet = (_currentOutbound != null); } // perhaps we could take into account the size of the queued messages too, our @@ -299,8 +308,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (getUptime() < 10*1000) // allow some slack just after establishment return false; if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime... + int writeBufs = 0; + synchronized (_writeBufs) { writeBufs = _writeBufs.size(); } if (_log.shouldLog(Log.WARN)) - _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size); + _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size + + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE)) + + ", currentOut set? " + currentOutboundSet + + ", writeBufs: " + writeBufs + " on " + toString()); _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size); return true; //} else if (size > 32) { // another arbitrary limit. @@ -324,6 +338,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (target != null) { infoMsg.setTarget(target); infoMsg.beginSend(); + _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1, 0); send(infoMsg); } else { if (_isInbound) { @@ -351,16 +366,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _established = true; _establishedOn = System.currentTimeMillis(); _establishState = null; - _context.shitlist().unshitlistRouter(getRemotePeer().calculateHash()); + _transport.markReachable(getRemotePeer().calculateHash()); + //_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); boolean msgs = false; synchronized (_outbound) { msgs = (_outbound.size() > 0); } _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); + _nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY); if (msgs) - _transport.getWriter().wantsWrite(this); + _transport.getWriter().wantsWrite(this, "outbound established"); } + public boolean getIsInbound() { return _isInbound; } + // Time vs space tradeoff: // on slow GCing jvms, the mallocs in the following preparation can cause the // write to get congested, taking up a substantial portion of the Writer's @@ -474,6 +493,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: " // + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16)); _transport.getPumper().wantsWrite(this, encrypted); + + // for every 6-12 hours that we are connected to a peer, send them + // our updated netDb info (they may not accept it and instead query + // the floodfill netDb servers, but they may...) + if (_nextInfoTime <= System.currentTimeMillis()) { + enqueueInfoMessage(); + _nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY); + } } /** @@ -537,6 +564,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { + " encrypted=" + (encryptedTime-begin) + " wantsWrite=" + (wantsTime-encryptedTime) + " releaseBuf=" + (releaseTime-wantsTime)); + + // for every 6-12 hours that we are connected to a peer, send them + // our updated netDb info (they may not accept it and instead query + // the floodfill netDb servers, but they may...) + if (_nextInfoTime <= System.currentTimeMillis()) { + // perhaps this should check to see if we are bw throttled, etc? + enqueueInfoMessage(); + _nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY); + } } /** @@ -608,6 +644,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { NUM_PREP_BUFS = ++__liveBufs; if (_log.shouldLog(Log.DEBUG)) _log.debug("creating a new prep buffer with " + __liveBufs + " live"); + _context.statManager().addRateData("ntcp.prepBufCache", NUM_PREP_BUFS, 0); b.acquired(); return b; } @@ -675,16 +712,24 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _conKey.interestOps(SelectionKey.OP_READ); // schedule up the beginning of our handshaking by calling prepareNextWrite on the // writer thread pool - _transport.getWriter().wantsWrite(this); + _transport.getWriter().wantsWrite(this, "outbound connected"); } public void complete(FIFOBandwidthLimiter.Request req) { removeRequest(req); ByteBuffer buf = (ByteBuffer)req.attachment(); - if (req.getTotalInboundRequested() > 0) + if (req.getTotalInboundRequested() > 0) { + _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()), 0); recv(buf); - else if (req.getTotalOutboundRequested() > 0) + // our reads used to be bw throttled (during which time we were no + // longer interested in reading from the network), but we aren't + // throttled anymore, so we should resume being interested in reading + _transport.getPumper().wantsRead(this); + //_transport.getReader().wantsRead(this); + } else if (req.getTotalOutboundRequested() > 0) { + _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()), 0); write(buf); + } } private void removeRequest(FIFOBandwidthLimiter.Request req) { synchronized (_bwRequests) { _bwRequests.remove(req); } @@ -739,6 +784,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { synchronized (_writeBufs) { _writeBufs.add(buf); } + if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)"); _transport.getPumper().wantsWrite(this); } @@ -768,30 +814,38 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { public void removeWriteBuf(ByteBuffer buf) { _bytesSent += buf.capacity(); OutNetMessage msg = null; + boolean bufsRemain = false; + boolean clearMessage = false; synchronized (_writeBufs) { if (_sendingMeta && (buf.capacity() == _meta.length)) { _sendingMeta = false; } else { - if (_currentOutbound != null) + clearMessage = true; + } + _writeBufs.remove(buf); + bufsRemain = _writeBufs.size() > 0; + } + if (clearMessage) { + synchronized (_outbound) { + if (_currentOutbound != null) msg = _currentOutbound; _currentOutbound = null; } - _writeBufs.remove(buf); - } - if (msg != null) { - _lastSendTime = System.currentTimeMillis(); - _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime()); - _context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime()); - _context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime()); - if (_log.shouldLog(Log.INFO)) { - _log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after " - + msg.getSendTime() + "/" + msg.getTransmissionTime() + "/" - + msg.getPreparationTime() + "/" + msg.getLifetime() - + " queued after " + msg.getQueueSize() - + " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+")"); + if (msg != null) { + _lastSendTime = System.currentTimeMillis(); + _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime()); + _context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime()); + _context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime()); + if (_log.shouldLog(Log.INFO)) { + _log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after " + + msg.getSendTime() + "/" + msg.getTransmissionTime() + "/" + + msg.getPreparationTime() + "/" + msg.getLifetime() + + " queued after " + msg.getQueueSize() + + " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")"); + } + _messagesWritten++; + _transport.sendComplete(msg); } - _messagesWritten++; - _transport.sendComplete(msg); } else { if (_log.shouldLog(Log.INFO)) _log.info("I2NP meta message sent completely"); @@ -801,8 +855,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { synchronized (_outbound) { msgs = ((_outbound.size() > 0) || (_currentOutbound != null)); } - if (msgs) - _transport.getWriter().wantsWrite(this); + if (msgs) // push through the bw limiter to reach _writeBufs + _transport.getWriter().wantsWrite(this, "write completed"); + if (bufsRemain) // send asap + _transport.getPumper().wantsWrite(this); updateStats(); } @@ -879,6 +935,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (!ok) { if (_log.shouldLog(Log.ERROR)) _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data"); + _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1, getUptime()); return; } byte swap[] = _prevReadBlock; @@ -895,6 +952,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (_curReadState.getSize() > 16*1024) { if (_log.shouldLog(Log.ERROR)) _log.error("i2np message more than 16KB? nuh uh: " + _curReadState.getSize()); + _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime()); close(); return false; } else { @@ -922,6 +980,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (read != expected) { if (_log.shouldLog(Log.WARN)) _log.warn("I2NP metadata message had a bad CRC value"); + _context.statManager().addRateData("ntcp.corruptMetaCRC", 1, getUptime()); close(); return; } else { @@ -929,9 +988,11 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if ( (newSkew > Router.CLOCK_FUDGE_FACTOR) || (newSkew < 0-Router.CLOCK_FUDGE_FACTOR) ) { if (_log.shouldLog(Log.WARN)) _log.warn("Peer's skew jumped too far (from " + _clockSkew + " to " + newSkew + "): " + toString()); + _context.statManager().addRateData("ntcp.corruptSkew", newSkew, getUptime()); close(); return; } + _context.statManager().addRateData("ntcp.receiveMeta", newSkew, getUptime()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received NTCP metadata, old skew of " + _clockSkew + ", new skew of " + newSkew); _clockSkew = newSkew; @@ -955,6 +1016,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _log.debug("Sending NTCP metadata"); _sendingMeta = true; _transport.getPumper().wantsWrite(this, encrypted); + // enqueueInfoMessage(); // this often? } public int hashCode() { return System.identityHashCode(this); } @@ -1118,7 +1180,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { releaseHandler(h); if (_log.shouldLog(Log.DEBUG)) _log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0) - + " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes"); + + " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes on " + toString()); _context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv); _context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv); if (read != null) { @@ -1133,17 +1195,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } catch (IOException ioe) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Error parsing I2NP message", ioe); + _context.statManager().addRateData("ntcp.corruptI2NPIOE", 1, getUptime()); close(); return; } catch (I2NPMessageException ime) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Error parsing I2NP message", ime); + _context.statManager().addRateData("ntcp.corruptI2NPIME", 1, getUptime()); close(); return; } } else { if (_log.shouldLog(Log.ERROR)) _log.error("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks); + _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1, getUptime()); close(); return; } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 36a3cbc237..6da52359b1 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -63,6 +63,58 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.failsafeWrites", "How many times do we need to proactively add in an extra nio write to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("ntcp.failsafeCloses", "How many times do we need to proactively close an idle connection to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.accept", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.attemptShitlistedPeer", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.attemptUnreachablePeer", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.closeOnBacklog", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectFailedIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectFailedNoNTCPAddress", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectFailedUnresolved", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.connectSuccessful", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptDecryptedI2NP", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptI2NPCRC", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptI2NPIME", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptI2NPIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptMetaCRC", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.corruptTooLargeI2NP", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.dontSendOnBacklog", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.inboundEstablished", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.inboundEstablishedDuplicate", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidDH", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidHXY", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidHXxorBIH", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidInboundDFE", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidInboundIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidInboundSignature", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidInboundSize", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidInboundSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.invalidSignature", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.multipleCloseOnRemove", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.outboundEstablishFailed", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.prepBufCache", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.read", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.readError", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.receiveMeta", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.registerConnect", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.throttledReadComplete", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.throttledWriteComplete", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.wantsQueuedWrite", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.write", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.writeError", "", "ntcp", new long[] { 60*1000, 10*60*1000 }); + + _establishing = new ArrayList(4); _conLock = new Object(); _conByIdent = new HashMap(64); @@ -79,7 +131,9 @@ public class NTCPTransport extends TransportImpl { } void inboundEstablished(NTCPConnection con) { - _context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash()); + _context.statManager().addRateData("ntcp.inboundEstablished", 1, 0); + markReachable(con.getRemotePeer().calculateHash()); + //_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash()); NTCPConnection old = null; synchronized (_conLock) { old = (NTCPConnection)_conByIdent.put(con.getRemotePeer().calculateHash(), con); @@ -87,6 +141,7 @@ public class NTCPTransport extends TransportImpl { if (old != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Old connection closed: " + old + " replaced by " + con); + _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime(), 0); old.close(); } } @@ -127,6 +182,7 @@ public class NTCPTransport extends TransportImpl { } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Error opening a channel", ioe); + _context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1, 0); con.close(); } } else { @@ -152,8 +208,8 @@ public class NTCPTransport extends TransportImpl { old = (NTCPConnection)_conByIdent.put(ih, con); } if (old != null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Multiple connections on out ready, closing " + old + " and keeping " + con); + if (_log.shouldLog(Log.WARN)) + _log.warn("Multiple connections on out ready, closing " + old + " and keeping " + con); old.close(); } con.enqueueInfoMessage(); // enqueues a netDb store of our own info @@ -181,23 +237,50 @@ public class NTCPTransport extends TransportImpl { super.afterSend(msg, sendSuccessful, allowRequeue, msToSend); } public TransportBid bid(RouterInfo toAddress, long dataSize) { - if (_context.shitlist().isShitlisted(toAddress.getIdentity().calculateHash(), STYLE)) { + Hash peer = toAddress.getIdentity().calculateHash(); + if (_context.shitlist().isShitlisted(peer, STYLE)) { // we aren't shitlisted in general (since we are trying to get a bid), but we have // recently shitlisted the peer on the NTCP transport, so don't try it + _context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1, 0); + return null; + } else if (isUnreachable(peer)) { + _context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0); return null; } + + boolean established = isEstablished(toAddress.getIdentity()); + if (established) { // should we check the queue size? nah, if its valid, use it + if (_log.shouldLog(Log.DEBUG)) + _log.debug("fast bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as its already established"); + return _fastBid; + } + RouterAddress addr = toAddress.getTargetAddress(STYLE); + if (addr == null) { + markUnreachable(peer); + _context.statManager().addRateData("ntcp.connectFailedNoNTCPAddress", 1, 0); + //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as they don't have an ntcp address"); + return null; + } + NTCPAddress naddr = new NTCPAddress(addr); + if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) { + _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1, 0); + markUnreachable(peer); + //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as they don't have a valid ntcp address"); + return null; + } + //if ( (_myAddress != null) && (_myAddress.equals(addr)) ) // return null; // dont talk to yourself - boolean established = isEstablished(toAddress.getIdentity()); - if (established) // should we check the queue size? nah, if its valid, use it - return _fastBid; - else if (addr != null) - return _slowBid; - else - return null; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("slow bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64()); + return _slowBid; } void sendComplete(OutNetMessage msg) { _finisher.add(msg); } @@ -239,6 +322,7 @@ public class NTCPTransport extends TransportImpl { if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both if (_log.shouldLog(Log.ERROR)) _log.error("Multiple connections on remove, closing " + removed + " (already closed " + con + ")"); + _context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime(), 0); removed.close(); } } @@ -349,6 +433,8 @@ public class NTCPTransport extends TransportImpl { } for (int i = 0; expired != null && i < expired.size(); i++) ((NTCPConnection)expired.get(i)).close(); + if ( (expired != null) && (expired.size() > 0) ) + _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0); } //private boolean bindAllInterfaces() { return true; } @@ -360,10 +446,17 @@ public class NTCPTransport extends TransportImpl { } else { RouterAddress ra = CommSystemFacadeImpl.createNTCPAddress(ctx); if (ra != null) { - _myAddress = new NTCPAddress(ra); - replaceAddress(ra); - if (_log.shouldLog(Log.INFO)) - _log.info("NTCP address configured: " + _myAddress); + NTCPAddress addr = new NTCPAddress(ra); + if (addr.getPort() <= 0) { + _myAddress = null; + if (_log.shouldLog(Log.ERROR)) + _log.error("NTCP address is outbound only, since the NTCP configuration is invalid"); + } else { + _myAddress = addr; + replaceAddress(ra); + if (_log.shouldLog(Log.INFO)) + _log.info("NTCP address configured: " + _myAddress); + } } else { if (_log.shouldLog(Log.INFO)) _log.info("NTCP address is outbound only"); @@ -404,11 +497,17 @@ public class NTCPTransport extends TransportImpl { int numPeers = 0; int readingPeers = 0; int writingPeers = 0; + float bpsSend = 0; + float bpsRecv = 0; + long totalUptime = 0; + long totalSend = 0; + long totalRecv = 0; StringBuffer buf = new StringBuffer(512); buf.append("NTCP connections: ").append(peers.size()).append("
\n"); buf.append("\n"); buf.append(" "); + buf.append(" "); buf.append(" "); buf.append(" "); buf.append(" "); @@ -424,13 +523,34 @@ public class NTCPTransport extends TransportImpl { for (Iterator iter = peers.iterator(); iter.hasNext(); ) { NTCPConnection con = (NTCPConnection)iter.next(); buf.append("\n"); + buf.append(""); + buf.append("\n"); + } + buf.append("
peerdiruptimeidlesent
").append(con.getRemotePeer().calculateHash().toBase64().substring(0,8)); + buf.append(""); + if (con.getIsInbound()) + buf.append("in"); + else + buf.append("out"); buf.append("").append(DataHelper.formatDuration(con.getUptime())); - buf.append("").append(DataHelper.formatDuration(con.getTimeSinceSend())); - buf.append("/").append(DataHelper.formatDuration(con.getTimeSinceReceive())); - buf.append("").append(con.getMessagesSent()); + totalUptime += con.getUptime(); + buf.append("").append(con.getTimeSinceSend()/1000); + buf.append("s/").append(con.getTimeSinceReceive()/1000); + buf.append("s").append(con.getMessagesSent()); + totalSend += con.getMessagesSent(); buf.append("").append(con.getMessagesReceived()); - buf.append("").append(formatRate(con.getSendRate()/1024)); - buf.append("/").append(formatRate(con.getRecvRate()/1024)).append("KBps"); + totalRecv += con.getMessagesReceived(); + buf.append(""); + if (con.getTimeSinceSend() < 10*1000) { + buf.append(formatRate(con.getSendRate()/1024)); + bpsSend += con.getSendRate(); + } else { + buf.append(formatRate(0)); + } + buf.append("/"); + if (con.getTimeSinceReceive() < 10*1000) { + buf.append(formatRate(con.getRecvRate()/1024)); + bpsRecv += con.getRecvRate(); + } else { + buf.append(formatRate(0)); + } + buf.append("KBps"); long outQueue = con.getOutboundQueueSize(); if (outQueue <= 0) { buf.append("No messages"); @@ -454,6 +574,15 @@ public class NTCPTransport extends TransportImpl { buf.setLength(0); } + if (peers.size() > 0) { + buf.append("

").append(peers.size()).append(" peers ").append(DataHelper.formatDuration(totalUptime/peers.size())); + buf.append(" ").append(totalSend).append("").append(totalRecv); + buf.append("").append(formatRate(bpsSend/1024)).append("/").append(formatRate(bpsRecv/1024)).append("KBps"); + buf.append("    
\n"); buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("
\n"); buf.append("Peers currently writing I2NP messages: ").append(writingPeers).append("
\n"); diff --git a/router/java/src/net/i2p/router/transport/ntcp/Reader.java b/router/java/src/net/i2p/router/transport/ntcp/Reader.java index 47cf91b3e3..0d8bed500e 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Reader.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Reader.java @@ -148,6 +148,7 @@ class Reader { if (est.isCorrupt()) { if (_log.shouldLog(Log.WARN)) _log.warn("closing connection on establishment because: " +est.getError(), est.getException()); + _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0); con.close(); return; } else if (buf.remaining() <= 0) { diff --git a/router/java/src/net/i2p/router/transport/ntcp/Writer.java b/router/java/src/net/i2p/router/transport/ntcp/Writer.java index d808e8a64b..968c81376d 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Writer.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Writer.java @@ -48,10 +48,11 @@ class Writer { } } - public void wantsWrite(NTCPConnection con) { + public void wantsWrite(NTCPConnection con, String source) { //if (con.getCurrentOutbound() != null) // throw new RuntimeException("Current outbound message already in play on " + con); boolean already = false; + boolean pending = false; synchronized (_pendingConnections) { if (_liveWrites.contains(con)) { if (!_writeAfterLive.contains(con)) { @@ -60,11 +61,12 @@ class Writer { already = true; } else if (!_pendingConnections.contains(con)) { _pendingConnections.add(con); + pending = true; } _pendingConnections.notifyAll(); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("wantsWrite: " + con + " already live? " + already); + _log.debug("wantsWrite: " + con + " already live? " + already + " added to pending? " + pending + ": " + source); } public void connectionClosed(NTCPConnection con) { synchronized (_pendingConnections) { @@ -87,20 +89,28 @@ class Writer { boolean keepWriting = (con != null) && _writeAfterLive.remove(con); if (keepWriting) { // keep on writing the same one + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Keep writing on the same connection: " + con); } else { _liveWrites.remove(con); con = null; if (_pendingConnections.size() <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Done writing, but nothing pending, so wait"); _pendingConnections.wait(); } else { con = (NTCPConnection)_pendingConnections.remove(0); _liveWrites.add(con); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Switch to writing on: " + con); } } } } catch (InterruptedException ie) {} if (!_stop && (con != null)) { try { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Prepare next write on: " + con); con.prepareNextWrite(); } catch (RuntimeException re) { _log.log(Log.CRIT, "Error in the ntcp writer", re); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 2d13a361f9..a6f9d13b75 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -134,6 +134,7 @@ public class EstablishmentManager { } if (msg.getTarget().getNetworkId() != Router.NETWORK_ID) { _context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash()); + _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash()); _transport.failed(msg, "Remote peer is on the wrong network, cannot establish"); return; } @@ -146,7 +147,8 @@ public class EstablishmentManager { if (!_transport.isValid(to.getIP())) { _transport.failed(msg, "Remote peer's IP isn't valid"); - _context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE); + _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash()); + //_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE); return; } @@ -465,7 +467,7 @@ public class EstablishmentManager { public PublishToNewInbound(PeerState peer) { _peer = peer; } public void timeReached() { Hash peer = _peer.getRemotePeer(); - if ((peer != null) && (!_context.shitlist().isShitlisted(peer))) { + if ((peer != null) && (!_context.shitlist().isShitlisted(peer)) && (!_transport.isUnreachable(peer))) { // ok, we are fine with them, send them our latest info if (_log.shouldLog(Log.INFO)) _log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer.toBase64()); @@ -942,7 +944,8 @@ public class EstablishmentManager { } Hash peer = outboundState.getRemoteIdentity().calculateHash(); - _context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE); + //_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE); + _transport.markUnreachable(peer); _transport.dropPeer(peer, false, err); //_context.profileManager().commErrorOccurred(peer); } else { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index f92fe40138..5e83837b63 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -1,5 +1,6 @@ package net.i2p.router.transport.udp; +import java.io.IOException; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; @@ -18,12 +19,14 @@ public class UDPEndpoint { private UDPTransport _transport; private UDPSender _sender; private UDPReceiver _receiver; + private DatagramSocket _socket; + private InetAddress _bindAddress; - public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort) throws SocketException { + public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort, InetAddress bindAddress) throws SocketException { _context = ctx; _log = ctx.logManager().getLog(UDPEndpoint.class); _transport = transport; - + _bindAddress = bindAddress; _listenPort = listenPort; } @@ -32,9 +35,12 @@ public class UDPEndpoint { _log.debug("Starting up the UDP endpoint"); shutdown(); try { - DatagramSocket socket = new DatagramSocket(_listenPort); - _sender = new UDPSender(_context, socket, "UDPSend on " + _listenPort); - _receiver = new UDPReceiver(_context, _transport, socket, "UDPReceive on " + _listenPort); + if (_bindAddress == null) + _socket = new DatagramSocket(_listenPort); + else + _socket = new DatagramSocket(_listenPort, _bindAddress); + _sender = new UDPSender(_context, _socket, "UDPSend on " + _listenPort); + _receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceive on " + _listenPort); _sender.startup(); _receiver.startup(); } catch (SocketException se) { @@ -48,16 +54,22 @@ public class UDPEndpoint { _sender.shutdown(); _receiver.shutdown(); } + if (_socket != null) { + _socket.close(); + } } public void setListenPort(int newPort) { _listenPort = newPort; } public void updateListenPort(int newPort) { if (newPort == _listenPort) return; try { - DatagramSocket socket = new DatagramSocket(newPort); - _sender.updateListeningPort(socket, newPort); + if (_bindAddress == null) + _socket = new DatagramSocket(_listenPort); + else + _socket = new DatagramSocket(_listenPort, _bindAddress); + _sender.updateListeningPort(_socket, newPort); // note: this closes the old socket, so call this after the sender! - _receiver.updateListeningPort(socket, newPort); + _receiver.updateListeningPort(_socket, newPort); _listenPort = newPort; } catch (SocketException se) { if (_log.shouldLog(Log.ERROR)) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java index ed592212c5..3a08ac5528 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java @@ -36,7 +36,7 @@ public class UDPEndpointTest { int base = 2000 + _context.random().nextInt(10000); for (int i = 0; i < numPeers; i++) { _log.debug("Building " + i); - UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i); + UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i, null); _endpoints[i] = endpoint; endpoint.startup(); I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 564ac606de..ddafb7b979 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -108,6 +108,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public static final String PROP_FORCE_INTRODUCERS = "i2np.udp.forceIntroducers"; /** do we allow direct SSU connections, sans introducers? */ public static final String PROP_ALLOW_DIRECT = "i2np.udp.allowDirect"; + public static final String PROP_BIND_INTERFACE = "i2np.udp.bindInterface"; /** how many relays offered to us will we use at a time? */ public static final int PUBLIC_RELAY_COUNT = 3; @@ -220,8 +221,19 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _log.info("Binding to the explicitly specified external port: " + port); } if (_endpoint == null) { + String bindTo = _context.getProperty(PROP_BIND_INTERFACE); + InetAddress bindToAddr = null; + if (bindTo != null) { + try { + bindToAddr = InetAddress.getByName(bindTo); + } catch (UnknownHostException uhe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Invalid SSU bind interface specified [" + bindTo + "]", uhe); + bindToAddr = null; + } + } try { - _endpoint = new UDPEndpoint(_context, this, port); + _endpoint = new UDPEndpoint(_context, this, port, bindToAddr); } catch (SocketException se) { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Unable to listen on the UDP port (" + port + ")", se); @@ -327,7 +339,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_log.shouldLog(Log.ERROR)) _log.error("The router " + from.toBase64() + " told us we have an invalid IP - " + RemoteHostId.toString(ourIP) + ". Lets throw tomatoes at them"); - _context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE); + markUnreachable(from); + //_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE); return; } else if (inboundRecent && _externalListenPort > 0 && _externalListenHost != null) { // use OS clock since its an ordering thing, not a time thing @@ -533,7 +546,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup")); _activeThrottle.unchoke(peer.getRemotePeer()); - _context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE); + markReachable(peer.getRemotePeer()); + //_context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE); if (SHOULD_FLOOD_PEERS) _flooder.addPeer(peer); @@ -601,7 +615,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority SimpleTimer.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD); } } - _context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE); + markUnreachable(peerHash); + //_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE); dropPeer(peerHash, false, "wrong network"); if (_log.shouldLog(Log.WARN)) _log.warn("Dropping the peer " + peerHash.toBase64() + " because they are in the wrong net"); @@ -701,8 +716,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (peer.getRemotePeer() != null) { dropPeerCapacities(peer); - if (shouldShitlist) - _context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE); + if (shouldShitlist) { + markUnreachable(peer.getRemotePeer()); + //_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE); + } long now = _context.clock().now(); _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); synchronized (_peersByIdent) { @@ -1630,7 +1647,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]"); appended = true; } - if (_context.shitlist().isShitlisted(peer.getRemotePeer())) { + if (_context.shitlist().isShitlisted(peer.getRemotePeer(), STYLE)) { if (!appended) buf.append("
"); buf.append(" [shitlisted]"); appended = true; diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index ce756ad6eb..85aa7d05ef 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -363,6 +363,9 @@ public class FragmentHandler { private void receiveComplete(FragmentedMessage msg) { _completed++; + String stringified = null; + if (_log.shouldLog(Log.DEBUG)) + stringified = msg.toString(); try { int fragmentCount = msg.getFragmentCount(); // toByteArray destroys the contents of the message completely @@ -377,11 +380,13 @@ public class FragmentHandler { noteCompletion(m.getUniqueId()); _receiver.receiveComplete(m, msg.getTargetRouter(), msg.getTargetTunnel()); } catch (IOException ioe) { + if (stringified == null) stringified = msg.toString(); if (_log.shouldLog(Log.ERROR)) - _log.error("Error receiving fragmented message (corrupt?): " + msg, ioe); + _log.error("Error receiving fragmented message (corrupt?): " + stringified, ioe); } catch (I2NPMessageException ime) { + if (stringified == null) stringified = msg.toString(); if (_log.shouldLog(Log.ERROR)) - _log.error("Error receiving fragmented message (corrupt?): " + msg, ime); + _log.error("Error receiving fragmented message (corrupt?): " + stringified, ime); } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java index 6910a8bb5f..5594161ce6 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java @@ -7,13 +7,14 @@ import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelPoolSettings; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; +import net.i2p.router.peermanager.PeerProfile; import net.i2p.util.Log; /** * Coordinate the selection of peers to go into a tunnel for one particular * pool. */ -abstract class TunnelPeerSelector { +public abstract class TunnelPeerSelector { /** * Which peers should go into the next tunnel for the given settings? * @@ -155,16 +156,20 @@ abstract class TunnelPeerSelector { return rv; } else if (filterSlow(ctx, isInbound, isExploratory)) { Log log = ctx.logManager().getLog(TunnelPeerSelector.class); - String excludeCaps = ctx.getProperty("router.excludePeerCaps", - String.valueOf(Router.CAPABILITY_BW16)); - Set peers = new HashSet(); - if (excludeCaps != null) { - char excl[] = excludeCaps.toCharArray(); + char excl[] = getExcludeCaps(ctx); + Set peers = new HashSet(1); + if (excl != null) { FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)ctx.netDb(); List known = fac.getKnownRouterData(); if (known != null) { for (int i = 0; i < known.size(); i++) { RouterInfo peer = (RouterInfo)known.get(i); + boolean shouldExclude = shouldExclude(ctx, log, peer, excl); + if (shouldExclude) { + peers.add(peer.getIdentity().calculateHash()); + continue; + } + /* String cap = peer.getCapabilities(); if (cap == null) { peers.add(peer.getIdentity().calculateHash()); @@ -247,6 +252,7 @@ abstract class TunnelPeerSelector { continue; } } + */ } } /* @@ -264,6 +270,102 @@ abstract class TunnelPeerSelector { } } + public static boolean shouldExclude(RouterContext ctx, RouterInfo peer) { + Log log = ctx.logManager().getLog(TunnelPeerSelector.class); + return shouldExclude(ctx, log, peer, getExcludeCaps(ctx)); + } + + private static char[] getExcludeCaps(RouterContext ctx) { + String excludeCaps = ctx.getProperty("router.excludePeerCaps", + String.valueOf(Router.CAPABILITY_BW16)); + Set peers = new HashSet(); + if (excludeCaps != null) { + char excl[] = excludeCaps.toCharArray(); + return excl; + } else { + return null; + } + } + + private static final long DONT_EXCLUDE_PERIOD = 15*60*1000; + private static boolean shouldExclude(RouterContext ctx, Log log, RouterInfo peer, char excl[]) { + String cap = peer.getCapabilities(); + if (cap == null) { + return true; + } + for (int j = 0; j < excl.length; j++) { + if (cap.indexOf(excl[j]) >= 0) { + return true; + } + } + int maxLen = 0; + if (cap.indexOf(FloodfillNetworkDatabaseFacade.CAPACITY_FLOODFILL) >= 0) + maxLen++; + if (cap.indexOf(Router.CAPABILITY_REACHABLE) >= 0) + maxLen++; + if (cap.indexOf(Router.CAPABILITY_UNREACHABLE) >= 0) + maxLen++; + if (cap.length() <= maxLen) + return true; + // otherwise, it contains flags we aren't trying to focus on, + // so don't exclude it based on published capacity + + String val = peer.getOption("stat_uptime"); + if (val != null) { + long uptimeMs = 0; + if (val != null) { + long factor = 1; + if (val.endsWith("ms")) { + factor = 1; + val = val.substring(0, val.length()-2); + } else if (val.endsWith("s")) { + factor = 1000l; + val = val.substring(0, val.length()-1); + } else if (val.endsWith("m")) { + factor = 60*1000l; + val = val.substring(0, val.length()-1); + } else if (val.endsWith("h")) { + factor = 60*60*1000l; + val = val.substring(0, val.length()-1); + } else if (val.endsWith("d")) { + factor = 24*60*60*1000l; + val = val.substring(0, val.length()-1); + } + try { uptimeMs = Long.parseLong(val); } catch (NumberFormatException nfe) {} + uptimeMs *= factor; + } else { + // not publishing an uptime, so exclude it + return true; + } + + long infoAge = ctx.clock().now() - peer.getPublished(); + if (infoAge < 0) { + return false; + } else if (infoAge > 5*24*60*60*1000) { + // Only exclude long-unseen peers if we haven't just started up + if (ctx.router().getUptime() < DONT_EXCLUDE_PERIOD) { + if (log.shouldLog(Log.DEBUG)) + log.debug("Not excluding a long-unseen peer, since we just started up."); + return false; + } else { + if (log.shouldLog(Log.DEBUG)) + log.debug("Excluding a long-unseen peer."); + return true; + } + } else { + if ( (infoAge + uptimeMs < 2*60*60*1000) && (ctx.router().getUptime() > DONT_EXCLUDE_PERIOD) ) { + // up for less than 2 hours, so exclude it + return true; + } else { + return false; + } + } + } else { + // not publishing stats, so exclude it + return true; + } + } + private static final String PROP_OUTBOUND_EXPLORATORY_EXCLUDE_UNREACHABLE = "router.outboundExploratoryExcludeUnreachable"; private static final String PROP_OUTBOUND_CLIENT_EXCLUDE_UNREACHABLE = "router.outboundClientExcludeUnreachable"; private static final String PROP_INBOUND_EXPLORATORY_EXCLUDE_UNREACHABLE = "router.inboundExploratoryExcludeUnreachable";