* DecayingBloomFilter:
- Replace with new DecayingHashSet for 3 of 4 uses, and also in the 4th if the router is low-bandwidth. Saves 8 MB heap.
This commit is contained in:
@ -14,6 +14,10 @@ import org.xlattice.crypto.filters.BloomSHA1;
|
|||||||
* entries per second with virtually no false positive rate. Down the line,
|
* entries per second with virtually no false positive rate. Down the line,
|
||||||
* this may be refactored to allow tighter control of the size necessary for the
|
* this may be refactored to allow tighter control of the size necessary for the
|
||||||
* contained bloom filters, but a fixed 2MB overhead isn't that bad.
|
* contained bloom filters, but a fixed 2MB overhead isn't that bad.
|
||||||
|
*
|
||||||
|
* NOTE: At 1MBps, the tunnel IVV will see an unacceptable false positive rate
|
||||||
|
* of almost 0.1% with the current m and k values; however using DHS instead will use 30MB.
|
||||||
|
* Further analysis and tweaking for the tunnel IVV may be required.
|
||||||
*/
|
*/
|
||||||
public class DecayingBloomFilter {
|
public class DecayingBloomFilter {
|
||||||
private I2PAppContext _context;
|
private I2PAppContext _context;
|
||||||
@ -26,13 +30,18 @@ public class DecayingBloomFilter {
|
|||||||
private byte _extended[];
|
private byte _extended[];
|
||||||
private byte _longToEntry[];
|
private byte _longToEntry[];
|
||||||
private long _longToEntryMask;
|
private long _longToEntryMask;
|
||||||
private long _currentDuplicates;
|
protected long _currentDuplicates;
|
||||||
private boolean _keepDecaying;
|
private boolean _keepDecaying;
|
||||||
private DecayEvent _decayEvent;
|
private DecayEvent _decayEvent;
|
||||||
|
/** just for logging */
|
||||||
|
private String _name;
|
||||||
|
|
||||||
private static final int DEFAULT_M = 23;
|
private static final int DEFAULT_M = 23;
|
||||||
private static final boolean ALWAYS_MISS = false;
|
private static final boolean ALWAYS_MISS = false;
|
||||||
|
|
||||||
|
/** noop for DHS */
|
||||||
|
public DecayingBloomFilter() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a bloom filter that will decay its entries over time.
|
* Create a bloom filter that will decay its entries over time.
|
||||||
*
|
*
|
||||||
@ -42,9 +51,15 @@ public class DecayingBloomFilter {
|
|||||||
* against with sufficient random values.
|
* against with sufficient random values.
|
||||||
*/
|
*/
|
||||||
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes) {
|
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes) {
|
||||||
|
this(context, durationMs, entryBytes, "DBF");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @param name just for logging / debugging / stats */
|
||||||
|
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name) {
|
||||||
_context = context;
|
_context = context;
|
||||||
_log = context.logManager().getLog(DecayingBloomFilter.class);
|
_log = context.logManager().getLog(DecayingBloomFilter.class);
|
||||||
_entryBytes = entryBytes;
|
_entryBytes = entryBytes;
|
||||||
|
_name = name;
|
||||||
// this is instantiated in four different places, they may have different
|
// this is instantiated in four different places, they may have different
|
||||||
// requirements, but for now use this as a gross method of memory reduction.
|
// requirements, but for now use this as a gross method of memory reduction.
|
||||||
// m == 23 => 1MB each BloomSHA1 (4 pairs = 8MB total)
|
// m == 23 => 1MB each BloomSHA1 (4 pairs = 8MB total)
|
||||||
@ -67,6 +82,17 @@ public class DecayingBloomFilter {
|
|||||||
_decayEvent = new DecayEvent();
|
_decayEvent = new DecayEvent();
|
||||||
_keepDecaying = true;
|
_keepDecaying = true;
|
||||||
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
|
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("New DBF " + name + " m = " + m + " entryBytes = " + entryBytes +
|
||||||
|
" numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000));
|
||||||
|
// try to get a handle on memory usage vs. false positives
|
||||||
|
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".size",
|
||||||
|
"Size", "Router", new long[] { Math.max(60*1000, durationMs) });
|
||||||
|
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".dups",
|
||||||
|
"1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
|
||||||
|
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)",
|
||||||
|
"log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)",
|
||||||
|
"Router", new long[] { Math.max(60*1000, durationMs) });
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getCurrentDuplicateCount() { return _currentDuplicates; }
|
public long getCurrentDuplicateCount() { return _currentDuplicates; }
|
||||||
@ -196,9 +222,12 @@ public class DecayingBloomFilter {
|
|||||||
private void decay() {
|
private void decay() {
|
||||||
int currentCount = 0;
|
int currentCount = 0;
|
||||||
long dups = 0;
|
long dups = 0;
|
||||||
|
double fpr = 0d;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
BloomSHA1 tmp = _previous;
|
BloomSHA1 tmp = _previous;
|
||||||
currentCount = _current.size();
|
currentCount = _current.size();
|
||||||
|
if (_log.shouldLog(Log.DEBUG) && currentCount > 0)
|
||||||
|
fpr = _current.falsePositives();
|
||||||
_previous = _current;
|
_previous = _current;
|
||||||
_current = tmp;
|
_current = tmp;
|
||||||
_current.clear();
|
_current.clear();
|
||||||
@ -206,8 +235,19 @@ public class DecayingBloomFilter {
|
|||||||
_currentDuplicates = 0;
|
_currentDuplicates = 0;
|
||||||
}
|
}
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Decaying the filter after inserting " + currentCount
|
_log.debug("Decaying the filter " + _name + " after inserting " + currentCount
|
||||||
+ " elements and " + dups + " false positives");
|
+ " elements and " + dups + " false positives with FPR = " + fpr);
|
||||||
|
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".size",
|
||||||
|
currentCount, 0);
|
||||||
|
if (currentCount > 0)
|
||||||
|
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".dups",
|
||||||
|
1000l*1000*dups/currentCount, 0);
|
||||||
|
if (fpr > 0d) {
|
||||||
|
// only if log.shouldLog(Log.DEBUG) ...
|
||||||
|
long exponent = (long) Math.log10(fpr);
|
||||||
|
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".log10(falsePos)",
|
||||||
|
exponent, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class DecayEvent implements SimpleTimer.TimedEvent {
|
private class DecayEvent implements SimpleTimer.TimedEvent {
|
||||||
@ -219,18 +259,27 @@ public class DecayingBloomFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Theoretical false positive rate for 16 KBps: 1.17E-21
|
||||||
|
* Theoretical false positive rate for 24 KBps: 9.81E-20
|
||||||
|
* Theoretical false positive rate for 32 KBps: 2.24E-18
|
||||||
|
* Theoretical false positive rate for 256 KBps: 7.45E-9
|
||||||
|
* Theoretical false positive rate for 512 KBps: 5.32E-6
|
||||||
|
* Theoretical false positive rate for 1024 KBps: 1.48E-3
|
||||||
|
*/
|
||||||
public static void main(String args[]) {
|
public static void main(String args[]) {
|
||||||
int kbps = 256;
|
int kbps = 256;
|
||||||
int iterations = 100;
|
int iterations = 10;
|
||||||
testByLong(kbps, iterations);
|
testByLong(kbps, iterations);
|
||||||
testByBytes(kbps, iterations);
|
testByBytes(kbps, iterations);
|
||||||
}
|
}
|
||||||
public static void testByLong(int kbps, int numRuns) {
|
private static void testByLong(int kbps, int numRuns) {
|
||||||
int messages = 60 * 10 * kbps;
|
int messages = 60 * 10 * kbps;
|
||||||
Random r = new Random();
|
Random r = new Random();
|
||||||
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8);
|
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8);
|
||||||
int falsePositives = 0;
|
int falsePositives = 0;
|
||||||
long totalTime = 0;
|
long totalTime = 0;
|
||||||
|
double fpr = 0d;
|
||||||
for (int j = 0; j < numRuns; j++) {
|
for (int j = 0; j < numRuns; j++) {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
for (int i = 0; i < messages; i++) {
|
for (int i = 0; i < messages; i++) {
|
||||||
@ -240,15 +289,17 @@ public class DecayingBloomFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
totalTime += System.currentTimeMillis() - start;
|
totalTime += System.currentTimeMillis() - start;
|
||||||
|
fpr = filter.getFalsePositiveRate();
|
||||||
filter.clear();
|
filter.clear();
|
||||||
}
|
}
|
||||||
filter.stopDecaying();
|
filter.stopDecaying();
|
||||||
|
System.out.println("False postive rate should be " + fpr);
|
||||||
System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
|
System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
|
||||||
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
|
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
|
||||||
+ falsePositives + " false positives");
|
+ falsePositives + " false positives");
|
||||||
|
|
||||||
}
|
}
|
||||||
public static void testByBytes(int kbps, int numRuns) {
|
private static void testByBytes(int kbps, int numRuns) {
|
||||||
byte iv[][] = new byte[60*10*kbps][16];
|
byte iv[][] = new byte[60*10*kbps][16];
|
||||||
Random r = new Random();
|
Random r = new Random();
|
||||||
for (int i = 0; i < iv.length; i++)
|
for (int i = 0; i < iv.length; i++)
|
||||||
@ -257,18 +308,21 @@ public class DecayingBloomFilter {
|
|||||||
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16);
|
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16);
|
||||||
int falsePositives = 0;
|
int falsePositives = 0;
|
||||||
long totalTime = 0;
|
long totalTime = 0;
|
||||||
|
double fpr = 0d;
|
||||||
for (int j = 0; j < numRuns; j++) {
|
for (int j = 0; j < numRuns; j++) {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
for (int i = 0; i < iv.length; i++) {
|
for (int i = 0; i < iv.length; i++) {
|
||||||
if (filter.add(iv[i])) {
|
if (filter.add(iv[i])) {
|
||||||
falsePositives++;
|
falsePositives++;
|
||||||
System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
|
System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
totalTime += System.currentTimeMillis() - start;
|
totalTime += System.currentTimeMillis() - start;
|
||||||
|
fpr = filter.getFalsePositiveRate();
|
||||||
filter.clear();
|
filter.clear();
|
||||||
}
|
}
|
||||||
filter.stopDecaying();
|
filter.stopDecaying();
|
||||||
|
System.out.println("False postive rate should be " + fpr);
|
||||||
System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
|
System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
|
||||||
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
|
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
|
||||||
+ falsePositives + " false positives");
|
+ falsePositives + " false positives");
|
||||||
|
380
core/java/src/net/i2p/util/DecayingHashSet.java
Normal file
380
core/java/src/net/i2p/util/DecayingHashSet.java
Normal file
@ -0,0 +1,380 @@
|
|||||||
|
package net.i2p.util;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import net.i2p.I2PAppContext;
|
||||||
|
import net.i2p.data.DataHelper;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Double buffered hash set.
|
||||||
|
* Since DecayingBloomFilter was instantiated 4 times for a total memory usage
|
||||||
|
* of 8MB, it seemed like we could do a lot better, given these usage stats
|
||||||
|
* on a class L router:
|
||||||
|
*
|
||||||
|
* ./router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java:
|
||||||
|
* 32 bytes, peak 10 entries in 1m
|
||||||
|
*
|
||||||
|
* ./router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java:
|
||||||
|
* 4 bytes, peak 150 entries in 10s
|
||||||
|
*
|
||||||
|
* ./router/java/src/net/i2p/router/MessageValidator.java:
|
||||||
|
* 8 bytes, peak 1K entries in 2m
|
||||||
|
*
|
||||||
|
* ./router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java:
|
||||||
|
* 16 bytes, peak 15K entries in 10m
|
||||||
|
*
|
||||||
|
* If the ArrayWrapper object in the HashSet is 50 bytes, and BloomSHA1(23, 11) is 1MB,
|
||||||
|
* then for less than 20K entries this is smaller.
|
||||||
|
* And this uses space proportional to traffiic, so it doesn't penalize small routers
|
||||||
|
* with a fixed 8MB.
|
||||||
|
* So let's try it for the first 2 or 3, for now.
|
||||||
|
*
|
||||||
|
* Also, DBF is syncrhonized, and uses SimpleTimer.
|
||||||
|
* Here we use a read/write lock, with synchronization only
|
||||||
|
* when switching double buffers, and we use SimpleScheduler.
|
||||||
|
*
|
||||||
|
* Yes, we could stare at stats all day, and try to calculate an acceptable
|
||||||
|
* false-positive rate for each of the above uses, then estimate the DBF size
|
||||||
|
* required to meet that rate for a given usage. Or even start adjusting the
|
||||||
|
* Bloom filter m and k values on a per-DBF basis. But it's a whole lot easier
|
||||||
|
* to implement something with a zero false positive rate, and uses less memory
|
||||||
|
* for almost all bandwidth classes.
|
||||||
|
*
|
||||||
|
* This has a strictly zero false positive rate for <= 8 byte keys.
|
||||||
|
* For larger keys, it is 1 / (2**64) ~= 5E-20, which is better than
|
||||||
|
* DBF for any entry count greater than about 14K.
|
||||||
|
*
|
||||||
|
* DBF has a zero false negative rate over the period
|
||||||
|
* 2 * durationMs. And a 100% false negative rate beyond that period.
|
||||||
|
* This has the same properties.
|
||||||
|
*
|
||||||
|
* This performs about twice as fast as DBF in the test below.
|
||||||
|
*
|
||||||
|
* @author zzz
|
||||||
|
*/
|
||||||
|
public class DecayingHashSet extends DecayingBloomFilter {
|
||||||
|
private final I2PAppContext _context;
|
||||||
|
private final Log _log;
|
||||||
|
private ConcurrentHashSet<ArrayWrapper> _current;
|
||||||
|
private ConcurrentHashSet<ArrayWrapper> _previous;
|
||||||
|
private int _durationMs;
|
||||||
|
private int _entryBytes;
|
||||||
|
private volatile boolean _keepDecaying;
|
||||||
|
private final DecayEvent _decayEvent;
|
||||||
|
/** just for logging */
|
||||||
|
private final String _name;
|
||||||
|
/** synchronize against this lock when switching double buffers */
|
||||||
|
private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a double-buffered hash set that will decay its entries over time.
|
||||||
|
*
|
||||||
|
* @param durationMs entries last for at least this long, but no more than twice this long
|
||||||
|
* @param entryBytes how large are the entries to be added? 1 to 32 bytes
|
||||||
|
*/
|
||||||
|
public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes) {
|
||||||
|
this(context, durationMs, entryBytes, "DHS");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @param name just for logging / debugging / stats */
|
||||||
|
public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes, String name) {
|
||||||
|
if (entryBytes <= 0 || entryBytes > 32)
|
||||||
|
throw new IllegalArgumentException("Bad size");
|
||||||
|
_context = context;
|
||||||
|
_log = context.logManager().getLog(DecayingHashSet.class);
|
||||||
|
_entryBytes = entryBytes;
|
||||||
|
_name = name;
|
||||||
|
_current = new ConcurrentHashSet(128);
|
||||||
|
_previous = new ConcurrentHashSet(128);
|
||||||
|
_durationMs = durationMs;
|
||||||
|
_currentDuplicates = 0;
|
||||||
|
_decayEvent = new DecayEvent();
|
||||||
|
_keepDecaying = true;
|
||||||
|
SimpleScheduler.getInstance().addEvent(_decayEvent, _durationMs);
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("New DHS " + name + " entryBytes = " + entryBytes +
|
||||||
|
" cycle (s) = " + (durationMs / 1000));
|
||||||
|
// try to get a handle on memory usage vs. false positives
|
||||||
|
context.statManager().createRateStat("router.decayingHashSet." + name + ".size",
|
||||||
|
"Size", "Router", new long[] { Math.max(60*1000, durationMs) });
|
||||||
|
context.statManager().createRateStat("router.decayingHashSet." + name + ".dups",
|
||||||
|
"1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
|
||||||
|
}
|
||||||
|
|
||||||
|
/** unsynchronized but only used for logging elsewhere */
|
||||||
|
@Override
|
||||||
|
public int getInsertedCount() {
|
||||||
|
return _current.size() + _previous.size();
|
||||||
|
}
|
||||||
|
/** pointless, only used for logging elsewhere */
|
||||||
|
@Override
|
||||||
|
public double getFalsePositiveRate() {
|
||||||
|
if (_entryBytes <= 8)
|
||||||
|
return 0d;
|
||||||
|
return 1d / Math.pow(2d, 64d); // 5.4E-20
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the entry added is a duplicate
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean add(byte entry[], int off, int len) {
|
||||||
|
if (entry == null)
|
||||||
|
throw new IllegalArgumentException("Null entry");
|
||||||
|
if (len != _entryBytes)
|
||||||
|
throw new IllegalArgumentException("Bad entry [" + len + ", expected "
|
||||||
|
+ _entryBytes + "]");
|
||||||
|
getReadLock();
|
||||||
|
try {
|
||||||
|
return locked_add(entry, off, len, true);
|
||||||
|
} finally { releaseReadLock(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the entry added is a duplicate. the number of low order
|
||||||
|
* bits used is determined by the entryBytes parameter used on creation of the
|
||||||
|
* filter.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean add(long entry) {
|
||||||
|
return add(entry, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the entry is already known. this does NOT add the
|
||||||
|
* entry however.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isKnown(long entry) {
|
||||||
|
return add(entry, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean add(long entry, boolean addIfNew) {
|
||||||
|
int len = Math.min(8, _entryBytes);
|
||||||
|
byte[] b = toLong(len, entry);
|
||||||
|
getReadLock();
|
||||||
|
try {
|
||||||
|
return locked_add(b, 0, len, addIfNew);
|
||||||
|
} finally { releaseReadLock(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
/** from DataHelper, except negative values ok */
|
||||||
|
private static byte[] toLong(int numBytes, long value) {
|
||||||
|
byte target[] = new byte[numBytes];
|
||||||
|
for (int i = 0; i < numBytes; i++)
|
||||||
|
target[numBytes-i-1] = (byte)(value >>> (i*8));
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** so many questions... */
|
||||||
|
private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) {
|
||||||
|
ArrayWrapper w = new ArrayWrapper(entry, offset, len);
|
||||||
|
boolean seen = _current.contains(w);
|
||||||
|
seen = seen || _previous.contains(w);
|
||||||
|
if (seen) {
|
||||||
|
// why increment if addIfNew == false?
|
||||||
|
// why not add to current if only in previous?
|
||||||
|
_currentDuplicates++;
|
||||||
|
} else if (addIfNew) {
|
||||||
|
_current.add(w);
|
||||||
|
// why add to previous?
|
||||||
|
_previous.add(w);
|
||||||
|
}
|
||||||
|
return seen;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
_current.clear();
|
||||||
|
_previous.clear();
|
||||||
|
_currentDuplicates = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** super doesn't call clear, but neither do the users, so it seems like we should here */
|
||||||
|
@Override
|
||||||
|
public void stopDecaying() {
|
||||||
|
_keepDecaying = false;
|
||||||
|
clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void decay() {
|
||||||
|
int currentCount = 0;
|
||||||
|
long dups = 0;
|
||||||
|
if (!getWriteLock())
|
||||||
|
return;
|
||||||
|
try {
|
||||||
|
ConcurrentHashSet<ArrayWrapper> tmp = _previous;
|
||||||
|
currentCount = _current.size();
|
||||||
|
_previous = _current;
|
||||||
|
_current = tmp;
|
||||||
|
_current.clear();
|
||||||
|
dups = _currentDuplicates;
|
||||||
|
_currentDuplicates = 0;
|
||||||
|
} finally { releaseWriteLock(); }
|
||||||
|
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Decaying the filter " + _name + " after inserting " + currentCount
|
||||||
|
+ " elements and " + dups + " false positives");
|
||||||
|
_context.statManager().addRateData("router.decayingHashSet." + _name + ".size",
|
||||||
|
currentCount, 0);
|
||||||
|
if (currentCount > 0)
|
||||||
|
_context.statManager().addRateData("router.decayingHashSet." + _name + ".dups",
|
||||||
|
1000l*1000*dups/currentCount, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** if decay() ever blows up, we won't reschedule, and will grow unbounded, but it seems unlikely */
|
||||||
|
private class DecayEvent implements SimpleTimer.TimedEvent {
|
||||||
|
public void timeReached() {
|
||||||
|
if (_keepDecaying) {
|
||||||
|
decay();
|
||||||
|
SimpleScheduler.getInstance().addEvent(DecayEvent.this, _durationMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getReadLock() {
|
||||||
|
_reorganizeLock.readLock().lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseReadLock() {
|
||||||
|
_reorganizeLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return true if the lock was acquired */
|
||||||
|
private boolean getWriteLock() {
|
||||||
|
try {
|
||||||
|
boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
|
||||||
|
if (!rv)
|
||||||
|
_log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
|
||||||
|
return rv;
|
||||||
|
} catch (InterruptedException ie) {}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseWriteLock() {
|
||||||
|
_reorganizeLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This saves the data as-is if the length is <= 8 bytes,
|
||||||
|
* otherwise it stores an 8-byte hash.
|
||||||
|
* Hash function is from DataHelper, modded to get
|
||||||
|
* the maximum entropy given the length of the data.
|
||||||
|
*/
|
||||||
|
private static class ArrayWrapper {
|
||||||
|
private long _longhashcode;
|
||||||
|
public ArrayWrapper(byte[] b, int offset, int len) {
|
||||||
|
int idx = offset;
|
||||||
|
int shift = Math.min(8, 64 / len);
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
// xor better than + in tests
|
||||||
|
_longhashcode ^= (((long) b[idx++]) << (i * shift));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int hashCode() {
|
||||||
|
return (int) _longhashcode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long longHashCode() {
|
||||||
|
return _longhashcode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o == null || !(o instanceof ArrayWrapper))
|
||||||
|
return false;
|
||||||
|
return ((ArrayWrapper) o).longHashCode() == _longhashcode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* vs. DBF, this measures 1.93x faster for testByLong and 2.46x faster for testByBytes.
|
||||||
|
*/
|
||||||
|
public static void main(String args[]) {
|
||||||
|
/** KBytes per sec, 1 message per KByte */
|
||||||
|
int kbps = 256;
|
||||||
|
int iterations = 10;
|
||||||
|
//testSize();
|
||||||
|
testByLong(kbps, iterations);
|
||||||
|
testByBytes(kbps, iterations);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** and the answer is: 49.9 bytes. The ArrayWrapper alone measured 16, so that's 34 for the HashSet entry. */
|
||||||
|
/*****
|
||||||
|
private static void testSize() {
|
||||||
|
int qty = 256*1024;
|
||||||
|
byte b[] = new byte[8];
|
||||||
|
Random r = new Random();
|
||||||
|
long old = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
|
||||||
|
ConcurrentHashSet foo = new ConcurrentHashSet(qty);
|
||||||
|
for (int i = 0; i < qty; i++) {
|
||||||
|
r.nextBytes(b);
|
||||||
|
foo.add(new ArrayWrapper(b, 0, 8));
|
||||||
|
}
|
||||||
|
long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
|
||||||
|
System.out.println("Memory per ArrayWrapper: " + (((double) (used - old)) / qty));
|
||||||
|
}
|
||||||
|
*****/
|
||||||
|
|
||||||
|
/** 8 bytes, simulate the router message validator */
|
||||||
|
private static void testByLong(int kbps, int numRuns) {
|
||||||
|
int messages = 60 * 10 * kbps;
|
||||||
|
Random r = new Random();
|
||||||
|
DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 8);
|
||||||
|
int falsePositives = 0;
|
||||||
|
long totalTime = 0;
|
||||||
|
for (int j = 0; j < numRuns; j++) {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for (int i = 0; i < messages; i++) {
|
||||||
|
if (filter.add(r.nextLong())) {
|
||||||
|
falsePositives++;
|
||||||
|
System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
totalTime += System.currentTimeMillis() - start;
|
||||||
|
filter.clear();
|
||||||
|
}
|
||||||
|
System.out.println("False postive rate should be " + filter.getFalsePositiveRate());
|
||||||
|
filter.stopDecaying();
|
||||||
|
System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
|
||||||
|
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
|
||||||
|
+ falsePositives + " false positives");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/** 16 bytes, simulate the tunnel IV validator */
|
||||||
|
private static void testByBytes(int kbps, int numRuns) {
|
||||||
|
byte iv[][] = new byte[60*10*kbps][16];
|
||||||
|
Random r = new Random();
|
||||||
|
for (int i = 0; i < iv.length; i++)
|
||||||
|
r.nextBytes(iv[i]);
|
||||||
|
|
||||||
|
DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 16);
|
||||||
|
int falsePositives = 0;
|
||||||
|
long totalTime = 0;
|
||||||
|
for (int j = 0; j < numRuns; j++) {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for (int i = 0; i < iv.length; i++) {
|
||||||
|
if (filter.add(iv[i])) {
|
||||||
|
falsePositives++;
|
||||||
|
System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
totalTime += System.currentTimeMillis() - start;
|
||||||
|
filter.clear();
|
||||||
|
}
|
||||||
|
System.out.println("False postive rate should be " + filter.getFalsePositiveRate());
|
||||||
|
filter.stopDecaying();
|
||||||
|
System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
|
||||||
|
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
|
||||||
|
+ falsePositives + " false positives");
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
package net.i2p.router;
|
package net.i2p.router;
|
||||||
|
|
||||||
import net.i2p.util.DecayingBloomFilter;
|
import net.i2p.util.DecayingBloomFilter;
|
||||||
|
import net.i2p.util.DecayingHashSet;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -95,7 +96,7 @@ public class MessageValidator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void startup() {
|
public void startup() {
|
||||||
_filter = new DecayingBloomFilter(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8);
|
_filter = new DecayingHashSet(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8, "RouterMV");
|
||||||
}
|
}
|
||||||
|
|
||||||
void shutdown() {
|
void shutdown() {
|
||||||
|
@ -5,6 +5,7 @@ import java.util.Map;
|
|||||||
import net.i2p.data.Hash;
|
import net.i2p.data.Hash;
|
||||||
import net.i2p.router.RouterContext;
|
import net.i2p.router.RouterContext;
|
||||||
import net.i2p.util.DecayingBloomFilter;
|
import net.i2p.util.DecayingBloomFilter;
|
||||||
|
import net.i2p.util.DecayingHashSet;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -52,7 +53,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
|
|||||||
// may want to extend the DecayingBloomFilter so we can use a smaller
|
// may want to extend the DecayingBloomFilter so we can use a smaller
|
||||||
// array size (currently its tuned for 10 minute rates for the
|
// array size (currently its tuned for 10 minute rates for the
|
||||||
// messageValidator)
|
// messageValidator)
|
||||||
_recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 4);
|
_recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF");
|
||||||
_ackSender.startup();
|
_ackSender.startup();
|
||||||
_messageReceiver.startup();
|
_messageReceiver.startup();
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
package net.i2p.router.tunnel;
|
package net.i2p.router.tunnel;
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
|
||||||
import net.i2p.data.ByteArray;
|
import net.i2p.data.ByteArray;
|
||||||
import net.i2p.data.DataHelper;
|
import net.i2p.data.DataHelper;
|
||||||
|
import net.i2p.router.RouterContext;
|
||||||
import net.i2p.util.ByteCache;
|
import net.i2p.util.ByteCache;
|
||||||
import net.i2p.util.DecayingBloomFilter;
|
import net.i2p.util.DecayingBloomFilter;
|
||||||
|
import net.i2p.util.DecayingHashSet;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manage the IV validation for all of the router's tunnels by way of a big
|
* Manage the IV validation for all of the router's tunnels by way of a big
|
||||||
@ -12,7 +13,7 @@ import net.i2p.util.DecayingBloomFilter;
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class BloomFilterIVValidator implements IVValidator {
|
public class BloomFilterIVValidator implements IVValidator {
|
||||||
private I2PAppContext _context;
|
private RouterContext _context;
|
||||||
private DecayingBloomFilter _filter;
|
private DecayingBloomFilter _filter;
|
||||||
private ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH);
|
private ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH);
|
||||||
|
|
||||||
@ -23,9 +24,17 @@ public class BloomFilterIVValidator implements IVValidator {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private static final int HALFLIFE_MS = 10*60*1000;
|
private static final int HALFLIFE_MS = 10*60*1000;
|
||||||
public BloomFilterIVValidator(I2PAppContext ctx, int KBps) {
|
private static final int MIN_SHARE_KBPS_TO_USE_BLOOM = 64;
|
||||||
|
|
||||||
|
public BloomFilterIVValidator(RouterContext ctx, int KBps) {
|
||||||
_context = ctx;
|
_context = ctx;
|
||||||
_filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16);
|
// Select the filter based on share bandwidth.
|
||||||
|
// Note that at rates approaching 1MB, we need to do something else,
|
||||||
|
// as the Bloom filter false positive rates approach 0.1%. FIXME
|
||||||
|
if (getShareBandwidth(ctx) < MIN_SHARE_KBPS_TO_USE_BLOOM)
|
||||||
|
_filter = new DecayingHashSet(ctx, HALFLIFE_MS, 16, "TunnelIVV"); // appx. 4MB max
|
||||||
|
else
|
||||||
|
_filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16, "TunnelIVV"); // 2MB fixed
|
||||||
ctx.statManager().createRateStat("tunnel.duplicateIV", "Note that a duplicate IV was received", "Tunnels",
|
ctx.statManager().createRateStat("tunnel.duplicateIV", "Note that a duplicate IV was received", "Tunnels",
|
||||||
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
|
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
|
||||||
}
|
}
|
||||||
@ -39,4 +48,11 @@ public class BloomFilterIVValidator implements IVValidator {
|
|||||||
return !dup; // return true if it is OK, false if it isn't
|
return !dup; // return true if it is OK, false if it isn't
|
||||||
}
|
}
|
||||||
public void destroy() { _filter.stopDecaying(); }
|
public void destroy() { _filter.stopDecaying(); }
|
||||||
|
|
||||||
|
private static int getShareBandwidth(RouterContext ctx) {
|
||||||
|
int irateKBps = ctx.bandwidthLimiter().getInboundKBytesPerSecond();
|
||||||
|
int orateKBps = ctx.bandwidthLimiter().getOutboundKBytesPerSecond();
|
||||||
|
double pct = ctx.router().getSharePercentage();
|
||||||
|
return (int) (pct * Math.min(irateKBps, orateKBps));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import net.i2p.data.SessionKey;
|
|||||||
import net.i2p.data.i2np.BuildRequestRecord;
|
import net.i2p.data.i2np.BuildRequestRecord;
|
||||||
import net.i2p.data.i2np.TunnelBuildMessage;
|
import net.i2p.data.i2np.TunnelBuildMessage;
|
||||||
import net.i2p.util.DecayingBloomFilter;
|
import net.i2p.util.DecayingBloomFilter;
|
||||||
|
import net.i2p.util.DecayingHashSet;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -22,7 +23,7 @@ public class BuildMessageProcessor {
|
|||||||
private DecayingBloomFilter _filter;
|
private DecayingBloomFilter _filter;
|
||||||
|
|
||||||
public BuildMessageProcessor(I2PAppContext ctx) {
|
public BuildMessageProcessor(I2PAppContext ctx) {
|
||||||
_filter = new DecayingBloomFilter(ctx, 60*1000, 32);
|
_filter = new DecayingHashSet(ctx, 60*1000, 32, "TunnelBMP");
|
||||||
ctx.statManager().createRateStat("tunnel.buildRequestDup", "How frequently we get dup build request messages", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
ctx.statManager().createRateStat("tunnel.buildRequestDup", "How frequently we get dup build request messages", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user