- Add logging of drops
  - Set drop stat to delay of dropped item
  - Add no-drop priority
This commit is contained in:
zzz
2012-09-01 16:30:12 +00:00
parent 8bfbe855a1
commit 10d9eb70c8
2 changed files with 27 additions and 4 deletions

View File

@ -5,6 +5,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/** /**
* CoDel implementation of Active Queue Management. * CoDel implementation of Active Queue Management.
@ -24,6 +25,8 @@ import net.i2p.I2PAppContext;
public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<E> { public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<E> {
private final I2PAppContext _context; private final I2PAppContext _context;
private final Log _log;
private final String _name;
// following 4 are state variables defined by sample code, locked by this // following 4 are state variables defined by sample code, locked by this
/** Time when we'll declare we're above target (0 if below) */ /** Time when we'll declare we're above target (0 if below) */
@ -66,9 +69,11 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
public CoDelBlockingQueue(I2PAppContext ctx, String name, int capacity) { public CoDelBlockingQueue(I2PAppContext ctx, String name, int capacity) {
super(capacity); super(capacity);
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(CoDelBlockingQueue.class);
_name = name;
STAT_DROP = "codel." + name + ".drop"; STAT_DROP = "codel." + name + ".drop";
STAT_DELAY = "codel." + name + ".delay"; STAT_DELAY = "codel." + name + ".delay";
ctx.statManager().createRequiredRateStat(STAT_DROP, "AQM drop events", "Router", RATES); ctx.statManager().createRequiredRateStat(STAT_DROP, "queue delay of dropped items", "Router", RATES);
ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES); ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES);
} }
@ -254,7 +259,11 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
} }
private void drop(E entry) { private void drop(E entry) {
_context.statManager().addRateData(STAT_DROP, 1); long delay = _context.clock().now() - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DROP, delay);
if (_log.shouldLog(Log.WARN))
_log.warn(_name + " dropped item with delay " + delay + ", " +
size() + " remaining in queue: " + entry);
entry.drop(); entry.drop();
} }

View File

@ -7,6 +7,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/** /**
* CoDel implementation of Active Queue Management. * CoDel implementation of Active Queue Management.
@ -26,6 +27,8 @@ import net.i2p.I2PAppContext;
public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlockingQueue<E> { public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlockingQueue<E> {
private final I2PAppContext _context; private final I2PAppContext _context;
private final Log _log;
private final String _name;
private final AtomicLong _seqNum = new AtomicLong(); private final AtomicLong _seqNum = new AtomicLong();
// following 4 are state variables defined by sample code, locked by this // following 4 are state variables defined by sample code, locked by this
@ -65,6 +68,8 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
private final String STAT_DELAY; private final String STAT_DELAY;
private static final long[] RATES = {5*60*1000}; private static final long[] RATES = {5*60*1000};
private static final int[] PRIORITIES = {100, 200, 300, 400, 500}; private static final int[] PRIORITIES = {100, 200, 300, 400, 500};
/** if priority is >= this, never drop */
public static final int DONT_DROP_PRIORITY = 1000;
/** /**
* @param name for stats * @param name for stats
@ -72,10 +77,12 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) { public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) {
super(initialCapacity, new PriorityComparator()); super(initialCapacity, new PriorityComparator());
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(CoDelPriorityBlockingQueue.class);
_name = name;
STAT_DROP = "codel." + name + ".drop."; STAT_DROP = "codel." + name + ".drop.";
STAT_DELAY = "codel." + name + ".delay"; STAT_DELAY = "codel." + name + ".delay";
for (int i = 0; i < PRIORITIES.length; i++) { for (int i = 0; i < PRIORITIES.length; i++) {
ctx.statManager().createRequiredRateStat(STAT_DROP + PRIORITIES[i], "AQM drop events by priority", "Router", RATES); ctx.statManager().createRequiredRateStat(STAT_DROP + PRIORITIES[i], "queue delay of dropped items by priority", "Router", RATES);
} }
ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES); ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES);
} }
@ -237,6 +244,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
} }
} }
} else if (ok_to_drop && } else if (ok_to_drop &&
rv.getPriority() < DONT_DROP_PRIORITY &&
(_now - _drop_next < INTERVAL || _now - _first_above_time >= INTERVAL)) { (_now - _drop_next < INTERVAL || _now - _first_above_time >= INTERVAL)) {
// If we get here, then we're not in dropping state. If the sojourn time has been above // If we get here, then we're not in dropping state. If the sojourn time has been above
// target for interval, then we decide whether it's time to enter dropping state. // target for interval, then we decide whether it's time to enter dropping state.
@ -266,7 +274,13 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
} }
private void drop(E entry) { private void drop(E entry) {
_context.statManager().addRateData(STAT_DROP + entry.getPriority(), 1); long delay = _context.clock().now() - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DROP + entry.getPriority(), delay);
if (_log.shouldLog(Log.WARN))
_log.warn(_name + " dropped item with delay " + delay + ", priority " +
entry.getPriority() + ", seq " +
entry.getSeqNum() + ", " +
size() + " remaining in queue: " + entry);
entry.drop(); entry.drop();
} }