fixed up some of the cleanup code to handle out of order responses
minor refactoring, formatting
This commit is contained in:
@ -4,6 +4,8 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
@ -37,7 +39,7 @@ public class PeerData {
|
||||
private RateStat _lostRate;
|
||||
|
||||
/** how long we wait before timing out pending pings (30 seconds) */
|
||||
private static final long TIMEOUT_PERIOD = 30 * 1000;
|
||||
private static final long TIMEOUT_PERIOD = 60 * 1000;
|
||||
|
||||
/** synchronize on this when updating _dataPoints or _pendingPings */
|
||||
private Object _updateLock = new Object();
|
||||
@ -100,33 +102,20 @@ public class PeerData {
|
||||
* when did this test begin?
|
||||
* @return when the test began
|
||||
*/
|
||||
public long getSessionStart() {
|
||||
return _sessionStart;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets when the test began
|
||||
* @param when the time the session began
|
||||
*/
|
||||
public void setSessionStart(long when) {
|
||||
_sessionStart = when;
|
||||
}
|
||||
public long getSessionStart() { return _sessionStart; }
|
||||
public void setSessionStart(long when) { _sessionStart = when; }
|
||||
|
||||
/**
|
||||
* how many pings have we sent for this test?
|
||||
* @return the number of pings sent
|
||||
*/
|
||||
public long getLifetimeSent() {
|
||||
return _lifetimeSent;
|
||||
}
|
||||
public long getLifetimeSent() { return _lifetimeSent; }
|
||||
|
||||
/**
|
||||
* how many pongs have we received for this test?
|
||||
* @return the number of pings received
|
||||
*/
|
||||
public long getLifetimeReceived() {
|
||||
return _lifetimeReceived;
|
||||
}
|
||||
public long getLifetimeReceived() { return _lifetimeReceived; }
|
||||
|
||||
/**
|
||||
* @return the client configuration
|
||||
@ -219,7 +208,9 @@ public class PeerData {
|
||||
data.setPongReceived(now);
|
||||
data.setPongSent(pongSent);
|
||||
data.setWasPonged(true);
|
||||
addDataPoint(data);
|
||||
locked_addDataPoint(data);
|
||||
} else {
|
||||
_log.warn("Pong received, but no matching ping? ping sent at = " + dateSent);
|
||||
}
|
||||
}
|
||||
_sendRate.addData(pongSent - dateSent, 0);
|
||||
@ -228,7 +219,17 @@ public class PeerData {
|
||||
}
|
||||
|
||||
protected void addDataPoint(EventDataPoint data) {
|
||||
_dataPoints.put(new Long(data.getPingSent()), data);
|
||||
synchronized (_updateLock) {
|
||||
locked_addDataPoint(data);
|
||||
}
|
||||
}
|
||||
|
||||
private void locked_addDataPoint(EventDataPoint data) {
|
||||
Object val = _dataPoints.put(new Long(data.getPingSent()), data);
|
||||
if (val != null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Duplicate data point received: " + data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -244,52 +245,8 @@ public class PeerData {
|
||||
long numTimedOut = 0;
|
||||
|
||||
synchronized (_updateLock) {
|
||||
List toTimeout = new ArrayList(4);
|
||||
List toDrop = new ArrayList(4);
|
||||
for (Iterator iter = _pendingPings.keySet().iterator(); iter.hasNext();) {
|
||||
Long when = (Long) iter.next();
|
||||
if (when.longValue() < dropBefore)
|
||||
toDrop.add(when);
|
||||
else if (when.longValue() < timeoutBefore)
|
||||
toTimeout.add(when);
|
||||
else
|
||||
break; // its ordered, so once we are past timeoutBefore, no need
|
||||
}
|
||||
for (Iterator iter = toDrop.iterator(); iter.hasNext();) {
|
||||
_pendingPings.remove(iter.next());
|
||||
}
|
||||
|
||||
List toAdd = new ArrayList(toTimeout.size());
|
||||
for (Iterator iter = toTimeout.iterator(); iter.hasNext();) {
|
||||
Long when = (Long) iter.next();
|
||||
EventDataPoint data = (EventDataPoint) _pendingPings.remove(when);
|
||||
data.setWasPonged(false);
|
||||
toAdd.add(data);
|
||||
}
|
||||
|
||||
numDropped = toDrop.size();
|
||||
numTimedOut = toDrop.size();
|
||||
toDrop.clear();
|
||||
|
||||
for (Iterator iter = _dataPoints.keySet().iterator(); iter.hasNext();) {
|
||||
Long when = (Long) iter.next();
|
||||
if (when.longValue() < dropBefore)
|
||||
toDrop.add(when);
|
||||
else
|
||||
break; // ordered
|
||||
}
|
||||
for (Iterator iter = toDrop.iterator(); iter.hasNext();) {
|
||||
_dataPoints.remove(iter.next());
|
||||
}
|
||||
|
||||
numDropped += toDrop.size();
|
||||
|
||||
for (Iterator iter = toAdd.iterator(); iter.hasNext();) {
|
||||
EventDataPoint data = (EventDataPoint) iter.next();
|
||||
_dataPoints.put(new Long(data.getPingSent()), data);
|
||||
}
|
||||
|
||||
numTimedOut += toAdd.size();
|
||||
numDropped = locked_dropExpired(dropBefore);
|
||||
numTimedOut = locked_timeoutPending(timeoutBefore);
|
||||
}
|
||||
|
||||
_lostRate.addData(numTimedOut, 0);
|
||||
@ -302,6 +259,50 @@ public class PeerData {
|
||||
_log.debug("Peer data cleaned up " + numTimedOut + " timed out pings and removed " + numDropped
|
||||
+ " old entries");
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop all data points that are already too old for us to be interested in
|
||||
*
|
||||
* @param when the earliest ping send time we care about
|
||||
* @return number of data points dropped
|
||||
*/
|
||||
private int locked_dropExpired(long when) {
|
||||
Set toDrop = new HashSet(4);
|
||||
// drop the failed and really old
|
||||
for (Iterator iter = _dataPoints.keySet().iterator(); iter.hasNext(); ) {
|
||||
Long pingTime = (Long)iter.next();
|
||||
if (pingTime.longValue() < when)
|
||||
toDrop.add(pingTime);
|
||||
}
|
||||
for (Iterator iter = toDrop.iterator(); iter.hasNext(); ) {
|
||||
_dataPoints.remove(iter.next());
|
||||
}
|
||||
return toDrop.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* timeout and remove all pings that were sent before the given time,
|
||||
* moving them from the set of pending pings to the set of data points
|
||||
*
|
||||
* @param when the earliest ping send time we care about
|
||||
* @return number of pings timed out
|
||||
*/
|
||||
private int locked_timeoutPending(long when) {
|
||||
Set toDrop = new HashSet(4);
|
||||
for (Iterator iter = _pendingPings.keySet().iterator(); iter.hasNext(); ) {
|
||||
Long pingTime = (Long)iter.next();
|
||||
if (pingTime.longValue() < when) {
|
||||
toDrop.add(pingTime);
|
||||
EventDataPoint point = (EventDataPoint)_pendingPings.get(pingTime);
|
||||
point.setWasPonged(false);
|
||||
locked_addDataPoint(point);
|
||||
}
|
||||
}
|
||||
for (Iterator iter = toDrop.iterator(); iter.hasNext(); ) {
|
||||
_pendingPings.remove(iter.next());
|
||||
}
|
||||
return toDrop.size();
|
||||
}
|
||||
|
||||
/** actual data point for the peer */
|
||||
public class EventDataPoint {
|
||||
@ -313,9 +314,7 @@ public class PeerData {
|
||||
/**
|
||||
* Creates an EventDataPoint
|
||||
*/
|
||||
public EventDataPoint() {
|
||||
this(-1);
|
||||
}
|
||||
public EventDataPoint() { this(-1); }
|
||||
|
||||
/**
|
||||
* Creates an EventDataPoint with pingtime associated with it =)
|
||||
@ -332,17 +331,8 @@ public class PeerData {
|
||||
* when did we send this ping?
|
||||
* @return the time the ping was sent
|
||||
*/
|
||||
public long getPingSent() {
|
||||
return _pingSent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the time the ping was sent
|
||||
* @param when time to set
|
||||
*/
|
||||
public void setPingSent(long when) {
|
||||
_pingSent = when;
|
||||
}
|
||||
public long getPingSent() { return _pingSent; }
|
||||
public void setPingSent(long when) { _pingSent = when; }
|
||||
|
||||
/**
|
||||
* when did the peer receive the ping?
|
||||
|
Reference in New Issue
Block a user