Util: Convert more caches to TryCache (ticket #2263)

This commit is contained in:
zzz
2018-07-10 21:21:32 +00:00
parent f6da5f43aa
commit 6ad1de8d85
6 changed files with 90 additions and 122 deletions

View File

@ -2,9 +2,7 @@ package net.i2p.util;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
@ -22,13 +20,14 @@ import net.i2p.data.ByteArray;
Size Max MaxMem From
1K 32 32K tunnel TrivialPreprocessor
*changed to 512 since we disabled resize()
1K 512 512K tunnel FragmentHandler
1K 512 512K I2NP TunnelDataMessage
1K 512 512K tunnel FragmentedMessage
1730 128 216K streaming MessageOutputStream
1572 64 100K UDP InboundMessageState
2K 64 128K UDP IMS
1730 128 216K streaming MessageOutputStream
4K 32 128K I2PTunnelRunner
@ -49,7 +48,7 @@ import net.i2p.data.ByteArray;
* </pre>
*
*/
public final class ByteCache {
public final class ByteCache extends TryCache<ByteArray> {
//private static final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class);
private static final Map<Integer, ByteCache> _caches = new ConcurrentHashMap<Integer, ByteCache>(16);
@ -83,10 +82,13 @@ public final class ByteCache {
if (cacheSize * size > MAX_CACHE)
cacheSize = MAX_CACHE / size;
Integer sz = Integer.valueOf(size);
ByteCache cache = _caches.get(sz);
if (cache == null) {
cache = new ByteCache(cacheSize, size);
_caches.put(sz, cache);
ByteCache cache;
synchronized(_caches) {
cache = _caches.get(sz);
if (cache == null) {
cache = new ByteCache(cacheSize, size);
_caches.put(sz, cache);
}
}
cache.resize(cacheSize);
//I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("ByteCache size: " + size + " max: " + cacheSize, new Exception("from"));
@ -103,109 +105,85 @@ public final class ByteCache {
//_log.warn("WARNING: Low memory, clearing byte caches");
}
/** list of available and available entries */
private volatile Queue<ByteArray> _available;
private int _maxCached;
private final int _entrySize;
private volatile long _lastOverflow;
/** do we actually want to cache? Warning - setting to false may NPE, this should be fixed or removed */
private static final boolean _cache = true;
/** how often do we cleanup the cache */
private static final int CLEANUP_FREQUENCY = 33*1000;
/** if we haven't exceeded the cache size in 2 minutes, cut our cache in half */
private static final long EXPIRE_PERIOD = 2*60*1000;
/** @since 0.9.36 */
private static class ByteArrayFactory implements TryCache.ObjectFactory<ByteArray> {
private final int sz;
ByteArrayFactory(int entrySize) {
sz = entrySize;
}
public ByteArray newInstance() {
byte data[] = new byte[sz];
ByteArray rv = new ByteArray(data);
rv.setValid(0);
return rv;
}
}
private ByteCache(int maxCachedEntries, int entrySize) {
if (_cache)
_available = new LinkedBlockingQueue<ByteArray>(maxCachedEntries);
_maxCached = maxCachedEntries;
super(new ByteArrayFactory(entrySize), maxCachedEntries);
_entrySize = entrySize;
_lastOverflow = -1;
SimpleTimer2.getInstance().addPeriodicEvent(new Cleanup(), CLEANUP_FREQUENCY + (entrySize % 777)); //stagger
int stagger = SystemVersion.isAndroid() ? 0 : (entrySize % 777);
SimpleTimer2.getInstance().addPeriodicEvent(new Cleanup(), CLEANUP_FREQUENCY + stagger);
I2PAppContext.getGlobalContext().statManager().createRateStat("byteCache.memory." + entrySize, "Memory usage (B)", "Router", new long[] { 10*60*1000 });
}
private void resize(int maxCachedEntries) {
if (_maxCached >= maxCachedEntries) return;
_maxCached = maxCachedEntries;
// make a bigger one, move the cached items over
Queue<ByteArray> newLBQ = new LinkedBlockingQueue<ByteArray>(maxCachedEntries);
ByteArray ba;
while ((ba = _available.poll()) != null)
newLBQ.offer(ba);
_available = newLBQ;
}
/**
* Get the next available structure, either from the cache or a brand new one.
* Returned ByteArray will have valid = 0 and offset = 0.
* Returned ByteArray may or may not be zero, depends on whether
* release(ba) or release(ba, false) was called.
* Which is a problem, you should really specify shouldZero on acquire, not release.
*/
public final ByteArray acquire() {
if (_cache) {
ByteArray rv = _available.poll();
if (rv != null)
return rv;
}
_lastOverflow = System.currentTimeMillis();
byte data[] = new byte[_entrySize];
ByteArray rv = new ByteArray(data);
rv.setValid(0);
//rv.setOffset(0);
return rv;
// disabled since we're now extending TryCache
}
/**
* Put this structure back onto the available cache for reuse
*
*/
@Override
public final void release(ByteArray entry) {
release(entry, true);
}
public final void release(ByteArray entry, boolean shouldZero) {
if (_cache) {
if (entry == null || entry.getData() == null)
return;
if (entry.getData().length != _entrySize) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class);
if (log.shouldLog(Log.WARN))
log.warn("Bad size", new Exception("I did it"));
return;
}
entry.setValid(0);
entry.setOffset(0);
if (shouldZero)
Arrays.fill(entry.getData(), (byte)0x0);
_available.offer(entry);
if (entry == null || entry.getData() == null)
return;
if (entry.getData().length != _entrySize) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class);
if (log.shouldLog(Log.WARN))
log.warn("Bad size", new Exception("I did it"));
return;
}
}
/**
* Clear everything (memory pressure)
* @since 0.7.14
*/
private void clear() {
_available.clear();
entry.setValid(0);
entry.setOffset(0);
if (shouldZero)
Arrays.fill(entry.getData(), (byte)0x0);
super.release(entry);
}
private class Cleanup implements SimpleTimer.TimedEvent {
public void timeReached() {
I2PAppContext.getGlobalContext().statManager().addRateData("byteCache.memory." + _entrySize, _entrySize * _available.size(), 0);
if (System.currentTimeMillis() - _lastOverflow > EXPIRE_PERIOD) {
// we haven't exceeded the cache size in a few minutes, so lets
// shrink the cache
int toRemove = _available.size() / 2;
for (int i = 0; i < toRemove; i++)
_available.poll();
//if ( (toRemove > 0) && (_log.shouldLog(Log.DEBUG)) )
// _log.debug("Removing " + toRemove + " cached entries of size " + _entrySize);
int origsz;
lock.lock();
try {
origsz = items.size();
if (origsz > 1 && System.currentTimeMillis() - _lastUnderflow > EXPIRE_PERIOD) {
// we haven't exceeded the cache size in a few minutes, so lets
// shrink the cache
int toRemove = origsz / 2;
for (int i = 0; i < toRemove; i++) {
items.remove(items.size() - 1);
}
}
} finally {
lock.unlock();
}
I2PAppContext.getGlobalContext().statManager().addRateData("byteCache.memory." + _entrySize, _entrySize * origsz);
}
@Override

View File

@ -1,9 +1,6 @@
package net.i2p.util;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Like ByteCache but works directly with byte arrays, not ByteArrays.
@ -19,9 +16,6 @@ public final class SimpleByteCache {
private static final int DEFAULT_SIZE = 64;
/** up to this, use ABQ to minimize object churn and for performance; above this, use LBQ for two locks */
private static final int MAX_FOR_ABQ = 64;
/**
* Get a cache responsible for arrays of the given size
*
@ -60,38 +54,32 @@ public final class SimpleByteCache {
bc.clear();
}
/** list of available and available entries */
private Queue<byte[]> _available;
private int _maxCached;
private final TryCache<byte[]> _available;
private final int _entrySize;
/** @since 0.9.36 */
private static class ByteArrayFactory implements TryCache.ObjectFactory<byte[]> {
private final int sz;
ByteArrayFactory(int entrySize) {
sz = entrySize;
}
public byte[] newInstance() {
return new byte[sz];
}
}
private SimpleByteCache(int maxCachedEntries, int entrySize) {
_maxCached = maxCachedEntries;
_available = createQueue();
_available = new TryCache(new ByteArrayFactory(entrySize), maxCachedEntries);
_entrySize = entrySize;
}
private void resize(int maxCachedEntries) {
if (_maxCached >= maxCachedEntries) return;
_maxCached = maxCachedEntries;
// make a bigger one, move the cached items over
Queue<byte[]> newLBQ = createQueue();
byte[] ba;
while ((ba = _available.poll()) != null)
newLBQ.offer(ba);
_available = newLBQ;
// _available is now final, and getInstance() is not used anywhere,
// all call sites just use static acquire()
}
/**
* @return LBQ or ABQ
* @since 0.9.2
*/
private Queue<byte[]> createQueue() {
if (_entrySize <= MAX_FOR_ABQ)
return new ArrayBlockingQueue<byte[]>(_maxCached);
return new LinkedBlockingQueue<byte[]>(_maxCached);
}
/**
* Get the next available array, either from the cache or a brand new one
*/
@ -99,14 +87,11 @@ public final class SimpleByteCache {
return getInstance(size).acquire();
}
/**
/**
* Get the next available array, either from the cache or a brand new one
*/
private byte[] acquire() {
byte[] rv = _available.poll();
if (rv == null)
rv = new byte[_entrySize];
return rv;
return _available.acquire();
}
/**
@ -126,7 +111,7 @@ public final class SimpleByteCache {
return;
// should be safe without this
//Arrays.fill(entry, (byte) 0);
_available.offer(entry);
_available.release(entry);
}
/**

View File

@ -11,6 +11,7 @@ import java.util.concurrent.locks.ReentrantLock;
* @author zab
*
* @param <T>
* @since 0.9.36
*/
public class TryCache<T> {
@ -24,9 +25,10 @@ public class TryCache<T> {
}
private final ObjectFactory<T> factory;
private final int capacity;
private final List<T> items;
private final Lock lock = new ReentrantLock();
protected final int capacity;
protected final List<T> items;
protected final Lock lock = new ReentrantLock();
protected long _lastUnderflow;
/**
* @param factory to be used for creating new instances
@ -47,6 +49,8 @@ public class TryCache<T> {
try {
if (!items.isEmpty()) {
rv = items.remove(items.size() - 1);
} else {
_lastUnderflow = System.currentTimeMillis();
}
} finally {
lock.unlock();