2004-12-11 jrandom
* Use a simpler and less memory intensive job for processing outbound client messages when the session is in mode=bestEffort. We can immediately discard the data as soon as its sent the first time, rather than wait for an ack, since we will never internally resend. * Reduce some synchronization to avoid a rare deadlock * Replaced 'localhost' with 127.0.0.1 in the i2ptunnel config, and special case it within the tunnel controller. * Script cleanup for building jbigi/jcpuid * Logging
This commit is contained in:
@ -233,6 +233,9 @@ public class TunnelController implements Logging {
|
||||
String host = getI2CPHost();
|
||||
if ( (host != null) && (host.length() > 0) )
|
||||
_tunnel.host = host;
|
||||
// woohah, special casing for people with ipv6/etc
|
||||
if ("localhost".equals(_tunnel.host))
|
||||
_tunnel.host = "127.0.0.1";
|
||||
String port = getI2CPPort();
|
||||
if ( (port != null) && (port.length() > 0) )
|
||||
_tunnel.port = port;
|
||||
|
@ -103,7 +103,7 @@ class WebEditPageFormGenerator {
|
||||
if ( (controller != null) && (controller.getTargetHost() != null) )
|
||||
buf.append("value=\"").append(controller.getTargetHost()).append("\" ");
|
||||
else
|
||||
buf.append("value=\"localhost\" ");
|
||||
buf.append("value=\"127.0.0.1\" ");
|
||||
buf.append(" /><br />\n");
|
||||
|
||||
buf.append("<b>Target port:</b> <input type=\"text\" size=\"4\" name=\"targetPort\" ");
|
||||
@ -285,7 +285,7 @@ class WebEditPageFormGenerator {
|
||||
if ( (controller != null) && (controller.getI2CPHost() != null) )
|
||||
buf.append(controller.getI2CPHost());
|
||||
else
|
||||
buf.append("localhost");
|
||||
buf.append("127.0.0.1");
|
||||
buf.append("\" /><br />\n");
|
||||
buf.append("<b>I2CP port:</b> ");
|
||||
buf.append("<input type=\"text\" name=\"clientPort\" size=\"20\" value=\"");
|
||||
|
@ -102,7 +102,8 @@ public class I2PSocketManagerFactory {
|
||||
|
||||
if (i2cpHost != null)
|
||||
opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
|
||||
opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort);
|
||||
if (i2cpPort > 0)
|
||||
opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort);
|
||||
|
||||
try {
|
||||
I2PSession session = client.createSession(myPrivateKeyStream, opts);
|
||||
|
@ -72,7 +72,7 @@ public class TestSwarm {
|
||||
}
|
||||
}
|
||||
try {
|
||||
_manager = I2PSocketManagerFactory.createManager(new FileInputStream(_destFile), "localhost", 7654, null);
|
||||
_manager = I2PSocketManagerFactory.createManager(new FileInputStream(_destFile), null, -1, null);
|
||||
} catch (Exception e) {
|
||||
_log.error("Error creatign the manager", e);
|
||||
return;
|
||||
|
@ -693,6 +693,9 @@ public class Connection {
|
||||
buf.append(" wsize: ").append(_options.getWindowSize());
|
||||
buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough);
|
||||
buf.append(" rtt: ").append(_options.getRTT());
|
||||
// not synchronized to avoid some kooky races
|
||||
buf.append(" unacked outbound: ").append(_outboundPackets.size()).append(" ");
|
||||
/*
|
||||
buf.append(" unacked outbound: ");
|
||||
synchronized (_outboundPackets) {
|
||||
buf.append(_outboundPackets.size()).append(" [");
|
||||
@ -701,6 +704,7 @@ public class Connection {
|
||||
}
|
||||
buf.append("] ");
|
||||
}
|
||||
*/
|
||||
buf.append("unacked inbound? ").append(getUnackedPacketsReceived());
|
||||
if (_inputStream != null) {
|
||||
buf.append(" [high ");
|
||||
|
13
history.txt
13
history.txt
@ -1,4 +1,15 @@
|
||||
$Id: history.txt,v 1.101 2004/12/08 12:16:17 jrandom Exp $
|
||||
$Id: history.txt,v 1.102 2004/12/08 16:08:11 jrandom Exp $
|
||||
|
||||
2004-12-11 jrandom
|
||||
* Use a simpler and less memory intensive job for processing outbound
|
||||
client messages when the session is in mode=bestEffort. We can
|
||||
immediately discard the data as soon as its sent the first time,
|
||||
rather than wait for an ack, since we will never internally resend.
|
||||
* Reduce some synchronization to avoid a rare deadlock
|
||||
* Replaced 'localhost' with 127.0.0.1 in the i2ptunnel config, and special
|
||||
case it within the tunnel controller.
|
||||
* Script cleanup for building jbigi/jcpuid
|
||||
* Logging
|
||||
|
||||
* 2004-12-08 0.4.2.3 released
|
||||
|
||||
|
@ -5,7 +5,7 @@ tunnel.0.type=httpclient
|
||||
tunnel.0.interface=127.0.0.1
|
||||
tunnel.0.listenPort=4444
|
||||
tunnel.0.proxyList=squid.i2p,www1.squid.i2p
|
||||
tunnel.0.i2cpHost=localhost
|
||||
tunnel.0.i2cpHost=127.0.0.1
|
||||
tunnel.0.i2cpPort=7654
|
||||
tunnel.0.option.tunnels.depthInbound=2
|
||||
tunnel.0.option.tunnels.numInbound=2
|
||||
@ -19,7 +19,7 @@ tunnel.1.type=client
|
||||
tunnel.1.interface=127.0.0.1
|
||||
tunnel.1.listenPort=6668
|
||||
tunnel.1.targetDestination=irc.duck.i2p,irc.baffled.i2p
|
||||
tunnel.1.i2cpHost=localhost
|
||||
tunnel.1.i2cpHost=127.0.0.1
|
||||
tunnel.1.i2cpPort=7654
|
||||
tunnel.1.option.tunnels.depthInbound=2
|
||||
tunnel.1.option.tunnels.numInbound=2
|
||||
@ -33,7 +33,7 @@ tunnel.2.type=client
|
||||
tunnel.2.interface=127.0.0.1
|
||||
tunnel.2.listenPort=2401
|
||||
tunnel.2.targetDestination=cvs.i2p
|
||||
tunnel.2.i2cpHost=localhost
|
||||
tunnel.2.i2cpHost=127.0.0.1
|
||||
tunnel.2.i2cpPort=7654
|
||||
tunnel.2.option.tunnels.depthInbound=2
|
||||
tunnel.2.option.tunnels.numInbound=2
|
||||
@ -43,10 +43,10 @@ tunnel.2.startOnLoad=false
|
||||
tunnel.3.name=eepsite
|
||||
tunnel.3.description=My eepsite
|
||||
tunnel.3.type=server
|
||||
tunnel.3.targetHost=localhost
|
||||
tunnel.3.targetHost=127.0.0.1
|
||||
tunnel.3.targetPort=7658
|
||||
tunnel.3.privKeyFile=eepsite/eepPriv.dat
|
||||
tunnel.3.i2cpHost=localhost
|
||||
tunnel.3.i2cpHost=127.0.0.1
|
||||
tunnel.3.i2cpPort=7654
|
||||
tunnel.3.option.tunnels.depthInbound=2
|
||||
tunnel.3.option.tunnels.numInbound=2
|
||||
@ -54,7 +54,7 @@ tunnel.3.startOnLoad=true
|
||||
|
||||
# postman's SMTP server - see www.postman.i2p
|
||||
tunnel.4.description=smtp server
|
||||
tunnel.4.i2cpHost=localhost
|
||||
tunnel.4.i2cpHost=127.0.0.1
|
||||
tunnel.4.i2cpPort=7654
|
||||
tunnel.4.interface=127.0.0.1
|
||||
tunnel.4.listenPort=7659
|
||||
@ -69,7 +69,7 @@ tunnel.4.type=client
|
||||
# postman's POP3 server - see www.postman.i2p
|
||||
tunnel.5.name=pop3.postman.i2p
|
||||
tunnel.5.description=pop3 server
|
||||
tunnel.5.i2cpHost=localhost
|
||||
tunnel.5.i2cpHost=127.0.0.1
|
||||
tunnel.5.i2cpPort=7654
|
||||
tunnel.5.interface=127.0.0.1
|
||||
tunnel.5.listenPort=7660
|
||||
@ -78,4 +78,4 @@ tunnel.5.option.tunnels.numInbound=2
|
||||
tunnel.5.option.i2p.streaming.connectDelay=1000
|
||||
tunnel.5.startOnLoad=false
|
||||
tunnel.5.targetDestination=pop.postman.i2p
|
||||
tunnel.5.type=client
|
||||
tunnel.5.type=client
|
||||
|
@ -8,8 +8,12 @@ package net.i2p.router;
|
||||
*
|
||||
*/
|
||||
|
||||
//import net.i2p.router.message.ProcessOutboundClientMessageJob;
|
||||
import java.util.Properties;
|
||||
|
||||
import net.i2p.client.I2PClient;
|
||||
|
||||
import net.i2p.router.message.OutboundClientMessageJob;
|
||||
import net.i2p.router.message.OutboundClientMessageOneShotJob;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@ -55,7 +59,22 @@ public class ClientMessagePool {
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Adding message for remote delivery");
|
||||
_context.jobQueue().addJob(new OutboundClientMessageJob(_context, msg));
|
||||
if (isGuaranteed(msg))
|
||||
_context.jobQueue().addJob(new OutboundClientMessageJob(_context, msg));
|
||||
else
|
||||
_context.jobQueue().addJob(new OutboundClientMessageOneShotJob(_context, msg));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isGuaranteed(ClientMessage msg) {
|
||||
Properties opts = null;
|
||||
if (msg.getSenderConfig() != null)
|
||||
opts = msg.getSenderConfig().getOptions();
|
||||
if (opts != null) {
|
||||
String val = opts.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
|
||||
return val.equals(I2PClient.PROP_RELIABILITY_GUARANTEED);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.106 $ $Date: 2004/12/08 12:16:17 $";
|
||||
public final static String ID = "$Revision: 1.107 $ $Date: 2004/12/08 16:08:10 $";
|
||||
public final static String VERSION = "0.4.2.3";
|
||||
public final static long BUILD = 0;
|
||||
public final static long BUILD = 1;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -83,6 +83,9 @@ class ClientWriterRunner implements Runnable {
|
||||
for (int i = 0; i < messages.size(); i++) {
|
||||
I2CPMessage msg = (I2CPMessage)messages.get(i);
|
||||
Long when = (Long)messageTimes.get(i);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("["+_id+"] writeMessage before writing "
|
||||
+ msg.getClass().getName());
|
||||
_runner.writeMessage(msg);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("["+_id+"] writeMessage time since addMessage(): "
|
||||
|
@ -0,0 +1,481 @@
|
||||
package net.i2p.router.message;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import net.i2p.data.Certificate;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.data.Hash;
|
||||
import net.i2p.data.Lease;
|
||||
import net.i2p.data.LeaseSet;
|
||||
import net.i2p.data.PublicKey;
|
||||
import net.i2p.data.SessionTag;
|
||||
import net.i2p.data.SessionKey;
|
||||
import net.i2p.data.TunnelId;
|
||||
|
||||
import net.i2p.data.i2cp.MessageId;
|
||||
|
||||
import net.i2p.data.i2np.DataMessage;
|
||||
import net.i2p.data.i2np.DeliveryStatusMessage;
|
||||
import net.i2p.data.i2np.I2NPMessage;
|
||||
import net.i2p.data.i2np.GarlicMessage;
|
||||
import net.i2p.data.i2np.DeliveryInstructions;
|
||||
|
||||
import net.i2p.router.message.PayloadGarlicConfig;
|
||||
|
||||
import net.i2p.router.ClientMessage;
|
||||
import net.i2p.router.JobImpl;
|
||||
import net.i2p.router.ReplyJob;
|
||||
import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelSelectionCriteria;
|
||||
import net.i2p.router.MessageSelector;
|
||||
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Send a client message out a random outbound tunnel and into a random inbound
|
||||
* tunnel on the target leaseSet. This also bundles the sender's leaseSet and
|
||||
* a DeliveryStatusMessage (for ACKing any sessionTags used in the garlic).
|
||||
*
|
||||
*/
|
||||
public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
private Log _log;
|
||||
private long _overallExpiration;
|
||||
private boolean _shouldBundle;
|
||||
private ClientMessage _clientMessage;
|
||||
private MessageId _clientMessageId;
|
||||
private int _clientMessageSize;
|
||||
private Destination _from;
|
||||
/** target destination's leaseSet, if known */
|
||||
private LeaseSet _leaseSet;
|
||||
/** Actual lease the message is being routed through */
|
||||
private Lease _lease;
|
||||
private PayloadGarlicConfig _clove;
|
||||
private long _cloveId;
|
||||
private long _start;
|
||||
|
||||
/**
|
||||
* final timeout (in milliseconds) that the outbound message will fail in.
|
||||
* This can be overridden in the router.config or the client's session config
|
||||
* (the client's session config takes precedence)
|
||||
*/
|
||||
public final static String OVERALL_TIMEOUT_MS_PARAM = "clientMessageTimeout";
|
||||
private final static long OVERALL_TIMEOUT_MS_DEFAULT = 60*1000;
|
||||
|
||||
/** priority of messages, that might get honored some day... */
|
||||
private final static int SEND_PRIORITY = 500;
|
||||
|
||||
/**
|
||||
* If the client's config specifies shouldBundleReplyInfo=true, messages sent from
|
||||
* that client to any peers will probabalistically include the sending destination's
|
||||
* current LeaseSet (allowing the recipient to reply without having to do a full
|
||||
* netDb lookup). This should improve performance during the initial negotiations,
|
||||
* but is not necessary for communication that isn't bidirectional.
|
||||
*
|
||||
*/
|
||||
public static final String BUNDLE_REPLY_LEASESET = "shouldBundleReplyInfo";
|
||||
/**
|
||||
* Allow the override of the frequency of bundling the reply info in with a message.
|
||||
* The client app can specify bundleReplyInfoProbability=80 (for instance) and that
|
||||
* will cause the router to include the sender's leaseSet with 80% of the messages
|
||||
* sent to the peer.
|
||||
*
|
||||
*/
|
||||
public static final String BUNDLE_PROBABILITY = "bundleReplyInfoProbability";
|
||||
/**
|
||||
* How often do messages include the reply leaseSet (out of every 100 tries).
|
||||
* Including it each time is probably overkill, but who knows.
|
||||
*/
|
||||
private static final int BUNDLE_PROBABILITY_DEFAULT = 100;
|
||||
|
||||
/**
|
||||
* Send the sucker
|
||||
*/
|
||||
public OutboundClientMessageOneShotJob(RouterContext ctx, ClientMessage msg) {
|
||||
super(ctx);
|
||||
_log = ctx.logManager().getLog(OutboundClientMessageOneShotJob.class);
|
||||
|
||||
ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.sendAckTime", "How long does it take to get an ACK back from a message?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
|
||||
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
|
||||
_clientMessage = msg;
|
||||
_clientMessageId = msg.getMessageId();
|
||||
_clientMessageSize = msg.getPayload().getSize();
|
||||
_from = msg.getFromDestination();
|
||||
|
||||
String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
|
||||
if (param == null)
|
||||
param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM);
|
||||
if (param != null) {
|
||||
try {
|
||||
timeoutMs = Long.parseLong(param);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid client message timeout specified [" + param
|
||||
+ "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe);
|
||||
timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
|
||||
}
|
||||
}
|
||||
|
||||
_start = getContext().clock().now();
|
||||
_overallExpiration = timeoutMs + _start;
|
||||
_shouldBundle = getShouldBundle();
|
||||
}
|
||||
|
||||
public String getName() { return "Outbound client message"; }
|
||||
|
||||
public void runJob() {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Send outbound client message job beginning");
|
||||
buildClove();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Clove built");
|
||||
Hash to = _clientMessage.getDestination().calculateHash();
|
||||
long timeoutMs = _overallExpiration - getContext().clock().now();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job");
|
||||
getContext().netDb().lookupLeaseSet(to, new SendJob(), new LookupLeaseSetFailedJob(), timeoutMs);
|
||||
}
|
||||
|
||||
private boolean getShouldBundle() {
|
||||
Properties opts = _clientMessage.getSenderConfig().getOptions();
|
||||
String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "true");
|
||||
if ("true".equals(wantBundle)) {
|
||||
int probability = BUNDLE_PROBABILITY_DEFAULT;
|
||||
String str = opts.getProperty(BUNDLE_PROBABILITY);
|
||||
try {
|
||||
if (str != null)
|
||||
probability = Integer.parseInt(str);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly ["
|
||||
+ str + "]", nfe);
|
||||
}
|
||||
if (probability >= getContext().random().nextInt(100))
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/** send a message to a random lease */
|
||||
private class SendJob extends JobImpl {
|
||||
public SendJob() {
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
}
|
||||
public String getName() { return "Send outbound client message through the lease"; }
|
||||
public void runJob() {
|
||||
boolean ok = getNextLease();
|
||||
if (ok)
|
||||
send();
|
||||
else
|
||||
dieFatal();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* fetch the next lease that we should try sending through, randomly chosen
|
||||
* from within the sorted leaseSet (sorted by # of failures through each
|
||||
* lease).
|
||||
*
|
||||
*/
|
||||
private boolean getNextLease() {
|
||||
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_clientMessage.getDestination().calculateHash());
|
||||
if (_leaseSet == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet");
|
||||
return false;
|
||||
}
|
||||
long now = getContext().clock().now();
|
||||
|
||||
// get the possible leases
|
||||
List leases = new ArrayList(_leaseSet.getLeaseCount());
|
||||
for (int i = 0; i < _leaseSet.getLeaseCount(); i++) {
|
||||
Lease lease = _leaseSet.getLease(i);
|
||||
if (lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(getJobId() + ": getNextLease() - expired lease! - " + lease);
|
||||
continue;
|
||||
} else {
|
||||
leases.add(lease);
|
||||
}
|
||||
}
|
||||
|
||||
if (leases.size() <= 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getJobId() + ": No leases found from: " + _leaseSet);
|
||||
return false;
|
||||
}
|
||||
|
||||
// randomize the ordering (so leases with equal # of failures per next
|
||||
// sort are randomly ordered)
|
||||
Collections.shuffle(leases);
|
||||
|
||||
// ordered by lease number of failures
|
||||
TreeMap orderedLeases = new TreeMap();
|
||||
for (Iterator iter = leases.iterator(); iter.hasNext(); ) {
|
||||
Lease lease = (Lease)iter.next();
|
||||
long id = lease.getNumFailure();
|
||||
while (orderedLeases.containsKey(new Long(id)))
|
||||
id++;
|
||||
orderedLeases.put(new Long(id), lease);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": ranking lease we havent sent it down as " + id);
|
||||
}
|
||||
|
||||
_lease = (Lease)orderedLeases.get(orderedLeases.firstKey());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* we couldn't even find the leaseSet, but try again (or die
|
||||
* if we've already tried too hard)
|
||||
*
|
||||
*/
|
||||
private class LookupLeaseSetFailedJob extends JobImpl {
|
||||
public LookupLeaseSetFailedJob() {
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
}
|
||||
public String getName() { return "Lookup for outbound client message failed"; }
|
||||
public void runJob() {
|
||||
dieFatal();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the message to the specified tunnel by creating a new garlic message containing
|
||||
* the (already created) payload clove as well as a new delivery status message. This garlic
|
||||
* message is sent out one of our tunnels, destined for the lease (tunnel+router) specified, and the delivery
|
||||
* status message is targetting one of our free inbound tunnels as well. We use a new
|
||||
* reply selector to keep an eye out for that delivery status message's token
|
||||
*
|
||||
*/
|
||||
private void send() {
|
||||
long token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
|
||||
PublicKey key = _leaseSet.getEncryptionKey();
|
||||
SessionKey sessKey = new SessionKey();
|
||||
Set tags = new HashSet();
|
||||
LeaseSet replyLeaseSet = null;
|
||||
if (_shouldBundle) {
|
||||
replyLeaseSet = getContext().netDb().lookupLeaseSetLocally(_clientMessage.getFromDestination().calculateHash());
|
||||
}
|
||||
|
||||
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
|
||||
_overallExpiration, key,
|
||||
_clove,
|
||||
_clientMessage.getDestination(),
|
||||
sessKey, tags,
|
||||
true, replyLeaseSet);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": send() - token expected " + token);
|
||||
|
||||
SendSuccessJob onReply = new SendSuccessJob(sessKey, tags);
|
||||
SendTimeoutJob onFail = new SendTimeoutJob();
|
||||
ReplySelector selector = new ReplySelector(token);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for "
|
||||
+ _lease.getTunnelId() + " on "
|
||||
+ _lease.getRouterIdentity().getHash().toBase64());
|
||||
|
||||
TunnelId outTunnelId = selectOutboundTunnel();
|
||||
if (outTunnelId != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to "
|
||||
+ _lease.getTunnelId() + " on "
|
||||
+ _lease.getRouterIdentity().getHash().toBase64());
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, outTunnelId,
|
||||
_lease.getRouterIdentity().getHash(),
|
||||
_lease.getTunnelId(), null, onReply,
|
||||
onFail, selector,
|
||||
_overallExpiration-getContext().clock().now(),
|
||||
SEND_PRIORITY);
|
||||
getContext().jobQueue().addJob(j);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?");
|
||||
dieFatal();
|
||||
}
|
||||
_clientMessage = null;
|
||||
_clove = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick an arbitrary outbound tunnel to send the message through, or null if
|
||||
* there aren't any around
|
||||
*
|
||||
*/
|
||||
private TunnelId selectOutboundTunnel() {
|
||||
TunnelSelectionCriteria crit = new TunnelSelectionCriteria();
|
||||
crit.setMaximumTunnelsRequired(1);
|
||||
crit.setMinimumTunnelsRequired(1);
|
||||
List tunnelIds = getContext().tunnelManager().selectOutboundTunnelIds(crit);
|
||||
if (tunnelIds.size() <= 0)
|
||||
return null;
|
||||
else
|
||||
return (TunnelId)tunnelIds.get(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* give up the ghost, this message just aint going through. tell the client to fuck off.
|
||||
*
|
||||
* this is safe to call multiple times (only tells the client once)
|
||||
*/
|
||||
private void dieFatal() {
|
||||
long sendTime = getContext().clock().now() - _start;
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(getJobId() + ": Failed to send the message " + _clientMessageId + " after "
|
||||
+ sendTime + "ms", new Exception("Message send failure"));
|
||||
|
||||
long messageDelay = getContext().throttle().getMessageDelay();
|
||||
long tunnelLag = getContext().throttle().getTunnelLag();
|
||||
long inboundDelta = (long)getContext().throttle().getInboundRateDelta();
|
||||
|
||||
getContext().statManager().addRateData("client.timeoutCongestionTunnel", tunnelLag, 1);
|
||||
getContext().statManager().addRateData("client.timeoutCongestionMessage", messageDelay, 1);
|
||||
getContext().statManager().addRateData("client.timeoutCongestionInbound", inboundDelta, 1);
|
||||
|
||||
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
|
||||
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, false);
|
||||
getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
|
||||
_clientMessage = null;
|
||||
_clove = null;
|
||||
}
|
||||
|
||||
/** build the payload clove that will be used for all of the messages, placing the clove in the status structure */
|
||||
private void buildClove() {
|
||||
PayloadGarlicConfig clove = new PayloadGarlicConfig();
|
||||
|
||||
DeliveryInstructions instructions = new DeliveryInstructions();
|
||||
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_DESTINATION);
|
||||
instructions.setDestination(_clientMessage.getDestination().calculateHash());
|
||||
|
||||
instructions.setDelayRequested(false);
|
||||
instructions.setDelaySeconds(0);
|
||||
instructions.setEncrypted(false);
|
||||
|
||||
clove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
|
||||
clove.setDeliveryInstructions(instructions);
|
||||
clove.setExpiration(_overallExpiration);
|
||||
clove.setId(getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE));
|
||||
|
||||
DataMessage msg = new DataMessage(getContext());
|
||||
msg.setData(_clientMessage.getPayload().getEncryptedData());
|
||||
|
||||
clove.setPayload(msg);
|
||||
clove.setRecipientPublicKey(null);
|
||||
clove.setRequestAck(false);
|
||||
|
||||
_clove = clove;
|
||||
_cloveId = _clove.getId();
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Built payload clove with id " + clove.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep an eye out for any of the delivery status message tokens that have been
|
||||
* sent down the various tunnels to deliver this message
|
||||
*
|
||||
*/
|
||||
private class ReplySelector implements MessageSelector {
|
||||
private long _pendingToken;
|
||||
public ReplySelector(long token) {
|
||||
_pendingToken = token;
|
||||
}
|
||||
|
||||
public boolean continueMatching() { return false; }
|
||||
public long getExpiration() { return _overallExpiration; }
|
||||
|
||||
public boolean isMatch(I2NPMessage inMsg) {
|
||||
if (inMsg.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
|
||||
return _pendingToken == ((DeliveryStatusMessage)inMsg).getMessageId();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after we get a confirmation that the message was delivered safely
|
||||
* (hoo-ray!)
|
||||
*
|
||||
*/
|
||||
private class SendSuccessJob extends JobImpl implements ReplyJob {
|
||||
private SessionKey _key;
|
||||
private Set _tags;
|
||||
|
||||
/**
|
||||
* Create a new success job that will be fired when the message encrypted with
|
||||
* the given session key and bearing the specified tags are confirmed delivered.
|
||||
*
|
||||
*/
|
||||
public SendSuccessJob(SessionKey key, Set tags) {
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
_key = key;
|
||||
_tags = tags;
|
||||
}
|
||||
|
||||
public String getName() { return "Send client message successful to a lease"; }
|
||||
public void runJob() {
|
||||
long sendTime = getContext().clock().now() - _start;
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
|
||||
+ ": SUCCESS! msg " + _clientMessageId
|
||||
+ " sent after " + sendTime + "ms");
|
||||
|
||||
if ( (_key != null) && (_tags != null) && (_tags.size() > 0) ) {
|
||||
if (_leaseSet != null)
|
||||
getContext().sessionKeyManager().tagsDelivered(_leaseSet.getEncryptionKey(),
|
||||
_key, _tags);
|
||||
}
|
||||
|
||||
long dataMsgId = _cloveId;
|
||||
getContext().messageHistory().sendPayloadMessage(dataMsgId, true, sendTime);
|
||||
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, true);
|
||||
_lease.setNumSuccess(_lease.getNumSuccess()+1);
|
||||
|
||||
getContext().statManager().addRateData("client.sendAckTime", sendTime, 0);
|
||||
getContext().statManager().addRateData("client.sendMessageSize", _clientMessageSize, sendTime);
|
||||
}
|
||||
|
||||
public void setMessage(I2NPMessage msg) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fired after the basic timeout for sending through the given tunnel has been reached.
|
||||
* We'll accept successes later, but won't expect them
|
||||
*
|
||||
*/
|
||||
private class SendTimeoutJob extends JobImpl {
|
||||
public SendTimeoutJob() {
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
}
|
||||
|
||||
public String getName() { return "Send client message timed out through a lease"; }
|
||||
public void runJob() {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(OutboundClientMessageOneShotJob.this.getJobId()
|
||||
+ ": Soft timeout through the lease " + _lease);
|
||||
|
||||
_lease.setNumFailure(_lease.getNumFailure()+1);
|
||||
dieFatal();
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user