2006-02-20 jrandom

* Major SSU and router tuning to reduce contention, memory usage, and GC
      churn.  There are still issues to be worked out, but this should be a
      substantial improvement.
    * Modified the optional netDb harvester task to support choosing whether
      to use (non-anonymous) direct connections or (anonymous) exploratory
      tunnels to do the harvesting.  Harvesting itself is enabled via the
      advanced config "netDb.shouldHarvest=true" (default is false) and the
      connection type can be chosen via "netDb.harvestDirectly=false" (default
      is false).
This commit is contained in:
jrandom
2006-02-20 14:19:52 +00:00
committed by zzz
parent 222af6c090
commit 4b77ddedcc
28 changed files with 473 additions and 322 deletions

View File

@ -250,6 +250,10 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
+ from + " and " + to); + from + " and " + to);
} }
// boo, hiss! shouldn't need this - the streaming lib should be configurable, but
// somehow the inactivity timer is sometimes failing to get triggered properly
//i2ps.setReadTimeout(2*60*1000);
ByteArray ba = _cache.acquire(); ByteArray ba = _cache.acquire();
byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE]; byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE];
try { try {

View File

@ -71,7 +71,9 @@ public class ReseedHandler {
seedURL = DEFAULT_SEED_URL; seedURL = DEFAULT_SEED_URL;
try { try {
URL dir = new URL(seedURL); URL dir = new URL(seedURL);
String content = new String(readURL(dir)); byte contentRaw[] = readURL(dir);
if (contentRaw == null) return;
String content = new String(contentRaw);
Set urls = new HashSet(); Set urls = new HashSet();
int cur = 0; int cur = 0;
while (true) { while (true) {

View File

@ -140,6 +140,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
loadConfig(options); loadConfig(options);
_sessionId = null; _sessionId = null;
_leaseSet = null; _leaseSet = null;
_context.statManager().createRateStat("client.availableMessages", "How many messages are available for the current client", "ClientMessages", new long[] { 60*1000, 10*60*1000 });
} }
/** /**
@ -299,11 +300,17 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* *
*/ */
public byte[] receiveMessage(int msgId) throws I2PSessionException { public byte[] receiveMessage(int msgId) throws I2PSessionException {
int remaining = 0;
MessagePayloadMessage msg = null; MessagePayloadMessage msg = null;
synchronized (_availableMessages) { synchronized (_availableMessages) {
msg = (MessagePayloadMessage) _availableMessages.remove(new Long(msgId)); msg = (MessagePayloadMessage) _availableMessages.remove(new Long(msgId));
remaining = _availableMessages.size();
}
_context.statManager().addRateData("client.availableMessages", remaining, 0);
if (msg == null) {
_log.error("Receive message " + msgId + " had no matches, remaining=" + remaining);
return null;
} }
if (msg == null) return null;
return msg.getPayload().getUnencryptedData(); return msg.getPayload().getUnencryptedData();
} }
@ -339,9 +346,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* Recieve a payload message and let the app know its available * Recieve a payload message and let the app know its available
*/ */
public void addNewMessage(MessagePayloadMessage msg) { public void addNewMessage(MessagePayloadMessage msg) {
Long mid = new Long(msg.getMessageId());
int avail = 0;
synchronized (_availableMessages) { synchronized (_availableMessages) {
_availableMessages.put(new Long(msg.getMessageId()), msg); _availableMessages.put(mid, msg);
avail = _availableMessages.size();
} }
_context.statManager().addRateData("client.availableMessages", avail, 0);
long id = msg.getMessageId(); long id = msg.getMessageId();
byte data[] = msg.getPayload().getUnencryptedData(); byte data[] = msg.getPayload().getUnencryptedData();
if ((data == null) || (data.length <= 0)) { if ((data == null) || (data.length <= 0)) {
@ -354,20 +365,23 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id); _log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id);
} }
SimpleTimer.getInstance().addEvent(new VerifyUsage(id), 30*1000); SimpleTimer.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
} }
private class VerifyUsage implements SimpleTimer.TimedEvent { private class VerifyUsage implements SimpleTimer.TimedEvent {
private long _msgId; private Long _msgId;
public VerifyUsage(long id) { _msgId = id; } public VerifyUsage(Long id) { _msgId = id; }
public void timeReached() { public void timeReached() {
MessagePayloadMessage removed = null; MessagePayloadMessage removed = null;
int remaining = 0;
synchronized (_availableMessages) { synchronized (_availableMessages) {
removed = (MessagePayloadMessage)_availableMessages.remove(new Long(_msgId)); removed = (MessagePayloadMessage)_availableMessages.remove(_msgId);
remaining = _availableMessages.size();
}
if (removed != null) {
_log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed + ": remaining: " + remaining);
_context.statManager().addRateData("client.availableMessages", remaining, 0);
} }
if (removed != null)
_log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed);
} }
} }
private class AvailabilityNotifier implements Runnable { private class AvailabilityNotifier implements Runnable {

View File

@ -283,11 +283,12 @@ public class ElGamalAESEngine {
try { try {
SessionKey newKey = null; SessionKey newKey = null;
Hash readHash = null; Hash readHash = null;
List tags = new ArrayList(); List tags = null;
//ByteArrayInputStream bais = new ByteArrayInputStream(decrypted); //ByteArrayInputStream bais = new ByteArrayInputStream(decrypted);
int cur = 0; int cur = 0;
long numTags = DataHelper.fromLong(decrypted, cur, 2); long numTags = DataHelper.fromLong(decrypted, cur, 2);
if (numTags > 0) tags = new ArrayList((int)numTags);
cur += 2; cur += 2;
//_log.debug("# tags: " + numTags); //_log.debug("# tags: " + numTags);
if ((numTags < 0) || (numTags > 200)) throw new Exception("Invalid number of session tags"); if ((numTags < 0) || (numTags > 200)) throw new Exception("Invalid number of session tags");
@ -326,7 +327,8 @@ public class ElGamalAESEngine {
if (eq) { if (eq) {
// everything matches. w00t. // everything matches. w00t.
foundTags.addAll(tags); if (tags != null)
foundTags.addAll(tags);
if (newKey != null) foundKey.setData(newKey.getData()); if (newKey != null) foundKey.setData(newKey.getData());
return unencrData; return unencrData;
} }
@ -610,4 +612,4 @@ public class ElGamalAESEngine {
} }
} }
} }
} }

View File

@ -129,6 +129,7 @@ public class ElGamalEngine {
(ybytes.length > 257 ? 257 : ybytes.length)); (ybytes.length > 257 ? 257 : ybytes.length));
System.arraycopy(dbytes, 0, out, (dbytes.length < 257 ? 514 - dbytes.length : 257), System.arraycopy(dbytes, 0, out, (dbytes.length < 257 ? 514 - dbytes.length : 257),
(dbytes.length > 257 ? 257 : dbytes.length)); (dbytes.length > 257 ? 257 : dbytes.length));
/*
StringBuffer buf = new StringBuffer(1024); StringBuffer buf = new StringBuffer(1024);
buf.append("Timing\n"); buf.append("Timing\n");
buf.append("0-1: ").append(t1 - t0).append('\n'); buf.append("0-1: ").append(t1 - t0).append('\n');
@ -142,6 +143,7 @@ public class ElGamalEngine {
buf.append("8-9: ").append(t9 - t8).append('\n'); buf.append("8-9: ").append(t9 - t8).append('\n');
buf.append("9-10: ").append(t10 - t9).append('\n'); buf.append("9-10: ").append(t10 - t9).append('\n');
//_log.debug(buf.toString()); //_log.debug(buf.toString());
*/
long end = _context.clock().now(); long end = _context.clock().now();
long diff = end - start; long diff = end - start;

View File

@ -151,15 +151,17 @@ public class I2CPMessageReader {
_log.debug("After handling the newly received message"); _log.debug("After handling the newly received message");
} }
} catch (I2CPMessageException ime) { } catch (I2CPMessageException ime) {
_log.error("Error handling message", ime); _log.warn("Error handling message", ime);
_listener.readError(I2CPMessageReader.this, ime); _listener.readError(I2CPMessageReader.this, ime);
cancelRunner(); cancelRunner();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("IO Error handling message", ioe); _log.warn("IO Error handling message", ioe);
_listener.disconnected(I2CPMessageReader.this); _listener.disconnected(I2CPMessageReader.this);
cancelRunner(); cancelRunner();
} catch (Throwable t) { } catch (OutOfMemoryError oom) {
_log.log(Log.CRIT, "Unhandled error reading I2CP stream", t); throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Unhandled error reading I2CP stream", e);
_listener.disconnected(I2CPMessageReader.this); _listener.disconnected(I2CPMessageReader.this);
cancelRunner(); cancelRunner();
} }

View File

