* Tunnels

- More code to detect improper reuse of cached objects
        after release
      - Don't pass a msg with a failed IV on to
        the FragmentHandler at the OBEP
      - More cleanups and comments
This commit is contained in:
zzz
2009-12-26 20:20:11 +00:00
parent 213bc4bb71
commit 72a588bfbf
8 changed files with 97 additions and 17 deletions

View File

@ -363,6 +363,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending); _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending);
// Might as well take a buf from the cache;
// However it will never be returned to the cache.
// (TunnelDataMessage will not wrap the buffer in a new ByteArray and release() it)
// See also TDM for more discussion.
byte preprocessed[] = _dataCache.acquire().getData(); byte preprocessed[] = _dataCache.acquire().getData();
int offset = 0; int offset = 0;

View File

@ -4,7 +4,6 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
@ -13,6 +12,7 @@ import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException; import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler; import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache; import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
@ -30,6 +30,8 @@ fragments it across the necessary number of 1KB tunnel messages, and decides how
each I2NP message should be handled by the tunnel endpoint, encoding that each I2NP message should be handled by the tunnel endpoint, encoding that
data into the raw tunnel payload:</p> data into the raw tunnel payload:</p>
<ul> <ul>
<li>The 4 byte Tunnel ID</li>
<li>The 16 byte IV</li>
<li>the first 4 bytes of the SHA256 of (the remaining preprocessed data concatenated <li>the first 4 bytes of the SHA256 of (the remaining preprocessed data concatenated
with the IV), using the IV as will be seen on the tunnel endpoint (for with the IV), using the IV as will be seen on the tunnel endpoint (for
outbound tunnels), or the IV as was seen on the tunnel gateway (for inbound outbound tunnels), or the IV as was seen on the tunnel gateway (for inbound
@ -81,13 +83,15 @@ set, this is a follow on fragment.</p>
</ul> </ul>
<p>The I2NP message is encoded in its standard form, and the <p>The I2NP message is encoded in its standard form, and the
preprocessed payload must be padded to a multiple of 16 bytes.</p> preprocessed payload must be padded to a multiple of 16 bytes.
The total size, including the tunnel ID and IV, is 1028 bytes.
</p>
* *
*/ */
public class FragmentHandler { public class FragmentHandler {
private I2PAppContext _context; protected RouterContext _context;
private Log _log; protected Log _log;
private final Map<Long, FragmentedMessage> _fragmentedMessages; private final Map<Long, FragmentedMessage> _fragmentedMessages;
private DefragmentedReceiver _receiver; private DefragmentedReceiver _receiver;
private int _completed; private int _completed;
@ -98,7 +102,7 @@ public class FragmentHandler {
static long MAX_DEFRAGMENT_TIME = 60*1000; static long MAX_DEFRAGMENT_TIME = 60*1000;
private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE); private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) { public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
_context = context; _context = context;
_log = context.logManager().getLog(FragmentHandler.class); _log = context.logManager().getLog(FragmentHandler.class);
_fragmentedMessages = new HashMap(8); _fragmentedMessages = new HashMap(8);
@ -185,6 +189,9 @@ public class FragmentHandler {
// each of the FragmentedMessages populated make a copy out of the // each of the FragmentedMessages populated make a copy out of the
// payload, which they release separately, so we can release // payload, which they release separately, so we can release
// immediately // immediately
//
// This is certainly interesting, to wrap the 1024-byte array in a new ByteArray
// in order to put it in the pool, but it shouldn't cause any harm.
_cache.release(new ByteArray(preprocessed)); _cache.release(new ByteArray(preprocessed));
} }
} }
@ -204,6 +211,13 @@ public class FragmentHandler {
* this. * this.
*/ */
private boolean verifyPreprocessed(byte preprocessed[], int offset, int length) { private boolean verifyPreprocessed(byte preprocessed[], int offset, int length) {
// ByteCache/ByteArray corruption detection
//byte[] orig = new byte[length];
//System.arraycopy(preprocessed, 0, orig, 0, length);
//try {
// Thread.sleep(75);
//} catch (InterruptedException ie) {}
// now we need to verify that the message was received correctly // now we need to verify that the message was received correctly
int paddingEnd = HopProcessor.IV_LENGTH + 4; int paddingEnd = HopProcessor.IV_LENGTH + 4;
while (preprocessed[offset+paddingEnd] != (byte)0x00) { while (preprocessed[offset+paddingEnd] != (byte)0x00) {
@ -249,6 +263,13 @@ public class FragmentHandler {
_context.statManager().addRateData("tunnel.fullFragments", 1, 0); _context.statManager().addRateData("tunnel.fullFragments", 1, 0);
} }
// ByteCache/ByteArray corruption detection
//if (!DataHelper.eq(preprocessed, 0, orig, 0, length)) {
// _log.log(Log.CRIT, "Not equal! orig =\n" + Base64.encode(orig, 0, length) +
// "\nprep =\n" + Base64.encode(preprocessed, 0, length),
// new Exception("hosed"));
//}
return eq; return eq;
} }
@ -514,7 +535,7 @@ public class FragmentHandler {
_failed++; _failed++;
noteFailure(_msg.getMessageId(), _msg.toString()); noteFailure(_msg.getMessageId(), _msg.toString());
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Dropped failed fragmented message: " + _msg); _log.warn("Dropped incomplete fragmented message: " + _msg);
_context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
_msg.failed(); _msg.failed();
} else { } else {

View File

@ -188,6 +188,11 @@ public class FragmentedMessage {
public int getCompleteSize() { public int getCompleteSize() {
if (!_lastReceived) if (!_lastReceived)
throw new IllegalStateException("wtf, don't get the completed size when we're not complete"); throw new IllegalStateException("wtf, don't get the completed size when we're not complete");
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM completeSize()", e);
throw e;
}
int size = 0; int size = 0;
for (int i = 0; i <= _highFragmentNum; i++) { for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i]; ByteArray ba = _fragments[i];
@ -205,6 +210,11 @@ public class FragmentedMessage {
public void writeComplete(OutputStream out) throws IOException { public void writeComplete(OutputStream out) throws IOException {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM writeComplete()", e);
throw e;
}
for (int i = 0; i <= _highFragmentNum; i++) { for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i]; ByteArray ba = _fragments[i];
out.write(ba.getData(), ba.getOffset(), ba.getValid()); out.write(ba.getData(), ba.getOffset(), ba.getValid());
@ -212,6 +222,11 @@ public class FragmentedMessage {
_completed = true; _completed = true;
} }
public void writeComplete(byte target[], int offset) { public void writeComplete(byte target[], int offset) {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("use after free in FragmentedMessage");
_log.error("FM writeComplete() 2", e);
throw e;
}
for (int i = 0; i <= _highFragmentNum; i++) { for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i]; ByteArray ba = _fragments[i];
System.arraycopy(ba.getData(), ba.getOffset(), target, offset, ba.getValid()); System.arraycopy(ba.getData(), ba.getOffset(), target, offset, ba.getValid());
@ -241,6 +256,11 @@ public class FragmentedMessage {
* *
*/ */
private void releaseFragments() { private void releaseFragments() {
if (_releasedAfter > 0) {
RuntimeException e = new RuntimeException("double free in FragmentedMessage");
_log.error("FM releaseFragments()", e);
throw e;
}
_releasedAfter = getLifetime(); _releasedAfter = getLifetime();
for (int i = 0; i <= _highFragmentNum; i++) { for (int i = 0; i <= _highFragmentNum; i++) {
ByteArray ba = _fragments[i]; ByteArray ba = _fragments[i];
@ -251,6 +271,7 @@ public class FragmentedMessage {
} }
} }
/****
public InputStream getInputStream() { return new FragmentInputStream(); } public InputStream getInputStream() { return new FragmentInputStream(); }
private class FragmentInputStream extends InputStream { private class FragmentInputStream extends InputStream {
private int _fragment; private int _fragment;
@ -274,6 +295,7 @@ public class FragmentedMessage {
} }
} }
} }
****/
@Override @Override
public String toString() { public String toString() {
@ -301,6 +323,7 @@ public class FragmentedMessage {
return buf.toString(); return buf.toString();
} }
/*****
public static void main(String args[]) { public static void main(String args[]) {
try { try {
I2PAppContext ctx = I2PAppContext.getGlobalContext(); I2PAppContext ctx = I2PAppContext.getGlobalContext();
@ -327,4 +350,5 @@ public class FragmentedMessage {
e.printStackTrace(); e.printStackTrace();
} }
} }
******/
} }

View File

@ -30,7 +30,16 @@ public class OutboundTunnelEndpoint {
} }
public void dispatch(TunnelDataMessage msg, Hash recvFrom) { public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
_config.incrementProcessedMessages(); _config.incrementProcessedMessages();
_processor.process(msg.getData(), 0, msg.getData().length, recvFrom); boolean ok = _processor.process(msg.getData(), 0, msg.getData().length, recvFrom);
if (!ok) {
// invalid IV
// If we pass it on to the handler, it will fail
// If we don't, the data buf won't get released from the cache... that's ok
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid IV, dropping at OBEP " + _config);
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return;
}
_handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length); _handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length);
} }

View File

@ -7,31 +7,27 @@ import net.i2p.util.Log;
* Minor extension to allow message history integration * Minor extension to allow message history integration
*/ */
public class RouterFragmentHandler extends FragmentHandler { public class RouterFragmentHandler extends FragmentHandler {
private RouterContext _routerContext;
private Log _log;
public RouterFragmentHandler(RouterContext context, DefragmentedReceiver receiver) { public RouterFragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
super(context, receiver); super(context, receiver);
_routerContext = context;
_log = context.logManager().getLog(RouterFragmentHandler.class);
} }
@Override @Override
protected void noteReception(long messageId, int fragmentId, Object status) { protected void noteReception(long messageId, int fragmentId, Object status) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Received fragment " + fragmentId + " for message " + messageId + ": " + status); _log.info("Received fragment " + fragmentId + " for message " + messageId + ": " + status);
_routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId, status); _context.messageHistory().receiveTunnelFragment(messageId, fragmentId, status);
} }
@Override @Override
protected void noteCompletion(long messageId) { protected void noteCompletion(long messageId) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Received complete message " + messageId); _log.info("Received complete message " + messageId);
_routerContext.messageHistory().receiveTunnelFragmentComplete(messageId); _context.messageHistory().receiveTunnelFragmentComplete(messageId);
} }
@Override @Override
protected void noteFailure(long messageId, String status) { protected void noteFailure(long messageId, String status) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Dropped message " + messageId + ": " + status); _log.info("Dropped message " + messageId + ": " + status);
_routerContext.messageHistory().droppedFragmentedMessage(messageId, status); _context.messageHistory().droppedFragmentedMessage(messageId, status);
} }
} }

