Transports: Fix clock skew calculations

- NTCP inbound skew calculation had flipped sign
- Include RTT in NTCP skew calulation
- Set SSU skew on first packet so it's right from the beginning
- Above should fix the "ignoring clock offset" error
- Javadocs
RouterClock:
- Add config to disable clock adjustment for testing
- Reduce window in skew vector for slew calculation
- Double max slew rate
This commit is contained in:
zzz
2015-04-29 01:30:33 +00:00
parent 613440ff63
commit fe680eb192
9 changed files with 138 additions and 37 deletions

View File

@ -1,3 +1,25 @@
2015-04-29 zzz
* Transports: Fix clock skew calculations
2015-04-28 zzz
* JobQueueRunner: Don't call System.exit() on OOM,
let the shutdown progress normally;
Make it an I2PThread instead of a Runner so we can
call fireOOM() for consistent logging (ticket #1549)
* Router: Don't add OOM listener on Android so
we don't hang onto the context
2015-04-27 zzz
* NamingService: Add export methods
* SusiDNS: Add export support, no UI yet
* Transports:
- Convert internal state to enums, prep for tracking
IPv4/v6 reachability separately (ticket #1458)
- Don't set TCP keepalive for IPv6
2015-04-26 zzz
* i2ptunnel: Reduce sleep time in runners to reduce latency
2015-04-25 zzz 2015-04-25 zzz
* I2PSSLSocketFactory: Add hostname verification * I2PSSLSocketFactory: Add hostname verification
* SSLEepGet: * SSLEepGet:

View File

@ -29,7 +29,7 @@ public class RouterClock extends Clock {
* 1/50 is 12s in a 10m tunnel lifetime, that should be fine. * 1/50 is 12s in a 10m tunnel lifetime, that should be fine.
* All of this is @since 0.7.12 * All of this is @since 0.7.12
*/ */
private static final long MAX_SLEW = 50; private static final long MAX_SLEW = 25;
public static final int DEFAULT_STRATUM = 8; public static final int DEFAULT_STRATUM = 8;
private static final int WORST_STRATUM = 16; private static final int WORST_STRATUM = 16;
@ -50,6 +50,9 @@ public class RouterClock extends Clock {
private static final long MASSIVE_SHIFT_FORWARD = 150*1000; private static final long MASSIVE_SHIFT_FORWARD = 150*1000;
private static final long MASSIVE_SHIFT_BACKWARD = 61*1000; private static final long MASSIVE_SHIFT_BACKWARD = 61*1000;
/** testing only */
private static final String PROP_DISABLE_ADJUSTMENT = "time.disableOffset";
private final Set<ClockShiftListener> _shiftListeners; private final Set<ClockShiftListener> _shiftListeners;
private volatile long _lastShiftNanos; private volatile long _lastShiftNanos;
@ -145,25 +148,26 @@ public class RouterClock extends Clock {
_alreadyChanged) { _alreadyChanged) {
// Try calculating peer clock skew // Try calculating peer clock skew
long currentPeerClockSkew = ((RouterContext)_context).commSystem().getFramedAveragePeerClockSkew(50); long currentPeerClockSkew = ((RouterContext)_context).commSystem().getFramedAveragePeerClockSkew(33);
// Predict the effect of applying the proposed clock offset // Predict the effect of applying the proposed clock offset
long predictedPeerClockSkew = currentPeerClockSkew + delta; long predictedPeerClockSkew = currentPeerClockSkew + delta;
// Fail sanity check if applying the offset would increase peer clock skew // Fail sanity check if applying the offset would increase peer clock skew
Log log = getLog();
if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5*1000)) || if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5*1000)) ||
(Math.abs(predictedPeerClockSkew) > 20*1000)) { (Math.abs(predictedPeerClockSkew) > 20*1000)) {
getLog().error("Ignoring clock offset " + offsetMs + "ms (current " + _offset + if (log.shouldWarn())
log.warn("Ignoring clock offset " + offsetMs + "ms (current " + _offset +
"ms) since it would increase peer clock skew from " + currentPeerClockSkew + "ms) since it would increase peer clock skew from " + currentPeerClockSkew +
"ms to " + predictedPeerClockSkew + "ms. Bad time server?"); "ms to " + predictedPeerClockSkew + "ms. Stratrum: " + stratum);
return; return;
} else { } else {
Log log = getLog(); if (log.shouldInfo())
if (log.shouldLog(Log.DEBUG)) log.info("Approving clock offset " + offsetMs + "ms (current " + _offset +
log.debug("Approving clock offset " + offsetMs + "ms (current " + _offset +
"ms) since it would decrease peer clock skew from " + currentPeerClockSkew + "ms) since it would decrease peer clock skew from " + currentPeerClockSkew +
"ms to " + predictedPeerClockSkew + "ms."); "ms to " + predictedPeerClockSkew + "ms. Stratrum: " + stratum);
} }
} // check sanity } // check sanity
} }
@ -184,17 +188,25 @@ public class RouterClock extends Clock {
_statCreated = true; _statCreated = true;
} }
_context.statManager().addRateData("clock.skew", delta); _context.statManager().addRateData("clock.skew", delta);
if (_context.getBooleanProperty(PROP_DISABLE_ADJUSTMENT)) {
getLog().error("Clock adjustment disabled", new Exception());
} else {
_desiredOffset = offsetMs; _desiredOffset = offsetMs;
}
} else { } else {
Log log = getLog(); Log log = getLog();
if (log.shouldLog(Log.INFO)) if (log.shouldLog(Log.INFO))
log.info("Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum); log.info("Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum);
_alreadyChanged = true; _alreadyChanged = true;
if (_context.getBooleanProperty(PROP_DISABLE_ADJUSTMENT)) {
log.error("Clock adjustment disabled", new Exception());
} else {
_offset = offsetMs; _offset = offsetMs;
_desiredOffset = offsetMs; _desiredOffset = offsetMs;
// this is used by the JobQueue // this is used by the JobQueue
fireOffsetChanged(delta); fireOffsetChanged(delta);
} }
}
_lastChanged = System.currentTimeMillis(); _lastChanged = System.currentTimeMillis();
_lastStratum = stratum; _lastStratum = stratum;

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 9; public final static long BUILD = 10;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -105,6 +105,9 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
* @param percentToInclude 1-100 * @param percentToInclude 1-100
* @return Framed average clock skew of connected peers in milliseconds, or the clock offset if we cannot answer. * @return Framed average clock skew of connected peers in milliseconds, or the clock offset if we cannot answer.
* Average is calculated over the middle "percentToInclude" peers. * Average is calculated over the middle "percentToInclude" peers.
*
* A positive number means our clock is ahead of theirs.
*
* Todo: change Vectors to milliseconds * Todo: change Vectors to milliseconds
*/ */
@Override @Override
@ -518,7 +521,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public void timeReached() { public void timeReached() {
// use the same % as in RouterClock so that check will never fail // use the same % as in RouterClock so that check will never fail
// This is their our offset w.r.t. them... // This is their our offset w.r.t. them...
long peerOffset = getFramedAveragePeerClockSkew(50); long peerOffset = getFramedAveragePeerClockSkew(33);
if (peerOffset == 0) if (peerOffset == 0)
return; return;
long currentOffset = _context.clock().getOffset(); long currentOffset = _context.clock().getOffset();

View File

@ -357,6 +357,7 @@ public class TransportManager implements TransportEventListener {
/** /**
* Return our peer clock skews on all transports. * Return our peer clock skews on all transports.
* Vector composed of Long, each element representing a peer skew in seconds. * Vector composed of Long, each element representing a peer skew in seconds.
* A positive number means our clock is ahead of theirs.
* Note: this method returns them in whimsical order. * Note: this method returns them in whimsical order.
*/ */
public Vector<Long> getClockSkews() { public Vector<Long> getClockSkews() {

View File

@ -81,10 +81,19 @@ class EstablishState {
// alice receives (and bob sends) // alice receives (and bob sends)
private final byte _Y[]; private final byte _Y[];
private final byte _e_hXY_tsB[]; private final byte _e_hXY_tsB[];
/** Bob's Timestamp in seconds */ /** Bob's timestamp in seconds, this is in message #2, *before* _tsA */
private transient long _tsB; private transient long _tsB;
/** Alice's Timestamp in seconds */ /** Alice's timestamp in seconds, this is in message #3, *after* _tsB
* Only saved for outbound. For inbound, see verifyInbound().
*/
private transient long _tsA; private transient long _tsA;
/**
* OUR clock minus HIS clock, in seconds
*
* Inbound: tsB - tsA - rtt/2
* Outbound: tsA - tsB - rtt/2
*/
private transient long _peerSkew;
private transient byte _e_bobSig[]; private transient byte _e_bobSig[];
/** previously received encrypted block (or the IV) */ /** previously received encrypted block (or the IV) */
@ -312,7 +321,8 @@ class EstablishState {
System.arraycopy(_Y, 0, xy, XY_SIZE, XY_SIZE); System.arraycopy(_Y, 0, xy, XY_SIZE, XY_SIZE);
byte[] hxy = SimpleByteCache.acquire(HXY_SIZE); byte[] hxy = SimpleByteCache.acquire(HXY_SIZE);
_context.sha().calculateHash(xy, 0, XY_SIZE + XY_SIZE, hxy, 0); _context.sha().calculateHash(xy, 0, XY_SIZE + XY_SIZE, hxy, 0);
_tsB = (_context.clock().now() + 500) / 1000l; // our (Bob's) timestamp in seconds // our (Bob's) timestamp in seconds
_tsB = (_context.clock().now() + 500) / 1000l;
byte toEncrypt[] = new byte[HXY_TSB_PAD_SIZE]; // 48 byte toEncrypt[] = new byte[HXY_TSB_PAD_SIZE]; // 48
System.arraycopy(hxy, 0, toEncrypt, 0, HXY_SIZE); System.arraycopy(hxy, 0, toEncrypt, 0, HXY_SIZE);
byte tsB[] = DataHelper.toLong(4, _tsB); byte tsB[] = DataHelper.toLong(4, _tsB);
@ -471,7 +481,9 @@ class EstablishState {
* is synchronized, should be OK. See isComplete() * is synchronized, should be OK. See isComplete()
*/ */
private void receiveOutbound(ByteBuffer src) { private void receiveOutbound(ByteBuffer src) {
// recv Y+E(H(X+Y)+tsB, sk, Y[239:255]) // recv Y+E(H(X+Y)+tsB, sk, Y[239:255])
// Read in Y, which is the first part of message #2
while (_state == State.OB_SENT_X && src.hasRemaining()) { while (_state == State.OB_SENT_X && src.hasRemaining()) {
byte c = src.get(); byte c = src.get();
_Y[_received++] = c; _Y[_received++] = c;
@ -491,6 +503,8 @@ class EstablishState {
} }
} }
// Read in Y, which is the first part of message #2
// Read in the rest of message #2
while (_state == State.OB_GOT_Y && src.hasRemaining()) { while (_state == State.OB_GOT_Y && src.hasRemaining()) {
int i = _received-XY_SIZE; int i = _received-XY_SIZE;
_received++; _received++;
@ -517,18 +531,25 @@ class EstablishState {
} }
SimpleByteCache.release(h); SimpleByteCache.release(h);
changeState(State.OB_GOT_HXY); changeState(State.OB_GOT_HXY);
_tsB = DataHelper.fromLong(hXY_tsB, HXY_SIZE, 4); // their (Bob's) timestamp in seconds // their (Bob's) timestamp in seconds
_tsA = (_context.clock().now() + 500) / 1000; // our (Alice's) timestamp in seconds _tsB = DataHelper.fromLong(hXY_tsB, HXY_SIZE, 4);
long now = _context.clock().now();
// rtt from sending #1 to receiving #2
long rtt = now - _con.getCreated();
// our (Alice's) timestamp in seconds
_tsA = (now + 500) / 1000;
_peerSkew = (now - (_tsB * 1000) - (rtt / 2) + 500) / 1000;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB)); _log.debug(prefix()+"h(X+Y) is correct, skew = " + _peerSkew);
// the skew is not authenticated yet, but it is certainly fatal to // the skew is not authenticated yet, but it is certainly fatal to
// the establishment, so fail hard if appropriate // the establishment, so fail hard if appropriate
long diff = 1000*Math.abs(_tsA-_tsB); long diff = 1000*Math.abs(_peerSkew);
if (!_context.clock().getUpdatedSuccessfully()) { if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation // Adjust the clock one time in desperation
_context.clock().setOffset(1000 * (_tsB - _tsA), true); // We are Alice, he is Bob, adjust to match Bob
_tsA = _tsB; _context.clock().setOffset(1000 * (0 - _peerSkew), true);
_peerSkew = 0;
if (diff != 0) if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff)); _log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) { } else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
@ -538,7 +559,7 @@ class EstablishState {
_context.banlist().banlistRouter(DataHelper.formatDuration(diff), _context.banlist().banlistRouter(DataHelper.formatDuration(diff),
_con.getRemotePeer().calculateHash(), _con.getRemotePeer().calculateHash(),
_x("Excessive clock skew: {0}")); _x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(_tsA- _tsB); _transport.setLastBadSkew(_peerSkew);
fail("Clocks too skewed (" + diff + " ms)", null, true); fail("Clocks too skewed (" + diff + " ms)", null, true);
return; return;
} else if (_log.shouldLog(Log.DEBUG)) { } else if (_log.shouldLog(Log.DEBUG)) {
@ -593,6 +614,8 @@ class EstablishState {
_transport.getPumper().wantsWrite(_con, _prevEncrypted); _transport.getPumper().wantsWrite(_con, _prevEncrypted);
} }
} }
// Read in message #4
if (_state == State.OB_SENT_RI && src.hasRemaining()) { if (_state == State.OB_SENT_RI && src.hasRemaining()) {
// we are receiving their confirmation // we are receiving their confirmation
@ -656,7 +679,8 @@ class EstablishState {
byte nextWriteIV[] = _curEncrypted; // reuse buf byte nextWriteIV[] = _curEncrypted; // reuse buf
System.arraycopy(_prevEncrypted, _prevEncrypted.length-AES_SIZE, nextWriteIV, 0, AES_SIZE); System.arraycopy(_prevEncrypted, _prevEncrypted.length-AES_SIZE, nextWriteIV, 0, AES_SIZE);
// this does not copy the nextWriteIV, do not release to cache // this does not copy the nextWriteIV, do not release to cache
_con.finishOutboundEstablishment(_dh.getSessionKey(), (_tsA-_tsB), nextWriteIV, _e_bobSig); // skew in seconds // We are Alice, he is Bob, clock skew is Bob - Alice
_con.finishOutboundEstablishment(_dh.getSessionKey(), _peerSkew, nextWriteIV, _e_bobSig); // skew in seconds
releaseBufs(true); releaseBufs(true);
// if socket gets closed this will be null - prevent NPE // if socket gets closed this will be null - prevent NPE
InetAddress ia = _con.getChannel().socket().getInetAddress(); InetAddress ia = _con.getChannel().socket().getInetAddress();
@ -783,7 +807,15 @@ class EstablishState {
byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray(); byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray();
try { try {
int sz = _aliceIdentSize; int sz = _aliceIdentSize;
// her timestamp from message #3
long tsA = DataHelper.fromLong(b, 2+sz, 4); long tsA = DataHelper.fromLong(b, 2+sz, 4);
// _tsB is when we sent message #2
// Adjust backward by RTT/2
long now = _context.clock().now();
// rtt from sending #2 to receiving #3
long rtt = now - _con.getCreated();
_peerSkew = (now - (tsA * 1000) - (rtt / 2) + 500) / 1000;
ByteArrayOutputStream baos = new ByteArrayOutputStream(768); ByteArrayOutputStream baos = new ByteArrayOutputStream(768);
baos.write(_X); baos.write(_X);
baos.write(_Y); baos.write(_Y);
@ -827,12 +859,13 @@ class EstablishState {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "verification successful for " + _con); _log.debug(prefix() + "verification successful for " + _con);
long diff = 1000*Math.abs(tsA-_tsB); long diff = 1000*Math.abs(_peerSkew);
if (!_context.clock().getUpdatedSuccessfully()) { if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation // Adjust the clock one time in desperation
// This isn't very likely, outbound will do it first // This isn't very likely, outbound will do it first
_context.clock().setOffset(1000 * (_tsB - tsA), true); // We are Bob, she is Alice, adjust to match Alice
tsA = _tsB; _context.clock().setOffset(1000 * (0 - _peerSkew), true);
_peerSkew = 0;
if (diff != 0) if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff)); _log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) { } else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
@ -842,7 +875,7 @@ class EstablishState {
_context.banlist().banlistRouter(DataHelper.formatDuration(diff), _context.banlist().banlistRouter(DataHelper.formatDuration(diff),
_aliceIdent.calculateHash(), _aliceIdent.calculateHash(),
_x("Excessive clock skew: {0}")); _x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(tsA- _tsB); _transport.setLastBadSkew(_peerSkew);
fail("Clocks too skewed (" + diff + " ms)", null, true); fail("Clocks too skewed (" + diff + " ms)", null, true);
return; return;
} else if (_log.shouldLog(Log.DEBUG)) { } else if (_log.shouldLog(Log.DEBUG)) {
@ -856,7 +889,8 @@ class EstablishState {
byte iv[] = _curEncrypted; // reuse buf byte iv[] = _curEncrypted; // reuse buf
System.arraycopy(_e_bobSig, _e_bobSig.length-AES_SIZE, iv, 0, AES_SIZE); System.arraycopy(_e_bobSig, _e_bobSig.length-AES_SIZE, iv, 0, AES_SIZE);
// this does not copy the IV, do not release to cache // this does not copy the IV, do not release to cache
_con.finishInboundEstablishment(_dh.getSessionKey(), (tsA-_tsB), iv, _prevEncrypted); // skew in seconds // We are Bob, she is Alice, clock skew is Alice-Bob
_con.finishInboundEstablishment(_dh.getSessionKey(), _peerSkew, iv, _prevEncrypted); // skew in seconds
releaseBufs(true); releaseBufs(true);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(prefix()+"Verified remote peer as " + _aliceIdent.calculateHash()); _log.info(prefix()+"Verified remote peer as " + _aliceIdent.calculateHash());

View File

@ -267,7 +267,9 @@ class NTCPConnection {
public void setRemotePeer(RouterIdentity ident) { _remotePeer = ident; } public void setRemotePeer(RouterIdentity ident) { _remotePeer = ident; }
/** /**
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should * We are Bob.
*
* @param clockSkew OUR clock minus ALICE's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute) * be under 1 minute)
* @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt
* @param prevReadEnd 16 or more bytes, last 16 bytes copied * @param prevReadEnd 16 or more bytes, last 16 bytes copied
@ -284,7 +286,9 @@ class NTCPConnection {
} }
/** /**
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should * We are Bob.
*
* @param clockSkew OUR clock minus ALICE's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute) * be under 1 minute)
* @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt
* @param prevReadEnd 16 or more bytes, last 16 bytes copied * @param prevReadEnd 16 or more bytes, last 16 bytes copied
@ -306,7 +310,10 @@ class NTCPConnection {
return rv; return rv;
} }
/** @return seconds */ /**
* A positive number means our clock is ahead of theirs.
* @return seconds
*/
public long getClockSkew() { return _clockSkew; } public long getClockSkew() { return _clockSkew; }
/** @return milliseconds */ /** @return milliseconds */
@ -346,6 +353,12 @@ class NTCPConnection {
/** @return milliseconds */ /** @return milliseconds */
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; } public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
/**
* @return when this connection was created (not established)
* @since 0.9.20
*/
public long getCreated() { return _created; }
/** /**
* workaround for EventPumper * workaround for EventPumper
* @since 0.8.12 * @since 0.8.12
@ -595,7 +608,9 @@ class NTCPConnection {
***********/ ***********/
/** /**
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should * We are Alice.
*
* @param clockSkew OUR clock minus BOB's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute) * be under 1 minute)
* @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt
* @param prevReadEnd 16 or more bytes, last 16 bytes copied * @param prevReadEnd 16 or more bytes, last 16 bytes copied
@ -1304,6 +1319,7 @@ class NTCPConnection {
_context.statManager().addRateData("ntcp.receiveMeta", newSkew); _context.statManager().addRateData("ntcp.receiveMeta", newSkew);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received NTCP metadata, old skew of " + _clockSkew + " s, new skew of " + newSkew + "s."); _log.debug("Received NTCP metadata, old skew of " + _clockSkew + " s, new skew of " + newSkew + "s.");
// FIXME does not account for RTT
_clockSkew = newSkew; _clockSkew = newSkew;
} }
} }

View File

@ -610,6 +610,7 @@ class PacketHandler {
_state = 44; _state = 44;
long recvOn = packet.getBegin(); long recvOn = packet.getBegin();
long sendOn = reader.readTimestamp() * 1000; long sendOn = reader.readTimestamp() * 1000;
// Positive when we are ahead of them
long skew = recvOn - sendOn; long skew = recvOn - sendOn;
int type = reader.readPayloadType(); int type = reader.readPayloadType();
// if it's a bad type, the whole packet is probably corrupt // if it's a bad type, the whole packet is probably corrupt

View File

@ -574,13 +574,25 @@ class PeerState {
/** /**
* Update the moving-average clock skew based on the current difference. * Update the moving-average clock skew based on the current difference.
* The raw skew will be adjusted for RTT/2 here. * The raw skew will be adjusted for RTT/2 here.
* @param skew milliseconds, NOT adjusted for RTT.
* A positive number means our clock is ahead of theirs. * A positive number means our clock is ahead of theirs.
* @param skew milliseconds, NOT adjusted for RTT.
*/ */
public void adjustClockSkew(long skew) { public void adjustClockSkew(long skew) {
// the real one-way delay is much less than RTT / 2, due to ack delays, // the real one-way delay is much less than RTT / 2, due to ack delays,
// so add a fudge factor // so add a fudge factor
double adj = 0.1 * (skew + CLOCK_SKEW_FUDGE - (_rtt / 2)); long actualSkew = skew + CLOCK_SKEW_FUDGE - (_rtt / 2);
//_log.error("Skew " + skew + " actualSkew " + actualSkew + " rtt " + _rtt + " pktsRcvd " + _packetsReceived);
// First time...
// This is important because we need accurate
// skews right from the beginning, since the median is taken
// and fed to the timestamper. Lots of connections only send a few packets.
if (_packetsReceived <= 1) {
synchronized(_clockSkewLock) {
_clockSkew = actualSkew;
}
return;
}
double adj = 0.1 * actualSkew;
synchronized(_clockSkewLock) { synchronized(_clockSkewLock) {
_clockSkew = (long) (0.9*_clockSkew + adj); _clockSkew = (long) (0.9*_clockSkew + adj);
} }