forked from I2P_Developers/i2p.i2p
* TunnelDispatcher: Change participant expire List to a Queue for efficiency
  and to remove global lock. Also remove separate time List for space savings.
@@ -6,6 +6,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
@@ -757,81 +758,47 @@ public class TunnelDispatcher implements Service {
     /** @deprecated moved to router console */
     public void renderStatusHTML(Writer out) throws IOException {}
 
+    /**
+     * Expire participants.
+     * For efficiency, we keep the HopConfigs in a FIFO, and assume that
+     * tunnels expire (roughly) in the same order as they are added.
+     * As tunnels have a fixed expiration from now, that's a good assumption -
+     * see BuildHandler.handleReq().
+     */
     private class LeaveTunnel extends JobImpl {
-        private List<HopConfig> _configs;
-        private List<Long> _times;
+        private final LinkedBlockingQueue<HopConfig> _configs;
 
         public LeaveTunnel(RouterContext ctx) {
             super(ctx);
-            getTiming().setStartAfter(ctx.clock().now());
-            _configs = new ArrayList(128);
-            _times = new ArrayList(128);
+            _configs = new LinkedBlockingQueue();
+            // 20 min no tunnels accepted + 10 min tunnel expiration
+            getTiming().setStartAfter(ctx.clock().now() + 30*60*1000);
+            getContext().jobQueue().addJob(LeaveTunnel.this);
         }
 
         private static final int LEAVE_BATCH_TIME = 10*1000;
 
         public void add(HopConfig cfg) {
-            Long dropTime = Long.valueOf(cfg.getExpiration() + 2*Router.CLOCK_FUDGE_FACTOR + LEAVE_BATCH_TIME);
-            boolean noTunnels;
-            synchronized (LeaveTunnel.this) {
-                noTunnels = _configs.isEmpty();
-                _configs.add(cfg);
-                _times.add(dropTime);
-
-                // Make really sure we queue or requeue the job only when we have to, or else bad things happen.
-                // Locking around this part may not be sufficient but there was nothing before.
-                // Symptom is the Leave Participant job not running for 12m, leading to seesaw participating tunnel count
-                long oldAfter = getTiming().getStartAfter();
-                long oldStart = getTiming().getActualStart();
-                if ( noTunnels || (oldAfter <= 0) ||
-                     (oldAfter < getContext().clock().now() && oldAfter <= oldStart) || // if oldAfter > oldStart, it's late but it will run, so don't do this (race)
-                     (oldAfter >= dropTime.longValue()) ) {
-                    getTiming().setStartAfter(dropTime.longValue());
-                    getContext().jobQueue().addJob(LeaveTunnel.this);
-                } else {
-                    // already scheduled for the future, and before this expiration
-                }
-            }
-            if (_log.shouldLog(Log.DEBUG)) {
-                long now = getContext().clock().now();
-                _log.debug("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg);
-            }
+            _configs.offer(cfg);
         }
 
-        public String getName() { return "Leave participant"; }
+        public String getName() { return "Expire participating tunnels"; }
 
         public void runJob() {
             HopConfig cur = null;
-            Long nextTime = null;
             long now = getContext().clock().now() + LEAVE_BATCH_TIME; // leave all expiring in next 10 sec
-            while (true) {
-                synchronized (LeaveTunnel.this) {
-                    if (_configs.isEmpty())
-                        return;
-                    nextTime = _times.get(0);
-                    if (nextTime.longValue() <= now) {
-                        cur = _configs.remove(0);
-                        _times.remove(0);
-                        if (!_times.isEmpty())
-                            nextTime = _times.get(0);
-                        else
-                            nextTime = null;
-                    } else {
-                        cur = null;
-                    }
-                }
-
-                if (cur != null)
-                    remove(cur);
-                else
-                    break;
-            }
-
-            if (nextTime != null) {
-                synchronized (LeaveTunnel.this) {
-                    getTiming().setStartAfter(nextTime.longValue());
-                    getContext().jobQueue().addJob(LeaveTunnel.this);
-                }
-            }
+            long nextTime = now + 10*60*1000;
+            while ((cur = _configs.peek()) != null) {
+                long exp = cur.getExpiration() + (2 * Router.CLOCK_FUDGE_FACTOR) + LEAVE_BATCH_TIME;
+                if (exp < now) {
+                    _configs.poll();
+                    remove(cur);
+                } else if (exp < nextTime) {
+                    nextTime = exp;
+                    break;
+                }
+            }
+            getTiming().setStartAfter(nextTime);
+            getContext().jobQueue().addJob(LeaveTunnel.this);
        }
     }
 }
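The new javadoc spells out the idea behind the change: HopConfigs are appended with a fixed lifetime, so the queue is already in (rough) expiration order, adds need no global lock, and the expiry job only ever has to look at the head. Below is a minimal standalone sketch of that pattern, using hypothetical names (ExpireQueueSketch, Entry, expireOnce) rather than the router's HopConfig/JobImpl machinery; it illustrates the technique, not the committed code.

    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of the FIFO-expiry pattern (hypothetical names, not the router's API):
    // producers append entries with a fixed lifetime, so the queue is ordered
    // (roughly) by expiration and only the head ever needs to be inspected.
    public class ExpireQueueSketch {

        /** An entry that expires at a fixed absolute time (ms). */
        static final class Entry {
            final String name;
            final long expiration;
            Entry(String name, long lifetimeMs) {
                this.name = name;
                this.expiration = System.currentTimeMillis() + lifetimeMs;
            }
        }

        private final LinkedBlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();

        /** Lock-free add: no synchronized block, unlike a plain List would need. */
        public void add(Entry e) {
            queue.offer(e);
        }

        /**
         * One pass of the expiry job: drop every entry already expired and return
         * the time at which the job should run next (the head's expiration, or the
         * supplied default when the queue drains). Assumes a single consumer thread,
         * as with the router's one LeaveTunnel job.
         */
        public long expireOnce(long now, long defaultNextRun) {
            long nextRun = defaultNextRun;
            Entry head;
            while ((head = queue.peek()) != null) {
                if (head.expiration <= now) {
                    queue.poll();               // expired: remove and process
                    System.out.println("expired " + head.name);
                } else {
                    nextRun = head.expiration;  // head not yet expired; FIFO order means
                    break;                      // nothing behind it is expired either
                }
            }
            return nextRun;
        }

        public static void main(String[] args) {
            ExpireQueueSketch q = new ExpireQueueSketch();
            q.add(new Entry("tunnel-a", 0));        // already expired
            q.add(new Entry("tunnel-b", 60_000));   // expires in a minute
            long next = q.expireOnce(System.currentTimeMillis(),
                                     System.currentTimeMillis() + 10 * 60 * 1000);
            System.out.println("run the job again at " + next);
        }
    }

The peek/poll pair is safe without synchronization here because, as in the router, only the single expiry job consumes the queue; producers merely offer().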