From 77b88ab59dd6db060070ad1152ce63dfa0ca309f Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 15 Jan 2010 21:37:41 +0000 Subject: [PATCH] * NetDb Stores and Verifies: - Do LS stores and verifies through client tunnels to prevent correlation by the OBEP or FF - Encrypt LS stores to prevent snooping by the OBEP - Encrypt LS and RI verifies to prevent snooping by the OBEP - Extend verify delay and timeout - Reenable RI verifies - Disallow simultaneous verifies for the same key - Don't resend on verify timeout; try a different peer instead - Adjust ff selection criteria --- .../crypto/TransientSessionKeyManager.java | 19 ++- .../i2p/data/i2cp/RequestLeaseSetMessage.java | 1 + .../i2p/router/client/RequestLeaseSetJob.java | 9 +- .../router/message/GarlicMessageBuilder.java | 20 ++- .../FloodfillNetworkDatabaseFacade.java | 18 +++ .../kademlia/FloodfillPeerSelector.java | 6 +- .../networkdb/kademlia/FloodfillStoreJob.java | 6 + .../kademlia/FloodfillVerifyStoreJob.java | 118 +++++++++++++---- .../networkdb/kademlia/MessageWrapper.java | 120 ++++++++++++++++++ .../router/networkdb/kademlia/StoreJob.java | 118 +++++++++++++++-- .../router/networkdb/kademlia/StoreState.java | 50 ++++++-- .../tunnel/InboundMessageDistributor.java | 20 ++- 12 files changed, 436 insertions(+), 69 deletions(-) create mode 100644 router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index 931da3ffa..17543b1bd 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -145,23 +145,20 @@ public class TransientSessionKeyManager extends SessionKeyManager { } - /** TagSet */ - /* FIXME Exporting non-public type through public API */ - protected Set getInboundTagSets() { + /** TagSet - used only by HTML */ + private Set getInboundTagSets() { synchronized (_inboundTagSets) { return new HashSet(_inboundTagSets.values()); } } - /** OutboundSession */ - /* FIXME Exporting non-public type through public API */ - protected Set getOutboundSessions() { + /** OutboundSession - used only by HTML */ + private Set getOutboundSessions() { synchronized (_outboundSessions) { return new HashSet(_outboundSessions.values()); } } - /* FIXME Exporting non-public type through public API */ /****** leftover from when we had the persistent SKM protected void setData(Set inboundTagSets, Set outboundSessions) { if (_log.shouldLog(Log.INFO)) @@ -531,7 +528,7 @@ public class TransientSessionKeyManager extends SessionKeyManager { * * @return number of tag sets expired */ - public int aggressiveExpire() { + private int aggressiveExpire() { int removed = 0; int remaining = 0; long now = _context.clock().now(); @@ -569,9 +566,8 @@ public class TransientSessionKeyManager extends SessionKeyManager { //_log.warn("Expiring tags: [" + tagsToDrop + "]"); synchronized (_outboundSessions) { - for (Iterator iter = _outboundSessions.keySet().iterator(); iter.hasNext();) { - PublicKey key = iter.next(); - OutboundSession sess = _outboundSessions.get(key); + for (Iterator iter = _outboundSessions.values().iterator(); iter.hasNext();) { + OutboundSession sess = iter.next(); removed += sess.expireTags(); // don't kill a new session or one that's temporarily out of tags if (sess.getLastUsedDate() < now - (SESSION_LIFETIME_MAX_MS / 2) && @@ -663,6 +659,7 @@ public class TransientSessionKeyManager extends SessionKeyManager { } } + /** fixme pass in context and change to static */ private class OutboundSession { private PublicKey _target; private SessionKey _currentKey; diff --git a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java index e4db066cb..35249c76c 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java @@ -61,6 +61,7 @@ public class RequestLeaseSetMessage extends I2CPMessageImpl { return ((TunnelEndpoint) _endpoints.get(endpoint)).getTunnelId(); } + /** @deprecated unused - presumably he meant remove? */ public void remoteEndpoint(int endpoint) { if ((endpoint >= 0) && (endpoint < _endpoints.size())) _endpoints.remove(endpoint); } diff --git a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java index 9e6c34dbb..c4e8e8d1b 100644 --- a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java +++ b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java @@ -54,13 +54,8 @@ class RequestLeaseSetJob extends JobImpl { if (_runner.isDead()) return; RequestLeaseSetMessage msg = new RequestLeaseSetMessage(); - Date end = null; - // get the earliest end date - for (int i = 0; i < _requestState.getRequested().getLeaseCount(); i++) { - if ( (end == null) || (end.getTime() > _requestState.getRequested().getLease(i).getEndDate().getTime()) ) - end = _requestState.getRequested().getLease(i).getEndDate(); - } - + Date end = new Date(_requestState.getRequested().getEarliestLeaseDate()); + msg.setEndDate(end); msg.setSessionId(_runner.getSessionId()); diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index d2fe7cd61..a9bea9c2f 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -118,6 +118,8 @@ public class GarlicMessageBuilder { ***/ /** + * called by above + * * @param ctx scope * @param config how/what to wrap * @param wrappedKey output parameter that will be filled with the sessionKey used @@ -127,6 +129,22 @@ public class GarlicMessageBuilder { */ private static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, int numTagsToDeliver, boolean forceElGamal, SessionKeyManager skm) { + return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, LOW_THRESHOLD, false, skm); + } + + /** + * called by netdb + * + * @param ctx scope + * @param config how/what to wrap + * @param wrappedKey output parameter that will be filled with the sessionKey used + * @param wrappedTags output parameter that will be filled with the sessionTags used + * @param numTagsToDeliver only if the estimated available tags are below the threshold + * @param lowTagsThreshold the threshold + * @param skm non-null + */ + public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, + int numTagsToDeliver, int lowTagsThreshold, boolean forceElGamal, SessionKeyManager skm) { Log log = ctx.logManager().getLog(GarlicMessageBuilder.class); PublicKey key = config.getRecipientPublicKey(); if (key == null) { @@ -154,7 +172,7 @@ public class GarlicMessageBuilder { if (log.shouldLog(Log.DEBUG)) log.debug("Available tags for encryption to " + key + ": " + availTags); - if (availTags < LOW_THRESHOLD) { // arbitrary threshold + if (availTags < lowTagsThreshold) { // arbitrary threshold for (int i = 0; i < numTagsToDeliver; i++) wrappedTags.add(new SessionTag(true)); if (log.shouldLog(Log.INFO)) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java index 719aea3d3..b51466d15 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java @@ -24,6 +24,7 @@ import net.i2p.router.ReplyJob; import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -35,11 +36,13 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad private boolean _floodfillEnabled; /** for testing, see isFloodfill() below */ private static String _alwaysQuery; + private final Set _verifiesInProgress; public FloodfillNetworkDatabaseFacade(RouterContext context) { super(context); _activeFloodQueries = new HashMap(); _floodfillEnabled = false; + _verifiesInProgress = new ConcurrentHashSet(8); _alwaysQuery = _context.getProperty("netDb.alwaysQuery"); _context.statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l }); @@ -318,6 +321,21 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad return sel.selectFloodfillParticipants(getKBuckets()); } + /** @since 0.7.10 */ + boolean isVerifyInProgress(Hash h) { + return _verifiesInProgress.contains(h); + } + + /** @since 0.7.10 */ + void verifyStarted(Hash h) { + _verifiesInProgress.add(h); + } + + /** @since 0.7.10 */ + void verifyFinished(Hash h) { + _verifiesInProgress.remove(h); + } + /** NTCP cons drop quickly but SSU takes a while, so it's prudent to keep this * a little higher than 1 or 2. */ protected final static int MIN_ACTIVE_PEERS = 5; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java index c6c2d2cfb..4d270424b 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java @@ -108,11 +108,11 @@ class FloodfillPeerSelector extends PeerSelector { return selectFloodfillParticipants(key, maxNumRouters, null, kbuckets); } - /** .75 * PublishLocalRouterInfoJob.PUBLISH_DELAY */ - private static final int NO_FAIL_STORE_OK = 15*60*1000; + /** .5 * PublishLocalRouterInfoJob.PUBLISH_DELAY */ + private static final int NO_FAIL_STORE_OK = 10*60*1000; private static final int NO_FAIL_STORE_GOOD = NO_FAIL_STORE_OK * 2; /** this must be longer than the max streaming timeout (60s) */ - private static final int NO_FAIL_LOOKUP_OK = 5*60*1000; + private static final int NO_FAIL_LOOKUP_OK = 2*60*1000; private static final int NO_FAIL_LOOKUP_GOOD = NO_FAIL_LOOKUP_OK * 3; private static final int MAX_GOOD_RESP_TIME = 5*1000; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java index 294093627..8fcdb0fa6 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java @@ -18,6 +18,7 @@ import net.i2p.data.LeaseSet; import net.i2p.data.RouterInfo; import net.i2p.router.Job; import net.i2p.router.RouterContext; +import net.i2p.util.Log; /** * This extends StoreJob to fire off a FloodfillVerifyStoreJob after success. @@ -54,6 +55,11 @@ class FloodfillStoreJob extends StoreJob { protected void succeed() { super.succeed(); if (_state != null) { + if (_facade.isVerifyInProgress(_state.getTarget())) { + if (_log.shouldLog(Log.INFO)) + _log.info("Skipping verify, one already in progress for: " + _state.getTarget()); + return; + } // Get the time stamp from the data we sent, so the Verify job can meke sure that // it finds something stamped with that time or newer. long published = 0; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java index 6627b0905..036a8a00b 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java @@ -6,6 +6,7 @@ import java.util.Set; import net.i2p.data.DataStructure; import net.i2p.data.Hash; +import net.i2p.data.RouterInfo; import net.i2p.data.i2np.DatabaseLookupMessage; import net.i2p.data.i2np.DatabaseSearchReplyMessage; import net.i2p.data.i2np.DatabaseStoreMessage; @@ -33,22 +34,32 @@ public class FloodfillVerifyStoreJob extends JobImpl { private long _sendTime; private long _published; private boolean _isRouterInfo; + private MessageWrapper.WrappedMessage _wrappedMessage; + private final Set _ignore; - private static final int VERIFY_TIMEOUT = 10*1000; + private static final int START_DELAY = 20*1000; + private static final int VERIFY_TIMEOUT = 15*1000; + private static final int MAX_PEERS_TO_TRY = 5; /** + * Delay a few seconds, then start the verify * @param sentTo who to give the credit or blame to, can be null */ public FloodfillVerifyStoreJob(RouterContext ctx, Hash key, long published, boolean isRouterInfo, Hash sentTo, FloodfillNetworkDatabaseFacade facade) { super(ctx); + facade.verifyStarted(key); _key = key; _published = published; _isRouterInfo = isRouterInfo; _log = ctx.logManager().getLog(getClass()); _sentTo = sentTo; _facade = facade; - // wait 10 seconds before trying to verify the store - getTiming().setStartAfter(ctx.clock().now() + VERIFY_TIMEOUT); + _ignore = new HashSet(MAX_PEERS_TO_TRY); + if (sentTo != null) { + _ignore.add(_sentTo); + } + // wait some time before trying to verify the store + getTiming().setStartAfter(ctx.clock().now() + START_DELAY); getContext().statManager().createRateStat("netDb.floodfillVerifyOK", "How long a floodfill verify takes when it succeeds", "NetworkDatabase", new long[] { 60*60*1000 }); getContext().statManager().createRateStat("netDb.floodfillVerifyFail", "How long a floodfill verify takes when it fails", "NetworkDatabase", new long[] { 60*60*1000 }); getContext().statManager().createRateStat("netDb.floodfillVerifyTimeout", "How long a floodfill verify takes when it times out", "NetworkDatabase", new long[] { 60*60*1000 }); @@ -56,46 +67,77 @@ public class FloodfillVerifyStoreJob extends JobImpl { public String getName() { return "Verify netdb store"; } /** - * Wait 10 seconds, then query a random floodfill for the leaseset or routerinfo + * Query a random floodfill for the leaseset or routerinfo * that we just stored to a (hopefully different) floodfill peer. * - * If it fails (after waiting up to another 10 seconds), resend the data. + * If it fails (after a timeout period), resend the data. * If the queried data is older than what we stored, that counts as a fail. **/ public void runJob() { _target = pickTarget(); - if (_target == null) return; - + if (_target == null) { + _facade.verifyFinished(_key); + return; + } + DatabaseLookupMessage lookup = buildLookup(); - if (lookup == null) return; + if (lookup == null) { + _facade.verifyFinished(_key); + return; + } - TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel(); + // If we are verifying a leaseset, use the destination's own tunnels, + // to avoid association by the exploratory tunnel OBEP. + // Unless it is an encrypted leaseset. + TunnelInfo outTunnel; + if (_isRouterInfo || getContext().keyRing().get(_key) != null) + outTunnel = getContext().tunnelManager().selectOutboundTunnel(); + else + outTunnel = getContext().tunnelManager().selectOutboundTunnel(_key); if (outTunnel == null) { if (_log.shouldLog(Log.WARN)) _log.warn("No outbound tunnels to verify a store"); + _facade.verifyFinished(_key); return; } + // garlic encrypt to hide contents from the OBEP + RouterInfo peer = _facade.lookupRouterInfoLocally(_target); + if (peer == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Fail finding target RI"); + _facade.verifyFinished(_key); + return; + } + Hash fromKey; + if (_isRouterInfo) + fromKey = null; + else + fromKey = _key; + _wrappedMessage = MessageWrapper.wrap(getContext(), lookup, fromKey, peer); + if (_wrappedMessage == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Fail Garlic encrypting"); + _facade.verifyFinished(_key); + return; + } + I2NPMessage sent = _wrappedMessage.getMessage(); + if (_log.shouldLog(Log.INFO)) _log.info("Starting verify (stored " + _key + " to " + _sentTo + "), asking " + _target); _sendTime = getContext().clock().now(); _expiration = _sendTime + VERIFY_TIMEOUT; getContext().messageRegistry().registerPending(new VerifyReplySelector(), new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext()), VERIFY_TIMEOUT); - getContext().tunnelDispatcher().dispatchOutbound(lookup, outTunnel.getSendTunnelId(0), _target); + getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), _target); } /** * Pick a responsive floodfill close to the key, but not the one we sent to */ private Hash pickTarget() { - Set ignore = null; - if (_sentTo != null) { - ignore = new HashSet(1); - ignore.add(_sentTo); - } Hash rkey = getContext().routingKeyGenerator().getRoutingKey(_key); FloodfillPeerSelector sel = (FloodfillPeerSelector)_facade.getPeerSelector(); - List peers = sel.selectFloodfillParticipants(rkey, 1, ignore, _facade.getKBuckets()); + List peers = sel.selectFloodfillParticipants(rkey, 1, _ignore, _facade.getKBuckets()); if (peers.size() > 0) return peers.get(0); @@ -105,7 +147,14 @@ public class FloodfillVerifyStoreJob extends JobImpl { } private DatabaseLookupMessage buildLookup() { - TunnelInfo replyTunnelInfo = getContext().tunnelManager().selectInboundTunnel(); + // If we are verifying a leaseset, use the destination's own tunnels, + // to avoid association by the exploratory tunnel OBEP. + // Unless it is an encrypted leaseset. + TunnelInfo replyTunnelInfo; + if (_isRouterInfo || getContext().keyRing().get(_key) != null) + replyTunnelInfo = getContext().tunnelManager().selectInboundTunnel(); + else + replyTunnelInfo = getContext().tunnelManager().selectInboundTunnel(_key); if (replyTunnelInfo == null) { if (_log.shouldLog(Log.WARN)) _log.warn("No inbound tunnels to get a reply from!"); @@ -145,6 +194,9 @@ public class FloodfillVerifyStoreJob extends JobImpl { public String getName() { return "Handle floodfill verification reply"; } public void runJob() { long delay = getContext().clock().now() - _sendTime; + if (_wrappedMessage != null) + _wrappedMessage.acked(); + _facade.verifyFinished(_key); if (_message instanceof DatabaseStoreMessage) { // Verify it's as recent as the one we sent boolean success = false; @@ -160,19 +212,20 @@ public class FloodfillVerifyStoreJob extends JobImpl { getContext().profileManager().dbStoreSuccessful(_sentTo); getContext().statManager().addRateData("netDb.floodfillVerifyOK", delay, 0); if (_log.shouldLog(Log.INFO)) - _log.info("Verify success"); + _log.info("Verify success for " + _key); return; } if (_log.shouldLog(Log.WARN)) - _log.warn("Verify failed - older"); + _log.warn("Verify failed (older) for " + _key); } else if (_message instanceof DatabaseSearchReplyMessage) { // assume 0 old, all new, 0 invalid, 0 dup getContext().profileManager().dbLookupReply(_target, 0, ((DatabaseSearchReplyMessage)_message).getNumReplies(), 0, 0, delay); if (_log.shouldLog(Log.WARN)) - _log.warn("Verify failed - DSRM"); + _log.warn("Verify failed (DSRM) for " + _key); } // store failed, boo, hiss! + // For now, blame the sent-to peer, but not the verify peer if (_sentTo != null) getContext().profileManager().dbStoreFailed(_sentTo); getContext().statManager().addRateData("netDb.floodfillVerifyFail", delay, 0); @@ -183,8 +236,9 @@ public class FloodfillVerifyStoreJob extends JobImpl { /** * the netDb store failed to verify, so resend it to a random floodfill peer - * Fixme - this can loop for a long time - do we need a token or counter - * so we don't have multiple verify jobs? + * Fixme - since we now store closest-to-the-key, this is likely to store to the + * very same ff as last time, until the stats get bad enough to switch. + * Pass the failed ff through as a don't-store-to? */ private void resend() { DataStructure ds; @@ -202,14 +256,24 @@ public class FloodfillVerifyStoreJob extends JobImpl { } public String getName() { return "Floodfill verification timeout"; } public void runJob() { - // don't know who to blame (we could have gotten a DSRM) so blame both + if (_wrappedMessage != null) + _wrappedMessage.fail(); + // Only blame the verify peer getContext().profileManager().dbLookupFailed(_target); - if (_sentTo != null) - getContext().profileManager().dbStoreFailed(_sentTo); + //if (_sentTo != null) + // getContext().profileManager().dbStoreFailed(_sentTo); getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0); if (_log.shouldLog(Log.WARN)) - _log.warn("Verify timed out"); - resend(); + _log.warn("Verify timed out for: " + _key); + if (_ignore.size() < MAX_PEERS_TO_TRY) { + // Don't resend, simply rerun FVSJ.this inline and + // chose somebody besides _target for verification + _ignore.add(_target); + FloodfillVerifyStoreJob.this.runJob(); + } else { + _facade.verifyFinished(_key); + resend(); + } } } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java b/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java new file mode 100644 index 000000000..73ed7ad70 --- /dev/null +++ b/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java @@ -0,0 +1,120 @@ +package net.i2p.router.networkdb.kademlia; + +import java.util.HashSet; +import java.util.Set; + +import net.i2p.crypto.SessionKeyManager; +import net.i2p.crypto.TagSetHandle; +import net.i2p.data.Certificate; +import net.i2p.data.Hash; +import net.i2p.data.PublicKey; +import net.i2p.data.RouterInfo; +import net.i2p.data.SessionKey; +import net.i2p.data.SessionTag; +import net.i2p.data.i2np.DeliveryInstructions; +import net.i2p.data.i2np.GarlicMessage; +import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.RouterContext; +import net.i2p.router.message.GarlicMessageBuilder; +import net.i2p.router.message.PayloadGarlicConfig; +import net.i2p.util.Log; + +/** + * Method an class for garlic encrypting outbound netdb traffic, + * including management of the ElGamal/AES tags + * + * @since 0.7.10 + */ +class MessageWrapper { + + private static final Log _log = RouterContext.getGlobalContext().logManager().getLog(MessageWrapper.class); + + private static final int NETDB_TAGS_TO_DELIVER = 6; + private static final int NETDB_LOW_THRESHOLD = 3; + + /** + * Garlic wrap a message from a client or this router, destined for a router, + * to hide the contents from the OBEP. + * Caller must call acked() or fail() on the returned object. + * + * @param from must be a local client with a session key manager, + * or null to use the router's session key manager + * @return null on encrypt failure + */ + static WrappedMessage wrap(RouterContext ctx, I2NPMessage m, Hash from, RouterInfo to) { + DeliveryInstructions instructions = new DeliveryInstructions(); + instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL); + + PayloadGarlicConfig payload = new PayloadGarlicConfig(); + payload.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); + payload.setId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + payload.setPayload(m); + payload.setRecipient(to); + payload.setDeliveryInstructions(instructions); + payload.setRequestAck(false); + payload.setExpiration(m.getMessageExpiration()); + + SessionKeyManager skm; + if (from != null) + skm = ctx.clientManager().getClientSessionKeyManager(from); + else + skm = ctx.sessionKeyManager(); + if (skm == null) + return null; + SessionKey sentKey = new SessionKey(); + Set sentTags = new HashSet(); + GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, payload, sentKey, sentTags, + NETDB_TAGS_TO_DELIVER, NETDB_LOW_THRESHOLD, false, skm); + if (msg == null) + return null; + TagSetHandle tsh = null; + PublicKey sentTo = to.getIdentity().getPublicKey(); + if (sentTags.size() > 0) + tsh = skm.tagsDelivered(sentTo, sentKey, sentTags); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sent to: " + to.getIdentity().getHash() + " with key: " + sentKey + " and tags: " + sentTags.size()); + return new WrappedMessage(msg, skm, sentTo, sentKey, tsh); + } + + /** + * Wrapper so that we can keep track of the key and tags + * for later notification to the SKM + */ + static class WrappedMessage { + private GarlicMessage msg; + private SessionKeyManager skm; + private PublicKey sentTo; + private SessionKey sessionKey; + private TagSetHandle tsh; + + WrappedMessage(GarlicMessage msg, SessionKeyManager skm, PublicKey sentTo, SessionKey sentKey, TagSetHandle tsh) { + this.msg = msg; + this.skm = skm; + this.sentTo = sentTo; + this.sessionKey = sentKey; + this.tsh = tsh; + } + + GarlicMessage getMessage() { + return this.msg; + } + + /** delivered tags (if any) were acked */ + void acked() { + if (this.tsh != null) { + this.skm.tagsAcked(this.sentTo, this.sessionKey, this.tsh); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Tags acked for key: " + this.sessionKey); + } + } + + /** delivered tags (if any) were not acked */ + void fail() { + if (this.tsh != null) { + this.skm.failTags(this.sentTo, this.sessionKey, this.tsh); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Tags NOT acked for key: " + this.sessionKey); + } + } + } +} diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index 066122ce4..9e3623639 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -32,7 +32,7 @@ import net.i2p.stat.RateStat; import net.i2p.util.Log; class StoreJob extends JobImpl { - private Log _log; + protected Log _log; private KademliaNetworkDatabaseFacade _facade; protected StoreState _state; private Job _onSuccess; @@ -293,7 +293,11 @@ class StoreJob extends JobImpl { private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) { getContext().statManager().addRateData("netDb.storeLeaseSetSent", 1, 0); - sendStoreThroughGarlic(msg, peer, expiration); + // if it is an encrypted leaseset... + if (getContext().keyRing().get(msg.getKey()) != null) + sendStoreThroughGarlic(msg, peer, expiration); + else + sendStoreThroughClient(msg, peer, expiration); } else { getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0); sendDirect(msg, peer, expiration); @@ -389,6 +393,93 @@ class StoreJob extends JobImpl { return getContext().tunnelManager().selectInboundTunnel(); } + /** + * Send a leaseset store message out the client tunnel, + * with the reply to come back through a client tunnel. + * Stores are garlic encrypted to hide the identity from the OBEP. + * + * This makes it harder for an exploratory OBEP or IBGW to correlate it + * with one or more destinations. Since we are publishing the leaseset, + * it's easy to find out that an IB tunnel belongs to this dest, and + * it isn't much harder to do the same for an OB tunnel. + * + * As a side benefit, client tunnels should be faster and more reliable than + * exploratory tunnels. + * + * @param msg must contain a leaseset + * @since 0.7.10 + */ + private void sendStoreThroughClient(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { + long token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE); + Hash client = msg.getKey(); + + TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel(client); + if (replyTunnel == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No reply inbound tunnels available!"); + fail(); + return; + } + TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0); + msg.setReplyToken(token); + msg.setReplyTunnel(replyTunnelId); + msg.setReplyGateway(replyTunnel.getPeer(0)); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": send(dbStore) w/ token expected " + token); + + Hash to = peer.getIdentity().getHash(); + TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel(client); + if (outTunnel != null) { + // garlic encrypt + MessageWrapper.WrappedMessage wm = MessageWrapper.wrap(getContext(), msg, client, peer); + if (wm == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Fail garlic encrypting from: " + client); + fail(); + return; + } + I2NPMessage sent = wm.getMessage(); + + SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, sent.getMessageSize()); + FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now()); + StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration); + + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent); + //_log.debug("Expiration is " + new Date(sent.getMessageExpiration())); + } + getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now())); + _state.addPending(to, wm); + getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("No outbound tunnels to send a dbStore out - delaying..."); + // continueSending() above did an addPending() so remove it here. + // This means we will skip the peer next time, can't be helped for now + // without modding StoreState + _state.replyTimeout(to); + Job waiter = new WaitJob(getContext()); + waiter.getTiming().setStartAfter(getContext().clock().now() + 3*1000); + getContext().jobQueue().addJob(waiter); + //fail(); + } + } + + /** + * Called to wait a little while + * @since 0.7.10 + */ + private class WaitJob extends JobImpl { + public WaitJob(RouterContext enclosingContext) { + super(enclosingContext); + } + public void runJob() { + sendNext(); + } + public String getName() { return "Kademlia Store Send Delay"; } + } + /** * Called after sending a dbStore to a peer successfully, * marking the store as successful @@ -414,11 +505,16 @@ class StoreJob extends JobImpl { public String getName() { return "Kademlia Store Send Success"; } public void runJob() { - long howLong = _state.confirmed(_peer.getIdentity().getHash()); + Hash hash = _peer.getIdentity().getHash(); + MessageWrapper.WrappedMessage wm = _state.getPendingMessage(hash); + if (wm != null) + wm.acked(); + long howLong = _state.confirmed(hash); + if (_log.shouldLog(Log.INFO)) _log.info(StoreJob.this.getJobId() + ": Marking store of " + _state.getTarget() - + " to " + _peer.getIdentity().getHash().toBase64() + " successful after " + howLong); - getContext().profileManager().dbStoreSent(_peer.getIdentity().getHash(), howLong); + + " to " + hash.toBase64() + " successful after " + howLong); + getContext().profileManager().dbStoreSent(hash, howLong); getContext().statManager().addRateData("netDb.ackTime", howLong, howLong); if ( (_sendThrough != null) && (_msgSize > 0) ) { @@ -456,11 +552,17 @@ class StoreJob extends JobImpl { _sendOn = sendOn; } public void runJob() { + Hash hash = _peer.getIdentity().getHash(); if (_log.shouldLog(Log.INFO)) - _log.info(StoreJob.this.getJobId() + ": Peer " + _peer.getIdentity().getHash().toBase64() + _log.info(StoreJob.this.getJobId() + ": Peer " + hash.toBase64() + " timed out sending " + _state.getTarget()); - _state.replyTimeout(_peer.getIdentity().getHash()); - getContext().profileManager().dbStoreFailed(_peer.getIdentity().getHash()); + + MessageWrapper.WrappedMessage wm = _state.getPendingMessage(hash); + if (wm != null) + wm.fail(); + _state.replyTimeout(hash); + + getContext().profileManager().dbStoreFailed(hash); getContext().statManager().addRateData("netDb.replyTimeout", getContext().clock().now() - _sendOn, 0); sendNext(); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java index 84e955c9c..eb131fdfc 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java @@ -5,23 +5,26 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.DataStructure; import net.i2p.data.Hash; import net.i2p.router.RouterContext; /** - * Todo: remove exploratory + * */ class StoreState { private RouterContext _context; private Hash _key; private DataStructure _data; private final HashSet _pendingPeers; - private HashMap _pendingPeerTimes; + private Map _pendingPeerTimes; + private Map _pendingMessages; private final HashSet _successfulPeers; - private final HashSet _successfulExploratoryPeers; + //private final HashSet _successfulExploratoryPeers; private final HashSet _failedPeers; private final HashSet _attemptedPeers; private int _completeCount; @@ -35,16 +38,17 @@ class StoreState { _context = ctx; _key = key; _data = data; - _pendingPeers = new HashSet(16); - _pendingPeerTimes = new HashMap(16); - _attemptedPeers = new HashSet(16); + _pendingPeers = new HashSet(4); + _pendingPeerTimes = new HashMap(4); + _pendingMessages = new ConcurrentHashMap(4); + _attemptedPeers = new HashSet(8); if (toSkip != null) { _attemptedPeers.addAll(toSkip); _completeCount = toSkip.size(); } - _failedPeers = new HashSet(16); - _successfulPeers = new HashSet(16); - _successfulExploratoryPeers = new HashSet(16); + _failedPeers = new HashSet(8); + _successfulPeers = new HashSet(4); + //_successfulExploratoryPeers = new HashSet(16); _completed = -1; _started = _context.clock().now(); } @@ -66,12 +70,15 @@ class StoreState { return (Set)_successfulPeers.clone(); } } - /** @deprecated unused */ + /** unused */ +/**** public Set getSuccessfulExploratory() { synchronized (_successfulExploratoryPeers) { return (Set)_successfulExploratoryPeers.clone(); } } +****/ + public Set getFailed() { synchronized (_failedPeers) { return (Set)_failedPeers.clone(); @@ -87,6 +94,23 @@ class StoreState { public long getWhenStarted() { return _started; } public long getWhenCompleted() { return _completed; } + /* + * @since 0.7.10 + */ + public void addPending(Hash peer, MessageWrapper.WrappedMessage msg) { + addPending(peer); + _pendingMessages.put(peer, msg); + } + + /* + * @return the message or null; will only return the message once, so + * tags are only acked or failed once. + * @since 0.7.10 + */ + public MessageWrapper.WrappedMessage getPendingMessage(Hash peer) { + return _pendingMessages.remove(peer); + } + public void addPending(Hash peer) { synchronized (_pendingPeers) { _pendingPeers.add(peer); @@ -128,7 +152,8 @@ class StoreState { return rv; } - /** @deprecated unused */ + /** unused */ +/**** public long confirmedExploratory(Hash peer) { long rv = -1; synchronized (_pendingPeers) { @@ -142,6 +167,7 @@ class StoreState { } return rv; } +****/ public void replyTimeout(Hash peer) { synchronized (_pendingPeers) { @@ -193,6 +219,7 @@ class StoreState { buf.append(peer.toBase64()).append(" "); } } +/**** buf.append(" Successful Exploratory: "); synchronized (_successfulExploratoryPeers) { buf.append(_successfulExploratoryPeers.size()).append(' '); @@ -201,6 +228,7 @@ class StoreState { buf.append(peer.toBase64()).append(" "); } } +****/ return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java index b1b03fb22..8533a48bc 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java @@ -4,6 +4,7 @@ import net.i2p.data.Hash; import net.i2p.data.Payload; import net.i2p.data.TunnelId; import net.i2p.data.i2np.DataMessage; +import net.i2p.data.i2np.DatabaseSearchReplyMessage; import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.data.i2np.DeliveryInstructions; import net.i2p.data.i2np.DeliveryStatusMessage; @@ -52,9 +53,26 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec } */ + // FVSJ could also result in a DSRM. + // Since there's some code that replies directly to this to gather new ff RouterInfos, + // sanitize it if ( (_client != null) && + (msg.getType() == DatabaseSearchReplyMessage.MESSAGE_TYPE) && + (_client.equals(((DatabaseSearchReplyMessage)msg).getSearchKey()))) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Removing replies from a DSRM down a tunnel for " + _client.toBase64() + ": " + msg); + DatabaseSearchReplyMessage orig = (DatabaseSearchReplyMessage) msg; + DatabaseSearchReplyMessage newMsg = new DatabaseSearchReplyMessage(_context); + newMsg.setFromHash(orig.getFromHash()); + newMsg.setSearchKey(orig.getSearchKey()); + msg = newMsg; + } else if ( (_client != null) && (msg.getType() != DeliveryStatusMessage.MESSAGE_TYPE) && (msg.getType() != GarlicMessage.MESSAGE_TYPE) && + // allow DSM of our own key (used by FloodfillVerifyStoreJob) + // as long as there's no reply token (FVSJ will never set a reply token but an attacker might) + ((msg.getType() != DatabaseStoreMessage.MESSAGE_TYPE) || (!_client.equals(((DatabaseStoreMessage)msg).getKey())) || + (((DatabaseStoreMessage)msg).getReplyToken() != 0)) && (msg.getType() != TunnelBuildReplyMessage.MESSAGE_TYPE)) { // drop it, since we should only get tunnel test messages and garlic messages down // client tunnels @@ -62,7 +80,7 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec _log.error("Dropped dangerous message down a tunnel for " + _client.toBase64() + ": " + msg, new Exception("cause")); return; } - + if ( (target == null) || ( (tunnel == null) && (_context.routerHash().equals(target) ) ) ) { // targetting us either implicitly (no target) or explicitly (no tunnel) // make sure we don't honor any remote requests directly (garlic instructions, etc)