diff --git a/router/java/src/net/i2p/router/MessageHistory.java b/router/java/src/net/i2p/router/MessageHistory.java index 2ded9f03b8..66e13952f7 100644 --- a/router/java/src/net/i2p/router/MessageHistory.java +++ b/router/java/src/net/i2p/router/MessageHistory.java @@ -91,8 +91,13 @@ public class MessageHistory { */ public synchronized void initialize(boolean forceReinitialize) { if (!forceReinitialize) return; + Router router = _context.router(); + if (router == null) { + // unit testing, presumably + return; + } - if (_context.router().getRouterInfo() == null) { + if (router.getRouterInfo() == null) { _reinitializeJob.getTiming().setStartAfter(_context.clock().now() + 15*1000); _context.jobQueue().addJob(_reinitializeJob); } else { diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index c1d977a353..523141d2c5 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -156,6 +156,9 @@ class PacketBuilder { */ private static final int MAX_RESEND_ACKS_SMALL = 4; + /** + * @param transport may be null for unit testing only + */ public PacketBuilder(I2PAppContext ctx, UDPTransport transport) { _context = ctx; _transport = transport; @@ -571,7 +574,7 @@ class PacketBuilder { state.prepareSessionCreated(); byte sentIP[] = state.getSentIP(); - if ( (sentIP == null) || (sentIP.length <= 0) || ( (_transport != null) && (!_transport.isValid(sentIP)) ) ) { + if ( (sentIP == null) || (sentIP.length <= 0) || (!_transport.isValid(sentIP))) { if (_log.shouldLog(Log.ERROR)) _log.error("How did our sent IP become invalid? " + state); state.fail(); @@ -651,7 +654,7 @@ class PacketBuilder { int off = HEADER_SIZE; byte toIP[] = state.getSentIP(); - if ( (_transport !=null) && (!_transport.isValid(toIP)) ) { + if (!_transport.isValid(toIP)) { packet.release(); return null; } @@ -1252,7 +1255,7 @@ class PacketBuilder { } /** - * Sends an empty unauthenticated packet for hole punching. + * Creates an empty unauthenticated packet for hole punching. * Parameters must be validated previously. */ public UDPPacket buildHolePunch(InetAddress to, int port) { @@ -1269,6 +1272,23 @@ class PacketBuilder { return packet; } + /** + * TESTING ONLY. + * Creates an arbitrary packet for unit testing. + * Null transport in constructor OK. + * + * @param type 0-15 + * @since IPv6 + */ + public UDPPacket buildPacket(byte[] data, InetAddress to, int port) { + UDPPacket packet = UDPPacket.acquire(_context, false); + byte d[] = packet.getPacket().getData(); + System.arraycopy(data, 0, d, 0, data.length); + packet.getPacket().setLength(data.length); + setTo(packet, to, port); + return packet; + } + /** * Create a new packet and add the flag byte and the time stamp. * Caller should add data starting at HEADER_SIZE. diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index f40e05e081..db7fb4ea34 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -59,7 +59,6 @@ class PacketPusher implements Runnable { * @since IPv6 */ public void send(UDPPacket packet) { - boolean handled = false; boolean isIPv4 = packet.getPacket().getAddress().getAddress().length == 4; for (int j = 0; j < _endpoints.size(); j++) { // Find the best endpoint (socket) to send this out. @@ -77,13 +76,11 @@ class PacketPusher implements Runnable { ((!isIPv4) && ep.isIPv6())) { // BLOCKING if queue is full ep.getSender().add(packet); - handled = true; - break; + return; } } - if (!handled) { - _log.error("No endpoint to send " + packet); - packet.release(); - } + // not handled + _log.error("No endpoint to send " + packet); + packet.release(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 40d16253c9..aa1affbb57 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -26,13 +26,16 @@ class UDPSender { private final BlockingQueue _outboundQueue; private volatile boolean _keepRunning; private final Runner _runner; + private final boolean _dummy; + private static final int TYPE_POISON = 99999; - + private static final int MIN_QUEUE_SIZE = 64; private static final int MAX_QUEUE_SIZE = 384; public UDPSender(RouterContext ctx, DatagramSocket socket, String name) { _context = ctx; + _dummy = false; // ctx.commSystem().isDummy(); _log = ctx.logManager().getLog(UDPSender.class); long maxMemory = Runtime.getRuntime().maxMemory(); if (maxMemory == Long.MAX_VALUE) @@ -179,7 +182,7 @@ class UDPSender { _log.error("Dropping large UDP packet " + psz + " bytes: " + packet); return; } - if (_context.commSystem().isDummy()) { + if (_dummy) { // testing // back to the cache packet.release(); diff --git a/router/java/test/junit/net/i2p/router/transport/udp/UDPEndpointTestStandalone.java b/router/java/test/junit/net/i2p/router/transport/udp/UDPEndpointTestStandalone.java index 6f83404d55..5e304f5602 100644 --- a/router/java/test/junit/net/i2p/router/transport/udp/UDPEndpointTestStandalone.java +++ b/router/java/test/junit/net/i2p/router/transport/udp/UDPEndpointTestStandalone.java @@ -1,6 +1,8 @@ package net.i2p.router.transport.udp; +import java.net.InetAddress; import java.net.SocketException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -18,11 +20,11 @@ import net.i2p.util.Log; * --zab */ public class UDPEndpointTestStandalone { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; private UDPEndpoint _endpoints[]; - private boolean _beginTest; - private List _sentNotReceived; + private volatile boolean _beginTest; + private final List _sentNotReceived; public UDPEndpointTestStandalone(RouterContext ctx) { _context = ctx; @@ -33,7 +35,7 @@ public class UDPEndpointTestStandalone { public void runTest(int numPeers) { _log.debug("Run test("+numPeers+")"); _endpoints = new UDPEndpoint[numPeers]; - int base = 2000 + _context.random().nextInt(10000); + int base = 44000 + _context.random().nextInt(10000); for (int i = 0; i < numPeers; i++) { _log.debug("Building " + i); UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i, null); @@ -41,6 +43,7 @@ public class UDPEndpointTestStandalone { try { endpoint.startup(); } catch (SocketException se) { + _log.error("die", se); throw new RuntimeException(se); } I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i); @@ -55,7 +58,7 @@ public class UDPEndpointTestStandalone { } private class TestRead implements Runnable { - private UDPEndpoint _endpoint; + private final UDPEndpoint _endpoint; public TestRead(UDPEndpoint peer) { _endpoint = peer; } @@ -87,7 +90,7 @@ public class UDPEndpointTestStandalone { } private class TestWrite implements Runnable { - private UDPEndpoint _endpoint; + private final UDPEndpoint _endpoint; public TestWrite(UDPEndpoint peer) { _endpoint = peer; } @@ -96,8 +99,16 @@ public class UDPEndpointTestStandalone { try { Thread.sleep(2000); } catch (InterruptedException ie) {} } try { Thread.sleep(2000); } catch (InterruptedException ie) {} + PacketBuilder builder = new PacketBuilder(_context, null); + InetAddress localhost = null; + try { + localhost = InetAddress.getLocalHost(); + } catch (UnknownHostException uhe) { + _log.error("die", uhe); + System.exit(0); + } _log.debug("Beginning to write"); - for (int curPacket = 0; curPacket < 10000; curPacket++) { + for (int curPacket = 0; curPacket < 2000; curPacket++) { byte data[] = new byte[1024]; _context.random().nextBytes(data); int curPeer = (curPacket % _endpoints.length); @@ -105,27 +116,21 @@ public class UDPEndpointTestStandalone { curPeer++; if (curPeer >= _endpoints.length) curPeer = 0; - short priority = 1; - long expiration = -1; - UDPPacket packet = UDPPacket.acquire(_context, true); - //try { - if (true) throw new RuntimeException("fixme"); - //packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort()); - // Following method is commented out in UDPPacket - //packet.writeData(data, 0, 1024); - packet.getPacket().setLength(1024); + UDPPacket packet = builder.buildPacket(data, localhost, _endpoints[curPeer].getListenPort()); int outstanding = _sentNotReceived.size() + 1; _sentNotReceived.add(new ByteArray(data, 0, 1024)); _log.debug("Sending packet " + curPacket + " with outstanding " + outstanding); + try { _endpoint.send(packet); - //try { Thread.sleep(10); } catch (InterruptedException ie) {} - //} catch (UnknownHostException uhe) { - // _log.error("foo!", uhe); - //} - //if (_log.shouldLog(Log.DEBUG)) { + } catch (Exception e) { + _log.error("die", e); + break; + } + try { Thread.sleep(3); } catch (InterruptedException ie) {} + //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort()); - //} } + _log.debug("Done sending packets"); try { Thread.sleep(10*1000); } catch (InterruptedException e) {} System.exit(0); } @@ -137,7 +142,16 @@ public class UDPEndpointTestStandalone { Properties props = new Properties(); props.setProperty("stat.logFile", "udpEndpointTest.stats"); props.setProperty("stat.logFilters", "*"); - UDPEndpointTestStandalone test = new UDPEndpointTestStandalone(new RouterContext(null, props)); + props.setProperty("i2p.dummyClientFacade", "true"); + props.setProperty("i2p.dummyNetDb", "true"); + props.setProperty("i2p.dummyPeerManager", "true"); + props.setProperty("i2p.dummyTunnelManager", "true"); + props.setProperty("i2p.vmCommSystem", "true"); + props.setProperty("i2np.bandwidth.inboundKBytesPerSecond", "9999"); + props.setProperty("i2np.bandwidth.outboundKBytesPerSecond", "9999"); + RouterContext ctx = new RouterContext(null, props); + ctx.initAll(); + UDPEndpointTestStandalone test = new UDPEndpointTestStandalone(ctx); test.runTest(2); } }