diff --git a/history.txt b/history.txt index 858a5a012..580ece500 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,13 @@ -$Id: history.txt,v 1.404 2006/02/17 17:29:33 jrandom Exp $ +$Id: history.txt,v 1.405 2006/02/18 01:48:48 jrandom Exp $ + +2006-02-18 jrandom + * Migrate the outbound packets from a central component to the individual + per-peer components, substantially cutting down on lock contention when + dealing with higher degrees. + * Load balance the outbound SSU transfers evenly across peers, rather than + across messages (so peers with few messages won't be starved by peers + with many). + * Reduce the frequency of router info rebuilds (thanks bar!) 2006-02-18 jrandom * Add a new AIMD throttle in SSU to control the number of concurrent diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 9f59881e0..6978a0e02 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.347 $ $Date: 2006/02/17 17:29:32 $"; + public final static String ID = "$Revision: 1.348 $ $Date: 2006/02/18 01:38:30 $"; public final static String VERSION = "0.6.1.10"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 7a861693c..43268a817 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -169,6 +169,7 @@ public class EstablishmentManager { msg.getTarget().getIdentity(), new SessionKey(addr.getIntroKey()), addr); _outboundStates.put(to, state); + SimpleTimer.getInstance().addEvent(new Expire(to, state), 10*1000); } } if (state != null) { @@ -187,6 +188,33 @@ public class EstablishmentManager { notifyActivity(); } + private class Expire implements SimpleTimer.TimedEvent { + private RemoteHostId _to; + private OutboundEstablishState _state; + public Expire(RemoteHostId to, OutboundEstablishState state) { + _to = to; + _state = state; + } + public void timeReached() { + Object removed = null; + synchronized (_outboundStates) { + removed = _outboundStates.remove(_to); + if (removed != _state) { // oops, we must have failed, then retried + _outboundStates.put(_to, removed); + removed = null; + }/* else { + locked_admitQueued(); + }*/ + } + if (removed != null) { + _context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Timing out expired outbound: " + _state); + processExpired(_state); + } + } + } + /** * How many concurrent inbound sessions to deal with */ @@ -213,11 +241,11 @@ public class EstablishmentManager { state = (InboundEstablishState)_inboundStates.get(from); if (state == null) { state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort()); + state.receiveSessionRequest(reader.getSessionRequestReader()); isNew = true; _inboundStates.put(from, state); } } - state.receiveSessionRequest(reader.getSessionRequestReader()); if (isNew) { if (!_transport.introducersRequired()) { long tag = _context.random().nextLong(MAX_TAG_VALUE); @@ -332,6 +360,7 @@ public class EstablishmentManager { msg.getTarget().getIdentity(), new SessionKey(addr.getIntroKey()), addr); _outboundStates.put(to, qstate); + SimpleTimer.getInstance().addEvent(new Expire(to, qstate), 10*1000); for (int i = 0; i < queued.size(); i++) { OutNetMessage m = (OutNetMessage)queued.get(i); @@ -361,7 +390,7 @@ public class EstablishmentManager { private void handleCompletelyEstablished(InboundEstablishState state) { long now = _context.clock().now(); RouterIdentity remote = state.getConfirmedIdentity(); - PeerState peer = new PeerState(_context); + PeerState peer = new PeerState(_context, _transport); peer.setCurrentCipherKey(state.getCipherKey()); peer.setCurrentMACKey(state.getMACKey()); peer.setCurrentReceiveSecond(now - (now % 1000)); @@ -396,7 +425,7 @@ public class EstablishmentManager { private PeerState handleCompletelyEstablished(OutboundEstablishState state) { long now = _context.clock().now(); RouterIdentity remote = state.getRemoteIdentity(); - PeerState peer = new PeerState(_context); + PeerState peer = new PeerState(_context, _transport); peer.setCurrentCipherKey(state.getCipherKey()); peer.setCurrentMACKey(state.getMACKey()); peer.setCurrentReceiveSecond(now - (now % 1000)); @@ -721,6 +750,7 @@ public class EstablishmentManager { // _log.debug("# outbound states: " + _outboundStates.size()); for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) { OutboundEstablishState cur = (OutboundEstablishState)iter.next(); + if (cur == null) continue; if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) { // completely received iter.remove(); @@ -770,46 +800,7 @@ public class EstablishmentManager { if (outboundState != null) { if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) { - if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime()); - while (true) { - OutNetMessage msg = outboundState.getNextQueuedMessage(); - if (msg == null) - break; - _transport.failed(msg, "Expired during failed establish"); - } - String err = null; - switch (outboundState.getState()) { - case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY: - err = "Took too long to establish remote connection (confirmed partially)"; - break; - case OutboundEstablishState.STATE_CREATED_RECEIVED: - err = "Took too long to establish remote connection (created received)"; - break; - case OutboundEstablishState.STATE_REQUEST_SENT: - err = "Took too long to establish remote connection (request sent)"; - break; - case OutboundEstablishState.STATE_PENDING_INTRO: - err = "Took too long to establish remote connection (intro failed)"; - break; - case OutboundEstablishState.STATE_UNKNOWN: // fallthrough - default: - err = "Took too long to establish remote connection (unknown state)"; - } - - Hash peer = outboundState.getRemoteIdentity().calculateHash(); - _context.shitlist().shitlistRouter(peer, err); - _transport.dropPeer(peer); - //_context.profileManager().commErrorOccurred(peer); - } else { - while (true) { - OutNetMessage msg = outboundState.getNextQueuedMessage(); - if (msg == null) - break; - _transport.send(msg); - } - } + processExpired(outboundState); } else { switch (outboundState.getState()) { case OutboundEstablishState.STATE_UNKNOWN: @@ -847,6 +838,49 @@ public class EstablishmentManager { return nextSendTime; } + + private void processExpired(OutboundEstablishState outboundState) { + if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime()); + while (true) { + OutNetMessage msg = outboundState.getNextQueuedMessage(); + if (msg == null) + break; + _transport.failed(msg, "Expired during failed establish"); + } + String err = null; + switch (outboundState.getState()) { + case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY: + err = "Took too long to establish remote connection (confirmed partially)"; + break; + case OutboundEstablishState.STATE_CREATED_RECEIVED: + err = "Took too long to establish remote connection (created received)"; + break; + case OutboundEstablishState.STATE_REQUEST_SENT: + err = "Took too long to establish remote connection (request sent)"; + break; + case OutboundEstablishState.STATE_PENDING_INTRO: + err = "Took too long to establish remote connection (intro failed)"; + break; + case OutboundEstablishState.STATE_UNKNOWN: // fallthrough + default: + err = "Took too long to establish remote connection (unknown state)"; + } + + Hash peer = outboundState.getRemoteIdentity().calculateHash(); + _context.shitlist().shitlistRouter(peer, err); + _transport.dropPeer(peer); + //_context.profileManager().commErrorOccurred(peer); + } else { + while (true) { + OutNetMessage msg = outboundState.getNextQueuedMessage(); + if (msg == null) + break; + _transport.send(msg); + } + } + } /** * Driving thread, processing up to one step for an inbound peer and up to @@ -881,13 +915,15 @@ public class EstablishmentManager { long delay = nextSendTime - now; if ( (nextSendTime == -1) || (delay > 0) ) { + if (delay > 5000) + delay = 5000; boolean interrupted = false; try { synchronized (_activityLock) { if (_activity > 0) return; if (nextSendTime == -1) - _activityLock.wait(); + _activityLock.wait(5000); else _activityLock.wait(delay); } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 0d6d21474..696718f65 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -31,21 +31,15 @@ public class OutboundMessageFragments { private Log _log; private UDPTransport _transport; private ActiveThrottle _throttle; - /** OutboundMessageState for messages being sent */ - private List _activeMessages; + /** peers we are actively sending messages to */ + private List _activePeers; private boolean _alive; - /** which message should we build the next packet out of? */ - private int _nextPacketMessage; + /** which peer should we build the next packet out of? */ + private int _nextPeer; private PacketBuilder _builder; /** if we can handle more messages explicitly, set this to true */ private boolean _allowExcess; private volatile long _packetsRetransmitted; - /** - * Map of peer to OutboundMessageState for messages being retransmitted, to - * keep bad peers from bursting too much due to congestion/outage. This - * should only be accessed when holding the lock on _activeMessages. - */ - private Map _retransmitters; private static final int MAX_ACTIVE = 64; // don't send a packet more than 10 times @@ -56,9 +50,8 @@ public class OutboundMessageFragments { _log = ctx.logManager().getLog(OutboundMessageFragments.class); _transport = transport; _throttle = throttle; - _activeMessages = new ArrayList(MAX_ACTIVE); - _retransmitters = new HashMap(MAX_ACTIVE); - _nextPacketMessage = 0; + _activePeers = new ArrayList(256); + _nextPeer = 0; _builder = new PacketBuilder(ctx, transport); _alive = true; _allowExcess = false; @@ -85,8 +78,16 @@ public class OutboundMessageFragments { public void startup() { _alive = true; } public void shutdown() { _alive = false; - synchronized (_activeMessages) { - _activeMessages.notifyAll(); + synchronized (_activePeers) { + _activePeers.notifyAll(); + } + } + void dropPeer(PeerState peer) { + if (_log.shouldLog(Log.INFO)) + _log.info("Dropping peer " + peer.getRemotePeer().toBase64()); + peer.dropOutbound(); + synchronized (_activePeers) { + _activePeers.remove(peer); } } @@ -99,7 +100,8 @@ public class OutboundMessageFragments { public boolean waitForMoreAllowed() { // test without choking. // perhaps this should check the lifetime of the first activeMessage? - if (true) return true; + if (true) return true; + /* long start = _context.clock().now(); int numActive = 0; @@ -121,6 +123,7 @@ public class OutboundMessageFragments { _context.statManager().addRateData("udp.activeDelay", numActive, _context.clock().now() - start); } catch (InterruptedException ie) {} } + */ return false; } @@ -131,28 +134,32 @@ public class OutboundMessageFragments { public void add(OutNetMessage msg) { I2NPMessage msgBody = msg.getMessage(); RouterInfo target = msg.getTarget(); - if ( (msgBody == null) || (target == null) ) { - synchronized (_activeMessages) { - _activeMessages.notifyAll(); - } + if ( (msgBody == null) || (target == null) ) return; - } OutboundMessageState state = new OutboundMessageState(_context); boolean ok = state.initialize(msg, msgBody); - state.setPeer(_transport.getPeerState(target.getIdentity().calculateHash())); - finishMessages(); - int active = 0; - synchronized (_activeMessages) { - if (ok) - _activeMessages.add(state); - active = _activeMessages.size(); - if (active == 1) - _lastCycleTime = System.currentTimeMillis(); - _activeMessages.notifyAll(); + if (ok) { + PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash()); + int active = peer.add(state); + synchronized (_activePeers) { + if (!_activePeers.contains(peer)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); + _activePeers.add(peer); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); + } + _activePeers.notifyAll(); + } + msg.timestamp("made active along with: " + active); + _context.statManager().addRateData("udp.outboundActiveCount", active, 0); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error initializing " + msg); } - msg.timestamp("made active along with: " + active); - _context.statManager().addRateData("udp.outboundActiveCount", active, 0); + finishMessages(); } /** @@ -160,116 +167,55 @@ public class OutboundMessageFragments { * complete message reliably */ public void add(OutboundMessageState state) { + PeerState peer = state.getPeer(); + if (peer == null) + throw new RuntimeException("wtf, null peer for " + state); + int active = peer.add(state); + synchronized (_activePeers) { + if (!_activePeers.contains(peer)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); + _activePeers.add(peer); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); + } + if (_activePeers.size() == 1) + _lastCycleTime = System.currentTimeMillis(); + _activePeers.notifyAll(); + } + _context.statManager().addRateData("udp.outboundActiveCount", active, 0); + // should we finish messages here too? + /* synchronized (_activeMessages) { _activeMessages.add(state); if (_activeMessages.size() == 1) _lastCycleTime = System.currentTimeMillis(); _activeMessages.notifyAll(); } + */ } /** * Remove any expired or complete messages */ private void finishMessages() { - synchronized (_activeMessages) { - for (int i = 0; i < _activeMessages.size(); i++) { - OutboundMessageState state = (OutboundMessageState)_activeMessages.get(i); - PeerState peer = state.getPeer(); - if (state.isComplete()) { - _activeMessages.remove(i); - locked_removeRetransmitter(state); - _transport.succeeded(state); - if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) ) - _throttle.unchoke(peer.getRemotePeer()); - state.releaseResources(); - if (i < _nextPacketMessage) { - _nextPacketMessage--; - if (_nextPacketMessage < 0) - _nextPacketMessage = 0; - } - if (state.getMessage() != null) - state.getMessage().timestamp("sending complete"); + int rv = 0; + synchronized (_activePeers) { + for (int i = 0; i < _activePeers.size(); i++) { + PeerState state = (PeerState)_activePeers.get(i); + int remaining = state.finishMessages(); + if (remaining <= 0) { + _activePeers.remove(i); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No more pending messages for " + state.getRemotePeer().toBase64()); i--; - } else if (state.isExpired()) { - _activeMessages.remove(i); - locked_removeRetransmitter(state); - _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); - - if (state.getMessage() != null) { - state.getMessage().timestamp("expired in the active pool"); - _transport.failed(state); - } else { - // it can not have an OutNetMessage if the source is the - // final after establishment message - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to send an expired direct message: " + state); - } - if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) ) - _throttle.unchoke(peer.getRemotePeer()); - state.releaseResources(); - if (i < _nextPacketMessage) { - _nextPacketMessage--; - if (_nextPacketMessage < 0) - _nextPacketMessage = 0; - } - i--; - } else if (state.getPushCount() > MAX_VOLLEYS) { - _activeMessages.remove(i); - locked_removeRetransmitter(state); - _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); - //if (state.getPeer() != null) - // state.getPeer().congestionOccurred(); - - if (state.getMessage() != null) { - state.getMessage().timestamp("too many sends"); - _transport.failed(state); - } else { - // it can not have an OutNetMessage if the source is the - // final after establishment message - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to send a direct message after too many volleys: " + state); - } - if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) ) - _throttle.unchoke(peer.getRemotePeer()); - state.releaseResources(); - if (i < _nextPacketMessage) { - _nextPacketMessage--; - if (_nextPacketMessage < 0) - _nextPacketMessage = 0; - } - i--; - } // end (pushCount > maxVolleys) - } // end iterating over active - _activeMessages.notifyAll(); - } // end synchronized - } - - /** - * Remove the block on retransmissions to the peer if and only if the given - * message is the current "retransmitter" for it. - * - */ - private void locked_removeRetransmitter(OutboundMessageState state) { - PeerState curPeer = state.getPeer(); - if (curPeer == null) { - for (Iterator iter = _retransmitters.keySet().iterator(); iter.hasNext(); ) { - PeerState cpeer = (PeerState)iter.next(); - OutboundMessageState cstate = (OutboundMessageState)_retransmitters.get(cpeer); - if (cstate == state) { - iter.remove(); - break; } + rv += remaining; } - } else { - OutboundMessageState remState = (OutboundMessageState)_retransmitters.get(curPeer); - if (remState == state) - _retransmitters.remove(curPeer); } } - private static final long SECOND_MASK = 1023l; - private long _lastCycleTime = System.currentTimeMillis(); /** @@ -284,76 +230,58 @@ public class OutboundMessageFragments { OutboundMessageState state = null; while (_alive && (state == null) ) { long now = _context.clock().now(); - long nextSend = -1; + int nextSendDelay = -1; finishMessages(); try { - synchronized (_activeMessages) { - for (int i = 0; i < _activeMessages.size(); i++) { - int cur = (i + _nextPacketMessage) % _activeMessages.size(); + synchronized (_activePeers) { + for (int i = 0; i < _activePeers.size(); i++) { + int cur = (i + _nextPeer) % _activePeers.size(); if (cur == 0) { long ts = System.currentTimeMillis(); long cycleTime = ts - _lastCycleTime; - _lastCycleTime = ts; - _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activeMessages.size()); + _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size()); if (cycleTime > 1000) - _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activeMessages.size()); + _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size()); } - state = (OutboundMessageState)_activeMessages.get(cur); - peer = state.getPeer(); // known if this is immediately after establish - if (peer == null) - peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash()); - - if ((peer != null) && locked_shouldSend(state, peer)) { - // for fairness, we move on in a round robin - _nextPacketMessage = i + 1; + peer = (PeerState)_activePeers.get(i); + state = peer.allocateSend(); + if (state != null) { + _nextPeer = i + 1; break; } else { - if (peer == null) { - // peer disconnected - _activeMessages.remove(cur); - locked_removeRetransmitter(state); - if (state.getMessage() != null) - state.getMessage().timestamp("peer disconnected"); - _transport.failed(state); - if (_log.shouldLog(Log.ERROR)) - _log.error("Peer disconnected for " + state); - if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) ) - _throttle.unchoke(peer.getRemotePeer()); - state.releaseResources(); - i--; - } - - long time = state.getNextSendTime(); - if ( (nextSend < 0) || (time < nextSend) ) - nextSend = time; - state = null; + int delay = peer.getNextDelay(); + if ( (nextSendDelay <= 0) || (delay < nextSendDelay) ) + nextSendDelay = delay; peer = null; + state = null; } - } // end of the for(activeMessages) - - if (state == null) { - if (nextSend <= 0) { - _activeMessages.notifyAll(); - _activeMessages.wait(1000); - } else { - // none of the packets were eligible for sending - long delay = nextSend - now; - if (delay <= 0) - delay = 10; - if (delay > 1000) - delay = 1000; - _allowExcess = true; - _activeMessages.notifyAll(); - _activeMessages.wait(delay); - } - } else { - _activeMessages.notifyAll(); } - _allowExcess = false; - } // end of the synchronized block - } catch (InterruptedException ie) {} - } // end of the while (alive && !found) - + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Done looping, next peer we are sending for: " + + (peer != null ? peer.getRemotePeer().toBase64() : "none")); + if (state == null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("wait for " + nextSendDelay); + // wait.. or somethin' + if (nextSendDelay > 0) + _activePeers.wait(nextSendDelay); + else + _activePeers.wait(1000); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("dont wait: alive=" + _alive + " state = " + state); + } + } + } catch (InterruptedException ie) { + // noop + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Woken up while waiting"); + } + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending " + state); + UDPPacket packets[] = preparePackets(state, peer); if ( (state != null) && (state.getMessage() != null) ) { int valid = 0; @@ -369,120 +297,6 @@ public class OutboundMessageFragments { return packets; } - /** - * If set to true, we should throttle retransmissions of all but the first message in - * flight to a peer. If set to false, we will only throttle the initial flight of a - * message to a peer while a retransmission is going on. - */ - private static final boolean THROTTLE_RESENDS = true; - /** - * if true, throttle the initial volley of a message if there is a resend in progress. - * if false, always send the first volley, regardless of retransmissions (but keeping in - * mind bw/cwin throttle, etc) - * - */ - private static final boolean THROTTLE_INITIAL_SEND = false; - - private boolean locked_shouldSend(OutboundMessageState state, PeerState peer) { - long now = _context.clock().now(); - if (state.getNextSendTime() <= now) { - if (!state.isFragmented()) { - state.fragment(fragmentSize(peer.getMTU())); - if (state.getMessage() != null) - state.getMessage().timestamp("fragment into " + state.getFragmentCount()); - - if (_log.shouldLog(Log.INFO)) - _log.info("Fragmenting " + state); - } - - OutboundMessageState curRetransMsg = (OutboundMessageState)_retransmitters.get(peer); - if ( (curRetransMsg != null) && ( (curRetransMsg.isExpired() || curRetransMsg.isComplete()) ) ) { - _retransmitters.remove(peer); - curRetransMsg = null; - } - - if ( (curRetransMsg != null) && (curRetransMsg != state) ) { - // choke it, since there's already another message retransmitting to this - // peer. - _context.statManager().addRateData("udp.blockedRetransmissions", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted()); - if ( (state.getMaxSends() <= 0) && (!THROTTLE_INITIAL_SEND) ) { - if (state.getMessage() != null) - state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley..."); - } else if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) { - if (state.getMessage() != null) - state.getMessage().timestamp("choked, with another message retransmitting"); - return false; - } else { - if (state.getMessage() != null) - state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); - } - } - - int size = state.getUnackedSize(); - if (peer.allocateSendingBytes(size, state.getPushCount())) { - if (_log.shouldLog(Log.INFO)) - _log.info("Allocation of " + size + " allowed with " - + peer.getSendWindowBytesRemaining() - + "/" + peer.getSendWindowBytes() - + " remaining" - + " for message " + state.getMessageId() + ": " + state); - - if (state.getPushCount() > 0) { - _retransmitters.put(peer, state); - /* - - int fragments = state.getFragmentCount(); - int toSend = 0; - for (int i = 0; i < fragments; i++) { - if (state.needsSending(i)) - toSend++; - } - - peer.messageRetransmitted(toSend); - _packetsRetransmitted += toSend; // lifetime for the transport - _context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted()); - _context.statManager().addRateData("udp.packetsRetransmitted", _packetsRetransmitted, peer.getPacketsTransmitted()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Retransmitting " + state + " to " + peer); - _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend); - */ - } - - state.push(); - - int rto = peer.getRTO();// * state.getPushCount(); - state.setNextSendTime(now + rto); - - if (peer.getSendWindowBytesRemaining() > 0) - _throttle.unchoke(peer.getRemotePeer()); - return true; - } else { - _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); - if (state.getMessage() != null) - state.getMessage().timestamp("send rejected, available=" + peer.getSendWindowBytesRemaining()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Allocation of " + size + " rejected w/ wsize=" + peer.getSendWindowBytes() - + " available=" + peer.getSendWindowBytesRemaining() - + " for message " + state.getMessageId() + ": " + state); - state.setNextSendTime(now+(_context.random().nextInt(2*ACKSender.ACK_FREQUENCY))); //(now + 1024) & ~SECOND_MASK); - if (_log.shouldLog(Log.WARN)) - _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); - _throttle.choke(peer.getRemotePeer()); - - if (state.getMessage() != null) - state.getMessage().timestamp("choked, not enough available, wsize=" - + peer.getSendWindowBytes() + " available=" - + peer.getSendWindowBytesRemaining()); - return false; - } - } // nextTime <= now - - //if (state.getMessage() != null) - // state.getMessage().timestamp("choked, time remaining to retransmit: " + (state.getNextSendTime() - now)); - - return false; - } - private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { if ( (state != null) && (peer != null) ) { int fragments = state.getFragmentCount(); @@ -562,14 +376,6 @@ public class OutboundMessageFragments { } } - private static final int SSU_HEADER_SIZE = 46; - static final int UDP_HEADER_SIZE = 8; - static final int IP_HEADER_SIZE = 20; - /** how much payload data can we shove in there? */ - private static final int fragmentSize(int mtu) { - return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE; - } - /** * We received an ACK of the given messageId from the given peer, so if it * is still unacked, mark it as complete. @@ -577,165 +383,27 @@ public class OutboundMessageFragments { * @return fragments acked */ public int acked(long messageId, Hash ackedBy) { - OutboundMessageState state = null; - synchronized (_activeMessages) { - // linear search, since its tiny - for (int i = 0; i < _activeMessages.size(); i++) { - state = (OutboundMessageState)_activeMessages.get(i); - if (state.getMessageId() == messageId) { - OutNetMessage msg = state.getMessage(); - if (msg != null) { - Hash expectedBy = msg.getTarget().getIdentity().getHash(); - if (!expectedBy.equals(ackedBy)) { - state = null; - _activeMessages.notifyAll(); - return 0; - } - } - // either the message was a short circuit after establishment, - // or it was received from who we sent it to. yay! - _activeMessages.remove(i); - if (i < _nextPacketMessage) { - _nextPacketMessage--; - if (_nextPacketMessage < 0) - _nextPacketMessage = 0; - } - locked_removeRetransmitter(state); - break; - } else { - state = null; - } - } - _activeMessages.notifyAll(); - } - - if (state != null) { - int numSends = state.getMaxSends(); - if (state.getMessage() != null) { - PeerState peer = state.getPeer(); - if (peer != null) - state.getMessage().timestamp("acked after " + numSends - + " lastReceived: " - + (_context.clock().now() - peer.getLastReceiveTime()) - + " lastSentFully: " - + (_context.clock().now() - peer.getLastSendFullyTime())); - } - - - if (_log.shouldLog(Log.INFO)) - _log.info("Received ack of " + messageId + " by " + ackedBy.toBase64() - + " after " + state.getLifetime() + " and " + numSends + " sends"); - _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); - if (state.getFragmentCount() > 1) - _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); - if (numSends > 1) - _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); - _transport.succeeded(state); - int numFragments = state.getFragmentCount(); - PeerState peer = state.getPeer(); - if (peer != null) { - // this adjusts the rtt/rto/window/etc - peer.messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends); - if (peer.getSendWindowBytesRemaining() > 0) - _throttle.unchoke(peer.getRemotePeer()); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("message acked, but no peer attacked: " + state); - } - state.releaseResources(); - return numFragments; + PeerState peer = _transport.getPeerState(ackedBy); + if (peer != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("acked [" + messageId + "] by " + ackedBy.toBase64()); + return peer.acked(messageId); } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Received an ACK for a message not pending: " + messageId); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("acked [" + messageId + "] by an unknown remote peer? " + ackedBy.toBase64()); return 0; } } public void acked(ACKBitfield bitfield, Hash ackedBy) { - if (bitfield.receivedComplete()) { - acked(bitfield.getMessageId(), ackedBy); - return; - } - - OutboundMessageState state = null; - boolean isComplete = false; - synchronized (_activeMessages) { - // linear search, since its tiny - for (int i = 0; i < _activeMessages.size(); i++) { - state = (OutboundMessageState)_activeMessages.get(i); - if (state.getMessageId() == bitfield.getMessageId()) { - OutNetMessage msg = state.getMessage(); - if (msg != null) { - Hash expectedBy = msg.getTarget().getIdentity().getHash(); - if (!expectedBy.equals(ackedBy)) { - state = null; - _activeMessages.notifyAll(); - return; - } - } - isComplete = state.acked(bitfield); - if (isComplete) { - // either the message was a short circuit after establishment, - // or it was received from who we sent it to. yay! - _activeMessages.remove(i); - if (i < _nextPacketMessage) { - _nextPacketMessage--; - if (_nextPacketMessage < 0) - _nextPacketMessage = 0; - } - } - locked_removeRetransmitter(state); - break; - } else { - state = null; - } - } - _activeMessages.notifyAll(); - } - - if (state != null) { - int numSends = state.getMaxSends(); - - int bits = bitfield.fragmentCount(); - int numACKed = 0; - for (int i = 0; i < bits; i++) - if (bitfield.received(i)) - numACKed++; - - _context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime()); - - if (_log.shouldLog(Log.INFO)) - _log.info("Received partial ack of " + state.getMessageId() + " by " + ackedBy.toBase64() - + " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? " - + isComplete + ": " + state); - - if (isComplete) { - _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); - if (state.getFragmentCount() > 1) - _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); - if (numSends > 1) - _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); - if (state.getMessage() != null) - state.getMessage().timestamp("partial ack to complete after " + numSends); - _transport.succeeded(state); - - if (state.getPeer() != null) { - // this adjusts the rtt/rto/window/etc - state.getPeer().messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0); - if (state.getPeer().getSendWindowBytesRemaining() > 0) - _throttle.unchoke(state.getPeer().getRemotePeer()); - } - - state.releaseResources(); - } else { - if (state.getMessage() != null) - state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); - } - return; + PeerState peer = _transport.getPeerState(ackedBy); + if (peer != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64()); + peer.acked(bitfield); } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Received an ACK for a message not pending: " + bitfield); - return; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64()); } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index e16fbe5df..22abbe224 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -15,6 +15,7 @@ import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.util.Log; import net.i2p.router.RouterContext; +import net.i2p.router.OutNetMessage; /** * Contain all of the state about a UDP connection to a peer. @@ -168,6 +169,12 @@ public class PeerState { /** Message (Long) to InboundMessageState for active message */ private Map _inboundMessages; + /** Message (Long) to OutboundMessageState */ + private Map _outboundMessages; + /** which outbound message is currently being retransmitted */ + private OutboundMessageState _retransmitter; + + private UDPTransport _transport; /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; @@ -206,9 +213,10 @@ public class PeerState { /** override the default MTU */ private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu"; - public PeerState(I2PAppContext ctx) { + public PeerState(I2PAppContext ctx, UDPTransport transport) { _context = ctx; _log = ctx.logManager().getLog(PeerState.class); + _transport = transport; _remotePeer = null; _currentMACKey = null; _currentCipherKey = null; @@ -254,6 +262,7 @@ public class PeerState { _packetsReceived = 0; _packetsReceivedDuplicate = 0; _inboundMessages = new HashMap(8); + _outboundMessages = new HashMap(8); _dead = false; _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -938,8 +947,8 @@ public class PeerState { } private int countMaxACKData() { return _mtu - - OutboundMessageFragments.IP_HEADER_SIZE - - OutboundMessageFragments.UDP_HEADER_SIZE + - IP_HEADER_SIZE + - UDP_HEADER_SIZE - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - 1 // type flag @@ -959,6 +968,381 @@ public class PeerState { } public RemoteHostId getRemoteHostId() { return _remoteHostId; } + + + public int add(OutboundMessageState state) { + state.setPeer(this); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); + Map msgs = _outboundMessages; + if (msgs == null) return 0; + synchronized (msgs) { + msgs.put(new Long(state.getMessageId()), state); + return msgs.size(); + } + } + /** drop all outbound messages */ + public void dropOutbound() { + if (_dead) return; + _dead = true; + Map msgs = _outboundMessages; + //_outboundMessages = null; + _retransmitter = null; + if (msgs != null) { + synchronized (msgs) { + for (Iterator iter = msgs.values().iterator(); iter.hasNext();) + _transport.failed((OutboundMessageState)iter.next()); + msgs.clear(); + } + } + } + + /** + * Expire / complete any outbound messages + * @return number of active outbound messages remaining + */ + public int finishMessages() { + int rv = 0; + Map msgs = _outboundMessages; + if (_dead) return 0; + List succeeded = null; + List failed = null; + synchronized (msgs) { + for (Iterator iter = msgs.keySet().iterator(); iter.hasNext(); ) { + Long id = (Long)iter.next(); + OutboundMessageState state = (OutboundMessageState)msgs.get(id); + if (state.isComplete()) { + iter.remove(); + if (_retransmitter == state) + _retransmitter = null; + if (succeeded == null) succeeded = new ArrayList(4); + succeeded.add(state); + } else if (state.isExpired()) { + iter.remove(); + if (_retransmitter == state) + _retransmitter = null; + _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); + if (failed == null) failed = new ArrayList(4); + failed.add(state); + } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { + iter.remove(); + if (state == _retransmitter) + _retransmitter = null; + _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); + if (failed == null) failed = new ArrayList(4); + failed.add(state); + } // end (pushCount > maxVolleys) + } // end iterating over outbound messages + rv = msgs.size(); + } + + for (int i = 0; succeeded != null && i < succeeded.size(); i++) { + OutboundMessageState state = (OutboundMessageState)succeeded.get(i); + _transport.succeeded(state); + state.releaseResources(); + OutNetMessage msg = state.getMessage(); + if (msg != null) + msg.timestamp("sending complete"); + } + + for (int i = 0; failed != null && i < failed.size(); i++) { + OutboundMessageState state = (OutboundMessageState)failed.get(i); + OutNetMessage msg = state.getMessage(); + if (msg != null) { + msg.timestamp("expired in the active pool"); + _transport.failed(state); + } else { + // it can not have an OutNetMessage if the source is the + // final after establishment message + if (_log.shouldLog(Log.WARN)) + _log.warn("Unable to send a direct message: " + state); + } + state.releaseResources(); + } + + return rv; + } + + /** + * Pick a message we want to send and allocate it out of our window + * @return allocated message to send, or null if no messages or no resources + * + */ + public OutboundMessageState allocateSend() { + int total = 0; + Map msgs = _outboundMessages; + if (_dead) return null; + synchronized (msgs) { + for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) { + OutboundMessageState state = (OutboundMessageState)iter.next(); + if (locked_shouldSend(state)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); + /* + while (iter.hasNext()) { + OutboundMessageState later = (OutboundMessageState)iter.next(); + OutNetMessage msg = later.getMessage(); + if (msg != null) + msg.timestamp("not reached for allocation " + msgs.size() + " other peers"); + } + */ + return state; + } else { + OutNetMessage msg = state.getMessage(); + if (msg != null) + msg.timestamp("passed over for allocation with " + msgs.size() + " peers"); + } + } + total = msgs.size(); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + total + " remaining"); + return null; + } + + /** + * return how long to wait before sending, or -1 if we have nothing to send + */ + public int getNextDelay() { + int rv = -1; + long now = _context.clock().now(); + Map msgs = _outboundMessages; + if (_dead) return -1; + synchronized (msgs) { + if (_retransmitter != null) { + rv = (int)(now - _retransmitter.getNextSendTime()); + if (rv <= 0) + return 1; + else + return rv; + } + for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) { + OutboundMessageState state = (OutboundMessageState)iter.next(); + int delay = (int)(state.getNextSendTime() - now); + if (delay <= 0) + delay = 1; + if ( (rv <= 0) || (delay < rv) ) + rv = delay; + } + } + return rv; + } + + + /** + * If set to true, we should throttle retransmissions of all but the first message in + * flight to a peer. If set to false, we will only throttle the initial flight of a + * message to a peer while a retransmission is going on. + */ + private static final boolean THROTTLE_RESENDS = true; + /** + * if true, throttle the initial volley of a message if there is a resend in progress. + * if false, always send the first volley, regardless of retransmissions (but keeping in + * mind bw/cwin throttle, etc) + * + */ + private static final boolean THROTTLE_INITIAL_SEND = false; + + private static final int SSU_HEADER_SIZE = 46; + static final int UDP_HEADER_SIZE = 8; + static final int IP_HEADER_SIZE = 20; + /** how much payload data can we shove in there? */ + private static final int fragmentSize(int mtu) { + return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE; + } + + private boolean locked_shouldSend(OutboundMessageState state) { + long now = _context.clock().now(); + if (state.getNextSendTime() <= now) { + if (!state.isFragmented()) { + state.fragment(fragmentSize(getMTU())); + if (state.getMessage() != null) + state.getMessage().timestamp("fragment into " + state.getFragmentCount()); + + if (_log.shouldLog(Log.INFO)) + _log.info("Fragmenting " + state); + } + + + if ( (_retransmitter != null) && ( (_retransmitter.isExpired() || _retransmitter.isComplete()) ) ) + _retransmitter = null; + + if ( (_retransmitter != null) && (_retransmitter != state) ) { + // choke it, since there's already another message retransmitting to this + // peer. + _context.statManager().addRateData("udp.blockedRetransmissions", getPacketsRetransmitted(), getPacketsTransmitted()); + if ( (state.getMaxSends() <= 0) && (!THROTTLE_INITIAL_SEND) ) { + if (state.getMessage() != null) + state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley..."); + } else if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) { + if (state.getMessage() != null) + state.getMessage().timestamp("choked, with another message retransmitting"); + return false; + } else { + if (state.getMessage() != null) + state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); + } + } + + int size = state.getUnackedSize(); + if (allocateSendingBytes(size, state.getPushCount())) { + if (_log.shouldLog(Log.INFO)) + _log.info("Allocation of " + size + " allowed with " + + getSendWindowBytesRemaining() + + "/" + getSendWindowBytes() + + " remaining" + + " for message " + state.getMessageId() + ": " + state); + + if (state.getPushCount() > 0) + _retransmitter = state; + + state.push(); + + int rto = getRTO(); + state.setNextSendTime(now + rto); + + //if (peer.getSendWindowBytesRemaining() > 0) + // _throttle.unchoke(peer.getRemotePeer()); + return true; + } else { + _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); + if (state.getMessage() != null) + state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + + " available=" + getSendWindowBytesRemaining() + + " for message " + state.getMessageId() + ": " + state); + state.setNextSendTime(now+(_context.random().nextInt(2*ACKSender.ACK_FREQUENCY))); //(now + 1024) & ~SECOND_MASK); + if (_log.shouldLog(Log.WARN)) + _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); + //_throttle.choke(peer.getRemotePeer()); + + if (state.getMessage() != null) + state.getMessage().timestamp("choked, not enough available, wsize=" + + getSendWindowBytes() + " available=" + + getSendWindowBytesRemaining()); + return false; + } + } // nextTime <= now + + return false; + } + + public int acked(long messageId) { + OutboundMessageState state = null; + Map msgs = _outboundMessages; + if (_dead) return 0; + synchronized (msgs) { + state = (OutboundMessageState)msgs.remove(new Long(messageId)); + if ( (state != null) && (state == _retransmitter) ) + _retransmitter = null; + } + + if (state != null) { + int numSends = state.getMaxSends(); + if (state.getMessage() != null) { + state.getMessage().timestamp("acked after " + numSends + + " lastReceived: " + + (_context.clock().now() - getLastReceiveTime()) + + " lastSentFully: " + + (_context.clock().now() - getLastSendFullyTime())); + } + + if (_log.shouldLog(Log.INFO)) + _log.info("Received ack of " + messageId + " by " + _remotePeer.toBase64() + + " after " + state.getLifetime() + " and " + numSends + " sends"); + _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); + if (state.getFragmentCount() > 1) + _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); + if (numSends > 1) + _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); + _transport.succeeded(state); + int numFragments = state.getFragmentCount(); + // this adjusts the rtt/rto/window/etc + messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends); + //if (getSendWindowBytesRemaining() > 0) + // _throttle.unchoke(peer.getRemotePeer()); + + state.releaseResources(); + return numFragments; + } else { + // dupack, likely + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received an ACK for a message not pending: " + messageId); + return 0; + } + } + + public void acked(ACKBitfield bitfield) { + if (_dead) + return; + + if (bitfield.receivedComplete()) { + acked(bitfield.getMessageId()); + return; + } + + Map msgs = _outboundMessages; + + OutboundMessageState state = null; + boolean isComplete = false; + synchronized (msgs) { + state = (OutboundMessageState)msgs.get(new Long(bitfield.getMessageId())); + if (state != null) { + if (state.acked(bitfield)) { + // this partial ack actually clears it fully + isComplete = true; + msgs.remove(new Long(bitfield.getMessageId())); + if (state == _retransmitter) + _retransmitter = null; + } + } + } + + if (state != null) { + int numSends = state.getMaxSends(); + + int bits = bitfield.fragmentCount(); + int numACKed = 0; + for (int i = 0; i < bits; i++) + if (bitfield.received(i)) + numACKed++; + + _context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime()); + + if (_log.shouldLog(Log.INFO)) + _log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer.toBase64() + + " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? " + + isComplete + ": " + state); + + if (isComplete) { + _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); + if (state.getFragmentCount() > 1) + _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); + if (numSends > 1) + _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); + if (state.getMessage() != null) + state.getMessage().timestamp("partial ack to complete after " + numSends); + _transport.succeeded(state); + + // this adjusts the rtt/rto/window/etc + messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0); + //if (state.getPeer().getSendWindowBytesRemaining() > 0) + // _throttle.unchoke(state.getPeer().getRemotePeer()); + + state.releaseResources(); + } else { + if (state.getMessage() != null) + state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); + } + return; + } else { + // dupack + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received an ACK for a message not pending: " + bitfield); + return; + } + } /** * Transfer the basic activity/state from the old peer to the current peer @@ -993,7 +1377,24 @@ public class PeerState { oldPeer._inboundMessages.clear(); } synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); } + msgs.clear(); + OutboundMessageState retransmitter = null; + Map omsgs = oldPeer._outboundMessages; + if (omsgs != null) { + synchronized (omsgs) { + msgs.putAll(omsgs); + omsgs.clear(); + retransmitter = oldPeer._retransmitter; + } + } + omsgs = _outboundMessages; + if (omsgs != null) { + synchronized (omsgs) { + omsgs.putAll(msgs); + _retransmitter = retransmitter; + } + } } public int hashCode() { @@ -1003,6 +1404,7 @@ public class PeerState { return super.hashCode(); } public boolean equals(Object o) { + if (o == this) return true; if (o == null) return false; if (o instanceof PeerState) { PeerState s = (PeerState)o; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index b5acee669..cc50b01bb 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -590,7 +590,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(" lifetime: ").append(now - peer.getKeyEstablishedTime()); buf.append(" time since send/recv/ack: ").append(timeSinceSend).append(" / "); buf.append(timeSinceRecv).append(" / ").append(timeSinceAck); - + /* buf.append("Existing peers: \n"); synchronized (_peersByIdent) { for (Iterator iter = _peersByIdent.keySet().iterator(); iter.hasNext(); ) { @@ -617,13 +617,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append("\n"); } } + */ _log.warn(buf.toString(), new Exception("Dropped by")); } _introManager.remove(peer); + _fragments.dropPeer(peer); // a bit overzealous - perhaps we should only rebuild the external if the peer being dropped - // is one of our introducers? - rebuildExternalAddress(); + // is one of our introducers? dropping it only if we are considered 'not reachable' is a start + if (introducersRequired()) + rebuildExternalAddress(); if (peer.getRemotePeer() != null) { dropPeerCapacities(peer); @@ -723,6 +726,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority msg.timestamp("sending on UDP transport"); Hash to = msg.getTarget().getIdentity().calculateHash(); PeerState peer = getPeerState(to); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending to " + (to != null ? to.toBase64() : "")); if (peer != null) { long lastSend = peer.getLastSendFullyTime(); long lastRecv = peer.getLastReceiveTime(); @@ -736,14 +741,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // peer is waaaay idle, drop the con and queue it up as a new con dropPeer(peer, false); msg.timestamp("peer is really idle, dropping con and reestablishing"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Proactive reestablish to " + to.toBase64()); _establisher.establish(msg); _context.statManager().addRateData("udp.proactiveReestablish", now-lastSend, now-peer.getKeyEstablishedTime()); return; } } msg.timestamp("enqueueing for an already established peer"); - _outboundMessages.add(msg); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add to fragments for " + to.toBase64()); + if (true) // skip the priority queue and go straight to the active pool + _fragments.add(msg); + else + _outboundMessages.add(msg); } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Establish new connection to " + to.toBase64()); msg.timestamp("establishing a new connection"); _establisher.establish(msg); } @@ -752,20 +766,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_log.shouldLog(Log.DEBUG)) _log.debug("Injecting a data message to a new peer: " + peer); OutboundMessageState state = new OutboundMessageState(_context); - state.initialize(msg, peer); - _fragments.add(state); + boolean ok = state.initialize(msg, peer); + if (ok) + _fragments.add(state); } - public OutNetMessage getNextMessage() { return getNextMessage(-1); } + //public OutNetMessage getNextMessage() { return getNextMessage(-1); } /** * Get the next message, blocking until one is found or the expiration * reached. * * @param blockUntil expiration, or -1 if indefinite */ + /* public OutNetMessage getNextMessage(long blockUntil) { return _outboundMessages.getNext(blockUntil); } + */ // we don't need the following, since we have our own queueing @@ -791,7 +808,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } void rebuildExternalAddress() { rebuildExternalAddress(true); } - void rebuildExternalAddress(boolean allowRebuildRouterInfo) { + void rebuildExternalAddress(boolean allowuterInfo) { if (_context.router().isHidden()) return; @@ -937,10 +954,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public void failed(OutboundMessageState msg) { if (msg == null) return; int consecutive = 0; + OutNetMessage m = msg.getMessage(); if ( (msg.getPeer() != null) && ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || (msg.isExpired())) ) { - OutNetMessage m = msg.getMessage(); long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime(); long sendDelay = _context.clock().now() - msg.getPeer().getLastSendFullyTime(); if (m != null) @@ -959,7 +976,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority dropPeer(msg.getPeer(), false); } noteSend(msg, false); - super.afterSend(msg.getMessage(), false); + if (m != null) + super.afterSend(m, false); } private void noteSend(OutboundMessageState msg, boolean successful) { @@ -1011,8 +1029,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending message succeeded: " + msg); noteSend(msg, true); - if (msg.getMessage() != null) - super.afterSend(msg.getMessage(), true); + OutNetMessage m = msg.getMessage(); + if (m != null) + super.afterSend(m, true); } public int countActivePeers() { @@ -1171,8 +1190,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(idleOut); buf.append("s"); - int recvBps = (idleIn > 10 ? 0 : peer.getReceiveBps()); - int sendBps = (idleOut > 10 ? 0 : peer.getSendBps()); + int recvBps = (idleIn > 2 ? 0 : peer.getReceiveBps()); + int sendBps = (idleOut > 2 ? 0 : peer.getSendBps()); buf.append("
");
buf.append(formatKBps(recvBps));