(no, this doesnt fix things yet, but its a save point along the path)

2005-03-11  jrandom
    * Rather than the fixed resend timeout floor (10s), use 10s+RTT as the
      minimum (increased on resends as before, of course).
    * Always prod the clock update listeners, even if just to tell them that
      the time hasn't changed much.
    * Added support for explicit peer selection for individual tunnel pools,
      which will be useful in debugging but not recommended for use by normal
      end users.
    * More aggressively search for the next hop's routerInfo on tunnel join.
    * Give messages received via inbound tunnels that are bound to remote
      locations sufficient time (taking into account clock skew).
    * Give alternate direct send messages sufficient time (10s min, not 5s)
    * Always give the end to end data message the explicit timeout (though the
      old default was sufficient before)
    * No need to give end to end messages an insane expiration (+2m), as we
      are already handling skew on the receiving side.
    * Don't complain too loudly about expired TunnelCreateMessages (at least,
      not until after all those 0.5 and 0.5.0.1 users upgrade ;)
    * Properly keep the sendBps stat
    * When running the router with router.keepHistory=true, log more data to
      messageHistory.txt
    * Logging updates
    * Minor formatting updates
This commit is contained in:
jrandom
2005-03-11 22:23:36 +00:00
committed by zzz
parent ea6fbc7835
commit d74aa6e53d
27 changed files with 248 additions and 59 deletions

View File

@ -258,7 +258,7 @@ public class Connection {
}
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
long timeout = (_options.getRTT() < MIN_RESEND_DELAY ? MIN_RESEND_DELAY : _options.getRTT());
long timeout = _options.getRTT() + MIN_RESEND_DELAY;
if (timeout > MAX_RESEND_DELAY)
timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG))

View File

