2006-09-26 Complication

* Subclass from Clock a RouterClock which can access router transports,
      with the goal of developing it to second-guess NTP results
    * Make transports report clock skew in seconds
    * Adjust renderStatusHTML() methods accordingly
    * Show average for NTCP clock skews too
    * Give transports a getClockSkews() method to report clock skews
    * Give transport manager a getClockSkews() method to aggregate results
    * Give comm system facade a getMedianPeerClockSkew() method which RouterClock calls
      (to observe results, add "net.i2p.router.transport.CommSystemFacadeImpl=WARN" to
logging)
    * Extra explicitness in NTCP classes to denote unit of time.
    * Fix some places in NTCPConnection where milliseconds and seconds were confused
This commit is contained in:
complication
2006-09-27 04:02:13 +00:00
committed by zzz
parent ef2e24ea11
commit 9325b806e4
16 changed files with 277 additions and 43 deletions

View File

@ -68,7 +68,7 @@ public class I2PAppContext {
private LogManager _logManager;
private HMACGenerator _hmac;
private SHA256Generator _sha;
private Clock _clock;
protected Clock _clock; // overridden in RouterContext
private DSAEngine _dsa;
private RoutingKeyGenerator _routingKeyGenerator;
private RandomSource _random;
@ -83,7 +83,7 @@ public class I2PAppContext {
private volatile boolean _logManagerInitialized;
private volatile boolean _hmacInitialized;
private volatile boolean _shaInitialized;
private volatile boolean _clockInitialized;
protected volatile boolean _clockInitialized; // used in RouterContext
private volatile boolean _dsaInitialized;
private volatile boolean _routingKeyGeneratorInitialized;
private volatile boolean _randomInitialized;
@ -411,11 +411,11 @@ public class I2PAppContext {
* enable simulators to play with clock skew among different instances.
*
*/
public Clock clock() {
public Clock clock() { // overridden in RouterContext
if (!_clockInitialized) initializeClock();
return _clock;
}
private void initializeClock() {
protected void initializeClock() { // overridden in RouterContext
synchronized (this) {
if (_clock == null)
_clock = new Clock(this);

View File

@ -13,12 +13,16 @@ import net.i2p.time.Timestamper;
* between the local computer's current time and the time as known by some reference
* (such as an NTP synchronized clock).
*
* Protected members are used in the subclass RouterClock,
* which has access to a router's transports (particularly peer clock skews)
* to second-guess the sanity of clock adjustments.
*
*/
public class Clock implements Timestamper.UpdateListener {
private I2PAppContext _context;
protected I2PAppContext _context;
private Timestamper _timestamper;
private long _startedOn;
private boolean _statCreated;
protected long _startedOn;
protected boolean _statCreated;
public Clock(I2PAppContext context) {
_context = context;
@ -36,10 +40,10 @@ public class Clock implements Timestamper.UpdateListener {
public Timestamper getTimestamper() { return _timestamper; }
/** we fetch it on demand to avoid circular dependencies (logging uses the clock) */
private Log getLog() { return _context.logManager().getLog(Clock.class); }
protected Log getLog() { return _context.logManager().getLog(Clock.class); }
private volatile long _offset;
private boolean _alreadyChanged;
protected volatile long _offset;
protected boolean _alreadyChanged;
private Set _listeners;
/** if the clock is skewed by 3+ days, fuck 'em */
@ -132,7 +136,7 @@ public class Clock implements Timestamper.UpdateListener {
}
}
private void fireOffsetChanged(long delta) {
protected void fireOffsetChanged(long delta) {
synchronized (_listeners) {
for (Iterator iter = _listeners.iterator(); iter.hasNext();) {
ClockUpdateListener lsnr = (ClockUpdateListener) iter.next();

View File

@ -1,4 +1,17 @@
$Id: history.txt,v 1.524 2006-09-24 13:30:23 zzz Exp $
$Id: history.txt,v 1.525 2006-09-25 22:11:41 zzz Exp $
2006-09-26 Complication
* Subclass from Clock a RouterClock which can access router transports,
with the goal of developing it to second-guess NTP results
* Make transports report clock skew in seconds
* Adjust renderStatusHTML() methods accordingly
* Show average for NTCP clock skews too
* Give transports a getClockSkews() method to report clock skews
* Give transport manager a getClockSkews() method to aggregate results
* Give comm system facade a getMedianPeerClockSkew() method which RouterClock calls
(to observe results, add "net.i2p.router.transport.CommSystemFacadeImpl=WARN" to logging)
* Extra explicitness in NTCP classes to denote unit of time.
* Fix some places in NTCPConnection where milliseconds and seconds were confused
2006-09-25 zzz
* i2psnark: Paranoid copy before writing pieces,

View File

@ -33,6 +33,12 @@ public abstract class CommSystemFacade implements Service {
public int countActiveSendPeers() { return 0; }
public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; }
/**
* Median clock skew of connected peers in seconds, or null if we cannot answer.
* CommSystemFacadeImpl overrides this.
*/
public Long getMedianPeerClockSkew() { return null; }
/**
* Determine under what conditions we are remotely reachable.
*

View File

@ -0,0 +1,78 @@
package net.i2p.router;
import net.i2p.util.Log;
import net.i2p.I2PAppContext;
import net.i2p.router.RouterContext;
import net.i2p.util.Clock;
/**
* Alternate location for determining the time which takes into account an offset.
* This offset will ideally be periodically updated so as to serve as the difference
* between the local computer's current time and the time as known by some reference
* (such as an NTP synchronized clock).
*
* RouterClock is a subclass of Clock with access to router transports.
* TODO: RouterClock is intended to become capable of second-guessing
* the sanity of clock adjustments.
*/
public class RouterClock extends Clock {
RouterContext _context;
public RouterClock(RouterContext context) {
super(context);
_context = context;
}
/**
* Specify how far away from the "correct" time the computer is - a positive
* value means that we are slow, while a negative value means we are fast.
*
*/
public void setOffset(long offsetMs, boolean force) {
// To test new routines, calculate median peer clock skew
Long medianPeerClockSkew = _context.commSystem().getMedianPeerClockSkew();
if (false) return;
long delta = offsetMs - _offset;
if (!force) {
if ((offsetMs > MAX_OFFSET) || (offsetMs < 0 - MAX_OFFSET)) {
getLog().error("Maximum offset shift exceeded [" + offsetMs + "], NOT HONORING IT");
return;
}
// only allow substantial modifications before the first 10 minutes
if (_alreadyChanged && (System.currentTimeMillis() - _startedOn > 10 * 60 * 1000)) {
if ( (delta > MAX_LIVE_OFFSET) || (delta < 0 - MAX_LIVE_OFFSET) ) {
getLog().log(Log.CRIT, "The clock has already been updated, but you want to change it by "
+ delta + " to " + offsetMs + "? Did something break?");
return;
}
}
if ((delta < MIN_OFFSET_CHANGE) && (delta > 0 - MIN_OFFSET_CHANGE)) {
getLog().debug("Not changing offset since it is only " + delta + "ms");
_alreadyChanged = true;
return;
}
}
if (_alreadyChanged) {
if (delta > 15*1000)
getLog().log(Log.CRIT, "Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
else if (getLog().shouldLog(Log.INFO))
getLog().info("Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
if (!_statCreated)
_context.statManager().createRateStat("clock.skew", "How far is the already adjusted clock being skewed?", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 });
_statCreated = true;
_context.statManager().addRateData("clock.skew", delta, 0);
} else {
getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms from " + _offset + "ms");
}
_alreadyChanged = true;
_offset = offsetMs;
fireOffsetChanged(delta);
}
}

View File

@ -26,6 +26,8 @@ import net.i2p.router.transport.OutboundMessageRegistry;
import net.i2p.router.transport.VMCommSystem;
import net.i2p.router.tunnel.pool.TunnelPoolManager;
import net.i2p.router.tunnel.TunnelDispatcher;
import net.i2p.util.Clock;
import net.i2p.router.RouterClock;
/**
* Build off the core I2P context to provide a root for a router instance to
@ -59,13 +61,15 @@ public class RouterContext extends I2PAppContext {
private MessageValidator _messageValidator;
private MessageStateMonitor _messageStateMonitor;
private RouterThrottle _throttle;
private RouterClock _clock;
private Calculator _isFailingCalc;
private Calculator _integrationCalc;
private Calculator _speedCalc;
private Calculator _reliabilityCalc;
private Calculator _capacityCalc;
private Calculator _oldSpeedCalc;
private static List _contexts = new ArrayList(1);
public RouterContext(Router router) { this(router, null); }
@ -323,4 +327,25 @@ public class RouterContext extends I2PAppContext {
}
return super.getProperty(propName, defaultVal);
}
/**
* The context's synchronized clock, which is kept context specific only to
* enable simulators to play with clock skew among different instances.
*
* It wouldn't be necessary to override clock(), except for the reason
* that it triggers initializeClock() of which we definitely
* need the local version to run.
*/
public Clock clock() {
if (!_clockInitialized) initializeClock();
return _clock;
}
protected void initializeClock() {
synchronized (this) {
if (_clock == null)
_clock = new RouterClock(this);
_clockInitialized = true;
}
}
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.461 $ $Date: 2006-09-24 13:30:22 $";
public final static String ID = "$Revision: 1.462 $ $Date: 2006-09-25 22:11:39 $";
public final static String VERSION = "0.6.1.25";
public final static long BUILD = 8;
public final static long BUILD = 9;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -16,6 +16,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.Collections;
import net.i2p.data.RouterAddress;
import net.i2p.router.CommSystemFacade;
@ -57,6 +59,35 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public int countActivePeers() { return (_manager == null ? 0 : _manager.countActivePeers()); }
public int countActiveSendPeers() { return (_manager == null ? 0 : _manager.countActiveSendPeers()); }
/**
* Median clock skew of connected peers in seconds, or null if we cannot answer.
*/
public Long getMedianPeerClockSkew() {
if (_manager == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Returning null for median peer clock skew (no transport manager)!");
return null;
}
Vector skews = _manager.getClockSkews();
if (skews == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Returning null for median peer clock skew (no data)!");
return null;
}
if (skews.size() < 5) {
if (_log.shouldLog(Log.ERROR))
_log.error("Returning null for median peer clock skew (only " + skews.size() + " peers)!");
return null;
}
// Going to calculate, let's sort them
Collections.sort(skews);
// Pick out median
Long medianPeerClockSkew = (Long) (skews.get(skews.size() / 2));
if (_log.shouldLog(Log.WARN))
_log.warn("Our median peer clock skew is " + medianPeerClockSkew + " s.");
return medianPeerClockSkew;
}
public List getBids(OutNetMessage msg) {
return _manager.getBids(msg);

View File

@ -12,6 +12,7 @@ import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
@ -40,6 +41,7 @@ public interface Transport {
public int countActivePeers();
public int countActiveSendPeers();
public Vector getClockSkews();
public List getMostRecentErrorMessages();
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException;

View File

@ -67,6 +67,13 @@ public abstract class TransportImpl implements Transport {
*/
public int countActiveSendPeers() { return 0; }
/**
* Return our peer clock skews on a transport.
* Vector composed of Long, each element representing a peer skew in seconds.
* Dummy version. Transports override it.
*/
public Vector getClockSkews() { return new Vector(); }
public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; }
/**
* Nonblocking call to pull the next outbound message

View File

@ -18,6 +18,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
@ -150,6 +151,23 @@ public class TransportManager implements TransportEventListener {
return peers;
}
/**
* Return our peer clock skews on all transports.
* Vector composed of Long, each element representing a peer skew in seconds.
* Note: this method returns them in whimsical order.
*/
public Vector getClockSkews() {
Vector skews = new Vector();
for (int i = 0; i < _transports.size(); i++) {
Vector tempSkews = ((Transport)_transports.get(i)).getClockSkews();
if ((tempSkews == null) || (tempSkews.size() <= 0)) continue;
skews.addAll(tempSkews);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport manager returning " + skews.size() + " peer clock skews.");
return skews;
}
public short getReachabilityStatus() {
if (_transports.size() <= 0) return CommSystemFacade.STATUS_UNKNOWN;
short status[] = new short[_transports.size()];

View File

@ -192,7 +192,7 @@ public class EstablishState {
System.arraycopy(_X, 0, xy, 0, _X.length);
System.arraycopy(_Y, 0, xy, _X.length, _Y.length);
Hash hxy = _context.sha().calculateHash(xy);
_tsB = _context.clock().now()/1000l;
_tsB = _context.clock().now()/1000l; // our (Bob's) timestamp in seconds
byte padding[] = new byte[12]; // the encrypted data needs an extra 12 bytes
_context.random().nextBytes(padding);
byte toEncrypt[] = new byte[hxy.getData().length+4+padding.length];
@ -341,8 +341,8 @@ public class EstablishState {
fail("Invalid H(X+Y) - mitm attack attempted?");
return;
}
_tsB = DataHelper.fromLong(hXY_tsB, Hash.HASH_LENGTH, 4);
_tsA = _context.clock().now()/1000;
_tsB = DataHelper.fromLong(hXY_tsB, Hash.HASH_LENGTH, 4); // their (Bob's) timestamp in seconds
_tsA = _context.clock().now()/1000; // our (Alice's) timestamp in seconds
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB));
@ -352,11 +352,11 @@ public class EstablishState {
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff, 0);
_transport.markReachable(_con.getRemotePeer().calculateHash());
_context.shitlist().shitlistRouter(_con.getRemotePeer().calculateHash(), "Outbound clock skew of " + diff);
fail("Clocks too skewed (" + diff + ")", null, true);
_context.shitlist().shitlistRouter(_con.getRemotePeer().calculateHash(), "Outbound clock skew of " + diff + " ms");
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff);
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
// now prepare and send our response
@ -455,7 +455,7 @@ public class EstablishState {
System.arraycopy(_prevEncrypted, _prevEncrypted.length-16, nextWriteIV, 0, 16);
byte nextReadIV[] = new byte[16];
System.arraycopy(_e_bobSig, _e_bobSig.length-16, nextReadIV, 0, nextReadIV.length);
_con.finishOutboundEstablishment(_dh.getSessionKey(), 1000*(_tsA-_tsB), nextWriteIV, nextReadIV);
_con.finishOutboundEstablishment(_dh.getSessionKey(), (_tsA-_tsB), nextWriteIV, nextReadIV); // skew in seconds
return;
}
}
@ -537,11 +537,11 @@ public class EstablishState {
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
_transport.markReachable(alice.calculateHash());
_context.shitlist().shitlistRouter(alice.calculateHash(), "Clock skew of " + diff);
fail("Clocks too skewed (" + diff + ")", null, true);
_context.shitlist().shitlistRouter(alice.calculateHash(), "Clock skew of " + diff + " ms");
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff);
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
sendInboundConfirm(alice, tsA);
@ -550,7 +550,7 @@ public class EstablishState {
_log.debug(prefix()+"e_bobSig is " + _e_bobSig.length + " bytes long");
byte iv[] = new byte[16];
System.arraycopy(_e_bobSig, _e_bobSig.length-16, iv, 0, 16);
_con.finishInboundEstablishment(_dh.getSessionKey(), 1000*(tsA-_tsB), iv, _prevEncrypted);
_con.finishInboundEstablishment(_dh.getSessionKey(), (tsA-_tsB), iv, _prevEncrypted); // skew in seconds
if (_log.shouldLog(Log.INFO))
_log.info(prefix()+"Verified remote peer as " + alice.calculateHash().toBase64());
} else {
@ -670,7 +670,7 @@ public class EstablishState {
long skewSeconds = (ctx.clock().now()/1000)-now;
if (log.shouldLog(Log.INFO))
log.info("Check info received: our IP: " + ourIP + " our port: " + port
+ " skew: " + skewSeconds);
+ " skew: " + skewSeconds + " s");
} catch (UnknownHostException uhe) {
// ipSize is invalid
if (log.shouldLog(Log.WARN))

View File

@ -67,7 +67,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private boolean _closed;
private NTCPAddress _remAddr;
private RouterIdentity _remotePeer;
private long _clockSkew;
private long _clockSkew; // in seconds
/**
* pending unprepared OutNetMessage instances
*/
@ -168,7 +168,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public RouterIdentity getRemotePeer() { return _remotePeer; }
public void setRemotePeer(RouterIdentity ident) { _remotePeer = ident; }
/**
* @param clockSkew alice's clock minus bob's clock (may be negative, obviously, but |val| should
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute)
*/
public void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
@ -360,7 +360,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
/**
* @param clockSkew alice's clock minus bob's clock (may be negative, obviously, but |val| should
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute)
*/
public void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
@ -995,26 +995,27 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
return;
} else {
long newSkew = (ourTs - ts);
if ( (newSkew > Router.CLOCK_FUDGE_FACTOR) || (newSkew < 0-Router.CLOCK_FUDGE_FACTOR) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer's skew jumped too far (from " + _clockSkew + " to " + newSkew + "): " + toString());
if (Math.abs(newSkew*1000) > Router.CLOCK_FUDGE_FACTOR) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer's skew jumped too far (from " + _clockSkew + " s to " + newSkew + " s): " + toString());
_context.statManager().addRateData("ntcp.corruptSkew", newSkew, getUptime());
close();
return;
}
_context.statManager().addRateData("ntcp.receiveMeta", newSkew, getUptime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received NTCP metadata, old skew of " + _clockSkew + ", new skew of " + newSkew);
_log.debug("Received NTCP metadata, old skew of " + _clockSkew + " s, new skew of " + newSkew + "s.");
_clockSkew = newSkew;
}
}
private void sendMeta() {
byte encrypted[] = new byte[_meta.length];
long ts = _context.clock().now()/1000;
synchronized (_meta) {
_context.random().nextBytes(_meta); // randomize the uninterpreted, then overwrite w/ data
DataHelper.toLong(_meta, 0, 2, 0);
DataHelper.toLong(_meta, 2, 4, _context.clock().now()/1000);
DataHelper.toLong(_meta, 2, 4, ts);
Adler32 crc = new Adler32();
crc.update(_meta, 0, _meta.length-4);
DataHelper.toLong(_meta, _meta.length-4, 4, crc.getValue());
@ -1023,7 +1024,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
System.arraycopy(encrypted, encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
// perhaps this should skip the bw limiter to reduce clock skew issues?
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending NTCP metadata");
_log.debug("Sending NTCP metadata with timestamp " + ts);
_sendingMeta = true;
_transport.getPumper().wantsWrite(this, encrypted);
// enqueueInfoMessage(); // this often?

View File

@ -15,6 +15,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.Vector;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
@ -344,6 +345,28 @@ public class NTCPTransport extends TransportImpl {
return active;
}
/**
* Return our peer clock skews on this transport.
* Vector composed of Long, each element representing a peer skew in seconds.
*/
public Vector getClockSkews() {
Vector peers = new Vector();
Vector skews = new Vector();
synchronized (_conLock) {
peers.addAll(_conByIdent.values());
}
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
skews.addElement(new Long (con.getClockSkew()));
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("NTCP transport returning " + skews.size() + " peer clock skews.");
return skews;
}
private static final int NUM_CONCURRENT_READERS = 3;
private static final int NUM_CONCURRENT_WRITERS = 3;
@ -565,8 +588,9 @@ public class NTCPTransport extends TransportImpl {
buf.append("</td><td>For ").append(DataHelper.formatDuration(readTime));
readingPeers++;
}
buf.append("</td><td>").append(DataHelper.formatDuration(con.getClockSkew()));
buf.append("</td></tr>\n");
offsetTotal = offsetTotal + con.getClockSkew();
buf.append("</td><td>").append(con.getClockSkew());
buf.append("s</td></tr>\n");
out.write(buf.toString());
buf.setLength(0);
}
@ -576,8 +600,9 @@ public class NTCPTransport extends TransportImpl {
buf.append("<tr><td>").append(peers.size()).append(" peers</td><td>&nbsp;</td><td>").append(DataHelper.formatDuration(totalUptime/peers.size()));
buf.append("</td><td>&nbsp;</td><td>").append(totalSend).append("</td><td>").append(totalRecv);
buf.append("</td><td>").append(formatRate(bpsSend/1024)).append("/").append(formatRate(bpsRecv/1024)).append("KBps");
buf.append("</td><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;</td>");
buf.append("</tr>\n");
buf.append("</td><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;");
buf.append("</td><td>").append(peers.size() > 0 ? DataHelper.formatDuration(offsetTotal*1000/peers.size()) : "0ms");
buf.append("</td></tr>\n");
}
buf.append("</table>\n");

View File

@ -332,7 +332,7 @@ public class PeerState {
/** when were the current cipher and MAC keys established/rekeyed? */
public long getKeyEstablishedTime() { return _keyEstablishedTime; }
/** how far off is the remote peer from our clock, in seconds? */
public short getClockSkew() { return _clockSkew; }
public short getClockSkew() { return ( (short) (_clockSkew / 1000)); }
/** what is the current receive second, for congestion control? */
public long getCurrentReceiveSecond() { return _currentReceiveSecond; }
/** when did we last send them a packet? */

View File

@ -1207,6 +1207,30 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return active;
}
/**
* Return our peer clock skews on this transport.
* Vector composed of Long, each element representing a peer skew in seconds.
*/
public Vector getClockSkews() {
Vector skews = new Vector();
Vector peers = new Vector();
synchronized (_peersByIdent) {
peers.addAll(_peersByIdent.values());
}
long now = _context.clock().now();
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
PeerState peer = (PeerState)iter.next();
if (now-peer.getLastReceiveTime() > 60*60*1000) continue; // skip old peers
skews.addElement(new Long (peer.getClockSkew()));
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("UDP transport returning " + skews.size() + " peer clock skews.");
return skews;
}
private static UDPTransport __instance;
/** **internal, do not use** */
public static final UDPTransport _instance() { return __instance; }
@ -1686,7 +1710,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("</code></td>");
buf.append("<td valign=\"top\" ><code>");
buf.append(peer.getClockSkew()/1000);
buf.append(peer.getClockSkew());
buf.append("s</code></td>");
offsetTotal = offsetTotal + peer.getClockSkew();
@ -1783,7 +1807,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(formatKBps(bpsIn)).append("KBps/").append(formatKBps(bpsOut));
buf.append("KBps</td>");
buf.append(" <td>").append(numPeers > 0 ? DataHelper.formatDuration(uptimeMsTotal/numPeers) : "0s");
buf.append("</td><td>").append(numPeers > 0 ? DataHelper.formatDuration(offsetTotal/numPeers) : "0ms").append("</td>\n");
buf.append("</td><td>").append(numPeers > 0 ? DataHelper.formatDuration(offsetTotal*1000/numPeers) : "0ms").append("</td>\n");
buf.append(" <td>");
buf.append(numPeers > 0 ? cwinTotal/(numPeers*1024) + "K" : "0K");
buf.append("</td><td>&nbsp;</td>\n");