2006-07-26 jrandom

* Every time we create a new router identity, add an entry to the
      new "identlog.txt" text file in the I2P install directory.  For
      debugging purposes, publish the count of how many identities the
      router has cycled through, though not the identities themselves.
    * Cleaned up the way the multitransport shitlisting worked, and
      added per-transport shitlists
    * When dropping a router reference locally, first fire a netDb
      lookup for the entry
    * Take the peer selection filters into account when organizing the
      profiles (thanks Complication!)
    * Avoid some obvious configuration errors for the NTCP transport
      (invalid ports, "null" ip, etc)
    * Deal with some small NTCP bugs found in the wild (unresolvable
      hosts, strange network discons, etc)
    * Send our netDb info to peers we have direct NTCP connections to
      after each 6-12 hours of connection uptime
    * Clean up the NTCP reading and writing queue logic to avoid some
      potential delays
    * Allow people to specify the IP that the SSU transport binds on
      locally, via the advanced config "i2np.udp.bindInterface=1.2.3.4"
This commit is contained in:
jrandom
2006-07-26 06:36:18 +00:00
committed by zzz
parent e1c686baa6
commit d4e0f27c56
30 changed files with 819 additions and 170 deletions

View File

@ -182,6 +182,12 @@ public class RouterInfo extends DataStructureImpl {
return (Properties) _options.clone();
}
}
/**
 * Look up a single published option/statistic by name.
 * Reads are synchronized on the options table, matching the locking used
 * by the other option accessors in this class.
 *
 * @param opt property name to look up
 * @return the value, or null if the option is unset or no options exist yet
 */
public String getOption(String opt) {
    if (_options == null) return null;
    synchronized (_options) {
        return _options.getProperty(opt);
    }
}
/**
* Configure a set of options or statistics that the router can expose
@ -347,7 +353,7 @@ public class RouterInfo extends DataStructureImpl {
String capabilities = getCapabilities();
// Iterate through capabilities, searching for known bandwidth tier
for (int i = 0; i < capabilities.length(); i++) {
if (bwTiers.indexOf(String.valueOf(capabilities.charAt(i))) >= 0) {
if (bwTiers.indexOf(String.valueOf(capabilities.charAt(i))) != -1) {
bwTier = String.valueOf(capabilities.charAt(i));
break;
}

View File

@ -0,0 +1,51 @@
package net.i2p.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import net.i2p.I2PAppContext;
/**
 * Worker that pulls ready-to-fire timed events off a shared queue and runs
 * them, logging any event that throws or takes unusually long to execute.
 *
 * The producer is expected to append to the shared list and notify() on it.
 */
class Executor implements Runnable {
    private I2PAppContext _context;
    private Log _log;
    /** events ready to fire, shared with (and notified by) the producer */
    private List _readyEvents;

    public Executor(I2PAppContext ctx, Log log, List events) {
        _context = ctx;
        // bugfix: the log parameter was previously ignored, leaving _log null
        // and silencing the slow-event warning below until the first crash
        // lazily initialized it via log()
        _log = log;
        _readyEvents = events;
    }

    public void run() {
        while (true) {
            SimpleTimer.TimedEvent evt = null;
            synchronized (_readyEvents) {
                // block until the producer signals; recheck the size after
                // waking since wait() can return spuriously and several
                // executors may race for the same event
                if (_readyEvents.size() <= 0)
                    try { _readyEvents.wait(); } catch (InterruptedException ie) {}
                if (_readyEvents.size() > 0)
                    evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
            }

            if (evt != null) {
                long before = _context.clock().now();
                try {
                    evt.timeReached();
                } catch (Throwable t) {
                    log("wtf, event borked: " + evt, t);
                }
                long time = _context.clock().now() - before;
                // flag events that monopolize this executor thread
                if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
                    _log.warn("wtf, event execution took " + time + ": " + evt);
            }
        }
    }

    /** log a failed event, lazily building a Log if we were given none */
    private void log(String msg, Throwable t) {
        synchronized (this) {
            if (_log == null)
                _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
        }
        _log.log(Log.CRIT, msg, t);
    }
}

View File

@ -222,43 +222,3 @@ public class SimpleTimer {
}
}
/**
 * Worker that pulls ready-to-fire timed events off a shared queue and runs
 * them, logging any event that throws or takes unusually long to execute.
 * The producer appends to the shared list and notify()s on it.
 */
class Executor implements Runnable {
    private I2PAppContext _context;
    private Log _log;
    // events ready to fire, shared with (and notified by) the producer
    private List _readyEvents;
    // NOTE(review): the log parameter is never assigned to _log, so _log
    // stays null (and the slow-event warning below never fires) until the
    // lazy initialization in log() — confirm whether that is intentional
    public Executor(I2PAppContext ctx, Log log, List events) {
        _context = ctx;
        _readyEvents = events;
    }
    public void run() {
        while (true) {
            SimpleTimer.TimedEvent evt = null;
            synchronized (_readyEvents) {
                // recheck the size after waking: wait() can return spuriously,
                // and several executors may race for the same event
                if (_readyEvents.size() <= 0)
                    try { _readyEvents.wait(); } catch (InterruptedException ie) {}
                if (_readyEvents.size() > 0)
                    evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
            }
            if (evt != null) {
                long before = _context.clock().now();
                try {
                    evt.timeReached();
                } catch (Throwable t) {
                    log("wtf, event borked: " + evt, t);
                }
                long time = _context.clock().now() - before;
                // flag events that monopolize this executor thread
                if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
                    _log.warn("wtf, event execution took " + time + ": " + evt);
            }
        }
    }
    /** log a failed event, lazily building a Log if we have none yet */
    private void log(String msg, Throwable t) {
        synchronized (this) {
            if (_log == null)
                _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
        }
        _log.log(Log.CRIT, msg, t);
    }
}

View File

@ -1,4 +1,26 @@
$Id: history.txt,v 1.494 2006-07-16 12:20:46 complication Exp $
$Id: history.txt,v 1.495 2006-07-18 15:08:00 jrandom Exp $
2006-07-26 jrandom
* Every time we create a new router identity, add an entry to the
new "identlog.txt" text file in the I2P install directory. For
debugging purposes, publish the count of how many identities the
router has cycled through, though not the identities themselves.
* Cleaned up the way the multitransport shitlisting worked, and
added per-transport shitlists
* When dropping a router reference locally, first fire a netDb
lookup for the entry
* Take the peer selection filters into account when organizing the
profiles (thanks Complication!)
* Avoid some obvious configuration errors for the NTCP transport
(invalid ports, "null" ip, etc)
* Deal with some small NTCP bugs found in the wild (unresolvable
hosts, strange network discons, etc)
* Send our netDb info to peers we have direct NTCP connections to
after each 6-12 hours of connection uptime
* Clean up the NTCP reading and writing queue logic to avoid some
potential delays
* Allow people to specify the IP that the SSU transport binds on
locally, via the advanced config "i2np.udp.bindInterface=1.2.3.4"
* 2006-07-18 0.6.1.22 released

View File

