forked from I2P_Developers/i2p.i2p
Implement tryLock-based object cache and make ntcp and ssu code use it
This commit is contained in:
@ -28,6 +28,7 @@ import net.i2p.data.router.RouterIdentity;
|
|||||||
import net.i2p.router.CommSystemFacade.Status;
|
import net.i2p.router.CommSystemFacade.Status;
|
||||||
import net.i2p.router.RouterContext;
|
import net.i2p.router.RouterContext;
|
||||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||||
|
import net.i2p.router.util.TryCache;
|
||||||
import net.i2p.util.Addresses;
|
import net.i2p.util.Addresses;
|
||||||
import net.i2p.util.ConcurrentHashSet;
|
import net.i2p.util.ConcurrentHashSet;
|
||||||
import net.i2p.util.I2PThread;
|
import net.i2p.util.I2PThread;
|
||||||
@ -53,7 +54,7 @@ class EventPumper implements Runnable {
|
|||||||
private final NTCPTransport _transport;
|
private final NTCPTransport _transport;
|
||||||
private final ObjectCounter<ByteArray> _blockedIPs;
|
private final ObjectCounter<ByteArray> _blockedIPs;
|
||||||
private long _expireIdleWriteTime;
|
private long _expireIdleWriteTime;
|
||||||
private boolean _useDirect;
|
private static boolean _useDirect;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This probably doesn't need to be bigger than the largest typical
|
* This probably doesn't need to be bigger than the largest typical
|
||||||
@ -63,13 +64,16 @@ class EventPumper implements Runnable {
|
|||||||
private static final int BUF_SIZE = 8*1024;
|
private static final int BUF_SIZE = 8*1024;
|
||||||
private static final int MAX_CACHE_SIZE = 64;
|
private static final int MAX_CACHE_SIZE = 64;
|
||||||
|
|
||||||
/**
|
private static class BufferFactory implements TryCache.ObjectFactory<ByteBuffer> {
|
||||||
* Read buffers. (write buffers use wrap())
|
public ByteBuffer newInstance() {
|
||||||
* Shared if there are multiple routers in the JVM
|
if (_useDirect)
|
||||||
* Note that if the routers have different PROP_DIRECT settings this will have a mix,
|
return ByteBuffer.allocateDirect(BUF_SIZE);
|
||||||
* so don't do that.
|
else
|
||||||
*/
|
return ByteBuffer.allocate(BUF_SIZE);
|
||||||
private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final TryCache<ByteBuffer> _bufferCache = new TryCache<>(new BufferFactory(), MAX_CACHE_SIZE);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* every few seconds, iterate across all ntcp connections just to make sure
|
* every few seconds, iterate across all ntcp connections just to make sure
|
||||||
@ -319,13 +323,7 @@ class EventPumper implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Clear the cache if the user changes the setting,
|
_useDirect = _context.getBooleanProperty(PROP_DIRECT);
|
||||||
// so we can test the effect.
|
|
||||||
boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT);
|
|
||||||
if (_useDirect != newUseDirect) {
|
|
||||||
_useDirect = newUseDirect;
|
|
||||||
_bufCache.clear();
|
|
||||||
}
|
|
||||||
} catch (RuntimeException re) {
|
} catch (RuntimeException re) {
|
||||||
_log.error("Error in the event pumper", re);
|
_log.error("Error in the event pumper", re);
|
||||||
}
|
}
|
||||||
@ -363,7 +361,6 @@ class EventPumper implements Runnable {
|
|||||||
_wantsRead.clear();
|
_wantsRead.clear();
|
||||||
_wantsRegister.clear();
|
_wantsRegister.clear();
|
||||||
_wantsWrite.clear();
|
_wantsWrite.clear();
|
||||||
_bufCache.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -461,27 +458,11 @@ class EventPumper implements Runnable {
|
|||||||
_selector.wakeup();
|
_selector.wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* How many to keep in reserve.
|
|
||||||
* Shared if there are multiple routers in the JVM
|
|
||||||
*/
|
|
||||||
private static int _numBufs = MIN_BUFS;
|
|
||||||
private static int __consecutiveExtra;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* High-frequency path in thread.
|
* High-frequency path in thread.
|
||||||
*/
|
*/
|
||||||
private ByteBuffer acquireBuf() {
|
private ByteBuffer acquireBuf() {
|
||||||
ByteBuffer rv = _bufCache.poll();
|
return _bufferCache.tryAcquire();
|
||||||
// discard buffer if _useDirect setting changes
|
|
||||||
if (rv == null || rv.isDirect() != _useDirect) {
|
|
||||||
if (_useDirect)
|
|
||||||
rv = ByteBuffer.allocateDirect(BUF_SIZE);
|
|
||||||
else
|
|
||||||
rv = ByteBuffer.allocate(BUF_SIZE);
|
|
||||||
_numBufs++;
|
|
||||||
}
|
|
||||||
return rv;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -490,27 +471,7 @@ class EventPumper implements Runnable {
|
|||||||
* High-frequency path in thread.
|
* High-frequency path in thread.
|
||||||
*/
|
*/
|
||||||
public static void releaseBuf(ByteBuffer buf) {
|
public static void releaseBuf(ByteBuffer buf) {
|
||||||
// double check
|
_bufferCache.tryRelease(buf);
|
||||||
if (buf.capacity() < BUF_SIZE) {
|
|
||||||
I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
buf.clear();
|
|
||||||
int extra = _bufCache.size();
|
|
||||||
boolean cached = extra < _numBufs;
|
|
||||||
|
|
||||||
// TODO always offer if direct?
|
|
||||||
if (cached) {
|
|
||||||
_bufCache.offer(buf);
|
|
||||||
if (extra > MIN_BUFS) {
|
|
||||||
__consecutiveExtra++;
|
|
||||||
if (__consecutiveExtra >= 20) {
|
|
||||||
if (_numBufs > MIN_BUFS)
|
|
||||||
_numBufs--;
|
|
||||||
__consecutiveExtra = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processAccept(SelectionKey key) {
|
private void processAccept(SelectionKey key) {
|
||||||
|
@ -12,6 +12,7 @@ import net.i2p.data.SessionKey;
|
|||||||
import net.i2p.router.RouterContext;
|
import net.i2p.router.RouterContext;
|
||||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||||
import net.i2p.router.util.CDQEntry;
|
import net.i2p.router.util.CDQEntry;
|
||||||
|
import net.i2p.router.util.TryCache;
|
||||||
import net.i2p.util.Addresses;
|
import net.i2p.util.Addresses;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
import net.i2p.util.SystemVersion;
|
import net.i2p.util.SystemVersion;
|
||||||
@ -45,18 +46,25 @@ class UDPPacket implements CDQEntry {
|
|||||||
// private boolean _isInbound;
|
// private boolean _isInbound;
|
||||||
private FIFOBandwidthLimiter.Request _bandwidthRequest;
|
private FIFOBandwidthLimiter.Request _bandwidthRequest;
|
||||||
|
|
||||||
|
private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
|
||||||
|
static RouterContext context;
|
||||||
|
public UDPPacket newInstance() {
|
||||||
|
return new UDPPacket(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Warning - this mixes contexts in a multi-router JVM
|
// Warning - this mixes contexts in a multi-router JVM
|
||||||
private static final Queue<UDPPacket> _packetCache;
|
private static final TryCache<UDPPacket> _packetCache;
|
||||||
|
private static final TryCache.ObjectFactory<UDPPacket> _packetFactory;
|
||||||
private static final boolean CACHE = true;
|
private static final boolean CACHE = true;
|
||||||
private static final int MIN_CACHE_SIZE = 64;
|
|
||||||
private static final int MAX_CACHE_SIZE = 256;
|
private static final int MAX_CACHE_SIZE = 256;
|
||||||
static {
|
static {
|
||||||
if (CACHE) {
|
if (CACHE) {
|
||||||
long maxMemory = SystemVersion.getMaxMemory();
|
_packetFactory = new PacketFactory();
|
||||||
int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024)));
|
_packetCache = new TryCache<>(_packetFactory, MAX_CACHE_SIZE);
|
||||||
_packetCache = new LinkedBlockingQueue<UDPPacket>(csize);
|
|
||||||
} else {
|
} else {
|
||||||
_packetCache = null;
|
_packetCache = null;
|
||||||
|
_packetFactory = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -398,18 +406,8 @@ class UDPPacket implements CDQEntry {
|
|||||||
public static UDPPacket acquire(RouterContext ctx, boolean inbound) {
|
public static UDPPacket acquire(RouterContext ctx, boolean inbound) {
|
||||||
UDPPacket rv = null;
|
UDPPacket rv = null;
|
||||||
if (CACHE) {
|
if (CACHE) {
|
||||||
rv = _packetCache.poll();
|
rv = _packetCache.tryAcquire();
|
||||||
if (rv != null) {
|
rv.init(ctx);
|
||||||
synchronized(rv) {
|
|
||||||
if (!rv._released) {
|
|
||||||
Log log = rv._context.logManager().getLog(UDPPacket.class);
|
|
||||||
log.error("Unreleased cached packet", new Exception());
|
|
||||||
rv = null;
|
|
||||||
} else {
|
|
||||||
rv.init(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (rv == null)
|
if (rv == null)
|
||||||
rv = new UDPPacket(ctx);
|
rv = new UDPPacket(ctx);
|
||||||
@ -440,7 +438,7 @@ class UDPPacket implements CDQEntry {
|
|||||||
}
|
}
|
||||||
if (!CACHE)
|
if (!CACHE)
|
||||||
return;
|
return;
|
||||||
_packetCache.offer(this);
|
_packetCache.tryRelease(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
91
router/java/src/net/i2p/router/util/TryCache.java
Normal file
91
router/java/src/net/i2p/router/util/TryCache.java
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
package net.i2p.router.util;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An object cache which is safe to use by multiple threads without blocking.
|
||||||
|
*
|
||||||
|
* @author zab
|
||||||
|
*
|
||||||
|
* @param <T>
|
||||||
|
*/
|
||||||
|
public class TryCache<T> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Something that creates objects of the type cached by this cache
|
||||||
|
*
|
||||||
|
* @param <T>
|
||||||
|
*/
|
||||||
|
public static interface ObjectFactory<T> {
|
||||||
|
T newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ObjectFactory<T> factory;
|
||||||
|
private final int capacity;
|
||||||
|
private final List<T> items;
|
||||||
|
private final Lock lock = new ReentrantLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param factory to be used for creating new instances
|
||||||
|
* @param capacity cache up to this many items
|
||||||
|
*/
|
||||||
|
public TryCache(ObjectFactory<T> factory, int capacity) {
|
||||||
|
this.factory = factory;
|
||||||
|
this.capacity = capacity;
|
||||||
|
this.items = new ArrayList<>(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a cached or newly created item from this cache
|
||||||
|
*/
|
||||||
|
public T tryAcquire() {
|
||||||
|
T rv = null;
|
||||||
|
if (lock.tryLock()) {
|
||||||
|
try {
|
||||||
|
if (!items.isEmpty()) {
|
||||||
|
rv = items.remove(items.size() - 1);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rv == null) {
|
||||||
|
rv = factory.newInstance();
|
||||||
|
}
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to return this item to the cache but it may fail if
|
||||||
|
* the cache has reached capacity or it's lock is held by
|
||||||
|
* another thread.
|
||||||
|
*/
|
||||||
|
public void tryRelease(T item) {
|
||||||
|
if (lock.tryLock()) {
|
||||||
|
try {
|
||||||
|
if (items.size() < capacity) {
|
||||||
|
items.add(item);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears all cached items. This is the only method
|
||||||
|
* that blocks until it acquires the lock.
|
||||||
|
*/
|
||||||
|
public void clear() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
items.clear();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user