* Fix race in TunnelDispatcher which caused
participating tunnel count to seesaw - should increase network capacity * Leave participating tunnels in 10s batches for efficiency * Update participating tunnel ratestat when leaving a tunnel too, to generate a smoother graph * Fix tunnel.participatingMessageCount stat to include all participating tunnels, not just outbound endpoints * Simplify Expire Tunnel job name
This commit is contained in:
13
history.txt
13
history.txt
@ -1,4 +1,15 @@
|
|||||||
2008-02-17 zzz
|
2008-02-16 zzz
|
||||||
|
* Fix race in TunnelDispatcher which caused
|
||||||
|
participating tunnel count to seesaw -
|
||||||
|
should increase network capacity
|
||||||
|
* Leave participating tunnels in 10s batches for efficiency
|
||||||
|
* Update participating tunnel ratestat when leaving a tunnel too,
|
||||||
|
to generate a smoother graph
|
||||||
|
* Fix tunnel.participatingMessageCount stat to include all
|
||||||
|
participating tunnels, not just outbound endpoints
|
||||||
|
* Simplify Expire Tunnel job name
|
||||||
|
|
||||||
|
2008-02-13 zzz
|
||||||
* PersistentDataStore: Write out 300 records every 10 min
|
* PersistentDataStore: Write out 300 records every 10 min
|
||||||
rather than 1 every 10 sec;
|
rather than 1 every 10 sec;
|
||||||
Don't store leasesets to disk or read them in
|
Don't store leasesets to disk or read them in
|
||||||
|
@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
|
|||||||
public class RouterVersion {
|
public class RouterVersion {
|
||||||
public final static String ID = "$Revision: 1.548 $ $Date: 2008-02-10 15:00:00 $";
|
public final static String ID = "$Revision: 1.548 $ $Date: 2008-02-10 15:00:00 $";
|
||||||
public final static String VERSION = "0.6.1.31";
|
public final static String VERSION = "0.6.1.31";
|
||||||
public final static long BUILD = 3;
|
public final static long BUILD = 4;
|
||||||
public static void main(String args[]) {
|
public static void main(String args[]) {
|
||||||
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
|
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
|
||||||
System.out.println("Router ID: " + RouterVersion.ID);
|
System.out.println("Router ID: " + RouterVersion.ID);
|
||||||
|
@ -338,14 +338,19 @@ public class TunnelDispatcher implements Service {
|
|||||||
_log.debug("removing " + cfg);
|
_log.debug("removing " + cfg);
|
||||||
|
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
|
int numParticipants = 0;
|
||||||
synchronized (_participatingConfig) {
|
synchronized (_participatingConfig) {
|
||||||
removed = (null != _participatingConfig.remove(recvId));
|
removed = (null != _participatingConfig.remove(recvId));
|
||||||
|
numParticipants = _participatingConfig.size();
|
||||||
}
|
}
|
||||||
if (!removed) {
|
if (!removed) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg);
|
_log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
|
||||||
|
_context.statManager().addRateData("tunnel.participatingMessageCount", cfg.getProcessedMessagesCount(), 10*60*1000);
|
||||||
|
|
||||||
synchronized (_participants) {
|
synchronized (_participants) {
|
||||||
removed = (null != _participants.remove(recvId));
|
removed = (null != _participants.remove(recvId));
|
||||||
}
|
}
|
||||||
@ -357,8 +362,6 @@ public class TunnelDispatcher implements Service {
|
|||||||
synchronized (_outboundEndpoints) {
|
synchronized (_outboundEndpoints) {
|
||||||
removed = (null != _outboundEndpoints.remove(recvId));
|
removed = (null != _outboundEndpoints.remove(recvId));
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.statManager().addRateData("tunnel.participatingMessageCount", cfg.getProcessedMessagesCount(), 10*60*1000);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -635,32 +638,41 @@ public class TunnelDispatcher implements Service {
|
|||||||
_times = new ArrayList(128);
|
_times = new ArrayList(128);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final int LEAVE_BATCH_TIME = 10*1000;
|
||||||
public void add(HopConfig cfg) {
|
public void add(HopConfig cfg) {
|
||||||
Long dropTime = new Long(cfg.getExpiration() + 2*Router.CLOCK_FUDGE_FACTOR);
|
Long dropTime = new Long(cfg.getExpiration() + 2*Router.CLOCK_FUDGE_FACTOR + LEAVE_BATCH_TIME);
|
||||||
|
boolean noTunnels;
|
||||||
synchronized (LeaveTunnel.this) {
|
synchronized (LeaveTunnel.this) {
|
||||||
|
noTunnels = _configs.size() <= 0;
|
||||||
_configs.add(cfg);
|
_configs.add(cfg);
|
||||||
_times.add(dropTime);
|
_times.add(dropTime);
|
||||||
}
|
|
||||||
|
|
||||||
|
// Make really sure we queue or requeue the job only when we have to, or else bad things happen.
|
||||||
|
// Locking around this part may not be sufficient but there was nothing before.
|
||||||
|
// Symptom is the Leave Participant job not running for 12m, leading to seesaw participating tunnel count
|
||||||
|
|
||||||
|
long oldAfter = getTiming().getStartAfter();
|
||||||
|
long oldStart = getTiming().getActualStart();
|
||||||
|
if ( noTunnels || (oldAfter <= 0) ||
|
||||||
|
(oldAfter < getContext().clock().now() && oldAfter <= oldStart) || // if oldAfter > oldStart, it's late but it will run, so don't do this (race)
|
||||||
|
(oldAfter >= dropTime.longValue()) ) {
|
||||||
|
getTiming().setStartAfter(dropTime.longValue());
|
||||||
|
getContext().jobQueue().addJob(LeaveTunnel.this);
|
||||||
|
} else {
|
||||||
|
// already scheduled for the future, and before this expiration
|
||||||
|
}
|
||||||
|
}
|
||||||
if (_log.shouldLog(Log.INFO)) {
|
if (_log.shouldLog(Log.INFO)) {
|
||||||
long now = getContext().clock().now();
|
long now = getContext().clock().now();
|
||||||
_log.info("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg);
|
_log.info("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
long oldAfter = getTiming().getStartAfter();
|
|
||||||
if ( (oldAfter <= 0) || (oldAfter < getContext().clock().now()) || (oldAfter >= dropTime.longValue()) ) {
|
|
||||||
getTiming().setStartAfter(dropTime.longValue());
|
|
||||||
getContext().jobQueue().addJob(LeaveTunnel.this);
|
|
||||||
} else {
|
|
||||||
// already scheduled for the future, and before this expiration
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() { return "Leave participant"; }
|
public String getName() { return "Leave participant"; }
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
HopConfig cur = null;
|
HopConfig cur = null;
|
||||||
Long nextTime = null;
|
Long nextTime = null;
|
||||||
long now = getContext().clock().now();
|
long now = getContext().clock().now() + LEAVE_BATCH_TIME; // leave all expiring in next 10 sec
|
||||||
while (true) {
|
while (true) {
|
||||||
synchronized (LeaveTunnel.this) {
|
synchronized (LeaveTunnel.this) {
|
||||||
if (_configs.size() <= 0)
|
if (_configs.size() <= 0)
|
||||||
@ -685,8 +697,10 @@ public class TunnelDispatcher implements Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (nextTime != null) {
|
if (nextTime != null) {
|
||||||
getTiming().setStartAfter(nextTime.longValue());
|
synchronized (LeaveTunnel.this) {
|
||||||
getContext().jobQueue().addJob(LeaveTunnel.this);
|
getTiming().setStartAfter(nextTime.longValue());
|
||||||
|
getContext().jobQueue().addJob(LeaveTunnel.this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,24 +29,7 @@ class ExpireJob extends JobImpl {
|
|||||||
getTiming().setStartAfter(expire);
|
getTiming().setStartAfter(expire);
|
||||||
}
|
}
|
||||||
public String getName() {
|
public String getName() {
|
||||||
if (_pool.getSettings().isExploratory()) {
|
return "Expire tunnel";
|
||||||
if (_pool.getSettings().isInbound()) {
|
|
||||||
return "Expire exploratory inbound tunnel";
|
|
||||||
} else {
|
|
||||||
return "Expire exploratory outbound tunnel";
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
StringBuffer rv = new StringBuffer(32);
|
|
||||||
if (_pool.getSettings().isInbound())
|
|
||||||
rv.append("Expire inbound client tunnel for ");
|
|
||||||
else
|
|
||||||
rv.append("Expire outbound client tunnel for ");
|
|
||||||
if (_pool.getSettings().getDestinationNickname() != null)
|
|
||||||
rv.append(_pool.getSettings().getDestinationNickname());
|
|
||||||
else
|
|
||||||
rv.append(_pool.getSettings().getDestination().toBase64().substring(0,4));
|
|
||||||
return rv.toString();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if (!_leaseUpdated) {
|
if (!_leaseUpdated) {
|
||||||
|
Reference in New Issue
Block a user