* Netdb StoreJob, FloodfillVerifyStoreJob:

- Fix bug where reply selector wasn't registered for
        routerinfo stores, so we didn't get stats, and
        we kept retrying. This also prevented verification
        and profile updates for routerinfo stores.
        This bug was introduced 4 years ago by the change to
        store routerinfos directly.
      - Add dbStoreSuccessful() to profile, and have FVSJ
        call it or dbStoreFailed() as appropriate to give
        credit or blame to the floodfill we stored to.
      - Don't let FVSJ verify using the peer we stored to
This commit is contained in:
zzz
2009-11-10 01:24:39 +00:00
parent 2dc3798116
commit e02845076d
6 changed files with 79 additions and 17 deletions

View File

@ -136,6 +136,13 @@ public interface ProfileManager {
*/ */
void dbStoreSent(Hash peer, long responseTimeMs); void dbStoreSent(Hash peer, long responseTimeMs);
/**
* Note that we confirmed a successful send of db data to
* the peer.
*
*/
void dbStoreSuccessful(Hash peer);
/** /**
* Note that we were unable to confirm a successful send of db data to * Note that we were unable to confirm a successful send of db data to
* the peer, at least not within our timeout period * the peer, at least not within our timeout period

View File

@ -8,6 +8,8 @@ package net.i2p.router.networkdb.kademlia;
* *
*/ */
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import net.i2p.data.DataStructure; import net.i2p.data.DataStructure;
@ -55,16 +57,20 @@ class FloodfillStoreJob extends StoreJob {
// Get the time stamp from the data we sent, so the Verify job can meke sure that // Get the time stamp from the data we sent, so the Verify job can meke sure that
// it finds something stamped with that time or newer. // it finds something stamped with that time or newer.
long published = 0; long published = 0;
boolean isRouterInfo = false;
DataStructure data = _state.getData(); DataStructure data = _state.getData();
if (data instanceof RouterInfo) { boolean isRouterInfo = data instanceof RouterInfo;
if (isRouterInfo) {
published = ((RouterInfo) data).getPublished(); published = ((RouterInfo) data).getPublished();
isRouterInfo = true;
} else if (data instanceof LeaseSet) { } else if (data instanceof LeaseSet) {
published = ((LeaseSet) data).getEarliestLeaseDate(); published = ((LeaseSet) data).getEarliestLeaseDate();
} }
// we should always have exactly one successful entry
Hash sentTo = null;
try {
sentTo = _state.getSuccessful().iterator().next();
} catch (NoSuchElementException nsee) {}
getContext().jobQueue().addJob(new FloodfillVerifyStoreJob(getContext(), _state.getTarget(), getContext().jobQueue().addJob(new FloodfillVerifyStoreJob(getContext(), _state.getTarget(),
published, isRouterInfo, _facade)); published, isRouterInfo, sentTo, _facade));
} }
} }

View File

