From d3380228ac21ab1e9157419d4aa6f4d6494edcf8 Mon Sep 17 00:00:00 2001 From: jrandom Date: Sat, 9 Jul 2005 22:58:22 +0000 Subject: [PATCH] * you mean 3f != 0x3f? [duh] * minor cleanups --- .../net/i2p/data/i2np/TunnelDataMessage.java | 11 ++- .../udp/InboundMessageFragments.java | 11 +-- .../transport/udp/InboundMessageState.java | 14 ++- .../router/transport/udp/MessageReceiver.java | 4 +- .../transport/udp/OutboundMessageState.java | 5 +- .../router/transport/udp/PacketBuilder.java | 48 ++++++++--- .../i2p/router/transport/udp/PeerState.java | 12 ++- .../router/transport/udp/UDPEndpointTest.java | 2 +- .../i2p/router/transport/udp/UDPPacket.java | 15 ++-- .../router/transport/udp/UDPPacketReader.java | 2 +- .../i2p/router/transport/udp/UDPReceiver.java | 4 +- .../i2p/router/transport/udp/UDPSender.java | 2 +- .../router/transport/udp/UDPTransport.java | 86 ++++++++++++++++++- 13 files changed, 172 insertions(+), 44 deletions(-) diff --git a/router/java/src/net/i2p/data/i2np/TunnelDataMessage.java b/router/java/src/net/i2p/data/i2np/TunnelDataMessage.java index 69555eaa4..de91a858b 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelDataMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelDataMessage.java @@ -25,6 +25,7 @@ public class TunnelDataMessage extends I2NPMessageImpl { private long _tunnelId; private TunnelId _tunnelIdObj; private byte[] _data; + private ByteArray _dataBuf; public final static int MESSAGE_TYPE = 18; private static final int DATA_SIZE = 1024; @@ -81,10 +82,12 @@ public class TunnelDataMessage extends I2NPMessageImpl { // we cant cache it in trivial form, as other components (e.g. HopProcessor) // call getData() and use it as the buffer to write with. it is then used // again to pass to the 'receiver', which may even cache it in a FragmentMessage. - if (PIPELINED_CACHE) - _data = _cache.acquire().getData(); - else + if (PIPELINED_CACHE) { + _dataBuf = _cache.acquire(); + _data = _dataBuf.getData(); + } else { _data = new byte[DATA_SIZE]; + } System.arraycopy(data, curIndex, _data, 0, DATA_SIZE); } @@ -102,7 +105,7 @@ public class TunnelDataMessage extends I2NPMessageImpl { System.arraycopy(_data, 0, out, curIndex, DATA_SIZE); curIndex += _data.length; if (PIPELINED_CACHE) - _cache.release(new ByteArray(_data)); + _cache.release(_dataBuf); return curIndex; } diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 4fdb536de..91d181b69 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -99,14 +99,15 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource Map messages = from.getInboundMessages(); for (int i = 0; i < fragments; i++) { - Long messageId = new Long(data.readMessageId(i)); + long mid = data.readMessageId(i); + Long messageId = new Long(mid); - if (_recentlyCompletedMessages.isKnown(messageId.longValue())) { + if (_recentlyCompletedMessages.isKnown(mid)) { _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0); from.messageFullyReceived(messageId, -1); _ackSender.ackPeer(from); if (_log.shouldLog(Log.WARN)) - _log.warn("Message received is a dup: " + messageId + " dups: " + _log.warn("Message received is a dup: " + mid + " dups: " + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " + _recentlyCompletedMessages.getInsertedCount()); continue; @@ -124,7 +125,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource synchronized (messages) { state = (InboundMessageState)messages.get(messageId); if (state == null) { - state = new InboundMessageState(_context, messageId.longValue(), fromPeer); + state = new InboundMessageState(_context, mid, fromPeer); messages.put(messageId, state); } @@ -141,7 +142,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource } if (messageComplete) { - _recentlyCompletedMessages.add(messageId.longValue()); + _recentlyCompletedMessages.add(mid); _messageReceiver.receiveMessage(state); from.messageFullyReceived(messageId, state.getCompleteSize()); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index 96c09c7fe..f6840da79 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -1,5 +1,6 @@ package net.i2p.router.transport.udp; +import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.Hash; import net.i2p.router.RouterContext; @@ -66,8 +67,19 @@ public class InboundMessageState { int size = data.readMessageFragmentSize(dataFragment); message.setValid(size); _fragments[fragmentNum] = message; - if (data.readMessageIsLast(dataFragment)) + boolean isLast = data.readMessageIsLast(dataFragment); + if (isLast) _lastFragment = fragmentNum; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New fragment " + fragmentNum + " for message " + _messageId + + ", size=" + size + + ", isLast=" + isLast + + ", data=" + Base64.encode(message.getData(), 0, size)); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received fragment " + fragmentNum + " for message " + _messageId + + " again, old size=" + _fragments[fragmentNum].getValid() + + " and new size=" + data.readMessageFragmentSize(dataFragment)); } return true; } diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index 704cbf2fc..5e1e266a5 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -93,7 +93,9 @@ public class MessageReceiver implements Runnable { System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": " - + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())); + + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid()) + + " (valid: " + fragments[i].getValid() + + " raw: " + Base64.encode(fragments[i].getData()) + ")"); off += fragments[i].getValid(); } if (off != state.getCompleteSize()) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index b5aa7eae8..7ac9bf77d 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -248,8 +248,9 @@ public class OutboundMessageState { int toSend = end - start; System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId + ": " - + Base64.encode(_messageBuf.getData(), start, toSend)); + _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId + + "[" + start + "-" + (start+toSend) + "/" + _messageBuf.getValid() + "/" + _fragmentSize + "]: " + + Base64.encode(out, outOffset, toSend)); return toSend; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index 0e84c1d8a..39497d9e4 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -28,6 +28,7 @@ public class PacketBuilder { private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE); private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH); + private static final ByteCache _blockCache = ByteCache.getInstance(64, 16); public PacketBuilder(I2PAppContext ctx) { _context = ctx; @@ -70,20 +71,32 @@ public class PacketBuilder { if (size < 0) return null; DataHelper.toLong(data, off, 2, size); - data[off] &= (byte)3F; // 2 highest bits are reserved + data[off] &= (byte)0x3F; // 2 highest bits are reserved off += 2; - size = state.writeFragment(data, off, fragment); + int sizeWritten = state.writeFragment(data, off, fragment); + if (sizeWritten != size) + _log.error("Size written: " + sizeWritten + " but size: " + size + + " for fragment " + fragment + " of " + state.getMessageId()); + else if (_log.shouldLog(Log.DEBUG)) + _log.debug("Size written: " + sizeWritten + " for fragment " + fragment + + " of " + state.getMessageId()); + size = sizeWritten; if (size < 0) return null; off += size; // we can pad here if we want, maybe randomized? // pad up so we're on the encryption boundary - if ( (off % 16) != 0) - off += 16 - (off % 16); + int padSize = 16 - (off % 16); + if (padSize > 0) { + ByteArray block = _blockCache.acquire(); + _context.random().nextBytes(block.getData()); + System.arraycopy(block.getData(), 0, data, off, padSize); + _blockCache.release(block); + off += padSize; + } packet.getPacket().setLength(off); - packet.setPacketDataLength(off); authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey()); setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); return packet; @@ -169,7 +182,6 @@ public class PacketBuilder { if ( (off % 16) != 0) off += 16 - (off % 16); packet.getPacket().setLength(off); - packet.setPacketDataLength(off); authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey()); setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); return packet; @@ -262,7 +274,6 @@ public class PacketBuilder { if ( (off % 16) != 0) off += 16 - (off % 16); packet.getPacket().setLength(off); - packet.setPacketDataLength(off); authenticate(packet, ourIntroKey, ourIntroKey, iv); setTo(packet, to, state.getSentPort()); _ivCache.release(iv); @@ -321,7 +332,6 @@ public class PacketBuilder { if ( (off % 16) != 0) off += 16 - (off % 16); packet.getPacket().setLength(off); - packet.setPacketDataLength(off); authenticate(packet, state.getIntroKey(), state.getIntroKey()); setTo(packet, to, state.getSentPort()); return packet; @@ -416,7 +426,6 @@ public class PacketBuilder { System.arraycopy(state.getSentSignature().getData(), 0, data, off, Signature.SIGNATURE_BYTES); packet.getPacket().setLength(off + Signature.SIGNATURE_BYTES); - packet.setPacketDataLength(off + Signature.SIGNATURE_BYTES); authenticate(packet, state.getCipherKey(), state.getMACKey()); } else { // nothing more to add beyond the identity fragment, though we can @@ -426,7 +435,6 @@ public class PacketBuilder { if ( (off % 16) != 0) off += 16 - (off % 16); packet.getPacket().setLength(off); - packet.setPacketDataLength(off); authenticate(packet, state.getIntroKey(), state.getIntroKey()); } @@ -469,7 +477,7 @@ public class PacketBuilder { */ private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) { int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE; - int encryptSize = packet.getPacketDataLength()/*packet.getPacket().getLength()*/ - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset(); + int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset(); byte data[] = packet.getPacket().getData(); _context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv.getData(), encryptSize); @@ -488,7 +496,7 @@ public class PacketBuilder { _context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba.getData(), 0); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Authenticating " + packet.getPacketDataLength() + // packet.getPacket().getLength() + + _log.debug("Authenticating " + packet.getPacket().getLength() + "\nIV: " + Base64.encode(iv.getData()) + "\nraw mac: " + Base64.encode(ba.getData()) + "\nMAC key: " + macKey.toBase64()); @@ -499,4 +507,20 @@ public class PacketBuilder { System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE); _hmacCache.release(ba); } + + public static void main(String args[]) { + byte data[] = new byte[32]; + int off = 0; + int size = 1216; + System.out.println("Binary: " + Integer.toBinaryString(size)); + System.out.println("Binary: " + Integer.toBinaryString(0x3F)); + DataHelper.toLong(data, off, 2, size); + + System.out.println(DataHelper.toHexString(data)); + data[off] &= (byte)0x3F; // 2 highest bits are reserved + System.out.println(DataHelper.toHexString(data)); + System.out.println(DataHelper.toHexString(data)); + long val = DataHelper.fromLong(data, off, 2); + System.out.println("Val: " + val + " size: " + size + " raw: " +Base64.encode(data)); + } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index d5a49fc76..6932c2cc6 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1,6 +1,7 @@ package net.i2p.router.transport.udp; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -15,7 +16,7 @@ import net.i2p.data.SessionKey; import net.i2p.util.Log; /** - * Contain all of the state about a UDP connection to a peer + * Contain all of the state about a UDP connection to a peer. * */ public class PeerState { @@ -158,7 +159,7 @@ public class PeerState { * 502 fragment bytes, which is enough to send a tunnel data message in 2 * packets. */ - private static final int DEFAULT_MTU = 576; + private static final int DEFAULT_MTU = 1500; private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY; private static final int MAX_RTO = 2000; // 5000; @@ -503,9 +504,10 @@ public class PeerState { * */ public List retrieveACKBitfields() { - List rv = new ArrayList(16); + List rv = null; int bytesRemaining = countMaxACKData(); synchronized (_currentACKs) { + rv = new ArrayList(_currentACKs.size()); while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { rv.add(new FullACKBitfield((Long)_currentACKs.remove(0))); bytesRemaining -= 4; @@ -526,6 +528,8 @@ public class PeerState { ACKBitfield bitfield = (ACKBitfield)partial.get(i); int bytes = (bitfield.fragmentCount() / 7) + 1; if (bytesRemaining > bytes + 4) { // msgId + bitfields + if (rv == null) + rv = new ArrayList(partial.size()); rv.add(bitfield); bytesRemaining -= bytes + 4; partialIncluded++; @@ -538,6 +542,8 @@ public class PeerState { _context.statManager().addRateData("udp.sendACKPartial", partialIncluded, rv.size() - partialIncluded); _lastACKSend = _context.clock().now(); + if (rv == null) + rv = Collections.EMPTY_LIST; return rv; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java index 42991058a..1dde784e8 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java @@ -112,7 +112,7 @@ public class UDPEndpointTest { try { packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort()); packet.writeData(data, 0, 1024); - packet.setPacketDataLength(1024); + packet.getPacket().setLength(1024); int outstanding = _sentNotReceived.size() + 1; _sentNotReceived.add(new ByteArray(data, 0, 1024)); _log.debug("Sending packet " + curPacket + " with outstanding " + outstanding); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index cfcc6d7ed..ac7f60010 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -26,7 +26,6 @@ public class UDPPacket { private I2PAppContext _context; private static Log _log; private volatile DatagramPacket _packet; - private volatile int _packetDataLength; private volatile short _priority; private volatile long _initializeTime; private volatile long _expiration; @@ -77,7 +76,6 @@ public class UDPPacket { _dataBuf = _dataCache.acquire(); _data = _dataBuf.getData(); _packet = new DatagramPacket(_data, MAX_PACKET_SIZE); - _packetDataLength = 0; _initializeTime = _context.clock().now(); _markedType = -1; } @@ -98,7 +96,6 @@ public class UDPPacket { verifyNotReleased(); System.arraycopy(src, offset, _data, 0, len); _packet.setLength(len); - setPacketDataLength(len); resetBegin(); } public DatagramPacket getPacket() { verifyNotReleased(); return _packet; } @@ -106,8 +103,6 @@ public class UDPPacket { public long getExpiration() { verifyNotReleased(); return _expiration; } public long getBegin() { verifyNotReleased(); return _initializeTime; } public long getLifetime() { verifyNotReleased(); return _context.clock().now() - _initializeTime; } - public int getPacketDataLength() { return _packetDataLength; } - public void setPacketDataLength(int bytes) { _packetDataLength = bytes; } public void resetBegin() { _initializeTime = _context.clock().now(); } /** flag this packet as a particular type for accounting purposes */ public void markType(int type) { verifyNotReleased(); _markedType = type; } @@ -131,7 +126,7 @@ public class UDPPacket { // validate by comparing _data[0:15] and // HMAC(payload + IV + payloadLength, macKey) - int payloadLength = _packetDataLength /*_packet.getLength()*/ - MAC_SIZE - IV_SIZE; + int payloadLength = _packet.getLength() - MAC_SIZE - IV_SIZE; if (payloadLength > 0) { int off = 0; System.arraycopy(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, buf.getData(), off, payloadLength); @@ -178,7 +173,7 @@ public class UDPPacket { verifyNotReleased(); ByteArray iv = _ivCache.acquire(); System.arraycopy(_data, MAC_SIZE, iv.getData(), 0, IV_SIZE); - int len = _packetDataLength; // _packet.getLength() + int len = _packet.getLength(); _context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, iv.getData(), len - MAC_SIZE - IV_SIZE); _ivCache.release(iv); } @@ -186,12 +181,12 @@ public class UDPPacket { public String toString() { verifyNotReleased(); StringBuffer buf = new StringBuffer(64); - buf.append(_packetDataLength); + buf.append(_packet.getLength()); buf.append(" byte packet with "); buf.append(_packet.getAddress().getHostAddress()).append(":"); buf.append(_packet.getPort()); buf.append(" id=").append(System.identityHashCode(this)); - buf.append(" data=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength())); + buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength())); return buf.toString(); } @@ -245,6 +240,8 @@ public class UDPPacket { synchronized (_packetCache) { if (_packetCache.size() <= 64) { _packetCache.add(this); + } else { + _dataCache.release(_dataBuf); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index f22e72696..ff5cda595 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -37,7 +37,7 @@ public class UDPPacketReader { public void initialize(UDPPacket packet) { int off = packet.getPacket().getOffset(); - int len = packet.getPacketDataLength(); //packet.getPacket().getLength(); + int len = packet.getPacket().getLength(); off += UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; len -= UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; initialize(packet.getPacket().getData(), off, len); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 32bdcd2f8..743718b96 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -87,6 +87,9 @@ public class UDPReceiver { return doReceive(packet); } private final int doReceive(UDPPacket packet) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received: " + packet); + synchronized (_inboundQueue) { int queueSize = _inboundQueue.size(); if (queueSize > 0) { @@ -159,7 +162,6 @@ public class UDPReceiver { int size = packet.getPacket().getLength(); if (_log.shouldLog(Log.DEBUG)) _log.debug("After blocking socket.receive: packet is " + size + " bytes!"); - packet.setPacketDataLength(size); packet.resetBegin(); // and block after we know how much we read but before diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index f38d231e5..e73ce4715 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -137,7 +137,7 @@ public class UDPSender { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet to send known: " + packet); long acquireTime = _context.clock().now(); - int size = packet.getPacketDataLength(); // packet.getPacket().getLength(); + int size = packet.getPacket().getLength(); int size2 = packet.getPacket().getLength(); if (size > 0) { FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 95921150b..cb5c0c8ed 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -30,6 +30,7 @@ import net.i2p.router.transport.Transport; import net.i2p.router.transport.TransportImpl; import net.i2p.router.transport.TransportBid; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * @@ -53,6 +54,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private PacketPusher _pusher; private InboundMessageFragments _inboundFragments; private UDPFlooder _flooder; + private ExpirePeerEvent _expireEvent; /** list of RelayPeer objects for people who will relay to us */ private List _relayPeers; @@ -123,6 +125,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); _flooder = new UDPFlooder(_context, this); + _expireEvent = new ExpirePeerEvent(); _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); } @@ -203,6 +206,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _pusher.startup(); _refiller.startup(); _flooder.startup(); + _expireEvent.setIsAlive(true); } public void shutdown() { @@ -222,6 +226,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _establisher.shutdown(); if (_inboundFragments != null) _inboundFragments.shutdown(); + _expireEvent.setIsAlive(false); } /** @@ -333,16 +338,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (SHOULD_FLOOD_PEERS) _flooder.addPeer(peer); + _expireEvent.add(peer); + return true; } private void dropPeer(PeerState peer) { + dropPeer(peer, true); + } + private void dropPeer(PeerState peer, boolean shouldShitlist) { if (_log.shouldLog(Log.WARN)) _log.debug("Dropping remote peer: " + peer); if (peer.getRemotePeer() != null) { - long now = _context.clock().now(); - _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); - _context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries"); + if (shouldShitlist) { + long now = _context.clock().now(); + _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); + _context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries"); + } synchronized (_peersByIdent) { _peersByIdent.remove(peer.getRemotePeer()); } @@ -360,6 +372,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (SHOULD_FLOOD_PEERS) _flooder.removePeer(peer); + _expireEvent.remove(peer); } int send(UDPPacket packet) { @@ -690,4 +703,71 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public Transport getTransport() { return UDPTransport.this; } public String toString() { return "UDP bid @ " + _ms; } } + + private class ExpirePeerEvent implements SimpleTimer.TimedEvent { + private List _peers; + // toAdd and toRemove are kept separate from _peers so that add and + // remove calls won't block packet handling while the big iteration is + // in process + private List _toAdd; + private List _toRemove; + private boolean _alive; + public ExpirePeerEvent() { + _peers = new ArrayList(128); + _toAdd = new ArrayList(4); + _toRemove = new ArrayList(4); + } + public void timeReached() { + long inactivityCutoff = _context.clock().now() - 10*60*1000; + for (int i = 0; i < _peers.size(); i++) { + PeerState peer = (PeerState)_peers.get(i); + if (peer.getLastReceiveTime() < inactivityCutoff) { + dropPeer(peer, false); + _peers.remove(i); + i--; + } + } + synchronized (_toAdd) { + for (int i = 0; i < _toAdd.size(); i++) { + PeerState peer = (PeerState)_toAdd.get(i); + if (!_peers.contains(peer)) + _peers.add(peer); + } + _toAdd.clear(); + } + synchronized (_toRemove) { + for (int i = 0; i < _toRemove.size(); i++) { + PeerState peer = (PeerState)_toRemove.get(i); + _peers.remove(peer); + } + _toRemove.clear(); + } + if (_alive) + SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000); + } + public void add(PeerState peer) { + synchronized (_toAdd) { + _toAdd.add(peer); + } + } + public void remove(PeerState peer) { + synchronized (_toRemove) { + _toRemove.add(peer); + } + } + public void setIsAlive(boolean isAlive) { + _alive = isAlive; + if (isAlive) { + SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000); + } else { + SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this); + synchronized (_toAdd) { + _toAdd.clear(); + } + synchronized (_peers) { + _peers.clear(); + } + } + } + } }