* you mean 3f != 0x3f? [duh]

* minor cleanups
This commit is contained in:
jrandom
2005-07-09 22:58:22 +00:00
committed by zzz
parent ad47bf5da3
commit d3380228ac
13 changed files with 172 additions and 44 deletions

View File

@ -25,6 +25,7 @@ public class TunnelDataMessage extends I2NPMessageImpl {
private long _tunnelId; private long _tunnelId;
private TunnelId _tunnelIdObj; private TunnelId _tunnelIdObj;
private byte[] _data; private byte[] _data;
private ByteArray _dataBuf;
public final static int MESSAGE_TYPE = 18; public final static int MESSAGE_TYPE = 18;
private static final int DATA_SIZE = 1024; private static final int DATA_SIZE = 1024;
@ -81,10 +82,12 @@ public class TunnelDataMessage extends I2NPMessageImpl {
// we cant cache it in trivial form, as other components (e.g. HopProcessor) // we cant cache it in trivial form, as other components (e.g. HopProcessor)
// call getData() and use it as the buffer to write with. it is then used // call getData() and use it as the buffer to write with. it is then used
// again to pass to the 'receiver', which may even cache it in a FragmentMessage. // again to pass to the 'receiver', which may even cache it in a FragmentMessage.
if (PIPELINED_CACHE) if (PIPELINED_CACHE) {
_data = _cache.acquire().getData(); _dataBuf = _cache.acquire();
else _data = _dataBuf.getData();
} else {
_data = new byte[DATA_SIZE]; _data = new byte[DATA_SIZE];
}
System.arraycopy(data, curIndex, _data, 0, DATA_SIZE); System.arraycopy(data, curIndex, _data, 0, DATA_SIZE);
} }
@ -102,7 +105,7 @@ public class TunnelDataMessage extends I2NPMessageImpl {
System.arraycopy(_data, 0, out, curIndex, DATA_SIZE); System.arraycopy(_data, 0, out, curIndex, DATA_SIZE);
curIndex += _data.length; curIndex += _data.length;
if (PIPELINED_CACHE) if (PIPELINED_CACHE)
_cache.release(new ByteArray(_data)); _cache.release(_dataBuf);
return curIndex; return curIndex;
} }

View File

@ -99,14 +99,15 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
Map messages = from.getInboundMessages(); Map messages = from.getInboundMessages();
for (int i = 0; i < fragments; i++) { for (int i = 0; i < fragments; i++) {
Long messageId = new Long(data.readMessageId(i)); long mid = data.readMessageId(i);
Long messageId = new Long(mid);
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) { if (_recentlyCompletedMessages.isKnown(mid)) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0); _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
from.messageFullyReceived(messageId, -1); from.messageFullyReceived(messageId, -1);
_ackSender.ackPeer(from); _ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: " _log.warn("Message received is a dup: " + mid + " dups: "
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
+ _recentlyCompletedMessages.getInsertedCount()); + _recentlyCompletedMessages.getInsertedCount());
continue; continue;
@ -124,7 +125,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
synchronized (messages) { synchronized (messages) {
state = (InboundMessageState)messages.get(messageId); state = (InboundMessageState)messages.get(messageId);
if (state == null) { if (state == null) {
state = new InboundMessageState(_context, messageId.longValue(), fromPeer); state = new InboundMessageState(_context, mid, fromPeer);
messages.put(messageId, state); messages.put(messageId, state);
} }
@ -141,7 +142,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
} }
if (messageComplete) { if (messageComplete) {
_recentlyCompletedMessages.add(messageId.longValue()); _recentlyCompletedMessages.add(mid);
_messageReceiver.receiveMessage(state); _messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId, state.getCompleteSize()); from.messageFullyReceived(messageId, state.getCompleteSize());

View File

@ -1,5 +1,6 @@
package net.i2p.router.transport.udp; package net.i2p.router.transport.udp;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
@ -66,8 +67,19 @@ public class InboundMessageState {
int size = data.readMessageFragmentSize(dataFragment); int size = data.readMessageFragmentSize(dataFragment);
message.setValid(size); message.setValid(size);
_fragments[fragmentNum] = message; _fragments[fragmentNum] = message;
if (data.readMessageIsLast(dataFragment)) boolean isLast = data.readMessageIsLast(dataFragment);
if (isLast)
_lastFragment = fragmentNum; _lastFragment = fragmentNum;
if (_log.shouldLog(Log.DEBUG))
_log.debug("New fragment " + fragmentNum + " for message " + _messageId
+ ", size=" + size
+ ", isLast=" + isLast
+ ", data=" + Base64.encode(message.getData(), 0, size));
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received fragment " + fragmentNum + " for message " + _messageId
+ " again, old size=" + _fragments[fragmentNum].getValid()
+ " and new size=" + data.readMessageFragmentSize(dataFragment));
} }
return true; return true;
} }