@ -552,6 +552,11 @@ public class Packet {
}
public String toString() {
StringBuffer str = formatAsString();
return str.toString();
}
protected StringBuffer formatAsString() {
StringBuffer buf = new StringBuffer(64);
buf.append(toId(_sendStreamId));
//buf.append("<-->");
@ -570,7 +575,7 @@ public class Packet {
}
if ( (_payload != null) && (_payload.getValid() > 0) )
buf.append(" data: ").append(_payload.getValid());
return buf.toString();
return buf;
}
private static final String toId(byte id[]) {

View File

@ -107,11 +107,15 @@ public class PacketHandler {
private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
void displayPacket(Packet packet, String prefix, String suffix) {
if (!_log.shouldLog(Log.DEBUG)) return;
String msg = null;
StringBuffer buf = new StringBuffer(256);
synchronized (_fmt) {
msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : "");
buf.append(_fmt.format(new Date()));
}
System.out.println(msg);
buf.append(": ").append(prefix).append(" ");
buf.append(packet.toString());
if (suffix != null)
buf.append(" ").append(suffix);
System.out.println(buf.toString());
}
private void receiveKnownCon(Connection con, Packet packet) {

View File

@ -114,16 +114,44 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; }
public String toString() {
String str = super.toString();
public StringBuffer formatAsString() {
StringBuffer buf = super.formatAsString();
Connection con = _connection;
if (con != null)
buf.append(" rtt ").append(con.getOptions().getRTT());
if ( (_tagsSent != null) && (_tagsSent.size() > 0) )
str = str + " with tags";
buf.append(" with tags");
if (_ackOn > 0)
return str + " ack after " + getAckTime() + (_numSends <= 1 ? "" : " sent " + _numSends + " times");
else
return str + (_numSends <= 1 ? "" : " sent " + _numSends + " times");
buf.append(" ack after ").append(getAckTime());
if (_numSends > 1)
buf.append(" sent ").append(_numSends).append(" times");
if (isFlagSet(Packet.FLAG_SYNCHRONIZE) ||
isFlagSet(Packet.FLAG_CLOSE) ||
isFlagSet(Packet.FLAG_RESET)) {
if (con != null) {
buf.append(" from ");
Destination local = con.getSession().getMyDestination();
if (local != null)
buf.append(local.calculateHash().toBase64().substring(0,4));
else
buf.append("unknown");
buf.append(" to ");
Destination remote = con.getRemotePeer();
if (remote != null)
buf.append(remote.calculateHash().toBase64().substring(0,4));
else
buf.append("unknown");
}
}
return buf;
}
public void waitForAccept(int maxWaitMs) {

View File

@ -172,7 +172,7 @@ public class Timestamper implements Runnable {
if (Math.abs(delta) < MAX_VARIANCE) {
if (_log.shouldLog(Log.INFO))
_log.info("a single SNTP query was within the tolerance (" + delta + "ms)");
return true;
break;
} else {
// outside the tolerance, lets iterate across the concurring queries
expectedDelta = delta;

View File

@ -1,4 +1,31 @@
$Id: history.txt,v 1.166 2005/03/06 19:40:45 jrandom Exp $
$Id: history.txt,v 1.167 2005/03/07 21:45:14 jrandom Exp $
2005-03-11 jrandom
* Rather than the fixed resend timeout floor (10s), use 10s+RTT as the
minimum (increased on resends as before, of course).
* Always prod the clock update listeners, even if just to tell them that
the time hasn't changed much.
* Added support for explicit peer selection for individual tunnel pools,
which will be useful in debugging but not recommended for use by normal
end users.
* More aggressively search for the next hop's routerInfo on tunnel join.
* Give messages received via inbound tunnels that are bound to remote
locations sufficient time (taking into account clock skew).
* Give alternate direct send messages sufficient time (10s min, not 5s)
* Always give the end to end data message the explicit timeout (though the
old default was sufficient before)
* No need to give end to end messages an insane expiration (+2m), as we
are already handling skew on the receiving side.
* Don't complain too loudly about expired TunnelCreateMessages (at least,
not until after all those 0.5 and 0.5.0.1 users upgrade ;)
* Properly keep the sendBps stat
* When running the router with router.keepHistory=true, log more data to
messageHistory.txt
* Logging updates
* Minor formatting updates
2005-03-08 jrandom
* More aggressively adjust the clock
2005-03-07 jrandom
* Fix the HTTP response header filter to allow multiple headers with the

View File

@ -67,6 +67,8 @@ public interface I2NPMessage extends DataStructure {
*
*/
public long getMessageExpiration();
public void setMessageExpiration(long exp);
/** How large the message is, including any checksums */
public int getMessageSize();

View File

@ -17,6 +17,7 @@ import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.data.i2np.TunnelCreateMessage;
import net.i2p.data.i2np.TunnelCreateStatusMessage;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
@ -103,11 +104,15 @@ public class InNetMessagePool implements Service {
if (messageBody instanceof TunnelDataMessage) {
// do not validate the message with the validator - the IV validator is sufficient
} else {
boolean valid = _context.messageValidator().validateMessage(messageBody.getUniqueId(), exp);
if (!valid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Duplicate message received [" + messageBody.getUniqueId()
+ " expiring on " + exp + "]: " + messageBody.getClass().getName());
String invalidReason = _context.messageValidator().validateMessage(messageBody.getUniqueId(), exp);
if (invalidReason != null) {
int level = Log.WARN;
if (messageBody instanceof TunnelCreateMessage)
level = Log.INFO;
if (_log.shouldLog(level))
_log.log(level, "Duplicate message received [" + messageBody.getUniqueId()
+ " expiring on " + exp + "]: " + messageBody.getClass().getName() + ": " + invalidReason
+ ": " + messageBody);
_context.statManager().addRateData("inNetPool.dropped", 1, 0);
_context.statManager().addRateData("inNetPool.duplicate", 1, 0);
_context.messageHistory().droppedOtherMessage(messageBody);

View File

@ -318,7 +318,7 @@ public class MessageHistory {
*/
public void sendMessage(String messageType, long messageId, long expiration, Hash peer, boolean sentOk) {
if (!_doLog) return;
if (true) return;
if (false) return;
StringBuffer buf = new StringBuffer(256);
buf.append(getPrefix());
buf.append("send [").append(messageType).append("] message [").append(messageId).append("] ");
@ -344,7 +344,7 @@ public class MessageHistory {
*/
public void receiveMessage(String messageType, long messageId, long expiration, Hash from, boolean isValid) {
if (!_doLog) return;
if (true) return;
if (false) return;
StringBuffer buf = new StringBuffer(256);
buf.append(getPrefix());
buf.append("receive [").append(messageType).append("] with id [").append(messageId).append("] ");

View File

@ -31,20 +31,20 @@ public class MessageValidator {
/**
* Determine if this message should be accepted as valid (not expired, not a duplicate)
*
* @return true if the message should be accepted as valid, false otherwise
* @return reason why the message is invalid (or null if the message is valid)
*/
public boolean validateMessage(long messageId, long expiration) {
public String validateMessage(long messageId, long expiration) {
long now = _context.clock().now();
if (now - Router.CLOCK_FUDGE_FACTOR >= expiration) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting message " + messageId + " because it expired " + (now-expiration) + "ms ago");
_context.statManager().addRateData("router.invalidMessageTime", (now-expiration), 0);
return false;
return "expired " + (now-expiration) + "ms ago";
} else if (now + 4*Router.CLOCK_FUDGE_FACTOR < expiration) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting message " + messageId + " because it will expire too far in the future (" + (expiration-now) + "ms)");
_context.statManager().addRateData("router.invalidMessageTime", (now-expiration), 0);
return false;
return "expire too far in the future (" + (expiration-now) + "ms)";
}
boolean isDuplicate = noteReception(messageId, expiration);
@ -52,11 +52,11 @@ public class MessageValidator {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting message " + messageId + " because it is a duplicate", new Exception("Duplicate origin"));
_context.statManager().addRateData("router.duplicateMessageId", 1, 0);
return false;
return "duplicate";
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accepting message " + messageId + " because it is NOT a duplicate", new Exception("Original origin"));
return true;
return null;
}
}

View File

@ -371,7 +371,7 @@ public class Router {
RateStat sendRate = _context.statManager().getRate("transport.sendMessageSize");
if (sendRate != null) {
Rate rate = receiveRate.getRate(60*1000);
Rate rate = sendRate.getRate(60*1000);
if (rate != null) {
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.160 $ $Date: 2005/03/06 19:07:27 $";
public final static String ID = "$Revision: 1.161 $ $Date: 2005/03/07 21:45:15 $";
public final static String VERSION = "0.5.0.2";
public final static long BUILD = 1;
public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -109,6 +109,10 @@ public class GarlicMessageBuilder {
msg.setData(encData);
msg.setMessageExpiration(config.getExpiration());
long timeFromNow = config.getExpiration() - ctx.clock().now();
if (timeFromNow < 10*1000)
log.error("Building a message expiring in " + timeFromNow + "ms: " + config, new Exception("created by"));
if (log.shouldLog(Log.WARN))
log.warn("CloveSet size for message " + msg.getUniqueId() + " is " + cloveSet.length
+ " and encrypted message data is " + encData.length);

View File

@ -99,13 +99,13 @@ public class GarlicMessageReceiver {
}
private boolean isValid(GarlicClove clove) {
boolean valid = _context.messageValidator().validateMessage(clove.getCloveId(),
clove.getExpiration().getTime());
if (!valid) {
String invalidReason = _context.messageValidator().validateMessage(clove.getCloveId(),
clove.getExpiration().getTime());
if (invalidReason != null) {
String howLongAgo = DataHelper.formatDuration(_context.clock().now()-clove.getExpiration().getTime());
if (_log.shouldLog(Log.ERROR))
_log.error("Clove is NOT valid: id=" + clove.getCloveId()
+ " expiration " + howLongAgo + " ago");
+ " expiration " + howLongAgo + " ago: " + invalidReason + ": " + clove);
if (_log.shouldLog(Log.WARN))
_log.warn("Clove is NOT valid: id=" + clove.getCloveId()
+ " expiration " + howLongAgo + " ago", new Exception("Invalid within..."));
@ -113,6 +113,6 @@ public class GarlicMessageReceiver {
clove.getData().getClass().getName(),
"Clove is not valid (expiration " + howLongAgo + " ago)");
}
return valid;
return (invalidReason == null);
}
}

View File

@ -108,7 +108,7 @@ class OutboundClientMessageJobHelper {
config.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
config.setDeliveryInstructions(instructions);
config.setId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE));
config.setExpiration(expiration+2*Router.CLOCK_FUDGE_FACTOR);
config.setExpiration(expiration); // +2*Router.CLOCK_FUDGE_FACTOR);
config.setRecipientPublicKey(recipientPK);
config.setRequestAck(false);

View File

@ -51,6 +51,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private int _clientMessageSize;
private Destination _from;
private Destination _to;
private String _toString;
/** target destination's leaseSet, if known */
private LeaseSet _leaseSet;
/** Actual lease the message is being routed through */
@ -117,6 +118,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_clientMessageSize = msg.getPayload().getSize();
_from = msg.getFromDestination();
_to = msg.getDestination();
_toString = _to.calculateHash().toBase64().substring(0,4);
_leaseSetLookupBegin = -1;
String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
@ -146,22 +148,24 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_log.debug(getJobId() + ": Send outbound client message job beginning");
buildClove();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Clove built");
_log.debug(getJobId() + ": Clove built to " + _toString);
long timeoutMs = _overallExpiration - getContext().clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": preparing to search for the leaseSet");
_log.debug(getJobId() + ": preparing to search for the leaseSet for " + _toString);
Hash key = _to.calculateHash();
SendJob success = new SendJob(getContext());
LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext());
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job");
LeaseSet ls = getContext().netDb().lookupLeaseSetLocally(key);
if (ls != null) {
getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0);
_leaseSetLookupBegin = -1;
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send outbound client message - leaseSet found locally for " + _toString);
success.runJob();
} else {
_leaseSetLookupBegin = getContext().clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job for " + _toString);
getContext().netDb().lookupLeaseSet(key, success, failed, timeoutMs);
}
}
@ -203,8 +207,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
boolean ok = getNextLease();
if (ok)
send();
else
else {
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")");
dieFatal();
}
}
}
@ -218,7 +225,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
if (_leaseSet == null) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet");
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString);
return false;
}
long now = getContext().clock().now();
@ -229,7 +236,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
Lease lease = _leaseSet.getLease(i);
if (lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": getNextLease() - expired lease! - " + lease);
_log.warn(getJobId() + ": getNextLease() - expired lease! - " + lease + " for " + _toString);
continue;
} else {
leases.add(lease);
@ -278,6 +285,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime);
}
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to send to " + _toString + " because we couldn't find their leaseSet");
dieFatal();
}
@ -312,12 +322,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// set to null if there are no tunnels to ack the reply back through
// (should we always fail for this? or should we send it anyway, even if
// we dont receive the reply? hmm...)
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Unable to create the garlic message (no tunnels left) to " + _toString);
dieFatal();
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": send() - token expected " + token);
_log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString);
SendSuccessJob onReply = new SendSuccessJob(getContext(), sessKey, tags);
SendTimeoutJob onFail = new SendTimeoutJob(getContext());
@ -325,6 +337,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for "
+ _toString + " at "
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
@ -332,6 +345,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (outTunnel != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending tunnel message out " + outTunnel.getSendTunnelId(0) + " to "
+ _toString + " at "
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
@ -365,7 +379,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
public String getName() { return "Dispatch outbound client message"; }
public void runJob() {
getContext().messageRegistry().registerPending(_selector, _replyFound, _replyTimeout, _timeoutMs);
if (_log.shouldLog(Log.INFO))
_log.info("Dispatching message to " + _toString + ": " + _msg);
getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway());
if (_log.shouldLog(Log.INFO))
_log.info("Dispatching message to " + _toString + " complete");
}
}
@ -427,6 +445,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
DataMessage msg = new DataMessage(getContext());
msg.setData(_clientMessage.getPayload().getEncryptedData());
msg.setMessageExpiration(_overallExpiration);
clove.setPayload(msg);
clove.setRecipientPublicKey(null);
@ -471,6 +490,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
return false;
}
}
public String toString() {
return "sending " + _toString + " waiting for token " + _pendingToken
+ " for cloveId " + _cloveId;
}
}
/**

View File

@ -48,10 +48,10 @@ public class SendMessageDirectJob extends JobImpl {
_message = message;
_targetHash = toPeer;
_router = null;
if (timeoutMs < 5*1000) {
if (timeoutMs < 10*1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Very little time given [" + timeoutMs + "], resetting to 5s", new Exception("stingy bastard"));
_expiration = ctx.clock().now() + 5*1000;
_expiration = ctx.clock().now() + 10*1000;
} else {
_expiration = timeoutMs + ctx.clock().now();
}

View File

@ -264,8 +264,9 @@ class SearchJob extends JobImpl {
_log.error(getJobId() + ": Dont send search to ourselves - why did we try?");
return;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64());
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64()
+ " for " + _state.getTarget().toBase64());
}
getContext().statManager().addRateData("netDb.searchMessageCount", 1, 0);
@ -330,8 +331,8 @@ class SearchJob extends JobImpl {
DatabaseLookupMessage msg = buildMessage(expiration);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64()
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64()
+ " for " + msg.getSearchKey().toBase64() + " w/ replies to us ["
+ msg.getFrom().toBase64() + "]");
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);

View File

@ -357,7 +357,7 @@ public class ProfileOrganizer {
// we dont want the good peers, just random ones
continue;
} else {
if (isOk(cur))
if (isSelectable(cur))
selected.add(cur);
}
}
@ -474,7 +474,7 @@ public class ProfileOrganizer {
for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile cur = (PeerProfile)iter.next();
if ( (!_fastPeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) {
if (!isOk(cur.getPeer())) {
if (!isSelectable(cur.getPeer())) {
// skip peers we dont have in the netDb
if (_log.shouldLog(Log.INFO))
_log.info("skip unknown peer from fast promotion: " + cur.getPeer().toBase64());
@ -701,7 +701,7 @@ public class ProfileOrganizer {
Collections.shuffle(all, _random);
for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) {
Hash peer = (Hash)all.get(i);
boolean ok = isOk(peer);
boolean ok = isSelectable(peer);
if (ok)
matches.add(peer);
else
@ -709,7 +709,7 @@ public class ProfileOrganizer {
}
}
private boolean isOk(Hash peer) {
public boolean isSelectable(Hash peer) {
NetworkDatabaseFacade netDb = _context.netDb();
// the CLI shouldn't depend upon the netDb
if (netDb == null) return true;
@ -755,7 +755,7 @@ public class ProfileOrganizer {
if (_log.shouldLog(Log.DEBUG))
_log.debug("High capacity: \t" + profile.getPeer().toBase64());
if (_thresholdSpeedValue <= profile.getSpeedValue()) {
if (!isOk(profile.getPeer())) {
if (!isSelectable(profile.getPeer())) {
if (_log.shouldLog(Log.INFO))
_log.info("Skipping fast mark [!ok] for " + profile.getPeer().toBase64());
} else if (!profile.getIsActive()) {

View File

@ -305,17 +305,18 @@ public class TCPTransport extends TransportImpl {
void connectionClosed(TCPConnection con) {
synchronized (_connectionLock) {
TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
if (cur != con)
if ( (cur != null) && (cur != con) )
_connectionsByIdent.put(cur.getRemoteRouterIdentity().getHash(), cur);
cur = (TCPConnection)_connectionsByAddress.remove(con.getRemoteAddress().toString());
if (cur != con)
if ( (cur != null) && (cur != con) )
_connectionsByAddress.put(cur.getRemoteAddress().toString(), cur);
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(256);
buf.append("\nCLOSING ").append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6));
buf.append(".");
buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") ");
if (cur != null)
buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") ");
for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) {
String addr = (String)iter.next();
buf.append(addr).append(" ");

View File

@ -363,8 +363,8 @@ public class FragmentHandler {
}
if (removed && !_msg.getReleased()) {
noteFailure(_msg.getMessageId());
if (_log.shouldLog(Log.WARN))
_log.warn("Dropped failed fragmented message: " + _msg);
if (_log.shouldLog(Log.ERROR))
_log.error("Dropped failed fragmented message: " + _msg);
_context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
_msg.failed();
} else {

View File

@ -90,6 +90,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
+ " failing to distribute " + msg);
return;
}
if (msg.getMessageExpiration() < _context.clock().now() + 10*1000)
msg.setMessageExpiration(_context.clock().now() + 10*1000);
_context.tunnelDispatcher().dispatchOutbound(msg, outId, tunnel, target);
}
}

View File

@ -431,6 +431,12 @@ public class TunnelDispatcher implements Service {
+ (before-msg.getMessageExpiration()) + "ms ago? "
+ msg, new Exception("cause"));
return;
} else if (msg.getMessageExpiration() < before) {
// nonfatal, as long as it was remotely created
if (_log.shouldLog(Log.WARN))
_log.warn("why are you sending a tunnel message that expired "
+ (before-msg.getMessageExpiration()) + "ms ago? "
+ msg, new Exception("cause"));
}
gw.add(msg, targetPeer, targetTunnel);
if (targetTunnel == null)

View File

@ -46,6 +46,17 @@ public class TunnelParticipant {
if ( (_config != null) && (_config.getSendTo() != null) ) {
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (_nextHopCache == null)
_context.netDb().lookupRouterInfo(_config.getSendTo(), new Found(_context), null, 60*1000);
}
}
private class Found extends JobImpl {
public Found(RouterContext ctx) { super(ctx); }
public String getName() { return "Next hop info found"; }
public void runJob() {
if (_nextHopCache == null)
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
}
}

View File

@ -18,6 +18,10 @@ class ClientPeerSelector extends TunnelPeerSelector {
if (length < 0)
return null;
HashSet matches = new HashSet(length);
if (shouldSelectExplicit(settings))
return selectExplicit(ctx, settings, length);
ctx.profileOrganizer().selectFastPeers(length, null, matches);
matches.remove(ctx.routerHash());
@ -29,4 +33,5 @@ class ClientPeerSelector extends TunnelPeerSelector {
rv.add(ctx.routerHash());
return rv;
}
}

View File

@ -17,6 +17,10 @@ class ExploratoryPeerSelector extends TunnelPeerSelector {
int length = getLength(ctx, settings);
if (length < 0)
return null;
if (shouldSelectExplicit(settings))
return selectExplicit(ctx, settings, length);
HashSet matches = new HashSet(length);
ctx.profileOrganizer().selectNotFailingPeers(length, null, matches, true);

View File

@ -1,6 +1,12 @@
package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import net.i2p.data.DataFormatException;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.util.Log;
@ -54,4 +60,58 @@ abstract class TunnelPeerSelector {
}
return length;
}
protected boolean shouldSelectExplicit(TunnelPoolSettings settings) {
Properties opts = settings.getUnknownOptions();
if (opts != null) {
String peers = opts.getProperty("explicitPeers");
if (peers != null)
return true;
}
return false;
}
protected List selectExplicit(RouterContext ctx, TunnelPoolSettings settings, int length) {
String peers = null;
Properties opts = settings.getUnknownOptions();
if (opts != null)
peers = opts.getProperty("explicitPeers");
Log log = ctx.logManager().getLog(ClientPeerSelector.class);
List rv = new ArrayList();
StringTokenizer tok = new StringTokenizer(peers, ",");
Hash h = new Hash();
while (tok.hasMoreTokens()) {
String peerStr = tok.nextToken();
Hash peer = new Hash();
try {
peer.fromBase64(peerStr);
if (ctx.profileOrganizer().isSelectable(peer)) {
rv.add(peer);
} else {
if (log.shouldLog(Log.WARN))
log.warn("Explicit peer is not selectable: " + peerStr);
}
} catch (DataFormatException dfe) {
if (log.shouldLog(Log.ERROR))
log.error("Explicit peer is improperly formatted (" + peerStr + ")", dfe);
}
}
Collections.shuffle(rv, ctx.random());
while (rv.size() > length)
rv.remove(0);
if (settings.isInbound())
rv.add(0, ctx.routerHash());
else
rv.add(ctx.routerHash());
if (log.shouldLog(Log.INFO))
log.info(toString() + ": Selecting peers explicitly: " + rv);
return rv;
}
}