DatabaseLookupMessageHandler:
 added stat - netDb.lookupsReceived
 fixed formatting
HandleDatabaseLookupMessageJob:
 added stat - netDb.lookupsHandled
 added stat - netDb.lookupsMatched
 fixed formatting
HandleDatabaseStoreMessageJob:
 added stat - netDb.storeHandled
 fixed formatting
StoreJob:
 added stat - netDb.storeSent
 fixed formatting
 removed old unused code (we do dbStore through tunnels, not garlics)
 logging
SearchJob:
 fixed formatting
 logging
HandleTunnelCreateMessageJob:
 fixed formatting
 logging
PoolingTunnelManagerFacade:
 added stat - tunnel.participatingTunnels
 fixed formatting
 logging
TunnelPool:
 added getParticipatingTunnelCount()
 fixed formatting
 logging
StatisticsManager:
 revamped what's published
 fixed formatting
 logging
Author: jrandom
Date: 2004-04-16 23:52:11 +00:00
Committed by: zzz
Parent: 58c145ba08
Commit: 86759d2f9c
9 changed files with 1531 additions and 1568 deletions
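All of the new counters below follow the same two-step StatManager pattern: register the rate once in a static initializer, then record an event wherever it happens. A minimal sketch of that pattern (the StatManager calls and the netDb.lookupsReceived stat are copied from the diffs below; the surrounding handler class is illustrative):

    import net.i2p.stat.StatManager;

    public class ExampleHandler {
        static {
            // registered once, when the class loads, with 5 minute / 1 hour / 1 day periods
            StatManager.getInstance().createRateStat("netDb.lookupsReceived",
                "How many netDb lookups have we received?", "Network Database",
                new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
        }

        public void onLookup() {
            // one event with value 1 and no event duration
            StatManager.getInstance().addRateData("netDb.lookupsReceived", 1, 0);
        }
    }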


@@ -45,16 +45,24 @@ public class StatisticsManager implements Service {
String val = Router.getInstance().getConfigSetting(PROP_PUBLISH_RANKINGS);
try {
if (val == null) {
_log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS + " not set - using default " + DEFAULT_PROP_PUBLISH_RANKINGS);
if (_log.shouldLog(Log.INFO))
_log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS
+ " not set - using default " + DEFAULT_PROP_PUBLISH_RANKINGS);
val = DEFAULT_PROP_PUBLISH_RANKINGS;
} else {
_log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS + " set to " + val);
if (_log.shouldLog(Log.INFO))
_log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS
+ " set to " + val);
}
boolean v = Boolean.TRUE.toString().equalsIgnoreCase(val);
_includePeerRankings = v;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting includePeerRankings = " + v);
} catch (Throwable t) {
_log.error("Error determining whether to publish rankings [" + PROP_PUBLISH_RANKINGS + "=" + val + "], so we're defaulting to FALSE");
if (_log.shouldLog(Log.ERROR))
_log.error("Error determining whether to publish rankings ["
+ PROP_PUBLISH_RANKINGS + "=" + val
+ "], so we're defaulting to FALSE");
_includePeerRankings = false;
}
val = Router.getInstance().getConfigSetting(PROP_MAX_PUBLISHED_PEERS);
@@ -65,7 +73,9 @@ public class StatisticsManager implements Service {
int num = Integer.parseInt(val);
_publishedStats = num;
} catch (NumberFormatException nfe) {
_log.error("Invalid max number of peers to publish [" + val + "], defaulting to " + DEFAULT_MAX_PUBLISHED_PEERS, nfe);
if (_log.shouldLog(Log.ERROR))
_log.error("Invalid max number of peers to publish [" + val
+ "], defaulting to " + DEFAULT_MAX_PUBLISHED_PEERS, nfe);
_publishedStats = DEFAULT_MAX_PUBLISHED_PEERS;
}
}
@@ -82,13 +92,22 @@ public class StatisticsManager implements Service {
if (_includePeerRankings) {
stats.putAll(ProfileManager.getInstance().summarizePeers(_publishedStats));
includeRate("transport.sendProcessingTime", stats);
includeRate("tcp.queueSize", stats);
includeRate("jobQueue.jobLag", stats);
includeRate("jobQueue.jobRun", stats);
includeRate("crypto.elGamal.encrypt", stats);
includeRate("jobQueue.readyJobs", stats);
includeRate("jobQueue.droppedJobs", stats);
includeRate("transport.sendProcessingTime", stats, new long[] { 60*1000, 60*60*1000 });
//includeRate("tcp.queueSize", stats);
includeRate("jobQueue.jobLag", stats, new long[] { 60*1000, 60*60*1000 });
includeRate("jobQueue.jobRun", stats, new long[] { 60*1000, 60*60*1000 });
includeRate("crypto.elGamal.encrypt", stats, new long[] { 60*1000, 60*60*1000 });
includeRate("jobQueue.readyJobs", stats, new long[] { 60*1000, 60*60*1000 });
includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 });
includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 });
includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.storeSent", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 });
includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
stats.setProperty("stat_uptime", DataHelper.formatDuration(Router.getInstance().getUptime()));
stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]");
_log.debug("Publishing peer rankings");
@@ -102,10 +121,25 @@ public class StatisticsManager implements Service {
}
private void includeRate(String rateName, Properties stats) {
includeRate(rateName, stats, null);
}
private void includeRate(String rateName, Properties stats, long selectedPeriods[]) {
RateStat rate = StatManager.getInstance().getRate(rateName);
if (rate == null) return;
for (int i = 0; i < rate.getPeriods().length; i++) {
Rate curRate = rate.getRate(rate.getPeriods()[i]);
long periods[] = rate.getPeriods();
for (int i = 0; i < periods.length; i++) {
if (selectedPeriods != null) {
boolean found = false;
for (int j = 0; j < selectedPeriods.length; j++) {
if (selectedPeriods[j] == periods[i]) {
found = true;
break;
}
}
if (!found) continue;
}
Rate curRate = rate.getRate(periods[i]);
if (curRate == null) continue;
stats.setProperty("stat_" + rateName + '.' + getPeriod(curRate), renderRate(curRate));
}
@@ -142,6 +176,5 @@ public class StatisticsManager implements Service {
private final static DecimalFormat _pct = new DecimalFormat("#0.00%", new DecimalFormatSymbols(Locale.UK));
private final static String pct(double num) { synchronized (_pct) { return _pct.format(num); } }
public String renderStatusHTML() { return ""; }
}
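The net effect of the new selectedPeriods filter above is that each published stat becomes one flat property per matching period, keyed stat_<rateName>.<period>. Illustrative output, assuming getPeriod() renders periods as short strings like 60s and 60m (the key layout comes from the setProperty calls in the diff; the values here are made up):

    stat_jobQueue.jobLag.60s = <rendered rate>
    stat_jobQueue.jobLag.60m = <rendered rate>
    stat_uptime = 2h
    stat__rateKey = avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]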


@@ -15,13 +15,19 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.SourceRouteBlock;
import net.i2p.router.HandlerJobBuilder;
import net.i2p.router.Job;
import net.i2p.stat.StatManager;
/**
* Build a HandleDatabaseLookupMessageJob whenever a DatabaseLookupMessage arrives
*
*/
public class DatabaseLookupMessageHandler implements HandlerJobBuilder {
static {
StatManager.getInstance().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) {
StatManager.getInstance().addRateData("netDb.lookupsReceived", 1, 0);
// ignore the reply block for the moment
return new HandleDatabaseLookupMessageJob((DatabaseLookupMessage)receivedMessage, from, fromHash);
}


@@ -37,6 +37,7 @@ import net.i2p.router.message.SendMessageDirectJob;
import net.i2p.router.message.SendTunnelMessageJob;
import net.i2p.util.Clock;
import net.i2p.util.Log;
import net.i2p.stat.StatManager;
/**
* Handle a lookup for a key received from a remote peer. Needs to be implemented
@@ -52,6 +53,11 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
private final static int REPLY_TIMEOUT = 60*1000;
private final static int MESSAGE_PRIORITY = 300;
static {
StatManager.getInstance().createRateStat("netDb.lookupsHandled", "How many netDb lookups have we handled?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
StatManager.getInstance().createRateStat("netDb.lookupsMatched", "How many netDb lookups did we have the data for?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
public HandleDatabaseLookupMessageJob(DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) {
_message = receivedMessage;
_from = from;
@@ -66,7 +72,8 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
if (_message.getReplyTunnel() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("dbLookup received with replies going to " + fromKey + " (tunnel " + _message.getReplyTunnel() + ")");
_log.info("dbLookup received with replies going to " + fromKey
+ " (tunnel " + _message.getReplyTunnel() + ")");
}
NetworkDatabaseFacade.getInstance().store(fromKey, _message.getFrom());
@@ -75,20 +82,24 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
if (ls != null) {
// send that lease set to the _message.getFromHash peer
if (_log.shouldLog(Log.DEBUG))
_log.debug("We do have key " + _message.getSearchKey().toBase64() + " locally as a lease set. sending to " + fromKey.toBase64());
_log.debug("We do have key " + _message.getSearchKey().toBase64()
+ " locally as a lease set. sending to " + fromKey.toBase64());
sendData(_message.getSearchKey(), ls, fromKey, _message.getReplyTunnel());
} else {
RouterInfo info = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(_message.getSearchKey());
if (info != null) {
// send that routerInfo to the _message.getFromHash peer
if (_log.shouldLog(Log.DEBUG))
_log.debug("We do have key " + _message.getSearchKey().toBase64() + " locally as a router info. sending to " + fromKey.toBase64());
_log.debug("We do have key " + _message.getSearchKey().toBase64()
+ " locally as a router info. sending to " + fromKey.toBase64());
sendData(_message.getSearchKey(), info, fromKey, _message.getReplyTunnel());
} else {
// not found locally - return closest peer routerInfo structs
Set routerInfoSet = NetworkDatabaseFacade.getInstance().findNearestRouters(_message.getSearchKey(), MAX_ROUTERS_RETURNED, _message.getDontIncludePeers());
Set routerInfoSet = NetworkDatabaseFacade.getInstance().findNearestRouters(_message.getSearchKey(),
MAX_ROUTERS_RETURNED, _message.getDontIncludePeers());
if (_log.shouldLog(Log.DEBUG))
_log.debug("We do not have key " + _message.getSearchKey().toBase64() + " locally. sending back " + routerInfoSet.size() + " peers to " + fromKey.toBase64());
_log.debug("We do not have key " + _message.getSearchKey().toBase64() +
" locally. sending back " + routerInfoSet.size() + " peers to " + fromKey.toBase64());
sendClosest(_message.getSearchKey(), routerInfoSet, fromKey, _message.getReplyTunnel());
}
}
@@ -96,7 +107,9 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
private void sendData(Hash key, DataStructure data, Hash toPeer, TunnelId replyTunnel) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending data matching key key " + key.toBase64() + " to peer " + toPeer.toBase64() + " tunnel " + replyTunnel);
_log.debug("Sending data matching key key " + key.toBase64() + " to peer " + toPeer.toBase64()
+ " tunnel " + replyTunnel);
StatManager.getInstance().addRateData("netDb.lookupsMatched", 1, 0);
DatabaseStoreMessage msg = new DatabaseStoreMessage();
msg.setKey(key);
if (data instanceof LeaseSet) {
@@ -111,7 +124,8 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
private void sendClosest(Hash key, Set routerInfoSet, Hash toPeer, TunnelId replyTunnel) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending closest routers to key " + key.toBase64() + ": # peers = " + routerInfoSet.size() + " tunnel " + replyTunnel);
_log.debug("Sending closest routers to key " + key.toBase64() + ": # peers = "
+ routerInfoSet.size() + " tunnel " + replyTunnel);
DatabaseSearchReplyMessage msg = new DatabaseSearchReplyMessage();
msg.setFromHash(Router.getInstance().getRouterInfo().getIdentity().getHash());
msg.setSearchKey(key);
@@ -124,6 +138,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
}
private void sendMessage(I2NPMessage message, Hash toPeer, TunnelId replyTunnel) {
StatManager.getInstance().addRateData("netDb.lookupsHandled", 1, 0);
Job send = null;
if (replyTunnel != null) {
sendThroughTunnel(message, toPeer, replyTunnel);
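Read together, the three new lookup counters nest (flow reconstructed from the two files above): netDb.lookupsReceived is bumped once per DatabaseLookupMessage in DatabaseLookupMessageHandler.createJob(); netDb.lookupsHandled is bumped once per reply in sendMessage(), which both sendData() and sendClosest() funnel through; and netDb.lookupsMatched is bumped in sendData() only when we held the key locally. So lookupsMatched <= lookupsHandled <= lookupsReceived, with the gap between the last two being lookups that never produced a reply.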


@@ -18,6 +18,7 @@ import net.i2p.router.MessageHistory;
import net.i2p.router.NetworkDatabaseFacade;
import net.i2p.router.ProfileManager;
import net.i2p.util.Log;
import net.i2p.stat.StatManager;
/**
* Receive DatabaseStoreMessage data and store it in the local net db
@@ -29,6 +30,10 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
private RouterIdentity _from;
private Hash _fromHash;
static {
StatManager.getInstance().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
public HandleDatabaseStoreMessageJob(DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash) {
_message = receivedMessage;
_from = from;
@@ -40,21 +45,26 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
_log.debug("Handling database store message");
boolean wasNew = false;
if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
wasNew = (null == NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getLeaseSet()));
else if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
Object match = NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getLeaseSet());
wasNew = (null == match);
} else if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
if (_log.shouldLog(Log.INFO))
_log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of " + new Date(_message.getRouterInfo().getPublished()));
wasNew = (null == NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getRouterInfo()));
_log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of "
+ new Date(_message.getRouterInfo().getPublished()));
Object match = NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getRouterInfo());
wasNew = (null == match);
ProfileManager.getInstance().heardAbout(_message.getKey());
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType() + ": " + _message);
_log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType()
+ ": " + _message);
}
if (_from != null)
_fromHash = _from.getHash();
if (_fromHash != null)
ProfileManager.getInstance().dbStoreReceived(_fromHash, wasNew);
StatManager.getInstance().addRateData("netDb.storeHandled", 1, 0);
}
public String getName() { return "Handle Database Store Message"; }
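The wasNew refactor also makes the store contract easier to see: the facade's store() hands back whatever was previously filed under the key, so a null return marks a first sighting, and that flag is what feeds the peer's profile. Reduced to its core (calls as in the hunk above):

    Object match = NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getRouterInfo());
    boolean wasNew = (null == match);  // nothing stored under that key before => new entry
    ProfileManager.getInstance().dbStoreReceived(_fromHash, wasNew);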


@@ -100,6 +100,7 @@ class SearchJob extends JobImpl {
_log.info(getJobId() + ": Already completed");
return;
}
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Searching: " + _state);
if (isLocal()) {
if (_log.shouldLog(Log.INFO))
@@ -143,7 +144,9 @@ class SearchJob extends JobImpl {
if (toCheck <= 0) {
// too many already pending
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Too many searches already pending (pending: " + _state.getPending().size() + " max: " + SEARCH_BREDTH + ")", new Exception("too many pending"));
_log.warn(getJobId() + ": Too many searches already pending (pending: "
+ _state.getPending().size() + " max: " + SEARCH_BREDTH + ")",
new Exception("too many pending"));
requeuePending();
return;
}
@@ -152,12 +155,17 @@ class SearchJob extends JobImpl {
if (_state.getPending().size() <= 0) {
// we tried to find some peers, but there weren't any and no one else is going to answer
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": No peers left, and none pending! Already searched: " + _state.getAttempted().size() + " failed: " + _state.getFailed().size(), new Exception("none left"));
_log.warn(getJobId() + ": No peers left, and none pending! Already searched: "
+ _state.getAttempted().size() + " failed: " + _state.getFailed().size(),
new Exception("none left"));
fail();
} else {
// no more to try, but we might get data or close peers from some outstanding requests
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": No peers left, but some are pending! Pending: " + _state.getPending().size() + " attempted: " + _state.getAttempted().size() + " failed: " + _state.getFailed().size(), new Exception("none left, but pending"));
_log.warn(getJobId() + ": No peers left, but some are pending! Pending: "
+ _state.getPending().size() + " attempted: " + _state.getAttempted().size()
+ " failed: " + _state.getFailed().size(),
new Exception("none left, but pending"));
requeuePending();
return;
}
@@ -168,7 +176,8 @@ class SearchJob extends JobImpl {
DataStructure ds = _facade.getDataStore().get(peer);
if ( (ds == null) || !(ds instanceof RouterInfo) ) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
_log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! "
+ peer + " : " + ds);
} else {
sendSearch((RouterInfo)ds);
}
@@ -256,12 +265,17 @@ class SearchJob extends JobImpl {
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies through [" + msg.getFrom().getIdentity().getHash().toBase64() + "] via tunnel [" + msg.getReplyTunnel() + "]");
_log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64()
+ " for " + msg.getSearchKey().toBase64() + " w/ replies through ["
+ msg.getFrom().getIdentity().getHash().toBase64() + "] via tunnel ["
+ msg.getReplyTunnel() + "]");
SearchMessageSelector sel = new SearchMessageSelector(router, _expiration, _state);
long timeoutMs = PER_PEER_TIMEOUT; // getTimeoutMs();
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(router, _state, _facade, this);
SendTunnelMessageJob j = new SendTunnelMessageJob(msg, outTunnelId, router.getIdentity().getHash(), null, null, reply, new FailedJob(router), sel, timeoutMs, SEARCH_PRIORITY);
SendTunnelMessageJob j = new SendTunnelMessageJob(msg, outTunnelId, router.getIdentity().getHash(),
null, null, reply, new FailedJob(router), sel,
timeoutMs, SEARCH_PRIORITY);
JobQueue.getInstance().addJob(j);
}
@@ -272,11 +286,14 @@ class SearchJob extends JobImpl {
DatabaseLookupMessage msg = buildMessage(expiration);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" + msg.getFrom().getIdentity().getHash().toBase64() + "]");
_log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64()
+ " for " + msg.getSearchKey().toBase64() + " w/ replies to us ["
+ msg.getFrom().getIdentity().getHash().toBase64() + "]");
SearchMessageSelector sel = new SearchMessageSelector(router, _expiration, _state);
long timeoutMs = PER_PEER_TIMEOUT;
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(msg, router.getIdentity().getHash(), reply, new FailedJob(router), sel, expiration, SEARCH_PRIORITY);
SendMessageDirectJob j = new SendMessageDirectJob(msg, router.getIdentity().getHash(),
reply, new FailedJob(router), sel, expiration, SEARCH_PRIORITY);
JobQueue.getInstance().addJob(j);
}
@@ -372,7 +389,8 @@ class SearchJob extends JobImpl {
public String getName() { return "Process Reply for Kademlia Search"; }
public void runJob() {
if (_curIndex >= _msg.getNumReplies()) {
ProfileManager.getInstance().dbLookupReply(_peer, _newPeers, _seenPeers, _invalidPeers, _duplicatePeers, _duration);
ProfileManager.getInstance().dbLookupReply(_peer, _newPeers, _seenPeers,
_invalidPeers, _duplicatePeers, _duration);
} else {
RouterInfo ri = _msg.getReply(_curIndex);
if (ri.isValid()) {
@@ -380,7 +398,9 @@ class SearchJob extends JobImpl {
_duplicatePeers++;
}
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": dbSearchReply received on search containing router " + ri.getIdentity().getHash() + " with publishDate of " + new Date(ri.getPublished()));
_log.info(getJobId() + ": dbSearchReply received on search containing router "
+ ri.getIdentity().getHash() + " with publishDate of "
+ new Date(ri.getPublished()));
_facade.store(ri.getIdentity().getHash(), ri);
if (_facade.getKBuckets().add(ri.getIdentity().getHash()))
_newPeers++;
@@ -388,7 +408,8 @@ class SearchJob extends JobImpl {
_seenPeers++;
} else {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + ri, new Exception("Invalid peer"));
_log.error(getJobId() + ": Received an invalid peer from " + _peer + ": "
+ ri, new Exception("Invalid peer"));
_invalidPeers++;
}
_curIndex++;
@@ -421,9 +442,11 @@ class SearchJob extends JobImpl {
public void runJob() {
_state.replyTimeout(_peer);
if (_penalizePeer) {
if (_log.shouldLog(Log.WARN))
_log.warn("Penalizing peer for timeout on search: " + _peer.toBase64());
ProfileManager.getInstance().dbLookupFailed(_peer);
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer.toBase64());
}
searchNext();
@@ -453,7 +476,9 @@ class SearchJob extends JobImpl {
* Search totally failed
*/
protected void fail() {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Failed search for key " + _state.getTarget());
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": State of failed search: " + _state);
if (_keepStats) {


@@ -48,6 +48,7 @@ import net.i2p.router.message.SendTunnelMessageJob;
import net.i2p.util.Clock;
import net.i2p.util.Log;
import net.i2p.util.RandomSource;
import net.i2p.stat.StatManager;
class StoreJob extends JobImpl {
private final Log _log = new Log(StoreJob.class);
@@ -72,6 +73,10 @@ class StoreJob extends JobImpl {
private final static int EXPLORATORY_REDUNDANCY = 1;
private final static int STORE_PRIORITY = 100;
static {
StatManager.getInstance().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
/**
* Create a new search for the routingKey specified
*
@@ -99,6 +104,7 @@ class StoreJob extends JobImpl {
*/
protected void sendNext() {
if (_state.completed()) {
if (_log.shouldLog(Log.INFO))
_log.info("Already completed");
return;
}
@@ -106,6 +112,7 @@ class StoreJob extends JobImpl {
_state.complete(true);
fail();
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Sending: " + _state);
continueSending();
}
@@ -138,11 +145,13 @@ class StoreJob extends JobImpl {
}
} else {
_state.addPending(closestHashes);
if (_log.shouldLog(Log.INFO))
_log.info("Continue sending key " + _state.getTarget() + " to " + closestHashes);
for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
DataStructure ds = _facade.getDataStore().get(peer);
if ( (ds == null) || !(ds instanceof RouterInfo) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
} else {
sendStore((RouterInfo)ds);
@@ -161,6 +170,7 @@ class StoreJob extends JobImpl {
*/
protected List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) {
Hash rkey = RoutingKeyGenerator.getInstance().getRoutingKey(key);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Current routing key for " + key + ": " + rkey);
return PeerSelector.getInstance().selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets());
}
@@ -183,9 +193,11 @@ class StoreJob extends JobImpl {
if (router.getIdentity().equals(Router.getInstance().getRouterInfo().getIdentity())) {
// don't send it to ourselves
if (_log.shouldLog(Log.ERROR))
_log.error("Dont send store to ourselves - why did we try?");
return;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send store to " + router.getIdentity().getHash().toBase64());
}
@@ -193,7 +205,6 @@ class StoreJob extends JobImpl {
}
protected void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
//sendStoreAsGarlic(msg, peer, expiration);
sendStoreThroughTunnel(msg, peer, expiration);
}
@@ -205,13 +216,21 @@ class StoreJob extends JobImpl {
if (outboundTunnelId != null)
info = TunnelManagerFacade.getInstance().getTunnelInfo(outboundTunnelId);
if (info == null) {
_log.error("selectOutboundTunnel didn't find a valid tunnel! outboundTunnelId = " + outboundTunnelId + " is not known by the tunnel manager");
if (_log.shouldLog(Log.ERROR))
_log.error("selectOutboundTunnel didn't find a valid tunnel! outboundTunnelId = "
+ outboundTunnelId + " is not known by the tunnel manager");
return;
}
_log.info("Store for " + _state.getTarget() + " expiring on " + new Date(_expiration) + " is going to " + peer.getIdentity().getHash() + " via outbound tunnel: " + info);
if (_log.shouldLog(Log.INFO))
_log.info("Store for " + _state.getTarget() + " expiring on " + new Date(_expiration)
+ " is going to " + peer.getIdentity().getHash() + " via outbound tunnel: " + info);
// send it out our outboundTunnelId with instructions for our endpoint to forward it
// to the router specified (though no particular tunnelId on the target)
JobQueue.getInstance().addJob(new SendTunnelMessageJob(msg, outboundTunnelId, peer.getIdentity().getHash(), null, sent, null, fail, null, _expiration-Clock.getInstance().now(), STORE_PRIORITY));
Job j = new SendTunnelMessageJob(msg, outboundTunnelId, peer.getIdentity().getHash(),
null, sent, null, fail, null, _expiration-Clock.getInstance().now(),
STORE_PRIORITY);
JobQueue.getInstance().addJob(j);
StatManager.getInstance().addRateData("netDb.storeSent", 1, 0);
}
private TunnelId selectOutboundTunnel() {
@@ -230,191 +249,10 @@ class StoreJob extends JobImpl {
}
}
/**
* Send the store to the peer by way of a garlic and route an ack back to us
*
*/
protected void sendStoreAsGarlic(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
long waitingForId = RandomSource.getInstance().nextInt(Integer.MAX_VALUE);
GarlicConfig cfg = buildGarlicConfig(msg, peer, waitingForId, expiration);
FailedJob failedJob = new FailedJob(peer);
long timeoutMs = expiration - Clock.getInstance().now();
StoreMessageSelector selector = new StoreMessageSelector(peer, waitingForId);
SessionKey sentKey = new SessionKey();
Set sentTags = new HashSet(32);
PublicKey rcptKey = cfg.getRecipientPublicKey();
if (rcptKey == null) {
if (cfg.getRecipient() == null) {
throw new IllegalArgumentException("Null recipient specified");
} else if (cfg.getRecipient().getIdentity() == null) {
throw new IllegalArgumentException("Null recipient.identity specified");
} else if (cfg.getRecipient().getIdentity().getPublicKey() == null) {
throw new IllegalArgumentException("Null recipient.identity.publicKey specified");
} else
rcptKey = cfg.getRecipient().getIdentity().getPublicKey();
}
JobQueue.getInstance().addJob(new SendGarlicJob(cfg, null, failedJob, new UpdateReplyFoundJob(peer, sentKey, sentTags, rcptKey), failedJob, timeoutMs, STORE_PRIORITY, selector, sentKey, sentTags));
}
/**
* Build a garlic containing the data store and an ack to be unwrapped at the
* target, with the data store sent locally and the ack sent back to us through
* a random tunnel as a DeliveryStatusMessage containing the ackId
*
*/
protected GarlicConfig buildGarlicConfig(I2NPMessage msg, RouterInfo target, long ackId, long expiration) {
GarlicConfig config = new GarlicConfig();
PayloadGarlicConfig dataClove = buildDataClove(msg, target, expiration);
config.addClove(dataClove);
PayloadGarlicConfig ackClove = buildAckClove(ackId, expiration);
config.addClove(ackClove);
DeliveryInstructions instructions = new DeliveryInstructions();
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_ROUTER);
instructions.setDelayRequested(false);
instructions.setDelaySeconds(0);
instructions.setEncrypted(false);
instructions.setEncryptionKey(null);
instructions.setRouter(target.getIdentity().getHash());
instructions.setTunnelId(null);
config.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
config.setDeliveryInstructions(instructions);
config.setId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE));
config.setExpiration(_expiration);
config.setRecipientPublicKey(target.getIdentity().getPublicKey());
config.setRecipient(target);
config.setRequestAck(false);
return config;
}
/**
* Build a clove that sends a DeliveryStatusMessage to us after tunneling it
* through a random inbound tunnel
*
*/
protected PayloadGarlicConfig buildAckClove(long ackId, long expiration) {
DeliveryStatusMessage ackMsg = new DeliveryStatusMessage();
ackMsg.setArrival(new Date(Clock.getInstance().now()));
ackMsg.setMessageId(ackId);
ackMsg.setMessageExpiration(new Date(expiration));
ackMsg.setUniqueId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE));
PayloadGarlicConfig ackClove = new PayloadGarlicConfig();
TunnelSelectionCriteria criteria = new TunnelSelectionCriteria();
criteria.setAnonymityPriority(80);
criteria.setLatencyPriority(20);
criteria.setReliabilityPriority(50);
criteria.setMaximumTunnelsRequired(1);
criteria.setMinimumTunnelsRequired(1);
List tunnelIds = TunnelManagerFacade.getInstance().selectInboundTunnelIds(criteria);
if (tunnelIds.size() <= 0) {
_log.error("No inbound tunnels exist for a db store ack to come through!");
return null;
}
TunnelId replyToTunnelId = (TunnelId)tunnelIds.get(0); // tunnel id on that gateway
TunnelInfo info = TunnelManagerFacade.getInstance().getTunnelInfo(replyToTunnelId);
RouterInfo replyPeer = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(info.getThisHop()); // inbound tunnel gateway
if (replyPeer == null) {
_log.error("We don't know how to reach the gateway of our own inbound tunnel?! " + info);
return null;
}
Hash replyToTunnelRouter = replyPeer.getIdentity().getHash();
DeliveryInstructions ackInstructions = new DeliveryInstructions();
ackInstructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_TUNNEL);
ackInstructions.setRouter(replyToTunnelRouter);
ackInstructions.setTunnelId(replyToTunnelId);
ackInstructions.setDelayRequested(false);
ackInstructions.setDelaySeconds(0);
ackInstructions.setEncrypted(false);
ackClove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
ackClove.setDeliveryInstructions(ackInstructions);
ackClove.setExpiration(_expiration);
ackClove.setId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE));
ackClove.setPayload(ackMsg);
ackClove.setRecipient(replyPeer);
ackClove.setRequestAck(false);
return ackClove;
}
/**
* Build a clove that sends the data to the target (which is local)
*/
protected PayloadGarlicConfig buildDataClove(I2NPMessage data, RouterInfo target, long expiration) {
PayloadGarlicConfig clove = new PayloadGarlicConfig();
DeliveryInstructions instructions = new DeliveryInstructions();
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
instructions.setRouter(target.getIdentity().getHash());
instructions.setTunnelId(null);
instructions.setDelayRequested(false);
instructions.setDelaySeconds(0);
instructions.setEncrypted(false);
clove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
clove.setDeliveryInstructions(instructions);
clove.setExpiration(expiration);
clove.setId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE));
clove.setPayload(data);
clove.setRecipientPublicKey(null);
clove.setRequestAck(false);
return clove;
}
/**
* Called after a match to a db store is found (match against a deliveryStatusMessage)
*
*/
protected class UpdateReplyFoundJob extends JobImpl implements ReplyJob {
private I2NPMessage _message;
private Hash _peer;
private SessionKey _sentKey;
private Set _sentTags;
private PublicKey _toKey;
public UpdateReplyFoundJob(RouterInfo peer, SessionKey sentKey, Set sentTags, PublicKey toKey) {
super();
_peer = peer.getIdentity().getHash();
_sentKey = sentKey;
_sentTags = sentTags;
_toKey = toKey;
}
public String getName() { return "Update Reply Found for Kademlia Store"; }
public void runJob() {
_log.info("Reply from " + _peer + " with message " + _message);
if (_message.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
long delay = _state.confirmed(_peer);
ProfileManager.getInstance().dbStoreSent(_peer, delay);
if ( (_sentKey != null) && (_sentKey.getData() != null) && (_sentTags != null) && (_sentTags.size() > 0) && (_toKey != null) ) {
SessionKeyManager.getInstance().tagsDelivered(_toKey, _sentKey, _sentTags);
_log.info("Delivered tags successfully to " + _peer + "! # tags: " + _sentTags.size());
}
if (_state.getSuccessful().size() >= REDUNDANCY) {
succeed();
} else {
sendNext();
}
} else {
_log.error("Selector matched to an UpdateReplyFoundJob with a message that isnt a DeliveryStatusMessage! " + _message);
}
}
public void setMessage(I2NPMessage message) { _message = message; }
}
/**
* Called after sending a dbStore to a peer successfully without waiting for confirm and
@@ -431,7 +269,9 @@ class StoreJob extends JobImpl {
public String getName() { return "Optimistic Kademlia Store Send Success"; }
public void runJob() {
_log.info("Optimistically marking store of " + _state.getTarget() + " to " + _peer + " successful");
if (_log.shouldLog(Log.INFO))
_log.info("Optimistically marking store of " + _state.getTarget()
+ " to " + _peer + " successful");
//long howLong = _state.confirmed(_peer);
//ProfileManager.getInstance().dbStoreSent(_peer, howLong);
@@ -480,18 +320,23 @@ class StoreJob extends JobImpl {
public boolean continueMatching() { return !_found; }
public long getExpiration() { return _expiration; }
public boolean isMatch(I2NPMessage message) {
_log.debug("isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from " + _peer + " wrt " + _state.getTarget() + "]");
if (_log.shouldLog(Log.DEBUG))
_log.debug("isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from "
+ _peer + " wrt " + _state.getTarget() + "]");
if (message instanceof DeliveryStatusMessage) {
DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
if (msg.getMessageId() == _waitingForId) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Found match for the key we're waiting for: " + _waitingForId);
_found = true;
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("DeliveryStatusMessage of a key we're not looking for");
return false;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not a DeliveryStatusMessage");
return false;
}
@@ -502,7 +347,9 @@ class StoreJob extends JobImpl {
* Send was totally successful
*/
protected void succeed() {
if (_log.shouldLog(Log.INFO))
_log.info("Succeeded sending key " + _state.getTarget());
if (_log.shouldLog(Log.DEBUG))
_log.debug("State of successful send: " + _state);
if (_onSuccess != null)
JobQueue.getInstance().addJob(_onSuccess);
@@ -513,7 +360,9 @@ class StoreJob extends JobImpl {
* Send totally failed
*/
protected void fail() {
if (_log.shouldLog(Log.INFO))
_log.info("Failed sending key " + _state.getTarget());
if (_log.shouldLog(Log.DEBUG))
_log.debug("State of failed send: " + _state, new Exception("Who failed me?"));
if (_onFailure != null)
JobQueue.getInstance().addJob(_onFailure);
@@ -675,4 +524,3 @@ class StoreJob extends JobImpl {
}
}
}
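With the garlic machinery deleted, sendStore() has exactly one path left, and the new stat is recorded once the tunnel message is queued. Condensed from the hunks above:

    protected void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
        // sendStoreAsGarlic(msg, peer, expiration) used to live here;
        // dbStores now always leave through an outbound tunnel
        sendStoreThroughTunnel(msg, peer, expiration);
    }

    // ...and sendStoreThroughTunnel() now ends with:
    JobQueue.getInstance().addJob(j);
    StatManager.getInstance().addRateData("netDb.storeSent", 1, 0);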


@@ -96,7 +96,11 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Lookup successful for tested peer " + _target.toBase64() + ", now continue with the test");
JobQueue.getInstance().addJob(new BuildTestMessageJob(info, Router.getInstance().getRouterInfo().getIdentity().getHash(), new JoinJob(_target, true), new JoinJob(_target, false), TIMEOUT, PRIORITY));
Hash peer = Router.getInstance().getRouterInfo().getIdentity().getHash();
JoinJob success = new JoinJob(_target, true);
JoinJob failure = new JoinJob(_target, false);
BuildTestMessageJob test = new BuildTestMessageJob(info, peer, success, failure, TIMEOUT, PRIORITY);
JobQueue.getInstance().addJob(test);
}
}
}
@@ -104,9 +108,12 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
private void sendReply(boolean ok) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending reply to a tunnel create of id " + _message.getTunnelId() + " with ok (" + ok + ") to router " + _message.getReplyBlock().getRouter().toBase64());
_log.debug("Sending reply to a tunnel create of id " + _message.getTunnelId()
+ " with ok (" + ok + ") to router " + _message.getReplyBlock().getRouter().toBase64());
MessageHistory.getInstance().receiveTunnelCreate(_message.getTunnelId(), _message.getNextRouter(), new Date(Clock.getInstance().now() + 1000*_message.getTunnelDurationSeconds()), ok, _message.getReplyBlock().getRouter());
MessageHistory.getInstance().receiveTunnelCreate(_message.getTunnelId(), _message.getNextRouter(),
new Date(Clock.getInstance().now() + 1000*_message.getTunnelDurationSeconds()),
ok, _message.getReplyBlock().getRouter());
TunnelCreateStatusMessage msg = new TunnelCreateStatusMessage();
msg.setFromHash(Router.getInstance().getRouterInfo().getIdentity().getHash());


@@ -30,6 +30,7 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade {
static {
StatManager.getInstance().createFrequencyStat("tunnel.acceptRequestFrequency", "How often do we accept requests to join a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
StatManager.getInstance().createFrequencyStat("tunnel.rejectRequestFrequency", "How often do we reject requests to join a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
StatManager.getInstance().createRateStat("tunnel.participatingTunnels", "How many tunnels are we participating in?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
public PoolingTunnelManagerFacade() {
@@ -57,33 +58,41 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade {
*/
public boolean joinTunnel(TunnelInfo info) {
if (info == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Null tunnel", new Exception("Null tunnel"));
StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency");
return false;
}
if (info.getSettings() == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Null settings!", new Exception("settings are null"));
StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency");
return false;
}
if (info.getSettings().getExpiration() == 0) {
_log.info("No expiration for tunnel " + info.getTunnelId().getTunnelId(), new Exception("No expiration"));
if (_log.shouldLog(Log.INFO))
_log.info("No expiration for tunnel " + info.getTunnelId().getTunnelId(),
new Exception("No expiration"));
StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency");
return false;
} else {
if (info.getSettings().getExpiration() < Clock.getInstance().now()) {
_log.warn("Already expired - " + new Date(info.getSettings().getExpiration()), new Exception("Already expired"));
if (_log.shouldLog(Log.WARN))
_log.warn("Already expired - " + new Date(info.getSettings().getExpiration()),
new Exception("Already expired"));
StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency");
return false;
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Joining tunnel: " + info);
boolean ok = _pool.addParticipatingTunnel(info);
if (!ok)
StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency");
else
StatManager.getInstance().updateFrequency("tunnel.acceptRequestFrequency");
StatManager.getInstance().addRateData("tunnel.participatingTunnels", _pool.getParticipatingTunnelCount(), 0);
return ok;
}
/**
@@ -132,7 +141,8 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade {
TunnelId id = (TunnelId)iter.next();
TunnelInfo info = (TunnelInfo)_pool.getTunnelInfo(id);
if (isParticipant(info, peer)) {
_log.info("Peer " + peer.toBase64() + " failed and they participate in tunnel " + id.getTunnelId() + ". Marking the tunnel as not ready!");
_log.info("Peer " + peer.toBase64() + " failed and they participate in tunnel "
+ id.getTunnelId() + ". Marking the tunnel as not ready!");
info.setIsReady(false);
numFailed++;
@@ -141,6 +151,7 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade {
}
}
if (_log.shouldLog(Log.INFO))
_log.info("On peer " + peer.toBase64() + " failure, " + numFailed + " tunnels were killed");
}


@@ -48,8 +48,8 @@ class TunnelPool {
/** active or has it been shutdown? */
private boolean _isLive;
/** write out the current state every 15 seconds */
private final static long WRITE_POOL_DELAY = 15*1000;
/** write out the current state every 60 seconds */
private final static long WRITE_POOL_DELAY = 60*1000;
/** allow the tunnel create timeout to be overridden, default is 60 seconds [but really slow computers should be larger] */
public final static String TUNNEL_CREATION_TIMEOUT_PARAM = "tunnel.creationTimeoutMs";
@@ -146,11 +146,13 @@ class TunnelPool {
if (!_isLive) return false;
ClientTunnelPool pool = getClientPool(dest);
if (pool == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error allocating tunnel " + id + " to " + dest + ": no pool for the client known");
return false;
}
TunnelInfo tunnel = removeFreeTunnel(id);
if (tunnel == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error allocating tunnel " + id + " to " + dest + ": tunnel is no longer free?");
return false;
}
@@ -265,6 +267,12 @@ class TunnelPool {
return new HashSet(_participatingTunnels.keySet());
}
}
public int getParticipatingTunnelCount() {
if (!_isLive) return 0;
synchronized (_participatingTunnels) {
return _participatingTunnels.size();
}
}
public TunnelInfo getParticipatingTunnel(TunnelId id) {
if (!_isLive) return null;
synchronized (_participatingTunnels) {
@@ -495,10 +503,10 @@ class TunnelPool {
public void startup() {
if (_log.shouldLog(Log.INFO)) _log.info("Starting up tunnel pool");
_isLive = true;
_outboundTunnels = new HashMap(8);
_freeInboundTunnels = new HashMap(8);
_outboundTunnels = new HashMap(16);
_freeInboundTunnels = new HashMap(16);
_clientPools = new HashMap(8);
_participatingTunnels = new HashMap(8);
_participatingTunnels = new HashMap(64);
_pendingTunnels = new HashMap(8);
_poolSettings = createPoolSettings();
_persistenceHelper.loadPool(this);
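End to end, the new tunnel stat ties three of the files above together: TunnelPool.getParticipatingTunnelCount() takes a thread-safe snapshot of _participatingTunnels, PoolingTunnelManagerFacade.joinTunnel() samples it into the tunnel.participatingTunnels rate on every accept or reject, and StatisticsManager publishes the 5 minute and 1 hour averages. All three calls appear verbatim in the diffs above:

    // TunnelPool: how many tunnels are we routing for right now?
    int count = _pool.getParticipatingTunnelCount();

    // PoolingTunnelManagerFacade.joinTunnel(): sample on every join attempt
    StatManager.getInstance().addRateData("tunnel.participatingTunnels",
                                          _pool.getParticipatingTunnelCount(), 0);

    // StatisticsManager: publish the 5m and 1h averages
    includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 });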