View File

@ -93,7 +93,9 @@ public class MessageReceiver implements Runnable {
System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid()); System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": " _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
+ Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())); + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
+ " (valid: " + fragments[i].getValid()
+ " raw: " + Base64.encode(fragments[i].getData()) + ")");
off += fragments[i].getValid(); off += fragments[i].getValid();
} }
if (off != state.getCompleteSize()) if (off != state.getCompleteSize())

View File

@ -248,8 +248,9 @@ public class OutboundMessageState {
int toSend = end - start; int toSend = end - start;
System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend); System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId + ": " _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId
+ Base64.encode(_messageBuf.getData(), start, toSend)); + "[" + start + "-" + (start+toSend) + "/" + _messageBuf.getValid() + "/" + _fragmentSize + "]: "
+ Base64.encode(out, outOffset, toSend));
return toSend; return toSend;
} }

View File

@ -28,6 +28,7 @@ public class PacketBuilder {
private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE); private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH); private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
private static final ByteCache _blockCache = ByteCache.getInstance(64, 16);
public PacketBuilder(I2PAppContext ctx) { public PacketBuilder(I2PAppContext ctx) {
_context = ctx; _context = ctx;
@ -70,20 +71,32 @@ public class PacketBuilder {
if (size < 0) if (size < 0)
return null; return null;
DataHelper.toLong(data, off, 2, size); DataHelper.toLong(data, off, 2, size);
data[off] &= (byte)3F; // 2 highest bits are reserved data[off] &= (byte)0x3F; // 2 highest bits are reserved
off += 2; off += 2;
size = state.writeFragment(data, off, fragment); int sizeWritten = state.writeFragment(data, off, fragment);
if (sizeWritten != size)
_log.error("Size written: " + sizeWritten + " but size: " + size
+ " for fragment " + fragment + " of " + state.getMessageId());
else if (_log.shouldLog(Log.DEBUG))
_log.debug("Size written: " + sizeWritten + " for fragment " + fragment
+ " of " + state.getMessageId());
size = sizeWritten;
if (size < 0) return null; if (size < 0) return null;
off += size; off += size;
// we can pad here if we want, maybe randomized? // we can pad here if we want, maybe randomized?
// pad up so we're on the encryption boundary // pad up so we're on the encryption boundary
if ( (off % 16) != 0) int padSize = 16 - (off % 16);
off += 16 - (off % 16); if (padSize > 0) {
ByteArray block = _blockCache.acquire();
_context.random().nextBytes(block.getData());
System.arraycopy(block.getData(), 0, data, off, padSize);
_blockCache.release(block);
off += padSize;
}
packet.getPacket().setLength(off); packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey()); authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
return packet; return packet;
@ -169,7 +182,6 @@ public class PacketBuilder {
if ( (off % 16) != 0) if ( (off % 16) != 0)
off += 16 - (off % 16); off += 16 - (off % 16);
packet.getPacket().setLength(off); packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey()); authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
return packet; return packet;
@ -262,7 +274,6 @@ public class PacketBuilder {
if ( (off % 16) != 0) if ( (off % 16) != 0)
off += 16 - (off % 16); off += 16 - (off % 16);
packet.getPacket().setLength(off); packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, ourIntroKey, ourIntroKey, iv); authenticate(packet, ourIntroKey, ourIntroKey, iv);
setTo(packet, to, state.getSentPort()); setTo(packet, to, state.getSentPort());
_ivCache.release(iv); _ivCache.release(iv);
@ -321,7 +332,6 @@ public class PacketBuilder {
if ( (off % 16) != 0) if ( (off % 16) != 0)
off += 16 - (off % 16); off += 16 - (off % 16);
packet.getPacket().setLength(off); packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, state.getIntroKey(), state.getIntroKey()); authenticate(packet, state.getIntroKey(), state.getIntroKey());
setTo(packet, to, state.getSentPort()); setTo(packet, to, state.getSentPort());
return packet; return packet;
@ -416,7 +426,6 @@ public class PacketBuilder {
System.arraycopy(state.getSentSignature().getData(), 0, data, off, Signature.SIGNATURE_BYTES); System.arraycopy(state.getSentSignature().getData(), 0, data, off, Signature.SIGNATURE_BYTES);
packet.getPacket().setLength(off + Signature.SIGNATURE_BYTES); packet.getPacket().setLength(off + Signature.SIGNATURE_BYTES);
packet.setPacketDataLength(off + Signature.SIGNATURE_BYTES);
authenticate(packet, state.getCipherKey(), state.getMACKey()); authenticate(packet, state.getCipherKey(), state.getMACKey());
} else { } else {
// nothing more to add beyond the identity fragment, though we can // nothing more to add beyond the identity fragment, though we can
@ -426,7 +435,6 @@ public class PacketBuilder {
if ( (off % 16) != 0) if ( (off % 16) != 0)
off += 16 - (off % 16); off += 16 - (off % 16);
packet.getPacket().setLength(off); packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, state.getIntroKey(), state.getIntroKey()); authenticate(packet, state.getIntroKey(), state.getIntroKey());
} }
@ -469,7 +477,7 @@ public class PacketBuilder {
*/ */
private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) { private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) {
int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE; int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE;
int encryptSize = packet.getPacketDataLength()/*packet.getPacket().getLength()*/ - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset(); int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset();
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
_context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv.getData(), encryptSize); _context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv.getData(), encryptSize);
@ -488,7 +496,7 @@ public class PacketBuilder {
_context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba.getData(), 0); _context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba.getData(), 0);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Authenticating " + packet.getPacketDataLength() + // packet.getPacket().getLength() + _log.debug("Authenticating " + packet.getPacket().getLength() +
"\nIV: " + Base64.encode(iv.getData()) + "\nIV: " + Base64.encode(iv.getData()) +
"\nraw mac: " + Base64.encode(ba.getData()) + "\nraw mac: " + Base64.encode(ba.getData()) +
"\nMAC key: " + macKey.toBase64()); "\nMAC key: " + macKey.toBase64());
@ -499,4 +507,20 @@ public class PacketBuilder {
System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE); System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
_hmacCache.release(ba); _hmacCache.release(ba);
} }
public static void main(String args[]) {
byte data[] = new byte[32];
int off = 0;
int size = 1216;
System.out.println("Binary: " + Integer.toBinaryString(size));
System.out.println("Binary: " + Integer.toBinaryString(0x3F));
DataHelper.toLong(data, off, 2, size);
System.out.println(DataHelper.toHexString(data));
data[off] &= (byte)0x3F; // 2 highest bits are reserved
System.out.println(DataHelper.toHexString(data));
System.out.println(DataHelper.toHexString(data));
long val = DataHelper.fromLong(data, off, 2);
System.out.println("Val: " + val + " size: " + size + " raw: " +Base64.encode(data));
}
} }

