forked from I2P_Developers/i2p.i2p
increase the bundle probability to yet another arbitrary value
add the jobId to log messages to simplify tracing individual parallel sends logging cleanup
This commit is contained in:
@ -7,6 +7,7 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import net.i2p.crypto.SessionKeyManager;
|
||||
import net.i2p.data.Certificate;
|
||||
@ -99,7 +100,7 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
* How often do messages include the reply leaseSet (out of every 100 tries).
|
||||
* Including it each time is probably overkill, but who knows.
|
||||
*/
|
||||
private static final int BUNDLE_PROBABILITY_DEFAULT = 30;
|
||||
private static final int BUNDLE_PROBABILITY_DEFAULT = 80;
|
||||
|
||||
/**
|
||||
* Send the sucker
|
||||
@ -140,14 +141,14 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
|
||||
public void runJob() {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Send outbound client message job beginning");
|
||||
_log.debug(getJobId() + ": Send outbound client message job beginning");
|
||||
buildClove();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Clove built");
|
||||
_log.debug(getJobId() + ": Clove built");
|
||||
Hash to = _status.getTo().calculateHash();
|
||||
long timeoutMs = _overallExpiration - _context.clock().now();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Send outbound client message - sending off leaseSet lookup job");
|
||||
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job");
|
||||
_status.incrementLookups();
|
||||
_context.netDb().lookupLeaseSet(to, _nextStep, _lookupLeaseSetFailed, timeoutMs);
|
||||
}
|
||||
@ -157,24 +158,24 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
*/
|
||||
private void sendNext() {
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug("sendNext() called with " + _status.getNumSent() + " already sent");
|
||||
_log.debug(getJobId() + ": sendNext() called with " + _status.getNumSent() + " already sent");
|
||||
}
|
||||
|
||||
if (_status.getSuccess()) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("sendNext() - already successful!");
|
||||
_log.debug(getJobId() + ": sendNext() - already successful!");
|
||||
return;
|
||||
}
|
||||
if (_status.getFailure()) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("sendNext() - already failed!");
|
||||
_log.debug(getJobId() + ": sendNext() - already failed!");
|
||||
return;
|
||||
}
|
||||
|
||||
long now = _context.clock().now();
|
||||
if (now >= _overallExpiration) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("sendNext() - Expired");
|
||||
_log.warn(getJobId() + ": sendNext() - Expired");
|
||||
dieFatal();
|
||||
return;
|
||||
}
|
||||
@ -182,26 +183,26 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
Lease nextLease = getNextLease();
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Send outbound client message - next lease found for ["
|
||||
_log.debug(getJobId() + ": Send outbound client message - next lease found for ["
|
||||
+ _status.getTo().calculateHash().toBase64() + "] - "
|
||||
+ nextLease);
|
||||
|
||||
if (nextLease == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No more leases, and we still haven't heard back from the peer"
|
||||
_log.warn(getJobId() + ": No more leases, and we still haven't heard back from the peer"
|
||||
+ ", refetching the leaseSet to try again");
|
||||
_status.setLeaseSet(null);
|
||||
long remainingMs = _overallExpiration - _context.clock().now();
|
||||
if (_status.getNumLookups() < MAX_LEASE_LOOKUPS) {
|
||||
_status.incrementLookups();
|
||||
Hash to = _status.getMessage().getDestination().calculateHash();
|
||||
_status.clearAlreadySent();
|
||||
_context.netDb().fail(to);
|
||||
_status.clearAlreadySent(); // so we can send down old tunnels again
|
||||
_context.netDb().fail(to); // so we don't just fetch what we have
|
||||
_context.netDb().lookupLeaseSet(to, _nextStep, _lookupLeaseSetFailed, remainingMs);
|
||||
return;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("sendNext() - max # lease lookups exceeded! "
|
||||
_log.warn(getJobId() + ": sendNext() - max # lease lookups exceeded! "
|
||||
+ _status.getNumLookups());
|
||||
dieFatal();
|
||||
return;
|
||||
@ -224,12 +225,12 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
if (ls == null) {
|
||||
ls = _context.netDb().lookupLeaseSetLocally(_status.getTo().calculateHash());
|
||||
if (ls == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Lookup locally didn't find the leaseSet");
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getJobId() + ": Lookup locally didn't find the leaseSet");
|
||||
return null;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Lookup locally DID find the leaseSet");
|
||||
_log.debug(getJobId() + ": Lookup locally DID find the leaseSet");
|
||||
}
|
||||
_status.setLeaseSet(ls);
|
||||
}
|
||||
@ -241,7 +242,7 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
Lease lease = ls.getLease(i);
|
||||
if (lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("getNextLease() - expired lease! - " + lease);
|
||||
_log.warn(getJobId() + ": getNextLease() - expired lease! - " + lease);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -249,12 +250,19 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
leases.add(lease);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("getNextLease() - skipping lease we've already sent it down - "
|
||||
_log.debug(getJobId() + ": getNextLease() - skipping lease we've already sent it down - "
|
||||
+ lease);
|
||||
}
|
||||
}
|
||||
|
||||
// randomize the ordering (so leases with equal # of failures per next sort are randomly ordered)
|
||||
if (leases.size() <= 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getJobId() + ": No leases found, since we've tried them all (so fail it and relookup)");
|
||||
return null;
|
||||
}
|
||||
|
||||
// randomize the ordering (so leases with equal # of failures per next
|
||||
// sort are randomly ordered)
|
||||
Collections.shuffle(leases);
|
||||
|
||||
// ordered by lease number of failures
|
||||
@ -266,29 +274,25 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
id++;
|
||||
orderedLeases.put(new Long(id), lease);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("getNextLease() - ranking lease we havent sent it down as " + id);
|
||||
_log.debug(getJobId() + ": ranking lease we havent sent it down as " + id);
|
||||
}
|
||||
|
||||
if (orderedLeases.size() <= 0) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No leases in the ordered set found! all = " + leases.size());
|
||||
return null;
|
||||
} else {
|
||||
return (Lease)orderedLeases.get(orderedLeases.firstKey());
|
||||
}
|
||||
return (Lease)orderedLeases.get(orderedLeases.firstKey());
|
||||
}
|
||||
|
||||
private boolean getShouldBundle() {
|
||||
String wantBundle = _status.getMessage().getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET, "true");
|
||||
Properties opts = _status.getMessage().getSenderConfig().getOptions();
|
||||
String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "true");
|
||||
if ("true".equals(wantBundle)) {
|
||||
int probability = BUNDLE_PROBABILITY_DEFAULT;
|
||||
String str = _status.getMessage().getSenderConfig().getOptions().getProperty(BUNDLE_PROBABILITY);
|
||||
String str = opts.getProperty(BUNDLE_PROBABILITY);
|
||||
try {
|
||||
if (str != null)
|
||||
probability = Integer.parseInt(str);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Bundle leaseSet probability overridden incorrectly [" + str + "]", nfe);
|
||||
_log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly ["
|
||||
+ str + "]", nfe);
|
||||
}
|
||||
if (probability >= _context.random().nextInt(100))
|
||||
return true;
|
||||
@ -308,7 +312,6 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void send(Lease lease) {
|
||||
// send it as a garlic with a DeliveryStatusMessage clove and a message selector w/ successJob on reply
|
||||
long token = _context.random().nextInt(Integer.MAX_VALUE);
|
||||
PublicKey key = _status.getLeaseSet().getEncryptionKey();
|
||||
SessionKey sessKey = new SessionKey();
|
||||
@ -325,7 +328,7 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
tags, true, replyLeaseSet);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("send(lease) - token expected " + token);
|
||||
_log.debug(getJobId() + ": send(lease) - token expected " + token);
|
||||
|
||||
_status.sent(lease.getRouterIdentity().getHash(), lease.getTunnelId());
|
||||
|
||||
@ -334,14 +337,14 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
ReplySelector selector = new ReplySelector(token);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Placing GarlicMessage into the new tunnel message bound for "
|
||||
_log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for "
|
||||
+ lease.getTunnelId() + " on "
|
||||
+ lease.getRouterIdentity().getHash().toBase64());
|
||||
|
||||
TunnelId outTunnelId = selectOutboundTunnel();
|
||||
if (outTunnelId != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending tunnel message out " + outTunnelId + " to "
|
||||
_log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to "
|
||||
+ lease.getTunnelId() + " on "
|
||||
+ lease.getRouterIdentity().getHash().toBase64());
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(_context, msg, outTunnelId,
|
||||
@ -352,8 +355,8 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
_context.jobQueue().addJob(j);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Could not find any outbound tunnels to send the payload through... wtf?");
|
||||
_context.jobQueue().addJob(onFail);
|
||||
_log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?");
|
||||
dieFatal();
|
||||
}
|
||||
}
|
||||
|
||||
@ -385,12 +388,12 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
ClientMessage msg = _status.getMessage();
|
||||
if (alreadyFailed) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("dieFatal() - already failed sending " + msg.getMessageId()
|
||||
_log.debug(getJobId() + ": dieFatal() - already failed sending " + msg.getMessageId()
|
||||
+ ", no need to do it again", new Exception("Duplicate death?"));
|
||||
return;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Failed to send the message " + msg.getMessageId() + " after "
|
||||
_log.error(getJobId() + ": Failed to send the message " + msg.getMessageId() + " after "
|
||||
+ _status.getNumSent() + " sends and " + _status.getNumLookups()
|
||||
+ " lookups (and " + sendTime + "ms)",
|
||||
new Exception("Message send failure"));
|
||||
@ -429,7 +432,7 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
_status.setClove(clove);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Built payload clove with id " + clove.getId());
|
||||
_log.debug(getJobId() + ": Built payload clove with id " + clove.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -638,7 +641,8 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
boolean alreadySuccessful = _status.success();
|
||||
MessageId msgId = _status.getMessage().getMessageId();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("SUCCESS! Message delivered completely for message " + msgId
|
||||
_log.debug(OutboundClientMessageJob.this.getJobId()
|
||||
+ ": SUCCESS! Message delivered completely for message " + msgId
|
||||
+ " after " + sendTime + "ms [for "
|
||||
+ _status.getMessage().getMessageId() + "]");
|
||||
|
||||
@ -649,7 +653,8 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
|
||||
if (alreadySuccessful) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Success is a duplicate for " + _status.getMessage().getMessageId()
|
||||
_log.debug(OutboundClientMessageJob.this.getJobId()
|
||||
+ ": Success is a duplicate for " + _status.getMessage().getMessageId()
|
||||
+ ", dont notify again...");
|
||||
return;
|
||||
}
|
||||
@ -682,7 +687,8 @@ public class OutboundClientMessageJob extends JobImpl {
|
||||
public String getName() { return "Send client message timed out through a lease"; }
|
||||
public void runJob() {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Soft timeout through the lease " + _lease);
|
||||
_log.debug(OutboundClientMessageJob.this.getJobId()
|
||||
+ ": Soft timeout through the lease " + _lease);
|
||||
_lease.setNumFailure(_lease.getNumFailure()+1);
|
||||
sendNext();
|
||||
}
|
||||
|
Reference in New Issue
Block a user