@ -26,6 +26,7 @@ public class FloodfillVerifyStoreJob extends JobImpl {
private Log _log; private Log _log;
private Hash _key; private Hash _key;
private Hash _target; private Hash _target;
private Hash _sentTo;
private FloodfillNetworkDatabaseFacade _facade; private FloodfillNetworkDatabaseFacade _facade;
private long _expiration; private long _expiration;
private long _sendTime; private long _sendTime;
@ -34,12 +35,16 @@ public class FloodfillVerifyStoreJob extends JobImpl {
private static final int VERIFY_TIMEOUT = 10*1000; private static final int VERIFY_TIMEOUT = 10*1000;
public FloodfillVerifyStoreJob(RouterContext ctx, Hash key, long published, boolean isRouterInfo, FloodfillNetworkDatabaseFacade facade) { /**
* @param sentTo who to give the credit or blame to, can be null
*/
public FloodfillVerifyStoreJob(RouterContext ctx, Hash key, long published, boolean isRouterInfo, Hash sentTo, FloodfillNetworkDatabaseFacade facade) {
super(ctx); super(ctx);
_key = key; _key = key;
_published = published; _published = published;
_isRouterInfo = isRouterInfo; _isRouterInfo = isRouterInfo;
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_sentTo = sentTo;
_facade = facade; _facade = facade;
// wait 10 seconds before trying to verify the store // wait 10 seconds before trying to verify the store
getTiming().setStartAfter(ctx.clock().now() + VERIFY_TIMEOUT); getTiming().setStartAfter(ctx.clock().now() + VERIFY_TIMEOUT);
@ -70,22 +75,28 @@ public class FloodfillVerifyStoreJob extends JobImpl {
return; return;
} }
if (_log.shouldLog(Log.INFO))
_log.info("Starting verify (stored " + _key + " to " + _sentTo + "), asking " + _target);
_sendTime = getContext().clock().now(); _sendTime = getContext().clock().now();
_expiration = _sendTime + VERIFY_TIMEOUT; _expiration = _sendTime + VERIFY_TIMEOUT;
getContext().messageRegistry().registerPending(new VerifyReplySelector(), new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext()), VERIFY_TIMEOUT); getContext().messageRegistry().registerPending(new VerifyReplySelector(), new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext()), VERIFY_TIMEOUT);
getContext().tunnelDispatcher().dispatchOutbound(lookup, outTunnel.getSendTunnelId(0), _target); getContext().tunnelDispatcher().dispatchOutbound(lookup, outTunnel.getSendTunnelId(0), _target);
} }
/** todo pick one close to the key */
private Hash pickTarget() { private Hash pickTarget() {
FloodfillPeerSelector sel = (FloodfillPeerSelector)_facade.getPeerSelector(); FloodfillPeerSelector sel = (FloodfillPeerSelector)_facade.getPeerSelector();
List peers = sel.selectFloodfillParticipants(_facade.getKBuckets()); List<Hash> peers = sel.selectFloodfillParticipants(_facade.getKBuckets());
Collections.shuffle(peers, getContext().random()); Collections.shuffle(peers, getContext().random());
if (peers.size() > 0) for (int i = 0; i < peers.size(); i++) {
return (Hash)peers.get(0); Hash rv = peers.get(i);
if (!rv.equals(_sentTo))
return rv;
}
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("No peers to verify floodfill with"); _log.warn("No other peers to verify floodfill with, using the one we sent to");
return null; return _sentTo;
} }
private DatabaseLookupMessage buildLookup() { private DatabaseLookupMessage buildLookup() {
@ -140,7 +151,11 @@ public class FloodfillVerifyStoreJob extends JobImpl {
if (success) { if (success) {
// store ok, w00t! // store ok, w00t!
getContext().profileManager().dbLookupSuccessful(_target, delay); getContext().profileManager().dbLookupSuccessful(_target, delay);
if (_sentTo != null)
getContext().profileManager().dbStoreSuccessful(_sentTo);
getContext().statManager().addRateData("netDb.floodfillVerifyOK", delay, 0); getContext().statManager().addRateData("netDb.floodfillVerifyOK", delay, 0);
if (_log.shouldLog(Log.INFO))
_log.info("Verify success");
return; return;
} }
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -149,8 +164,12 @@ public class FloodfillVerifyStoreJob extends JobImpl {
// assume 0 old, all new, 0 invalid, 0 dup // assume 0 old, all new, 0 invalid, 0 dup
getContext().profileManager().dbLookupReply(_target, 0, getContext().profileManager().dbLookupReply(_target, 0,
((DatabaseSearchReplyMessage)_message).getNumReplies(), 0, 0, delay); ((DatabaseSearchReplyMessage)_message).getNumReplies(), 0, 0, delay);
if (_log.shouldLog(Log.WARN))
_log.warn("Verify failed - DSRM");
} }
// store failed, boo, hiss! // store failed, boo, hiss!
if (_sentTo != null)
getContext().profileManager().dbStoreFailed(_sentTo);
getContext().statManager().addRateData("netDb.floodfillVerifyFail", delay, 0); getContext().statManager().addRateData("netDb.floodfillVerifyFail", delay, 0);
resend(); resend();
} }
@ -173,7 +192,10 @@ public class FloodfillVerifyStoreJob extends JobImpl {
} }
public String getName() { return "Floodfill verification timeout"; } public String getName() { return "Floodfill verification timeout"; }
public void runJob() { public void runJob() {
// don't know who to blame (we could have gotten a DSRM) so blame both
getContext().profileManager().dbLookupFailed(_target); getContext().profileManager().dbLookupFailed(_target);
if (_sentTo != null)
getContext().profileManager().dbStoreFailed(_sentTo);
getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0); getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0);
resend(); resend();
} }

View File