View File

@ -1,6 +1,7 @@
package net.i2p.router.transport.udp; package net.i2p.router.transport.udp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -15,7 +16,7 @@ import net.i2p.data.SessionKey;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* Contain all of the state about a UDP connection to a peer * Contain all of the state about a UDP connection to a peer.
* *
*/ */
public class PeerState { public class PeerState {
@ -158,7 +159,7 @@ public class PeerState {
* 502 fragment bytes, which is enough to send a tunnel data message in 2 * 502 fragment bytes, which is enough to send a tunnel data message in 2
* packets. * packets.
*/ */
private static final int DEFAULT_MTU = 576; private static final int DEFAULT_MTU = 1500;
private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY; private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 2000; // 5000; private static final int MAX_RTO = 2000; // 5000;
@ -503,9 +504,10 @@ public class PeerState {
* *
*/ */
public List retrieveACKBitfields() { public List retrieveACKBitfields() {
List rv = new ArrayList(16); List rv = null;
int bytesRemaining = countMaxACKData(); int bytesRemaining = countMaxACKData();
synchronized (_currentACKs) { synchronized (_currentACKs) {
rv = new ArrayList(_currentACKs.size());
while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) {
rv.add(new FullACKBitfield((Long)_currentACKs.remove(0))); rv.add(new FullACKBitfield((Long)_currentACKs.remove(0)));
bytesRemaining -= 4; bytesRemaining -= 4;
@ -526,6 +528,8 @@ public class PeerState {
ACKBitfield bitfield = (ACKBitfield)partial.get(i); ACKBitfield bitfield = (ACKBitfield)partial.get(i);
int bytes = (bitfield.fragmentCount() / 7) + 1; int bytes = (bitfield.fragmentCount() / 7) + 1;
if (bytesRemaining > bytes + 4) { // msgId + bitfields if (bytesRemaining > bytes + 4) { // msgId + bitfields
if (rv == null)
rv = new ArrayList(partial.size());
rv.add(bitfield); rv.add(bitfield);
bytesRemaining -= bytes + 4; bytesRemaining -= bytes + 4;
partialIncluded++; partialIncluded++;
@ -538,6 +542,8 @@ public class PeerState {
_context.statManager().addRateData("udp.sendACKPartial", partialIncluded, rv.size() - partialIncluded); _context.statManager().addRateData("udp.sendACKPartial", partialIncluded, rv.size() - partialIncluded);
_lastACKSend = _context.clock().now(); _lastACKSend = _context.clock().now();
if (rv == null)
rv = Collections.EMPTY_LIST;
return rv; return rv;
} }

View File

@ -112,7 +112,7 @@ public class UDPEndpointTest {
try { try {
packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort()); packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort());
packet.writeData(data, 0, 1024); packet.writeData(data, 0, 1024);
packet.setPacketDataLength(1024); packet.getPacket().setLength(1024);
int outstanding = _sentNotReceived.size() + 1; int outstanding = _sentNotReceived.size() + 1;
_sentNotReceived.add(new ByteArray(data, 0, 1024)); _sentNotReceived.add(new ByteArray(data, 0, 1024));
_log.debug("Sending packet " + curPacket + " with outstanding " + outstanding); _log.debug("Sending packet " + curPacket + " with outstanding " + outstanding);

View File

@ -26,7 +26,6 @@ public class UDPPacket {
private I2PAppContext _context; private I2PAppContext _context;
private static Log _log; private static Log _log;
private volatile DatagramPacket _packet; private volatile DatagramPacket _packet;
private volatile int _packetDataLength;
private volatile short _priority; private volatile short _priority;
private volatile long _initializeTime; private volatile long _initializeTime;
private volatile long _expiration; private volatile long _expiration;
@ -77,7 +76,6 @@ public class UDPPacket {
_dataBuf = _dataCache.acquire(); _dataBuf = _dataCache.acquire();
_data = _dataBuf.getData(); _data = _dataBuf.getData();
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); _packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
_packetDataLength = 0;
_initializeTime = _context.clock().now(); _initializeTime = _context.clock().now();
_markedType = -1; _markedType = -1;
} }
@ -98,7 +96,6 @@ public class UDPPacket {
verifyNotReleased(); verifyNotReleased();
System.arraycopy(src, offset, _data, 0, len); System.arraycopy(src, offset, _data, 0, len);
_packet.setLength(len); _packet.setLength(len);
setPacketDataLength(len);
resetBegin(); resetBegin();
} }
public DatagramPacket getPacket() { verifyNotReleased(); return _packet; } public DatagramPacket getPacket() { verifyNotReleased(); return _packet; }
@ -106,8 +103,6 @@ public class UDPPacket {
public long getExpiration() { verifyNotReleased(); return _expiration; } public long getExpiration() { verifyNotReleased(); return _expiration; }
public long getBegin() { verifyNotReleased(); return _initializeTime; } public long getBegin() { verifyNotReleased(); return _initializeTime; }
public long getLifetime() { verifyNotReleased(); return _context.clock().now() - _initializeTime; } public long getLifetime() { verifyNotReleased(); return _context.clock().now() - _initializeTime; }
public int getPacketDataLength() { return _packetDataLength; }
public void setPacketDataLength(int bytes) { _packetDataLength = bytes; }
public void resetBegin() { _initializeTime = _context.clock().now(); } public void resetBegin() { _initializeTime = _context.clock().now(); }
/** flag this packet as a particular type for accounting purposes */ /** flag this packet as a particular type for accounting purposes */
public void markType(int type) { verifyNotReleased(); _markedType = type; } public void markType(int type) { verifyNotReleased(); _markedType = type; }
@ -131,7 +126,7 @@ public class UDPPacket {
// validate by comparing _data[0:15] and // validate by comparing _data[0:15] and
// HMAC(payload + IV + payloadLength, macKey) // HMAC(payload + IV + payloadLength, macKey)
int payloadLength = _packetDataLength /*_packet.getLength()*/ - MAC_SIZE - IV_SIZE; int payloadLength = _packet.getLength() - MAC_SIZE - IV_SIZE;
if (payloadLength > 0) { if (payloadLength > 0) {
int off = 0; int off = 0;
System.arraycopy(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, buf.getData(), off, payloadLength); System.arraycopy(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, buf.getData(), off, payloadLength);
@ -178,7 +173,7 @@ public class UDPPacket {
verifyNotReleased(); verifyNotReleased();
ByteArray iv = _ivCache.acquire(); ByteArray iv = _ivCache.acquire();
System.arraycopy(_data, MAC_SIZE, iv.getData(), 0, IV_SIZE); System.arraycopy(_data, MAC_SIZE, iv.getData(), 0, IV_SIZE);
int len = _packetDataLength; // _packet.getLength() int len = _packet.getLength();
_context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, iv.getData(), len - MAC_SIZE - IV_SIZE); _context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, iv.getData(), len - MAC_SIZE - IV_SIZE);
_ivCache.release(iv); _ivCache.release(iv);
} }
@ -186,12 +181,12 @@ public class UDPPacket {
public String toString() { public String toString() {
verifyNotReleased(); verifyNotReleased();
StringBuffer buf = new StringBuffer(64); StringBuffer buf = new StringBuffer(64);
buf.append(_packetDataLength); buf.append(_packet.getLength());
buf.append(" byte packet with "); buf.append(" byte packet with ");
buf.append(_packet.getAddress().getHostAddress()).append(":"); buf.append(_packet.getAddress().getHostAddress()).append(":");
buf.append(_packet.getPort()); buf.append(_packet.getPort());
buf.append(" id=").append(System.identityHashCode(this)); buf.append(" id=").append(System.identityHashCode(this));
buf.append(" data=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength())); buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength()));
return buf.toString(); return buf.toString();
} }
@ -245,6 +240,8 @@ public class UDPPacket {
synchronized (_packetCache) { synchronized (_packetCache) {
if (_packetCache.size() <= 64) { if (_packetCache.size() <= 64) {
_packetCache.add(this); _packetCache.add(this);
} else {
_dataCache.release(_dataBuf);
} }
} }
} }

View File

@ -37,7 +37,7 @@ public class UDPPacketReader {
public void initialize(UDPPacket packet) { public void initialize(UDPPacket packet) {
int off = packet.getPacket().getOffset(); int off = packet.getPacket().getOffset();
int len = packet.getPacketDataLength(); //packet.getPacket().getLength(); int len = packet.getPacket().getLength();
off += UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; off += UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
len -= UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; len -= UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
initialize(packet.getPacket().getData(), off, len); initialize(packet.getPacket().getData(), off, len);

View File

@ -87,6 +87,9 @@ public class UDPReceiver {
return doReceive(packet); return doReceive(packet);
} }
private final int doReceive(UDPPacket packet) { private final int doReceive(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received: " + packet);
synchronized (_inboundQueue) { synchronized (_inboundQueue) {
int queueSize = _inboundQueue.size(); int queueSize = _inboundQueue.size();
if (queueSize > 0) { if (queueSize > 0) {
@ -159,7 +162,6 @@ public class UDPReceiver {
int size = packet.getPacket().getLength(); int size = packet.getPacket().getLength();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("After blocking socket.receive: packet is " + size + " bytes!"); _log.debug("After blocking socket.receive: packet is " + size + " bytes!");
packet.setPacketDataLength(size);
packet.resetBegin(); packet.resetBegin();
// and block after we know how much we read but before // and block after we know how much we read but before

View File

@ -137,7 +137,7 @@ public class UDPSender {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet to send known: " + packet); _log.debug("Packet to send known: " + packet);
long acquireTime = _context.clock().now(); long acquireTime = _context.clock().now();
int size = packet.getPacketDataLength(); // packet.getPacket().getLength(); int size = packet.getPacket().getLength();
int size2 = packet.getPacket().getLength(); int size2 = packet.getPacket().getLength();
if (size > 0) { if (size > 0) {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");

View File

@ -30,6 +30,7 @@ import net.i2p.router.transport.Transport;
import net.i2p.router.transport.TransportImpl; import net.i2p.router.transport.TransportImpl;
import net.i2p.router.transport.TransportBid; import net.i2p.router.transport.TransportBid;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/** /**
* *
@ -53,6 +54,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private PacketPusher _pusher; private PacketPusher _pusher;
private InboundMessageFragments _inboundFragments; private InboundMessageFragments _inboundFragments;
private UDPFlooder _flooder; private UDPFlooder _flooder;
private ExpirePeerEvent _expireEvent;
/** list of RelayPeer objects for people who will relay to us */ /** list of RelayPeer objects for people who will relay to us */
private List _relayPeers; private List _relayPeers;
@ -123,6 +125,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this); _flooder = new UDPFlooder(_context, this);
_expireEvent = new ExpirePeerEvent();
_context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
} }
@ -203,6 +206,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_pusher.startup(); _pusher.startup();
_refiller.startup(); _refiller.startup();
_flooder.startup(); _flooder.startup();
_expireEvent.setIsAlive(true);
} }
public void shutdown() { public void shutdown() {
@ -222,6 +226,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_establisher.shutdown(); _establisher.shutdown();
if (_inboundFragments != null) if (_inboundFragments != null)
_inboundFragments.shutdown(); _inboundFragments.shutdown();
_expireEvent.setIsAlive(false);
} }
/** /**
@ -333,16 +338,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (SHOULD_FLOOD_PEERS) if (SHOULD_FLOOD_PEERS)
_flooder.addPeer(peer); _flooder.addPeer(peer);
_expireEvent.add(peer);
return true; return true;
} }
private void dropPeer(PeerState peer) { private void dropPeer(PeerState peer) {
dropPeer(peer, true);
}
private void dropPeer(PeerState peer, boolean shouldShitlist) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.debug("Dropping remote peer: " + peer); _log.debug("Dropping remote peer: " + peer);
if (peer.getRemotePeer() != null) { if (peer.getRemotePeer() != null) {
long now = _context.clock().now(); if (shouldShitlist) {
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); long now = _context.clock().now();
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries"); _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries");
}
synchronized (_peersByIdent) { synchronized (_peersByIdent) {
_peersByIdent.remove(peer.getRemotePeer()); _peersByIdent.remove(peer.getRemotePeer());
} }
@ -360,6 +372,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (SHOULD_FLOOD_PEERS) if (SHOULD_FLOOD_PEERS)
_flooder.removePeer(peer); _flooder.removePeer(peer);
_expireEvent.remove(peer);
} }
int send(UDPPacket packet) { int send(UDPPacket packet) {
@ -690,4 +703,71 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public Transport getTransport() { return UDPTransport.this; } public Transport getTransport() { return UDPTransport.this; }
public String toString() { return "UDP bid @ " + _ms; } public String toString() { return "UDP bid @ " + _ms; }
} }
private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private List _peers;
// toAdd and toRemove are kept separate from _peers so that add and
// remove calls won't block packet handling while the big iteration is
// in process
private List _toAdd;
private List _toRemove;
private boolean _alive;
public ExpirePeerEvent() {
_peers = new ArrayList(128);
_toAdd = new ArrayList(4);
_toRemove = new ArrayList(4);
}
public void timeReached() {
long inactivityCutoff = _context.clock().now() - 10*60*1000;
for (int i = 0; i < _peers.size(); i++) {
PeerState peer = (PeerState)_peers.get(i);
if (peer.getLastReceiveTime() < inactivityCutoff) {
dropPeer(peer, false);
_peers.remove(i);
i--;
}
}
synchronized (_toAdd) {
for (int i = 0; i < _toAdd.size(); i++) {
PeerState peer = (PeerState)_toAdd.get(i);
if (!_peers.contains(peer))
_peers.add(peer);
}
_toAdd.clear();
}
synchronized (_toRemove) {
for (int i = 0; i < _toRemove.size(); i++) {
PeerState peer = (PeerState)_toRemove.get(i);
_peers.remove(peer);
}
_toRemove.clear();
}
if (_alive)
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000);
}
public void add(PeerState peer) {
synchronized (_toAdd) {
_toAdd.add(peer);
}
}
public void remove(PeerState peer) {
synchronized (_toRemove) {
_toRemove.add(peer);
}
}
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000);
} else {
SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this);
synchronized (_toAdd) {
_toAdd.clear();
}
synchronized (_peers) {
_peers.clear();
}
}
}
}
} }