propagate from branch 'i2p.i2p' (head dbcd208a02bbecfe924e13a7d71297ece3f01ef3)

to branch 'i2p.i2p.zzz.test' (head 9eee20312852c80ca6c8e079174578a335edbe6d)
This commit is contained in:
zzz
2010-01-06 17:16:18 +00:00
174 changed files with 15000 additions and 5280 deletions

View File

@ -5,28 +5,36 @@ import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
//import net.i2p.util.Log;
/**
* Read and write the reply to a tunnel build message record.
*
* The reply record is the same size as the request record (528 bytes).
* Bytes 0-526 contain random data.
* Byte 527 contains the reply.
*/
public class BuildResponseRecord {
/**
* Create a new encrypted response
*
* @param status the response
* @param responseMessageId unused except for debugging
* @return a 528-byte response record
*/
public byte[] create(I2PAppContext ctx, int status, SessionKey replyKey, byte replyIV[], long responseMessageId) {
Log log = ctx.logManager().getLog(BuildResponseRecord.class);
public static byte[] create(I2PAppContext ctx, int status, SessionKey replyKey, byte replyIV[], long responseMessageId) {
//Log log = ctx.logManager().getLog(BuildResponseRecord.class);
byte rv[] = new byte[TunnelBuildReplyMessage.RECORD_SIZE];
ctx.random().nextBytes(rv);
DataHelper.toLong(rv, TunnelBuildMessage.RECORD_SIZE-1, 1, status);
// rv = AES(SHA256(padding+status) + padding + status, replyKey, replyIV)
ctx.sha().calculateHash(rv, Hash.HASH_LENGTH, rv.length - Hash.HASH_LENGTH, rv, 0);
if (log.shouldLog(Log.DEBUG))
log.debug(responseMessageId + ": before encrypt: " + Base64.encode(rv, 0, 128) + " with " + replyKey.toBase64() + "/" + Base64.encode(replyIV));
//if (log.shouldLog(Log.DEBUG))
// log.debug(responseMessageId + ": before encrypt: " + Base64.encode(rv, 0, 128) + " with " + replyKey.toBase64() + "/" + Base64.encode(replyIV));
ctx.aes().encrypt(rv, 0, rv, 0, replyKey, replyIV, rv.length);
if (log.shouldLog(Log.DEBUG))
log.debug(responseMessageId + ": after encrypt: " + Base64.encode(rv, 0, 128));
//if (log.shouldLog(Log.DEBUG))
// log.debug(responseMessageId + ": after encrypt: " + Base64.encode(rv, 0, 128));
return rv;
}
}

View File

@ -12,7 +12,6 @@ import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
// import net.i2p.util.Log;
/**
* Defines a message containing arbitrary bytes of data
@ -20,7 +19,6 @@ import net.i2p.data.DataHelper;
* @author jrandom
*/
public class DataMessage extends I2NPMessageImpl {
// private final static Log _log = new Log(DataMessage.class);
public final static int MESSAGE_TYPE = 20;
private byte _data[];

View File

@ -15,7 +15,6 @@ import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.util.Log;
/**
* Defines the message a router sends to another router in response to a
@ -25,7 +24,6 @@ import net.i2p.util.Log;
* @author jrandom
*/
public class DatabaseSearchReplyMessage extends I2NPMessageImpl {
private final static Log _log = new Log(DatabaseSearchReplyMessage.class);
public final static int MESSAGE_TYPE = 3;
private Hash _key;
private List _peerHashes;

View File

@ -18,7 +18,6 @@ import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.data.RouterInfo;
import net.i2p.data.TunnelId;
import net.i2p.util.Log;
/**
* Defines the message a router sends to another router to test the network
@ -27,7 +26,6 @@ import net.i2p.util.Log;
* @author jrandom
*/
public class DatabaseStoreMessage extends I2NPMessageImpl {
private final static Log _log = new Log(DatabaseStoreMessage.class);
public final static int MESSAGE_TYPE = 1;
private Hash _key;
private int _type;

View File

@ -12,14 +12,12 @@ import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* Contains the sending router's current time, to sync (and verify sync)
*
*/
public class DateMessage extends I2NPMessageImpl {
private final static Log _log = new Log(DateMessage.class);
public final static int MESSAGE_TYPE = 16;
private long _now;

View File

@ -12,7 +12,6 @@ import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* Defines the message sent back in reply to a message when requested, containing
@ -21,7 +20,6 @@ import net.i2p.util.Log;
* @author jrandom
*/
public class DeliveryStatusMessage extends I2NPMessageImpl {
private final static Log _log = new Log(DeliveryStatusMessage.class);
public final static int MESSAGE_TYPE = 10;
private long _id;
private long _arrival;

View File

@ -66,6 +66,22 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
throw new DataFormatException("Bad bytes", ime);
}
}
/**
* Read the header, then read the rest into buffer, then call
* readMessage in the implemented message type
*
* Specifically:
* 1 byte type (if the caller didn't read it already, as specified by the type param)
* 4 byte ID
* 8 byte expiration
* 2 byte size
* 1 byte checksum
* size bytes of payload (read by readMessage() in implementation)
*
* @param type the message type or -1 if we should read it here
* @param buffer temp buffer to use
*/
public int readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException {
try {
if (type < 0)
@ -268,6 +284,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
*/
/** used by SSU only */
public int toRawByteArray(byte buffer[]) {
verifyUnwritten();
if (RAW_FULL_SIZE)
@ -298,9 +315,13 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
}
/*****
public static I2NPMessage fromRawByteArray(I2PAppContext ctx, byte buffer[], int offset, int len) throws I2NPMessageException {
return fromRawByteArray(ctx, buffer, offset, len, new I2NPMessageHandler(ctx));
}
*****/
/** used by SSU only */
public static I2NPMessage fromRawByteArray(I2PAppContext ctx, byte buffer[], int offset, int len, I2NPMessageHandler handler) throws I2NPMessageException {
int type = (int)DataHelper.fromLong(buffer, offset, 1);
offset++;

View File

@ -33,7 +33,7 @@ public class TunnelDataMessage extends I2NPMessageImpl {
/** if we can't deliver a tunnel message in 10s, fuck it */
private static final int EXPIRATION_PERIOD = 10*1000;
private static final ByteCache _cache = ByteCache.getInstance(512, DATA_SIZE);
private static final ByteCache _cache;
/**
* When true, it means this tunnelDataMessage is being used as part of a tunnel
* processing pipeline, where the byte array is acquired during the TunnelDataMessage's
@ -42,9 +42,63 @@ public class TunnelDataMessage extends I2NPMessageImpl {
* handler's cache, etc), until it is finally released back into the cache when written
* to the next peer (or explicitly by the fragment handler's completion).
* Setting this to false just increases memory churn
*
* Well, this is tricky to get right and avoid data corruption,
* here's an example after checks were put in:
*
*
10:57:05.197 CRIT [NTCP read 1 ] 2p.data.i2np.TunnelDataMessage: TDM boom
net.i2p.data.i2np.I2NPMessageException: TDM data buf use after free
at net.i2p.data.i2np.TunnelDataMessage.writeMessageBody(TunnelDataMessage.java:124)
at net.i2p.data.i2np.I2NPMessageImpl.toByteArray(I2NPMessageImpl.java:217)
at net.i2p.router.transport.ntcp.NTCPConnection.bufferedPrepare(NTCPConnection.java:678)
at net.i2p.router.transport.ntcp.NTCPConnection.send(NTCPConnection.java:293)
at net.i2p.router.transport.ntcp.NTCPTransport.outboundMessageReady(NTCPTransport.java:185)
at net.i2p.router.transport.TransportImpl.send(TransportImpl.java:357)
at net.i2p.router.transport.GetBidsJob.getBids(GetBidsJob.java:80)
at net.i2p.router.transport.CommSystemFacadeImpl.processMessage(CommSystemFacadeImpl.java:129)
at net.i2p.router.OutNetMessagePool.add(OutNetMessagePool.java:61)
at net.i2p.router.transport.TransportImpl.afterSend(TransportImpl.java:252)
at net.i2p.router.transport.TransportImpl.afterSend(TransportImpl.java:163)
at net.i2p.router.transport.udp.UDPTransport.failed(UDPTransport.java:1314)
at net.i2p.router.transport.udp.PeerState.add(PeerState.java:1064)
at net.i2p.router.transport.udp.OutboundMessageFragments.add(OutboundMessageFragments.java:146)
at net.i2p.router.transport.udp.UDPTransport.send(UDPTransport.java:1098)
at net.i2p.router.transport.GetBidsJob.getBids(GetBidsJob.java:80)
at net.i2p.router.transport.CommSystemFacadeImpl.processMessage(CommSystemFacadeImpl.java:129)
at net.i2p.router.OutNetMessagePool.add(OutNetMessagePool.java:61)
at net.i2p.router.tunnel.TunnelParticipant.send(TunnelParticipant.java:172)
at net.i2p.router.tunnel.TunnelParticipant.dispatch(TunnelParticipant.java:86)
at net.i2p.router.tunnel.TunnelDispatcher.dispatch(TunnelDispatcher.java:351)
at net.i2p.router.InNetMessagePool.doShortCircuitTunnelData(InNetMessagePool.java:306)
at net.i2p.router.InNetMessagePool.shortCircuitTunnelData(InNetMessagePool.java:291)
at net.i2p.router.InNetMessagePool.add(InNetMessagePool.java:160)
at net.i2p.router.transport.TransportManager.messageReceived(TransportManager.java:462)
at net.i2p.router.transport.TransportImpl.messageReceived(TransportImpl.java:416)
at net.i2p.router.transport.ntcp.NTCPConnection$ReadState.receiveLastBlock(NTCPConnection.java:1285)
at net.i2p.router.transport.ntcp.NTCPConnection$ReadState.receiveSubsequent(NTCPConnection.java:1248)
at net.i2p.router.transport.ntcp.NTCPConnection$ReadState.receiveBlock(NTCPConnection.java:1205)
at net.i2p.router.transport.ntcp.NTCPConnection.recvUnencryptedI2NP(NTCPConnection.java:1035)
at net.i2p.router.transport.ntcp.NTCPConnection.recvEncryptedI2NP(NTCPConnection.java:1018)
at net.i2p.router.transport.ntcp.Reader.processRead(Reader.java:167)
at net.i2p.router.transport.ntcp.Reader.access$400(Reader.java:17)
at net.i2p.router.transport.ntcp.Reader$Runner.run(Reader.java:106)
at java.lang.Thread.run(Thread.java:619)
at net.i2p.util.I2PThread.run(I2PThread.java:71)
*
*/
private static final boolean PIPELINED_CACHE = true;
static {
if (PIPELINED_CACHE)
_cache = ByteCache.getInstance(512, DATA_SIZE);
else
_cache = null;
}
/** For use-after-free checks. Always false if PIPELINED_CACHE is false. */
private boolean _hadCache;
public TunnelDataMessage(I2PAppContext context) {
super(context);
_log = context.logManager().getLog(TunnelDataMessage.class);
@ -63,7 +117,15 @@ public class TunnelDataMessage extends I2NPMessageImpl {
_tunnelId = id.getTunnelId();
}
public byte[] getData() { return _data; }
public byte[] getData() {
if (_hadCache && _dataBuf == null) {
RuntimeException e = new RuntimeException("TDM data buf use after free");
_log.error("TDM boom", e);
throw e;
}
return _data;
}
public void setData(byte data[]) {
if ( (data == null) || (data.length <= 0) )
throw new IllegalArgumentException("Empty tunnel payload?");
@ -86,6 +148,7 @@ public class TunnelDataMessage extends I2NPMessageImpl {
if (PIPELINED_CACHE) {
_dataBuf = _cache.acquire();
_data = _dataBuf.getData();
_hadCache = true;
} else {
_data = new byte[DATA_SIZE];
}
@ -101,12 +164,24 @@ public class TunnelDataMessage extends I2NPMessageImpl {
if (_data.length <= 0)
throw new I2NPMessageException("Not enough data to write out (data.length=" + _data.length + ")");
if (_hadCache && _dataBuf == null) {
I2NPMessageException e = new I2NPMessageException("TDM data buf use after free");
_log.error("TDM boom", e);
throw e;
}
DataHelper.toLong(out, curIndex, 4, _tunnelId);
curIndex += 4;
System.arraycopy(_data, 0, out, curIndex, DATA_SIZE);
curIndex += _data.length;
if (PIPELINED_CACHE)
_cache.release(_dataBuf);
// We can use from the cache, we just can't release to the cache, due to the bug
// noted above. In effect, this means that transmitted TDMs don't get their
// dataBufs released - but received TDMs do (via FragmentHandler)
//if (_hadCache) {
// _cache.release(_dataBuf);
// _dataBuf = null;
//}
return curIndex;
}

View File

@ -18,8 +18,6 @@ import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.I2NPMessage;
//import net.i2p.data.i2np.TunnelCreateMessage;
//import net.i2p.data.i2np.TunnelCreateStatusMessage;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.util.I2PThread;
@ -35,6 +33,7 @@ public class InNetMessagePool implements Service {
private Log _log;
private RouterContext _context;
private HandlerJobBuilder _handlerJobBuilders[];
/** following 5 unused unless DISPATCH_DIRECT == false */
private final List _pendingDataMessages;
private final List _pendingDataMessagesFrom;
private final List _pendingGatewayMessages;
@ -63,21 +62,27 @@ public class InNetMessagePool implements Service {
public InNetMessagePool(RouterContext context) {
_context = context;
// 32 is greater than the max I2NP message type number (currently 22) + 1
_handlerJobBuilders = new HandlerJobBuilder[32];
_pendingDataMessages = new ArrayList(16);
_pendingDataMessagesFrom = new ArrayList(16);
_pendingGatewayMessages = new ArrayList(16);
_shortCircuitDataJob = new SharedShortCircuitDataJob(context);
_shortCircuitGatewayJob = new SharedShortCircuitGatewayJob(context);
if (DISPATCH_DIRECT) {
// keep the compiler happy since they are final
_pendingDataMessages = null;
_pendingDataMessagesFrom = null;
_pendingGatewayMessages = null;
} else {
_pendingDataMessages = new ArrayList(16);
_pendingDataMessagesFrom = new ArrayList(16);
_pendingGatewayMessages = new ArrayList(16);
_shortCircuitDataJob = new SharedShortCircuitDataJob(context);
_shortCircuitGatewayJob = new SharedShortCircuitGatewayJob(context);
}
_log = _context.logManager().getLog(InNetMessagePool.class);
_alive = false;
_context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*60*1000l });
//_context.statManager().createRateStat("inNetPool.droppedTunnelCreateStatusMessage", "How often we drop a slow-to-arrive tunnel request response", "InNetPool", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDbLookupResponseMessage", "How often we drop a slow-to-arrive db search response", "InNetPool", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("pool.dispatchDataTime", "How long a tunnel dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("pool.dispatchGatewayTime", "How long a tunnel gateway dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDbLookupResponseMessage", "How often we drop a slow-to-arrive db search response", "InNetPool", new long[] { 60*60*1000l });
}
public HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) {
@ -309,10 +314,12 @@ public class InNetMessagePool implements Service {
}
public void shutdown() {
_alive = false;
synchronized (_pendingDataMessages) {
_pendingDataMessages.clear();
_pendingDataMessagesFrom.clear();
_pendingDataMessages.notifyAll();
if (!DISPATCH_DIRECT) {
synchronized (_pendingDataMessages) {
_pendingDataMessages.clear();
_pendingDataMessagesFrom.clear();
_pendingDataMessages.notifyAll();
}
}
}
@ -324,6 +331,8 @@ public class InNetMessagePool implements Service {
_dispatchThreaded = Boolean.valueOf(threadedStr).booleanValue();
}
if (_dispatchThreaded) {
_context.statManager().createRateStat("pool.dispatchDataTime", "How long a tunnel dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("pool.dispatchGatewayTime", "How long a tunnel gateway dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
I2PThread data = new I2PThread(new TunnelDataDispatcher(), "Tunnel data dispatcher");
data.setDaemon(true);
data.start();
@ -333,6 +342,7 @@ public class InNetMessagePool implements Service {
}
}
/** unused unless DISPATCH_DIRECT == false */
private class SharedShortCircuitDataJob extends JobImpl {
public SharedShortCircuitDataJob(RouterContext ctx) {
super(ctx);
@ -355,6 +365,8 @@ public class InNetMessagePool implements Service {
getContext().jobQueue().addJob(SharedShortCircuitDataJob.this);
}
}
/** unused unless DISPATCH_DIRECT == false */
private class SharedShortCircuitGatewayJob extends JobImpl {
public SharedShortCircuitGatewayJob(RouterContext ctx) {
super(ctx);
@ -375,6 +387,7 @@ public class InNetMessagePool implements Service {
}
}
/** unused unless router.dispatchThreaded=true */
private class TunnelGatewayDispatcher implements Runnable {
public void run() {
while (_alive) {
@ -403,6 +416,8 @@ public class InNetMessagePool implements Service {
}
}
}
/** unused unless router.dispatchThreaded=true */
private class TunnelDataDispatcher implements Runnable {
public void run() {
while (_alive) {

View File

@ -33,7 +33,7 @@ public class MessageHistory {
private boolean _doPause; // true == briefly stop writing data to the log (used while submitting it)
private ReinitializeJob _reinitializeJob;
private WriteJob _writeJob;
private SubmitMessageHistoryJob _submitMessageHistoryJob;
//private SubmitMessageHistoryJob _submitMessageHistoryJob;
private volatile boolean _firstPass;
private final static byte[] NL = System.getProperty("line.separator").getBytes();

View File

@ -126,7 +126,7 @@ public class OutNetMessage {
}
return ZERO;
}
private static final Long ZERO = new Long(0);
private static final Long ZERO = Long.valueOf(0);
private void locked_initTimestamps() {
if (_timestamps == null) {
_timestamps = new HashMap(8);

View File

@ -37,10 +37,12 @@ import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.startup.StartupJob;
import net.i2p.router.startup.WorkingDir;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager;
import net.i2p.util.FileUtil;
import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
@ -201,6 +203,8 @@ public class Router {
installUpdates();
// Apps may use this as an easy way to determine if they are in the router JVM
// But context.isRouterContext() is even easier...
// Both of these as of 0.7.9
System.setProperty("router.version", RouterVersion.VERSION);
// NOW we start all the activity
@ -228,14 +232,10 @@ public class Router {
}
};
_shutdownHook = new ShutdownHook(_context);
_gracefulShutdownDetector = new I2PThread(new GracefulShutdown());
_gracefulShutdownDetector.setDaemon(true);
_gracefulShutdownDetector.setName("Graceful shutdown hook");
_gracefulShutdownDetector = new I2PAppThread(new GracefulShutdown(), "Graceful shutdown hook", true);
_gracefulShutdownDetector.start();
I2PThread watchdog = new I2PThread(new RouterWatchdog(_context));
watchdog.setName("RouterWatchdog");
watchdog.setDaemon(true);
Thread watchdog = new I2PAppThread(new RouterWatchdog(_context), "RouterWatchdog", true);
watchdog.start();
}
@ -315,10 +315,10 @@ public class Router {
readConfig();
setupHandlers();
if (ALLOW_DYNAMIC_KEYS) {
if ("true".equalsIgnoreCase(_context.getProperty(Router.PROP_HIDDEN, "false")))
killKeys();
}
//if (ALLOW_DYNAMIC_KEYS) {
// if ("true".equalsIgnoreCase(_context.getProperty(Router.PROP_HIDDEN, "false")))
// killKeys();
//}
_context.messageValidator().startup();
_context.tunnelDispatcher().startup();
@ -339,7 +339,7 @@ public class Router {
long waited = System.currentTimeMillis() - before;
if (_log.shouldLog(Log.INFO))
_log.info("Waited " + waited + "ms to initialize");
_context.jobQueue().addJob(new StartupJob(_context));
}
@ -527,7 +527,7 @@ public class Router {
static final String IDENTLOG = "identlog.txt";
public void killKeys() {
new Exception("Clearing identity files").printStackTrace();
//new Exception("Clearing identity files").printStackTrace();
int remCount = 0;
for (int i = 0; i < _rebuildFiles.length; i++) {
File f = new File(_context.getRouterDir(),_rebuildFiles[i]);
@ -541,6 +541,12 @@ public class Router {
}
}
}
// now that we have random ports, keeping the same port would be bad
removeConfigSetting(UDPTransport.PROP_INTERNAL_PORT);
removeConfigSetting(UDPTransport.PROP_EXTERNAL_PORT);
saveConfig();
if (remCount > 0) {
FileOutputStream log = null;
try {
@ -910,11 +916,11 @@ public class Router {
*/
private static final boolean ALLOW_DYNAMIC_KEYS = false;
public void finalShutdown(int exitCode) {
private void finalShutdown(int exitCode) {
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete", new Exception("Shutdown"));
try { _context.logManager().shutdown(); } catch (Throwable t) { }
if (ALLOW_DYNAMIC_KEYS) {
if ("true".equalsIgnoreCase(_context.getProperty(PROP_DYNAMIC_KEYS, "false")))
if (Boolean.valueOf(_context.getProperty(PROP_DYNAMIC_KEYS)).booleanValue())
killKeys();
}

View File

@ -379,4 +379,12 @@ public class RouterContext extends I2PAppContext {
}
}
/**
 * Use this instead of context instanceof RouterContext
 * (always true here; presumably overrides a base-class method that
 * returns false — TODO confirm against I2PAppContext)
 * @return true
 * @since 0.7.9
 */
public boolean isRouterContext() {
return true;
}
}

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1;
public final static long BUILD = 17;
/** for example "-test" */
public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;

View File

@ -1,138 +0,0 @@
package net.i2p.router;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.DataStructureImpl;
/**
* Wrap up the settings specified for a particular tunnel
*
*/
public class TunnelSettings extends DataStructureImpl {
// Used only to default _created to the current (adjusted) clock time.
private I2PAppContext _context;
// Number of hops in the tunnel (serialized as 1 byte, so 0-255).
private int _depth;
private long _msgsPerMinuteAvg;
private long _bytesPerMinuteAvg;
private long _msgsPerMinutePeak;
private long _bytesPerMinutePeak;
// Whether dummy (cover-traffic) messages should be included.
private boolean _includeDummy;
// Whether messages may be reordered in transit.
private boolean _reorder;
// Expiration time (ms since epoch); 0 means none set.
private long _expiration;
// Creation time (ms since epoch), defaults to "now" at construction.
private long _created;
/**
 * Create settings with all-zero/false defaults and the creation
 * time taken from the context clock.
 */
public TunnelSettings(I2PAppContext context) {
_context = context;
_depth = 0;
_msgsPerMinuteAvg = 0;
_msgsPerMinutePeak = 0;
_bytesPerMinuteAvg = 0;
_bytesPerMinutePeak = 0;
_includeDummy = false;
_reorder = false;
_expiration = 0;
_created = _context.clock().now();
}
public int getDepth() { return _depth; }
public void setDepth(int depth) { _depth = depth; }
public long getMessagesPerMinuteAverage() { return _msgsPerMinuteAvg; }
public long getMessagesPerMinutePeak() { return _msgsPerMinutePeak; }
public long getBytesPerMinuteAverage() { return _bytesPerMinuteAvg; }
public long getBytesPerMinutePeak() { return _bytesPerMinutePeak; }
public void setMessagesPerMinuteAverage(long msgs) { _msgsPerMinuteAvg = msgs; }
public void setMessagesPerMinutePeak(long msgs) { _msgsPerMinutePeak = msgs; }
public void setBytesPerMinuteAverage(long bytes) { _bytesPerMinuteAvg = bytes; }
public void setBytesPerMinutePeak(long bytes) { _bytesPerMinutePeak = bytes; }
public boolean getIncludeDummy() { return _includeDummy; }
public void setIncludeDummy(boolean include) { _includeDummy = include; }
public boolean getReorder() { return _reorder; }
public void setReorder(boolean reorder) { _reorder = reorder; }
public long getExpiration() { return _expiration; }
public void setExpiration(long expiration) { _expiration = expiration; }
public long getCreated() { return _created; }
/**
 * Deserialize from the stream.
 * Field order MUST match writeBytes() exactly:
 * includeDummy, reorder, depth(1), bytesAvg(4), bytesPeak(4),
 * expiration(Date), msgsAvg(4), msgsPeak(4), created(Date).
 *
 * @throws DataFormatException if a boolean field is missing (null)
 */
public void readBytes(InputStream in) throws DataFormatException, IOException {
Boolean b = DataHelper.readBoolean(in);
if (b == null) throw new DataFormatException("Null includeDummy boolean value");
_includeDummy = b.booleanValue();
b = DataHelper.readBoolean(in);
if (b == null) throw new DataFormatException("Null reorder boolean value");
_reorder = b.booleanValue();
_depth = (int)DataHelper.readLong(in, 1);
_bytesPerMinuteAvg = DataHelper.readLong(in, 4);
_bytesPerMinutePeak = DataHelper.readLong(in, 4);
Date exp = DataHelper.readDate(in);
if (exp == null)
_expiration = 0;
else
_expiration = exp.getTime();
_msgsPerMinuteAvg = DataHelper.readLong(in, 4);
_msgsPerMinutePeak = DataHelper.readLong(in, 4);
Date created = DataHelper.readDate(in);
// Missing created date falls back to "now" rather than failing.
if (created != null)
_created = created.getTime();
else
_created = _context.clock().now();
}
/**
 * Serialize to the stream; field order must stay in sync with readBytes().
 * A non-positive expiration is written as Date(0) (i.e. "no expiration").
 */
public void writeBytes(OutputStream out) throws DataFormatException, IOException {
DataHelper.writeBoolean(out, _includeDummy ? Boolean.TRUE : Boolean.FALSE);
DataHelper.writeBoolean(out, _reorder ? Boolean.TRUE : Boolean.FALSE);
DataHelper.writeLong(out, 1, _depth);
DataHelper.writeLong(out, 4, _bytesPerMinuteAvg);
DataHelper.writeLong(out, 4, _bytesPerMinutePeak);
if (_expiration <= 0)
DataHelper.writeDate(out, new Date(0));
else
DataHelper.writeDate(out, new Date(_expiration));
DataHelper.writeLong(out, 4, _msgsPerMinuteAvg);
DataHelper.writeLong(out, 4, _msgsPerMinutePeak);
DataHelper.writeDate(out, new Date(_created));
}
@Override
public int hashCode() {
// NOTE(review): the long fields are narrowed to int by the compound
// assignments below; collisions are possible but the hashCode
// contract (equal objects -> equal hashes) still holds since all
// compared fields contribute. _created is deliberately excluded,
// matching equals() which also ignores it.
int rv = 0;
rv += _includeDummy ? 100 : 0;
rv += _reorder ? 50 : 0;
rv += _depth;
rv += _bytesPerMinuteAvg;
rv += _bytesPerMinutePeak;
rv += _expiration;
rv += _msgsPerMinuteAvg;
rv += _msgsPerMinutePeak;
return rv;
}
@Override
public boolean equals(Object obj) {
// Compares all serialized fields except _created (creation time is
// instance-local and intentionally not part of equality).
if ( (obj != null) && (obj instanceof TunnelSettings) ) {
TunnelSettings settings = (TunnelSettings)obj;
return settings.getBytesPerMinuteAverage() == getBytesPerMinuteAverage() &&
settings.getBytesPerMinutePeak() == getBytesPerMinutePeak() &&
settings.getDepth() == getDepth() &&
settings.getExpiration() == getExpiration() &&
settings.getIncludeDummy() == getIncludeDummy() &&
settings.getMessagesPerMinuteAverage() == getMessagesPerMinuteAverage() &&
settings.getMessagesPerMinutePeak() == getMessagesPerMinutePeak() &&
settings.getReorder() == getReorder();
} else {
return false;
}
}
}

View File

@ -19,6 +19,7 @@ import net.i2p.util.InternalServerSocket;
* Listen for in-JVM connections on the internal "socket"
*
* @author zzz
* @since 0.7.9
*/
public class InternalClientListenerRunner extends ClientListenerRunner {

View File

@ -100,7 +100,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
getContext().statManager().addRateData("netDb.lookupsMatchedReceivedPublished", 1, 0);
sendData(_message.getSearchKey(), ls, fromKey, _message.getReplyTunnel());
} else {
Set routerInfoSet = getContext().netDb().findNearestRouters(_message.getSearchKey(),
Set<RouterInfo> routerInfoSet = getContext().netDb().findNearestRouters(_message.getSearchKey(),
CLOSENESS_THRESHOLD,
_message.getDontIncludePeers());
if (getContext().clientManager().isLocal(ls.getDestination())) {
@ -142,13 +142,11 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
}
} else {
// not found locally - return closest peer routerInfo structs
Set dontInclude = _message.getDontIncludePeers();
// TODO: Honor flag to exclude all floodfills
Set<Hash> dontInclude = _message.getDontIncludePeers();
// Honor flag to exclude all floodfills
//if (dontInclude.contains(Hash.FAKE_HASH)) {
// dontInclude = new HashSet(dontInclude);
// dontInclude.addAll( pfffft );
//}
Set routerInfoSet = getContext().netDb().findNearestRouters(_message.getSearchKey(),
// This is handled in FloodfillPeerSelector
Set<RouterInfo> routerInfoSet = getContext().netDb().findNearestRouters(_message.getSearchKey(),
MAX_ROUTERS_RETURNED,
dontInclude);
@ -199,7 +197,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
private void sendData(Hash key, DataStructure data, Hash toPeer, TunnelId replyTunnel) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending data matching key key " + key.toBase64() + " to peer " + toPeer.toBase64()
_log.debug("Sending data matching key " + key.toBase64() + " to peer " + toPeer.toBase64()
+ " tunnel " + replyTunnel);
DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
msg.setKey(key);
@ -216,7 +214,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
sendMessage(msg, toPeer, replyTunnel);
}
protected void sendClosest(Hash key, Set routerInfoSet, Hash toPeer, TunnelId replyTunnel) {
protected void sendClosest(Hash key, Set<RouterInfo> routerInfoSet, Hash toPeer, TunnelId replyTunnel) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending closest routers to key " + key.toBase64() + ": # peers = "
+ routerInfoSet.size() + " tunnel " + replyTunnel);

View File

@ -85,12 +85,14 @@ class ExploreJob extends SearchJob {
msg.setReplyTunnel(replyTunnelId);
int available = MAX_CLOSEST - msg.getDontIncludePeers().size();
// TODO: add this once ../HTLMJ handles it
//if (available > 0) {
// // add a flag to say this is an exploration and we don't want floodfills in the responses
// if (msg.getDontIncludePeers().add(Hash.FAKE_HASH))
// available--;
//}
if (available > 0) {
// Add a flag to say this is an exploration and we don't want floodfills in the responses.
// Doing it this way is of course backwards-compatible.
// Supported as of 0.7.9
if (msg.getDontIncludePeers().add(Hash.FAKE_HASH))
available--;
}
KBucketSet ks = _facade.getKBuckets();
Hash rkey = getContext().routingKeyGenerator().getRoutingKey(getState().getTarget());
// in a few releases, we can (and should) remove this,

View File

@ -98,7 +98,8 @@ class FloodfillPeerSelector extends PeerSelector {
* @param maxNumRouters max to return
* Sorted by closest to the key if > maxNumRouters, otherwise not
* The list is in 3 groups - sorted by routing key within each group.
* Group 1: No store or lookup failure in a long time
* Group 1: No store or lookup failure in a long time, and
* lookup fail rate no more than 1.5 * average
* Group 2: No store or lookup failure in a little while or
* success newer than failure
* Group 3: All others
@ -126,6 +127,14 @@ class FloodfillPeerSelector extends PeerSelector {
int found = 0;
long now = _context.clock().now();
double maxFailRate;
if (_context.router().getUptime() > 60*60*1000) {
double currentFailRate = _context.statManager().getRate("peer.failedLookupRate").getRate(60*60*1000).getAverageValue();
maxFailRate = Math.max(0.20d, 1.5d * currentFailRate);
} else {
maxFailRate = 100d; // disable
}
// split sorted list into 3 sorted lists
for (int i = 0; found < howMany && i < ffs.size(); i++) {
Hash entry = sorted.first();
@ -146,7 +155,8 @@ class FloodfillPeerSelector extends PeerSelector {
if (prof != null && prof.getDBHistory() != null
&& prof.getDbResponseTime().getRate(10*60*1000).getAverageValue() < maxGoodRespTime
&& prof.getDBHistory().getLastStoreFailed() < now - NO_FAIL_STORE_GOOD
&& prof.getDBHistory().getLastLookupFailed() < now - NO_FAIL_LOOKUP_GOOD) {
&& prof.getDBHistory().getLastLookupFailed() < now - NO_FAIL_LOOKUP_GOOD
&& prof.getDBHistory().getFailedLookupRate().getRate(60*60*1000).getAverageValue() < maxFailRate) {
// good
if (_log.shouldLog(Log.DEBUG))
_log.debug("Good: " + entry);
@ -246,11 +256,14 @@ class FloodfillPeerSelector extends PeerSelector {
/**
* @return list of all with the 'f' mark in their netdb except for shitlisted ones.
* Will return non-floodfills only if there aren't enough floodfills.
*
* The list is in 3 groups - unsorted (shuffled) within each group.
* Group 1: If preferConnected = true, the peers we are directly
* connected to, that meet the group 2 criteria
* Group 2: Netdb published less than 3h ago, no bad send in last 30m.
* Group 3: All others
* Group 4: Non-floodfills, sorted by closest-to-the-key
*/
public List<Hash> get(int howMany, boolean preferConnected) {
Collections.shuffle(_floodfillMatches, _context.random());
@ -310,6 +323,8 @@ class FloodfillPeerSelector extends PeerSelector {
/**
* Floodfill peers only. Used only by HandleDatabaseLookupMessageJob to populate the DSRM.
* UNLESS peersToIgnore contains Hash.FAKE_HASH (all zeros), in which case this is an exploratory
* lookup, and the response should not include floodfills.
*
* @param key the original key (NOT the routing key)
* @return List of Hash for the peers selected, ordered
@ -317,6 +332,15 @@ class FloodfillPeerSelector extends PeerSelector {
@Override
public List<Hash> selectNearest(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) {
Hash rkey = _context.routingKeyGenerator().getRoutingKey(key);
return selectFloodfillParticipants(rkey, maxNumRouters, peersToIgnore, kbuckets);
if (peersToIgnore != null && peersToIgnore.contains(Hash.FAKE_HASH)) {
// return non-ff
peersToIgnore.addAll(selectFloodfillParticipants(peersToIgnore, kbuckets));
FloodfillSelectionCollector matches = new FloodfillSelectionCollector(rkey, peersToIgnore, maxNumRouters);
kbuckets.getAll(matches);
return matches.get(maxNumRouters);
} else {
// return ff
return selectFloodfillParticipants(rkey, maxNumRouters, peersToIgnore, kbuckets);
}
}
}

View File

@ -532,7 +532,7 @@ class SearchJob extends JobImpl {
void replyFound(DatabaseSearchReplyMessage message, Hash peer) {
long duration = _state.replyFound(peer);
// this processing can take a while, so split 'er up
getContext().jobQueue().addJob(new SearchReplyJob(getContext(), this, (DatabaseSearchReplyMessage)message, peer, duration));
getContext().jobQueue().addJob(new SearchReplyJob(getContext(), this, message, peer, duration));
}
/**

View File

@ -255,7 +255,7 @@ class StoreJob extends JobImpl {
private static final int MAX_DIRECT_EXPIRATION = 15*1000;
/**
* Send a store to the given peer through a garlic route, including a reply
* Send a store to the given peer, including a reply
* DeliveryStatusMessage so we know it got there
*
*/
@ -285,6 +285,11 @@ class StoreJob extends JobImpl {
sendStore(msg, router, getContext().clock().now() + responseTime);
}
/**
* Send a store to the given peer, including a reply
* DeliveryStatusMessage so we know it got there
*
*/
private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
getContext().statManager().addRateData("netDb.storeLeaseSetSent", 1, 0);
@ -295,6 +300,11 @@ class StoreJob extends JobImpl {
}
}
/**
* Send directly,
* with the reply to come back directly.
*
*/
private void sendDirect(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
long token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
msg.setReplyToken(token);
@ -324,6 +334,12 @@ class StoreJob extends JobImpl {
getContext().commSystem().processMessage(m);
}
/**
* This is misnamed, it means sending it out through an exploratory tunnel,
* with the reply to come back through an exploratory tunnel.
* There is no garlic encryption added.
*
*/
private void sendStoreThroughGarlic(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
long token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);

View File

@ -15,7 +15,7 @@ import java.util.StringTokenizer;
import net.i2p.I2PAppContext;
import net.i2p.router.RouterContext;
import net.i2p.util.EepGet;
import net.i2p.util.I2PThread;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
/**
@ -52,13 +52,15 @@ public class Reseeder {
return;
} else {
System.setProperty(PROP_INPROGRESS, "true");
I2PThread reseed = new I2PThread(_reseedRunner, "Reseed");
// set to daemon so it doesn't hang a shutdown
Thread reseed = new I2PAppThread(_reseedRunner, "Reseed", true);
reseed.start();
}
}
}
/** Todo: translate the messages sent via PROP_STATUS */
public class ReseedRunner implements Runnable, EepGet.StatusListener {
private boolean _isRunning;

View File

@ -105,6 +105,7 @@ public class DBHistory {
*/
public RateStat getFailedLookupRate() { return _failedLookupRate; }
/** not sure how much this is used, to be investigated */
public RateStat getInvalidReplyRate() { return _invalidReplyRate; }
/**
@ -115,6 +116,7 @@ public class DBHistory {
public void lookupSuccessful() {
_successfulLookups++;
_failedLookupRate.addData(0, 0);
_context.statManager().addRateData("peer.failedLookupRate", 0, 0);
_lastLookupSuccessful = _context.clock().now();
}
@ -124,6 +126,7 @@ public class DBHistory {
public void lookupFailed() {
_failedLookups++;
_failedLookupRate.addData(1, 0);
_context.statManager().addRateData("peer.failedLookupRate", 1, 0);
_lastLookupFailed = _context.clock().now();
}
@ -136,6 +139,7 @@ public class DBHistory {
// Fixme, redefined this to include both lookup and store fails,
// need to fix the javadocs
_failedLookupRate.addData(0, 0);
_context.statManager().addRateData("peer.failedLookupRate", 0, 0);
_lastStoreSuccessful = _context.clock().now();
}
@ -275,9 +279,9 @@ public class DBHistory {
private void createRates(String statGroup) {
if (_failedLookupRate == null)
_failedLookupRate = new RateStat("dbHistory.failedLookupRate", "How often does this peer to respond to a lookup?", statGroup, new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_failedLookupRate = new RateStat("dbHistory.failedLookupRate", "How often does this peer to respond to a lookup?", statGroup, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
if (_invalidReplyRate == null)
_invalidReplyRate = new RateStat("dbHistory.invalidReplyRate", "How often does this peer give us a bad (nonexistant, forged, etc) peer?", statGroup, new long[] { 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
_invalidReplyRate = new RateStat("dbHistory.invalidReplyRate", "How often does this peer give us a bad (nonexistant, forged, etc) peer?", statGroup, new long[] { 30*60*1000l });
_failedLookupRate.setStatLog(_context.statManager().getStatLog());
_invalidReplyRate.setStatLog(_context.statManager().getStatLog());
}

View File

@ -97,10 +97,10 @@ public class ProfileOrganizer {
_log = context.logManager().getLog(ProfileOrganizer.class);
_comp = new InverseCapacityComparator();
_fastPeers = new HashMap(16);
_highCapacityPeers = new HashMap(16);
_highCapacityPeers = new HashMap(32);
_wellIntegratedPeers = new HashMap(16);
_notFailingPeers = new HashMap(64);
_notFailingPeersList = new ArrayList(64);
_notFailingPeers = new HashMap(256);
_notFailingPeersList = new ArrayList(256);
_failingPeers = new HashMap(16);
_strictCapacityOrder = new TreeSet(_comp);
_thresholdSpeedValue = 0.0d;
@ -113,6 +113,8 @@ public class ProfileOrganizer {
_context.statManager().createRateStat("peer.profileThresholdTime", "How long the reorg takes determining the tier thresholds", "Peers", new long[] { 10*60*1000 });
_context.statManager().createRateStat("peer.profilePlaceTime", "How long the reorg takes placing peers in the tiers", "Peers", new long[] { 10*60*1000 });
_context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 10*60*1000 });
// used in DBHistory
_context.statManager().createRateStat("peer.failedLookupRate", "DB Lookup fail rate", "Peers", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
private void getReadLock() {

View File

@ -28,13 +28,13 @@ public class TunnelHistory {
private RateStat _failRate;
private String _statGroup;
/** probabalistic tunnel rejection due to a flood of requests */
/** probabalistic tunnel rejection due to a flood of requests - essentially unused */
public static final int TUNNEL_REJECT_PROBABALISTIC_REJECT = 10;
/** tunnel rejection due to temporary cpu/job/tunnel overload */
/** tunnel rejection due to temporary cpu/job/tunnel overload - essentially unused */
public static final int TUNNEL_REJECT_TRANSIENT_OVERLOAD = 20;
/** tunnel rejection due to excess bandwidth usage */
public static final int TUNNEL_REJECT_BANDWIDTH = 30;
/** tunnel rejection due to system failure */
/** tunnel rejection due to system failure - essentially unused */
public static final int TUNNEL_REJECT_CRIT = 50;
public TunnelHistory(RouterContext context, String statGroup) {
@ -100,8 +100,10 @@ public class TunnelHistory {
}
}
// Define this rate as the probability it really failed
// @param pct = probability * 100
/**
* Define this rate as the probability it really failed
* @param pct = probability * 100
*/
public void incrementFailed(int pct) {
_lifetimeFailed++;
_failRate.addData(pct, 1);
@ -150,7 +152,7 @@ public class TunnelHistory {
_failRate.store(out, "tunnelHistory.failRate");
}
private void add(StringBuilder buf, String name, long val, String description) {
private static void add(StringBuilder buf, String name, long val, String description) {
buf.append("# ").append(name.toUpperCase()).append(NL).append("# ").append(description).append(NL);
buf.append("tunnels.").append(name).append('=').append(val).append(NL).append(NL);
}

View File

@ -22,7 +22,7 @@ import java.util.Set;
*/
public class Addresses {
/** return the first non-local address it finds, or null */
/** @return the first non-local address it finds, or null */
public static String getAnyAddress() {
String[] a = getAddresses();
if (a.length > 0)
@ -31,7 +31,7 @@ public class Addresses {
}
/**
* Return an array of all addresses, excluding
* @return an array of all addresses, excluding
* IPv6, local, broadcast, multicast, etc.
*/
public static String[] getAddresses() {

View File

@ -77,31 +77,26 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public boolean haveHighOutboundCapacity() { return (_manager == null ? false : _manager.haveHighOutboundCapacity()); }
/**
* Framed average clock skew of connected peers in seconds, or null if we cannot answer.
* Framed average clock skew of connected peers in seconds, or the clock offset if we cannot answer.
* Average is calculated over the middle "percentToInclude" peers.
*/
@Override
public Long getFramedAveragePeerClockSkew(int percentToInclude) {
if (_manager == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Returning null for framed averege peer clock skew (no transport manager)!");
return null;
// round toward zero
return Long.valueOf(_context.clock().getOffset() / 1000);
}
Vector skews = _manager.getClockSkews();
if (skews == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Returning null for framed average peer clock skew (no data)!");
return null;
return Long.valueOf(_context.clock().getOffset() / 1000);
}
if (skews.size() < 20) {
if (_log.shouldLog(Log.WARN))
_log.warn("Returning null for framed average peer clock skew (only " + skews.size() + " peers)!");
return null;
if (skews.size() < 5) {
return Long.valueOf(_context.clock().getOffset() / 1000);
}
// Going to calculate, sort them
Collections.sort(skews);
// Calculate frame size
int frameSize = (skews.size() * percentToInclude / 100);
int frameSize = Math.min((skews.size() * percentToInclude / 100), 2);
int first = (skews.size() / 2) - (frameSize / 2);
int last = (skews.size() / 2) + (frameSize / 2);
// Sum skew values
@ -112,11 +107,8 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
_log.debug("Adding clock skew " + i + " valued " + value + " s.");
sum = sum + value;
}
// Calculate average
Long framedAverageClockSkew = new Long(sum / frameSize);
if (_log.shouldLog(Log.INFO))
_log.info("Our framed average peer clock skew is " + framedAverageClockSkew + " s.");
return framedAverageClockSkew;
// Calculate average (round toward zero)
return Long.valueOf(sum / frameSize);
}
public List getBids(OutNetMessage msg) {

View File

@ -602,8 +602,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
OutNetMessage msg = null;
synchronized (_outbound) {
if (_currentOutbound != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
if (_log.shouldLog(Log.INFO))
_log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
return;
}
//throw new RuntimeException("We should not be preparing a write while we still have one pending");
@ -772,8 +772,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private long acquiredOn;
PrepBuffer() {
unencrypted = new byte[16*1024];
base = new byte[16*1024];
unencrypted = new byte[BUFFER_SIZE];
base = new byte[BUFFER_SIZE];
pad = new byte[16];
crc = new Adler32();
}
@ -1033,7 +1033,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/** _decryptBlockBuf contains another cleartext block of I2NP to parse */
private boolean recvUnencryptedI2NP() {
_curReadState.receiveBlock(_decryptBlockBuf);
if (_curReadState.getSize() > 16*1024) {
if (_curReadState.getSize() > BUFFER_SIZE) {
if (_log.shouldLog(Log.ERROR))
_log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString());
_context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime());
@ -1112,7 +1112,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
return obj == this;
}
private final static List _i2npHandlers = new ArrayList(4);
private static final int MAX_HANDLERS = 4;
private final static List _i2npHandlers = new ArrayList(MAX_HANDLERS);
private final static I2NPMessageHandler acquireHandler(RouterContext ctx) {
I2NPMessageHandler rv = null;
synchronized (_i2npHandlers) {
@ -1125,7 +1126,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
private static void releaseHandler(I2NPMessageHandler handler) {
synchronized (_i2npHandlers) {
if (_i2npHandlers.size() < 4)
if (_i2npHandlers.size() < MAX_HANDLERS)
_i2npHandlers.add(handler);
}
}
@ -1137,13 +1138,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
byte data[];
ByteArrayInputStream bais;
public DataBuf() {
data = new byte[16*1024];
data = new byte[BUFFER_SIZE];
bais = new ByteArrayInputStream(data);
}
}
private static int MAX_DATA_READ_BUFS = 16;
private final static List _dataReadBufs = new ArrayList(16);
private static final int MAX_DATA_READ_BUFS = 16;
private final static List _dataReadBufs = new ArrayList(MAX_DATA_READ_BUFS);
private static DataBuf acquireReadBuf() {
synchronized (_dataReadBufs) {
if (_dataReadBufs.size() > 0)
@ -1178,7 +1179,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_crc = new Adler32();
init();
}
public void init() {
private void init() {
_size = -1;
_nextWrite = 0;
_expectedCrc = -1;
@ -1268,11 +1269,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
I2NPMessage read = h.readMessage(_dataBuf.bais);
long timeToRecv = System.currentTimeMillis() - _stateBegin;
releaseHandler(h);
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes on " + toString());
if (_log.shouldLog(Log.INFO))
_log.info("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes on " + NTCPConnection.this.toString());
_context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv);
_context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv);
// FIXME move end of try block here.
// On the input side, move releaseHandler() and init() to a finally block.
if (read != null) {
_transport.messageReceived(read, _remotePeer, null, timeToRecv, _size);
if (_messagesRead <= 0)
@ -1283,23 +1288,27 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// get it ready for the next I2NP message
init();
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error parsing I2NP message", ioe);
if (_log.shouldLog(Log.WARN))
_log.warn("Error parsing I2NP message", ioe);
_context.statManager().addRateData("ntcp.corruptI2NPIOE", 1, getUptime());
close();
// handler and databuf are lost
return;
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error parsing I2NP message", ime);
if (_log.shouldLog(Log.WARN))
_log.warn("Error parsing I2NP message", ime);
_context.statManager().addRateData("ntcp.corruptI2NPIME", 1, getUptime());
close();
// handler and databuf are lost
return;
}
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
_context.statManager().addRateData("ntcp.corruptI2NPCRC", 1, getUptime());
// should we try to read in the message and keep going?
close();
// databuf is lost
return;
}
}

View File

@ -27,6 +27,7 @@ public class InboundMessageState {
private int _lastFragment;
private long _receiveBegin;
private int _completeSize;
private boolean _released;
/** expire after 10s */
private static final long MAX_RECEIVE_TIME = 10*1000;
@ -156,9 +157,15 @@ public class InboundMessageState {
for (int i = 0; i < _fragments.length; i++)
_fragmentCache.release(_fragments[i]);
//_fragments = null;
_released = true;
}
public ByteArray[] getFragments() {
if (_released) {
RuntimeException e = new RuntimeException("Use after free: " + toString());
_log.error("SSU IMS", e);
throw e;
}
return _fragments;
}
public int getFragmentCount() { return _lastFragment+1; }

View File

@ -23,9 +23,9 @@ public class IntroductionManager {
private UDPTransport _transport;
private PacketBuilder _builder;
/** map of relay tag to PeerState that should receive the introduction */
private Map _outbound;
private Map<Long, PeerState> _outbound;
/** list of peers (PeerState) who have given us introduction tags */
private final List _inbound;
private final List<PeerState> _inbound;
public IntroductionManager(RouterContext ctx, UDPTransport transport) {
_context = ctx;
@ -74,7 +74,7 @@ public class IntroductionManager {
}
public PeerState get(long id) {
return (PeerState)_outbound.get(new Long(id));
return _outbound.get(new Long(id));
}
/**
@ -90,7 +90,7 @@ public class IntroductionManager {
* and we want to keep our introducers valid.
*/
public int pickInbound(Properties ssuOptions, int howMany) {
List peers = null;
List<PeerState> peers = null;
int start = _context.random().nextInt(Integer.MAX_VALUE);
synchronized (_inbound) {
if (_log.shouldLog(Log.DEBUG))
@ -103,7 +103,7 @@ public class IntroductionManager {
int found = 0;
long inactivityCutoff = _context.clock().now() - (UDPTransport.EXPIRE_TIMEOUT / 2);
for (int i = 0; i < sz && found < howMany; i++) {
PeerState cur = (PeerState)peers.get((start + i) % sz);
PeerState cur = peers.get((start + i) % sz);
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(cur.getRemotePeer());
if (ri == null) {
if (_log.shouldLog(Log.INFO))
@ -144,7 +144,7 @@ public class IntroductionManager {
long pingCutoff = _context.clock().now() - (2 * 60 * 60 * 1000);
inactivityCutoff = _context.clock().now() - (UDPTransport.EXPIRE_TIMEOUT / 4);
for (int i = 0; i < sz; i++) {
PeerState cur = (PeerState)peers.get(i);
PeerState cur = peers.get(i);
if (cur.getIntroducerTime() > pingCutoff &&
cur.getLastSendTime() < inactivityCutoff) {
if (_log.shouldLog(Log.INFO))
@ -157,6 +157,18 @@ public class IntroductionManager {
return found;
}
/**
* Not as elaborate as pickInbound() above.
* Just a quick check to see how many volunteers we know,
* which the Transport uses to see if we need more.
* @return number of peers that have volunteerd to introduce us
*/
int introducerCount() {
synchronized(_inbound) {
return _inbound.size();
}
}
void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) {
if (_context.router().isHidden())
return;

View File

@ -1,6 +1,7 @@
package net.i2p.router.transport.udp;
import java.util.Arrays;
import java.util.Date;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
@ -33,6 +34,9 @@ public class OutboundMessageState {
private int _pushCount;
private short _maxSends;
// private int _nextSendFragment;
/** for tracking use-after-free bugs */
private boolean _released;
private Exception _releasedBy;
public static final int MAX_MSG_SIZE = 32 * 1024;
/** is this enough for a high-bandwidth router? */
@ -116,13 +120,22 @@ public class OutboundMessageState {
} catch (IllegalStateException ise) {
_cache.release(_messageBuf);
_messageBuf = null;
_released = true;
return false;
}
}
public void releaseResources() {
if (_messageBuf != null)
/**
* This is synchronized with writeFragment(),
* so we do not release (probably due to an ack) while we are retransmitting.
* Also prevent double-free
*/
public synchronized void releaseResources() {
if (_messageBuf != null && !_released) {
_cache.release(_messageBuf);
_released = true;
_releasedBy = new Exception ("Released on " + new Date() + " by:");
}
//_messageBuf = null;
}
@ -267,16 +280,50 @@ public class OutboundMessageState {
/**
* Write a part of the the message onto the specified buffer.
* See releaseResources() above for synchhronization information.
*
* @param out target to write
* @param outOffset into outOffset to begin writing
* @param fragmentNum fragment to write (0 indexed)
* @return bytesWritten
*/
public int writeFragment(byte out[], int outOffset, int fragmentNum) {
public synchronized int writeFragment(byte out[], int outOffset, int fragmentNum) {
if (_messageBuf == null) return -1;
if (_released) {
/******
Solved by synchronization with releaseResources() and simply returning -1.
Previous output:
23:50:57.013 ERROR [acket pusher] sport.udp.OutboundMessageState: SSU OMS Use after free
java.lang.Exception: Released on Wed Dec 23 23:50:57 GMT 2009 by:
at net.i2p.router.transport.udp.OutboundMessageState.releaseResources(OutboundMessageState.java:133)
at net.i2p.router.transport.udp.PeerState.acked(PeerState.java:1391)
at net.i2p.router.transport.udp.OutboundMessageFragments.acked(OutboundMessageFragments.java:404)
at net.i2p.router.transport.udp.InboundMessageFragments.receiveACKs(InboundMessageFragments.java:191)
at net.i2p.router.transport.udp.InboundMessageFragments.receiveData(InboundMessageFragments.java:77)
at net.i2p.router.transport.udp.PacketHandler$Handler.handlePacket(PacketHandler.java:485)
at net.i2p.router.transport.udp.PacketHandler$Handler.receivePacket(PacketHandler.java:282)
at net.i2p.router.transport.udp.PacketHandler$Handler.handlePacket(PacketHandler.java:231)
at net.i2p.router.transport.udp.PacketHandler$Handler.run(PacketHandler.java:136)
at java.lang.Thread.run(Thread.java:619)
at net.i2p.util.I2PThread.run(I2PThread.java:71)
23:50:57.014 ERROR [acket pusher] ter.transport.udp.PacketPusher: SSU Output Queue Error
java.lang.RuntimeException: SSU OMS Use after free: Message 2381821417 with 4 fragments of size 0 volleys: 2 lifetime: 1258 pending fragments: 0 1 2 3
at net.i2p.router.transport.udp.OutboundMessageState.writeFragment(OutboundMessageState.java:298)
at net.i2p.router.transport.udp.PacketBuilder.buildPacket(PacketBuilder.java:170)
at net.i2p.router.transport.udp.OutboundMessageFragments.preparePackets(OutboundMessageFragments.java:332)
at net.i2p.router.transport.udp.OutboundMessageFragments.getNextVolley(OutboundMessageFragments.java:297)
at net.i2p.router.transport.udp.PacketPusher.run(PacketPusher.java:38)
at java.lang.Thread.run(Thread.java:619)
at net.i2p.util.I2PThread.run(I2PThread.java:71)
*******/
if (_log.shouldLog(Log.WARN))
_log.log(Log.WARN, "SSU OMS Use after free: " + toString(), _releasedBy);
return -1;
//throw new RuntimeException("SSU OMS Use after free: " + toString());
}
int start = _fragmentSize * fragmentNum;
int end = start + fragmentSize(fragmentNum);
if (_messageBuf == null) return -1;
int toSend = end - start;
byte buf[] = _messageBuf.getData();
if ( (buf != null) && (start + toSend < buf.length) && (out != null) && (outOffset + toSend < out.length) ) {

View File

@ -169,8 +169,14 @@ public class PacketBuilder {
int sizeWritten = state.writeFragment(data, off, fragment);
if (sizeWritten != size) {
_log.error("Size written: " + sizeWritten + " but size: " + size
+ " for fragment " + fragment + " of " + state.getMessageId());
if (sizeWritten < 0) {
// probably already freed from OutboundMessageState
if (_log.shouldLog(Log.WARN))
_log.warn("Write failed for fragment " + fragment + " of " + state.getMessageId());
} else {
_log.error("Size written: " + sizeWritten + " but size: " + size
+ " for fragment " + fragment + " of " + state.getMessageId());
}
packet.release();
return null;
} else if (_log.shouldLog(Log.DEBUG))

View File

@ -43,7 +43,7 @@ public class PacketPusher implements Runnable {
}
}
} catch (Exception e) {
_log.log(Log.CRIT, "Error pushing", e);
_log.error("SSU Output Queue Error", e);
}
}
}

View File

@ -132,6 +132,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** remember IP changes */
public static final String PROP_IP= "i2np.lastIP";
public static final String PROP_IP_CHANGE = "i2np.lastIPChange";
public static final String PROP_LAPTOP_MODE = "i2np.laptopMode";
/** do we require introducers, regardless of our status? */
public static final String PROP_FORCE_INTRODUCERS = "i2np.udp.forceIntroducers";
@ -461,13 +462,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
boolean updated = false;
boolean fireTest = false;
if (_log.shouldLog(Log.WARN))
_log.warn("Change address? status = " + _reachabilityStatus +
" diff = " + (_context.clock().now() - _reachabilityStatusLastUpdated) +
" old = " + _externalListenHost + ':' + _externalListenPort +
" new = " + DataHelper.toString(ourIP) + ':' + ourPort);
synchronized (this) {
if ( (_externalListenHost == null) ||
(!eq(_externalListenHost.getAddress(), _externalListenPort, ourIP, ourPort)) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Change address? status = " + _reachabilityStatus +
" diff = " + (_context.clock().now() - _reachabilityStatusLastUpdated) +
" old = " + _externalListenHost + ':' + _externalListenPort);
if ( (_reachabilityStatus != CommSystemFacade.STATUS_OK) ||
(_externalListenHost == null) || (_externalListenPort <= 0) ||
(_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) {
@ -514,9 +517,35 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// store these for laptop-mode (change ident on restart... or every time... when IP changes)
String oldIP = _context.getProperty(PROP_IP);
if (!_externalListenHost.getHostAddress().equals(oldIP)) {
long lastChanged = 0;
long now = _context.clock().now();
String lcs = _context.getProperty(PROP_IP_CHANGE);
if (lcs != null) {
try {
lastChanged = Long.parseLong(lcs);
} catch (NumberFormatException nfe) {}
}
_context.router().setConfigSetting(PROP_IP, _externalListenHost.getHostAddress());
_context.router().setConfigSetting(PROP_IP_CHANGE, "" + _context.clock().now());
_context.router().setConfigSetting(PROP_IP_CHANGE, "" + now);
_context.router().saveConfig();
// laptop mode
// For now, only do this at startup
if (oldIP != null &&
System.getProperty("wrapper.version") != null &&
Boolean.valueOf(_context.getProperty(PROP_LAPTOP_MODE)).booleanValue() &&
now - lastChanged > 10*60*1000 &&
_context.router().getUptime() < 10*60*1000) {
_log.log(Log.CRIT, "IP changed, restarting with a new identity and port");
// this removes the UDP port config
_context.router().killKeys();
// do we need WrapperManager.signalStopped() like in ConfigServiceHandler ???
// without it, the wrapper complains "shutdown unexpectedly"
// but we can't have that dependency in the router
_context.router().shutdown(Router.EXIT_HARD_RESTART);
// doesn't return
}
}
_context.router().rebuildRouterInfo();
}
@ -968,6 +997,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return _endpoint.send(packet);
}
/** minimum active peers to maintain IP detection, etc. */
private static final int MIN_PEERS = 3;
/** minimum peers volunteering to be introducers if we need that */
private static final int MIN_INTRODUCER_POOL = 4;
public TransportBid bid(RouterInfo toAddress, long dataSize) {
if (dataSize > OutboundMessageState.MAX_MSG_SIZE) {
// NTCP max is lower, so msg will get dropped
@ -1015,11 +1049,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// Try to maintain at least 3 peers so we can determine our IP address and
// we have a selection to run peer tests with.
// If we are firewalled, and we don't have enough peers that volunteered to
// also introduce us, also bid aggressively so we are preferred over NTCP.
// (Otherwise we only talk UDP to those that are firewalled, and we will
// never get any introducers)
int count;
synchronized (_peersByIdent) {
count = _peersByIdent.size();
}
if (alwaysPreferUDP() || count < 3)
if (alwaysPreferUDP() || count < MIN_PEERS ||
(introducersRequired() && _introManager.introducerCount() < MIN_INTRODUCER_POOL))
return _slowPreferredBid;
else if (preferUDP())
return _slowBid;
@ -1157,6 +1196,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.info("Picked peers: " + found);
_introducersSelectedOn = _context.clock().now();
introducersIncluded = true;
} else {
// maybe we should fail to publish an address at all in this case?
if (_log.shouldLog(Log.WARN))
_log.warn("Need introducers but we don't know any");
}
}

View File

@ -2,7 +2,7 @@ package net.i2p.router.tunnel;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
@ -45,11 +45,10 @@ import net.i2p.util.Log;
* }
*/
public class BatchedPreprocessor extends TrivialPreprocessor {
private Log _log;
private long _pendingSince;
private String _name;
public BatchedPreprocessor(I2PAppContext ctx, String name) {
public BatchedPreprocessor(RouterContext ctx, String name) {
super(ctx);
_log = ctx.logManager().getLog(BatchedPreprocessor.class);
_name = name;
@ -68,6 +67,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
}
/** 1003 */
private static final int FULL_SIZE = PREPROCESSED_SIZE
- IV_SIZE
- 1 // 0x00 ending the padding
@ -77,11 +77,28 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
/* not final or private so the test code can adjust */
static long DEFAULT_DELAY = 100;
/** Wait up to this long before sending (flushing) a small tunnel message */
/**
* Wait up to this long before sending (flushing) a small tunnel message
* Warning - overridden in BatchedRouterPreprocessor
*/
protected long getSendDelay() { return DEFAULT_DELAY; }
/** if we have 50 messages queued that are too small, flush them anyway */
private static final int FORCE_BATCH_FLUSH = 50;
/**
* if we have this many messages queued that are too small, flush them anyway
* Even small messages take up about 200 bytes or so.
*/
private static final int FORCE_BATCH_FLUSH = 5;
/** If we have this much allocated, flush anyway.
* Tune this to trade off padding vs. fragmentation.
* The lower the value, the more we are willing to send off
* a tunnel msg that isn't full so the next message can start
* in a new tunnel msg to minimize fragmentation.
*
* This should be at most FULL_SIZE - (39 + a few), since
* you want to at least fit in the instructions and a few bytes.
*/
private static final int FULL_ENOUGH_SIZE = (FULL_SIZE * 80) / 100;
/** how long do we want to wait before flushing */
@Override
@ -98,8 +115,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
return rv;
}
/* See TunnelGateway.QueuePreprocessor for Javadoc */
@Override
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.INFO))
display(0, pending, "Starting");
StringBuilder timingBuf = null;
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Preprocess queue with " + pending.size() + " to send");
@ -116,12 +136,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
int batchCount = 0;
int beforeLooping = pending.size();
// loop until the queue is empty
while (pending.size() > 0) {
int allocated = 0;
long beforePendingLoop = System.currentTimeMillis();
// loop until we fill up a single message
for (int i = 0; i < pending.size(); i++) {
long pendingStart = System.currentTimeMillis();
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i);
TunnelGateway.Pending msg = pending.get(i);
int instructionsSize = getInstructionsSize(msg);
instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
int curWanted = msg.getData().length - msg.getOffset() + instructionsSize;
@ -135,7 +158,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
// the instructions alone exceed the size, so we won't get any
// of the message into it. don't include it
i--;
msg = (TunnelGateway.Pending)pending.get(i);
msg = pending.get(i);
allocated -= curWanted;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pushback of " + curWanted + " (message " + (i+1) + " in " + pending + ")");
@ -144,18 +167,22 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
long waited = _context.clock().now() - _pendingSince;
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited);
}
// Send the message
long beforeSend = System.currentTimeMillis();
_pendingSince = 0;
send(pending, 0, i, sender, rec);
_context.statManager().addRateData("tunnel.batchFullFragments", 1, 0);
long afterSend = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
_log.info("Allocated=" + allocated + " so we sent " + (i+1)
+ " (last complete? " + (msg.getOffset() >= msg.getData().length)
+ ", off=" + msg.getOffset() + ", count=" + pending.size() + ")");
display(allocated, pending, "Sent the message with " + (i+1));
//_log.info(_name + ": Allocated=" + allocated + "B, Sent " + (i+1)
// + " msgs (last complete? " + (msg.getOffset() >= msg.getData().length)
// + ", off=" + msg.getOffset() + ", pending=" + pending.size() + ")");
// Remove what we sent from the pending queue
for (int j = 0; j < i; j++) {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
TunnelGateway.Pending cur = pending.remove(0);
if (cur.getOffset() < cur.getData().length)
throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
+ " len=" + cur.getData().length + " alloc=" + allocated);
@ -167,7 +194,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
if (msg.getOffset() >= msg.getData().length) {
// ok, this last message fit perfectly, remove it too
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
TunnelGateway.Pending cur = pending.remove(0);
if (timingBuf != null)
timingBuf.append(" sent perfect fit " + cur).append(".");
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending);
@ -186,20 +213,24 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
+ "/" + (beforeSend-start)
+ " pending current " + (pendingEnd-pendingStart)).append(".");
break;
}
} // if >= full size
if (timingBuf != null)
timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append(".");
}
} // for
long afterCleared = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
long afterDisplayed = System.currentTimeMillis();
if (allocated > 0) {
// after going through the entire pending list, we still don't
// have enough data to send a full message
// After going through the entire pending list, we have only a partial message.
// We might flush it or might not, but we are returning either way.
if ( (pending.size() > FORCE_BATCH_FLUSH) || // enough msgs - or
( (_pendingSince > 0) && (getDelayAmount() <= 0) ) || // time to flush - or
(allocated >= FULL_ENOUGH_SIZE)) { // full enough
//(pending.get(0).getFragmentNumber() > 0)) { // don't delay anybody's last fragment,
// // which would be the first fragment in the message
if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) {
// not even a full message, but we want to flush it anyway
if (pending.size() > 1)
@ -209,21 +240,24 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
send(pending, 0, pending.size()-1, sender, rec);
_context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0);
// Remove everything in the outgoing message from the pending queue
int beforeSize = pending.size();
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i);
if (cur.getOffset() >= cur.getData().length) {
pending.remove(i);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining");
_context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
i--;
}
for (int i = 0; i < beforeSize; i++) {
TunnelGateway.Pending cur = pending.get(0);
if (cur.getOffset() < cur.getData().length)
break;
pending.remove(0);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining");
_context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (pending.size() > 0) {
// rare
_pendingSince = _context.clock().now();
_context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize);
display(allocated, pending, "flushed, some remain");
if (_log.shouldLog(Log.INFO))
display(allocated, pending, "flushed, some remain");
if (timingBuf != null) {
timingBuf.append(" flushed, some remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@ -232,12 +266,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
return true;
} else {
long delayAmount = _context.clock().now() - _pendingSince;
_pendingSince = 0;
long delayAmount = 0;
if (_pendingSince > 0) {
delayAmount = _context.clock().now() - _pendingSince;
_pendingSince = 0;
}
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
if (_log.shouldLog(Log.INFO))
display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount);
display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount + "ms");
if (timingBuf != null) {
timingBuf.append(" flushed, none remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@ -246,14 +283,17 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
return false;
}
// won't get here, we returned
} else {
// We didn't flush. Note that the messages remain on the pending list.
_context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0);
if (_pendingSince <= 0)
_pendingSince = _context.clock().now();
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
// not yet time to send the delayed flush
display(allocated, pending, "dont flush");
if (_log.shouldLog(Log.INFO))
display(allocated, pending, "dont flush");
if (timingBuf != null) {
timingBuf.append(" dont flush (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@ -262,14 +302,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
return true;
}
// won't get here, we returned
} else {
// ok, we sent some, but haven't gone back for another
// pass yet. keep looping
if (timingBuf != null)
timingBuf.append(" Keep looping");
}
}
} // if allocated
} // while
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sent everything on the list (pending=" + pending.size() + ")");
@ -283,20 +324,25 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
return false;
}
private void display(long allocated, List pending, String title) {
/*
* Only if Log.INFO
*
* title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
*/
private void display(long allocated, List<TunnelGateway.Pending> pending, String title) {
if (_log.shouldLog(Log.INFO)) {
long highestDelay = 0;
StringBuilder buf = new StringBuilder();
StringBuilder buf = new StringBuilder(128);
buf.append(_name).append(": ");
buf.append(title);
buf.append(" allocated: ").append(allocated);
buf.append(" - allocated: ").append(allocated);
buf.append(" pending: ").append(pending.size());
if (_pendingSince > 0)
buf.append(" delay: ").append(getDelayAmount(false));
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending curPending = (TunnelGateway.Pending)pending.get(i);
buf.append(" pending[").append(i).append("]: ");
buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/');
TunnelGateway.Pending curPending = pending.get(i);
buf.append(" [").append(i).append("]:");
buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
buf.append(curPending.getLifetime());
if (curPending.getLifetime() > highestDelay)
highestDelay = curPending.getLifetime();
@ -314,9 +360,14 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
* @param startAt first index in pending to send (inclusive)
* @param sendThrough last index in pending to send (inclusive)
*/
protected void send(List pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending);
// Might as well take a buf from the cache;
// However it will never be returned to the cache.
// (TunnelDataMessage will not wrap the buffer in a new ByteArray and release() it)
// See also TDM for more discussion.
byte preprocessed[] = _dataCache.acquire().getData();
int offset = 0;
@ -343,10 +394,10 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_log.error("Error preprocessing the messages (offset=" + offset + " start=" + startAt + " through=" + sendThrough + " pending=" + pending.size() + " preproc=" + preprocessed.length);
return;
}
long msgId = sender.sendPreprocessed(preprocessed, rec);
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i);
TunnelGateway.Pending cur = pending.get(i);
cur.addMessageId(msgId);
}
if (_log.shouldLog(Log.DEBUG))
@ -359,9 +410,9 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
*
* @return new offset into the target for further bytes to be written
*/
private int writeFragments(List pending, int startAt, int sendThrough, byte target[], int offset) {
private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) {
for (int i = startAt; i <= sendThrough; i++) {
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i);
TunnelGateway.Pending msg = pending.get(i);
int prevOffset = offset;
if (msg.getOffset() == 0) {
offset = writeFirstFragment(msg, target, offset);

View File

@ -11,75 +11,103 @@ import net.i2p.router.RouterContext;
*
*/
public class BatchedRouterPreprocessor extends BatchedPreprocessor {
private RouterContext _routerContext;
protected RouterContext _routerContext;
private TunnelCreatorConfig _config;
private HopConfig _hopConfig;
protected HopConfig _hopConfig;
private final long _sendDelay;
/**
* How frequently should we flush non-full messages, in milliseconds
* This goes in I2CP custom options for the pool.
* Only applies to OBGWs.
*/
public static final String PROP_BATCH_FREQUENCY = "batchFrequency";
/** This goes in router advanced config */
public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency";
public static final int DEFAULT_BATCH_FREQUENCY = 100;
/** for client OBGWs only (our data) */
public static final int OB_CLIENT_BATCH_FREQ = 100;
/** for exploratory OBGWs only (our tunnel tests and build messages) */
public static final int OB_EXPL_BATCH_FREQ = 150;
/** for IBGWs for efficiency (not our data) */
public static final int DEFAULT_BATCH_FREQUENCY = 250;
public BatchedRouterPreprocessor(RouterContext ctx) {
this(ctx, (HopConfig)null);
}
/** for OBGWs */
public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
super(ctx, getName(cfg));
_routerContext = ctx;
_config = cfg;
_sendDelay = initialSendDelay();
}
/** for IBGWs */
public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) {
super(ctx, getName(cfg));
_routerContext = ctx;
_hopConfig = cfg;
_sendDelay = initialSendDelay();
}
private static String getName(HopConfig cfg) {
if (cfg == null) return "[unknown]";
if (cfg == null) return "IB??";
if (cfg.getReceiveTunnel() != null)
return cfg.getReceiveTunnel().getTunnelId() + "";
return "IB " + cfg.getReceiveTunnel().getTunnelId();
else if (cfg.getSendTunnel() != null)
return cfg.getSendTunnel().getTunnelId() + "";
return "IB " + cfg.getSendTunnel().getTunnelId();
else
return "[n/a]";
return "IB??";
}
private static String getName(TunnelCreatorConfig cfg) {
if (cfg == null) return "[unknown]";
if (cfg == null) return "OB??";
if (cfg.getReceiveTunnelId(0) != null)
return cfg.getReceiveTunnelId(0).getTunnelId() + "";
return "OB " + cfg.getReceiveTunnelId(0).getTunnelId();
else if (cfg.getSendTunnelId(0) != null)
return cfg.getSendTunnelId(0).getTunnelId() + "";
return "OB " + cfg.getSendTunnelId(0).getTunnelId();
else
return "[n/a]";
return "OB??";
}
/** how long should we wait before flushing */
/**
* how long should we wait before flushing
*/
@Override
protected long getSendDelay() {
String freq = null;
protected long getSendDelay() { return _sendDelay; }
/*
* Extend the batching time for exploratory OBGWs, they have a lot of small
* tunnel test messages, and build messages that don't fit perfectly.
* And these are not as delay-sensitive.
*
* We won't pick up config changes after the preprocessor is created,
* but a preprocessor lifetime is only 10 minutes, so just wait...
*/
private long initialSendDelay() {
if (_config != null) {
Properties opts = _config.getOptions();
if (opts != null)
freq = opts.getProperty(PROP_BATCH_FREQUENCY);
}
if (freq == null)
freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY);
if (freq != null) {
try {
return Integer.parseInt(freq);
} catch (NumberFormatException nfe) {
return DEFAULT_BATCH_FREQUENCY;
if (opts != null) {
String freq = opts.getProperty(PROP_BATCH_FREQUENCY);
if (freq != null) {
try {
return Integer.parseInt(freq);
} catch (NumberFormatException nfe) {}
}
}
}
return DEFAULT_BATCH_FREQUENCY;
int def;
if (_config != null) {
if (_config.getDestination() != null)
def = OB_CLIENT_BATCH_FREQ;
else
def = OB_EXPL_BATCH_FREQ;
} else {
def = DEFAULT_BATCH_FREQUENCY;
}
return _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY, def);
}
@Override
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds, String msg) {
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {
if (_config != null)
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, _config, msg);
else

View File

@ -4,7 +4,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
@ -13,6 +12,7 @@ import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@ -21,35 +21,102 @@ import net.i2p.util.SimpleTimer;
* Handle fragments at the endpoint of a tunnel, peeling off fully completed
* I2NPMessages when they arrive, and dropping fragments if they take too long
* to arrive.
*
* From tunnel-alt.html:
<p>When the gateway wants to deliver data through the tunnel, it first
gathers zero or more <a href="i2np.html">I2NP</a> messages, selects how much padding will be used,
fragments it across the necessary number of 1KB tunnel messages, and decides how
each I2NP message should be handled by the tunnel endpoint, encoding that
data into the raw tunnel payload:</p>
<ul>
<li>The 4 byte Tunnel ID</li>
<li>The 16 byte IV</li>
<li>the first 4 bytes of the SHA256 of (the remaining preprocessed data concatenated
with the IV), using the IV as will be seen on the tunnel endpoint (for
outbound tunnels), or the IV as was seen on the tunnel gateway (for inbound
tunnels) (see below for IV processing).</li>
<li>0 or more bytes containing random nonzero integers</li>
<li>1 byte containing 0x00</li>
<li>a series of zero or more { instructions, message } pairs</li>
</ul>
<p>Note that the padding, if any, must be before the instruction/message pairs.
there is no provision for padding at the end.</p>
<p>The instructions are encoded with a single control byte, followed by any
necessary additional information. The first bit in that control byte determines
how the remainder of the header is interpreted - if it is not set, the message
is either not fragmented or this is the first fragment in the message. If it is
set, this is a follow on fragment.</p>
<p>With the first (leftmost or MSB) bit being 0, the instructions are:</p>
<ul>
<li>1 byte control byte:<pre>
bit 0: is follow on fragment? (1 = true, 0 = false, must be 0)
bits 1-2: delivery type
(0x0 = LOCAL, 0x01 = TUNNEL, 0x02 = ROUTER)
bit 3: delay included? (1 = true, 0 = false) (unimplemented)
bit 4: fragmented? (1 = true, 0 = false)
bit 5: extended options? (1 = true, 0 = false) (unimplemented)
bits 6-7: reserved</pre></li>
<li>if the delivery type was TUNNEL, a 4 byte tunnel ID</li>
<li>if the delivery type was TUNNEL or ROUTER, a 32 byte router hash</li>
<li>if the delay included flag is true, a 1 byte value (unimplemented):<pre>
bit 0: type (0 = strict, 1 = randomized)
bits 1-7: delay exponent (2^value minutes)</pre></li>
<li>if the fragmented flag is true, a 4 byte message ID</li>
<li>if the extended options flag is true (unimplemented):<pre>
= a 1 byte option size (in bytes)
= that many bytes</pre></li>
<li>2 byte size of the I2NP message or this fragment</li>
</ul>
<p>If the first bit being 1, the instructions are:</p>
<ul>
<li>1 byte control byte:<pre>
bit 0: is follow on fragment? (1 = true, 0 = false, must be 1)
bits 1-6: fragment number
bit 7: is last? (1 = true, 0 = false)</pre></li>
<li>4 byte message ID (same one defined in the first fragment)</li>
<li>2 byte size of this fragment</li>
</ul>
<p>The I2NP message is encoded in its standard form, and the
preprocessed payload must be padded to a multiple of 16 bytes.
The total size, including the tunnel ID and IV, is 1028 bytes.
</p>
*
*/
public class FragmentHandler {
private I2PAppContext _context;
private Log _log;
private final Map _fragmentedMessages;
protected RouterContext _context;
protected Log _log;
private final Map<Long, FragmentedMessage> _fragmentedMessages;
private DefragmentedReceiver _receiver;
private int _completed;
private int _failed;
private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 };
/** don't wait more than 60s to defragment the partial message */
static long MAX_DEFRAGMENT_TIME = 60*1000;
private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) {
public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
_context = context;
_log = context.logManager().getLog(FragmentHandler.class);
_fragmentedMessages = new HashMap(4);
_fragmentedMessages = new HashMap(8);
_receiver = receiver;
_context.statManager().createRateStat("tunnel.smallFragments", "How many pad bytes are in small fragments?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
"Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fullFragments", "How many tunnel messages use the full data area?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
"Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fragmentedComplete", "How many fragments were in a completely received message?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
"Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fragmentedDropped", "How many fragments were in a partially received yet failed message?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
"Tunnels", RATES);
_context.statManager().createRateStat("tunnel.corruptMessage", "How many corrupted messages arrived?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
"Tunnels", RATES);
}
/**
@ -122,6 +189,9 @@ public class FragmentHandler {
// each of the FragmentedMessages populated make a copy out of the
// payload, which they release separately, so we can release
// immediately
//
// This is certainly interesting, to wrap the 1024-byte array in a new ByteArray
// in order to put it in the pool, but it shouldn't cause any harm.
_cache.release(new ByteArray(preprocessed));
}
}
@ -141,6 +211,13 @@ public class FragmentHandler {
* this.
*/
private boolean verifyPreprocessed(byte preprocessed[], int offset, int length) {
// ByteCache/ByteArray corruption detection
//byte[] orig = new byte[length];
//System.arraycopy(preprocessed, 0, orig, 0, length);
//try {
// Thread.sleep(75);
//} catch (InterruptedException ie) {}
// now we need to verify that the message was received correctly
int paddingEnd = HopProcessor.IV_LENGTH + 4;
while (preprocessed[offset+paddingEnd] != (byte)0x00) {
@ -149,7 +226,7 @@ public class FragmentHandler {
if (_log.shouldLog(Log.WARN))
_log.warn("cannot verify, going past the end [off="
+ offset + " len=" + length + " paddingEnd="
+ paddingEnd + " data:\n"
+ paddingEnd + " data: "
+ Base64.encode(preprocessed, offset, length));
return false;
}
@ -165,21 +242,19 @@ public class FragmentHandler {
_log.debug("endpoint IV: " + Base64.encode(preV, validLength - HopProcessor.IV_LENGTH, HopProcessor.IV_LENGTH));
Hash v = _context.sha().calculateHash(preV, 0, validLength);
_validateCache.release(ba);
//Hash v = _context.sha().calculateHash(preV, 0, validLength);
boolean eq = DataHelper.eq(v.getData(), 0, preprocessed, offset + HopProcessor.IV_LENGTH, 4);
if (!eq) {
if (_log.shouldLog(Log.WARN))
_log.warn("Corrupt tunnel message - verification fails: \n" + Base64.encode(preprocessed, offset+HopProcessor.IV_LENGTH, 4)
+ "\n" + Base64.encode(v.getData(), 0, 4));
if (_log.shouldLog(Log.WARN))
_log.warn("nomatching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1) + "\n"
+ " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd
+ Base64.encode(preprocessed, offset, length));
if (_log.shouldLog(Log.WARN)) {
_log.warn("Corrupt tunnel message - verification fails: " + Base64.encode(preprocessed, offset+HopProcessor.IV_LENGTH, 4)
+ " != " + Base64.encode(v.getData(), 0, 4));
_log.warn("No matching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1)
+ " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + ' '
+ Base64.encode(preprocessed, offset, length), new Exception("trace"));
}
}
_validateCache.release(ba);
if (eq) {
int excessPadding = paddingEnd - (HopProcessor.IV_LENGTH + 4 + 1);
if (excessPadding > 0) // suboptimal fragmentation
@ -188,6 +263,13 @@ public class FragmentHandler {
_context.statManager().addRateData("tunnel.fullFragments", 1, 0);
}
// ByteCache/ByteArray corruption detection
//if (!DataHelper.eq(preprocessed, 0, orig, 0, length)) {
// _log.log(Log.CRIT, "Not equal! orig =\n" + Base64.encode(orig, 0, length) +
// "\nprep =\n" + Base64.encode(preprocessed, 0, length),
// new Exception("hosed"));
//}
return eq;
}
@ -197,11 +279,12 @@ public class FragmentHandler {
static final byte MASK_TYPE = (byte)(3 << 5);
/** is this the first of a fragmented message? */
static final byte MASK_FRAGMENTED = (byte)(1 << 3);
/** are there follow up headers? */
/** are there follow up headers? UNIMPLEMENTED */
static final byte MASK_EXTENDED = (byte)(1 << 2);
/** for subsequent fragments, which bits contain the fragment #? */
private static final int MASK_FRAGMENT_NUM = (byte)((1 << 7) - 2); // 0x7E;
/** LOCAL isn't explicitly used anywhere, because the code knows that it is 0 */
static final short TYPE_LOCAL = 0;
static final short TYPE_TUNNEL = 1;
static final short TYPE_ROUTER = 2;
@ -211,8 +294,8 @@ public class FragmentHandler {
*/
private int receiveFragment(byte preprocessed[], int offset, int length) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("CONTROL: " + Integer.toHexString(preprocessed[offset]) + " / "
+ "/" + Base64.encode(preprocessed, offset, 1) + " at offset " + offset);
_log.debug("CONTROL: 0x" + Integer.toHexString(preprocessed[offset] & 0xff) +
" at offset " + offset);
if (0 == (preprocessed[offset] & MASK_IS_SUBSEQUENT))
return receiveInitialFragment(preprocessed, offset, length);
else
@ -273,42 +356,48 @@ public class FragmentHandler {
int size = (int)DataHelper.fromLong(preprocessed, offset, 2);
offset += 2;
boolean isNew = false;
FragmentedMessage msg = null;
if (fragmented) {
synchronized (_fragmentedMessages) {
msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId));
msg = _fragmentedMessages.get(new Long(messageId));
if (msg == null) {
msg = new FragmentedMessage(_context);
_fragmentedMessages.put(new Long(messageId), msg);
isNew = true;
}
}
} else {
msg = new FragmentedMessage(_context);
}
boolean ok = msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId);
if (!ok) return -1;
if (msg.isComplete()) {
if (fragmented) {
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId));
if (fragmented) {
// synchronized is required, fragments may be arriving in different threads
synchronized(msg) {
boolean ok = msg.receive(messageId, preprocessed, offset, size, false, router, tunnelId);
if (!ok) return -1;
if (msg.isComplete()) {
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId));
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), 0, msg);
if (msg.getExpireEvent() == null) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
}
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
} else {
// synchronized not required if !fragmented
boolean ok = msg.receive(messageId, preprocessed, offset, size, true, router, tunnelId);
if (!ok) return -1;
// always complete, never an expire event
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), 0, msg);
}
if (isNew && fragmented && !msg.isComplete()) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
offset += size;
@ -340,38 +429,38 @@ public class FragmentHandler {
throw new RuntimeException("Preprocessed message was invalid [messageId =" + messageId + " size="
+ size + " offset=" + offset + " fragment=" + fragmentNum);
boolean isNew = false;
FragmentedMessage msg = null;
synchronized (_fragmentedMessages) {
msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId));
msg = _fragmentedMessages.get(new Long(messageId));
if (msg == null) {
msg = new FragmentedMessage(_context);
_fragmentedMessages.put(new Long(messageId), msg);
isNew = true;
}
}
boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
if (!ok) return -1;
if (msg.isComplete()) {
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId));
// synchronized is required, fragments may be arriving in different threads
synchronized(msg) {
boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
if (!ok) return -1;
if (msg.isComplete()) {
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId));
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
_context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), fragmentNum, msg);
if (msg.getExpireEvent() == null) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
_context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), fragmentNum, msg);
}
if (isNew && !msg.isComplete()) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
offset += size;
@ -392,8 +481,8 @@ public class FragmentHandler {
if (data == null)
throw new I2NPMessageException("null data"); // fragments already released???
if (_log.shouldLog(Log.DEBUG))
_log.debug("RECV(" + data.length + "): " + Base64.encode(data)
+ " " + _context.sha().calculateHash(data).toBase64());
_log.debug("RECV(" + data.length + "): "); // + Base64.encode(data)
//+ " " + _context.sha().calculateHash(data).toBase64());
I2NPMessage m = new I2NPMessageHandler(_context).readMessage(data);
noteReception(m.getUniqueId(), fragmentCount-1, "complete: ");// + msg.toString());
noteCompletion(m.getUniqueId());
@ -441,15 +530,17 @@ public class FragmentHandler {
synchronized (_fragmentedMessages) {
removed = (null != _fragmentedMessages.remove(new Long(_msg.getMessageId())));
}
if (removed && !_msg.getReleased()) {
_failed++;
noteFailure(_msg.getMessageId(), _msg.toString());
if (_log.shouldLog(Log.WARN))
_log.warn("Dropped failed fragmented message: " + _msg);
_context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
_msg.failed();
} else {
// succeeded before timeout
synchronized (_msg) {
if (removed && !_msg.getReleased()) {
_failed++;
noteFailure(_msg.getMessageId(), _msg.toString());
if (_log.shouldLog(Log.WARN))
_log.warn("Dropped incomplete fragmented message: " + _msg);
_context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
_msg.failed();
} else {
// succeeded before timeout
}
}
}

View File

@ -21,6 +21,8 @@ import net.i2p.util.SimpleTimer;
* Gather fragments of I2NPMessages at a tunnel endpoint, making them available
* for reading when complete.
*
* Warning - this is all unsynchronized here - receivers must implement synchronization
*
*/
public class FragmentedMessage {
private I2PAppContext _context;
@ -186,6 +188,11 @@ public class FragmentedMessage {
public int getCompleteSize() {
if (!_lastReceived)
throw new IllegalStateException("wtf, don't get the completed size when we're not complete");
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM completeSize()", e);
throw e;
}
int size = 0;
for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i];
@ -203,6 +210,11 @@ public class FragmentedMessage {
public void writeComplete(OutputStream out) throws IOException {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM writeComplete()", e);
throw e;
}
for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i];
out.write(ba.getData(), ba.getOffset(), ba.getValid());
@ -210,6 +222,11 @@ public class FragmentedMessage {
_completed = true;
}
public void writeComplete(byte target[], int offset) {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM writeComplete() 2", e);
throw e;
}
for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i];
System.arraycopy(ba.getData(), ba.getOffset(), target, offset, ba.getValid());
@ -239,6 +256,11 @@ public class FragmentedMessage {
*
*/
private void releaseFragments() {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("double free in FragmentedMessage");
_log.error("FM releaseFragments()", e);
throw e;
}
_releasedAfter = getLifetime();
for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i];
@ -249,6 +271,7 @@ public class FragmentedMessage {
}
}
/****
public InputStream getInputStream() { return new FragmentInputStream(); }
private class FragmentInputStream extends InputStream {
private int _fragment;
@ -272,6 +295,7 @@ public class FragmentedMessage {
}
}
}
****/
@Override
public String toString() {
@ -282,7 +306,7 @@ public class FragmentedMessage {
if (ba != null)
buf.append(i).append(":").append(ba.getValid()).append(" bytes ");
else
buf.append(i).append(": missing ");
buf.append(i).append(":missing ");
}
buf.append(" highest received: ").append(_highFragmentNum);
buf.append(" last received? ").append(_lastReceived);
@ -299,6 +323,7 @@ public class FragmentedMessage {
return buf.toString();
}
/*****
public static void main(String args[]) {
try {
I2PAppContext ctx = I2PAppContext.getGlobalContext();
@ -325,4 +350,5 @@ public class FragmentedMessage {
e.printStackTrace();
}
}
******/
}

View File

@ -8,6 +8,8 @@ import net.i2p.util.ConcurrentHashSet;
/**
* waste lots of RAM
*
* @deprecated unused
*/
class HashSetIVValidator implements IVValidator {
private final Set<ByteArray> _received;

View File

@ -32,9 +32,11 @@ public class HopProcessor {
static final int IV_LENGTH = 16;
private static final ByteCache _cache = ByteCache.getInstance(128, IV_LENGTH);
/** @deprecated unused */
public HopProcessor(I2PAppContext ctx, HopConfig config) {
this(ctx, config, createValidator());
}
public HopProcessor(I2PAppContext ctx, HopConfig config, IVValidator validator) {
_context = ctx;
_log = ctx.logManager().getLog(HopProcessor.class);
@ -42,6 +44,7 @@ public class HopProcessor {
_validator = validator;
}
/** @deprecated unused */
protected static IVValidator createValidator() {
// yeah, we'll use an O(1) validator later (e.g. bloom filter)
return new HashSetIVValidator();
@ -88,10 +91,10 @@ public class HopProcessor {
encrypt(orig, offset, length);
updateIV(orig, offset);
}
if (_log.shouldLog(Log.DEBUG)) {
//if (_log.shouldLog(Log.DEBUG)) {
//_log.debug("Data after processing: " + Base64.encode(orig, IV_LENGTH, orig.length - IV_LENGTH));
//_log.debug("IV sent: " + Base64.encode(orig, 0, IV_LENGTH));
}
//}
return true;
}

View File

@ -1,6 +1,5 @@
package net.i2p.router.tunnel;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
@ -16,7 +15,7 @@ import net.i2p.util.Log;
*
*/
public class InboundEndpointProcessor {
private I2PAppContext _context;
private RouterContext _context;
private Log _log;
private TunnelCreatorConfig _config;
private IVValidator _validator;
@ -24,10 +23,10 @@ public class InboundEndpointProcessor {
static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
private static final ByteCache _cache = ByteCache.getInstance(128, HopProcessor.IV_LENGTH);
public InboundEndpointProcessor(I2PAppContext ctx, TunnelCreatorConfig cfg) {
public InboundEndpointProcessor(RouterContext ctx, TunnelCreatorConfig cfg) {
this(ctx, cfg, DummyValidator.getInstance());
}
public InboundEndpointProcessor(I2PAppContext ctx, TunnelCreatorConfig cfg, IVValidator validator) {
public InboundEndpointProcessor(RouterContext ctx, TunnelCreatorConfig cfg, IVValidator validator) {
_context = ctx;
_log = ctx.logManager().getLog(InboundEndpointProcessor.class);
_config = cfg;
@ -73,23 +72,19 @@ public class InboundEndpointProcessor {
_cache.release(ba);
// now for a little bookkeeping
RouterContext ctx = null;
if (_context instanceof RouterContext)
ctx = (RouterContext)_context;
if ( (ctx != null) && (_config.getLength() > 0) ) {
if (_config.getLength() > 0) {
int rtt = 0; // dunno... may not be related to an rtt
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received a " + length + "byte message through tunnel " + _config);
for (int i = 0; i < _config.getLength(); i++)
ctx.profileManager().tunnelDataPushed(_config.getPeer(i), rtt, length);
_context.profileManager().tunnelDataPushed(_config.getPeer(i), rtt, length);
_config.incrementVerifiedBytesTransferred(length);
}
return true;
}
private void decrypt(I2PAppContext ctx, TunnelCreatorConfig cfg, byte iv[], byte orig[], int offset, int length) {
private void decrypt(RouterContext ctx, TunnelCreatorConfig cfg, byte iv[], byte orig[], int offset, int length) {
Log log = ctx.logManager().getLog(OutboundGatewayProcessor.class);
ByteArray ba = _cache.acquire();
byte cur[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH]; // so we dont malloc

View File

@ -35,8 +35,9 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver {
}
}
if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
return -1;
// We do this before the preprocessor now (i.e. before fragmentation)
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
// return -1;
_config.incrementSentMessages();
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);

View File

@ -1,7 +1,6 @@
package net.i2p.router.tunnel;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/**
* Receive the preprocessed data for an inbound gateway, encrypt it, and forward
@ -9,16 +8,12 @@ import net.i2p.util.Log;
*
*/
public class InboundSender implements TunnelGateway.Sender {
private I2PAppContext _context;
private Log _log;
private InboundGatewayProcessor _processor;
static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
public InboundSender(I2PAppContext ctx, HopConfig config) {
_context = ctx;
_log = ctx.logManager().getLog(InboundSender.class);
_processor = new InboundGatewayProcessor(_context, config);
_processor = new InboundGatewayProcessor(ctx, config);
}
public long sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) {

View File

@ -30,7 +30,16 @@ public class OutboundTunnelEndpoint {
}
public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
_config.incrementProcessedMessages();
_processor.process(msg.getData(), 0, msg.getData().length, recvFrom);
boolean ok = _processor.process(msg.getData(), 0, msg.getData().length, recvFrom);
if (!ok) {
// invalid IV
// If we pass it on to the handler, it will fail
// If we don't, the data buf won't get released from the cache... that's ok
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid IV, dropping at OBEP " + _config);
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return;
}
_handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length);
}

View File

@ -3,14 +3,16 @@ package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* This is used for all gateways with more than zero hops.
*
* Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or
* fragmenting them before wrapping them up for tunnel delivery. The flow here
* is: <ol>
@ -32,7 +34,7 @@ import net.i2p.util.Log;
*
*/
public class PumpedTunnelGateway extends TunnelGateway {
private final List _prequeue;
private final List<Pending> _prequeue;
private TunnelGatewayPumper _pumper;
/**
@ -43,7 +45,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off
* to the first hop
*/
public PumpedTunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver);
_prequeue = new ArrayList(4);
_pumper = pumper;
@ -77,8 +79,11 @@ public class PumpedTunnelGateway extends TunnelGateway {
* scheduling a later delayed flush as necessary. this allows the gw.add call to
* go quickly, rather than blocking its callers on potentially substantial
* processing.
*
* @param queueBuf Empty list for convenience, to use as a temporary buffer.
* Must be empty when called; will always be emptied before return.
*/
void pump(List queueBuf) {
void pump(List<Pending> queueBuf) {
synchronized (_prequeue) {
if (_prequeue.size() > 0) {
queueBuf.addAll(_prequeue);
@ -88,7 +93,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
}
}
long startAdd = System.currentTimeMillis();
long beforeLock = System.currentTimeMillis();
long beforeLock = startAdd;
long afterAdded = -1;
boolean delayedFlush = false;
long delayAmount = -1;
@ -108,7 +113,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
// expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) {
Pending m = (Pending)_queue.get(i);
Pending m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);

View File

@ -7,31 +7,27 @@ import net.i2p.util.Log;
* Minor extension to allow message history integration
*/
public class RouterFragmentHandler extends FragmentHandler {
private RouterContext _routerContext;
private Log _log;
public RouterFragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
super(context, receiver);
_routerContext = context;
_log = context.logManager().getLog(RouterFragmentHandler.class);
}
@Override
protected void noteReception(long messageId, int fragmentId, Object status) {
if (_log.shouldLog(Log.INFO))
_log.info("Received fragment " + fragmentId + " for message " + messageId + ": " + status);
_routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId, status);
_context.messageHistory().receiveTunnelFragment(messageId, fragmentId, status);
}
@Override
protected void noteCompletion(long messageId) {
if (_log.shouldLog(Log.INFO))
_log.info("Received complete message " + messageId);
_routerContext.messageHistory().receiveTunnelFragmentComplete(messageId);
_context.messageHistory().receiveTunnelFragmentComplete(messageId);
}
@Override
protected void noteFailure(long messageId, String status) {
if (_log.shouldLog(Log.INFO))
_log.info("Dropped message " + messageId + ": " + status);
_routerContext.messageHistory().droppedFragmentedMessage(messageId, status);
_context.messageHistory().droppedFragmentedMessage(messageId, status);
}
}

View File

@ -0,0 +1,50 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
/**
* Same as PTG, but check to see if a message should be dropped before queueing it.
* Used for IBGWs.
*
* @since 0.7.9
*/
public class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
/** saved so we can note messages that get dropped */
private HopConfig _config;
public ThrottledPumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender,
Receiver receiver, TunnelGatewayPumper pumper, HopConfig config) {
super(context, preprocessor, sender, receiver, pumper);
_config = config;
}
/**
* Possibly drop a message due to bandwidth before adding it to the preprocessor queue.
* We do this here instead of in the InboundGatewayReceiver because it is much smarter to drop
* whole I2NP messages, where we know the message type and length, rather than
* tunnel messages containing I2NP fragments.
*/
@Override
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
//_log.error("IBGW count: " + _config.getProcessedMessagesCount() + " type: " + msg.getType() + " size: " + msg.getMessageSize());
// Hard to do this exactly, but we'll assume 2:1 batching
// for the purpose of estimating outgoing size.
// We assume that it's the outbound bandwidth that is the issue...
int size = Math.max(msg.getMessageSize(), 1024/2);
if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) {
// this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024;
for (int i = 0; i < kb; i++)
_config.incrementProcessedMessages();
return;
}
super.add(msg, toRouter,toTunnel);
}
}

View File

@ -3,11 +3,11 @@ package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
@ -17,18 +17,27 @@ import net.i2p.util.Log;
* each of those out. This does not coallesce message fragments or delay for more
* optimal throughput.
*
* See FragmentHandler Javadoc for tunnel message fragment format
*/
public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
protected I2PAppContext _context;
private Log _log;
protected RouterContext _context;
protected Log _log;
public static final int PREPROCESSED_SIZE = 1024;
protected static final int IV_SIZE = HopProcessor.IV_LENGTH;
/**
* Here in tunnels, we take from the cache but never add to it.
* In other words, we take advantage of other places in the router also using 1024-byte ByteCaches
* (since ByteCache only maintains once instance for each size)
* Used in BatchedPreprocessor; see add'l comments there
*/
protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE);
protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
protected static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
private static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
public TrivialPreprocessor(I2PAppContext ctx) {
public TrivialPreprocessor(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(TrivialPreprocessor.class);
}
@ -40,8 +49,10 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* Return true if there were messages remaining, and we should queue up
* a delayed flush to clear them
*
* NOTE: Unused here, see BatchedPreprocessor override, super is not called.
*/
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (true) throw new IllegalArgumentException("unused, right?");
long begin = System.currentTimeMillis();
StringBuilder buf = null;
if (_log.shouldLog(Log.DEBUG)) {
@ -49,7 +60,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
buf.append("Trivial preprocessing of ").append(pending.size()).append(" ");
}
while (pending.size() > 0) {
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.remove(0);
TunnelGateway.Pending msg = pending.remove(0);
long beforePreproc = System.currentTimeMillis();
byte preprocessed[][] = preprocess(msg);
long afterPreproc = System.currentTimeMillis();
@ -84,8 +95,11 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return false;
}
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds, String msg) {}
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {}
/*
* @deprecated unused except by above
*/
private byte[][] preprocess(TunnelGateway.Pending msg) {
List fragments = new ArrayList(1);
@ -109,6 +123,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* bytes after the IV, followed by the first 4 bytes of that SHA256, lining up
* exactly to meet the beginning of the instructions. (i hope)
*
* @deprecated unused except by above
*/
private byte[] preprocessFragment(TunnelGateway.Pending msg) {
byte target[] = _dataCache.acquire().getData();
@ -175,6 +190,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
//_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength);
int paddingRemaining = numPadBytes;
// FIXME inefficient, waste of 3/4 of the entropy
// Should get a byte array of random, change all the zeros to something else, and ArrayCopy
while (paddingRemaining > 0) {
byte b = (byte)(_context.random().nextInt() & 0xFF);
if (b != 0x00) {
@ -196,7 +213,11 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private static final byte MASK_TYPE = FragmentHandler.MASK_TYPE;
/** is this the first of a fragmented message? */
private static final byte MASK_FRAGMENTED = FragmentHandler.MASK_FRAGMENTED;
/** are there follow up headers? */
/**
* are there follow up headers?
* @deprecated unimplemented
*/
private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED;
private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5);
private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
@ -311,19 +332,30 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return offset;
}
/**
* @return generally 3 or 35 or 39 for first fragment, 7 for subsequent fragments.
*
* Does NOT include 4 for the message ID if the message will be fragmented;
* call getInstructionAugmentationSize() for that.
*/
protected int getInstructionsSize(TunnelGateway.Pending msg) {
if (msg.getFragmentNumber() > 0)
return 7;
// control byte
int header = 1;
// tunnel ID
if (msg.getToTunnel() != null)
header += 4;
// router hash
if (msg.getToRouter() != null)
header += 32;
// size
header += 2;
return header;
}
/** @return 0 or 4 */
protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {

View File

@ -7,6 +7,7 @@ import net.i2p.router.RouterContext;
/**
* Minor extension to track fragmentation
*
* @deprecated unused
*/
public class TrivialRouterPreprocessor extends TrivialPreprocessor {
private RouterContext _routerContext;
@ -16,7 +17,7 @@ public class TrivialRouterPreprocessor extends TrivialPreprocessor {
_routerContext = ctx;
}
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds) {
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds) {
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, null);
}
}

View File

@ -131,17 +131,20 @@ public class TunnelDispatcher implements Service {
new long[] { 60*1000l, 10*60*1000l, 60*60*1000l });
}
/** for IBGW */
private TunnelGateway.QueuePreprocessor createPreprocessor(HopConfig cfg) {
if (true)
//if (true)
return new BatchedRouterPreprocessor(_context, cfg);
else
return new TrivialRouterPreprocessor(_context);
//else
// return new TrivialRouterPreprocessor(_context);
}
/** for OBGW */
private TunnelGateway.QueuePreprocessor createPreprocessor(TunnelCreatorConfig cfg) {
if (true)
//if (true)
return new BatchedRouterPreprocessor(_context, cfg);
else
return new TrivialRouterPreprocessor(_context);
//else
// return new TrivialRouterPreprocessor(_context);
}
/**
@ -237,7 +240,7 @@ public class TunnelDispatcher implements Service {
TunnelGateway.Sender sender = new InboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
TunnelGateway gw = new ThrottledPumpedTunnelGateway(_context, preproc, sender, receiver, _pumper, cfg);
TunnelId recvId = cfg.getReceiveTunnel();
_inboundGateways.put(recvId, gw);
_participatingConfig.put(recvId, cfg);
@ -605,12 +608,17 @@ public class TunnelDispatcher implements Service {
if (pctDrop <= 0)
return false;
// increase the drop probability for OBEP,
// (except lower it for tunnel build messages (type 21)),
// and lower it for IBGW, for network efficiency
double len = length;
if (type.startsWith("OBEP"))
len *= 1.5;
else if (type.startsWith("IBGW"))
if (type.startsWith("OBEP")) {
if (type.equals("OBEP 21"))
len /= 1.5;
else
len *= 1.5;
} else if (type.startsWith("IBGW")) {
len /= 1.5;
}
// drop in proportion to size w.r.t. a standard 1024-byte message
// this is a little expensive but we want to adjust the curve between 0 and 1
// Most messages are 1024, only at the OBEP do we see other sizes

View File

@ -3,12 +3,12 @@ package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@ -34,9 +34,9 @@ import net.i2p.util.SimpleTimer;
*
*/
public class TunnelGateway {
protected I2PAppContext _context;
protected RouterContext _context;
protected Log _log;
protected final List _queue;
protected final List<Pending> _queue;
protected QueuePreprocessor _preprocessor;
protected Sender _sender;
protected Receiver _receiver;
@ -53,7 +53,7 @@ public class TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off
* to the first hop
*/
public TunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
public TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
_context = context;
_log = context.logManager().getLog(getClass());
_queue = new ArrayList(4);
@ -110,7 +110,7 @@ public class TunnelGateway {
// expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) {
Pending m = (Pending)_queue.get(i);
Pending m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
@ -154,12 +154,18 @@ public class TunnelGateway {
public interface QueuePreprocessor {
/**
* Caller must synchronize on the list!
*
* @param pending list of Pending objects for messages either unsent
* or partly sent. This list should be update with any
* values removed (the preprocessor owns the lock)
* Messages are not removed from the list until actually sent.
* The status of unsent and partially-sent messages is stored in
* the Pending structure.
*
* @return true if we should delay before preprocessing again
*/
public boolean preprocessQueue(List pending, Sender sender, Receiver receiver);
public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver);
/** how long do we want to wait before flushing */
public long getDelayAmount();
@ -173,6 +179,9 @@ public class TunnelGateway {
public long receiveEncrypted(byte encrypted[]);
}
/**
* Stores all the state for an unsent or partially-sent message
*/
public static class Pending {
protected Hash _toRouter;
protected TunnelId _toTunnel;
@ -182,7 +191,7 @@ public class TunnelGateway {
protected int _offset;
protected int _fragmentNumber;
protected long _created;
private List _messageIds;
private List<Long> _messageIds;
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
this(message, toRouter, toTunnel, System.currentTimeMillis());
@ -215,6 +224,10 @@ public class TunnelGateway {
public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; }
/**
* Add an ID to the list of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public void addMessageId(long id) {
synchronized (Pending.this) {
if (_messageIds == null)
@ -222,7 +235,11 @@ public class TunnelGateway {
_messageIds.add(new Long(id));
}
}
public List getMessageIds() {
/**
* The IDs of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public List<Long> getMessageIds() {
synchronized (Pending.this) {
if (_messageIds != null)
return new ArrayList(_messageIds);
@ -231,6 +248,8 @@ public class TunnelGateway {
}
}
}
/** Extend for debugging */
class PendingImpl extends Pending {
public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel, _context.clock().now());
@ -269,11 +288,11 @@ public class TunnelGateway {
long beforeLock = _context.clock().now();
long afterChecked = -1;
long delayAmount = -1;
if (_queue.size() > 10000) // stay out of the synchronized block
System.out.println("foo!");
//if (_queue.size() > 10000) // stay out of the synchronized block
// System.out.println("foo!");
synchronized (_queue) {
if (_queue.size() > 10000) // stay in the synchronized block
System.out.println("foo!");
//if (_queue.size() > 10000) // stay in the synchronized block
// System.out.println("foo!");
afterChecked = _context.clock().now();
if (_queue.size() > 0) {
if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )

View File

@ -5,7 +5,6 @@ import java.util.List;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* run through the tunnel gateways that have had messages added to them and push
@ -13,14 +12,12 @@ import net.i2p.util.Log;
*/
public class TunnelGatewayPumper implements Runnable {
private RouterContext _context;
private Log _log;
private final List _wantsPumping;
private final List<PumpedTunnelGateway> _wantsPumping;
private boolean _stop;
/** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_wantsPumping = new ArrayList(64);
_stop = false;
for (int i = 0; i < 4; i++)
@ -40,12 +37,12 @@ public class TunnelGatewayPumper implements Runnable {
public void run() {
PumpedTunnelGateway gw = null;
List queueBuf = new ArrayList(32);
List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
while (!_stop) {
try {
synchronized (_wantsPumping) {
if (_wantsPumping.size() > 0)
gw = (PumpedTunnelGateway)_wantsPumping.remove(0);
gw = _wantsPumping.remove(0);
else
_wantsPumping.wait();
}

View File

@ -71,6 +71,9 @@ public class TunnelParticipant {
if (_log.shouldLog(Log.WARN))
_log.warn("Failed to dispatch " + msg + ": processor=" + _processor
+ " inboundEndpoint=" + _inboundEndpointProcessor);
if (_config != null)
_config.incrementProcessedMessages();
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return;
}
@ -101,8 +104,11 @@ public class TunnelParticipant {
}
}
/****
private int _periodMessagesTransferred;
private long _lastCoallesced = System.currentTimeMillis();
****/
/**
* take note that the peers specified were able to push us data. hmm, is this safe?
* this could be easily gamed to get us to rank some peer of their choosing as quite

View File

@ -2,6 +2,7 @@ package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import net.i2p.data.Hash;
@ -19,23 +20,25 @@ import net.i2p.util.Log;
* it waits for a short period before looping again (or until it is told that something
* changed, such as a tunnel failed, new client started up, or tunnel creation was aborted).
*
* Note that 10 minute tunnel expiration is hardcoded in here.
*/
class BuildExecutor implements Runnable {
private final List _recentBuildIds = new ArrayList(100);
private final ArrayList<Long> _recentBuildIds = new ArrayList(100);
private RouterContext _context;
private Log _log;
private TunnelPoolManager _manager;
/** list of TunnelCreatorConfig elements of tunnels currently being built */
private final List _currentlyBuilding;
private final List<PooledTunnelCreatorConfig> _currentlyBuilding;
private boolean _isRunning;
private BuildHandler _handler;
private boolean _repoll;
private static final int MAX_CONCURRENT_BUILDS = 10;
public BuildExecutor(RouterContext ctx, TunnelPoolManager mgr) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_manager = mgr;
_currentlyBuilding = new ArrayList(10);
_currentlyBuilding = new ArrayList(MAX_CONCURRENT_BUILDS);
_context.statManager().createRateStat("tunnel.concurrentBuilds", "How many builds are going at once", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_context.statManager().createRateStat("tunnel.concurrentBuildsLagged", "How many builds are going at once when we reject further builds, due to job lag (period is lag)", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_context.statManager().createRateStat("tunnel.buildExploratoryExpire", "How often an exploratory tunnel times out during creation", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
@ -51,9 +54,7 @@ class BuildExecutor implements Runnable {
// Get stat manager, get recognized bandwidth tiers
StatManager statMgr = _context.statManager();
@SuppressWarnings("static-access")
/* FIXME Accessing static field "BW_CAPABILITY_CHARS" FIXME */
String bwTiers = _context.router().getRouterInfo().BW_CAPABILITY_CHARS;
String bwTiers = RouterInfo.BW_CAPABILITY_CHARS;
// For each bandwidth tier, create tunnel build agree/reject/expire stats
for (int i = 0; i < bwTiers.length(); i++) {
String bwTier = String.valueOf(bwTiers.charAt(i));
@ -74,16 +75,17 @@ class BuildExecutor implements Runnable {
int maxKBps = _context.bandwidthLimiter().getOutboundKBytesPerSecond();
int allowed = maxKBps / 6; // Max. 1 concurrent build per 6 KB/s outbound
if (allowed < 2) allowed = 2; // Never choke below 2 builds (but congestion may)
if (allowed > 10) allowed = 10; // Never go beyond 10, that is uncharted territory (old limit was 5)
if (allowed > MAX_CONCURRENT_BUILDS) allowed = MAX_CONCURRENT_BUILDS; // Never go beyond 10, that is uncharted territory (old limit was 5)
allowed = _context.getProperty("router.tunnelConcurrentBuilds", allowed);
List expired = null;
List<PooledTunnelCreatorConfig> expired = null;
int concurrent = 0;
// Todo: Make expiration variable
long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
synchronized (_currentlyBuilding) {
// expire any old requests
for (int i = 0; i < _currentlyBuilding.size(); i++) {
TunnelCreatorConfig cfg = (TunnelCreatorConfig)_currentlyBuilding.get(i);
PooledTunnelCreatorConfig cfg = _currentlyBuilding.get(i);
if (cfg.getExpiration() <= expireBefore) {
_currentlyBuilding.remove(i);
i--;
@ -98,7 +100,7 @@ class BuildExecutor implements Runnable {
if (expired != null) {
for (int i = 0; i < expired.size(); i++) {
PooledTunnelCreatorConfig cfg = (PooledTunnelCreatorConfig)expired.get(i);
PooledTunnelCreatorConfig cfg = expired.get(i);
if (_log.shouldLog(Log.INFO))
_log.info("Timed out waiting for reply asking for " + cfg);
@ -220,8 +222,8 @@ class BuildExecutor implements Runnable {
public void run() {
_isRunning = true;
List wanted = new ArrayList(8);
List pools = new ArrayList(8);
List<TunnelPool> wanted = new ArrayList(MAX_CONCURRENT_BUILDS);
List<TunnelPool> pools = new ArrayList(8);
int pendingRemaining = 0;
@ -238,7 +240,7 @@ class BuildExecutor implements Runnable {
_repoll = pendingRemaining > 0; // resets repoll to false unless there are inbound requests pending
_manager.listPools(pools);
for (int i = 0; i < pools.size(); i++) {
TunnelPool pool = (TunnelPool)pools.get(i);
TunnelPool pool = pools.get(i);
if (!pool.isAlive())
continue;
int howMany = pool.countHowManyToBuild();
@ -278,14 +280,15 @@ class BuildExecutor implements Runnable {
} else {
if ( (allowed > 0) && (wanted.size() > 0) ) {
Collections.shuffle(wanted, _context.random());
Collections.sort(wanted, new TunnelPoolComparator());
// force the loops to be short, since 3 consecutive tunnel build requests can take
// a long, long time
if (allowed > 2)
allowed = 2;
for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) {
TunnelPool pool = (TunnelPool)wanted.remove(0);
TunnelPool pool = wanted.remove(0);
//if (pool.countWantedTunnels() <= 0)
// continue;
PooledTunnelCreatorConfig cfg = pool.configureNewTunnel();
@ -360,13 +363,38 @@ class BuildExecutor implements Runnable {
_isRunning = false;
}
/**
* Prioritize the pools for building
* #1: Exploratory
* #2: Pools without tunnels
* #3: Everybody else
*
* This prevents a large number of client pools from starving the exploratory pool.
*
*/
private static class TunnelPoolComparator implements Comparator {
public int compare(Object l, Object r) {
TunnelPool tpl = (TunnelPool) l;
TunnelPool tpr = (TunnelPool) r;
if (tpl.getSettings().isExploratory() && !tpr.getSettings().isExploratory())
return -1;
if (tpr.getSettings().isExploratory() && !tpl.getSettings().isExploratory())
return 1;
if (tpl.getTunnelCount() <= 0 && tpr.getTunnelCount() > 0)
return -1;
if (tpr.getTunnelCount() <= 0 && tpl.getTunnelCount() > 0)
return 1;
return 0;
}
}
/**
* iterate over the 0hop tunnels, running them all inline regardless of how many are allowed
* @return number of tunnels allowed after processing these zero hop tunnels (almost always the same as before)
*/
private int buildZeroHopTunnels(List wanted, int allowed) {
private int buildZeroHopTunnels(List<TunnelPool> wanted, int allowed) {
for (int i = 0; i < wanted.size(); i++) {
TunnelPool pool = (TunnelPool)wanted.get(0);
TunnelPool pool = wanted.get(0);
if (pool.getSettings().getLength() == 0) {
PooledTunnelCreatorConfig cfg = pool.configureNewTunnel();
if (cfg != null) {
@ -403,8 +431,11 @@ class BuildExecutor implements Runnable {
long id = cfg.getReplyMessageId();
if (id > 0) {
synchronized (_recentBuildIds) {
while (_recentBuildIds.size() > 64)
_recentBuildIds.remove(0);
// every so often, shrink the list semi-efficiently
if (_recentBuildIds.size() > 98) {
for (int i = 0; i < 32; i++)
_recentBuildIds.remove(0);
}
_recentBuildIds.add(new Long(id));
}
}

View File

@ -30,6 +30,8 @@ import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
*
* Note that 10 minute tunnel expiration is hardcoded in here.
*
*/
class BuildHandler {
@ -39,20 +41,27 @@ class BuildHandler {
private Job _buildMessageHandlerJob;
private Job _buildReplyMessageHandlerJob;
/** list of BuildMessageState, oldest first */
private final List _inboundBuildMessages;
/** list of BuildReplyMessageState, oldest first */
private final List _inboundBuildReplyMessages;
/** list of BuildEndMessageState, oldest first */
private final List _inboundBuildEndMessages;
private final List<BuildMessageState> _inboundBuildMessages;
/** list of BuildReplyMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
private final List<BuildReplyMessageState> _inboundBuildReplyMessages;
/** list of BuildEndMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
private final List<BuildEndMessageState> _inboundBuildEndMessages;
private BuildMessageProcessor _processor;
private static final boolean HANDLE_REPLIES_INLINE = true;
public BuildHandler(RouterContext ctx, BuildExecutor exec) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_exec = exec;
_inboundBuildMessages = new ArrayList(16);
_inboundBuildReplyMessages = new ArrayList(16);
_inboundBuildEndMessages = new ArrayList(16);
if (HANDLE_REPLIES_INLINE) {
_inboundBuildEndMessages = null;
_inboundBuildReplyMessages = null;
} else {
_inboundBuildEndMessages = new ArrayList(16);
_inboundBuildReplyMessages = new ArrayList(16);
}
_context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.reject.20", "How often we reject a tunnel because of transient overload", "Tunnels", new long[] { 60*1000, 10*60*1000 });
@ -104,10 +113,10 @@ class BuildHandler {
if (toHandle > MAX_HANDLE_AT_ONCE)
toHandle = MAX_HANDLE_AT_ONCE;
handled = new ArrayList(toHandle);
if (false) {
for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?)
handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1));
} else {
//if (false) {
// for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?)
// handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1));
//} else {
// drop any expired messages
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
do {
@ -131,7 +140,7 @@ class BuildHandler {
// when adding)
for (int i = 0; i < toHandle && _inboundBuildMessages.size() > 0; i++)
handled.add(_inboundBuildMessages.remove(0));
}
//}
}
remaining = _inboundBuildMessages.size();
}
@ -148,14 +157,16 @@ class BuildHandler {
}
handled.clear();
}
synchronized (_inboundBuildEndMessages) {
int toHandle = _inboundBuildEndMessages.size();
if (toHandle > 0) {
if (handled == null)
handled = new ArrayList(_inboundBuildEndMessages);
else
handled.addAll(_inboundBuildEndMessages);
_inboundBuildEndMessages.clear();
if (!HANDLE_REPLIES_INLINE) {
synchronized (_inboundBuildEndMessages) {
int toHandle = _inboundBuildEndMessages.size();
if (toHandle > 0) {
if (handled == null)
handled = new ArrayList(_inboundBuildEndMessages);
else
handled.addAll(_inboundBuildEndMessages);
_inboundBuildEndMessages.clear();
}
}
}
if (handled != null) {
@ -181,7 +192,10 @@ class BuildHandler {
return remaining;
}
/** Warning - noop if HANDLE_REPLIES_INLINE == true */
void handleInboundReplies() {
if (HANDLE_REPLIES_INLINE)
return;
List handled = null;
synchronized (_inboundBuildReplyMessages) {
int toHandle = _inboundBuildReplyMessages.size();
@ -236,8 +250,8 @@ class BuildHandler {
private void handleReply(TunnelBuildReplyMessage msg, PooledTunnelCreatorConfig cfg, long delay) {
long requestedOn = cfg.getExpiration() - 10*60*1000;
long rtt = _context.clock().now() - requestedOn;
if (_log.shouldLog(Log.DEBUG))
_log.debug(msg.getUniqueId() + ": Handling the reply after " + rtt + ", delayed " + delay + " waiting for " + cfg);
if (_log.shouldLog(Log.INFO))
_log.info(msg.getUniqueId() + ": Handling the reply after " + rtt + ", delayed " + delay + " waiting for " + cfg);
BuildReplyHandler handler = new BuildReplyHandler();
List order = cfg.getReplyOrder();
@ -468,6 +482,14 @@ class BuildHandler {
return 0;
}
/**
* Actually process the request and send the reply.
*
* Todo: Replies are not subject to RED for bandwidth reasons,
* and the bandwidth is not credited to any tunnel.
* If we did credit the reply to the tunnel, it would
* prevent the classification of the tunnel as 'inactive' on tunnels.jsp.
*/
@SuppressWarnings("static-access")
private void handleReq(RouterInfo nextPeerInfo, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
long ourId = req.readReceiveTunnelId();
@ -590,8 +612,7 @@ class BuildHandler {
return;
}
BuildResponseRecord resp = new BuildResponseRecord();
byte reply[] = resp.create(_context, response, req.readReplyKey(), req.readReplyIV(), state.msg.getUniqueId());
byte reply[] = BuildResponseRecord.create(_context, response, req.readReplyKey(), req.readReplyIV(), state.msg.getUniqueId());
for (int j = 0; j < TunnelBuildMessage.RECORD_COUNT; j++) {
if (state.msg.getRecord(j) == null) {
ourSlot = j;
@ -663,10 +684,10 @@ class BuildHandler {
}
}
/** um, this is bad. don't set this. */
private static final boolean DROP_ALL_REQUESTS = false;
private static final boolean HANDLE_REPLIES_INLINE = true;
/**
* Handle incoming Tunnel Build Messages, which are generally requests to us,
* but could also be the reply where we are the IBEP.
*/
private class TunnelBuildMessageHandlerJobBuilder implements HandlerJobBuilder {
public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
// need to figure out if this is a reply to an inbound tunnel request (where we are the
@ -704,9 +725,11 @@ class BuildHandler {
_exec.repoll();
}
} else {
if (DROP_ALL_REQUESTS || _exec.wasRecentlyBuilding(reqId)) {
if (_exec.wasRecentlyBuilding(reqId)) {
// we are the IBEP but we already gave up?
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping the reply " + reqId + ", as we used to be building that");
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
} else {
synchronized (_inboundBuildMessages) {
boolean removed = false;

View File

@ -29,7 +29,17 @@ class BuildRequestor {
ORDER.add(Integer.valueOf(i));
}
private static final int PRIORITY = 500;
static final int REQUEST_TIMEOUT = 10*1000;
/**
* At 10 seconds, we were receiving about 20% of replies after expiration
* Todo: make this variable on a per-request basis, to account for tunnel length,
* expl. vs. client, uptime, and network conditions.
* Put the expiration in the PTCC.
*
* Also, perhaps, save the PTCC even after expiration for an extended time,
* so we can use a successfully built tunnel anyway.
*
*/
static final int REQUEST_TIMEOUT = 13*1000;
private static boolean usePairedTunnels(RouterContext ctx) {
String val = ctx.getProperty("router.usePairedTunnels");
@ -109,23 +119,24 @@ class BuildRequestor {
return;
}
cfg.setPairedTunnel(pairedTunnel);
//cfg.setPairedTunnel(pairedTunnel);
long beforeDispatch = System.currentTimeMillis();
if (cfg.isInbound()) {
if (log.shouldLog(Log.DEBUG))
log.debug("Sending the tunnel build request " + msg.getUniqueId() + " out the tunnel " + pairedTunnel + " to "
if (log.shouldLog(Log.INFO))
log.info("Sending the tunnel build request " + msg.getUniqueId() + " out the tunnel " + pairedTunnel + " to "
+ cfg.getPeer(0).toBase64() + " for " + cfg + " waiting for the reply of "
+ cfg.getReplyMessageId());
// send it out a tunnel targeting the first hop
ctx.tunnelDispatcher().dispatchOutbound(msg, pairedTunnel.getSendTunnelId(0), cfg.getPeer(0));
} else {
if (log.shouldLog(Log.DEBUG))
log.debug("Sending the tunnel build request directly to " + cfg.getPeer(1).toBase64()
if (log.shouldLog(Log.INFO))
log.info("Sending the tunnel build request directly to " + cfg.getPeer(1).toBase64()
+ " for " + cfg + " waiting for the reply of " + cfg.getReplyMessageId()
+ " with msgId=" + msg.getUniqueId());
// send it directly to the first hop
OutNetMessage outMsg = new OutNetMessage(ctx);
// Todo: add some fuzz to the expiration to make it harder to guess how many hops?
outMsg.setExpiration(msg.getMessageExpiration());
outMsg.setMessage(msg);
outMsg.setPriority(PRIORITY);
@ -190,7 +201,7 @@ class BuildRequestor {
}
}
if (log.shouldLog(Log.DEBUG))
log.debug(cfg.getReplyMessageId() + ": record " + i + "/" + hop + " has key " + key + " for " + cfg);
log.debug(cfg.getReplyMessageId() + ": record " + i + "/" + hop + " has key " + key);
gen.createRecord(i, hop, msg, cfg, replyRouter, replyTunnel, ctx, key);
}
gen.layeredEncrypt(ctx, msg, cfg, order);

View File

@ -55,6 +55,9 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
// remove us from the pool (but not the dispatcher) so that we aren't
// selected again. _expireJob is left to do its thing, in case there
// are any straggling messages coming down the tunnel
//
// Todo: Maybe delay or prevent failing if we are near tunnel build capacity,
// to prevent collapse (loss of all tunnels)
_pool.tunnelFailed(this);
if (_testJob != null) // just in case...
_context.jobQueue().removeJob(_testJob);
@ -79,11 +82,14 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
public TunnelPool getTunnelPool() { return _pool; }
/* FIXME Exporting non-public type through public API FIXME */
public void setTestJob(TestJob job) { _testJob = job; }
/** @deprecated unused, which makes _testJob unused - why is it here */
void setTestJob(TestJob job) { _testJob = job; }
/** does nothing, to be deprecated */
public void setExpireJob(Job job) { /* _expireJob = job; */ }
// Fix memory leaks caused by references if you need to use pairedTunnel
/**
* @deprecated Fix memory leaks caused by references if you need to use pairedTunnel
*/
public void setPairedTunnel(TunnelInfo tunnel) { /* _pairedTunnel = tunnel; */}
// public TunnelInfo getPairedTunnel() { return _pairedTunnel; }
}

View File

@ -491,7 +491,7 @@ public abstract class TunnelPeerSelector {
* Now:
* d((H(l+h), h) - d(H(r+h), h)
*/
private class HashComparator implements Comparator {
private static class HashComparator implements Comparator {
private Hash _hash;
private HashComparator(Hash h) {

View File

@ -29,6 +29,8 @@ import net.i2p.stat.RateStat;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
*
@ -42,7 +44,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
private final Map<Hash, TunnelPool> _clientOutboundPools;
private TunnelPool _inboundExploratory;
private TunnelPool _outboundExploratory;
private BuildExecutor _executor;
private final BuildExecutor _executor;
private boolean _isShutdown;
public TunnelPoolManager(RouterContext ctx) {
@ -175,7 +177,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
public int getOutboundClientTunnelCount() {
int count = 0;
List destinations = null;
synchronized (_clientInboundPools) {
synchronized (_clientOutboundPools) {
destinations = new ArrayList(_clientOutboundPools.keySet());
}
for (int i = 0; i < destinations.size(); i++) {
@ -250,6 +252,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
startup();
}
/**
* Used only at session startup.
* Do not use to change settings.
*/
public void buildTunnels(Destination client, ClientTunnelSettings settings) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Building tunnels for the client " + client.calculateHash().toBase64() + ": " + settings);
@ -259,6 +265,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
TunnelPool inbound = null;
TunnelPool outbound = null;
// should we share the clientPeerSelector across both inbound and outbound?
// or just one for all clients? why separate?
synchronized (_clientInboundPools) {
inbound = _clientInboundPools.get(dest);
if (inbound == null) {
@ -280,11 +287,22 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
}
inbound.startup();
try { Thread.sleep(3*1000); } catch (InterruptedException ie) {}
outbound.startup();
SimpleScheduler.getInstance().addEvent(new DelayedStartup(outbound), 3*1000);
}
private static class DelayedStartup implements SimpleTimer.TimedEvent {
private TunnelPool pool;
public DelayedStartup(TunnelPool p) {
this.pool = p;
}
public void timeReached() {
this.pool.startup();
}
}
public void removeTunnels(Hash destination) {
if (destination == null) return;
if (_context.clientManager().isLocal(destination)) {
@ -357,12 +375,11 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_inboundExploratory = new TunnelPool(_context, this, inboundSettings, selector);
_inboundExploratory.startup();
try { Thread.sleep(3*1000); } catch (InterruptedException ie) {}
TunnelPoolSettings outboundSettings = new TunnelPoolSettings();
outboundSettings.setIsExploratory(true);
outboundSettings.setIsInbound(false);
_outboundExploratory = new TunnelPool(_context, this, outboundSettings, selector);
_outboundExploratory.startup();
SimpleScheduler.getInstance().addEvent(new DelayedStartup(_outboundExploratory), 3*1000);
// try to build up longer tunnels
_context.jobQueue().addJob(new BootstrapPool(_context, _inboundExploratory));