- Add heardAbout() and call for receive peers

- Move last-seen tracking from NodeInfo to NID, add fail tracking
- Make NodeInfo fields final
- Remove nodes on consecutive failures
- Only persist nodes heard from recently
- Implement NID verification for security
This commit is contained in:
zzz
2012-06-12 18:09:42 +00:00
parent fe2b97c941
commit 41096c7f23
4 changed files with 92 additions and 78 deletions

View File

@ -124,6 +124,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private static final int REPLY_PEERS = 2;
private static final int REPLY_NODES = 3;
public static final boolean SECURE_NID = true;
/** how long since last heard from do we delete - BEP 5 says 15 minutes */
private static final long MAX_NODEINFO_AGE = 60*60*1000;
/** how long since generated do we delete - BEP 5 says 10 minutes */
@ -155,7 +157,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_qPort = 2555 + ctx.random().nextInt(61111);
_rPort = _qPort + 1;
_myID = new byte[NID.HASH_LENGTH];
ctx.random().nextBytes(_myID);
if (SECURE_NID)
System.arraycopy(session.getMyDestination().calculateHash().getData(), 0, _myID, 0, NID.HASH_LENGTH);
else
ctx.random().nextBytes(_myID);
_myNID = new NID(_myID);
_myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort);
_dhtFile = new File(ctx.getConfigDir(), DHT_FILE);
@ -167,29 +172,6 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
///////////////// Public methods
/**
* For bootstrapping if loaded from config file.
* @param when when did we hear from them
*/
public void addNode(NodeInfo nInfo, long when) {
heardFrom(nInfo, when);
}
/**
* NodeInfo heard from
*/
public void addNode(NodeInfo nInfo) {
heardFrom(nInfo);
}
/**
* For saving in a config file.
* @return the values, not a copy, could change, use an iterator
*/
public Collection<NodeInfo> getNodes() {
return _knownNodes.values();
}
/**
* @return The UDP query port
*/
@ -989,14 +971,6 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @return old NodeInfo or nInfo if none, use this to reduce object churn
*/
private NodeInfo heardFrom(NodeInfo nInfo) {
return heardFrom(nInfo, _context.clock().now());
}
/**
* Used for initialization
* @return old NodeInfo or nInfo if none, use this to reduce object churn
*/
private NodeInfo heardFrom(NodeInfo nInfo, long when) {
// try to keep ourselves out of the DHT
if (nInfo.equals(_myNodeInfo))
return _myNodeInfo;
@ -1013,11 +987,39 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (oldInfo.getDestination() == null && nInfo.getDestination() != null)
oldInfo.setDestination(nInfo.getDestination());
}
if (when > oldInfo.lastSeen())
oldInfo.setLastSeen(when);
oldInfo.getNID().setLastSeen();
return oldInfo;
}
/**
* Called for bootstrap or for all nodes in a receiveNodes reply.
* Package private for PersistDHT.
* @return non-null nodeInfo from DB if present, otherwise the nInfo parameter is returned
*/
NodeInfo heardAbout(NodeInfo nInfo) {
// try to keep ourselves out of the DHT
if (nInfo.equals(_myNodeInfo))
return _myNodeInfo;
NID nID = nInfo.getNID();
NodeInfo rv = _knownNodes.putIfAbsent(nID, nInfo);
if (rv == null)
rv = nInfo;
return rv;
}
/**
* Called when a reply times out
*/
private void timeout(NodeInfo nInfo) {
boolean remove = nInfo.getNID().timeout();
if (remove) {
if (_knownNodes.remove(nInfo) != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Removed after consecutive timeouts: " + nInfo);
}
}
}
/**
* Handle and respond to the query
*/
@ -1163,12 +1165,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
*/
private List<NodeInfo> receiveNodes(NodeInfo nInfo, byte[] ids) throws InvalidBEncodingException {
List<NodeInfo> rv = new ArrayList(ids.length / NodeInfo.LENGTH);
long fakeTime = _context.clock().now() - (MAX_NODEINFO_AGE * 3 / 4);
for (int off = 0; off < ids.length; off += NodeInfo.LENGTH) {
NodeInfo nInf = new NodeInfo(ids, off);
// anti-churn
// TODO do we need heardAbout too?
nInf = heardFrom(nInf, fakeTime);
nInf = heardAbout(nInf);
rv.add(nInf);
}
if (_log.shouldLog(Log.INFO))
@ -1315,6 +1314,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_sentQueries.remove(mid);
if (onTimeout != null)
onTimeout.run();
timeout(ReplyWaiter.this);
if (_log.shouldLog(Log.INFO))
_log.warn("timeout waiting for reply from " + ReplyWaiter.this.toString());
}

View File

@ -4,6 +4,7 @@ package org.klomp.snark.dht;
*/
import net.i2p.crypto.SHA1Hash;
import net.i2p.util.Clock;
/**
* A 20-byte peer ID, used as a Map key in lots of places.
@ -13,7 +14,28 @@ import net.i2p.crypto.SHA1Hash;
*/
class NID extends SHA1Hash {
private long lastSeen;
private int fails;
private static final int MAX_FAILS = 3;
public NID(byte[] data) {
super(data);
}
public long lastSeen() {
return lastSeen;
}
public void setLastSeen() {
lastSeen = Clock.getInstance().now();
fails = 0;
}
/**
* @return if more than max timeouts
*/
public boolean timeout() {
return fails++ > MAX_FAILS;
}
}

