2005-07-04 jrandom

* Within the tunnel, use xor(IV, msg[0:16]) as the flag to detect dups,
      rather than the IV by itself, preventing an attack that would let
      colluding internal adversaries tag a message to determine that they are
      in the same tunnel.  Thanks dvorak for the catch!
    * Drop long inactive profiles on startup and shutdown
    * /configstats.jsp: web interface to pick what stats to log
    * Deliver more session tags to account for wider window sizes
    * Cache some intermediate values in our HMACSHA256 and BC's HMAC
    * Track the client send rate (stream.sendBps and client.sendBpsRaw)
    * UrlLauncher: adjust the browser selection order
    * I2PAppContext: hooks for dummy HMACSHA256 and a weak PRNG
    * StreamSinkClient: add support for sending an unlimited amount of data
    * Migrate the tests out of the default build jars

2005-06-22  Comwiz
    * Migrate the core tests to junit
This commit is contained in:
jrandom
2005-07-04 20:44:17 +00:00
committed by zzz
parent 440cf2c983
commit 18d3f5d25d
80 changed files with 2398 additions and 958 deletions

View File

@ -20,6 +20,7 @@ import net.i2p.data.DataStructure;
*/
public interface I2NPMessage extends DataStructure {
final long MAX_ID_VALUE = (1l<<32l)-1l;
final int MAX_SIZE = 64*1024; // insane
/**
* Read the body into the data structures, after the initial type byte, using

View File

@ -66,7 +66,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
//Hash h = new Hash();
//h.readBytes(in);
if (buffer.length < size) {
if (size > 64*1024) throw new I2NPMessageException("size=" + size);
if (size > MAX_SIZE) throw new I2NPMessageException("size=" + size);
buffer = new byte[size];
}

View File

@ -217,7 +217,8 @@ public class InNetMessagePool implements Service {
if (_log.shouldLog(Log.WARN))
_log.warn("Message expiring on "
+ (messageBody != null ? (messageBody.getMessageExpiration()+"") : " [unknown]")
+ " was not handled by a HandlerJobBuilder - DROPPING: " + messageBody);
+ " was not handled by a HandlerJobBuilder - DROPPING: " + messageBody,
new Exception("f00!"));
_context.statManager().addRateData("inNetPool.dropped", 1, 0);
}
} else {

View File

@ -401,7 +401,7 @@ public class JobQueue {
try {
while (_alive) {
long now = _context.clock().now();
long timeToWait = 0;
long timeToWait = -1;
ArrayList toAdd = null;
synchronized (_jobLock) {
for (int i = 0; i < _timedJobs.size(); i++) {
@ -434,9 +434,11 @@ public class JobQueue {
_readyJobs.add(toAdd.get(i));
_jobLock.notifyAll();
} else {
if (timeToWait < 10)
if (timeToWait < 0)
timeToWait = 30*1000;
else if (timeToWait < 10)
timeToWait = 10;
if (timeToWait > 10*1000)
else if (timeToWait > 10*1000)
timeToWait = 10*1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Waiting " + timeToWait + " before rechecking the timed queue");

View File

@ -206,6 +206,19 @@ public class MessageHistory {
addEntry(getPrefix() + "tunnel dispatched: " + info);
}
public void tunnelDispatched(long messageId, long tunnelId, String type) {
if (!_doLog) return;
addEntry(getPrefix() + "message " + messageId + " on tunnel " + tunnelId + " as " + type);
}
public void tunnelDispatched(long messageId, long tunnelId, long toTunnel, Hash toPeer, String type) {
if (!_doLog) return;
if (toPeer != null)
addEntry(getPrefix() + "message " + messageId + " on tunnel " + tunnelId + " / " + toTunnel + " to " + toPeer.toBase64() + " as " + type);
else
addEntry(getPrefix() + "message " + messageId + " on tunnel " + tunnelId + " / " + toTunnel + " as " + type);
}
/**
* The local router has detected a failure in the given tunnel
*

View File

@ -414,14 +414,12 @@ public class Router {
_cal.set(Calendar.SECOND, 0);
_cal.set(Calendar.MILLISECOND, 0);
long then = _cal.getTime().getTime();
_log.debug("Time till midnight: " + (then-now) + "ms");
if (then - now <= 60*1000) {
// everyone wave at kaffe.
// "Hi Kaffe"
return 60*1000;
} else {
return then - now;
}
long howLong = then - now;
if (howLong < 0) // hi kaffe
howLong = 24*60*60*1000 + howLong;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Time till midnight: " + howLong + "ms");
return howLong;
}
}
@ -838,12 +836,14 @@ public class Router {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(_configFilename);
TreeSet ordered = new TreeSet(_config.keySet());
StringBuffer buf = new StringBuffer(8*1024);
for (Iterator iter = ordered.iterator() ; iter.hasNext(); ) {
String key = (String)iter.next();
String val = _config.getProperty(key);
buf.append(key).append('=').append(val).append('\n');
synchronized (_config) {
TreeSet ordered = new TreeSet(_config.keySet());
for (Iterator iter = ordered.iterator() ; iter.hasNext(); ) {
String key = (String)iter.next();
String val = _config.getProperty(key);
buf.append(key).append('=').append(val).append('\n');
}
}
fos.write(buf.toString().getBytes());
} catch (IOException ioe) {

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.196 $ $Date: 2005/05/01 21:35:16 $";
public final static String ID = "$Revision: 1.197 $ $Date: 2005/05/25 16:32:38 $";
public final static String VERSION = "0.5.0.7";
public final static long BUILD = 8;
public final static long BUILD = 9;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -61,11 +61,21 @@ public class ClientListenerRunner implements Runnable {
int curDelay = 0;
while (_running) {
try {
_log.info("Starting up listening for connections on port " + _port);
if (_bindAllInterfaces)
if (_bindAllInterfaces) {
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " on all interfaces");
_socket = new ServerSocket(_port);
else
_socket = new ServerSocket(_port, 0, InetAddress.getByName("127.0.0.1"));
} else {
String listenInterface = _context.getProperty(ClientManagerFacadeImpl.PROP_CLIENT_HOST,
ClientManagerFacadeImpl.DEFAULT_HOST);
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " of the specific interface: " + listenInterface);
_socket = new ServerSocket(_port, 0, InetAddress.getByName(listenInterface));
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("ServerSocket created, before accept: " + _socket);
curDelay = 0;
while (_running) {

View File

@ -37,6 +37,8 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade {
private RouterContext _context;
public final static String PROP_CLIENT_PORT = "i2cp.port";
public final static int DEFAULT_PORT = 7654;
public final static String PROP_CLIENT_HOST = "i2cp.hostname";
public final static String DEFAULT_HOST = "127.0.0.1";
public ClientManagerFacadeImpl(RouterContext context) {
_context = context;

View File

@ -28,6 +28,14 @@ class ProfilePersistenceHelper {
public final static String DEFAULT_PEER_PROFILE_DIR = "peerProfiles";
private final static String NL = System.getProperty("line.separator");
/**
* If we haven't been able to get a message through to the peer in 3 days,
* drop the profile. They may reappear, but if they do, their config may
* have changed (etc).
*
*/
public static final long EXPIRE_AGE = 3*24*60*60*1000;
private File _profileDir = null;
private Hash _us;
@ -46,6 +54,9 @@ class ProfilePersistenceHelper {
/** write out the data from the profile to the stream */
public void writeProfile(PeerProfile profile) {
if (isExpired(profile.getLastSendSuccessful()))
return;
File f = pickFile(profile);
long before = _context.clock().now();
OutputStream fos = null;
@ -159,6 +170,12 @@ class ProfilePersistenceHelper {
rv.add(files[i]);
return rv;
}
private boolean isExpired(long lastSentToSuccessfully) {
long timeSince = _context.clock().now() - lastSentToSuccessfully;
return (timeSince > EXPIRE_AGE);
}
public PeerProfile readProfile(File file) {
Hash peer = getHash(file.getName());
try {
@ -171,6 +188,15 @@ class ProfilePersistenceHelper {
loadProps(props, file);
long lastSentToSuccessfully = getLong(props, "lastSentToSuccessfully");
if (isExpired(lastSentToSuccessfully)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping old profile for " + file.getName() +
", since we haven't heard from them in a long time");
file.delete();
return null;
}
profile.setReliabilityBonus(getLong(props, "reliabilityBonus"));
profile.setIntegrationBonus(getLong(props, "integrationBonus"));
profile.setSpeedBonus(getLong(props, "speedBonus"));

View File

@ -756,8 +756,14 @@ public class ConnectionBuilder {
_remoteAddress = tcpAddr;
_created = true;
} catch (IOException ioe) {
Hash peer = _target.getIdentity().calculateHash();
String peerName = null;
if (peer == null)
peerName = "unknown";
else
peerName = peer.toBase64().substring(0,6);
fail("Error contacting "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ peerName
+ " on " + tcpAddr.toString() + ": " + ioe.getMessage());
return;
}

View File

@ -136,6 +136,8 @@ public class TCPTransport extends TransportImpl {
}
public TransportBid bid(RouterInfo toAddress, long dataSize) {
if (false) return null;
RouterAddress addr = toAddress.getTargetAddress(STYLE);
if ( (_myAddress != null) && (_myAddress.equals(addr)) )
@ -306,8 +308,13 @@ public class TCPTransport extends TransportImpl {
}
void connectionClosed(TCPConnection con) {
Hash remotePeer = null;
if (con == null) return;
RouterIdentity peer = con.getRemoteRouterIdentity();
if (peer == null) return;
remotePeer = peer.getHash();
synchronized (_connectionLock) {
TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(remotePeer);
if ( (cur != null) && (cur != con) )
_connectionsByIdent.put(cur.getRemoteRouterIdentity().getHash(), cur);
cur = (TCPConnection)_connectionsByAddress.remove(con.getRemoteAddress().toString());

View File

@ -0,0 +1,16 @@
package net.i2p.router.transport.udp;
/**
* Generic means of SACK/NACK transmission for partially or fully
* received messages
*/
public interface ACKBitfield {
/** what message is this partially ACKing? */
public long getMessageId();
/** how many fragments are covered in this bitfield? */
public int fragmentCount();
/** has the given fragment been received? */
public boolean received(int fragmentNum);
/** has the entire message been received completely? */
public boolean receivedComplete();
}

View File

@ -68,8 +68,9 @@ public class ACKSender implements Runnable {
synchronized (_peersToACK) {
for (int i = 0; i < _peersToACK.size(); i++) {
PeerState cur = (PeerState)_peersToACK.get(i);
long delta = cur.getWantedACKSendSince() + ACK_FREQUENCY - now;
if ( (delta < 0) || (cur.unsentACKThresholdReached()) ) {
long wanted = cur.getWantedACKSendSince();
long delta = wanted + ACK_FREQUENCY - now;
if ( ( (wanted > 0) && (delta < 0) ) || (cur.unsentACKThresholdReached()) ) {
_peersToACK.remove(i);
peer = cur;
break;
@ -90,28 +91,33 @@ public class ACKSender implements Runnable {
if (peer != null) {
long lastSend = peer.getLastACKSend();
long wanted = peer.getWantedACKSendSince();
List acks = peer.retrieveACKs();
if ( (acks != null) && (acks.size() > 0) ) {
_context.statManager().addRateData("udp.sendACKCount", acks.size(), 0);
List ackBitfields = peer.retrieveACKBitfields(_transport.getPartialACKSource());
if (wanted < 0)
_log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
if ( (ackBitfields != null) && (ackBitfields.size() > 0) ) {
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0);
_context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
now = _context.clock().now();
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
_context.statManager().getStatLog().addData(peer.getRemoteHostString(), "udp.peer.sendACKCount", acks.size(), 0);
UDPPacket ack = _builder.buildACK(peer, acks);
_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size(), 0);
UDPPacket ack = _builder.buildACK(peer, ackBitfields);
ack.markType(1);
if (_log.shouldLog(Log.INFO))
_log.info("Sending ACK for " + acks);
_log.info("Sending ACK for " + ackBitfields);
_transport.send(ack);
if (wanted == peer.getWantedACKSendSince()) {
// still packets left to be ACKed, since wanted time
// is reset by retrieveACKs when all of the IDs are
if ( (wanted > 0) && (wanted <= peer.getWantedACKSendSince()) ) {
// still full packets left to be ACKed, since wanted time
// is reset by retrieveACKBitfields when all of the IDs are
// removed
if (_log.shouldLog(Log.WARN))
_log.warn("Rerequesting ACK for peer " + peer);
ackPeer(peer);
}
}
}
}
}
}

View File

@ -26,9 +26,9 @@ public class EstablishmentManager {
private Log _log;
private UDPTransport _transport;
private PacketBuilder _builder;
/** map of host+port (String) to InboundEstablishState */
/** map of RemoteHostId to InboundEstablishState */
private Map _inboundStates;
/** map of host+port (String) to OutboundEstablishState */
/** map of RemoteHostId to OutboundEstablishState */
private Map _outboundStates;
private boolean _alive;
private Object _activityLock;
@ -63,7 +63,7 @@ public class EstablishmentManager {
* Grab the active establishing state
*/
InboundEstablishState getInboundState(InetAddress fromHost, int fromPort) {
String from = PeerState.calculateRemoteHostString(fromHost.getAddress(), fromPort);
RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);
synchronized (_inboundStates) {
InboundEstablishState state = (InboundEstablishState)_inboundStates.get(from);
if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
@ -73,7 +73,7 @@ public class EstablishmentManager {
}
OutboundEstablishState getOutboundState(InetAddress fromHost, int fromPort) {
String from = PeerState.calculateRemoteHostString(fromHost.getAddress(), fromPort);
RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);
synchronized (_outboundStates) {
OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(from);
if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
@ -97,7 +97,7 @@ public class EstablishmentManager {
UDPAddress addr = new UDPAddress(ra);
InetAddress remAddr = addr.getHostAddress();
int port = addr.getPort();
String to = PeerState.calculateRemoteHostString(remAddr.getAddress(), port);
RemoteHostId to = new RemoteHostId(remAddr.getAddress(), port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add outobund establish state to: " + to);
@ -120,7 +120,7 @@ public class EstablishmentManager {
* Got a SessionRequest (initiates an inbound establishment)
*
*/
void receiveSessionRequest(String from, InetAddress host, int port, UDPPacketReader reader) {
void receiveSessionRequest(RemoteHostId from, InetAddress host, int port, UDPPacketReader reader) {
InboundEstablishState state = null;
synchronized (_inboundStates) {
state = (InboundEstablishState)_inboundStates.get(from);
@ -132,7 +132,7 @@ public class EstablishmentManager {
state.receiveSessionRequest(reader.getSessionRequestReader());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session request from: " + state.getRemoteHostInfo());
_log.debug("Receive session request from: " + state.getRemoteHostId().toString());
notifyActivity();
}
@ -141,7 +141,7 @@ public class EstablishmentManager {
* got a SessionConfirmed (should only happen as part of an inbound
* establishment)
*/
void receiveSessionConfirmed(String from, UDPPacketReader reader) {
void receiveSessionConfirmed(RemoteHostId from, UDPPacketReader reader) {
InboundEstablishState state = null;
synchronized (_inboundStates) {
state = (InboundEstablishState)_inboundStates.get(from);
@ -150,7 +150,7 @@ public class EstablishmentManager {
state.receiveSessionConfirmed(reader.getSessionConfirmedReader());
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session confirmed from: " + state.getRemoteHostInfo());
_log.debug("Receive session confirmed from: " + state.getRemoteHostId().toString());
}
}
@ -158,7 +158,7 @@ public class EstablishmentManager {
* Got a SessionCreated (in response to our outbound SessionRequest)
*
*/
void receiveSessionCreated(String from, UDPPacketReader reader) {
void receiveSessionCreated(RemoteHostId from, UDPPacketReader reader) {
OutboundEstablishState state = null;
synchronized (_outboundStates) {
state = (OutboundEstablishState)_outboundStates.get(from);
@ -167,7 +167,7 @@ public class EstablishmentManager {
state.receiveSessionCreated(reader.getSessionCreatedReader());
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session created from: " + state.getRemoteHostInfo());
_log.debug("Receive session created from: " + state.getRemoteHostId().toString());
}
}
@ -179,7 +179,7 @@ public class EstablishmentManager {
PeerState receiveData(OutboundEstablishState state) {
state.dataReceived();
synchronized (_outboundStates) {
_outboundStates.remove(state.getRemoteHostInfo());
_outboundStates.remove(state.getRemoteHostId());
}
if (_log.shouldLog(Log.INFO))
_log.info("Outbound established completely! yay");
@ -206,7 +206,7 @@ public class EstablishmentManager {
*/
private void handleCompletelyEstablished(InboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (inbound): " + state.getRemoteHostInfo());
_log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString());
long now = _context.clock().now();
RouterIdentity remote = state.getConfirmedIdentity();
PeerState peer = new PeerState(_context);
@ -236,7 +236,7 @@ public class EstablishmentManager {
*/
private PeerState handleCompletelyEstablished(OutboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (outbound): " + state.getRemoteHostInfo());
_log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString());
long now = _context.clock().now();
RouterIdentity remote = state.getRemoteIdentity();
PeerState peer = new PeerState(_context);
@ -284,7 +284,7 @@ public class EstablishmentManager {
state.setSentRelayTag(0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send created to: " + state.getRemoteHostInfo());
_log.debug("Send created to: " + state.getRemoteHostId().toString());
state.generateSessionKey();
_transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey()));
@ -297,7 +297,7 @@ public class EstablishmentManager {
long now = _context.clock().now();
state.prepareSessionRequest();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send request to: " + state.getRemoteHostInfo());
_log.debug("Send request to: " + state.getRemoteHostId().toString());
_transport.send(_builder.buildSessionRequestPacket(state));
state.requestSent();
}
@ -317,7 +317,7 @@ public class EstablishmentManager {
UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state, _context.router().getRouterInfo().getIdentity());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send confirm to: " + state.getRemoteHostInfo());
_log.debug("Send confirm to: " + state.getRemoteHostId().toString());
for (int i = 0; i < packets.length; i++)
_transport.send(packets[i]);

View File

@ -52,7 +52,7 @@ public class InboundEstablishState {
private long _lastReceive;
private long _lastSend;
private long _nextSend;
private String _remoteHostInfo;
private RemoteHostId _remoteHostId;
private int _currentState;
/** nothin known yet */
@ -71,7 +71,7 @@ public class InboundEstablishState {
_log = ctx.logManager().getLog(InboundEstablishState.class);
_aliceIP = remoteHost.getAddress();
_alicePort = remotePort;
_remoteHostInfo = PeerState.calculateRemoteHostString(_aliceIP, _alicePort);
_remoteHostId = new RemoteHostId(_aliceIP, _alicePort);
_bobPort = localPort;
_keyBuilder = null;
_verificationAttempted = false;
@ -190,8 +190,8 @@ public class InboundEstablishState {
public synchronized long getNextSendTime() { return _nextSend; }
public synchronized void setNextSendTime(long when) { _nextSend = when; }
/** host+port, uniquely identifies an attempt */
public String getRemoteHostInfo() { return _remoteHostInfo; }
/** RemoteHostId, uniquely identifies an attempt */
public RemoteHostId getRemoteHostId() { return _remoteHostId; }
public synchronized void receiveSessionConfirmed(UDPPacketReader.SessionConfirmedReader conf) {
if (_receivedIdentity == null)

View File

@ -2,9 +2,11 @@ package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.DecayingBloomFilter;
import net.i2p.util.I2PThread;
@ -23,7 +25,7 @@ import net.i2p.util.Log;
* a scheduled event)
*
*/
public class InboundMessageFragments {
public class InboundMessageFragments implements UDPTransport.PartialACKSource {
private RouterContext _context;
private Log _log;
/** Map of peer (Hash) to a Map of messageId (Long) to InboundMessageState objects */
@ -87,6 +89,7 @@ public class InboundMessageFragments {
receiveACKs(from, data);
long afterACKs = _context.clock().now();
from.packetReceived();
_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
}
@ -100,7 +103,7 @@ public class InboundMessageFragments {
private void receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
int fragments = data.readFragmentCount();
if (fragments <= 0) return;
synchronized (_inboundMessages) {
synchronized (_inboundMessages) { // XXX: CHOKE POINT (to what extent?)
Map messages = (Map)_inboundMessages.get(from.getRemotePeer());
if (messages == null) {
messages = new HashMap(fragments);
@ -158,6 +161,12 @@ public class InboundMessageFragments {
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
state.releaseResources();
} else {
// not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
from.messagePartiallyReceived();
_ackSender.ackPeer(from);
}
if (!fragmentOK)
@ -172,7 +181,7 @@ public class InboundMessageFragments {
long acks[] = data.readACKs();
if (acks != null) {
_context.statManager().addRateData("udp.receivedACKs", acks.length, 0);
_context.statManager().getStatLog().addData(from.getRemoteHostString(), "udp.peer.receiveACKCount", acks.length, 0);
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receiveACKCount", acks.length, 0);
for (int i = 0; i < acks.length; i++) {
if (_log.shouldLog(Log.INFO))
@ -183,9 +192,40 @@ public class InboundMessageFragments {
_log.error("Received ACKs with no acks?! " + data);
}
}
if (data.readACKBitfieldsIncluded()) {
ACKBitfield bitfields[] = data.readACKBitfields();
if (bitfields != null) {
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0);
for (int i = 0; i < bitfields.length; i++) {
if (_log.shouldLog(Log.INFO))
_log.info("Partial ACK received: " + bitfields[i]);
_outbound.acked(bitfields[i], from.getRemotePeer());
}
}
}
if (data.readECN())
from.ECNReceived();
else
from.dataReceived();
}
public void fetchPartialACKs(Hash fromPeer, List ackBitfields) {
synchronized (_inboundMessages) {
Map messages = (Map)_inboundMessages.get(fromPeer);
if (messages == null)
return;
for (Iterator iter = messages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = (InboundMessageState)iter.next();
if (state.isExpired()) {
iter.remove();
} else {
if (!state.isComplete())
ackBitfields.add(state.createACKBitfield());
}
}
if (messages.size() <= 0)
_inboundMessages.remove(fromPeer);
}
}
}

View File

@ -27,9 +27,9 @@ public class InboundMessageState {
private long _receiveBegin;
private int _completeSize;
/** expire after 30s */
/** expire after 10s */
private static final long MAX_RECEIVE_TIME = 10*1000;
private static final int MAX_FRAGMENTS = 32;
private static final int MAX_FRAGMENTS = 64;
private static final ByteCache _fragmentCache = ByteCache.getInstance(64, 2048);
@ -49,7 +49,7 @@ public class InboundMessageState {
*
* @return true if the data was ok, false if it was corrupt
*/
public synchronized boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) {
public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) {
int fragmentNum = data.readMessageFragmentNum(dataFragment);
if ( (fragmentNum < 0) || (fragmentNum > _fragments.length)) {
StringBuffer buf = new StringBuffer(1024);
@ -72,14 +72,14 @@ public class InboundMessageState {
return true;
}
public synchronized boolean isComplete() {
public boolean isComplete() {
if (_lastFragment < 0) return false;
for (int i = 0; i <= _lastFragment; i++)
if (_fragments[i] == null)
return false;
return true;
}
public synchronized boolean isExpired() {
public boolean isExpired() {
return _context.clock().now() > _receiveBegin + MAX_RECEIVE_TIME;
}
public long getLifetime() {
@ -87,7 +87,7 @@ public class InboundMessageState {
}
public Hash getFrom() { return _from; }
public long getMessageId() { return _messageId; }
public synchronized int getCompleteSize() {
public int getCompleteSize() {
if (_completeSize < 0) {
int size = 0;
for (int i = 0; i <= _lastFragment; i++)
@ -96,6 +96,46 @@ public class InboundMessageState {
}
return _completeSize;
}
public ACKBitfield createACKBitfield() {
return new PartialBitfield(_messageId, _fragments);
}
private static final class PartialBitfield implements ACKBitfield {
private long _bitfieldMessageId;
private boolean _fragmentsReceived[];
public PartialBitfield(long messageId, Object data[]) {
_bitfieldMessageId = messageId;
for (int i = data.length - 1; i >= 0; i--) {
if (data[i] != null) {
if (_fragmentsReceived == null)
_fragmentsReceived = new boolean[i+1];
_fragmentsReceived[i] = true;
}
}
if (_fragmentsReceived == null)
_fragmentsReceived = new boolean[0];
}
public int fragmentCount() { return _fragmentsReceived.length; }
public long getMessageId() { return _bitfieldMessageId; }
public boolean received(int fragmentNum) {
if ( (fragmentNum < 0) || (fragmentNum >= _fragmentsReceived.length) )
return false;
return _fragmentsReceived[fragmentNum];
}
public boolean receivedComplete() { return false; }
public String toString() {
StringBuffer buf = new StringBuffer(64);
buf.append("Partial ACK of ");
buf.append(_bitfieldMessageId);
buf.append(" with ACKs for: ");
for (int i = 0; i < _fragmentsReceived.length; i++)
if (_fragmentsReceived[i])
buf.append(i).append(" ");
return buf.toString();
}
}
public void releaseResources() {
if (_fragments != null)

View File

@ -10,6 +10,7 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -25,12 +26,14 @@ public class MessageReceiver implements Runnable {
/** list of messages (InboundMessageState) fully received but not interpreted yet */
private List _completeMessages;
private boolean _alive;
private ByteCache _cache;
public MessageReceiver(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport;
_completeMessages = new ArrayList(16);
_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_alive = true;
}
@ -70,8 +73,7 @@ public class MessageReceiver implements Runnable {
if (message != null) {
int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()
+ "... todo: parse and plop it onto InNetMessagePool");
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
I2NPMessage msg = readMessage(message);
if (msg != null)
_transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
@ -81,21 +83,24 @@ public class MessageReceiver implements Runnable {
}
private I2NPMessage readMessage(InboundMessageState state) {
ByteArray buf = _cache.acquire();
try {
byte buf[] = new byte[state.getCompleteSize()];
//byte buf[] = new byte[state.getCompleteSize()];
ByteArray fragments[] = state.getFragments();
int numFragments = state.getFragmentCount();
int off = 0;
for (int i = 0; i < numFragments; i++) {
System.arraycopy(fragments[i].getData(), 0, buf, off, fragments[i].getValid());
System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
+ Base64.encode(fragments[i].getData(), 0, fragments[i].getValid()));
off += fragments[i].getValid();
}
if (off != state.getCompleteSize())
_log.error("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw byte array for " + state.getMessageId() + ": " + Base64.encode(buf));
I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf, 0, buf.length);
_log.debug("Raw byte array for " + state.getMessageId() + ": " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize());
m.setUniqueId(state.getMessageId());
return m;
} catch (I2NPMessageException ime) {
@ -107,6 +112,7 @@ public class MessageReceiver implements Runnable {
return null;
} finally {
state.releaseResources();
_cache.release(buf);
}
}
}

View File

@ -51,7 +51,7 @@ public class OutboundEstablishState {
private long _lastReceive;
private long _lastSend;
private long _nextSend;
private String _remoteHostInfo;
private RemoteHostId _remoteHostId;
private RouterIdentity _remotePeer;
private SessionKey _introKey;
private List _queuedMessages;
@ -74,7 +74,7 @@ public class OutboundEstablishState {
_log = ctx.logManager().getLog(OutboundEstablishState.class);
_bobIP = remoteHost.getAddress();
_bobPort = remotePort;
_remoteHostInfo = PeerState.calculateRemoteHostString(_bobIP, _bobPort);
_remoteHostId = new RemoteHostId(_bobIP, _bobPort);
_remotePeer = remotePeer;
_introKey = introKey;
_keyBuilder = null;
@ -172,7 +172,7 @@ public class OutboundEstablishState {
return true;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Session created failed validation, clearing state");
_log.warn("Session created failed validation, clearing state for " + _remoteHostId.toString());
_receivedY = null;
_aliceIP = null;
_receivedRelayTag = 0;
@ -244,7 +244,8 @@ public class OutboundEstablishState {
DataHelper.toLong(signed, off, 4, _receivedRelayTag);
off += 4;
DataHelper.toLong(signed, off, 4, _receivedSignedOnTime);
if (_log.shouldLog(Log.DEBUG)) {
boolean valid = _context.dsa().verifySignature(_receivedSignature, signed, _remotePeer.getSigningPublicKey());
if (!valid || _log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(128);
buf.append("Signed sessionCreated:");
buf.append(" AliceIP: ").append(Base64.encode(_aliceIP));
@ -254,9 +255,12 @@ public class OutboundEstablishState {
buf.append(" RelayTag: ").append(_receivedRelayTag);
buf.append(" SignedOn: ").append(_receivedSignedOnTime);
buf.append(" signature: ").append(Base64.encode(_receivedSignature.getData()));
_log.debug(buf.toString());
if (valid)
_log.debug(buf.toString());
else if (_log.shouldLog(Log.WARN))
_log.warn("INVALID: " + buf.toString());
}
return _context.dsa().verifySignature(_receivedSignature, signed, _remotePeer.getSigningPublicKey());
return valid;
}
public synchronized SessionKey getCipherKey() { return _sessionKey; }
@ -331,8 +335,8 @@ public class OutboundEstablishState {
_log.debug("Explicit nextSend=" + (_nextSend-_context.clock().now()), new Exception("Set by"));
}
/** host+port, uniquely identifies an attempt */
public String getRemoteHostInfo() { return _remoteHostInfo; }
/** uniquely identifies an attempt */
public RemoteHostId getRemoteHostId() { return _remoteHostId; }
/** we have received a real data packet, so we're done establishing */
public synchronized void dataReceived() {

View File

@ -13,7 +13,7 @@ import net.i2p.util.Log;
* Coordinate the outbound fragments and select the next one to be built.
* This pool contains messages we are actively trying to send, essentially
* doing a round robin across each message to send one fragment, as implemented
* in {@link #getNextPacket()}. This also honors per-peer throttling, taking
* in {@link #getNextVolley()}. This also honors per-peer throttling, taking
* note of each peer's allocations. If a message has each of its fragments
* sent more than a certain number of times, it is failed out. In addition,
* this instance also receives notification of message ACKs from the
@ -57,6 +57,9 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.activeDelay", "How often we wait blocking on the active queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() { _alive = true; }
@ -74,19 +77,27 @@ public class OutboundMessageFragments {
* @return true if more messages are allowed
*/
public boolean waitForMoreAllowed() {
// test without choking.
// perhaps this should check the lifetime of the first activeMessage?
if (false) return true;
long start = _context.clock().now();
int numActive = 0;
while (_alive) {
finishMessages();
try {
synchronized (_activeMessages) {
numActive = _activeMessages.size();
if (!_alive)
return false;
else if (_activeMessages.size() < MAX_ACTIVE)
else if (numActive < MAX_ACTIVE)
return true;
else if (_allowExcess)
return true;
else
_activeMessages.wait(1000);
}
_context.statManager().addRateData("udp.activeDelay", numActive, _context.clock().now() - start);
} catch (InterruptedException ie) {}
}
return false;
@ -208,63 +219,61 @@ public class OutboundMessageFragments {
long now = _context.clock().now();
long nextSend = -1;
finishMessages();
synchronized (_activeMessages) {
for (int i = 0; i < _activeMessages.size(); i++) {
int cur = (i + _nextPacketMessage) % _activeMessages.size();
state = (OutboundMessageState)_activeMessages.get(cur);
peer = state.getPeer(); // known if this is immediately after establish
if (peer == null)
peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash());
if ((peer != null) && locked_shouldSend(state, peer)) {
// for fairness, we move on in a round robin
_nextPacketMessage = i + 1;
break;
} else {
if (peer == null) {
// peer disconnected
_activeMessages.remove(cur);
_transport.failed(state);
if (_log.shouldLog(Log.WARN))
_log.warn("Peer disconnected for " + state);
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
i--;
}
try {
synchronized (_activeMessages) {
for (int i = 0; i < _activeMessages.size(); i++) {
int cur = (i + _nextPacketMessage) % _activeMessages.size();
state = (OutboundMessageState)_activeMessages.get(cur);
peer = state.getPeer(); // known if this is immediately after establish
if (peer == null)
peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash());
long time = state.getNextSendTime();
if ( (nextSend < 0) || (time < nextSend) )
nextSend = time;
state = null;
peer = null;
}
} // end of the for(activeMessages)
if (state == null) {
if (nextSend <= 0) {
try {
if ((peer != null) && locked_shouldSend(state, peer)) {
// for fairness, we move on in a round robin
_nextPacketMessage = i + 1;
break;
} else {
if (peer == null) {
// peer disconnected
_activeMessages.remove(cur);
_transport.failed(state);
if (_log.shouldLog(Log.WARN))
_log.warn("Peer disconnected for " + state);
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
i--;
}
long time = state.getNextSendTime();
if ( (nextSend < 0) || (time < nextSend) )
nextSend = time;
state = null;
peer = null;
}
} // end of the for(activeMessages)
if (state == null) {
if (nextSend <= 0) {
_activeMessages.notifyAll();
_activeMessages.wait(1000);
} else {
// none of the packets were eligible for sending
long delay = nextSend - now;
if (delay <= 0)
delay = 10;
if (delay > 1000)
delay = 1000;
_allowExcess = true;
_activeMessages.notifyAll();
_activeMessages.wait();
} catch (InterruptedException ie) {}
} else {
// none of the packets were eligible for sending
long delay = nextSend - now;
if (delay <= 0)
delay = 10;
if (delay > 1000)
delay = 1000;
_allowExcess = true;
_activeMessages.notifyAll();
try {
_activeMessages.wait(delay);
} catch (InterruptedException ie) {}
}
} else {
_activeMessages.notifyAll();
}
} else {
_activeMessages.notifyAll();
}
_allowExcess = false;
} // end of the synchronized block
_allowExcess = false;
} // end of the synchronized block
} catch (InterruptedException ie) {}
} // end of the while (alive && !found)
return preparePackets(state, peer);
@ -290,10 +299,17 @@ public class OutboundMessageFragments {
+ " for message " + state.getMessageId() + ": " + state);
if (state.getPushCount() > 0) {
peer.messageRetransmitted();
int fragments = state.getFragmentCount();
int toSend = 0;
for (int i = 0; i < fragments; i++) {
if (state.needsSending(i))
toSend++;
}
peer.messageRetransmitted(toSend);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend);
}
state.push();
@ -326,13 +342,19 @@ public class OutboundMessageFragments {
if (fragments < 0)
return null;
if (_log.shouldLog(Log.INFO))
_log.info("Building packet for " + state + " to " + peer);
int sparseCount = 0;
UDPPacket rv[] = new UDPPacket[fragments]; //sparse
for (int i = 0; i < fragments; i++) {
if (state.needsSending(i))
rv[i] = _builder.buildPacket(state, i, peer);
else
sparseCount++;
}
if (sparseCount > 0)
_context.statManager().addRateData("udp.sendSparse", sparseCount, state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Building packet for " + state + " to " + peer + " with sparse count: " + sparseCount);
peer.packetsTransmitted(fragments - sparseCount);
return rv;
} else {
// !alive
@ -413,50 +435,83 @@ public class OutboundMessageFragments {
}
}
/**
* Receive a set of fragment ACKs for a given messageId from the
* specified peer
*
*/
public void acked(long messageId, int ackedFragments[], Hash ackedBy) {
if (_log.shouldLog(Log.INFO))
_log.info("Received partial ack of " + messageId + " by " + ackedBy.toBase64());
public void acked(ACKBitfield bitfield, Hash ackedBy) {
if (bitfield.receivedComplete()) {
acked(bitfield.getMessageId(), ackedBy);
return;
}
OutboundMessageState state = null;
boolean isComplete = false;
synchronized (_activeMessages) {
// linear search, since its tiny
for (int i = 0; i < _activeMessages.size(); i++) {
state = (OutboundMessageState)_activeMessages.get(i);
if (state.getMessage().getMessageId() == messageId) {
Hash expectedBy = state.getMessage().getTarget().getIdentity().calculateHash();
if (!expectedBy.equals(ackedBy)) {
return;
} else {
state.acked(ackedFragments);
if (state.isComplete()) {
_activeMessages.remove(i);
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
if (state.getMessageId() == bitfield.getMessageId()) {
OutNetMessage msg = state.getMessage();
if (msg != null) {
Hash expectedBy = msg.getTarget().getIdentity().getHash();
if (!expectedBy.equals(ackedBy)) {
state = null;
_activeMessages.notifyAll();
return;
}
break;
}
isComplete = state.acked(bitfield);
if (isComplete) {
// either the message was a short circuit after establishment,
// or it was received from who we sent it to. yay!
_activeMessages.remove(i);
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
}
break;
} else {
state = null;
}
}
_activeMessages.notifyAll();
}
if ( (state != null) && (state.isComplete()) ) {
if (state != null) {
int numSends = state.getMaxSends();
int bits = bitfield.fragmentCount();
int numACKed = 0;
for (int i = 0; i < bits; i++)
if (bitfield.received(i))
numACKed++;
_context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Received ack of " + messageId + " by " + ackedBy.toBase64()
+ " after " + state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
_transport.succeeded(state.getMessage());
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources();
_log.info("Received partial ack of " + state.getMessageId() + " by " + ackedBy.toBase64()
+ " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? "
+ isComplete + ": " + state);
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
_transport.succeeded(state.getMessage());
if (state.getPeer() != null) {
// this adjusts the rtt/rto/window/etc
state.getPeer().messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0);
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
}
state.releaseResources();
}
return;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Received an ACK for a message not pending: " + bitfield);
return;
}
}

View File

@ -44,7 +44,7 @@ public class OutboundMessageState {
_nextSendFragment = 0;
}
public synchronized boolean initialize(OutNetMessage msg) {
public boolean initialize(OutNetMessage msg) {
try {
initialize(msg, msg.getMessage(), null);
return true;
@ -68,7 +68,7 @@ public class OutboundMessageState {
}
}
private synchronized void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
_message = m;
_peer = peer;
if (_messageBuf != null) {
@ -93,10 +93,10 @@ public class OutboundMessageState {
_log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
}
public synchronized void releaseResources() {
public void releaseResources() {
if (_messageBuf != null)
_cache.release(_messageBuf);
_messageBuf = null;
//_messageBuf = null;
}
public OutNetMessage getMessage() { return _message; }
@ -107,23 +107,26 @@ public class OutboundMessageState {
return _expiration < _context.clock().now();
}
public boolean isComplete() {
if (_fragmentSends == null) return false;
for (int i = 0; i < _fragmentSends.length; i++)
if (_fragmentSends[i] >= 0)
short sends[] = _fragmentSends;
if (sends == null) return false;
for (int i = 0; i < sends.length; i++)
if (sends[i] >= 0)
return false;
// nothing else pending ack
return true;
}
public synchronized int getUnackedSize() {
public int getUnackedSize() {
short fragmentSends[] = _fragmentSends;
ByteArray messageBuf = _messageBuf;
int rv = 0;
if ( (_messageBuf != null) && (_fragmentSends != null) ) {
int totalSize = _messageBuf.getValid();
if ( (messageBuf != null) && (fragmentSends != null) ) {
int totalSize = messageBuf.getValid();
int lastSize = totalSize % _fragmentSize;
if (lastSize == 0)
lastSize = _fragmentSize;
for (int i = 0; i < _fragmentSends.length; i++) {
if (_fragmentSends[i] >= (short)0) {
if (i + 1 == _fragmentSends.length)
for (int i = 0; i < fragmentSends.length; i++) {
if (fragmentSends[i] >= (short)0) {
if (i + 1 == fragmentSends.length)
rv += lastSize;
else
rv += _fragmentSize;
@ -132,23 +135,45 @@ public class OutboundMessageState {
}
return rv;
}
public synchronized boolean needsSending(int fragment) {
if ( (_fragmentSends == null) || (fragment >= _fragmentSends.length) || (fragment < 0) )
public boolean needsSending(int fragment) {
short sends[] = _fragmentSends;
if ( (sends == null) || (fragment >= sends.length) || (fragment < 0) )
return false;
return (_fragmentSends[fragment] >= (short)0);
return (sends[fragment] >= (short)0);
}
public long getLifetime() { return _context.clock().now() - _startedOn; }
/**
* Ack all the fragments in the ack list
* Ack all the fragments in the ack list. As a side effect, if there are
* still unacked fragments, the 'next send' time will be updated under the
* assumption that that all of the packets within a volley would reach the
* peer within that ack frequency (2-400ms).
*
* @return true if the message was completely ACKed
*/
public void acked(int ackedFragments[]) {
public boolean acked(ACKBitfield bitfield) {
// stupid brute force, but the cardinality should be trivial
for (int i = 0; i < ackedFragments.length; i++) {
if ( (ackedFragments[i] < 0) || (ackedFragments[i] >= _fragmentSends.length) )
continue;
_fragmentSends[ackedFragments[i]] = -1;
short sends[] = _fragmentSends;
if (sends != null)
for (int i = 0; i < bitfield.fragmentCount(); i++)
if (bitfield.received(i))
sends[i] = (short)-1;
boolean rv = isComplete();
if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed
long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY);
//_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO);
if (_nextSendTime <= 0)
_nextSendTime = nextTime;
else
_nextSendTime = Math.min(_nextSendTime, nextTime);
//if (now + 100 > _nextSendTime)
// _nextSendTime = now + 100;
//_nextSendTime = now;
}
return rv;
}
public long getNextSendTime() { return _nextSendTime; }
@ -156,7 +181,7 @@ public class OutboundMessageState {
public int getMaxSends() { return _maxSends; }
public int getPushCount() { return _pushCount; }
/** note that we have pushed the message fragments */
public synchronized void push() {
public void push() {
_pushCount++;
if (_pushCount > _maxSends)
_maxSends = (short)_pushCount;
@ -172,7 +197,7 @@ public class OutboundMessageState {
* fragmentSize bytes per fragment.
*
*/
public synchronized void fragment(int fragmentSize) {
public void fragment(int fragmentSize) {
int totalSize = _messageBuf.getValid();
int numFragments = totalSize / fragmentSize;
if (numFragments * fragmentSize != totalSize)
@ -198,7 +223,7 @@ public class OutboundMessageState {
public int getFragmentSize() { return _fragmentSize; }
/** should we continue sending this fragment? */
public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
public synchronized int fragmentSize(int fragmentNum) {
public int fragmentSize(int fragmentNum) {
if (_messageBuf == null) return -1;
if (fragmentNum + 1 == _fragmentSends.length)
return _messageBuf.getValid() % _fragmentSize;
@ -206,81 +231,6 @@ public class OutboundMessageState {
return _fragmentSize;
}
public void incrementCurrentFragment() {
int cur = _nextSendFragment;
_fragmentSends[cur]++;
_maxSends = _fragmentSends[cur];
_nextSendFragment++;
if (_nextSendFragment >= _fragmentSends.length) {
_nextSendFragment = 0;
_pushCount++;
}
}
/**
* Pick a fragment that we still need to send. Current implementation
* picks the fragment which has been sent the least (randomly choosing
* among equals), incrementing the # sends of the winner in the process.
*
* @return fragment index, or -1 if all of the fragments were acked
*/
public int pickNextFragment() {
if (true) {
return _nextSendFragment;
}
short minValue = -1;
int minIndex = -1;
int startOffset = _context.random().nextInt(_fragmentSends.length);
for (int i = 0; i < _fragmentSends.length; i++) {
int cur = (i + startOffset) % _fragmentSends.length;
if (_fragmentSends[cur] < (short)0)
continue;
else if ( (minValue < (short)0) || (_fragmentSends[cur] < minValue) ) {
minValue = _fragmentSends[cur];
minIndex = cur;
}
}
if (minIndex >= 0) {
_fragmentSends[minIndex]++;
if (_fragmentSends[minIndex] > _maxSends)
_maxSends = _fragmentSends[minIndex];
}
// if all fragments have now been sent an equal number of times,
// lets give pause for an ACK
boolean endOfVolley = true;
for (int i = 0; i < _fragmentSends.length; i++) {
if (_fragmentSends[i] < (short)0)
continue;
if (_fragmentSends[i] != (short)_pushCount+1) {
endOfVolley = false;
break;
}
}
if (endOfVolley) {
_pushCount++;
}
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(64);
buf.append("Next fragment is ").append(minIndex);
if (minIndex >= 0) {
buf.append(" (#sends: ").append(_fragmentSends[minIndex]-1);
buf.append(" #fragments: ").append(_fragmentSends.length);
buf.append(")");
}
_log.debug(buf.toString());
}
return minIndex;
}
public boolean justBeganVolley() {
if (_fragmentSends.length == 1)
return true;
else
return _nextSendFragment == 1;
}
/**
* Write a part of the the message onto the specified buffer.
*
@ -289,7 +239,7 @@ public class OutboundMessageState {
* @param fragmentNum fragment to write (0 indexed)
* @return bytesWritten
*/
public synchronized int writeFragment(byte out[], int outOffset, int fragmentNum) {
public int writeFragment(byte out[], int outOffset, int fragmentNum) {
int start = _fragmentSize * fragmentNum;
int end = start + _fragmentSize;
if (_messageBuf == null) return -1;
@ -303,15 +253,23 @@ public class OutboundMessageState {
return toSend;
}
public synchronized String toString() {
public String toString() {
short sends[] = _fragmentSends;
ByteArray messageBuf = _messageBuf;
StringBuffer buf = new StringBuffer(64);
buf.append("Message ").append(_messageId);
if (_fragmentSends != null)
buf.append(" with ").append(_fragmentSends.length).append(" fragments");
if (_messageBuf != null)
buf.append(" of size ").append(_messageBuf.getValid());
if (sends != null)
buf.append(" with ").append(sends.length).append(" fragments");
if (messageBuf != null)
buf.append(" of size ").append(messageBuf.getValid());
buf.append(" volleys: ").append(_maxSends);
buf.append(" lifetime: ").append(getLifetime());
if (sends != null) {
buf.append(" pending fragments: ");
for (int i = 0; i < sends.length; i++)
if (sends[i] >= 0)
buf.append(i).append(' ');
}
return buf.toString();
}
}

View File

@ -60,15 +60,16 @@ public class PacketBuilder {
DataHelper.toLong(data, off, 4, state.getMessageId());
off += 4;
data[off] |= fragment << 3;
data[off] |= fragment << 1;
if (fragment == state.getFragmentCount() - 1)
data[off] |= 1 << 2; // isLast
data[off] |= 1; // isLast
off++;
int size = state.fragmentSize(fragment);
if (size < 0)
return null;
DataHelper.toLong(data, off, 2, size);
data[off] &= (byte)3F; // 2 highest bits are reserved
off += 2;
size = state.writeFragment(data, off, fragment);
@ -81,12 +82,18 @@ public class PacketBuilder {
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
return packet;
}
public UDPPacket buildACK(PeerState peer, List ackedMessageIds) {
private static final int ACK_PRIORITY = 1;
/**
* @param ackBitfields list of ACKBitfield instances to either fully or partially ACK
*/
public UDPPacket buildACK(PeerState peer, List ackBitfields) {
UDPPacket packet = UDPPacket.acquire(_context);
byte data[] = packet.getPacket().getData();
@ -101,17 +108,55 @@ public class PacketBuilder {
DataHelper.toLong(data, off, 4, now);
off += 4;
int fullACKCount = 0;
int partialACKCount = 0;
for (int i = 0; i < ackBitfields.size(); i++) {
if (((ACKBitfield)ackBitfields.get(i)).receivedComplete())
fullACKCount++;
else
partialACKCount++;
}
// ok, now for the body...
data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK;
if (fullACKCount > 0)
data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK;
if (partialACKCount > 0)
data[off] |= UDPPacket.DATA_FLAG_ACK_BITFIELDS;
// add ECN if (peer.getSomethingOrOther())
off++;
DataHelper.toLong(data, off, 1, ackedMessageIds.size());
off++;
for (int i = 0; i < ackedMessageIds.size(); i++) {
Long id = (Long)ackedMessageIds.get(i);
DataHelper.toLong(data, off, 4, id.longValue());
off += 4;
if (fullACKCount > 0) {
DataHelper.toLong(data, off, 1, fullACKCount);
off++;
for (int i = 0; i < ackBitfields.size(); i++) {
ACKBitfield bf = (ACKBitfield)ackBitfields.get(i);
if (bf.receivedComplete()) {
DataHelper.toLong(data, off, 4, bf.getMessageId());
off += 4;
}
}
}
if (partialACKCount > 0) {
DataHelper.toLong(data, off, 1, partialACKCount);
off++;
for (int i = 0; i < ackBitfields.size(); i++) {
ACKBitfield bitfield = (ACKBitfield)ackBitfields.get(i);
if (bitfield.receivedComplete()) continue;
DataHelper.toLong(data, off, 4, bitfield.getMessageId());
off += 4;
int bits = bitfield.fragmentCount();
int size = (bits / 7) + 1;
for (int curByte = 0; curByte < size; curByte++) {
if (curByte + 1 < size)
data[off] |= (byte)(1 << 7);
for (int curBit = 0; curBit < 7; curBit++) {
if (bitfield.received(curBit + 7*curByte))
data[off] |= (byte)(1 << curBit);
}
off++;
}
}
}
DataHelper.toLong(data, off, 1, 0); // no fragments in this message
@ -123,6 +168,7 @@ public class PacketBuilder {
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
return packet;
@ -142,15 +188,16 @@ public class PacketBuilder {
*/
public UDPPacket buildSessionCreatedPacket(InboundEstablishState state, int externalPort, SessionKey ourIntroKey) {
UDPPacket packet = UDPPacket.acquire(_context);
InetAddress to = null;
try {
to = InetAddress.getByAddress(state.getSentIP());
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR))
_log.error("How did we think this was a valid IP? " + state.getRemoteHostInfo());
_log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString());
return null;
}
state.prepareSessionCreated();
byte data[] = packet.getPacket().getData();
@ -214,6 +261,7 @@ public class PacketBuilder {
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, ourIntroKey, ourIntroKey, iv);
setTo(packet, to, state.getSentPort());
_ivCache.release(iv);
@ -239,7 +287,7 @@ public class PacketBuilder {
to = InetAddress.getByAddress(state.getSentIP());
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR))
_log.error("How did we think this was a valid IP? " + state.getRemoteHostInfo());
_log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString());
return null;
}
@ -272,6 +320,7 @@ public class PacketBuilder {
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, state.getIntroKey(), state.getIntroKey());
setTo(packet, to, state.getSentPort());
return packet;
@ -315,7 +364,7 @@ public class PacketBuilder {
to = InetAddress.getByAddress(state.getSentIP());
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR))
_log.error("How did we think this was a valid IP? " + state.getRemoteHostInfo());
_log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString());
return null;
}
@ -366,6 +415,7 @@ public class PacketBuilder {
System.arraycopy(state.getSentSignature().getData(), 0, data, off, Signature.SIGNATURE_BYTES);
packet.getPacket().setLength(off + Signature.SIGNATURE_BYTES);
packet.setPacketDataLength(off + Signature.SIGNATURE_BYTES);
authenticate(packet, state.getCipherKey(), state.getMACKey());
} else {
// nothing more to add beyond the identity fragment, though we can
@ -375,6 +425,7 @@ public class PacketBuilder {
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
packet.setPacketDataLength(off);
authenticate(packet, state.getIntroKey(), state.getIntroKey());
}
@ -417,7 +468,7 @@ public class PacketBuilder {
*/
private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) {
int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE;
int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset();
int encryptSize = packet.getPacketDataLength()/*packet.getPacket().getLength()*/ - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset();
byte data[] = packet.getPacket().getData();
_context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv.getData(), encryptSize);
@ -434,7 +485,7 @@ public class PacketBuilder {
Hash hmac = _context.hmac().calculate(macKey, data, hmacOff, hmacLen);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Authenticating " + packet.getPacket().getLength() +
_log.debug("Authenticating " + packet.getPacketDataLength() + // packet.getPacket().getLength() +
"\nIV: " + Base64.encode(iv.getData()) +
"\nraw mac: " + hmac.toBase64() +
"\nMAC key: " + macKey.toBase64());

View File

@ -41,6 +41,7 @@ public class PacketHandler {
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInvalid", "How old the packet we dropped due to invalidity was", "udp", new long[] { 10*60*1000, 60*60*1000 });
}
public void startup() {
@ -65,11 +66,17 @@ public class PacketHandler {
public void run() {
while (_keepReading) {
UDPPacket packet = _endpoint.receive();
if (packet == null) continue; // keepReading is probably false...
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received the packet " + packet);
long queueTime = packet.getLifetime();
long handleStart = _context.clock().now();
handlePacket(_reader, packet);
try {
handlePacket(_reader, packet);
} catch (Exception e) {
if (_log.shouldLog(Log.ERROR))
_log.error("Crazy error handling a packet: " + packet, e);
}
long handleTime = _context.clock().now() - handleStart;
_context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime());
_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
@ -151,6 +158,7 @@ public class PacketHandler {
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP");
_context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration());
}
return;
}
@ -169,6 +177,7 @@ public class PacketHandler {
if (!isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received: " + packet, new Exception("path"));
_context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration());
return;
} else {
if (_log.shouldLog(Log.INFO))
@ -214,6 +223,8 @@ public class PacketHandler {
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
receivePacket(reader, packet);
} else {
_context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration());
}
}
@ -272,16 +283,18 @@ public class PacketHandler {
if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration());
return;
} else if (skew < 0 - GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN))
_log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration());
return;
}
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received packet from " + state.getRemoteHostString() + " with skew " + skew);
_log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew);
state.adjustClockSkew((short)skew);
}
@ -289,7 +302,7 @@ public class PacketHandler {
InetAddress fromHost = packet.getPacket().getAddress();
int fromPort = packet.getPacket().getPort();
String from = PeerState.calculateRemoteHostString(fromHost.getAddress(), fromPort);
RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);
switch (reader.readPayloadType()) {
case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST:
@ -311,6 +324,7 @@ public class PacketHandler {
default:
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown payload type: " + reader.readPayloadType());
_context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration());
return;
}
}

View File

@ -103,8 +103,8 @@ public class PeerState {
private transient InetAddress _remoteIPAddress;
/** what port is the peer sending and receiving packets on? */
private int _remotePort;
/** cached remoteIP + port, used to find the peerState by remote info */
private String _remoteHostString;
/** cached RemoteHostId, used to find the peerState by remote info */
private RemoteHostId _remoteHostId;
/** if we need to contact them, do we need to talk to an introducer? */
private boolean _remoteRequiresIntroduction;
/**
@ -129,8 +129,20 @@ public class PeerState {
/** current retransmission timeout */
private volatile int _rto;
/** how many packets will be considered within the retransmission rate calculation */
static final long RETRANSMISSION_PERIOD_WIDTH = 100;
private long _messagesReceived;
private long _messagesSent;
private long _packetsTransmitted;
/** how many packets were retransmitted within the last RETRANSMISSION_PERIOD_WIDTH packets */
private long _packetsRetransmitted;
private int _packetRetransmissionRate;
/** what was the $packetsTransmitted when the current RETRANSMISSION_PERIOD_WIDTH began */
private long _retransmissionPeriodStart;
/** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */
private long _packetsReceivedDuplicate;
private long _packetsReceived;
private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
@ -141,8 +153,8 @@ public class PeerState {
* packets.
*/
private static final int DEFAULT_MTU = 576;
private static final int MIN_RTO = 600;
private static final int MAX_RTO = 5000;
private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 2000; // 5000;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@ -181,11 +193,20 @@ public class PeerState {
_lastACKSend = -1;
_rtt = 1000;
_rttDeviation = _rtt;
_rto = 6000;
_rto = MAX_RTO;
_messagesReceived = 0;
_messagesSent = 0;
_packetsTransmitted = 0;
_packetsRetransmitted = 0;
_packetRetransmissionRate = 0;
_retransmissionPeriodStart = 0;
_packetsReceived = 0;
_packetsReceivedDuplicate = 0;
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
/**
@ -377,7 +398,7 @@ public class PeerState {
_remoteIP = ip;
_remoteIPAddress = null;
_remotePort = port;
_remoteHostString = calculateRemoteHostString(ip, port);
_remoteHostId = new RemoteHostId(ip, port);
}
/** if we need to contact them, do we need to talk to an introducer? */
public void setRemoteRequiresIntroduction(boolean required) { _remoteRequiresIntroduction = required; }
@ -403,6 +424,14 @@ public class PeerState {
public void messageFullyReceived(Long messageId, int bytes) {
if (bytes > 0)
_receiveBytes += bytes;
else {
if (_retransmissionPeriodStart + RETRANSMISSION_PERIOD_WIDTH < _packetsReceived) {
_packetsReceivedDuplicate++;
} else {
_retransmissionPeriodStart = _packetsReceived;
_packetsReceivedDuplicate = 1;
}
}
long now = _context.clock().now();
long duration = now - _receivePeriodBegin;
@ -410,6 +439,7 @@ public class PeerState {
_receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration)));
_receiveBytes = 0;
_receivePeriodBegin = now;
_context.statManager().addRateData("udp.receiveBps", _receiveBps, 0);
}
synchronized (_currentACKs) {
@ -421,6 +451,11 @@ public class PeerState {
_messagesReceived++;
}
public void messagePartiallyReceived() {
if (_wantACKSendSince <= 0)
_wantACKSendSince = _context.clock().now();
}
/**
* either they told us to back off, or we had to resend to get
* the data through.
@ -445,25 +480,65 @@ public class PeerState {
return true;
}
/** pull off the ACKs (Long) to send to the peer */
public List retrieveACKs() {
List rv = null;
int threshold = countMaxACKs();
/**
* grab a list of ACKBitfield instances, some of which may fully
* ACK a message while others may only partially ACK a message.
* the values returned are limited in size so that they will fit within
* the peer's current MTU as an ACK - as such, not all messages may be
* ACKed with this call. Be sure to check getWantedACKSendSince() which
* will be unchanged if there are ACKs remaining.
*
*/
public List retrieveACKBitfields(UDPTransport.PartialACKSource partialACKSource) {
List rv = new ArrayList(16);
int bytesRemaining = countMaxACKData();
synchronized (_currentACKs) {
if (_currentACKs.size() < threshold) {
rv = new ArrayList(_currentACKs);
_currentACKs.clear();
while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) {
rv.add(new FullACKBitfield((Long)_currentACKs.remove(0)));
bytesRemaining -= 4;
}
if (_currentACKs.size() <= 0)
_wantACKSendSince = -1;
} else {
rv = new ArrayList(threshold);
for (int i = 0; i < threshold; i++)
rv.add(_currentACKs.remove(0));
}
int partialIncluded = 0;
if ( (bytesRemaining > 4) && (partialACKSource != null) ) {
// ok, there's room to *try* to fit in some partial ACKs, so
// we should try to find some packets to partially ACK
// (preferably the ones which have the most received fragments)
List partial = new ArrayList();
partialACKSource.fetchPartialACKs(_remotePeer, partial);
// we may not be able to use them all, but lets try...
for (int i = 0; (bytesRemaining > 4) && (i < partial.size()); i++) {
ACKBitfield bitfield = (ACKBitfield)partial.get(i);
int bytes = (bitfield.fragmentCount() / 7) + 1;
if (bytesRemaining > bytes + 4) { // msgId + bitfields
rv.add(bitfield);
bytesRemaining -= bytes + 4;
partialIncluded++;
} else {
// continue on to another partial, in case there's a
// smaller one that will fit
}
}
}
_context.statManager().addRateData("udp.sendACKPartial", partialIncluded, rv.size() - partialIncluded);
_lastACKSend = _context.clock().now();
return rv;
}
/** represent a full ACK of a message */
private class FullACKBitfield implements ACKBitfield {
private long _msgId;
public FullACKBitfield(Long id) { _msgId = id.longValue(); }
public int fragmentCount() { return 0; }
public long getMessageId() { return _msgId; }
public boolean received(int fragmentNum) { return true; }
public boolean receivedComplete() { return true; }
public String toString() { return "Full ACK of " + _msgId; }
}
/** we sent a message which was ACKed containing the given # of bytes */
public void messageACKed(int bytesACKed, long lifetime, int numSends) {
_consecutiveFailedSends = 0;
@ -489,27 +564,52 @@ public class PeerState {
recalculateTimeouts(lifetime);
else
_log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps, lifetime);
}
/** adjust the tcp-esque timeouts */
private void recalculateTimeouts(long lifetime) {
_rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation));
_rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime);
// the faster we are going, the slower we want to reduce the rtt
float scale = 0.1f;
if (_sendBps > 0)
scale = ((float)lifetime) / (float)((float)lifetime + (float)_sendBps);
if (scale < 0.001f) scale = 0.001f;
_rtt = (int)(((float)_rtt)*(1.0f-scale) + (scale)*(float)lifetime);
_rto = _rtt + (_rttDeviation<<2);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
+ " rttDev=" + _rttDeviation + " rto=" + _rto);
if (_rto < MIN_RTO)
_rto = MIN_RTO;
if (_rto < minRTO())
_rto = minRTO();
if (_rto > MAX_RTO)
_rto = MAX_RTO;
}
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted() {
public void messageRetransmitted(int packets) {
if (_retransmissionPeriodStart + RETRANSMISSION_PERIOD_WIDTH < _packetsTransmitted) {
_packetsRetransmitted += packets;
} else {
_packetRetransmissionRate = (int)((float)(0.9f*_packetRetransmissionRate) + (float)(0.1f*_packetsRetransmitted));
_retransmissionPeriodStart = _packetsTransmitted;
_packetsRetransmitted = packets;
}
congestionOccurred();
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
//_rto *= 2;
}
public void packetsTransmitted(int packets) {
_packetsTransmitted += packets;
if (_retransmissionPeriodStart + RETRANSMISSION_PERIOD_WIDTH > _packetsTransmitted) {
_packetRetransmissionRate = (int)((float)(0.9f*_packetRetransmissionRate) + (float)(0.1f*_packetsRetransmitted));
_retransmissionPeriodStart = _packetsTransmitted;
_packetsRetransmitted = 0;
}
}
/** how long does it usually take to get a message ACKed? */
public int getRTT() { return _rtt; }
/** how soon should we retransmit an unacked packet? */
@ -519,6 +619,13 @@ public class PeerState {
public long getMessagesSent() { return _messagesSent; }
public long getMessagesReceived() { return _messagesReceived; }
public long getPacketsTransmitted() { return _packetsTransmitted; }
public long getPacketsRetransmitted() { return _packetsRetransmitted; }
/** avg number of packets retransmitted for every 100 packets */
public long getPacketRetransmissionRate() { return _packetRetransmissionRate; }
public long getPacketsReceived() { return _packetsReceived; }
public long getPacketsReceivedDuplicate() { return _packetsReceivedDuplicate; }
public void packetReceived() { _packetsReceived++; }
/**
* we received a backoff request, so cut our send window
@ -538,13 +645,13 @@ public class PeerState {
public void setLastACKSend(long when) { _lastACKSend = when; }
public long getWantedACKSendSince() { return _wantACKSendSince; }
public boolean unsentACKThresholdReached() {
int threshold = countMaxACKs();
int threshold = countMaxACKData() / 4;
synchronized (_currentACKs) {
return _currentACKs.size() >= threshold;
}
}
private int countMaxACKs() {
return (_mtu
private int countMaxACKData() {
return _mtu
- OutboundMessageFragments.IP_HEADER_SIZE
- OutboundMessageFragments.UDP_HEADER_SIZE
- UDPPacket.IV_SIZE
@ -553,25 +660,19 @@ public class PeerState {
- 4 // timestamp
- 1 // data flag
- 1 // # ACKs
- 16 // padding safety
) / 4;
- 16; // padding safety
}
public String getRemoteHostString() { return _remoteHostString; }
public static String calculateRemoteHostString(byte ip[], int port) {
StringBuffer buf = new StringBuffer(ip.length * 4 + 5);
for (int i = 0; i < ip.length; i++)
buf.append(ip[i]&0xFF).append('.');
buf.append(port);
return buf.toString();
private int minRTO() {
if (_packetRetransmissionRate < 10)
return MIN_RTO;
else if (_packetRetransmissionRate < 50)
return 2*MIN_RTO;
else
return MAX_RTO;
}
public static String calculateRemoteHostString(UDPPacket packet) {
InetAddress remAddr = packet.getPacket().getAddress();
int remPort = packet.getPacket().getPort();
return calculateRemoteHostString(remAddr.getAddress(), remPort);
}
public RemoteHostId getRemoteHostId() { return _remoteHostId; }
public int hashCode() {
if (_remotePeer != null)
@ -594,7 +695,7 @@ public class PeerState {
public String toString() {
StringBuffer buf = new StringBuffer(64);
buf.append(_remoteHostString);
buf.append(_remoteHostId.toString());
if (_remotePeer != null)
buf.append(" ").append(_remotePeer.toBase64().substring(0,6));
return buf.toString();

View File

@ -0,0 +1,46 @@
package net.i2p.router.transport.udp;
import net.i2p.data.DataHelper;
/**
* Unique ID for a peer - its IP + port, all bundled into a tidy obj.
* Aint it cute?
*
*/
final class RemoteHostId {
private byte _ip[];
private int _port;
public RemoteHostId(byte ip[], int port) {
_ip = ip;
_port = port;
}
public byte[] getIP() { return _ip; }
public int getPort() { return _port; }
public int hashCode() {
int rv = 0;
for (int i = 0; i < _ip.length; i++)
rv += _ip[i] << i;
rv += _port;
return rv;
}
public boolean equals(Object obj) {
if (obj == null)
throw new NullPointerException("obj is null");
if (!(obj instanceof RemoteHostId))
throw new ClassCastException("obj is a " + obj.getClass().getName());
RemoteHostId id = (RemoteHostId)obj;
return (_port == id.getPort()) && DataHelper.eq(_ip, id.getIP());
}
public String toString() {
StringBuffer buf = new StringBuffer(_ip.length + 5);
for (int i = 0; i < _ip.length; i++)
buf.append(_ip[i]&0xFF).append('.');
buf.append(_port);
return buf.toString();
}
}

View File

@ -2,6 +2,7 @@ package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -74,7 +75,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound
_alive = true;
_nextLock = this;
_nextQueue = 0;
_chokedPeers = new HashSet(16);
_chokedPeers = Collections.synchronizedSet(new HashSet(16));
_listener = lsnr;
_context.statManager().createRateStat("udp.timeToEntrance", "Message lifetime until it reaches the UDP system", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.messageQueueSize", "How many messages are on the current class queue at removal", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -123,10 +124,9 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound
for (int j = 0; j < _queue[currentQueue].size(); j++) {
OutNetMessage msg = (OutNetMessage)_queue[currentQueue].get(j);
Hash to = msg.getTarget().getIdentity().getHash();
synchronized (_nextLock) { // yikes!
if (_chokedPeers.contains(to))
continue;
}
if (_chokedPeers.contains(to))
continue;
// not choked, lets push it to active
_queue[currentQueue].remove(j);
@ -189,22 +189,20 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound
public void choke(Hash peer) {
if (true) return;
_chokedPeers.add(peer);
synchronized (_nextLock) {
_chokedPeers.add(peer);
_nextLock.notifyAll();
}
}
public void unchoke(Hash peer) {
if (true) return;
_chokedPeers.remove(peer);
synchronized (_nextLock) {
_chokedPeers.remove(peer);
_nextLock.notifyAll();
}
}
public boolean isChoked(Hash peer) {
synchronized (_nextLock) {
return _chokedPeers.contains(peer);
}
return _chokedPeers.contains(peer);
}
private int pickQueue(OutNetMessage message) {

View File

@ -26,6 +26,8 @@ public class UDPEndpoint {
}
public void startup() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up the UDP endpoint");
shutdown();
try {
DatagramSocket socket = new DatagramSocket(_listenPort);

View File

@ -3,7 +3,12 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import net.i2p.data.ByteArray;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -16,20 +21,22 @@ public class UDPEndpointTest {
private Log _log;
private UDPEndpoint _endpoints[];
private boolean _beginTest;
private List _sentNotReceived;
public UDPEndpointTest(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(UDPEndpointTest.class);
_sentNotReceived = Collections.synchronizedList(new ArrayList(1000));
}
public void runTest(int numPeers) {
RouterContext ctx = new RouterContext(null);
_log.debug("Run test("+numPeers+")");
try {
_endpoints = new UDPEndpoint[numPeers];
int base = 2000 + ctx.random().nextInt(10000);
int base = 2000 + _context.random().nextInt(10000);
for (int i = 0; i < numPeers; i++) {
_log.debug("Building " + i);
UDPEndpoint endpoint = new UDPEndpoint(ctx, base + i);
UDPEndpoint endpoint = new UDPEndpoint(_context, base + i);
_endpoints[i] = endpoint;
endpoint.startup();
I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i);
@ -62,11 +69,20 @@ public class UDPEndpointTest {
int received = 0;
while (true) {
UDPPacket packet = _endpoint.receive();
received++;
if (received == 10000) {
long time = System.currentTimeMillis() - start;
_log.debug("Received 10000 in " + time);
ByteArray ba = new ByteArray(packet.getPacket().getData(), 0, packet.getPacket().getLength());
boolean removed = _sentNotReceived.remove(ba);
int outstanding = _sentNotReceived.size();
if (!removed) {
_log.error("Received a packet that we weren't expecting: " + packet);
} else {
_log.debug("Received an expected packet (" + received + ") with outstanding: " + outstanding);
received++;
}
if ((received % 10000) == 0) {
long time = System.currentTimeMillis() - start;
_log.debug("Received "+received+" in " + time);
}
packet.release();
}
}
}
@ -78,8 +94,9 @@ public class UDPEndpointTest {
}
public void run() {
while (!_beginTest) {
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
try { Thread.sleep(2000); } catch (InterruptedException ie) {}
}
try { Thread.sleep(2000); } catch (InterruptedException ie) {}
_log.debug("Beginning to write");
for (int curPacket = 0; curPacket < 10000; curPacket++) {
byte data[] = new byte[1024];
@ -91,11 +108,16 @@ public class UDPEndpointTest {
curPeer = 0;
short priority = 1;
long expiration = -1;
UDPPacket packet = UDPPacket.acquire(_context);
try {
UDPPacket packet = UDPPacket.acquire(_context);
packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort());
packet.writeData(data, 0, 1024);
packet.setPacketDataLength(1024);
int outstanding = _sentNotReceived.size() + 1;
_sentNotReceived.add(new ByteArray(data, 0, 1024));
_log.debug("Sending packet " + curPacket + " with outstanding " + outstanding);
_endpoint.send(packet);
//try { Thread.sleep(10); } catch (InterruptedException ie) {}
} catch (UnknownHostException uhe) {
_log.error("foo!", uhe);
}
@ -103,11 +125,18 @@ public class UDPEndpointTest {
// _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort());
//}
}
try { Thread.sleep(10*1000); } catch (InterruptedException e) {}
System.exit(0);
}
}
public static void main(String args[]) {
UDPEndpointTest test = new UDPEndpointTest(new RouterContext(null));
try { System.out.println("Current dir: " + new java.io.File(".").getCanonicalPath()); } catch (Exception e) {}
new java.io.File("udpEndpointTest.stats").delete();
Properties props = new Properties();
props.setProperty("stat.logFile", "udpEndpointTest.stats");
props.setProperty("stat.logFilters", "*");
UDPEndpointTest test = new UDPEndpointTest(new RouterContext(null, props));
test.runTest(2);
}
}

View File

@ -31,13 +31,15 @@ class UDPFlooder implements Runnable {
public void addPeer(PeerState peer) {
synchronized (_peers) {
_peers.add(peer);
if (!_peers.contains(peer))
_peers.add(peer);
_peers.notifyAll();
}
}
public void removePeer(PeerState peer) {
synchronized (_peers) {
_peers.remove(peer);
while (_peers.remove(peer))
;// loops until its empty
_peers.notifyAll();
}
}

View File

@ -25,13 +25,17 @@ import net.i2p.util.Log;
public class UDPPacket {
private I2PAppContext _context;
private static Log _log;
private DatagramPacket _packet;
private short _priority;
private long _initializeTime;
private long _expiration;
private byte[] _data;
private ByteArray _dataBuf;
private int _markedType;
private volatile DatagramPacket _packet;
private volatile int _packetDataLength;
private volatile short _priority;
private volatile long _initializeTime;
private volatile long _expiration;
private volatile byte[] _data;
private volatile ByteArray _dataBuf;
private volatile int _markedType;
private volatile boolean _released;
private volatile Exception _releasedBy;
private volatile Exception _acquiredBy;
private static final List _packetCache;
static {
@ -39,9 +43,9 @@ public class UDPPacket {
_log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class);
}
private static final boolean CACHE = false;
private static final boolean CACHE = false; // TODO: support caching to cut churn down a /lot/
private static final int MAX_PACKET_SIZE = 2048;
static final int MAX_PACKET_SIZE = 2048;
public static final int IV_SIZE = 16;
public static final int MAC_SIZE = 16;
@ -55,13 +59,14 @@ public class UDPPacket {
// various flag fields for use in the data packets
public static final byte DATA_FLAG_EXPLICIT_ACK = (byte)(1 << 7);
public static final byte DATA_FLAG_EXPLICIT_NACK = (1 << 6);
public static final byte DATA_FLAG_NUMACKS = (1 << 5);
public static final byte DATA_FLAG_ACK_BITFIELDS = (1 << 6);
public static final byte DATA_FLAG_ECN = (1 << 4);
public static final byte DATA_FLAG_WANT_ACKS = (1 << 3);
public static final byte DATA_FLAG_WANT_REPLY = (1 << 2);
public static final byte DATA_FLAG_EXTENDED = (1 << 1);
public static final byte BITFIELD_CONTINUATION = (byte)(1 << 7);
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
private static final ByteCache _validateCache = ByteCache.getInstance(16, MAX_VALIDATE_SIZE);
private static final ByteCache _ivCache = ByteCache.getInstance(16, IV_SIZE);
@ -72,34 +77,46 @@ public class UDPPacket {
_dataBuf = _dataCache.acquire();
_data = _dataBuf.getData();
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
_packetDataLength = 0;
_initializeTime = _context.clock().now();
_markedType = -1;
}
public void initialize(short priority, long expiration, InetAddress host, int port) {
_priority = priority;
public void initialize(int priority, long expiration, InetAddress host, int port) {
_priority = (short)priority;
_expiration = expiration;
resetBegin();
Arrays.fill(_data, (byte)0x00);
_packet.setLength(0);
//_packet.setLength(0);
_packet.setAddress(host);
_packet.setPort(port);
_released = false;
_releasedBy = null;
}
public void writeData(byte src[], int offset, int len) {
verifyNotReleased();
System.arraycopy(src, offset, _data, 0, len);
_packet.setLength(len);
setPacketDataLength(len);
resetBegin();
}
public DatagramPacket getPacket() { return _packet; }
public short getPriority() { return _priority; }
public long getExpiration() { return _expiration; }
public long getBegin() { return _initializeTime; }
public long getLifetime() { return _context.clock().now() - _initializeTime; }
public DatagramPacket getPacket() { verifyNotReleased(); return _packet; }
public short getPriority() { verifyNotReleased(); return _priority; }
public long getExpiration() { verifyNotReleased(); return _expiration; }
public long getBegin() { verifyNotReleased(); return _initializeTime; }
public long getLifetime() { verifyNotReleased(); return _context.clock().now() - _initializeTime; }
public int getPacketDataLength() { return _packetDataLength; }
public void setPacketDataLength(int bytes) { _packetDataLength = bytes; }
public void resetBegin() { _initializeTime = _context.clock().now(); }
/** flag this packet as a particular type for accounting purposes */
public void markType(int type) { _markedType = type; }
public int getMarkedType() { return _markedType; }
public void markType(int type) { verifyNotReleased(); _markedType = type; }
/**
* flag this packet as a particular type for accounting purposes, with
* 1 implying the packet is an ACK, otherwise it is a data packet
*
*/
public int getMarkedType() { verifyNotReleased(); return _markedType; }
/**
* Validate the packet against the MAC specified, returning true if the
@ -107,13 +124,14 @@ public class UDPPacket {
*
*/
public boolean validate(SessionKey macKey) {
verifyNotReleased();
boolean eq = false;
ByteArray buf = _validateCache.acquire();
// validate by comparing _data[0:15] and
// HMAC(payload + IV + payloadLength, macKey)
int payloadLength = _packet.getLength() - MAC_SIZE - IV_SIZE;
int payloadLength = _packetDataLength /*_packet.getLength()*/ - MAC_SIZE - IV_SIZE;
if (payloadLength > 0) {
int off = 0;
System.arraycopy(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, buf.getData(), off, payloadLength);
@ -123,6 +141,8 @@ public class UDPPacket {
DataHelper.toLong(buf.getData(), off, 2, payloadLength);
off += 2;
eq = _context.hmac().verify(macKey, buf.getData(), 0, off, _data, _packet.getOffset(), MAC_SIZE);
/*
Hash hmac = _context.hmac().calculate(macKey, buf.getData(), 0, off);
if (_log.shouldLog(Log.DEBUG)) {
@ -139,6 +159,7 @@ public class UDPPacket {
_log.debug(str.toString());
}
eq = DataHelper.eq(hmac.getData(), 0, _data, _packet.getOffset(), MAC_SIZE);
*/
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Payload length is " + payloadLength);
@ -154,47 +175,83 @@ public class UDPPacket {
*
*/
public void decrypt(SessionKey cipherKey) {
verifyNotReleased();
ByteArray iv = _ivCache.acquire();
System.arraycopy(_data, MAC_SIZE, iv.getData(), 0, IV_SIZE);
_context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, iv.getData(), _packet.getLength() - MAC_SIZE - IV_SIZE);
int len = _packetDataLength; // _packet.getLength()
_context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, iv.getData(), len - MAC_SIZE - IV_SIZE);
_ivCache.release(iv);
}
public String toString() {
verifyNotReleased();
StringBuffer buf = new StringBuffer(64);
buf.append(_packet.getLength());
buf.append(_packetDataLength);
buf.append(" byte packet with ");
buf.append(_packet.getAddress().getHostAddress()).append(":");
buf.append(_packet.getPort());
buf.append(" id=").append(System.identityHashCode(this));
buf.append(" data=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength()));
return buf.toString();
}
public static UDPPacket acquire(I2PAppContext ctx) {
UDPPacket rv = null;
if (CACHE) {
synchronized (_packetCache) {
if (_packetCache.size() > 0) {
UDPPacket rv = (UDPPacket)_packetCache.remove(0);
rv._context = ctx;
rv._log = ctx.logManager().getLog(UDPPacket.class);
rv.resetBegin();
Arrays.fill(rv._data, (byte)0x00);
rv._markedType = -1;
return rv;
rv = (UDPPacket)_packetCache.remove(0);
}
}
/*
if (rv != null) {
rv._context = ctx;
//rv._log = ctx.logManager().getLog(UDPPacket.class);
rv.resetBegin();
Arrays.fill(rv._data, (byte)0x00);
rv._markedType = -1;
rv._dataBuf.setValid(0);
rv._released = false;
rv._releasedBy = null;
rv._acquiredBy = null;
rv.setPacketDataLength(0);
synchronized (rv._packet) {
//rv._packet.setLength(0);
//rv._packet.setPort(1);
}
}
*/
}
return new UDPPacket(ctx);
if (rv == null)
rv = new UDPPacket(ctx);
//if (rv._acquiredBy != null) {
// _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception());
// _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy);
//}
//rv._acquiredBy = new Exception("acquired on");
return rv;
}
public void release() {
_dataCache.release(_dataBuf);
verifyNotReleased();
_released = true;
//_releasedBy = new Exception("released by");
//_acquiredBy = null;
//_dataCache.release(_dataBuf);
if (!CACHE) return;
synchronized (_packetCache) {
_packet.setLength(0);
_packet.setPort(1);
if (_packetCache.size() <= 64)
if (_packetCache.size() <= 64) {
_packetCache.add(this);
}
}
}
private void verifyNotReleased() {
if (CACHE) return;
if (_released) {
_log.log(Log.CRIT, "Already released. current stack trace is:", new Exception());
_log.log(Log.CRIT, "Released by: ", _releasedBy);
_log.log(Log.CRIT, "Acquired by: ", _acquiredBy);
}
}
}

View File

@ -37,7 +37,7 @@ public class UDPPacketReader {
public void initialize(UDPPacket packet) {
int off = packet.getPacket().getOffset();
int len = packet.getPacket().getLength();
int len = packet.getPacketDataLength(); //packet.getPacket().getLength();
off += UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
len -= UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
initialize(packet.getPacket().getData(), off, len);
@ -234,11 +234,8 @@ public class UDPPacketReader {
public boolean readACKsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_EXPLICIT_ACK);
}
public boolean readNACKsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_EXPLICIT_NACK);
}
public boolean readNumACKsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_NUMACKS);
public boolean readACKBitfieldsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_ACK_BITFIELDS);
}
public boolean readECN() {
return flagSet(UDPPacket.DATA_FLAG_ECN);
@ -264,8 +261,8 @@ public class UDPPacketReader {
}
return rv;
}
public long[] readNACKs() {
if (!readNACKsIncluded()) return null;
public ACKBitfield[] readACKBitfields() {
if (!readACKBitfieldsIncluded()) return null;
int off = readBodyOffset() + 1;
if (readACKsIncluded()) {
int numACKs = (int)DataHelper.fromLong(_message, off, 1);
@ -273,31 +270,16 @@ public class UDPPacketReader {
off += 4 * numACKs;
}
int numNACKs = (int)DataHelper.fromLong(_message, off, 1);
int numBitfields = (int)DataHelper.fromLong(_message, off, 1);
off++;
long rv[] = new long[numNACKs];
for (int i = 0; i < numNACKs; i++) {
rv[i] = DataHelper.fromLong(_message, off, 4);
off += 4;
PacketACKBitfield rv[] = new PacketACKBitfield[numBitfields];
for (int i = 0; i < numBitfields; i++) {
rv[i] = new PacketACKBitfield(off);
off += rv[i].getByteLength();
}
return rv;
}
public int readNumACKs() {
if (!readNumACKsIncluded()) return -1;
int off = readBodyOffset() + 1;
if (readACKsIncluded()) {
int numACKs = (int)DataHelper.fromLong(_message, off, 1);
off++;
off += 4 * numACKs;
}
if (readNACKsIncluded()) {
int numNACKs = (int)DataHelper.fromLong(_message, off, 1);
off++;
off += 4 * numNACKs;
}
return (int)DataHelper.fromLong(_message, off, 2);
}
public int readFragmentCount() {
int off = readBodyOffset() + 1;
@ -306,13 +288,15 @@ public class UDPPacketReader {
off++;
off += 4 * numACKs;
}
if (readNACKsIncluded()) {
int numNACKs = (int)DataHelper.fromLong(_message, off, 1);
if (readACKBitfieldsIncluded()) {
int numBitfields = (int)DataHelper.fromLong(_message, off, 1);
off++;
off += 4 * numNACKs;
for (int i = 0; i < numBitfields; i++) {
PacketACKBitfield bf = new PacketACKBitfield(off);
off += bf.getByteLength();
}
}
if (readNumACKsIncluded())
off += 2;
if (readExtendedDataIncluded()) {
int size = (int)DataHelper.fromLong(_message, off, 1);
off++;
@ -328,24 +312,24 @@ public class UDPPacketReader {
public int readMessageFragmentNum(int fragmentNum) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
return (_message[off] & 0xFF) >>> 3;
return (_message[off] & 0xFF) >>> 1;
}
public boolean readMessageIsLast(int fragmentNum) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
return ((_message[off] & (1 << 2)) != 0);
return ((_message[off] & 1) != 0);
}
public int readMessageFragmentSize(int fragmentNum) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
off++; // fragment info
return (int)DataHelper.fromLong(_message, off, 2);
return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
}
public void readMessageFragment(int fragmentNum, byte target[], int targetOffset) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
off++; // fragment info
int size = (int)DataHelper.fromLong(_message, off, 2);
int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
off += 2;
System.arraycopy(_message, off, target, targetOffset, size);
}
@ -357,13 +341,16 @@ public class UDPPacketReader {
off++;
off += 4 * numACKs;
}
if (readNACKsIncluded()) {
int numNACKs = (int)DataHelper.fromLong(_message, off, 1);
if (readACKBitfieldsIncluded()) {
int numBitfields = (int)DataHelper.fromLong(_message, off, 1);
off++;
off += 5 * numNACKs;
PacketACKBitfield bf[] = new PacketACKBitfield[numBitfields];
for (int i = 0; i < numBitfields; i++) {
bf[i] = new PacketACKBitfield(off);
off += bf[i].getByteLength();
}
}
if (readNumACKsIncluded())
off += 2;
if (readExtendedDataIncluded()) {
int size = (int)DataHelper.fromLong(_message, off, 1);
off++;
@ -376,7 +363,7 @@ public class UDPPacketReader {
} else {
for (int i = 0; i < fragmentNum; i++) {
off += 5; // messageId+info
off += (int)DataHelper.fromLong(_message, off, 2);
off += ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
off += 2;
}
return off;
@ -405,21 +392,16 @@ public class UDPPacketReader {
off += 4;
}
}
if (readNACKsIncluded()) {
int numNACKs = (int)DataHelper.fromLong(_message, off, 1);
if (readACKBitfieldsIncluded()) {
int numBitfields = (int)DataHelper.fromLong(_message, off, 1);
off++;
buf.append("with NACKs for ");
for (int i = 0; i < numNACKs; i++) {
buf.append(DataHelper.fromLong(_message, off, 4)).append(' ');
off += 5;
buf.append("with partial ACKs for ");
for (int i = 0; i < numBitfields; i++) {
PacketACKBitfield bf = new PacketACKBitfield(off);
buf.append(bf.getMessageId()).append(' ');
off += bf.getByteLength();
}
off += 5 * numNACKs;
}
if (readNumACKsIncluded()) {
buf.append("with numACKs of ");
buf.append(DataHelper.fromLong(_message, off, 2));
buf.append(' ');
off += 2;
}
if (readExtendedDataIncluded()) {
int size = (int)DataHelper.fromLong(_message, off, 1);
@ -440,13 +422,13 @@ public class UDPPacketReader {
buf.append("containing messageId ");
buf.append(DataHelper.fromLong(_message, off, 4));
off += 4;
int fragNum = (_message[off] & 0XFF) >>> 3;
boolean isLast = (_message[off] & (1 << 2)) != 0;
int fragNum = (_message[off] & 0xFF) >>> 1;
boolean isLast = (_message[off] & 1) != 0;
off++;
buf.append(" frag# ").append(fragNum);
buf.append(" isLast? ").append(isLast);
buf.append(" info ").append((int)_message[off-1]);
int size = (int)DataHelper.fromLong(_message, off, 2);
int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
buf.append(" with ").append(size).append(" bytes");
buf.append(' ');
off += size;
@ -463,9 +445,50 @@ public class UDPPacketReader {
int off = getFragmentBegin(0); // first fragment
off += 4; // messageId
off++; // fragment info
int size = (int)DataHelper.fromLong(_message, off, 2);
int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
off += 2;
buf.append(Base64.encode(_message, off, size));
}
}
/**
* Helper class to fetch the particular bitfields from the raw packet
*/
private class PacketACKBitfield implements ACKBitfield {
private int _start;
private int _bitfieldStart;
private int _bitfieldSize;
public PacketACKBitfield(int start) {
_start = start;
_bitfieldStart = start + 4;
_bitfieldSize = 1;
// bitfield is an array of bytes where the high bit is 1 if
// further bytes in the bitfield follow
while ((_message[_bitfieldStart + _bitfieldSize - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0)
_bitfieldSize++;
}
public long getMessageId() { return DataHelper.fromLong(_message, _start, 4); }
public int getByteLength() { return 4 + _bitfieldSize; }
public int fragmentCount() { return _bitfieldSize * 7; }
public boolean receivedComplete() { return false; }
public boolean received(int fragmentNum) {
if ( (fragmentNum < 0) || (fragmentNum >= _bitfieldSize*7) )
return false;
// the fragment has been received if the bit is set
int byteNum = _bitfieldStart + (fragmentNum/7);
int flagNum = fragmentNum % 7;
return (_message[byteNum] & (1 << flagNum)) != 0x0;
}
public String toString() {
StringBuffer buf = new StringBuffer(64);
buf.append("Read partial ACK of ");
buf.append(getMessageId());
buf.append(" with ACKs for: ");
int numFrags = fragmentCount();
for (int i = 0; i < numFrags; i++)
if (received(i))
buf.append(i).append(" ");
return buf.toString();
}
}
}

View File

@ -8,6 +8,7 @@ import java.util.ArrayList;
import java.util.List;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.util.SimpleTimer;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -66,33 +67,50 @@ public class UDPReceiver {
/** if a packet been sitting in the queue for 2 seconds, drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 2*1000;
private static final float ARTIFICIAL_DROP_PROBABILITY = 0f; //0.02f;
private static final float ARTIFICIAL_DROP_PROBABILITY = 0.0f; // 0.02f; // 0.0f;
private void receive(UDPPacket packet) {
private static final int ARTIFICIAL_DELAY = 0; // 100;
private static final int ARTIFICIAL_DELAY_BASE = 0; //100;
private int receive(UDPPacket packet) {
if (ARTIFICIAL_DROP_PROBABILITY > 0) {
// the first check is to let the compiler optimize away this
// random block on the live system when the probability is == 0
if (_context.random().nextFloat() <= ARTIFICIAL_DROP_PROBABILITY)
return;
return -1;
}
if ( (ARTIFICIAL_DELAY > 0) || (ARTIFICIAL_DELAY_BASE > 0) ) {
SimpleTimer.getInstance().addEvent(new ArtificiallyDelayedReceive(packet), ARTIFICIAL_DELAY_BASE + _context.random().nextInt(ARTIFICIAL_DELAY));
}
return doReceive(packet);
}
private final int doReceive(UDPPacket packet) {
synchronized (_inboundQueue) {
int queueSize = _inboundQueue.size();
if (queueSize > 0) {
long headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime();
if (headPeriod > MAX_QUEUE_PERIOD) {
_context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound packet with " + queueSize + " queued for " + headPeriod);
if (_log.shouldLog(Log.ERROR))
_log.error("Dropping inbound packet with " + queueSize + " queued for " + headPeriod);
_inboundQueue.notifyAll();
return;
return queueSize;
}
}
_inboundQueue.add(packet);
_inboundQueue.notifyAll();
return queueSize + 1;
}
}
private class ArtificiallyDelayedReceive implements SimpleTimer.TimedEvent {
private UDPPacket _packet;
public ArtificiallyDelayedReceive(UDPPacket packet) { _packet = packet; }
public void timeReached() { doReceive(_packet); }
}
/**
* Blocking call to retrieve the next inbound packet, or null if we have
* shut down.
@ -100,15 +118,17 @@ public class UDPReceiver {
*/
public UDPPacket receiveNext() {
while (_keepRunning) {
synchronized (_inboundQueue) {
if (_inboundQueue.size() <= 0) {
try {
try {
synchronized (_inboundQueue) {
if (_inboundQueue.size() > 0) {
UDPPacket rv = (UDPPacket)_inboundQueue.remove(0);
_inboundQueue.notifyAll();
return rv;
} else {
_inboundQueue.wait();
} catch (InterruptedException ie) {}
}
}
if (_inboundQueue.size() > 0)
return (UDPPacket)_inboundQueue.remove(0);
}
} catch (InterruptedException ie) {}
}
return null;
}
@ -125,17 +145,23 @@ public class UDPReceiver {
UDPPacket packet = UDPPacket.acquire(_context);
// block before we read...
if (_log.shouldLog(Log.DEBUG))
_log.debug("Before throttling receive");
while (!_context.throttle().acceptNetworkMessage())
try { Thread.sleep(10); } catch (InterruptedException ie) {}
try {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Before blocking socket.receive");
synchronized (Runner.this) {
_socket.receive(packet.getPacket());
}
int size = packet.getPacket().getLength();
if (_log.shouldLog(Log.DEBUG))
_log.debug("After blocking socket.receive: packet is " + size + " bytes!");
packet.setPacketDataLength(size);
packet.resetBegin();
_context.statManager().addRateData("udp.receivePacketSize", size, 0);
// and block after we know how much we read but before
// we release the packet to the inbound queue
if (size > 0) {
@ -144,7 +170,8 @@ public class UDPReceiver {
req.waitForNextAllocation();
}
receive(packet);
int queued = receive(packet);
_context.statManager().addRateData("udp.receivePacketSize", size, queued);
} catch (IOException ioe) {
if (_socketChanged) {
if (_log.shouldLog(Log.INFO))
@ -156,6 +183,8 @@ public class UDPReceiver {
packet.release();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stop receiving...");
}
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {

View File

@ -43,6 +43,8 @@ public class UDPSender {
}
public void startup() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting the runner: " + _name);
_keepRunning = true;
I2PThread t = new I2PThread(_runner, _name);
t.setDaemon(true);
@ -72,10 +74,12 @@ public class UDPSender {
public int add(UDPPacket packet, int blockTime) {
long expiration = _context.clock().now() + blockTime;
int remaining = -1;
long lifetime = -1;
while ( (_keepRunning) && (remaining < 0) ) {
try {
synchronized (_outboundQueue) {
if (_outboundQueue.size() < MAX_QUEUED) {
lifetime = packet.getLifetime();
_outboundQueue.add(packet);
remaining = _outboundQueue.size();
_outboundQueue.notifyAll();
@ -91,7 +95,9 @@ public class UDPSender {
}
} catch (InterruptedException ie) {}
}
_context.statManager().addRateData("udp.sendQueueSize", remaining, packet.getLifetime());
_context.statManager().addRateData("udp.sendQueueSize", remaining, lifetime);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime);
return remaining;
}
@ -101,18 +107,24 @@ public class UDPSender {
*/
public int add(UDPPacket packet) {
int size = 0;
long lifetime = -1;
synchronized (_outboundQueue) {
lifetime = packet.getLifetime();
_outboundQueue.add(packet);
size = _outboundQueue.size();
_outboundQueue.notifyAll();
}
_context.statManager().addRateData("udp.sendQueueSize", size, packet.getLifetime());
_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + lifetime);
return size;
}
private class Runner implements Runnable {
private boolean _socketChanged;
public void run() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running the UDP sender");
_socketChanged = false;
while (_keepRunning) {
if (_socketChanged) {
@ -122,8 +134,11 @@ public class UDPSender {
UDPPacket packet = getNextPacket();
if (packet != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet to send known: " + packet);
long acquireTime = _context.clock().now();
int size = packet.getPacket().getLength();
int size = packet.getPacketDataLength(); // packet.getPacket().getLength();
int size2 = packet.getPacket().getLength();
if (size > 0) {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");
while (req.getPendingOutboundRequested() > 0)
@ -133,17 +148,23 @@ public class UDPSender {
long afterBW = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) {
int len = packet.getPacket().getLength();
//if (len > 128)
// len = 128;
_log.debug("Sending packet: \nraw: " + Base64.encode(packet.getPacket().getData(), 0, len));
//_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size));
}
//packet.getPacket().setLength(size);
try {
long before = _context.clock().now();
synchronized (Runner.this) {
// synchronization lets us update safely
_socket.send(packet.getPacket());
//_log.debug("Break out datagram for " + packet);
DatagramPacket dp = packet.getPacket();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Just before socket.send of " + packet);
_socket.send(dp);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Just after socket.send of " + packet);
}
long sendTime = _context.clock().now() - before;
_context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime());
@ -151,7 +172,7 @@ public class UDPSender {
if (packet.getMarkedType() == 1)
_context.statManager().addRateData("udp.sendACKTime", afterBW - acquireTime, packet.getLifetime());
_context.statManager().addRateData("udp.pushTime", packet.getLifetime(), packet.getLifetime());
_context.statManager().addRateData("udp.sendPacketSize", packet.getPacket().getLength(), packet.getLifetime());
_context.statManager().addRateData("udp.sendPacketSize", size, packet.getLifetime());
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error sending", ioe);
@ -161,6 +182,8 @@ public class UDPSender {
packet.release();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stop sending...");
}
private UDPPacket getNextPacket() {

View File

@ -40,7 +40,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private UDPEndpoint _endpoint;
/** Peer (Hash) to PeerState */
private Map _peersByIdent;
/** Remote host (ip+port as a string) to PeerState */
/** RemoteHostId to PeerState */
private Map _peersByRemoteHost;
/** Relay tag (base64 String) to PeerState */
private Map _peersByRelayTag;
@ -98,7 +98,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */
private static final boolean SHOULD_FLOOD_PEERS = false;
private static final boolean SHOULD_FLOOD_PEERS = true;
private static final int MAX_CONSECUTIVE_FAILED = 5;
@ -270,7 +270,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* if no state exists
*/
public PeerState getPeerState(InetAddress remoteHost, int remotePort) {
String hostInfo = PeerState.calculateRemoteHostString(remoteHost.getAddress(), remotePort);
RemoteHostId hostInfo = new RemoteHostId(remoteHost.getAddress(), remotePort);
synchronized (_peersByRemoteHost) {
return (PeerState)_peersByRemoteHost.get(hostInfo);
}
@ -316,11 +316,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
String remoteString = peer.getRemoteHostString();
if (remoteString == null) return false;
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId == null) return false;
synchronized (_peersByRemoteHost) {
PeerState oldPeer = (PeerState)_peersByRemoteHost.put(remoteString, peer);
PeerState oldPeer = (PeerState)_peersByRemoteHost.put(remoteId, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
//_peersByRemoteHost.put(remoteString, oldPeer);
//return false;
@ -348,10 +348,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
String remoteString = peer.getRemoteHostString();
if (remoteString != null) {
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId != null) {
synchronized (_peersByRemoteHost) {
_peersByRemoteHost.remove(remoteString);
_peersByRemoteHost.remove(remoteId);
}
}
@ -568,6 +568,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(" <td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
buf.append(" <td><b>rtt</b></td><td><b>dev</b></td><td><b>rto</b></td>\n");
buf.append(" <td><b>send</b></td><td><b>recv</b></td>\n");
buf.append(" <td><b>resent</b></td><td><b>dupRecv</b></td>\n");
buf.append(" </tr>\n");
out.write(buf.toString());
buf.setLength(0);
@ -640,11 +641,22 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("</td>");
buf.append("<td>");
buf.append(peer.getMessagesSent());
buf.append(peer.getPacketsTransmitted());
buf.append("</td>");
buf.append("<td>");
buf.append(peer.getMessagesReceived());
buf.append(peer.getPacketsReceived());
buf.append("</td>");
double sendLostPct = (double)peer.getPacketsRetransmitted()/(double)PeerState.RETRANSMISSION_PERIOD_WIDTH;
buf.append("<td>");
//buf.append(formatPct(sendLostPct));
buf.append(peer.getPacketRetransmissionRate());
buf.append("</td>");
double recvDupPct = (double)peer.getPacketsReceivedDuplicate()/(double)peer.getPacketsReceived();
buf.append("<td>");
buf.append(formatPct(recvDupPct));
buf.append("</td>");
buf.append("</tr>");
@ -655,12 +667,29 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
out.write("</table>\n");
}
public PartialACKSource getPartialACKSource() { return _inboundFragments; }
/** help us grab partial ACKs */
public interface PartialACKSource {
/**
* build partial ACKs of messages received from the peer and store
* them in the ackBitfields
*/
public void fetchPartialACKs(Hash fromPeer, List ackBitfields);
}
private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00");
private static final String formatKBps(int bps) {
synchronized (_fmt) {
return _fmt.format((float)bps/1024);
}
}
private static final DecimalFormat _pctFmt = new DecimalFormat("#0.0%");
private static final String formatPct(double pct) {
synchronized (_pctFmt) {
return _pctFmt.format(pct);
}
}
/**
* Cache the bid to reduce object churn

View File

@ -1,6 +1,9 @@
package net.i2p.router.tunnel;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.util.ByteCache;
import net.i2p.util.DecayingBloomFilter;
/**
@ -11,6 +14,7 @@ import net.i2p.util.DecayingBloomFilter;
public class BloomFilterIVValidator implements IVValidator {
private I2PAppContext _context;
private DecayingBloomFilter _filter;
private ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH);
/**
* After 2*halflife, an entry is completely forgotten from the bloom filter.
@ -24,11 +28,13 @@ public class BloomFilterIVValidator implements IVValidator {
_filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16);
ctx.statManager().createRateStat("tunnel.duplicateIV", "Note that a duplicate IV was received", "Tunnels",
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
}
public boolean receiveIV(byte[] iv) {
boolean dup = _filter.add(iv);
public boolean receiveIV(byte ivData[], int ivOffset, byte payload[], int payloadOffset) {
ByteArray buf = _ivXorCache.acquire();
DataHelper.xor(ivData, ivOffset, payload, payloadOffset, buf.getData(), 0, HopProcessor.IV_LENGTH);
boolean dup = _filter.add(buf.getData());
_ivXorCache.release(buf);
if (dup) _context.statManager().addRateData("tunnel.duplicateIV", 1, 1);
return !dup; // return true if it is OK, false if it isn't
}

View File

@ -8,5 +8,6 @@ class DummyValidator implements IVValidator {
public static DummyValidator getInstance() { return _instance; }
private DummyValidator() {}
public boolean receiveIV(byte[] iv) { return true; }
public boolean receiveIV(byte ivData[], int ivOffset, byte payload[], int payloadOffset) { return true; }
}

View File

@ -143,7 +143,8 @@ public class FragmentHandler {
boolean eq = DataHelper.eq(v.getData(), 0, preprocessed, offset + HopProcessor.IV_LENGTH, 4);
if (!eq) {
if (_log.shouldLog(Log.ERROR))
_log.error("Corrupt tunnel message - verification fails");
_log.error("Corrupt tunnel message - verification fails: \n" + Base64.encode(preprocessed, offset+HopProcessor.IV_LENGTH, 4)
+ "\n" + Base64.encode(v.getData(), 0, 4));
if (_log.shouldLog(Log.WARN))
_log.warn("nomatching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1) + "\n"
+ " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd

View File

@ -2,6 +2,7 @@ package net.i2p.router.tunnel;
import java.util.HashSet;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
/**
* waste lots of RAM
@ -12,9 +13,12 @@ class HashSetIVValidator implements IVValidator {
public HashSetIVValidator() {
_received = new HashSet();
}
public boolean receiveIV(byte[] iv) {
public boolean receiveIV(byte ivData[], int ivOffset, byte payload[], int payloadOffset) {
//if (true) // foo!
// return true;
byte iv[] = new byte[HopProcessor.IV_LENGTH];
DataHelper.xor(ivData, ivOffset, payload, payloadOffset, iv, 0, HopProcessor.IV_LENGTH);
ByteArray ba = new ByteArray(iv);
boolean isNew = false;
synchronized (_received) {

View File

@ -66,10 +66,7 @@ public class HopProcessor {
}
}
ByteArray ba = _cache.acquire();
byte iv[] = ba.getData(); // new byte[IV_LENGTH];
System.arraycopy(orig, offset, iv, 0, IV_LENGTH);
boolean okIV = _validator.receiveIV(iv);
boolean okIV = _validator.receiveIV(orig, offset, orig, offset + IV_LENGTH);
if (!okIV) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid IV received on tunnel " + _config.getReceiveTunnelId());
@ -88,7 +85,6 @@ public class HopProcessor {
//_log.debug("Data after processing: " + Base64.encode(orig, IV_LENGTH, orig.length - IV_LENGTH));
//_log.debug("IV sent: " + Base64.encode(orig, 0, IV_LENGTH));
}
_cache.release(ba);
return true;
}

View File

@ -8,9 +8,14 @@ package net.i2p.router.tunnel;
*/
public interface IVValidator {
/**
* receive the IV for the tunnel, returning true if it is valid,
* or false if it has already been used (or is otherwise invalid).
* receive the IV for the tunnel message, returning true if it is valid,
* or false if it has already been used (or is otherwise invalid). To
* prevent colluding attackers from successfully tagging the tunnel by
* switching the IV and the first block of the message, the validator should
* treat the XOR of the IV and the first block as the unique identifier,
* not the IV alone (since the tunnel is encrypted via AES/CBC). Thanks to
* dvorak for pointing out that tagging!
*
*/
public boolean receiveIV(byte iv[]);
public boolean receiveIV(byte iv[], int ivOffset, byte payload[], int payloadOffset);
}

View File

@ -58,7 +58,7 @@ public class InboundEndpointProcessor {
//if (_config.getLength() > 1)
// _log.debug("IV at inbound endpoint before decrypt: " + Base64.encode(iv));
boolean ok = _validator.receiveIV(iv);
boolean ok = _validator.receiveIV(iv, 0, orig, offset + HopProcessor.IV_LENGTH);
if (!ok) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid IV received");

View File

@ -356,8 +356,7 @@ public class TunnelDispatcher implements Service {
if (_log.shouldLog(Log.DEBUG))
_log.debug("dispatch to participant " + participant + ": " + msg.getUniqueId() + " from "
+ recvFrom.toBase64().substring(0,4));
_context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel "
+ msg.getTunnelId().getTunnelId() + " as participant");
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId().getTunnelId(), "participant");
participant.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
} else {
@ -370,8 +369,7 @@ public class TunnelDispatcher implements Service {
if (_log.shouldLog(Log.DEBUG))
_log.debug("dispatch where we are the outbound endpoint: " + endpoint + ": "
+ msg + " from " + recvFrom.toBase64().substring(0,4));
_context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel "
+ msg.getTunnelId().getTunnelId() + " as outbound endpoint");
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId().getTunnelId(), "outbound endpoint");
endpoint.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0);
@ -481,9 +479,9 @@ public class TunnelDispatcher implements Service {
+ (before-msg.getMessageExpiration()) + "ms ago? "
+ msg, new Exception("cause"));
}
_context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel "
+ outboundTunnel + "/" + targetTunnel + " to "
+ targetPeer + " as outbound gateway");
long tid1 = (outboundTunnel != null ? outboundTunnel.getTunnelId() : -1);
long tid2 = (targetTunnel != null ? targetTunnel.getTunnelId() : -1);
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), tid1, tid2, targetPeer, "outbound gateway");
gw.add(msg, targetPeer, targetTunnel);
if (targetTunnel == null)
_context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1, 0);