diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 1d748c1523..ad9eaafc37 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -258,7 +258,7 @@ public class Connection { } packet.setFlag(Packet.FLAG_DELAY_REQUESTED); - long timeout = (_options.getRTT() < MIN_RESEND_DELAY ? MIN_RESEND_DELAY : _options.getRTT()); + long timeout = _options.getRTT() + MIN_RESEND_DELAY; if (timeout > MAX_RESEND_DELAY) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index d5d84d2ab4..a55c2e36f9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -552,6 +552,11 @@ public class Packet { } public String toString() { + StringBuffer str = formatAsString(); + return str.toString(); + } + + protected StringBuffer formatAsString() { StringBuffer buf = new StringBuffer(64); buf.append(toId(_sendStreamId)); //buf.append("<-->"); @@ -570,7 +575,7 @@ public class Packet { } if ( (_payload != null) && (_payload.getValid() > 0) ) buf.append(" data: ").append(_payload.getValid()); - return buf.toString(); + return buf; } private static final String toId(byte id[]) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 2aba60361d..1c53401519 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -107,11 +107,15 @@ public class PacketHandler { private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS"); void displayPacket(Packet packet, String prefix, String suffix) { if (!_log.shouldLog(Log.DEBUG)) return; - String msg = null; + StringBuffer buf = new StringBuffer(256); synchronized (_fmt) { - msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : ""); + buf.append(_fmt.format(new Date())); } - System.out.println(msg); + buf.append(": ").append(prefix).append(" "); + buf.append(packet.toString()); + if (suffix != null) + buf.append(" ").append(suffix); + System.out.println(buf.toString()); } private void receiveKnownCon(Connection con, Packet packet) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index d6ad8d69f9..fce529e951 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -114,16 +114,44 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; } - public String toString() { - String str = super.toString(); + public StringBuffer formatAsString() { + StringBuffer buf = super.formatAsString(); + + Connection con = _connection; + if (con != null) + buf.append(" rtt ").append(con.getOptions().getRTT()); if ( (_tagsSent != null) && (_tagsSent.size() > 0) ) - str = str + " with tags"; + buf.append(" with tags"); if (_ackOn > 0) - return str + " ack after " + getAckTime() + (_numSends <= 1 ? "" : " sent " + _numSends + " times"); - else - return str + (_numSends <= 1 ? "" : " sent " + _numSends + " times"); + buf.append(" ack after ").append(getAckTime()); + + if (_numSends > 1) + buf.append(" sent ").append(_numSends).append(" times"); + + if (isFlagSet(Packet.FLAG_SYNCHRONIZE) || + isFlagSet(Packet.FLAG_CLOSE) || + isFlagSet(Packet.FLAG_RESET)) { + + if (con != null) { + buf.append(" from "); + Destination local = con.getSession().getMyDestination(); + if (local != null) + buf.append(local.calculateHash().toBase64().substring(0,4)); + else + buf.append("unknown"); + + buf.append(" to "); + Destination remote = con.getRemotePeer(); + if (remote != null) + buf.append(remote.calculateHash().toBase64().substring(0,4)); + else + buf.append("unknown"); + + } + } + return buf; } public void waitForAccept(int maxWaitMs) { diff --git a/core/java/src/net/i2p/time/Timestamper.java b/core/java/src/net/i2p/time/Timestamper.java index db7b4bd89f..be0ee7b515 100644 --- a/core/java/src/net/i2p/time/Timestamper.java +++ b/core/java/src/net/i2p/time/Timestamper.java @@ -172,7 +172,7 @@ public class Timestamper implements Runnable { if (Math.abs(delta) < MAX_VARIANCE) { if (_log.shouldLog(Log.INFO)) _log.info("a single SNTP query was within the tolerance (" + delta + "ms)"); - return true; + break; } else { // outside the tolerance, lets iterate across the concurring queries expectedDelta = delta; diff --git a/history.txt b/history.txt index 892e78fd78..8a30c2ea32 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,31 @@ -$Id: history.txt,v 1.166 2005/03/06 19:40:45 jrandom Exp $ +$Id: history.txt,v 1.167 2005/03/07 21:45:14 jrandom Exp $ + +2005-03-11 jrandom + * Rather than the fixed resend timeout floor (10s), use 10s+RTT as the + minimum (increased on resends as before, of course). + * Always prod the clock update listeners, even if just to tell them that + the time hasn't changed much. + * Added support for explicit peer selection for individual tunnel pools, + which will be useful in debugging but not recommended for use by normal + end users. + * More aggressively search for the next hop's routerInfo on tunnel join. + * Give messages received via inbound tunnels that are bound to remote + locations sufficient time (taking into account clock skew). + * Give alternate direct send messages sufficient time (10s min, not 5s) + * Always give the end to end data message the explicit timeout (though the + old default was sufficient before) + * No need to give end to end messages an insane expiration (+2m), as we + are already handling skew on the receiving side. + * Don't complain too loudly about expired TunnelCreateMessages (at least, + not until after all those 0.5 and 0.5.0.1 users upgrade ;) + * Properly keep the sendBps stat + * When running the router with router.keepHistory=true, log more data to + messageHistory.txt + * Logging updates + * Minor formatting updates + +2005-03-08 jrandom + * More aggressively adjust the clock 2005-03-07 jrandom * Fix the HTTP response header filter to allow multiple headers with the diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessage.java b/router/java/src/net/i2p/data/i2np/I2NPMessage.java index 4d9259f437..59d7fe5974 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessage.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessage.java @@ -67,6 +67,8 @@ public interface I2NPMessage extends DataStructure { * */ public long getMessageExpiration(); + public void setMessageExpiration(long exp); + /** How large the message is, including any checksums */ public int getMessageSize(); diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java index b4428908f8..40ee57d7e2 100644 --- a/router/java/src/net/i2p/router/InNetMessagePool.java +++ b/router/java/src/net/i2p/router/InNetMessagePool.java @@ -17,6 +17,7 @@ import net.i2p.data.i2np.DeliveryStatusMessage; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.DatabaseSearchReplyMessage; import net.i2p.data.i2np.DatabaseLookupMessage; +import net.i2p.data.i2np.TunnelCreateMessage; import net.i2p.data.i2np.TunnelCreateStatusMessage; import net.i2p.data.i2np.TunnelDataMessage; import net.i2p.data.i2np.TunnelGatewayMessage; @@ -103,11 +104,15 @@ public class InNetMessagePool implements Service { if (messageBody instanceof TunnelDataMessage) { // do not validate the message with the validator - the IV validator is sufficient } else { - boolean valid = _context.messageValidator().validateMessage(messageBody.getUniqueId(), exp); - if (!valid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Duplicate message received [" + messageBody.getUniqueId() - + " expiring on " + exp + "]: " + messageBody.getClass().getName()); + String invalidReason = _context.messageValidator().validateMessage(messageBody.getUniqueId(), exp); + if (invalidReason != null) { + int level = Log.WARN; + if (messageBody instanceof TunnelCreateMessage) + level = Log.INFO; + if (_log.shouldLog(level)) + _log.log(level, "Duplicate message received [" + messageBody.getUniqueId() + + " expiring on " + exp + "]: " + messageBody.getClass().getName() + ": " + invalidReason + + ": " + messageBody); _context.statManager().addRateData("inNetPool.dropped", 1, 0); _context.statManager().addRateData("inNetPool.duplicate", 1, 0); _context.messageHistory().droppedOtherMessage(messageBody); diff --git a/router/java/src/net/i2p/router/MessageHistory.java b/router/java/src/net/i2p/router/MessageHistory.java index 41cf440cd5..0e087b35dc 100644 --- a/router/java/src/net/i2p/router/MessageHistory.java +++ b/router/java/src/net/i2p/router/MessageHistory.java @@ -318,7 +318,7 @@ public class MessageHistory { */ public void sendMessage(String messageType, long messageId, long expiration, Hash peer, boolean sentOk) { if (!_doLog) return; - if (true) return; + if (false) return; StringBuffer buf = new StringBuffer(256); buf.append(getPrefix()); buf.append("send [").append(messageType).append("] message [").append(messageId).append("] "); @@ -344,7 +344,7 @@ public class MessageHistory { */ public void receiveMessage(String messageType, long messageId, long expiration, Hash from, boolean isValid) { if (!_doLog) return; - if (true) return; + if (false) return; StringBuffer buf = new StringBuffer(256); buf.append(getPrefix()); buf.append("receive [").append(messageType).append("] with id [").append(messageId).append("] "); diff --git a/router/java/src/net/i2p/router/MessageValidator.java b/router/java/src/net/i2p/router/MessageValidator.java index e12417c2d8..8c35bff262 100644 --- a/router/java/src/net/i2p/router/MessageValidator.java +++ b/router/java/src/net/i2p/router/MessageValidator.java @@ -31,20 +31,20 @@ public class MessageValidator { /** * Determine if this message should be accepted as valid (not expired, not a duplicate) * - * @return true if the message should be accepted as valid, false otherwise + * @return reason why the message is invalid (or null if the message is valid) */ - public boolean validateMessage(long messageId, long expiration) { + public String validateMessage(long messageId, long expiration) { long now = _context.clock().now(); if (now - Router.CLOCK_FUDGE_FACTOR >= expiration) { if (_log.shouldLog(Log.WARN)) _log.warn("Rejecting message " + messageId + " because it expired " + (now-expiration) + "ms ago"); _context.statManager().addRateData("router.invalidMessageTime", (now-expiration), 0); - return false; + return "expired " + (now-expiration) + "ms ago"; } else if (now + 4*Router.CLOCK_FUDGE_FACTOR < expiration) { if (_log.shouldLog(Log.WARN)) _log.warn("Rejecting message " + messageId + " because it will expire too far in the future (" + (expiration-now) + "ms)"); _context.statManager().addRateData("router.invalidMessageTime", (now-expiration), 0); - return false; + return "expire too far in the future (" + (expiration-now) + "ms)"; } boolean isDuplicate = noteReception(messageId, expiration); @@ -52,11 +52,11 @@ public class MessageValidator { if (_log.shouldLog(Log.WARN)) _log.warn("Rejecting message " + messageId + " because it is a duplicate", new Exception("Duplicate origin")); _context.statManager().addRateData("router.duplicateMessageId", 1, 0); - return false; + return "duplicate"; } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Accepting message " + messageId + " because it is NOT a duplicate", new Exception("Original origin")); - return true; + return null; } } diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 600b92c000..a3a1ba0527 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -371,7 +371,7 @@ public class Router { RateStat sendRate = _context.statManager().getRate("transport.sendMessageSize"); if (sendRate != null) { - Rate rate = receiveRate.getRate(60*1000); + Rate rate = sendRate.getRate(60*1000); if (rate != null) { double bytes = rate.getLastTotalValue(); double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 0dc6cccb9c..ccb0ede398 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.160 $ $Date: 2005/03/06 19:07:27 $"; + public final static String ID = "$Revision: 1.161 $ $Date: 2005/03/07 21:45:15 $"; public final static String VERSION = "0.5.0.2"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index 68bee587d8..77d7c4b1be 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -109,6 +109,10 @@ public class GarlicMessageBuilder { msg.setData(encData); msg.setMessageExpiration(config.getExpiration()); + long timeFromNow = config.getExpiration() - ctx.clock().now(); + if (timeFromNow < 10*1000) + log.error("Building a message expiring in " + timeFromNow + "ms: " + config, new Exception("created by")); + if (log.shouldLog(Log.WARN)) log.warn("CloveSet size for message " + msg.getUniqueId() + " is " + cloveSet.length + " and encrypted message data is " + encData.length); diff --git a/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java b/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java index 0f12039d3c..27cf5dafea 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java @@ -99,13 +99,13 @@ public class GarlicMessageReceiver { } private boolean isValid(GarlicClove clove) { - boolean valid = _context.messageValidator().validateMessage(clove.getCloveId(), - clove.getExpiration().getTime()); - if (!valid) { + String invalidReason = _context.messageValidator().validateMessage(clove.getCloveId(), + clove.getExpiration().getTime()); + if (invalidReason != null) { String howLongAgo = DataHelper.formatDuration(_context.clock().now()-clove.getExpiration().getTime()); if (_log.shouldLog(Log.ERROR)) _log.error("Clove is NOT valid: id=" + clove.getCloveId() - + " expiration " + howLongAgo + " ago"); + + " expiration " + howLongAgo + " ago: " + invalidReason + ": " + clove); if (_log.shouldLog(Log.WARN)) _log.warn("Clove is NOT valid: id=" + clove.getCloveId() + " expiration " + howLongAgo + " ago", new Exception("Invalid within...")); @@ -113,6 +113,6 @@ public class GarlicMessageReceiver { clove.getData().getClass().getName(), "Clove is not valid (expiration " + howLongAgo + " ago)"); } - return valid; + return (invalidReason == null); } } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java index f3632ef36f..49800858b1 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java @@ -108,7 +108,7 @@ class OutboundClientMessageJobHelper { config.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); config.setDeliveryInstructions(instructions); config.setId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE)); - config.setExpiration(expiration+2*Router.CLOCK_FUDGE_FACTOR); + config.setExpiration(expiration); // +2*Router.CLOCK_FUDGE_FACTOR); config.setRecipientPublicKey(recipientPK); config.setRequestAck(false); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index f6a1fa3fc5..2501c162d8 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -51,6 +51,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private int _clientMessageSize; private Destination _from; private Destination _to; + private String _toString; /** target destination's leaseSet, if known */ private LeaseSet _leaseSet; /** Actual lease the message is being routed through */ @@ -117,6 +118,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _clientMessageSize = msg.getPayload().getSize(); _from = msg.getFromDestination(); _to = msg.getDestination(); + _toString = _to.calculateHash().toBase64().substring(0,4); _leaseSetLookupBegin = -1; String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM); @@ -146,22 +148,24 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _log.debug(getJobId() + ": Send outbound client message job beginning"); buildClove(); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Clove built"); + _log.debug(getJobId() + ": Clove built to " + _toString); long timeoutMs = _overallExpiration - getContext().clock().now(); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": preparing to search for the leaseSet"); + _log.debug(getJobId() + ": preparing to search for the leaseSet for " + _toString); Hash key = _to.calculateHash(); SendJob success = new SendJob(getContext()); LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job"); LeaseSet ls = getContext().netDb().lookupLeaseSetLocally(key); if (ls != null) { getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0); _leaseSetLookupBegin = -1; + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Send outbound client message - leaseSet found locally for " + _toString); success.runJob(); } else { _leaseSetLookupBegin = getContext().clock().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job for " + _toString); getContext().netDb().lookupLeaseSet(key, success, failed, timeoutMs); } } @@ -203,8 +207,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl { boolean ok = getNextLease(); if (ok) send(); - else + else { + if (_log.shouldLog(Log.ERROR)) + _log.error("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); dieFatal(); + } } } @@ -218,7 +225,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash()); if (_leaseSet == null) { if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Lookup locally didn't find the leaseSet"); + _log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString); return false; } long now = getContext().clock().now(); @@ -229,7 +236,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { Lease lease = _leaseSet.getLease(i); if (lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) { if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": getNextLease() - expired lease! - " + lease); + _log.warn(getJobId() + ": getNextLease() - expired lease! - " + lease + " for " + _toString); continue; } else { leases.add(lease); @@ -278,6 +285,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { long lookupTime = getContext().clock().now() - _leaseSetLookupBegin; getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime); } + + if (_log.shouldLog(Log.ERROR)) + _log.error("Unable to send to " + _toString + " because we couldn't find their leaseSet"); dieFatal(); } @@ -312,12 +322,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // set to null if there are no tunnels to ack the reply back through // (should we always fail for this? or should we send it anyway, even if // we dont receive the reply? hmm...) + if (_log.shouldLog(Log.ERROR)) + _log.error(getJobId() + ": Unable to create the garlic message (no tunnels left) to " + _toString); dieFatal(); return; } if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": send() - token expected " + token); + _log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString); SendSuccessJob onReply = new SendSuccessJob(getContext(), sessKey, tags); SendTimeoutJob onFail = new SendTimeoutJob(getContext()); @@ -325,6 +337,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for " + + _toString + " at " + _lease.getTunnelId() + " on " + _lease.getGateway().toBase64()); @@ -332,6 +345,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (outTunnel != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Sending tunnel message out " + outTunnel.getSendTunnelId(0) + " to " + + _toString + " at " + _lease.getTunnelId() + " on " + _lease.getGateway().toBase64()); @@ -365,7 +379,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public String getName() { return "Dispatch outbound client message"; } public void runJob() { getContext().messageRegistry().registerPending(_selector, _replyFound, _replyTimeout, _timeoutMs); + if (_log.shouldLog(Log.INFO)) + _log.info("Dispatching message to " + _toString + ": " + _msg); getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway()); + if (_log.shouldLog(Log.INFO)) + _log.info("Dispatching message to " + _toString + " complete"); } } @@ -427,6 +445,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { DataMessage msg = new DataMessage(getContext()); msg.setData(_clientMessage.getPayload().getEncryptedData()); + msg.setMessageExpiration(_overallExpiration); clove.setPayload(msg); clove.setRecipientPublicKey(null); @@ -471,6 +490,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl { return false; } } + + public String toString() { + return "sending " + _toString + " waiting for token " + _pendingToken + + " for cloveId " + _cloveId; + } } /** diff --git a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java index 4a91947554..5ab565ad70 100644 --- a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java +++ b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java @@ -48,10 +48,10 @@ public class SendMessageDirectJob extends JobImpl { _message = message; _targetHash = toPeer; _router = null; - if (timeoutMs < 5*1000) { + if (timeoutMs < 10*1000) { if (_log.shouldLog(Log.WARN)) _log.warn("Very little time given [" + timeoutMs + "], resetting to 5s", new Exception("stingy bastard")); - _expiration = ctx.clock().now() + 5*1000; + _expiration = ctx.clock().now() + 10*1000; } else { _expiration = timeoutMs + ctx.clock().now(); } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 9688e1a4c9..f772f91748 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -264,8 +264,9 @@ class SearchJob extends JobImpl { _log.error(getJobId() + ": Dont send search to ourselves - why did we try?"); return; } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64()); + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64() + + " for " + _state.getTarget().toBase64()); } getContext().statManager().addRateData("netDb.searchMessageCount", 1, 0); @@ -330,8 +331,8 @@ class SearchJob extends JobImpl { DatabaseLookupMessage msg = buildMessage(expiration); - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" + msg.getFrom().toBase64() + "]"); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 68b5b9211f..89435f3392 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -357,7 +357,7 @@ public class ProfileOrganizer { // we dont want the good peers, just random ones continue; } else { - if (isOk(cur)) + if (isSelectable(cur)) selected.add(cur); } } @@ -474,7 +474,7 @@ public class ProfileOrganizer { for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) { PeerProfile cur = (PeerProfile)iter.next(); if ( (!_fastPeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) { - if (!isOk(cur.getPeer())) { + if (!isSelectable(cur.getPeer())) { // skip peers we dont have in the netDb if (_log.shouldLog(Log.INFO)) _log.info("skip unknown peer from fast promotion: " + cur.getPeer().toBase64()); @@ -701,7 +701,7 @@ public class ProfileOrganizer { Collections.shuffle(all, _random); for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) { Hash peer = (Hash)all.get(i); - boolean ok = isOk(peer); + boolean ok = isSelectable(peer); if (ok) matches.add(peer); else @@ -709,7 +709,7 @@ public class ProfileOrganizer { } } - private boolean isOk(Hash peer) { + public boolean isSelectable(Hash peer) { NetworkDatabaseFacade netDb = _context.netDb(); // the CLI shouldn't depend upon the netDb if (netDb == null) return true; @@ -755,7 +755,7 @@ public class ProfileOrganizer { if (_log.shouldLog(Log.DEBUG)) _log.debug("High capacity: \t" + profile.getPeer().toBase64()); if (_thresholdSpeedValue <= profile.getSpeedValue()) { - if (!isOk(profile.getPeer())) { + if (!isSelectable(profile.getPeer())) { if (_log.shouldLog(Log.INFO)) _log.info("Skipping fast mark [!ok] for " + profile.getPeer().toBase64()); } else if (!profile.getIsActive()) { diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index d08e776532..deac4229aa 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -305,17 +305,18 @@ public class TCPTransport extends TransportImpl { void connectionClosed(TCPConnection con) { synchronized (_connectionLock) { TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash()); - if (cur != con) + if ( (cur != null) && (cur != con) ) _connectionsByIdent.put(cur.getRemoteRouterIdentity().getHash(), cur); cur = (TCPConnection)_connectionsByAddress.remove(con.getRemoteAddress().toString()); - if (cur != con) + if ( (cur != null) && (cur != con) ) _connectionsByAddress.put(cur.getRemoteAddress().toString(), cur); if (_log.shouldLog(Log.DEBUG)) { StringBuffer buf = new StringBuffer(256); buf.append("\nCLOSING ").append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)); buf.append("."); - buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") "); + if (cur != null) + buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") "); for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) { String addr = (String)iter.next(); buf.append(addr).append(" "); diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 1b5a0cb926..b20867e80c 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -363,8 +363,8 @@ public class FragmentHandler { } if (removed && !_msg.getReleased()) { noteFailure(_msg.getMessageId()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropped failed fragmented message: " + _msg); + if (_log.shouldLog(Log.ERROR)) + _log.error("Dropped failed fragmented message: " + _msg); _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); _msg.failed(); } else { diff --git a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java index 145d044ea5..d6527efd63 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java @@ -90,6 +90,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec + " failing to distribute " + msg); return; } + if (msg.getMessageExpiration() < _context.clock().now() + 10*1000) + msg.setMessageExpiration(_context.clock().now() + 10*1000); _context.tunnelDispatcher().dispatchOutbound(msg, outId, tunnel, target); } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 04e991b1e0..48912c0ea6 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -431,6 +431,12 @@ public class TunnelDispatcher implements Service { + (before-msg.getMessageExpiration()) + "ms ago? " + msg, new Exception("cause")); return; + } else if (msg.getMessageExpiration() < before) { + // nonfatal, as long as it was remotely created + if (_log.shouldLog(Log.WARN)) + _log.warn("why are you sending a tunnel message that expired " + + (before-msg.getMessageExpiration()) + "ms ago? " + + msg, new Exception("cause")); } gw.add(msg, targetPeer, targetTunnel); if (targetTunnel == null) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index dc28127601..5f7a392803 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -46,6 +46,17 @@ public class TunnelParticipant { if ( (_config != null) && (_config.getSendTo() != null) ) { _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); + if (_nextHopCache == null) + _context.netDb().lookupRouterInfo(_config.getSendTo(), new Found(_context), null, 60*1000); + } + } + + private class Found extends JobImpl { + public Found(RouterContext ctx) { super(ctx); } + public String getName() { return "Next hop info found"; } + public void runJob() { + if (_nextHopCache == null) + _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java index 4480ab6d2e..b66f59d711 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java @@ -18,6 +18,10 @@ class ClientPeerSelector extends TunnelPeerSelector { if (length < 0) return null; HashSet matches = new HashSet(length); + + if (shouldSelectExplicit(settings)) + return selectExplicit(ctx, settings, length); + ctx.profileOrganizer().selectFastPeers(length, null, matches); matches.remove(ctx.routerHash()); @@ -29,4 +33,5 @@ class ClientPeerSelector extends TunnelPeerSelector { rv.add(ctx.routerHash()); return rv; } + } diff --git a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java index 79d560e229..63a40ce680 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java @@ -17,6 +17,10 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { int length = getLength(ctx, settings); if (length < 0) return null; + + if (shouldSelectExplicit(settings)) + return selectExplicit(ctx, settings, length); + HashSet matches = new HashSet(length); ctx.profileOrganizer().selectNotFailingPeers(length, null, matches, true); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java index d224306a2b..295ae931b0 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java @@ -1,6 +1,12 @@ package net.i2p.router.tunnel.pool; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Properties; +import java.util.StringTokenizer; +import net.i2p.data.DataFormatException; +import net.i2p.data.Hash; import net.i2p.router.RouterContext; import net.i2p.router.TunnelPoolSettings; import net.i2p.util.Log; @@ -54,4 +60,58 @@ abstract class TunnelPeerSelector { } return length; } + + protected boolean shouldSelectExplicit(TunnelPoolSettings settings) { + Properties opts = settings.getUnknownOptions(); + if (opts != null) { + String peers = opts.getProperty("explicitPeers"); + if (peers != null) + return true; + } + return false; + } + + protected List selectExplicit(RouterContext ctx, TunnelPoolSettings settings, int length) { + String peers = null; + Properties opts = settings.getUnknownOptions(); + if (opts != null) + peers = opts.getProperty("explicitPeers"); + + Log log = ctx.logManager().getLog(ClientPeerSelector.class); + List rv = new ArrayList(); + StringTokenizer tok = new StringTokenizer(peers, ","); + Hash h = new Hash(); + while (tok.hasMoreTokens()) { + String peerStr = tok.nextToken(); + Hash peer = new Hash(); + try { + peer.fromBase64(peerStr); + + if (ctx.profileOrganizer().isSelectable(peer)) { + rv.add(peer); + } else { + if (log.shouldLog(Log.WARN)) + log.warn("Explicit peer is not selectable: " + peerStr); + } + } catch (DataFormatException dfe) { + if (log.shouldLog(Log.ERROR)) + log.error("Explicit peer is improperly formatted (" + peerStr + ")", dfe); + } + } + + Collections.shuffle(rv, ctx.random()); + + + while (rv.size() > length) + rv.remove(0); + + if (settings.isInbound()) + rv.add(0, ctx.routerHash()); + else + rv.add(ctx.routerHash()); + + if (log.shouldLog(Log.INFO)) + log.info(toString() + ": Selecting peers explicitly: " + rv); + return rv; + } }