forked from I2P_Developers/i2p.i2p
Tunnels: Fix build request Bloom filter (ticket #1746)
Change from 60s DHS to 60m DBF Use reply key as filter key, not first part of encrypted data, to match the specs and hopefully reduce dups BuildMessageProcessor cleanups log and stat tweaks remove deprecated methods remove some timing measurements javadocs
This commit is contained in:
@ -11,7 +11,9 @@ package net.i2p.data;
|
||||
|
||||
/**
|
||||
* Defines the SessionKey as defined by the I2P data structure spec.
|
||||
* A session key is 32byte Integer.
|
||||
* A session key is a 32 byte Integer.
|
||||
*
|
||||
* To create one with random data, use I2PAppContext.keyGenerator().generateSessionKey().
|
||||
*
|
||||
* @author jrandom
|
||||
*/
|
||||
@ -19,6 +21,7 @@ public class SessionKey extends SimpleDataStructure {
|
||||
private Object _preparedKey;
|
||||
|
||||
public final static int KEYSIZE_BYTES = 32;
|
||||
/** A key with all zeroes in the data */
|
||||
public static final SessionKey INVALID_KEY = new SessionKey(new byte[KEYSIZE_BYTES]);
|
||||
|
||||
public SessionKey() {
|
||||
|
@ -1,3 +1,6 @@
|
||||
2016-01-10 zzz
|
||||
* Tunnels: Fix build request Bloom filter (ticket #1746)
|
||||
|
||||
2016-01-07 zzz
|
||||
* Console: Fixed summary bar overflow (ticket #1739)
|
||||
|
||||
|
@ -1,6 +1,9 @@
|
||||
package net.i2p.data.i2np;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.Base64;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.data.DataFormatException;
|
||||
import net.i2p.data.DataHelper;
|
||||
@ -68,7 +71,7 @@ public class BuildRequestRecord {
|
||||
private static final int OFF_SEND_IDENT = OFF_SEND_TUNNEL + 4;
|
||||
private static final int OFF_LAYER_KEY = OFF_SEND_IDENT + Hash.HASH_LENGTH;
|
||||
private static final int OFF_IV_KEY = OFF_LAYER_KEY + SessionKey.KEYSIZE_BYTES;
|
||||
private static final int OFF_REPLY_KEY = OFF_IV_KEY + SessionKey.KEYSIZE_BYTES;
|
||||
public static final int OFF_REPLY_KEY = OFF_IV_KEY + SessionKey.KEYSIZE_BYTES;
|
||||
private static final int OFF_REPLY_IV = OFF_REPLY_KEY + SessionKey.KEYSIZE_BYTES;
|
||||
private static final int OFF_FLAG = OFF_REPLY_IV + IV_SIZE;
|
||||
private static final int OFF_REQ_TIME = OFF_FLAG + 1;
|
||||
@ -156,7 +159,8 @@ public class BuildRequestRecord {
|
||||
}
|
||||
|
||||
/**
|
||||
* Time that the request was sent (ms), truncated to the nearest hour
|
||||
* Time that the request was sent (ms), truncated to the nearest hour.
|
||||
* This ignores leap seconds.
|
||||
*/
|
||||
public long readRequestTime() {
|
||||
return DataHelper.fromLong(_data, OFF_REQ_TIME, 4) * (60 * 60 * 1000L);
|
||||
@ -263,6 +267,7 @@ public class BuildRequestRecord {
|
||||
long truncatedHour = ctx.clock().now();
|
||||
// prevent hop identification at top of the hour
|
||||
truncatedHour -= ctx.random().nextInt(90*1000);
|
||||
// this ignores leap seconds
|
||||
truncatedHour /= (60l*60l*1000l);
|
||||
DataHelper.toLong(buf, OFF_REQ_TIME, 4, truncatedHour);
|
||||
DataHelper.toLong(buf, OFF_SEND_MSG_ID, 4, nextMsgId);
|
||||
@ -272,4 +277,34 @@ public class BuildRequestRecord {
|
||||
if (!DataHelper.eq(iv, wroteIV))
|
||||
throw new RuntimeException("foo");
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.9.24
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder buf = new StringBuilder(256);
|
||||
buf.append("BRR ");
|
||||
boolean isIBGW = readIsInboundGateway();
|
||||
boolean isOBEP = readIsOutboundEndpoint();
|
||||
if (isIBGW) {
|
||||
buf.append("IBGW in: ").append(readReceiveTunnelId())
|
||||
.append(" out ").append(readNextTunnelId());
|
||||
} else if (isOBEP) {
|
||||
buf.append("OBEP in: ").append(readReceiveTunnelId());
|
||||
} else {
|
||||
buf.append("part. in: ").append(readReceiveTunnelId())
|
||||
.append(" out: ").append(readNextTunnelId());
|
||||
}
|
||||
buf.append(" to: ").append(readNextIdentity())
|
||||
.append(" layer key: ").append(readLayerKey())
|
||||
.append(" IV key: ").append(readIVKey())
|
||||
.append(" reply key: ").append(readReplyKey())
|
||||
.append(" reply IV: ").append(Base64.encode(readReplyIV()))
|
||||
.append(" hour: ").append(new Date(readRequestTime()))
|
||||
.append(" reply msg id: ").append(readReplyMessageId());
|
||||
// to chase i2pd bug
|
||||
//buf.append('\n').append(net.i2p.util.HexDump.dump(readReplyKey().getData()));
|
||||
return buf.toString();
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ import net.i2p.util.SimpleTimer;
|
||||
* requests if the jobQueue lag is too large.
|
||||
*
|
||||
*/
|
||||
class RouterThrottleImpl implements RouterThrottle {
|
||||
public class RouterThrottleImpl implements RouterThrottle {
|
||||
protected final RouterContext _context;
|
||||
private final Log _log;
|
||||
private volatile String _tunnelStatus;
|
||||
@ -28,8 +28,8 @@ class RouterThrottleImpl implements RouterThrottle {
|
||||
private static final long JOB_LAG_LIMIT_NETDB = 2*1000;
|
||||
// TODO reduce
|
||||
private static final long JOB_LAG_LIMIT_TUNNEL = 500;
|
||||
private static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels";
|
||||
private static final int DEFAULT_MAX_TUNNELS = 10*1000;
|
||||
public static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels";
|
||||
public static final int DEFAULT_MAX_TUNNELS = 10*1000;
|
||||
private static final String PROP_MAX_PROCESSINGTIME = "router.defaultProcessingTimeThrottle";
|
||||
private static final long DEFAULT_REJECT_STARTUP_TIME = 10*60*1000;
|
||||
private static final String PROP_REJECT_STARTUP_TIME = "router.rejectStartupTime";
|
||||
|
@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 21;
|
||||
public final static long BUILD = 22;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
@ -54,6 +54,9 @@ public abstract class BuildMessageGenerator {
|
||||
* containing the hop's configuration (as well as the reply info, if it is an outbound endpoint)
|
||||
*
|
||||
* @param msg out parameter
|
||||
* @param peerKey Encrypt using this key.
|
||||
* If null, replyRouter and replyTunnel are ignored,
|
||||
* and the entire record is filled with random data
|
||||
* @throws IllegalArgumentException if hop bigger than config
|
||||
*/
|
||||
public static void createRecord(int recordNum, int hop, TunnelBuildMessage msg,
|
||||
|
@ -10,23 +10,72 @@ import net.i2p.data.SessionKey;
|
||||
import net.i2p.data.i2np.BuildRequestRecord;
|
||||
import net.i2p.data.i2np.EncryptedBuildRecord;
|
||||
import net.i2p.data.i2np.TunnelBuildMessage;
|
||||
import net.i2p.router.RouterThrottleImpl;
|
||||
import net.i2p.router.util.DecayingBloomFilter;
|
||||
import net.i2p.router.util.DecayingHashSet;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SystemVersion;
|
||||
|
||||
/**
|
||||
* Receive the build message at a certain hop, decrypt its encrypted record,
|
||||
* read the enclosed tunnel request, decide how to reply, write the reply,
|
||||
* encrypt the reply record, and return a TunnelBuildMessage to forward on to
|
||||
* the next hop
|
||||
* the next hop.
|
||||
*
|
||||
* There is only one of these.
|
||||
* Instantiated by BuildHandler.
|
||||
*
|
||||
*/
|
||||
public class BuildMessageProcessor {
|
||||
private final I2PAppContext ctx;
|
||||
private final Log log;
|
||||
private final DecayingBloomFilter _filter;
|
||||
|
||||
public BuildMessageProcessor(I2PAppContext ctx) {
|
||||
_filter = new DecayingHashSet(ctx, 60*1000, 32, "TunnelBMP");
|
||||
this.ctx = ctx;
|
||||
log = ctx.logManager().getLog(getClass());
|
||||
_filter = selectFilter();
|
||||
// all createRateStat in TunnelDispatcher
|
||||
}
|
||||
|
||||
/**
|
||||
* For N typical part tunnels and rejecting 50%, that's 12N requests per hour.
|
||||
* This is the equivalent of (12N/600) KBps through the IVValidator filter.
|
||||
*
|
||||
* Target false positive rate is 1E-5 or lower
|
||||
*
|
||||
* @since 0.9.24
|
||||
*/
|
||||
private DecayingBloomFilter selectFilter() {
|
||||
long maxMemory = SystemVersion.getMaxMemory();
|
||||
int m;
|
||||
if (SystemVersion.isAndroid() || SystemVersion.isARM() || maxMemory < 96*1024*1024L) {
|
||||
// 32 KB
|
||||
// appx 500 part. tunnels or 6K req/hr
|
||||
m = 17;
|
||||
} else if (ctx.getProperty(RouterThrottleImpl.PROP_MAX_TUNNELS, RouterThrottleImpl.DEFAULT_MAX_TUNNELS) >
|
||||
RouterThrottleImpl.DEFAULT_MAX_TUNNELS && maxMemory > 256*1024*1024L) {
|
||||
// 2 MB
|
||||
// appx 20K part. tunnels or 240K req/hr
|
||||
m = 23;
|
||||
} else if (maxMemory > 256*1024*1024L) {
|
||||
// 1 MB
|
||||
// appx 10K part. tunnels or 120K req/hr
|
||||
m = 22;
|
||||
} else if (maxMemory > 128*1024*1024L) {
|
||||
// 512 KB
|
||||
// appx 5K part. tunnels or 60K req/hr
|
||||
m = 21;
|
||||
} else {
|
||||
// 128 KB
|
||||
// appx 2K part. tunnels or 24K req/hr
|
||||
m = 19;
|
||||
}
|
||||
if (log.shouldInfo())
|
||||
log.info("Selected Bloom filter m = " + m);
|
||||
return new DecayingBloomFilter(ctx, 60*60*1000, 32, "TunnelBMP", m);
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrypt the record targetting us, encrypting all of the other records with the included
|
||||
* reply key and IV. The original, encrypted record targetting us is removed from the request
|
||||
@ -38,65 +87,75 @@ public class BuildMessageProcessor {
|
||||
*
|
||||
* @return the current hop's decrypted record or null on failure
|
||||
*/
|
||||
public BuildRequestRecord decrypt(I2PAppContext ctx, TunnelBuildMessage msg, Hash ourHash, PrivateKey privKey) {
|
||||
Log log = ctx.logManager().getLog(getClass());
|
||||
public BuildRequestRecord decrypt(TunnelBuildMessage msg, Hash ourHash, PrivateKey privKey) {
|
||||
BuildRequestRecord rv = null;
|
||||
int ourHop = -1;
|
||||
long beforeActualDecrypt = 0;
|
||||
long afterActualDecrypt = 0;
|
||||
long totalEq = 0;
|
||||
long totalDup = 0;
|
||||
byte[] ourHashData = ourHash.getData();
|
||||
long beforeLoop = System.currentTimeMillis();
|
||||
for (int i = 0; i < msg.getRecordCount(); i++) {
|
||||
EncryptedBuildRecord rec = msg.getRecord(i);
|
||||
int len = BuildRequestRecord.PEER_SIZE;
|
||||
long beforeEq = System.currentTimeMillis();
|
||||
boolean eq = DataHelper.eq(ourHash.getData(), 0, rec.getData(), 0, len);
|
||||
totalEq += System.currentTimeMillis()-beforeEq;
|
||||
boolean eq = DataHelper.eq(ourHashData, 0, rec.getData(), 0, len);
|
||||
if (eq) {
|
||||
long beforeIsDup = System.currentTimeMillis();
|
||||
boolean isDup = _filter.add(rec.getData(), len, 32);
|
||||
totalDup += System.currentTimeMillis()-beforeIsDup;
|
||||
beforeActualDecrypt = System.currentTimeMillis();
|
||||
try {
|
||||
rv = new BuildRequestRecord(ctx, privKey, rec);
|
||||
afterActualDecrypt = System.currentTimeMillis();
|
||||
|
||||
// i2pd bug
|
||||
boolean isBad = SessionKey.INVALID_KEY.equals(rv.readReplyKey());
|
||||
if (isBad) {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn(msg.getUniqueId() + ": Bad reply key: " + rv);
|
||||
ctx.statManager().addRateData("tunnel.buildRequestBadReplyKey", 1);
|
||||
return null;
|
||||
}
|
||||
|
||||
// The spec says to feed the 32-byte AES-256 reply key into the Bloom filter.
|
||||
// But we were using the first 32 bytes of the encrypted reply.
|
||||
// Fixed in 0.9.24
|
||||
boolean isDup = _filter.add(rv.getData(), BuildRequestRecord.OFF_REPLY_KEY, 32);
|
||||
if (isDup) {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.debug(msg.getUniqueId() + ": A record matching our hash was found, but it seems to be a duplicate");
|
||||
log.warn(msg.getUniqueId() + ": Dup record: " + rv);
|
||||
ctx.statManager().addRateData("tunnel.buildRequestDup", 1);
|
||||
return null;
|
||||
}
|
||||
beforeActualDecrypt = System.currentTimeMillis();
|
||||
try {
|
||||
BuildRequestRecord req = new BuildRequestRecord(ctx, privKey, rec);
|
||||
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug(msg.getUniqueId() + ": A record matching our hash was found and decrypted");
|
||||
rv = req;
|
||||
} catch (DataFormatException dfe) {
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug(msg.getUniqueId() + ": A record matching our hash was found, but could not be decrypted");
|
||||
return null; // our hop is invalid? b0rkage
|
||||
}
|
||||
afterActualDecrypt = System.currentTimeMillis();
|
||||
log.debug(msg.getUniqueId() + ": Matching record: " + rv);
|
||||
ourHop = i;
|
||||
// TODO should we keep looking for a second match and fail if found?
|
||||
break;
|
||||
} catch (DataFormatException dfe) {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn(msg.getUniqueId() + ": Matching record decrypt failure", dfe);
|
||||
// on the microscopic chance that there's another router
|
||||
// out there with the same first 16 bytes, go around again
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (rv == null) {
|
||||
// none of the records matched, b0rk
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug(msg.getUniqueId() + ": No records matching our hash was found");
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn(msg.getUniqueId() + ": No matching record");
|
||||
return null;
|
||||
}
|
||||
|
||||
long beforeEncrypt = System.currentTimeMillis();
|
||||
SessionKey replyKey = rv.readReplyKey();
|
||||
byte iv[] = rv.readReplyIV();
|
||||
int ivOff = 0;
|
||||
for (int i = 0; i < msg.getRecordCount(); i++) {
|
||||
if (i != ourHop) {
|
||||
EncryptedBuildRecord data = msg.getRecord(i);
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug("Encrypting record " + i + "/? with replyKey " + replyKey.toBase64() + "/" + Base64.encode(iv, ivOff, 16));
|
||||
// corrupts SDS
|
||||
ctx.aes().encrypt(data.getData(), 0, data.getData(), 0, replyKey,
|
||||
iv, ivOff, data.length());
|
||||
//if (log.shouldLog(Log.DEBUG))
|
||||
// log.debug("Encrypting record " + i + "/? with replyKey " + replyKey.toBase64() + "/" + Base64.encode(iv));
|
||||
// encrypt in-place, corrupts SDS
|
||||
byte[] bytes = data.getData();
|
||||
ctx.aes().encrypt(bytes, 0, bytes, 0, replyKey, iv, 0, EncryptedBuildRecord.LENGTH);
|
||||
}
|
||||
}
|
||||
long afterEncrypt = System.currentTimeMillis();
|
||||
@ -106,8 +165,6 @@ public class BuildMessageProcessor {
|
||||
log.warn("Slow decryption, total=" + (afterEncrypt-beforeLoop)
|
||||
+ " looping=" + (beforeEncrypt-beforeLoop)
|
||||
+ " decrypt=" + (afterActualDecrypt-beforeActualDecrypt)
|
||||
+ " eq=" + totalEq
|
||||
+ " dup=" + totalDup
|
||||
+ " encrypt=" + (afterEncrypt-beforeEncrypt));
|
||||
}
|
||||
return rv;
|
||||
|
@ -31,10 +31,6 @@ class HopProcessor {
|
||||
//static final boolean USE_DOUBLE_IV_ENCRYPTION = true;
|
||||
static final int IV_LENGTH = 16;
|
||||
|
||||
/** @deprecated unused */
|
||||
public HopProcessor(I2PAppContext ctx, HopConfig config) {
|
||||
this(ctx, config, createValidator());
|
||||
}
|
||||
|
||||
public HopProcessor(I2PAppContext ctx, HopConfig config, IVValidator validator) {
|
||||
_context = ctx;
|
||||
@ -43,12 +39,6 @@ class HopProcessor {
|
||||
_validator = validator;
|
||||
}
|
||||
|
||||
/** @deprecated unused */
|
||||
protected static IVValidator createValidator() {
|
||||
// yeah, we'll use an O(1) validator later (e.g. bloom filter)
|
||||
return new HashSetIVValidator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the data for the current hop, overwriting the original data with
|
||||
* what should be sent to the next peer. This also validates the previous
|
||||
|
@ -204,6 +204,7 @@ public class TunnelDispatcher implements Service {
|
||||
ctx.statManager().createRateStat("tunnel.participantLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
|
||||
// following is for BuildMessageProcessor
|
||||
ctx.statManager().createRateStat("tunnel.buildRequestDup", "How frequently we get dup build request messages", "Tunnels", new long[] { 60*60*1000 });
|
||||
ctx.statManager().createRateStat("tunnel.buildRequestBadReplyKey", "Build requests with bad reply keys", "Tunnels", new long[] { 60*60*1000 });
|
||||
// following are for FragmentHandler
|
||||
ctx.statManager().createRateStat("tunnel.smallFragments", "How many pad bytes are in small fragments?",
|
||||
"Tunnels", RATES);
|
||||
@ -603,7 +604,7 @@ public class TunnelDispatcher implements Service {
|
||||
+ " messageId " + msg.getUniqueId()
|
||||
+ "/" + msg.getMessage().getUniqueId()
|
||||
+ " messageType: " + msg.getMessage().getClass().getSimpleName()
|
||||
+ " existing = " + _inboundGateways.size(), new Exception("source"));
|
||||
+ " existing = " + _inboundGateways.size());
|
||||
}
|
||||
|
||||
//long dispatchTime = _context.clock().now() - before;
|
||||
|
@ -47,6 +47,10 @@ import net.i2p.util.Log;
|
||||
* it used to be called from the BuildExecutor thread loop.
|
||||
*
|
||||
* Note that 10 minute tunnel expiration is hardcoded in here.
|
||||
*
|
||||
* There is only one of these objects but there may be multiple
|
||||
* threads running it. Instantiated and started by TunnelPoolManager.
|
||||
*
|
||||
*/
|
||||
class BuildHandler implements Runnable {
|
||||
private final RouterContext _context;
|
||||
@ -122,6 +126,7 @@ class BuildHandler implements Runnable {
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectHopThrottle", "Reject per-hop limit", "Tunnels", new long[] { 60*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.dropReqThrottle", "Drop per-hop limit", "Tunnels", new long[] { 60*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.dropLookupThrottle", "Drop next hop lookup", "Tunnels", new long[] { 60*60*1000 });
|
||||
_context.statManager().createRateStat("tunnel.dropDecryptFail", "Can't find our slot", "Tunnels", new long[] { 60*60*1000 });
|
||||
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectOverloaded", "Delay to process rejected request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.acceptLoad", "Delay to process accepted request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
@ -459,15 +464,20 @@ class BuildHandler implements Runnable {
|
||||
// this not only decrypts the current hop's record, but encrypts the other records
|
||||
// with the enclosed reply key
|
||||
long beforeDecrypt = System.currentTimeMillis();
|
||||
BuildRequestRecord req = _processor.decrypt(_context, state.msg, _context.routerHash(), _context.keyManager().getPrivateKey());
|
||||
BuildRequestRecord req = _processor.decrypt(state.msg, _context.routerHash(), _context.keyManager().getPrivateKey());
|
||||
long decryptTime = System.currentTimeMillis() - beforeDecrypt;
|
||||
_context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime);
|
||||
if (decryptTime > 500 && _log.shouldLog(Log.WARN))
|
||||
_log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
|
||||
if (req == null) {
|
||||
// no records matched, or the decryption failed. bah
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("The request " + state.msg.getUniqueId() + " could not be decrypted");
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
Hash from = state.fromHash;
|
||||
if (from == null && state.from != null)
|
||||
from = state.from.calculateHash();
|
||||
_log.warn("The request " + state.msg.getUniqueId() + " could not be decrypted from: " + from);
|
||||
}
|
||||
_context.statManager().addRateData("tunnel.dropDecryptFail", 1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -993,7 +1003,7 @@ class BuildHandler implements Runnable {
|
||||
fh = from.calculateHash();
|
||||
if (fh != null && _requestThrottler.shouldThrottle(fh)) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dropping tunnel request (from throttle), previous hop: " + from);
|
||||
_log.warn("Dropping tunnel request (from throttle), previous hop: " + fh);
|
||||
_context.statManager().addRateData("tunnel.dropReqThrottle", 1);
|
||||
accept = false;
|
||||
}
|
||||
|
@ -61,13 +61,19 @@ public class DecayingBloomFilter {
|
||||
_longToEntry = null;
|
||||
_longToEntryMask = 0;
|
||||
context.addShutdownTask(new Shutdown());
|
||||
_decayEvent = new DecayEvent();
|
||||
_keepDecaying = true;
|
||||
if (_durationMs == 60*60*1000) {
|
||||
// special mode for BuildMessageProcessor
|
||||
_decayEvent = new DecayHourlyEvent();
|
||||
} else {
|
||||
_decayEvent = new DecayEvent();
|
||||
_decayEvent.schedule(_durationMs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a bloom filter that will decay its entries over time.
|
||||
* Uses default m of 23, memory usage is 2 MB.
|
||||
*
|
||||
* @param durationMs entries last for at least this long, but no more than twice this long
|
||||
* @param entryBytes how large are the entries to be added? if this is less than 32 bytes,
|
||||
@ -78,7 +84,10 @@ public class DecayingBloomFilter {
|
||||
this(context, durationMs, entryBytes, "DBF");
|
||||
}
|
||||
|
||||
/** @param name just for logging / debugging / stats */
|
||||
/**
|
||||
* Uses default m of 23, memory usage is 2 MB.
|
||||
* @param name just for logging / debugging / stats
|
||||
*/
|
||||
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name) {
|
||||
// this is instantiated in four different places, they may have different
|
||||
// requirements, but for now use this as a gross method of memory reduction.
|
||||
@ -87,6 +96,8 @@ public class DecayingBloomFilter {
|
||||
}
|
||||
|
||||
/**
|
||||
* Memory usage is 2 * (2**m) bits or 2**(m-2) bytes.
|
||||
*
|
||||
* @param m filter size exponent, max is 29
|
||||
*/
|
||||
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name, int m) {
|
||||
@ -123,9 +134,14 @@ public class DecayingBloomFilter {
|
||||
_longToEntry = null;
|
||||
_longToEntryMask = 0;
|
||||
}
|
||||
_decayEvent = new DecayEvent();
|
||||
_keepDecaying = true;
|
||||
if (_durationMs == 60*60*1000) {
|
||||
// special mode for BuildMessageProcessor
|
||||
_decayEvent = new DecayHourlyEvent();
|
||||
} else {
|
||||
_decayEvent = new DecayEvent();
|
||||
_decayEvent.schedule(_durationMs);
|
||||
}
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("New DBF " + name + " m = " + m + " k = " + k + " entryBytes = " + entryBytes +
|
||||
" numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000));
|
||||
@ -318,6 +334,9 @@ public class DecayingBloomFilter {
|
||||
}
|
||||
|
||||
private class DecayEvent extends SimpleTimer2.TimedEvent {
|
||||
/**
|
||||
* Caller MUST schedule.
|
||||
*/
|
||||
DecayEvent() {
|
||||
super(_context.simpleTimer2());
|
||||
}
|
||||
@ -330,6 +349,48 @@ public class DecayingBloomFilter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decays at 5 minutes after the top of the hour.
|
||||
* This ignores leap seconds.
|
||||
* @since 0.9.24
|
||||
*/
|
||||
private class DecayHourlyEvent extends SimpleTimer2.TimedEvent {
|
||||
private static final long HOUR = 60 * 60 * 1000L;
|
||||
private static final long LAG = 5 * 60 * 1000L;
|
||||
private volatile long _currentHour;
|
||||
|
||||
/**
|
||||
* Schedules itself. Caller MUST NOT schedule.
|
||||
*/
|
||||
DecayHourlyEvent() {
|
||||
super(_context.simpleTimer2());
|
||||
schedule(getTimeTillNextHour());
|
||||
}
|
||||
|
||||
public void timeReached() {
|
||||
if (_keepDecaying) {
|
||||
long now = _context.clock().now();
|
||||
long currentHour = now / HOUR;
|
||||
// handle possible clock adjustments
|
||||
if (_currentHour != currentHour) {
|
||||
decay();
|
||||
_currentHour = currentHour;
|
||||
}
|
||||
long next = ((1 + currentHour) * HOUR) + LAG;
|
||||
schedule(Math.max(5000, next - now));
|
||||
}
|
||||
}
|
||||
|
||||
/** side effect: sets _currentHour */
|
||||
private long getTimeTillNextHour() {
|
||||
long now = _context.clock().now();
|
||||
long currentHour = now / HOUR;
|
||||
_currentHour = currentHour;
|
||||
long next = ((1 + currentHour) * HOUR) + LAG;
|
||||
return Math.max(5000, next - now);
|
||||
}
|
||||
}
|
||||
|
||||
/** @since 0.8.11 moved from DecayingHashSet */
|
||||
protected void getReadLock() {
|
||||
_reorganizeLock.readLock().lock();
|
||||
|
Reference in New Issue
Block a user