* made dbStore use a pessimistic algorithm - requiring confirmation of a store, rather than optimistically considering all store messages successful (NOT BACKWARDS COMPATIBLE)

* when allocating tunnels for a client, make sure it has a good amount of time left in it (using default values, this means at least 7.5 minutes)
* allow overriding the profile organizer's thresholds so as to enforce a minimum number of fast and reliable peers, allowing a base level of tunnel diversification.  this is done through the "profileOrganizer.minFastPeers" router.config / context property (default minimum = 4 fast and reliable peers)
* don't be so harsh with the isFailing calculator regarding db lookup responses, since we've decreased the timeout.  however, include "participated in a failed tunnel" as part of the criteria
* more logging than god
* for dropped messages, if it is a DeliveryStatusMessage its not an error, its just lag / congestion (keep the average delay as the new stat "inNetPool.droppedDeliveryStatusDelay")
This commit is contained in:
jrandom
2004-05-20 11:06:25 +00:00
committed by zzz
parent bfd59e64ea
commit f2fa2038b1
10 changed files with 575 additions and 284 deletions

View File

@ -19,6 +19,7 @@ import net.i2p.data.DataHelper;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.LeaseSet; import net.i2p.data.LeaseSet;
import net.i2p.data.RouterInfo; import net.i2p.data.RouterInfo;
import net.i2p.data.TunnelId;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -34,6 +35,9 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
private int _type; private int _type;
private LeaseSet _leaseSet; private LeaseSet _leaseSet;
private RouterInfo _info; private RouterInfo _info;
private long _replyToken;
private TunnelId _replyTunnel;
private Hash _replyGateway;
public final static int KEY_TYPE_ROUTERINFO = 0; public final static int KEY_TYPE_ROUTERINFO = 0;
public final static int KEY_TYPE_LEASESET = 1; public final static int KEY_TYPE_LEASESET = 1;
@ -44,6 +48,9 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
setKey(null); setKey(null);
setLeaseSet(null); setLeaseSet(null);
setRouterInfo(null); setRouterInfo(null);
setReplyToken(0);
setReplyTunnel(null);
setReplyGateway(null);
} }
/** /**
@ -83,6 +90,22 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
public int getValueType() { return _type; } public int getValueType() { return _type; }
public void setValueType(int type) { _type = type; } public void setValueType(int type) { _type = type; }
/**
* If a reply is desired, this token specifies the message ID that should
* be used for a DeliveryStatusMessage to be sent to the reply tunnel on the
* reply gateway.
*
* @return positive reply token ID, or 0 if no reply is necessary.
*/
public long getReplyToken() { return _replyToken; }
public void setReplyToken(long token) { _replyToken = token; }
public TunnelId getReplyTunnel() { return _replyTunnel; }
public void setReplyTunnel(TunnelId id) { _replyTunnel = id; }
public Hash getReplyGateway() { return _replyGateway; }
public void setReplyGateway(Hash peer) { _replyGateway = peer; }
public void readMessage(InputStream in, int type) throws I2NPMessageException, IOException { public void readMessage(InputStream in, int type) throws I2NPMessageException, IOException {
if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message"); if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message");
try { try {
@ -91,6 +114,16 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hash read: " + _key.toBase64()); _log.debug("Hash read: " + _key.toBase64());
_type = (int)DataHelper.readLong(in, 1); _type = (int)DataHelper.readLong(in, 1);
_replyToken = DataHelper.readLong(in, 4);
if (_replyToken > 0) {
_replyTunnel = new TunnelId();
_replyTunnel.readBytes(in);
_replyGateway = new Hash();
_replyGateway.readBytes(in);
} else {
_replyTunnel = null;
_replyGateway = null;
}
if (_type == KEY_TYPE_LEASESET) { if (_type == KEY_TYPE_LEASESET) {
_leaseSet = new LeaseSet(); _leaseSet = new LeaseSet();
_leaseSet.readBytes(in); _leaseSet.readBytes(in);
@ -121,6 +154,13 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
try { try {
_key.writeBytes(os); _key.writeBytes(os);
DataHelper.writeLong(os, 1, _type); DataHelper.writeLong(os, 1, _type);
DataHelper.writeLong(os, 4, _replyToken);
if (_replyToken > 0) {
_replyTunnel.writeBytes(os);
_replyGateway.writeBytes(os);
} else {
// noop
}
if (_type == KEY_TYPE_LEASESET) { if (_type == KEY_TYPE_LEASESET) {
_leaseSet.writeBytes(os); _leaseSet.writeBytes(os);
} else if (_type == KEY_TYPE_ROUTERINFO) { } else if (_type == KEY_TYPE_ROUTERINFO) {
@ -143,7 +183,10 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
return DataHelper.hashCode(getKey()) + return DataHelper.hashCode(getKey()) +
DataHelper.hashCode(getLeaseSet()) + DataHelper.hashCode(getLeaseSet()) +
DataHelper.hashCode(getRouterInfo()) + DataHelper.hashCode(getRouterInfo()) +
getValueType(); getValueType() +
(int)getReplyToken() +
DataHelper.hashCode(getReplyTunnel()) +
DataHelper.hashCode(getReplyGateway());
} }
public boolean equals(Object object) { public boolean equals(Object object) {
@ -152,7 +195,10 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
return DataHelper.eq(getKey(),msg.getKey()) && return DataHelper.eq(getKey(),msg.getKey()) &&
DataHelper.eq(getLeaseSet(),msg.getLeaseSet()) && DataHelper.eq(getLeaseSet(),msg.getLeaseSet()) &&
DataHelper.eq(getRouterInfo(),msg.getRouterInfo()) && DataHelper.eq(getRouterInfo(),msg.getRouterInfo()) &&
DataHelper.eq(getValueType(),msg.getValueType()); DataHelper.eq(getValueType(),msg.getValueType()) &&
getReplyToken() == msg.getReplyToken() &&
DataHelper.eq(getReplyTunnel(), msg.getReplyTunnel()) &&
DataHelper.eq(getReplyGateway(), msg.getReplyGateway());
} else { } else {
return false; return false;
} }
@ -167,6 +213,9 @@ public class DatabaseStoreMessage extends I2NPMessageImpl {
buf.append("\n\tValue Type: ").append(getValueType()); buf.append("\n\tValue Type: ").append(getValueType());
buf.append("\n\tRouter Info: ").append(getRouterInfo()); buf.append("\n\tRouter Info: ").append(getRouterInfo());
buf.append("\n\tLease Set: ").append(getLeaseSet()); buf.append("\n\tLease Set: ").append(getLeaseSet());
buf.append("\n\tReply token: ").append(getReplyToken());
buf.append("\n\tReply tunnel: ").append(getReplyTunnel());
buf.append("\n\tReply gateway: ").append(getReplyGateway());
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();
} }

View File

@ -14,6 +14,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -34,6 +35,7 @@ public class InNetMessagePool {
_handlerJobBuilders = new HashMap(); _handlerJobBuilders = new HashMap();
_log = _context.logManager().getLog(InNetMessagePool.class); _log = _context.logManager().getLog(InNetMessagePool.class);
_context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
} }
@ -119,10 +121,18 @@ public class InNetMessagePool {
if (size == -1) { if (size == -1) {
// was not handled via HandlerJobBuilder // was not handled via HandlerJobBuilder
_context.messageHistory().droppedOtherMessage(msg.getMessage()); _context.messageHistory().droppedOtherMessage(msg.getMessage());
if (msg.getMessage().getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
long timeSinceSent = _context.clock().now() -
((DeliveryStatusMessage)msg.getMessage()).getArrival().getTime();
if (_log.shouldLog(Log.INFO))
_log.info("Dropping unhandled delivery status message created " + timeSinceSent + "ms ago: " + msg);
_context.statManager().addRateData("inNetPool.droppedDeliveryStatusDelay", timeSinceSent, timeSinceSent);
} else {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Message " + msg.getMessage() + " was not handled by a HandlerJobBuilder - DROPPING: " _log.error("Message " + msg.getMessage() + " was not handled by a HandlerJobBuilder - DROPPING: "
+ msg, new Exception("DROPPED MESSAGE")); + msg, new Exception("DROPPED MESSAGE"));
_context.statManager().addRateData("inNetPool.dropped", 1, 0); _context.statManager().addRateData("inNetPool.dropped", 1, 0);
}
} else { } else {
String mtype = msg.getMessage().getClass().getName(); String mtype = msg.getMessage().getClass().getName();
_context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(), _context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(),

View File

@ -9,12 +9,20 @@ package net.i2p.router.networkdb;
*/ */
import java.util.Date; import java.util.Date;
import java.util.List;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity; import net.i2p.data.RouterIdentity;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.MessageHistory;
import net.i2p.router.NetworkDatabaseFacade;
import net.i2p.router.ProfileManager;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelSelectionCriteria;
import net.i2p.router.message.SendTunnelMessageJob;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -27,6 +35,9 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
private RouterIdentity _from; private RouterIdentity _from;
private Hash _fromHash; private Hash _fromHash;
private static final long ACK_TIMEOUT = 15*1000;
private static final int ACK_PRIORITY = 100;
public HandleDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash) { public HandleDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash) {
super(ctx); super(ctx);
_log = ctx.logManager().getLog(HandleDatabaseStoreMessageJob.class); _log = ctx.logManager().getLog(HandleDatabaseStoreMessageJob.class);
@ -56,6 +67,10 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
_log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType() _log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType()
+ ": " + _message); + ": " + _message);
} }
if (_message.getReplyToken() > 0)
sendAck();
if (_from != null) if (_from != null)
_fromHash = _from.getHash(); _fromHash = _from.getHash();
if (_fromHash != null) if (_fromHash != null)
@ -63,6 +78,32 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
_context.statManager().addRateData("netDb.storeHandled", 1, 0); _context.statManager().addRateData("netDb.storeHandled", 1, 0);
} }
private void sendAck() {
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setMessageId(_message.getReplyToken());
msg.setArrival(new Date(_context.clock().now()));
TunnelId outTunnelId = selectOutboundTunnel();
_context.jobQueue().addJob(new SendTunnelMessageJob(_context, msg, outTunnelId,
_message.getReplyGateway(), _message.getReplyTunnel(),
null, null, null, null, ACK_TIMEOUT, ACK_PRIORITY));
}
private TunnelId selectOutboundTunnel() {
TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
criteria.setAnonymityPriority(80);
criteria.setLatencyPriority(50);
criteria.setReliabilityPriority(20);
criteria.setMaximumTunnelsRequired(1);
criteria.setMinimumTunnelsRequired(1);
List tunnelIds = _context.tunnelManager().selectOutboundTunnelIds(criteria);
if (tunnelIds.size() <= 0) {
_log.error("No outbound tunnels?!");
return null;
} else {
return (TunnelId)tunnelIds.get(0);
}
}
public String getName() { return "Handle Database Store Message"; } public String getName() { return "Handle Database Store Message"; }
public void dropped() { public void dropped() {

View File

@ -24,9 +24,11 @@ import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.DeliveryStatusMessage; import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.GarlicMessage;
import net.i2p.router.Job; import net.i2p.router.Job;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.MessageSelector; import net.i2p.router.MessageSelector;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelInfo;
import net.i2p.router.TunnelSelectionCriteria; import net.i2p.router.TunnelSelectionCriteria;
@ -57,6 +59,9 @@ class StoreJob extends JobImpl {
private final static int EXPLORATORY_REDUNDANCY = 1; private final static int EXPLORATORY_REDUNDANCY = 1;
private final static int STORE_PRIORITY = 100; private final static int STORE_PRIORITY = 100;
/** how long we allow for an ACK to take after a store */
private final static long STORE_TIMEOUT_MS = 10*1000;
/** /**
* Create a new search for the routingKey specified * Create a new search for the routingKey specified
* *
@ -75,8 +80,10 @@ class StoreJob extends JobImpl {
super(context); super(context);
_log = context.logManager().getLog(StoreJob.class); _log = context.logManager().getLog(StoreJob.class);
_context.statManager().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.storePeers", "How many peers each netDb must be sent to before success?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.ackTime", "How long does it take for a peer to ack a netDb store?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_facade = facade; _facade = facade;
_state = new StoreState(key, data, toSkip); _state = new StoreState(_context, key, data, toSkip);
_onSuccess = onSuccess; _onSuccess = onSuccess;
_onFailure = onFailure; _onFailure = onFailure;
_timeoutMs = timeoutMs; _timeoutMs = timeoutMs;
@ -89,14 +96,14 @@ class StoreJob extends JobImpl {
sendNext(); sendNext();
} }
protected boolean isExpired() { private boolean isExpired() {
return _context.clock().now() >= _expiration; return _context.clock().now() >= _expiration;
} }
/** /**
* send the key to the next batch of peers * send the key to the next batch of peers
*/ */
protected void sendNext() { private void sendNext() {
if (_state.completed()) { if (_state.completed()) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Already completed"); _log.info("Already completed");
@ -106,8 +113,8 @@ class StoreJob extends JobImpl {
_state.complete(true); _state.complete(true);
fail(); fail();
} else { } else {
if (_log.shouldLog(Log.INFO)) //if (_log.shouldLog(Log.INFO))
_log.info("Sending: " + _state); // _log.info(getJobId() + ": Sending: " + _state);
continueSending(); continueSending();
} }
} }
@ -118,11 +125,13 @@ class StoreJob extends JobImpl {
* at any time * at any time
* *
*/ */
protected void continueSending() { private void continueSending() {
if (_state.completed()) return; if (_state.completed()) return;
int toCheck = PARALLELIZATION - _state.getPending().size(); int toCheck = PARALLELIZATION - _state.getPending().size();
if (toCheck <= 0) { if (toCheck <= 0) {
// too many already pending // too many already pending
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Too many store messages pending");
return; return;
} }
if (toCheck > PARALLELIZATION) if (toCheck > PARALLELIZATION)
@ -131,22 +140,24 @@ class StoreJob extends JobImpl {
List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
if (_state.getPending().size() <= 0) { if (_state.getPending().size() <= 0) {
// we tried to find some peers, but there weren't any and no one else is going to answer if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": No more peers left and none pending");
fail(); fail();
} else { } else {
// no more to try, but we might get data or close peers from some outstanding requests if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": No more peers left but some are pending, so keep waiting");
return; return;
} }
} else { } else {
_state.addPending(closestHashes); _state.addPending(closestHashes);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Continue sending key " + _state.getTarget() + " to " + closestHashes); _log.info(getJobId() + ": Continue sending key " + _state.getTarget() + " after " + _state.getAttempted().size() + " tries to " + closestHashes);
for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next(); Hash peer = (Hash)iter.next();
DataStructure ds = _facade.getDataStore().get(peer); DataStructure ds = _facade.getDataStore().get(peer);
if ( (ds == null) || !(ds instanceof RouterInfo) ) { if ( (ds == null) || !(ds instanceof RouterInfo) ) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error selecting closest hash that wasnt a router! " + peer + " : " + ds); _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
} else { } else {
sendStore((RouterInfo)ds); sendStore((RouterInfo)ds);
} }
@ -162,10 +173,10 @@ class StoreJob extends JobImpl {
* *
* @return ordered list of Hash objects * @return ordered list of Hash objects
*/ */
protected List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) { private List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) {
Hash rkey = _context.routingKeyGenerator().getRoutingKey(key); Hash rkey = _context.routingKeyGenerator().getRoutingKey(key);
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Current routing key for " + key + ": " + rkey); // _log.debug(getJobId() + ": Current routing key for " + key + ": " + rkey);
return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets()); return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets());
} }
@ -175,7 +186,7 @@ class StoreJob extends JobImpl {
* DeliveryStatusMessage so we know it got there * DeliveryStatusMessage so we know it got there
* *
*/ */
protected void sendStore(RouterInfo router) { private void sendStore(RouterInfo router) {
DatabaseStoreMessage msg = new DatabaseStoreMessage(_context); DatabaseStoreMessage msg = new DatabaseStoreMessage(_context);
msg.setKey(_state.getTarget()); msg.setKey(_state.getTarget());
if (_state.getData() instanceof RouterInfo) if (_state.getData() instanceof RouterInfo)
@ -189,43 +200,61 @@ class StoreJob extends JobImpl {
if (router.getIdentity().equals(_context.router().getRouterInfo().getIdentity())) { if (router.getIdentity().equals(_context.router().getRouterInfo().getIdentity())) {
// don't send it to ourselves // don't send it to ourselves
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Dont send store to ourselves - why did we try?"); _log.error(getJobId() + ": Dont send store to ourselves - why did we try?");
return; return;
} else { } else {
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Send store to " + router.getIdentity().getHash().toBase64()); // _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
} }
sendStore(msg, router, _expiration); sendStore(msg, router, _expiration);
} }
protected void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
sendStoreThroughTunnel(msg, peer, expiration); _context.statManager().addRateData("netDb.storeSent", 1, 0);
sendStoreThroughGarlic(msg, peer, expiration);
} }
protected void sendStoreThroughTunnel(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { private void sendStoreThroughGarlic(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
FailedJob fail = new FailedJob(peer); long token = _context.random().nextInt(Integer.MAX_VALUE);
Job sent = new OptimisticSendSuccess(peer);
TunnelInfo info = null; TunnelId replyTunnelId = selectInboundTunnel();
TunnelId outboundTunnelId = selectOutboundTunnel(); TunnelInfo replyTunnel = _context.tunnelManager().getTunnelInfo(replyTunnelId);
if (outboundTunnelId != null) if (replyTunnel == null) {
info = _context.tunnelManager().getTunnelInfo(outboundTunnelId); _log.error("No reply inbound tunnels available!");
if (info == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("selectOutboundTunnel didn't find a valid tunnel! outboundTunnelId = "
+ outboundTunnelId + " is not known by the tunnel manager");
return; return;
} }
if (_log.shouldLog(Log.INFO)) msg.setReplyToken(token);
_log.info("Store for " + _state.getTarget() + " expiring on " + new Date(_expiration) msg.setReplyTunnel(replyTunnelId);
+ " is going to " + peer.getIdentity().getHash() + " via outbound tunnel: " + info); msg.setReplyGateway(replyTunnel.getThisHop());
// send it out our outboundTunnelId with instructions for our endpoint to forward it
// to the router specified (though no particular tunnelId on the target) if (_log.shouldLog(Log.DEBUG))
Job j = new SendTunnelMessageJob(_context, msg, outboundTunnelId, peer.getIdentity().getHash(), _log.debug(getJobId() + ": send(dbStore) w/ token expected " + token);
null, sent, null, fail, null, _expiration-_context.clock().now(),
_state.addPending(peer.getIdentity().getHash());
SendSuccessJob onReply = new SendSuccessJob(peer);
FailedJob onFail = new FailedJob(peer);
StoreMessageSelector selector = new StoreMessageSelector(_context, getJobId(), peer, token, expiration);
TunnelId outTunnelId = selectOutboundTunnel();
if (outTunnelId != null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to "
// + peer.getIdentity().getHash().toBase64());
TunnelId targetTunnelId = null; // not needed
Job onSend = null; // not wanted
SendTunnelMessageJob j = new SendTunnelMessageJob(_context, msg, outTunnelId,
peer.getIdentity().getHash(),
targetTunnelId, onSend, onReply,
onFail, selector, STORE_TIMEOUT_MS,
STORE_PRIORITY); STORE_PRIORITY);
_context.jobQueue().addJob(j); _context.jobQueue().addJob(j);
_context.statManager().addRateData("netDb.storeSent", 1, 0); } else {
if (_log.shouldLog(Log.ERROR))
_log.error("No outbound tunnels to send a dbStore out!");
fail();
}
} }
private TunnelId selectOutboundTunnel() { private TunnelId selectOutboundTunnel() {
@ -244,31 +273,43 @@ class StoreJob extends JobImpl {
} }
} }
/** private TunnelId selectInboundTunnel() {
* Called after a match to a db store is found (match against a deliveryStatusMessage) TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
* criteria.setAnonymityPriority(80);
*/ criteria.setLatencyPriority(50);
criteria.setReliabilityPriority(20);
/** criteria.setMaximumTunnelsRequired(1);
* Called after sending a dbStore to a peer successfully without waiting for confirm and criteria.setMinimumTunnelsRequired(1);
* optimistically mark the store as successful List tunnelIds = _context.tunnelManager().selectInboundTunnelIds(criteria);
* if (tunnelIds.size() <= 0) {
*/ _log.error("No inbound tunnels?!");
protected class OptimisticSendSuccess extends JobImpl { return null;
private Hash _peer; } else {
return (TunnelId)tunnelIds.get(0);
public OptimisticSendSuccess(RouterInfo peer) { }
super(StoreJob.this._context);
_peer = peer.getIdentity().getHash();
} }
public String getName() { return "Optimistic Kademlia Store Send Success"; } /**
* Called after sending a dbStore to a peer successfully,
* marking the store as successful
*
*/
private class SendSuccessJob extends JobImpl implements ReplyJob {
private RouterInfo _peer;
public SendSuccessJob(RouterInfo peer) {
super(StoreJob.this._context);
_peer = peer;
}
public String getName() { return "Kademlia Store Send Success"; }
public void runJob() { public void runJob() {
long howLong = _state.confirmed(_peer.getIdentity().getHash());
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Optimistically marking store of " + _state.getTarget() _log.info(StoreJob.this.getJobId() + ": Marking store of " + _state.getTarget()
+ " to " + _peer + " successful"); + " to " + _peer.getIdentity().getHash().toBase64() + " successful after " + howLong);
//long howLong = _state.confirmed(_peer); _context.profileManager().dbStoreSent(_peer.getIdentity().getHash(), howLong);
//ProfileManager.getInstance().dbStoreSent(_peer, howLong); _context.statManager().addRateData("netDb.ackTime", howLong, howLong);
if (_state.getSuccessful().size() >= REDUNDANCY) { if (_state.getSuccessful().size() >= REDUNDANCY) {
succeed(); succeed();
@ -276,6 +317,10 @@ class StoreJob extends JobImpl {
sendNext(); sendNext();
} }
} }
public void setMessage(I2NPMessage message) {
// ignored, since if the selector matched it, its fine by us
}
} }
/** /**
@ -283,244 +328,47 @@ class StoreJob extends JobImpl {
* reached, or if the peer could not be contacted at all. * reached, or if the peer could not be contacted at all.
* *
*/ */
protected class FailedJob extends JobImpl { private class FailedJob extends JobImpl {
private Hash _peer; private RouterInfo _peer;
public FailedJob(RouterInfo peer) { public FailedJob(RouterInfo peer) {
super(StoreJob.this._context); super(StoreJob.this._context);
_peer = peer.getIdentity().getHash(); _peer = peer;
} }
public void runJob() { public void runJob() {
_state.replyTimeout(_peer); if (_log.shouldLog(Log.WARN))
_context.profileManager().dbStoreFailed(_peer); _log.warn(StoreJob.this.getJobId() + ": Peer " + _peer.getIdentity().getHash().toBase64() + " timed out");
_state.replyTimeout(_peer.getIdentity().getHash());
_context.profileManager().dbStoreFailed(_peer.getIdentity().getHash());
sendNext(); sendNext();
} }
public String getName() { return "Kademlia Store Failed"; } public String getName() { return "Kademlia Store Failed"; }
} }
/**
* Check to see the message is a reply from the peer regarding the current
* search
*
*/
protected class StoreMessageSelector implements MessageSelector {
private Hash _peer;
private long _waitingForId;
private boolean _found;
public StoreMessageSelector(RouterInfo peer, long waitingForId) {
_peer = peer.getIdentity().getHash();
_found = false;
_waitingForId = waitingForId;
}
public boolean continueMatching() { return !_found; }
public long getExpiration() { return _expiration; }
public boolean isMatch(I2NPMessage message) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from "
+ _peer + " wrt " + _state.getTarget() + "]");
if (message instanceof DeliveryStatusMessage) {
DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
if (msg.getMessageId() == _waitingForId) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Found match for the key we're waiting for: " + _waitingForId);
_found = true;
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("DeliveryStatusMessage of a key we're not looking for");
return false;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not a DeliveryStatusMessage");
return false;
}
}
}
/** /**
* Send was totally successful * Send was totally successful
*/ */
protected void succeed() { private void succeed() {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Succeeded sending key " + _state.getTarget()); _log.info(getJobId() + ": Succeeded sending key " + _state.getTarget());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("State of successful send: " + _state); _log.debug(getJobId() + ": State of successful send: " + _state);
if (_onSuccess != null) if (_onSuccess != null)
_context.jobQueue().addJob(_onSuccess); _context.jobQueue().addJob(_onSuccess);
_facade.noteKeySent(_state.getTarget()); _facade.noteKeySent(_state.getTarget());
_context.statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
} }
/** /**
* Send totally failed * Send totally failed
*/ */
protected void fail() { private void fail() {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Failed sending key " + _state.getTarget()); _log.info(getJobId() + ": Failed sending key " + _state.getTarget());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("State of failed send: " + _state, new Exception("Who failed me?")); _log.debug(getJobId() + ": State of failed send: " + _state, new Exception("Who failed me?"));
if (_onFailure != null) if (_onFailure != null)
_context.jobQueue().addJob(_onFailure); _context.jobQueue().addJob(_onFailure);
} }
protected class StoreState {
private Hash _key;
private DataStructure _data;
private HashSet _pendingPeers;
private HashMap _pendingPeerTimes;
private HashSet _successfulPeers;
private HashSet _successfulExploratoryPeers;
private HashSet _failedPeers;
private HashSet _attemptedPeers;
private volatile long _completed;
private volatile long _started;
public StoreState(Hash key, DataStructure data) {
this(key, data, null);
}
public StoreState(Hash key, DataStructure data, Set toSkip) {
_key = key;
_data = data;
_pendingPeers = new HashSet(16);
_pendingPeerTimes = new HashMap(16);
_attemptedPeers = new HashSet(16);
if (toSkip != null)
_attemptedPeers.addAll(toSkip);
_failedPeers = new HashSet(16);
_successfulPeers = new HashSet(16);
_successfulExploratoryPeers = new HashSet(16);
_completed = -1;
_started = _context.clock().now();
}
public Hash getTarget() { return _key; }
public DataStructure getData() { return _data; }
public Set getPending() {
synchronized (_pendingPeers) {
return (Set)_pendingPeers.clone();
}
}
public Set getAttempted() {
synchronized (_attemptedPeers) {
return (Set)_attemptedPeers.clone();
}
}
public Set getSuccessful() {
synchronized (_successfulPeers) {
return (Set)_successfulPeers.clone();
}
}
public Set getSuccessfulExploratory() {
synchronized (_successfulExploratoryPeers) {
return (Set)_successfulExploratoryPeers.clone();
}
}
public Set getFailed() {
synchronized (_failedPeers) {
return (Set)_failedPeers.clone();
}
}
public boolean completed() { return _completed != -1; }
public void complete(boolean completed) {
if (completed)
_completed = _context.clock().now();
}
public long getWhenStarted() { return _started; }
public long getWhenCompleted() { return _completed; }
public void addPending(Collection pending) {
synchronized (_pendingPeers) {
_pendingPeers.addAll(pending);
for (Iterator iter = pending.iterator(); iter.hasNext(); )
_pendingPeerTimes.put(iter.next(), new Long(_context.clock().now()));
}
synchronized (_attemptedPeers) {
_attemptedPeers.addAll(pending);
}
}
public long confirmed(Hash peer) {
long rv = -1;
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
Long when = (Long)_pendingPeerTimes.remove(peer);
if (when != null)
rv = _context.clock().now() - when.longValue();
}
synchronized (_successfulPeers) {
_successfulPeers.add(peer);
}
return rv;
}
public long confirmedExploratory(Hash peer) {
long rv = -1;
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
Long when = (Long)_pendingPeerTimes.remove(peer);
if (when != null)
rv = _context.clock().now() - when.longValue();
}
synchronized (_successfulExploratoryPeers) {
_successfulExploratoryPeers.add(peer);
}
return rv;
}
public void replyTimeout(Hash peer) {
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
}
synchronized (_failedPeers) {
_failedPeers.add(peer);
}
}
public String toString() {
StringBuffer buf = new StringBuffer(256);
buf.append("Storing ").append(_key);
buf.append(" ");
if (_completed <= 0)
buf.append(" completed? false ");
else
buf.append(" completed on ").append(new Date(_completed));
buf.append(" Attempted: ");
synchronized (_attemptedPeers) {
for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Pending: ");
synchronized (_pendingPeers) {
for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Failed: ");
synchronized (_failedPeers) {
for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Successful: ");
synchronized (_successfulPeers) {
for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Successful Exploratory: ");
synchronized (_successfulExploratoryPeers) {
for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
return buf.toString();
}
}
} }

View File

@ -0,0 +1,58 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.router.RouterContext;
import net.i2p.router.MessageSelector;
import net.i2p.util.Log;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
/**
* Check to see the message is a reply from the peer regarding the current
* store
*
*/
class StoreMessageSelector implements MessageSelector {
private Log _log;
private Hash _peer;
private long _storeJobId;
private long _waitingForId;
private long _expiration;
private boolean _found;
public StoreMessageSelector(RouterContext ctx, long storeJobId, RouterInfo peer, long waitingForId,
long expiration) {
_log = ctx.logManager().getLog(StoreMessageSelector.class);
_peer = peer.getIdentity().getHash();
_storeJobId = storeJobId;
_found = false;
_waitingForId = waitingForId;
_expiration = expiration;
}
public boolean continueMatching() { return !_found; }
public long getExpiration() { return _expiration; }
public boolean isMatch(I2NPMessage message) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_storeJobId + ": isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from "
+ _peer + "]");
if (message instanceof DeliveryStatusMessage) {
DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
if (msg.getMessageId() == _waitingForId) {
if (_log.shouldLog(Log.INFO))
_log.info(_storeJobId + ": Found match for the key we're waiting for: " + _waitingForId);
_found = true;
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_storeJobId + ": DeliveryStatusMessage of a key we're not looking for");
return false;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_storeJobId + ": Not a DeliveryStatusMessage");
return false;
}
}
}

