NetDB: Improve routing of DatabaseStoreMessage acks

Send our own RI unsolicited in reply if we aren't floodfill
  Don't ack or flood a store of an unknown type
PeerTestJob: Don't generate zero reply token
Tunnels: More checks of messages received down exploratory tunnels
javadocs and comments
This commit is contained in:
zzz
2015-06-13 15:13:35 +00:00
parent b9e07bc9aa
commit 93c7860d2b
7 changed files with 146 additions and 30 deletions

View File

@ -1,3 +1,12 @@
2015-06-13 zzz
* i2psnark: Fix NPE (ticket #1602)
* NetDB:
- Improve routing of DatabaseStoreMessage acks
- Send our own RI unsolicited in reply if we aren't floodfill
- Don't ack or flood a store of an unknown type
* PeerTestJob: Don't generate zero reply token
* Tunnels: More checks of messages received down exploratory tunnels
2015-06-08 dg
* Language fixes
* Make netDb.storeFloodNew graphable for testing (#1195)
@ -6,6 +15,15 @@
* Silence Irc{Inbound,Outbound}Filter warnings about 'no streams'
when we can't connect to an IRC server. Change to WARN.
2015-06-07 zzz
* Logs: Correct wrapper.config location when running as a service
* NetDB: Fix early NPE
* SSU: Possible fix for NPE in establisher
2015-06-06 zzz
* Console: Add indication of current ff status on /configadvanced,
change immediately when config changes, force republish
2015-06-06 str4d
* newsxml: Don't use XXX for parsing dates on Android

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 4;
public final static long BUILD = 5;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -43,15 +43,33 @@ public class SendMessageDirectJob extends JobImpl {
private boolean _sent;
private long _searchOn;
/**
* @param toPeer may be ourselves
*/
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int timeoutMs, int priority) {
this(ctx, message, toPeer, null, null, null, null, timeoutMs, priority);
}
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) {
/**
* @param toPeer may be ourselves
* @param onSuccess may be null
* @param onFail may be null
* @param selector be null
*/
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess,
Job onFail, MessageSelector selector, int timeoutMs, int priority) {
this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority);
}
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) {
/**
* @param toPeer may be ourselves
* @param onSend may be null
* @param onSuccess may be null
* @param onFail may be null
* @param selector be null
*/
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess,
Job onFail, MessageSelector selector, int timeoutMs, int priority) {
super(ctx);
_log = getContext().logManager().getLog(SendMessageDirectJob.class);
_message = message;

View File

@ -14,14 +14,19 @@ import java.util.Date;
import net.i2p.data.DatabaseEntry;
import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.data.TunnelId;
import net.i2p.data.router.RouterAddress;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.router.RouterInfo;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.SendMessageDirectJob;
import net.i2p.util.Log;
/**
@ -34,8 +39,15 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
private final RouterIdentity _from;
private Hash _fromHash;
private final FloodfillNetworkDatabaseFacade _facade;
private final static int REPLY_TIMEOUT = 60*1000;
private final static int MESSAGE_PRIORITY = OutNetMessage.PRIORITY_NETDB_REPLY;
public HandleFloodfillDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash, FloodfillNetworkDatabaseFacade facade) {
/**
* @param receivedMessage must never have reply token set if it came down a tunnel
*/
public HandleFloodfillDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage,
RouterIdentity from, Hash fromHash,
FloodfillNetworkDatabaseFacade facade) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_message = receivedMessage;
@ -136,6 +148,7 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
// somebody has our keys...
if (getContext().routerHash().equals(key)) {
//getContext().statManager().addRateData("netDb.storeLocalRouterInfoAttempt", 1, 0);
// This is initiated by PeerTestJob from another peer
// throw rather than return, so that we send the ack below (prevent easy attack)
dontBlamePeer = true;
throw new IllegalArgumentException("Peer attempted to store our RouterInfo");
@ -170,15 +183,18 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
if (_log.shouldLog(Log.ERROR))
_log.error("Invalid DatabaseStoreMessage data type - " + entry.getType()
+ ": " + _message);
// don't ack or flood
return;
}
long recvEnd = System.currentTimeMillis();
getContext().statManager().addRateData("netDb.storeRecvTime", recvEnd-recvBegin);
// ack even if invalid or unsupported
// ack even if invalid
// in particular, ack our own RI (from PeerTestJob)
// TODO any cases where we shouldn't?
if (_message.getReplyToken() > 0)
sendAck();
sendAck(key);
long ackEnd = System.currentTimeMillis();
if (_from != null)
@ -223,7 +239,7 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
}
}
private void sendAck() {
private void sendAck(Hash storedKey) {
DeliveryStatusMessage msg = new DeliveryStatusMessage(getContext());
msg.setMessageId(_message.getReplyToken());
// Randomize for a little protection against clock-skew fingerprinting.
@ -231,31 +247,62 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
// TODO just set to 0?
// TODO we have no session to garlic wrap this with, needs new message
msg.setArrival(getContext().clock().now() - getContext().random().nextInt(3*1000));
/*
if (FloodfillNetworkDatabaseFacade.floodfillEnabled(getContext())) {
// no need to do anything but send it where they ask
// may be null
TunnelId replyTunnel = _message.getReplyTunnel();
// A store of our own RI, only if we are not FF
DatabaseStoreMessage msg2;
if (getContext().netDb().floodfillEnabled() ||
storedKey.equals(getContext().routerHash())) {
// don't send our RI if the store was our RI (from PeerTestJob)
msg2 = null;
} else {
// we aren't ff, send a go-away message
msg2 = new DatabaseStoreMessage(getContext());
RouterInfo me = getContext().router().getRouterInfo();
msg2.setEntry(me);
if (_log.shouldWarn())
_log.warn("Got a store w/ reply token, but we aren't ff: from: " + _from +
" fromHash: " + _fromHash + " msg: " + _message, new Exception());
}
Hash toPeer = _message.getReplyGateway();
boolean toUs = getContext().routerHash().equals(toPeer);
// to reduce connection congestion, send directly if connected already,
// else through an exploratory tunnel.
if (toUs && replyTunnel != null) {
// if we are the gateway, act as if we received it
TunnelGatewayMessage tgm = new TunnelGatewayMessage(getContext());
tgm.setMessage(msg);
tgm.setTunnelId(_message.getReplyTunnel());
tgm.setTunnelId(replyTunnel);
tgm.setMessageExpiration(msg.getMessageExpiration());
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), tgm, _message.getReplyGateway(), 10*1000, 200));
getContext().tunnelDispatcher().dispatch(tgm);
if (msg2 != null) {
TunnelGatewayMessage tgm2 = new TunnelGatewayMessage(getContext());
tgm2.setMessage(msg2);
tgm2.setTunnelId(replyTunnel);
tgm2.setMessageExpiration(msg.getMessageExpiration());
getContext().tunnelDispatcher().dispatch(tgm2);
}
} else if (toUs || getContext().commSystem().isEstablished(toPeer)) {
Job send = new SendMessageDirectJob(getContext(), msg, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
send.runJob();
if (msg2 != null) {
Job send2 = new SendMessageDirectJob(getContext(), msg2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
send2.runJob();
}
} else {
*/
TunnelInfo outTunnel = selectOutboundTunnel();
// pick tunnel with endpoint closest to toPeer
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(toPeer);
if (outTunnel == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No outbound tunnel could be found");
return;
} else {
}
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0),
_message.getReplyTunnel(), _message.getReplyGateway());
replyTunnel, toPeer);
if (msg2 != null)
getContext().tunnelDispatcher().dispatchOutbound(msg2, outTunnel.getSendTunnelId(0),
replyTunnel, toPeer);
}
//}
}
private TunnelInfo selectOutboundTunnel() {
return getContext().tunnelManager().selectOutboundTunnel();
}
public String getName() { return "Handle Database Store Message"; }

