Non-codel version of priority blocking queue, so we can

implement priority queues without necessarily committing to codel.
This commit is contained in:
zzz
2012-09-05 15:50:11 +00:00
parent 9286d6a7b8
commit b8949eafe2
4 changed files with 95 additions and 55 deletions

View File

@ -4,20 +4,6 @@ package net.i2p.router.util;
* For CoDelPriorityQueue
* @since 0.9.3
*/
public interface CDPQEntry extends CDQEntry {
public interface CDPQEntry extends CDQEntry, PQEntry {
/**
* Higher is higher priority
*/
public int getPriority();
/**
* To be set by the queue
*/
public void setSeqNum(long num);
/**
* Needed to ensure FIFO ordering within a single priority
*/
public long getSeqNum();
}

View File

@ -25,7 +25,7 @@ import net.i2p.util.Log;
*
* @since 0.9.3
*/
public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlockingQueue<E> {
public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlockingQueue<E> {
private final I2PAppContext _context;
private final Log _log;
@ -81,7 +81,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
* @param name for stats
*/
public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) {
super(initialCapacity, new PriorityComparator());
super(initialCapacity);
_context = ctx;
_log = ctx.logManager().getLog(CoDelPriorityBlockingQueue.class);
_name = name;
@ -94,30 +94,6 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
_id = __id.incrementAndGet();
}
@Override
public boolean add(E o) {
timestamp(o);
return super.add(o);
}
@Override
public boolean offer(E o) {
timestamp(o);
return super.offer(o);
}
@Override
public boolean offer(E o, long timeout, TimeUnit unit) {
timestamp(o);
return super.offer(o, timeout, unit);
}
@Override
public void put(E o) {
timestamp(o);
super.put(o);
}
@Override
public void clear() {
super.clear();
@ -180,7 +156,8 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
/////// private below here
private void timestamp(E o) {
@Override
protected void timestamp(E o) {
o.setSeqNum(_seqNum.incrementAndGet());
o.setEnqueueTime(_context.clock().now());
if (o.getPriority() < MIN_PRIORITY && _log.shouldLog(Log.WARN))
@ -317,17 +294,4 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo
private void control_law(long t) {
_drop_next = t + (long) (INTERVAL / Math.sqrt(_count));
}
/**
* highest priority first, then lowest sequence number first
*/
private static class PriorityComparator<E extends CDPQEntry> implements Comparator<E> {
public int compare(E l, E r) {
int d = r.getPriority() - l.getPriority();
if (d != 0)
return d;
long ld = l.getSeqNum() - r.getSeqNum();
return ld > 0 ? 1 : -1;
}
}
}

View File

@ -0,0 +1,23 @@
package net.i2p.router.util;
/**
* For PriBlockingQueue
* @since 0.9.3
*/
public interface PQEntry {
/**
* Higher is higher priority
*/
public int getPriority();
/**
* To be set by the queue
*/
public void setSeqNum(long num);
/**
* Needed to ensure FIFO ordering within a single priority
*/
public long getSeqNum();
}

View File

@ -0,0 +1,67 @@
package net.i2p.router.util;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Priority Blocking Queue using methods in the entries,
* as definied in PQEntry, to store priority and sequence number,
* ensuring FIFO order within a priority.
*
* Input: add(), offer(), and put() are overridden to add a sequence number.
*
* @since 0.9.3
*/
public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E> {
private final AtomicLong _seqNum = new AtomicLong();
public PriBlockingQueue(int initialCapacity) {
super(initialCapacity, new PriorityComparator());
}
@Override
public boolean add(E o) {
timestamp(o);
return super.add(o);
}
@Override
public boolean offer(E o) {
timestamp(o);
return super.offer(o);
}
@Override
public boolean offer(E o, long timeout, TimeUnit unit) {
timestamp(o);
return super.offer(o, timeout, unit);
}
@Override
public void put(E o) {
timestamp(o);
super.put(o);
}
/////// private below here
protected void timestamp(E o) {
o.setSeqNum(_seqNum.incrementAndGet());
}
/**
* highest priority first, then lowest sequence number first
*/
private static class PriorityComparator<E extends PQEntry> implements Comparator<E> {
public int compare(E l, E r) {
int d = r.getPriority() - l.getPriority();
if (d != 0)
return d;
long ld = l.getSeqNum() - r.getSeqNum();
return ld > 0 ? 1 : -1;
}
}
}