propagate from branch 'i2p.i2p.zzz.dhtsnark' (head d4f16babae7cb0156609b211f5bb0310b03aaf57)

to branch 'i2p.i2p' (head 7bcd2f192b0f571374cc9882eca407095eb97c17)
This commit is contained in:
zzz
2012-08-06 14:05:09 +00:00
36 changed files with 3786 additions and 64 deletions

View File

@ -0,0 +1,75 @@
package net.i2p.kademlia;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.Set;
import net.i2p.data.SimpleDataStructure;
/**
* Group, without inherent ordering, a set of keys a certain distance away from
* a local key, using XOR as the distance metric
*
* Refactored from net.i2p.router.networkdb.kademlia
*/
public interface KBucket<T extends SimpleDataStructure> {
/**
* Lowest order high bit for difference keys.
* The lower-bounds distance of this bucket is 2**begin.
* If begin == 0, this is the closest bucket.
*/
public int getRangeBegin();
/**
* Highest high bit for the difference keys.
* The upper-bounds distance of this bucket is (2**(end+1)) - 1.
* If begin == end, the bucket cannot be split further.
* If end == (numbits - 1), this is the furthest bucket.
*/
public int getRangeEnd();
/**
* Number of keys already contained in this kbucket
*/
public int getKeyCount();
/**
* Add the peer to the bucket
*
* @return true if added
*/
public boolean add(T key);
/**
* Remove the key from the bucket
* @return true if the key existed in the bucket before removing it, else false
*/
public boolean remove(T key);
/**
* Update the last-changed timestamp to now.
*/
public void setLastChanged();
/**
* The last-changed timestamp
*/
public long getLastChanged();
/**
* Retrieve all routing table entries stored in the bucket
* @return set of Hash structures
*/
public Set<T> getEntries();
public void getEntries(SelectionCollector<T> collector);
public void clear();
}

View File

@ -0,0 +1,149 @@
package net.i2p.kademlia;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.Collections;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.data.SimpleDataStructure;
import net.i2p.util.ConcurrentHashSet;
/**
* A concurrent implementation using ConcurrentHashSet.
* The max size (K) may be temporarily exceeded due to concurrency,
* a pending split, or the behavior of the supplied trimmer,
* as explained below.
* The creator is responsible for splits.
*
* This class has no knowledge of the DHT base used for XORing,
* and thus there are no validity checks in add/remove.
*
* The begin and end values are immutable.
* All entries in this bucket will have at least one bit different
* from us in the range [begin, end] inclusive.
* Splits must be implemented by creating two new buckets
* and discarding this one.
*
* The keys are kept in a Set and are NOT sorted by last-seen.
* Per-key last-seen-time, failures, etc. must be tracked elsewhere.
*
* If this bucket is full (i.e. begin == end && size == max)
* then add() will call KBucketTrimmer.trim() do
* (possibly) remove older entries, and indicate whether
* to add the new entry. If the trimmer returns true without
* removing entries, this KBucket will exceed the max size.
*
* Refactored from net.i2p.router.networkdb.kademlia
*/
class KBucketImpl<T extends SimpleDataStructure> implements KBucket<T> {
/**
* set of Hash objects for the peers in the kbucket
*/
private final Set<T> _entries;
/** include if any bits equal or higher to this bit (in big endian order) */
private final int _begin;
/** include if no bits higher than this bit (inclusive) are set */
private final int _end;
private final int _max;
private final KBucketSet.KBucketTrimmer _trimmer;
/** when did we last shake things up */
private long _lastChanged;
private final I2PAppContext _context;
/**
* All entries in this bucket will have at least one bit different
* from us in the range [begin, end] inclusive.
*/
public KBucketImpl(I2PAppContext context, int begin, int end, int max, KBucketSet.KBucketTrimmer trimmer) {
if (begin > end)
throw new IllegalArgumentException(begin + " > " + end);
_context = context;
_entries = new ConcurrentHashSet(max + 4);
_begin = begin;
_end = end;
_max = max;
_trimmer = trimmer;
}
public int getRangeBegin() { return _begin; }
public int getRangeEnd() { return _end; }
public int getKeyCount() {
return _entries.size();
}
/**
* @return an unmodifiable view; not a copy
*/
public Set<T> getEntries() {
return Collections.unmodifiableSet(_entries);
}
public void getEntries(SelectionCollector<T> collector) {
for (T h : _entries) {
collector.add(h);
}
}
public void clear() {
_entries.clear();
}
/**
* Sets last-changed if rv is true OR if the peer is already present.
* Calls the trimmer if begin == end and we are full.
* If begin != end then add it and caller must do bucket splitting.
* @return true if added
*/
public boolean add(T peer) {
if (_begin != _end || _entries.size() < _max ||
_entries.contains(peer) || _trimmer.trim(this, peer)) {
// do this even if already contains, to call setLastChanged()
boolean rv = _entries.add(peer);
setLastChanged();
return rv;
}
return false;
}
/**
* @return if removed. Does NOT set lastChanged.
*/
public boolean remove(T peer) {
boolean rv = _entries.remove(peer);
//if (rv)
// setLastChanged();
return rv;
}
/**
* Update the last-changed timestamp to now.
*/
public void setLastChanged() {
_lastChanged = _context.clock().now();
}
/**
* The last-changed timestamp, which actually indicates last-added or last-seen.
*/
public long getLastChanged() {
return _lastChanged;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(1024);
buf.append(_entries.size());
buf.append(" entries in (").append(_begin).append(',').append(_end);
buf.append(") : ").append(_entries.toString());
return buf.toString();
}
}

View File