@ -45,6 +45,10 @@ public class SimpleTimer {
} }
} }
public void reschedule(TimedEvent event, long timeoutMs) {
addEvent(event, timeoutMs, false);
}
/** /**
* Queue up the given event to be fired no sooner than timeoutMs from now. * Queue up the given event to be fired no sooner than timeoutMs from now.
* However, if this event is already scheduled, the event will be scheduled * However, if this event is already scheduled, the event will be scheduled
@ -52,7 +56,12 @@ public class SimpleTimer {
* timeout. If this is not the desired behavior, call removeEvent first. * timeout. If this is not the desired behavior, call removeEvent first.
* *
*/ */
public void addEvent(TimedEvent event, long timeoutMs) { public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
/**
* @param useEarliestEventTime if its already scheduled, use the earlier of the
* two timeouts, else use the later
*/
public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
int totalEvents = 0; int totalEvents = 0;
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long eventTime = now + timeoutMs; long eventTime = now + timeoutMs;
@ -61,11 +70,20 @@ public class SimpleTimer {
// remove the old scheduled position, then reinsert it // remove the old scheduled position, then reinsert it
Long oldTime = (Long)_eventTimes.get(event); Long oldTime = (Long)_eventTimes.get(event);
if (oldTime != null) { if (oldTime != null) {
if (oldTime.longValue() < eventTime) { if (useEarliestTime) {
_events.notifyAll(); if (oldTime.longValue() < eventTime) {
return; // already scheduled for sooner than requested _events.notifyAll();
return; // already scheduled for sooner than requested
} else {
_events.remove(oldTime);
}
} else { } else {
_events.remove(oldTime); if (oldTime.longValue() > eventTime) {
_events.notifyAll();
return; // already scheduled for later than the given period
} else {
_events.remove(oldTime);
}
} }
} }
while (_events.containsKey(time)) while (_events.containsKey(time))

View File

@ -30,10 +30,19 @@ public class DataMessage extends I2NPMessageImpl {
_data = null; _data = null;
} }
public byte[] getData() { return _data; } public byte[] getData() {
public void setData(byte data[]) { _data = data; } verifyUnwritten();
return _data;
}
public void setData(byte[] data) {
verifyUnwritten();
_data = data;
}
public int getSize() { return _data.length; } public int getSize() {
verifyUnwritten();
return _data.length;
}
public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException { public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException {
if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message"); if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message");
@ -55,6 +64,7 @@ public class DataMessage extends I2NPMessageImpl {
} }
/** write the message body to the output array, starting at the given index */ /** write the message body to the output array, starting at the given index */
protected int writeMessageBody(byte out[], int curIndex) { protected int writeMessageBody(byte out[], int curIndex) {
verifyUnwritten();
if (_data == null) { if (_data == null) {
out[curIndex++] = 0x0; out[curIndex++] = 0x0;
out[curIndex++] = 0x0; out[curIndex++] = 0x0;
@ -70,6 +80,11 @@ public class DataMessage extends I2NPMessageImpl {
return curIndex; return curIndex;
} }
protected void written() {
super.written();
_data = null;
}
public int getType() { return MESSAGE_TYPE; } public int getType() { return MESSAGE_TYPE; }
public int hashCode() { public int hashCode() {

View File

@ -28,8 +28,14 @@ public class GarlicMessage extends I2NPMessageImpl {
setData(null); setData(null);
} }
public byte[] getData() { return _data; } public byte[] getData() {
public void setData(byte[] data) { _data = data; } verifyUnwritten();
return _data;
}
public void setData(byte[] data) {
verifyUnwritten();
_data = data;
}
public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException { public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException {
if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message"); if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message");
@ -43,11 +49,13 @@ public class GarlicMessage extends I2NPMessageImpl {
} }
/** calculate the message body's length (not including the header and footer */ /** calculate the message body's length (not including the header and footer */
protected int calculateWrittenLength() { protected int calculateWrittenLength() {
verifyUnwritten();
return 4 + _data.length; return 4 + _data.length;
} }
/** write the message body to the output array, starting at the given index */ /** write the message body to the output array, starting at the given index */
protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException {
verifyUnwritten();
byte len[] = DataHelper.toLong(4, _data.length); byte len[] = DataHelper.toLong(4, _data.length);
System.arraycopy(len, 0, out, curIndex, 4); System.arraycopy(len, 0, out, curIndex, 4);
curIndex += 4; curIndex += 4;
@ -62,6 +70,11 @@ public class GarlicMessage extends I2NPMessageImpl {
return DataHelper.hashCode(getData()); return DataHelper.hashCode(getData());
} }
protected void written() {
super.written();
_data = null;
}
public boolean equals(Object object) { public boolean equals(Object object) {
if ( (object != null) && (object instanceof GarlicMessage) ) { if ( (object != null) && (object instanceof GarlicMessage) ) {
GarlicMessage msg = (GarlicMessage)object; GarlicMessage msg = (GarlicMessage)object;

View File

@ -32,7 +32,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
protected I2PAppContext _context; protected I2PAppContext _context;
private long _expiration; private long _expiration;
private long _uniqueId; private long _uniqueId;
private byte _data[]; private boolean _written;
public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default
public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH; public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH;
@ -53,6 +53,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
_log = context.logManager().getLog(I2NPMessageImpl.class); _log = context.logManager().getLog(I2NPMessageImpl.class);
_expiration = _context.clock().now() + DEFAULT_EXPIRATION_MS; _expiration = _context.clock().now() + DEFAULT_EXPIRATION_MS;
_uniqueId = _context.random().nextLong(MAX_ID_VALUE); _uniqueId = _context.random().nextLong(MAX_ID_VALUE);
_written = false;
//_context.statManager().createRateStat("i2np.writeTime", "How long it takes to write an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); //_context.statManager().createRateStat("i2np.writeTime", "How long it takes to write an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 });
//_context.statManager().createRateStat("i2np.readTime", "How long it takes to read an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); //_context.statManager().createRateStat("i2np.readTime", "How long it takes to read an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 });
} }
@ -264,6 +265,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
public int toRawByteArray(byte buffer[]) { public int toRawByteArray(byte buffer[]) {
verifyUnwritten();
if (RAW_FULL_SIZE) if (RAW_FULL_SIZE)
return toByteArray(buffer); return toByteArray(buffer);
try { try {
@ -277,6 +279,8 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
_context.logManager().getLog(getClass()).log(Log.CRIT, "Error writing", ime); _context.logManager().getLog(getClass()).log(Log.CRIT, "Error writing", ime);
throw new IllegalStateException("Unable to serialize the message (" + getClass().getName() throw new IllegalStateException("Unable to serialize the message (" + getClass().getName()
+ "): " + ime.getMessage()); + "): " + ime.getMessage());
} finally {
written();
} }
} }
@ -316,6 +320,8 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
} }
} }
protected void verifyUnwritten() { if (_written) throw new RuntimeException("Already written"); }
protected void written() { _written = true; }
/** /**
* Yes, this is fairly ugly, but its the only place it ever happens. * Yes, this is fairly ugly, but its the only place it ever happens.

View File

@ -66,7 +66,7 @@ public class JobTiming implements Clock.ClockUpdateListener {
*/ */
public void end() { public void end() {
_actualEnd = _context.clock().now(); _actualEnd = _context.clock().now();
_context.clock().removeUpdateListener(this); //_context.clock().removeUpdateListener(this);
} }
public void offsetChanged(long delta) { public void offsetChanged(long delta) {

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.349 $ $Date: 2006/02/18 22:22:32 $"; public final static String ID = "$Revision: 1.350 $ $Date: 2006/02/19 07:29:59 $";
public final static String VERSION = "0.6.1.10"; public final static String VERSION = "0.6.1.10";
public final static long BUILD = 5; public final static long BUILD = 6;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -21,6 +21,7 @@ import java.util.Set;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.LeaseSet; import net.i2p.data.LeaseSet;
import net.i2p.data.Payload; import net.i2p.data.Payload;
import net.i2p.data.Hash;
import net.i2p.data.i2cp.DisconnectMessage; import net.i2p.data.i2cp.DisconnectMessage;
import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.I2CPMessageException;
@ -71,6 +72,7 @@ public class ClientConnectionRunner {
*/ */
private List _alreadyProcessed; private List _alreadyProcessed;
private ClientWriterRunner _writer; private ClientWriterRunner _writer;
private Hash _destHashCache;
/** are we, uh, dead */ /** are we, uh, dead */
private boolean _dead; private boolean _dead;
@ -144,6 +146,7 @@ public class ClientConnectionRunner {
/** currently allocated leaseSet */ /** currently allocated leaseSet */
public LeaseSet getLeaseSet() { return _currentLeaseSet; } public LeaseSet getLeaseSet() { return _currentLeaseSet; }
void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; } void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; }
public Hash getDestHash() { return _destHashCache; }
/** current client's sessionId */ /** current client's sessionId */
SessionId getSessionId() { return _sessionId; } SessionId getSessionId() { return _sessionId; }
@ -206,8 +209,9 @@ public class ClientConnectionRunner {
} }
void sessionEstablished(SessionConfig config) { void sessionEstablished(SessionConfig config) {
_destHashCache = config.getDestination().calculateHash();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("SessionEstablished called for destination " + config.getDestination().calculateHash().toBase64()); _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64());
_config = config; _config = config;
_manager.destinationEstablished(this); _manager.destinationEstablished(this);
} }

View File

@ -251,21 +251,11 @@ public class ClientManager {
} }
public boolean isLocal(Hash destHash) { public boolean isLocal(Hash destHash) {
if (destHash == null) return false; if (destHash == null) return false;
Set dests = new HashSet();
long beforeLock = _ctx.clock().now();
long inLock = 0;
synchronized (_runners) { synchronized (_runners) {
inLock = _ctx.clock().now(); for (Iterator iter = _runners.values().iterator(); iter.hasNext(); ) {
dests.addAll(_runners.keySet()); ClientConnectionRunner cur = (ClientConnectionRunner)iter.next();
} if (destHash.equals(cur.getDestHash())) return true;
long afterLock = _ctx.clock().now(); }
if (afterLock - beforeLock > 50) {
_log.warn("isLocal(Hash).locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
for (Iterator iter = dests.iterator(); iter.hasNext();) {
Destination d = (Destination)iter.next();
if (d.calculateHash().equals(destHash)) return true;
} }
return false; return false;
} }
@ -324,23 +314,12 @@ public class ClientManager {
private ClientConnectionRunner getRunner(Hash destHash) { private ClientConnectionRunner getRunner(Hash destHash) {
if (destHash == null) if (destHash == null)
return null; return null;
Set dests = new HashSet();
long beforeLock = _ctx.clock().now();
long inLock = 0;
synchronized (_runners) { synchronized (_runners) {
inLock = _ctx.clock().now(); for (Iterator iter = _runners.values().iterator(); iter.hasNext(); ) {
dests.addAll(_runners.keySet()); ClientConnectionRunner cur = (ClientConnectionRunner)iter.next();
} if (cur.getDestHash().equals(destHash))
long afterLock = _ctx.clock().now(); return cur;
if (afterLock - beforeLock > 50) { }
_log.warn("getRunner(Hash).locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
for (Iterator iter = dests.iterator(); iter.hasNext(); ) {
Destination d = (Destination)iter.next();
if (d.calculateHash().equals(destHash))
return getRunner(d);
} }
return null; return null;
} }

View File

@ -13,6 +13,7 @@ import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.SendMessageDirectJob; import net.i2p.router.message.SendMessageDirectJob;
/** /**
@ -41,6 +42,10 @@ class HarvesterJob extends JobImpl {
private static final int PRIORITY = 100; private static final int PRIORITY = 100;
private static final String PROP_ENABLED = "netDb.shouldHarvest"; private static final String PROP_ENABLED = "netDb.shouldHarvest";
private boolean harvestDirectly() {
return Boolean.valueOf(getContext().getProperty("netDb.harvestDirectly", "false")).booleanValue();
}
public HarvesterJob(RouterContext context, KademliaNetworkDatabaseFacade facade) { public HarvesterJob(RouterContext context, KademliaNetworkDatabaseFacade facade) {
super(context); super(context);
@ -107,13 +112,28 @@ class HarvesterJob extends JobImpl {
*/ */
private void harvest(Hash peer) { private void harvest(Hash peer) {
long now = getContext().clock().now(); long now = getContext().clock().now();
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true); if (harvestDirectly()) {
msg.setFrom(getContext().routerHash()); DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true);
msg.setMessageExpiration(10*1000+now); msg.setFrom(getContext().routerHash());
msg.setSearchKey(peer); msg.setMessageExpiration(10*1000+now);
msg.setReplyTunnel(null); msg.setSearchKey(peer);
SendMessageDirectJob job = new SendMessageDirectJob(getContext(), msg, peer, 10*1000, PRIORITY); msg.setReplyTunnel(null);
job.runJob(); SendMessageDirectJob job = new SendMessageDirectJob(getContext(), msg, peer, 10*1000, PRIORITY);
//getContext().jobQueue().addJob(job); job.runJob();
//getContext().jobQueue().addJob(job);
} else {
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
TunnelInfo sendTunnel = getContext().tunnelManager().selectOutboundTunnel();
if ( (replyTunnel != null) && (sendTunnel != null) ) {
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true);
msg.setFrom(replyTunnel.getPeer(0));
msg.setMessageExpiration(10*1000+now);
msg.setSearchKey(peer);
msg.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
// we don't even bother to register a reply selector, because we don't really care.
// just send it out, and if we get a reply, neat. if not, oh well
getContext().tunnelDispatcher().dispatchOutbound(msg, sendTunnel.getSendTunnelId(0), peer);
}
}
} }
} }

View File

@ -119,6 +119,8 @@ public class FIFOBandwidthLimiter {
_refiller.reinitialize(); _refiller.reinitialize();
} }
public Request createRequest() { return new SimpleRequest(); }
/** /**
* Request some bytes, blocking until they become available * Request some bytes, blocking until they become available
* *
@ -130,15 +132,21 @@ public class FIFOBandwidthLimiter {
} }
SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose); SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose);
requestInbound(req, bytesIn, purpose);
return req;
}
public void requestInbound(Request req, int bytesIn, String purpose) {
req.init(bytesIn, 0, purpose);
if (false) { ((SimpleRequest)req).allocateAll(); return; }
int pending = 0; int pending = 0;
synchronized (_pendingInboundRequests) { synchronized (_pendingInboundRequests) {
pending = _pendingInboundRequests.size(); pending = _pendingInboundRequests.size();
_pendingInboundRequests.add(req); _pendingInboundRequests.add(req);
} }
satisfyInboundRequests(); satisfyInboundRequests(((SimpleRequest)req).satisfiedBuffer);
((SimpleRequest)req).satisfiedBuffer.clear();
if (pending > 0) if (pending > 0)
_context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending); _context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending);
return req;
} }
/** /**
* Request some bytes, blocking until they become available * Request some bytes, blocking until they become available
@ -151,15 +159,21 @@ public class FIFOBandwidthLimiter {
} }
SimpleRequest req = new SimpleRequest(0, bytesOut, purpose); SimpleRequest req = new SimpleRequest(0, bytesOut, purpose);
requestOutbound(req, bytesOut, purpose);
return req;
}
public void requestOutbound(Request req, int bytesOut, String purpose) {
req.init(0, bytesOut, purpose);
if (false) { ((SimpleRequest)req).allocateAll(); return; }
int pending = 0; int pending = 0;
synchronized (_pendingOutboundRequests) { synchronized (_pendingOutboundRequests) {
pending = _pendingOutboundRequests.size(); pending = _pendingOutboundRequests.size();
_pendingOutboundRequests.add(req); _pendingOutboundRequests.add(req);
} }
satisfyOutboundRequests(); satisfyOutboundRequests(((SimpleRequest)req).satisfiedBuffer);
((SimpleRequest)req).satisfiedBuffer.clear();
if (pending > 0) if (pending > 0)
_context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending); _context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending);
return req;
} }
void setInboundBurstKBps(int kbytesPerSecond) { void setInboundBurstKBps(int kbytesPerSecond) {
@ -189,7 +203,7 @@ public class FIFOBandwidthLimiter {
* @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative) * @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative)
* @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative) * @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative)
*/ */
final void refillBandwidthQueues(long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) { final void refillBandwidthQueues(List buf, long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ": " + getStatus().toString()); _log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ": " + getStatus().toString());
_availableInbound += bytesInbound; _availableInbound += bytesInbound;
@ -251,7 +265,7 @@ public class FIFOBandwidthLimiter {
} }
} }
satisfyRequests(); satisfyRequests(buf);
updateStats(); updateStats();
} }
@ -292,19 +306,20 @@ public class FIFOBandwidthLimiter {
* Go through the queue, satisfying as many requests as possible (notifying * Go through the queue, satisfying as many requests as possible (notifying
* each one satisfied that the request has been granted). * each one satisfied that the request has been granted).
*/ */
private final void satisfyRequests() { private final void satisfyRequests(List buffer) {
satisfyInboundRequests(); buffer.clear();
satisfyOutboundRequests(); satisfyInboundRequests(buffer);
buffer.clear();
satisfyOutboundRequests(buffer);
} }
private final void satisfyInboundRequests() { private final void satisfyInboundRequests(List satisfied) {
List satisfied = null;
synchronized (_pendingInboundRequests) { synchronized (_pendingInboundRequests) {
if (_inboundUnlimited) { if (_inboundUnlimited) {
satisfied = locked_satisfyInboundUnlimited(); locked_satisfyInboundUnlimited(satisfied);
} else { } else {
if (_availableInbound > 0) { if (_availableInbound > 0) {
satisfied = locked_satisfyInboundAvailable(); locked_satisfyInboundAvailable(satisfied);
} else { } else {
// no bandwidth available // no bandwidth available
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -317,8 +332,8 @@ public class FIFOBandwidthLimiter {
if (satisfied != null) { if (satisfied != null) {
for (int i = 0; i < satisfied.size(); i++) { for (int i = 0; i < satisfied.size(); i++) {
SimpleRequest req = (SimpleRequest)satisfied.get(i); SimpleRequest creq = (SimpleRequest)satisfied.get(i);
req.notifyAllocation(); creq.notifyAllocation();
} }
} }
} }
@ -353,16 +368,12 @@ public class FIFOBandwidthLimiter {
* There are no limits, so just give every inbound request whatever they want * There are no limits, so just give every inbound request whatever they want
* *
*/ */
private final List locked_satisfyInboundUnlimited() { private final void locked_satisfyInboundUnlimited(List satisfied) {
List satisfied = null;
while (_pendingInboundRequests.size() > 0) { while (_pendingInboundRequests.size() > 0) {
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0); SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0);
int allocated = req.getPendingInboundRequested(); int allocated = req.getPendingInboundRequested();
_totalAllocatedInboundBytes += allocated; _totalAllocatedInboundBytes += allocated;
req.allocateBytes(allocated, 0); req.allocateBytes(allocated, 0);
if (satisfied == null)
satisfied = new ArrayList(2);
satisfied.add(req); satisfied.add(req);
long waited = now() - req.getRequestTime(); long waited = now() - req.getRequestTime();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -373,7 +384,6 @@ public class FIFOBandwidthLimiter {
if (waited > 10) if (waited > 10)
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited); _context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
} }
return satisfied;
} }
/** /**
@ -383,9 +393,7 @@ public class FIFOBandwidthLimiter {
* *
* @return list of requests that were completely satisfied * @return list of requests that were completely satisfied
*/ */
private final List locked_satisfyInboundAvailable() { private final void locked_satisfyInboundAvailable(List satisfied) {
List satisfied = null;
for (int i = 0; i < _pendingInboundRequests.size(); i++) { for (int i = 0; i < _pendingInboundRequests.size(); i++) {
if (_availableInbound <= 0) break; if (_availableInbound <= 0) break;
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i); SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i);
@ -418,8 +426,6 @@ public class FIFOBandwidthLimiter {
_availableInbound -= allocated; _availableInbound -= allocated;
_totalAllocatedInboundBytes += allocated; _totalAllocatedInboundBytes += allocated;
req.allocateBytes(allocated, 0); req.allocateBytes(allocated, 0);
if (satisfied == null)
satisfied = new ArrayList(2);
satisfied.add(req); satisfied.add(req);
if (req.getPendingInboundRequested() > 0) { if (req.getPendingInboundRequested() > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -443,17 +449,15 @@ public class FIFOBandwidthLimiter {
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited); _context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
} }
} }
return satisfied;
} }
private final void satisfyOutboundRequests() { private final void satisfyOutboundRequests(List satisfied) {
List satisfied = null;
synchronized (_pendingOutboundRequests) { synchronized (_pendingOutboundRequests) {
if (_outboundUnlimited) { if (_outboundUnlimited) {
satisfied = locked_satisfyOutboundUnlimited(); locked_satisfyOutboundUnlimited(satisfied);
} else { } else {
if (_availableOutbound > 0) { if (_availableOutbound > 0) {
satisfied = locked_satisfyOutboundAvailable(); locked_satisfyOutboundAvailable(satisfied);
} else { } else {
// no bandwidth available // no bandwidth available
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -466,8 +470,8 @@ public class FIFOBandwidthLimiter {
if (satisfied != null) { if (satisfied != null) {
for (int i = 0; i < satisfied.size(); i++) { for (int i = 0; i < satisfied.size(); i++) {
SimpleRequest req = (SimpleRequest)satisfied.get(i); SimpleRequest creq = (SimpleRequest)satisfied.get(i);
req.notifyAllocation(); creq.notifyAllocation();
} }
} }
} }
@ -476,16 +480,12 @@ public class FIFOBandwidthLimiter {
* There are no limits, so just give every outbound request whatever they want * There are no limits, so just give every outbound request whatever they want
* *
*/ */
private final List locked_satisfyOutboundUnlimited() { private final void locked_satisfyOutboundUnlimited(List satisfied) {
List satisfied = null;
while (_pendingOutboundRequests.size() > 0) { while (_pendingOutboundRequests.size() > 0) {
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0); SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0);
int allocated = req.getPendingOutboundRequested(); int allocated = req.getPendingOutboundRequested();
_totalAllocatedOutboundBytes += allocated; _totalAllocatedOutboundBytes += allocated;
req.allocateBytes(0, allocated); req.allocateBytes(0, allocated);
if (satisfied == null)
satisfied = new ArrayList(2);
satisfied.add(req); satisfied.add(req);
long waited = now() - req.getRequestTime(); long waited = now() - req.getRequestTime();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -497,7 +497,6 @@ public class FIFOBandwidthLimiter {
if (waited > 10) if (waited > 10)
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited); _context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
} }
return satisfied;
} }
/** /**
@ -507,9 +506,7 @@ public class FIFOBandwidthLimiter {
* *
* @return list of requests that were completely satisfied * @return list of requests that were completely satisfied
*/ */
private final List locked_satisfyOutboundAvailable() { private final void locked_satisfyOutboundAvailable(List satisfied) {
List satisfied = null;
for (int i = 0; i < _pendingOutboundRequests.size(); i++) { for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
if (_availableOutbound <= 0) break; if (_availableOutbound <= 0) break;
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i); SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i);
@ -542,8 +539,6 @@ public class FIFOBandwidthLimiter {
_availableOutbound -= allocated; _availableOutbound -= allocated;
_totalAllocatedOutboundBytes += allocated; _totalAllocatedOutboundBytes += allocated;
req.allocateBytes(0, allocated); req.allocateBytes(0, allocated);
if (satisfied == null)
satisfied = new ArrayList(2);
satisfied.add(req); satisfied.add(req);
if (req.getPendingOutboundRequested() > 0) { if (req.getPendingOutboundRequested() > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -567,7 +562,6 @@ public class FIFOBandwidthLimiter {
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited); _context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
} }
} }
return satisfied;
} }
public void renderStatusHTML(Writer out) throws IOException { public void renderStatusHTML(Writer out) throws IOException {
@ -613,14 +607,24 @@ public class FIFOBandwidthLimiter {
private String _target; private String _target;
private int _allocationsSinceWait; private int _allocationsSinceWait;
private boolean _aborted; private boolean _aborted;
List satisfiedBuffer;
public SimpleRequest() {
satisfiedBuffer = new ArrayList(1);
init(0, 0, null);
}
public SimpleRequest(int in, int out, String target) { public SimpleRequest(int in, int out, String target) {
satisfiedBuffer = new ArrayList(1);
init(in, out, target);
}
public void init(int in, int out, String target) {
_inTotal = in; _inTotal = in;
_outTotal = out; _outTotal = out;
_inAllocated = 0; _inAllocated = 0;
_outAllocated = 0; _outAllocated = 0;
_aborted = false; _aborted = false;
_target = target; _target = target;
satisfiedBuffer.clear();
_requestId = ++__requestId; _requestId = ++__requestId;
_requestTime = now(); _requestTime = now();
} }
@ -646,6 +650,13 @@ public class FIFOBandwidthLimiter {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
int getAllocationsSinceWait() { return _allocationsSinceWait; } int getAllocationsSinceWait() { return _allocationsSinceWait; }
void allocateAll() {
_inAllocated = _inTotal;
_outAllocated = _outTotal;
_outAllocated = _outTotal;
_allocationsSinceWait++;
notifyAllocation();
}
void allocateBytes(int in, int out) { void allocateBytes(int in, int out) {
_inAllocated += in; _inAllocated += in;
_outAllocated += out; _outAllocated += out;
@ -677,7 +688,8 @@ public class FIFOBandwidthLimiter {
public void abort(); public void abort();
/** was this request aborted? */ /** was this request aborted? */
public boolean getAborted(); public boolean getAborted();
/** thar be dragons */
public void init(int in, int out, String target);
} }
private static final NoopRequest _noop = new NoopRequest(); private static final NoopRequest _noop = new NoopRequest();
@ -691,5 +703,6 @@ public class FIFOBandwidthLimiter {
public int getTotalInboundRequested() { return 0; } public int getTotalInboundRequested() { return 0; }
public int getTotalOutboundRequested() { return 0; } public int getTotalOutboundRequested() { return 0; }
public void waitForNextAllocation() {} public void waitForNextAllocation() {}
public void init(int in, int out, String target) {}
} }
} }

View File

@ -1,5 +1,6 @@
package net.i2p.router.transport; package net.i2p.router.transport;
import java.util.*;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -62,6 +63,7 @@ class FIFOBandwidthRefiller implements Runnable {
public void run() { public void run() {
// bootstrap 'em with nothing // bootstrap 'em with nothing
_lastRefillTime = _limiter.now(); _lastRefillTime = _limiter.now();
List buffer = new ArrayList(2);
while (true) { while (true) {
long now = _limiter.now(); long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) { if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
@ -70,7 +72,7 @@ class FIFOBandwidthRefiller implements Runnable {
_lastCheckConfigTime = now; _lastCheckConfigTime = now;
} }
boolean updated = updateQueues(now); boolean updated = updateQueues(buffer, now);
if (updated) { if (updated) {
_lastRefillTime = now; _lastRefillTime = now;
} }
@ -85,7 +87,7 @@ class FIFOBandwidthRefiller implements Runnable {
_lastCheckConfigTime = _lastRefillTime; _lastCheckConfigTime = _lastRefillTime;
} }
private boolean updateQueues(long now) { private boolean updateQueues(List buffer, long now) {
long numMs = (now - _lastRefillTime); long numMs = (now - _lastRefillTime);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString() _log.info("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString()
@ -114,7 +116,7 @@ class FIFOBandwidthRefiller implements Runnable {
long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000; long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000;
long maxBurstOut = ((_outboundBurstKBytesPerSecond-_outboundKBytesPerSecond)*1024*numMs)/1000; long maxBurstOut = ((_outboundBurstKBytesPerSecond-_outboundKBytesPerSecond)*1024*numMs)/1000;
_limiter.refillBandwidthQueues(inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut); _limiter.refillBandwidthQueues(buffer, inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut);
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable"); _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");
@ -331,4 +333,4 @@ class FIFOBandwidthRefiller implements Runnable {
int getOutboundKBytesPerSecond() { return _outboundKBytesPerSecond; } int getOutboundKBytesPerSecond() { return _outboundKBytesPerSecond; }
int getInboundKBytesPerSecond() { return _inboundKBytesPerSecond; } int getInboundKBytesPerSecond() { return _inboundKBytesPerSecond; }
} }

View File

@ -140,12 +140,12 @@ public abstract class TransportImpl implements Transport {
long lifetime = msg.getLifetime(); long lifetime = msg.getLifetime();
if (lifetime > 3000) { if (lifetime > 3000) {
int level = Log.WARN; int level = Log.WARN;
//if (!sendSuccessful) if (!sendSuccessful)
// level = Log.INFO; level = Log.INFO;
if (_log.shouldLog(level)) if (_log.shouldLog(level))
_log.log(level, "afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " _log.log(level, "afterSend slow (" + lifetime + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
+ " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString()); + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString());
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " _log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "

View File

@ -125,8 +125,6 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
boolean fragmentOK = false; boolean fragmentOK = false;
boolean partialACK = false; boolean partialACK = false;
// perhaps compact the synchronized block further by synchronizing on the
// particular state once its found?
synchronized (messages) { synchronized (messages) {
state = (InboundMessageState)messages.get(messageId); state = (InboundMessageState)messages.get(messageId);
if (state == null) { if (state == null) {
@ -145,36 +143,36 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
} else { } else {
partialACK = true; partialACK = true;
} }
if (messageComplete) {
_recentlyCompletedMessages.add(mid);
_messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))
_log.info("Message received completely! " + state);
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 0)
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
} else if (messageExpired) {
state.releaseResources();
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString());
} else if (partialACK) {
// not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
from.messagePartiallyReceived();
_ackSender.ackPeer(from);
}
if (!fragmentOK)
break;
} }
if (messageComplete) {
_recentlyCompletedMessages.add(mid);
_messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))
_log.info("Message received completely! " + state);
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 0)
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
} else if (messageExpired) {
state.releaseResources();
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString());
} else if (partialACK) {
// not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
from.messagePartiallyReceived();
_ackSender.ackPeer(from);
}
if (!fragmentOK)
break;
} }
return fragments; return fragments;
} }
@ -183,16 +181,17 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
int rv = 0; int rv = 0;
if (data.readACKsIncluded()) { if (data.readACKsIncluded()) {
int fragments = 0; int fragments = 0;
long acks[] = data.readACKs(); int ackCount = data.readACKCount();
if (acks != null) { if (ackCount > 0) {
rv += acks.length; rv += ackCount;
_context.statManager().addRateData("udp.receivedACKs", acks.length, 0); _context.statManager().addRateData("udp.receivedACKs", ackCount, 0);
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receiveACKCount", acks.length, 0); //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receiveACKCount", acks.length, 0);
for (int i = 0; i < acks.length; i++) { for (int i = 0; i < ackCount; i++) {
long id = data.readACK(i);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Full ACK of message " + acks[i] + " received!"); _log.info("Full ACK of message " + id + " received!");
fragments += _outbound.acked(acks[i], from.getRemotePeer()); fragments += _outbound.acked(id, from.getRemotePeer());
} }
} else { } else {
_log.error("Received ACKs with no acks?! " + data); _log.error("Received ACKs with no acks?! " + data);

View File

@ -202,14 +202,16 @@ public class OutboundMessageFragments {
*/ */
private void finishMessages() { private void finishMessages() {
int rv = 0; int rv = 0;
List peers = new ArrayList(); List peers = null;
synchronized (_activePeers) { synchronized (_activePeers) {
peers = new ArrayList(_activePeers); peers = new ArrayList(_activePeers.size());
for (int i = 0; i < _activePeers.size(); i++) { for (int i = 0; i < _activePeers.size(); i++) {
PeerState state = (PeerState)_activePeers.get(i); PeerState state = (PeerState)_activePeers.get(i);
if (state.getOutboundMessageCount() <= 0) { if (state.getOutboundMessageCount() <= 0) {
_activePeers.remove(i); _activePeers.remove(i);
i--; i--;
} else {
peers.add(state);
} }
} }
_activePeers.notifyAll(); _activePeers.notifyAll();
@ -297,11 +299,13 @@ public class OutboundMessageFragments {
for (int i = 0; packets != null && i < packets.length ; i++) for (int i = 0; packets != null && i < packets.length ; i++)
if (packets[i] != null) if (packets[i] != null)
valid++; valid++;
/*
state.getMessage().timestamp("sending a volley of " + valid state.getMessage().timestamp("sending a volley of " + valid
+ " lastReceived: " + " lastReceived: "
+ (_context.clock().now() - peer.getLastReceiveTime()) + (_context.clock().now() - peer.getLastReceiveTime())
+ " lastSentFully: " + " lastSentFully: "
+ (_context.clock().now() - peer.getLastSendFullyTime())); + (_context.clock().now() - peer.getLastSendFullyTime()));
*/
} }
return packets; return packets;
} }

View File

@ -56,7 +56,7 @@ public class PacketBuilder {
* included, it should be removed from the list. * included, it should be removed from the list.
*/ */
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) { public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
StringBuffer msg = null; StringBuffer msg = null;
boolean acksIncluded = false; boolean acksIncluded = false;
@ -156,8 +156,10 @@ public class PacketBuilder {
off++; off++;
int size = state.fragmentSize(fragment); int size = state.fragmentSize(fragment);
if (size < 0) if (size < 0) {
packet.release();
return null; return null;
}
DataHelper.toLong(data, off, 2, size); DataHelper.toLong(data, off, 2, size);
data[off] &= (byte)0x3F; // 2 highest bits are reserved data[off] &= (byte)0x3F; // 2 highest bits are reserved
off += 2; off += 2;
@ -166,12 +168,16 @@ public class PacketBuilder {
if (sizeWritten != size) { if (sizeWritten != size) {
_log.error("Size written: " + sizeWritten + " but size: " + size _log.error("Size written: " + sizeWritten + " but size: " + size
+ " for fragment " + fragment + " of " + state.getMessageId()); + " for fragment " + fragment + " of " + state.getMessageId());
packet.release();
return null; return null;
} else if (_log.shouldLog(Log.DEBUG)) } else if (_log.shouldLog(Log.DEBUG))
_log.debug("Size written: " + sizeWritten + " for fragment " + fragment _log.debug("Size written: " + sizeWritten + " for fragment " + fragment
+ " of " + state.getMessageId()); + " of " + state.getMessageId());
size = sizeWritten; size = sizeWritten;
if (size < 0) return null; if (size < 0) {
packet.release();
return null;
}
off += size; off += size;
// we can pad here if we want, maybe randomized? // we can pad here if we want, maybe randomized?
@ -202,7 +208,7 @@ public class PacketBuilder {
* @param ackBitfields list of ACKBitfield instances to either fully or partially ACK * @param ackBitfields list of ACKBitfield instances to either fully or partially ACK
*/ */
public UDPPacket buildACK(PeerState peer, List ackBitfields) { public UDPPacket buildACK(PeerState peer, List ackBitfields) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
StringBuffer msg = null; StringBuffer msg = null;
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
@ -308,7 +314,7 @@ public class PacketBuilder {
* @return ready to send packet, or null if there was a problem * @return ready to send packet, or null if there was a problem
*/ */
public UDPPacket buildSessionCreatedPacket(InboundEstablishState state, int externalPort, SessionKey ourIntroKey) { public UDPPacket buildSessionCreatedPacket(InboundEstablishState state, int externalPort, SessionKey ourIntroKey) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
InetAddress to = null; InetAddress to = null;
try { try {
@ -316,6 +322,7 @@ public class PacketBuilder {
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString()); _log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString());
packet.release();
return null; return null;
} }
@ -337,6 +344,7 @@ public class PacketBuilder {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("How did our sent IP become invalid? " + state); _log.error("How did our sent IP become invalid? " + state);
state.fail(); state.fail();
packet.release();
return null; return null;
} }
// now for the body // now for the body
@ -408,9 +416,10 @@ public class PacketBuilder {
* @return ready to send packet, or null if there was a problem * @return ready to send packet, or null if there was a problem
*/ */
public UDPPacket buildSessionRequestPacket(OutboundEstablishState state) { public UDPPacket buildSessionRequestPacket(OutboundEstablishState state) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte toIP[] = state.getSentIP(); byte toIP[] = state.getSentIP();
if ( (_transport !=null) && (!_transport.isValid(toIP)) ) { if ( (_transport !=null) && (!_transport.isValid(toIP)) ) {
packet.release();
return null; return null;
} }
InetAddress to = null; InetAddress to = null;
@ -419,6 +428,7 @@ public class PacketBuilder {
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString()); _log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString());
packet.release();
return null; return null;
} }
@ -488,13 +498,14 @@ public class PacketBuilder {
* @return ready to send packets, or null if there was a problem * @return ready to send packets, or null if there was a problem
*/ */
public UDPPacket buildSessionConfirmedPacket(OutboundEstablishState state, int fragmentNum, int numFragments, byte identity[]) { public UDPPacket buildSessionConfirmedPacket(OutboundEstablishState state, int fragmentNum, int numFragments, byte identity[]) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
InetAddress to = null; InetAddress to = null;
try { try {
to = InetAddress.getByAddress(state.getSentIP()); to = InetAddress.getByAddress(state.getSentIP());
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString()); _log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString());
packet.release();
return null; return null;
} }
@ -578,7 +589,7 @@ public class PacketBuilder {
return buildPeerTestFromAlice(toIP, toPort, toIntroKey, toIntroKey, nonce, aliceIntroKey); return buildPeerTestFromAlice(toIP, toPort, toIntroKey, toIntroKey, nonce, aliceIntroKey);
} }
public UDPPacket buildPeerTestFromAlice(InetAddress toIP, int toPort, SessionKey toCipherKey, SessionKey toMACKey, long nonce, SessionKey aliceIntroKey) { public UDPPacket buildPeerTestFromAlice(InetAddress toIP, int toPort, SessionKey toCipherKey, SessionKey toMACKey, long nonce, SessionKey aliceIntroKey) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -619,7 +630,7 @@ public class PacketBuilder {
* @return ready to send packet, or null if there was a problem * @return ready to send packet, or null if there was a problem
*/ */
public UDPPacket buildPeerTestToAlice(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, SessionKey charlieIntroKey, long nonce) { public UDPPacket buildPeerTestToAlice(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, SessionKey charlieIntroKey, long nonce) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -665,7 +676,7 @@ public class PacketBuilder {
public UDPPacket buildPeerTestToCharlie(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce, public UDPPacket buildPeerTestToCharlie(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce,
InetAddress charlieIP, int charliePort, InetAddress charlieIP, int charliePort,
SessionKey charlieCipherKey, SessionKey charlieMACKey) { SessionKey charlieCipherKey, SessionKey charlieMACKey) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -709,7 +720,7 @@ public class PacketBuilder {
* @return ready to send packet, or null if there was a problem * @return ready to send packet, or null if there was a problem
*/ */
public UDPPacket buildPeerTestToBob(InetAddress bobIP, int bobPort, InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce, SessionKey bobCipherKey, SessionKey bobMACKey) { public UDPPacket buildPeerTestToBob(InetAddress bobIP, int bobPort, InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce, SessionKey bobCipherKey, SessionKey bobMACKey) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -783,7 +794,7 @@ public class PacketBuilder {
} }
public UDPPacket buildRelayRequest(InetAddress introHost, int introPort, byte introKey[], long introTag, SessionKey ourIntroKey, long introNonce, boolean encrypt) { public UDPPacket buildRelayRequest(InetAddress introHost, int introPort, byte introKey[], long introTag, SessionKey ourIntroKey, long introNonce, boolean encrypt) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -851,7 +862,7 @@ public class PacketBuilder {
private static final byte PEER_RELAY_INTRO_FLAG_BYTE = (UDPPacket.PAYLOAD_TYPE_RELAY_INTRO << 4); private static final byte PEER_RELAY_INTRO_FLAG_BYTE = (UDPPacket.PAYLOAD_TYPE_RELAY_INTRO << 4);
public UDPPacket buildRelayIntro(RemoteHostId alice, PeerState charlie, UDPPacketReader.RelayRequestReader request) { public UDPPacket buildRelayIntro(RemoteHostId alice, PeerState charlie, UDPPacketReader.RelayRequestReader request) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -907,7 +918,7 @@ public class PacketBuilder {
return null; return null;
} }
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -954,7 +965,7 @@ public class PacketBuilder {
} }
public UDPPacket buildHolePunch(UDPPacketReader reader) { public UDPPacket buildHolePunch(UDPPacketReader reader) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, false);
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0); Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -970,6 +981,7 @@ public class PacketBuilder {
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("IP for alice to hole punch to is invalid", uhe); _log.warn("IP for alice to hole punch to is invalid", uhe);
packet.release();
return null; return null;
} }

View File

@ -167,10 +167,10 @@ public class PeerState {
private long _packetsReceivedDuplicate; private long _packetsReceivedDuplicate;
private long _packetsReceived; private long _packetsReceived;
/** Message (Long) to InboundMessageState for active message */ /** list of InboundMessageState for active message */
private Map _inboundMessages; private Map _inboundMessages;
/** Message (Long) to OutboundMessageState */ /** list of OutboundMessageState */
private Map _outboundMessages; private List _outboundMessages;
/** which outbound message is currently being retransmitted */ /** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter; private OutboundMessageState _retransmitter;
@ -262,7 +262,7 @@ public class PeerState {
_packetsReceived = 0; _packetsReceived = 0;
_packetsReceivedDuplicate = 0; _packetsReceivedDuplicate = 0;
_inboundMessages = new HashMap(8); _inboundMessages = new HashMap(8);
_outboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32);
_dead = false; _dead = false;
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
@ -603,7 +603,8 @@ public class PeerState {
int rv = 0; int rv = 0;
synchronized (_inboundMessages) { synchronized (_inboundMessages) {
for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) { int remaining = _inboundMessages.size();
for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) {
InboundMessageState state = (InboundMessageState)iter.next(); InboundMessageState state = (InboundMessageState)iter.next();
if (state.isExpired()) { if (state.isExpired()) {
iter.remove(); iter.remove();
@ -687,7 +688,7 @@ public class PeerState {
List rv = null; List rv = null;
int bytesRemaining = countMaxACKData(); int bytesRemaining = countMaxACKData();
synchronized (_currentACKs) { synchronized (_currentACKs) {
rv = new ArrayList(_currentACKs.size()); rv = new ArrayList(16); //_currentACKs.size());
int oldIndex = _currentACKsResend.size(); int oldIndex = _currentACKsResend.size();
while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) {
Long val = (Long)_currentACKs.remove(0); Long val = (Long)_currentACKs.remove(0);
@ -701,7 +702,10 @@ public class PeerState {
if (alwaysIncludeRetransmissions || rv.size() > 0) { if (alwaysIncludeRetransmissions || rv.size() > 0) {
// now repeat by putting in some old ACKs // now repeat by putting in some old ACKs
for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) { for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) {
rv.add(new FullACKBitfield(((Long)_currentACKsResend.get(i)).longValue())); Long cur = (Long)_currentACKsResend.get(i);
long c = cur.longValue();
FullACKBitfield bf = new FullACKBitfield(c);
rv.add(bf);
bytesRemaining -= 4; bytesRemaining -= 4;
} }
} }
@ -749,7 +753,9 @@ public class PeerState {
int numMessages = _inboundMessages.size(); int numMessages = _inboundMessages.size();
if (numMessages <= 0) if (numMessages <= 0)
return; return;
for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) { // todo: make this a list instead of a map, so we can iterate faster w/out the memory overhead?
int remaining = _inboundMessages.size();
for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) {
InboundMessageState state = (InboundMessageState)iter.next(); InboundMessageState state = (InboundMessageState)iter.next();
if (state.isExpired()) { if (state.isExpired()) {
//if (_context instanceof RouterContext) //if (_context instanceof RouterContext)
@ -974,10 +980,10 @@ public class PeerState {
state.setPeer(this); state.setPeer(this);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
Map msgs = _outboundMessages; List msgs = _outboundMessages;
if (msgs == null) return 0; if (msgs == null) return 0;
synchronized (msgs) { synchronized (msgs) {
msgs.put(new Long(state.getMessageId()), state); msgs.add(state);
return msgs.size(); return msgs.size();
} }
} }
@ -985,20 +991,23 @@ public class PeerState {
public void dropOutbound() { public void dropOutbound() {
if (_dead) return; if (_dead) return;
_dead = true; _dead = true;
Map msgs = _outboundMessages; List msgs = _outboundMessages;
//_outboundMessages = null; //_outboundMessages = null;
_retransmitter = null; _retransmitter = null;
if (msgs != null) { if (msgs != null) {
List tempList = null;
synchronized (msgs) { synchronized (msgs) {
for (Iterator iter = msgs.values().iterator(); iter.hasNext();) tempList = new ArrayList(msgs);
_transport.failed((OutboundMessageState)iter.next());
msgs.clear(); msgs.clear();
} }
int sz = tempList.size();
for (int i = 0; i < sz; i++)
_transport.failed((OutboundMessageState)tempList.get(i));
} }
} }
public int getOutboundMessageCount() { public int getOutboundMessageCount() {
Map msgs = _outboundMessages; List msgs = _outboundMessages;
if (_dead) return 0; if (_dead) return 0;
if (msgs != null) { if (msgs != null) {
synchronized (msgs) { synchronized (msgs) {
@ -1015,29 +1024,35 @@ public class PeerState {
*/ */
public int finishMessages() { public int finishMessages() {
int rv = 0; int rv = 0;
Map msgs = _outboundMessages; List msgs = _outboundMessages;
if (_dead) return 0; if (_dead) return 0;
List succeeded = null; List succeeded = null;
List failed = null; List failed = null;
synchronized (msgs) { synchronized (msgs) {
for (Iterator iter = msgs.keySet().iterator(); iter.hasNext(); ) { int size = msgs.size();
Long id = (Long)iter.next(); for (int i = 0; i < size; i++) {
OutboundMessageState state = (OutboundMessageState)msgs.get(id); OutboundMessageState state = (OutboundMessageState)msgs.get(i);
if (state.isComplete()) { if (state.isComplete()) {
iter.remove(); msgs.remove(i);
i--;
size--;
if (_retransmitter == state) if (_retransmitter == state)
_retransmitter = null; _retransmitter = null;
if (succeeded == null) succeeded = new ArrayList(4); if (succeeded == null) succeeded = new ArrayList(4);
succeeded.add(state); succeeded.add(state);
} else if (state.isExpired()) { } else if (state.isExpired()) {
iter.remove(); msgs.remove(i);
i--;
size--;
if (_retransmitter == state) if (_retransmitter == state)
_retransmitter = null; _retransmitter = null;
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
if (failed == null) failed = new ArrayList(4); if (failed == null) failed = new ArrayList(4);
failed.add(state); failed.add(state);
} else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
iter.remove(); msgs.remove(i);
i--;
size--;
if (state == _retransmitter) if (state == _retransmitter)
_retransmitter = null; _retransmitter = null;
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
@ -1082,11 +1097,12 @@ public class PeerState {
*/ */
public OutboundMessageState allocateSend() { public OutboundMessageState allocateSend() {
int total = 0; int total = 0;
Map msgs = _outboundMessages; List msgs = _outboundMessages;
if (_dead) return null; if (_dead) return null;
synchronized (msgs) { synchronized (msgs) {
for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) { int size = msgs.size();
OutboundMessageState state = (OutboundMessageState)iter.next(); for (int i = 0; i < size; i++) {
OutboundMessageState state = (OutboundMessageState)msgs.get(i);
if (locked_shouldSend(state)) { if (locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@ -1099,11 +1115,11 @@ public class PeerState {
} }
*/ */
return state; return state;
} else { } /* else {
OutNetMessage msg = state.getMessage(); OutNetMessage msg = state.getMessage();
if (msg != null) if (msg != null)
msg.timestamp("passed over for allocation with " + msgs.size() + " peers"); msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
} } */
} }
total = msgs.size(); total = msgs.size();
} }
@ -1118,7 +1134,7 @@ public class PeerState {
public int getNextDelay() { public int getNextDelay() {
int rv = -1; int rv = -1;
long now = _context.clock().now(); long now = _context.clock().now();
Map msgs = _outboundMessages; List msgs = _outboundMessages;
if (_dead) return -1; if (_dead) return -1;
synchronized (msgs) { synchronized (msgs) {
if (_retransmitter != null) { if (_retransmitter != null) {
@ -1128,8 +1144,9 @@ public class PeerState {
else else
return rv; return rv;
} }
for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) { int size = msgs.size();
OutboundMessageState state = (OutboundMessageState)iter.next(); for (int i = 0; i < size; i++) {
OutboundMessageState state = (OutboundMessageState)msgs.get(i);
int delay = (int)(state.getNextSendTime() - now); int delay = (int)(state.getNextSendTime() - now);
if (delay <= 0) if (delay <= 0)
delay = 1; delay = 1;
@ -1140,7 +1157,6 @@ public class PeerState {
return rv; return rv;
} }
/** /**
* If set to true, we should throttle retransmissions of all but the first message in * If set to true, we should throttle retransmissions of all but the first message in
* flight to a peer. If set to false, we will only throttle the initial flight of a * flight to a peer. If set to false, we will only throttle the initial flight of a
@ -1182,17 +1198,18 @@ public class PeerState {
if ( (_retransmitter != null) && (_retransmitter != state) ) { if ( (_retransmitter != null) && (_retransmitter != state) ) {
// choke it, since there's already another message retransmitting to this // choke it, since there's already another message retransmitting to this
// peer. // peer.
_context.statManager().addRateData("udp.blockedRetransmissions", getPacketsRetransmitted(), getPacketsTransmitted()); _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted, _packetsTransmitted);
if ( (state.getMaxSends() <= 0) && (!THROTTLE_INITIAL_SEND) ) { int max = state.getMaxSends();
if (state.getMessage() != null) if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) {
state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley..."); //if (state.getMessage() != null)
} else if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) { // state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
if (state.getMessage() != null) } else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
state.getMessage().timestamp("choked, with another message retransmitting"); //if (state.getMessage() != null)
// state.getMessage().timestamp("choked, with another message retransmitting");
return false; return false;
} else { } else {
if (state.getMessage() != null) //if (state.getMessage() != null)
state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); // state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
} }
} }
@ -1218,8 +1235,8 @@ public class PeerState {
return true; return true;
} else { } else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (state.getMessage() != null) //if (state.getMessage() != null)
state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+ " available=" + getSendWindowBytesRemaining() + " available=" + getSendWindowBytesRemaining()
@ -1229,10 +1246,10 @@ public class PeerState {
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
//_throttle.choke(peer.getRemotePeer()); //_throttle.choke(peer.getRemotePeer());
if (state.getMessage() != null) //if (state.getMessage() != null)
state.getMessage().timestamp("choked, not enough available, wsize=" // state.getMessage().timestamp("choked, not enough available, wsize="
+ getSendWindowBytes() + " available=" // + getSendWindowBytes() + " available="
+ getSendWindowBytesRemaining()); // + getSendWindowBytesRemaining());
return false; return false;
} }
} // nextTime <= now } // nextTime <= now
@ -1242,23 +1259,32 @@ public class PeerState {
public int acked(long messageId) { public int acked(long messageId) {
OutboundMessageState state = null; OutboundMessageState state = null;
Map msgs = _outboundMessages; List msgs = _outboundMessages;
if (_dead) return 0; if (_dead) return 0;
synchronized (msgs) { synchronized (msgs) {
state = (OutboundMessageState)msgs.remove(new Long(messageId)); int sz = msgs.size();
for (int i = 0; i < sz; i++) {
state = (OutboundMessageState)msgs.get(i);
if (state.getMessageId() == messageId) {
msgs.remove(i);
break;
} else {
state = null;
}
}
if ( (state != null) && (state == _retransmitter) ) if ( (state != null) && (state == _retransmitter) )
_retransmitter = null; _retransmitter = null;
} }
if (state != null) { if (state != null) {
int numSends = state.getMaxSends(); int numSends = state.getMaxSends();
if (state.getMessage() != null) { //if (state.getMessage() != null) {
state.getMessage().timestamp("acked after " + numSends // state.getMessage().timestamp("acked after " + numSends
+ " lastReceived: " // + " lastReceived: "
+ (_context.clock().now() - getLastReceiveTime()) // + (_context.clock().now() - getLastReceiveTime())
+ " lastSentFully: " // + " lastSentFully: "
+ (_context.clock().now() - getLastSendFullyTime())); // + (_context.clock().now() - getLastSendFullyTime()));
} //}
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Received ack of " + messageId + " by " + _remotePeer.toBase64() _log.info("Received ack of " + messageId + " by " + _remotePeer.toBase64()
@ -1294,19 +1320,24 @@ public class PeerState {
return; return;
} }
Map msgs = _outboundMessages; List msgs = _outboundMessages;
OutboundMessageState state = null; OutboundMessageState state = null;
boolean isComplete = false; boolean isComplete = false;
synchronized (msgs) { synchronized (msgs) {
state = (OutboundMessageState)msgs.get(new Long(bitfield.getMessageId())); for (int i = 0; i < msgs.size(); i++) {
if (state != null) { state = (OutboundMessageState)msgs.get(i);
if (state.acked(bitfield)) { if (state.getMessageId() == bitfield.getMessageId()) {
// this partial ack actually clears it fully boolean complete = state.acked(bitfield);
isComplete = true; if (complete) {
msgs.remove(new Long(bitfield.getMessageId())); isComplete = true;
if (state == _retransmitter) msgs.remove(i);
_retransmitter = null; if (state == _retransmitter)
_retransmitter = null;
}
break;
} else {
state = null;
} }
} }
} }
@ -1333,8 +1364,8 @@ public class PeerState {
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1) if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
if (state.getMessage() != null) //if (state.getMessage() != null)
state.getMessage().timestamp("partial ack to complete after " + numSends); // state.getMessage().timestamp("partial ack to complete after " + numSends);
_transport.succeeded(state); _transport.succeeded(state);
// this adjusts the rtt/rto/window/etc // this adjusts the rtt/rto/window/etc
@ -1344,8 +1375,8 @@ public class PeerState {
state.releaseResources(); state.releaseResources();
} else { } else {
if (state.getMessage() != null) //if (state.getMessage() != null)
state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); // state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
} }
return; return;
} else { } else {
@ -1392,21 +1423,16 @@ public class PeerState {
msgs.clear(); msgs.clear();
OutboundMessageState retransmitter = null; OutboundMessageState retransmitter = null;
Map omsgs = oldPeer._outboundMessages; synchronized (oldPeer._outboundMessages) {
if (omsgs != null) { tmp.addAll(oldPeer._outboundMessages);
synchronized (omsgs) { oldPeer._outboundMessages.clear();
msgs.putAll(omsgs); retransmitter = oldPeer._retransmitter;
omsgs.clear();
retransmitter = oldPeer._retransmitter;
}
} }
omsgs = _outboundMessages; synchronized (_outboundMessages) {
if (omsgs != null) { _outboundMessages.addAll(tmp);
synchronized (omsgs) { _retransmitter = retransmitter;
omsgs.putAll(msgs);
_retransmitter = retransmitter;
}
} }
tmp.clear();
} }
public int hashCode() { public int hashCode() {

View File

@ -108,9 +108,10 @@ public class UDPEndpointTest {
curPeer = 0; curPeer = 0;
short priority = 1; short priority = 1;
long expiration = -1; long expiration = -1;
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, true);
try { //try {
packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort()); if (true) throw new RuntimeException("fixme");
//packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort());
packet.writeData(data, 0, 1024); packet.writeData(data, 0, 1024);
packet.getPacket().setLength(1024); packet.getPacket().setLength(1024);
int outstanding = _sentNotReceived.size() + 1; int outstanding = _sentNotReceived.size() + 1;
@ -118,9 +119,9 @@ public class UDPEndpointTest {
_log.debug("Sending packet " + curPacket + " with outstanding " + outstanding); _log.debug("Sending packet " + curPacket + " with outstanding " + outstanding);
_endpoint.send(packet); _endpoint.send(packet);
//try { Thread.sleep(10); } catch (InterruptedException ie) {} //try { Thread.sleep(10); } catch (InterruptedException ie) {}
} catch (UnknownHostException uhe) { //} catch (UnknownHostException uhe) {
_log.error("foo!", uhe); // _log.error("foo!", uhe);
} //}
//if (_log.shouldLog(Log.DEBUG)) { //if (_log.shouldLog(Log.DEBUG)) {
// _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort()); // _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort());
//} //}

View File

@ -40,6 +40,7 @@ public class UDPPacket {
private long _receivedTime; private long _receivedTime;
private long _beforeReceiveFragments; private long _beforeReceiveFragments;
private long _afterHandlingTime; private long _afterHandlingTime;
private boolean _isInbound;
private static final List _packetCache; private static final List _packetCache;
static { static {
@ -47,7 +48,8 @@ public class UDPPacket {
_log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class);
} }
private static final boolean CACHE = false; // TODO: support caching to cut churn down a /lot/ private static final boolean CACHE = true; // TODO: support caching to cut churn down a /lot/
private static final int CACHE_SIZE = 64;
static final int MAX_PACKET_SIZE = 2048; static final int MAX_PACKET_SIZE = 2048;
public static final int IV_SIZE = 16; public static final int IV_SIZE = 16;
@ -75,18 +77,31 @@ public class UDPPacket {
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE; private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
private static final ByteCache _validateCache = ByteCache.getInstance(64, MAX_VALIDATE_SIZE); private static final ByteCache _validateCache = ByteCache.getInstance(64, MAX_VALIDATE_SIZE);
private static final ByteCache _ivCache = ByteCache.getInstance(64, IV_SIZE); private static final ByteCache _ivCache = ByteCache.getInstance(64, IV_SIZE);
private static final ByteCache _dataCache = ByteCache.getInstance(128, MAX_PACKET_SIZE); private static final ByteCache _dataCache = ByteCache.getInstance(64, MAX_PACKET_SIZE);
private UDPPacket(I2PAppContext ctx) { private UDPPacket(I2PAppContext ctx, boolean inbound) {
ctx.statManager().createRateStat("udp.packetsLiveInbound", "Number of live inbound packets in memory", "udp", new long[] { 60*1000, 5*60*1000 });
ctx.statManager().createRateStat("udp.packetsLiveOutbound", "Number of live outbound packets in memory", "udp", new long[] { 60*1000, 5*60*1000 });
ctx.statManager().createRateStat("udp.packetsLivePendingRecvInbound", "Number of live inbound packets not yet handled by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 });
ctx.statManager().createRateStat("udp.packetsLivePendingHandleInbound", "Number of live inbound packets not yet handled fully by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 });
// the data buffer is clobbered on init(..), but we need it to bootstrap
_packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
init(ctx, inbound);
}
private void init(I2PAppContext ctx, boolean inbound) {
_context = ctx; _context = ctx;
_dataBuf = _dataCache.acquire(); _dataBuf = _dataCache.acquire();
_data = _dataBuf.getData(); _data = _dataBuf.getData();
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); //_packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
_packet.setData(_data);
_isInbound = inbound;
_initializeTime = _context.clock().now(); _initializeTime = _context.clock().now();
_markedType = -1; _markedType = -1;
_remoteHost = null; _remoteHost = null;
_released = false;
} }
/*
public void initialize(int priority, long expiration, InetAddress host, int port) { public void initialize(int priority, long expiration, InetAddress host, int port) {
_priority = (short)priority; _priority = (short)priority;
_expiration = expiration; _expiration = expiration;
@ -99,6 +114,7 @@ public class UDPPacket {
_released = false; _released = false;
_releasedBy = null; _releasedBy = null;
} }
*/
public void writeData(byte src[], int offset, int len) { public void writeData(byte src[], int offset, int len) {
verifyNotReleased(); verifyNotReleased();
@ -129,8 +145,12 @@ public class UDPPacket {
void setFragmentCount(int count) { _fragmentCount = count; } void setFragmentCount(int count) { _fragmentCount = count; }
public RemoteHostId getRemoteHost() { public RemoteHostId getRemoteHost() {
if (_remoteHost == null) if (_remoteHost == null) {
_remoteHost = new RemoteHostId(_packet.getAddress().getAddress(), _packet.getPort()); InetAddress addr = _packet.getAddress();
byte ip[] = addr.getAddress();
int port = _packet.getPort();
_remoteHost = new RemoteHostId(ip, port);
}
return _remoteHost; return _remoteHost;
} }
@ -234,7 +254,7 @@ public class UDPPacket {
return buf.toString(); return buf.toString();
} }
public static UDPPacket acquire(I2PAppContext ctx) { public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
UDPPacket rv = null; UDPPacket rv = null;
if (CACHE) { if (CACHE) {
synchronized (_packetCache) { synchronized (_packetCache) {
@ -242,27 +262,12 @@ public class UDPPacket {
rv = (UDPPacket)_packetCache.remove(0); rv = (UDPPacket)_packetCache.remove(0);
} }
} }
/*
if (rv != null) { if (rv != null)
rv._context = ctx; rv.init(ctx, inbound);
//rv._log = ctx.logManager().getLog(UDPPacket.class);
rv.resetBegin();
Arrays.fill(rv._data, (byte)0x00);
rv._markedType = -1;
rv._dataBuf.setValid(0);
rv._released = false;
rv._releasedBy = null;
rv._acquiredBy = null;
rv.setPacketDataLength(0);
synchronized (rv._packet) {
//rv._packet.setLength(0);
//rv._packet.setPort(1);
}
}
*/
} }
if (rv == null) if (rv == null)
rv = new UDPPacket(ctx); rv = new UDPPacket(ctx, inbound);
//if (rv._acquiredBy != null) { //if (rv._acquiredBy != null) {
// _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception()); // _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception());
// _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy); // _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy);
@ -277,15 +282,12 @@ public class UDPPacket {
//_releasedBy = new Exception("released by"); //_releasedBy = new Exception("released by");
//_acquiredBy = null; //_acquiredBy = null;
// //
if (!CACHE) { _dataCache.release(_dataBuf);
_dataCache.release(_dataBuf); if (!CACHE)
return; return;
}
synchronized (_packetCache) { synchronized (_packetCache) {
if (_packetCache.size() <= 64) { if (_packetCache.size() <= CACHE_SIZE) {
_packetCache.add(this); _packetCache.add(this);
} else {
_dataCache.release(_dataBuf);
} }
} }
} }

View File

@ -270,17 +270,17 @@ public class UDPPacketReader {
public boolean readExtendedDataIncluded() { public boolean readExtendedDataIncluded() {
return flagSet(UDPPacket.DATA_FLAG_EXTENDED); return flagSet(UDPPacket.DATA_FLAG_EXTENDED);
} }
public long[] readACKs() { public int readACKCount() {
if (!readACKsIncluded()) return null; if (!readACKsIncluded()) return 0;
int off = readBodyOffset() + 1; int off = readBodyOffset() + 1;
int num = (int)DataHelper.fromLong(_message, off, 1); return (int)DataHelper.fromLong(_message, off, 1);
}
public long readACK(int index) {
if (!readACKsIncluded()) return -1;
int off = readBodyOffset() + 1;
//int num = (int)DataHelper.fromLong(_message, off, 1);
off++; off++;
long rv[] = new long[num]; return DataHelper.fromLong(_message, off + (4 * index), 4);
for (int i = 0; i < num; i++) {
rv[i] = DataHelper.fromLong(_message, off, 4);
off += 4;
}
return rv;
} }
public ACKBitfield[] readACKBitfields() { public ACKBitfield[] readACKBitfields() {
if (!readACKBitfieldsIncluded()) return null; if (!readACKBitfieldsIncluded()) return null;

View File

@ -42,6 +42,7 @@ public class UDPReceiver {
_transport = transport; _transport = transport;
_runner = new Runner(); _runner = new Runner();
_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInboundProbabalistically", "How many packet we drop probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInboundProbabalistically", "How many packet we drop probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.acceptedInboundProbabalistically", "How many packet we accept probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.acceptedInboundProbabalistically", "How many packet we accept probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
@ -145,6 +146,7 @@ public class UDPReceiver {
} }
// rejected // rejected
packet.release();
_context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod); _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
if (_log.shouldLog(Log.WARN)) { if (_log.shouldLog(Log.WARN)) {
StringBuffer msg = new StringBuffer(); StringBuffer msg = new StringBuffer();
@ -171,31 +173,36 @@ public class UDPReceiver {
* *
*/ */
public UDPPacket receiveNext() { public UDPPacket receiveNext() {
UDPPacket rv = null;
int remaining = 0;
while (_keepRunning) { while (_keepRunning) {
synchronized (_inboundQueue) { synchronized (_inboundQueue) {
if (_inboundQueue.size() <= 0) if (_inboundQueue.size() <= 0)
try { _inboundQueue.wait(); } catch (InterruptedException ie) {} try { _inboundQueue.wait(); } catch (InterruptedException ie) {}
if (_inboundQueue.size() > 0) { if (_inboundQueue.size() > 0) {
UDPPacket rv = (UDPPacket)_inboundQueue.remove(0); rv = (UDPPacket)_inboundQueue.remove(0);
if (_inboundQueue.size() > 0) remaining = _inboundQueue.size();
if (remaining > 0)
_inboundQueue.notifyAll(); _inboundQueue.notifyAll();
return rv; break;
} }
} }
} }
return null; _context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
return rv;
} }
private class Runner implements Runnable { private class Runner implements Runnable {
private boolean _socketChanged; private boolean _socketChanged;
public void run() { public void run() {
_socketChanged = false; _socketChanged = false;
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest();
while (_keepRunning) { while (_keepRunning) {
if (_socketChanged) { if (_socketChanged) {
Thread.currentThread().setName(_name + "." + _id); Thread.currentThread().setName(_name + "." + _id);
_socketChanged = false; _socketChanged = false;
} }
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context, true);
// block before we read... // block before we read...
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -217,7 +224,9 @@ public class UDPReceiver {
// and block after we know how much we read but before // and block after we know how much we read but before
// we release the packet to the inbound queue // we release the packet to the inbound queue
if (size > 0) { if (size > 0) {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
//_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver");
req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
while (req.getPendingInboundRequested() > 0) while (req.getPendingInboundRequested() > 0)
req.waitForNextAllocation(); req.waitForNextAllocation();

View File

@ -178,6 +178,7 @@ public class UDPSender {
private class Runner implements Runnable { private class Runner implements Runnable {
private boolean _socketChanged; private boolean _socketChanged;
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest();
public void run() { public void run() {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Running the UDP sender"); _log.debug("Running the UDP sender");
@ -196,7 +197,8 @@ public class UDPSender {
int size = packet.getPacket().getLength(); int size = packet.getPacket().getLength();
int size2 = packet.getPacket().getLength(); int size2 = packet.getPacket().getLength();
if (size > 0) { if (size > 0) {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); //_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");
while (req.getPendingOutboundRequested() > 0) while (req.getPendingOutboundRequested() > 0)
req.waitForNextAllocation(); req.waitForNextAllocation();
} }
@ -209,7 +211,7 @@ public class UDPSender {
//_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size)); //_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size));
} }
_context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); //_context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount());
//packet.getPacket().setLength(size); //packet.getPacket().setLength(size);
try { try {

View File

@ -961,13 +961,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if ( (msg.getPeer() != null) && if ( (msg.getPeer() != null) &&
( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
(msg.isExpired())) ) { (msg.isExpired())) ) {
long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime(); //long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime();
long sendDelay = _context.clock().now() - msg.getPeer().getLastSendFullyTime(); //long sendDelay = _context.clock().now() - msg.getPeer().getLastSendFullyTime();
if (m != null) //if (m != null)
m.timestamp("message failure - volleys = " + msg.getMaxSends() // m.timestamp("message failure - volleys = " + msg.getMaxSends()
+ " lastReceived: " + recvDelay // + " lastReceived: " + recvDelay
+ " lastSentFully: " + sendDelay // + " lastSentFully: " + sendDelay
+ " expired? " + msg.isExpired()); // + " expired? " + msg.isExpired());
consecutive = msg.getPeer().incrementConsecutiveFailedSends(); consecutive = msg.getPeer().incrementConsecutiveFailedSends();
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Consecutive failure #" + consecutive _log.warn("Consecutive failure #" + consecutive