* NetDb Stores and Verifies:

- Do LS stores and verifies through client tunnels
  to prevent correlation by the OBEP or FF (tunnel selection sketched below)
- Encrypt LS stores to prevent snooping by the OBEP
- Encrypt LS and RI verifies to prevent snooping by the OBEP
- Extend verify delay and timeout
- Re-enable RI verifies
- Disallow simultaneous verifies for the same key
- Don't resend on verify timeout; try a different peer instead
- Adjust ff selection criteria
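
A minimal sketch (illustrative only, not part of the commit) of the tunnel-selection rule these changes implement, using the calls that appear in FloodfillVerifyStoreJob and StoreJob below: RI traffic and encrypted leasesets stay on exploratory tunnels, plain leasesets ride the destination's own client tunnels.

import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;

// Illustrative helper condensing the selection logic in
// FloodfillVerifyStoreJob.runJob()/buildLookup() and StoreJob.sendStore() below.
class TunnelChoice {
    /**
     * @param key the netdb key being stored or verified
     * @param isRouterInfo true for RI traffic, false for LS traffic
     * @return a client tunnel for a plain leaseset, else an exploratory tunnel
     */
    static TunnelInfo selectOutbound(RouterContext ctx, Hash key, boolean isRouterInfo) {
        // RIs have no client tunnels, and an encrypted leaseset (its key is
        // in the keyring) must not be associated with its destination either
        if (isRouterInfo || ctx.keyRing().get(key) != null)
            return ctx.tunnelManager().selectOutboundTunnel();
        // plain leaseset: use the destination's own tunnels so an
        // exploratory OBEP can't correlate the store with the client
        return ctx.tunnelManager().selectOutboundTunnel(key);
    }
}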
zzz
2010-01-15 21:37:41 +00:00
parent 64235bd745
commit 77b88ab59d
12 changed files with 436 additions and 69 deletions

View File

@@ -145,23 +145,20 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
/** TagSet */
/* FIXME Exporting non-public type through public API */
protected Set<TagSet> getInboundTagSets() {
/** TagSet - used only by HTML */
private Set<TagSet> getInboundTagSets() {
synchronized (_inboundTagSets) {
return new HashSet(_inboundTagSets.values());
}
}
/** OutboundSession */
/* FIXME Exporting non-public type through public API */
protected Set<OutboundSession> getOutboundSessions() {
/** OutboundSession - used only by HTML */
private Set<OutboundSession> getOutboundSessions() {
synchronized (_outboundSessions) {
return new HashSet(_outboundSessions.values());
}
}
/* FIXME Exporting non-public type through public API */
/****** leftover from when we had the persistent SKM
protected void setData(Set<TagSet> inboundTagSets, Set<OutboundSession> outboundSessions) {
if (_log.shouldLog(Log.INFO))
@@ -531,7 +528,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*
* @return number of tag sets expired
*/
public int aggressiveExpire() {
private int aggressiveExpire() {
int removed = 0;
int remaining = 0;
long now = _context.clock().now();
@@ -569,9 +566,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
//_log.warn("Expiring tags: [" + tagsToDrop + "]");
synchronized (_outboundSessions) {
for (Iterator<PublicKey> iter = _outboundSessions.keySet().iterator(); iter.hasNext();) {
PublicKey key = iter.next();
OutboundSession sess = _outboundSessions.get(key);
for (Iterator<OutboundSession> iter = _outboundSessions.values().iterator(); iter.hasNext();) {
OutboundSession sess = iter.next();
removed += sess.expireTags();
// don't kill a new session or one that's temporarily out of tags
if (sess.getLastUsedDate() < now - (SESSION_LIFETIME_MAX_MS / 2) &&
@@ -663,6 +659,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
}
/** fixme pass in context and change to static */
private class OutboundSession {
private PublicKey _target;
private SessionKey _currentKey;

View File

@@ -61,6 +61,7 @@ public class RequestLeaseSetMessage extends I2CPMessageImpl {
return ((TunnelEndpoint) _endpoints.get(endpoint)).getTunnelId();
}
/** @deprecated unused - presumably he meant remove? */
public void remoteEndpoint(int endpoint) {
if ((endpoint >= 0) && (endpoint < _endpoints.size())) _endpoints.remove(endpoint);
}

View File

@@ -54,12 +54,7 @@ class RequestLeaseSetJob extends JobImpl {
if (_runner.isDead()) return;
RequestLeaseSetMessage msg = new RequestLeaseSetMessage();
Date end = null;
// get the earliest end date
for (int i = 0; i < _requestState.getRequested().getLeaseCount(); i++) {
if ( (end == null) || (end.getTime() > _requestState.getRequested().getLease(i).getEndDate().getTime()) )
end = _requestState.getRequested().getLease(i).getEndDate();
}
Date end = new Date(_requestState.getRequested().getEarliestLeaseDate());
msg.setEndDate(end);
msg.setSessionId(_runner.getSessionId());
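
The deleted loop and the new getEarliestLeaseDate() call should compute the same value; a sketch of that computation, under the assumption that the helper simply takes the minimum end date over the getLeaseCount()/getLease(i) accessors the old loop used.

import net.i2p.data.LeaseSet;

// Hypothetical equivalent of the getEarliestLeaseDate() call above,
// written against the same accessors as the removed loop.
static long earliestLeaseDate(LeaseSet ls) {
    long earliest = Long.MAX_VALUE;
    for (int i = 0; i < ls.getLeaseCount(); i++) {
        long end = ls.getLease(i).getEndDate().getTime();
        if (end < earliest)
            earliest = end;  // keep the soonest-expiring lease's end time
    }
    return earliest;
}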

View File

@@ -118,6 +118,8 @@ public class GarlicMessageBuilder {
***/
/**
* called by above
*
* @param ctx scope
* @param config how/what to wrap
* @param wrappedKey output parameter that will be filled with the sessionKey used
@@ -127,6 +129,22 @@ public class GarlicMessageBuilder {
*/
private static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
int numTagsToDeliver, boolean forceElGamal, SessionKeyManager skm) {
return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, LOW_THRESHOLD, false, skm);
}
/**
* called by netdb
*
* @param ctx scope
* @param config how/what to wrap
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @param numTagsToDeliver only if the estimated available tags are below the threshold
* @param lowTagsThreshold the threshold
* @param skm non-null
*/
public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
int numTagsToDeliver, int lowTagsThreshold, boolean forceElGamal, SessionKeyManager skm) {
Log log = ctx.logManager().getLog(GarlicMessageBuilder.class);
PublicKey key = config.getRecipientPublicKey();
if (key == null) {
@@ -154,7 +172,7 @@ public class GarlicMessageBuilder {
if (log.shouldLog(Log.DEBUG))
log.debug("Available tags for encryption to " + key + ": " + availTags);
if (availTags < LOW_THRESHOLD) { // arbitrary threshold
if (availTags < lowTagsThreshold) { // arbitrary threshold
for (int i = 0; i < numTagsToDeliver; i++)
wrappedTags.add(new SessionTag(true));
if (log.shouldLog(Log.INFO))
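
A short sketch of what the new lowTagsThreshold parameter controls: when the estimated tag supply for the peer runs below the caller-chosen threshold, fresh tags are bundled with the message. For netdb traffic, MessageWrapper (further below) passes 6 tags with a threshold of 3. Stand-alone and illustrative; only the SessionTag constructor is taken from the diff.

import java.util.HashSet;
import java.util.Set;
import net.i2p.data.SessionTag;

// Sketch of the threshold check in buildMessage() above.
static Set<SessionTag> maybeBundleTags(int availTags, int numTagsToDeliver, int lowTagsThreshold) {
    Set<SessionTag> wrappedTags = new HashSet<SessionTag>();
    if (availTags < lowTagsThreshold) {
        // running low: deliver a fresh batch of tags with this message
        for (int i = 0; i < numTagsToDeliver; i++)
            wrappedTags.add(new SessionTag(true));
    }
    return wrappedTags;
}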

View File

@@ -24,6 +24,7 @@ import net.i2p.router.ReplyJob;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
/**
@@ -35,11 +36,13 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade {
private boolean _floodfillEnabled;
/** for testing, see isFloodfill() below */
private static String _alwaysQuery;
private final Set<Hash> _verifiesInProgress;
public FloodfillNetworkDatabaseFacade(RouterContext context) {
super(context);
_activeFloodQueries = new HashMap();
_floodfillEnabled = false;
_verifiesInProgress = new ConcurrentHashSet(8);
_alwaysQuery = _context.getProperty("netDb.alwaysQuery");
_context.statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
@@ -318,6 +321,21 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade {
return sel.selectFloodfillParticipants(getKBuckets());
}
/** @since 0.7.10 */
boolean isVerifyInProgress(Hash h) {
return _verifiesInProgress.contains(h);
}
/** @since 0.7.10 */
void verifyStarted(Hash h) {
_verifiesInProgress.add(h);
}
/** @since 0.7.10 */
void verifyFinished(Hash h) {
_verifiesInProgress.remove(h);
}
/** NTCP cons drop quickly but SSU takes a while, so it's prudent to keep this
* a little higher than 1 or 2. */
protected final static int MIN_ACTIVE_PEERS = 5;
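
How the three methods above are intended to fit together: FloodfillStoreJob checks isVerifyInProgress() before scheduling a verify, the FloodfillVerifyStoreJob constructor calls verifyStarted(), and every exit path of that job calls verifyFinished(). A minimal stand-alone sketch of the contract (illustrative, not part of the commit):

import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.util.ConcurrentHashSet;

// Illustrative guard mirroring isVerifyInProgress()/verifyStarted()/
// verifyFinished() above.
class VerifyGuard {
    private final Set<Hash> _inProgress = new ConcurrentHashSet(8);

    /** @return false if a verify for this key is already running */
    boolean tryStart(Hash key) {
        if (_inProgress.contains(key))
            return false;        // FloodfillStoreJob skips the verify
        _inProgress.add(key);    // done in the FloodfillVerifyStoreJob ctor
        return true;
    }

    /** must be called on every exit path: success, failure, or timeout */
    void finish(Hash key) {
        _inProgress.remove(key);
    }
}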

View File

@@ -108,11 +108,11 @@ class FloodfillPeerSelector extends PeerSelector {
return selectFloodfillParticipants(key, maxNumRouters, null, kbuckets);
}
/** .75 * PublishLocalRouterInfoJob.PUBLISH_DELAY */
private static final int NO_FAIL_STORE_OK = 15*60*1000;
/** .5 * PublishLocalRouterInfoJob.PUBLISH_DELAY */
private static final int NO_FAIL_STORE_OK = 10*60*1000;
private static final int NO_FAIL_STORE_GOOD = NO_FAIL_STORE_OK * 2;
/** this must be longer than the max streaming timeout (60s) */
private static final int NO_FAIL_LOOKUP_OK = 5*60*1000;
private static final int NO_FAIL_LOOKUP_OK = 2*60*1000;
private static final int NO_FAIL_LOOKUP_GOOD = NO_FAIL_LOOKUP_OK * 3;
private static final int MAX_GOOD_RESP_TIME = 5*1000;

View File

@@ -18,6 +18,7 @@ import net.i2p.data.LeaseSet;
import net.i2p.data.RouterInfo;
import net.i2p.router.Job;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* This extends StoreJob to fire off a FloodfillVerifyStoreJob after success.
@@ -54,6 +55,11 @@ class FloodfillStoreJob extends StoreJob {
protected void succeed() {
super.succeed();
if (_state != null) {
if (_facade.isVerifyInProgress(_state.getTarget())) {
if (_log.shouldLog(Log.INFO))
_log.info("Skipping verify, one already in progress for: " + _state.getTarget());
return;
}
// Get the time stamp from the data we sent, so the Verify job can make sure that
// it finds something stamped with that time or newer.
long published = 0;

View File

@@ -6,6 +6,7 @@ import java.util.Set;
import net.i2p.data.DataStructure;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
@@ -33,22 +34,32 @@ public class FloodfillVerifyStoreJob extends JobImpl {
private long _sendTime;
private long _published;
private boolean _isRouterInfo;
private MessageWrapper.WrappedMessage _wrappedMessage;
private final Set<Hash> _ignore;
private static final int VERIFY_TIMEOUT = 10*1000;
private static final int START_DELAY = 20*1000;
private static final int VERIFY_TIMEOUT = 15*1000;
private static final int MAX_PEERS_TO_TRY = 5;
/**
* Delay a few seconds, then start the verify
* @param sentTo who to give the credit or blame to, can be null
*/
public FloodfillVerifyStoreJob(RouterContext ctx, Hash key, long published, boolean isRouterInfo, Hash sentTo, FloodfillNetworkDatabaseFacade facade) {
super(ctx);
facade.verifyStarted(key);
_key = key;
_published = published;
_isRouterInfo = isRouterInfo;
_log = ctx.logManager().getLog(getClass());
_sentTo = sentTo;
_facade = facade;
// wait 10 seconds before trying to verify the store
getTiming().setStartAfter(ctx.clock().now() + VERIFY_TIMEOUT);
_ignore = new HashSet(MAX_PEERS_TO_TRY);
if (sentTo != null) {
_ignore.add(_sentTo);
}
// wait some time before trying to verify the store
getTiming().setStartAfter(ctx.clock().now() + START_DELAY);
getContext().statManager().createRateStat("netDb.floodfillVerifyOK", "How long a floodfill verify takes when it succeeds", "NetworkDatabase", new long[] { 60*60*1000 });
getContext().statManager().createRateStat("netDb.floodfillVerifyFail", "How long a floodfill verify takes when it fails", "NetworkDatabase", new long[] { 60*60*1000 });
getContext().statManager().createRateStat("netDb.floodfillVerifyTimeout", "How long a floodfill verify takes when it times out", "NetworkDatabase", new long[] { 60*60*1000 });
@@ -56,46 +67,77 @@ public class FloodfillVerifyStoreJob extends JobImpl {
public String getName() { return "Verify netdb store"; }
/**
* Wait 10 seconds, then query a random floodfill for the leaseset or routerinfo
* Query a random floodfill for the leaseset or routerinfo
* that we just stored to a (hopefully different) floodfill peer.
*
* If it fails (after waiting up to another 10 seconds), resend the data.
* If it fails (after a timeout period), resend the data.
* If the queried data is older than what we stored, that counts as a fail.
**/
public void runJob() {
_target = pickTarget();
if (_target == null) return;
if (_target == null) {
_facade.verifyFinished(_key);
return;
}
DatabaseLookupMessage lookup = buildLookup();
if (lookup == null) return;
if (lookup == null) {
_facade.verifyFinished(_key);
return;
}
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
// If we are verifying a leaseset, use the destination's own tunnels,
// to avoid association by the exploratory tunnel OBEP.
// Unless it is an encrypted leaseset.
TunnelInfo outTunnel;
if (_isRouterInfo || getContext().keyRing().get(_key) != null)
outTunnel = getContext().tunnelManager().selectOutboundTunnel();
else
outTunnel = getContext().tunnelManager().selectOutboundTunnel(_key);
if (outTunnel == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No outbound tunnels to verify a store");
_facade.verifyFinished(_key);
return;
}
// garlic encrypt to hide contents from the OBEP
RouterInfo peer = _facade.lookupRouterInfoLocally(_target);
if (peer == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Fail finding target RI");
_facade.verifyFinished(_key);
return;
}
Hash fromKey;
if (_isRouterInfo)
fromKey = null;
else
fromKey = _key;
_wrappedMessage = MessageWrapper.wrap(getContext(), lookup, fromKey, peer);
if (_wrappedMessage == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Fail Garlic encrypting");
_facade.verifyFinished(_key);
return;
}
I2NPMessage sent = _wrappedMessage.getMessage();
if (_log.shouldLog(Log.INFO))
_log.info("Starting verify (stored " + _key + " to " + _sentTo + "), asking " + _target);
_sendTime = getContext().clock().now();
_expiration = _sendTime + VERIFY_TIMEOUT;
getContext().messageRegistry().registerPending(new VerifyReplySelector(), new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext()), VERIFY_TIMEOUT);
getContext().tunnelDispatcher().dispatchOutbound(lookup, outTunnel.getSendTunnelId(0), _target);
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), _target);
}
/**
* Pick a responsive floodfill close to the key, but not the one we sent to
*/
private Hash pickTarget() {
Set<Hash> ignore = null;
if (_sentTo != null) {
ignore = new HashSet(1);
ignore.add(_sentTo);
}
Hash rkey = getContext().routingKeyGenerator().getRoutingKey(_key);
FloodfillPeerSelector sel = (FloodfillPeerSelector)_facade.getPeerSelector();
List<Hash> peers = sel.selectFloodfillParticipants(rkey, 1, ignore, _facade.getKBuckets());
List<Hash> peers = sel.selectFloodfillParticipants(rkey, 1, _ignore, _facade.getKBuckets());
if (peers.size() > 0)
return peers.get(0);
@@ -105,7 +147,14 @@ public class FloodfillVerifyStoreJob extends JobImpl {
}
private DatabaseLookupMessage buildLookup() {
TunnelInfo replyTunnelInfo = getContext().tunnelManager().selectInboundTunnel();
// If we are verifying a leaseset, use the destination's own tunnels,
// to avoid association by the exploratory tunnel OBEP.
// Unless it is an encrypted leaseset.
TunnelInfo replyTunnelInfo;
if (_isRouterInfo || getContext().keyRing().get(_key) != null)
replyTunnelInfo = getContext().tunnelManager().selectInboundTunnel();
else
replyTunnelInfo = getContext().tunnelManager().selectInboundTunnel(_key);
if (replyTunnelInfo == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No inbound tunnels to get a reply from!");
@@ -145,6 +194,9 @@ public class FloodfillVerifyStoreJob extends JobImpl {
public String getName() { return "Handle floodfill verification reply"; }
public void runJob() {
long delay = getContext().clock().now() - _sendTime;
if (_wrappedMessage != null)
_wrappedMessage.acked();
_facade.verifyFinished(_key);
if (_message instanceof DatabaseStoreMessage) {
// Verify it's as recent as the one we sent
boolean success = false;
@@ -160,19 +212,20 @@ public class FloodfillVerifyStoreJob extends JobImpl {
getContext().profileManager().dbStoreSuccessful(_sentTo);
getContext().statManager().addRateData("netDb.floodfillVerifyOK", delay, 0);
if (_log.shouldLog(Log.INFO))
_log.info("Verify success");
_log.info("Verify success for " + _key);
return;
}
if (_log.shouldLog(Log.WARN))
_log.warn("Verify failed - older");
_log.warn("Verify failed (older) for " + _key);
} else if (_message instanceof DatabaseSearchReplyMessage) {
// assume 0 old, all new, 0 invalid, 0 dup
getContext().profileManager().dbLookupReply(_target, 0,
((DatabaseSearchReplyMessage)_message).getNumReplies(), 0, 0, delay);
if (_log.shouldLog(Log.WARN))
_log.warn("Verify failed - DSRM");
_log.warn("Verify failed (DSRM) for " + _key);
}
// store failed, boo, hiss!
// For now, blame the sent-to peer, but not the verify peer
if (_sentTo != null)
getContext().profileManager().dbStoreFailed(_sentTo);
getContext().statManager().addRateData("netDb.floodfillVerifyFail", delay, 0);
@@ -183,8 +236,9 @@ public class FloodfillVerifyStoreJob extends JobImpl {
/**
* the netDb store failed to verify, so resend it to a random floodfill peer
* Fixme - this can loop for a long time - do we need a token or counter
* so we don't have multiple verify jobs?
* Fixme - since we now store closest-to-the-key, this is likely to store to the
* very same ff as last time, until the stats get bad enough to switch.
* Pass the failed ff through as a don't-store-to?
*/
private void resend() {
DataStructure ds;
@@ -202,14 +256,24 @@ public class FloodfillVerifyStoreJob extends JobImpl {
}
public String getName() { return "Floodfill verification timeout"; }
public void runJob() {
// don't know who to blame (we could have gotten a DSRM) so blame both
if (_wrappedMessage != null)
_wrappedMessage.fail();
// Only blame the verify peer
getContext().profileManager().dbLookupFailed(_target);
if (_sentTo != null)
getContext().profileManager().dbStoreFailed(_sentTo);
//if (_sentTo != null)
// getContext().profileManager().dbStoreFailed(_sentTo);
getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0);
if (_log.shouldLog(Log.WARN))
_log.warn("Verify timed out");
_log.warn("Verify timed out for: " + _key);
if (_ignore.size() < MAX_PEERS_TO_TRY) {
// Don't resend, simply rerun FVSJ.this inline and
// choose somebody besides _target for verification
_ignore.add(_target);
FloodfillVerifyStoreJob.this.runJob();
} else {
_facade.verifyFinished(_key);
resend();
}
}
}
}
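
Condensed from VerifyTimeoutJob.runJob() above: instead of resending immediately, the job re-picks a floodfill until MAX_PEERS_TO_TRY targets have been exhausted. A self-contained sketch of just that control flow; the Runnables stand in for the real job methods.

import java.util.Set;
import net.i2p.data.Hash;

// Sketch of the retry-vs-resend decision in VerifyTimeoutJob above.
static void onVerifyTimeout(Set<Hash> ignore, Hash failedTarget, int maxPeersToTry,
                            Runnable rerunVerify, Runnable resendStore) {
    if (ignore.size() < maxPeersToTry) {
        ignore.add(failedTarget);  // pickTarget() skips everything in the ignore set
        rerunVerify.run();         // rerun the verify job inline with a new target
    } else {
        resendStore.run();         // out of candidates: re-store the data instead
    }
}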

View File

@@ -0,0 +1,120 @@
package net.i2p.router.networkdb.kademlia;
import java.util.HashSet;
import java.util.Set;
import net.i2p.crypto.SessionKeyManager;
import net.i2p.crypto.TagSetHandle;
import net.i2p.data.Certificate;
import net.i2p.data.Hash;
import net.i2p.data.PublicKey;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.data.SessionTag;
import net.i2p.data.i2np.DeliveryInstructions;
import net.i2p.data.i2np.GarlicMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.message.GarlicMessageBuilder;
import net.i2p.router.message.PayloadGarlicConfig;
import net.i2p.util.Log;
/**
* Method and class for garlic encrypting outbound netdb traffic,
* including management of the ElGamal/AES tags
*
* @since 0.7.10
*/
class MessageWrapper {
private static final Log _log = RouterContext.getGlobalContext().logManager().getLog(MessageWrapper.class);
private static final int NETDB_TAGS_TO_DELIVER = 6;
private static final int NETDB_LOW_THRESHOLD = 3;
/**
* Garlic wrap a message from a client or this router, destined for a router,
* to hide the contents from the OBEP.
* Caller must call acked() or fail() on the returned object.
*
* @param from must be a local client with a session key manager,
* or null to use the router's session key manager
* @return null on encrypt failure
*/
static WrappedMessage wrap(RouterContext ctx, I2NPMessage m, Hash from, RouterInfo to) {
DeliveryInstructions instructions = new DeliveryInstructions();
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
PayloadGarlicConfig payload = new PayloadGarlicConfig();
payload.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
payload.setId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE));
payload.setPayload(m);
payload.setRecipient(to);
payload.setDeliveryInstructions(instructions);
payload.setRequestAck(false);
payload.setExpiration(m.getMessageExpiration());
SessionKeyManager skm;
if (from != null)
skm = ctx.clientManager().getClientSessionKeyManager(from);
else
skm = ctx.sessionKeyManager();
if (skm == null)
return null;
SessionKey sentKey = new SessionKey();
Set<SessionTag> sentTags = new HashSet();
GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, payload, sentKey, sentTags,
NETDB_TAGS_TO_DELIVER, NETDB_LOW_THRESHOLD, false, skm);
if (msg == null)
return null;
TagSetHandle tsh = null;
PublicKey sentTo = to.getIdentity().getPublicKey();
if (sentTags.size() > 0)
tsh = skm.tagsDelivered(sentTo, sentKey, sentTags);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sent to: " + to.getIdentity().getHash() + " with key: " + sentKey + " and tags: " + sentTags.size());
return new WrappedMessage(msg, skm, sentTo, sentKey, tsh);
}
/**
* Wrapper so that we can keep track of the key and tags
* for later notification to the SKM
*/
static class WrappedMessage {
private GarlicMessage msg;
private SessionKeyManager skm;
private PublicKey sentTo;
private SessionKey sessionKey;
private TagSetHandle tsh;
WrappedMessage(GarlicMessage msg, SessionKeyManager skm, PublicKey sentTo, SessionKey sentKey, TagSetHandle tsh) {
this.msg = msg;
this.skm = skm;
this.sentTo = sentTo;
this.sessionKey = sentKey;
this.tsh = tsh;
}
GarlicMessage getMessage() {
return this.msg;
}
/** delivered tags (if any) were acked */
void acked() {
if (this.tsh != null) {
this.skm.tagsAcked(this.sentTo, this.sessionKey, this.tsh);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tags acked for key: " + this.sessionKey);
}
}
/** delivered tags (if any) were not acked */
void fail() {
if (this.tsh != null) {
this.skm.failTags(this.sentTo, this.sessionKey, this.tsh);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tags NOT acked for key: " + this.sessionKey);
}
}
}
}
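
The wrap()/acked()/fail() contract above must be honored exactly once per message so the delivered tags are either credited to or purged from the session key manager. A hedged usage sketch, with dispatch and reply handling elided; the helper class is illustrative only.

import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;

// Illustrative caller of MessageWrapper following the javadoc above.
class WrapExample {
    static MessageWrapper.WrappedMessage encrypt(RouterContext ctx, I2NPMessage msg,
                                                 Hash fromClient, RouterInfo target) {
        // null fromClient means use the router's own session key manager
        MessageWrapper.WrappedMessage wm = MessageWrapper.wrap(ctx, msg, fromClient, target);
        if (wm == null)
            return null;  // no SKM or encrypt failure; caller aborts the send
        // ... dispatch wm.getMessage() out a tunnel, register a reply selector ...
        return wm;
    }

    /** call exactly once, after the reply (or the timeout) is known */
    static void settle(MessageWrapper.WrappedMessage wm, boolean gotReply) {
        if (gotReply)
            wm.acked();  // tags arrived: the SKM may use them for future messages
        else
            wm.fail();   // tags presumed lost: the SKM discards them
    }
}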

View File

@@ -32,7 +32,7 @@ import net.i2p.stat.RateStat;
import net.i2p.util.Log;
class StoreJob extends JobImpl {
private Log _log;
protected Log _log;
private KademliaNetworkDatabaseFacade _facade;
protected StoreState _state;
private Job _onSuccess;
@@ -293,7 +293,11 @@ class StoreJob extends JobImpl {
private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
getContext().statManager().addRateData("netDb.storeLeaseSetSent", 1, 0);
// if it is an encrypted leaseset...
if (getContext().keyRing().get(msg.getKey()) != null)
sendStoreThroughGarlic(msg, peer, expiration);
else
sendStoreThroughClient(msg, peer, expiration);
} else {
getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0);
sendDirect(msg, peer, expiration);
@@ -389,6 +393,93 @@ class StoreJob extends JobImpl {
return getContext().tunnelManager().selectInboundTunnel();
}
/**
* Send a leaseset store message out the client tunnel,
* with the reply to come back through a client tunnel.
* Stores are garlic encrypted to hide the identity from the OBEP.
*
* This makes it harder for an exploratory OBEP or IBGW to correlate it
* with one or more destinations. Since we are publishing the leaseset,
* it's easy to find out that an IB tunnel belongs to this dest, and
* it isn't much harder to do the same for an OB tunnel.
*
* As a side benefit, client tunnels should be faster and more reliable than
* exploratory tunnels.
*
* @param msg must contain a leaseset
* @since 0.7.10
*/
private void sendStoreThroughClient(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
long token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
Hash client = msg.getKey();
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel(client);
if (replyTunnel == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No reply inbound tunnels available!");
fail();
return;
}
TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0);
msg.setReplyToken(token);
msg.setReplyTunnel(replyTunnelId);
msg.setReplyGateway(replyTunnel.getPeer(0));
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": send(dbStore) w/ token expected " + token);
Hash to = peer.getIdentity().getHash();
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel(client);
if (outTunnel != null) {
// garlic encrypt
MessageWrapper.WrappedMessage wm = MessageWrapper.wrap(getContext(), msg, client, peer);
if (wm == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Fail garlic encrypting from: " + client);
fail();
return;
}
I2NPMessage sent = wm.getMessage();
SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, sent.getMessageSize());
FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent);
//_log.debug("Expiration is " + new Date(sent.getMessageExpiration()));
}
getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now()));
_state.addPending(to, wm);
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("No outbound tunnels to send a dbStore out - delaying...");
// continueSending() above did an addPending() so remove it here.
// This means we will skip the peer next time, can't be helped for now
// without modding StoreState
_state.replyTimeout(to);
Job waiter = new WaitJob(getContext());
waiter.getTiming().setStartAfter(getContext().clock().now() + 3*1000);
getContext().jobQueue().addJob(waiter);
//fail();
}
}
/**
* Called to wait a little while
* @since 0.7.10
*/
private class WaitJob extends JobImpl {
public WaitJob(RouterContext enclosingContext) {
super(enclosingContext);
}
public void runJob() {
sendNext();
}
public String getName() { return "Kademlia Store Send Delay"; }
}
/**
* Called after sending a dbStore to a peer successfully,
* marking the store as successful
@@ -414,11 +505,16 @@ class StoreJob extends JobImpl {
public String getName() { return "Kademlia Store Send Success"; }
public void runJob() {
long howLong = _state.confirmed(_peer.getIdentity().getHash());
Hash hash = _peer.getIdentity().getHash();
MessageWrapper.WrappedMessage wm = _state.getPendingMessage(hash);
if (wm != null)
wm.acked();
long howLong = _state.confirmed(hash);
if (_log.shouldLog(Log.INFO))
_log.info(StoreJob.this.getJobId() + ": Marking store of " + _state.getTarget()
+ " to " + _peer.getIdentity().getHash().toBase64() + " successful after " + howLong);
getContext().profileManager().dbStoreSent(_peer.getIdentity().getHash(), howLong);
+ " to " + hash.toBase64() + " successful after " + howLong);
getContext().profileManager().dbStoreSent(hash, howLong);
getContext().statManager().addRateData("netDb.ackTime", howLong, howLong);
if ( (_sendThrough != null) && (_msgSize > 0) ) {
@@ -456,11 +552,17 @@ class StoreJob extends JobImpl {
_sendOn = sendOn;
}
public void runJob() {
Hash hash = _peer.getIdentity().getHash();
if (_log.shouldLog(Log.INFO))
_log.info(StoreJob.this.getJobId() + ": Peer " + _peer.getIdentity().getHash().toBase64()
_log.info(StoreJob.this.getJobId() + ": Peer " + hash.toBase64()
+ " timed out sending " + _state.getTarget());
_state.replyTimeout(_peer.getIdentity().getHash());
getContext().profileManager().dbStoreFailed(_peer.getIdentity().getHash());
MessageWrapper.WrappedMessage wm = _state.getPendingMessage(hash);
if (wm != null)
wm.fail();
_state.replyTimeout(hash);
getContext().profileManager().dbStoreFailed(hash);
getContext().statManager().addRateData("netDb.replyTimeout", getContext().clock().now() - _sendOn, 0);
sendNext();
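
The reply path in sendStoreThroughClient() above is wired by hand: a random token, the client's inbound tunnel id, and that tunnel's gateway. A sketch of just that setup, using the same calls as the diff:

import net.i2p.data.Hash;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;

// Sketch of the reply-path setup from sendStoreThroughClient() above.
static boolean setClientReplyPath(RouterContext ctx, DatabaseStoreMessage msg, Hash client) {
    TunnelInfo replyTunnel = ctx.tunnelManager().selectInboundTunnel(client);
    if (replyTunnel == null)
        return false;  // no client IB tunnels available: the store fails
    long token = ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE);
    msg.setReplyToken(token);                               // floodfill echoes this in its ack
    msg.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));  // client IB tunnel id
    msg.setReplyGateway(replyTunnel.getPeer(0));            // that tunnel's gateway hop
    return true;
}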

View File

@@ -5,23 +5,26 @@ import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.DataStructure;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
/**
* Todo: remove exploratory
*
*/
class StoreState {
private RouterContext _context;
private Hash _key;
private DataStructure _data;
private final HashSet<Hash> _pendingPeers;
private HashMap<Hash, Long> _pendingPeerTimes;
private Map<Hash, Long> _pendingPeerTimes;
private Map<Hash, MessageWrapper.WrappedMessage> _pendingMessages;
private final HashSet<Hash> _successfulPeers;
private final HashSet<Hash> _successfulExploratoryPeers;
//private final HashSet<Hash> _successfulExploratoryPeers;
private final HashSet<Hash> _failedPeers;
private final HashSet<Hash> _attemptedPeers;
private int _completeCount;
@@ -35,16 +38,17 @@ class StoreState {
_context = ctx;
_key = key;
_data = data;
_pendingPeers = new HashSet(16);
_pendingPeerTimes = new HashMap(16);
_attemptedPeers = new HashSet(16);
_pendingPeers = new HashSet(4);
_pendingPeerTimes = new HashMap(4);
_pendingMessages = new ConcurrentHashMap(4);
_attemptedPeers = new HashSet(8);
if (toSkip != null) {
_attemptedPeers.addAll(toSkip);
_completeCount = toSkip.size();
}
_failedPeers = new HashSet(16);
_successfulPeers = new HashSet(16);
_successfulExploratoryPeers = new HashSet(16);
_failedPeers = new HashSet(8);
_successfulPeers = new HashSet(4);
//_successfulExploratoryPeers = new HashSet(16);
_completed = -1;
_started = _context.clock().now();
}
@@ -66,12 +70,15 @@ class StoreState {
return (Set<Hash>)_successfulPeers.clone();
}
}
/** @deprecated unused */
/** unused */
/****
public Set<Hash> getSuccessfulExploratory() {
synchronized (_successfulExploratoryPeers) {
return (Set<Hash>)_successfulExploratoryPeers.clone();
}
}
****/
public Set<Hash> getFailed() {
synchronized (_failedPeers) {
return (Set<Hash>)_failedPeers.clone();
@@ -87,6 +94,23 @@ class StoreState {
public long getWhenStarted() { return _started; }
public long getWhenCompleted() { return _completed; }
/**
* @since 0.7.10
*/
public void addPending(Hash peer, MessageWrapper.WrappedMessage msg) {
addPending(peer);
_pendingMessages.put(peer, msg);
}
/**
* @return the message or null; will only return the message once, so
* tags are only acked or failed once.
* @since 0.7.10
*/
public MessageWrapper.WrappedMessage getPendingMessage(Hash peer) {
return _pendingMessages.remove(peer);
}
public void addPending(Hash peer) {
synchronized (_pendingPeers) {
_pendingPeers.add(peer);
@@ -128,7 +152,8 @@ class StoreState {
return rv;
}
/** @deprecated unused */
/** unused */
/****
public long confirmedExploratory(Hash peer) {
long rv = -1;
synchronized (_pendingPeers) {
@ -142,6 +167,7 @@ class StoreState {
}
return rv;
}
****/
public void replyTimeout(Hash peer) {
synchronized (_pendingPeers) {
@@ -193,6 +219,7 @@ class StoreState {
buf.append(peer.toBase64()).append(" ");
}
}
/****
buf.append(" Successful Exploratory: ");
synchronized (_successfulExploratoryPeers) {
buf.append(_successfulExploratoryPeers.size()).append(' ');
@ -201,6 +228,7 @@ class StoreState {
buf.append(peer.toBase64()).append(" ");
}
}
****/
return buf.toString();
}
}
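
getPendingMessage() removes the entry as it reads it, so a WrappedMessage is acked or failed at most once even when both the success job and the timeout job fire. A minimal sketch of the idiom, separate from the commit:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal sketch of the remove-on-read idiom behind getPendingMessage():
// ConcurrentMap.remove() is atomic, so only one caller gets the value.
class OnceOnly<K, V> {
    private final ConcurrentMap<K, V> _pending = new ConcurrentHashMap<K, V>(4);

    void put(K key, V value) { _pending.put(key, value); }

    /** @return the value the first time, null every time after */
    V take(K key) { return _pending.remove(key); }
}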

View File

@ -4,6 +4,7 @@ import net.i2p.data.Hash;
import net.i2p.data.Payload;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DataMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.DeliveryInstructions;
import net.i2p.data.i2np.DeliveryStatusMessage;
@@ -52,9 +53,26 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveReceiver {
}
*/
// FVSJ could also result in a DSRM.
// Since there's some code that replies directly to this to gather new ff RouterInfos,
// sanitize it
if ( (_client != null) &&
(msg.getType() == DatabaseSearchReplyMessage.MESSAGE_TYPE) &&
(_client.equals(((DatabaseSearchReplyMessage)msg).getSearchKey()))) {
if (_log.shouldLog(Log.WARN))
_log.warn("Removing replies from a DSRM down a tunnel for " + _client.toBase64() + ": " + msg);
DatabaseSearchReplyMessage orig = (DatabaseSearchReplyMessage) msg;
DatabaseSearchReplyMessage newMsg = new DatabaseSearchReplyMessage(_context);
newMsg.setFromHash(orig.getFromHash());
newMsg.setSearchKey(orig.getSearchKey());
msg = newMsg;
} else if ( (_client != null) &&
(msg.getType() != DeliveryStatusMessage.MESSAGE_TYPE) &&
(msg.getType() != GarlicMessage.MESSAGE_TYPE) &&
// allow DSM of our own key (used by FloodfillVerifyStoreJob)
// as long as there's no reply token (FVSJ will never set a reply token but an attacker might)
((msg.getType() != DatabaseStoreMessage.MESSAGE_TYPE) || (!_client.equals(((DatabaseStoreMessage)msg).getKey())) ||
(((DatabaseStoreMessage)msg).getReplyToken() != 0)) &&
(msg.getType() != TunnelBuildReplyMessage.MESSAGE_TYPE)) {
// drop it, since we should only get tunnel test messages and garlic messages down
// client tunnels
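
A condensed sketch of the DSRM sanitization above: rebuild the message keeping only the from-hash and search key, so the reply-peer hashes never reach the code that would otherwise chase them to gather new floodfill RouterInfos. Uses only the constructor and setters shown in the diff.

import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.router.RouterContext;

// Sketch of the sanitize step above.
static DatabaseSearchReplyMessage sanitize(RouterContext ctx, DatabaseSearchReplyMessage orig) {
    DatabaseSearchReplyMessage clean = new DatabaseSearchReplyMessage(ctx);
    clean.setFromHash(orig.getFromHash());    // keep: who answered
    clean.setSearchKey(orig.getSearchKey());  // keep: what was asked
    return clean;                             // reply-peer hashes dropped
}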