diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 16ae226613..26e54e1913 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -28,6 +28,7 @@ import net.i2p.data.router.RouterIdentity; import net.i2p.router.CommSystemFacade.Status; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; +import net.i2p.router.util.TryCache; import net.i2p.util.Addresses; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.I2PThread; @@ -53,7 +54,7 @@ class EventPumper implements Runnable { private final NTCPTransport _transport; private final ObjectCounter _blockedIPs; private long _expireIdleWriteTime; - private boolean _useDirect; + private static boolean _useDirect; /** * This probably doesn't need to be bigger than the largest typical @@ -63,13 +64,16 @@ class EventPumper implements Runnable { private static final int BUF_SIZE = 8*1024; private static final int MAX_CACHE_SIZE = 64; - /** - * Read buffers. (write buffers use wrap()) - * Shared if there are multiple routers in the JVM - * Note that if the routers have different PROP_DIRECT settings this will have a mix, - * so don't do that. - */ - private static final LinkedBlockingQueue _bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE); + private static class BufferFactory implements TryCache.ObjectFactory { + public ByteBuffer newInstance() { + if (_useDirect) + return ByteBuffer.allocateDirect(BUF_SIZE); + else + return ByteBuffer.allocate(BUF_SIZE); + } + } + + private static final TryCache _bufferCache = new TryCache<>(new BufferFactory(), MAX_CACHE_SIZE); /** * every few seconds, iterate across all ntcp connections just to make sure @@ -319,13 +323,7 @@ class EventPumper implements Runnable { } - // Clear the cache if the user changes the setting, - // so we can test the effect. - boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT); - if (_useDirect != newUseDirect) { - _useDirect = newUseDirect; - _bufCache.clear(); - } + _useDirect = _context.getBooleanProperty(PROP_DIRECT); } catch (RuntimeException re) { _log.error("Error in the event pumper", re); } @@ -363,7 +361,6 @@ class EventPumper implements Runnable { _wantsRead.clear(); _wantsRegister.clear(); _wantsWrite.clear(); - _bufCache.clear(); } /** @@ -461,27 +458,11 @@ class EventPumper implements Runnable { _selector.wakeup(); } - /** - * How many to keep in reserve. - * Shared if there are multiple routers in the JVM - */ - private static int _numBufs = MIN_BUFS; - private static int __consecutiveExtra; - /** * High-frequency path in thread. */ private ByteBuffer acquireBuf() { - ByteBuffer rv = _bufCache.poll(); - // discard buffer if _useDirect setting changes - if (rv == null || rv.isDirect() != _useDirect) { - if (_useDirect) - rv = ByteBuffer.allocateDirect(BUF_SIZE); - else - rv = ByteBuffer.allocate(BUF_SIZE); - _numBufs++; - } - return rv; + return _bufferCache.tryAcquire(); } /** @@ -490,27 +471,7 @@ class EventPumper implements Runnable { * High-frequency path in thread. */ public static void releaseBuf(ByteBuffer buf) { - // double check - if (buf.capacity() < BUF_SIZE) { - I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception()); - return; - } - buf.clear(); - int extra = _bufCache.size(); - boolean cached = extra < _numBufs; - - // TODO always offer if direct? - if (cached) { - _bufCache.offer(buf); - if (extra > MIN_BUFS) { - __consecutiveExtra++; - if (__consecutiveExtra >= 20) { - if (_numBufs > MIN_BUFS) - _numBufs--; - __consecutiveExtra = 0; - } - } - } + _bufferCache.tryRelease(buf); } private void processAccept(SelectionKey key) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index f5647661c6..f217517b7d 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -12,6 +12,7 @@ import net.i2p.data.SessionKey; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; import net.i2p.router.util.CDQEntry; +import net.i2p.router.util.TryCache; import net.i2p.util.Addresses; import net.i2p.util.Log; import net.i2p.util.SystemVersion; @@ -45,18 +46,25 @@ class UDPPacket implements CDQEntry { // private boolean _isInbound; private FIFOBandwidthLimiter.Request _bandwidthRequest; + private static class PacketFactory implements TryCache.ObjectFactory { + static RouterContext context; + public UDPPacket newInstance() { + return new UDPPacket(context); + } + } + // Warning - this mixes contexts in a multi-router JVM - private static final Queue _packetCache; + private static final TryCache _packetCache; + private static final TryCache.ObjectFactory _packetFactory; private static final boolean CACHE = true; - private static final int MIN_CACHE_SIZE = 64; private static final int MAX_CACHE_SIZE = 256; static { if (CACHE) { - long maxMemory = SystemVersion.getMaxMemory(); - int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024))); - _packetCache = new LinkedBlockingQueue(csize); + _packetFactory = new PacketFactory(); + _packetCache = new TryCache<>(_packetFactory, MAX_CACHE_SIZE); } else { _packetCache = null; + _packetFactory = null; } } @@ -398,18 +406,8 @@ class UDPPacket implements CDQEntry { public static UDPPacket acquire(RouterContext ctx, boolean inbound) { UDPPacket rv = null; if (CACHE) { - rv = _packetCache.poll(); - if (rv != null) { - synchronized(rv) { - if (!rv._released) { - Log log = rv._context.logManager().getLog(UDPPacket.class); - log.error("Unreleased cached packet", new Exception()); - rv = null; - } else { - rv.init(ctx); - } - } - } + rv = _packetCache.tryAcquire(); + rv.init(ctx); } if (rv == null) rv = new UDPPacket(ctx); @@ -440,7 +438,7 @@ class UDPPacket implements CDQEntry { } if (!CACHE) return; - _packetCache.offer(this); + _packetCache.tryRelease(this); } /** diff --git a/router/java/src/net/i2p/router/util/TryCache.java b/router/java/src/net/i2p/router/util/TryCache.java new file mode 100644 index 0000000000..faa03b0878 --- /dev/null +++ b/router/java/src/net/i2p/router/util/TryCache.java @@ -0,0 +1,91 @@ +package net.i2p.router.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An object cache which is safe to use by multiple threads without blocking. + * + * @author zab + * + * @param + */ +public class TryCache { + + /** + * Something that creates objects of the type cached by this cache + * + * @param + */ + public static interface ObjectFactory { + T newInstance(); + } + + private final ObjectFactory factory; + private final int capacity; + private final List items; + private final Lock lock = new ReentrantLock(); + + /** + * @param factory to be used for creating new instances + * @param capacity cache up to this many items + */ + public TryCache(ObjectFactory factory, int capacity) { + this.factory = factory; + this.capacity = capacity; + this.items = new ArrayList<>(capacity); + } + + /** + * @return a cached or newly created item from this cache + */ + public T tryAcquire() { + T rv = null; + if (lock.tryLock()) { + try { + if (!items.isEmpty()) { + rv = items.remove(items.size() - 1); + } + } finally { + lock.unlock(); + } + } + + if (rv == null) { + rv = factory.newInstance(); + } + return rv; + } + + /** + * Tries to return this item to the cache but it may fail if + * the cache has reached capacity or it's lock is held by + * another thread. + */ + public void tryRelease(T item) { + if (lock.tryLock()) { + try { + if (items.size() < capacity) { + items.add(item); + } + } finally { + lock.unlock(); + } + } + } + + /** + * Clears all cached items. This is the only method + * that blocks until it acquires the lock. + */ + public void clear() { + lock.lock(); + try { + items.clear(); + } finally { + lock.unlock(); + } + } +}