View File

@ -25,9 +25,17 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
public static final int PREPROCESSED_SIZE = 1024; public static final int PREPROCESSED_SIZE = 1024;
protected static final int IV_SIZE = HopProcessor.IV_LENGTH; protected static final int IV_SIZE = HopProcessor.IV_LENGTH;
/**
* Here in tunnels, we take from the cache but never add to it.
* In other words, we take advantage of other places in the router also using 1024-byte ByteCaches
* (since ByteCache only maintains once instance for each size)
* Used in BatchedPreprocessor; see add'l comments there
*/
protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE); protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE);
protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
protected static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH); private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
private static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
public TrivialPreprocessor(RouterContext ctx) { public TrivialPreprocessor(RouterContext ctx) {
_context = ctx; _context = ctx;
@ -41,8 +49,10 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* Return true if there were messages remaining, and we should queue up * Return true if there were messages remaining, and we should queue up
* a delayed flush to clear them * a delayed flush to clear them
* *
* NOTE: Unused here, see BatchedPreprocessor override, super is not called.
*/ */
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (true) throw new IllegalArgumentException("unused, right?");
long begin = System.currentTimeMillis(); long begin = System.currentTimeMillis();
StringBuilder buf = null; StringBuilder buf = null;
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
@ -87,6 +97,9 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {} protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {}
/*
* @deprecated unused except by above
*/
private byte[][] preprocess(TunnelGateway.Pending msg) { private byte[][] preprocess(TunnelGateway.Pending msg) {
List fragments = new ArrayList(1); List fragments = new ArrayList(1);
@ -110,6 +123,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* bytes after the IV, followed by the first 4 bytes of that SHA256, lining up * bytes after the IV, followed by the first 4 bytes of that SHA256, lining up
* exactly to meet the beginning of the instructions. (i hope) * exactly to meet the beginning of the instructions. (i hope)
* *
* @deprecated unused except by above
*/ */
private byte[] preprocessFragment(TunnelGateway.Pending msg) { private byte[] preprocessFragment(TunnelGateway.Pending msg) {
byte target[] = _dataCache.acquire().getData(); byte target[] = _dataCache.acquire().getData();

View File

@ -224,6 +224,10 @@ public class TunnelGateway {
public int getFragmentNumber() { return _fragmentNumber; } public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */ /** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; } public void incrementFragmentNumber() { _fragmentNumber++; }
/**
* Add an ID to the list of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public void addMessageId(long id) { public void addMessageId(long id) {
synchronized (Pending.this) { synchronized (Pending.this) {
if (_messageIds == null) if (_messageIds == null)
@ -231,6 +235,10 @@ public class TunnelGateway {
_messageIds.add(new Long(id)); _messageIds.add(new Long(id));
} }
} }
/**
* The IDs of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public List<Long> getMessageIds() { public List<Long> getMessageIds() {
synchronized (Pending.this) { synchronized (Pending.this) {
if (_messageIds != null) if (_messageIds != null)

View File

@ -71,6 +71,9 @@ public class TunnelParticipant {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Failed to dispatch " + msg + ": processor=" + _processor _log.warn("Failed to dispatch " + msg + ": processor=" + _processor
+ " inboundEndpoint=" + _inboundEndpointProcessor); + " inboundEndpoint=" + _inboundEndpointProcessor);
if (_config != null)
_config.incrementProcessedMessages();
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return; return;
} }