View File

@ -0,0 +1,189 @@
package net.i2p.router.networkdb.kademlia;
import java.util.Set;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;
import java.util.Date;
import net.i2p.data.Hash;
import net.i2p.data.DataStructure;
import net.i2p.router.RouterContext;
class StoreState {
private RouterContext _context;
private Hash _key;
private DataStructure _data;
private HashSet _pendingPeers;
private HashMap _pendingPeerTimes;
private HashSet _successfulPeers;
private HashSet _successfulExploratoryPeers;
private HashSet _failedPeers;
private HashSet _attemptedPeers;
private volatile long _completed;
private volatile long _started;
public StoreState(RouterContext ctx, Hash key, DataStructure data) {
this(ctx, key, data, null);
}
public StoreState(RouterContext ctx, Hash key, DataStructure data, Set toSkip) {
_context = ctx;
_key = key;
_data = data;
_pendingPeers = new HashSet(16);
_pendingPeerTimes = new HashMap(16);
_attemptedPeers = new HashSet(16);
if (toSkip != null)
_attemptedPeers.addAll(toSkip);
_failedPeers = new HashSet(16);
_successfulPeers = new HashSet(16);
_successfulExploratoryPeers = new HashSet(16);
_completed = -1;
_started = _context.clock().now();
}
public Hash getTarget() { return _key; }
public DataStructure getData() { return _data; }
public Set getPending() {
synchronized (_pendingPeers) {
return (Set)_pendingPeers.clone();
}
}
public Set getAttempted() {
synchronized (_attemptedPeers) {
return (Set)_attemptedPeers.clone();
}
}
public Set getSuccessful() {
synchronized (_successfulPeers) {
return (Set)_successfulPeers.clone();
}
}
public Set getSuccessfulExploratory() {
synchronized (_successfulExploratoryPeers) {
return (Set)_successfulExploratoryPeers.clone();
}
}
public Set getFailed() {
synchronized (_failedPeers) {
return (Set)_failedPeers.clone();
}
}
public boolean completed() { return _completed != -1; }
public void complete(boolean completed) {
if (completed)
_completed = _context.clock().now();
}
public long getWhenStarted() { return _started; }
public long getWhenCompleted() { return _completed; }
public void addPending(Hash peer) {
synchronized (_pendingPeers) {
_pendingPeers.add(peer);
_pendingPeerTimes.put(peer, new Long(_context.clock().now()));
}
synchronized (_attemptedPeers) {
_attemptedPeers.add(peer);
}
}
public void addPending(Collection pending) {
synchronized (_pendingPeers) {
_pendingPeers.addAll(pending);
for (Iterator iter = pending.iterator(); iter.hasNext(); )
_pendingPeerTimes.put(iter.next(), new Long(_context.clock().now()));
}
synchronized (_attemptedPeers) {
_attemptedPeers.addAll(pending);
}
}
public long confirmed(Hash peer) {
long rv = -1;
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
Long when = (Long)_pendingPeerTimes.remove(peer);
if (when != null)
rv = _context.clock().now() - when.longValue();
}
synchronized (_successfulPeers) {
_successfulPeers.add(peer);
}
return rv;
}
public long confirmedExploratory(Hash peer) {
long rv = -1;
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
Long when = (Long)_pendingPeerTimes.remove(peer);
if (when != null)
rv = _context.clock().now() - when.longValue();
}
synchronized (_successfulExploratoryPeers) {
_successfulExploratoryPeers.add(peer);
}
return rv;
}
public void replyTimeout(Hash peer) {
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
}
synchronized (_failedPeers) {
_failedPeers.add(peer);
}
}
public String toString() {
StringBuffer buf = new StringBuffer(256);
buf.append("Storing ").append(_key);
buf.append(" ");
if (_completed <= 0)
buf.append(" completed? false ");
else
buf.append(" completed on ").append(new Date(_completed));
buf.append(" Attempted: ");
synchronized (_attemptedPeers) {
buf.append(_attemptedPeers.size()).append(' ');
for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Pending: ");
synchronized (_pendingPeers) {
buf.append(_pendingPeers.size()).append(' ');
for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Failed: ");
synchronized (_failedPeers) {
buf.append(_failedPeers.size()).append(' ');
for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Successful: ");
synchronized (_successfulPeers) {
buf.append(_successfulPeers.size()).append(' ');
for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Successful Exploratory: ");
synchronized (_successfulExploratoryPeers) {
buf.append(_successfulExploratoryPeers.size()).append(' ');
for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
buf.append(peer.toBase64()).append(" ");
}
}
return buf.toString();
}
}

View File

@ -34,11 +34,11 @@ public class IsFailingCalculator extends Calculator {
(profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ) { (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ) {
return true; return true;
} else { } else {
if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) || //if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) ||
(profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) { // (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) {
// are they overloaded (or disconnected)? // // are they overloaded (or disconnected)?
return true; // return true;
} //}
long recently = _context.clock().now() - GRACE_PERIOD; long recently = _context.clock().now() - GRACE_PERIOD;
@ -47,6 +47,11 @@ public class IsFailingCalculator extends Calculator {
return true; return true;
} }
if (profile.getTunnelHistory().getLastFailed() >= recently) {
// has a tunnel they participate in failed in the last 5 minutes?
return true;
}
if (profile.getLastSendFailed() >= recently) if (profile.getLastSendFailed() >= recently)
return true; return true;

View File

@ -65,6 +65,14 @@ public class ProfileOrganizer {
public static final String PROP_RELIABILITY_THRESHOLD_FACTOR = "profileOrganizer.reliabilityThresholdFactor"; public static final String PROP_RELIABILITY_THRESHOLD_FACTOR = "profileOrganizer.reliabilityThresholdFactor";
public static final double DEFAULT_RELIABILITY_THRESHOLD_FACTOR = .5d; public static final double DEFAULT_RELIABILITY_THRESHOLD_FACTOR = .5d;
/**
* Defines the minimum number of 'fast' peers that the organizer should select. See
* {@see getMinimumFastPeers}
*
*/
public static final String PROP_MINIMUM_FAST_PEERS = "profileOrganizer.minFastPeers";
public static final int DEFAULT_MINIMUM_FAST_PEERS = 4;
/** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */ /** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
private Object _reorganizeLock = new Object(); private Object _reorganizeLock = new Object();
@ -323,6 +331,7 @@ public class ProfileOrganizer {
_strictReliabilityOrder = reordered; _strictReliabilityOrder = reordered;
locked_unfailAsNecessary(); locked_unfailAsNecessary();
locked_promoteFastAsNecessary();
} }
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
@ -331,6 +340,35 @@ public class ProfileOrganizer {
} }
} }
/**
* As with locked_unfailAsNecessary, I'm not sure how much I like this - if there
* aren't enough fast peers, move some of the not-so-fast peers into the fast group.
* This picks the not-so-fast peers based on reliability, not speed, and skips over any
* failing peers. Perhaps it should build a seperate strict ordering by speed? Nah, not
* worth the maintenance and memory overhead, at least not for now.
*
*/
private void locked_promoteFastAsNecessary() {
int minFastPeers = getMinimumFastPeers();
int numToPromote = minFastPeers - _fastAndReliablePeers.size();
if (numToPromote > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Need to explicitly promote " + numToPromote + " peers to the fast+reliable group");
for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) {
PeerProfile cur = (PeerProfile)iter.next();
if ( (!_fastAndReliablePeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) {
_fastAndReliablePeers.put(cur.getPeer(), cur);
// no need to remove it from any of the other groups, since if it is
// fast and reliable, it is reliable, and it is not failing
numToPromote--;
if (numToPromote <= 0)
break;
}
}
}
return;
}
/** how many not failing/active peers must we have? */ /** how many not failing/active peers must we have? */
private final static int MIN_NOT_FAILING_ACTIVE = 3; private final static int MIN_NOT_FAILING_ACTIVE = 3;
/** /**
@ -655,6 +693,7 @@ public class ProfileOrganizer {
double rv = Double.parseDouble(val); double rv = Double.parseDouble(val);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_RELIABILITY_THRESHOLD_FACTOR+ '=' + val); _log.debug("router context said " + PROP_RELIABILITY_THRESHOLD_FACTOR+ '=' + val);
return rv;
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Reliability threshold factor improperly set in the router environment [" + val + "]", nfe); _log.warn("Reliability threshold factor improperly set in the router environment [" + val + "]", nfe);
@ -666,6 +705,48 @@ public class ProfileOrganizer {
return DEFAULT_RELIABILITY_THRESHOLD_FACTOR; return DEFAULT_RELIABILITY_THRESHOLD_FACTOR;
} }
/**
* Defines the minimum number of 'fast' peers that the organizer should select. If
* the profile calculators derive a threshold that does not select at least this many peers,
* the threshold will be overridden to make sure this many peers are in the fast+reliable group.
* This parameter should help deal with a lack of diversity in the tunnels created when some
* peers are particularly fast.
*
* @return minimum number of peers to be placed in the 'fast+reliable' group
*/
private int getMinimumFastPeers() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_MINIMUM_FAST_PEERS);
if (val != null) {
try {
int rv = Integer.parseInt(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_MINIMUM_FAST_PEERS + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Minimum fast peers improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_MINIMUM_FAST_PEERS, ""+DEFAULT_MINIMUM_FAST_PEERS);
if (val != null) {
try {
int rv = Integer.parseInt(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_MINIMUM_FAST_PEERS + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Minimum fast peers improperly set in the router environment [" + val + "]", nfe);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("no config for " + PROP_MINIMUM_FAST_PEERS + ", using " + DEFAULT_MINIMUM_FAST_PEERS);
return DEFAULT_MINIMUM_FAST_PEERS;
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK)); private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } } private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
} }

View File

@ -85,7 +85,7 @@ public class VMCommSystem extends CommSystemFacade {
_from = from; _from = from;
_msg = msg; _msg = msg;
// bah, ueberspeed! // bah, ueberspeed!
//getTiming().setStartAfter(us.clock().now() + 50); getTiming().setStartAfter(us.clock().now() + 200);
} }
public void runJob() { public void runJob() {
I2NPMessageHandler handler = new I2NPMessageHandler(_ctx); I2NPMessageHandler handler = new I2NPMessageHandler(_ctx);

View File

@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Date;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
@ -96,6 +97,8 @@ class ClientTunnelPoolManagerJob extends JobImpl {
private void requestMoreTunnels(int numTunnels) { private void requestMoreTunnels(int numTunnels) {
if (_clientPool.getClientSettings().getDepthInbound() < 1) { if (_clientPool.getClientSettings().getDepthInbound() < 1) {
// the client wants 0-hop tunnels, so don't waste longer tunnels on them // the client wants 0-hop tunnels, so don't waste longer tunnels on them
if (_log.shouldLog(Log.DEBUG))
_log.debug("0 hop tunnels wanted - create custom ones");
requestCustomTunnels(numTunnels); requestCustomTunnels(numTunnels);
return; return;
} }
@ -103,6 +106,9 @@ class ClientTunnelPoolManagerJob extends JobImpl {
int allocated = allocateExisting(numTunnels); int allocated = allocateExisting(numTunnels);
if (allocated < numTunnels) { if (allocated < numTunnels) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("insufficient tunnels available (wanted: " + numTunnels
+ ", allocated: " + allocated + ", requesting custom ones");
requestCustomTunnels(numTunnels - allocated); requestCustomTunnels(numTunnels - allocated);
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -169,10 +175,14 @@ class ClientTunnelPoolManagerJob extends JobImpl {
return false; return false;
} }
long expireAfter = _context.clock().now() + POOL_CHECK_DELAY + _tunnelPool.getTunnelCreationTimeout()*2; // (furthest in the future) - (rebuild buffer time)
long expireAfter = _context.clock().now() + _tunnelPool.getPoolSettings().getInboundDuration()
- POOL_CHECK_DELAY - _tunnelPool.getTunnelCreationTimeout()*2;
if (info.getSettings().getExpiration() <= expireAfter) { if (info.getSettings().getExpiration() <= expireAfter) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Refusing tunnel " + info.getTunnelId() + " because it is going to expire soon"); _log.debug("Refusing tunnel " + info.getTunnelId() + " because it is going to expire soon ("
+ new Date(info.getSettings().getExpiration())
+ ", before " + new Date(expireAfter) + ")");
return false; return false;
} }