2005-12-31 jrandom
* Include a simple torrent creator in the I2PSnark web UI * Further streaming lib closing improvements * Refactored the load test components to run off live tunnels (though, still not safe for normal/anonymous load testing)
This commit is contained in:
@ -178,29 +178,9 @@ public class InNetMessagePool implements Service {
|
||||
}
|
||||
|
||||
if (allowMatches) {
|
||||
List origMessages = _context.messageRegistry().getOriginalMessages(messageBody);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Original messages for inbound message: " + origMessages.size());
|
||||
if (origMessages.size() > 1) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Orig: " + origMessages + " \nthe above are replies for: " + messageBody,
|
||||
new Exception("Multiple matches"));
|
||||
}
|
||||
int replies = handleReplies(messageBody);
|
||||
|
||||
for (int i = 0; i < origMessages.size(); i++) {
|
||||
OutNetMessage omsg = (OutNetMessage)origMessages.get(i);
|
||||
ReplyJob job = omsg.getOnReplyJob();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Original message [" + i + "] " + omsg.getReplySelector()
|
||||
+ " : " + omsg + ": reply job: " + job);
|
||||
|
||||
if (job != null) {
|
||||
job.setMessage(messageBody);
|
||||
_context.jobQueue().addJob(job);
|
||||
}
|
||||
}
|
||||
|
||||
if (origMessages.size() <= 0) {
|
||||
if (replies <= 0) {
|
||||
// not handled as a reply
|
||||
if (!jobFound) {
|
||||
// was not handled via HandlerJobBuilder
|
||||
@ -247,6 +227,31 @@ public class InNetMessagePool implements Service {
|
||||
return 0; // no queue
|
||||
}
|
||||
|
||||
public int handleReplies(I2NPMessage messageBody) {
|
||||
List origMessages = _context.messageRegistry().getOriginalMessages(messageBody);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Original messages for inbound message: " + origMessages.size());
|
||||
if (origMessages.size() > 1) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Orig: " + origMessages + " \nthe above are replies for: " + messageBody,
|
||||
new Exception("Multiple matches"));
|
||||
}
|
||||
|
||||
for (int i = 0; i < origMessages.size(); i++) {
|
||||
OutNetMessage omsg = (OutNetMessage)origMessages.get(i);
|
||||
ReplyJob job = omsg.getOnReplyJob();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Original message [" + i + "] " + omsg.getReplySelector()
|
||||
+ " : " + omsg + ": reply job: " + job);
|
||||
|
||||
if (job != null) {
|
||||
job.setMessage(messageBody);
|
||||
_context.jobQueue().addJob(job);
|
||||
}
|
||||
}
|
||||
return origMessages.size();
|
||||
}
|
||||
|
||||
// the following short circuits the tunnel dispatching - i'm not sure whether
|
||||
// we'll want to run the dispatching in jobs or whether it shuold go inline with
|
||||
// others and/or on other threads (e.g. transport threads). lets try 'em both.
|
||||
|
@ -226,13 +226,8 @@ public class JobQueue {
|
||||
return false;
|
||||
}
|
||||
|
||||
private static final String PROP_LOAD_TEST = "router.loadTest";
|
||||
public void allowParallelOperation() {
|
||||
_allowParallelOperation = true;
|
||||
if (Boolean.valueOf(_context.getProperty(PROP_LOAD_TEST, "false")).booleanValue()) {
|
||||
LoadTestManager t = new LoadTestManager(_context);
|
||||
addJob(t.getTestJob());
|
||||
}
|
||||
}
|
||||
|
||||
public void restart() {
|
||||
|
@ -5,16 +5,19 @@ import java.util.*;
|
||||
import net.i2p.util.*;
|
||||
import net.i2p.data.*;
|
||||
import net.i2p.data.i2np.*;
|
||||
import net.i2p.router.message.*;
|
||||
import net.i2p.router.tunnel.*;
|
||||
import net.i2p.router.tunnel.pool.*;
|
||||
import net.i2p.router.transport.udp.UDPTransport;
|
||||
|
||||
/**
|
||||
* Coordinate some tests of peers to see how much load they can handle. This
|
||||
* test is not safe for use in anonymous environments, but should help pinpoint
|
||||
* some performance aspects of the live net.
|
||||
* Coordinate some tests of peers to see how much load they can handle. If
|
||||
* TEST_LIVE_TUNNELS is set to false, it builds load test tunnels across various
|
||||
* peers in ways that are not anonymity sensitive (but may help with testing the net).
|
||||
* If it is set to true, however, it runs a few tests at a time for actual tunnels that
|
||||
* are built, to help determine whether our peer selection is insufficient.
|
||||
*
|
||||
* Each individual load test is conducted by building a single one hop inbound
|
||||
* Load tests of fake tunnels are conducted by building a single one hop inbound
|
||||
* tunnel with the peer in question acting as the inbound gateway. We then send
|
||||
* messages directly to that gateway, which they batch up and send "down the
|
||||
* tunnel" (aka directly to us), at which point we then send another message,
|
||||
@ -24,9 +27,15 @@ import net.i2p.router.transport.udp.UDPTransport;
|
||||
*
|
||||
* If "router.loadTestSmall=true", we transmit a tiny DeliveryStatusMessage (~96 bytes
|
||||
* at the SSU level), which is sent back to us as a single TunnelDataMessage (~1KB).
|
||||
* Otherwise, we transmit a 4KB DataMessage, which is sent back to us as five (1KB)
|
||||
* TunnelDataMessages. This size is chosen because the streaming lib uses 4KB messages
|
||||
* by default.
|
||||
* Otherwise, we transmit a 4KB DataMessage wrapped inside a garlic message, which is
|
||||
* sent back to us as five (1KB) TunnelDataMessages. This size is chosen because the
|
||||
* streaming lib uses 4KB messages by default.
|
||||
*
|
||||
* Load tests of live tunnels pick a random tunnel from the tested pool's pair (e.g. if
|
||||
* we are testing an outbound tunnel for a particular destination, it picks an inbound
|
||||
* tunnel from that destination's inbound pool), with each message going down that one
|
||||
* randomly paired tunnel for the duration of the load test (varying the paired tunnel
|
||||
* with each message had poor results)
|
||||
*
|
||||
*/
|
||||
public class LoadTestManager {
|
||||
@ -34,12 +43,14 @@ public class LoadTestManager {
|
||||
private Log _log;
|
||||
private Writer _out;
|
||||
private List _untestedPeers;
|
||||
|
||||
private List _active;
|
||||
public LoadTestManager(RouterContext ctx) {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(LoadTestManager.class);
|
||||
_active = Collections.synchronizedList(new ArrayList());
|
||||
try {
|
||||
_out = new BufferedWriter(new FileWriter("loadtest.log", true));
|
||||
_out.write("startup at " + ctx.clock().now() + "\n");
|
||||
} catch (IOException ioe) {
|
||||
_log.log(Log.CRIT, "error creating log", ioe);
|
||||
}
|
||||
@ -50,6 +61,8 @@ public class LoadTestManager {
|
||||
_context.statManager().createRateStat("test.rttHigh", "How long it takes to get a reply, if it is a slow rtt", "test", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
|
||||
}
|
||||
|
||||
public static final boolean TEST_LIVE_TUNNELS = true;
|
||||
|
||||
public Job getTestJob() { return new TestJob(_context); }
|
||||
private class TestJob extends JobImpl {
|
||||
public TestJob(RouterContext ctx) {
|
||||
@ -59,14 +72,16 @@ public class LoadTestManager {
|
||||
}
|
||||
public String getName() { return "run load tests"; }
|
||||
public void runJob() {
|
||||
runTest();
|
||||
getTiming().setStartAfter(10*60*1000 + getContext().clock().now());
|
||||
getContext().jobQueue().addJob(TestJob.this);
|
||||
if (!TEST_LIVE_TUNNELS) {
|
||||
runTest();
|
||||
getTiming().setStartAfter(10*60*1000 + getContext().clock().now());
|
||||
getContext().jobQueue().addJob(TestJob.this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** 10 peers at a time */
|
||||
private static final int CONCURRENT_PEERS = 10;
|
||||
private static final int CONCURRENT_PEERS = 0;
|
||||
/** 4 messages per peer at a time */
|
||||
private static final int CONCURRENT_MESSAGES = 4;
|
||||
|
||||
@ -88,8 +103,8 @@ public class LoadTestManager {
|
||||
} catch (NumberFormatException nfe) {
|
||||
rv = CONCURRENT_PEERS;
|
||||
}
|
||||
if (rv < 1)
|
||||
rv = 1;
|
||||
if (rv < 0)
|
||||
rv = 0;
|
||||
if (rv > 50)
|
||||
rv = 50;
|
||||
return rv;
|
||||
@ -115,38 +130,119 @@ public class LoadTestManager {
|
||||
private void runTest(LoadTestTunnelConfig tunnel) {
|
||||
log(tunnel, "start");
|
||||
int peerMessages = getPeerMessages();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Run test on " + tunnel + " with " + peerMessages + " messages");
|
||||
for (int i = 0; i < peerMessages; i++)
|
||||
sendTestMessage(tunnel);
|
||||
}
|
||||
|
||||
private void pickTunnels(LoadTestTunnelConfig tunnel) {
|
||||
TunnelInfo inbound = null;
|
||||
TunnelInfo outbound = null;
|
||||
if (tunnel.getTunnel().isInbound()) {
|
||||
inbound = _context.tunnelManager().getTunnelInfo(tunnel.getReceiveTunnelId(0));
|
||||
if ( (inbound == null) && (_log.shouldLog(Log.WARN)) )
|
||||
_log.warn("where are we? inbound tunnel isn't known: " + tunnel, new Exception("source"));
|
||||
if (tunnel.getTunnel().getDestination() != null)
|
||||
outbound = _context.tunnelManager().selectOutboundTunnel(tunnel.getTunnel().getDestination());
|
||||
else
|
||||
outbound = _context.tunnelManager().selectOutboundTunnel();
|
||||
} else {
|
||||
outbound = _context.tunnelManager().getTunnelInfo(tunnel.getSendTunnelId(0));
|
||||
if ( (outbound == null) && (_log.shouldLog(Log.WARN)) )
|
||||
_log.warn("where are we? outbound tunnel isn't known: " + tunnel, new Exception("source"));
|
||||
if (tunnel.getTunnel().getDestination() != null)
|
||||
inbound = _context.tunnelManager().selectInboundTunnel(tunnel.getTunnel().getDestination());
|
||||
else
|
||||
inbound = _context.tunnelManager().selectInboundTunnel();
|
||||
}
|
||||
tunnel.setInbound(inbound);
|
||||
tunnel.setOutbound(outbound);
|
||||
}
|
||||
|
||||
private void sendTestMessage(LoadTestTunnelConfig tunnel) {
|
||||
if (_context.clock().now() > tunnel.getExpiration())
|
||||
return;
|
||||
RouterInfo target = _context.netDb().lookupRouterInfoLocally(tunnel.getPeer(0));
|
||||
if (target == null) {
|
||||
log(tunnel, "lookup failed");
|
||||
long now = _context.clock().now();
|
||||
if (now > tunnel.getExpiration()) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Not sending a test message to " + tunnel + " because it expired");
|
||||
tunnel.logComplete();
|
||||
_active.remove(tunnel);
|
||||
return;
|
||||
}
|
||||
|
||||
I2NPMessage payloadMessage = createPayloadMessage();
|
||||
|
||||
TunnelGatewayMessage tm = new TunnelGatewayMessage(_context);
|
||||
tm.setMessage(payloadMessage);
|
||||
tm.setTunnelId(tunnel.getReceiveTunnelId(0));
|
||||
tm.setMessageExpiration(payloadMessage.getMessageExpiration());
|
||||
|
||||
OutNetMessage om = new OutNetMessage(_context);
|
||||
om.setMessage(tm);
|
||||
SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false);
|
||||
om.setOnFailedReplyJob(failed);
|
||||
om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true));
|
||||
//om.setOnFailedSendJob(failed);
|
||||
om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId()));
|
||||
om.setTarget(target);
|
||||
om.setExpiration(tm.getMessageExpiration());
|
||||
om.setPriority(40);
|
||||
_context.outNetMessagePool().add(om);
|
||||
//log(tunnel, m.getMessageId() + " sent");
|
||||
if (TEST_LIVE_TUNNELS) {
|
||||
TunnelInfo inbound = tunnel.getInbound();
|
||||
TunnelInfo outbound = tunnel.getOutbound();
|
||||
if ( (inbound == null) || (outbound == null) ) {
|
||||
pickTunnels(tunnel);
|
||||
inbound = tunnel.getInbound();
|
||||
outbound = tunnel.getOutbound();
|
||||
}
|
||||
|
||||
if (inbound == null) {
|
||||
log(tunnel, "No inbound tunnels found");
|
||||
_active.remove(tunnel);
|
||||
return;
|
||||
} else if (outbound == null) {
|
||||
log(tunnel, "No outbound tunnels found");
|
||||
tunnel.logComplete();
|
||||
_active.remove(tunnel);
|
||||
return;
|
||||
}
|
||||
|
||||
if ( (now >= inbound.getExpiration()) || (now >= outbound.getExpiration()) ) {
|
||||
tunnel.logComplete();
|
||||
_active.remove(tunnel);
|
||||
return;
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("inbound and outbound found for " + tunnel);
|
||||
|
||||
I2NPMessage payloadMessage = createPayloadMessage();
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("testing live tunnels with inbound [" + inbound + "] and outbound [" + outbound + "]");
|
||||
|
||||
// this should take into consideration both the inbound and outbound tunnels
|
||||
// ... but it doesn't, yet.
|
||||
_context.messageRegistry().registerPending(new Selector(tunnel, payloadMessage.getUniqueId()),
|
||||
new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true),
|
||||
new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false),
|
||||
10*1000);
|
||||
_context.tunnelDispatcher().dispatchOutbound(payloadMessage, outbound.getSendTunnelId(0),
|
||||
inbound.getReceiveTunnelId(0),
|
||||
inbound.getPeer(0));
|
||||
//log(tunnel, payloadMessage.getUniqueId() + " sent via " + inbound + " / " + outbound);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("NOT testing live tunnels for [" + tunnel + "]");
|
||||
RouterInfo target = _context.netDb().lookupRouterInfoLocally(tunnel.getPeer(0));
|
||||
if (target == null) {
|
||||
log(tunnel, "lookup failed");
|
||||
return;
|
||||
}
|
||||
|
||||
I2NPMessage payloadMessage = createPayloadMessage();
|
||||
|
||||
TunnelGatewayMessage tm = new TunnelGatewayMessage(_context);
|
||||
tm.setMessage(payloadMessage);
|
||||
tm.setTunnelId(tunnel.getReceiveTunnelId(0));
|
||||
tm.setMessageExpiration(payloadMessage.getMessageExpiration());
|
||||
|
||||
OutNetMessage om = new OutNetMessage(_context);
|
||||
om.setMessage(tm);
|
||||
SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false);
|
||||
om.setOnFailedReplyJob(failed);
|
||||
om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true));
|
||||
//om.setOnFailedSendJob(failed);
|
||||
om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId()));
|
||||
om.setTarget(target);
|
||||
om.setExpiration(tm.getMessageExpiration());
|
||||
om.setPriority(40);
|
||||
_context.outNetMessagePool().add(om);
|
||||
//log(tunnel, m.getMessageId() + " sent");
|
||||
}
|
||||
}
|
||||
|
||||
private static final boolean SMALL_PAYLOAD = false;
|
||||
@ -172,7 +268,40 @@ public class LoadTestManager {
|
||||
m.setData(data);
|
||||
long now = _context.clock().now();
|
||||
m.setMessageExpiration(now + 10*1000);
|
||||
return m;
|
||||
|
||||
if (true) {
|
||||
// garlic wrap the data message to ourselves so the endpoints and gateways
|
||||
// can't tell its a test, encrypting it with a random key and tag,
|
||||
// remembering that key+tag so that we can decrypt it later without any ElGamal
|
||||
DeliveryInstructions instructions = new DeliveryInstructions();
|
||||
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
|
||||
|
||||
PayloadGarlicConfig payload = new PayloadGarlicConfig();
|
||||
payload.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
|
||||
payload.setId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
|
||||
payload.setId(m.getUniqueId());
|
||||
payload.setPayload(m);
|
||||
payload.setRecipient(_context.router().getRouterInfo());
|
||||
payload.setDeliveryInstructions(instructions);
|
||||
payload.setRequestAck(false);
|
||||
payload.setExpiration(m.getMessageExpiration());
|
||||
|
||||
SessionKey encryptKey = _context.keyGenerator().generateSessionKey();
|
||||
SessionTag encryptTag = new SessionTag(true);
|
||||
SessionKey sentKey = new SessionKey();
|
||||
Set sentTags = null;
|
||||
GarlicMessage msg = GarlicMessageBuilder.buildMessage(_context, payload, sentKey, sentTags,
|
||||
_context.keyManager().getPublicKey(),
|
||||
encryptKey, encryptTag);
|
||||
|
||||
Set encryptTags = new HashSet(1);
|
||||
encryptTags.add(encryptTag);
|
||||
_context.sessionKeyManager().tagsReceived(encryptKey, encryptTags);
|
||||
|
||||
return msg;
|
||||
} else {
|
||||
return m;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -238,23 +367,66 @@ public class LoadTestManager {
|
||||
}
|
||||
|
||||
private void log(LoadTestTunnelConfig tunnel, String msg) {
|
||||
//if (!_log.shouldLog(Log.INFO)) return;
|
||||
StringBuffer buf = new StringBuffer(128);
|
||||
for (int i = 0; i < tunnel.getLength()-1; i++) {
|
||||
Hash peer = tunnel.getPeer(i);
|
||||
if ( (peer != null) && (peer.equals(_context.routerHash())) )
|
||||
continue;
|
||||
else if (peer != null)
|
||||
buf.append(peer.toBase64());
|
||||
else
|
||||
buf.append("[unknown_peer]");
|
||||
buf.append(" ");
|
||||
TunnelId id = tunnel.getReceiveTunnelId(i);
|
||||
if (id != null)
|
||||
buf.append(id.getTunnelId());
|
||||
else
|
||||
buf.append("[unknown_tunnel]");
|
||||
buf.append(" ");
|
||||
buf.append(_context.clock().now()).append(" hop ").append(i).append(" ").append(msg).append("\n");
|
||||
if (tunnel.getInbound() == null) {
|
||||
for (int i = 0; i < tunnel.getLength()-1; i++) {
|
||||
Hash peer = tunnel.getPeer(i);
|
||||
if ( (peer != null) && (peer.equals(_context.routerHash())) )
|
||||
continue;
|
||||
else if (peer != null)
|
||||
buf.append(peer.toBase64());
|
||||
else
|
||||
buf.append("[unknown_peer]");
|
||||
buf.append(" ");
|
||||
TunnelId id = tunnel.getReceiveTunnelId(i);
|
||||
if (id != null)
|
||||
buf.append(id.getTunnelId());
|
||||
else
|
||||
buf.append("[unknown_tunnel]");
|
||||
buf.append(" ");
|
||||
buf.append(_context.clock().now()).append(" hop ").append(i).append(" ").append(msg).append("\n");
|
||||
}
|
||||
} else {
|
||||
int hop = 0;
|
||||
TunnelInfo info = tunnel.getOutbound();
|
||||
for (int i = 0; (info != null) && (i < info.getLength()-1); i++) {
|
||||
Hash peer = info.getPeer(i);
|
||||
if ( (peer != null) && (peer.equals(_context.routerHash())) )
|
||||
continue;
|
||||
else if (peer != null)
|
||||
buf.append(peer.toBase64());
|
||||
else
|
||||
buf.append("[unknown_peer]");
|
||||
buf.append(" ");
|
||||
TunnelId id = tunnel.getReceiveTunnelId(i);
|
||||
if (id != null)
|
||||
buf.append(id.getTunnelId());
|
||||
else
|
||||
buf.append("[unknown_tunnel]");
|
||||
buf.append(" ");
|
||||
buf.append(_context.clock().now()).append(" out_hop ").append(hop).append(" ").append(msg).append("\n");
|
||||
hop++;
|
||||
}
|
||||
info = tunnel.getInbound();
|
||||
for (int i = 0; (info != null) && (i < info.getLength()-1); i++) {
|
||||
Hash peer = info.getPeer(i);
|
||||
if ( (peer != null) && (peer.equals(_context.routerHash())) )
|
||||
continue;
|
||||
else if (peer != null)
|
||||
buf.append(peer.toBase64());
|
||||
else
|
||||
buf.append("[unknown_peer]");
|
||||
buf.append(" ");
|
||||
TunnelId id = tunnel.getReceiveTunnelId(i);
|
||||
if (id != null)
|
||||
buf.append(id.getTunnelId());
|
||||
else
|
||||
buf.append("[unknown_tunnel]");
|
||||
buf.append(" ");
|
||||
buf.append(_context.clock().now()).append(" in_hop ").append(hop).append(" ").append(msg).append("\n");
|
||||
hop++;
|
||||
}
|
||||
}
|
||||
try {
|
||||
synchronized (_out) {
|
||||
@ -280,7 +452,7 @@ public class LoadTestManager {
|
||||
private void buildOneHop(Hash peer) {
|
||||
long expiration = _context.clock().now() + 10*60*1000;
|
||||
|
||||
LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 2, true);
|
||||
PooledTunnelCreatorConfig cfg = new PooledTunnelCreatorConfig(_context, 2, true);
|
||||
// cfg.getPeer() is ordered gateway first
|
||||
cfg.setPeer(0, peer);
|
||||
HopConfig hop = cfg.getConfig(0);
|
||||
@ -299,8 +471,10 @@ public class LoadTestManager {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Config for " + peer.toBase64() + ": " + cfg);
|
||||
|
||||
CreatedJob onCreated = new CreatedJob(_context, cfg);
|
||||
FailedJob fail = new FailedJob(_context, cfg);
|
||||
LoadTestTunnelConfig ltCfg = new LoadTestTunnelConfig(cfg);
|
||||
|
||||
CreatedJob onCreated = new CreatedJob(_context, ltCfg);
|
||||
FailedJob fail = new FailedJob(_context, ltCfg);
|
||||
RequestTunnelJob req = new RequestTunnelJob(_context, cfg, onCreated, fail, cfg.getLength()-1, false, true);
|
||||
_context.jobQueue().addJob(req);
|
||||
}
|
||||
@ -333,7 +507,7 @@ public class LoadTestManager {
|
||||
private void buildLonger(Hash peer) {
|
||||
long expiration = _context.clock().now() + 10*60*1000;
|
||||
|
||||
LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 3, true);
|
||||
PooledTunnelCreatorConfig cfg = new PooledTunnelCreatorConfig(_context, 3, true);
|
||||
// cfg.getPeer() is ordered gateway first
|
||||
cfg.setPeer(0, peer);
|
||||
HopConfig hop = cfg.getConfig(0);
|
||||
@ -370,12 +544,61 @@ public class LoadTestManager {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Config for " + peer.toBase64() + " with fastPeer: " + fastPeer.toBase64() + ": " + cfg);
|
||||
|
||||
CreatedJob onCreated = new CreatedJob(_context, cfg);
|
||||
FailedJob fail = new FailedJob(_context, cfg);
|
||||
|
||||
LoadTestTunnelConfig ltCfg = new LoadTestTunnelConfig(cfg);
|
||||
CreatedJob onCreated = new CreatedJob(_context, ltCfg);
|
||||
FailedJob fail = new FailedJob(_context, ltCfg);
|
||||
RequestTunnelJob req = new RequestTunnelJob(_context, cfg, onCreated, fail, cfg.getLength()-1, false, true);
|
||||
_context.jobQueue().addJob(req);
|
||||
}
|
||||
|
||||
/**
|
||||
* If we are testing live tunnels, see if we want to test the one that was just created
|
||||
* fully.
|
||||
*/
|
||||
public void addTunnelTestCandidate(TunnelCreatorConfig cfg) {
|
||||
LoadTestTunnelConfig ltCfg = new LoadTestTunnelConfig(cfg);
|
||||
if (wantToTest(ltCfg)) {
|
||||
// wait briefly so everyone has their things in order (not really necessary...)
|
||||
long delay = _context.random().nextInt(30*1000) + 30*1000;
|
||||
SimpleTimer.getInstance().addEvent(new BeginTest(ltCfg), delay);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Testing " + cfg + ", with " + _active.size() + " active");
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Not testing " + cfg + " because we have " + _active.size() + " active: " + _active);
|
||||
}
|
||||
}
|
||||
public void removeTunnelTestCandidate(TunnelCreatorConfig cfg) { _active.remove(cfg); }
|
||||
|
||||
private class BeginTest implements SimpleTimer.TimedEvent {
|
||||
private LoadTestTunnelConfig _cfg;
|
||||
public BeginTest(LoadTestTunnelConfig cfg) {
|
||||
_cfg = cfg;
|
||||
}
|
||||
public void timeReached() {
|
||||
_context.jobQueue().addJob(new Expire(_context, _cfg, false));
|
||||
runTest(_cfg);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean wantToTest(LoadTestTunnelConfig cfg) {
|
||||
// wait 10 minutes before testing anything
|
||||
if (_context.router().getUptime() <= 10*60*1000) return false;
|
||||
|
||||
if (TEST_LIVE_TUNNELS && _active.size() < getConcurrency()) {
|
||||
// length == #hops+1 (as it includes the creator)
|
||||
if (cfg.getLength() < 2)
|
||||
return false;
|
||||
// only load test the client tunnels
|
||||
if (cfg.getTunnel().getDestination() == null)
|
||||
return false;
|
||||
_active.add(cfg);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class CreatedJob extends JobImpl {
|
||||
private LoadTestTunnelConfig _cfg;
|
||||
@ -387,29 +610,33 @@ public class LoadTestManager {
|
||||
public void runJob() {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Tunnel created for testing peer " + _cfg.getPeer(0).toBase64());
|
||||
getContext().tunnelDispatcher().joinInbound(_cfg);
|
||||
getContext().tunnelDispatcher().joinInbound(_cfg.getTunnel());
|
||||
//log(_cfg, "joined");
|
||||
|
||||
_active.add(_cfg);
|
||||
Expire j = new Expire(getContext(), _cfg);
|
||||
_cfg.setExpireJob(j);
|
||||
//_cfg.setExpireJob(j);
|
||||
getContext().jobQueue().addJob(j);
|
||||
runTest(_cfg);
|
||||
}
|
||||
}
|
||||
private class Expire extends JobImpl {
|
||||
private LoadTestTunnelConfig _cfg;
|
||||
private boolean _removeFromDispatcher;
|
||||
public Expire(RouterContext ctx, LoadTestTunnelConfig cfg) {
|
||||
this(ctx, cfg, true);
|
||||
}
|
||||
public Expire(RouterContext ctx, LoadTestTunnelConfig cfg, boolean removeFromDispatcher) {
|
||||
super(ctx);
|
||||
_cfg = cfg;
|
||||
_removeFromDispatcher = removeFromDispatcher;
|
||||
getTiming().setStartAfter(cfg.getExpiration()+60*1000);
|
||||
}
|
||||
public String getName() { return "expire test tunnel"; }
|
||||
public void runJob() {
|
||||
getContext().tunnelDispatcher().remove(_cfg);
|
||||
log(_cfg, "expired after sending " + _cfg.getFullMessageCount() + " / " + _cfg.getFailedMessageCount());
|
||||
getContext().statManager().addRateData("test.lifetimeSuccessful", _cfg.getFullMessageCount(), _cfg.getFailedMessageCount());
|
||||
if (_cfg.getFailedMessageCount() > 0)
|
||||
getContext().statManager().addRateData("test.lifetimeFailed", _cfg.getFailedMessageCount(), _cfg.getFullMessageCount());
|
||||
if (_removeFromDispatcher)
|
||||
getContext().tunnelDispatcher().remove(_cfg.getTunnel());
|
||||
_cfg.logComplete();
|
||||
_active.remove(_cfg);
|
||||
}
|
||||
}
|
||||
private class FailedJob extends JobImpl {
|
||||
@ -426,17 +653,46 @@ public class LoadTestManager {
|
||||
}
|
||||
}
|
||||
|
||||
private class LoadTestTunnelConfig extends PooledTunnelCreatorConfig {
|
||||
private class LoadTestTunnelConfig {
|
||||
private TunnelCreatorConfig _cfg;
|
||||
private long _failed;
|
||||
private long _fullMessages;
|
||||
public LoadTestTunnelConfig(RouterContext ctx, int length, boolean isInbound) {
|
||||
super(ctx, length, isInbound);
|
||||
private TunnelInfo _testInbound;
|
||||
private TunnelInfo _testOutbound;
|
||||
private boolean _completed;
|
||||
public LoadTestTunnelConfig(TunnelCreatorConfig cfg) {
|
||||
_cfg = cfg;
|
||||
_failed = 0;
|
||||
_fullMessages = 0;
|
||||
_completed = false;
|
||||
}
|
||||
|
||||
public long getExpiration() { return _cfg.getExpiration(); }
|
||||
public Hash getPeer(int peer) { return _cfg.getPeer(peer); }
|
||||
public TunnelId getReceiveTunnelId(int peer) { return _cfg.getReceiveTunnelId(peer); }
|
||||
public TunnelId getSendTunnelId(int peer) { return _cfg.getSendTunnelId(peer); }
|
||||
public int getLength() { return _cfg.getLength(); }
|
||||
|
||||
public void incrementFailed() { ++_failed; }
|
||||
public long getFailedMessageCount() { return _failed; }
|
||||
public void incrementFull() { ++_fullMessages; }
|
||||
public long getFullMessageCount() { return _fullMessages; }
|
||||
public TunnelCreatorConfig getTunnel() { return _cfg; }
|
||||
public void setInbound(TunnelInfo info) { _testInbound = info; }
|
||||
public void setOutbound(TunnelInfo info) { _testOutbound = info; }
|
||||
public TunnelInfo getInbound() { return _testInbound; }
|
||||
public TunnelInfo getOutbound() { return _testOutbound; }
|
||||
public String toString() { return _cfg + ": failed=" + _failed + " full=" + _fullMessages; }
|
||||
|
||||
void logComplete() {
|
||||
if (_completed) return;
|
||||
_completed = true;
|
||||
LoadTestTunnelConfig cfg = LoadTestTunnelConfig.this;
|
||||
log(cfg, "expired after sending " + cfg.getFullMessageCount() + " / " + cfg.getFailedMessageCount()
|
||||
+ " in " + (10*60*1000l - (cfg.getExpiration()-_context.clock().now())));
|
||||
_context.statManager().addRateData("test.lifetimeSuccessful", cfg.getFullMessageCount(), cfg.getFailedMessageCount());
|
||||
if (cfg.getFailedMessageCount() > 0)
|
||||
_context.statManager().addRateData("test.lifetimeFailed", cfg.getFailedMessageCount(), cfg.getFullMessageCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.321 $ $Date: 2005/12/30 15:57:53 $";
|
||||
public final static String ID = "$Revision: 1.322 $ $Date: 2005/12/30 18:33:54 $";
|
||||
public final static String VERSION = "0.6.1.8";
|
||||
public final static long BUILD = 5;
|
||||
public final static long BUILD = 6;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -34,6 +34,7 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
|
||||
_log = ctx.logManager().getLog(InboundMessageDistributor.class);
|
||||
_receiver = new GarlicMessageReceiver(ctx, this, client);
|
||||
_context.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
}
|
||||
|
||||
public void distribute(I2NPMessage msg, Hash target) {
|
||||
@ -65,6 +66,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
|
||||
// targetting us either implicitly (no target) or explicitly (no tunnel)
|
||||
// make sure we don't honor any remote requests directly (garlic instructions, etc)
|
||||
if (msg.getType() == GarlicMessage.MESSAGE_TYPE) {
|
||||
// in case we're looking for replies to a garlic message (cough load tests cough)
|
||||
_context.inNetMessagePool().handleReplies(msg);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("received garlic message in the tunnel, parse it out");
|
||||
_receiver.receive((GarlicMessage)msg);
|
||||
@ -149,6 +152,11 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Bad store attempt", iae);
|
||||
}
|
||||
} else if (data instanceof DataMessage) {
|
||||
// a data message targetting the local router is how we send load tests (real
|
||||
// data messages target destinations)
|
||||
_context.statManager().addRateData("tunnel.handleLoadClove", 1, 0);
|
||||
_context.inNetMessagePool().add(data, null, null);
|
||||
} else {
|
||||
if ( (_client != null) && (data.getType() != DeliveryStatusMessage.MESSAGE_TYPE) ) {
|
||||
// drop it, since the data we receive shouldn't include other stuff,
|
||||
|
@ -15,9 +15,11 @@ class ClientPeerSelector extends TunnelPeerSelector {
|
||||
if (length < 0)
|
||||
return null;
|
||||
HashSet matches = new HashSet(length);
|
||||
|
||||
if (shouldSelectExplicit(settings))
|
||||
return selectExplicit(ctx, settings, length);
|
||||
|
||||
if (length > 0) {
|
||||
if (shouldSelectExplicit(settings))
|
||||
return selectExplicit(ctx, settings, length);
|
||||
}
|
||||
|
||||
Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory());
|
||||
ctx.profileOrganizer().selectFastPeers(length, exclude, matches);
|
||||
@ -31,5 +33,4 @@ class ClientPeerSelector extends TunnelPeerSelector {
|
||||
rv.add(ctx.routerHash());
|
||||
return rv;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (shouldSelectExplicit(settings)) {
|
||||
if (false && shouldSelectExplicit(settings)) {
|
||||
List rv = selectExplicit(ctx, settings, length);
|
||||
if (l.shouldLog(Log.DEBUG))
|
||||
l.debug("Explicit peers selected: " + rv);
|
||||
|
@ -29,7 +29,7 @@ class OnCreatedJob extends JobImpl {
|
||||
getContext().tunnelDispatcher().joinOutbound(_cfg);
|
||||
}
|
||||
|
||||
_pool.getManager().buildComplete();
|
||||
_pool.getManager().buildComplete(_cfg);
|
||||
_pool.addTunnel(_cfg);
|
||||
TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null);
|
||||
RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool);
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.i2p.router.tunnel.pool;
|
||||
|
||||
import java.util.*;
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.DataFormatException;
|
||||
import net.i2p.data.Hash;
|
||||
import net.i2p.router.Router;
|
||||
@ -62,9 +63,12 @@ abstract class TunnelPeerSelector {
|
||||
}
|
||||
|
||||
protected boolean shouldSelectExplicit(TunnelPoolSettings settings) {
|
||||
if (settings.isExploratory()) return false;
|
||||
Properties opts = settings.getUnknownOptions();
|
||||
if (opts != null) {
|
||||
String peers = opts.getProperty("explicitPeers");
|
||||
if (peers == null)
|
||||
peers = I2PAppContext.getGlobalContext().getProperty("explicitPeers");
|
||||
if (peers != null)
|
||||
return true;
|
||||
}
|
||||
@ -77,6 +81,9 @@ abstract class TunnelPeerSelector {
|
||||
if (opts != null)
|
||||
peers = opts.getProperty("explicitPeers");
|
||||
|
||||
if (peers == null)
|
||||
peers = I2PAppContext.getGlobalContext().getProperty("explicitPeers");
|
||||
|
||||
Log log = ctx.logManager().getLog(ClientPeerSelector.class);
|
||||
List rv = new ArrayList();
|
||||
StringTokenizer tok = new StringTokenizer(peers, ",");
|
||||
@ -90,8 +97,8 @@ abstract class TunnelPeerSelector {
|
||||
if (ctx.profileOrganizer().isSelectable(peer)) {
|
||||
rv.add(peer);
|
||||
} else {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Explicit peer is not selectable: " + peerStr);
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug("Explicit peer is not selectable: " + peerStr);
|
||||
}
|
||||
} catch (DataFormatException dfe) {
|
||||
if (log.shouldLog(Log.ERROR))
|
||||
@ -99,19 +106,34 @@ abstract class TunnelPeerSelector {
|
||||
}
|
||||
}
|
||||
|
||||
int sz = rv.size();
|
||||
Collections.shuffle(rv, ctx.random());
|
||||
|
||||
|
||||
while (rv.size() > length)
|
||||
rv.remove(0);
|
||||
|
||||
if (log.shouldLog(Log.INFO)) {
|
||||
StringBuffer buf = new StringBuffer();
|
||||
if (settings.getDestinationNickname() != null)
|
||||
buf.append("peers for ").append(settings.getDestinationNickname());
|
||||
else if (settings.getDestination() != null)
|
||||
buf.append("peers for ").append(settings.getDestination().toBase64());
|
||||
else
|
||||
buf.append("peers for exploratory ");
|
||||
if (settings.isInbound())
|
||||
buf.append(" inbound");
|
||||
else
|
||||
buf.append(" outbound");
|
||||
buf.append(" peers: ").append(rv);
|
||||
buf.append(", out of ").append(sz).append(" (not including self)");
|
||||
log.info(buf.toString());
|
||||
}
|
||||
|
||||
if (settings.isInbound())
|
||||
rv.add(0, ctx.routerHash());
|
||||
else
|
||||
rv.add(ctx.routerHash());
|
||||
|
||||
if (log.shouldLog(Log.INFO))
|
||||
log.info(toString() + ": Selecting peers explicitly: " + rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
|
@ -17,11 +17,13 @@ import net.i2p.stat.RateStat;
|
||||
import net.i2p.router.ClientTunnelSettings;
|
||||
import net.i2p.router.HandlerJobBuilder;
|
||||
import net.i2p.router.JobImpl;
|
||||
import net.i2p.router.LoadTestManager;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelInfo;
|
||||
import net.i2p.router.TunnelManagerFacade;
|
||||
import net.i2p.router.TunnelPoolSettings;
|
||||
import net.i2p.router.tunnel.HopConfig;
|
||||
import net.i2p.router.tunnel.TunnelCreatorConfig;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@ -40,6 +42,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
private int _outstandingBuilds;
|
||||
/** max # of concurrent build requests */
|
||||
private int _maxOutstandingBuilds;
|
||||
private LoadTestManager _loadTestManager;
|
||||
|
||||
private static final String PROP_MAX_OUTSTANDING_BUILDS = "router.tunnel.maxConcurrentBuilds";
|
||||
private static final int DEFAULT_MAX_OUTSTANDING_BUILDS = 20;
|
||||
@ -70,6 +73,8 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
}
|
||||
}
|
||||
|
||||
_loadTestManager = new LoadTestManager(_context);
|
||||
|
||||
ctx.statManager().createRateStat("tunnel.testSuccessTime",
|
||||
"How long do successful tunnel tests take?", "Tunnels",
|
||||
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
|
||||
@ -139,6 +144,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
return info;
|
||||
}
|
||||
}
|
||||
info = _inboundExploratory.getTunnel(id);
|
||||
if (info != null) return info;
|
||||
info = _outboundExploratory.getTunnel(id);
|
||||
if (info != null) return info;
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -332,6 +341,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
return rv.booleanValue();
|
||||
}
|
||||
|
||||
void buildComplete(TunnelCreatorConfig cfg) {
|
||||
buildComplete();
|
||||
_loadTestManager.addTunnelTestCandidate(cfg);
|
||||
}
|
||||
void buildComplete() {
|
||||
synchronized (this) {
|
||||
if (_outstandingBuilds > 0)
|
||||
@ -339,6 +352,9 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final String PROP_LOAD_TEST = "router.loadTest";
|
||||
|
||||
public void startup() {
|
||||
TunnelBuilder builder = new TunnelBuilder();
|
||||
ExploratoryPeerSelector selector = new ExploratoryPeerSelector();
|
||||
@ -359,6 +375,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
// try to build up longer tunnels
|
||||
_context.jobQueue().addJob(new BootstrapPool(_context, _inboundExploratory));
|
||||
_context.jobQueue().addJob(new BootstrapPool(_context, _outboundExploratory));
|
||||
|
||||
if (Boolean.valueOf(_context.getProperty(PROP_LOAD_TEST, "true")).booleanValue()) {
|
||||
_context.jobQueue().addJob(_loadTestManager.getTestJob());
|
||||
}
|
||||
}
|
||||
|
||||
private class BootstrapPool extends JobImpl {
|
||||
|
Reference in New Issue
Block a user