2006-07-27 jrandom
* Cut down NTCP connection establishments once we know the peer is skewed (rather than wait for full establishment before verifying) * Removed a lock on the stats framework when accessing rates, which shouldn't be a problem, assuming rates are created (pretty much) all at once and merely updated during the lifetime of the jvm.
This commit is contained in:
@ -43,7 +43,7 @@ public class StatManager {
|
||||
_log = context.logManager().getLog(StatManager.class);
|
||||
_context = context;
|
||||
_frequencyStats = Collections.synchronizedMap(new HashMap(128));
|
||||
_rateStats = Collections.synchronizedMap(new HashMap(128));
|
||||
_rateStats = new HashMap(128); // synchronized only on add //Collections.synchronizedMap(new HashMap(128));
|
||||
_statLog = new BufferedStatLog(context);
|
||||
}
|
||||
|
||||
@ -80,10 +80,12 @@ public class StatManager {
|
||||
* @param periods array of period lengths (in milliseconds)
|
||||
*/
|
||||
public void createRateStat(String name, String description, String group, long periods[]) {
|
||||
if (_rateStats.containsKey(name)) return;
|
||||
RateStat rs = new RateStat(name, description, group, periods);
|
||||
if (_statLog != null) rs.setStatLog(_statLog);
|
||||
_rateStats.put(name, rs);
|
||||
synchronized (_rateStats) {
|
||||
if (_rateStats.containsKey(name)) return;
|
||||
RateStat rs = new RateStat(name, description, group, periods);
|
||||
if (_statLog != null) rs.setStatLog(_statLog);
|
||||
_rateStats.put(name, rs);
|
||||
}
|
||||
}
|
||||
|
||||
/** update the given frequency statistic, taking note that an event occurred (and recalculating all frequencies) */
|
||||
@ -94,7 +96,7 @@ public class StatManager {
|
||||
|
||||
/** update the given rate statistic, taking note that the given data point was received (and recalculating all rates) */
|
||||
public void addRateData(String name, long data, long eventDuration) {
|
||||
RateStat stat = (RateStat) _rateStats.get(name);
|
||||
RateStat stat = (RateStat) _rateStats.get(name); // unsynchronized
|
||||
if (stat != null) stat.addData(data, eventDuration);
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,11 @@
|
||||
$Id: history.txt,v 1.497 2006-07-26 20:04:59 jrandom Exp $
|
||||
$Id: history.txt,v 1.498 2006-07-27 01:20:25 jrandom Exp $
|
||||
|
||||
2006-07-27 jrandom
|
||||
* Cut down NTCP connection establishments once we know the peer is skewed
|
||||
(rather than wait for full establishment before verifying)
|
||||
* Removed a lock on the stats framework when accessing rates, which
|
||||
shouldn't be a problem, assuming rates are created (pretty much) all at
|
||||
once and merely updated during the lifetime of the jvm.
|
||||
|
||||
2006-07-27 jrandom
|
||||
* Further NTCP write status cleanup
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.436 $ $Date: 2006-07-26 19:56:52 $";
|
||||
public final static String ID = "$Revision: 1.437 $ $Date: 2006-07-27 01:20:27 $";
|
||||
public final static String VERSION = "0.6.1.22";
|
||||
public final static long BUILD = 3;
|
||||
public final static long BUILD = 4;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -48,6 +48,7 @@ public class GetBidsJob extends JobImpl {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Attempt to send a message to a shitlisted peer - " + to);
|
||||
//context.messageRegistry().peerFailed(to);
|
||||
context.statManager().addRateData("transport.bidFailShitlisted", msg.getLifetime(), 0);
|
||||
fail(context, msg);
|
||||
return;
|
||||
}
|
||||
@ -56,6 +57,7 @@ public class GetBidsJob extends JobImpl {
|
||||
if (to.equals(us)) {
|
||||
if (log.shouldLog(Log.ERROR))
|
||||
log.error("wtf, send a message to ourselves? nuh uh. msg = " + msg);
|
||||
context.statManager().addRateData("transport.bidFailSelf", msg.getLifetime(), 0);
|
||||
fail(context, msg);
|
||||
return;
|
||||
}
|
||||
@ -64,8 +66,10 @@ public class GetBidsJob extends JobImpl {
|
||||
if (bid == null) {
|
||||
int failedCount = msg.getFailedTransports().size();
|
||||
if (failedCount == 0) {
|
||||
context.statManager().addRateData("transport.bidFailNoTransports", msg.getLifetime(), 0);
|
||||
context.shitlist().shitlistRouter(to, "We share no common transports with them");
|
||||
} else if (failedCount >= facade.getTransportCount()) {
|
||||
context.statManager().addRateData("transport.bidFailAllTransports", msg.getLifetime(), 0);
|
||||
// fail after all transports were unsuccessful
|
||||
context.netDb().fail(to);
|
||||
}
|
||||
|
@ -46,6 +46,10 @@ public class TransportManager implements TransportEventListener {
|
||||
_log = _context.logManager().getLog(TransportManager.class);
|
||||
_context.statManager().createRateStat("transport.shitlistOnUnreachable", "Add a peer to the shitlist since none of the transports can reach them", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.noBidsYetNotAllUnreachable", "Add a peer to the shitlist since none of the transports can reach them", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailShitlisted", "Could not attempt to bid on message, as they were shitlisted", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailSelf", "Could not attempt to bid on message, as it targeted ourselves", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailNoTransports", "Could not attempt to bid on message, as none of the transports could attempt it", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailAllTransports", "Could not attempt to bid on message, as all of the transports had failed", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_transports = new ArrayList();
|
||||
}
|
||||
|
||||
|
@ -75,6 +75,7 @@ public class EstablishState {
|
||||
private Exception _e;
|
||||
private boolean _verified;
|
||||
private boolean _confirmWritten;
|
||||
private boolean _failedBySkew;
|
||||
|
||||
public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
|
||||
_context = ctx;
|
||||
@ -131,6 +132,8 @@ public class EstablishState {
|
||||
*/
|
||||
public boolean confirmWritten() { return _confirmWritten; }
|
||||
|
||||
public boolean getFailedBySkew() { return _failedBySkew; }
|
||||
|
||||
/** we are Bob, so receive these bytes as part of an inbound connection */
|
||||
private void receiveInbound(ByteBuffer src) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -340,6 +343,19 @@ public class EstablishState {
|
||||
_tsA = _context.clock().now()/1000;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB));
|
||||
|
||||
// the skew is not authenticated yet, but it is certainly fatal to
|
||||
// the establishment, so fail hard if appropriate
|
||||
long diff = 1000*Math.abs(_tsA-_tsB);
|
||||
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
|
||||
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff, 0);
|
||||
_transport.markReachable(_con.getRemotePeer().calculateHash());
|
||||
_context.shitlist().shitlistRouter(_con.getRemotePeer().calculateHash(), "Outbound clock skew of " + diff);
|
||||
fail("Clocks too skewed (" + diff + ")", null, true);
|
||||
return;
|
||||
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(prefix()+"Clock skew: " + diff);
|
||||
}
|
||||
|
||||
// now prepare and send our response
|
||||
// send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
|
||||
@ -493,15 +509,6 @@ public class EstablishState {
|
||||
alice.fromByteArray(aliceData);
|
||||
long tsA = DataHelper.fromLong(b, 2+sz, 4);
|
||||
|
||||
long diff = 1000*Math.abs(tsA-_tsB);
|
||||
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
|
||||
fail("Clocks too skewed (" + diff + ")");
|
||||
return;
|
||||
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(prefix()+"Clock skew: " + diff);
|
||||
}
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(768);
|
||||
baos.write(_X);
|
||||
baos.write(_Y);
|
||||
@ -523,6 +530,18 @@ public class EstablishState {
|
||||
if (_verified) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(prefix() + "verification successful for " + _con);
|
||||
|
||||
long diff = 1000*Math.abs(tsA-_tsB);
|
||||
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
|
||||
_transport.markReachable(alice.calculateHash());
|
||||
_context.shitlist().shitlistRouter(alice.calculateHash(), "Clock skew of " + diff);
|
||||
fail("Clocks too skewed (" + diff + ")", null, true);
|
||||
return;
|
||||
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(prefix()+"Clock skew: " + diff);
|
||||
}
|
||||
|
||||
sendInboundConfirm(alice, tsA);
|
||||
_con.setRemotePeer(alice);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -589,8 +608,10 @@ public class EstablishState {
|
||||
public byte[] getExtraBytes() { return _extra; }
|
||||
|
||||
private void fail(String reason) { fail(reason, null); }
|
||||
private void fail(String reason, Exception e) {
|
||||
private void fail(String reason, Exception e) { fail(reason, e, false); }
|
||||
private void fail(String reason, Exception e, boolean bySkew) {
|
||||
_corrupt = true;
|
||||
_failedBySkew = bySkew;
|
||||
_err = reason;
|
||||
_e = e;
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
|
@ -99,6 +99,7 @@ public class NTCPTransport extends TransportImpl {
|
||||
_context.statManager().createRateStat("ntcp.multipleCloseOnRemove", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.outboundEstablishFailed", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.prepBufCache", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.read", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
@ -336,7 +337,7 @@ public class NTCPTransport extends TransportImpl {
|
||||
synchronized (_conLock) {
|
||||
for (Iterator iter = _conByIdent.values().iterator(); iter.hasNext(); ) {
|
||||
NTCPConnection con = (NTCPConnection)iter.next();
|
||||
if (con.getTimeSinceSend() <= 60*1000)
|
||||
if ( (con.getTimeSinceSend() <= 60*1000) || (con.getTimeSinceReceive() <= 60*1000) )
|
||||
active++;
|
||||
}
|
||||
}
|
||||
|
@ -148,7 +148,8 @@ class Reader {
|
||||
if (est.isCorrupt()) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("closing connection on establishment because: " +est.getError(), est.getException());
|
||||
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0);
|
||||
if (!est.getFailedBySkew())
|
||||
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0);
|
||||
con.close();
|
||||
return;
|
||||
} else if (buf.remaining() <= 0) {
|
||||
|
Reference in New Issue
Block a user