View File

@ -24,7 +24,8 @@ import net.i2p.util.Log;
* selection to the peer manager and tests the peer by sending it a useless
* database store message
*
* TODO - What's the point? Disable this? See also notes in PeerManager.selectPeers()
* TODO - What's the point? Disable this? See also notes in PeerManager.selectPeers().
* TODO - Use something besides sending the peer's RI to itself?
*/
public class PeerTestJob extends JobImpl {
private final Log _log;
@ -82,6 +83,7 @@ public class PeerTestJob extends JobImpl {
/**
* Retrieve a group of 0 or more peers that we want to test.
* Returned list will not include ourselves.
*
* @return set of RouterInfo structures
*/
@ -110,7 +112,8 @@ public class PeerTestJob extends JobImpl {
/**
* Fire off the necessary jobs and messages to test the given peer
*
* The message is a store of the peer's RI to itself,
* with a reply token.
*/
private void testPeer(RouterInfo peer) {
TunnelInfo inTunnel = getInboundTunnelId();
@ -130,7 +133,7 @@ public class PeerTestJob extends JobImpl {
int timeoutMs = getTestTimeout();
long expiration = getContext().clock().now() + timeoutMs;
long nonce = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
long nonce = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE - 1);
DatabaseStoreMessage msg = buildMessage(peer, inTunnelId, inGateway.getIdentity().getHash(), nonce, expiration);
TunnelInfo outTunnel = getOutboundTunnelId();
@ -172,7 +175,9 @@ public class PeerTestJob extends JobImpl {
}
/**
* Build a message to test the peer with
* Build a message to test the peer with.
* The message is a store of the peer's RI to itself,
* with a reply token.
*/
private DatabaseStoreMessage buildMessage(RouterInfo peer, TunnelId replyTunnel, Hash replyGateway, long nonce, long expiration) {
DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());

View File

@ -117,8 +117,8 @@ class InboundMessageDistributor implements GarlicMessageReceiver.CloveReceiver {
}
return;
} else if (dsm.getReplyToken() != 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping LS DSM w/ reply token down a tunnel for " + _client + ": " + msg);
_context.statManager().addRateData("tunnel.dropDangerousClientTunnelMessage", 1, type);
_log.error("Dropping LS DSM w/ reply token down a tunnel for " + _client + ": " + msg);
return;
} else {
// allow DSM of our own key (used by FloodfillVerifyStoreJob)
@ -143,6 +143,33 @@ class InboundMessageDistributor implements GarlicMessageReceiver.CloveReceiver {
return;
} // switch
} else {
// expl. tunnel
switch (type) {
case DatabaseStoreMessage.MESSAGE_TYPE:
DatabaseStoreMessage dsm = (DatabaseStoreMessage) msg;
if (dsm.getReplyToken() != 0) {
_context.statManager().addRateData("tunnel.dropDangerousExplTunnelMessage", 1, type);
_log.error("Dropping DSM w/ reply token down a expl. tunnel: " + msg);
return;
}
if (dsm.getEntry().getType() == DatabaseEntry.KEY_TYPE_LEASESET)
((LeaseSet)dsm.getEntry()).setReceivedAsReply();
break;
case DatabaseSearchReplyMessage.MESSAGE_TYPE:
case DeliveryStatusMessage.MESSAGE_TYPE:
case GarlicMessage.MESSAGE_TYPE:
case TunnelBuildReplyMessage.MESSAGE_TYPE:
case VariableTunnelBuildReplyMessage.MESSAGE_TYPE:
// these are safe, handled below
break;
default:
_context.statManager().addRateData("tunnel.dropDangerousExplTunnelMessage", 1, type);
_log.error("Dropped dangerous message down expl tunnel: " + msg, new Exception("cause"));
return;
} // switch
} // client != null
if ( (target == null) || ( (tunnel == null) && (_context.routerHash().equals(target) ) ) ) {

View File

@ -211,7 +211,8 @@ public class TunnelDispatcher implements Service {
ctx.statManager().createRequiredRateStat("tunnel.corruptMessage", "Corrupt messages received",
"Tunnels", RATES);
// following are for InboundMessageDistributor
ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "(lifetime is the I2NP type)", "Tunnels", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("tunnel.dropDangerousExplTunnelMessage", "(lifetime is the I2NP type)", "Tunnels", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*60*1000 });
// following is for PumpedTunnelGateway
ctx.statManager().createRateStat("tunnel.dropGatewayOverflow", "Dropped message at GW, queue full", "Tunnels", new long[] { 60*60*1000 });