@ -95,8 +95,18 @@ public class I2NPMessageHandler {
cur++;
_lastReadBegin = System.currentTimeMillis();
I2NPMessage msg = I2NPMessageImpl.createMessage(_context, type);
if (msg == null)
throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message");
if (msg == null) {
int sz = data.length-offset;
boolean allZero = false;
for (int i = offset; i < data.length; i++) {
if (data[i] != 0) {
allZero = false;
break;
}
}
throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message (remaining sz="
+ sz + " all zeros? " + allZero + ")");
}
try {
_lastSize = msg.readBytes(data, type, cur);
cur += _lastSize;

View File

@ -323,6 +323,7 @@ public class Router {
ri.setPublished(_context.clock().now());
Properties stats = _context.statPublisher().publishStatistics();
stats.setProperty(RouterInfo.PROP_NETWORK_ID, NETWORK_ID+"");
ri.setOptions(stats);
ri.setAddresses(_context.commSystem().createAddresses());
@ -444,15 +445,32 @@ public class Router {
"keyBackup/publicSigning.key",
"sessionKeys.dat" };
static final String IDENTLOG = "identlog.txt";
public static void killKeys() {
new Exception("Clearing identity files").printStackTrace();
int remCount = 0;
for (int i = 0; i < _rebuildFiles.length; i++) {
File f = new File(_rebuildFiles[i]);
if (f.exists()) {
boolean removed = f.delete();
if (removed)
if (removed) {
System.out.println("INFO: Removing old identity file: " + _rebuildFiles[i]);
else
remCount++;
} else {
System.out.println("ERROR: Could not remove old identity file: " + _rebuildFiles[i]);
}
}
}
if (remCount > 0) {
FileOutputStream log = null;
try {
log = new FileOutputStream(IDENTLOG, true);
log.write((new Date() + ": Old router identity keys cleared\n").getBytes());
} catch (IOException ioe) {
// ignore
} finally {
if (log != null)
try { log.close(); } catch (IOException ioe) {}
}
}
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.433 $ $Date: 2006-07-16 12:20:47 $";
public final static String ID = "$Revision: 1.434 $ $Date: 2006-07-18 15:08:02 $";
public final static String VERSION = "0.6.1.22";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -18,10 +18,10 @@ import net.i2p.router.peermanager.PeerProfile;
import net.i2p.util.Log;
/**
* Manage in memory the routers we are oh so fond of.
* This needs to get a little bit more sophisticated... currently there is no
* way out of the shitlist
*
* Routers are shitlisted only if none of our transports can talk to them
* or their signed router info is completely screwy. Individual transports
* manage their own unreachable lists and do not generally add to the overall
* shitlist.
*/
public class Shitlist {
private Log _log;
@ -43,6 +43,39 @@ public class Shitlist {
_context = context;
_log = context.logManager().getLog(Shitlist.class);
_entries = new HashMap(32);
_context.jobQueue().addJob(new Cleanup(_context));
}
/**
 * Periodic job (requeued every 30 seconds) that expires shitlist entries
 * and restores the affected peers' profiles and message history.
 */
private class Cleanup extends JobImpl {
    // scratch list reused across runs to avoid reallocating each pass
    private List _toUnshitlist;
    public Cleanup(RouterContext ctx) {
        super(ctx);
        _toUnshitlist = new ArrayList(4);
    }
    public String getName() { return "Cleanup shitlist"; }
    public void runJob() {
        _toUnshitlist.clear();
        long now = getContext().clock().now();
        synchronized (_entries) {
            // remove expired entries while holding the lock, but defer the
            // profile/history updates until after it is released
            for (Iterator iter = _entries.keySet().iterator(); iter.hasNext(); ) {
                Hash peer = (Hash)iter.next();
                Entry entry = (Entry)_entries.get(peer);
                if (entry.expireOn <= now) {
                    iter.remove();
                    _toUnshitlist.add(peer);
                }
            }
        }
        for (int i = 0; i < _toUnshitlist.size(); i++) {
            Hash peer = (Hash)_toUnshitlist.get(i);
            PeerProfile prof = _context.profileOrganizer().getProfile(peer);
            if (prof != null)
                prof.unshitlist();
            _context.messageHistory().unshitlist(peer);
        }
        requeue(30*1000);
    }
}
public int getRouterCount() {
@ -130,7 +163,10 @@ public class Shitlist {
fully = true;
} else {
e.transports.remove(transport);
_entries.put(peer, e);
if (e.transports.size() <= 0)
fully = true;
else
_entries.put(peer, e);
}
}
if (fully) {
@ -188,9 +224,14 @@ public class Shitlist {
}
buf.append("<ul>");
int partial = 0;
for (Iterator iter = entries.keySet().iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
Entry entry = (Entry)entries.get(key);
if ( (entry.transports != null) && (entry.transports.size() > 0) ) {
partial++;
continue;
}
buf.append("<li><b>").append(key.toBase64()).append("</b>");
buf.append(" <a href=\"netdb.jsp#").append(key.toBase64().substring(0, 6)).append("\">(?)</a>");
buf.append(" expiring in ");
@ -205,6 +246,9 @@ public class Shitlist {
buf.append("</li>\n");
}
buf.append("</ul>\n");
buf.append("<i>Partial shitlisted peers (only blocked on some transports): ");
buf.append(partial);
buf.append("</i>\n");
out.write(buf.toString());
out.flush();
}

View File

@ -8,6 +8,8 @@ package net.i2p.router;
*
*/
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Writer;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
@ -100,7 +102,28 @@ public class StatisticsManager implements Service {
// No longer expose, to make build tracking more expensive
// stats.setProperty("router.id", RouterVersion.ID);
// stats.setProperty("core.id", CoreVersion.ID);
int newlines = 0;
FileInputStream in = null;
try {
in = new FileInputStream(Router.IDENTLOG);
int c = -1;
// perhaps later filter this to only include ident changes this
// day/week/month
while ( (c = in.read()) != -1) {
if (c == '\n')
newlines++;
}
} catch (IOException ioe) {
// ignore
} finally {
if (in != null)
try { in.close(); } catch (IOException ioe) {}
}
if (newlines > 0)
stats.setProperty("stat_identities", newlines+"");
if (_includePeerRankings) {
if (false)
stats.putAll(_context.profileManager().summarizePeers(_publishedStats));

View File

@ -320,7 +320,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
boolean wantACK = true;
int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey());
if (existingTags > 30)
if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) )
wantACK = false;
long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);

View File

@ -231,6 +231,29 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector();
return sel.selectFloodfillParticipants(getKBuckets());
}
/**
 * Before dropping a router reference, ask the floodfill peers about it.
 * The search fires no job on a successful reply with an updated value
 * (meaning we shouldn't drop the peer but instead use the new data); if
 * all queries time out it fires DropLookupFailedJob, which actually
 * removes our local reference.
 */
protected void lookupBeforeDropping(Hash peer, RouterInfo info) {
    // the search goes out to the floodfill peers even if we already have
    // the entry locally
    search(peer, null, new DropLookupFailedJob(_context, peer, info), 10*1000, false);
}
/**
 * Fired when the pre-drop verification search times out: finish dropping
 * the peer by delegating to dropAfterLookupFailed().
 */
private class DropLookupFailedJob extends JobImpl {
    private Hash _peer;
    private RouterInfo _info;
    public DropLookupFailedJob(RouterContext ctx, Hash peer, RouterInfo info) {
        super(ctx);
        _peer = peer;
        _info = info;
    }
    public String getName() { return "Lookup on failure of netDb peer timed out"; }
    public void runJob() {
        dropAfterLookupFailed(_peer, _info);
    }
}
}
/**

View File

@ -742,12 +742,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
}
}
_context.peerManager().removeCapabilities(dbEntry);
boolean removed = _kb.remove(dbEntry);
if (removed) {
if (_log.shouldLog(Log.INFO))
_log.info("Removed kbucket entry for " + dbEntry);
}
lookupBeforeDropping(dbEntry, (RouterInfo)o);
return;
} else {
// we always drop leaseSets that are failed [timed out],
// regardless of how many routers we have. this is called on a lease if
@ -775,6 +771,30 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
}
}
/**
 * Base implementation: perform no verification search before dropping a
 * router reference — just drop it immediately. Subclasses (e.g. the
 * floodfill facade) may override this to confirm with a netDb lookup first.
 */
protected void lookupBeforeDropping(Hash peer, RouterInfo info) {
    dropAfterLookupFailed(peer, info);
}
/**
 * Forget a peer entirely: drop its published capabilities, its kbucket
 * entry, its data store record, and any pending send-key references.
 */
protected void dropAfterLookupFailed(Hash peer, RouterInfo info) {
    _context.peerManager().removeCapabilities(peer);
    if (_kb.remove(peer) && _log.shouldLog(Log.INFO))
        _log.info("Removed kbucket entry for " + peer);
    _ds.remove(peer);
    // clear any bookkeeping that still references the dropped peer
    synchronized (_lastSent) {
        _lastSent.remove(peer);
    }
    synchronized (_explicitSendKeys) {
        _explicitSendKeys.remove(peer);
    }
    synchronized (_passiveSendKeys) {
        _passiveSendKeys.remove(peer);
    }
}
public void unpublish(LeaseSet localLeaseSet) {
if (!_initialized) return;
Hash h = localLeaseSet.getDestination().calculateHash();
@ -935,8 +955,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
renderRouterInfo(buf, ri, false);
out.write(buf.toString());
buf.setLength(0);
String coreVersion = ri.getOptions().getProperty("coreVersion");
String routerVersion = ri.getOptions().getProperty("router.version");
String coreVersion = ri.getOption("coreVersion");
String routerVersion = ri.getOption("router.version");
if ( (coreVersion != null) && (routerVersion != null) ) {
Map routerVersions = (Map)versions.get(coreVersion);
if (routerVersions == null) {
@ -1001,7 +1021,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
buf.append("Stats: <br /><i><code>\n");
for (Iterator iter = info.getOptions().keySet().iterator(); iter.hasNext(); ) {
String key = (String)iter.next();
String val = info.getOptions().getProperty(key);
String val = info.getOption(key);
buf.append(DataHelper.stripHTML(key)).append(" = ").append(DataHelper.stripHTML(val)).append("<br />\n");
}
buf.append("</code></i><hr />\n");

View File

@ -841,15 +841,17 @@ class SearchReplyJob extends JobImpl {
if (!sendsBadInfo) {
// we don't need to search for everthing we're given here - only ones that
// are next in our search path...
if (getContext().shitlist().isShitlisted(peer)) {
if (_log.shouldLog(Log.INFO))
_log.info("Not looking for a shitlisted peer...");
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
} else {
// note: no need to think about shitlisted targets in the netdb search, given
// the floodfill's behavior
//if (getContext().shitlist().isShitlisted(peer)) {
// if (_log.shouldLog(Log.INFO))
// _log.info("Not looking for a shitlisted peer...");
// getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
//} else {
//getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
//_repliesPendingVerification++;
shouldAdd = true;
}
//}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());

View File

@ -21,6 +21,7 @@ import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.router.NetworkDatabaseFacade;
import net.i2p.router.tunnel.pool.TunnelPeerSelector;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
@ -813,11 +814,16 @@ public class ProfileOrganizer {
_log.warn("Peer " + peer.toBase64() + " is marked as hidden, disallowing its use");
return false;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + peer.toBase64() + " is locally known, allowing its use");
// perhaps check to see if they are active?
return true;
boolean exclude = TunnelPeerSelector.shouldExclude(_context, info);
if (exclude) {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer " + peer.toBase64() + " has capabilities or other stats suggesting we avoid it");
return false;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + peer.toBase64() + " is locally known, allowing its use");
return true;
}
}
} else {
if (_log.shouldLog(Log.WARN))

View File

@ -170,8 +170,15 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
isNew = true;
}
*/
if ( (name == null) || (port == null) )
if ( (name == null) || (port == null) || (name.trim().length() <= 0) || ("null".equals(name)) )
return null;
try {
int p = Integer.parseInt(port);
if ( (p <= 0) || (p > 64*1024) )
return null;
} catch (NumberFormatException nfe) {
return null;
}
props.setProperty(NTCPAddress.PROP_HOST, name);
props.setProperty(NTCPAddress.PROP_PORT, port);
addr.setOptions(props);

View File

@ -12,6 +12,7 @@ import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
@ -44,4 +45,6 @@ public interface Transport {
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException;
public short getReachabilityStatus();
public void recheckReachability();
public boolean isUnreachable(Hash peer);
}

View File

@ -10,13 +10,7 @@ package net.i2p.router.transport;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.*;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
@ -24,6 +18,7 @@ import net.i2p.data.RouterIdentity;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.MessageSelector;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
@ -39,6 +34,8 @@ public abstract class TransportImpl implements Transport {
private RouterAddress _currentAddress;
private List _sendPool;
protected RouterContext _context;
/** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */
private Map _unreachableEntries;
/**
* Initialize the new transport
@ -56,6 +53,7 @@ public abstract class TransportImpl implements Transport {
_context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_sendPool = new ArrayList(16);
_unreachableEntries = new HashMap(16);
_currentAddress = null;
}
@ -374,6 +372,54 @@ public abstract class TransportImpl implements Transport {
public RouterContext getContext() { return _context; }
public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; }
public void recheckReachability() {}
/** how long a markUnreachable() entry suppresses a peer, in milliseconds */
private static final long UNREACHABLE_PERIOD = 5*60*1000;

/**
 * Is the given peer still within its unreachable window on this transport?
 * Expired entries are pruned as a side effect of the check.
 */
public boolean isUnreachable(Hash peer) {
    long now = _context.clock().now();
    synchronized (_unreachableEntries) {
        Long when = (Long)_unreachableEntries.get(peer);
        if (when == null)
            return false;
        boolean expired = when.longValue() + UNREACHABLE_PERIOD < now;
        if (expired)
            _unreachableEntries.remove(peer);
        return !expired;
    }
}
/** Record that we just failed to reach the given peer on this transport. */
public void markUnreachable(Hash peer) {
    Long when = new Long(_context.clock().now());
    synchronized (_unreachableEntries) {
        _unreachableEntries.put(peer, when);
    }
}
/**
 * Called when we establish a peer connection (outbound or inbound):
 * drop our local unreachable entry and lift any shitlisting, since at
 * least one transport can evidently reach them.
 */
public void markReachable(Hash peer) {
    // if *some* transport can reach them, then we shouldn't shitlist 'em
    _context.shitlist().unshitlistRouter(peer);
    synchronized (_unreachableEntries) {
        _unreachableEntries.remove(peer);
    }
}
/**
 * Periodic job (requeued every 60 seconds) that prunes expired entries
 * from the per-transport unreachable list.
 * NOTE(review): nothing in this chunk schedules this job — confirm it is
 * queued elsewhere (e.g. by a concrete transport) or it never runs.
 */
private class CleanupUnreachable extends JobImpl {
    public CleanupUnreachable(RouterContext ctx) {
        super(ctx);
    }
    public String getName() { return "Cleanup " + getStyle() + " unreachable list"; }
    public void runJob() {
        long now = getContext().clock().now();
        synchronized (_unreachableEntries) {
            // Iterator.remove is the only safe way to drop entries mid-iteration
            for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) {
                Hash peer = (Hash)iter.next();
                Long when = (Long)_unreachableEntries.get(peer);
                if (when.longValue() + UNREACHABLE_PERIOD < now)
                    iter.remove();
            }
        }
        requeue(60*1000);
    }
}
public static boolean isPubliclyRoutable(byte addr[]) {
if (addr.length == 4) {

View File

@ -211,10 +211,16 @@ public class TransportManager implements TransportEventListener {
}
public TransportBid getNextBid(OutNetMessage msg) {
int unreachableTransports = 0;
Hash peer = msg.getTarget().getIdentity().calculateHash();
Set failedTransports = msg.getFailedTransports();
TransportBid rv = null;
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
if (t.isUnreachable(peer)) {
unreachableTransports++;
continue;
}
if (failedTransports.contains(t.getStyle())) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Skipping transport " + t.getStyle() + " as it already failed");
@ -235,6 +241,8 @@ public class TransportManager implements TransportEventListener {
_log.debug("Transport " + t.getStyle() + " did not produce a bid");
}
}
if (unreachableTransports >= _transports.size())
_context.shitlist().shitlistRouter(peer, "Unreachable on any transport");
return rv;
}

View File

@ -141,6 +141,7 @@ public class EstablishState {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received);
if (_received >= _X.length) {
if (isCheckInfo(_context, _context.routerHash(), _X)) {
_context.statManager().addRateData("ntcp.inboundCheckConnection", 1, 0);
fail("Incoming connection was a check connection");
return;
}
@ -170,6 +171,7 @@ public class EstablishState {
_log.debug(prefix()+"xor=" + Base64.encode(realXor));
}
if (!DataHelper.eq(realXor, _hX_xor_bobIdentHash)) {
_context.statManager().addRateData("ntcp.invalidHXxorBIH", 1, 0);
fail("Invalid hX_xor");
return;
}
@ -217,6 +219,7 @@ public class EstablishState {
_transport.getPumper().wantsWrite(_con, write);
if (!src.hasRemaining()) return;
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
_context.statManager().addRateData("ntcp.invalidDH", 1, 0);
fail("Invalid X", e);
return;
}
@ -306,6 +309,7 @@ public class EstablishState {
_log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
_e_hXY_tsB = new byte[Hash.HASH_LENGTH+4+12];
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
_context.statManager().addRateData("ntcp.invalidDH", 1, 0);
fail("Invalid X", e);
return;
}
@ -328,6 +332,7 @@ public class EstablishState {
Hash h = _context.sha().calculateHash(XY);
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "h(XY)=" + h.toBase64());
if (!DataHelper.eq(h.getData(), 0, hXY_tsB, 0, Hash.HASH_LENGTH)) {
_context.statManager().addRateData("ntcp.invalidHXY", 1, 0);
fail("Invalid H(X+Y) - mitm attack attempted?");
return;
}
@ -421,6 +426,7 @@ public class EstablishState {
_verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey());
if (!_verified) {
_context.statManager().addRateData("ntcp.invalidSignature", 1, 0);
fail("Signature was invalid - attempt to spoof " + _con.getRemotePeer().calculateHash().toBase64() + "?");
return;
} else {
@ -478,6 +484,7 @@ public class EstablishState {
RouterIdentity alice = new RouterIdentity();
int sz = (int)DataHelper.fromLong(b, 0, 2);
if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) {
_context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0);
fail("size is invalid", new Exception("size is " + sz));
return;
}
@ -488,6 +495,7 @@ public class EstablishState {
long diff = 1000*Math.abs(tsA-_tsB);
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
fail("Clocks too skewed (" + diff + ")");
return;
} else if (_log.shouldLog(Log.DEBUG)) {
@ -525,11 +533,14 @@ public class EstablishState {
if (_log.shouldLog(Log.INFO))
_log.info(prefix()+"Verified remote peer as " + alice.calculateHash().toBase64());
} else {
_context.statManager().addRateData("ntcp.invalidInboundSignature", 1, 0);
fail("Peer verification failed - spoof of " + alice.calculateHash().toBase64() + "?");
}
} catch (IOException ioe) {
_context.statManager().addRateData("ntcp.invalidInboundIOE", 1, 0);
fail("Error verifying peer", ioe);
} catch (DataFormatException dfe) {
_context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0);
fail("Error verifying peer", dfe);
}
}

View File

@ -36,7 +36,7 @@ public class EventPumper implements Runnable {
* every 30s or so, iterate across all ntcp connections just to make sure
* we have their interestOps set properly (and to expire any looong idle cons)
*/
private static final long FAILSAFE_ITERATION_FREQ = 60*1000l;
private static final long FAILSAFE_ITERATION_FREQ = 30*1000l;
public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx;
@ -75,6 +75,7 @@ public class EventPumper implements Runnable {
}
public void registerConnect(NTCPConnection con) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection");
_context.statManager().addRateData("ntcp.registerConnect", 1, 0);
synchronized (_wantsConRegister) { _wantsConRegister.add(con); }
_selector.wakeup();
}
@ -212,8 +213,10 @@ public class EventPumper implements Runnable {
+ "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0)
+ " write? " + write
+ "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0)
+ " on " + key.attachment()
);
if (accept) {
_context.statManager().addRateData("ntcp.accept", 1, 0);
processAccept(key);
}
if (connect) {
@ -221,10 +224,12 @@ public class EventPumper implements Runnable {
processConnect(key);
}
if (read) {
_context.statManager().addRateData("ntcp.read", 1, 0);
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
processRead(key);
}
if (write) {
_context.statManager().addRateData("ntcp.write", 1, 0);
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
processWrite(key);
}
@ -241,6 +246,7 @@ public class EventPumper implements Runnable {
if (req.getPendingOutboundRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("queued write on " + con + " for " + data.length);
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1, 0);
con.queuedWrite(buf, req);
} else {
// fully allocated
@ -290,7 +296,8 @@ public class EventPumper implements Runnable {
rv = ByteBuffer.allocate(BUF_SIZE);
NUM_BUFS = ++__liveBufs;
if (_log.shouldLog(Log.DEBUG))
_log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
_log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
@ -351,11 +358,15 @@ public class EventPumper implements Runnable {
if (connected) {
con.setKey(key);
con.outboundConnected();
_context.statManager().addRateData("ntcp.connectSuccessful", 1, 0);
} else {
con.close();
_context.statManager().addRateData("ntcp.connectFailedTimeout", 1, 0);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Error processing connection", ioe);
con.close();
_context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1, 0);
} catch (NoConnectionPendingException ncpe) {
// ignore
}
@ -368,9 +379,12 @@ public class EventPumper implements Runnable {
int read = con.getChannel().read(buf);
if (read == -1) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
_context.statManager().addRateData("ntcp.readEOF", 1, 0);
con.close();
releaseBuf(buf);
} else if (read == 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("nothing to read for " + con + ", but stay interested");
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
releaseBuf(buf);
} else if (read > 0) {
@ -382,9 +396,14 @@ public class EventPumper implements Runnable {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read", null, null); //con, buf);
if (req.getPendingInboundRequested() > 0) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
if (_log.shouldLog(Log.DEBUG))
_log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
_context.statManager().addRateData("ntcp.queuedRecv", read, 0);
con.queuedRecv(rbuf, req);
} else {
// fully allocated
if (_log.shouldLog(Log.DEBUG))
_log.debug("not bw throttled reading for " + con);
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
con.recv(rbuf);
}
@ -392,6 +411,7 @@ public class EventPumper implements Runnable {
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe);
con.close();
_context.statManager().addRateData("ntcp.readError", 1, 0);
if (buf != null) releaseBuf(buf);
} catch (NotYetConnectedException nyce) {
// ???
@ -406,9 +426,19 @@ public class EventPumper implements Runnable {
try {
while (true) {
ByteBuffer buf = con.getNextWriteBuf();
if ( (buf != null) && (buf.remaining() > 0) ) {
if (buf != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("writing " + buf.remaining()+"...");
if (buf.remaining() <= 0) {
long beforeRem = System.currentTimeMillis();
con.removeWriteBuf(buf);
long afterRem = System.currentTimeMillis();
if (_log.shouldLog(Log.DEBUG))
_log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "...");
buf = null;
buffers++;
continue;
}
int written = con.getChannel().write(buf);
totalWritten += written;
if (written == 0) {
@ -441,6 +471,7 @@ public class EventPumper implements Runnable {
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
_context.statManager().addRateData("ntcp.writeError", 1, 0);
con.close();
}
long after = System.currentTimeMillis();
@ -512,18 +543,32 @@ public class EventPumper implements Runnable {
InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort());
boolean connected = con.getChannel().connect(saddr);
if (connected) {
_context.statManager().addRateData("ntcp.connectImmediate", 1, 0);
key.interestOps(SelectionKey.OP_READ);
processConnect(key);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error connecting", ioe);
if (ntcpOnly(con)) {
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
con.close(false);
} else {
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
_context.statManager().addRateData("ntcp.connectFailedIOE", 1, 0);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
//if (ntcpOnly(con)) {
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
// con.close(false);
//} else {
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
con.close(true);
}
//}
} catch (UnresolvedAddressException uae) {
if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae);
_context.statManager().addRateData("ntcp.connectFailedUnresolved", 1, 0);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
//if (ntcpOnly(con)) {
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage());
// con.close(false);
//} else {
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE);
con.close(true);
//}
}
} catch (ClosedChannelException cce) {
if (_log.shouldLog(Log.WARN)) _log.warn("Error registering", cce);

View File

@ -96,8 +96,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private boolean _sendingMeta;
/** how many consecutive sends were failed due to (estimated) send queue time */
private int _consecutiveBacklog;
private long _nextInfoTime;
private static final int META_FREQUENCY = 10*60*1000;
private static final int INFO_FREQUENCY = 6*60*60*1000;
/**
* Create an inbound connected (though not established) NTCP connection
@ -181,6 +183,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_transport.inboundEstablished(this);
_establishState = null;
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
}
public long getClockSkew() { return _clockSkew; }
public long getUptime() {
@ -199,8 +202,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
return queued;
}
}
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
public long getTimeSinceSend() { return _lastSendTime <= 0 ? 0 : System.currentTimeMillis()-_lastSendTime; }
public long getTimeSinceReceive() { return _lastReceiveTime <= 0 ? 0 : System.currentTimeMillis()-_lastReceiveTime; }
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
public int getConsecutiveBacklog() { return _consecutiveBacklog; }
@ -253,22 +256,26 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized (_writeBufs) { blocks = _writeBufs.size(); }
if (_log.shouldLog(Log.ERROR))
_log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64());
_context.statManager().addRateData("ntcp.closeOnBacklog", _consecutiveBacklog, getUptime());
close();
}
_context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog, msg.getLifetime());
return;
}
_consecutiveBacklog = 0;
int enqueued = 0;
if (FAST_LARGE)
bufferedPrepare(msg);
boolean noOutbound = false;
synchronized (_outbound) {
_outbound.add(msg);
enqueued = _outbound.size();
msg.setQueueSize(enqueued);
noOutbound = (_currentOutbound == null);
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && _currentOutbound == null)
_transport.getWriter().wantsWrite(this);
if (_established && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued");
}
private long queueTime() {
@ -289,8 +296,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
long queueTime = queueTime();
if (queueTime <= 0) return false;
int size = 0;
boolean currentOutboundSet = false;
synchronized (_outbound) {
size = _outbound.size();
currentOutboundSet = (_currentOutbound != null);
}
// perhaps we could take into account the size of the queued messages too, our
@ -299,8 +308,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (getUptime() < 10*1000) // allow some slack just after establishment
return false;
if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
int writeBufs = 0;
synchronized (_writeBufs) { writeBufs = _writeBufs.size(); }
if (_log.shouldLog(Log.WARN))
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size);
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet
+ ", writeBufs: " + writeBufs + " on " + toString());
_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size);
return true;
//} else if (size > 32) { // another arbitrary limit.
@ -324,6 +338,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (target != null) {
infoMsg.setTarget(target);
infoMsg.beginSend();
_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1, 0);
send(infoMsg);
} else {
if (_isInbound) {
@ -351,16 +366,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_established = true;
_establishedOn = System.currentTimeMillis();
_establishState = null;
_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash());
_transport.markReachable(getRemotePeer().calculateHash());
//_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
boolean msgs = false;
synchronized (_outbound) {
msgs = (_outbound.size() > 0);
}
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
if (msgs)
_transport.getWriter().wantsWrite(this);
_transport.getWriter().wantsWrite(this, "outbound established");
}
public boolean getIsInbound() { return _isInbound; }
// Time vs space tradeoff:
// on slow GCing jvms, the mallocs in the following preparation can cause the
// write to get congested, taking up a substantial portion of the Writer's
@ -474,6 +493,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: "
// + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16));
_transport.getPumper().wantsWrite(this, encrypted);
// for every 6-12 hours that we are connected to a peer, send them
// our updated netDb info (they may not accept it and instead query
// the floodfill netDb servers, but they may...)
if (_nextInfoTime <= System.currentTimeMillis()) {
enqueueInfoMessage();
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
}
}
/**
@ -537,6 +564,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
+ " encrypted=" + (encryptedTime-begin)
+ " wantsWrite=" + (wantsTime-encryptedTime)
+ " releaseBuf=" + (releaseTime-wantsTime));
// for every 6-12 hours that we are connected to a peer, send them
// our updated netDb info (they may not accept it and instead query
// the floodfill netDb servers, but they may...)
if (_nextInfoTime <= System.currentTimeMillis()) {
// perhaps this should check to see if we are bw throttled, etc?
enqueueInfoMessage();
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
}
}
/**
@ -608,6 +644,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
NUM_PREP_BUFS = ++__liveBufs;
if (_log.shouldLog(Log.DEBUG))
_log.debug("creating a new prep buffer with " + __liveBufs + " live");
_context.statManager().addRateData("ntcp.prepBufCache", NUM_PREP_BUFS, 0);
b.acquired();
return b;
}
@ -675,16 +712,24 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_conKey.interestOps(SelectionKey.OP_READ);
// schedule up the beginning of our handshaking by calling prepareNextWrite on the
// writer thread pool
_transport.getWriter().wantsWrite(this);
_transport.getWriter().wantsWrite(this, "outbound connected");
}
public void complete(FIFOBandwidthLimiter.Request req) {
removeRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (req.getTotalInboundRequested() > 0)
if (req.getTotalInboundRequested() > 0) {
_context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()), 0);
recv(buf);
else if (req.getTotalOutboundRequested() > 0)
// our reads used to be bw throttled (during which time we were no
// longer interested in reading from the network), but we aren't
// throttled anymore, so we should resume being interested in reading
_transport.getPumper().wantsRead(this);
//_transport.getReader().wantsRead(this);
} else if (req.getTotalOutboundRequested() > 0) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()), 0);
write(buf);
}
}
private void removeRequest(FIFOBandwidthLimiter.Request req) {
synchronized (_bwRequests) { _bwRequests.remove(req); }
@ -739,6 +784,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized (_writeBufs) {
_writeBufs.add(buf);
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
_transport.getPumper().wantsWrite(this);
}
@ -768,30 +814,38 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public void removeWriteBuf(ByteBuffer buf) {
_bytesSent += buf.capacity();
OutNetMessage msg = null;
boolean bufsRemain = false;
boolean clearMessage = false;
synchronized (_writeBufs) {
if (_sendingMeta && (buf.capacity() == _meta.length)) {
_sendingMeta = false;
} else {
if (_currentOutbound != null)
clearMessage = true;
}
_writeBufs.remove(buf);
bufsRemain = _writeBufs.size() > 0;
}
if (clearMessage) {
synchronized (_outbound) {
if (_currentOutbound != null)
msg = _currentOutbound;
_currentOutbound = null;
}
_writeBufs.remove(buf);
}
if (msg != null) {
_lastSendTime = System.currentTimeMillis();
_context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime());
_context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime());
_context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime());
if (_log.shouldLog(Log.INFO)) {
_log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
+ msg.getSendTime() + "/" + msg.getTransmissionTime() + "/"
+ msg.getPreparationTime() + "/" + msg.getLifetime()
+ " queued after " + msg.getQueueSize()
+ " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+")");
if (msg != null) {
_lastSendTime = System.currentTimeMillis();
_context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime());
_context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime());
_context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime());
if (_log.shouldLog(Log.INFO)) {
_log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
+ msg.getSendTime() + "/" + msg.getTransmissionTime() + "/"
+ msg.getPreparationTime() + "/" + msg.getLifetime()
+ " queued after " + msg.getQueueSize()
+ " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")");
}
_messagesWritten++;
_transport.sendComplete(msg);
}
_messagesWritten++;
_transport.sendComplete(msg);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("I2NP meta message sent completely");
@ -801,8 +855,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized (_outbound) {
msgs = ((_outbound.size() > 0) || (_currentOutbound != null));
}
if (msgs)
_transport.getWriter().wantsWrite(this);
if (msgs) // push through the bw limiter to reach _writeBufs
_transport.getWriter().wantsWrite(this, "write completed");
if (bufsRemain) // send asap
_transport.getPumper().wantsWrite(this);
updateStats();
}
@ -879,6 +935,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (!ok) {
if (_log.shouldLog(Log.ERROR))
_log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
_context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1, getUptime());
return;
}
byte swap[] = _prevReadBlock;
@ -895,6 +952,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_curReadState.getSize() > 16*1024) {
if (_log.shouldLog(Log.ERROR))
_log.error("i2np message more than 16KB? nuh uh: " + _curReadState.getSize());
_context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime());
close();
return false;
} else {
@ -922,6 +980,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (read != expected) {
if (_log.shouldLog(Log.WARN))
_log.warn("I2NP metadata message had a bad CRC value");
_context.statManager().addRateData("ntcp.corruptMetaCRC", 1, getUptime());
close();
return;
} else {
@ -929,9 +988,11 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if ( (newSkew > Router.CLOCK_FUDGE_FACTOR) || (newSkew < 0-Router.CLOCK_FUDGE_FACTOR) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer's skew jumped too far (from " + _clockSkew + " to " + newSkew + "): " + toString());
_context.statManager().addRateData("ntcp.corruptSkew", newSkew, getUptime());
close();
return;
}
_context.statManager().addRateData("ntcp.receiveMeta", newSkew, getUptime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received NTCP metadata, old skew of " + _clockSkew + ", new skew of " + newSkew);
_clockSkew = newSkew;
@ -955,6 +1016,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_log.debug("Sending NTCP metadata");
_sendingMeta = true;
_transport.getPumper().wantsWrite(this, encrypted);
// enqueueInfoMessage(); // this often?
}
public int hashCode() { return System.identityHashCode(this); }
@ -1118,7 +1180,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
releaseHandler(h);
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes");
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes on " + toString());
_context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv);
_context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv);
if (read != null) {
@ -1133,17 +1195,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error parsing I2NP message", ioe);
_context.statManager().addRateData("ntcp.corruptI2NPIOE", 1, getUptime());
close();
return;
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error parsing I2NP message", ime);
_context.statManager().addRateData("ntcp.corruptI2NPIME", 1, getUptime());
close();
return;
}
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
_context.statManager().addRateData("ntcp.corruptI2NPCRC", 1, getUptime());
close();
return;
}

View File

@ -63,6 +63,58 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.failsafeWrites", "How many times do we need to proactively add in an extra nio write to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.failsafeCloses", "How many times do we need to proactively close an idle connection to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.accept", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.attemptShitlistedPeer", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.attemptUnreachablePeer", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.closeOnBacklog", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectFailedIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectFailedNoNTCPAddress", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectFailedUnresolved", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.connectSuccessful", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptDecryptedI2NP", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptI2NPCRC", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptI2NPIME", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptI2NPIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptMetaCRC", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.corruptTooLargeI2NP", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.dontSendOnBacklog", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.inboundEstablished", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.inboundEstablishedDuplicate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidDH", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidHXY", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidHXxorBIH", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidInboundDFE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidInboundIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidInboundSignature", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidInboundSize", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidInboundSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.invalidSignature", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.multipleCloseOnRemove", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.outboundEstablishFailed", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.prepBufCache", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.read", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.readError", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.receiveMeta", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.registerConnect", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.throttledReadComplete", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.throttledWriteComplete", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.wantsQueuedWrite", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.write", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("ntcp.writeError", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
_establishing = new ArrayList(4);
_conLock = new Object();
_conByIdent = new HashMap(64);
@ -79,7 +131,9 @@ public class NTCPTransport extends TransportImpl {
}
void inboundEstablished(NTCPConnection con) {
_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash());
_context.statManager().addRateData("ntcp.inboundEstablished", 1, 0);
markReachable(con.getRemotePeer().calculateHash());
//_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash());
NTCPConnection old = null;
synchronized (_conLock) {
old = (NTCPConnection)_conByIdent.put(con.getRemotePeer().calculateHash(), con);
@ -87,6 +141,7 @@ public class NTCPTransport extends TransportImpl {
if (old != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Old connection closed: " + old + " replaced by " + con);
_context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime(), 0);
old.close();
}
}
@ -127,6 +182,7 @@ public class NTCPTransport extends TransportImpl {
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error opening a channel", ioe);
_context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1, 0);
con.close();
}
} else {
@ -152,8 +208,8 @@ public class NTCPTransport extends TransportImpl {
old = (NTCPConnection)_conByIdent.put(ih, con);
}
if (old != null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Multiple connections on out ready, closing " + old + " and keeping " + con);
if (_log.shouldLog(Log.WARN))
_log.warn("Multiple connections on out ready, closing " + old + " and keeping " + con);
old.close();
}
con.enqueueInfoMessage(); // enqueues a netDb store of our own info
@ -181,23 +237,50 @@ public class NTCPTransport extends TransportImpl {
super.afterSend(msg, sendSuccessful, allowRequeue, msToSend);
}
public TransportBid bid(RouterInfo toAddress, long dataSize) {
if (_context.shitlist().isShitlisted(toAddress.getIdentity().calculateHash(), STYLE)) {
Hash peer = toAddress.getIdentity().calculateHash();
if (_context.shitlist().isShitlisted(peer, STYLE)) {
// we aren't shitlisted in general (since we are trying to get a bid), but we have
// recently shitlisted the peer on the NTCP transport, so don't try it
_context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1, 0);
return null;
} else if (isUnreachable(peer)) {
_context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0);
return null;
}
boolean established = isEstablished(toAddress.getIdentity());
if (established) { // should we check the queue size? nah, if its valid, use it
if (_log.shouldLog(Log.DEBUG))
_log.debug("fast bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as its already established");
return _fastBid;
}
RouterAddress addr = toAddress.getTargetAddress(STYLE);
if (addr == null) {
markUnreachable(peer);
_context.statManager().addRateData("ntcp.connectFailedNoNTCPAddress", 1, 0);
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as they don't have an ntcp address");
return null;
}
NTCPAddress naddr = new NTCPAddress(addr);
if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) {
_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1, 0);
markUnreachable(peer);
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as they don't have a valid ntcp address");
return null;
}
//if ( (_myAddress != null) && (_myAddress.equals(addr)) )
// return null; // dont talk to yourself
boolean established = isEstablished(toAddress.getIdentity());
if (established) // should we check the queue size? nah, if its valid, use it
return _fastBid;
else if (addr != null)
return _slowBid;
else
return null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("slow bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64());
return _slowBid;
}
void sendComplete(OutNetMessage msg) { _finisher.add(msg); }
@ -239,6 +322,7 @@ public class NTCPTransport extends TransportImpl {
if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both
if (_log.shouldLog(Log.ERROR))
_log.error("Multiple connections on remove, closing " + removed + " (already closed " + con + ")");
_context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime(), 0);
removed.close();
}
}
@ -349,6 +433,8 @@ public class NTCPTransport extends TransportImpl {
}
for (int i = 0; expired != null && i < expired.size(); i++)
((NTCPConnection)expired.get(i)).close();
if ( (expired != null) && (expired.size() > 0) )
_context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0);
}
//private boolean bindAllInterfaces() { return true; }
@ -360,10 +446,17 @@ public class NTCPTransport extends TransportImpl {
} else {
RouterAddress ra = CommSystemFacadeImpl.createNTCPAddress(ctx);
if (ra != null) {
_myAddress = new NTCPAddress(ra);
replaceAddress(ra);
if (_log.shouldLog(Log.INFO))
_log.info("NTCP address configured: " + _myAddress);
NTCPAddress addr = new NTCPAddress(ra);
if (addr.getPort() <= 0) {
_myAddress = null;
if (_log.shouldLog(Log.ERROR))
_log.error("NTCP address is outbound only, since the NTCP configuration is invalid");
} else {
_myAddress = addr;
replaceAddress(ra);
if (_log.shouldLog(Log.INFO))
_log.info("NTCP address configured: " + _myAddress);
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("NTCP address is outbound only");
@ -404,11 +497,17 @@ public class NTCPTransport extends TransportImpl {
int numPeers = 0;
int readingPeers = 0;
int writingPeers = 0;
float bpsSend = 0;
float bpsRecv = 0;
long totalUptime = 0;
long totalSend = 0;
long totalRecv = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size()).append("</b><br />\n");
buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b>peer</b></td>");
buf.append(" <td><b>dir</b></td>");
buf.append(" <td><b>uptime</b></td>");
buf.append(" <td><b>idle</b></td>");
buf.append(" <td><b>sent</b></td>");
@ -424,13 +523,34 @@ public class NTCPTransport extends TransportImpl {
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
buf.append("<tr><td>").append(con.getRemotePeer().calculateHash().toBase64().substring(0,8));
buf.append("</td><td>");
if (con.getIsInbound())
buf.append("in");
else
buf.append("out");
buf.append("</td><td>").append(DataHelper.formatDuration(con.getUptime()));
buf.append("</td><td>").append(DataHelper.formatDuration(con.getTimeSinceSend()));
buf.append("/").append(DataHelper.formatDuration(con.getTimeSinceReceive()));
buf.append("</td><td>").append(con.getMessagesSent());
totalUptime += con.getUptime();
buf.append("</td><td>").append(con.getTimeSinceSend()/1000);
buf.append("s/").append(con.getTimeSinceReceive()/1000);
buf.append("s</td><td>").append(con.getMessagesSent());
totalSend += con.getMessagesSent();
buf.append("</td><td>").append(con.getMessagesReceived());
buf.append("</td><td>").append(formatRate(con.getSendRate()/1024));
buf.append("/").append(formatRate(con.getRecvRate()/1024)).append("KBps");
totalRecv += con.getMessagesReceived();
buf.append("</td><td>");
if (con.getTimeSinceSend() < 10*1000) {
buf.append(formatRate(con.getSendRate()/1024));
bpsSend += con.getSendRate();
} else {
buf.append(formatRate(0));
}
buf.append("/");
if (con.getTimeSinceReceive() < 10*1000) {
buf.append(formatRate(con.getRecvRate()/1024));
bpsRecv += con.getRecvRate();
} else {
buf.append(formatRate(0));
}
buf.append("KBps");
long outQueue = con.getOutboundQueueSize();
if (outQueue <= 0) {
buf.append("</td><td>No messages");
@ -454,6 +574,15 @@ public class NTCPTransport extends TransportImpl {
buf.setLength(0);
}
if (peers.size() > 0) {
buf.append("<tr><td colspan=\"11\"><hr /></td></tr>\n");
buf.append("<tr><td>").append(peers.size()).append(" peers</td><td>&nbsp;</td><td>").append(DataHelper.formatDuration(totalUptime/peers.size()));
buf.append("</td><td>&nbsp;</td><td>").append(totalSend).append("</td><td>").append(totalRecv);
buf.append("</td><td>").append(formatRate(bpsSend/1024)).append("/").append(formatRate(bpsRecv/1024)).append("KBps");
buf.append("</td><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;</td>");
buf.append("</tr>\n");
}
buf.append("</table>\n");
buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("<br />\n");
buf.append("Peers currently writing I2NP messages: ").append(writingPeers).append("<br />\n");

View File

@ -148,6 +148,7 @@ class Reader {
if (est.isCorrupt()) {
if (_log.shouldLog(Log.WARN))
_log.warn("closing connection on establishment because: " +est.getError(), est.getException());
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0);
con.close();
return;
} else if (buf.remaining() <= 0) {

View File

@ -48,10 +48,11 @@ class Writer {
}
}
public void wantsWrite(NTCPConnection con) {
public void wantsWrite(NTCPConnection con, String source) {
//if (con.getCurrentOutbound() != null)
// throw new RuntimeException("Current outbound message already in play on " + con);
boolean already = false;
boolean pending = false;
synchronized (_pendingConnections) {
if (_liveWrites.contains(con)) {
if (!_writeAfterLive.contains(con)) {
@ -60,11 +61,12 @@ class Writer {
already = true;
} else if (!_pendingConnections.contains(con)) {
_pendingConnections.add(con);
pending = true;
}
_pendingConnections.notifyAll();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("wantsWrite: " + con + " already live? " + already);
_log.debug("wantsWrite: " + con + " already live? " + already + " added to pending? " + pending + ": " + source);
}
public void connectionClosed(NTCPConnection con) {
synchronized (_pendingConnections) {
@ -87,20 +89,28 @@ class Writer {
boolean keepWriting = (con != null) && _writeAfterLive.remove(con);
if (keepWriting) {
// keep on writing the same one
if (_log.shouldLog(Log.DEBUG))
_log.debug("Keep writing on the same connection: " + con);
} else {
_liveWrites.remove(con);
con = null;
if (_pendingConnections.size() <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Done writing, but nothing pending, so wait");
_pendingConnections.wait();
} else {
con = (NTCPConnection)_pendingConnections.remove(0);
_liveWrites.add(con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Switch to writing on: " + con);
}
}
}
} catch (InterruptedException ie) {}
if (!_stop && (con != null)) {
try {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Prepare next write on: " + con);
con.prepareNextWrite();
} catch (RuntimeException re) {
_log.log(Log.CRIT, "Error in the ntcp writer", re);

View File

@ -134,6 +134,7 @@ public class EstablishmentManager {
}
if (msg.getTarget().getNetworkId() != Router.NETWORK_ID) {
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash());
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
_transport.failed(msg, "Remote peer is on the wrong network, cannot establish");
return;
}
@ -146,7 +147,8 @@ public class EstablishmentManager {
if (!_transport.isValid(to.getIP())) {
_transport.failed(msg, "Remote peer's IP isn't valid");
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
//_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
return;
}
@ -465,7 +467,7 @@ public class EstablishmentManager {
public PublishToNewInbound(PeerState peer) { _peer = peer; }
public void timeReached() {
Hash peer = _peer.getRemotePeer();
if ((peer != null) && (!_context.shitlist().isShitlisted(peer))) {
if ((peer != null) && (!_context.shitlist().isShitlisted(peer)) && (!_transport.isUnreachable(peer))) {
// ok, we are fine with them, send them our latest info
if (_log.shouldLog(Log.INFO))
_log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer.toBase64());
@ -942,7 +944,8 @@ public class EstablishmentManager {
}
Hash peer = outboundState.getRemoteIdentity().calculateHash();
_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE);
//_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE);
_transport.markUnreachable(peer);
_transport.dropPeer(peer, false, err);
//_context.profileManager().commErrorOccurred(peer);
} else {

View File

@ -1,5 +1,6 @@
package net.i2p.router.transport.udp;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
@ -18,12 +19,14 @@ public class UDPEndpoint {
private UDPTransport _transport;
private UDPSender _sender;
private UDPReceiver _receiver;
private DatagramSocket _socket;
private InetAddress _bindAddress;
public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort) throws SocketException {
public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort, InetAddress bindAddress) throws SocketException {
_context = ctx;
_log = ctx.logManager().getLog(UDPEndpoint.class);
_transport = transport;
_bindAddress = bindAddress;
_listenPort = listenPort;
}
@ -32,9 +35,12 @@ public class UDPEndpoint {
_log.debug("Starting up the UDP endpoint");
shutdown();
try {
DatagramSocket socket = new DatagramSocket(_listenPort);
_sender = new UDPSender(_context, socket, "UDPSend on " + _listenPort);
_receiver = new UDPReceiver(_context, _transport, socket, "UDPReceive on " + _listenPort);
if (_bindAddress == null)
_socket = new DatagramSocket(_listenPort);
else
_socket = new DatagramSocket(_listenPort, _bindAddress);
_sender = new UDPSender(_context, _socket, "UDPSend on " + _listenPort);
_receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceive on " + _listenPort);
_sender.startup();
_receiver.startup();
} catch (SocketException se) {
@ -48,16 +54,22 @@ public class UDPEndpoint {
_sender.shutdown();
_receiver.shutdown();
}
if (_socket != null) {
_socket.close();
}
}
public void setListenPort(int newPort) { _listenPort = newPort; }
public void updateListenPort(int newPort) {
if (newPort == _listenPort) return;
try {
DatagramSocket socket = new DatagramSocket(newPort);
_sender.updateListeningPort(socket, newPort);
if (_bindAddress == null)
_socket = new DatagramSocket(_listenPort);
else
_socket = new DatagramSocket(_listenPort, _bindAddress);
_sender.updateListeningPort(_socket, newPort);
// note: this closes the old socket, so call this after the sender!
_receiver.updateListeningPort(socket, newPort);
_receiver.updateListeningPort(_socket, newPort);
_listenPort = newPort;
} catch (SocketException se) {
if (_log.shouldLog(Log.ERROR))

View File

@ -36,7 +36,7 @@ public class UDPEndpointTest {
int base = 2000 + _context.random().nextInt(10000);
for (int i = 0; i < numPeers; i++) {
_log.debug("Building " + i);
UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i);
UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i, null);
_endpoints[i] = endpoint;
endpoint.startup();
I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i);

View File

@ -108,6 +108,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public static final String PROP_FORCE_INTRODUCERS = "i2np.udp.forceIntroducers";
/** do we allow direct SSU connections, sans introducers? */
public static final String PROP_ALLOW_DIRECT = "i2np.udp.allowDirect";
public static final String PROP_BIND_INTERFACE = "i2np.udp.bindInterface";
/** how many relays offered to us will we use at a time? */
public static final int PUBLIC_RELAY_COUNT = 3;
@ -220,8 +221,19 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.info("Binding to the explicitly specified external port: " + port);
}
if (_endpoint == null) {
String bindTo = _context.getProperty(PROP_BIND_INTERFACE);
InetAddress bindToAddr = null;
if (bindTo != null) {
try {
bindToAddr = InetAddress.getByName(bindTo);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Invalid SSU bind interface specified [" + bindTo + "]", uhe);
bindToAddr = null;
}
}
try {
_endpoint = new UDPEndpoint(_context, this, port);
_endpoint = new UDPEndpoint(_context, this, port, bindToAddr);
} catch (SocketException se) {
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Unable to listen on the UDP port (" + port + ")", se);
@ -327,7 +339,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.ERROR))
_log.error("The router " + from.toBase64() + " told us we have an invalid IP - "
+ RemoteHostId.toString(ourIP) + ". Lets throw tomatoes at them");
_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE);
markUnreachable(from);
//_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE);
return;
} else if (inboundRecent && _externalListenPort > 0 && _externalListenHost != null) {
// use OS clock since its an ordering thing, not a time thing
@ -533,7 +546,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup"));
_activeThrottle.unchoke(peer.getRemotePeer());
_context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE);
markReachable(peer.getRemotePeer());
//_context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE);
if (SHOULD_FLOOD_PEERS)
_flooder.addPeer(peer);
@ -601,7 +615,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
SimpleTimer.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD);
}
}
_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE);
markUnreachable(peerHash);
//_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE);
dropPeer(peerHash, false, "wrong network");
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping the peer " + peerHash.toBase64() + " because they are in the wrong net");
@ -701,8 +716,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (peer.getRemotePeer() != null) {
dropPeerCapacities(peer);
if (shouldShitlist)
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE);
if (shouldShitlist) {
markUnreachable(peer.getRemotePeer());
//_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE);
}
long now = _context.clock().now();
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
synchronized (_peersByIdent) {
@ -1630,7 +1647,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]");
appended = true;
}
if (_context.shitlist().isShitlisted(peer.getRemotePeer())) {
if (_context.shitlist().isShitlisted(peer.getRemotePeer(), STYLE)) {
if (!appended) buf.append("<br />");
buf.append(" [shitlisted]");
appended = true;

View File

@ -363,6 +363,9 @@ public class FragmentHandler {
private void receiveComplete(FragmentedMessage msg) {
_completed++;
String stringified = null;
if (_log.shouldLog(Log.DEBUG))
stringified = msg.toString();
try {
int fragmentCount = msg.getFragmentCount();
// toByteArray destroys the contents of the message completely
@ -377,11 +380,13 @@ public class FragmentHandler {
noteCompletion(m.getUniqueId());
_receiver.receiveComplete(m, msg.getTargetRouter(), msg.getTargetTunnel());
} catch (IOException ioe) {
if (stringified == null) stringified = msg.toString();
if (_log.shouldLog(Log.ERROR))
_log.error("Error receiving fragmented message (corrupt?): " + msg, ioe);
_log.error("Error receiving fragmented message (corrupt?): " + stringified, ioe);
} catch (I2NPMessageException ime) {
if (stringified == null) stringified = msg.toString();
if (_log.shouldLog(Log.ERROR))
_log.error("Error receiving fragmented message (corrupt?): " + msg, ime);
_log.error("Error receiving fragmented message (corrupt?): " + stringified, ime);
}
}

View File

@ -7,13 +7,14 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.peermanager.PeerProfile;
import net.i2p.util.Log;
/**
* Coordinate the selection of peers to go into a tunnel for one particular
* pool.
*/
abstract class TunnelPeerSelector {
public abstract class TunnelPeerSelector {
/**
* Which peers should go into the next tunnel for the given settings?
*
@ -155,16 +156,20 @@ abstract class TunnelPeerSelector {
return rv;
} else if (filterSlow(ctx, isInbound, isExploratory)) {
Log log = ctx.logManager().getLog(TunnelPeerSelector.class);
String excludeCaps = ctx.getProperty("router.excludePeerCaps",
String.valueOf(Router.CAPABILITY_BW16));
Set peers = new HashSet();
if (excludeCaps != null) {
char excl[] = excludeCaps.toCharArray();
char excl[] = getExcludeCaps(ctx);
Set peers = new HashSet(1);
if (excl != null) {
FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)ctx.netDb();
List known = fac.getKnownRouterData();
if (known != null) {
for (int i = 0; i < known.size(); i++) {
RouterInfo peer = (RouterInfo)known.get(i);
boolean shouldExclude = shouldExclude(ctx, log, peer, excl);
if (shouldExclude) {
peers.add(peer.getIdentity().calculateHash());
continue;
}
/*
String cap = peer.getCapabilities();
if (cap == null) {
peers.add(peer.getIdentity().calculateHash());
@ -247,6 +252,7 @@ abstract class TunnelPeerSelector {
continue;
}
}
*/
}
}
/*
@ -264,6 +270,102 @@ abstract class TunnelPeerSelector {
}
}
/**
 * Convenience wrapper: decide whether the given peer should be kept out of
 * tunnels, using the locally configured exclusion capabilities
 * ("router.excludePeerCaps").
 *
 * @param ctx router context used for config, logging and clock access
 * @param peer the candidate peer's published router info
 * @return true if the peer should be excluded from tunnel selection
 */
public static boolean shouldExclude(RouterContext ctx, RouterInfo peer) {
    return shouldExclude(ctx, ctx.logManager().getLog(TunnelPeerSelector.class),
                         peer, getExcludeCaps(ctx));
}
/**
 * Parse the set of capability flags that should cause a peer to be
 * excluded from tunnels.  Read from the "router.excludePeerCaps" config
 * property, defaulting to the lowest bandwidth tier (CAPABILITY_BW16).
 *
 * @param ctx router context supplying the configuration
 * @return the capability characters to exclude, or null if the property
 *         resolved to null
 */
private static char[] getExcludeCaps(RouterContext ctx) {
    String excludeCaps = ctx.getProperty("router.excludePeerCaps",
                                         String.valueOf(Router.CAPABILITY_BW16));
    // (removed an unused "Set peers = new HashSet();" local that was
    //  allocated and immediately discarded)
    if (excludeCaps != null)
        return excludeCaps.toCharArray();
    else
        return null;
}
private static final long DONT_EXCLUDE_PERIOD = 15*60*1000;
/**
 * Core exclusion test: should this peer be kept out of tunnels?
 *
 * A peer is excluded if it publishes no capabilities, publishes any of the
 * capability characters in {@code excl}, publishes nothing beyond the
 * floodfill/reachable/unreachable flags (i.e. no bandwidth tier), publishes
 * no "stat_uptime" option, or looks too new / too stale:
 * <ul>
 *  <li>routerInfo published more than 5 days ago: excluded, unless our own
 *      router has been up less than DONT_EXCLUDE_PERIOD (we may just have
 *      stale data right after startup)</li>
 *  <li>published uptime plus info age under 2 hours: excluded as too new,
 *      again only once our own uptime exceeds DONT_EXCLUDE_PERIOD</li>
 * </ul>
 *
 * @param ctx  router context (clock, our own uptime)
 * @param log  log to emit debug-level decisions to
 * @param peer the candidate peer's published router info
 * @param excl capability characters to reject, or null for none
 *             (previously a null excl caused an NPE; now treated as empty)
 * @return true to exclude the peer from tunnel selection
 */
private static boolean shouldExclude(RouterContext ctx, Log log, RouterInfo peer, char excl[]) {
    String cap = peer.getCapabilities();
    if (cap == null)
        return true;
    // reject anyone publishing an explicitly excluded capability
    if (excl != null) {
        for (int j = 0; j < excl.length; j++) {
            if (cap.indexOf(excl[j]) >= 0)
                return true;
        }
    }
    // if the caps string contains nothing beyond the flags we ignore here
    // (floodfill / reachability), no bandwidth tier was published - exclude
    int maxLen = 0;
    if (cap.indexOf(FloodfillNetworkDatabaseFacade.CAPACITY_FLOODFILL) >= 0)
        maxLen++;
    if (cap.indexOf(Router.CAPABILITY_REACHABLE) >= 0)
        maxLen++;
    if (cap.indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
        maxLen++;
    if (cap.length() <= maxLen)
        return true;
    // otherwise, it contains flags we aren't trying to focus on,
    // so don't exclude it based on published capacity

    String val = peer.getOption("stat_uptime");
    if (val == null) {
        // not publishing stats, so exclude it
        return true;
    }
    // parse the published uptime, e.g. "42ms" / "30s" / "90m" / "2h" / "1d"
    // (a bare number is taken as milliseconds)
    long factor = 1;
    if (val.endsWith("ms")) {
        factor = 1;
        val = val.substring(0, val.length()-2);
    } else if (val.endsWith("s")) {
        factor = 1000l;
        val = val.substring(0, val.length()-1);
    } else if (val.endsWith("m")) {
        factor = 60*1000l;
        val = val.substring(0, val.length()-1);
    } else if (val.endsWith("h")) {
        factor = 60*60*1000l;
        val = val.substring(0, val.length()-1);
    } else if (val.endsWith("d")) {
        factor = 24*60*60*1000l;
        val = val.substring(0, val.length()-1);
    }
    long uptimeMs = 0;
    // best effort - an unparseable value is treated as zero uptime
    try { uptimeMs = Long.parseLong(val); } catch (NumberFormatException nfe) {}
    uptimeMs *= factor;
    // (removed a redundant nested "if (val != null)" whose else branch
    //  was unreachable - val is known non-null here)

    long infoAge = ctx.clock().now() - peer.getPublished();
    if (infoAge < 0) {
        // published in the future (clock skew) - don't hold it against them
        return false;
    } else if (infoAge > 5*24*60*60*1000) {
        // Only exclude long-unseen peers if we haven't just started up
        if (ctx.router().getUptime() < DONT_EXCLUDE_PERIOD) {
            if (log.shouldLog(Log.DEBUG))
                log.debug("Not excluding a long-unseen peer, since we just started up.");
            return false;
        } else {
            if (log.shouldLog(Log.DEBUG))
                log.debug("Excluding a long-unseen peer.");
            return true;
        }
    } else if ( (infoAge + uptimeMs < 2*60*60*1000) && (ctx.router().getUptime() > DONT_EXCLUDE_PERIOD) ) {
        // up for less than 2 hours, so exclude it
        return true;
    } else {
        return false;
    }
}
private static final String PROP_OUTBOUND_EXPLORATORY_EXCLUDE_UNREACHABLE = "router.outboundExploratoryExcludeUnreachable";
private static final String PROP_OUTBOUND_CLIENT_EXCLUDE_UNREACHABLE = "router.outboundClientExcludeUnreachable";
private static final String PROP_INBOUND_EXPLORATORY_EXCLUDE_UNREACHABLE = "router.inboundExploratoryExcludeUnreachable";