* FragmentHandler: Zero-copy read of unfragmented messages

for speed and to reduce object churn
  * FragmentedMessage cleanup
This commit is contained in:
zzz
2012-03-16 12:16:05 +00:00
parent 080f435708
commit e7898b5b8f
2 changed files with 58 additions and 31 deletions

View File

@ -106,7 +106,7 @@ class FragmentHandler {
public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
_context = context;
_log = context.logManager().getLog(FragmentHandler.class);
_fragmentedMessages = new HashMap(8);
_fragmentedMessages = new HashMap(16);
_receiver = receiver;
// all createRateStat in TunnelDispatcher
}
@ -351,8 +351,8 @@ class FragmentHandler {
int size = (int)DataHelper.fromLong(preprocessed, offset, 2);
offset += 2;
FragmentedMessage msg = null;
if (fragmented) {
FragmentedMessage msg;
synchronized (_fragmentedMessages) {
msg = _fragmentedMessages.get(Long.valueOf(messageId));
if (msg == null) {
@ -360,11 +360,7 @@ class FragmentHandler {
_fragmentedMessages.put(Long.valueOf(messageId), msg);
}
}
} else {
msg = new FragmentedMessage(_context);
}
if (fragmented) {
// synchronized is required, fragments may be arriving in different threads
synchronized(msg) {
boolean ok = msg.receive(messageId, preprocessed, offset, size, false, router, tunnelId);
@ -387,18 +383,17 @@ class FragmentHandler {
}
}
}
} else {
// synchronized not required if !fragmented
boolean ok = msg.receive(messageId, preprocessed, offset, size, true, router, tunnelId);
if (!ok) return -1;
} else {
// Unfragmented
// synchronized not required
// always complete, never an expire event
receiveComplete(msg);
receiveComplete(preprocessed, offset, size, router, tunnelId);
}
offset += size;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling finished message " + msg.getMessageId() + " at offset " + offset);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Handling finished message " + msg.getMessageId() + " at offset " + offset);
return offset;
}
@ -462,6 +457,7 @@ class FragmentHandler {
return offset;
}
private void receiveComplete(FragmentedMessage msg) {
if (msg == null)
return;
@ -500,6 +496,37 @@ class FragmentHandler {
}
}
/**
* Zero-copy reception of an unfragmented message
* @since 0.9
*/
private void receiveComplete(byte[] data, int offset, int len, Hash router, TunnelId tunnelId) {
_completed++;
try {
if (_log.shouldLog(Log.DEBUG))
_log.debug("RECV unfrag(" + len + ')');
// TODO read in as unknown message for outbound tunnels,
// since this will just be packaged in a TunnelGatewayMessage.
// Not a big savings since most everything is a GarlicMessage
// and so the readMessage() call is fast.
// The unencrypted messages at the OBEP are (V)TBMs
// and perhaps an occasional DatabaseLookupMessage
I2NPMessageHandler h = new I2NPMessageHandler(_context);
h.readMessage(data, offset, len);
I2NPMessage m = h.lastRead();
noteReception(m.getUniqueId(), 0, "complete: ");// + msg.toString());
noteCompletion(m.getUniqueId());
_receiver.receiveComplete(m, router, tunnelId);
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.WARN)) {
_log.warn("Error receiving unfragmented message (corrupt?)", ime);
_log.warn("DUMP:\n" + HexDump.dump(data, offset, len));
_log.warn("RAW:\n" + Base64.encode(data, offset, len));
}
}
}
protected void noteReception(long messageId, int fragmentId, Object status) {}
protected void noteCompletion(long messageId) {}
protected void noteFailure(long messageId, String status) {}
@ -523,10 +550,12 @@ class FragmentHandler {
}
private class RemoveFailed implements SimpleTimer.TimedEvent {
private FragmentedMessage _msg;
private final FragmentedMessage _msg;
public RemoveFailed(FragmentedMessage msg) {
_msg = msg;
}
public void timeReached() {
boolean removed = false;
synchronized (_fragmentedMessages) {

View File

@ -1,18 +1,10 @@
package net.i2p.router.tunnel;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.DataMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@ -30,7 +22,7 @@ class FragmentedMessage {
private long _messageId;
private Hash _toRouter;
private TunnelId _toTunnel;
private ByteArray _fragments[];
private final ByteArray _fragments[];
private boolean _lastReceived;
private int _highFragmentNum;
private final long _createdOn;
@ -93,9 +85,9 @@ class FragmentedMessage {
ba.setValid(length);
ba.setOffset(0);
//System.arraycopy(payload, offset, ba.getData(), 0, length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("fragment[" + fragmentNum + "/" + offset + "/" + length + "]: "
+ Base64.encode(ba.getData(), ba.getOffset(), ba.getValid()));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("fragment[" + fragmentNum + "/" + offset + "/" + length + "]: "
// + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid()));
_fragments[fragmentNum] = ba;
_lastReceived = _lastReceived || isLast;
@ -145,9 +137,9 @@ class FragmentedMessage {
ba.setValid(length);
ba.setOffset(0);
//System.arraycopy(payload, offset, ba.getData(), 0, length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("fragment[0/" + offset + "/" + length + "]: "
+ Base64.encode(ba.getData(), ba.getOffset(), ba.getValid()));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("fragment[0/" + offset + "/" + length + "]: "
// + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid()));
_fragments[0] = ba;
_lastReceived = _lastReceived || isLast;
_toRouter = toRouter;
@ -160,6 +152,7 @@ class FragmentedMessage {
public long getMessageId() { return _messageId; }
public Hash getTargetRouter() { return _toRouter; }
public TunnelId getTargetTunnel() { return _toTunnel; }
public int getFragmentCount() {
int found = 0;
for (int i = 0; i < _fragments.length; i++)
@ -204,6 +197,7 @@ class FragmentedMessage {
public boolean getReleased() { return _completed; }
/****
public void writeComplete(OutputStream out) throws IOException {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
@ -216,7 +210,10 @@ class FragmentedMessage {
}
_completed = true;
}
public void writeComplete(byte target[], int offset) {
****/
/** */
private void writeComplete(byte target[], int offset) {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM writeComplete() 2", e);
@ -229,6 +226,7 @@ class FragmentedMessage {
}
_completed = true;
}
public byte[] toByteArray() {
synchronized (this) {
if (_releasedAfter > 0) return null;