From 7d64ecb6282bcae92cb3d240d252c81a1704047c Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 8 Dec 2005 20:53:41 +0000 Subject: [PATCH] 2005-12-08 jrandom * Minor bugfix in SSU for dealing with corrupt packets * Added some hooks for load testing --- history.txt | 6 +- router/java/src/net/i2p/router/JobQueue.java | 9 +- .../src/net/i2p/router/LoadTestManager.java | 337 ++++++++++++++++++ .../src/net/i2p/router/RouterVersion.java | 4 +- .../transport/udp/OutboundMessageState.java | 17 +- .../router/transport/udp/UDPTransport.java | 23 ++ .../pool/PooledTunnelCreatorConfig.java | 5 +- 7 files changed, 389 insertions(+), 12 deletions(-) create mode 100644 router/java/src/net/i2p/router/LoadTestManager.java diff --git a/history.txt b/history.txt index 22ea816add..bc98dcadbf 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,8 @@ -$Id: history.txt,v 1.349 2005/12/07 15:19:43 jrandom Exp $ +$Id: history.txt,v 1.350 2005/12/07 19:50:35 jrandom Exp $ + +2005-12-08 jrandom + * Minor bugfix in SSU for dealing with corrupt packets + * Added some hooks for load testing 2005-12-07 jrandom * Added a first pass at a blog view in Syndie diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index c8db5389fb..8b1c208c33 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -226,7 +226,14 @@ public class JobQueue { return false; } - public void allowParallelOperation() { _allowParallelOperation = true; } + private static final String PROP_LOAD_TEST = "router.loadTest"; + public void allowParallelOperation() { + _allowParallelOperation = true; + if (Boolean.valueOf(_context.getProperty(PROP_LOAD_TEST, "false")).booleanValue()) { + LoadTestManager t = new LoadTestManager(_context); + addJob(t.getTestJob()); + } + } public void restart() { synchronized (_jobLock) { diff --git a/router/java/src/net/i2p/router/LoadTestManager.java b/router/java/src/net/i2p/router/LoadTestManager.java new file mode 100644 index 0000000000..aa8378ab7f --- /dev/null +++ b/router/java/src/net/i2p/router/LoadTestManager.java @@ -0,0 +1,337 @@ +package net.i2p.router; + +import java.io.*; +import java.util.*; +import net.i2p.util.*; +import net.i2p.data.*; +import net.i2p.data.i2np.*; +import net.i2p.router.tunnel.*; +import net.i2p.router.tunnel.pool.*; +import net.i2p.router.transport.udp.UDPTransport; + +/** + * Coordinate some tests of peers to see how much load they can handle. This + * test is not safe for use in anonymous environments, but should help pinpoint + * some performance aspects of the live net. + * + * Each individual load test is conducted by building a single one hop inbound + * tunnel with the peer in question acting as the inbound gateway. We then send + * messages directly to that gateway, which they batch up and send "down the + * tunnel" (aka directly to us), at which point we then send another message, + * and so on, until the tunnel expires. Along the way, we record a few vital + * stats to the "loadtest.log" file. If we don't receive a message, we send another + * after 10 seconds. + * + * If "router.loadTestSmall=true", we transmit a tiny DeliveryStatusMessage (~96 bytes + * at the SSU level), which is sent back to us as a single TunnelDataMessage (~1KB). + * Otherwise, we transmit a 4KB DataMessage, which is sent back to us as five (1KB) + * TunnelDataMessages. This size is chosen because the streaming lib uses 4KB messages + * by default. + * + */ +public class LoadTestManager { + private RouterContext _context; + private Log _log; + private Writer _out; + private List _untestedPeers; + + public LoadTestManager(RouterContext ctx) { + _context = ctx; + _log = ctx.logManager().getLog(LoadTestManager.class); + try { + _out = new BufferedWriter(new FileWriter("loadtest.log", true)); + } catch (IOException ioe) { + _log.log(Log.CRIT, "error creating log", ioe); + } + _context.statManager().createRateStat("test.lifetimeSuccessful", "How many messages we can pump through a load test during a tunnel's lifetime", "test", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("test.lifetimeFailed", "How many messages we fail to pump through (period == successful)", "test", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("test.timeoutAfter", "How many messages have we successfully pumped through a tunnel when one particular message times out", "test", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("test.rtt", "How long it takes to get a reply", "test", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("test.rttHigh", "How long it takes to get a reply, if it is a slow rtt", "test", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + } + + public Job getTestJob() { return new TestJob(_context); } + private class TestJob extends JobImpl { + public TestJob(RouterContext ctx) { + super(ctx); + // wait 5m to start up + getTiming().setStartAfter(3*60*1000 + getContext().clock().now()); + } + public String getName() { return "run load tests"; } + public void runJob() { + runTest(); + getTiming().setStartAfter(10*60*1000 + getContext().clock().now()); + getContext().jobQueue().addJob(TestJob.this); + } + } + + private static final int CONCURRENT_PEERS = 10; + + public void runTest() { + if ( (_untestedPeers == null) || (_untestedPeers.size() <= 0) ) { + UDPTransport t = UDPTransport._instance(); + if (t != null) + _untestedPeers = t._getActivePeers(); + } + int peers = getConcurrency(); + for (int i = 0; i < peers && _untestedPeers.size() > 0; i++) + buildTestTunnel((Hash)_untestedPeers.remove(0)); + } + + private int getConcurrency() { + int rv = CONCURRENT_PEERS; + try { + rv = Integer.parseInt(_context.getProperty("router.loadTestConcurrency", CONCURRENT_PEERS+"")); + } catch (NumberFormatException nfe) { + rv = CONCURRENT_PEERS; + } + if (rv < 1) + rv = 1; + if (rv > 50) + rv = 50; + return rv; + } + + /** + * Actually send the messages through the given tunnel + */ + private void runTest(LoadTestTunnelConfig tunnel) { + log(tunnel, "start"); + sendTestMessage(tunnel, 0); + } + private void sendTestMessage(LoadTestTunnelConfig tunnel, long count) { + if (_context.clock().now() > tunnel.getExpiration()) + return; + RouterInfo target = _context.netDb().lookupRouterInfoLocally(tunnel.getPeer(0)); + if (target == null) { + log(tunnel, "lookup failed"); + return; + } + + I2NPMessage payloadMessage = createPayloadMessage(); + + TunnelGatewayMessage tm = new TunnelGatewayMessage(_context); + tm.setMessage(payloadMessage); + tm.setTunnelId(tunnel.getReceiveTunnelId(0)); + tm.setMessageExpiration(payloadMessage.getMessageExpiration()); + + OutNetMessage om = new OutNetMessage(_context); + om.setMessage(tm); + SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false, count+1); + om.setOnFailedReplyJob(failed); + om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true, count+1)); + //om.setOnFailedSendJob(failed); + om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId(), count+1)); + om.setTarget(target); + om.setExpiration(tm.getMessageExpiration()); + om.setPriority(40); + _context.outNetMessagePool().add(om); + //log(tunnel, m.getMessageId() + " sent"); + } + + private static final boolean SMALL_PAYLOAD = false; + + private boolean useSmallPayload() { + return Boolean.valueOf(_context.getProperty("router.loadTestSmall", SMALL_PAYLOAD + "")).booleanValue(); + } + + private I2NPMessage createPayloadMessage() { + // doesnt matter whats in the message, as it gets dropped anyway, since we match + // on it with the message.uniqueId + if (useSmallPayload()) { + DeliveryStatusMessage m = new DeliveryStatusMessage(_context); + long now = _context.clock().now(); + m.setArrival(now); + m.setMessageExpiration(now + 10*1000); + m.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + return m; + } else { + DataMessage m = new DataMessage(_context); + byte data[] = new byte[4096]; + _context.random().nextBytes(data); + m.setData(data); + long now = _context.clock().now(); + m.setMessageExpiration(now + 10*1000); + return m; + } + } + + private class SendAgain extends JobImpl implements ReplyJob { + private LoadTestTunnelConfig _cfg; + private long _messageId; + private boolean _ok; + private long _count; + private boolean _run; + private long _dontStartUntil; + public SendAgain(RouterContext ctx, LoadTestTunnelConfig cfg, long messageId, boolean ok, long count) { + super(ctx); + _cfg = cfg; + _messageId = messageId; + _ok = ok; + _count = count; + _run = false; + _dontStartUntil = ctx.clock().now() + 10*1000; + } + public String getName() { return "send another load test"; } + public void runJob() { + if (!_ok) { + if (!_run) { + log(_cfg, _messageId + " " + _count + " TIMEOUT"); + getContext().statManager().addRateData("test.timeoutAfter", _cfg.getFullMessageCount(), 0); + if (getContext().clock().now() >= _dontStartUntil) { + sendTestMessage(_cfg, (_ok ? _count : _count-1)); + _cfg.incrementFailed(); + } else { + getTiming().setStartAfter(_dontStartUntil); + getContext().jobQueue().addJob(SendAgain.this); + } + } + _run = true; + } else { + sendTestMessage(_cfg, (_ok ? _count : _count-1)); + } + } + + public void setMessage(I2NPMessage message) {} + } + + private class Selector implements MessageSelector { + private LoadTestTunnelConfig _cfg; + private long _messageId; + private long _count; + public Selector(LoadTestTunnelConfig cfg, long messageId, long count) { + _cfg = cfg; + _messageId = messageId; + _count = count; + } + public boolean continueMatching() { return false; } + public long getExpiration() { return _cfg.getExpiration(); } + public boolean isMatch(I2NPMessage message) { + if (message.getUniqueId() == _messageId) { + _cfg.incrementFull(); + long period = _context.clock().now() - (message.getMessageExpiration() - 10*1000); + log(_cfg, _messageId + " " + _count + " after " + period); + _context.statManager().addRateData("test.rtt", period, _count); + if (period > 2000) + _context.statManager().addRateData("test.rttHigh", period, _count); + return true; + } + return false; + } + } + + private void log(LoadTestTunnelConfig tunnel, String msg) { + StringBuffer buf = new StringBuffer(128); + Hash peer = tunnel.getPeer(0); + if (peer != null) + buf.append(peer.toBase64()); + else + buf.append("[unknown_peer]"); + buf.append(" "); + TunnelId id = tunnel.getReceiveTunnelId(0); + if (id != null) + buf.append(id.getTunnelId()); + else + buf.append("[unknown_tunnel]"); + buf.append(" "); + buf.append(_context.clock().now()).append(" ").append(msg).append("\n"); + try { + synchronized (_out) { + _out.write(buf.toString()); + } + } catch (IOException ioe) { + _log.error("error logging [" + msg + "]", ioe); + } + } + + private void buildTestTunnel(Hash peer) { + long expiration = _context.clock().now() + 10*60*1000; + + LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 2, true); + // cfg.getPeer() is ordered gateway first + cfg.setPeer(0, peer); + HopConfig hop = cfg.getConfig(0); + hop.setExpiration(expiration); + hop.setIVKey(_context.keyGenerator().generateSessionKey()); + hop.setLayerKey(_context.keyGenerator().generateSessionKey()); + // now for ourselves + cfg.setPeer(1, _context.routerHash()); + hop = cfg.getConfig(1); + hop.setExpiration(expiration); + hop.setIVKey(_context.keyGenerator().generateSessionKey()); + hop.setLayerKey(_context.keyGenerator().generateSessionKey()); + + cfg.setExpiration(expiration); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Config for " + peer.toBase64() + ": " + cfg); + + CreatedJob onCreated = new CreatedJob(_context, cfg); + FailedJob fail = new FailedJob(_context, cfg); + RequestTunnelJob req = new RequestTunnelJob(_context, cfg, onCreated, fail, cfg.getLength()-1, false, true); + _context.jobQueue().addJob(req); + } + + private class CreatedJob extends JobImpl { + private LoadTestTunnelConfig _cfg; + public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) { + super(ctx); + _cfg = cfg; + } + public String getName() { return "Test tunnel created"; } + public void runJob() { + if (_log.shouldLog(Log.INFO)) + _log.info("Tunnel created for testing peer " + _cfg.getPeer(0).toBase64()); + getContext().tunnelDispatcher().joinInbound(_cfg); + //log(_cfg, "joined"); + + Expire j = new Expire(getContext(), _cfg); + _cfg.setExpireJob(j); + getContext().jobQueue().addJob(j); + runTest(_cfg); + } + } + private class Expire extends JobImpl { + private LoadTestTunnelConfig _cfg; + public Expire(RouterContext ctx, LoadTestTunnelConfig cfg) { + super(ctx); + _cfg = cfg; + getTiming().setStartAfter(cfg.getExpiration()+60*1000); + } + public String getName() { return "expire test tunnel"; } + public void runJob() { + getContext().tunnelDispatcher().remove(_cfg); + log(_cfg, "expired after sending " + _cfg.getFullMessageCount() + " / " + _cfg.getFailedMessageCount()); + getContext().statManager().addRateData("test.lifetimeSuccessful", _cfg.getFullMessageCount(), _cfg.getFailedMessageCount()); + if (_cfg.getFailedMessageCount() > 0) + getContext().statManager().addRateData("test.lifetimeFailed", _cfg.getFailedMessageCount(), _cfg.getFullMessageCount()); + } + } + private class FailedJob extends JobImpl { + private LoadTestTunnelConfig _cfg; + public FailedJob(RouterContext ctx, LoadTestTunnelConfig cfg) { + super(ctx); + _cfg = cfg; + } + public String getName() { return "Test tunnel failed"; } + public void runJob() { + if (_log.shouldLog(Log.INFO)) + _log.info("Tunnel failed for testing peer " + _cfg.getPeer(0).toBase64()); + log(_cfg, "failed"); + } + } + + private class LoadTestTunnelConfig extends PooledTunnelCreatorConfig { + private long _failed; + private long _fullMessages; + public LoadTestTunnelConfig(RouterContext ctx, int length, boolean isInbound) { + super(ctx, length, isInbound); + _failed = 0; + _fullMessages = 0; + } + public void incrementFailed() { ++_failed; } + public long getFailedMessageCount() { return _failed; } + public void incrementFull() { ++_fullMessages; } + public long getFullMessageCount() { return _fullMessages; } + } +} diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index efdb1cd8ad..12b75cd616 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.306 $ $Date: 2005/11/30 15:48:26 $"; + public final static String ID = "$Revision: 1.307 $ $Date: 2005/12/01 12:16:54 $"; public final static String VERSION = "0.6.1.7"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 167ce023c1..4ebcbcc0b5 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -268,12 +268,17 @@ public class OutboundMessageState { int end = start + fragmentSize(fragmentNum); if (_messageBuf == null) return -1; int toSend = end - start; - System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId - + "[" + start + "-" + (start+toSend) + "/" + _messageBuf.getValid() + "/" + _fragmentSize + "]: " - + Base64.encode(out, outOffset, toSend)); - return toSend; + byte buf[] = _messageBuf.getData(); + if ( (buf != null) && (start + toSend < buf.length) && (out != null) && (outOffset + toSend < out.length) ) { + System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId + + "[" + start + "-" + (start+toSend) + "/" + _messageBuf.getValid() + "/" + _fragmentSize + "]: " + + Base64.encode(out, outOffset, toSend)); + return toSend; + } else { + return -1; + } } public String toString() { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 949f17c2e6..184e5a33c3 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -151,6 +151,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _context.statManager().createRateStat("udp.addressTestInsteadOfUpdate", "How many times we fire off a peer test of ourselves instead of adjusting our own reachable address?", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.addressUpdated", "How many times we adjust our own reachable IP address", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.proactiveReestablish", "How long a session was idle for when we proactively reestablished it", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); + + __instance = this; } public void startup() { @@ -947,6 +949,27 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + + private static UDPTransport __instance; + /** **internal, do not use** */ + public static final UDPTransport _instance() { return __instance; } + /** **internal, do not use** return the peers (Hash) of active peers. */ + public List _getActivePeers() { + List peers = new ArrayList(128); + synchronized (_peersByIdent) { + peers.addAll(_peersByIdent.keySet()); + } + + long now = _context.clock().now(); + for (Iterator iter = peers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + PeerState state = getPeerState(peer); + if (now-state.getLastReceiveTime() > 5*60*1000) + iter.remove(); // don't include old peers + } + return peers; + } + public void renderStatusHTML(Writer out) throws IOException { TreeSet peers = new TreeSet(AlphaComparator.instance()); synchronized (_peersByIdent) { diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index 0e634d6ece..f22e88200f 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -2,6 +2,7 @@ package net.i2p.router.tunnel.pool; import java.util.Properties; import net.i2p.data.Hash; +import net.i2p.router.Job; import net.i2p.router.RouterContext; import net.i2p.router.tunnel.TunnelCreatorConfig; @@ -14,7 +15,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { private boolean _failed; private TestJob _testJob; private RebuildJob _rebuildJob; - private ExpireJob _expireJob; + private Job _expireJob; /** Creates a new instance of PooledTunnelCreatorConfig */ @@ -63,5 +64,5 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { public void setTestJob(TestJob job) { _testJob = job; } public void setRebuildJob(RebuildJob job) { _rebuildJob = job; } - public void setExpireJob(ExpireJob job) { _expireJob = job; } + public void setExpireJob(Job job) { _expireJob = job; } }