* Fragmenter: Pull the new comments, new stats, and
  debug log fix from i2p.i2p.zzz.batch in - but not the
  batching mods, which need a fresh look.
zzz
2009-12-11 18:00:34 +00:00
parent 7cc75c0cb9
commit 6029e1a291
2 changed files with 65 additions and 17 deletions

router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java

@@ -13,6 +13,36 @@ import net.i2p.util.Log;
* after the delay there still isn't enough data, what is available is sent
* and padded.
*
* As explained in the tunnel document, the preprocessor has a lot of
* potential flexibility in delay, padding, or even reordering.
* We keep things relatively simple for now.
*
* However, much of the efficiency results from the clients selecting
* the correct MTU in the streaming lib such that the maximum-size
* streaming lib message fits in an integral number of tunnel messages.
* See ConnectionOptions in the streaming lib for details.
*
* Aside from obvious goals of minimizing delay and padding, we also
* want to minimize the number of tunnel messages a message occupies,
* to minimize the impact of a router dropping a tunnel message.
* So there is some benefit in starting a message in a new tunnel message,
* especially if it would then fit perfectly (a 964 or 1956 byte
* message, for example).
*
* An idea for the future...
*
* If we are in the middle of a tunnel msg and starting a new i2np msg,
* and this one won't fit,
* let's look to see if we have something that would fit instead
* by reordering:
* if (allocated > 0 && msg.getFragmentNumber() == 0) {
* for (j = i + 1; j < pending.size(); j++) {
* if it will fit and it is fragment 0 {
* msg = pending.remove(j)
* pending.add(0, msg)
* }
* }
* }
*/
public class BatchedPreprocessor extends TrivialPreprocessor {
private Log _log;
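
The "idea for the future" above is pseudocode only. A minimal standalone sketch of the same look-ahead reordering, with Pending as a hypothetical stand-in for TunnelGateway.Pending and sizeRemaining() invented for the sketch:

import java.util.List;

// Sketch only, not part of this commit: the look-ahead reordering idea
// from the comment above. Pending is a hypothetical stand-in for
// TunnelGateway.Pending; sizeRemaining() is invented for the sketch.
class ReorderSketch {
    interface Pending {
        int getFragmentNumber(); // 0 means no fragment written yet
        int sizeRemaining();     // hypothetical: bytes left to write
    }

    /**
     * When we are mid tunnel-message and the message at index i will not
     * fit in the remaining room, look later in the queue for an unstarted
     * message that does fit, and move it to the front so it is sent next.
     */
    static void reorderForFit(List<Pending> pending, int i, int room) {
        if (pending.get(i).getFragmentNumber() != 0)
            return; // never jump ahead of a partially written message
        for (int j = i + 1; j < pending.size(); j++) {
            Pending later = pending.get(j);
            if (later.getFragmentNumber() == 0 && later.sizeRemaining() <= room) {
                pending.add(0, pending.remove(j));
                return;
            }
        }
    }
}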
@@ -24,13 +54,18 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_log = ctx.logManager().getLog(BatchedPreprocessor.class);
_name = name;
_pendingSince = 0;
ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchSmallFragments", "How many outgoing pad bytes are in small fragments?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.batchFullFragments", "How many outgoing tunnel messages use the full data area?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
}
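
The replacement block above drops the 24-hour (and 1-minute) averaging periods and adds three new stats. A minimal sketch of the register-then-feed pattern these calls follow, using the StatManager methods visible in this diff (the I2PAppContext parameter is an assumption for the sketch):

import net.i2p.I2PAppContext;

// Sketch of the stat pattern above: register a rate once with its
// averaging periods, then record data points as events occur.
class StatSketch {
    static void record(I2PAppContext ctx, int fragments) {
        ctx.statManager().createRateStat("tunnel.batchFragmentation",
                "Avg. number of fragments per msg", "Tunnels",
                new long[] { 10*60*1000, 60*60*1000 });
        // (event value, event duration) - a duration of 0, as used above
        ctx.statManager().addRateData("tunnel.batchFragmentation", fragments, 0);
    }
}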
private static final int FULL_SIZE = PREPROCESSED_SIZE
@@ -38,11 +73,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
- 1 // 0x00 ending the padding
- 4; // 4 byte checksum
private static final boolean DISABLE_BATCHING = false;
//private static final boolean DISABLE_BATCHING = false;
/* not final or private so the test code can adjust */
static long DEFAULT_DELAY = 100;
/** wait up to 2 seconds before sending a small message */
/** Wait up to this long before sending (flushing) a small tunnel message */
protected long getSendDelay() { return DEFAULT_DELAY; }
/** if we have 50 messages queued that are too small, flush them anyway */
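
For reference, the FULL_SIZE arithmetic above works out as follows, assuming PREPROCESSED_SIZE = 1024 and IV_SIZE = 16 as in TrivialPreprocessor (the "- IV_SIZE" term falls between the two hunks and is not shown):

// Sketch of the FULL_SIZE arithmetic, under the assumptions noted above.
class SizeSketch {
    static final int FULL_SIZE = 1024 // PREPROCESSED_SIZE (assumed)
                               - 16   // IV_SIZE (assumed)
                               - 1    // 0x00 ending the padding
                               - 4;   // 4 byte checksum
    // FULL_SIZE == 1003 bytes of data area per tunnel message
}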
@@ -71,11 +106,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
timingBuf = new StringBuilder(128);
timingBuf.append("Preprocess with " + pending.size() + " to send. ");
}
if (DISABLE_BATCHING) {
if (_log.shouldLog(Log.INFO))
_log.info("Disabled batching, pushing " + pending + " immediately");
return super.preprocessQueue(pending, sender, rec);
}
//if (DISABLE_BATCHING) {
// if (_log.shouldLog(Log.INFO))
// _log.info("Disabled batching, pushing " + pending + " immediately");
// return super.preprocessQueue(pending, sender, rec);
//}
long start = System.currentTimeMillis();
int batchCount = 0;
@@ -112,6 +147,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
long beforeSend = System.currentTimeMillis();
_pendingSince = 0;
send(pending, 0, i, sender, rec);
_context.statManager().addRateData("tunnel.batchFullFragments", 1, 0);
long afterSend = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
_log.info("Allocated=" + allocated + " so we sent " + (i+1)
@@ -126,6 +162,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (timingBuf != null)
timingBuf.append(" sent " + cur);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed allocated");
_context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (msg.getOffset() >= msg.getData().length) {
@@ -134,6 +171,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (timingBuf != null)
timingBuf.append(" sent perfect fit " + cur).append(".");
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending);
_context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (i > 0)
@@ -169,6 +207,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0);
send(pending, 0, pending.size()-1, sender, rec);
_context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0);
int beforeSize = pending.size();
for (int i = 0; i < pending.size(); i++) {
@@ -176,6 +215,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (cur.getOffset() >= cur.getData().length) {
pending.remove(i);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining");
_context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
i--;
}
@@ -330,10 +370,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
+ " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later");
} else {
offset = writeSubsequentFragment(msg, target, offset);
if (_log.shouldLog(Log.DEBUG))
_log.debug("writing " + msg.getMessageId() + " fragment " + (msg.getFragmentNumber()-1)
if (_log.shouldLog(Log.DEBUG)) {
int frag = msg.getFragmentNumber();
int later = msg.getData().length - msg.getOffset();
if (later > 0)
frag--;
_log.debug("writing " + msg.getMessageId() + " fragment " + frag
+ ", ending at " + offset + " prev " + prevOffset
+ " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later");
+ " leaving " + later + " bytes for later");
}
}
}
return offset;
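
The block above is the "debug log fix" from the commit message: judging from the fix itself, the pending message's fragment counter has already been advanced when more data remains, so the old getFragmentNumber()-1 misnamed the final fragment. A tiny sketch of the corrected bookkeeping, with hypothetical names:

// Sketch of the corrected fragment numbering (hypothetical names; the
// assumption about when the counter advances is inferred from the fix).
class LogFixSketch {
    static String describe(int fragmentNumber, int bytesLeft) {
        int frag = fragmentNumber;
        if (bytesLeft > 0)
            frag--; // counter already points past the fragment just written
        return "wrote fragment " + frag + ", " + bytesLeft + " bytes left";
    }
}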

router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java

@@ -528,6 +528,9 @@ public class TunnelDispatcher implements Service {
long tooOld = tooYoung - 9*60*1000;
for (int i = 0; i < size; i++) {
HopConfig cfg = participating.get(i);
// rare NPE seen here, guess CHS.values() isn't atomic?
if (cfg == null)
continue;
long c = cfg.getRecentMessagesCount();
bw += c;
bwOut += cfg.getRecentSentMessagesCount();
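
The null guard above works around a rare race seen when snapshotting a concurrent collection. A standalone sketch of the same defensive pattern, with hypothetical names (the router's own types are not used here):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the defensive copy-and-skip pattern above: snapshot a
// concurrent map's values, then skip any null slots that may appear if
// the snapshot races with concurrent removals.
class SnapshotSketch {
    private final ConcurrentHashMap<Long, String> tunnels = new ConcurrentHashMap<>();

    int countLive() {
        List<String> snapshot = new ArrayList<>(tunnels.values());
        int live = 0;
        for (String cfg : snapshot) {
            if (cfg == null)
                continue; // rare: raced with a removal mid-copy
            live++;
        }
        return live;
    }
}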