@ -0,0 +1,853 @@
package net.i2p.kademlia;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.SimpleDataStructure;
import net.i2p.util.Log;
/**
* In-memory storage of buckets sorted by the XOR metric from the base (us)
* passed in via the constructor.
* This starts with one bucket covering the whole key space, and
* may eventually be split to a max of the number of bits in the data type
* (160 for SHA1Hash or 256 for Hash),
* times 2**(B-1) for Kademlia value B.
*
* Refactored from net.i2p.router.networkdb.kademlia
*/
public class KBucketSet<T extends SimpleDataStructure> {
private final Log _log;
private final I2PAppContext _context;
private final T _us;
/**
* The bucket list is locked by _bucketsLock, however the individual
* buckets are not locked. Users may see buckets that have more than
* the maximum k entries, or may have adds and removes silently fail
* when they appear to succeed.
*
* Closest values are in bucket 0, furthest are in the last bucket.
*/
private final List<KBucket> _buckets;
private final Range<T> _rangeCalc;
private final KBucketTrimmer _trimmer;
/**
* Locked for reading only when traversing all the buckets.
* Locked for writing only when splitting a bucket.
* Adds/removes/gets from individual buckets are not locked.
*/
private final ReentrantReadWriteLock _bucketsLock = new ReentrantReadWriteLock(false);
private final int KEYSIZE_BITS;
private final int NUM_BUCKETS;
private final int BUCKET_SIZE;
private final int B_VALUE;
private final int B_FACTOR;
/**
* Use the default trim strategy, which removes a random entry.
* @param us the local identity (typically a SHA1Hash or Hash)
* The class must have a zero-argument constructor.
* @param max the Kademlia value "k", the max per bucket, k >= 4
* @param b the Kademlia value "b", split buckets an extra 2**(b-1) times,
* b > 0, use 1 for bittorrent, Kademlia paper recommends 5
*/
public KBucketSet(I2PAppContext context, T us, int max, int b) {
this(context, us, max, b, new RandomTrimmer(context, max));
}
/**
* Use the supplied trim strategy.
*/
public KBucketSet(I2PAppContext context, T us, int max, int b, KBucketTrimmer trimmer) {
_us = us;
_context = context;
_log = context.logManager().getLog(KBucketSet.class);
_trimmer = trimmer;
if (max <= 4 || b <= 0 || b > 8)
throw new IllegalArgumentException();
KEYSIZE_BITS = us.length() * 8;
B_VALUE = b;
B_FACTOR = 1 << (b - 1);
NUM_BUCKETS = KEYSIZE_BITS * B_FACTOR;
BUCKET_SIZE = max;
_buckets = createBuckets();
_rangeCalc = new Range(us, B_VALUE);
// this verifies the zero-argument constructor
makeKey(new byte[us.length()]);
}
private void getReadLock() {
_bucketsLock.readLock().lock();
}
/**
* Get the lock if we can. Non-blocking.
* @return true if the lock was acquired
*/
private boolean tryReadLock() {
return _bucketsLock.readLock().tryLock();
}
private void releaseReadLock() {
_bucketsLock.readLock().unlock();
}
/** @return true if the lock was acquired */
private boolean getWriteLock() {
try {
boolean rv = _bucketsLock.writeLock().tryLock(3000, TimeUnit.MILLISECONDS);
if ((!rv) && _log.shouldLog(Log.WARN))
_log.warn("no lock, size is: " + _bucketsLock.getQueueLength(), new Exception("rats"));
return rv;
} catch (InterruptedException ie) {}
return false;
}
private void releaseWriteLock() {
_bucketsLock.writeLock().unlock();
}
/**
* @return true if the peer is new to the bucket it goes in, or false if it was
* already in it. Always returns false on an attempt to add ourselves.
*
*/
public boolean add(T peer) {
KBucket bucket;
getReadLock();
try {
bucket = getBucket(peer);
} finally { releaseReadLock(); }
if (bucket != null) {
if (bucket.add(peer)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + peer + " added to bucket " + bucket);
if (shouldSplit(bucket)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Splitting bucket " + bucket);
split(bucket.getRangeBegin());
//testAudit(this, _log);
}
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + peer + " NOT added to bucket " + bucket);
return false;
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Failed to add, probably us: " + peer);
return false;
}
}
/**
* No lock required.
* FIXME will split the closest buckets too far if B > 1 and K < 2**B
* Won't ever really happen and if it does it still works.
*/
private boolean shouldSplit(KBucket b) {
return
b.getRangeBegin() != b.getRangeEnd() &&
b.getKeyCount() > BUCKET_SIZE;
}
/**
* Grabs the write lock.
* Caller must NOT have the read lock.
* The bucket should be splittable (range start != range end).
* @param r the range start of the bucket to be split
*/
private void split(int r) {
if (!getWriteLock())
return;
try {
locked_split(r);
} finally { releaseWriteLock(); }
}
/**
* Creates two or more new buckets. The old bucket is replaced and discarded.
*
* Caller must hold write lock
* The bucket should be splittable (range start != range end).
* @param r the range start of the bucket to be split
*/
private void locked_split(int r) {
int b = pickBucket(r);
while (shouldSplit(_buckets.get(b))) {
KBucket<T> b0 = _buckets.get(b);
// Each bucket gets half the keyspace.
// When B_VALUE = 1, or the bucket is larger than B_FACTOR, then
// e.g. 0-159 => 0-158, 159-159
// When B_VALUE > 1, and the bucket is smaller than B_FACTOR, then
// e.g. 1020-1023 => 1020-1021, 1022-1023
int s1, e1, s2, e2;
s1 = b0.getRangeBegin();
e2 = b0.getRangeEnd();
if (B_FACTOR > 1 &&
(s1 & (B_FACTOR - 1)) == 0 &&
((e2 + 1) & (B_FACTOR - 1)) == 0 &&
e2 > s1 + B_FACTOR) {
// The bucket is a "whole" kbucket with a range > B_FACTOR,
// so it should be split into two "whole" kbuckets each with
// a range >= B_FACTOR.
// Log split
s2 = e2 + 1 - B_FACTOR;
} else {
// The bucket is the smallest "whole" kbucket with a range == B_FACTOR,
// or B_VALUE > 1 and the bucket has already been split.
// Start or continue splitting down to a depth B_VALUE.
// Linear split
s2 = s1 + ((1 + e2 - s1) / 2);
}
e1 = s2 - 1;
if (_log.shouldLog(Log.INFO))
_log.info("Splitting (" + s1 + ',' + e2 + ") -> (" + s1 + ',' + e1 + ") (" + s2 + ',' + e2 + ')');
KBucket<T> b1 = createBucket(s1, e1);
KBucket<T> b2 = createBucket(s2, e2);
for (T key : b0.getEntries()) {
if (getRange(key) < s2)
b1.add(key);
else
b2.add(key);
}
_buckets.set(b, b1);
_buckets.add(b + 1, b2);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Split bucket at idx " + b +
":\n" + b0 +
"\ninto: " + b1 +
"\nand: " + b2);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("State is now: " + toString());
if (b2.getKeyCount() > BUCKET_SIZE) {
// should be rare... too hard to call _trimmer from here
// (and definitely not from inside the write lock)
if (_log.shouldLog(Log.INFO))
_log.info("All went into 2nd bucket after split");
}
// loop if all the entries went in the first bucket
}
}
/**
* The current number of entries.
*/
public int size() {
int rv = 0;
getReadLock();
try {
for (KBucket b : _buckets) {
rv += b.getKeyCount();
}
} finally { releaseReadLock(); }
return rv;
}
public boolean remove(T entry) {
KBucket kbucket;
getReadLock();
try {
kbucket = getBucket(entry);
} finally { releaseReadLock(); }
boolean removed = kbucket.remove(entry);
return removed;
}
/** @since 0.8.8 */
public void clear() {
getReadLock();
try {
for (KBucket b : _buckets) {
b.clear();
}
} finally { releaseReadLock(); }
_rangeCalc.clear();
}
/**
* @return a copy in a new set
*/
public Set<T> getAll() {
Set<T> all = new HashSet(256);
getReadLock();
try {
for (KBucket b : _buckets) {
all.addAll(b.getEntries());
}
} finally { releaseReadLock(); }
return all;
}
/**
* @return a copy in a new set
*/
public Set<T> getAll(Set<T> toIgnore) {
Set<T> all = getAll();
all.removeAll(toIgnore);
return all;
}
public void getAll(SelectionCollector<T> collector) {
getReadLock();
try {
for (KBucket b : _buckets) {
b.getEntries(collector);
}
} finally { releaseReadLock(); }
}
/**
* The keys closest to us.
* Returned list will never contain us.
* @return non-null, closest first
*/
public List<T> getClosest(int max) {
return getClosest(max, Collections.EMPTY_SET);
}
/**
* The keys closest to us.
* Returned list will never contain us.
* @return non-null, closest first
*/
public List<T> getClosest(int max, Collection<T> toIgnore) {
List<T> rv = new ArrayList(max);
int count = 0;
getReadLock();
try {
// start at first (closest) bucket
for (int i = 0; i < _buckets.size() && count < max; i++) {
Set<T> entries = _buckets.get(i).getEntries();
// add the whole bucket except for ignores,
// extras will be trimmed after sorting
for (T e : entries) {
if (!toIgnore.contains(e)) {
rv.add(e);
count++;
}
}
}
} finally { releaseReadLock(); }
Comparator comp = new XORComparator(_us);
Collections.sort(rv, comp);
int sz = rv.size();
for (int i = sz - 1; i >= max; i--) {
rv.remove(i);
}
return rv;
}
/**
* The keys closest to the key.
* Returned list will never contain us.
* @return non-null, closest first
*/
public List<T> getClosest(T key, int max) {
return getClosest(key, max, Collections.EMPTY_SET);
}
/**
* The keys closest to the key.
* Returned list will never contain us.
* @return non-null, closest first
*/
public List<T> getClosest(T key, int max, Collection<T> toIgnore) {
if (key.equals(_us))
return getClosest(max, toIgnore);
List<T> rv = new ArrayList(max);
int count = 0;
getReadLock();
try {
int start = pickBucket(key);
// start at closest bucket, then to the smaller (closer to us) buckets
for (int i = start; i >= 0 && count < max; i--) {
Set<T> entries = _buckets.get(i).getEntries();
for (T e : entries) {
if (!toIgnore.contains(e)) {
rv.add(e);
count++;
}
}
}
// then the farther from us buckets if necessary
for (int i = start + 1; i < _buckets.size() && count < max; i++) {
Set<T> entries = _buckets.get(i).getEntries();
for (T e : entries) {
if (!toIgnore.contains(e)) {
rv.add(e);
count++;
}
}
}
} finally { releaseReadLock(); }
Comparator comp = new XORComparator(key);
Collections.sort(rv, comp);
int sz = rv.size();
for (int i = sz - 1; i >= max; i--) {
rv.remove(i);
}
return rv;
}
/**
* The bucket number (NOT the range number) that the xor of the key goes in
* Caller must hold read lock
* @return 0 to max-1 or -1 for us
*/
private int pickBucket(T key) {
int range = getRange(key);
if (range < 0)
return -1;
int rv = pickBucket(range);
if (rv >= 0) {
return rv;
}
_log.error("Key does not fit in any bucket?! WTF!\nKey : ["
+ DataHelper.toHexString(key.getData()) + "]"
+ "\nUs : " + _us
+ "\nDelta: ["
+ DataHelper.toHexString(DataHelper.xor(_us.getData(), key.getData()))
+ "]", new Exception("WTF"));
_log.error(toString());
throw new IllegalStateException("pickBucket returned " + rv);
//return -1;
}
/**
* Returned list is a copy of the bucket list, closest first,
* with the actual buckets (not a copy).
*
* Primarily for testing. You shouldn't ever need to get all the buckets.
* Use getClosest() or getAll() instead to get the keys.
*
* @return non-null
*/
List<KBucket<T>> getBuckets() {
getReadLock();
try {
return new ArrayList(_buckets);
} finally { releaseReadLock(); }
}
/**
* The bucket that the xor of the key goes in
* Caller must hold read lock
* @return null if key is us
*/
private KBucket getBucket(T key) {
int bucket = pickBucket(key);
if (bucket < 0)
return null;
return _buckets.get(bucket);
}
/**
* The bucket number that contains this range number
* Caller must hold read lock or write lock
* @return 0 to max-1 or -1 for us
*/
private int pickBucket(int range) {
// If B is small, a linear search from back to front
// is most efficient since most of the keys are at the end...
// If B is larger, there's a lot of sub-buckets
// of equal size to be checked so a binary search is better
if (B_VALUE <= 3) {
for (int i = _buckets.size() - 1; i >= 0; i--) {
KBucket b = _buckets.get(i);
if (range >= b.getRangeBegin() && range <= b.getRangeEnd())
return i;
}
return -1;
} else {
KBucket dummy = new DummyBucket(range);
return Collections.binarySearch(_buckets, dummy, new BucketComparator());
}
}
private List<KBucket> createBuckets() {
// just an initial size
List<KBucket> buckets = new ArrayList(4 * B_FACTOR);
buckets.add(createBucket(0, NUM_BUCKETS -1));
return buckets;
}
private KBucket createBucket(int start, int end) {
if (end - start >= B_FACTOR &&
(((end + 1) & B_FACTOR - 1) != 0 ||
(start & B_FACTOR - 1) != 0))
throw new IllegalArgumentException("Sub-bkt crosses K-bkt boundary: " + start + '-' + end);
KBucket bucket = new KBucketImpl(_context, start, end, BUCKET_SIZE, _trimmer);
return bucket;
}
/**
* The number of bits minus 1 (range number) for the xor of the key.
* Package private for testing only. Others shouldn't need this.
* @return 0 to max-1 or -1 for us
*/
int getRange(T key) {
return _rangeCalc.getRange(key);
}
/**
* For every bucket that hasn't been updated in this long,
* generate a random key that would be a member of that bucket.
* The returned keys may be searched for to "refresh" the buckets.
* @return non-null, closest first
*/
public List<T> getExploreKeys(long age) {
List<T> rv = new ArrayList(_buckets.size());
long old = _context.clock().now() - age;
getReadLock();
try {
for (KBucket b : _buckets) {
if (b.getLastChanged() < old)
rv.add(generateRandomKey(b));
}
} finally { releaseReadLock(); }
return rv;
}
/**
* Generate a random key to go within this bucket
* Package private for testing only. Others shouldn't need this.
*/
T generateRandomKey(KBucket bucket) {
int begin = bucket.getRangeBegin();
int end = bucket.getRangeEnd();
// number of fixed bits, out of B_VALUE - 1 bits
int fixed = 0;
int bsz = 1 + end - begin;
// compute fixed = B_VALUE - log2(bsz)
// e.g for B=4, B_FACTOR=8, sz 4-> fixed 1, sz 2->fixed 2, sz 1 -> fixed 3
while (bsz < B_FACTOR) {
fixed++;
bsz <<= 1;
}
int fixedBits = 0;
if (fixed > 0) {
// 0x01, 03, 07, 0f, ...
int mask = (1 << fixed) - 1;
// fixed bits masked from begin
fixedBits = (begin >> (B_VALUE - (fixed + 1))) & mask;
}
int obegin = begin;
int oend = end;
begin >>= (B_VALUE - 1);
end >>= (B_VALUE - 1);
// we need randomness for [0, begin) bits
BigInteger variance;
// 00000000rrrr
if (begin > 0)
variance = new BigInteger(begin - fixed, _context.random());
else
variance = BigInteger.ZERO;
// we need nonzero randomness for [begin, end] bits
int numNonZero = 1 + end - begin;
if (numNonZero == 1) {
// 00001000rrrr
variance = variance.setBit(begin);
// fixed bits as the 'main' bucket is split
// 00001fffrrrr
if (fixed > 0)
variance = variance.or(BigInteger.valueOf(fixedBits).shiftLeft(begin - fixed));
} else {
// dont span main bucket boundaries with depth > 1
if (fixed > 0)
throw new IllegalStateException("WTF " + bucket);
BigInteger nonz;
if (numNonZero <= 62) {
// add one to ensure nonzero
long nz = 1 + _context.random().nextLong((1l << numNonZero) - 1);
nonz = BigInteger.valueOf(nz);
} else {
// loop to ensure nonzero
do {
nonz = new BigInteger(numNonZero, _context.random());
} while (nonz.equals(BigInteger.ZERO));
}
// shift left and or-in the nonzero randomness
if (begin > 0)
nonz = nonz.shiftLeft(begin);
// 0000nnnnrrrr
variance = variance.or(nonz);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("SB(" + obegin + ',' + oend + ") KB(" + begin + ',' + end + ") fixed=" + fixed + " fixedBits=" + fixedBits + " numNonZ=" + numNonZero);
byte data[] = variance.toByteArray();
T key = makeKey(data);
byte[] hash = DataHelper.xor(key.getData(), _us.getData());
T rv = makeKey(hash);
// DEBUG
//int range = getRange(rv);
//if (range < obegin || range > oend) {
// throw new IllegalStateException("Generate random key failed range=" + range + " for " + rv + " meant for bucket " + bucket);
//}
return rv;
}
/**
* Make a new SimpleDataStrucure from the data
* @param data size <= SDS length, else throws IAE
* Can be 1 bigger if top byte is zero
*/
private T makeKey(byte[] data) {
int len = _us.length();
int dlen = data.length;
if (dlen > len + 1 ||
(dlen == len + 1 && data[0] != 0))
throw new IllegalArgumentException("bad length " + dlen + " > " + len);
T rv;
try {
rv = (T) _us.getClass().newInstance();
} catch (Exception e) {
_log.error("fail", e);
throw new RuntimeException(e);
}
if (dlen == len) {
rv.setData(data);
} else {
byte[] ndata = new byte[len];
if (dlen == len + 1) {
// one bigger
System.arraycopy(data, 1, ndata, 0, len);
} else {
// smaller
System.arraycopy(data, 0, ndata, len - dlen, dlen);
}
rv.setData(ndata);
}
return rv;
}
private static class Range<T extends SimpleDataStructure> {
private final int _bValue;
private final BigInteger _bigUs;
private final Map<T, Integer> _distanceCache;
public Range(T us, int bValue) {
_bValue = bValue;
_bigUs = new BigInteger(1, us.getData());
_distanceCache = new LHM(256);
}
/** @return 0 to max-1 or -1 for us */
public int getRange(T key) {
Integer rv;
synchronized (_distanceCache) {
rv = _distanceCache.get(key);
if (rv == null) {
// easy way when _bValue == 1
//rv = Integer.valueOf(_bigUs.xor(new BigInteger(1, key.getData())).bitLength() - 1);
BigInteger xor = _bigUs.xor(new BigInteger(1, key.getData()));
int range = xor.bitLength() - 1;
if (_bValue > 1) {
int toShift = range + 1 - _bValue;
int highbit = range;
range <<= _bValue - 1;
if (toShift >= 0) {
int extra = xor.clearBit(highbit).shiftRight(toShift).intValue();
range += extra;
//Log log = I2PAppContext.getGlobalContext().logManager().getLog(KBucketSet.class);
//if (log.shouldLog(Log.DEBUG))
// log.debug("highbit " + highbit + " toshift " + toShift + " extra " + extra + " new " + range);
}
}
rv = Integer.valueOf(range);
_distanceCache.put(key, rv);
}
}
return rv.intValue();
}
public void clear() {
synchronized (_distanceCache) {
_distanceCache.clear();
}
}
}
private static class LHM<K, V> extends LinkedHashMap<K, V> {
private final int _max;
public LHM(int max) {
super(max, 0.75f, true);
_max = max;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > _max;
}
}
/**
* For Collections.binarySearch.
* getRangeBegin == getRangeEnd.
*/
private static class DummyBucket<T extends SimpleDataStructure> implements KBucket<T> {
private final int r;
public DummyBucket(int range) {
r = range;
}
public int getRangeBegin() { return r; }
public int getRangeEnd() { return r; }
public int getKeyCount() {
return 0;
}
public Set<T> getEntries() {
throw new UnsupportedOperationException();
}
public void getEntries(SelectionCollector<T> collector) {
throw new UnsupportedOperationException();
}
public void clear() {}
public boolean add(T peer) {
throw new UnsupportedOperationException();
}
public boolean remove(T peer) {
return false;
}
public void setLastChanged() {}
public long getLastChanged() {
return 0;
}
}
/**
* For Collections.binarySearch.
* Returns equal for any overlap.
*/
private static class BucketComparator implements Comparator<KBucket> {
public int compare(KBucket l, KBucket r) {
if (l.getRangeEnd() < r.getRangeBegin())
return -1;
if (l.getRangeBegin() > r.getRangeEnd())
return 1;
return 0;
}
}
/**
* Called when a kbucket can no longer be split and is too big
*/
public interface KBucketTrimmer<K extends SimpleDataStructure> {
/**
* Called from add() just before adding the entry.
* You may call getEntries() and/or remove() from here.
* Do NOT call add().
* To always discard a newer entry, always return false.
*
* @param kbucket the kbucket that is now too big
* @return true to actually add the entry.
*/
public boolean trim(KBucket<K> kbucket, K toAdd);
}
/**
* Removes a random element. Not resistant to flooding.
*/
public static class RandomTrimmer<T extends SimpleDataStructure> implements KBucketTrimmer<T> {
protected final I2PAppContext _ctx;
private final int _max;
public RandomTrimmer(I2PAppContext ctx, int max) {
_ctx = ctx;
_max = max;
}
public boolean trim(KBucket<T> kbucket, T toAdd) {
List<T> e = new ArrayList(kbucket.getEntries());
int sz = e.size();
// concurrency
if (sz < _max)
return true;
T toRemove = e.get(_ctx.random().nextInt(sz));
return kbucket.remove(toRemove);
}
}
/**
* Removes a random element, but only if the bucket hasn't changed in 5 minutes.
*/
public static class RandomIfOldTrimmer<T extends SimpleDataStructure> extends RandomTrimmer<T> {
public RandomIfOldTrimmer(I2PAppContext ctx, int max) {
super(ctx, max);
}
public boolean trim(KBucket<T> kbucket, T toAdd) {
if (kbucket.getLastChanged() > _ctx.clock().now() - 5*60*1000)
return false;
return super.trim(kbucket, toAdd);
}
}
/**
* Removes nothing and always rejects the add. Flood resistant..
*/
public static class RejectTrimmer<T extends SimpleDataStructure> implements KBucketTrimmer<T> {
public boolean trim(KBucket<T> kbucket, T toAdd) {
return false;
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(1024);
buf.append("Bucket set rooted on: ").append(_us.toString())
.append(" K= ").append(BUCKET_SIZE)
.append(" B= ").append(B_VALUE)
.append(" with ").append(size())
.append(" keys in ").append(_buckets.size()).append(" buckets:\n");
getReadLock();
try {
int len = _buckets.size();
for (int i = 0; i < len; i++) {
KBucket b = _buckets.get(i);
buf.append("* Bucket ").append(i).append("/").append(len).append(": ");
buf.append(b.toString()).append("\n");
}
} finally { releaseReadLock(); }
return buf.toString();
}
}

View File

@ -0,0 +1,10 @@
package net.i2p.kademlia;
import net.i2p.data.SimpleDataStructure;
/**
* Visit kbuckets, gathering matches
*/
public interface SelectionCollector<T extends SimpleDataStructure> {
public void add(T entry);
}

View File

@ -0,0 +1,27 @@
package net.i2p.kademlia;
import java.util.Comparator;
import net.i2p.data.DataHelper;
import net.i2p.data.SimpleDataStructure;
/**
* Help sort Hashes in relation to a base key using the XOR metric
*
*/
class XORComparator<T extends SimpleDataStructure> implements Comparator<T> {
private final byte[] _base;
/**
* @param target key to compare distances with
*/
public XORComparator(T target) {
_base = target.getData();
}
public int compare(T lhs, T rhs) {
byte lhsDelta[] = DataHelper.xor(lhs.getData(), _base);
byte rhsDelta[] = DataHelper.xor(rhs.getData(), _base);
return DataHelper.compareTo(lhsDelta, rhsDelta);
}
}

View File

@ -0,0 +1,6 @@
<html><body><p>
This is a major rewrite of KBucket, KBucketSet, and KBucketImpl from net.i2p.router.networkdb.kademlia.
The classes are now generic to support SHA1. SHA256, or other key lengths.
The long-term goal is to prove out this new implementation in i2psnark,
then move it to core, then convert the network database to use it.
</p></body></html>

View File

@ -28,6 +28,9 @@ abstract class ExtensionHandler {
public static final int ID_PEX = 2;
/** not ut_pex since the compact format is different */
public static final String TYPE_PEX = "i2p_pex";
public static final int ID_DHT = 3;
/** not using the option bit since the compact format is different */
public static final String TYPE_DHT = "i2p_dht";
/** Pieces * SHA1 Hash length, + 25% extra for file names, benconding overhead, etc */
private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 20 * 5 / 4;
private static final int PARALLEL_REQUESTS = 3;
@ -36,9 +39,10 @@ abstract class ExtensionHandler {
/**
* @param metasize -1 if unknown
* @param pexAndMetadata advertise these capabilities
* @param dht advertise DHT capability
* @return bencoded outgoing handshake message
*/
public static byte[] getHandshake(int metasize, boolean pexAndMetadata) {
public static byte[] getHandshake(int metasize, boolean pexAndMetadata, boolean dht) {
Map<String, Object> handshake = new HashMap();
Map<String, Integer> m = new HashMap();
if (pexAndMetadata) {
@ -47,6 +51,9 @@ abstract class ExtensionHandler {
if (metasize >= 0)
handshake.put("metadata_size", Integer.valueOf(metasize));
}
if (dht) {
m.put(TYPE_DHT, Integer.valueOf(ID_DHT));
}
// include the map even if empty so the far-end doesn't NPE
handshake.put("m", m);
handshake.put("p", Integer.valueOf(6881));
@ -65,6 +72,8 @@ abstract class ExtensionHandler {
handleMetadata(peer, listener, bs, log);
else if (id == ID_PEX)
handlePEX(peer, listener, bs, log);
else if (id == ID_DHT)
handleDHT(peer, listener, bs, log);
else if (log.shouldLog(Log.INFO))
log.info("Unknown extension msg " + id + " from " + peer);
}
@ -87,6 +96,12 @@ abstract class ExtensionHandler {
// peer state calls peer listener calls sendPEX()
}
if (msgmap.get(TYPE_DHT) != null) {
if (log.shouldLog(Log.DEBUG))
log.debug("Peer supports DHT extension: " + peer);
// peer state calls peer listener calls sendDHT()
}
MagnetState state = peer.getMagnetState();
if (msgmap.get(TYPE_METADATA) == null) {
@ -335,6 +350,28 @@ abstract class ExtensionHandler {
}
}
/**
* Receive the DHT port numbers
* @since DHT
*/
private static void handleDHT(Peer peer, PeerListener listener, byte[] bs, Log log) {
if (log.shouldLog(Log.DEBUG))
log.debug("Got DHT msg from " + peer);
try {
InputStream is = new ByteArrayInputStream(bs);
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map<String, BEValue> map = bev.getMap();
int qport = map.get("port").getInt();
int rport = map.get("rport").getInt();
listener.gotPort(peer, qport, rport);
} catch (Exception e) {
if (log.shouldLog(Log.INFO))
log.info("DHT msg exception from " + peer, e);
//peer.disconnect(false);
}
}
/**
* added.f and dropped unsupported
* @param pList non-null
@ -362,4 +399,22 @@ abstract class ExtensionHandler {
}
}
/**
* Send the DHT port numbers
* @since DHT
*/
public static void sendDHT(Peer peer, int qport, int rport) {
Map<String, Object> map = new HashMap();
map.put("port", Integer.valueOf(qport));
map.put("rport", Integer.valueOf(rport));
byte[] payload = BEncoder.bencode(map);
try {
int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_DHT).getInt();
peer.sendExtension(hisMsgCode, payload);
} catch (Exception e) {
// NPE, no DHT caps
//if (log.shouldLog(Log.INFO))
// log.info("DHT msg exception to " + peer, e);
}
}
}

View File

@ -37,7 +37,7 @@ import net.i2p.util.SimpleTimer;
import net.i2p.util.Translate;
import org.klomp.snark.dht.DHT;
//import org.klomp.snark.dht.KRPC;
import org.klomp.snark.dht.KRPC;
/**
* I2P specific helpers for I2PSnark
@ -65,6 +65,7 @@ public class I2PSnarkUtil {
private final File _tmpDir;
private int _startupDelay;
private boolean _shouldUseOT;
private boolean _shouldUseDHT;
private boolean _areFilesPublic;
private List<String> _openTrackers;
private DHT _dht;
@ -77,7 +78,7 @@ public class I2PSnarkUtil {
public static final int DEFAULT_MAX_UP_BW = 8; //KBps
public static final int MAX_CONNECTIONS = 16; // per torrent
public static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond";
//private static final boolean ENABLE_DHT = true;
public static final boolean DEFAULT_USE_DHT = false;
public I2PSnarkUtil(I2PAppContext ctx) {
_context = ctx;
@ -94,6 +95,7 @@ public class I2PSnarkUtil {
_shouldUseOT = DEFAULT_USE_OPENTRACKERS;
// FIXME split if default has more than one
_openTrackers = Collections.singletonList(DEFAULT_OPENTRACKERS);
_shouldUseDHT = DEFAULT_USE_DHT;
// This is used for both announce replies and .torrent file downloads,
// so it must be available even if not connected to I2CP.
// so much for multiple instances
@ -241,12 +243,14 @@ public class I2PSnarkUtil {
opts.setProperty("i2p.streaming.maxTotalConnsPerMinute", "8");
if (opts.getProperty("i2p.streaming.maxConnsPerHour") == null)
opts.setProperty("i2p.streaming.maxConnsPerHour", "20");
if (opts.getProperty("i2p.streaming.enforceProtocol") == null)
opts.setProperty("i2p.streaming.enforceProtocol", "true");
_manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts);
_connecting = false;
}
// FIXME this only instantiates krpc once, left stuck with old manager
//if (ENABLE_DHT && _manager != null && _dht == null)
// _dht = new KRPC(_context, _manager.getSession());
if (_shouldUseDHT && _manager != null && _dht == null)
_dht = new KRPC(_context, _manager.getSession());
return (_manager != null);
}
@ -273,7 +277,11 @@ public class I2PSnarkUtil {
/**
* Destroy the destination itself
*/
public void disconnect() {
public synchronized void disconnect() {
if (_dht != null) {
_dht.stop();
_dht = null;
}
I2PSocketManager mgr = _manager;
// FIXME this can cause race NPEs elsewhere
_manager = null;
@ -447,7 +455,8 @@ public class I2PSnarkUtil {
if (sess != null) {
byte[] b = Base32.decode(ip.substring(0, BASE32_HASH_LENGTH));
if (b != null) {
Hash h = new Hash(b);
//Hash h = new Hash(b);
Hash h = Hash.create(b);
if (_log.shouldLog(Log.INFO))
_log.info("Using existing session for lookup of " + ip);
try {
@ -522,6 +531,22 @@ public class I2PSnarkUtil {
public boolean shouldUseOpenTrackers() {
return _shouldUseOT;
}
/** @since DHT */
public synchronized void setUseDHT(boolean yes) {
_shouldUseDHT = yes;
if (yes && _manager != null && _dht == null) {
_dht = new KRPC(_context, _manager.getSession());
} else if (!yes && _dht != null) {
_dht.stop();
_dht = null;
}
}
/** @since DHT */
public boolean shouldUseDHT() {
return _shouldUseDHT;
}
/**
* Like DataHelper.toHexString but ensures no loss of leading zero bytes

View File

@ -80,7 +80,9 @@ public class Peer implements Comparable
static final long OPTION_FAST = 0x0000000000000004l;
static final long OPTION_DHT = 0x0000000000000001l;
/** we use a different bit since the compact format is different */
/* no, let's use an extension message
static final long OPTION_I2P_DHT = 0x0000000040000000l;
*/
static final long OPTION_AZMP = 0x1000000000000000l;
private long options;
@ -269,15 +271,17 @@ public class Peer implements Comparable
_log.debug("Peer supports extensions, sending reply message");
int metasize = metainfo != null ? metainfo.getInfoBytes().length : -1;
boolean pexAndMetadata = metainfo == null || !metainfo.isPrivate();
out.sendExtension(0, ExtensionHandler.getHandshake(metasize, pexAndMetadata));
boolean dht = util.getDHT() != null;
out.sendExtension(0, ExtensionHandler.getHandshake(metasize, pexAndMetadata, dht));
}
if ((options & OPTION_I2P_DHT) != 0 && util.getDHT() != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer supports DHT, sending PORT message");
int port = util.getDHT().getPort();
out.sendPort(port);
}
// Old DHT PORT message
//if ((options & OPTION_I2P_DHT) != 0 && util.getDHT() != null) {
// if (_log.shouldLog(Log.DEBUG))
// _log.debug("Peer supports DHT, sending PORT message");
// int port = util.getDHT().getPort();
// out.sendPort(port);
//}
// Send our bitmap
if (bitfield != null)

View File

@ -117,9 +117,8 @@ public class PeerAcceptor
}
} else {
// multitorrent capable, so lets see what we can handle
for (Iterator iter = coordinators.iterator(); iter.hasNext(); ) {
PeerCoordinator cur = (PeerCoordinator)iter.next();
PeerCoordinator cur = coordinators.get(peerInfoHash);
if (cur != null) {
if (DataHelper.eq(cur.getInfoHash(), peerInfoHash)) {
if (cur.needPeers())
{

View File

@ -28,6 +28,8 @@ import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import org.klomp.snark.dht.DHT;
/**
* TimerTask that checks for good/bad up/downloader. Works together
* with the PeerCoordinator to select which Peers get (un)choked.
@ -74,6 +76,7 @@ class PeerCheckerTask implements Runnable
List<Peer> removed = new ArrayList();
int uploadLimit = coordinator.allowedUploaders();
boolean overBWLimit = coordinator.overUpBWLimit();
DHT dht = _util.getDHT();
for (Peer peer : peerList) {
// Remove dying peers
@ -218,8 +221,8 @@ class PeerCheckerTask implements Runnable
if (coordinator.getNeededLength() > 0 || !peer.isCompleted())
peer.keepAlive();
// announce them to local tracker (TrackerClient does this too)
if (_util.getDHT() != null && (_runCount % 5) == 0) {
_util.getDHT().announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash());
if (dht != null && (_runCount % 5) == 0) {
dht.announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash());
}
}
@ -267,8 +270,8 @@ class PeerCheckerTask implements Runnable
}
// announce ourselves to local tracker (TrackerClient does this too)
if (_util.getDHT() != null && (_runCount % 16) == 0) {
_util.getDHT().announce(coordinator.getInfoHash());
if (dht != null && (_runCount % 16) == 0) {
dht.announce(coordinator.getInfoHash());
}
}
}

View File

@ -1287,6 +1287,7 @@ class PeerCoordinator implements PeerListener
}
} else if (id == ExtensionHandler.ID_HANDSHAKE) {
sendPeers(peer);
sendDHT(peer);
}
}
@ -1315,6 +1316,26 @@ class PeerCoordinator implements PeerListener
} catch (InvalidBEncodingException ibee) {}
}
/**
* Send a DHT message to the peer, if we both support DHT.
* @since DHT
*/
void sendDHT(Peer peer) {
DHT dht = _util.getDHT();
if (dht == null)
return;
Map<String, BEValue> handshake = peer.getHandshakeMap();
if (handshake == null)
return;
BEValue bev = handshake.get("m");
if (bev == null)
return;
try {
if (bev.getMap().get(ExtensionHandler.TYPE_DHT) != null)
ExtensionHandler.sendDHT(peer, dht.getPort(), dht.getRPort());
} catch (InvalidBEncodingException ibee) {}
}
/**
* Sets the storage after transition out of magnet mode
* Snark calls this after we call gotMetaInfo()
@ -1332,11 +1353,13 @@ class PeerCoordinator implements PeerListener
/**
* PeerListener callback
* Tell the DHT to ping it, this will get back the node info
* @param rport must be port + 1
* @since 0.8.4
*/
public void gotPort(Peer peer, int port) {
public void gotPort(Peer peer, int port, int rport) {
DHT dht = _util.getDHT();
if (dht != null)
if (dht != null &&
port > 0 && port < 65535 && rport == port + 1)
dht.ping(peer.getDestination(), port);
}

View File

@ -1,9 +1,10 @@
package org.klomp.snark;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.crypto.SHA1Hash;
/**
* Hmm, any guesses as to what this is? Used by the multitorrent functionality
@ -12,26 +13,28 @@ import java.util.Set;
* from it there too)
*/
public class PeerCoordinatorSet {
private final Set _coordinators;
private final Map<SHA1Hash, PeerCoordinator> _coordinators;
public PeerCoordinatorSet() {
_coordinators = new HashSet();
_coordinators = new ConcurrentHashMap();
}
public Iterator iterator() {
synchronized (_coordinators) {
return new ArrayList(_coordinators).iterator();
}
public Iterator<PeerCoordinator> iterator() {
return _coordinators.values().iterator();
}
public void add(PeerCoordinator coordinator) {
synchronized (_coordinators) {
_coordinators.add(coordinator);
}
_coordinators.put(new SHA1Hash(coordinator.getInfoHash()), coordinator);
}
public void remove(PeerCoordinator coordinator) {
synchronized (_coordinators) {
_coordinators.remove(coordinator);
}
_coordinators.remove(new SHA1Hash(coordinator.getInfoHash()));
}
/**
* @since 0.9.2
*/
public PeerCoordinator get(byte[] infoHash) {
return _coordinators.get(new SHA1Hash(infoHash));
}
}

View File

@ -190,13 +190,14 @@ interface PeerListener
void gotExtension(Peer peer, int id, byte[] bs);
/**
* Called when a port message is received.
* Called when a DHT port message is received.
*
* @param peer the Peer that got the message.
* @param port the port
* @param port the query port
* @param rport the response port
* @since 0.8.4
*/
void gotPort(Peer peer, int port);
void gotPort(Peer peer, int port, int rport);
/**
* Called when peers are received via PEX

View File

@ -526,10 +526,14 @@ class PeerState implements DataLoader
setInteresting(true);
}
/** @since 0.8.4 */
/**
* Unused
* @since 0.8.4
*/
void portMessage(int port)
{
listener.gotPort(peer, port);
// for compatibility with old DHT PORT message
listener.gotPort(peer, port, port + 1);
}
void unknownMessage(int type, byte[] bs)

View File

@ -230,7 +230,7 @@ public class Snark
private volatile boolean stopped;
private volatile boolean starting;
private byte[] id;
private byte[] infoHash;
private final byte[] infoHash;
private String additionalTrackerURL;
private final I2PSnarkUtil _util;
private final Log _log;
@ -321,6 +321,7 @@ public class Snark
meta = null;
File f = null;
InputStream in = null;
byte[] x_infoHash = null;
try
{
f = new File(torrent);
@ -343,7 +344,7 @@ public class Snark
throw new IOException("not found");
}
meta = new MetaInfo(in);
infoHash = meta.getInfoHash();
x_infoHash = meta.getInfoHash();
}
catch(IOException ioe)
{
@ -384,6 +385,7 @@ public class Snark
try { in.close(); } catch (IOException ioe) {}
}
infoHash = x_infoHash; // final
if (_log.shouldLog(Log.INFO))
_log.info(meta.toString());
@ -1210,8 +1212,8 @@ public class Snark
if (_peerCoordinatorSet == null || uploaders <= 0)
return false;
int totalUploaders = 0;
for (Iterator iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) {
PeerCoordinator c = (PeerCoordinator)iter.next();
for (Iterator<PeerCoordinator> iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) {
PeerCoordinator c = iter.next();
if (!c.halted())
totalUploaders += c.uploaders;
}
@ -1224,8 +1226,8 @@ public class Snark
if (_peerCoordinatorSet == null)
return false;
long total = 0;
for (Iterator iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) {
PeerCoordinator c = (PeerCoordinator)iter.next();
for (Iterator<PeerCoordinator> iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) {
PeerCoordinator c = iter.next();
if (!c.halted())
total += c.getCurrentUploadRate();
}

View File

@ -92,6 +92,7 @@ public class SnarkManager implements Snark.CompleteListener {
private static final String PROP_USE_OPENTRACKERS = "i2psnark.useOpentrackers";
public static final String PROP_OPENTRACKERS = "i2psnark.opentrackers";
public static final String PROP_PRIVATETRACKERS = "i2psnark.privatetrackers";
private static final String PROP_USE_DHT = "i2psnark.enableDHT";
public static final int MIN_UP_BW = 2;
public static final int DEFAULT_MAX_UP_BW = 10;
@ -290,6 +291,9 @@ public class SnarkManager implements Snark.CompleteListener {
_config.setProperty(PROP_STARTUP_DELAY, Integer.toString(DEFAULT_STARTUP_DELAY));
if (!_config.containsKey(PROP_THEME))
_config.setProperty(PROP_THEME, DEFAULT_THEME);
// no, so we can switch default to true later
//if (!_config.containsKey(PROP_USE_DHT))
// _config.setProperty(PROP_USE_DHT, Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT));
updateConfig();
}
/**
@ -380,6 +384,9 @@ public class SnarkManager implements Snark.CompleteListener {
String useOT = _config.getProperty(PROP_USE_OPENTRACKERS);
boolean bOT = useOT == null || Boolean.valueOf(useOT).booleanValue();
_util.setUseOpenTrackers(bOT);
// careful, so we can switch default to true later
_util.setUseDHT(Boolean.valueOf(_config.getProperty(PROP_USE_DHT,
Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT))).booleanValue());
getDataDir().mkdirs();
initTrackerMap();
}
@ -398,7 +405,7 @@ public class SnarkManager implements Snark.CompleteListener {
public void updateConfig(String dataDir, boolean filesPublic, boolean autoStart, String refreshDelay,
String startDelay, String seedPct, String eepHost,
String eepPort, String i2cpHost, String i2cpPort, String i2cpOpts,
String upLimit, String upBW, boolean useOpenTrackers, String theme) {
String upLimit, String upBW, boolean useOpenTrackers, boolean useDHT, String theme) {
boolean changed = false;
//if (eepHost != null) {
// // unused, we use socket eepget
@ -582,6 +589,17 @@ public class SnarkManager implements Snark.CompleteListener {
_util.setUseOpenTrackers(useOpenTrackers);
changed = true;
}
if (_util.shouldUseDHT() != useDHT) {
_config.setProperty(PROP_USE_DHT, Boolean.toString(useDHT));
if (useDHT)
addMessage(_("Enabled DHT."));
else
addMessage(_("Disabled DHT."));
if (_util.connected())
addMessage(_("DHT change requires tunnel shutdown and reopen"));
_util.setUseDHT(useDHT);
changed = true;
}
if (theme != null) {
if(!theme.equals(_config.getProperty(PROP_THEME))) {
_config.setProperty(PROP_THEME, theme);

View File

@ -43,6 +43,8 @@ import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
import org.klomp.snark.dht.DHT;
/**
* Informs metainfo tracker of events and gets new peers for peer
* coordinator.
@ -323,8 +325,9 @@ public class TrackerClient implements Runnable {
}
// Local DHT tracker announce
if (_util.getDHT() != null)
_util.getDHT().announce(snark.getInfoHash());
DHT dht = _util.getDHT();
if (dht != null)
dht.announce(snark.getInfoHash());
long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded();
@ -372,9 +375,10 @@ public class TrackerClient implements Runnable {
snark.setTrackerSeenPeers(tr.seenPeers);
// pass everybody over to our tracker
if (_util.getDHT() != null) {
dht = _util.getDHT();
if (dht != null) {
for (Peer peer : peers) {
_util.getDHT().announce(snark.getInfoHash(), peer.getPeerID().getDestHash());
dht.announce(snark.getInfoHash(), peer.getPeerID().getDestHash());
}
}
@ -458,19 +462,21 @@ public class TrackerClient implements Runnable {
// Get peers from DHT
// FIXME this needs to be in its own thread
if (_util.getDHT() != null && (meta == null || !meta.isPrivate()) && !stop) {
dht = _util.getDHT();
if (dht != null && (meta == null || !meta.isPrivate()) && !stop) {
int numwant;
if (event.equals(STOPPED_EVENT) || !coordinator.needOutboundPeers())
numwant = 1;
else
numwant = _util.getMaxConnections();
List<Hash> hashes = _util.getDHT().getPeers(snark.getInfoHash(), numwant, 2*60*1000);
List<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000);
if (_log.shouldLog(Log.INFO))
_log.info("Got " + hashes + " from DHT");
// announce ourselves while the token is still good
// FIXME this needs to be in its own thread
if (!stop) {
int good = _util.getDHT().announce(snark.getInfoHash(), 8, 5*60*1000);
// announce only to the 1 closest
int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000);
if (_log.shouldLog(Log.INFO))
_log.info("Sent " + good + " good announces to DHT");
}
@ -547,8 +553,9 @@ public class TrackerClient implements Runnable {
*/
private void unannounce() {
// Local DHT tracker unannounce
if (_util.getDHT() != null)
_util.getDHT().unannounce(snark.getInfoHash());
DHT dht = _util.getDHT();
if (dht != null)
dht.unannounce(snark.getInfoHash());
int i = 0;
for (Tracker tr : trackers) {
if (_util.connected() &&

View File

@ -17,10 +17,15 @@ public interface DHT {
/**
* @return The UDP port that should be included in a PORT message.
* @return The UDP query port
*/
public int getPort();
/**
* @return The UDP response port
*/
public int getRPort();
/**
* Ping. We don't have a NID yet so the node is presumed
* to be absent from our DHT.
@ -79,4 +84,14 @@ public interface DHT {
* @return the number of successful announces, not counting ourselves.
*/
public int announce(byte[] ih, int max, long maxWait);
/**
* Stop everything.
*/
public void stop();
/**
* Known nodes, not estimated total network size.
*/
public int size();
}

View File

@ -0,0 +1,161 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.crypto.SHA1Hash;
import net.i2p.data.DataHelper;
import net.i2p.kademlia.KBucketSet;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* All the nodes we know about, stored as a mapping from
* node ID to a Destination and Port.
*
* And a real Kademlia routing table, which stores node IDs only.
*
* @since 0.8.4
* @author zzz
*/
class DHTNodes {
private final I2PAppContext _context;
private long _expireTime;
private final Log _log;
private final ConcurrentHashMap<NID, NodeInfo> _nodeMap;
private final KBucketSet<NID> _kad;
private volatile boolean _isRunning;
/** stagger with other cleaners */
private static final long CLEAN_TIME = 237*1000;
private static final long MAX_EXPIRE_TIME = 60*60*1000;
private static final long MIN_EXPIRE_TIME = 5*60*1000;
private static final long DELTA_EXPIRE_TIME = 7*60*1000;
private static final int MAX_PEERS = 999;
public DHTNodes(I2PAppContext ctx, NID me) {
_context = ctx;
_expireTime = MAX_EXPIRE_TIME;
_log = _context.logManager().getLog(DHTNodes.class);
_nodeMap = new ConcurrentHashMap();
_kad = new KBucketSet(ctx, me, 8, 1);
}
public void start() {
_isRunning = true;
new Cleaner();
}
public void stop() {
clear();
_isRunning = false;
}
// begin ConcurrentHashMap methods
public int size() {
return _nodeMap.size();
}
public void clear() {
_kad.clear();
_nodeMap.clear();
}
public NodeInfo get(NID nid) {
return _nodeMap.get(nid);
}
/**
* @return the old value if present, else null
*/
public NodeInfo putIfAbsent(NodeInfo nInfo) {
_kad.add(nInfo.getNID());
return _nodeMap.putIfAbsent(nInfo.getNID(), nInfo);
}
public NodeInfo remove(NID nid) {
_kad.remove(nid);
return _nodeMap.remove(nid);
}
public Collection<NodeInfo> values() {
return _nodeMap.values();
}
// end ConcurrentHashMap methods
/**
* DHT
* @param h either a InfoHash or a NID
*/
public List<NodeInfo> findClosest(SHA1Hash h, int numWant) {
NID key;
if (h instanceof NID)
key = (NID) h;
else
key = new NID(h.getData());
List<NID> keys = _kad.getClosest(key, numWant);
List<NodeInfo> rv = new ArrayList(keys.size());
for (NID nid : keys) {
NodeInfo ninfo = _nodeMap.get(nid);
if (ninfo != null)
rv.add(ninfo);
}
return rv;
}
/**
* DHT - get random keys to explore
*/
public List<NID> getExploreKeys() {
return _kad.getExploreKeys(15*60*1000);
}
/** */
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
super(SimpleTimer2.getInstance(), CLEAN_TIME);
}
public void timeReached() {
if (!_isRunning)
return;
long now = _context.clock().now();
int peerCount = 0;
for (Iterator<NodeInfo> iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) {
NodeInfo peer = iter.next();
if (peer.lastSeen() < now - _expireTime) {
iter.remove();
_kad.remove(peer.getNID());
} else {
peerCount++;
}
}
if (peerCount > MAX_PEERS)
_expireTime = Math.max(_expireTime - DELTA_EXPIRE_TIME, MIN_EXPIRE_TIME);
else
_expireTime = Math.min(_expireTime + DELTA_EXPIRE_TIME, MAX_EXPIRE_TIME);
if (_log.shouldLog(Log.DEBUG))
_log.debug("DHT storage cleaner done, now with " +
peerCount + " peers, " +
DataHelper.formatDuration(_expireTime) + " expiration");
schedule(CLEAN_TIME);
}
}
}

View File

@ -0,0 +1,143 @@
package org.klomp.snark.dht;
/*
* From zzzot, relicensed to GPLv2
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* The tracker stores peers, i.e. Dest hashes (not nodes).
*
* @since 0.8.4
* @author zzz
*/
class DHTTracker {
private final I2PAppContext _context;
private final Torrents _torrents;
private long _expireTime;
private final Log _log;
private volatile boolean _isRunning;
/** stagger with other cleaners */
private static final long CLEAN_TIME = 199*1000;
/** make this longer than postman's tracker */
private static final long MAX_EXPIRE_TIME = 95*60*1000;
private static final long MIN_EXPIRE_TIME = 5*60*1000;
private static final long DELTA_EXPIRE_TIME = 7*60*1000;
private static final int MAX_PEERS = 2000;
DHTTracker(I2PAppContext ctx) {
_context = ctx;
_torrents = new Torrents();
_expireTime = MAX_EXPIRE_TIME;
_log = _context.logManager().getLog(DHTTracker.class);
}
public void start() {
_isRunning = true;
new Cleaner();
}
void stop() {
_torrents.clear();
_isRunning = false;
}
void announce(InfoHash ih, Hash hash) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Announce " + hash + " for " + ih);
Peers peers = _torrents.get(ih);
if (peers == null) {
peers = new Peers();
Peers peers2 = _torrents.putIfAbsent(ih, peers);
if (peers2 != null)
peers = peers2;
}
Peer peer = new Peer(hash.getData());
Peer peer2 = peers.putIfAbsent(peer, peer);
if (peer2 != null)
peer = peer2;
peer.setLastSeen(_context.clock().now());
}
void unannounce(InfoHash ih, Hash hash) {
Peers peers = _torrents.get(ih);
if (peers == null)
return;
Peer peer = new Peer(hash.getData());
peers.remove(peer);
}
/**
* Caller's responsibility to remove himself from the list
* @return list or empty list (never null)
*/
List<Hash> getPeers(InfoHash ih, int max) {
Peers peers = _torrents.get(ih);
if (peers == null)
return Collections.EMPTY_LIST;
int size = peers.size();
List<Hash> rv = new ArrayList(peers.values());
if (max < size) {
Collections.shuffle(rv, _context.random());
rv = rv.subList(0, max);
}
return rv;
}
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
super(SimpleTimer2.getInstance(), CLEAN_TIME);
}
public void timeReached() {
if (!_isRunning)
return;
long now = _context.clock().now();
int torrentCount = 0;
int peerCount = 0;
for (Iterator<Peers> iter = _torrents.values().iterator(); iter.hasNext(); ) {
Peers p = iter.next();
int recent = 0;
for (Iterator<Peer> iterp = p.values().iterator(); iterp.hasNext(); ) {
Peer peer = iterp.next();
if (peer.lastSeen() < now - _expireTime)
iterp.remove();
else {
recent++;
peerCount++;
}
}
if (recent <= 0)
iter.remove();
else
torrentCount++;
}
if (peerCount > MAX_PEERS)
_expireTime = Math.max(_expireTime - DELTA_EXPIRE_TIME, MIN_EXPIRE_TIME);
else
_expireTime = Math.min(_expireTime + DELTA_EXPIRE_TIME, MAX_EXPIRE_TIME);
if (_log.shouldLog(Log.DEBUG))
_log.debug("DHT tracker cleaner done, now with " +
torrentCount + " torrents, " +
peerCount + " peers, " +
DataHelper.formatDuration(_expireTime) + " expiration");
schedule(CLEAN_TIME);
}
}
}

View File

@ -0,0 +1,19 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import net.i2p.crypto.SHA1Hash;
/**
* A 20-byte SHA1 info hash
*
* @since 0.8.4
* @author zzz
*/
class InfoHash extends SHA1Hash {
public InfoHash(byte[] data) {
super(data);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,32 @@
package org.klomp.snark.dht;
/*
* GPLv2
*/
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
/**
* Used for both incoming and outgoing message IDs
*
* @since 0.8.4
* @author zzz
*/
class MsgID extends ByteArray {
private static final int MY_TOK_LEN = 8;
/** outgoing - generate a random ID */
public MsgID(I2PAppContext ctx) {
super(null);
byte[] data = new byte[MY_TOK_LEN];
ctx.random().nextBytes(data);
setData(data);
setValid(MY_TOK_LEN);
}
/** incoming - save the ID (arbitrary length) */
public MsgID(byte[] data) {
super(data);
}
}

View File

@ -0,0 +1,46 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import net.i2p.crypto.SHA1Hash;
import net.i2p.util.Clock;
/**
* A 20-byte peer ID, used as a Map key in lots of places.
* Must be public for constructor in KBucketSet.generateRandomKey()
*
* @since 0.8.4
* @author zzz
*/
public class NID extends SHA1Hash {
private long lastSeen;
private int fails;
private static final int MAX_FAILS = 3;
public NID() {
super(null);
}
public NID(byte[] data) {
super(data);
}
public long lastSeen() {
return lastSeen;
}
public void setLastSeen() {
lastSeen = Clock.getInstance().now();
fails = 0;
}
/**
* @return if more than max timeouts
*/
public boolean timeout() {
return fails++ > MAX_FAILS;
}
}

View File

@ -0,0 +1,253 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.SimpleDataStructure;
import net.i2p.util.RandomSource;
/*
* A Node ID, Hash, and port, and an optional Destination.
* This is what DHTNodes remembers. The DHT tracker just stores Hashes.
* getData() returns the 54 byte compact info (NID, Hash, port).
*
* Things are a little tricky in KRPC since we exchange Hashes and don't
* always have the Destination.
* The conpact info is immutable. The Destination may be added later.
*
* @since 0.8.4
* @author zzz
*/
class NodeInfo extends SimpleDataStructure {
private final NID nID;
private final Hash hash;
private Destination dest;
private final int port;
public static final int LENGTH = NID.HASH_LENGTH + Hash.HASH_LENGTH + 2;
/**
* With a fake NID used for pings
*/
public NodeInfo(Destination dest, int port) {
super();
this.nID = KRPC.FAKE_NID;
this.dest = dest;
this.hash = dest.calculateHash();
this.port = port;
initialize();
}
/**
* Use this if we have the full destination
* @throws IllegalArgumentException
*/
public NodeInfo(NID nID, Destination dest, int port) {
super();
this.nID = nID;
this.dest = dest;
this.hash = dest.calculateHash();
this.port = port;
initialize();
verify();
}
/**
* No Destination yet available
* @throws IllegalArgumentException
*/
public NodeInfo(NID nID, Hash hash, int port) {
super();
this.nID = nID;
this.hash = hash;
this.port = port;
initialize();
verify();
}
/**
* No Destination yet available
* @param compactInfo 20 byte node ID, 32 byte destHash, 2 byte port
* @param offset starting at this offset in compactInfo
* @throws IllegalArgumentException
* @throws AIOOBE
*/
public NodeInfo(byte[] compactInfo, int offset) {
super();
byte[] d = new byte[LENGTH];
System.arraycopy(compactInfo, offset, d, 0, LENGTH);
setData(d);
byte[] ndata = new byte[NID.HASH_LENGTH];
System.arraycopy(d, 0, ndata, 0, NID.HASH_LENGTH);
this.nID = new NID(ndata);
this.hash = Hash.create(d, NID.HASH_LENGTH);
this.port = (int) DataHelper.fromLong(d, NID.HASH_LENGTH + Hash.HASH_LENGTH, 2);
if (port <= 0 || port >= 65535)
throw new IllegalArgumentException("Bad port");
verify();
}
/**
* Create from persistent storage string.
* Format: NID:Hash:Destination:port
* First 3 in base 64; Destination may be empty string
* @throws IllegalArgumentException
*/
public NodeInfo(String s) throws DataFormatException {
super();
String[] parts = s.split(":", 4);
if (parts.length != 4)
throw new DataFormatException("Bad format");
byte[] nid = Base64.decode(parts[0]);
if (nid == null)
throw new DataFormatException("Bad NID");
nID = new NID(nid);
byte[] h = Base64.decode(parts[1]);
if (h == null)
throw new DataFormatException("Bad hash");
//hash = new Hash(h);
hash = Hash.create(h);
if (parts[2].length() > 0)
dest = new Destination(parts[2]);
try {
port = Integer.parseInt(parts[3]);
} catch (NumberFormatException nfe) {
throw new DataFormatException("Bad port", nfe);
}
initialize();
}
/**
* Creates 54-byte compact info
* @throws IllegalArgumentException
*/
private void initialize() {
if (port <= 0 || port >= 65535)
throw new IllegalArgumentException("Bad port");
byte[] compactInfo = new byte[LENGTH];
System.arraycopy(nID.getData(), 0, compactInfo, 0, NID.HASH_LENGTH);
System.arraycopy(hash.getData(), 0, compactInfo, NID.HASH_LENGTH, Hash.HASH_LENGTH);
DataHelper.toLong(compactInfo, NID.HASH_LENGTH + Hash.HASH_LENGTH, 2, port);
setData(compactInfo);
}
/**
* Generate a secure NID that matches the Hash and port.
* Rules: First 4 bytes must match Hash.
* Next 2 bytes must match Hash ^ port.
* Remaining bytes may be random.
*
* @throws IllegalArgumentException
*/
public static NID generateNID(Hash h, int p, RandomSource random) {
byte[] n = new byte[NID.HASH_LENGTH];
System.arraycopy(h.getData(), 0, n, 0, 6);
n[4] ^= (byte) (p >> 8);
n[5] ^= (byte) p;
random.nextBytes(n, 6, NID.HASH_LENGTH - 6);
return new NID(n);
}
/**
* Verify the NID matches the Hash.
* See generateNID() for requirements.
* @throws IllegalArgumentException on mismatch
*/
private void verify() {
if (!KRPC.SECURE_NID)
return;
byte[] nb = nID.getData();
byte[] hb = hash.getData();
if ((!DataHelper.eq(nb, 0, hb, 0, 4)) ||
((nb[4] ^ (port >> 8)) & 0xff) != (hb[4] & 0xff) ||
((nb[5] ^ port) & 0xff) != (hb[5] & 0xff))
throw new IllegalArgumentException("NID/Hash mismatch");
}
public int length() {
return LENGTH;
}
public NID getNID() {
return this.nID;
}
/** @return may be null if we don't have it */
public Destination getDestination() {
return this.dest;
}
public Hash getHash() {
return this.hash;
}
@Override
public Hash calculateHash() {
return this.hash;
}
/**
* This can come in later but the hash must match.
* @throws IllegalArgumentException if hash of dest doesn't match previous hash
*/
public void setDestination(Destination dest) throws IllegalArgumentException {
if (this.dest != null)
return;
if (!dest.calculateHash().equals(this.hash))
throw new IllegalArgumentException("Hash mismatch, was: " + this.hash + " new: " + dest.calculateHash());
this.dest = dest;
}
public int getPort() {
return this.port;
}
public long lastSeen() {
return nID.lastSeen();
}
@Override
public int hashCode() {
return super.hashCode() ^ nID.hashCode() ^ port;
}
@Override
public boolean equals(Object o) {
try {
NodeInfo ni = (NodeInfo) o;
// assume dest matches, ignore it
return this.hash.equals(ni.hash) && nID.equals(ni.nID) && port == ni.port;
} catch (Exception e) {
return false;
}
}
@Override
public String toString() {
return "NodeInfo: " + nID + ' ' + hash + " port: " + port + (dest != null ? " known dest" : " null dest");
}
/**
* To persistent storage string.
* Format: NID:Hash:Destination:port
* First 3 in base 64; Destination may be empty string
*/
public String toPersistentString() {
StringBuilder buf = new StringBuilder(650);
buf.append(nID.toBase64()).append(':');
buf.append(hash.toBase64()).append(':');
if (dest != null)
buf.append(dest.toBase64());
buf.append(':').append(port);
return buf.toString();
}
}

View File

@ -0,0 +1,31 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import java.util.Comparator;
import net.i2p.crypto.SHA1Hash;
import net.i2p.data.DataHelper;
/**
* Closest to a InfoHash or NID key.
* Use for NodeInfos.
*
* @since 0.8.4
* @author zzz
*/
class NodeInfoComparator implements Comparator<NodeInfo> {
private final byte[] _base;
public NodeInfoComparator(SHA1Hash h) {
_base = h.getData();
}
public int compare(NodeInfo lhs, NodeInfo rhs) {
byte lhsDelta[] = DataHelper.xor(lhs.getNID().getData(), _base);
byte rhsDelta[] = DataHelper.xor(rhs.getNID().getData(), _base);
return DataHelper.compareTo(lhsDelta, rhsDelta);
}
}

View File

@ -0,0 +1,30 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import net.i2p.data.Hash;
/**
* A single peer for a single torrent.
* This is what the DHT tracker remembers.
*
* @since 0.8.4
* @author zzz
*/
class Peer extends Hash {
private long lastSeen;
public Peer(byte[] data) {
super(data);
}
public long lastSeen() {
return lastSeen;
}
public void setLastSeen(long now) {
lastSeen = now;
}
}

View File

@ -0,0 +1,21 @@
package org.klomp.snark.dht;
/*
* From zzzot, modded and relicensed to GPLv2
*/
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Hash;
/**
* All the peers for a single torrent
*
* @since 0.8.4
* @author zzz
*/
class Peers extends ConcurrentHashMap<Hash, Peer> {
public Peers() {
super();
}
}

View File

@ -0,0 +1,82 @@
package org.klomp.snark.dht;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
import net.i2p.util.Log;
import net.i2p.util.SecureFileOutputStream;
/**
* Retrieve / Store the local DHT in a file
*
*/
abstract class PersistDHT {
private static final long MAX_AGE = 60*60*1000;
public static synchronized void loadDHT(KRPC krpc, File file) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class);
int count = 0;
FileInputStream in = null;
try {
in = new FileInputStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(in, "ISO-8859-1"));
String line = null;
while ( (line = br.readLine()) != null) {
if (line.startsWith("#"))
continue;
try {
krpc.heardAbout(new NodeInfo(line));
count++;
// TODO limit number? this will flush the router's SDS caches
} catch (IllegalArgumentException iae) {
if (log.shouldLog(Log.WARN))
log.warn("Error reading DHT entry", iae);
} catch (DataFormatException dfe) {
if (log.shouldLog(Log.WARN))
log.warn("Error reading DHT entry", dfe);
}
}
} catch (IOException ioe) {
if (log.shouldLog(Log.WARN) && file.exists())
log.warn("Error reading the DHT File", ioe);
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
if (log.shouldLog(Log.INFO))
log.info("Loaded " + count + " nodes from " + file);
}
public static synchronized void saveDHT(DHTNodes nodes, File file) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class);
int count = 0;
long maxAge = I2PAppContext.getGlobalContext().clock().now() - MAX_AGE;
PrintWriter out = null;
try {
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new SecureFileOutputStream(file), "ISO-8859-1")));
out.println("# DHT nodes, format is NID:Hash:Destination:port");
for (NodeInfo ni : nodes.values()) {
if (ni.lastSeen() < maxAge)
continue;
// DHTNodes shouldn't contain us, if that changes check here
out.println(ni.toPersistentString());
count++;
}
} catch (IOException ioe) {
if (log.shouldLog(Log.WARN))
log.warn("Error writing the DHT File", ioe);
} finally {
if (out != null) out.close();
}
if (log.shouldLog(Log.INFO))
log.info("Stored " + count + " nodes to " + file);
}
}

View File

@ -0,0 +1,71 @@
package org.klomp.snark.dht;
/*
* GPLv2
*/
import java.util.Date;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
/**
* Used for Both outgoing and incoming tokens
*
* @since 0.8.4
* @author zzz
*/
class Token extends ByteArray {
private static final int MY_TOK_LEN = 8;
private final long lastSeen;
/** outgoing - generate a random token */
public Token(I2PAppContext ctx) {
super(null);
byte[] data = new byte[MY_TOK_LEN];
ctx.random().nextBytes(data);
setData(data);
setValid(MY_TOK_LEN);
lastSeen = ctx.clock().now();
}
/** incoming - save the token (arbitrary length) */
public Token(I2PAppContext ctx, byte[] data) {
super(data);
lastSeen = ctx.clock().now();
}
/** incoming - for lookup only, not storage, lastSeen is 0 */
public Token(byte[] data) {
super(data);
lastSeen = 0;
}
public long lastSeen() {
return lastSeen;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("[Token: ");
byte[] bs = getData();
if (bs.length == 0) {
buf.append("0 bytes");
} else {
buf.append(bs.length).append(" bytes: 0x");
// backwards, but the same way BEValue does it
for (int i = 0; i < bs.length; i++) {
int b = bs[i] & 0xff;
if (b < 16)
buf.append('0');
buf.append(Integer.toHexString(b));
}
}
if (lastSeen > 0)
buf.append(" created ").append((new Date(lastSeen)).toString());
buf.append(']');
return buf.toString();
}
}

View File

@ -0,0 +1,20 @@
package org.klomp.snark.dht;
/*
* GPLv2
*/
import net.i2p.crypto.SHA1Hash;
import net.i2p.data.DataHelper;
/**
* Used to index incoming Tokens
*
* @since 0.8.4
* @author zzz
*/
class TokenKey extends SHA1Hash {
public TokenKey(NID nID, InfoHash ih) {
super(DataHelper.xor(nID.getData(), ih.getData()));
}
}

View File

@ -0,0 +1,19 @@
package org.klomp.snark.dht;
/*
* From zzzot, relicensed to GPLv2
*/
import java.util.concurrent.ConcurrentHashMap;
/**
* All the torrents
*
* @since 0.8.4
* @author zzz
*/
class Torrents extends ConcurrentHashMap<InfoHash, Peers> {
public Torrents() {
super();
}
}

View File

@ -42,6 +42,7 @@ import org.klomp.snark.SnarkManager;
import org.klomp.snark.Storage;
import org.klomp.snark.Tracker;
import org.klomp.snark.TrackerClient;
import org.klomp.snark.dht.DHT;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.resource.Resource;
@ -470,6 +471,14 @@ public class I2PSnarkServlet extends DefaultServlet {
out.write(", ");
out.write(DataHelper.formatSize2(stats[5]) + "B, ");
out.write(ngettext("1 connected peer", "{0} connected peers", (int) stats[4]));
DHT dht = _manager.util().getDHT();
if (dht != null) {
int dhts = dht.size();
if (dhts > 0) {
out.write(", ");
out.write(ngettext("1 DHT peer", "{0} DHT peers", dhts));
}
}
out.write("</th>\n");
if (_manager.util().connected()) {
out.write(" <th align=\"right\">" + formatSize(stats[0]) + "</th>\n" +
@ -699,11 +708,12 @@ public class I2PSnarkServlet extends DefaultServlet {
String refreshDel = req.getParameter("refreshDelay");
String startupDel = req.getParameter("startupDelay");
boolean useOpenTrackers = req.getParameter("useOpenTrackers") != null;
boolean useDHT = req.getParameter("useDHT") != null;
//String openTrackers = req.getParameter("openTrackers");
String theme = req.getParameter("theme");
_manager.updateConfig(dataDir, filesPublic, autoStart, refreshDel, startupDel,
seedPct, eepHost, eepPort, i2cpHost, i2cpPort, i2cpOpts,
upLimit, upBW, useOpenTrackers, theme);
upLimit, upBW, useOpenTrackers, useDHT, theme);
} else if ("Save2".equals(action)) {
String taction = req.getParameter("taction");
if (taction != null)
@ -1492,6 +1502,7 @@ public class I2PSnarkServlet extends DefaultServlet {
boolean autoStart = _manager.shouldAutoStart();
boolean useOpenTrackers = _manager.util().shouldUseOpenTrackers();
//String openTrackers = _manager.util().getOpenTrackerString();
boolean useDHT = _manager.util().shouldUseDHT();
//int seedPct = 0;
out.write("<form action=\"/i2psnark/configure\" method=\"POST\">\n" +
@ -1605,6 +1616,14 @@ public class I2PSnarkServlet extends DefaultServlet {
+ (useOpenTrackers ? "checked " : "")
+ "title=\"");
out.write(_("If checked, announce torrents to open trackers as well as the tracker listed in the torrent file"));
out.write("\" ></td></tr>\n" +
"<tr><td>");
out.write(_("Enable DHT") + " (**BETA**)");
out.write(": <td><input type=\"checkbox\" class=\"optbox\" name=\"useDHT\" value=\"true\" "
+ (useDHT ? "checked " : "")
+ "title=\"");
out.write(_("If checked, use DHT"));
out.write("\" ></td></tr>\n");
// "<tr><td>");