- Don't delay in OutboundMessageFragments at the end
      of the loop if we have more to send now, this should
      speed things up
    - More cleanups
This commit is contained in:
zzz
2011-07-24 12:16:35 +00:00
parent 49bba109ac
commit 702da0a929
5 changed files with 34 additions and 35 deletions

View File

@@ -158,7 +158,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
state.releaseResources(); state.releaseResources();
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state); _log.warn("Message expired while only being partially read: " + state);
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString()); _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
} else if (partialACK) { } else if (partialACK) {
// not expired but not yet complete... lets queue up a partial ACK // not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@@ -12,20 +12,20 @@ import net.i2p.util.Log;
* *
*/ */
class InboundMessageState { class InboundMessageState {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private long _messageId; private final long _messageId;
private Hash _from; private final Hash _from;
/** /**
* indexed array of fragments for the message, where not yet * indexed array of fragments for the message, where not yet
* received fragments are null. * received fragments are null.
*/ */
private ByteArray _fragments[]; private final ByteArray _fragments[];
/** /**
* what is the last fragment in the message (or -1 if not yet known) * what is the last fragment in the message (or -1 if not yet known)
*/ */
private int _lastFragment; private int _lastFragment;
private long _receiveBegin; private final long _receiveBegin;
private int _completeSize; private int _completeSize;
private boolean _released; private boolean _released;
@@ -153,10 +153,12 @@ class InboundMessageState {
} }
public void releaseResources() { public void releaseResources() {
if (_fragments != null) for (int i = 0; i < _fragments.length; i++) {
for (int i = 0; i < _fragments.length; i++) if (_fragments[i] != null) {
_fragmentCache.release(_fragments[i]); _fragmentCache.release(_fragments[i]);
//_fragments = null; _fragments[i] = null;
}
}
_released = true; _released = true;
} }
@@ -178,7 +180,7 @@ class InboundMessageState {
buf.append(" completely received with "); buf.append(" completely received with ");
buf.append(getCompleteSize()).append(" bytes"); buf.append(getCompleteSize()).append(" bytes");
} else { } else {
for (int i = 0; (_fragments != null) && (i < _fragments.length); i++) { for (int i = 0; i < _lastFragment; i++) {
buf.append(" fragment ").append(i); buf.append(" fragment ").append(i);
if (_fragments[i] != null) if (_fragments[i] != null)
buf.append(": known at size ").append(_fragments[i].getValid()); buf.append(": known at size ").append(_fragments[i].getValid());

View File

@@ -261,7 +261,7 @@ class OutboundMessageFragments {
// Keep track of how many we've looked at, since we don't start the iterator at the beginning. // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
int peersProcessed = 0; int peersProcessed = 0;
while (_alive && (state == null) ) { while (_alive && (state == null) ) {
int nextSendDelay = -1; int nextSendDelay = Integer.MAX_VALUE;
// no, not every time - O(n**2) - do just before waiting below // no, not every time - O(n**2) - do just before waiting below
//finishMessages(); //finishMessages();
@@ -295,10 +295,10 @@ class OutboundMessageFragments {
// we've gone all the way around, time to sleep // we've gone all the way around, time to sleep
break; break;
} else { } else {
// Update the minimum delay for all peers (getNextDelay() returns 1 for "now") // Update the minimum delay for all peers
// which will be used if we found nothing to send across all peers // which will be used if we found nothing to send across all peers
int delay = peer.getNextDelay(); int delay = peer.getNextDelay();
if ( (nextSendDelay <= 0) || (delay < nextSendDelay) ) if (delay < nextSendDelay)
nextSendDelay = delay; nextSendDelay = delay;
peer = null; peer = null;
} }
@@ -309,23 +309,22 @@ class OutboundMessageFragments {
peer.getRemotePeer().toBase64()); peer.getRemotePeer().toBase64());
// if we've gone all the way through the loop, wait // if we've gone all the way through the loop, wait
if (state == null && peersProcessed >= _activePeers.size()) { // ... unless nextSendDelay says we have more ready now
if (state == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
_isWaiting = true;
peersProcessed = 0; peersProcessed = 0;
// why? we do this in the loop one at a time // why? we do this in the loop one at a time
//finishMessages(); //finishMessages();
if (_log.shouldLog(Log.DEBUG))
_log.debug("wait for " + nextSendDelay);
// wait.. or somethin'
// wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
_isWaiting = true; // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
// gets called regularly
int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
if (_log.shouldLog(Log.DEBUG))
_log.debug("wait for " + toWait);
// wait.. or somethin'
synchronized (_activePeers) { synchronized (_activePeers) {
try { try {
// use max of 1 second so finishMessages() and/or PeerState.finishMessages() _activePeers.wait(toWait);
// gets called regularly
if (nextSendDelay > 0)
_activePeers.wait(Math.min(Math.max(nextSendDelay, 10), MAX_WAIT));
else
_activePeers.wait(MAX_WAIT);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// noop // noop
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@@ -206,6 +206,7 @@ class OutboundMessageState {
sends[i] = (short)-1; sends[i] = (short)-1;
boolean rv = isComplete(); boolean rv = isComplete();
/****
if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed
long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY); long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY);
//_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO); //_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO);
@@ -218,6 +219,7 @@ class OutboundMessageState {
// _nextSendTime = now + 100; // _nextSendTime = now + 100;
//_nextSendTime = now; //_nextSendTime = now;
} }
****/
return rv; return rv;
} }

View File

@@ -1248,26 +1248,22 @@ class PeerState {
} }
/** /**
* return how long to wait before sending, or -1 if we have nothing to send * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value.
*/ */
public int getNextDelay() { public int getNextDelay() {
int rv = -1; int rv = Integer.MAX_VALUE;
if (_dead) return rv;
long now = _context.clock().now(); long now = _context.clock().now();
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return -1;
synchronized (msgs) { synchronized (msgs) {
if (_retransmitter != null) { if (_retransmitter != null) {
rv = (int)(_retransmitter.getNextSendTime() - now); rv = (int)(_retransmitter.getNextSendTime() - now);
if (rv <= 0) return rv;
return 1;
else
return rv;
} }
for (OutboundMessageState state : msgs) { for (OutboundMessageState state : msgs) {
int delay = (int)(state.getNextSendTime() - now); int delay = (int)(state.getNextSendTime() - now);
if (delay <= 0) if (delay < rv)
delay = 1;
if ( (rv <= 0) || (delay < rv) )
rv = delay; rv = delay;
} }
} }