@ -269,6 +269,7 @@ class StoreJob extends JobImpl {
sendStoreThroughGarlic(msg, peer, expiration); sendStoreThroughGarlic(msg, peer, expiration);
} else { } else {
getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0); getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0);
// todo - shorter expiration for direct?
sendDirect(msg, peer, expiration); sendDirect(msg, peer, expiration);
} }
} }
@ -298,6 +299,7 @@ class StoreJob extends JobImpl {
m.setPriority(STORE_PRIORITY); m.setPriority(STORE_PRIORITY);
m.setReplySelector(selector); m.setReplySelector(selector);
m.setTarget(peer); m.setTarget(peer);
getContext().messageRegistry().registerPending(m);
getContext().commSystem().processMessage(m); getContext().commSystem().processMessage(m);
} }
@ -324,8 +326,8 @@ class StoreJob extends JobImpl {
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to " // _log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to "
// + peer.getIdentity().getHash().toBase64()); // + peer.getIdentity().getHash().toBase64());
TunnelId targetTunnelId = null; // not needed //TunnelId targetTunnelId = null; // not needed
Job onSend = null; // not wanted //Job onSend = null; // not wanted
SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, msg.getMessageSize()); SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, msg.getMessageSize());
FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now()); FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
@ -426,7 +428,7 @@ class StoreJob extends JobImpl {
sendNext(); sendNext();
} }
public String getName() { return "Kademlia Store Peer Failed"; } public String getName() { return "Kademlia Store Send Failed"; }
} }
/** /**

View File

@ -11,6 +11,9 @@ import net.i2p.data.DataStructure;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
/**
* Todo: remove exploratory
*/
class StoreState { class StoreState {
private RouterContext _context; private RouterContext _context;
private Hash _key; private Hash _key;
@ -63,6 +66,7 @@ class StoreState {
return (Set<Hash>)_successfulPeers.clone(); return (Set<Hash>)_successfulPeers.clone();
} }
} }
/** @deprecated unused */
public Set<Hash> getSuccessfulExploratory() { public Set<Hash> getSuccessfulExploratory() {
synchronized (_successfulExploratoryPeers) { synchronized (_successfulExploratoryPeers) {
return (Set<Hash>)_successfulExploratoryPeers.clone(); return (Set<Hash>)_successfulExploratoryPeers.clone();
@ -124,6 +128,7 @@ class StoreState {
return rv; return rv;
} }
/** @deprecated unused */
public long confirmedExploratory(Hash peer) { public long confirmedExploratory(Hash peer) {
long rv = -1; long rv = -1;
synchronized (_pendingPeers) { synchronized (_pendingPeers) {

View File

@ -89,6 +89,7 @@ public class ProfileManagerImpl implements ProfileManager {
/** /**
* Note that a router explicitly rejected joining a tunnel. * Note that a router explicitly rejected joining a tunnel.
* *
* @param responseTimeMs ignored
* @param severity how much the peer doesnt want to participate in the * @param severity how much the peer doesnt want to participate in the
* tunnel (large == more severe) * tunnel (large == more severe)
*/ */
@ -249,9 +250,30 @@ public class ProfileManagerImpl implements ProfileManager {
* Note that we've confirmed a successful send of db data to the peer (though we haven't * Note that we've confirmed a successful send of db data to the peer (though we haven't
* necessarily requested it again from them, so they /might/ be lying) * necessarily requested it again from them, so they /might/ be lying)
* *
* This will force creation of DB stats * This is not really interesting, since they could be lying, so we do not
* increment any DB stats at all. On verify, call dbStoreSuccessful().
*
* @param responseTimeMs ignored
*/ */
public void dbStoreSent(Hash peer, long responseTimeMs) { public void dbStoreSent(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
long now = _context.clock().now();
data.setLastHeardFrom(now);
data.setLastSendSuccessful(now);
//if (!data.getIsExpandedDB())
// data.expandDBProfile();
//DBHistory hist = data.getDBHistory();
//hist.storeSuccessful();
}
/**
* Note that we've verified a successful send of db data to the floodfill peer
* by querying another floodfill.
*
* This will force creation of DB stats
*/
public void dbStoreSuccessful(Hash peer) {
PeerProfile data = getProfile(peer); PeerProfile data = getProfile(peer);
if (data == null) return; if (data == null) return;
long now = _context.clock().now(); long now = _context.clock().now();
@ -261,8 +283,6 @@ public class ProfileManagerImpl implements ProfileManager {
data.expandDBProfile(); data.expandDBProfile();
DBHistory hist = data.getDBHistory(); DBHistory hist = data.getDBHistory();
hist.storeSuccessful(); hist.storeSuccessful();
// we could do things like update some sort of "how many successful stores we've sent them"...
// naah.. dont really care now
} }
/** /**
@ -277,7 +297,7 @@ public class ProfileManagerImpl implements ProfileManager {
if (!data.getIsExpandedDB()) if (!data.getIsExpandedDB())
data.expandDBProfile(); data.expandDBProfile();
DBHistory hist = data.getDBHistory(); DBHistory hist = data.getDBHistory();
hist.storeSuccessful(); hist.storeFailed();
// we could do things like update some sort of "how many successful stores we've // we could do things like update some sort of "how many successful stores we've
// failed to send them"... // failed to send them"...
} }