* randomized the shitlist duration (still with exponential backoff though)

* fail UDP sessions after two consecutive failed messages in different minutes
* honor UDP reconnections
This commit is contained in:
jrandom
2005-04-25 16:29:48 +00:00
committed by zzz
parent cde7ac7e52
commit 567ce84e1e
5 changed files with 70 additions and 31 deletions

View File

@ -63,7 +63,7 @@ public class Shitlist {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Shitlisting router " + peer.toBase64(), new Exception("Shitlist cause")); _log.info("Shitlisting router " + peer.toBase64(), new Exception("Shitlist cause"));
long period = SHITLIST_DURATION_MS; long period = SHITLIST_DURATION_MS + _context.random().nextLong(SHITLIST_DURATION_MS);
PeerProfile prof = _context.profileOrganizer().getProfile(peer); PeerProfile prof = _context.profileOrganizer().getProfile(peer);
if (prof != null) if (prof != null)
period = SHITLIST_DURATION_MS << prof.incrementShitlists(); period = SHITLIST_DURATION_MS << prof.incrementShitlists();

View File

@ -136,7 +136,7 @@ public class OutboundMessageFragments {
_context.statManager().addRateData("udp.sendFailed", state.getFragmentCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendFailed", state.getFragmentCount(), state.getLifetime());
if (state.getMessage() != null) { if (state.getMessage() != null) {
_transport.failed(state.getMessage()); _transport.failed(state);
} else { } else {
// it can not have an OutNetMessage if the source is the // it can not have an OutNetMessage if the source is the
// final after establishment message // final after establishment message
@ -152,7 +152,7 @@ public class OutboundMessageFragments {
// state.getPeer().congestionOccurred(); // state.getPeer().congestionOccurred();
if (state.getMessage() != null) { if (state.getMessage() != null) {
_transport.failed(state.getMessage()); _transport.failed(state);
} else { } else {
// it can not have an OutNetMessage if the source is the // it can not have an OutNetMessage if the source is the
// final after establishment message // final after establishment message
@ -192,9 +192,9 @@ public class OutboundMessageFragments {
peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash()); peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash());
if (peer == null) { if (peer == null) {
// peer disconnected (whatever that means) // peer disconnected
_activeMessages.remove(cur); _activeMessages.remove(cur);
_transport.failed(state.getMessage()); _transport.failed(state);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Peer disconnected for " + state); _log.warn("Peer disconnected for " + state);
state.releaseResources(); state.releaseResources();

View File

@ -63,8 +63,10 @@ public class PeerState {
private long _lastSendTime; private long _lastSendTime;
/** when did we last receive a packet from them? */ /** when did we last receive a packet from them? */
private long _lastReceiveTime; private long _lastReceiveTime;
/** how many seconds have we sent packets without any ACKs received? */ /** how many consecutive messages have we sent and not received an ACK to */
private int _consecutiveSendingSecondsWithoutACKs; private int _consecutiveFailedSends;
/** when did we last have a failed send */
private long _lastFailedSendMinute;
/** list of messageIds (Long) that we have received but not yet sent */ /** list of messageIds (Long) that we have received but not yet sent */
private List _currentACKs; private List _currentACKs;
/** when did we last send ACKs to the peer? */ /** when did we last send ACKs to the peer? */
@ -212,7 +214,7 @@ public class PeerState {
/** when did we last receive a packet from them? */ /** when did we last receive a packet from them? */
public long getLastReceiveTime() { return _lastReceiveTime; } public long getLastReceiveTime() { return _lastReceiveTime; }
/** how many seconds have we sent packets without any ACKs received? */ /** how many seconds have we sent packets without any ACKs received? */
public int getConsecutiveSendingSecondsWithoutACKS() { return _consecutiveSendingSecondsWithoutACKs; } public int getConsecutiveFailedSends() { return _consecutiveFailedSends; }
/** have we received a packet with the ECN bit set in the current second? */ /** have we received a packet with the ECN bit set in the current second? */
public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; } public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; }
/** /**
@ -303,22 +305,16 @@ public class PeerState {
public void setLastSendTime(long when) { _lastSendTime = when; } public void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last receive a packet from them? */ /** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; } public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
public void incrementConsecutiveSendingSecondsWithoutACKS() { _consecutiveSendingSecondsWithoutACKs++; } public int incrementConsecutiveFailedSends() {
public void resetConsecutiveSendingSecondsWithoutACKS() { _consecutiveSendingSecondsWithoutACKs = 0; } long now = _context.clock().now()/60*1000;
if (_lastFailedSendMinute == now) {
/* // ignore... too fast
public void migrateACKs(List NACKs, long newSecond) { } else {
_previousSecondACKs = _currentSecondACKs; _lastFailedSendMinute = now;
if (_currentSecondECNReceived) _consecutiveFailedSends++;
_sendWindowBytes /= 2; }
if (_sendWindowBytes < MINIMUM_WINDOW_BYTES) return _consecutiveFailedSends;
_sendWindowBytes = MINIMUM_WINDOW_BYTES;
_sendWindowBytesRemaining = _sendWindowBytes;
_currentSecondECNReceived = false;
_remoteWantsPreviousACKs = true;
_currentReceiveSecond = newSecond;
} }
*/
/** /**
* have all of the packets received in the current second requested that * have all of the packets received in the current second requested that
@ -419,7 +415,8 @@ public class PeerState {
/** we sent a message which was ACKed containing the given # of bytes */ /** we sent a message which was ACKed containing the given # of bytes */
public void messageACKed(int bytesACKed, long lifetime, int numSends) { public void messageACKed(int bytesACKed, long lifetime, int numSends) {
_consecutiveSendingSecondsWithoutACKs = 0; _consecutiveFailedSends = 0;
_lastFailedSendMinute = -1;
if (_sendWindowBytes <= _slowStartThreshold) { if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed; _sendWindowBytes += bytesACKed;
} else { } else {
@ -442,9 +439,9 @@ public class PeerState {
_rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation)); _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation));
_rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime); _rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime);
_rto = _rtt + (_rttDeviation<<2); _rto = _rtt + (_rttDeviation<<2);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.DEBUG))
_log.warn("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
+ " rttDev=" + _rttDeviation + " rto=" + _rto); + " rttDev=" + _rttDeviation + " rto=" + _rto);
if (_rto < 1000) if (_rto < 1000)
_rto = 1000; _rto = 1000;
if (_rto > 5000) if (_rto > 5000)

View File

@ -35,6 +35,12 @@ class UDPFlooder implements Runnable {
_peers.notifyAll(); _peers.notifyAll();
} }
} }
public void removePeer(PeerState peer) {
synchronized (_peers) {
_peers.remove(peer);
_peers.notifyAll();
}
}
public void startup() { public void startup() {
_alive = true; _alive = true;

View File

@ -88,6 +88,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** should we flood all UDP peers with the configured rate? */ /** should we flood all UDP peers with the configured rate? */
private static final boolean SHOULD_FLOOD_PEERS = true; private static final boolean SHOULD_FLOOD_PEERS = true;
private static final int MAX_CONSECUTIVE_FAILED = 2;
public UDPTransport(RouterContext ctx) { public UDPTransport(RouterContext ctx) {
super(ctx); super(ctx);
_context = ctx; _context = ctx;
@ -290,8 +292,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
synchronized (_peersByIdent) { synchronized (_peersByIdent) {
PeerState oldPeer = (PeerState)_peersByIdent.put(peer.getRemotePeer(), peer); PeerState oldPeer = (PeerState)_peersByIdent.put(peer.getRemotePeer(), peer);
if ( (oldPeer != null) && (oldPeer != peer) ) { if ( (oldPeer != null) && (oldPeer != peer) ) {
_peersByIdent.put(oldPeer.getRemotePeer(), oldPeer); // should we transfer the oldPeer's RTT/RTO/etc? nah
return false; // or perhaps reject the new session? nah,
// using the new one allow easier reconnect
} }
} }
} }
@ -302,8 +305,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
synchronized (_peersByRemoteHost) { synchronized (_peersByRemoteHost) {
PeerState oldPeer = (PeerState)_peersByRemoteHost.put(remoteString, peer); PeerState oldPeer = (PeerState)_peersByRemoteHost.put(remoteString, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) { if ( (oldPeer != null) && (oldPeer != peer) ) {
_peersByRemoteHost.put(remoteString, oldPeer); //_peersByRemoteHost.put(remoteString, oldPeer);
return false; //return false;
} }
} }
@ -315,6 +318,27 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return true; return true;
} }
private void dropPeer(PeerState peer) {
if (_log.shouldLog(Log.WARN))
_log.debug("Dropping remote peer: " + peer);
if (peer.getRemotePeer() != null) {
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries");
synchronized (_peersByIdent) {
_peersByIdent.remove(peer.getRemotePeer());
}
}
String remoteString = peer.getRemoteHostString();
if (remoteString != null) {
synchronized (_peersByRemoteHost) {
_peersByRemoteHost.remove(remoteString);
}
}
if (SHOULD_FLOOD_PEERS)
_flooder.removePeer(peer);
}
int send(UDPPacket packet) { int send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet); _log.debug("Sending packet " + packet);
@ -451,6 +475,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
replaceAddress(addr); replaceAddress(addr);
} }
public void failed(OutboundMessageState msg) {
if (msg == null) return;
int consecutive = 0;
if (msg.getPeer() != null)
consecutive = msg.getPeer().incrementConsecutiveFailedSends();
if (_log.shouldLog(Log.WARN))
_log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());
if (consecutive > MAX_CONSECUTIVE_FAILED)
dropPeer(msg.getPeer());
failed(msg.getMessage());
}
public void failed(OutNetMessage msg) { public void failed(OutNetMessage msg) {
if (msg == null) return; if (msg == null) return;
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))