2005-04-05 jrandom

* After a successfull netDb search for a leaseSet, republish it to all of
      the peers we have tried so far who did not give us the key (up to 10),
      rather than the old K closest (which may include peers who had given us
      the key)
    * Don't wait 5 minutes to publish a leaseSet (duh!), and rather than
      republish it every 5 minutes, republish it every 3.  In addition, always
      republish as soon as the leaseSet changes (duh^2).
    * Minor fix for oddball startup race (thanks travis_bickle!)
    * Minor AES update to allow in-place decryption.
This commit is contained in:
jrandom
2005-04-05 16:06:14 +00:00
committed by zzz
parent 400feb3ba7
commit bc626ece2d
13 changed files with 195 additions and 40 deletions

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.178 $ $Date: 2005/04/01 08:29:26 $";
public final static String ID = "$Revision: 1.179 $ $Date: 2005/04/03 07:50:12 $";
public final static String VERSION = "0.5.0.5";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -289,8 +289,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
if (!_finished) {
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to send to " + _toString + " because we couldn't find their leaseSet");
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send to " + _toString + " because we couldn't find their leaseSet");
}
dieFatal();

View File

@ -65,12 +65,12 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
private PeerSelector _peerSelector;
private RouterContext _context;
/**
* set of Hash objects of leases we're already managing (via RepublishLeaseSetJob).
* Map of Hash to RepublishLeaseSetJob for leases we'realready managing.
* This is added to when we create a new RepublishLeaseSetJob, and the values are
* removed when the job decides to stop running.
*
*/
private Set _publishingLeaseSets;
private Map _publishingLeaseSets;
/**
* Hash of the key currently being searched for, pointing the SearchJob that
@ -126,7 +126,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_log = _context.logManager().getLog(KademliaNetworkDatabaseFacade.class);
_initialized = false;
_peerSelector = new PeerSelector(_context);
_publishingLeaseSets = new HashSet(8);
_publishingLeaseSets = new HashMap(8);
_lastExploreNew = 0;
_knownRouters = 0;
_activeRequests = new HashMap(8);
@ -440,14 +440,16 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
synchronized (_explicitSendKeys) {
_explicitSendKeys.add(h);
}
Job j = null;
RepublishLeaseSetJob j = null;
synchronized (_publishingLeaseSets) {
boolean isNew = _publishingLeaseSets.add(h);
if (isNew)
j = (RepublishLeaseSetJob)_publishingLeaseSets.get(h);
if (j == null) {
j = new RepublishLeaseSetJob(_context, this, h);
_publishingLeaseSets.put(h, j);
}
}
if (j != null)
_context.jobQueue().addJob(j);
j.getTiming().setStartAfter(_context.clock().now());
_context.jobQueue().addJob(j);
}
void stopPublishing(Hash target) {

View File

@ -22,7 +22,7 @@ import net.i2p.util.Log;
*/
public class RepublishLeaseSetJob extends JobImpl {
private Log _log;
private final static long REPUBLISH_LEASESET_DELAY = 5*60*1000; // 5 mins
private final static long REPUBLISH_LEASESET_DELAY = 3*60*1000; // 3 mins
private final static long REPUBLISH_LEASESET_TIMEOUT = 60*1000;
private Hash _dest;
private KademliaNetworkDatabaseFacade _facade;
@ -32,7 +32,7 @@ public class RepublishLeaseSetJob extends JobImpl {
_log = ctx.logManager().getLog(RepublishLeaseSetJob.class);
_facade = facade;
_dest = destHash;
getTiming().setStartAfter(ctx.clock().now()+REPUBLISH_LEASESET_DELAY);
//getTiming().setStartAfter(ctx.clock().now()+REPUBLISH_LEASESET_DELAY);
}
public String getName() { return "Republish a local leaseSet"; }
public void runJob() {
@ -40,23 +40,29 @@ public class RepublishLeaseSetJob extends JobImpl {
if (getContext().clientManager().isLocal(_dest)) {
LeaseSet ls = _facade.lookupLeaseSetLocally(_dest);
if (ls != null) {
_log.warn("Client " + _dest + " is local, so we're republishing it");
if (_log.shouldLog(Log.INFO))
_log.info("Client " + _dest + " is local, so we're republishing it");
if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) {
_log.warn("Not publishing a LOCAL lease that isn't current - " + _dest, new Exception("Publish expired LOCAL lease?"));
if (_log.shouldLog(Log.WARN))
_log.warn("Not publishing a LOCAL lease that isn't current - " + _dest, new Exception("Publish expired LOCAL lease?"));
} else {
getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _dest, ls, new OnSuccess(getContext()), new OnFailure(getContext()), REPUBLISH_LEASESET_TIMEOUT));
}
} else {
_log.warn("Client " + _dest + " is local, but we can't find a valid LeaseSet? perhaps its being rebuilt?");
if (_log.shouldLog(Log.WARN))
_log.warn("Client " + _dest + " is local, but we can't find a valid LeaseSet? perhaps its being rebuilt?");
}
requeue(REPUBLISH_LEASESET_DELAY);
long republishDelay = getContext().random().nextLong(2*REPUBLISH_LEASESET_DELAY);
requeue(republishDelay);
return;
} else {
_log.info("Client " + _dest + " is no longer local, so no more republishing their leaseSet");
if (_log.shouldLog(Log.INFO))
_log.info("Client " + _dest + " is no longer local, so no more republishing their leaseSet");
}
_facade.stopPublishing(_dest);
} catch (RuntimeException re) {
_log.error("Uncaught error republishing the leaseSet", re);
if (_log.shouldLog(Log.ERROR))
_log.error("Uncaught error republishing the leaseSet", re);
_facade.stopPublishing(_dest);
throw re;
}

