* i2psnark: Notify threads awaiting DHT replies at shutdown

This commit is contained in:
zzz
2012-08-27 20:34:19 +00:00
parent 587795552e
commit b827468e2f
2 changed files with 32 additions and 7 deletions

View File

@ -44,6 +44,8 @@ class DHTNodes {
private static final long MIN_EXPIRE_TIME = 10*60*1000; private static final long MIN_EXPIRE_TIME = 10*60*1000;
private static final long DELTA_EXPIRE_TIME = 3*60*1000; private static final long DELTA_EXPIRE_TIME = 3*60*1000;
private static final int MAX_PEERS = 799; private static final int MAX_PEERS = 799;
/** Buckets older than this are refreshed - BEP 5 says 15 minutes */
private static final long MAX_BUCKET_AGE = 15*60*1000;
public DHTNodes(I2PAppContext ctx, NID me) { public DHTNodes(I2PAppContext ctx, NID me) {
_context = ctx; _context = ctx;
@ -121,7 +123,7 @@ class DHTNodes {
* DHT - get random keys to explore * DHT - get random keys to explore
*/ */
public List<NID> getExploreKeys() { public List<NID> getExploreKeys() {
return _kad.getExploreKeys(15*60*1000); return _kad.getExploreKeys(MAX_BUCKET_AGE);
} }
/** */ /** */

View File

@ -133,6 +133,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private static final int REPLY_PONG = 1; private static final int REPLY_PONG = 1;
private static final int REPLY_PEERS = 2; private static final int REPLY_PEERS = 2;
private static final int REPLY_NODES = 3; private static final int REPLY_NODES = 3;
private static final int REPLY_NETWORK_FAIL = 4;
public static final boolean SECURE_NID = true; public static final boolean SECURE_NID = true;
@ -272,6 +273,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni)))) if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni))))
toTry.add(ni); toTry.add(ni);
} }
} else if (replyType == REPLY_NETWORK_FAIL) {
break;
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject()); _log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject());
@ -370,6 +373,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni))) if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni)))
toTry.add(ni); toTry.add(ni);
} }
} else if (replyType == REPLY_NETWORK_FAIL) {
break;
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject()); _log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject());
@ -564,7 +569,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_tracker.stop(); _tracker.stop();
PersistDHT.saveDHT(_knownNodes, _dhtFile); PersistDHT.saveDHT(_knownNodes, _dhtFile);
_knownNodes.stop(); _knownNodes.stop();
_sentQueries.clear(); for (Iterator<ReplyWaiter> iter = _sentQueries.values().iterator(); iter.hasNext(); ) {
ReplyWaiter waiter = iter.next();
iter.remove();
waiter.networkFail();
}
_outgoingTokens.clear(); _outgoingTokens.clear();
_incomingTokens.clear(); _incomingTokens.clear();
} }
@ -1317,7 +1326,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private final NodeInfo sentTo; private final NodeInfo sentTo;
private final Runnable onReply; private final Runnable onReply;
private final Runnable onTimeout; private final Runnable onTimeout;
private int replyCode; private volatile int replyCode;
private Object sentObject; private Object sentObject;
private Object replyObject; private Object replyObject;
@ -1400,6 +1409,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
this.notifyAll(); this.notifyAll();
} }
} }
/**
* Will notify this but not run onReply or onTimeout,
* or remove from _sentQueries, or call heardFrom().
*/
public void networkFail() {
cancel();
replyCode = REPLY_NETWORK_FAIL;
synchronized(this) {
this.notifyAll();
}
}
} }
// I2PSessionMuxedListener interface ---------------- // I2PSessionMuxedListener interface ----------------
@ -1532,22 +1553,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
return; return;
if (!_hasBootstrapped) { if (!_hasBootstrapped) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Bootstrap start size: " + _knownNodes.size()); _log.info("Bootstrap start, size: " + _knownNodes.size());
explore(_myNID, 8, 60*1000, 1); explore(_myNID, 8, 60*1000, 1);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Bootstrap done size: " + _knownNodes.size()); _log.info("Bootstrap done, size: " + _knownNodes.size());
_hasBootstrapped = true; _hasBootstrapped = true;
} }
if (!_isRunning) if (!_isRunning)
return; return;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Explore start size: " + _knownNodes.size()); _log.info("Explore start. size: " + _knownNodes.size());
List<NID> keys = _knownNodes.getExploreKeys(); List<NID> keys = _knownNodes.getExploreKeys();
for (NID nid : keys) { for (NID nid : keys) {
explore(nid, 8, 60*1000, 1); explore(nid, 8, 60*1000, 1);
if (!_isRunning)
return;
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Explore done size: " + _knownNodes.size()); _log.info("Explore of " + keys.size() + " buckets done, new size: " + _knownNodes.size());
new Explorer(EXPLORE_TIME); new Explorer(EXPLORE_TIME);
} }
} }