From f29a45a2c2e75446247c4ed2a661bf3b92feb853 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 31 Oct 2012 16:15:32 +0000 Subject: [PATCH] * PriBlockingQueue: Enforce max size --- history.txt | 8 ++- .../src/net/i2p/router/RouterVersion.java | 2 +- .../router/transport/ntcp/NTCPConnection.java | 4 +- .../i2p/router/transport/udp/PeerState.java | 2 +- .../util/CoDelPriorityBlockingQueue.java | 13 +---- .../net/i2p/router/util/PriBlockingQueue.java | 52 ++++++++++++++++++- 6 files changed, 64 insertions(+), 17 deletions(-) diff --git a/history.txt b/history.txt index 149db427c7..8cd011371e 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,10 @@ +2012-10-31 zzz + * FIFOBandwidthRefiller: Reduce refill interval to smooth output + * I2CP: Reduce log level when outbound queue is full (ticket #758) + * i2ptunnel: Fix NPE in zzzot plugin + * PriBlockingQueue: Enforce max size + * Streaming: New disableRejectLogging option (default false), enable for snark + 2012-10-30 zzz * i2psnark: - Add kbucket debugging @@ -8,7 +15,6 @@ * i2ptunnel: - Create backup privkey files (ticket #752) - Fix NPE in Android startup - - Fix disabling proxy authorization * Installer: Drop news.xml and old certs * logs.jsp: - Don't display dup message if last diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 282c18b422..0725033fa6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 2; + public final static long BUILD = 3; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index b89d72fb89..b646c8b1a9 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -162,7 +162,7 @@ class NTCPConnection { _bwInRequests = new ConcurrentHashSet(2); _bwOutRequests = new ConcurrentHashSet(8); //_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); - _outbound = new PriBlockingQueue(32); + _outbound = new PriBlockingQueue(ctx, "NTCP-Connection", 32); _isInbound = true; _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); @@ -190,7 +190,7 @@ class NTCPConnection { _bwInRequests = new ConcurrentHashSet(2); _bwOutRequests = new ConcurrentHashSet(8); //_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); - _outbound = new PriBlockingQueue(32); + _outbound = new PriBlockingQueue(ctx, "NTCP-Connection", 32); _isInbound = false; _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 3f0ca45fcb..a0b5d68711 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -325,7 +325,7 @@ class PeerState { _inboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32); //_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32); - _outboundQueue = new PriBlockingQueue(32); + _outboundQueue = new PriBlockingQueue(ctx, "UDP-PeerState", 32); // all createRateStat() moved to EstablishmentManager _remoteIP = remoteIP; _remotePeer = remotePeer; diff --git a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java index b4da749014..42bbd1dd11 100644 --- a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java +++ b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java @@ -27,11 +27,6 @@ import net.i2p.util.Log; */ public class CoDelPriorityBlockingQueue extends PriBlockingQueue { - private final I2PAppContext _context; - private final Log _log; - private final String _name; - private final AtomicLong _seqNum = new AtomicLong(); - // following 4 are state variables defined by sample code, locked by this /** Time when we'll declare we're above target (0 if below) */ private long _first_above_time; @@ -75,7 +70,6 @@ public class CoDelPriorityBlockingQueue extends PriBlocking private final String STAT_DROP; private final String STAT_DELAY; - private static final long[] RATES = {5*60*1000, 60*60*1000}; public static final int MIN_PRIORITY = 100; private static final int[] PRIORITIES = {MIN_PRIORITY, 200, 300, 400, 500}; /** if priority is >= this, never drop */ @@ -86,10 +80,7 @@ public class CoDelPriorityBlockingQueue extends PriBlocking * @param name for stats */ public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) { - super(initialCapacity); - _context = ctx; - _log = ctx.logManager().getLog(CoDelPriorityBlockingQueue.class); - _name = name; + super(ctx, name, initialCapacity); STAT_DROP = ("codel." + name + ".drop.").intern(); STAT_DELAY = ("codel." + name + ".delay").intern(); for (int i = 0; i < PRIORITIES.length; i++) { @@ -177,7 +168,7 @@ public class CoDelPriorityBlockingQueue extends PriBlocking @Override protected void timestamp(E o) { - o.setSeqNum(_seqNum.incrementAndGet()); + super.timestamp(o); o.setEnqueueTime(_context.clock().now()); if (o.getPriority() < MIN_PRIORITY && _log.shouldLog(Log.WARN)) _log.warn(_name + " added item with low priority " + o.getPriority() + diff --git a/router/java/src/net/i2p/router/util/PriBlockingQueue.java b/router/java/src/net/i2p/router/util/PriBlockingQueue.java index 486728bc48..9145fbd2eb 100644 --- a/router/java/src/net/i2p/router/util/PriBlockingQueue.java +++ b/router/java/src/net/i2p/router/util/PriBlockingQueue.java @@ -5,6 +5,9 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + /** * Priority Blocking Queue using methods in the entries, * as definied in PQEntry, to store priority and sequence number, @@ -16,32 +19,79 @@ import java.util.concurrent.atomic.AtomicLong; */ public class PriBlockingQueue extends PriorityBlockingQueue { + protected final I2PAppContext _context; + protected final Log _log; + protected final String _name; private final AtomicLong _seqNum = new AtomicLong(); + private final String STAT_FULL; + protected static final long[] RATES = {5*60*1000, 60*60*1000}; protected static final int BACKLOG_SIZE = 256; + protected static final int MAX_SIZE = 512; - public PriBlockingQueue(int initialCapacity) { + /** + * Bounded queue with a hardcoded failsafe max size, + * except when using put(), which is unbounded. + */ + public PriBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) { super(initialCapacity, new PriorityComparator()); + _context = ctx; + _log = ctx.logManager().getLog(PriorityBlockingQueue.class); + _name = name; + STAT_FULL = ("pbq." + name + ".full").intern(); + ctx.statManager().createRequiredRateStat(STAT_FULL, "queue full", "Router", RATES); } + /** + * OpenJDK add(o) calls offer(o), so use offer(o) to avoid dup stamping. + * Returns false if full + * @deprecated use offer(o) + */ @Override public boolean add(E o) { timestamp(o); + if (size() >= MAX_SIZE) { + _context.statManager().addRateData(STAT_FULL, 1); + return false; + } return super.add(o); } + /** + * Returns false if full + */ @Override public boolean offer(E o) { timestamp(o); + if (size() >= MAX_SIZE) { + _context.statManager().addRateData(STAT_FULL, 1); + return false; + } return super.offer(o); } + /** + * OpenJDK offer(o, timeout, unit) calls offer(o), so use offer(o) to avoid dup stamping. + * Non blocking. Returns false if full. + * @param timeout ignored + * @param unit ignored + * @deprecated use offer(o) + */ @Override public boolean offer(E o, long timeout, TimeUnit unit) { timestamp(o); + if (size() >= MAX_SIZE) { + _context.statManager().addRateData(STAT_FULL, 1); + return false; + } return super.offer(o, timeout, unit); } + /** + * OpenJDK put(o) calls offer(o), so use offer(o) to avoid dup stamping. + * Non blocking. Does not add if full. + * @deprecated use offer(o) + */ @Override public void put(E o) { timestamp(o);