diff --git a/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java b/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java
index 2a9d999bb..4ad60e6e8 100644
--- a/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java
+++ b/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java
@@ -19,6 +19,7 @@ import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
 import net.i2p.data.LeaseSet;
 import net.i2p.data.RouterInfo;
+import net.i2p.data.TunnelId;
 import net.i2p.util.Log;
 
 /**
@@ -34,6 +35,9 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
     private int _type;
     private LeaseSet _leaseSet;
     private RouterInfo _info;
+    private long _replyToken;
+    private TunnelId _replyTunnel;
+    private Hash _replyGateway;
     
     public final static int KEY_TYPE_ROUTERINFO = 0;
     public final static int KEY_TYPE_LEASESET = 1;
@@ -44,6 +48,9 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
         setKey(null);
         setLeaseSet(null);
         setRouterInfo(null);
+        setReplyToken(0);
+        setReplyTunnel(null);
+        setReplyGateway(null);
     }
     
     /**
@@ -83,6 +90,22 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
     public int getValueType() { return _type; }
     public void setValueType(int type) { _type = type; }
     
+    /**
+     * If a reply is desired, this token specifies the message ID that should
+     * be used for a DeliveryStatusMessage to be sent to the reply tunnel on the
+     * reply gateway.
+     *
+     * @return positive reply token ID, or 0 if no reply is necessary.
+     */
+    public long getReplyToken() { return _replyToken; }
+    public void setReplyToken(long token) { _replyToken = token; }
+    
+    public TunnelId getReplyTunnel() { return _replyTunnel; }
+    public void setReplyTunnel(TunnelId id) { _replyTunnel = id; }
+    
+    public Hash getReplyGateway() { return _replyGateway; }
+    public void setReplyGateway(Hash peer) { _replyGateway = peer; }
+    
     public void readMessage(InputStream in, int type) throws I2NPMessageException, IOException {
         if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message");
         try {
@@ -91,6 +114,16 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Hash read: " + _key.toBase64());
             _type = (int)DataHelper.readLong(in, 1);
+            _replyToken = DataHelper.readLong(in, 4);
+            if (_replyToken > 0) {
+                _replyTunnel = new TunnelId();
+                _replyTunnel.readBytes(in);
+                _replyGateway = new Hash();
+                _replyGateway.readBytes(in);
+            } else {
+                _replyTunnel = null;
+                _replyGateway = null;
+            }
             if (_type == KEY_TYPE_LEASESET) {
                 _leaseSet = new LeaseSet();
                 _leaseSet.readBytes(in);
@@ -121,6 +154,13 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
         try {
             _key.writeBytes(os);
             DataHelper.writeLong(os, 1, _type);
+            DataHelper.writeLong(os, 4, _replyToken);
+            if (_replyToken > 0) {
+                _replyTunnel.writeBytes(os);
+                _replyGateway.writeBytes(os);
+            } else {
+                // noop
+            }
             if (_type == KEY_TYPE_LEASESET) {
                 _leaseSet.writeBytes(os);
             } else if (_type == KEY_TYPE_ROUTERINFO) {
@@ -143,7 +183,10 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
         return DataHelper.hashCode(getKey()) +
                DataHelper.hashCode(getLeaseSet()) +
               DataHelper.hashCode(getRouterInfo()) +
-               getValueType();
+               getValueType() +
+               (int)getReplyToken() +
+               DataHelper.hashCode(getReplyTunnel()) +
+               DataHelper.hashCode(getReplyGateway());
     }
     
     public boolean equals(Object object) {
@@ -152,7 +195,10 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
             return DataHelper.eq(getKey(),msg.getKey()) &&
                    DataHelper.eq(getLeaseSet(),msg.getLeaseSet()) &&
                    DataHelper.eq(getRouterInfo(),msg.getRouterInfo()) &&
-                   DataHelper.eq(getValueType(),msg.getValueType());
+                   DataHelper.eq(getValueType(),msg.getValueType()) &&
+                   getReplyToken() == msg.getReplyToken() &&
+                   DataHelper.eq(getReplyTunnel(), msg.getReplyTunnel()) &&
+                   DataHelper.eq(getReplyGateway(), msg.getReplyGateway());
         } else {
             return false;
         }
@@ -167,6 +213,9 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
         buf.append("\n\tValue Type: ").append(getValueType());
         buf.append("\n\tRouter Info: ").append(getRouterInfo());
         buf.append("\n\tLease Set: ").append(getLeaseSet());
+        buf.append("\n\tReply token: ").append(getReplyToken());
+        buf.append("\n\tReply tunnel: ").append(getReplyTunnel());
+        buf.append("\n\tReply gateway: ").append(getReplyGateway());
         buf.append("]");
         return buf.toString();
     }
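The new reply fields change the DatabaseStoreMessage wire format: a 4-byte reply token always follows the type byte, and the reply tunnel and gateway are serialized only when the token is positive. A minimal sketch of the sender side (not part of this patch); `keyToStore`, `info`, `replyTunnelId`, and `gatewayHash` are hypothetical placeholders for values a real caller would already have:

```java
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.router.RouterContext;

class ReplyTokenSketch {
    DatabaseStoreMessage buildStore(RouterContext ctx, Hash keyToStore, RouterInfo info,
                                    TunnelId replyTunnelId, Hash gatewayHash) {
        DatabaseStoreMessage msg = new DatabaseStoreMessage(ctx);
        msg.setKey(keyToStore);
        msg.setValueType(DatabaseStoreMessage.KEY_TYPE_ROUTERINFO);
        msg.setRouterInfo(info);
        // A positive token asks the recipient to reply with a
        // DeliveryStatusMessage carrying this token as its message ID.
        msg.setReplyToken(ctx.random().nextInt(Integer.MAX_VALUE));
        msg.setReplyTunnel(replyTunnelId);
        msg.setReplyGateway(gatewayHash);
        // With a token of 0 the tunnel and gateway are not written to the
        // wire at all, so no-ack messages stay close to the old size.
        return msg;
    }
}
```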
diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java
index 440af4d93..5350025c3 100644
--- a/router/java/src/net/i2p/router/InNetMessagePool.java
+++ b/router/java/src/net/i2p/router/InNetMessagePool.java
@@ -14,6 +14,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import net.i2p.data.i2np.DeliveryStatusMessage;
 import net.i2p.util.Log;
 
 /**
@@ -34,6 +35,7 @@ public class InNetMessagePool {
         _handlerJobBuilders = new HashMap();
         _log = _context.logManager().getLog(InNetMessagePool.class);
         _context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
+        _context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
         _context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
     }
     
@@ -119,10 +121,18 @@ public class InNetMessagePool {
         if (size == -1) {
             // was not handled via HandlerJobBuilder
             _context.messageHistory().droppedOtherMessage(msg.getMessage());
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("Message " + msg.getMessage() + " was not handled by a HandlerJobBuilder - DROPPING: "
-                           + msg, new Exception("DROPPED MESSAGE"));
-            _context.statManager().addRateData("inNetPool.dropped", 1, 0);
+            if (msg.getMessage().getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
+                long timeSinceSent = _context.clock().now() -
+                                     ((DeliveryStatusMessage)msg.getMessage()).getArrival().getTime();
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Dropping unhandled delivery status message created " + timeSinceSent + "ms ago: " + msg);
+                _context.statManager().addRateData("inNetPool.droppedDeliveryStatusDelay", timeSinceSent, timeSinceSent);
+            } else {
+                if (_log.shouldLog(Log.ERROR))
+                    _log.error("Message " + msg.getMessage() + " was not handled by a HandlerJobBuilder - DROPPING: "
+                               + msg, new Exception("DROPPED MESSAGE"));
+                _context.statManager().addRateData("inNetPool.dropped", 1, 0);
+            }
         } else {
             String mtype = msg.getMessage().getClass().getName();
             _context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(),
diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java
index 538299872..3a5cbc534 100644
--- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java
+++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java
@@ -9,12 +9,20 @@ package net.i2p.router.networkdb;
  */
 
 import java.util.Date;
+import java.util.List;
 
 import net.i2p.data.Hash;
 import net.i2p.data.RouterIdentity;
+import net.i2p.data.TunnelId;
 import net.i2p.data.i2np.DatabaseStoreMessage;
+import net.i2p.data.i2np.DeliveryStatusMessage;
 import net.i2p.router.JobImpl;
+import net.i2p.router.MessageHistory;
+import net.i2p.router.NetworkDatabaseFacade;
+import net.i2p.router.ProfileManager;
 import net.i2p.router.RouterContext;
+import net.i2p.router.TunnelSelectionCriteria;
+import net.i2p.router.message.SendTunnelMessageJob;
 import net.i2p.util.Log;
 
 /**
@@ -27,6 +35,9 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
     private RouterIdentity _from;
     private Hash _fromHash;
     
+    private static final long ACK_TIMEOUT = 15*1000;
+    private static final int ACK_PRIORITY = 100;
+    
     public HandleDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash) {
         super(ctx);
         _log = ctx.logManager().getLog(HandleDatabaseStoreMessageJob.class);
@@ -56,13 +67,43 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
                 _log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType()
                            + ": " + _message);
         }
+        
+        if (_message.getReplyToken() > 0)
+            sendAck();
+        
         if (_from != null)
             _fromHash = _from.getHash();
         if (_fromHash != null)
            _context.profileManager().dbStoreReceived(_fromHash, wasNew);
         _context.statManager().addRateData("netDb.storeHandled", 1, 0);
     }
+    
+    private void sendAck() {
+        DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
+        msg.setMessageId(_message.getReplyToken());
+        msg.setArrival(new Date(_context.clock().now()));
+        TunnelId outTunnelId = selectOutboundTunnel();
+        _context.jobQueue().addJob(new SendTunnelMessageJob(_context, msg, outTunnelId,
+                                                            _message.getReplyGateway(), _message.getReplyTunnel(),
+                                                            null, null, null, null, ACK_TIMEOUT, ACK_PRIORITY));
+    }
+    
+    private TunnelId selectOutboundTunnel() {
+        TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
+        criteria.setAnonymityPriority(80);
+        criteria.setLatencyPriority(50);
+        criteria.setReliabilityPriority(20);
+        criteria.setMaximumTunnelsRequired(1);
+        criteria.setMinimumTunnelsRequired(1);
+        List tunnelIds = _context.tunnelManager().selectOutboundTunnelIds(criteria);
+        if (tunnelIds.size() <= 0) {
+            _log.error("No outbound tunnels?!");
+            return null;
+        } else {
+            return (TunnelId)tunnelIds.get(0);
+        }
+    }
+    
     public String getName() { return "Handle Database Store Message"; }
     
     public void dropped() {
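sendAck() above closes the loop that the sender opens: the storing peer echoes the reply token as the message ID of a DeliveryStatusMessage, and back at the sender the StoreMessageSelector introduced later in this patch matches on exactly that ID. A condensed sketch of the correspondence (not part of the patch); `ctx` and `token` are hypothetical placeholders:

```java
import java.util.Date;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.router.RouterContext;

class AckSketch {
    DeliveryStatusMessage buildAck(RouterContext ctx, long token) {
        DeliveryStatusMessage ack = new DeliveryStatusMessage(ctx);
        ack.setMessageId(token);                     // echo the sender's reply token
        ack.setArrival(new Date(ctx.clock().now())); // stamped when the ack is created
        return ack;
    }
    // On the sender, the ack matches when ack.getMessageId() == token.
    // If it straggles in after the selector expires, InNetMessagePool uses
    // the arrival stamp to record inNetPool.droppedDeliveryStatusDelay instead.
}
```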
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
index 5f040634f..b6d23d802 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
@@ -24,9 +24,11 @@ import net.i2p.data.TunnelId;
 import net.i2p.data.i2np.DatabaseStoreMessage;
 import net.i2p.data.i2np.DeliveryStatusMessage;
 import net.i2p.data.i2np.I2NPMessage;
+import net.i2p.data.i2np.GarlicMessage;
 import net.i2p.router.Job;
 import net.i2p.router.JobImpl;
 import net.i2p.router.MessageSelector;
+import net.i2p.router.ReplyJob;
 import net.i2p.router.RouterContext;
 import net.i2p.router.TunnelInfo;
 import net.i2p.router.TunnelSelectionCriteria;
@@ -56,6 +58,9 @@ class StoreJob extends JobImpl {
      */
     private final static int EXPLORATORY_REDUNDANCY = 1;
     private final static int STORE_PRIORITY = 100;
+    
+    /** how long we allow for an ACK to take after a store */
+    private final static long STORE_TIMEOUT_MS = 10*1000;
     
     /**
      * Create a new search for the routingKey specified
@@ -75,8 +80,10 @@ class StoreJob extends JobImpl {
         super(context);
         _log = context.logManager().getLog(StoreJob.class);
         _context.statManager().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        _context.statManager().createRateStat("netDb.storePeers", "How many peers must each netDb store be sent to before success?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        _context.statManager().createRateStat("netDb.ackTime", "How long does it take for a peer to ack a netDb store?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         _facade = facade;
-        _state = new StoreState(key, data, toSkip);
+        _state = new StoreState(_context, key, data, toSkip);
         _onSuccess = onSuccess;
         _onFailure = onFailure;
         _timeoutMs = timeoutMs;
@@ -89,14 +96,14 @@ class StoreJob extends JobImpl {
         sendNext();
     }
     
-    protected boolean isExpired() {
+    private boolean isExpired() {
         return _context.clock().now() >= _expiration;
     }
     
     /**
      * send the key to the next batch of peers
     */
-    protected void sendNext() {
+    private void sendNext() {
         if (_state.completed()) {
             if (_log.shouldLog(Log.INFO))
                 _log.info("Already completed");
@@ -106,8 +113,8 @@ class StoreJob extends JobImpl {
             _state.complete(true);
             fail();
         } else {
-            if (_log.shouldLog(Log.INFO))
-                _log.info("Sending: " + _state);
+            //if (_log.shouldLog(Log.INFO))
+            //    _log.info(getJobId() + ": Sending: " + _state);
             continueSending();
         }
     }
@@ -118,11 +125,13 @@ class StoreJob extends JobImpl {
      * at any time
      *
      */
-    protected void continueSending() {
+    private void continueSending() {
         if (_state.completed()) return;
         int toCheck = PARALLELIZATION - _state.getPending().size();
         if (toCheck <= 0) {
             // too many already pending
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getJobId() + ": Too many store messages pending");
             return;
         }
         if (toCheck > PARALLELIZATION)
@@ -131,22 +140,24 @@ class StoreJob extends JobImpl {
         List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
         if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
             if (_state.getPending().size() <= 0) {
-                // we tried to find some peers, but there weren't any and no one else is going to answer
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn(getJobId() + ": No more peers left and none pending");
                 fail();
             } else {
-                // no more to try, but we might get data or close peers from some outstanding requests
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn(getJobId() + ": No more peers left but some are pending, so keep waiting");
                 return;
             }
         } else {
             _state.addPending(closestHashes);
             if (_log.shouldLog(Log.INFO))
-                _log.info("Continue sending key " + _state.getTarget() + " to " + closestHashes);
+                _log.info(getJobId() + ": Continue sending key " + _state.getTarget() + " after " + _state.getAttempted().size() + " tries to " + closestHashes);
             for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) {
                 Hash peer = (Hash)iter.next();
                 DataStructure ds = _facade.getDataStore().get(peer);
                 if ( (ds == null) || !(ds instanceof RouterInfo) ) {
                     if (_log.shouldLog(Log.WARN))
-                        _log.warn("Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
+                        _log.warn(getJobId() + ": Error selecting closest hash that wasn't a router! " + peer + " : " + ds);
                 } else {
                     sendStore((RouterInfo)ds);
                 }
@@ -162,10 +173,10 @@ class StoreJob extends JobImpl {
      *
      * @return ordered list of Hash objects
      */
-    protected List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) {
+    private List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) {
         Hash rkey = _context.routingKeyGenerator().getRoutingKey(key);
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Current routing key for " + key + ": " + rkey);
+        //if (_log.shouldLog(Log.DEBUG))
+        //    _log.debug(getJobId() + ": Current routing key for " + key + ": " + rkey);
         return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets());
     }
     
@@ -175,7 +186,7 @@ class StoreJob extends JobImpl {
      * DeliveryStatusMessage so we know it got there
     *
     */
-    protected void sendStore(RouterInfo router) {
+    private void sendStore(RouterInfo router) {
         DatabaseStoreMessage msg = new DatabaseStoreMessage(_context);
         msg.setKey(_state.getTarget());
         if (_state.getData() instanceof RouterInfo)
@@ -189,43 +200,61 @@ class StoreJob extends JobImpl {
         if (router.getIdentity().equals(_context.router().getRouterInfo().getIdentity())) {
             // don't send it to ourselves
             if (_log.shouldLog(Log.ERROR))
-                _log.error("Dont send store to ourselves - why did we try?");
+                _log.error(getJobId() + ": Don't send store to ourselves - why did we try?");
             return;
         } else {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Send store to " + router.getIdentity().getHash().toBase64());
+            //if (_log.shouldLog(Log.DEBUG))
+            //    _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
         }
         
         sendStore(msg, router, _expiration);
     }
     
-    protected void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
-        sendStoreThroughTunnel(msg, peer, expiration);
+    private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
+        _context.statManager().addRateData("netDb.storeSent", 1, 0);
+        sendStoreThroughGarlic(msg, peer, expiration);
     }
     
-    protected void sendStoreThroughTunnel(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
-        FailedJob fail = new FailedJob(peer);
-        Job sent = new OptimisticSendSuccess(peer);
-        TunnelInfo info = null;
-        TunnelId outboundTunnelId = selectOutboundTunnel();
-        if (outboundTunnelId != null)
-            info = _context.tunnelManager().getTunnelInfo(outboundTunnelId);
-        if (info == null) {
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("selectOutboundTunnel didn't find a valid tunnel!  outboundTunnelId = "
-                           + outboundTunnelId + " is not known by the tunnel manager");
+    private void sendStoreThroughGarlic(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
+        long token = _context.random().nextInt(Integer.MAX_VALUE);
+        
+        TunnelId replyTunnelId = selectInboundTunnel();
+        TunnelInfo replyTunnel = _context.tunnelManager().getTunnelInfo(replyTunnelId);
+        if (replyTunnel == null) {
+            _log.error("No reply inbound tunnels available!");
             return;
         }
-        if (_log.shouldLog(Log.INFO))
-            _log.info("Store for " + _state.getTarget() + " expiring on " + new Date(_expiration)
-                      + " is going to " + peer.getIdentity().getHash() + " via outbound tunnel: " + info);
-        // send it out our outboundTunnelId with instructions for our endpoint to forward it
-        // to the router specified (though no particular tunnelId on the target)
-        Job j = new SendTunnelMessageJob(_context, msg, outboundTunnelId, peer.getIdentity().getHash(),
-                                         null, sent, null, fail, null, _expiration-_context.clock().now(),
-                                         STORE_PRIORITY);
-        _context.jobQueue().addJob(j);
-        _context.statManager().addRateData("netDb.storeSent", 1, 0);
+        msg.setReplyToken(token);
+        msg.setReplyTunnel(replyTunnelId);
+        msg.setReplyGateway(replyTunnel.getThisHop());
+        
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(getJobId() + ": send(dbStore) w/ token expected " + token);
+        
+        _state.addPending(peer.getIdentity().getHash());
+        
+        SendSuccessJob onReply = new SendSuccessJob(peer);
+        FailedJob onFail = new FailedJob(peer);
+        StoreMessageSelector selector = new StoreMessageSelector(_context, getJobId(), peer, token, expiration);
+        
+        TunnelId outTunnelId = selectOutboundTunnel();
+        if (outTunnelId != null) {
+            //if (_log.shouldLog(Log.DEBUG))
+            //    _log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to "
+            //               + peer.getIdentity().getHash().toBase64());
+            TunnelId targetTunnelId = null; // not needed
+            Job onSend = null; // not wanted
+            SendTunnelMessageJob j = new SendTunnelMessageJob(_context, msg, outTunnelId,
                                                              peer.getIdentity().getHash(),
+                                                              targetTunnelId, onSend, onReply,
+                                                              onFail, selector, STORE_TIMEOUT_MS,
+                                                              STORE_PRIORITY);
+            _context.jobQueue().addJob(j);
+        } else {
+            if (_log.shouldLog(Log.ERROR))
+                _log.error("No outbound tunnels to send a dbStore out!");
+            fail();
+        }
     }
     
     private TunnelId selectOutboundTunnel() {
@@ -244,31 +273,43 @@ class StoreJob extends JobImpl {
         }
     }
     
+    private TunnelId selectInboundTunnel() {
+        TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
+        criteria.setAnonymityPriority(80);
+        criteria.setLatencyPriority(50);
+        criteria.setReliabilityPriority(20);
+        criteria.setMaximumTunnelsRequired(1);
+        criteria.setMinimumTunnelsRequired(1);
+        List tunnelIds = _context.tunnelManager().selectInboundTunnelIds(criteria);
+        if (tunnelIds.size() <= 0) {
+            _log.error("No inbound tunnels?!");
+            return null;
+        } else {
+            return (TunnelId)tunnelIds.get(0);
+        }
+    }
+    
     /**
-     * Called after a match to a db store is found (match against a deliveryStatusMessage)
+     * Called after sending a dbStore to a peer successfully,
+     * marking the store as successful
      *
      */
-    
-    /**
-     * Called after sending a dbStore to a peer successfully without waiting for confirm and
-     * optimistically mark the store as successful
-     *
-     */
-    protected class OptimisticSendSuccess extends JobImpl {
-        private Hash _peer;
-        
-        public OptimisticSendSuccess(RouterInfo peer) {
+    private class SendSuccessJob extends JobImpl implements ReplyJob {
+        private RouterInfo _peer;
+        
+        public SendSuccessJob(RouterInfo peer) {
             super(StoreJob.this._context);
-            _peer = peer.getIdentity().getHash();
+            _peer = peer;
         }
        
-        public String getName() { return "Optimistic Kademlia Store Send Success"; }
+        public String getName() { return "Kademlia Store Send Success"; }
         public void runJob() {
+            long howLong = _state.confirmed(_peer.getIdentity().getHash());
             if (_log.shouldLog(Log.INFO))
-                _log.info("Optimistically marking store of " + _state.getTarget()
-                          + " to " + _peer + " successful");
-            //long howLong = _state.confirmed(_peer);
-            //ProfileManager.getInstance().dbStoreSent(_peer, howLong);
+                _log.info(StoreJob.this.getJobId() + ": Marking store of " + _state.getTarget()
+                          + " to " + _peer.getIdentity().getHash().toBase64() + " successful after " + howLong);
+            _context.profileManager().dbStoreSent(_peer.getIdentity().getHash(), howLong);
+            _context.statManager().addRateData("netDb.ackTime", howLong, howLong);
            
             if (_state.getSuccessful().size() >= REDUNDANCY) {
                 succeed();
@@ -276,6 +317,10 @@ class StoreJob extends JobImpl {
             } else {
                 sendNext();
             }
         }
+        
+        public void setMessage(I2NPMessage message) {
+            // ignored, since if the selector matched it, it's fine by us
+        }
     }
     
    /**
@@ -283,244 +328,47 @@ class StoreJob extends JobImpl {
      * reached, or if the peer could not be contacted at all.
      *
      */
-    protected class FailedJob extends JobImpl {
-        private Hash _peer;
+    private class FailedJob extends JobImpl {
+        private RouterInfo _peer;
+        
         public FailedJob(RouterInfo peer) {
             super(StoreJob.this._context);
-            _peer = peer.getIdentity().getHash();
+            _peer = peer;
         }
         public void runJob() {
-            _state.replyTimeout(_peer);
-            _context.profileManager().dbStoreFailed(_peer);
+            if (_log.shouldLog(Log.WARN))
+                _log.warn(StoreJob.this.getJobId() + ": Peer " + _peer.getIdentity().getHash().toBase64() + " timed out");
+            _state.replyTimeout(_peer.getIdentity().getHash());
+            _context.profileManager().dbStoreFailed(_peer.getIdentity().getHash());
+            
             sendNext();
         }
         public String getName() { return "Kademlia Store Failed"; }
     }
     
-    /**
-     * Check to see the message is a reply from the peer regarding the current
-     * search
-     *
-     */
-    protected class StoreMessageSelector implements MessageSelector {
-        private Hash _peer;
-        private long _waitingForId;
-        private boolean _found;
-        public StoreMessageSelector(RouterInfo peer, long waitingForId) {
-            _peer = peer.getIdentity().getHash();
-            _found = false;
-            _waitingForId = waitingForId;
-        }
-        
-        public boolean continueMatching() { return !_found; }
-        public long getExpiration() { return _expiration; }
-        public boolean isMatch(I2NPMessage message) {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from "
-                           + _peer + " wrt " + _state.getTarget() + "]");
-            if (message instanceof DeliveryStatusMessage) {
-                DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
-                if (msg.getMessageId() == _waitingForId) {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("Found match for the key we're waiting for: " + _waitingForId);
-                    _found = true;
-                    return true;
-                } else {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("DeliveryStatusMessage of a key we're not looking for");
-                    return false;
-                }
-            } else {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Not a DeliveryStatusMessage");
-                return false;
-            }
-        }
-    }
-    
     /**
      * Send was totally successful
      */
-    protected void succeed() {
+    private void succeed() {
         if (_log.shouldLog(Log.INFO))
-            _log.info("Succeeded sending key " + _state.getTarget());
+            _log.info(getJobId() + ": Succeeded sending key " + _state.getTarget());
         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("State of successful send: " + _state);
+            _log.debug(getJobId() + ": State of successful send: " + _state);
         if (_onSuccess != null)
             _context.jobQueue().addJob(_onSuccess);
         _facade.noteKeySent(_state.getTarget());
+        _context.statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
     }
     
     /**
      * Send totally failed
      */
-    protected void fail() {
+    private void fail() {
         if (_log.shouldLog(Log.INFO))
-            _log.info("Failed sending key " + _state.getTarget());
+            _log.info(getJobId() + ": Failed sending key " + _state.getTarget());
         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("State of failed send: " + _state, new Exception("Who failed me?"));
+            _log.debug(getJobId() + ": State of failed send: " + _state, new Exception("Who failed me?"));
         if (_onFailure != null)
             _context.jobQueue().addJob(_onFailure);
     }
-    
-    protected class StoreState {
-        private Hash _key;
-        private DataStructure _data;
-        private HashSet _pendingPeers;
-        private HashMap _pendingPeerTimes;
-        private HashSet _successfulPeers;
-        private HashSet _successfulExploratoryPeers;
-        private HashSet _failedPeers;
-        private HashSet _attemptedPeers;
-        private volatile long _completed;
-        private volatile long _started;
-        
-        public StoreState(Hash key, DataStructure data) {
-            this(key, data, null);
-        }
-        public StoreState(Hash key, DataStructure data, Set toSkip) {
-            _key = key;
-            _data = data;
-            _pendingPeers = new HashSet(16);
-            _pendingPeerTimes = new HashMap(16);
-            _attemptedPeers = new HashSet(16);
-            if (toSkip != null)
-                _attemptedPeers.addAll(toSkip);
-            _failedPeers = new HashSet(16);
-            _successfulPeers = new HashSet(16);
-            _successfulExploratoryPeers = new HashSet(16);
-            _completed = -1;
-            _started = _context.clock().now();
-        }
-        
-        public Hash getTarget() { return _key; }
-        public DataStructure getData() { return _data; }
-        public Set getPending() {
-            synchronized (_pendingPeers) {
-                return (Set)_pendingPeers.clone();
-            }
-        }
-        public Set getAttempted() {
-            synchronized (_attemptedPeers) {
-                return (Set)_attemptedPeers.clone();
-            }
-        }
-        public Set getSuccessful() {
-            synchronized (_successfulPeers) {
-                return (Set)_successfulPeers.clone();
-            }
-        }
-        public Set getSuccessfulExploratory() {
-            synchronized (_successfulExploratoryPeers) {
-                return (Set)_successfulExploratoryPeers.clone();
-            }
-        }
-        public Set getFailed() {
-            synchronized (_failedPeers) {
-                return (Set)_failedPeers.clone();
-            }
-        }
-        public boolean completed() { return _completed != -1; }
-        public void complete(boolean completed) {
-            if (completed)
-                _completed = _context.clock().now();
-        }
-        
-        public long getWhenStarted() { return _started; }
-        public long getWhenCompleted() { return _completed; }
-        
-        public void addPending(Collection pending) {
-            synchronized (_pendingPeers) {
-                _pendingPeers.addAll(pending);
-                for (Iterator iter = pending.iterator(); iter.hasNext(); )
-                    _pendingPeerTimes.put(iter.next(), new Long(_context.clock().now()));
-            }
-            synchronized (_attemptedPeers) {
-                _attemptedPeers.addAll(pending);
-            }
-        }
-        
-        public long confirmed(Hash peer) {
-            long rv = -1;
-            synchronized (_pendingPeers) {
-                _pendingPeers.remove(peer);
-                Long when = (Long)_pendingPeerTimes.remove(peer);
-                if (when != null)
-                    rv = _context.clock().now() - when.longValue();
-            }
-            synchronized (_successfulPeers) {
-                _successfulPeers.add(peer);
-            }
-            return rv;
-        }
-        
-        public long confirmedExploratory(Hash peer) {
-            long rv = -1;
-            synchronized (_pendingPeers) {
-                _pendingPeers.remove(peer);
-                Long when = (Long)_pendingPeerTimes.remove(peer);
-                if (when != null)
-                    rv = _context.clock().now() - when.longValue();
-            }
-            synchronized (_successfulExploratoryPeers) {
-                _successfulExploratoryPeers.add(peer);
-            }
-            return rv;
-        }
-        
-        public void replyTimeout(Hash peer) {
-            synchronized (_pendingPeers) {
-                _pendingPeers.remove(peer);
-            }
-            synchronized (_failedPeers) {
-                _failedPeers.add(peer);
-            }
-        }
-        
-        public String toString() {
-            StringBuffer buf = new StringBuffer(256);
-            buf.append("Storing ").append(_key);
-            buf.append(" ");
-            if (_completed <= 0)
-                buf.append(" completed? false ");
-            else
-                buf.append(" completed on ").append(new Date(_completed));
-            buf.append(" Attempted: ");
-            synchronized (_attemptedPeers) {
-                for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
-                    Hash peer = (Hash)iter.next();
-                    buf.append(peer.toBase64()).append(" ");
-                }
-            }
-            buf.append(" Pending: ");
-            synchronized (_pendingPeers) {
-                for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) {
-                    Hash peer = (Hash)iter.next();
-                    buf.append(peer.toBase64()).append(" ");
-                }
-            }
-            buf.append(" Failed: ");
-            synchronized (_failedPeers) {
-                for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) {
-                    Hash peer = (Hash)iter.next();
-                    buf.append(peer.toBase64()).append(" ");
-                }
-            }
-            buf.append(" Successful: ");
-            synchronized (_successfulPeers) {
-                for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) {
-                    Hash peer = (Hash)iter.next();
-                    buf.append(peer.toBase64()).append(" ");
-                }
-            }
-            buf.append(" Successful Exploratory: ");
-            synchronized (_successfulExploratoryPeers) {
-                for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
-                    Hash peer = (Hash)iter.next();
-                    buf.append(peer.toBase64()).append(" ");
-                }
-            }
-            return buf.toString();
-        }
-    }
 }
\ No newline at end of file
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreMessageSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreMessageSelector.java
new file mode 100644
index 000000000..f45c55f0f
--- /dev/null
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreMessageSelector.java
@@ -0,0 +1,58 @@
+package net.i2p.router.networkdb.kademlia;
+
+import net.i2p.router.RouterContext;
+import net.i2p.router.MessageSelector;
+import net.i2p.util.Log;
+import net.i2p.data.Hash;
+import net.i2p.data.RouterInfo;
+import net.i2p.data.i2np.I2NPMessage;
+import net.i2p.data.i2np.DeliveryStatusMessage;
+
+/**
+ * Check to see the message is a reply from the peer regarding the current
+ * store
+ *
+ */
+class StoreMessageSelector implements MessageSelector {
+    private Log _log;
+    private Hash _peer;
+    private long _storeJobId;
+    private long _waitingForId;
+    private long _expiration;
+    private boolean _found;
+    public StoreMessageSelector(RouterContext ctx, long storeJobId, RouterInfo peer, long waitingForId,
+                                long expiration) {
+        _log = ctx.logManager().getLog(StoreMessageSelector.class);
+        _peer = peer.getIdentity().getHash();
+        _storeJobId = storeJobId;
+        _found = false;
+        _waitingForId = waitingForId;
+        _expiration = expiration;
+    }
+
+    public boolean continueMatching() { return !_found; }
+    public long getExpiration() { return _expiration; }
+    public boolean isMatch(I2NPMessage message) {
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(_storeJobId + ": isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from "
+                       + _peer + "]");
+        if (message instanceof DeliveryStatusMessage) {
+            DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
+            if (msg.getMessageId() == _waitingForId) {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info(_storeJobId + ": Found match for the key we're waiting for: " + _waitingForId);
+                _found = true;
+                return true;
+            } else {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(_storeJobId + ": DeliveryStatusMessage of a key we're not looking for");
+                return false;
+            }
+        } else {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(_storeJobId + ": Not a DeliveryStatusMessage");
+            return false;
+        }
+    }
+}
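The selector rides along on the tunnel send, as StoreJob's sendStoreThroughGarlic does above. A sketch of that wiring (not part of the patch); all parameter names here are hypothetical placeholders, and the timeout/priority values mirror StoreJob's STORE_TIMEOUT_MS and STORE_PRIORITY:

```java
import net.i2p.data.TunnelId;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.router.Job;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.router.message.SendTunnelMessageJob;

class SelectorWiringSketch {
    void send(RouterContext ctx, long jobId, RouterInfo peer, long token,
              DatabaseStoreMessage msg, TunnelId outTunnelId, ReplyJob onReply, Job onFail) {
        long timeout = 10*1000; // mirrors STORE_TIMEOUT_MS
        StoreMessageSelector selector =
            new StoreMessageSelector(ctx, jobId, peer, token, ctx.clock().now() + timeout);
        ctx.jobQueue().addJob(new SendTunnelMessageJob(ctx, msg, outTunnelId,
                                                       peer.getIdentity().getHash(),
                                                       null,     // no tunnel on the far end
                                                       null,     // no onSend job
                                                       onReply,  // runs when isMatch() accepts a reply
                                                       onFail,   // runs if nothing matches before the timeout
                                                       selector, timeout, 100 /* priority */));
    }
}
```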
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java
new file mode 100644
index 000000000..d85082a78
--- /dev/null
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java
@@ -0,0 +1,189 @@
+package net.i2p.router.networkdb.kademlia;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Date;
+
+import net.i2p.data.Hash;
+import net.i2p.data.DataStructure;
+import net.i2p.router.RouterContext;
+
+class StoreState {
+    private RouterContext _context;
+    private Hash _key;
+    private DataStructure _data;
+    private HashSet _pendingPeers;
+    private HashMap _pendingPeerTimes;
+    private HashSet _successfulPeers;
+    private HashSet _successfulExploratoryPeers;
+    private HashSet _failedPeers;
+    private HashSet _attemptedPeers;
+    private volatile long _completed;
+    private volatile long _started;
+
+    public StoreState(RouterContext ctx, Hash key, DataStructure data) {
+        this(ctx, key, data, null);
+    }
+    public StoreState(RouterContext ctx, Hash key, DataStructure data, Set toSkip) {
+        _context = ctx;
+        _key = key;
+        _data = data;
+        _pendingPeers = new HashSet(16);
+        _pendingPeerTimes = new HashMap(16);
+        _attemptedPeers = new HashSet(16);
+        if (toSkip != null)
+            _attemptedPeers.addAll(toSkip);
+        _failedPeers = new HashSet(16);
+        _successfulPeers = new HashSet(16);
+        _successfulExploratoryPeers = new HashSet(16);
+        _completed = -1;
+        _started = _context.clock().now();
+    }
+
+    public Hash getTarget() { return _key; }
+    public DataStructure getData() { return _data; }
+    public Set getPending() {
+        synchronized (_pendingPeers) {
+            return (Set)_pendingPeers.clone();
+        }
+    }
+    public Set getAttempted() {
+        synchronized (_attemptedPeers) {
+            return (Set)_attemptedPeers.clone();
+        }
+    }
+    public Set getSuccessful() {
+        synchronized (_successfulPeers) {
+            return (Set)_successfulPeers.clone();
+        }
+    }
+    public Set getSuccessfulExploratory() {
+        synchronized (_successfulExploratoryPeers) {
+            return (Set)_successfulExploratoryPeers.clone();
+        }
+    }
+    public Set getFailed() {
+        synchronized (_failedPeers) {
+            return (Set)_failedPeers.clone();
+        }
+    }
+    public boolean completed() { return _completed != -1; }
+    public void complete(boolean completed) {
+        if (completed)
+            _completed = _context.clock().now();
+    }
+
+    public long getWhenStarted() { return _started; }
+    public long getWhenCompleted() { return _completed; }
+
+    public void addPending(Hash peer) {
+        synchronized (_pendingPeers) {
+            _pendingPeers.add(peer);
+            _pendingPeerTimes.put(peer, new Long(_context.clock().now()));
+        }
+        synchronized (_attemptedPeers) {
+            _attemptedPeers.add(peer);
+        }
+    }
+    public void addPending(Collection pending) {
+        synchronized (_pendingPeers) {
+            _pendingPeers.addAll(pending);
+            for (Iterator iter = pending.iterator(); iter.hasNext(); )
+                _pendingPeerTimes.put(iter.next(), new Long(_context.clock().now()));
+        }
+        synchronized (_attemptedPeers) {
+            _attemptedPeers.addAll(pending);
+        }
+    }
+
+    public long confirmed(Hash peer) {
+        long rv = -1;
+        synchronized (_pendingPeers) {
+            _pendingPeers.remove(peer);
+            Long when = (Long)_pendingPeerTimes.remove(peer);
+            if (when != null)
+                rv = _context.clock().now() - when.longValue();
+        }
+        synchronized (_successfulPeers) {
+            _successfulPeers.add(peer);
+        }
+        return rv;
+    }
+
+    public long confirmedExploratory(Hash peer) {
+        long rv = -1;
+        synchronized (_pendingPeers) {
+            _pendingPeers.remove(peer);
+            Long when = (Long)_pendingPeerTimes.remove(peer);
+            if (when != null)
+                rv = _context.clock().now() - when.longValue();
+        }
+        synchronized (_successfulExploratoryPeers) {
+            _successfulExploratoryPeers.add(peer);
+        }
+        return rv;
+    }
+
+    public void replyTimeout(Hash peer) {
+        synchronized (_pendingPeers) {
+            _pendingPeers.remove(peer);
+        }
+        synchronized (_failedPeers) {
+            _failedPeers.add(peer);
+        }
+    }
+
+    public String toString() {
+        StringBuffer buf = new StringBuffer(256);
+        buf.append("Storing ").append(_key);
+        buf.append(" ");
+        if (_completed <= 0)
+            buf.append(" completed? false ");
+        else
+            buf.append(" completed on ").append(new Date(_completed));
+        buf.append(" Attempted: ");
+        synchronized (_attemptedPeers) {
+            buf.append(_attemptedPeers.size()).append(' ');
+            for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = (Hash)iter.next();
+                buf.append(peer.toBase64()).append(" ");
+            }
+        }
+        buf.append(" Pending: ");
+        synchronized (_pendingPeers) {
+            buf.append(_pendingPeers.size()).append(' ');
+            for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = (Hash)iter.next();
+                buf.append(peer.toBase64()).append(" ");
+            }
+        }
+        buf.append(" Failed: ");
+        synchronized (_failedPeers) {
+            buf.append(_failedPeers.size()).append(' ');
+            for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = (Hash)iter.next();
+                buf.append(peer.toBase64()).append(" ");
+            }
+        }
+        buf.append(" Successful: ");
+        synchronized (_successfulPeers) {
+            buf.append(_successfulPeers.size()).append(' ');
+            for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = (Hash)iter.next();
+                buf.append(peer.toBase64()).append(" ");
+            }
+        }
+        buf.append(" Successful Exploratory: ");
+        synchronized (_successfulExploratoryPeers) {
+            buf.append(_successfulExploratoryPeers.size()).append(' ');
+            for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = (Hash)iter.next();
+                buf.append(peer.toBase64()).append(" ");
+            }
+        }
+        return buf.toString();
+    }
+}
\ No newline at end of file
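The lifecycle StoreJob drives on this class, condensed into a sketch (not part of the patch); `ctx`, `key`, `data`, `peer`, and `acked` are hypothetical placeholders:

```java
import net.i2p.data.DataStructure;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;

class StoreStateSketch {
    void lifecycle(RouterContext ctx, Hash key, DataStructure data, Hash peer, boolean acked) {
        StoreState state = new StoreState(ctx, key, data, null);
        state.addPending(peer);               // marks the peer pending *and* attempted
        if (acked) {
            long rtt = state.confirmed(peer); // ms since addPending, or -1 if unknown;
                                              // StoreJob feeds this into netDb.ackTime
        } else {
            state.replyTimeout(peer);         // moves the peer to the failed set
        }
        state.complete(true);                 // freezes getWhenCompleted()
    }
}
```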
diff --git a/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java b/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java
index 993b493d0..37aa80eb2 100644
--- a/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java
+++ b/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java
@@ -34,11 +34,11 @@ public class IsFailingCalculator extends Calculator {
              (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ) {
             return true;
         } else {
-            if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) ||
-                 (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) {
-                // are they overloaded (or disconnected)?
-                return true;
-            }
+            //if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) ||
+            //     (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) {
+            //    // are they overloaded (or disconnected)?
+            //    return true;
+            //}
             
             long recently = _context.clock().now() - GRACE_PERIOD;
             
@@ -47,6 +47,11 @@ public class IsFailingCalculator extends Calculator {
                 return true;
             }
             
+            if (profile.getTunnelHistory().getLastFailed() >= recently) {
+                // has a tunnel they participate in failed in the last 5 minutes?
+                return true;
+            }
+            
             if (profile.getLastSendFailed() >= recently)
                 return true;
             
diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
index 597926019..4adff48d6 100644
--- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
+++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
@@ -65,6 +65,14 @@ public class ProfileOrganizer {
     public static final String PROP_RELIABILITY_THRESHOLD_FACTOR = "profileOrganizer.reliabilityThresholdFactor";
     public static final double DEFAULT_RELIABILITY_THRESHOLD_FACTOR = .5d;
     
+    /**
+     * Defines the minimum number of 'fast' peers that the organizer should select.  See
+     * {@link #getMinimumFastPeers}
+     *
+     */
+    public static final String PROP_MINIMUM_FAST_PEERS = "profileOrganizer.minFastPeers";
+    public static final int DEFAULT_MINIMUM_FAST_PEERS = 4;
+    
     /** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
     private Object _reorganizeLock = new Object();
     
@@ -323,6 +331,7 @@ public class ProfileOrganizer {
             _strictReliabilityOrder = reordered;
             
             locked_unfailAsNecessary();
+            locked_promoteFastAsNecessary();
         }
         
         if (_log.shouldLog(Log.DEBUG)) {
@@ -331,6 +340,35 @@ public class ProfileOrganizer {
         }
     }
     
+    /**
+     * As with locked_unfailAsNecessary, I'm not sure how much I like this - if there
+     * aren't enough fast peers, move some of the not-so-fast peers into the fast group.
+     * This picks the not-so-fast peers based on reliability, not speed, and skips over any
+     * failing peers.  Perhaps it should build a separate strict ordering by speed?  Nah, not
+     * worth the maintenance and memory overhead, at least not for now.
+     *
+     */
+    private void locked_promoteFastAsNecessary() {
+        int minFastPeers = getMinimumFastPeers();
+        int numToPromote = minFastPeers - _fastAndReliablePeers.size();
+        if (numToPromote > 0) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Need to explicitly promote " + numToPromote + " peers to the fast+reliable group");
+            for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) {
+                PeerProfile cur = (PeerProfile)iter.next();
+                if ( (!_fastAndReliablePeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) {
+                    _fastAndReliablePeers.put(cur.getPeer(), cur);
+                    // no need to remove it from any of the other groups, since if it is
+                    // fast and reliable, it is reliable, and it is not failing
+                    numToPromote--;
+                    if (numToPromote <= 0)
+                        break;
+                }
+            }
+        }
+        return;
+    }
+    
     /** how many not failing/active peers must we have? */
     private final static int MIN_NOT_FAILING_ACTIVE = 3;
     
@@ -655,6 +693,7 @@ public class ProfileOrganizer {
                 double rv = Double.parseDouble(val);
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("router context said " + PROP_RELIABILITY_THRESHOLD_FACTOR+ '=' + val);
+                return rv;
             } catch (NumberFormatException nfe) {
                 if (_log.shouldLog(Log.WARN))
                     _log.warn("Reliability threshold factor improperly set in the router environment [" + val + "]", nfe);
@@ -666,6 +705,48 @@ public class ProfileOrganizer {
         return DEFAULT_RELIABILITY_THRESHOLD_FACTOR;
     }
     
+    /**
+     * Defines the minimum number of 'fast' peers that the organizer should select.  If
+     * the profile calculators derive a threshold that does not select at least this many peers,
+     * the threshold will be overridden to make sure this many peers are in the fast+reliable group.
+     * This parameter should help deal with a lack of diversity in the tunnels created when some
+     * peers are particularly fast.
+     *
+     * @return minimum number of peers to be placed in the 'fast+reliable' group
+     */
+    private int getMinimumFastPeers() {
+        if (_context.router() != null) {
+            String val = _context.router().getConfigSetting(PROP_MINIMUM_FAST_PEERS);
+            if (val != null) {
+                try {
+                    int rv = Integer.parseInt(val);
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug("router config said " + PROP_MINIMUM_FAST_PEERS + '=' + val);
+                    return rv;
+                } catch (NumberFormatException nfe) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Minimum fast peers improperly set in the router config [" + val + "]", nfe);
+                }
+            }
+        }
+        
+        String val = _context.getProperty(PROP_MINIMUM_FAST_PEERS, ""+DEFAULT_MINIMUM_FAST_PEERS);
+        if (val != null) {
+            try {
+                int rv = Integer.parseInt(val);
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("router context said " + PROP_MINIMUM_FAST_PEERS + '=' + val);
+                return rv;
+            } catch (NumberFormatException nfe) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Minimum fast peers improperly set in the router environment [" + val + "]", nfe);
+            }
+        }
+        
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("no config for " + PROP_MINIMUM_FAST_PEERS + ", using " + DEFAULT_MINIMUM_FAST_PEERS);
+        return DEFAULT_MINIMUM_FAST_PEERS;
+    }
+    
     private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
     private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
 }
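getMinimumFastPeers() is a plain config lookup: the router config is consulted first, then the context property, then the compiled-in default. For instance, an operator who wants at least six peers kept in the fast+reliable tier would set profileOrganizer.minFastPeers=6 in the router configuration (the exact config file location is deployment-specific); with no override, the floor stays at DEFAULT_MINIMUM_FAST_PEERS (4), and a malformed value falls through to the same default.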
diff --git a/router/java/src/net/i2p/router/transport/VMCommSystem.java b/router/java/src/net/i2p/router/transport/VMCommSystem.java
index 0fa060fd4..935bc9ed7 100644
--- a/router/java/src/net/i2p/router/transport/VMCommSystem.java
+++ b/router/java/src/net/i2p/router/transport/VMCommSystem.java
@@ -85,7 +85,7 @@ public class VMCommSystem extends CommSystemFacade {
             _from = from;
             _msg = msg;
             // bah, ueberspeed!
-            //getTiming().setStartAfter(us.clock().now() + 50);
+            getTiming().setStartAfter(us.clock().now() + 200);
         }
         public void runJob() {
             I2NPMessageHandler handler = new I2NPMessageHandler(_ctx);
diff --git a/router/java/src/net/i2p/router/tunnelmanager/ClientTunnelPoolManagerJob.java b/router/java/src/net/i2p/router/tunnelmanager/ClientTunnelPoolManagerJob.java
index 3bbe9cc9f..1e39d6ca2 100644
--- a/router/java/src/net/i2p/router/tunnelmanager/ClientTunnelPoolManagerJob.java
+++ b/router/java/src/net/i2p/router/tunnelmanager/ClientTunnelPoolManagerJob.java
@@ -4,6 +4,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeMap;
+import java.util.Date;
 
 import net.i2p.data.TunnelId;
 import net.i2p.router.JobImpl;
@@ -96,6 +97,8 @@ class ClientTunnelPoolManagerJob extends JobImpl {
     private void requestMoreTunnels(int numTunnels) {
         if (_clientPool.getClientSettings().getDepthInbound() < 1) {
             // the client wants 0-hop tunnels, so don't waste longer tunnels on them
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("0 hop tunnels wanted - create custom ones");
             requestCustomTunnels(numTunnels);
             return;
         }
@@ -103,6 +106,9 @@ class ClientTunnelPoolManagerJob extends JobImpl {
         
         int allocated = allocateExisting(numTunnels);
         if (allocated < numTunnels) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("insufficient tunnels available (wanted: " + numTunnels
+                           + ", allocated: " + allocated + ", requesting custom ones)");
             requestCustomTunnels(numTunnels - allocated);
         } else {
             if (_log.shouldLog(Log.DEBUG))
@@ -169,10 +175,14 @@ class ClientTunnelPoolManagerJob extends JobImpl {
             return false;
         }
         
-        long expireAfter = _context.clock().now() + POOL_CHECK_DELAY + _tunnelPool.getTunnelCreationTimeout()*2;
+        // (furthest in the future) - (rebuild buffer time)
+        long expireAfter = _context.clock().now() + _tunnelPool.getPoolSettings().getInboundDuration()
+                           - POOL_CHECK_DELAY - _tunnelPool.getTunnelCreationTimeout()*2;
         if (info.getSettings().getExpiration() <= expireAfter) {
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Refusing tunnel " + info.getTunnelId() + " because it is going to expire soon");
+                _log.debug("Refusing tunnel " + info.getTunnelId() + " because it is going to expire soon ("
+                           + new Date(info.getSettings().getExpiration())
+                           + ", before " + new Date(expireAfter) + ")");
            return false;
         }
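The last hunk flips the reuse threshold from "measured forward from now" to "measured back from a fresh tunnel's expiry". A worked example (not part of the patch) with assumed constant values; the real POOL_CHECK_DELAY, creation timeout, and inbound duration live elsewhere in the tunnel manager:

```java
class ExpiryWindowSketch {
    // Assumed values for illustration only.
    static final long INBOUND_DURATION = 10*60*1000; // 10-minute inbound tunnels
    static final long POOL_CHECK_DELAY = 30*1000;    // 30s between pool checks
    static final long CREATION_TIMEOUT = 60*1000;    // 60s per build attempt

    static boolean reusable(long now, long tunnelExpiration) {
        // New rule: a tunnel must expire later than a fresh tunnel's expiry
        // minus one pool check and two build attempts...
        long expireAfter = now + INBOUND_DURATION - POOL_CHECK_DELAY - CREATION_TIMEOUT*2;
        // ...so with these numbers only tunnels expiring after now + 7.5 min
        // (i.e. nearly as fresh as a new build) are handed to a client. The
        // old rule, now + POOL_CHECK_DELAY + CREATION_TIMEOUT*2 = now + 2.5 min,
        // accepted any tunnel with 2.5 minutes left, however stale.
        return tunnelExpiration > expireAfter;
    }
}
```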