TunnelGateway: Refactor TunnelGateway.Pending to its own file PendingGatewayMesasge

This commit is contained in:
zzz
2012-09-01 21:39:14 +00:00
parent a0418bec59
commit f44eeaf7dd
6 changed files with 132 additions and 122 deletions

View File

@ -104,7 +104,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
/* See TunnelGateway.QueuePreprocessor for Javadoc */ /* See TunnelGateway.QueuePreprocessor for Javadoc */
@Override @Override
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
display(0, pending, "Starting"); display(0, pending, "Starting");
StringBuilder timingBuf = null; StringBuilder timingBuf = null;
@ -131,7 +131,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
// loop until we fill up a single message // loop until we fill up a single message
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < pending.size(); i++) {
long pendingStart = System.currentTimeMillis(); long pendingStart = System.currentTimeMillis();
TunnelGateway.Pending msg = pending.get(i); PendingGatewayMessage msg = pending.get(i);
int instructionsSize = getInstructionsSize(msg); int instructionsSize = getInstructionsSize(msg);
instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize); instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
int curWanted = msg.getData().length - msg.getOffset() + instructionsSize; int curWanted = msg.getData().length - msg.getOffset() + instructionsSize;
@ -169,7 +169,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
// Remove what we sent from the pending queue // Remove what we sent from the pending queue
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
TunnelGateway.Pending cur = pending.remove(0); PendingGatewayMessage cur = pending.remove(0);
if (cur.getOffset() < cur.getData().length) if (cur.getOffset() < cur.getData().length)
throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset() throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
+ " len=" + cur.getData().length + " alloc=" + allocated); + " len=" + cur.getData().length + " alloc=" + allocated);
@ -181,7 +181,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
} }
if (msg.getOffset() >= msg.getData().length) { if (msg.getOffset() >= msg.getData().length) {
// ok, this last message fit perfectly, remove it too // ok, this last message fit perfectly, remove it too
TunnelGateway.Pending cur = pending.remove(0); PendingGatewayMessage cur = pending.remove(0);
if (timingBuf != null) if (timingBuf != null)
timingBuf.append(" sent perfect fit " + cur).append("."); timingBuf.append(" sent perfect fit " + cur).append(".");
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending);
@ -230,7 +230,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
// Remove everything in the outgoing message from the pending queue // Remove everything in the outgoing message from the pending queue
int beforeSize = pending.size(); int beforeSize = pending.size();
for (int i = 0; i < beforeSize; i++) { for (int i = 0; i < beforeSize; i++) {
TunnelGateway.Pending cur = pending.get(0); PendingGatewayMessage cur = pending.get(0);
if (cur.getOffset() < cur.getData().length) if (cur.getOffset() < cur.getData().length)
break; break;
pending.remove(0); pending.remove(0);
@ -316,7 +316,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
* *
* title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc. * title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
*/ */
private void display(long allocated, List<TunnelGateway.Pending> pending, String title) { private void display(long allocated, List<PendingGatewayMessage> pending, String title) {
if (_log.shouldLog(Log.INFO)) { if (_log.shouldLog(Log.INFO)) {
long highestDelay = 0; long highestDelay = 0;
StringBuilder buf = new StringBuilder(128); StringBuilder buf = new StringBuilder(128);
@ -327,7 +327,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
if (_pendingSince > 0) if (_pendingSince > 0)
buf.append(" delay: ").append(getDelayAmount(false)); buf.append(" delay: ").append(getDelayAmount(false));
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending curPending = pending.get(i); PendingGatewayMessage curPending = pending.get(i);
buf.append(" [").append(i).append("]:"); buf.append(" [").append(i).append("]:");
buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/'); buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
buf.append(curPending.getLifetime()); buf.append(curPending.getLifetime());
@ -347,7 +347,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
* @param startAt first index in pending to send (inclusive) * @param startAt first index in pending to send (inclusive)
* @param sendThrough last index in pending to send (inclusive) * @param sendThrough last index in pending to send (inclusive)
*/ */
protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { protected void send(List<PendingGatewayMessage> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending); _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending);
@ -384,7 +384,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
long msgId = sender.sendPreprocessed(preprocessed, rec); long msgId = sender.sendPreprocessed(preprocessed, rec);
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending cur = pending.get(i); PendingGatewayMessage cur = pending.get(i);
cur.addMessageId(msgId); cur.addMessageId(msgId);
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -397,9 +397,9 @@ class BatchedPreprocessor extends TrivialPreprocessor {
* *
* @return new offset into the target for further bytes to be written * @return new offset into the target for further bytes to be written
*/ */
private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) { private int writeFragments(List<PendingGatewayMessage> pending, int startAt, int sendThrough, byte target[], int offset) {
for (int i = startAt; i <= sendThrough; i++) { for (int i = startAt; i <= sendThrough; i++) {
TunnelGateway.Pending msg = pending.get(i); PendingGatewayMessage msg = pending.get(i);
int prevOffset = offset; int prevOffset = offset;
if (msg.getOffset() == 0) { if (msg.getOffset() == 0) {
offset = writeFirstFragment(msg, target, offset); offset = writeFirstFragment(msg, target, offset);

View File

@ -0,0 +1,109 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
/**
* Stores all the state for an unsent or partially-sent message
*
* @since 0.9.3 refactored from TunnelGateway.Pending
*/
class PendingGatewayMessage {
protected final Hash _toRouter;
protected final TunnelId _toTunnel;
protected final long _messageId;
protected final long _expiration;
protected final byte _remaining[];
protected int _offset;
protected int _fragmentNumber;
protected final long _created;
private List<Long> _messageIds;
public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
_toRouter = toRouter;
_toTunnel = toTunnel;
_messageId = message.getUniqueId();
_expiration = message.getMessageExpiration();
_remaining = message.toByteArray();
_created = System.currentTimeMillis();
}
/** may be null */
public Hash getToRouter() { return _toRouter; }
/** may be null */
public TunnelId getToTunnel() { return _toTunnel; }
public long getMessageId() { return _messageId; }
public long getExpiration() { return _expiration; }
/** raw unfragmented message to send */
public byte[] getData() { return _remaining; }
/** index into the data to be sent */
public int getOffset() { return _offset; }
/** move the offset */
public void setOffset(int offset) { _offset = offset; }
public long getLifetime() { return System.currentTimeMillis()-_created; }
/** which fragment are we working on (0 for the first fragment) */
public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; }
/**
* Add an ID to the list of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public void addMessageId(long id) {
synchronized (this) {
if (_messageIds == null)
_messageIds = new ArrayList();
_messageIds.add(Long.valueOf(id));
}
}
/**
* The IDs of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public List<Long> getMessageIds() {
synchronized (this) {
if (_messageIds != null)
return new ArrayList(_messageIds);
else
return new ArrayList();
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("Message ").append(_messageId); //.append(" on ");
//buf.append(TunnelGateway.this.toString());
if (_toRouter != null) {
buf.append(" targetting ");
buf.append(_toRouter.toBase64()).append(" ");
if (_toTunnel != null)
buf.append(_toTunnel.getTunnelId());
}
buf.append(" actual lifetime ");
buf.append(getLifetime()).append("ms");
buf.append(" potential lifetime ");
buf.append(_expiration - _created).append("ms");
buf.append(" size ").append(_remaining.length);
buf.append(" offset ").append(_offset);
buf.append(" frag ").append(_fragmentNumber);
return buf.toString();
}
}

View File

@ -35,7 +35,7 @@ import net.i2p.util.Log;
* *
*/ */
class PumpedTunnelGateway extends TunnelGateway { class PumpedTunnelGateway extends TunnelGateway {
private final BlockingQueue<Pending> _prequeue; private final BlockingQueue<PendingGatewayMessage> _prequeue;
private final TunnelGatewayPumper _pumper; private final TunnelGatewayPumper _pumper;
private static final int MAX_MSGS_PER_PUMP = 16; private static final int MAX_MSGS_PER_PUMP = 16;
@ -71,7 +71,7 @@ class PumpedTunnelGateway extends TunnelGateway {
@Override @Override
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
_messagesSent++; _messagesSent++;
Pending cur = new PendingImpl(msg, toRouter, toTunnel); PendingGatewayMessage cur = new PendingGatewayMessage(msg, toRouter, toTunnel);
if (_prequeue.offer(cur)) if (_prequeue.offer(cur))
_pumper.wantsPumping(this); _pumper.wantsPumping(this);
else else
@ -88,7 +88,7 @@ class PumpedTunnelGateway extends TunnelGateway {
* @param queueBuf Empty list for convenience, to use as a temporary buffer. * @param queueBuf Empty list for convenience, to use as a temporary buffer.
* Must be empty when called; will always be emptied before return. * Must be empty when called; will always be emptied before return.
*/ */
void pump(List<Pending> queueBuf) { void pump(List<PendingGatewayMessage> queueBuf) {
_prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP); _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP);
if (queueBuf.isEmpty()) if (queueBuf.isEmpty())
return; return;
@ -114,7 +114,7 @@ class PumpedTunnelGateway extends TunnelGateway {
// expire any as necessary, even if its framented // expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) { for (int i = 0; i < _queue.size(); i++) {
Pending m = _queue.get(i); PendingGatewayMessage m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) { if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m); _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);

View File

@ -50,7 +50,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* *
* NOTE: Unused here, see BatchedPreprocessor override, super is not called. * NOTE: Unused here, see BatchedPreprocessor override, super is not called.
*/ */
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
throw new IllegalArgumentException("unused, right?"); throw new IllegalArgumentException("unused, right?");
} }
@ -155,7 +155,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5); private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5);
private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5); private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) { protected int writeFirstFragment(PendingGatewayMessage msg, byte target[], int offset) {
boolean fragmented = false; boolean fragmented = false;
int instructionsLength = getInstructionsSize(msg); int instructionsLength = getInstructionsSize(msg);
int payloadLength = msg.getData().length - msg.getOffset(); int payloadLength = msg.getData().length - msg.getOffset();
@ -221,7 +221,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return offset; return offset;
} }
protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) { protected int writeSubsequentFragment(PendingGatewayMessage msg, byte target[], int offset) {
boolean isLast = true; boolean isLast = true;
int instructionsLength = getInstructionsSize(msg); int instructionsLength = getInstructionsSize(msg);
@ -269,7 +269,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* Does NOT include 4 for the message ID if the message will be fragmented; * Does NOT include 4 for the message ID if the message will be fragmented;
* call getInstructionAugmentationSize() for that. * call getInstructionAugmentationSize() for that.
*/ */
protected int getInstructionsSize(TunnelGateway.Pending msg) { protected int getInstructionsSize(PendingGatewayMessage msg) {
if (msg.getFragmentNumber() > 0) if (msg.getFragmentNumber() > 0)
return 7; return 7;
// control byte // control byte
@ -287,7 +287,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
} }
/** @return 0 or 4 */ /** @return 0 or 4 */
protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) { protected int getInstructionAugmentationSize(PendingGatewayMessage msg, int offset, int instructionsSize) {
int payloadLength = msg.getData().length - msg.getOffset(); int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
// requires fragmentation, so include the messageId // requires fragmentation, so include the messageId

View File

@ -37,7 +37,7 @@ import net.i2p.util.SimpleTimer2;
class TunnelGateway { class TunnelGateway {
protected final RouterContext _context; protected final RouterContext _context;
protected final Log _log; protected final Log _log;
protected final List<Pending> _queue; protected final List<PendingGatewayMessage> _queue;
protected final QueuePreprocessor _preprocessor; protected final QueuePreprocessor _preprocessor;
protected final Sender _sender; protected final Sender _sender;
protected final Receiver _receiver; protected final Receiver _receiver;
@ -171,7 +171,7 @@ class TunnelGateway {
* *
* @return true if we should delay before preprocessing again * @return true if we should delay before preprocessing again
*/ */
public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver); public boolean preprocessQueue(List<PendingGatewayMessage> pending, Sender sender, Receiver receiver);
/** how long do we want to wait before flushing */ /** how long do we want to wait before flushing */
public long getDelayAmount(); public long getDelayAmount();
@ -185,105 +185,6 @@ class TunnelGateway {
public long receiveEncrypted(byte encrypted[]); public long receiveEncrypted(byte encrypted[]);
} }
/**
* Stores all the state for an unsent or partially-sent message
*/
public static class Pending {
protected final Hash _toRouter;
protected final TunnelId _toTunnel;
protected final long _messageId;
protected final long _expiration;
protected final byte _remaining[];
protected int _offset;
protected int _fragmentNumber;
protected final long _created;
private List<Long> _messageIds;
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
this(message, toRouter, toTunnel, System.currentTimeMillis());
}
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) {
_toRouter = toRouter;
_toTunnel = toTunnel;
_messageId = message.getUniqueId();
_expiration = message.getMessageExpiration();
_remaining = message.toByteArray();
_created = now;
}
/** may be null */
public Hash getToRouter() { return _toRouter; }
/** may be null */
public TunnelId getToTunnel() { return _toTunnel; }
public long getMessageId() { return _messageId; }
public long getExpiration() { return _expiration; }
/** raw unfragmented message to send */
public byte[] getData() { return _remaining; }
/** index into the data to be sent */
public int getOffset() { return _offset; }
/** move the offset */
public void setOffset(int offset) { _offset = offset; }
public long getLifetime() { return System.currentTimeMillis()-_created; }
/** which fragment are we working on (0 for the first fragment) */
public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; }
/**
* Add an ID to the list of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public void addMessageId(long id) {
synchronized (Pending.this) {
if (_messageIds == null)
_messageIds = new ArrayList();
_messageIds.add(Long.valueOf(id));
}
}
/**
* The IDs of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public List<Long> getMessageIds() {
synchronized (Pending.this) {
if (_messageIds != null)
return new ArrayList(_messageIds);
else
return new ArrayList();
}
}
}
/** Extend for debugging */
class PendingImpl extends Pending {
public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel, _context.clock().now());
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("Message ").append(_messageId).append(" on ");
buf.append(TunnelGateway.this.toString());
if (_toRouter != null) {
buf.append(" targetting ");
buf.append(_toRouter.toBase64()).append(" ");
if (_toTunnel != null)
buf.append(_toTunnel.getTunnelId());
}
long now = _context.clock().now();
buf.append(" actual lifetime ");
buf.append(now - _created).append("ms");
buf.append(" potential lifetime ");
buf.append(_expiration - _created).append("ms");
buf.append(" size ").append(_remaining.length);
buf.append(" offset ").append(_offset);
buf.append(" frag ").append(_fragmentNumber);
return buf.toString();
}
@Override
public long getLifetime() { return _context.clock().now()-_created; }
}
protected class DelayedFlush extends SimpleTimer2.TimedEvent { protected class DelayedFlush extends SimpleTimer2.TimedEvent {
DelayedFlush() { DelayedFlush() {
super(_context.simpleTimer2()); super(_context.simpleTimer2());

View File

@ -65,7 +65,7 @@ class TunnelGatewayPumper implements Runnable {
public void run() { public void run() {
PumpedTunnelGateway gw = null; PumpedTunnelGateway gw = null;
List<TunnelGateway.Pending> queueBuf = new ArrayList(32); List<PendingGatewayMessage> queueBuf = new ArrayList(32);
while (!_stop) { while (!_stop) {
try { try {
synchronized (_wantsPumping) { synchronized (_wantsPumping) {