2006-03-24 jrandom

    * Try to desync tunnel building near startup (thanks Complication!)
    * If we are highly congested, fall back on only querying the floodfill
      netDb peers, and only storing to those peers too
    * Cleaned up the floodfill-only queries
jrandom
2006-03-24 20:53:28 +00:00
committed by zzz
parent 2a24029acf
commit d489caa88c
8 changed files with 301 additions and 24 deletions
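
How the congestion fallback (the second changelog item) hangs together: a new static gate, SearchJob.onlyQueryFloodfillPeers(ctx), returns true when the router has less than roughly 6KBps of spare bandwidth in either direction, or when netDb.floodfillOnly is set, and both FloodfillNetworkDatabaseFacade.search() and FloodfillPeerSelector consult it. A condensed sketch, with identifiers and constants taken from the hunks below (not the complete classes):

    // Congestion check added to SearchJob (values mirror the SearchJob.java hunk below)
    static boolean isCongested(RouterContext ctx) {
        float availableSend = ctx.bandwidthLimiter().getOutboundKBytesPerSecond()*1024
                              - ctx.bandwidthLimiter().getSendBps();
        float availableRecv = ctx.bandwidthLimiter().getInboundKBytesPerSecond()*1024
                              - ctx.bandwidthLimiter().getReceiveBps();
        // 6KBps is an arbitrary floor: with less headroom than that in either direction,
        // a wider kademlia search is considered too expensive
        return (availableSend < 6*1024) || (availableRecv < 6*1024);
    }

    // The gate consulted by the facade and the peer selector
    static boolean onlyQueryFloodfillPeers(RouterContext ctx) {
        if (isCongested(ctx))
            return true;
        return Boolean.valueOf(ctx.getProperty("netDb.floodfillOnly", "false")).booleanValue();
    }

    // In FloodfillNetworkDatabaseFacade.search(): choose the flood-only job when restricted
    searchJob = SearchJob.onlyQueryFloodfillPeers(_context)
              ? new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease)
              : new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);

For example, a router capped at 32KBps outbound that is already sending 27KBps has about 5KBps (5120 bytes/sec) of headroom, which is below the 6*1024 floor, so lookups and stores stay floodfill-only until the load drops.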

history.txt

@@ -1,4 +1,10 @@
$Id: history.txt,v 1.436 2006/03/20 00:31:15 jrandom Exp $
$Id: history.txt,v 1.437 2006/03/21 18:11:33 jrandom Exp $
2006-03-24 jrandom
    * Try to desync tunnel building near startup (thanks Complication!)
    * If we are highly congested, fall back on only querying the floodfill
      netDb peers, and only storing to those peers too
    * Cleaned up the floodfill-only queries
2006-03-21 jrandom
    * Avoid a very strange (unconfirmed) bug that people using the systray's

RouterVersion.java

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.377 $ $Date: 2006/03/20 00:31:13 $";
public final static String ID = "$Revision: 1.378 $ $Date: 2006/03/21 18:13:09 $";
public final static String VERSION = "0.6.1.12";
public final static long BUILD = 15;
public final static long BUILD = 16;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

FloodOnlySearchJob.java (new file)

@@ -0,0 +1,244 @@
package net.i2p.router.networkdb.kademlia;
import java.util.*;
import net.i2p.router.*;
import net.i2p.data.Hash;
import net.i2p.data.i2np.*;
import net.i2p.util.Log;
/**
* Try sending a search to some floodfill peers, failing completely if we don't get
* a match from one of those peers, with no fallback to the kademlia search
*
*/
class FloodOnlySearchJob extends FloodSearchJob {
private Log _log;
private FloodfillNetworkDatabaseFacade _facade;
private Hash _key;
private List _onFind;
private List _onFailed;
private long _expiration;
private int _timeoutMs;
private long _origExpiration;
private boolean _isLease;
private volatile int _lookupsRemaining;
private volatile boolean _dead;
private long _created;
private List _out;
private MessageSelector _replySelector;
private ReplyJob _onReply;
private Job _onTimeout;
public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
_log = ctx.logManager().getLog(FloodOnlySearchJob.class);
_facade = facade;
_key = key;
_onFind = new ArrayList();
_onFind.add(onFind);
_onFailed = new ArrayList();
_onFailed.add(onFailed);
_timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT);
_expiration = _timeoutMs + ctx.clock().now();
_origExpiration = _timeoutMs + ctx.clock().now();
_isLease = isLease;
_lookupsRemaining = 0;
_dead = false;
_out = new ArrayList(2);
_replySelector = new FloodOnlyLookupSelector(getContext(), this);
_onReply = new FloodOnlyLookupMatchJob(getContext(), this);
_onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
_created = System.currentTimeMillis();
}
void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
if (_dead) {
getContext().jobQueue().addJob(onFailed);
} else {
if (onFind != null) synchronized (_onFind) { _onFind.add(onFind); }
if (onFailed != null) synchronized (_onFailed) { _onFailed.add(onFailed); }
}
}
public long getExpiration() { return _expiration; }
private static final int CONCURRENT_SEARCHES = 2;
public void runJob() {
// pick some floodfill peers and send out the searches
List floodfillPeers = _facade.getFloodfillPeers();
if (floodfillPeers == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Running netDb searches against the floodfill peers, but we don't know any");
failed();
return;
}
OutNetMessage out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
_out.add(out);
for (int i = 0; _lookupsRemaining < CONCURRENT_SEARCHES && i < floodfillPeers.size(); i++) {
Hash peer = (Hash)floodfillPeers.get(i);
if (peer.equals(getContext().routerHash()))
continue;
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if ( (replyTunnel == null) || (outTunnel == null) ) {
failed();
return;
}
dlm.setFrom(replyTunnel.getPeer(0));
dlm.setMessageExpiration(getContext().clock().now()+10*1000);
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " to " + peer.toBase64());
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
_lookupsRemaining++;
}
if (_lookupsRemaining <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " had no peers to send to");
// no floodfill peers, fail
failed();
}
}
public String getName() { return "NetDb flood search (phase 1)"; }
Hash getKey() { return _key; }
void decrementRemaining() { _lookupsRemaining--; }
int getLookupsRemaining() { return _lookupsRemaining; }
void failed() {
synchronized (this) {
if (_dead) return;
_dead = true;
}
for (int i = 0; i < _out.size(); i++) {
OutNetMessage out = (OutNetMessage)_out.get(i);
getContext().messageRegistry().unregisterPending(out);
}
int timeRemaining = (int)(_origExpiration - getContext().clock().now());
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created));
_facade.complete(_key);
getContext().statManager().addRateData("netDb.failedTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
synchronized (_onFailed) {
for (int i = 0; i < _onFailed.size(); i++) {
Job j = (Job)_onFailed.remove(0);
getContext().jobQueue().addJob(j);
}
}
}
void success() {
synchronized (this) {
if (_dead) return;
_dead = true;
}
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");
_facade.complete(_key);
getContext().statManager().addRateData("netDb.successTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
synchronized (_onFind) {
while (_onFind.size() > 0)
getContext().jobQueue().addJob((Job)_onFind.remove(0));
}
}
}
class FloodOnlyLookupTimeoutJob extends JobImpl {
private FloodSearchJob _search;
private Log _log;
public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) {
super(ctx);
_search = job;
_log = ctx.logManager().getLog(getClass());
}
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": search timed out");
_search.failed();
}
public String getName() { return "NetDb flood search (phase 1) timeout"; }
}
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
private Log _log;
private FloodOnlySearchJob _search;
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_search = job;
}
public void runJob() {
if ( (getContext().netDb().lookupLeaseSetLocally(_search.getKey()) != null) ||
(getContext().netDb().lookupRouterInfoLocally(_search.getKey()) != null) ) {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": search match and found locally");
_search.success();
} else {
int remaining = _search.getLookupsRemaining();
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": got a DatabaseSearchReply when we were looking for "
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
// netDb reply pointing us at other people
_search.failed();
}
}
public String getName() { return "NetDb flood search (phase 1) match"; }
public void setMessage(I2NPMessage message) {
if (message instanceof DatabaseSearchReplyMessage) {
// a dsrm is only passed in when there are no more lookups remaining
_search.failed();
return;
}
try {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
if (dsm.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
getContext().netDb().store(dsm.getKey(), dsm.getLeaseSet());
else
getContext().netDb().store(dsm.getKey(), dsm.getRouterInfo());
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.WARN))
_log.warn(_search.getJobId() + ": Received an invalid store reply", iae);
}
}
}
class FloodOnlyLookupSelector implements MessageSelector {
private RouterContext _context;
private FloodOnlySearchJob _search;
private boolean _matchFound;
private Log _log;
public FloodOnlyLookupSelector(RouterContext ctx, FloodOnlySearchJob search) {
_context = ctx;
_search = search;
_log = ctx.logManager().getLog(getClass());
_matchFound = false;
}
public boolean continueMatching() {
return _search.getLookupsRemaining() > 0 && !_matchFound && _context.clock().now() < getExpiration();
}
public long getExpiration() { return (_matchFound ? -1 : _search.getExpiration()); }
public boolean isMatch(I2NPMessage message) {
if (message == null) return false;
if (message instanceof DatabaseStoreMessage) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
// is it worth making sure the reply came in on the right tunnel?
if (_search.getKey().equals(dsm.getKey())) {
_search.decrementRemaining();
_matchFound = true;
return true;
}
} else if (message instanceof DatabaseSearchReplyMessage) {
DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
if (_search.getKey().equals(dsrm.getSearchKey())) {
_search.decrementRemaining();
if (_search.getLookupsRemaining() <= 0)
return true; // ok, no more left, so time to fail
else
return false;
}
}
return false;
}
}
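
Taken together, the class above gives callers a simple two-outcome contract. A rough caller-side sketch (condensed; the real wiring is in the FloodfillNetworkDatabaseFacade.java hunk below, and ctx/facade/key here are stand-ins):

    FloodOnlySearchJob job = new FloodOnlySearchJob(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
    ctx.jobQueue().addJob(job);  // runJob() sends DatabaseLookupMessages through locally selected
                                 // tunnels to at most CONCURRENT_SEARCHES (2) floodfill peers
    // Exactly one callback eventually runs:
    //   onFind   - a DatabaseStoreMessage for the key came back and was stored in the local netDb
    //   onFailed - a DatabaseSearchReplyMessage referral arrived instead, the lookup timeout
    //              (capped at PER_FLOODFILL_PEER_TIMEOUT, 10 seconds) expired, or no usable
    //              floodfill peers or tunnels were available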

FloodfillNetworkDatabaseFacade.java

@@ -19,6 +19,18 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
public FloodfillNetworkDatabaseFacade(RouterContext context) {
super(context);
_activeFloodQueries = new HashMap();
_context.statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.searchCount", "Overall number of searches sent", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.searchMessageCount", "Overall number of mesages for all searches sent", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.searchReplyValidated", "How many search replies we get that we are able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.searchReplyNotValidated", "How many search replies we get that we are NOT able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.searchReplyValidationSkipped", "How many search replies we get from unreliable peers that we skip?", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.republishQuantity", "How many peers do we need to send a found leaseSet to?", "NetworkDatabase", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
}
protected void createHandlers() {
@@ -117,13 +129,16 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
*/
SearchJob search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {
//if (true) return super.search(key, onFindJob, onFailedLookupJob, timeoutMs, isLease);
boolean isNew = true;
FloodSearchJob searchJob = null;
synchronized (_activeFloodQueries) {
searchJob = (FloodSearchJob)_activeFloodQueries.get(key);
if (searchJob == null) {
searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
if (SearchJob.onlyQueryFloodfillPeers(_context)) {
searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
} else {
searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
}
_activeFloodQueries.put(key, searchJob);
isNew = true;
}
@@ -206,7 +221,8 @@ class FloodSearchJob extends JobImpl {
_onFind.add(onFind);
_onFailed = new ArrayList();
_onFailed.add(onFailed);
int timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR;
int timeout = -1;
timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR;
if (timeout < timeoutMs)
timeout = timeoutMs;
_timeoutMs = timeout;
@@ -371,4 +387,4 @@ class FloodLookupSelector implements MessageSelector {
}
return false;
}
}
}

FloodfillPeerSelector.java

@@ -84,7 +84,7 @@ class FloodfillPeerSelector extends PeerSelector {
if (info != null && FloodfillNetworkDatabaseFacade.isFloodfill(info)) {
_floodfillMatches.add(entry);
} else {
if ( (_wanted > _matches) && (_key != null) ) {
if ( (!SearchJob.onlyQueryFloodfillPeers(_context)) && (_wanted > _matches) && (_key != null) ) {
BigInteger diff = getDistance(_key, entry);
_sorted.put(diff, entry);
}

SearchJob.java

@@ -103,18 +103,6 @@ class SearchJob extends JobImpl {
_floodfillPeersExhausted = false;
_floodfillSearchesOutstanding = 0;
_expiration = getContext().clock().now() + timeoutMs;
getContext().statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
getContext().statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchCount", "Overall number of searches sent", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchMessageCount", "Overall number of mesages for all searches sent", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchReplyValidated", "How many search replies we get that we are able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchReplyNotValidated", "How many search replies we get that we are NOT able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchReplyValidationSkipped", "How many search replies we get from unreliable peers that we skip?", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.republishQuantity", "How many peers do we need to send a found leaseSet to?", "NetworkDatabase", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().addRateData("netDb.searchCount", 1, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
@@ -135,11 +123,21 @@
private static final boolean DEFAULT_FLOODFILL_ONLY = false;
protected boolean onlyQueryFloodfillPeers() {
return Boolean.valueOf(getContext().getProperty("netDb.floodfillOnly", DEFAULT_FLOODFILL_ONLY + "")).booleanValue();
static boolean onlyQueryFloodfillPeers(RouterContext ctx) {
if (isCongested(ctx))
return true;
return Boolean.valueOf(ctx.getProperty("netDb.floodfillOnly", DEFAULT_FLOODFILL_ONLY + "")).booleanValue();
}
private static final int PER_FLOODFILL_PEER_TIMEOUT = 10*1000;
static boolean isCongested(RouterContext ctx) {
float availableSend = ctx.bandwidthLimiter().getOutboundKBytesPerSecond()*1024 - ctx.bandwidthLimiter().getSendBps();
float availableRecv = ctx.bandwidthLimiter().getInboundKBytesPerSecond()*1024 - ctx.bandwidthLimiter().getReceiveBps();
// 6KBps is an arbitrary limit, but a wider search should be able to operate
// in that range without a problem
return ( (availableSend < 6*1024) || (availableRecv < 6*1024) );
}
static final int PER_FLOODFILL_PEER_TIMEOUT = 10*1000;
protected int getPerPeerTimeoutMs(Hash peer) {
int timeout = 0;
@@ -251,7 +249,7 @@
int sent = 0;
Set attempted = _state.getAttempted();
while (sent <= 0) {
boolean onlyFloodfill = onlyQueryFloodfillPeers();
boolean onlyFloodfill = onlyQueryFloodfillPeers(getContext());
if (_floodfillPeersExhausted && onlyFloodfill && _state.getPending().size() <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": no non-floodfill peers left, and no more pending. Searched: "

PooledTunnelCreatorConfig.java

@@ -16,6 +16,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
private TestJob _testJob;
private Job _expireJob;
private TunnelInfo _pairedTunnel;
private boolean _live;
/** Creates a new instance of PooledTunnelCreatorConfig */
@@ -25,12 +26,19 @@
public PooledTunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) {
super(ctx, length, isInbound, destination);
_pool = null;
_live = false;
}
public void testSuccessful(int ms) {
if (_testJob != null)
_testJob.testSuccessful(ms);
super.testSuccessful(ms);
// once a tunnel has been built and we know it works, let's skew ourselves a bit so we
// aren't as cyclic
if ( (_context.router().getUptime() < 10*60*1000) && (!_live) )
setExpiration(getExpiration() - _context.random().nextInt(5*60*1000));
_live = true;
}
/**

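The hunk above is the whole of the first changelog item: a tunnel that tests successfully while the router has been up for less than ten minutes gets its expiration pulled forward by a random amount, once per tunnel, so that a pool built all at once near startup does not expire and rebuild in lockstep. A condensed restatement with the constants spelled out (illustrative only; the real code is testSuccessful() above):

    // Desync sketch: only while the router is young, and only the first time a tunnel tests OK
    if (_context.router().getUptime() < 10*60*1000 && !_live) {
        long skew = _context.random().nextInt(5*60*1000);  // 0 to 5 minutes, in milliseconds
        setExpiration(getExpiration() - skew);              // expire (and rebuild) earlier than scheduled
    }
    _live = true;

Whatever the configured tunnel lifetime, a batch of tunnels created together now spreads its rebuilds over up to a five-minute window instead of rebuilding at the same instant.
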
TestJob.java

@@ -129,6 +129,11 @@ class TestJob extends JobImpl {
getContext().keyManager().getPublicKey(),
encryptKey, encryptTag);
if (msg == null) {
// overloaded / unknown peers / etc
scheduleRetest();
return;
}
Set encryptTags = new HashSet(1);
encryptTags.add(encryptTag);
getContext().sessionKeyManager().tagsReceived(encryptKey, encryptTags);