View File

@ -25,11 +25,10 @@ import net.i2p.data.SimpleDataStructure;
class NodeInfo extends SimpleDataStructure {
private long lastSeen;
private NID nID;
private Hash hash;
private final NID nID;
private final Hash hash;
private Destination dest;
private int port;
private final int port;
public static final int LENGTH = NID.HASH_LENGTH + Hash.HASH_LENGTH + 2;
@ -44,6 +43,7 @@ class NodeInfo extends SimpleDataStructure {
this.hash = dest.calculateHash();
this.port = port;
initialize();
verify();
}
/**
@ -56,16 +56,7 @@ class NodeInfo extends SimpleDataStructure {
this.hash = hash;
this.port = port;
initialize();
}
/**
* No Destination yet available
* @param compactInfo 20 byte node ID, 32 byte destHash, 2 byte port
* @throws IllegalArgumentException
*/
public NodeInfo(byte[] compactInfo) {
super(compactInfo);
initialize(compactInfo);
verify();
}
/**
@ -80,11 +71,18 @@ class NodeInfo extends SimpleDataStructure {
byte[] d = new byte[LENGTH];
System.arraycopy(compactInfo, offset, d, 0, LENGTH);
setData(d);
initialize(d);
byte[] ndata = new byte[NID.HASH_LENGTH];
System.arraycopy(d, 0, ndata, 0, NID.HASH_LENGTH);
this.nID = new NID(ndata);
this.hash = Hash.create(d, NID.HASH_LENGTH);
this.port = (int) DataHelper.fromLong(d, NID.HASH_LENGTH + Hash.HASH_LENGTH, 2);
if (port <= 0 || port >= 65535)
throw new IllegalArgumentException("Bad port");
verify();
}
/**
* Form persistent storage string.
* Create from persistent storage string.
* Format: NID:Hash:Destination:port
* First 3 in base 64; Destination may be empty string
* @throws IllegalArgumentException
@ -113,24 +111,6 @@ class NodeInfo extends SimpleDataStructure {
initialize();
}
/**
* Creates data structures from the compact info
* @throws IllegalArgumentException
*/
private void initialize(byte[] compactInfo) {
if (compactInfo.length != LENGTH)
throw new IllegalArgumentException("Bad compact info length");
byte[] ndata = new byte[NID.HASH_LENGTH];
System.arraycopy(compactInfo, 0, ndata, 0, NID.HASH_LENGTH);
this.nID = new NID(ndata);
//byte[] hdata = new byte[Hash.HASH_LENGTH];
//System.arraycopy(compactInfo, NID.HASH_LENGTH, hdata, 0, Hash.HASH_LENGTH);
//this.hash = new Hash(hdata);
this.hash = Hash.create(compactInfo, NID.HASH_LENGTH);
this.port = (int) DataHelper.fromLong(compactInfo, NID.HASH_LENGTH + Hash.HASH_LENGTH, 2);
if (port <= 0 || port >= 65535)
throw new IllegalArgumentException("Bad port");
}
/**
* Creates 54-byte compact info
@ -146,6 +126,17 @@ class NodeInfo extends SimpleDataStructure {
setData(compactInfo);
}
/**
* Verify the NID matches the Hash
* @throws IllegalArgumentException
*/
private void verify() {
if (!KRPC.SECURE_NID)
return;
if (!DataHelper.eq(nID.getData(), 0, hash.getData(), 0, NID.HASH_LENGTH))
throw new IllegalArgumentException("NID/Hash mismatch");
}
public int length() {
return LENGTH;
}
@ -185,11 +176,7 @@ class NodeInfo extends SimpleDataStructure {
}
public long lastSeen() {
return lastSeen;
}
public void setLastSeen(long now) {
lastSeen = now;
return nID.lastSeen();
}
@Override

View File

@ -20,6 +20,8 @@ import net.i2p.util.SecureFileOutputStream;
*/
abstract class PersistDHT {
private static final long MAX_AGE = 60*60*1000;
public static synchronized void loadDHT(KRPC krpc, File file) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class);
int count = 0;
@ -32,7 +34,7 @@ abstract class PersistDHT {
if (line.startsWith("#"))
continue;
try {
krpc.addNode(new NodeInfo(line));
krpc.heardAbout(new NodeInfo(line));
count++;
// TODO limit number? this will flush the router's SDS caches
} catch (IllegalArgumentException iae) {
@ -56,11 +58,14 @@ abstract class PersistDHT {
public static synchronized void saveDHT(DHTNodes nodes, File file) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class);
int count = 0;
long maxAge = I2PAppContext.getGlobalContext().clock().now() - MAX_AGE;
PrintWriter out = null;
try {
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new SecureFileOutputStream(file), "ISO-8859-1")));
out.println("# DHT nodes, format is NID:Hash:Destination:port");
for (NodeInfo ni : nodes.values()) {
if (ni.lastSeen() < maxAge)
continue;
// DHTNodes shouldn't contain us, if that changes check here
out.println(ni.toPersistentString());
count++;