2006-02-18 jrandom
* Migrate the outbound packets from a central component to the individual per-peer components, substantially cutting down on lock contention when dealing with higher degrees. * Load balance the outbound SSU transfers evenly across peers, rather than across messages (so peers with few messages won't be starved by peers with many). * Reduce the frequency of router info rebuilds (thanks bar!)
This commit is contained in:
@ -713,7 +713,8 @@ public class EstablishmentManager {
|
||||
break;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("why are we confirmed with no identity? " + inboundState);
|
||||
_log.warn("confirmed with invalid? " + inboundState);
|
||||
inboundState.fail();
|
||||
break;
|
||||
}
|
||||
case InboundEstablishState.STATE_FAILED:
|
||||
|
@ -201,19 +201,26 @@ public class OutboundMessageFragments {
|
||||
*/
|
||||
private void finishMessages() {
|
||||
int rv = 0;
|
||||
List peers = new ArrayList();
|
||||
synchronized (_activePeers) {
|
||||
peers = new ArrayList(_activePeers);
|
||||
for (int i = 0; i < _activePeers.size(); i++) {
|
||||
PeerState state = (PeerState)_activePeers.get(i);
|
||||
int remaining = state.finishMessages();
|
||||
if (remaining <= 0) {
|
||||
if (state.getOutboundMessageCount() <= 0) {
|
||||
_activePeers.remove(i);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
|
||||
i--;
|
||||
}
|
||||
rv += remaining;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < peers.size(); i++) {
|
||||
PeerState state = (PeerState)peers.get(i);
|
||||
int remaining = state.finishMessages();
|
||||
if (remaining <= 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
|
||||
}
|
||||
rv += remaining;
|
||||
}
|
||||
}
|
||||
|
||||
private long _lastCycleTime = System.currentTimeMillis();
|
||||
@ -231,7 +238,7 @@ public class OutboundMessageFragments {
|
||||
while (_alive && (state == null) ) {
|
||||
long now = _context.clock().now();
|
||||
int nextSendDelay = -1;
|
||||
finishMessages();
|
||||
//finishMessages();
|
||||
try {
|
||||
synchronized (_activePeers) {
|
||||
for (int i = 0; i < _activePeers.size(); i++) {
|
||||
|
@ -997,6 +997,18 @@ public class PeerState {
|
||||
}
|
||||
}
|
||||
|
||||
public int getOutboundMessageCount() {
|
||||
Map msgs = _outboundMessages;
|
||||
if (_dead) return 0;
|
||||
if (msgs != null) {
|
||||
synchronized (msgs) {
|
||||
return msgs.size();
|
||||
}
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Expire / complete any outbound messages
|
||||
* @return number of active outbound messages remaining
|
||||
|
@ -808,7 +808,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
}
|
||||
|
||||
void rebuildExternalAddress() { rebuildExternalAddress(true); }
|
||||
void rebuildExternalAddress(boolean allowuterInfo) {
|
||||
void rebuildExternalAddress(boolean allowRebuildRouterInfo) {
|
||||
if (_context.router().isHidden())
|
||||
return;
|
||||
|
||||
|
Reference in New Issue
Block a user