View File

@ -16,10 +16,12 @@ import java.util.Set;
import net.i2p.data.DataHelper;
import net.i2p.data.DataStructure;
import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.data.RouterInfo;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
@ -97,6 +99,7 @@ class SearchJob extends JobImpl {
getContext().statManager().createRateStat("netDb.searchReplyValidated", "How many search replies we get that we are able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchReplyNotValidated", "How many search replies we get that we are NOT able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchReplyValidationSkipped", "How many search replies we get from unreliable peers that we skip?", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.republishQuantity", "How many peers do we need to send a found leaseSet to?", "NetworkDatabase", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
if (_log.shouldLog(Log.DEBUG))
_log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
}
@ -586,6 +589,15 @@ class SearchJob extends JobImpl {
resend();
}
/**
* After a successful search for a leaseSet, we resend that leaseSet to all
* of the peers we tried and failed to query. This var bounds how many of
* those peers will get the data, in case a search had to crawl about
* substantially.
*
*/
private static final int MAX_LEASE_RESEND = 10;
/**
* After we get the data we were searching for, rebroadcast it to the peers
* we would query first if we were to search for it again (healing the network).
@ -593,12 +605,54 @@ class SearchJob extends JobImpl {
*/
private void resend() {
DataStructure ds = _facade.lookupLeaseSetLocally(_state.getTarget());
if (ds == null)
if (ds == null) {
ds = _facade.lookupRouterInfoLocally(_state.getTarget());
if (ds != null)
getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _state.getTarget(),
ds, null, null, RESEND_TIMEOUT,
_state.getSuccessful()));
if (ds != null)
getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _state.getTarget(),
ds, null, null, RESEND_TIMEOUT,
_state.getSuccessful()));
} else {
Set sendTo = _state.getFailed();
sendTo.addAll(_state.getPending());
int numSent = 0;
for (Iterator iter = sendTo.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
RouterInfo peerInfo = _facade.lookupRouterInfoLocally(peer);
if (peerInfo == null) continue;
if (resend(peerInfo, (LeaseSet)ds))
numSent++;
if (numSent >= MAX_LEASE_RESEND)
break;
}
getContext().statManager().addRateData("netDb.republishQuantity", numSent, numSent);
}
}
/**
* Resend the leaseSet to the peer who had previously failed to
* provide us with the data when we asked them.
*/
private boolean resend(RouterInfo toPeer, LeaseSet ls) {
DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
msg.setKey(ls.getDestination().calculateHash());
msg.setLeaseSet(ls);
msg.setMessageExpiration(getContext().clock().now() + RESEND_TIMEOUT);
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if (outTunnel != null) {
TunnelId targetTunnelId = null; // not needed
Job onSend = null; // not wanted
if (_log.shouldLog(Log.DEBUG))
_log.debug("resending leaseSet out to " + toPeer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, toPeer.getIdentity().getHash());
return true;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("unable to resend a leaseSet - no outbound exploratory tunnels!");
return false;
}
}
/**

View File

@ -267,8 +267,11 @@ public class TunnelPoolManager implements TunnelManagerFacade {
*/
int allocateBuilds(int wanted) {
synchronized (this) {
if (_outstandingBuilds >= _maxOutstandingBuilds)
return 0;
if (_outstandingBuilds >= _maxOutstandingBuilds) {
// ok, as a failsafe, always let one through
_outstandingBuilds++;
return 1;
}
if (_outstandingBuilds + wanted < _maxOutstandingBuilds) {
_outstandingBuilds += wanted;
return wanted;