* netDb searchReply and lookup messages now contain H(peer), not the peer's full RouterInfo
(making a searchReply message ~100 bytes, down from ~30KB, and the lookup message ~64 bytes,
down from ~10KB; see the size sketch after this list)
* when we get a netDb searchReply or lookup message referencing someone we don't know,
we fire off a lookup for them
* reduced some excessive padding
* dropped the DbSearchReplyMessageHandler, since it shouldn't be used (all search replies
should be handled by a MessageSelector built by the original search message; a minimal selector sketch follows this list)
* removed some oddball constructors from the SendMessageDirectJob and SendTunnelMessageJob (callers must now always specify a timeout)
* refactored the HandleTunnelMessageJob main handler method into smaller logical methods
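
For a rough sense of the new sizes (a hypothetical helper, not part of this commit; it assumes a 32-byte net.i2p.data.Hash and ignores the common I2NP message header and the lookup message's dontIncludePeers set):

/** Back-of-the-envelope payload sizes implied by the hash-only netDb messages. */
public class NetDbMessageSizeEstimate {
    private static final int HASH_LENGTH = 32; // a Hash is a 32-byte SHA-256 value

    /** searchReply payload: search key + 1-byte reply count + peer hashes + from hash */
    static int searchReplyBytes(int numPeers) {
        return HASH_LENGTH + 1 + (numPeers * HASH_LENGTH) + HASH_LENGTH;
    }

    /** lookup payload: search key + from hash + 1-byte reply-tunnel flag (+ 4-byte tunnel id if set) */
    static int lookupBytes(boolean hasReplyTunnel) {
        return HASH_LENGTH + HASH_LENGTH + 1 + (hasReplyTunnel ? 4 : 0);
    }

    public static void main(String[] args) {
        System.out.println(searchReplyBytes(1)); // 97 bytes, vs. a compressed set of full RouterInfos before
        System.out.println(lookupBytes(false));  // 65 bytes, vs. a full RouterInfo before
    }
}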
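With the standalone handler gone, each search reply has to be picked up by the selector registered when the search was sent out. A minimal sketch of such a selector (simplified and hypothetical; the real SearchMessageSelector also consults the shared search state, and this assumes the router's MessageSelector interface of isMatch/continueMatching/getExpiration):

import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.MessageSelector;
import net.i2p.router.RouterContext;

/** Match the first search reply for our key from the peer we queried, until the timeout passes. */
class SimpleSearchReplySelector implements MessageSelector {
    private final RouterContext _context;
    private final Hash _searchKey;
    private final Hash _queriedPeer;
    private final long _expiration;
    private boolean _found;

    public SimpleSearchReplySelector(RouterContext ctx, Hash searchKey, Hash queriedPeer, long timeoutMs) {
        _context = ctx;
        _searchKey = searchKey;
        _queriedPeer = queriedPeer;
        _expiration = ctx.clock().now() + timeoutMs;
    }

    public boolean continueMatching() { return !_found && _context.clock().now() < _expiration; }

    public long getExpiration() { return _expiration; }

    public boolean isMatch(I2NPMessage message) {
        if (!(message instanceof DatabaseSearchReplyMessage)) return false;
        DatabaseSearchReplyMessage reply = (DatabaseSearchReplyMessage)message;
        if (!DataHelper.eq(_searchKey, reply.getSearchKey())) return false;   // reply to our key?
        if (!DataHelper.eq(_queriedPeer, reply.getFromHash())) return false;  // from the peer we asked?
        _found = true;
        return true;
    }
}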
Author: jrandom, 2004-07-27 17:34:36 +00:00 (committed by zzz)
Commit: 52b1c0a926 (parent: 399865e6c8)
16 changed files with 267 additions and 327 deletions

DatabaseLookupMessage.java

@ -33,7 +33,7 @@ public class DatabaseLookupMessage extends I2NPMessageImpl {
private final static Log _log = new Log(DatabaseLookupMessage.class); private final static Log _log = new Log(DatabaseLookupMessage.class);
public final static int MESSAGE_TYPE = 2; public final static int MESSAGE_TYPE = 2;
private Hash _key; private Hash _key;
private RouterInfo _from; private Hash _fromHash;
private TunnelId _replyTunnel; private TunnelId _replyTunnel;
private Set _dontIncludePeers; private Set _dontIncludePeers;
@ -51,11 +51,11 @@ public class DatabaseLookupMessage extends I2NPMessageImpl {
public void setSearchKey(Hash key) { _key = key; } public void setSearchKey(Hash key) { _key = key; }
/** /**
* Contains the current router info of the router who requested this lookup * Contains the router who requested this lookup
* *
*/ */
public RouterInfo getFrom() { return _from; } public Hash getFrom() { return _fromHash; }
public void setFrom(RouterInfo from) { _from = from; } public void setFrom(Hash from) { _fromHash = from; }
/** /**
* Contains the tunnel ID a reply should be sent to * Contains the tunnel ID a reply should be sent to
@ -82,8 +82,8 @@ public class DatabaseLookupMessage extends I2NPMessageImpl {
try { try {
_key = new Hash(); _key = new Hash();
_key.readBytes(in); _key.readBytes(in);
_from = new RouterInfo(); _fromHash = new Hash();
_from.readBytes(in); _fromHash.readBytes(in);
Boolean val = DataHelper.readBoolean(in); Boolean val = DataHelper.readBoolean(in);
if (val == null) if (val == null)
throw new I2NPMessageException("Tunnel must be explicitly specified (or not)"); throw new I2NPMessageException("Tunnel must be explicitly specified (or not)");
@ -109,12 +109,12 @@ public class DatabaseLookupMessage extends I2NPMessageImpl {
protected byte[] writeMessage() throws I2NPMessageException, IOException { protected byte[] writeMessage() throws I2NPMessageException, IOException {
if (_key == null) throw new I2NPMessageException("Key being searched for not specified"); if (_key == null) throw new I2NPMessageException("Key being searched for not specified");
if (_from == null) throw new I2NPMessageException("From address not specified"); if (_fromHash == null) throw new I2NPMessageException("From address not specified");
ByteArrayOutputStream os = new ByteArrayOutputStream(32); ByteArrayOutputStream os = new ByteArrayOutputStream(32);
try { try {
_key.writeBytes(os); _key.writeBytes(os);
_from.writeBytes(os); _fromHash.writeBytes(os);
if (_replyTunnel != null) { if (_replyTunnel != null) {
DataHelper.writeBoolean(os, Boolean.TRUE); DataHelper.writeBoolean(os, Boolean.TRUE);
_replyTunnel.writeBytes(os); _replyTunnel.writeBytes(os);

DatabaseSearchReplyMessage.java

@ -34,7 +34,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
private final static Log _log = new Log(DatabaseSearchReplyMessage.class); private final static Log _log = new Log(DatabaseSearchReplyMessage.class);
public final static int MESSAGE_TYPE = 3; public final static int MESSAGE_TYPE = 3;
private Hash _key; private Hash _key;
private List _routerInfoStructures; private List _peerHashes;
private Hash _from; private Hash _from;
public DatabaseSearchReplyMessage(I2PAppContext context) { public DatabaseSearchReplyMessage(I2PAppContext context) {
@ -42,7 +42,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
_context.statManager().createRateStat("netDb.searchReplyMessageSend", "How many search reply messages we send", "Network Database", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("netDb.searchReplyMessageSend", "How many search reply messages we send", "Network Database", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("netDb.searchReplyMessageReceive", "How many search reply messages we receive", "Network Database", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("netDb.searchReplyMessageReceive", "How many search reply messages we receive", "Network Database", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
setSearchKey(null); setSearchKey(null);
_routerInfoStructures = new ArrayList(); _peerHashes = new ArrayList(3);
setFromHash(null); setFromHash(null);
} }
@ -52,10 +52,10 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
public Hash getSearchKey() { return _key; } public Hash getSearchKey() { return _key; }
public void setSearchKey(Hash key) { _key = key; } public void setSearchKey(Hash key) { _key = key; }
public int getNumReplies() { return _routerInfoStructures.size(); } public int getNumReplies() { return _peerHashes.size(); }
public RouterInfo getReply(int index) { return (RouterInfo)_routerInfoStructures.get(index); } public Hash getReply(int index) { return (Hash)_peerHashes.get(index); }
public void addReply(RouterInfo info) { _routerInfoStructures.add(info); } public void addReply(Hash peer) { _peerHashes.add(peer); }
public void addReplies(Collection replies) { _routerInfoStructures.addAll(replies); } //public void addReplies(Collection replies) { _peerHashes.addAll(replies); }
public Hash getFromHash() { return _from; } public Hash getFromHash() { return _from; }
public void setFromHash(Hash from) { _from = from; } public void setFromHash(Hash from) { _from = from; }
@ -66,27 +66,18 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
_key = new Hash(); _key = new Hash();
_key.readBytes(in); _key.readBytes(in);
int compressedLength = (int)DataHelper.readLong(in, 2); int num = (int)DataHelper.readLong(in, 1);
byte compressedData[] = new byte[compressedLength]; _peerHashes.clear();
int read = DataHelper.read(in, compressedData);
if (read != compressedLength)
throw new IOException("Not enough data to decompress");
byte decompressedData[] = DataHelper.decompress(compressedData);
if (decompressedData == null)
throw new I2NPMessageException("Could not decompress the " + compressedLength + "bytes of data");
ByteArrayInputStream bais = new ByteArrayInputStream(decompressedData);
int num = (int)DataHelper.readLong(bais, 1);
_routerInfoStructures.clear();
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
RouterInfo info = new RouterInfo(); Hash peer = new Hash();
info.readBytes(bais); peer.readBytes(in);
addReply(info); addReply(peer);
} }
_from = new Hash(); _from = new Hash();
_from.readBytes(in); _from.readBytes(in);
_context.statManager().addRateData("netDb.searchReplyMessageReceive", compressedLength + 64, 1); _context.statManager().addRateData("netDb.searchReplyMessageReceive", num*32 + 64, 1);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
throw new I2NPMessageException("Unable to load the message data", dfe); throw new I2NPMessageException("Unable to load the message data", dfe);
} }
@ -95,10 +86,8 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
protected byte[] writeMessage() throws I2NPMessageException, IOException { protected byte[] writeMessage() throws I2NPMessageException, IOException {
if (_key == null) if (_key == null)
throw new I2NPMessageException("Key in reply to not specified"); throw new I2NPMessageException("Key in reply to not specified");
if (_routerInfoStructures == null) if (_peerHashes == null)
throw new I2NPMessageException("RouterInfo replies are null"); throw new I2NPMessageException("Peer replies are null");
if (_routerInfoStructures.size() <= 0)
throw new I2NPMessageException("No replies specified in SearchReply! Always include oneself!");
if (_from == null) if (_from == null)
throw new I2NPMessageException("No 'from' address specified!"); throw new I2NPMessageException("No 'from' address specified!");
@ -107,16 +96,12 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
try { try {
_key.writeBytes(os); _key.writeBytes(os);
ByteArrayOutputStream baos = new ByteArrayOutputStream(512); DataHelper.writeLong(os, 1, _peerHashes.size());
DataHelper.writeLong(baos, 1, _routerInfoStructures.size());
for (int i = 0; i < getNumReplies(); i++) { for (int i = 0; i < getNumReplies(); i++) {
RouterInfo info = getReply(i); Hash peer = getReply(i);
info.writeBytes(baos); peer.writeBytes(os);
} }
byte compressed[] = DataHelper.compress(baos.toByteArray());
DataHelper.writeLong(os, 2, compressed.length);
os.write(compressed);
_from.writeBytes(os); _from.writeBytes(os);
rv = os.toByteArray(); rv = os.toByteArray();
@ -134,7 +119,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
DatabaseSearchReplyMessage msg = (DatabaseSearchReplyMessage)object; DatabaseSearchReplyMessage msg = (DatabaseSearchReplyMessage)object;
return DataHelper.eq(getSearchKey(),msg.getSearchKey()) && return DataHelper.eq(getSearchKey(),msg.getSearchKey()) &&
DataHelper.eq(getFromHash(),msg.getFromHash()) && DataHelper.eq(getFromHash(),msg.getFromHash()) &&
DataHelper.eq(_routerInfoStructures,msg._routerInfoStructures); DataHelper.eq(_peerHashes,msg._peerHashes);
} else { } else {
return false; return false;
} }
@ -143,7 +128,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
public int hashCode() { public int hashCode() {
return DataHelper.hashCode(getSearchKey()) + return DataHelper.hashCode(getSearchKey()) +
DataHelper.hashCode(getFromHash()) + DataHelper.hashCode(getFromHash()) +
DataHelper.hashCode(_routerInfoStructures); DataHelper.hashCode(_peerHashes);
} }
public String toString() { public String toString() {

GarlicMessage.java

@ -42,7 +42,7 @@ public class GarlicMessage extends I2NPMessageImpl {
_data = new byte[(int)len]; _data = new byte[(int)len];
int read = read(in, _data); int read = read(in, _data);
if (read != len) if (read != len)
throw new I2NPMessageException("Incorrect size read"); throw new I2NPMessageException("Incorrect size read [" + read + " read, expected " + len + "]");
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
throw new I2NPMessageException("Unable to load the message data", dfe); throw new I2NPMessageException("Unable to load the message data", dfe);
} }

MessageHistory.java

@ -277,11 +277,13 @@ public class MessageHistory {
* @param id tunnel ID we received a message for * @param id tunnel ID we received a message for
* @param from peer that sent us this message (if known) * @param from peer that sent us this message (if known)
*/ */
public void droppedTunnelMessage(TunnelId id, Hash from) { public void droppedTunnelMessage(TunnelId id, long msgId, Date expiration, Hash from) {
if (!_doLog) return; if (!_doLog) return;
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(128);
buf.append(getPrefix()); buf.append(getPrefix());
buf.append("dropped message for unknown tunnel [").append(id.getTunnelId()).append("] from [").append(getName(from)).append("]"); buf.append("dropped message ").append(msgId).append(" for unknown tunnel [").append(id.getTunnelId());
buf.append("] from [").append(getName(from)).append("]").append(" expiring on ");
buf.append(getTime(expiration));
addEntry(buf.toString()); addEntry(buf.toString());
} }

GarlicMessageBuilder.java

@ -54,7 +54,8 @@ public class GarlicMessageBuilder {
noteWrap(ctx, msg, config); noteWrap(ctx, msg, config);
log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration())); if (log.shouldLog(Log.INFO))
log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration()));
byte cloveSet[] = buildCloveSet(ctx, config); byte cloveSet[] = buildCloveSet(ctx, config);
@ -64,26 +65,34 @@ public class GarlicMessageBuilder {
wrappedKey.setData(curKey.getData()); wrappedKey.setData(curKey.getData());
int availTags = ctx.sessionKeyManager().getAvailableTags(key, curKey); int availTags = ctx.sessionKeyManager().getAvailableTags(key, curKey);
log.debug("Available tags for encryption to " + key + ": " + availTags); if (log.shouldLog(Log.DEBUG))
log.debug("Available tags for encryption to " + key + ": " + availTags);
if (availTags < 10) { // arbitrary threshold if (availTags < 10) { // arbitrary threshold
for (int i = 0; i < 20; i++) for (int i = 0; i < 20; i++)
wrappedTags.add(new SessionTag(true)); wrappedTags.add(new SessionTag(true));
log.info("Less than 10 tags are available (" + availTags + "), so we're including 20 more"); if (log.shouldLog(Log.INFO))
log.info("Less than 10 tags are available (" + availTags + "), so we're including 20 more");
} else if (ctx.sessionKeyManager().getAvailableTimeLeft(key, curKey) < 30*1000) { } else if (ctx.sessionKeyManager().getAvailableTimeLeft(key, curKey) < 30*1000) {
// if we have > 10 tags, but they expire in under 30 seconds, we want more // if we have > 10 tags, but they expire in under 30 seconds, we want more
for (int i = 0; i < 20; i++) for (int i = 0; i < 20; i++)
wrappedTags.add(new SessionTag(true)); wrappedTags.add(new SessionTag(true));
log.info("Tags are almost expired, adding 20 new ones"); if (log.shouldLog(Log.INFO))
log.info("Tags are almost expired, adding 20 new ones");
} else { } else {
// always tack on at least one more - not necessary. // always tack on at least one more - not necessary.
//wrappedTags.add(new SessionTag(true)); //wrappedTags.add(new SessionTag(true));
} }
SessionTag curTag = ctx.sessionKeyManager().consumeNextAvailableTag(key, curKey); SessionTag curTag = ctx.sessionKeyManager().consumeNextAvailableTag(key, curKey);
byte encData[] = ctx.elGamalAESEngine().encrypt(cloveSet, key, curKey, wrappedTags, curTag, 1024); byte encData[] = ctx.elGamalAESEngine().encrypt(cloveSet, key, curKey, wrappedTags, curTag, 256);
msg.setData(encData); msg.setData(encData);
Date exp = new Date(config.getExpiration()); Date exp = new Date(config.getExpiration());
msg.setMessageExpiration(exp); msg.setMessageExpiration(exp);
if (log.shouldLog(Log.WARN))
log.warn("CloveSet size for message " + msg.getUniqueId() + " is " + cloveSet.length
+ " and encrypted message data is " + encData.length);
return msg; return msg;
} }

HandleTunnelMessageJob.java

@ -29,8 +29,11 @@ import net.i2p.data.i2np.TunnelMessage;
import net.i2p.data.i2np.TunnelVerificationStructure; import net.i2p.data.i2np.TunnelVerificationStructure;
import net.i2p.router.ClientMessage; import net.i2p.router.ClientMessage;
import net.i2p.router.InNetMessage; import net.i2p.router.InNetMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.MessageReceptionInfo; import net.i2p.router.MessageReceptionInfo;
import net.i2p.router.MessageSelector;
import net.i2p.router.ReplyJob;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelInfo;
@ -43,7 +46,7 @@ public class HandleTunnelMessageJob extends JobImpl {
private Hash _fromHash; private Hash _fromHash;
private I2NPMessageHandler _handler; private I2NPMessageHandler _handler;
private final static long FORWARD_TIMEOUT = 60*1000; private final static int FORWARD_TIMEOUT = 60*1000;
private final static int FORWARD_PRIORITY = 400; private final static int FORWARD_PRIORITY = 400;
public HandleTunnelMessageJob(RouterContext ctx, TunnelMessage msg, RouterIdentity from, Hash fromHash) { public HandleTunnelMessageJob(RouterContext ctx, TunnelMessage msg, RouterIdentity from, Hash fromHash) {
@ -60,10 +63,7 @@ public class HandleTunnelMessageJob extends JobImpl {
_fromHash = fromHash; _fromHash = fromHash;
} }
public String getName() { return "Handle Inbound Tunnel Message"; } private TunnelInfo validate(TunnelId id) {
public void runJob() {
TunnelId id = _message.getTunnelId();
long excessLag = getContext().clock().now() - _message.getMessageExpiration().getTime(); long excessLag = getContext().clock().now() - _message.getMessageExpiration().getTime();
if (excessLag > Router.CLOCK_FUDGE_FACTOR) { if (excessLag > Router.CLOCK_FUDGE_FACTOR) {
// expired while on the queue // expired while on the queue
@ -76,7 +76,7 @@ public class HandleTunnelMessageJob extends JobImpl {
getContext().messageHistory().messageProcessingError(_message.getUniqueId(), getContext().messageHistory().messageProcessingError(_message.getUniqueId(),
TunnelMessage.class.getName(), TunnelMessage.class.getName(),
"tunnel message expired on the queue"); "tunnel message expired on the queue");
return; return null;
} else if (excessLag > 0) { } else if (excessLag > 0) {
// almost expired while on the queue // almost expired while on the queue
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -84,6 +84,8 @@ public class HandleTunnelMessageJob extends JobImpl {
+ id.getTunnelId() + " expiring " + id.getTunnelId() + " expiring "
+ excessLag + excessLag
+ "ms ago"); + "ms ago");
} else {
// not expired
} }
TunnelInfo info = getContext().tunnelManager().getTunnelInfo(id); TunnelInfo info = getContext().tunnelManager().getTunnelInfo(id);
@ -92,16 +94,19 @@ public class HandleTunnelMessageJob extends JobImpl {
Hash from = _fromHash; Hash from = _fromHash;
if (_from != null) if (_from != null)
from = _from.getHash(); from = _from.getHash();
getContext().messageHistory().droppedTunnelMessage(id, from); getContext().messageHistory().droppedTunnelMessage(id, _message.getUniqueId(),
_message.getMessageExpiration(),
from);
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Received a message for an unknown tunnel [" + id.getTunnelId() _log.error("Received a message for an unknown tunnel [" + id.getTunnelId()
+ "], dropping it: " + _message, getAddedBy()); + "], dropping it: " + _message, getAddedBy());
long timeRemaining = _message.getMessageExpiration().getTime() - getContext().clock().now(); long timeRemaining = _message.getMessageExpiration().getTime() - getContext().clock().now();
getContext().statManager().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0); getContext().statManager().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0);
return; long lag = getTiming().getActualStart() - getTiming().getStartAfter();
if (_log.shouldLog(Log.ERROR))
_log.error("Lag processing a dropped tunnel message: " + lag);
return null;
} }
info.messageProcessed();
info = getUs(info); info = getUs(info);
if (info == null) { if (info == null) {
@ -109,94 +114,123 @@ public class HandleTunnelMessageJob extends JobImpl {
_log.error("We are not part of a known tunnel?? wtf! drop.", getAddedBy()); _log.error("We are not part of a known tunnel?? wtf! drop.", getAddedBy());
long timeRemaining = _message.getMessageExpiration().getTime() - getContext().clock().now(); long timeRemaining = _message.getMessageExpiration().getTime() - getContext().clock().now();
getContext().statManager().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0); getContext().statManager().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0);
return; return null;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel message received for tunnel: \n" + info);
} }
//if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) { return info;
if (_message.getVerificationStructure() == null) { }
if (info.getSigningKey() != null) {
if (info.getNextHop() != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("We are the gateway to tunnel " + id.getTunnelId());
byte data[] = _message.getData();
I2NPMessage msg = getBody(data);
getContext().jobQueue().addJob(new HandleGatewayMessageJob(msg, info, data.length));
return;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("We are the gateway and the endpoint for tunnel " + id.getTunnelId());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Process locally");
if (info.getDestination() != null) {
if (!getContext().clientManager().isLocal(info.getDestination())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a message on a tunnel allocated to a client that has disconnected - dropping it!");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Dropping message for disconnected client: " + _message);
getContext().messageHistory().droppedOtherMessage(_message); /**
getContext().messageHistory().messageProcessingError(_message.getUniqueId(), * The current router may be the gateway to the tunnel since there is no
_message.getClass().getName(), * verification data, or it could be a b0rked message.
"Disconnected client"); *
return; */
} private void receiveUnverified(TunnelInfo info) {
} if (info.getSigningKey() != null) {
if (info.getNextHop() != null) {
I2NPMessage body = getBody(_message.getData()); if (_log.shouldLog(Log.DEBUG))
if (body != null) { _log.debug("We are the gateway to tunnel " + info.getTunnelId().getTunnelId());
getContext().jobQueue().addJob(new HandleLocallyJob(body, info)); byte data[] = _message.getData();
return; I2NPMessage msg = getBody(data);
} else { getContext().jobQueue().addJob(new HandleGatewayMessageJob(msg, info, data.length));
if (_log.shouldLog(Log.ERROR)) return;
_log.error("Body is null! content of message.getData() = [" + } else {
DataHelper.toString(_message.getData()) + "]", getAddedBy()); if (_log.shouldLog(Log.DEBUG))
_log.debug("We are the gateway and the endpoint for tunnel " + info.getTunnelId().getTunnelId());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Process locally");
if (info.getDestination() != null) {
if (!getContext().clientManager().isLocal(info.getDestination())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a message on a tunnel allocated to a client that has disconnected - dropping it!");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Message that failed: " + _message, getAddedBy()); _log.debug("Dropping message for disconnected client: " + _message);
getContext().messageHistory().droppedOtherMessage(_message);
getContext().messageHistory().messageProcessingError(_message.getUniqueId(),
_message.getClass().getName(),
"Disconnected client");
return; return;
} }
} }
} else {
if (_log.shouldLog(Log.ERROR)) I2NPMessage body = getBody(_message.getData());
_log.error("Received a message that we are not the gateway for on tunnel " if (body != null) {
+ id.getTunnelId() + " without a verification structure: " + _message, getAddedBy()); getContext().jobQueue().addJob(new HandleLocallyJob(body, info));
return; return;
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("Body is null! content of message.getData() = [" +
DataHelper.toString(_message.getData()) + "]", getAddedBy());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message that failed: " + _message, getAddedBy());
return;
}
} }
} else { } else {
// participant if (_log.shouldLog(Log.ERROR))
TunnelVerificationStructure struct = _message.getVerificationStructure(); _log.error("Received a message that we are not the gateway for on tunnel "
boolean ok = struct.verifySignature(getContext(), info.getVerificationKey().getKey()); + info.getTunnelId().getTunnelId()
if (!ok) { + " without a verification structure: " + _message, getAddedBy());
if (_log.shouldLog(Log.WARN)) return;
_log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy()); }
return; }
} else {
if (info.getNextHop() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Message for tunnel " + id.getTunnelId()
+ " received where we're not the gateway and there are remaining hops, so forward it on to "
+ info.getNextHop().toBase64() + " via SendTunnelMessageJob");
getContext().statManager().addRateData("tunnel.relayMessageSize", /**
* We may be a participant in the tunnel, as there is a verification structure.
*
*/
private void receiveParticipant(TunnelInfo info) {
// participant
TunnelVerificationStructure struct = _message.getVerificationStructure();
boolean ok = struct.verifySignature(getContext(), info.getVerificationKey().getKey());
if (!ok) {
if (_log.shouldLog(Log.WARN))
_log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy());
return;
} else {
if (info.getNextHop() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Message for tunnel " + info.getTunnelId().getTunnelId()
+ " received where we're not the gateway and there are remaining hops, so forward it on to "
+ info.getNextHop().toBase64() + " via SendTunnelMessageJob");
getContext().statManager().addRateData("tunnel.relayMessageSize",
_message.getData().length, 0); _message.getData().length, 0);
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), _message, SendMessageDirectJob j = new SendMessageDirectJob(getContext(), _message,
info.getNextHop(), info.getNextHop(),
getContext().clock().now() + FORWARD_TIMEOUT, (int)(_message.getMessageExpiration().getTime() - getContext().clock().now()),
FORWARD_PRIORITY)); FORWARD_PRIORITY);
return; getContext().jobQueue().addJob(j);
} else { return;
if (_log.shouldLog(Log.DEBUG)) } else {
_log.debug("No more hops, unwrap and follow the instructions"); if (_log.shouldLog(Log.DEBUG))
getContext().jobQueue().addJob(new HandleEndpointJob(info)); _log.debug("No more hops, unwrap and follow the instructions");
return; getContext().jobQueue().addJob(new HandleEndpointJob(info));
} return;
} }
} }
} }
public String getName() { return "Handle Inbound Tunnel Message"; }
public void runJob() {
TunnelId id = _message.getTunnelId();
TunnelInfo info = validate(id);
if (info == null)
return;
info.messageProcessed();
//if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) {
if (_message.getVerificationStructure() == null) {
receiveUnverified(info);
} else {
receiveParticipant(info);
}
}
private void processLocally(TunnelInfo ourPlace) { private void processLocally(TunnelInfo ourPlace) {
if (ourPlace.getEncryptionKey() == null) { if (ourPlace.getEncryptionKey() == null) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
@ -288,8 +322,7 @@ public class HandleTunnelMessageJob extends JobImpl {
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
body.writeBytes(baos); body.writeBytes(baos);
msg.setData(baos.toByteArray()); msg.setData(baos.toByteArray());
long exp = getContext().clock().now() + FORWARD_TIMEOUT; getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, FORWARD_TIMEOUT, FORWARD_PRIORITY));
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, exp, FORWARD_PRIORITY));
String bodyType = body.getClass().getName(); String bodyType = body.getClass().getName();
getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
@ -306,8 +339,7 @@ public class HandleTunnelMessageJob extends JobImpl {
// TODO: we may want to send it via a tunnel later on, but for now, direct will do. // TODO: we may want to send it via a tunnel later on, but for now, direct will do.
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending on to requested router " + router.toBase64()); _log.debug("Sending on to requested router " + router.toBase64());
long exp = getContext().clock().now() + FORWARD_TIMEOUT; getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, FORWARD_TIMEOUT, FORWARD_PRIORITY));
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, exp, FORWARD_PRIORITY));
} }
private void sendToLocal(I2NPMessage body) { private void sendToLocal(I2NPMessage body) {
@ -467,11 +499,24 @@ public class HandleTunnelMessageJob extends JobImpl {
public void runJob() { public void runJob() {
RouterContext ctx = HandleTunnelMessageJob.this.getContext(); RouterContext ctx = HandleTunnelMessageJob.this.getContext();
if (_body != null) { if (_body != null) {
long expiration = _body.getMessageExpiration().getTime();
long timeout = expiration - ctx.clock().now();
ctx.statManager().addRateData("tunnel.gatewayMessageSize", _length, 0); ctx.statManager().addRateData("tunnel.gatewayMessageSize", _length, 0);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Message for tunnel " + _info.getTunnelId() + " received at the gateway (us), and since its > 0 length, forward the " _log.info("Message for tunnel " + _info.getTunnelId()
+ _body.getClass().getName() + " message on to " + _info.getNextHop().toBase64() + " via SendTunnelMessageJob"); + " received at the gateway (us), and since its > 0 length, forward the "
ctx.jobQueue().addJob(new SendTunnelMessageJob(ctx, _body, _info.getTunnelId(), null, null, null, null, FORWARD_TIMEOUT, FORWARD_PRIORITY)); + _body.getClass().getName() + " message on to "
+ _info.getNextHop().toBase64() + " via SendTunnelMessageJob expiring in "
+ timeout + "ms");
MessageSelector selector = null;
Job onFailure = null;
Job onSuccess = null;
ReplyJob onReply = null;
Hash targetRouter = null;
TunnelId targetTunnelId = null;
SendTunnelMessageJob j = new SendTunnelMessageJob(ctx, _body, _info.getTunnelId(), targetRouter, targetTunnelId, onSuccess, onReply, onFailure, selector, timeout, FORWARD_PRIORITY);
ctx.jobQueue().addJob(j);
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Body of the message for the tunnel could not be parsed"); _log.warn("Body of the message for the tunnel could not be parsed");

MessageHandler.java

@ -123,7 +123,8 @@ class MessageHandler {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Handle " + message.getClass().getName() + " to a remote router " _log.info("Handle " + message.getClass().getName() + " to a remote router "
+ instructions.getRouter().toBase64() + " - fire a SendMessageDirectJob"); + instructions.getRouter().toBase64() + " - fire a SendMessageDirectJob");
SendMessageDirectJob j = new SendMessageDirectJob(_context, message, instructions.getRouter(), expiration, priority); int timeoutMs = (int)(expiration-_context.clock().now());
SendMessageDirectJob j = new SendMessageDirectJob(_context, message, instructions.getRouter(), timeoutMs, priority);
_context.jobQueue().addJob(j); _context.jobQueue().addJob(j);
} }
@ -160,7 +161,7 @@ class MessageHandler {
_log.debug("Placing message of type " + message.getClass().getName() _log.debug("Placing message of type " + message.getClass().getName()
+ " into the new tunnel message bound for " + tunnelId.getTunnelId() + " into the new tunnel message bound for " + tunnelId.getTunnelId()
+ " on " + to.toBase64()); + " on " + to.toBase64());
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, expiration, priority)); _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority));
String bodyType = message.getClass().getName(); String bodyType = message.getClass().getName();
_context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); _context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());

SendMessageDirectJob.java

@ -37,24 +37,20 @@ public class SendMessageDirectJob extends JobImpl {
private boolean _sent; private boolean _sent;
private long _searchOn; private long _searchOn;
private final static long DEFAULT_TIMEOUT = 60*1000; public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int timeoutMs, int priority) {
this(ctx, message, toPeer, null, null, null, null, timeoutMs, priority);
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, long expiration, int priority) {
this(ctx, message, toPeer, null, null, null, null, expiration, priority);
} }
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int priority) { public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) {
this(ctx, message, toPeer, DEFAULT_TIMEOUT+ctx.clock().now(), priority); this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority);
} }
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, long expiration, int priority) { public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) {
this(ctx, message, toPeer, null, onSuccess, onFail, selector, expiration, priority);
}
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, long expiration, int priority) {
super(ctx); super(ctx);
if (timeoutMs <= 0) throw new IllegalArgumentException("specify a timeout (" + timeoutMs + ")");
_log = getContext().logManager().getLog(SendMessageDirectJob.class); _log = getContext().logManager().getLog(SendMessageDirectJob.class);
_message = message; _message = message;
_targetHash = toPeer; _targetHash = toPeer;
_router = null; _router = null;
_expiration = expiration; _expiration = timeoutMs + ctx.clock().now();
_priority = priority; _priority = priority;
_searchOn = 0; _searchOn = 0;
_alreadySearched = false; _alreadySearched = false;
@ -67,27 +63,22 @@ public class SendMessageDirectJob extends JobImpl {
if (_targetHash == null) if (_targetHash == null)
throw new IllegalArgumentException("Attempt to send a message to a null peer"); throw new IllegalArgumentException("Attempt to send a message to a null peer");
_sent = false; _sent = false;
long remaining = expiration - getContext().clock().now();
if (remaining < 50*1000) {
_log.info("Sending message to expire in " + remaining + "ms containing " + message.getUniqueId() + " (a " + message.getClass().getName() + ")", new Exception("SendDirect from"));
}
} }
public String getName() { return "Send Message Direct"; } public String getName() { return "Send Message Direct"; }
public void runJob() { public void runJob() {
long now = getContext().clock().now(); long now = getContext().clock().now();
if (_expiration == 0)
_expiration = now + DEFAULT_TIMEOUT;
if (_expiration - 30*1000 < now) { if (_expiration - 30*1000 < now) {
_log.info("Soon to expire sendDirect of " + _message.getClass().getName() if (_log.shouldLog(Log.INFO))
+ " [expiring in " + (_expiration-now) + "]", getAddedBy()); _log.info("Soon to expire sendDirect of " + _message.getClass().getName()
+ " [expiring in " + (_expiration-now) + "]", getAddedBy());
} }
if (_expiration < now) { if (_expiration < now) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.ERROR))
_log.warn("Timed out sending message " + _message + " directly (expiration = " _log.error("Timed out sending message " + _message + " directly (expiration = "
+ new Date(_expiration) + ") to " + _targetHash.toBase64(), getAddedBy()); + new Date(_expiration) + ") to " + _targetHash.toBase64(), getAddedBy());
return; return;
} }
if (_router != null) { if (_router != null) {
@ -109,8 +100,8 @@ public class SendMessageDirectJob extends JobImpl {
_searchOn = getContext().clock().now(); _searchOn = getContext().clock().now();
_alreadySearched = true; _alreadySearched = true;
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.ERROR))
_log.warn("Unable to find the router to send to: " + _targetHash _log.error("Unable to find the router to send to: " + _targetHash
+ " after searching for " + (getContext().clock().now()-_searchOn) + " after searching for " + (getContext().clock().now()-_searchOn)
+ "ms, message: " + _message, getAddedBy()); + "ms, message: " + _message, getAddedBy());
} }

SendReplyMessageJob.java

@ -56,7 +56,8 @@ public class SendReplyMessageJob extends JobImpl {
*/ */
protected void send(I2NPMessage msg) { protected void send(I2NPMessage msg) {
_log.info("Sending reply with " + _message.getClass().getName() + " in a sourceRouteeplyMessage to " + _block.getRouter().toBase64()); _log.info("Sending reply with " + _message.getClass().getName() + " in a sourceRouteeplyMessage to " + _block.getRouter().toBase64());
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _block.getRouter(), _priority); int timeout = (int)(msg.getMessageExpiration().getTime()-getContext().clock().now());
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _block.getRouter(), timeout, _priority);
getContext().jobQueue().addJob(j); getContext().jobQueue().addJob(j);
} }

SendTunnelMessageJob.java

@ -136,7 +136,8 @@ public class SendTunnelMessageJob extends JobImpl {
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg,
_destRouter, _onSend, _destRouter, _onSend,
_onReply, _onFailure, _onReply, _onFailure,
_selector, _expiration, _selector,
(int)(_expiration-getContext().clock().now()),
_priority)); _priority));
String bodyType = _message.getClass().getName(); String bodyType = _message.getClass().getName();
@ -186,10 +187,11 @@ public class SendTunnelMessageJob extends JobImpl {
} }
msg.setMessageExpiration(new Date(_expiration)); msg.setMessageExpiration(new Date(_expiration));
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg,
info.getNextHop(), _onSend, info.getNextHop(), _onSend,
_onReply, _onFailure, _onReply, _onFailure,
_selector, _expiration, _selector,
_priority)); (int)(_expiration - getContext().clock().now()),
_priority));
} }
} }
@ -234,7 +236,11 @@ public class SendTunnelMessageJob extends JobImpl {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Message for tunnel " + info.getTunnelId().getTunnelId() + " received where we're not the gateway and there are remaining hops, so forward it on to " _log.info("Message for tunnel " + info.getTunnelId().getTunnelId() + " received where we're not the gateway and there are remaining hops, so forward it on to "
+ info.getNextHop().toBase64() + " via SendMessageDirectJob"); + info.getNextHop().toBase64() + " via SendMessageDirectJob");
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, info.getNextHop(), _onSend, null, _onFailure, null, _message.getMessageExpiration().getTime(), _priority)); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, info.getNextHop(), _onSend,
null, _onFailure, null,
(int)(_message.getMessageExpiration().getTime() - getContext().clock().now()),
_priority);
getContext().jobQueue().addJob(j);
return; return;
} else { } else {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
@ -273,6 +279,9 @@ public class SendTunnelMessageJob extends JobImpl {
return (us.getSigningKey() != null); // only the gateway can sign return (us.getSigningKey() != null); // only the gateway can sign
} }
private static final int INSTRUCTIONS_PADDING = 32;
private static final int PAYLOAD_PADDING = 32;
/** /**
* Build the tunnel message with appropriate instructions for the * Build the tunnel message with appropriate instructions for the
* tunnel endpoint, then encrypt and sign it. * tunnel endpoint, then encrypt and sign it.
@ -326,8 +335,8 @@ public class SendTunnelMessageJob extends JobImpl {
return null; return null;
} }
byte encryptedInstructions[] = encrypt(instructions, info.getEncryptionKey().getKey(), 512); byte encryptedInstructions[] = encrypt(instructions, info.getEncryptionKey().getKey(), INSTRUCTIONS_PADDING);
byte encryptedMessage[] = encrypt(_message, key, 1024); byte encryptedMessage[] = encrypt(_message, key, PAYLOAD_PADDING);
TunnelVerificationStructure verification = createVerificationStructure(encryptedMessage, info); TunnelVerificationStructure verification = createVerificationStructure(encryptedMessage, info);
String bodyType = _message.getClass().getName(); String bodyType = _message.getClass().getName();
@ -457,9 +466,11 @@ public class SendTunnelMessageJob extends JobImpl {
TunnelMessage.class.getName(), msg.getUniqueId()); TunnelMessage.class.getName(), msg.getUniqueId());
// don't specify a selector, since createFakeOutNetMessage already does that // don't specify a selector, since createFakeOutNetMessage already does that
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, _destRouter, SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _destRouter,
_onSend, _onReply, _onFailure, _onSend, _onReply, _onFailure,
null, _expiration, _priority)); null, (int)(_expiration-getContext().clock().now()),
_priority);
getContext().jobQueue().addJob(j);
} }
/** /**

DatabaseSearchReplyMessageHandler.java (deleted)

@ -1,33 +0,0 @@
package net.i2p.router.networkdb;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.SourceRouteBlock;
import net.i2p.router.HandlerJobBuilder;
import net.i2p.router.Job;
import net.i2p.router.RouterContext;
/**
* Build a HandleDatabaseSearchReplyMessageJob whenever a DatabaseSearchReplyMessage arrives
*
*/
public class DatabaseSearchReplyMessageHandler implements HandlerJobBuilder {
private RouterContext _context;
public DatabaseSearchReplyMessageHandler(RouterContext context) {
_context = context;
}
public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) {
// ignore the reply block for now
return new HandleDatabaseSearchReplyMessageJob(_context, (DatabaseSearchReplyMessage)receivedMessage, from, fromHash);
}
}

HandleDatabaseLookupMessageJob.java

@ -11,6 +11,7 @@ package net.i2p.router.networkdb;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
@ -61,7 +62,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling database lookup message for " + _message.getSearchKey()); _log.debug("Handling database lookup message for " + _message.getSearchKey());
Hash fromKey = _message.getFrom().getIdentity().getHash(); Hash fromKey = _message.getFrom();
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
if (_message.getReplyTunnel() != null) if (_message.getReplyTunnel() != null)
@ -69,8 +70,13 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
+ " (tunnel " + _message.getReplyTunnel() + ")"); + " (tunnel " + _message.getReplyTunnel() + ")");
} }
// might as well grab what they sent us if (getContext().netDb().lookupRouterInfoLocally(_message.getFrom()) == null) {
getContext().netDb().store(fromKey, _message.getFrom()); // hmm, perhaps don't always send a lookup for this...
// but for now, wtf, why not. we may even want to adjust it so that
// we penalize or benefit peers who send us that which we can or
// cannot lookup
getContext().netDb().lookupRouterInfo(_message.getFrom(), null, null, REPLY_TIMEOUT);
}
// whatdotheywant? // whatdotheywant?
handleRequest(fromKey); handleRequest(fromKey);
@ -130,11 +136,10 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
DatabaseSearchReplyMessage msg = new DatabaseSearchReplyMessage(getContext()); DatabaseSearchReplyMessage msg = new DatabaseSearchReplyMessage(getContext());
msg.setFromHash(getContext().router().getRouterInfo().getIdentity().getHash()); msg.setFromHash(getContext().router().getRouterInfo().getIdentity().getHash());
msg.setSearchKey(key); msg.setSearchKey(key);
if (routerInfoSet.size() <= 0) { for (Iterator iter = routerInfoSet.iterator(); iter.hasNext(); ) {
// always include something, so lets toss ourselves in there RouterInfo peer = (RouterInfo)iter.next();
routerInfoSet.add(getContext().router().getRouterInfo()); msg.addReply(peer.getIdentity().getHash());
} }
msg.addReplies(routerInfoSet);
getContext().statManager().addRateData("netDb.lookupsHandled", 1, 0); getContext().statManager().addRateData("netDb.lookupsHandled", 1, 0);
sendMessage(msg, toPeer, replyTunnel); // should this go via garlic messages instead? sendMessage(msg, toPeer, replyTunnel); // should this go via garlic messages instead?
} }
@ -146,7 +151,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending reply directly to " + toPeer); _log.debug("Sending reply directly to " + toPeer);
send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT+getContext().clock().now(), MESSAGE_PRIORITY); send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
} }
getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT);
@ -186,7 +191,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
msg.setData(baos.toByteArray()); msg.setData(baos.toByteArray());
msg.setTunnelId(replyTunnel); msg.setTunnelId(replyTunnel);
msg.setMessageExpiration(new Date(expiration)); msg.setMessageExpiration(new Date(expiration));
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, toPeer, null, null, null, null, expiration, MESSAGE_PRIORITY)); getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, toPeer, null, null, null, null, REPLY_TIMEOUT, MESSAGE_PRIORITY));
String bodyType = message.getClass().getName(); String bodyType = message.getClass().getName();
getContext().messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); getContext().messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());

HandleDatabaseSearchReplyMessageJob.java (deleted)

@ -1,74 +0,0 @@
package net.i2p.router.networkdb;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Receive DatabaseSearchReplyMessage data and store it in the local net db
*
*/
public class HandleDatabaseSearchReplyMessageJob extends JobImpl {
private Log _log;
private DatabaseSearchReplyMessage _message;
private RouterIdentity _from;
private Hash _fromHash;
public HandleDatabaseSearchReplyMessageJob(RouterContext context, DatabaseSearchReplyMessage receivedMessage, RouterIdentity from, Hash fromHash) {
super(context);
_log = context.logManager().getLog(HandleDatabaseSearchReplyMessageJob.class);
_message = receivedMessage;
_from = from;
_fromHash = fromHash;
}
public void runJob() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling database search reply message for key " + _message.getSearchKey().toBase64() + " with " + _message.getNumReplies() + " replies");
if (_message.getNumReplies() > 0)
getContext().jobQueue().addJob(new HandlePeerJob(0));
}
/**
* Partial job - take each reply entry, store it, then requeue again until all
* of the entries are stored. This prevents a single reply from swamping the jobqueue
*
*/
private final class HandlePeerJob extends JobImpl {
private int _curReply;
public HandlePeerJob(int reply) {
super(HandleDatabaseSearchReplyMessageJob.this.getContext());
_curReply = reply;
}
public void runJob() {
boolean remaining = handle();
if (remaining)
requeue(0);
}
private boolean handle() {
RouterInfo info = _message.getReply(_curReply);
if (_log.shouldLog(Log.INFO))
_log.info("On search for " + _message.getSearchKey().toBase64() + ", received " + info.getIdentity().getHash().toBase64());
HandlePeerJob.this.getContext().netDb().store(info.getIdentity().getHash(), info);
_curReply++;
return _message.getNumReplies() > _curReply;
}
public String getName() { return "Handle search reply value"; }
}
public String getName() { return "Handle Database Search Reply Message"; }
}

ExploreJob.java

@ -70,7 +70,7 @@ class ExploreJob extends SearchJob {
protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) { protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) {
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext()); DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext());
msg.setSearchKey(getState().getTarget()); msg.setSearchKey(getState().getTarget());
msg.setFrom(replyGateway); msg.setFrom(replyGateway.getIdentity().getHash());
msg.setDontIncludePeers(getState().getAttempted()); msg.setDontIncludePeers(getState().getAttempted());
msg.setMessageExpiration(new Date(expiration)); msg.setMessageExpiration(new Date(expiration));
msg.setReplyTunnel(replyTunnelId); msg.setReplyTunnel(replyTunnelId);

KademliaNetworkDatabaseFacade.java

@ -202,7 +202,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_context.inNetMessagePool().registerHandlerJobBuilder(DatabaseLookupMessage.MESSAGE_TYPE, new DatabaseLookupMessageHandler(_context)); _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseLookupMessage.MESSAGE_TYPE, new DatabaseLookupMessageHandler(_context));
_context.inNetMessagePool().registerHandlerJobBuilder(DatabaseStoreMessage.MESSAGE_TYPE, new DatabaseStoreMessageHandler(_context)); _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseStoreMessage.MESSAGE_TYPE, new DatabaseStoreMessageHandler(_context));
_context.inNetMessagePool().registerHandlerJobBuilder(DatabaseSearchReplyMessage.MESSAGE_TYPE, new DatabaseSearchReplyMessageHandler(_context));
_initialized = true; _initialized = true;
_started = System.currentTimeMillis(); _started = System.currentTimeMillis();
@ -249,9 +248,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Selected hash " + rhash.toBase64() + " is not stored locally"); _log.info("Selected hash " + rhash.toBase64() + " is not stored locally");
} else if ( !(ds instanceof RouterInfo) ) { } else if ( !(ds instanceof RouterInfo) ) {
// could be a LeaseSet // leaseSet
if (_log.shouldLog(Log.DEBUG))
_log.debug("Selected router hash " + rhash.toBase64() + " is NOT a routerInfo!");
} else { } else {
rv.add(ds); rv.add(ds);
} }
@ -274,8 +271,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Selected hash " + key.toBase64() + " is not stored locally"); _log.info("Selected hash " + key.toBase64() + " is not stored locally");
} else if ( !(ds instanceof RouterInfo) ) { } else if ( !(ds instanceof RouterInfo) ) {
if (_log.shouldLog(Log.WARN)) // leaseSet
_log.warn("Selected router hash [" + key.toBase64() + "] is NOT a routerInfo!");
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("getAllRouters(): key is router: " + key.toBase64()); _log.debug("getAllRouters(): key is router: " + key.toBase64());

SearchJob.java

@ -53,7 +53,7 @@ class SearchJob extends JobImpl {
* How long will we give each peer to reply to our search? * How long will we give each peer to reply to our search?
* *
*/ */
private static final long PER_PEER_TIMEOUT = 10*1000; private static final int PER_PEER_TIMEOUT = 10*1000;
/** /**
* give ourselves 30 seconds to send out the value found to the closest * give ourselves 30 seconds to send out the value found to the closest
@ -283,15 +283,14 @@ class SearchJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64() _log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64()
+ " for " + msg.getSearchKey().toBase64() + " w/ replies through [" + " for " + msg.getSearchKey().toBase64() + " w/ replies through ["
+ msg.getFrom().getIdentity().getHash().toBase64() + "] via tunnel [" + msg.getFrom().toBase64() + "] via tunnel ["
+ msg.getReplyTunnel() + "]"); + msg.getReplyTunnel() + "]");
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
long timeoutMs = PER_PEER_TIMEOUT; // getTimeoutMs();
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, outTunnelId, router.getIdentity().getHash(), SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, outTunnelId, router.getIdentity().getHash(),
null, null, reply, new FailedJob(router), sel, null, null, reply, new FailedJob(router), sel,
timeoutMs, SEARCH_PRIORITY); PER_PEER_TIMEOUT, SEARCH_PRIORITY);
getContext().jobQueue().addJob(j); getContext().jobQueue().addJob(j);
} }
@ -304,12 +303,11 @@ class SearchJob extends JobImpl {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() _log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64()
+ " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" + " for " + msg.getSearchKey().toBase64() + " w/ replies to us ["
+ msg.getFrom().getIdentity().getHash().toBase64() + "]"); + msg.getFrom().toBase64() + "]");
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
long timeoutMs = PER_PEER_TIMEOUT;
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(), SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(),
reply, new FailedJob(router), sel, expiration, SEARCH_PRIORITY); reply, new FailedJob(router), sel, PER_PEER_TIMEOUT, SEARCH_PRIORITY);
getContext().jobQueue().addJob(j); getContext().jobQueue().addJob(j);
} }
@ -356,7 +354,7 @@ class SearchJob extends JobImpl {
protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) { protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) {
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext()); DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext());
msg.setSearchKey(_state.getTarget()); msg.setSearchKey(_state.getTarget());
msg.setFrom(replyGateway); msg.setFrom(replyGateway.getIdentity().getHash());
msg.setDontIncludePeers(_state.getAttempted()); msg.setDontIncludePeers(_state.getAttempted());
msg.setMessageExpiration(new Date(expiration)); msg.setMessageExpiration(new Date(expiration));
msg.setReplyTunnel(replyTunnelId); msg.setReplyTunnel(replyTunnelId);
@ -371,7 +369,7 @@ class SearchJob extends JobImpl {
protected DatabaseLookupMessage buildMessage(long expiration) { protected DatabaseLookupMessage buildMessage(long expiration) {
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext()); DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext());
msg.setSearchKey(_state.getTarget()); msg.setSearchKey(_state.getTarget());
msg.setFrom(getContext().router().getRouterInfo()); msg.setFrom(getContext().routerHash());
msg.setDontIncludePeers(_state.getAttempted()); msg.setDontIncludePeers(_state.getAttempted());
msg.setMessageExpiration(new Date(expiration)); msg.setMessageExpiration(new Date(expiration));
msg.setReplyTunnel(null); msg.setReplyTunnel(null);
@ -420,25 +418,28 @@ class SearchJob extends JobImpl {
if (_newPeers > 0) if (_newPeers > 0)
newPeersFound(_newPeers); newPeersFound(_newPeers);
} else { } else {
RouterInfo ri = _msg.getReply(_curIndex); Hash peer = _msg.getReply(_curIndex);
if (ri.isValid()) {
if (_state.wasAttempted(ri.getIdentity().getHash())) { RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer);
_duplicatePeers++; if (info == null) {
} // hmm, perhaps don't always send a lookup for this...
if (_log.shouldLog(Log.DEBUG)) // but for now, wtf, why not. we may even want to adjust it so that
_log.debug(getJobId() + ": dbSearchReply received on search containing router " // we penalize or benefit peers who send us that which we can or
+ ri.getIdentity().getHash() + " with publishDate of " // cannot lookup
+ new Date(ri.getPublished())); getContext().netDb().lookupRouterInfo(peer, null, null, _timeoutMs);
_facade.store(ri.getIdentity().getHash(), ri);
if (_facade.getKBuckets().add(ri.getIdentity().getHash()))
_newPeers++;
else
_seenPeers++;
} else {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + ri);
_invalidPeers++;
} }
if (_state.wasAttempted(peer)) {
_duplicatePeers++;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": dbSearchReply received on search referencing router "
+ peer);
if (_facade.getKBuckets().add(peer))
_newPeers++;
else
_seenPeers++;
_curIndex++; _curIndex++;
requeue(0); requeue(0);
} }