2005-02-22 jrandom

    * Adjusted (and fixed...) the timestamper change detection
    * Deal with a rare reordering bug at the beginning of a stream (so we
      don't drop it unnecessarily)
    * Cleaned up some dropped message handling in the router
    * Reduced job queue churn when dealing with a large number of tunnels by
      sharing an expiration job
    * Keep a separate list of the most recent CRIT messages (shown on the
      logs.jsp).  This way they don't get buried among any other messages.
      (A minimal sketch of this idea appears right after this list.)
    * For clarity, display the tunnel variance config as "Randomization" on
      the web console.
    * If lease republishing fails (boo! hiss!) try it again
    * Actually fix the negative jobLag in the right place (this time)
    * Allow reseeding when there are fewer than 10 known peer references
    * Lots of logging updates.
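
The separate CRIT-message list mentioned above is not part of the hunks copied below. As a rough sketch of the idea only (the class and method names here are hypothetical, not I2P's actual log-manager code), the shape is a small bounded buffer that is filled alongside normal logging and rendered separately on the console:

    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical bounded buffer of the most recent critical log messages. */
    class CritMessageBuffer {
        private final List<String> _messages = new ArrayList<String>();
        private final int _max;

        public CritMessageBuffer(int max) { _max = max; }

        /** called in addition to normal logging whenever a CRIT record is written */
        public synchronized void add(String msg) {
            _messages.add(msg);
            if (_messages.size() > _max)
                _messages.remove(0); // keep only the newest _max entries
        }

        /** snapshot for rendering at the top of a status page such as logs.jsp */
        public synchronized List<String> getMostRecent() {
            return new ArrayList<String>(_messages);
        }
    }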
Author: jrandom
Date: 2005-02-22 07:07:29 +00:00
Committed by: zzz
Parent: 35fe7f8203
Commit: c17433cb93
24 changed files with 280 additions and 72 deletions

InNetMessagePool.java

@@ -20,6 +20,7 @@ import net.i2p.data.RouterIdentity;
 import net.i2p.data.i2np.DeliveryStatusMessage;
 import net.i2p.data.i2np.I2NPMessage;
 import net.i2p.data.i2np.DatabaseSearchReplyMessage;
+import net.i2p.data.i2np.DatabaseLookupMessage;
 import net.i2p.data.i2np.TunnelCreateStatusMessage;
 import net.i2p.data.i2np.TunnelDataMessage;
 import net.i2p.data.i2np.TunnelGatewayMessage;
@@ -35,7 +36,7 @@ import net.i2p.util.Log;
 public class InNetMessagePool implements Service {
     private Log _log;
     private RouterContext _context;
-    private Map _handlerJobBuilders;
+    private HandlerJobBuilder _handlerJobBuilders[];
     private List _pendingDataMessages;
     private List _pendingDataMessagesFrom;
     private List _pendingGatewayMessages;
@@ -57,7 +58,7 @@ public class InNetMessagePool implements Service {
     public InNetMessagePool(RouterContext context) {
         _context = context;
-        _handlerJobBuilders = new HashMap();
+        _handlerJobBuilders = new HandlerJobBuilder[20];
         _pendingDataMessages = new ArrayList(16);
         _pendingDataMessagesFrom = new ArrayList(16);
         _pendingGatewayMessages = new ArrayList(16);
@@ -75,11 +76,15 @@
     }
     public HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) {
-        return (HandlerJobBuilder)_handlerJobBuilders.put(new Integer(i2npMessageType), builder);
+        HandlerJobBuilder old = _handlerJobBuilders[i2npMessageType];
+        _handlerJobBuilders[i2npMessageType] = builder;
+        return old;
     }
     public HandlerJobBuilder unregisterHandlerJobBuilder(int i2npMessageType) {
-        return (HandlerJobBuilder)_handlerJobBuilders.remove(new Integer(i2npMessageType));
+        HandlerJobBuilder old = _handlerJobBuilders[i2npMessageType];
+        _handlerJobBuilders[i2npMessageType] = null;
+        return old;
     }
     /**
@@ -132,7 +137,7 @@
             shortCircuitTunnelData(messageBody, fromRouterHash);
             allowMatches = false;
         } else {
-            HandlerJobBuilder builder = (HandlerJobBuilder)_handlerJobBuilders.get(new Integer(type));
+            HandlerJobBuilder builder = _handlerJobBuilders[type];
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Add message to the inNetMessage pool - builder: " + builder
@@ -190,12 +195,14 @@
                 if (_log.shouldLog(Log.INFO))
                     _log.info("Dropping slow db lookup response: " + messageBody);
                 _context.statManager().addRateData("inNetPool.droppedDbLookupResponseMessage", 1, 0);
+            } else if (type == DatabaseLookupMessage.MESSAGE_TYPE) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Dropping netDb lookup due to throttling");
             } else {
-                if (_log.shouldLog(Log.ERROR))
-                    _log.error("Message " + messageBody + " expiring on "
-                               + (messageBody != null ? (messageBody.getMessageExpiration()+"") : " [unknown]")
-                               + " was not handled by a HandlerJobBuilder - DROPPING: "
-                               + messageBody, new Exception("DROPPED MESSAGE"));
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Message expiring on "
+                              + (messageBody != null ? (messageBody.getMessageExpiration()+"") : " [unknown]")
+                              + " was not handled by a HandlerJobBuilder - DROPPING: " + messageBody);
                 _context.statManager().addRateData("inNetPool.dropped", 1, 0);
             }
         } else {

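The InNetMessagePool change above replaces a HashMap keyed by boxed Integers with a flat array indexed by the I2NP message type code, so the hot receive path does a plain array read instead of an Integer allocation plus a map lookup. A standalone sketch of the pattern (the 20-slot size comes from the diff; the stub interface and the range check are illustrative additions, not part of the original):

    /** stand-in for net.i2p.router.HandlerJobBuilder, reduced to a marker interface */
    interface HandlerJobBuilder { }

    /** type-code-indexed handler registry, mirroring InNetMessagePool after this commit */
    class HandlerRegistry {
        // I2NP type codes are small integers, so a fixed-size array is enough
        private final HandlerJobBuilder[] _builders = new HandlerJobBuilder[20];

        public HandlerJobBuilder register(int type, HandlerJobBuilder builder) {
            checkType(type);
            HandlerJobBuilder old = _builders[type];
            _builders[type] = builder;
            return old; // previous builder, mirroring the old Map.put() return value
        }

        public HandlerJobBuilder unregister(int type) {
            return register(type, null);
        }

        public HandlerJobBuilder lookup(int type) {
            checkType(type);
            return _builders[type];
        }

        private void checkType(int type) {
            if (type < 0 || type >= _builders.length)
                throw new IllegalArgumentException("unexpected I2NP type " + type);
        }
    }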
JobQueueRunner.java

@@ -92,9 +92,12 @@ class JobQueueRunner implements Runnable {
             _state = 13;
             long diff = _context.clock().now() - beforeUpdate;
+            long lag = doStart - origStartAfter;
+            if (lag < 0) lag = 0;
             _context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
             _context.statManager().addRateData("jobQueue.jobRun", duration, duration);
-            _context.statManager().addRateData("jobQueue.jobLag", doStart - origStartAfter, 0);
+            _context.statManager().addRateData("jobQueue.jobLag", lag, 0);
             _context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);
             if (duration > 1000) {

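The jobLag fix above clamps the sample at zero before it reaches the stat manager: because the router clock can be stepped by the timestamper (first changelog entry), a job can appear to start before the time it was scheduled for, which previously produced negative lag values. Reduced to a self-contained helper (the helper is illustrative; the diff inlines the same two lines):

    /** how late a job started, never reported as a negative value */
    static long computeJobLag(long doStart, long origStartAfter) {
        long lag = doStart - origStartAfter;
        if (lag < 0)
            lag = 0; // the clock was adjusted backwards between scheduling and execution
        return lag;
    }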
NetworkDatabaseFacade.java

@@ -57,6 +57,8 @@ public abstract class NetworkDatabaseFacade implements Service {
     public abstract void publish(LeaseSet localLeaseSet);
     public abstract void unpublish(LeaseSet localLeaseSet);
     public abstract void fail(Hash dbEntry);
+    public int getKnownRouters() { return 0; }
 }

RouterVersion.java

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
  *
  */
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.144 $ $Date: 2005/02/20 04:12:46 $";
+    public final static String ID = "$Revision: 1.145 $ $Date: 2005/02/21 13:02:16 $";
     public final static String VERSION = "0.5";
-    public final static long BUILD = 3;
+    public final static long BUILD = 4;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION);
         System.out.println("Router ID: " + RouterVersion.ID);

GarlicMessageReceiver.java

@@ -76,6 +76,9 @@ public class GarlicMessageReceiver {
         } else {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("CloveMessageParser failed to decrypt the message [" + message.getUniqueId()
                            + "]");
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("CloveMessageParser failed to decrypt the message [" + message.getUniqueId()
+                          + "]", new Exception("Decrypt garlic failed"));
             _context.statManager().addRateData("crypto.garlic.decryptFail", 1, 0);
             _context.messageHistory().messageProcessingError(message.getUniqueId(),
@@ -105,6 +108,9 @@ public class GarlicMessageReceiver {
             String howLongAgo = DataHelper.formatDuration(_context.clock().now()-clove.getExpiration().getTime());
             if (_log.shouldLog(Log.ERROR))
                 _log.error("Clove is NOT valid: id=" + clove.getCloveId()
                            + " expiration " + howLongAgo + " ago");
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Clove is NOT valid: id=" + clove.getCloveId()
+                          + " expiration " + howLongAgo + " ago", new Exception("Invalid within..."));
             _context.messageHistory().messageProcessingError(clove.getCloveId(),
                                                              clove.getData().getClass().getName(),

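Both garlic failure paths above gain a WARN-level call that passes a freshly constructed Exception even though nothing was thrown; with a logging API that accepts a Throwable, this attaches the current call stack to the log entry and shows where the failure was detected. A tiny self-contained illustration using java.util.logging rather than net.i2p.util.Log:

    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class StackTraceLogging {
        private static final Logger LOG = Logger.getLogger(StackTraceLogging.class.getName());

        static void reportDecryptFailure(long messageId) {
            // no exception was thrown; the Throwable exists purely so the log
            // record carries the stack trace of the code path that got here
            LOG.log(Level.WARNING,
                    "failed to decrypt message [" + messageId + "]",
                    new Exception("Decrypt garlic failed"));
        }

        public static void main(String[] args) {
            reportDecryptFailure(42);
        }
    }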
KademliaNetworkDatabaseFacade.java

@@ -60,6 +60,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
     private boolean _initialized;
     /** Clock independent time of when we started up */
     private long _started;
+    private int _knownRouters;
     private StartExplorersJob _exploreJob;
     private HarvesterJob _harvestJob;
     /** when was the last time an exploration found something new? */
@@ -130,6 +131,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         _peerSelector = new PeerSelector(_context);
         _publishingLeaseSets = new HashSet(8);
         _lastExploreNew = 0;
+        _knownRouters = 0;
         _activeRequests = new HashMap(8);
         _enforceNetId = DEFAULT_ENFORCE_NETID;
     }
@@ -359,6 +361,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         return rv;
     }
+    public int getKnownRouters() { return _knownRouters; }
     public void lookupLeaseSet(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs) {
         if (!_initialized) return;
         LeaseSet ls = lookupLeaseSetLocally(key);
@@ -639,6 +643,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
                        + routerInfo.getOptions().size() + " options on "
                        + new Date(routerInfo.getPublished()));
+        _knownRouters++;
         _ds.put(key, routerInfo);
         synchronized (_lastSent) {
             if (!_lastSent.containsKey(key))
@@ -699,6 +704,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         synchronized (_passiveSendKeys) {
             _passiveSendKeys.remove(dbEntry);
         }
+        if (isRouterInfo)
+            _knownRouters--;
     }
     public void unpublish(LeaseSet localLeaseSet) {

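The _knownRouters counter above is maintained incrementally as RouterInfo entries are stored and failed, and exposed through getKnownRouters() (defaulting to 0 on the abstract facade). The changelog's reseed item presumably consults this value; the actual reseed trigger is not among the hunks copied here, so the check below is only a hedged sketch with a hypothetical shouldReseed() helper:

    /** hypothetical helper; the real reseed logic lives elsewhere in the router/console */
    static boolean shouldReseed(NetworkDatabaseFacade netDb) {
        // with fewer than 10 known routers we almost certainly cannot build tunnels,
        // so offering (or triggering) a reseed is reasonable
        return netDb.getKnownRouters() < 10;
    }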
RepublishLeaseSetJob.java

@@ -76,6 +76,7 @@ public class RepublishLeaseSetJob extends JobImpl {
         public void runJob() {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("FAILED publishing of the leaseSet for " + _dest.toBase64());
+            getContext().jobQueue().addJob(new RepublishLeaseSetJob(getContext(), _facade, _dest));
         }
     }
 }

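The RepublishLeaseSetJob change above handles a failed publish by queueing a brand-new instance of the same job, so the republish is retried instead of silently dropped; the diff retries immediately, with no explicit delay or retry cap. A slightly more defensive variant of the same requeue pattern (the 30-second backoff is an illustrative addition, not the original behavior) would be:

    // inside the failure callback: retry the republish after a short delay
    RepublishLeaseSetJob retry = new RepublishLeaseSetJob(getContext(), _facade, _dest);
    retry.getTiming().setStartAfter(getContext().clock().now() + 30*1000); // hypothetical 30s backoff
    getContext().jobQueue().addJob(retry);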
OutboundReceiver.java

@@ -17,10 +17,13 @@ class OutboundReceiver implements TunnelGateway.Receiver {
     private RouterContext _context;
     private Log _log;
     private TunnelCreatorConfig _config;
+    private RouterInfo _nextHopCache;
     public OutboundReceiver(RouterContext ctx, TunnelCreatorConfig cfg) {
         _context = ctx;
         _log = ctx.logManager().getLog(OutboundReceiver.class);
         _config = cfg;
+        _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1));
     }
     public void receiveEncrypted(byte encrypted[]) {
@@ -30,8 +33,11 @@ class OutboundReceiver implements TunnelGateway.Receiver {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("received encrypted, sending out " + _config + ": " + msg);
-        RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1));
+        RouterInfo ri = _nextHopCache;
+        if (ri == null)
+            ri = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1));
         if (ri != null) {
+            _nextHopCache = ri;
             send(msg, ri);
         } else {
             if (_log.shouldLog(Log.DEBUG))
@@ -65,8 +71,10 @@ class OutboundReceiver implements TunnelGateway.Receiver {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("lookup of " + _config.getPeer(1).toBase64().substring(0,4)
                            + " successful? " + (ri != null));
-            if (ri != null)
+            if (ri != null) {
+                _nextHopCache = ri;
                 send(_msg, ri);
+            }
         }
     }

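OutboundReceiver above stops hitting the local netDb on every outbound fragment: the next hop's RouterInfo is looked up once in the constructor, consulted first in receiveEncrypted(), and refreshed whenever a later lookup succeeds. The same cache-then-fallback shape, pulled out of its I2P context (the NetDb and RouterInfo types here are minimal stand-ins):

    /** minimal stand-ins for the I2P types involved */
    interface NetDb { RouterInfo lookupLocally(String peer); }
    final class RouterInfo { }

    /** cache a possibly-null local lookup, falling back to the store on a miss */
    class NextHopCache {
        private final NetDb _netDb;
        private final String _peer;
        private volatile RouterInfo _cached; // may stay null until the peer becomes known

        NextHopCache(NetDb netDb, String peer) {
            _netDb = netDb;
            _peer = peer;
            _cached = netDb.lookupLocally(peer); // seed the cache up front, as the constructor does
        }

        RouterInfo get() {
            RouterInfo ri = _cached;
            if (ri == null) {
                ri = _netDb.lookupLocally(_peer);
                if (ri != null)
                    _cached = ri; // remember it for subsequent messages
            }
            return ri;
        }
    }

As in the diff, nothing ever invalidates the cache; for a fixed next hop over a tunnel's short lifetime a slightly stale RouterInfo is still usable.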
TunnelDispatcher.java

@@ -38,6 +38,7 @@ public class TunnelDispatcher implements Service {
     /** what is the date/time on which the last non-locally-created tunnel expires? */
     private long _lastParticipatingExpiration;
     private BloomFilterIVValidator _validator;
+    private LeaveTunnel _leaveJob;
     /** Creates a new instance of TunnelDispatcher */
     public TunnelDispatcher(RouterContext ctx) {
@@ -50,6 +51,7 @@ public class TunnelDispatcher implements Service {
         _participatingConfig = new HashMap();
         _lastParticipatingExpiration = 0;
         _validator = null;
+        _leaveJob = new LeaveTunnel(ctx);
         ctx.statManager().createRateStat("tunnel.participatingTunnels",
                                          "How many tunnels are we participating in?", "Tunnels",
                                          new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
@@ -176,7 +178,7 @@ public class TunnelDispatcher implements Service {
         _context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
         if (cfg.getExpiration() > _lastParticipatingExpiration)
             _lastParticipatingExpiration = cfg.getExpiration();
-        _context.jobQueue().addJob(new LeaveTunnel(_context, cfg));
+        _leaveJob.add(cfg);
     }
     /**
      * We are the outbound endpoint in this tunnel, and did not create it
@@ -200,7 +202,7 @@ public class TunnelDispatcher implements Service {
         if (cfg.getExpiration() > _lastParticipatingExpiration)
             _lastParticipatingExpiration = cfg.getExpiration();
-        _context.jobQueue().addJob(new LeaveTunnel(_context, cfg));
+        _leaveJob.add(cfg);
     }
     /**
@@ -228,7 +230,7 @@ public class TunnelDispatcher implements Service {
         if (cfg.getExpiration() > _lastParticipatingExpiration)
             _lastParticipatingExpiration = cfg.getExpiration();
-        _context.jobQueue().addJob(new LeaveTunnel(_context, cfg));
+        _leaveJob.add(cfg);
     }
     public int getParticipatingCount() {
@@ -336,10 +338,11 @@ public class TunnelDispatcher implements Service {
             _context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0);
         } else {
             _context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId());
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("no matching participant/endpoint for id=" + msg.getTunnelId().getTunnelId()
-                           + ": existing = " + _participants.keySet()
-                           + " / " + _outboundEndpoints.keySet());
+            int level = (_context.router().getUptime() > 10*60*1000 ? Log.ERROR : Log.WARN);
+            if (_log.shouldLog(level))
+                _log.log(level, "no matching participant/endpoint for id=" + msg.getTunnelId().getTunnelId()
+                         + " expiring in " + DataHelper.formatDuration(msg.getMessageExpiration()-_context.clock().now())
+                         + ": existing = " + _participants.size() + " / " + _outboundEndpoints.size());
         }
     }
@@ -374,8 +377,9 @@ public class TunnelDispatcher implements Service {
             _context.statManager().addRateData("tunnel.dispatchInbound", 1, 0);
         } else {
             _context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId());
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("no matching tunnel for id=" + msg.getTunnelId().getTunnelId()
+            int level = (_context.router().getUptime() > 10*60*1000 ? Log.ERROR : Log.WARN);
+            if (_log.shouldLog(level))
+                _log.log(level, "no matching tunnel for id=" + msg.getTunnelId().getTunnelId()
                          + ": gateway message expiring in "
                          + DataHelper.formatDuration(msg.getMessageExpiration()-_context.clock().now())
                          + "/"
@@ -383,7 +387,7 @@ public class TunnelDispatcher implements Service {
                         + " messageId " + msg.getUniqueId()
                         + "/" + msg.getMessage().getUniqueId()
                         + " messageType: " + msg.getMessage().getClass().getName()
-                        + " existing = " + _inboundGateways.keySet());
+                        + " existing = " + _inboundGateways.size());
         }
         long dispatchTime = _context.clock().now() - before;
@@ -423,7 +427,7 @@ public class TunnelDispatcher implements Service {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("dispatch outbound through " + outboundTunnel.getTunnelId()
                        + ": " + msg);
-        if (msg.getMessageExpiration() < before) {
+        if (msg.getMessageExpiration() < before - Router.CLOCK_FUDGE_FACTOR) {
            if (_log.shouldLog(Log.ERROR))
                _log.error("why are you sending a tunnel message that expired "
                           + (before-msg.getMessageExpiration()) + "ms ago? "
@@ -438,9 +442,10 @@ public class TunnelDispatcher implements Service {
         } else {
             _context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), outboundTunnel.getTunnelId());
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("no matching outbound tunnel for id=" + outboundTunnel
-                           + ": existing = " + _outboundGateways.keySet());
+            int level = (_context.router().getUptime() > 10*60*1000 ? Log.ERROR : Log.WARN);
+            if (_log.shouldLog(level))
+                _log.log(level, "no matching outbound tunnel for id=" + outboundTunnel
+                         + ": existing = " + _outboundGateways.size());
         }
         long dispatchTime = _context.clock().now() - before;
@@ -473,16 +478,59 @@
     public void renderStatusHTML(Writer out) throws IOException {}
     private class LeaveTunnel extends JobImpl {
-        private HopConfig _config;
+        private List _configs;
+        private List _times;
-        public LeaveTunnel(RouterContext ctx, HopConfig config) {
+        public LeaveTunnel(RouterContext ctx) {
             super(ctx);
-            _config = config;
-            getTiming().setStartAfter(config.getExpiration() + 2*Router.CLOCK_FUDGE_FACTOR);
+            _configs = new ArrayList(128);
+            _times = new ArrayList(128);
         }
+        public void add(HopConfig cfg) {
+            Long dropTime = new Long(cfg.getExpiration() + 2*Router.CLOCK_FUDGE_FACTOR);
+            synchronized (LeaveTunnel.this) {
+                _configs.add(cfg);
+                _times.add(dropTime);
+            }
+            long oldAfter = getTiming().getStartAfter();
+            if (oldAfter < getContext().clock().now()) {
+                getTiming().setStartAfter(dropTime.longValue());
+                getContext().jobQueue().addJob(LeaveTunnel.this);
+            } else if (oldAfter >= dropTime.longValue()) {
+                getTiming().setStartAfter(dropTime.longValue());
+            } else {
+                // already scheduled for the future, and before this expiration
+            }
+        }
         public String getName() { return "Leave participant"; }
         public void runJob() {
-            remove(_config);
+            HopConfig cur = null;
+            Long nextTime = null;
+            long now = getContext().clock().now();
+            synchronized (LeaveTunnel.this) {
+                if (_configs.size() <= 0)
+                    return;
+                nextTime = (Long)_times.get(0);
+                if (nextTime.longValue() <= now) {
+                    cur = (HopConfig)_configs.remove(0);
+                    _times.remove(0);
+                    if (_times.size() > 0)
+                        nextTime = (Long)_times.get(0);
+                    else
+                        nextTime = null;
+                }
+            }
+            if (cur != null)
+                remove(cur);
+            if (nextTime != null) {
+                getTiming().setStartAfter(nextTime.longValue());
+                getContext().jobQueue().addJob(LeaveTunnel.this);
+            }
         }
     }
 }
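
Three of the TunnelDispatcher drop paths above now pick their log level from router uptime: during roughly the first ten minutes, messages for unknown tunnel IDs are logged at WARN (presumably because stray traffic for tunnels from before a restart is expected then), and only afterwards do they become ERRORs. The same gate, factored into a small helper (the helper itself is illustrative; the diff simply inlines the ternary at each call site):

    import net.i2p.router.RouterContext;
    import net.i2p.util.Log;

    class DropLogging {
        private static final long GRACE_PERIOD = 10*60*1000; // 10 minutes, as in the diff

        /** treat "unknown tunnel" as noise right after startup, as a real problem later */
        static void logUnknownTunnel(RouterContext ctx, Log log, String details) {
            int level = (ctx.router().getUptime() > GRACE_PERIOD ? Log.ERROR : Log.WARN);
            if (log.shouldLog(level))
                log.log(level, "no matching tunnel: " + details);
        }
    }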