* PriBlockingQueue: Enforce max size

This commit is contained in:
zzz
2012-10-31 16:15:32 +00:00
parent a5b68d4fb0
commit f29a45a2c2
6 changed files with 64 additions and 17 deletions

View File

@ -1,3 +1,10 @@
2012-10-31 zzz
* FIFOBandwidthRefiller: Reduce refill interval to smooth output
* I2CP: Reduce log level when outbound queue is full (ticket #758)
* i2ptunnel: Fix NPE in zzzot plugin
* PriBlockingQueue: Enforce max size
* Streaming: New disableRejectLogging option (default false), enable for snark
2012-10-30 zzz 2012-10-30 zzz
* i2psnark: * i2psnark:
- Add kbucket debugging - Add kbucket debugging
@ -8,7 +15,6 @@
* i2ptunnel: * i2ptunnel:
- Create backup privkey files (ticket #752) - Create backup privkey files (ticket #752)
- Fix NPE in Android startup - Fix NPE in Android startup
- Fix disabling proxy authorization
* Installer: Drop news.xml and old certs * Installer: Drop news.xml and old certs
* logs.jsp: * logs.jsp:
- Don't display dup message if last - Don't display dup message if last

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2; public final static long BUILD = 3;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -162,7 +162,7 @@ class NTCPConnection {
_bwInRequests = new ConcurrentHashSet(2); _bwInRequests = new ConcurrentHashSet(2);
_bwOutRequests = new ConcurrentHashSet(8); _bwOutRequests = new ConcurrentHashSet(8);
//_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); //_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_outbound = new PriBlockingQueue(32); _outbound = new PriBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = true; _isInbound = true;
_decryptBlockBuf = new byte[BLOCK_SIZE]; _decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState(); _curReadState = new ReadState();
@ -190,7 +190,7 @@ class NTCPConnection {
_bwInRequests = new ConcurrentHashSet(2); _bwInRequests = new ConcurrentHashSet(2);
_bwOutRequests = new ConcurrentHashSet(8); _bwOutRequests = new ConcurrentHashSet(8);
//_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); //_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_outbound = new PriBlockingQueue(32); _outbound = new PriBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = false; _isInbound = false;
_decryptBlockBuf = new byte[BLOCK_SIZE]; _decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState(); _curReadState = new ReadState();

View File

@ -325,7 +325,7 @@ class PeerState {
_inboundMessages = new HashMap(8); _inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32); _outboundMessages = new ArrayList(32);
//_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32); //_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
_outboundQueue = new PriBlockingQueue(32); _outboundQueue = new PriBlockingQueue(ctx, "UDP-PeerState", 32);
// all createRateStat() moved to EstablishmentManager // all createRateStat() moved to EstablishmentManager
_remoteIP = remoteIP; _remoteIP = remoteIP;
_remotePeer = remotePeer; _remotePeer = remotePeer;

View File

@ -27,11 +27,6 @@ import net.i2p.util.Log;
*/ */
public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlockingQueue<E> { public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlockingQueue<E> {
private final I2PAppContext _context;
private final Log _log;
private final String _name;
private final AtomicLong _seqNum = new AtomicLong();
// following 4 are state variables defined by sample code, locked by this // following 4 are state variables defined by sample code, locked by this
/** Time when we'll declare we're above target (0 if below) */ /** Time when we'll declare we're above target (0 if below) */
private long _first_above_time; private long _first_above_time;
@ -75,7 +70,6 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
private final String STAT_DROP; private final String STAT_DROP;
private final String STAT_DELAY; private final String STAT_DELAY;
private static final long[] RATES = {5*60*1000, 60*60*1000};
public static final int MIN_PRIORITY = 100; public static final int MIN_PRIORITY = 100;
private static final int[] PRIORITIES = {MIN_PRIORITY, 200, 300, 400, 500}; private static final int[] PRIORITIES = {MIN_PRIORITY, 200, 300, 400, 500};
/** if priority is >= this, never drop */ /** if priority is >= this, never drop */
@ -86,10 +80,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
* @param name for stats * @param name for stats
*/ */
public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) { public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) {
super(initialCapacity); super(ctx, name, initialCapacity);
_context = ctx;
_log = ctx.logManager().getLog(CoDelPriorityBlockingQueue.class);
_name = name;
STAT_DROP = ("codel." + name + ".drop.").intern(); STAT_DROP = ("codel." + name + ".drop.").intern();
STAT_DELAY = ("codel." + name + ".delay").intern(); STAT_DELAY = ("codel." + name + ".delay").intern();
for (int i = 0; i < PRIORITIES.length; i++) { for (int i = 0; i < PRIORITIES.length; i++) {
@ -177,7 +168,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
@Override @Override
protected void timestamp(E o) { protected void timestamp(E o) {
o.setSeqNum(_seqNum.incrementAndGet()); super.timestamp(o);
o.setEnqueueTime(_context.clock().now()); o.setEnqueueTime(_context.clock().now());
if (o.getPriority() < MIN_PRIORITY && _log.shouldLog(Log.WARN)) if (o.getPriority() < MIN_PRIORITY && _log.shouldLog(Log.WARN))
_log.warn(_name + " added item with low priority " + o.getPriority() + _log.warn(_name + " added item with low priority " + o.getPriority() +

View File

@ -5,6 +5,9 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/** /**
* Priority Blocking Queue using methods in the entries, * Priority Blocking Queue using methods in the entries,
* as definied in PQEntry, to store priority and sequence number, * as definied in PQEntry, to store priority and sequence number,
@ -16,32 +19,79 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E> { public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E> {
protected final I2PAppContext _context;
protected final Log _log;
protected final String _name;
private final AtomicLong _seqNum = new AtomicLong(); private final AtomicLong _seqNum = new AtomicLong();
private final String STAT_FULL;
protected static final long[] RATES = {5*60*1000, 60*60*1000};
protected static final int BACKLOG_SIZE = 256; protected static final int BACKLOG_SIZE = 256;
protected static final int MAX_SIZE = 512;
public PriBlockingQueue(int initialCapacity) { /**
* Bounded queue with a hardcoded failsafe max size,
* except when using put(), which is unbounded.
*/
public PriBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) {
super(initialCapacity, new PriorityComparator()); super(initialCapacity, new PriorityComparator());
_context = ctx;
_log = ctx.logManager().getLog(PriorityBlockingQueue.class);
_name = name;
STAT_FULL = ("pbq." + name + ".full").intern();
ctx.statManager().createRequiredRateStat(STAT_FULL, "queue full", "Router", RATES);
} }
/**
* OpenJDK add(o) calls offer(o), so use offer(o) to avoid dup stamping.
* Returns false if full
* @deprecated use offer(o)
*/
@Override @Override
public boolean add(E o) { public boolean add(E o) {
timestamp(o); timestamp(o);
if (size() >= MAX_SIZE) {
_context.statManager().addRateData(STAT_FULL, 1);
return false;
}
return super.add(o); return super.add(o);
} }
/**
* Returns false if full
*/
@Override @Override
public boolean offer(E o) { public boolean offer(E o) {
timestamp(o); timestamp(o);
if (size() >= MAX_SIZE) {
_context.statManager().addRateData(STAT_FULL, 1);
return false;
}
return super.offer(o); return super.offer(o);
} }
/**
* OpenJDK offer(o, timeout, unit) calls offer(o), so use offer(o) to avoid dup stamping.
* Non blocking. Returns false if full.
* @param timeout ignored
* @param unit ignored
* @deprecated use offer(o)
*/
@Override @Override
public boolean offer(E o, long timeout, TimeUnit unit) { public boolean offer(E o, long timeout, TimeUnit unit) {
timestamp(o); timestamp(o);
if (size() >= MAX_SIZE) {
_context.statManager().addRateData(STAT_FULL, 1);
return false;
}
return super.offer(o, timeout, unit); return super.offer(o, timeout, unit);
} }
/**
* OpenJDK put(o) calls offer(o), so use offer(o) to avoid dup stamping.
* Non blocking. Does not add if full.
* @deprecated use offer(o)
*/
@Override @Override
public void put(E o) { public void put(E o) {
timestamp(o); timestamp(o);