more comments and cleanup

This commit is contained in:
zzz
2009-12-20 14:50:41 +00:00
parent 30b7dbf1f7
commit a877b21839
5 changed files with 31 additions and 11 deletions

View File

@ -97,6 +97,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
return rv; return rv;
} }
/* See TunnelGateway.QueuePreprocessor for Javadoc */
@Override @Override
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
StringBuilder timingBuf = null; StringBuilder timingBuf = null;
@ -115,9 +116,12 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
int batchCount = 0; int batchCount = 0;
int beforeLooping = pending.size(); int beforeLooping = pending.size();
// loop until the queue is empty
while (pending.size() > 0) { while (pending.size() > 0) {
int allocated = 0; int allocated = 0;
long beforePendingLoop = System.currentTimeMillis(); long beforePendingLoop = System.currentTimeMillis();
// loop until we fill up a single message
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < pending.size(); i++) {
long pendingStart = System.currentTimeMillis(); long pendingStart = System.currentTimeMillis();
TunnelGateway.Pending msg = pending.get(i); TunnelGateway.Pending msg = pending.get(i);
@ -143,6 +147,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
long waited = _context.clock().now() - _pendingSince; long waited = _context.clock().now() - _pendingSince;
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited); _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited);
} }
// Send the message
long beforeSend = System.currentTimeMillis(); long beforeSend = System.currentTimeMillis();
_pendingSince = 0; _pendingSince = 0;
send(pending, 0, i, sender, rec); send(pending, 0, i, sender, rec);
@ -153,6 +159,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
+ " (last complete? " + (msg.getOffset() >= msg.getData().length) + " (last complete? " + (msg.getOffset() >= msg.getData().length)
+ ", off=" + msg.getOffset() + ", count=" + pending.size() + ")"); + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")");
// Remove what we sent from the pending queue
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
TunnelGateway.Pending cur = pending.remove(0); TunnelGateway.Pending cur = pending.remove(0);
if (cur.getOffset() < cur.getData().length) if (cur.getOffset() < cur.getData().length)
@ -185,18 +192,18 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
+ "/" + (beforeSend-start) + "/" + (beforeSend-start)
+ " pending current " + (pendingEnd-pendingStart)).append("."); + " pending current " + (pendingEnd-pendingStart)).append(".");
break; break;
} } // if >= full size
if (timingBuf != null) if (timingBuf != null)
timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append("."); timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append(".");
} } // for
long afterCleared = System.currentTimeMillis(); long afterCleared = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size())); display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
long afterDisplayed = System.currentTimeMillis(); long afterDisplayed = System.currentTimeMillis();
if (allocated > 0) { if (allocated > 0) {
// after going through the entire pending list, we still don't // After going through the entire pending list, we have only a partial message.
// have enough data to send a full message // We might flush it or might not, but we are returning either way.
if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) { if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) {
// not even a full message, but we want to flush it anyway // not even a full message, but we want to flush it anyway
@ -208,6 +215,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
send(pending, 0, pending.size()-1, sender, rec); send(pending, 0, pending.size()-1, sender, rec);
_context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0); _context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0);
// Remove everything in the message from the pending queue
int beforeSize = pending.size(); int beforeSize = pending.size();
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending cur = pending.get(i); TunnelGateway.Pending cur = pending.get(i);
@ -245,7 +253,9 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
} }
return false; return false;
} }
// won't get here, we returned
} else { } else {
// We didn't flush. Note that the messages remain on the pending list.
_context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0); _context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0);
if (_pendingSince <= 0) if (_pendingSince <= 0)
_pendingSince = _context.clock().now(); _pendingSince = _context.clock().now();
@ -261,14 +271,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
} }
return true; return true;
} }
// won't get here, we returned
} else { } else {
// ok, we sent some, but haven't gone back for another // ok, we sent some, but haven't gone back for another
// pass yet. keep looping // pass yet. keep looping
if (timingBuf != null) if (timingBuf != null)
timingBuf.append(" Keep looping"); timingBuf.append(" Keep looping");
} } // if allocated
} } // while
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sent everything on the list (pending=" + pending.size() + ")"); _log.debug("Sent everything on the list (pending=" + pending.size() + ")");

View File

@ -79,7 +79,7 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
} }
@Override @Override
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds, String msg) { protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {
if (_config != null) if (_config != null)
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, _config, msg); _routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, _config, msg);
else else

View File

@ -84,7 +84,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return false; return false;
} }
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds, String msg) {} protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {}
private byte[][] preprocess(TunnelGateway.Pending msg) { private byte[][] preprocess(TunnelGateway.Pending msg) {
List fragments = new ArrayList(1); List fragments = new ArrayList(1);

View File

@ -17,7 +17,7 @@ public class TrivialRouterPreprocessor extends TrivialPreprocessor {
_routerContext = ctx; _routerContext = ctx;
} }
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds) { protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds) {
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, null); _routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, null);
} }
} }

View File

@ -159,6 +159,10 @@ public class TunnelGateway {
* @param pending list of Pending objects for messages either unsent * @param pending list of Pending objects for messages either unsent
* or partly sent. This list should be update with any * or partly sent. This list should be update with any
* values removed (the preprocessor owns the lock) * values removed (the preprocessor owns the lock)
* Messages are not removed from the list until actually sent.
* The status of unsent and partially-sent messages is stored in
* the Pending structure.
*
* @return true if we should delay before preprocessing again * @return true if we should delay before preprocessing again
*/ */
public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver); public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver);
@ -175,6 +179,9 @@ public class TunnelGateway {
public long receiveEncrypted(byte encrypted[]); public long receiveEncrypted(byte encrypted[]);
} }
/**
* Stores all the state for an unsent or partially-sent message
*/
public static class Pending { public static class Pending {
protected Hash _toRouter; protected Hash _toRouter;
protected TunnelId _toTunnel; protected TunnelId _toTunnel;
@ -184,7 +191,7 @@ public class TunnelGateway {
protected int _offset; protected int _offset;
protected int _fragmentNumber; protected int _fragmentNumber;
protected long _created; protected long _created;
private List _messageIds; private List<Long> _messageIds;
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
this(message, toRouter, toTunnel, System.currentTimeMillis()); this(message, toRouter, toTunnel, System.currentTimeMillis());
@ -224,7 +231,7 @@ public class TunnelGateway {
_messageIds.add(new Long(id)); _messageIds.add(new Long(id));
} }
} }
public List getMessageIds() { public List<Long> getMessageIds() {
synchronized (Pending.this) { synchronized (Pending.this) {
if (_messageIds != null) if (_messageIds != null)
return new ArrayList(_messageIds); return new ArrayList(_messageIds);
@ -233,6 +240,8 @@ public class TunnelGateway {
} }
} }
} }
/** Extend for debugging */
class PendingImpl extends Pending { class PendingImpl extends Pending {
public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel, _context.clock().now()); super(message, toRouter, toTunnel, _context.clock().now());