* FloodOnlySearchJob:
- Ask non-floodfill peers if we don't know any floodfills - Lookup hashes in the DatabaseSearchReplyMessage if we don't know enough floodfills
This commit is contained in:
@ -23,25 +23,40 @@ import net.i2p.util.Log;
|
|||||||
* Try sending a search to some floodfill peers, failing completely if we don't get
|
* Try sending a search to some floodfill peers, failing completely if we don't get
|
||||||
* a match from one of those peers, with no fallback to the kademlia search
|
* a match from one of those peers, with no fallback to the kademlia search
|
||||||
*
|
*
|
||||||
|
* Exception (a semi-exception, since we still fail completely without fallback):
|
||||||
|
* If we don't know any floodfill peers, we ask a couple of peers at random,
|
||||||
|
* who will hopefully reply with some floodfill keys.
|
||||||
|
* We still fail without fallback, but we then spin off a job to
|
||||||
|
* ask that same random peer for the RouterInfos for those keys.
|
||||||
|
* If that job succeeds, the next search should work better.
|
||||||
|
*
|
||||||
|
* In addition, we follow the floodfill keys in the DSRM
|
||||||
|
* (DatabaseSearchReplyMessage) if we know less than 4 floodfills.
|
||||||
|
*
|
||||||
|
* These enhancements allow the router to bootstrap back into the network
|
||||||
|
* after it loses (or never had) floodfill references, as long as it
|
||||||
|
* knows one peer that is up.
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
class FloodOnlySearchJob extends FloodSearchJob {
|
class FloodOnlySearchJob extends FloodSearchJob {
|
||||||
private Log _log;
|
protected Log _log;
|
||||||
private FloodfillNetworkDatabaseFacade _facade;
|
private FloodfillNetworkDatabaseFacade _facade;
|
||||||
private Hash _key;
|
protected Hash _key;
|
||||||
private List _onFind;
|
private List _onFind;
|
||||||
private List _onFailed;
|
private List _onFailed;
|
||||||
private long _expiration;
|
private long _expiration;
|
||||||
private int _timeoutMs;
|
protected int _timeoutMs;
|
||||||
private long _origExpiration;
|
private long _origExpiration;
|
||||||
private boolean _isLease;
|
private boolean _isLease;
|
||||||
private volatile int _lookupsRemaining;
|
protected volatile int _lookupsRemaining;
|
||||||
private volatile boolean _dead;
|
private volatile boolean _dead;
|
||||||
private long _created;
|
private long _created;
|
||||||
|
private boolean _shouldProcessDSRM;
|
||||||
|
|
||||||
private List _out;
|
protected List _out;
|
||||||
private MessageSelector _replySelector;
|
protected MessageSelector _replySelector;
|
||||||
private ReplyJob _onReply;
|
protected ReplyJob _onReply;
|
||||||
private Job _onTimeout;
|
protected Job _onTimeout;
|
||||||
public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
|
public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
|
||||||
super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
|
super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
|
||||||
_log = ctx.logManager().getLog(FloodOnlySearchJob.class);
|
_log = ctx.logManager().getLog(FloodOnlySearchJob.class);
|
||||||
@ -62,6 +77,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
|||||||
_onReply = new FloodOnlyLookupMatchJob(getContext(), this);
|
_onReply = new FloodOnlyLookupMatchJob(getContext(), this);
|
||||||
_onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
|
_onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
|
||||||
_created = System.currentTimeMillis();
|
_created = System.currentTimeMillis();
|
||||||
|
_shouldProcessDSRM = false;
|
||||||
}
|
}
|
||||||
void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
|
void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
|
||||||
if (_dead) {
|
if (_dead) {
|
||||||
@ -72,16 +88,24 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
public long getExpiration() { return _expiration; }
|
public long getExpiration() { return _expiration; }
|
||||||
|
public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }
|
||||||
private static final int CONCURRENT_SEARCHES = 2;
|
private static final int CONCURRENT_SEARCHES = 2;
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
// pick some floodfill peers and send out the searches
|
// pick some floodfill peers and send out the searches
|
||||||
List floodfillPeers = _facade.getFloodfillPeers();
|
List floodfillPeers = _facade.getFloodfillPeers();
|
||||||
if (floodfillPeers == null) {
|
if (floodfillPeers.size() <= 3)
|
||||||
|
_shouldProcessDSRM = true;
|
||||||
|
if (floodfillPeers.size() <= 0) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Running netDb searches against the floodfill peers, but we don't know any");
|
_log.error("Running netDb searches against the floodfill peers, but we don't know any");
|
||||||
|
floodfillPeers = new ArrayList(_facade.getAllRouters());
|
||||||
|
if (floodfillPeers.size() <= 0) {
|
||||||
|
if (_log.shouldLog(Log.ERROR))
|
||||||
|
_log.error("We don't know any peers at all");
|
||||||
failed();
|
failed();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
OutNetMessage out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
|
OutNetMessage out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
|
||||||
synchronized (_out) { _out.add(out); }
|
synchronized (_out) { _out.add(out); }
|
||||||
|
|
||||||
@ -210,10 +234,12 @@ class FloodOnlyLookupTimeoutJob extends JobImpl {
|
|||||||
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
|
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
|
||||||
private Log _log;
|
private Log _log;
|
||||||
private FloodOnlySearchJob _search;
|
private FloodOnlySearchJob _search;
|
||||||
|
private DatabaseSearchReplyMessage _dsrm;
|
||||||
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
|
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
|
||||||
super(ctx);
|
super(ctx);
|
||||||
_log = ctx.logManager().getLog(getClass());
|
_log = ctx.logManager().getLog(getClass());
|
||||||
_search = job;
|
_search = job;
|
||||||
|
_dsrm = null;
|
||||||
}
|
}
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if ( (getContext().netDb().lookupLeaseSetLocally(_search.getKey()) != null) ||
|
if ( (getContext().netDb().lookupLeaseSetLocally(_search.getKey()) != null) ||
|
||||||
@ -224,9 +250,16 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
|
|||||||
} else {
|
} else {
|
||||||
int remaining = _search.getLookupsRemaining();
|
int remaining = _search.getLookupsRemaining();
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info(_search.getJobId() + ": got a DatabasSearchReply when we were looking for "
|
_log.info(_search.getJobId() + ": got a DatabaseSearchReply when we were looking for "
|
||||||
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
|
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
|
||||||
// netDb reply pointing us at other people
|
// netDb reply pointing us at other people
|
||||||
|
// Only process if we don't know enough floodfills
|
||||||
|
if (_search.shouldProcessDSRM() && _dsrm != null) {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info(_search.getJobId() + ": Processing DatabaseSearchReply");
|
||||||
|
// Chase the hashes from the reply
|
||||||
|
getContext().jobQueue().addJob(new SingleLookupJob(getContext(), _dsrm));
|
||||||
|
}
|
||||||
_search.failed();
|
_search.failed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -234,6 +267,8 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
|
|||||||
public void setMessage(I2NPMessage message) {
|
public void setMessage(I2NPMessage message) {
|
||||||
if (message instanceof DatabaseSearchReplyMessage) {
|
if (message instanceof DatabaseSearchReplyMessage) {
|
||||||
// a dsrm is only passed in when there are no more lookups remaining
|
// a dsrm is only passed in when there are no more lookups remaining
|
||||||
|
// If more than one peer sent one, we only process the last one
|
||||||
|
_dsrm = (DatabaseSearchReplyMessage) message;
|
||||||
_search.failed();
|
_search.failed();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -288,3 +323,74 @@ class FloodOnlyLookupSelector implements MessageSelector {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Below here, only used to lookup the DSRM reply hashes when we are short of floodfills **/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ask the peer who sent us the DSRM for the RouterInfos.
|
||||||
|
* A simple version of SearchReplyJob in SearchJob.java.
|
||||||
|
* Skip the profile updates - this should be rare.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class SingleLookupJob extends JobImpl {
|
||||||
|
private Log _log;
|
||||||
|
private DatabaseSearchReplyMessage _dsrm;
|
||||||
|
public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) {
|
||||||
|
super(ctx);
|
||||||
|
_log = ctx.logManager().getLog(getClass());
|
||||||
|
_dsrm = dsrm;
|
||||||
|
}
|
||||||
|
public void runJob() {
|
||||||
|
Hash from = _dsrm.getFromHash();
|
||||||
|
for (int i = 0; i < _dsrm.getNumReplies(); i++) {
|
||||||
|
Hash peer = _dsrm.getReply(i);
|
||||||
|
if (peer.equals(getContext().routerHash())) // us
|
||||||
|
continue;
|
||||||
|
if (getContext().netDb().lookupRouterInfoLocally(peer) == null)
|
||||||
|
getContext().jobQueue().addJob(new SingleSearchJob(getContext(), peer, from));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public String getName() { return "NetDb process DSRM"; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ask a single peer for a single key.
|
||||||
|
* This isn't really a flood-only search job at all, but we extend
|
||||||
|
* FloodOnlySearchJob so we can use the same selectors, etc.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class SingleSearchJob extends FloodOnlySearchJob {
|
||||||
|
Hash _to;
|
||||||
|
OutNetMessage _onm;
|
||||||
|
public SingleSearchJob(RouterContext ctx, Hash key, Hash to) {
|
||||||
|
// warning, null FloodfillNetworkDatabaseFacade ...
|
||||||
|
// define our own failed() and success() below so _facade isn't used.
|
||||||
|
super(ctx, null, key, null, null, 5*1000, false);
|
||||||
|
_to = to;
|
||||||
|
}
|
||||||
|
public String getName() { return "NetDb search key from DSRM"; }
|
||||||
|
public boolean shouldProcessDSRM() { return false; } // don't loop
|
||||||
|
public void runJob() {
|
||||||
|
_onm = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
|
||||||
|
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
|
||||||
|
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
|
||||||
|
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
|
||||||
|
if ( (replyTunnel == null) || (outTunnel == null) ) {
|
||||||
|
failed();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dlm.setFrom(replyTunnel.getPeer(0));
|
||||||
|
dlm.setMessageExpiration(getContext().clock().now()+5*1000);
|
||||||
|
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
|
||||||
|
dlm.setSearchKey(_key);
|
||||||
|
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info(getJobId() + ": Single search for " + _key.toBase64() + " to " + _to.toBase64());
|
||||||
|
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
|
||||||
|
_lookupsRemaining = 1;
|
||||||
|
}
|
||||||
|
void failed() {
|
||||||
|
getContext().messageRegistry().unregisterPending(_onm);
|
||||||
|
}
|
||||||
|
void success() {}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user