2004-11-01 jrandom
* Increase the tunnel test timeout rapidly if our tunnels are failing. * Honor message expirations for some tunnel jobs that were prematurely expired. * Streamline memory usage with temporary object caches and more efficient serialization for SHA256 calculation, logging, and both I2CP and I2NP message handling. * Fix some situations where we forward messages too eagerly. For a request at the tunnel endpoint, if the tunnel is inbound and the target is remote, honor the message by tunnel routing the data rather than sending it directly to the requested location.
This commit is contained in:
@ -126,6 +126,61 @@ public class DeliveryInstructions extends DataStructureImpl {
|
||||
}
|
||||
}
|
||||
|
||||
public int readBytes(byte data[], int offset) throws DataFormatException {
|
||||
int cur = offset;
|
||||
long flags = DataHelper.fromLong(data, cur, 1);
|
||||
cur++;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Read flags: " + flags + " mode: " + flagMode(flags));
|
||||
|
||||
if (flagEncrypted(flags)) {
|
||||
byte kd[] = new byte[SessionKey.KEYSIZE_BYTES];
|
||||
System.arraycopy(data, cur, kd, 0, SessionKey.KEYSIZE_BYTES);
|
||||
cur += SessionKey.KEYSIZE_BYTES;
|
||||
setEncryptionKey(new SessionKey(kd));
|
||||
setEncrypted(true);
|
||||
} else {
|
||||
setEncrypted(false);
|
||||
}
|
||||
|
||||
setDeliveryMode(flagMode(flags));
|
||||
switch (flagMode(flags)) {
|
||||
case FLAG_MODE_LOCAL:
|
||||
break;
|
||||
case FLAG_MODE_DESTINATION:
|
||||
byte destHash[] = new byte[Hash.HASH_LENGTH];
|
||||
System.arraycopy(data, cur, destHash, 0, Hash.HASH_LENGTH);
|
||||
cur += Hash.HASH_LENGTH;
|
||||
setDestination(new Hash(destHash));
|
||||
break;
|
||||
case FLAG_MODE_ROUTER:
|
||||
byte routerHash[] = new byte[Hash.HASH_LENGTH];
|
||||
System.arraycopy(data, cur, routerHash, 0, Hash.HASH_LENGTH);
|
||||
cur += Hash.HASH_LENGTH;
|
||||
setRouter(new Hash(routerHash));
|
||||
break;
|
||||
case FLAG_MODE_TUNNEL:
|
||||
byte tunnelRouterHash[] = new byte[Hash.HASH_LENGTH];
|
||||
System.arraycopy(data, cur, tunnelRouterHash, 0, Hash.HASH_LENGTH);
|
||||
cur += Hash.HASH_LENGTH;
|
||||
setRouter(new Hash(tunnelRouterHash));
|
||||
setTunnelId(new TunnelId(DataHelper.fromLong(data, cur, 4)));
|
||||
cur += 4;
|
||||
break;
|
||||
}
|
||||
|
||||
if (flagDelay(flags)) {
|
||||
long delay = DataHelper.fromLong(data, cur, 4);
|
||||
cur += 4;
|
||||
setDelayRequested(true);
|
||||
setDelaySeconds(delay);
|
||||
} else {
|
||||
setDelayRequested(false);
|
||||
}
|
||||
return cur - offset;
|
||||
}
|
||||
|
||||
|
||||
private boolean flagEncrypted(long flags) {
|
||||
return (0 != (flags & FLAG_ENCRYPTED));
|
||||
}
|
||||
|
@ -35,7 +35,8 @@ public interface I2NPMessage extends DataStructure {
|
||||
* @throws IOException if there is a problem reading from the stream
|
||||
*/
|
||||
public int readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException;
|
||||
|
||||
public int readBytes(byte data[], int type, int offset) throws I2NPMessageException, IOException;
|
||||
|
||||
/**
|
||||
* Read the body into the data structures, after the initial type byte and
|
||||
* the uniqueId / expiration, using the current class's format as defined by
|
||||
|
@ -18,7 +18,7 @@ import net.i2p.data.DataHelper;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Handle messages from router to router
|
||||
* Handle messages from router to router. This class is NOT threadsafe
|
||||
*
|
||||
*/
|
||||
public class I2NPMessageHandler {
|
||||
@ -28,6 +28,8 @@ public class I2NPMessageHandler {
|
||||
private long _lastReadEnd;
|
||||
private int _lastSize;
|
||||
private byte _messageBuffer[];
|
||||
private I2NPMessage _lastRead;
|
||||
|
||||
public I2NPMessageHandler(I2PAppContext context) {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(I2NPMessageHandler.class);
|
||||
@ -68,6 +70,51 @@ public class I2NPMessageHandler {
|
||||
throw new I2NPMessageException("Error reading the message", dfe);
|
||||
}
|
||||
}
|
||||
|
||||
/** clear the last message read from a byte array with an offset */
|
||||
public I2NPMessage lastRead() {
|
||||
I2NPMessage rv = _lastRead;
|
||||
_lastRead = null;
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an I2NPMessage from the stream and return the fully populated object.
|
||||
*
|
||||
* @throws IOException if there is an IO problem reading from the stream
|
||||
* @throws I2NPMessageException if there is a problem handling the particular
|
||||
* message - if it is an unknown type or has improper formatting, etc.
|
||||
*/
|
||||
public I2NPMessage readMessage(byte data[]) throws IOException, I2NPMessageException {
|
||||
int offset = readMessage(data, 0);
|
||||
return lastRead();
|
||||
}
|
||||
public int readMessage(byte data[], int offset) throws IOException, I2NPMessageException {
|
||||
int cur = offset;
|
||||
int type = (int)DataHelper.fromLong(data, cur, 1);
|
||||
cur++;
|
||||
_lastReadBegin = System.currentTimeMillis();
|
||||
I2NPMessage msg = createMessage(type);
|
||||
if (msg == null)
|
||||
throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message");
|
||||
try {
|
||||
_lastSize = msg.readBytes(data, type, cur);
|
||||
cur += _lastSize;
|
||||
} catch (IOException ioe) {
|
||||
throw ioe;
|
||||
} catch (I2NPMessageException ime) {
|
||||
throw ime;
|
||||
} catch (Exception e) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error reading the stream", e);
|
||||
throw new IOException("Unknown error reading the " + msg.getClass().getName()
|
||||
+ ": " + e.getMessage());
|
||||
}
|
||||
_lastReadEnd = System.currentTimeMillis();
|
||||
_lastRead = msg;
|
||||
return cur - offset;
|
||||
}
|
||||
|
||||
public long getLastReadTime() { return _lastReadEnd - _lastReadBegin; }
|
||||
public int getLastSize() { return _lastSize; }
|
||||
|
||||
|
@ -15,6 +15,7 @@ import java.io.OutputStream;
|
||||
import java.util.Date;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.crypto.SHA256EntryCache;
|
||||
import net.i2p.data.DataFormatException;
|
||||
import net.i2p.data.DataHelper;
|
||||
import net.i2p.data.DataStructureImpl;
|
||||
@ -74,8 +75,11 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
|
||||
cur += numRead;
|
||||
}
|
||||
|
||||
Hash calc = _context.sha().calculateHash(buffer, 0, size);
|
||||
if (!calc.equals(h))
|
||||
SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size);
|
||||
Hash calc = _context.sha().calculateHash(buffer, 0, size, cache);
|
||||
boolean eq = calc.equals(h);
|
||||
_context.sha().cache().release(cache);
|
||||
if (!eq)
|
||||
throw new I2NPMessageException("Hash does not match");
|
||||
|
||||
long start = _context.clock().now();
|
||||
@ -90,6 +94,46 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
|
||||
throw new I2NPMessageException("Error reading the message header", dfe);
|
||||
}
|
||||
}
|
||||
public int readBytes(byte data[], int type, int offset) throws I2NPMessageException, IOException {
|
||||
if (type < 0) {
|
||||
type = (int)DataHelper.fromLong(data, offset, 1);
|
||||
offset++;
|
||||
}
|
||||
_uniqueId = DataHelper.fromLong(data, offset, 4);
|
||||
offset += 4;
|
||||
_expiration = DataHelper.fromDate(data, offset);
|
||||
offset += DataHelper.DATE_LENGTH;
|
||||
int size = (int)DataHelper.fromLong(data, offset, 2);
|
||||
offset += 2;
|
||||
Hash h = new Hash();
|
||||
byte hdata[] = new byte[Hash.HASH_LENGTH];
|
||||
System.arraycopy(data, offset, hdata, 0, Hash.HASH_LENGTH);
|
||||
offset += Hash.HASH_LENGTH;
|
||||
h.setData(hdata);
|
||||
|
||||
if (offset + size > data.length)
|
||||
throw new I2NPMessageException("Payload is too short ["
|
||||
+ "data.len=" + data.length
|
||||
+ " offset=" + offset
|
||||
+ " wanted=" + size + "]");
|
||||
|
||||
SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size);
|
||||
Hash calc = _context.sha().calculateHash(data, offset, size, cache);
|
||||
boolean eq = calc.equals(h);
|
||||
_context.sha().cache().release(cache);
|
||||
if (!eq)
|
||||
throw new I2NPMessageException("Hash does not match");
|
||||
|
||||
long start = _context.clock().now();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Reading bytes: type = " + type + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration);
|
||||
readMessage(data, offset, size, type);
|
||||
long time = _context.clock().now() - start;
|
||||
if (time > 50)
|
||||
_context.statManager().addRateData("i2np.readTime", time, time);
|
||||
return size + Hash.HASH_LENGTH + 1 + 4 + DataHelper.DATE_LENGTH;
|
||||
}
|
||||
|
||||
public void writeBytes(OutputStream out) throws DataFormatException, IOException {
|
||||
int size = getMessageSize();
|
||||
if (size < 47) throw new DataFormatException("Unable to build the message");
|
||||
|
@ -117,12 +117,14 @@ public class I2NPMessageReader {
|
||||
while (!_context.throttle().acceptNetworkMessage()) {
|
||||
try { Thread.sleep(500 + _context.random().nextInt(512)); } catch (InterruptedException ie) {}
|
||||
}
|
||||
|
||||
// do read
|
||||
try {
|
||||
I2NPMessage msg = _handler.readMessage(_stream);
|
||||
if (msg != null) {
|
||||
long msToRead = _handler.getLastReadTime();
|
||||
int bytesRead = _handler.getLastSize();
|
||||
msToRead += injectLag(bytesRead);
|
||||
_listener.messageReceived(I2NPMessageReader.this, msg, msToRead, bytesRead);
|
||||
}
|
||||
} catch (I2NPMessageException ime) {
|
||||
@ -145,5 +147,33 @@ public class I2NPMessageReader {
|
||||
}
|
||||
// boom bye bye bad bwoy
|
||||
}
|
||||
|
||||
private final long injectLag(int size) {
|
||||
if (true) {
|
||||
return 0;
|
||||
} else {
|
||||
boolean shouldLag = _context.random().nextInt(1000) > size;
|
||||
long readLag = getReadLag();
|
||||
if (readLag > 0) {
|
||||
long lag = _context.random().nextLong(readLag);
|
||||
if (lag > 0) {
|
||||
try { Thread.sleep(lag); } catch (InterruptedException ie) {}
|
||||
return lag;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final long getReadLag() {
|
||||
try {
|
||||
return Long.parseLong(_context.getProperty("router.injectLagMs", "0"));
|
||||
} catch (NumberFormatException nfe) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ public class MessageValidator {
|
||||
public MessageValidator(RouterContext context) {
|
||||
_log = context.logManager().getLog(MessageValidator.class);
|
||||
_receivedIdExpirations = new TreeMap();
|
||||
_receivedIds = new HashSet(32*1024);
|
||||
_receivedIds = new HashSet(256);
|
||||
_receivedIdLock = new Object();
|
||||
_context = context;
|
||||
}
|
||||
@ -108,12 +108,16 @@ public class MessageValidator {
|
||||
*
|
||||
*/
|
||||
private void locked_cleanReceivedIds(long now) {
|
||||
Set toRemoveIds = new HashSet(4);
|
||||
Set toRemoveDates = new HashSet(4);
|
||||
Set toRemoveIds = null;
|
||||
Set toRemoveDates = null;
|
||||
for (Iterator iter = _receivedIdExpirations.keySet().iterator(); iter.hasNext(); ) {
|
||||
Long date = (Long)iter.next();
|
||||
if (date.longValue() <= now) {
|
||||
// no need to keep track of things in the past
|
||||
if (toRemoveIds == null) {
|
||||
toRemoveIds = new HashSet(2);
|
||||
toRemoveDates = new HashSet(2);
|
||||
}
|
||||
toRemoveDates.add(date);
|
||||
toRemoveIds.add(_receivedIdExpirations.get(date));
|
||||
} else {
|
||||
@ -122,12 +126,16 @@ public class MessageValidator {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (Iterator iter = toRemoveDates.iterator(); iter.hasNext(); )
|
||||
_receivedIdExpirations.remove(iter.next());
|
||||
for (Iterator iter = toRemoveIds.iterator(); iter.hasNext(); )
|
||||
_receivedIds.remove(iter.next());
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Cleaned out " + toRemoveDates.size() + " expired messageIds, leaving " + _receivedIds.size() + " remaining");
|
||||
if (toRemoveIds != null) {
|
||||
for (Iterator iter = toRemoveDates.iterator(); iter.hasNext(); )
|
||||
_receivedIdExpirations.remove(iter.next());
|
||||
for (Iterator iter = toRemoveIds.iterator(); iter.hasNext(); )
|
||||
_receivedIds.remove(iter.next());
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Cleaned out " + toRemoveDates.size()
|
||||
+ " expired messageIds, leaving "
|
||||
+ _receivedIds.size() + " remaining");
|
||||
}
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.64 $ $Date: 2004/10/29 21:40:52 $";
|
||||
public final static String ID = "$Revision: 1.65 $ $Date: 2004/10/30 18:44:01 $";
|
||||
public final static String VERSION = "0.4.1.3";
|
||||
public final static long BUILD = 5;
|
||||
public final static long BUILD = 6;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -166,9 +166,11 @@ public class HandleGarlicMessageJob extends JobImpl {
|
||||
return;
|
||||
}
|
||||
long sendExpiration = clove.getExpiration().getTime();
|
||||
// if the clove targets something remote, tunnel route it
|
||||
boolean sendDirect = false;
|
||||
_handler.handleMessage(clove.getInstructions(), clove.getData(),
|
||||
clove.getCloveId(), _from, _fromHash,
|
||||
sendExpiration, FORWARD_PRIORITY);
|
||||
sendExpiration, FORWARD_PRIORITY, sendDirect);
|
||||
}
|
||||
|
||||
public void dropped() {
|
||||
|
@ -11,6 +11,7 @@ package net.i2p.router.message;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import net.i2p.data.DataFormatException;
|
||||
import net.i2p.data.DataHelper;
|
||||
@ -37,6 +38,7 @@ import net.i2p.router.ReplyJob;
|
||||
import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelInfo;
|
||||
import net.i2p.router.TunnelSelectionCriteria;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
public class HandleTunnelMessageJob extends JobImpl {
|
||||
@ -333,20 +335,57 @@ public class HandleTunnelMessageJob extends JobImpl {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending on to requested tunnel " + id.getTunnelId() + " on router "
|
||||
+ router.toBase64());
|
||||
TunnelMessage msg = new TunnelMessage(getContext());
|
||||
msg.setTunnelId(id);
|
||||
msg.setData(body.toByteArray());
|
||||
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
||||
|
||||
String bodyType = body.getClass().getName();
|
||||
getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
int timeoutMs = (int)(body.getMessageExpiration().getTime() - getContext().clock().now());
|
||||
if (timeoutMs < 5000)
|
||||
timeoutMs = FORWARD_TIMEOUT;
|
||||
|
||||
TunnelInfo curInfo = getContext().tunnelManager().getTunnelInfo(_message.getTunnelId());
|
||||
if (curInfo.getTunnelId().getType() != TunnelId.TYPE_INBOUND) {
|
||||
// we are not processing a request at the end of an inbound tunnel, so
|
||||
// there's no reason to hide our location. honor the request directly
|
||||
|
||||
TunnelMessage msg = new TunnelMessage(getContext());
|
||||
msg.setTunnelId(id);
|
||||
msg.setData(body.toByteArray());
|
||||
|
||||
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, timeoutMs, FORWARD_PRIORITY));
|
||||
|
||||
String bodyType = body.getClass().getName();
|
||||
getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
} else {
|
||||
// the instructions request that we forward a message remotely from
|
||||
// the hidden location. honor it by sending it out a tunnel
|
||||
TunnelId outTunnelId = selectOutboundTunnelId();
|
||||
if (outTunnelId == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No outbound tunnels available to forward the message, dropping it");
|
||||
return;
|
||||
}
|
||||
getContext().jobQueue().addJob(new SendTunnelMessageJob(getContext(), body, outTunnelId, router, id,
|
||||
null, null, null, null, timeoutMs, FORWARD_PRIORITY));
|
||||
}
|
||||
}
|
||||
|
||||
private TunnelId selectOutboundTunnelId() {
|
||||
TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
|
||||
criteria.setMinimumTunnelsRequired(1);
|
||||
criteria.setMaximumTunnelsRequired(1);
|
||||
List ids = getContext().tunnelManager().selectOutboundTunnelIds(criteria);
|
||||
if ( (ids == null) || (ids.size() <= 0) )
|
||||
return null;
|
||||
else
|
||||
return (TunnelId)ids.get(0);
|
||||
}
|
||||
|
||||
private void sendToRouter(Hash router, I2NPMessage body) {
|
||||
// TODO: we may want to send it via a tunnel later on, but for now, direct will do.
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending on to requested router " + router.toBase64());
|
||||
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
||||
int timeoutMs = (int)(body.getMessageExpiration().getTime() - getContext().clock().now());
|
||||
if (timeoutMs < 5000)
|
||||
timeoutMs = FORWARD_TIMEOUT;
|
||||
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, timeoutMs, FORWARD_PRIORITY));
|
||||
}
|
||||
|
||||
private void sendToLocal(I2NPMessage body) {
|
||||
@ -390,7 +429,7 @@ public class HandleTunnelMessageJob extends JobImpl {
|
||||
|
||||
private I2NPMessage getBody(byte body[]) {
|
||||
try {
|
||||
return _handler.readMessage(new ByteArrayInputStream(body));
|
||||
return _handler.readMessage(body); // new ByteArrayInputStream(body));
|
||||
} catch (I2NPMessageException ime) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error parsing the message body", ime);
|
||||
|
@ -10,6 +10,8 @@ package net.i2p.router.message;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import net.i2p.data.Hash;
|
||||
import net.i2p.data.Payload;
|
||||
import net.i2p.data.RouterIdentity;
|
||||
@ -22,6 +24,7 @@ import net.i2p.router.ClientMessage;
|
||||
import net.i2p.router.InNetMessage;
|
||||
import net.i2p.router.MessageReceptionInfo;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelSelectionCriteria;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@ -40,7 +43,7 @@ class MessageHandler {
|
||||
|
||||
public void handleMessage(DeliveryInstructions instructions, I2NPMessage message,
|
||||
long replyId, RouterIdentity from, Hash fromHash,
|
||||
long expiration, int priority) {
|
||||
long expiration, int priority, boolean sendDirect) {
|
||||
switch (instructions.getDeliveryMode()) {
|
||||
case DeliveryInstructions.DELIVERY_MODE_LOCAL:
|
||||
_log.debug("Instructions for LOCAL DELIVERY");
|
||||
@ -75,7 +78,7 @@ class MessageHandler {
|
||||
_log.debug("Instructions for TUNNEL DELIVERY to"
|
||||
+ instructions.getTunnelId().getTunnelId() + " on "
|
||||
+ instructions.getRouter().toBase64());
|
||||
handleTunnel(instructions, expiration, message, priority);
|
||||
handleTunnel(instructions, expiration, message, priority, sendDirect);
|
||||
break;
|
||||
default:
|
||||
_log.error("Message has instructions that are not yet implemented: mode = " + instructions.getDeliveryMode());
|
||||
@ -111,7 +114,7 @@ class MessageHandler {
|
||||
_context.jobQueue().addJob(j);
|
||||
}
|
||||
|
||||
private void handleTunnel(DeliveryInstructions instructions, long expiration, I2NPMessage message, int priority) {
|
||||
private void handleTunnel(DeliveryInstructions instructions, long expiration, I2NPMessage message, int priority, boolean direct) {
|
||||
Hash to = instructions.getRouter();
|
||||
long timeoutMs = expiration - _context.clock().now();
|
||||
TunnelId tunnelId = instructions.getTunnelId();
|
||||
@ -131,20 +134,44 @@ class MessageHandler {
|
||||
}
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Handle " + message.getClass().getName() + " to send to remote tunnel "
|
||||
+ tunnelId.getTunnelId() + " on router " + to.toBase64());
|
||||
TunnelMessage msg = new TunnelMessage(_context);
|
||||
msg.setData(message.toByteArray());
|
||||
msg.setTunnelId(tunnelId);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Placing message of type " + message.getClass().getName()
|
||||
+ " into the new tunnel message bound for " + tunnelId.getTunnelId()
|
||||
+ " on " + to.toBase64());
|
||||
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority));
|
||||
|
||||
String bodyType = message.getClass().getName();
|
||||
_context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
if (direct) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Handle " + message.getClass().getName() + " to send to remote tunnel "
|
||||
+ tunnelId.getTunnelId() + " on router " + to.toBase64());
|
||||
TunnelMessage msg = new TunnelMessage(_context);
|
||||
msg.setData(message.toByteArray());
|
||||
msg.setTunnelId(tunnelId);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Placing message of type " + message.getClass().getName()
|
||||
+ " into the new tunnel message bound for " + tunnelId.getTunnelId()
|
||||
+ " on " + to.toBase64());
|
||||
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority));
|
||||
|
||||
String bodyType = message.getClass().getName();
|
||||
_context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
} else {
|
||||
// we received a message with instructions to send it somewhere, but we shouldn't
|
||||
// expose where we are in the process of honoring it. so, send it out a tunnel
|
||||
TunnelId outTunnelId = selectOutboundTunnelId();
|
||||
if (outTunnelId == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No outbound tunnels available to forward the message, dropping it");
|
||||
return;
|
||||
}
|
||||
_context.jobQueue().addJob(new SendTunnelMessageJob(_context, message, outTunnelId, to, tunnelId,
|
||||
null, null, null, null, timeoutMs, priority));
|
||||
}
|
||||
}
|
||||
|
||||
private TunnelId selectOutboundTunnelId() {
|
||||
TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
|
||||
criteria.setMinimumTunnelsRequired(1);
|
||||
criteria.setMaximumTunnelsRequired(1);
|
||||
List ids = _context.tunnelManager().selectOutboundTunnelIds(criteria);
|
||||
if ( (ids == null) || (ids.size() <= 0) )
|
||||
return null;
|
||||
else
|
||||
return (TunnelId)ids.get(0);
|
||||
}
|
||||
|
||||
private void handleLocalDestination(DeliveryInstructions instructions, I2NPMessage message, Hash fromHash) {
|
||||
|
@ -75,6 +75,7 @@ class TestTunnelJob extends JobImpl {
|
||||
|
||||
private final static long DEFAULT_TEST_TIMEOUT = 10*1000; // 10 seconds for a test to succeed
|
||||
private final static long DEFAULT_MINIMUM_TEST_TIMEOUT = 5*1000; // 5 second min
|
||||
private final static long MAXIMUM_TEST_TIMEOUT = 60*1000; // 60 second max
|
||||
private final static int TEST_PRIORITY = 100;
|
||||
|
||||
/**
|
||||
@ -86,7 +87,7 @@ class TestTunnelJob extends JobImpl {
|
||||
if (rs != null) {
|
||||
Rate r = rs.getRate(10*60*1000);
|
||||
if (r != null) {
|
||||
if (r.getLifetimeEventCount() > 0) {
|
||||
if (r.getLifetimeEventCount() > 10) {
|
||||
if (r.getLastEventCount() <= 0)
|
||||
rv = (long)(r.getLifetimeAverageValue() * getTunnelTestDeviationLimit());
|
||||
else
|
||||
@ -94,9 +95,28 @@ class TestTunnelJob extends JobImpl {
|
||||
}
|
||||
}
|
||||
}
|
||||
long min = getMinimumTestTimeout();
|
||||
if (rv < min)
|
||||
rv = min;
|
||||
|
||||
// lets back off if we're failing
|
||||
rs = getContext().statManager().getRate("tunnel.failAfterTime");
|
||||
if (rs != null) {
|
||||
Rate r = rs.getRate(5*60*1000);
|
||||
if (r != null) {
|
||||
long failures = r.getLastEventCount() + r.getCurrentEventCount();
|
||||
if (failures > 0) {
|
||||
rv <<= failures;
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Tunnels are failing (" + failures + "), so set the timeout to " + rv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rv > MAXIMUM_TEST_TIMEOUT) {
|
||||
rv = MAXIMUM_TEST_TIMEOUT;
|
||||
} else {
|
||||
long min = getMinimumTestTimeout();
|
||||
if (rv < min)
|
||||
rv = min;
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -143,10 +163,11 @@ class TestTunnelJob extends JobImpl {
|
||||
|
||||
TunnelInfo inboundInfo = _pool.getTunnelInfo(_secondaryId);
|
||||
inboundInfo.setLastTested(getContext().clock().now());
|
||||
|
||||
|
||||
long timeout = getTunnelTestTimeout();
|
||||
TestFailedJob failureJob = new TestFailedJob();
|
||||
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
|
||||
SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY);
|
||||
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId(), timeout);
|
||||
SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(timeout), failureJob, selector, timeout, TEST_PRIORITY);
|
||||
getContext().jobQueue().addJob(testJob);
|
||||
}
|
||||
|
||||
@ -169,9 +190,10 @@ class TestTunnelJob extends JobImpl {
|
||||
TunnelInfo outboundInfo = _pool.getTunnelInfo(_secondaryId);
|
||||
outboundInfo.setLastTested(getContext().clock().now());
|
||||
|
||||
long timeout = getTunnelTestTimeout();
|
||||
TestFailedJob failureJob = new TestFailedJob();
|
||||
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY);
|
||||
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId(), timeout);
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(timeout), failureJob, selector, timeout, TEST_PRIORITY);
|
||||
getContext().jobQueue().addJob(j);
|
||||
}
|
||||
|
||||
@ -255,9 +277,11 @@ class TestTunnelJob extends JobImpl {
|
||||
|
||||
private class TestSuccessfulJob extends JobImpl implements ReplyJob {
|
||||
private DeliveryStatusMessage _msg;
|
||||
public TestSuccessfulJob() {
|
||||
private long _timeout;
|
||||
public TestSuccessfulJob(long timeout) {
|
||||
super(TestTunnelJob.this.getContext());
|
||||
_msg = null;
|
||||
_timeout = timeout;
|
||||
}
|
||||
|
||||
public String getName() { return "Tunnel Test Successful"; }
|
||||
@ -267,7 +291,7 @@ class TestTunnelJob extends JobImpl {
|
||||
_log.info("Test of tunnel " + _primaryId+ " successfull after "
|
||||
+ time + "ms waiting for " + _nonce);
|
||||
|
||||
if (time > getTunnelTestTimeout()) {
|
||||
if (time > _timeout) {
|
||||
return; // the test failed job should already have run
|
||||
}
|
||||
|
||||
@ -305,11 +329,11 @@ class TestTunnelJob extends JobImpl {
|
||||
private long _tunnelId;
|
||||
private boolean _found;
|
||||
private long _expiration;
|
||||
public TestMessageSelector(long id, long tunnelId) {
|
||||
public TestMessageSelector(long id, long tunnelId, long timeoutMs) {
|
||||
_id = id;
|
||||
_tunnelId = tunnelId;
|
||||
_found = false;
|
||||
_expiration = getContext().clock().now() + getTunnelTestTimeout();
|
||||
_expiration = getContext().clock().now() + timeoutMs;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("the expiration while testing tunnel " + tunnelId
|
||||
+ " waiting for nonce " + id + ": " + new Date(_expiration));
|
||||
|
Reference in New Issue
Block a user