forked from I2P_Developers/i2p.i2p
- Fix node ID / node info confusion
- Fix updating node ID when receiving pong - Fix getting DHT enable setting from config file - Fix handling of get_peers replies - Fix sending and receiving announces without signing - Fix incoming/outgoing token handling - Set cleanup timer for all queries - More debug logging
This commit is contained in:
@ -350,7 +350,7 @@ public class SnarkManager implements Snark.CompleteListener {
|
||||
String useOT = _config.getProperty(PROP_USE_OPENTRACKERS);
|
||||
boolean bOT = useOT == null || Boolean.valueOf(useOT).booleanValue();
|
||||
_util.setUseOpenTrackers(bOT);
|
||||
_util.setUseDHT(Boolean.valueOf(PROP_USE_DHT).booleanValue());
|
||||
_util.setUseDHT(Boolean.valueOf(_config.getProperty(PROP_USE_DHT)).booleanValue());
|
||||
getDataDir().mkdirs();
|
||||
initTrackerMap();
|
||||
}
|
||||
|
@ -91,13 +91,17 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
private final DHTNodes _knownNodes;
|
||||
/** index to sent queries awaiting reply */
|
||||
private final ConcurrentHashMap<MsgID, ReplyWaiter> _sentQueries;
|
||||
/** index to outgoing tokens, sent in reply to a get_peers query */
|
||||
private final ConcurrentHashMap<InfoHash, Token> _outgoingTokens;
|
||||
/** index to incoming tokens, received in a peers or nodes reply */
|
||||
private final ConcurrentHashMap<TokenKey, Token> _incomingTokens;
|
||||
/** index to outgoing tokens we generated, sent in reply to a get_peers query */
|
||||
private final ConcurrentHashMap<Token, NID> _outgoingTokens;
|
||||
/** index to incoming opaque tokens, received in a peers or nodes reply */
|
||||
private final ConcurrentHashMap<NID, Token> _incomingTokens;
|
||||
|
||||
/** hook to inject and receive datagrams */
|
||||
private final I2PSession _session;
|
||||
/** 20 byte random id */
|
||||
private final byte[] _myID;
|
||||
/** 20 byte random id */
|
||||
private final NID _myNID;
|
||||
/** 20 byte random id + 32 byte Hash + 2 byte port */
|
||||
private final NodeInfo _myNodeInfo;
|
||||
/** unsigned dgrams */
|
||||
@ -146,10 +150,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
// ports can really be fixed, just do this for testing
|
||||
_qPort = 30000 + ctx.random().nextInt(99);
|
||||
_rPort = _qPort + 1;
|
||||
byte[] myID = new byte[NID.HASH_LENGTH];
|
||||
ctx.random().nextBytes(myID);
|
||||
NID myNID = new NID(myID);
|
||||
_myNodeInfo = new NodeInfo(myNID, session.getMyDestination(), _qPort);
|
||||
_myID = new byte[NID.HASH_LENGTH];
|
||||
ctx.random().nextBytes(_myID);
|
||||
_myNID = new NID(_myID);
|
||||
_myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort);
|
||||
|
||||
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort);
|
||||
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort);
|
||||
@ -242,7 +246,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
tried.add(nInfo);
|
||||
|
||||
// this isn't going to work, he will just return our own?
|
||||
ReplyWaiter waiter = sendFindNode(nInfo, _myNodeInfo);
|
||||
ReplyWaiter waiter = sendFindNode(nInfo, _myNID);
|
||||
if (waiter == null)
|
||||
continue;
|
||||
synchronized(waiter) {
|
||||
@ -309,7 +313,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
Set<NodeInfo> tried = new HashSet();
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Starting getPeers");
|
||||
_log.info("Starting getPeers with " + nodes.size() + " to try");
|
||||
for (int i = 0; i < max; i++) {
|
||||
NodeInfo nInfo;
|
||||
try {
|
||||
@ -413,7 +417,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
announce(ih);
|
||||
int rv = 0;
|
||||
long start = _context.clock().now();
|
||||
List<NodeInfo> nodes = _knownNodes.findClosest(new InfoHash(ih), max);
|
||||
InfoHash iHash = new InfoHash(ih);
|
||||
List<NodeInfo> nodes = _knownNodes.findClosest(iHash, max);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Found " + nodes.size() + " to announce to for " + iHash);
|
||||
for (NodeInfo nInfo : nodes) {
|
||||
if (announce(ih, nInfo, Math.min(maxWait, 60*1000)))
|
||||
rv++;
|
||||
@ -435,14 +442,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @param maxWait the maximum time to wait (ms) or 0 to return immediately.
|
||||
* @return success
|
||||
*/
|
||||
public boolean announce(byte[] ih, NodeInfo nInfo, long maxWait) {
|
||||
private boolean announce(byte[] ih, NodeInfo nInfo, long maxWait) {
|
||||
InfoHash iHash = new InfoHash(ih);
|
||||
TokenKey tokenKey = new TokenKey(nInfo.getNID(), iHash);
|
||||
Token token = _incomingTokens.get(tokenKey);
|
||||
// it isn't clear from BEP 5 if a token is bound to a single infohash?
|
||||
// for now, just bind to the NID
|
||||
//TokenKey tokenKey = new TokenKey(nInfo.getNID(), iHash);
|
||||
Token token = _incomingTokens.get(nInfo.getNID());
|
||||
if (token == null) {
|
||||
// we have no token, have to do a getPeers first to get a token
|
||||
if (maxWait <= 0)
|
||||
return false;
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("No token for announce to " + nInfo + " sending get_peers first");
|
||||
ReplyWaiter waiter = sendGetPeers(nInfo, iHash);
|
||||
if (waiter == null)
|
||||
return false;
|
||||
@ -453,15 +464,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
int replyType = waiter.getReplyCode();
|
||||
if (!(replyType == REPLY_PEERS || replyType == REPLY_NODES))
|
||||
if (!(replyType == REPLY_PEERS || replyType == REPLY_NODES)) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Get_peers failed to " + nInfo);
|
||||
return false;
|
||||
}
|
||||
// we should have a token now
|
||||
token = _incomingTokens.get(tokenKey);
|
||||
if (token == null)
|
||||
token = _incomingTokens.get(nInfo.getNID());
|
||||
if (token == null) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Huh? no token after get_peers succeeded to " + nInfo);
|
||||
return false;
|
||||
}
|
||||
maxWait -= _context.clock().now() - start;
|
||||
if (maxWait < 1000)
|
||||
if (maxWait < 1000) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Ran out of time after get_peers succeeded to " + nInfo);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// send and wait on rcv msg lock unless maxWait <= 0
|
||||
@ -525,6 +545,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @return null on error
|
||||
*/
|
||||
private ReplyWaiter sendPing(NodeInfo nInfo) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending ping to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
map.put("q", "ping");
|
||||
Map<String, Object> args = new HashMap();
|
||||
@ -534,9 +556,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
|
||||
/**
|
||||
* @param nInfo who to send it to
|
||||
* @param tID target ID we are looking for
|
||||
* @return null on error
|
||||
*/
|
||||
private ReplyWaiter sendFindNode(NodeInfo nInfo, NodeInfo tID) {
|
||||
private ReplyWaiter sendFindNode(NodeInfo nInfo, NID tID) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending find node of " + tID + " to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
map.put("q", "find_node");
|
||||
Map<String, Object> args = new HashMap();
|
||||
@ -550,12 +575,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @return null on error
|
||||
*/
|
||||
private ReplyWaiter sendGetPeers(NodeInfo nInfo, InfoHash ih) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending get peers of " + ih + " to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
map.put("q", "get_peers");
|
||||
Map<String, Object> args = new HashMap();
|
||||
args.put("info_hash", ih.getData());
|
||||
map.put("a", args);
|
||||
return sendQuery(nInfo, map, true);
|
||||
ReplyWaiter rv = sendQuery(nInfo, map, true);
|
||||
// save the InfoHash so we can get it later
|
||||
if (rv != null)
|
||||
rv.setSentObject(ih);
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -563,6 +594,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @return null on error
|
||||
*/
|
||||
private ReplyWaiter sendAnnouncePeer(NodeInfo nInfo, InfoHash ih, Token token) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending announce of " + ih + " to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
map.put("q", "announce_peer");
|
||||
Map<String, Object> args = new HashMap();
|
||||
@ -573,9 +606,6 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
map.put("a", args);
|
||||
// an announce need not be signed, we have a token
|
||||
ReplyWaiter rv = sendQuery(nInfo, map, false);
|
||||
// save the InfoHash so we can get it later
|
||||
if (rv != null)
|
||||
rv.setSentObject(ih);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -587,6 +617,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @return success
|
||||
*/
|
||||
private boolean sendPong(NodeInfo nInfo, MsgID msgID) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending pong to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
Map<String, Object> resps = new HashMap();
|
||||
map.put("r", resps);
|
||||
@ -604,6 +636,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @return success
|
||||
*/
|
||||
private boolean sendNodes(NodeInfo nInfo, MsgID msgID, Token token, byte[] ids) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending nodes to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
Map<String, Object> resps = new HashMap();
|
||||
map.put("r", resps);
|
||||
@ -615,6 +649,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
|
||||
/** @param token non-null */
|
||||
private boolean sendPeers(NodeInfo nInfo, MsgID msgID, Token token, List<byte[]> peers) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending peers to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
Map<String, Object> resps = new HashMap();
|
||||
map.put("r", resps);
|
||||
@ -630,6 +666,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
* @return success
|
||||
*/
|
||||
private boolean sendError(NodeInfo nInfo, MsgID msgID, int err, String msg) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending error " + msg + " to: " + nInfo);
|
||||
Map<String, Object> map = new HashMap();
|
||||
Map<String, Object> resps = new HashMap();
|
||||
map.put("r", resps);
|
||||
@ -647,8 +685,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
private ReplyWaiter sendQuery(NodeInfo nInfo, Map<String, Object> map, boolean repliable) {
|
||||
if (nInfo.equals(_myNodeInfo))
|
||||
throw new IllegalArgumentException("wtf don't send to ourselves");
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending query to: " + nInfo);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending query to: " + nInfo);
|
||||
if (nInfo.getDestination() == null) {
|
||||
NodeInfo newInfo = _knownNodes.get(nInfo.getNID());
|
||||
if (newInfo != null && newInfo.getDestination() != null) {
|
||||
@ -666,11 +704,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
Map<String, Object> args = (Map<String, Object>) map.get("a");
|
||||
if (args == null)
|
||||
throw new IllegalArgumentException("no args");
|
||||
args.put("id", _myNodeInfo.getData());
|
||||
args.put("id", _myID);
|
||||
int port = nInfo.getPort();
|
||||
if (!repliable)
|
||||
port++;
|
||||
boolean success = sendMessage(nInfo.getDestination(), port, map, true);
|
||||
boolean success = sendMessage(nInfo.getDestination(), port, map, repliable);
|
||||
if (success) {
|
||||
// save for the caller to get
|
||||
ReplyWaiter rv = new ReplyWaiter(mID, nInfo, null, null);
|
||||
@ -687,8 +725,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
private boolean sendResponse(NodeInfo nInfo, MsgID msgID, Map<String, Object> map) {
|
||||
if (nInfo.equals(_myNodeInfo))
|
||||
throw new IllegalArgumentException("wtf don't send to ourselves");
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending response to: " + nInfo);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending response to: " + nInfo);
|
||||
if (nInfo.getDestination() == null) {
|
||||
NodeInfo newInfo = _knownNodes.get(nInfo.getNID());
|
||||
if (newInfo != null && newInfo.getDestination() != null) {
|
||||
@ -705,7 +743,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
Map<String, Object> resps = (Map<String, Object>) map.get("r");
|
||||
if (resps == null)
|
||||
throw new IllegalArgumentException("no resps");
|
||||
resps.put("id", _myNodeInfo.getData());
|
||||
resps.put("id", _myID);
|
||||
return sendMessage(nInfo.getDestination(), nInfo.getPort() + 1, map, false);
|
||||
}
|
||||
|
||||
@ -803,7 +841,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
byte[] msgIDBytes = map.get("t").getBytes();
|
||||
MsgID mID = new MsgID(msgIDBytes);
|
||||
String type = map.get("y").getString();
|
||||
if (type.equals("q") && from != null) {
|
||||
if (type.equals("q")) {
|
||||
// queries must be repliable
|
||||
String method = map.get("q").getString();
|
||||
Map<String, BEValue> args = map.get("a").getMap();
|
||||
@ -849,21 +887,31 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
|
||||
/**
|
||||
* Adds sender to our DHT.
|
||||
* @param dest non-null
|
||||
* @param dest may be null for announce_peer method only
|
||||
* @throws NPE too
|
||||
*/
|
||||
private void receiveQuery(MsgID msgID, Destination dest, int fromPort, String method, Map<String, BEValue> args) throws InvalidBEncodingException {
|
||||
if (dest == null && !method.equals("announce_peer")) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Received non-announce_peer query method on reply port: " + method);
|
||||
return;
|
||||
}
|
||||
byte[] nid = args.get("id").getBytes();
|
||||
NodeInfo nInfo = new NodeInfo(nid);
|
||||
nInfo = heardFrom(nInfo);
|
||||
nInfo.setDestination(dest);
|
||||
// ninfo.checkport ?
|
||||
NodeInfo nInfo;
|
||||
if (dest != null) {
|
||||
nInfo = new NodeInfo(new NID(nid), dest, fromPort);
|
||||
nInfo = heardFrom(nInfo);
|
||||
nInfo.setDestination(dest);
|
||||
// ninfo.checkport ?
|
||||
} else {
|
||||
nInfo = null;
|
||||
}
|
||||
|
||||
if (method.equals("ping")) {
|
||||
receivePing(msgID, nInfo);
|
||||
} else if (method.equals("find_node")) {
|
||||
byte[] tid = args.get("target").getBytes();
|
||||
NodeInfo tID = new NodeInfo(tid);
|
||||
NID tID = new NID(tid);
|
||||
receiveFindNode(msgID, nInfo, tID);
|
||||
} else if (method.equals("get_peers")) {
|
||||
byte[] hash = args.get("info_hash").getBytes();
|
||||
@ -875,7 +923,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
// this is the "TCP" port, we don't care
|
||||
//int port = args.get("port").getInt();
|
||||
byte[] token = args.get("token").getBytes();
|
||||
receiveAnnouncePeer(msgID, nInfo, ih, token);
|
||||
receiveAnnouncePeer(msgID, ih, token);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Unknown query method rcvd: " + method);
|
||||
@ -907,6 +955,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
NodeInfo nInfo2 = _knownNodes.putIfAbsent(nID, nInfo);
|
||||
if (nInfo2 != null)
|
||||
oldInfo = nInfo2;
|
||||
} else {
|
||||
if (oldInfo.getDestination() == null && nInfo.getDestination() != null)
|
||||
oldInfo.setDestination(nInfo.getDestination());
|
||||
}
|
||||
if (when > oldInfo.lastSeen())
|
||||
oldInfo.setLastSeen(when);
|
||||
@ -924,8 +975,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
|
||||
/**
|
||||
* Handle and respond to the query
|
||||
* @param tID target ID they are looking for
|
||||
*/
|
||||
private void receiveFindNode(MsgID msgID, NodeInfo nInfo, NodeInfo tID) throws InvalidBEncodingException {
|
||||
private void receiveFindNode(MsgID msgID, NodeInfo nInfo, NID tID) throws InvalidBEncodingException {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Rcvd find_node from: " + nInfo + " for: " + tID);
|
||||
NodeInfo peer = _knownNodes.get(tID);
|
||||
@ -934,7 +986,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
sendNodes(nInfo, msgID, peer.getData());
|
||||
} else {
|
||||
// get closest from DHT
|
||||
List<NodeInfo> nodes = _knownNodes.findClosest(tID.getNID(), K);
|
||||
List<NodeInfo> nodes = _knownNodes.findClosest(tID, K);
|
||||
nodes.remove(nInfo); // him
|
||||
nodes.remove(_myNodeInfo); // me
|
||||
byte[] nodeArray = new byte[nodes.size() * NodeInfo.LENGTH];
|
||||
@ -953,7 +1005,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
_log.info("Rcvd get_peers from: " + nInfo + " for: " + ih);
|
||||
// generate and save random token
|
||||
Token token = new Token(_context);
|
||||
_outgoingTokens.put(ih, token);
|
||||
_outgoingTokens.put(token, nInfo.getNID());
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Stored new OB token: " + token + " for: " + nInfo);
|
||||
|
||||
List<Hash> peers = _tracker.getPeers(ih, MAX_WANT);
|
||||
if (peers.isEmpty()) {
|
||||
@ -979,22 +1033,27 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle and respond to the query
|
||||
* Handle and respond to the query.
|
||||
* We have no node info here, it came on response port, we have to get it from the token
|
||||
*/
|
||||
private void receiveAnnouncePeer(MsgID msgID, NodeInfo nInfo, InfoHash ih, byte[] token) throws InvalidBEncodingException {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Rcvd announce from: " + nInfo + " for: " + ih);
|
||||
// check token
|
||||
// get desthash from token->dest map
|
||||
Token oldToken = _outgoingTokens.get(ih);
|
||||
if (oldToken == null || !DataHelper.eq(oldToken.getData(), token)) {
|
||||
private void receiveAnnouncePeer(MsgID msgID, InfoHash ih, byte[] tok) throws InvalidBEncodingException {
|
||||
Token token = new Token(tok);
|
||||
NID nid = _outgoingTokens.get(token);
|
||||
if (nid == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Bad token");
|
||||
_log.warn("Unknown token in announce_peer: " + token);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Current known tokens: " + _outgoingTokens.keySet());
|
||||
return;
|
||||
}
|
||||
|
||||
//msg ID -> NodeInfo -> Dest -> Hash
|
||||
//verify with token -> nid or dest or hash ????
|
||||
NodeInfo nInfo = _knownNodes.get(nid);
|
||||
if (nInfo == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Unknown node in announce_peer for: " + nid);
|
||||
return;
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Rcvd announce from: " + nInfo + " for: " + ih);
|
||||
|
||||
_tracker.announce(ih, nInfo.getHash());
|
||||
// the reply for an announce is the same as the reply for a ping
|
||||
@ -1020,9 +1079,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
InfoHash ih = (InfoHash) waiter.getSentObject();
|
||||
if (btok != null && ih != null) {
|
||||
byte[] tok = btok.getBytes();
|
||||
_incomingTokens.put(new TokenKey(nInfo.getNID(), ih), new Token(_context, tok));
|
||||
Token token = new Token(_context, tok);
|
||||
_incomingTokens.put(nInfo.getNID(), token);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Got token, must be a response to get_peers");
|
||||
_log.info("Got token: " + token + ", must be a response to get_peers");
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("No token and saved infohash, must be a response to find_node");
|
||||
@ -1042,7 +1102,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
waiter.gotReply(REPLY_PEERS, rlist);
|
||||
} else {
|
||||
// a ping response or an announce peer response
|
||||
receivePong(nInfo);
|
||||
byte[] nid = response.get("id").getBytes();
|
||||
receivePong(nInfo, nid);
|
||||
waiter.gotReply(REPLY_PONG, null);
|
||||
}
|
||||
}
|
||||
@ -1085,8 +1146,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
return rv;
|
||||
}
|
||||
|
||||
/** does nothing, but node was already added to our DHT */
|
||||
private void receivePong(NodeInfo nInfo) {
|
||||
/**
|
||||
* If node info was previously created with the dummy NID,
|
||||
* replace it with the received NID.
|
||||
*/
|
||||
private void receivePong(NodeInfo nInfo, byte[] nid) {
|
||||
if (nInfo.getNID().equals(_fakeNID)) {
|
||||
NodeInfo newInfo = new NodeInfo(new NID(nid), nInfo.getHash(), nInfo.getPort());
|
||||
Destination dest = nInfo.getDestination();
|
||||
if (dest != null)
|
||||
newInfo.setDestination(dest);
|
||||
heardFrom(newInfo);
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Rcvd pong from: " + nInfo);
|
||||
}
|
||||
@ -1121,23 +1192,23 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
|
||||
/**
|
||||
* Either wait on this object with a timeout, or use non-null Runnables.
|
||||
* Any sent data to be rememberd may be stored by setSentObject().
|
||||
* Any sent data to be remembered may be stored by setSentObject().
|
||||
* Reply object may be in getReplyObject().
|
||||
* @param onReply must be fast, otherwise set to null and wait on this
|
||||
* @param onTimeout must be fast, otherwise set to null and wait on this
|
||||
*/
|
||||
public ReplyWaiter(MsgID mID, NodeInfo nInfo, Runnable onReply, Runnable onTimeout) {
|
||||
super(nInfo.getData());
|
||||
super(nInfo.getNID(), nInfo.getHash(), nInfo.getPort());
|
||||
Destination dest = nInfo.getDestination();
|
||||
if (dest != null)
|
||||
setDestination(dest);
|
||||
this.mid = mID;
|
||||
this.onReply = onReply;
|
||||
this.onTimeout = onTimeout;
|
||||
if (onTimeout != null)
|
||||
this.event = new Event();
|
||||
else
|
||||
this.event = null;
|
||||
this.event = new Event();
|
||||
}
|
||||
|
||||
/** only used for announce, to save the Info Hash */
|
||||
/** only used for get_peers, to save the Info Hash */
|
||||
public void setSentObject(Object o) {
|
||||
sentObject = o;
|
||||
}
|
||||
@ -1174,10 +1245,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
public void gotReply(int code, Object o) {
|
||||
replyCode = code;
|
||||
replyObject = o;
|
||||
if (event != null)
|
||||
event.cancel();
|
||||
event.cancel();
|
||||
_sentQueries.remove(mid);
|
||||
heardFrom(this);
|
||||
// if it is fake, heardFrom is called by receivePong()
|
||||
if (!getNID().equals(_fakeNID))
|
||||
heardFrom(this);
|
||||
if (onReply != null)
|
||||
onReply.run();
|
||||
synchronized(this) {
|
||||
@ -1266,7 +1338,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
|
||||
|
||||
public void timeReached() {
|
||||
long now = _context.clock().now();
|
||||
for (Iterator<Token> iter = _outgoingTokens.values().iterator(); iter.hasNext(); ) {
|
||||
for (Iterator<Token> iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) {
|
||||
Token tok = iter.next();
|
||||
if (tok.lastSeen() < now - MAX_TOKEN_AGE)
|
||||
iter.remove();
|
||||
|
@ -15,6 +15,7 @@ import net.i2p.data.SimpleDataStructure;
|
||||
*
|
||||
* Things are a little tricky in KRPC since we exchange Hashes and don't
|
||||
* always have the Destination.
|
||||
* The conpact info is immutable. The Destination may be added later.
|
||||
*
|
||||
* @since 0.8.4
|
||||
* @author zzz
|
||||
@ -45,7 +46,6 @@ public class NodeInfo extends SimpleDataStructure {
|
||||
|
||||
/**
|
||||
* No Destination yet available
|
||||
* @deprecated unused
|
||||
* @throws IllegalArgumentException
|
||||
*/
|
||||
public NodeInfo(NID nID, Hash hash, int port) {
|
||||
@ -138,13 +138,11 @@ public class NodeInfo extends SimpleDataStructure {
|
||||
* @throws IllegalArgumentException if hash of dest doesn't match previous hash
|
||||
*/
|
||||
public void setDestination(Destination dest) throws IllegalArgumentException {
|
||||
if (this.dest != null)
|
||||
return;
|
||||
if (!dest.calculateHash().equals(this.hash))
|
||||
throw new IllegalArgumentException("Hash mismatch, was: " + this.hash + " new: " + dest.calculateHash());
|
||||
if (this.dest == null)
|
||||
this.dest = dest;
|
||||
else if (!this.dest.equals(dest))
|
||||
throw new IllegalArgumentException("Dest mismatch, was: " + this.dest+ " new: " + dest);
|
||||
// else keep the old to reduce object churn
|
||||
this.dest = dest;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
|
@ -3,6 +3,8 @@ package org.klomp.snark.dht;
|
||||
* GPLv2
|
||||
*/
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.data.DataHelper;
|
||||
@ -24,6 +26,7 @@ public class Token extends ByteArray {
|
||||
byte[] data = new byte[MY_TOK_LEN];
|
||||
ctx.random().nextBytes(data);
|
||||
setData(data);
|
||||
setValid(MY_TOK_LEN);
|
||||
lastSeen = ctx.clock().now();
|
||||
}
|
||||
|
||||
@ -33,7 +36,36 @@ public class Token extends ByteArray {
|
||||
lastSeen = ctx.clock().now();
|
||||
}
|
||||
|
||||
/** incoming - for lookup only, not storage, lastSeen is 0 */
|
||||
public Token(byte[] data) {
|
||||
super(data);
|
||||
lastSeen = 0;
|
||||
}
|
||||
|
||||
public long lastSeen() {
|
||||
return lastSeen;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder buf = new StringBuilder(64);
|
||||
buf.append("[Token: ");
|
||||
byte[] bs = getData();
|
||||
if (bs.length == 0) {
|
||||
buf.append("0 bytes");
|
||||
} else {
|
||||
buf.append(bs.length).append(" bytes: 0x");
|
||||
// backwards, but the same way BEValue does it
|
||||
for (int i = 0; i < bs.length; i++) {
|
||||
int b = bs[i] & 0xff;
|
||||
if (b < 16)
|
||||
buf.append('0');
|
||||
buf.append(Integer.toHexString(b));
|
||||
}
|
||||
}
|
||||
if (lastSeen > 0)
|
||||
buf.append(" created ").append((new Date(lastSeen)).toString());
|
||||
buf.append(']');
|
||||
return buf.toString();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user