2005-04-20 jrandom

* In the SDK, we don't actually need to block when we're sending a message
      as BestEffort (and these days, we're always sending BestEffort).
    * Pass out client messages in fewer (larger) steps.
    * Have the InNetMessagePool short circuit dispatch requests.
    * Have the message validator take into account expiration to cut down on
      false positives at high transfer rates.
    * Allow configuration of the probabalistic window size growth rate in the
      streaming lib's slow start and congestion avoidance phases, and default
      them to a more conservative value (2), rather than the previous value
      (1).
    * Reduce the ack delay in the streaming lib to 500ms
    * Honor choke requests in the streaming lib (only affects those getting
      insanely high transfer rates)
    * Let the user specify an interface besides 127.0.0.1 or 0.0.0.0 on the
      I2PTunnel client page (thanks maestro^!)
(plus minor udp tweaks)
This commit is contained in:
jrandom
2005-04-20 19:15:25 +00:00
committed by zzz
parent 1861379d43
commit a2c7c5a516
17 changed files with 222 additions and 63 deletions

View File

@ -13,6 +13,7 @@ import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.i2np.DataMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
@ -52,6 +53,13 @@ public class InNetMessagePool implements Service {
*/
public static final String PROP_DISPATCH_THREADED = "router.dispatchThreaded";
public static final boolean DEFAULT_DISPATCH_THREADED = false;
/**
* If we aren't doing threaded dispatch for tunnel messages, should we
* call the actual dispatch() method inline (on the same thread which
* called add())? If false, we queue it up in a shared short circuit
* job.
*/
private static final boolean DISPATCH_DIRECT = true;
public InNetMessagePool(RouterContext context) {
_context = context;
@ -101,6 +109,10 @@ public class InNetMessagePool implements Service {
+ " expiring on " + exp
+ " of type " + messageBody.getClass().getName());
//if (messageBody instanceof DataMessage) {
// _context.statManager().getStatLog().addData(fromRouterHash.toBase64().substring(0,6), "udp.floodDataReceived", 1, 0);
// return 0;
//}
if (messageBody instanceof TunnelDataMessage) {
// do not validate the message with the validator - the IV validator is sufficient
} else {
@ -228,7 +240,7 @@ public class InNetMessagePool implements Service {
// others and/or on other threads (e.g. transport threads). lets try 'em both.
private void shortCircuitTunnelGateway(I2NPMessage messageBody) {
if (false) {
if (DISPATCH_DIRECT) {
doShortCircuitTunnelGateway(messageBody);
} else {
synchronized (_pendingGatewayMessages) {
@ -249,7 +261,7 @@ public class InNetMessagePool implements Service {
}
private void shortCircuitTunnelData(I2NPMessage messageBody, Hash from) {
if (false) {
if (DISPATCH_DIRECT) {
doShortCircuitTunnelData(messageBody, from);
} else {
synchronized (_pendingDataMessages) {

View File

@ -60,6 +60,8 @@ public class MessageValidator {
}
}
private static final long TIME_MASK = 0xFFFFFC00;
/**
* Note that we've received the message (which has the expiration given).
* This functionality will need to be reworked for I2P 3.0 when we take into
@ -69,7 +71,16 @@ public class MessageValidator {
* @return true if we HAVE already seen this message, false if not
*/
private boolean noteReception(long messageId, long messageExpiration) {
boolean dup = _filter.add(messageId);
long val = messageId;
// tweak the high order bits with the message expiration /seconds/
val ^= (messageExpiration & TIME_MASK) << 16;
boolean dup = _filter.add(val);
if (dup && _log.shouldLog(Log.WARN)) {
_log.warn("Duplicate with " + _filter.getCurrentDuplicateCount()
+ " other dups, " + _filter.getInsertedCount()
+ " other entries, and a false positive rate of "
+ _filter.getFalsePositiveRate());
}
return dup;
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.186 $ $Date: 2005/04/17 18:23:20 $";
public final static String ID = "$Revision: 1.187 $ $Date: 2005/04/17 21:07:58 $";
public final static String VERSION = "0.5.0.6";
public final static long BUILD = 4;
public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -114,6 +114,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look fora remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
_clientMessage = msg;
_clientMessageId = msg.getMessageId();
@ -355,8 +358,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
// dispatch may take 100+ms, so toss it in its own job
getContext().jobQueue().addJob(new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now())));
DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now()));
if (false) // dispatch may take 100+ms, so toss it in its own job
getContext().jobQueue().addJob(dispatchJob);
else
dispatchJob.runJob();
} else {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?");
@ -364,6 +370,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
_clientMessage = null;
_clove = null;
getContext().statManager().addRateData("client.dispatchPrepareTime", getContext().clock().now() - _start, 0);
}
private class DispatchJob extends JobImpl {
@ -385,10 +392,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().messageRegistry().registerPending(_selector, _replyFound, _replyTimeout, _timeoutMs);
if (_log.shouldLog(Log.INFO))
_log.info("Dispatching message to " + _toString + ": " + _msg);
long before = getContext().clock().now();
getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway());
long dispatchSendTime = getContext().clock().now() - before;
if (_log.shouldLog(Log.INFO))
_log.info("Dispatching message to " + _toString + " complete");
getContext().statManager().addRateData("client.dispatchTime", getContext().clock().now() - _start, 0);
getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0);
}
}

View File

@ -66,7 +66,7 @@ public class OutboundMessageState {
}
}
private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
private synchronized void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
_message = m;
_peer = peer;
if (_messageBuf != null) {
@ -91,8 +91,9 @@ public class OutboundMessageState {
_log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
}
public void releaseResources() {
_cache.release(_messageBuf);
public synchronized void releaseResources() {
if (_messageBuf != null)
_cache.release(_messageBuf);
_messageBuf = null;
}
@ -136,7 +137,7 @@ public class OutboundMessageState {
* fragmentSize bytes per fragment.
*
*/
public void fragment(int fragmentSize) {
public synchronized void fragment(int fragmentSize) {
int totalSize = _messageBuf.getValid();
int numFragments = totalSize / fragmentSize;
if (numFragments * fragmentSize != totalSize)
@ -161,7 +162,8 @@ public class OutboundMessageState {
}
/** should we continue sending this fragment? */
public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
public int fragmentSize(int fragmentNum) {
public synchronized int fragmentSize(int fragmentNum) {
if (_messageBuf == null) return -1;
if (fragmentNum + 1 == _fragmentSends.length)
return _messageBuf.getValid() % _fragmentSize;
else
@ -233,6 +235,7 @@ public class OutboundMessageState {
public synchronized int writeFragment(byte out[], int outOffset, int fragmentNum) {
int start = _fragmentSize * fragmentNum;
int end = start + _fragmentSize;
if (_messageBuf == null) return -1;
if (end > _messageBuf.getValid())
end = _messageBuf.getValid();
int toSend = end - start;
@ -243,7 +246,7 @@ public class OutboundMessageState {
return toSend;
}
public String toString() {
public synchronized String toString() {
StringBuffer buf = new StringBuffer(64);
buf.append("Message ").append(_messageId);
if (_fragmentSends != null)

View File

@ -65,10 +65,15 @@ public class PacketBuilder {
data[off] |= 1 << 2; // isLast
off++;
DataHelper.toLong(data, off, 2, state.fragmentSize(fragment));
int size = state.fragmentSize(fragment);
if (size < 0)
return null;
DataHelper.toLong(data, off, 2, size);
off += 2;
off += state.writeFragment(data, off, fragment);
size = state.writeFragment(data, off, fragment);
if (size < 0) return null;
off += size;
// we can pad here if we want, maybe randomized?

View File

@ -150,6 +150,7 @@ public class UDPSender {
try {
synchronized (_outboundQueue) {
if (_outboundQueue.size() <= 0) {
_outboundQueue.notifyAll();
_outboundQueue.wait();
} else {
packet = (UDPPacket)_outboundQueue.remove(0);

View File

@ -31,9 +31,10 @@ public class OutboundMessageDistributor {
public void distribute(I2NPMessage msg, Hash target, TunnelId tunnel) {
RouterInfo info = _context.netDb().lookupRouterInfoLocally(target);
if (info == null) {
_log.debug("outbound distributor to " + target.toBase64().substring(0,4)
+ "." + (tunnel != null ? tunnel.getTunnelId() + "" : "")
+ ": no info locally, searching...");
if (_log.shouldLog(Log.DEBUG))
_log.debug("outbound distributor to " + target.toBase64().substring(0,4)
+ "." + (tunnel != null ? tunnel.getTunnelId() + "" : "")
+ ": no info locally, searching...");
_context.netDb().lookupRouterInfo(target, new DistributeJob(_context, msg, target, tunnel), null, MAX_DISTRIBUTE_TIME);
return;
} else {