2006-02-18 jrandom

    * Migrate the outbound packets from a central component to the individual
      per-peer components, substantially cutting down on lock contention when
      dealing with higher degrees.
    * Load balance the outbound SSU transfers evenly across peers, rather than
      across messages (so peers with few messages won't be starved by peers
      with many).
    * Reduce the frequency of router info rebuilds (thanks bar!)
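For readers skimming the diff below, a minimal sketch of the data-flow change
behind the first two bullets (hypothetical simplified names, not the actual
I2P classes): each peer guards its own pending-message map under its own lock,
so queueing a message for one peer no longer contends with sends to another,
and the sender loop services peers round-robin, one allocation per peer per
pass, so a peer with many queued messages cannot starve a peer with few.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PeerQueue {
    private final Map<Long, byte[]> pending = new HashMap<Long, byte[]>();

    /** per-peer lock: contention is limited to callers touching this peer */
    synchronized int add(long msgId, byte[] fragment) {
        pending.put(Long.valueOf(msgId), fragment);
        return pending.size();
    }

    /** hand back one pending fragment, or null if nothing is queued */
    synchronized byte[] allocateSend() {
        if (pending.isEmpty())
            return null;
        Long id = pending.keySet().iterator().next();
        return pending.remove(id);
    }
}

class RoundRobinSender {
    private final List<PeerQueue> activePeers = new ArrayList<PeerQueue>();
    private int nextPeer = 0;

    /** walk the peers starting just after the last one serviced */
    byte[] nextPacket() {
        synchronized (activePeers) {
            int n = activePeers.size();
            for (int i = 0; i < n; i++) {
                int cur = (i + nextPeer) % n;
                byte[] fragment = activePeers.get(cur).allocateSend();
                if (fragment != null) {
                    nextPeer = cur + 1; // resume after this peer on the next pass
                    return fragment;
                }
            }
        }
        return null; // nothing eligible anywhere
    }
}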
Commit 5aa335740a (parent 1202751359)
Author: jrandom, 2006-02-19 03:22:31 +00:00; committed by zzz
6 changed files with 655 additions and 521 deletions

history.txt

@@ -1,4 +1,13 @@
$Id: history.txt,v 1.404 2006/02/17 17:29:33 jrandom Exp $
$Id: history.txt,v 1.405 2006/02/18 01:48:48 jrandom Exp $
2006-02-18 jrandom
* Migrate the outbound packets from a central component to the individual
per-peer components, substantially cutting down on lock contention when
dealing with higher degrees.
* Load balance the outbound SSU transfers evenly across peers, rather than
across messages (so peers with few messages won't be starved by peers
with many).
* Reduce the frequency of router info rebuilds (thanks bar!)
2006-02-18 jrandom
* Add a new AIMD throttle in SSU to control the number of concurrent

RouterVersion.java

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.347 $ $Date: 2006/02/17 17:29:32 $";
public final static String ID = "$Revision: 1.348 $ $Date: 2006/02/18 01:38:30 $";
public final static String VERSION = "0.6.1.10";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

EstablishmentManager.java

@@ -169,6 +169,7 @@ public class EstablishmentManager {
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()), addr);
_outboundStates.put(to, state);
SimpleTimer.getInstance().addEvent(new Expire(to, state), 10*1000);
}
}
if (state != null) {
@@ -187,6 +188,33 @@ public class EstablishmentManager {
notifyActivity();
}
private class Expire implements SimpleTimer.TimedEvent {
private RemoteHostId _to;
private OutboundEstablishState _state;
public Expire(RemoteHostId to, OutboundEstablishState state) {
_to = to;
_state = state;
}
public void timeReached() {
Object removed = null;
synchronized (_outboundStates) {
removed = _outboundStates.remove(_to);
if (removed != _state) { // oops, we must have failed, then retried
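// (if the timer lost the race and the state was already removed, 'removed'
// is null here, and the put below reinserts a null value; that is why the
// pending-connection loop later skips null entries)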
_outboundStates.put(_to, removed);
removed = null;
}/* else {
locked_admitQueued();
}*/
}
if (removed != null) {
_context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Timing out expired outbound: " + _state);
processExpired(_state);
}
}
}
/**
* How many concurrent inbound sessions to deal with
*/
@@ -213,11 +241,11 @@ public class EstablishmentManager {
state = (InboundEstablishState)_inboundStates.get(from);
if (state == null) {
state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort());
state.receiveSessionRequest(reader.getSessionRequestReader());
isNew = true;
_inboundStates.put(from, state);
}
}
state.receiveSessionRequest(reader.getSessionRequestReader());
if (isNew) {
if (!_transport.introducersRequired()) {
long tag = _context.random().nextLong(MAX_TAG_VALUE);
@@ -332,6 +360,7 @@ public class EstablishmentManager {
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()), addr);
_outboundStates.put(to, qstate);
SimpleTimer.getInstance().addEvent(new Expire(to, qstate), 10*1000);
for (int i = 0; i < queued.size(); i++) {
OutNetMessage m = (OutNetMessage)queued.get(i);
@@ -361,7 +390,7 @@ public class EstablishmentManager {
private void handleCompletelyEstablished(InboundEstablishState state) {
long now = _context.clock().now();
RouterIdentity remote = state.getConfirmedIdentity();
PeerState peer = new PeerState(_context);
PeerState peer = new PeerState(_context, _transport);
peer.setCurrentCipherKey(state.getCipherKey());
peer.setCurrentMACKey(state.getMACKey());
peer.setCurrentReceiveSecond(now - (now % 1000));
@@ -396,7 +425,7 @@ public class EstablishmentManager {
private PeerState handleCompletelyEstablished(OutboundEstablishState state) {
long now = _context.clock().now();
RouterIdentity remote = state.getRemoteIdentity();
PeerState peer = new PeerState(_context);
PeerState peer = new PeerState(_context, _transport);
peer.setCurrentCipherKey(state.getCipherKey());
peer.setCurrentMACKey(state.getMACKey());
peer.setCurrentReceiveSecond(now - (now % 1000));
@@ -721,6 +750,7 @@ public class EstablishmentManager {
// _log.debug("# outbound states: " + _outboundStates.size());
for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState cur = (OutboundEstablishState)iter.next();
if (cur == null) continue;
if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
// completely received
iter.remove();
@@ -770,46 +800,7 @@ public class EstablishmentManager {
if (outboundState != null) {
if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) {
if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.WARN))
_log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime());
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)
break;
_transport.failed(msg, "Expired during failed establish");
}
String err = null;
switch (outboundState.getState()) {
case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY:
err = "Took too long to establish remote connection (confirmed partially)";
break;
case OutboundEstablishState.STATE_CREATED_RECEIVED:
err = "Took too long to establish remote connection (created received)";
break;
case OutboundEstablishState.STATE_REQUEST_SENT:
err = "Took too long to establish remote connection (request sent)";
break;
case OutboundEstablishState.STATE_PENDING_INTRO:
err = "Took too long to establish remote connection (intro failed)";
break;
case OutboundEstablishState.STATE_UNKNOWN: // fallthrough
default:
err = "Took too long to establish remote connection (unknown state)";
}
Hash peer = outboundState.getRemoteIdentity().calculateHash();
_context.shitlist().shitlistRouter(peer, err);
_transport.dropPeer(peer);
//_context.profileManager().commErrorOccurred(peer);
} else {
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)
break;
_transport.send(msg);
}
}
processExpired(outboundState);
} else {
switch (outboundState.getState()) {
case OutboundEstablishState.STATE_UNKNOWN:
@@ -847,6 +838,49 @@ public class EstablishmentManager {
return nextSendTime;
}
private void processExpired(OutboundEstablishState outboundState) {
if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.WARN))
_log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime());
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)
break;
_transport.failed(msg, "Expired during failed establish");
}
String err = null;
switch (outboundState.getState()) {
case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY:
err = "Took too long to establish remote connection (confirmed partially)";
break;
case OutboundEstablishState.STATE_CREATED_RECEIVED:
err = "Took too long to establish remote connection (created received)";
break;
case OutboundEstablishState.STATE_REQUEST_SENT:
err = "Took too long to establish remote connection (request sent)";
break;
case OutboundEstablishState.STATE_PENDING_INTRO:
err = "Took too long to establish remote connection (intro failed)";
break;
case OutboundEstablishState.STATE_UNKNOWN: // fallthrough
default:
err = "Took too long to establish remote connection (unknown state)";
}
Hash peer = outboundState.getRemoteIdentity().calculateHash();
_context.shitlist().shitlistRouter(peer, err);
_transport.dropPeer(peer);
//_context.profileManager().commErrorOccurred(peer);
} else {
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)
break;
_transport.send(msg);
}
}
}
/**
* Driving thread, processing up to one step for an inbound peer and up to
@@ -881,13 +915,15 @@ public class EstablishmentManager {
long delay = nextSendTime - now;
if ( (nextSendTime == -1) || (delay > 0) ) {
if (delay > 5000)
delay = 5000;
boolean interrupted = false;
try {
synchronized (_activityLock) {
if (_activity > 0)
return;
if (nextSendTime == -1)
_activityLock.wait();
_activityLock.wait(5000);
else
_activityLock.wait(delay);
}

OutboundMessageFragments.java

@@ -31,21 +31,15 @@ public class OutboundMessageFragments {
private Log _log;
private UDPTransport _transport;
private ActiveThrottle _throttle;
/** OutboundMessageState for messages being sent */
private List _activeMessages;
/** peers we are actively sending messages to */
private List _activePeers;
private boolean _alive;
/** which message should we build the next packet out of? */
private int _nextPacketMessage;
/** which peer should we build the next packet out of? */
private int _nextPeer;
private PacketBuilder _builder;
/** if we can handle more messages explicitly, set this to true */
private boolean _allowExcess;
private volatile long _packetsRetransmitted;
/**
* Map of peer to OutboundMessageState for messages being retransmitted, to
* keep bad peers from bursting too much due to congestion/outage. This
* should only be accessed when holding the lock on _activeMessages.
*/
private Map _retransmitters;
private static final int MAX_ACTIVE = 64;
// don't send a packet more than 10 times
@@ -56,9 +50,8 @@ public class OutboundMessageFragments {
_log = ctx.logManager().getLog(OutboundMessageFragments.class);
_transport = transport;
_throttle = throttle;
_activeMessages = new ArrayList(MAX_ACTIVE);
_retransmitters = new HashMap(MAX_ACTIVE);
_nextPacketMessage = 0;
_activePeers = new ArrayList(256);
_nextPeer = 0;
_builder = new PacketBuilder(ctx, transport);
_alive = true;
_allowExcess = false;
@@ -85,8 +78,16 @@ public class OutboundMessageFragments {
public void startup() { _alive = true; }
public void shutdown() {
_alive = false;
synchronized (_activeMessages) {
_activeMessages.notifyAll();
synchronized (_activePeers) {
_activePeers.notifyAll();
}
}
void dropPeer(PeerState peer) {
if (_log.shouldLog(Log.INFO))
_log.info("Dropping peer " + peer.getRemotePeer().toBase64());
peer.dropOutbound();
synchronized (_activePeers) {
_activePeers.remove(peer);
}
}
@@ -99,7 +100,8 @@ public class OutboundMessageFragments {
public boolean waitForMoreAllowed() {
// test without choking.
// perhaps this should check the lifetime of the first activeMessage?
if (true) return true;
if (true) return true;
/*
long start = _context.clock().now();
int numActive = 0;
@@ -121,6 +123,7 @@ public class OutboundMessageFragments {
_context.statManager().addRateData("udp.activeDelay", numActive, _context.clock().now() - start);
} catch (InterruptedException ie) {}
}
*/
return false;
}
@@ -131,28 +134,32 @@ public class OutboundMessageFragments {
public void add(OutNetMessage msg) {
I2NPMessage msgBody = msg.getMessage();
RouterInfo target = msg.getTarget();
if ( (msgBody == null) || (target == null) ) {
synchronized (_activeMessages) {
_activeMessages.notifyAll();
}
if ( (msgBody == null) || (target == null) )
return;
}
OutboundMessageState state = new OutboundMessageState(_context);
boolean ok = state.initialize(msg, msgBody);
state.setPeer(_transport.getPeerState(target.getIdentity().calculateHash()));
finishMessages();
int active = 0;
synchronized (_activeMessages) {
if (ok)
_activeMessages.add(state);
active = _activeMessages.size();
if (active == 1)
_lastCycleTime = System.currentTimeMillis();
_activeMessages.notifyAll();
if (ok) {
PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash());
int active = peer.add(state);
synchronized (_activePeers) {
if (!_activePeers.contains(peer)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
_activePeers.add(peer);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
}
_activePeers.notifyAll();
}
msg.timestamp("made active along with: " + active);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Error initializing " + msg);
}
msg.timestamp("made active along with: " + active);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
finishMessages();
}
/**
@@ -160,116 +167,55 @@ public class OutboundMessageFragments {
* complete message reliably
*/
public void add(OutboundMessageState state) {
PeerState peer = state.getPeer();
if (peer == null)
throw new RuntimeException("wtf, null peer for " + state);
int active = peer.add(state);
synchronized (_activePeers) {
if (!_activePeers.contains(peer)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
_activePeers.add(peer);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
}
if (_activePeers.size() == 1)
_lastCycleTime = System.currentTimeMillis();
_activePeers.notifyAll();
}
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
// should we finish messages here too?
/*
synchronized (_activeMessages) {
_activeMessages.add(state);
if (_activeMessages.size() == 1)
_lastCycleTime = System.currentTimeMillis();
_activeMessages.notifyAll();
}
*/
}
/**
* Remove any expired or complete messages
*/
private void finishMessages() {
synchronized (_activeMessages) {
for (int i = 0; i < _activeMessages.size(); i++) {
OutboundMessageState state = (OutboundMessageState)_activeMessages.get(i);
PeerState peer = state.getPeer();
if (state.isComplete()) {
_activeMessages.remove(i);
locked_removeRetransmitter(state);
_transport.succeeded(state);
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
if (state.getMessage() != null)
state.getMessage().timestamp("sending complete");
int rv = 0;
synchronized (_activePeers) {
for (int i = 0; i < _activePeers.size(); i++) {
PeerState state = (PeerState)_activePeers.get(i);
int remaining = state.finishMessages();
if (remaining <= 0) {
_activePeers.remove(i);
if (_log.shouldLog(Log.DEBUG))
_log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
i--;
} else if (state.isExpired()) {
_activeMessages.remove(i);
locked_removeRetransmitter(state);
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
if (state.getMessage() != null) {
state.getMessage().timestamp("expired in the active pool");
_transport.failed(state);
} else {
// it can not have an OutNetMessage if the source is the
// final after establishment message
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send an expired direct message: " + state);
}
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
i--;
} else if (state.getPushCount() > MAX_VOLLEYS) {
_activeMessages.remove(i);
locked_removeRetransmitter(state);
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
//if (state.getPeer() != null)
// state.getPeer().congestionOccurred();
if (state.getMessage() != null) {
state.getMessage().timestamp("too many sends");
_transport.failed(state);
} else {
// it can not have an OutNetMessage if the source is the
// final after establishment message
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message after too many volleys: " + state);
}
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
i--;
} // end (pushCount > maxVolleys)
} // end iterating over active
_activeMessages.notifyAll();
} // end synchronized
}
/**
* Remove the block on retransmissions to the peer if and only if the given
* message is the current "retransmitter" for it.
*
*/
private void locked_removeRetransmitter(OutboundMessageState state) {
PeerState curPeer = state.getPeer();
if (curPeer == null) {
for (Iterator iter = _retransmitters.keySet().iterator(); iter.hasNext(); ) {
PeerState cpeer = (PeerState)iter.next();
OutboundMessageState cstate = (OutboundMessageState)_retransmitters.get(cpeer);
if (cstate == state) {
iter.remove();
break;
}
rv += remaining;
}
} else {
OutboundMessageState remState = (OutboundMessageState)_retransmitters.get(curPeer);
if (remState == state)
_retransmitters.remove(curPeer);
}
}
private static final long SECOND_MASK = 1023l;
private long _lastCycleTime = System.currentTimeMillis();
/**
@@ -284,76 +230,58 @@ public class OutboundMessageFragments {
OutboundMessageState state = null;
while (_alive && (state == null) ) {
long now = _context.clock().now();
long nextSend = -1;
int nextSendDelay = -1;
finishMessages();
try {
synchronized (_activeMessages) {
for (int i = 0; i < _activeMessages.size(); i++) {
int cur = (i + _nextPacketMessage) % _activeMessages.size();
synchronized (_activePeers) {
for (int i = 0; i < _activePeers.size(); i++) {
int cur = (i + _nextPeer) % _activePeers.size();
if (cur == 0) {
long ts = System.currentTimeMillis();
long cycleTime = ts - _lastCycleTime;
_lastCycleTime = ts;
_context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activeMessages.size());
_context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size());
if (cycleTime > 1000)
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activeMessages.size());
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
}
state = (OutboundMessageState)_activeMessages.get(cur);
peer = state.getPeer(); // known if this is immediately after establish
if (peer == null)
peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash());
if ((peer != null) && locked_shouldSend(state, peer)) {
// for fairness, we move on in a round robin
_nextPacketMessage = i + 1;
peer = (PeerState)_activePeers.get(i);
state = peer.allocateSend();
if (state != null) {
_nextPeer = i + 1;
break;
} else {
if (peer == null) {
// peer disconnected
_activeMessages.remove(cur);
locked_removeRetransmitter(state);
if (state.getMessage() != null)
state.getMessage().timestamp("peer disconnected");
_transport.failed(state);
if (_log.shouldLog(Log.ERROR))
_log.error("Peer disconnected for " + state);
if ( (peer != null) && (peer.getSendWindowBytesRemaining() > 0) )
_throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
i--;
}
long time = state.getNextSendTime();
if ( (nextSend < 0) || (time < nextSend) )
nextSend = time;
state = null;
int delay = peer.getNextDelay();
if ( (nextSendDelay <= 0) || (delay < nextSendDelay) )
nextSendDelay = delay;
peer = null;
state = null;
}
} // end of the for(activeMessages)
if (state == null) {
if (nextSend <= 0) {
_activeMessages.notifyAll();
_activeMessages.wait(1000);
} else {
// none of the packets were eligible for sending
long delay = nextSend - now;
if (delay <= 0)
delay = 10;
if (delay > 1000)
delay = 1000;
_allowExcess = true;
_activeMessages.notifyAll();
_activeMessages.wait(delay);
}
} else {
_activeMessages.notifyAll();
}
_allowExcess = false;
} // end of the synchronized block
} catch (InterruptedException ie) {}
} // end of the while (alive && !found)
if (_log.shouldLog(Log.DEBUG))
_log.debug("Done looping, next peer we are sending for: " +
(peer != null ? peer.getRemotePeer().toBase64() : "none"));
if (state == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("wait for " + nextSendDelay);
// wait.. or somethin'
if (nextSendDelay > 0)
_activePeers.wait(nextSendDelay);
else
_activePeers.wait(1000);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("dont wait: alive=" + _alive + " state = " + state);
}
}
} catch (InterruptedException ie) {
// noop
if (_log.shouldLog(Log.DEBUG))
_log.debug("Woken up while waiting");
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + state);
UDPPacket packets[] = preparePackets(state, peer);
if ( (state != null) && (state.getMessage() != null) ) {
int valid = 0;
@@ -369,120 +297,6 @@ public class OutboundMessageFragments {
return packets;
}
/**
* If set to true, we should throttle retransmissions of all but the first message in
* flight to a peer. If set to false, we will only throttle the initial flight of a
* message to a peer while a retransmission is going on.
*/
private static final boolean THROTTLE_RESENDS = true;
/**
* if true, throttle the initial volley of a message if there is a resend in progress.
* if false, always send the first volley, regardless of retransmissions (but keeping in
* mind bw/cwin throttle, etc)
*
*/
private static final boolean THROTTLE_INITIAL_SEND = false;
private boolean locked_shouldSend(OutboundMessageState state, PeerState peer) {
long now = _context.clock().now();
if (state.getNextSendTime() <= now) {
if (!state.isFragmented()) {
state.fragment(fragmentSize(peer.getMTU()));
if (state.getMessage() != null)
state.getMessage().timestamp("fragment into " + state.getFragmentCount());
if (_log.shouldLog(Log.INFO))
_log.info("Fragmenting " + state);
}
OutboundMessageState curRetransMsg = (OutboundMessageState)_retransmitters.get(peer);
if ( (curRetransMsg != null) && ( (curRetransMsg.isExpired() || curRetransMsg.isComplete()) ) ) {
_retransmitters.remove(peer);
curRetransMsg = null;
}
if ( (curRetransMsg != null) && (curRetransMsg != state) ) {
// choke it, since there's already another message retransmitting to this
// peer.
_context.statManager().addRateData("udp.blockedRetransmissions", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
if ( (state.getMaxSends() <= 0) && (!THROTTLE_INITIAL_SEND) ) {
if (state.getMessage() != null)
state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
} else if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) {
if (state.getMessage() != null)
state.getMessage().timestamp("choked, with another message retransmitting");
return false;
} else {
if (state.getMessage() != null)
state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
}
}
int size = state.getUnackedSize();
if (peer.allocateSendingBytes(size, state.getPushCount())) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " allowed with "
+ peer.getSendWindowBytesRemaining()
+ "/" + peer.getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
if (state.getPushCount() > 0) {
_retransmitters.put(peer, state);
/*
int fragments = state.getFragmentCount();
int toSend = 0;
for (int i = 0; i < fragments; i++) {
if (state.needsSending(i))
toSend++;
}
peer.messageRetransmitted(toSend);
_packetsRetransmitted += toSend; // lifetime for the transport
_context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
_context.statManager().addRateData("udp.packetsRetransmitted", _packetsRetransmitted, peer.getPacketsTransmitted());
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend);
*/
}
state.push();
int rto = peer.getRTO();// * state.getPushCount();
state.setNextSendTime(now + rto);
if (peer.getSendWindowBytesRemaining() > 0)
_throttle.unchoke(peer.getRemotePeer());
return true;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (state.getMessage() != null)
state.getMessage().timestamp("send rejected, available=" + peer.getSendWindowBytesRemaining());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + size + " rejected w/ wsize=" + peer.getSendWindowBytes()
+ " available=" + peer.getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now+(_context.random().nextInt(2*ACKSender.ACK_FREQUENCY))); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
_throttle.choke(peer.getRemotePeer());
if (state.getMessage() != null)
state.getMessage().timestamp("choked, not enough available, wsize="
+ peer.getSendWindowBytes() + " available="
+ peer.getSendWindowBytesRemaining());
return false;
}
} // nextTime <= now
//if (state.getMessage() != null)
// state.getMessage().timestamp("choked, time remaining to retransmit: " + (state.getNextSendTime() - now));
return false;
}
private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
if ( (state != null) && (peer != null) ) {
int fragments = state.getFragmentCount();
@@ -562,14 +376,6 @@ public class OutboundMessageFragments {
}
}
private static final int SSU_HEADER_SIZE = 46;
static final int UDP_HEADER_SIZE = 8;
static final int IP_HEADER_SIZE = 20;
/** how much payload data can we shove in there? */
private static final int fragmentSize(int mtu) {
return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE;
}
/**
* We received an ACK of the given messageId from the given peer, so if it
* is still unacked, mark it as complete.
@@ -577,165 +383,27 @@ public class OutboundMessageFragments {
* @return fragments acked
*/
public int acked(long messageId, Hash ackedBy) {
OutboundMessageState state = null;
synchronized (_activeMessages) {
// linear search, since its tiny
for (int i = 0; i < _activeMessages.size(); i++) {
state = (OutboundMessageState)_activeMessages.get(i);
if (state.getMessageId() == messageId) {
OutNetMessage msg = state.getMessage();
if (msg != null) {
Hash expectedBy = msg.getTarget().getIdentity().getHash();
if (!expectedBy.equals(ackedBy)) {
state = null;
_activeMessages.notifyAll();
return 0;
}
}
// either the message was a short circuit after establishment,
// or it was received from who we sent it to. yay!
_activeMessages.remove(i);
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
locked_removeRetransmitter(state);
break;
} else {
state = null;
}
}
_activeMessages.notifyAll();
}
if (state != null) {
int numSends = state.getMaxSends();
if (state.getMessage() != null) {
PeerState peer = state.getPeer();
if (peer != null)
state.getMessage().timestamp("acked after " + numSends
+ " lastReceived: "
+ (_context.clock().now() - peer.getLastReceiveTime())
+ " lastSentFully: "
+ (_context.clock().now() - peer.getLastSendFullyTime()));
}
if (_log.shouldLog(Log.INFO))
_log.info("Received ack of " + messageId + " by " + ackedBy.toBase64()
+ " after " + state.getLifetime() + " and " + numSends + " sends");
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
_transport.succeeded(state);
int numFragments = state.getFragmentCount();
PeerState peer = state.getPeer();
if (peer != null) {
// this adjusts the rtt/rto/window/etc
peer.messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends);
if (peer.getSendWindowBytesRemaining() > 0)
_throttle.unchoke(peer.getRemotePeer());
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("message acked, but no peer attacked: " + state);
}
state.releaseResources();
return numFragments;
PeerState peer = _transport.getPeerState(ackedBy);
if (peer != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("acked [" + messageId + "] by " + ackedBy.toBase64());
return peer.acked(messageId);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Received an ACK for a message not pending: " + messageId);
if (_log.shouldLog(Log.DEBUG))
_log.debug("acked [" + messageId + "] by an unknown remote peer? " + ackedBy.toBase64());
return 0;
}
}
public void acked(ACKBitfield bitfield, Hash ackedBy) {
if (bitfield.receivedComplete()) {
acked(bitfield.getMessageId(), ackedBy);
return;
}
OutboundMessageState state = null;
boolean isComplete = false;
synchronized (_activeMessages) {
// linear search, since its tiny
for (int i = 0; i < _activeMessages.size(); i++) {
state = (OutboundMessageState)_activeMessages.get(i);
if (state.getMessageId() == bitfield.getMessageId()) {
OutNetMessage msg = state.getMessage();
if (msg != null) {
Hash expectedBy = msg.getTarget().getIdentity().getHash();
if (!expectedBy.equals(ackedBy)) {
state = null;
_activeMessages.notifyAll();
return;
}
}
isComplete = state.acked(bitfield);
if (isComplete) {
// either the message was a short circuit after establishment,
// or it was received from who we sent it to. yay!
_activeMessages.remove(i);
if (i < _nextPacketMessage) {
_nextPacketMessage--;
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
}
locked_removeRetransmitter(state);
break;
} else {
state = null;
}
}
_activeMessages.notifyAll();
}
if (state != null) {
int numSends = state.getMaxSends();
int bits = bitfield.fragmentCount();
int numACKed = 0;
for (int i = 0; i < bits; i++)
if (bitfield.received(i))
numACKed++;
_context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Received partial ack of " + state.getMessageId() + " by " + ackedBy.toBase64()
+ " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? "
+ isComplete + ": " + state);
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
if (state.getMessage() != null)
state.getMessage().timestamp("partial ack to complete after " + numSends);
_transport.succeeded(state);
if (state.getPeer() != null) {
// this adjusts the rtt/rto/window/etc
state.getPeer().messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0);
if (state.getPeer().getSendWindowBytesRemaining() > 0)
_throttle.unchoke(state.getPeer().getRemotePeer());
}
state.releaseResources();
} else {
if (state.getMessage() != null)
state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
}
return;
PeerState peer = _transport.getPeerState(ackedBy);
if (peer != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64());
peer.acked(bitfield);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Received an ACK for a message not pending: " + bitfield);
return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64());
}
}

PeerState.java

@@ -15,6 +15,7 @@ import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
import net.i2p.router.RouterContext;
import net.i2p.router.OutNetMessage;
/**
* Contain all of the state about a UDP connection to a peer.
@@ -168,6 +169,12 @@ public class PeerState {
/** Message (Long) to InboundMessageState for active message */
private Map _inboundMessages;
/** Message (Long) to OutboundMessageState */
private Map _outboundMessages;
/** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter;
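// note: this single per-peer field replaces the old transport-wide
// _retransmitters map; the invariant is unchanged: at most one message per
// peer is retransmitting at a time, so a congested or unreachable peer
// cannot burst retransmissions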
private UDPTransport _transport;
/** have we migrated away from this peer to another newer one? */
private volatile boolean _dead;
@@ -206,9 +213,10 @@ public class PeerState {
/** override the default MTU */
private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu";
public PeerState(I2PAppContext ctx) {
public PeerState(I2PAppContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(PeerState.class);
_transport = transport;
_remotePeer = null;
_currentMACKey = null;
_currentCipherKey = null;
@@ -254,6 +262,7 @@ public class PeerState {
_packetsReceived = 0;
_packetsReceivedDuplicate = 0;
_inboundMessages = new HashMap(8);
_outboundMessages = new HashMap(8);
_dead = false;
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
@@ -938,8 +947,8 @@ public class PeerState {
}
private int countMaxACKData() {
return _mtu
- OutboundMessageFragments.IP_HEADER_SIZE
- OutboundMessageFragments.UDP_HEADER_SIZE
- IP_HEADER_SIZE
- UDP_HEADER_SIZE
- UDPPacket.IV_SIZE
- UDPPacket.MAC_SIZE
- 1 // type flag
@@ -959,6 +968,381 @@ public class PeerState {
}
public RemoteHostId getRemoteHostId() { return _remoteHostId; }
public int add(OutboundMessageState state) {
state.setPeer(this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
Map msgs = _outboundMessages;
if (msgs == null) return 0;
synchronized (msgs) {
msgs.put(new Long(state.getMessageId()), state);
return msgs.size();
}
}
/** drop all outbound messages */
public void dropOutbound() {
if (_dead) return;
_dead = true;
Map msgs = _outboundMessages;
//_outboundMessages = null;
_retransmitter = null;
if (msgs != null) {
synchronized (msgs) {
for (Iterator iter = msgs.values().iterator(); iter.hasNext();)
_transport.failed((OutboundMessageState)iter.next());
msgs.clear();
}
}
}
/**
* Expire / complete any outbound messages
* @return number of active outbound messages remaining
*/
public int finishMessages() {
int rv = 0;
Map msgs = _outboundMessages;
if (_dead) return 0;
List succeeded = null;
List failed = null;
synchronized (msgs) {
for (Iterator iter = msgs.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
OutboundMessageState state = (OutboundMessageState)msgs.get(id);
if (state.isComplete()) {
iter.remove();
if (_retransmitter == state)
_retransmitter = null;
if (succeeded == null) succeeded = new ArrayList(4);
succeeded.add(state);
} else if (state.isExpired()) {
iter.remove();
if (_retransmitter == state)
_retransmitter = null;
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
if (failed == null) failed = new ArrayList(4);
failed.add(state);
} else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
iter.remove();
if (state == _retransmitter)
_retransmitter = null;
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
if (failed == null) failed = new ArrayList(4);
failed.add(state);
} // end (pushCount > maxVolleys)
} // end iterating over outbound messages
rv = msgs.size();
}
for (int i = 0; succeeded != null && i < succeeded.size(); i++) {
OutboundMessageState state = (OutboundMessageState)succeeded.get(i);
_transport.succeeded(state);
state.releaseResources();
OutNetMessage msg = state.getMessage();
if (msg != null)
msg.timestamp("sending complete");
}
for (int i = 0; failed != null && i < failed.size(); i++) {
OutboundMessageState state = (OutboundMessageState)failed.get(i);
OutNetMessage msg = state.getMessage();
if (msg != null) {
msg.timestamp("expired in the active pool");
_transport.failed(state);
} else {
// it can not have an OutNetMessage if the source is the
// final after establishment message
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message: " + state);
}
state.releaseResources();
}
return rv;
}
/**
* Pick a message we want to send and allocate it out of our window
* @return allocated message to send, or null if no messages or no resources
*
*/
public OutboundMessageState allocateSend() {
int total = 0;
Map msgs = _outboundMessages;
if (_dead) return null;
synchronized (msgs) {
for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) {
OutboundMessageState state = (OutboundMessageState)iter.next();
if (locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
/*
while (iter.hasNext()) {
OutboundMessageState later = (OutboundMessageState)iter.next();
OutNetMessage msg = later.getMessage();
if (msg != null)
msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
}
*/
return state;
} else {
OutNetMessage msg = state.getMessage();
if (msg != null)
msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
}
}
total = msgs.size();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + total + " remaining");
return null;
}
/**
* return how long to wait before sending, or -1 if we have nothing to send
*/
public int getNextDelay() {
int rv = -1;
long now = _context.clock().now();
Map msgs = _outboundMessages;
if (_dead) return -1;
synchronized (msgs) {
if (_retransmitter != null) {
rv = (int)(_retransmitter.getNextSendTime() - now);
if (rv <= 0)
return 1;
else
return rv;
}
for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) {
OutboundMessageState state = (OutboundMessageState)iter.next();
int delay = (int)(state.getNextSendTime() - now);
if (delay <= 0)
delay = 1;
if ( (rv <= 0) || (delay < rv) )
rv = delay;
}
}
return rv;
}
/**
* If set to true, we should throttle retransmissions of all but the first message in
* flight to a peer. If set to false, we will only throttle the initial flight of a
* message to a peer while a retransmission is going on.
*/
private static final boolean THROTTLE_RESENDS = true;
/**
* if true, throttle the initial volley of a message if there is a resend in progress.
* if false, always send the first volley, regardless of retransmissions (but keeping in
* mind bw/cwin throttle, etc)
*
*/
private static final boolean THROTTLE_INITIAL_SEND = false;
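// with the defaults above (THROTTLE_RESENDS=true, THROTTLE_INITIAL_SEND=false),
// locked_shouldSend() lets a message's first volley through even while another
// message is retransmitting to this peer, but chokes every retransmission
// volley until the current retransmitter completes or expires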
private static final int SSU_HEADER_SIZE = 46;
static final int UDP_HEADER_SIZE = 8;
static final int IP_HEADER_SIZE = 20;
/** how much payload data can we shove in there? */
private static final int fragmentSize(int mtu) {
return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE;
}
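// e.g. an MTU of 1350 (value illustrative) leaves 1350 - 46 - 8 - 20 = 1276
// bytes of fragment payload per packet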
private boolean locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();
if (state.getNextSendTime() <= now) {
if (!state.isFragmented()) {
state.fragment(fragmentSize(getMTU()));
if (state.getMessage() != null)
state.getMessage().timestamp("fragment into " + state.getFragmentCount());
if (_log.shouldLog(Log.INFO))
_log.info("Fragmenting " + state);
}
if ( (_retransmitter != null) && ( (_retransmitter.isExpired() || _retransmitter.isComplete()) ) )
_retransmitter = null;
if ( (_retransmitter != null) && (_retransmitter != state) ) {
// choke it, since there's already another message retransmitting to this
// peer.
_context.statManager().addRateData("udp.blockedRetransmissions", getPacketsRetransmitted(), getPacketsTransmitted());
if ( (state.getMaxSends() <= 0) && (!THROTTLE_INITIAL_SEND) ) {
if (state.getMessage() != null)
state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
} else if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) {
if (state.getMessage() != null)
state.getMessage().timestamp("choked, with another message retransmitting");
return false;
} else {
if (state.getMessage() != null)
state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
}
}
int size = state.getUnackedSize();
if (allocateSendingBytes(size, state.getPushCount())) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " allowed with "
+ getSendWindowBytesRemaining()
+ "/" + getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
if (state.getPushCount() > 0)
_retransmitter = state;
state.push();
int rto = getRTO();
state.setNextSendTime(now + rto);
//if (peer.getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
return true;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (state.getMessage() != null)
state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+ " available=" + getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now+(_context.random().nextInt(2*ACKSender.ACK_FREQUENCY))); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
//_throttle.choke(peer.getRemotePeer());
if (state.getMessage() != null)
state.getMessage().timestamp("choked, not enough available, wsize="
+ getSendWindowBytes() + " available="
+ getSendWindowBytesRemaining());
return false;
}
} // nextTime <= now
return false;
}
public int acked(long messageId) {
OutboundMessageState state = null;
Map msgs = _outboundMessages;
if (_dead) return 0;
synchronized (msgs) {
state = (OutboundMessageState)msgs.remove(new Long(messageId));
if ( (state != null) && (state == _retransmitter) )
_retransmitter = null;
}
if (state != null) {
int numSends = state.getMaxSends();
if (state.getMessage() != null) {
state.getMessage().timestamp("acked after " + numSends
+ " lastReceived: "
+ (_context.clock().now() - getLastReceiveTime())
+ " lastSentFully: "
+ (_context.clock().now() - getLastSendFullyTime()));
}
if (_log.shouldLog(Log.INFO))
_log.info("Received ack of " + messageId + " by " + _remotePeer.toBase64()
+ " after " + state.getLifetime() + " and " + numSends + " sends");
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
_transport.succeeded(state);
int numFragments = state.getFragmentCount();
// this adjusts the rtt/rto/window/etc
messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends);
//if (getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
state.releaseResources();
return numFragments;
} else {
// dupack, likely
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + messageId);
return 0;
}
}
public void acked(ACKBitfield bitfield) {
if (_dead)
return;
if (bitfield.receivedComplete()) {
acked(bitfield.getMessageId());
return;
}
Map msgs = _outboundMessages;
OutboundMessageState state = null;
boolean isComplete = false;
synchronized (msgs) {
state = (OutboundMessageState)msgs.get(new Long(bitfield.getMessageId()));
if (state != null) {
if (state.acked(bitfield)) {
// this partial ack actually clears it fully
isComplete = true;
msgs.remove(new Long(bitfield.getMessageId()));
if (state == _retransmitter)
_retransmitter = null;
}
}
}
if (state != null) {
int numSends = state.getMaxSends();
int bits = bitfield.fragmentCount();
int numACKed = 0;
for (int i = 0; i < bits; i++)
if (bitfield.received(i))
numACKed++;
_context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer.toBase64()
+ " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? "
+ isComplete + ": " + state);
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
if (state.getMessage() != null)
state.getMessage().timestamp("partial ack to complete after " + numSends);
_transport.succeeded(state);
// this adjusts the rtt/rto/window/etc
messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0);
//if (state.getPeer().getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(state.getPeer().getRemotePeer());
state.releaseResources();
} else {
if (state.getMessage() != null)
state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
}
return;
} else {
// dupack
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + bitfield);
return;
}
}
/**
* Transfer the basic activity/state from the old peer to the current peer
@@ -993,7 +1377,24 @@ public class PeerState {
oldPeer._inboundMessages.clear();
}
synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); }
msgs.clear();
OutboundMessageState retransmitter = null;
Map omsgs = oldPeer._outboundMessages;
if (omsgs != null) {
synchronized (omsgs) {
msgs.putAll(omsgs);
omsgs.clear();
retransmitter = oldPeer._retransmitter;
}
}
omsgs = _outboundMessages;
if (omsgs != null) {
synchronized (omsgs) {
omsgs.putAll(msgs);
_retransmitter = retransmitter;
}
}
}
public int hashCode() {
@@ -1003,6 +1404,7 @@ public class PeerState {
return super.hashCode();
}
public boolean equals(Object o) {
if (o == this) return true;
if (o == null) return false;
if (o instanceof PeerState) {
PeerState s = (PeerState)o;

UDPTransport.java

@@ -590,7 +590,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(" lifetime: ").append(now - peer.getKeyEstablishedTime());
buf.append(" time since send/recv/ack: ").append(timeSinceSend).append(" / ");
buf.append(timeSinceRecv).append(" / ").append(timeSinceAck);
/*
buf.append("Existing peers: \n");
synchronized (_peersByIdent) {
for (Iterator iter = _peersByIdent.keySet().iterator(); iter.hasNext(); ) {
@@ -617,13 +617,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("\n");
}
}
*/
_log.warn(buf.toString(), new Exception("Dropped by"));
}
_introManager.remove(peer);
_fragments.dropPeer(peer);
// a bit overzealous - perhaps we should only rebuild the external if the peer being dropped
// is one of our introducers?
rebuildExternalAddress();
// is one of our introducers? dropping it only if we are considered 'not reachable' is a start
if (introducersRequired())
rebuildExternalAddress();
if (peer.getRemotePeer() != null) {
dropPeerCapacities(peer);
@@ -723,6 +726,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
msg.timestamp("sending on UDP transport");
Hash to = msg.getTarget().getIdentity().calculateHash();
PeerState peer = getPeerState(to);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending to " + (to != null ? to.toBase64() : ""));
if (peer != null) {
long lastSend = peer.getLastSendFullyTime();
long lastRecv = peer.getLastReceiveTime();
@@ -736,14 +741,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// peer is waaaay idle, drop the con and queue it up as a new con
dropPeer(peer, false);
msg.timestamp("peer is really idle, dropping con and reestablishing");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Proactive reestablish to " + to.toBase64());
_establisher.establish(msg);
_context.statManager().addRateData("udp.proactiveReestablish", now-lastSend, now-peer.getKeyEstablishedTime());
return;
}
}
msg.timestamp("enqueueing for an already established peer");
_outboundMessages.add(msg);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add to fragments for " + to.toBase64());
if (true) // skip the priority queue and go straight to the active pool
_fragments.add(msg);
else
_outboundMessages.add(msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Establish new connection to " + to.toBase64());
msg.timestamp("establishing a new connection");
_establisher.establish(msg);
}
@@ -752,20 +766,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.DEBUG))
_log.debug("Injecting a data message to a new peer: " + peer);
OutboundMessageState state = new OutboundMessageState(_context);
state.initialize(msg, peer);
_fragments.add(state);
boolean ok = state.initialize(msg, peer);
if (ok)
_fragments.add(state);
}
public OutNetMessage getNextMessage() { return getNextMessage(-1); }
//public OutNetMessage getNextMessage() { return getNextMessage(-1); }
/**
* Get the next message, blocking until one is found or the expiration
* reached.
*
* @param blockUntil expiration, or -1 if indefinite
*/
/*
public OutNetMessage getNextMessage(long blockUntil) {
return _outboundMessages.getNext(blockUntil);
}
*/
// we don't need the following, since we have our own queueing
@@ -791,7 +808,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
void rebuildExternalAddress() { rebuildExternalAddress(true); }
void rebuildExternalAddress(boolean allowRebuildRouterInfo) {
void rebuildExternalAddress(boolean allowRebuildRouterInfo) {
if (_context.router().isHidden())
return;
@@ -937,10 +954,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void failed(OutboundMessageState msg) {
if (msg == null) return;
int consecutive = 0;
OutNetMessage m = msg.getMessage();
if ( (msg.getPeer() != null) &&
( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
(msg.isExpired())) ) {
OutNetMessage m = msg.getMessage();
long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime();
long sendDelay = _context.clock().now() - msg.getPeer().getLastSendFullyTime();
if (m != null)
@@ -959,7 +976,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
dropPeer(msg.getPeer(), false);
}
noteSend(msg, false);
super.afterSend(msg.getMessage(), false);
if (m != null)
super.afterSend(m, false);
}
private void noteSend(OutboundMessageState msg, boolean successful) {
@@ -1011,8 +1029,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending message succeeded: " + msg);
noteSend(msg, true);
if (msg.getMessage() != null)
super.afterSend(msg.getMessage(), true);
OutNetMessage m = msg.getMessage();
if (m != null)
super.afterSend(m, true);
}
public int countActivePeers() {
@@ -1171,8 +1190,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(idleOut);
buf.append("s</code></td>");
int recvBps = (idleIn > 10 ? 0 : peer.getReceiveBps());
int sendBps = (idleOut > 10 ? 0 : peer.getSendBps());
int recvBps = (idleIn > 2 ? 0 : peer.getReceiveBps());
int sendBps = (idleOut > 2 ? 0 : peer.getSendBps());
buf.append("<td valign=\"top\" ><code>");
buf.append(formatKBps(recvBps));