- Switch to real kad with lib from i2p.zzz.kademlia (not checked in yet)

- Bootstrap only once in explore thread
- Add exploring to explore thread
- Don't store default DHT setting in config file, so we can switch default to true later
- Add new enforce-protocol streaming config, sorry locks out < 0.7.1 peers
- Log tweaks
This commit is contained in:
zzz
2012-06-22 15:12:43 +00:00
parent f8e470c7f4
commit 8522779df1
6 changed files with 76 additions and 49 deletions

View File

@ -240,6 +240,8 @@ public class I2PSnarkUtil {
opts.setProperty("i2p.streaming.maxTotalConnsPerMinute", "8"); opts.setProperty("i2p.streaming.maxTotalConnsPerMinute", "8");
if (opts.getProperty("i2p.streaming.maxConnsPerHour") == null) if (opts.getProperty("i2p.streaming.maxConnsPerHour") == null)
opts.setProperty("i2p.streaming.maxConnsPerHour", "20"); opts.setProperty("i2p.streaming.maxConnsPerHour", "20");
if (opts.getProperty("i2p.streaming.enforceProtocol") == null)
opts.setProperty("i2p.streaming.enforceProtocol", "true");
_manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts); _manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts);
_connecting = false; _connecting = false;
} }

View File

@ -289,8 +289,9 @@ public class SnarkManager implements Snark.CompleteListener {
_config.setProperty(PROP_STARTUP_DELAY, Integer.toString(DEFAULT_STARTUP_DELAY)); _config.setProperty(PROP_STARTUP_DELAY, Integer.toString(DEFAULT_STARTUP_DELAY));
if (!_config.containsKey(PROP_THEME)) if (!_config.containsKey(PROP_THEME))
_config.setProperty(PROP_THEME, DEFAULT_THEME); _config.setProperty(PROP_THEME, DEFAULT_THEME);
if (!_config.containsKey(PROP_USE_DHT)) // no, so we can switch default to true later
_config.setProperty(PROP_USE_DHT, Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT)); //if (!_config.containsKey(PROP_USE_DHT))
// _config.setProperty(PROP_USE_DHT, Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT));
updateConfig(); updateConfig();
} }
/** /**
@ -365,7 +366,9 @@ public class SnarkManager implements Snark.CompleteListener {
String useOT = _config.getProperty(PROP_USE_OPENTRACKERS); String useOT = _config.getProperty(PROP_USE_OPENTRACKERS);
boolean bOT = useOT == null || Boolean.valueOf(useOT).booleanValue(); boolean bOT = useOT == null || Boolean.valueOf(useOT).booleanValue();
_util.setUseOpenTrackers(bOT); _util.setUseOpenTrackers(bOT);
_util.setUseDHT(Boolean.valueOf(_config.getProperty(PROP_USE_DHT)).booleanValue()); // careful, so we can switch default to true later
_util.setUseDHT(Boolean.valueOf(_config.getProperty(PROP_USE_DHT,
Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT))).booleanValue());
getDataDir().mkdirs(); getDataDir().mkdirs();
initTrackerMap(); initTrackerMap();
} }

View File

@ -15,15 +15,15 @@ import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.crypto.SHA1Hash; import net.i2p.crypto.SHA1Hash;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.kademlia.KBucketSet;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
/** /**
* All the nodes we know about, stored as a mapping from * All the nodes we know about, stored as a mapping from
* node ID to a Destination and Port. * node ID to a Destination and Port.
* Also uses the keySet as a subsitute for kbuckets.
* *
* Swap this out for a real DHT later. * And a real Kademlia routing table, which stores node IDs only.
* *
* @since 0.8.4 * @since 0.8.4
* @author zzz * @author zzz
@ -34,6 +34,7 @@ class DHTNodes {
private long _expireTime; private long _expireTime;
private final Log _log; private final Log _log;
private final ConcurrentHashMap<NID, NodeInfo> _nodeMap; private final ConcurrentHashMap<NID, NodeInfo> _nodeMap;
private final KBucketSet<NID> _kad;
private volatile boolean _isRunning; private volatile boolean _isRunning;
/** stagger with other cleaners */ /** stagger with other cleaners */
@ -43,11 +44,12 @@ class DHTNodes {
private static final long DELTA_EXPIRE_TIME = 7*60*1000; private static final long DELTA_EXPIRE_TIME = 7*60*1000;
private static final int MAX_PEERS = 999; private static final int MAX_PEERS = 999;
public DHTNodes(I2PAppContext ctx) { public DHTNodes(I2PAppContext ctx, NID me) {
_context = ctx; _context = ctx;
_expireTime = MAX_EXPIRE_TIME; _expireTime = MAX_EXPIRE_TIME;
_log = _context.logManager().getLog(DHTNodes.class); _log = _context.logManager().getLog(DHTNodes.class);
_nodeMap = new ConcurrentHashMap(); _nodeMap = new ConcurrentHashMap();
_kad = new KBucketSet(ctx, me, 8, 1);
} }
public void start() { public void start() {
@ -67,6 +69,7 @@ class DHTNodes {
} }
public void clear() { public void clear() {
_kad.clear();
_nodeMap.clear(); _nodeMap.clear();
} }
@ -75,13 +78,15 @@ class DHTNodes {
} }
/** /**
* @return the old value if present, else nInfo * @return the old value if present, else null
*/ */
public NodeInfo putIfAbsent(NodeInfo nInfo) { public NodeInfo putIfAbsent(NodeInfo nInfo) {
_kad.add(nInfo.getNID());
return _nodeMap.putIfAbsent(nInfo.getNID(), nInfo); return _nodeMap.putIfAbsent(nInfo.getNID(), nInfo);
} }
public NodeInfo remove(NID nid) { public NodeInfo remove(NID nid) {
_kad.remove(nid);
return _nodeMap.remove(nid); return _nodeMap.remove(nid);
} }
@ -92,36 +97,31 @@ class DHTNodes {
// end ConcurrentHashMap methods // end ConcurrentHashMap methods
/** /**
* Fake DHT * DHT
* @param sha1 either a InfoHash or a NID * @param sha1 either a InfoHash or a NID
*/ */
List<NodeInfo> findClosest(SHA1Hash h, int numWant) { public List<NodeInfo> findClosest(SHA1Hash h, int numWant) {
// sort the whole thing NID key;
Set<NID> all = new TreeSet(new SHA1Comparator(h)); if (h instanceof NID)
all.addAll(_nodeMap.keySet()); key = (NID) h;
int sz = all.size(); else
int max = Math.min(numWant, sz); key = new NID(h.getData());
List<NID> keys = _kad.getClosest(key, numWant);
// return the first ones List<NodeInfo> rv = new ArrayList(keys.size());
List<NodeInfo> rv = new ArrayList(max); for (NID nid : keys) {
int count = 0; NodeInfo ninfo = _nodeMap.get(nid);
for (NID nid : all) { if (ninfo != null)
if (count++ >= max) rv.add(ninfo);
break;
NodeInfo nInfo = get(nid);
if (nInfo == null)
continue;
rv.add(nInfo);
} }
return rv; return rv;
} }
/**** used CHM methods to be replaced: /**
public Collection<NodeInfo> values() {} * DHT - get random keys to explore
public NodeInfo get(NID nID) {} */
public NodeInfo putIfAbssent(NID nID, NodeInfo nInfo) {} public List<NID> getExploreKeys() {
public int size() {} return _kad.getExploreKeys(15*60*1000);
****/ }
/** */ /** */
private class Cleaner extends SimpleTimer2.TimedEvent { private class Cleaner extends SimpleTimer2.TimedEvent {
@ -137,10 +137,12 @@ class DHTNodes {
int peerCount = 0; int peerCount = 0;
for (Iterator<NodeInfo> iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) { for (Iterator<NodeInfo> iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) {
NodeInfo peer = iter.next(); NodeInfo peer = iter.next();
if (peer.lastSeen() < now - _expireTime) if (peer.lastSeen() < now - _expireTime) {
iter.remove(); iter.remove();
else _kad.remove(peer.getNID());
} else {
peerCount++; peerCount++;
}
} }
if (peerCount > MAX_PEERS) if (peerCount > MAX_PEERS)

View File

@ -110,6 +110,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private final int _qPort; private final int _qPort;
private final File _dhtFile; private final File _dhtFile;
private volatile boolean _isRunning; private volatile boolean _isRunning;
private volatile boolean _hasBootstrapped;
/** all-zero NID used for pings */ /** all-zero NID used for pings */
public static final NID FAKE_NID = new NID(new byte[NID.HASH_LENGTH]); public static final NID FAKE_NID = new NID(new byte[NID.HASH_LENGTH]);
@ -147,8 +148,6 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_log = ctx.logManager().getLog(KRPC.class); _log = ctx.logManager().getLog(KRPC.class);
_tracker = new DHTTracker(ctx); _tracker = new DHTTracker(ctx);
// in place of a DHT, store everybody we hear from for now
_knownNodes = new DHTNodes(ctx);
_sentQueries = new ConcurrentHashMap(); _sentQueries = new ConcurrentHashMap();
_outgoingTokens = new ConcurrentHashMap(); _outgoingTokens = new ConcurrentHashMap();
_incomingTokens = new ConcurrentHashMap(); _incomingTokens = new ConcurrentHashMap();
@ -168,6 +167,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
_myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort); _myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort);
_dhtFile = new File(ctx.getConfigDir(), DHT_FILE); _dhtFile = new File(ctx.getConfigDir(), DHT_FILE);
_knownNodes = new DHTNodes(ctx, _myNID);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort); session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort); session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort);
@ -206,25 +206,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* Blocking! * Blocking!
* This is almost the same as getPeers() * This is almost the same as getPeers()
* *
* @param target the key we are searching for
* @param maxNodes how many to contact * @param maxNodes how many to contact
* @param maxWait how long to wait for each to reply (not total) must be > 0 * @param maxWait how long to wait for each to reply (not total) must be > 0
* @param parallel how many outstanding at once (unimplemented, always 1) * @param parallel how many outstanding at once (unimplemented, always 1)
*/ */
public void explore(int maxNodes, long maxWait, int parallel) { private void explore(NID target, int maxNodes, long maxWait, int parallel) {
// Initial set to try, will get added to as we go List<NodeInfo> nodes = _knownNodes.findClosest(target, maxNodes);
NID myNID = _myNodeInfo.getNID();
List<NodeInfo> nodes = _knownNodes.findClosest(myNID, maxNodes);
if (nodes.isEmpty()) { if (nodes.isEmpty()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.info("DHT is empty, cannot explore"); _log.info("DHT is empty, cannot explore");
return; return;
} }
SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(myNID)); SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(target));
toTry.addAll(nodes); toTry.addAll(nodes);
Set<NodeInfo> tried = new HashSet(); Set<NodeInfo> tried = new HashSet();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Starting explore"); _log.info("Starting explore of " + target);
for (int i = 0; i < maxNodes; i++) { for (int i = 0; i < maxNodes; i++) {
if (!_isRunning) if (!_isRunning)
break; break;
@ -237,8 +236,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
toTry.remove(nInfo); toTry.remove(nInfo);
tried.add(nInfo); tried.add(nInfo);
// this isn't going to work, he will just return our own? ReplyWaiter waiter = sendFindNode(nInfo, target);
ReplyWaiter waiter = sendFindNode(nInfo, _myNID);
if (waiter == null) if (waiter == null)
continue; continue;
synchronized(waiter) { synchronized(waiter) {
@ -266,7 +264,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Finished explore"); _log.info("Finished explore of " + target);
} }
/** /**
@ -1441,7 +1439,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
public void timeReached() { public void timeReached() {
if (!_isRunning) if (!_isRunning)
return; return;
(new I2PAppThread(new ExplorerThread(), "DHT Explore", true)).start(); if (_knownNodes.size() > 0)
(new I2PAppThread(new ExplorerThread(), "DHT Explore", true)).start();
else
schedule(60*1000);
} }
} }
@ -1453,10 +1454,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
public void run() { public void run() {
if (!_isRunning) if (!_isRunning)
return; return;
explore(8, 60*1000, 1); if (!_hasBootstrapped) {
// refresh the buckets here too if (_log.shouldLog(Log.INFO))
_log.info("Bootstrap start size: " + _knownNodes.size());
explore(_myNID, 8, 60*1000, 1);
if (_log.shouldLog(Log.INFO))
_log.info("Bootstrap done size: " + _knownNodes.size());
_hasBootstrapped = true;
}
if (!_isRunning) if (!_isRunning)
return; return;
if (_log.shouldLog(Log.INFO))
_log.info("Explore start size: " + _knownNodes.size());
List<NID> keys = _knownNodes.getExploreKeys();
for (NID nid : keys) {
explore(nid, 8, 60*1000, 1);
}
if (_log.shouldLog(Log.INFO))
_log.info("Explore done size: " + _knownNodes.size());
new Explorer(EXPLORE_TIME); new Explorer(EXPLORE_TIME);
} }
} }

View File

@ -8,17 +8,22 @@ import net.i2p.util.Clock;
/** /**
* A 20-byte peer ID, used as a Map key in lots of places. * A 20-byte peer ID, used as a Map key in lots of places.
* Must be public for constructor in KBucketSet.generateRandomKey()
* *
* @since 0.8.4 * @since 0.8.4
* @author zzz * @author zzz
*/ */
class NID extends SHA1Hash { public class NID extends SHA1Hash {
private long lastSeen; private long lastSeen;
private int fails; private int fails;
private static final int MAX_FAILS = 3; private static final int MAX_FAILS = 3;
public NID() {
super(null);
}
public NID(byte[] data) { public NID(byte[] data) {
super(data); super(data);
} }

View File

@ -225,7 +225,7 @@ class NodeInfo extends SimpleDataStructure {
@Override @Override
public String toString() { public String toString() {
return "NodeInfo: " + nID + ' ' + hash + " port: " + port; return "NodeInfo: " + nID + ' ' + hash + " port: " + port + (dest != null ? " known dest" : " null dest");
} }
/** /**