* Fix UDPEndpointTestStandalone:

- init context
  - Add unit test buildPacket() method to PacketBuilder
  - Fix NPE in MessageHistory
* Minor PacketPusher optimization
This commit is contained in:
zzz
2013-05-16 23:08:06 +00:00
parent fcdf837f33
commit a374f00613
5 changed files with 76 additions and 37 deletions

View File

@ -91,8 +91,13 @@ public class MessageHistory {
*/ */
public synchronized void initialize(boolean forceReinitialize) { public synchronized void initialize(boolean forceReinitialize) {
if (!forceReinitialize) return; if (!forceReinitialize) return;
Router router = _context.router();
if (router == null) {
// unit testing, presumably
return;
}
if (_context.router().getRouterInfo() == null) { if (router.getRouterInfo() == null) {
_reinitializeJob.getTiming().setStartAfter(_context.clock().now() + 15*1000); _reinitializeJob.getTiming().setStartAfter(_context.clock().now() + 15*1000);
_context.jobQueue().addJob(_reinitializeJob); _context.jobQueue().addJob(_reinitializeJob);
} else { } else {

View File

@ -156,6 +156,9 @@ class PacketBuilder {
*/ */
private static final int MAX_RESEND_ACKS_SMALL = 4; private static final int MAX_RESEND_ACKS_SMALL = 4;
/**
* @param transport may be null for unit testing only
*/
public PacketBuilder(I2PAppContext ctx, UDPTransport transport) { public PacketBuilder(I2PAppContext ctx, UDPTransport transport) {
_context = ctx; _context = ctx;
_transport = transport; _transport = transport;
@ -571,7 +574,7 @@ class PacketBuilder {
state.prepareSessionCreated(); state.prepareSessionCreated();
byte sentIP[] = state.getSentIP(); byte sentIP[] = state.getSentIP();
if ( (sentIP == null) || (sentIP.length <= 0) || ( (_transport != null) && (!_transport.isValid(sentIP)) ) ) { if ( (sentIP == null) || (sentIP.length <= 0) || (!_transport.isValid(sentIP))) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("How did our sent IP become invalid? " + state); _log.error("How did our sent IP become invalid? " + state);
state.fail(); state.fail();
@ -651,7 +654,7 @@ class PacketBuilder {
int off = HEADER_SIZE; int off = HEADER_SIZE;
byte toIP[] = state.getSentIP(); byte toIP[] = state.getSentIP();
if ( (_transport !=null) && (!_transport.isValid(toIP)) ) { if (!_transport.isValid(toIP)) {
packet.release(); packet.release();
return null; return null;
} }
@ -1252,7 +1255,7 @@ class PacketBuilder {
} }
/** /**
* Sends an empty unauthenticated packet for hole punching. * Creates an empty unauthenticated packet for hole punching.
* Parameters must be validated previously. * Parameters must be validated previously.
*/ */
public UDPPacket buildHolePunch(InetAddress to, int port) { public UDPPacket buildHolePunch(InetAddress to, int port) {
@ -1269,6 +1272,23 @@ class PacketBuilder {
return packet; return packet;
} }
/**
* TESTING ONLY.
* Creates an arbitrary packet for unit testing.
* Null transport in constructor OK.
*
* @param type 0-15
* @since IPv6
*/
public UDPPacket buildPacket(byte[] data, InetAddress to, int port) {
UDPPacket packet = UDPPacket.acquire(_context, false);
byte d[] = packet.getPacket().getData();
System.arraycopy(data, 0, d, 0, data.length);
packet.getPacket().setLength(data.length);
setTo(packet, to, port);
return packet;
}
/** /**
* Create a new packet and add the flag byte and the time stamp. * Create a new packet and add the flag byte and the time stamp.
* Caller should add data starting at HEADER_SIZE. * Caller should add data starting at HEADER_SIZE.

View File

@ -59,7 +59,6 @@ class PacketPusher implements Runnable {
* @since IPv6 * @since IPv6
*/ */
public void send(UDPPacket packet) { public void send(UDPPacket packet) {
boolean handled = false;
boolean isIPv4 = packet.getPacket().getAddress().getAddress().length == 4; boolean isIPv4 = packet.getPacket().getAddress().getAddress().length == 4;
for (int j = 0; j < _endpoints.size(); j++) { for (int j = 0; j < _endpoints.size(); j++) {
// Find the best endpoint (socket) to send this out. // Find the best endpoint (socket) to send this out.
@ -77,13 +76,11 @@ class PacketPusher implements Runnable {
((!isIPv4) && ep.isIPv6())) { ((!isIPv4) && ep.isIPv6())) {
// BLOCKING if queue is full // BLOCKING if queue is full
ep.getSender().add(packet); ep.getSender().add(packet);
handled = true; return;
break;
} }
} }
if (!handled) { // not handled
_log.error("No endpoint to send " + packet); _log.error("No endpoint to send " + packet);
packet.release(); packet.release();
} }
} }
}

View File

@ -26,6 +26,8 @@ class UDPSender {
private final BlockingQueue<UDPPacket> _outboundQueue; private final BlockingQueue<UDPPacket> _outboundQueue;
private volatile boolean _keepRunning; private volatile boolean _keepRunning;
private final Runner _runner; private final Runner _runner;
private final boolean _dummy;
private static final int TYPE_POISON = 99999; private static final int TYPE_POISON = 99999;
private static final int MIN_QUEUE_SIZE = 64; private static final int MIN_QUEUE_SIZE = 64;
@ -33,6 +35,7 @@ class UDPSender {
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) { public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
_context = ctx; _context = ctx;
_dummy = false; // ctx.commSystem().isDummy();
_log = ctx.logManager().getLog(UDPSender.class); _log = ctx.logManager().getLog(UDPSender.class);
long maxMemory = Runtime.getRuntime().maxMemory(); long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE) if (maxMemory == Long.MAX_VALUE)
@ -179,7 +182,7 @@ class UDPSender {
_log.error("Dropping large UDP packet " + psz + " bytes: " + packet); _log.error("Dropping large UDP packet " + psz + " bytes: " + packet);
return; return;
} }
if (_context.commSystem().isDummy()) { if (_dummy) {
// testing // testing
// back to the cache // back to the cache
packet.release(); packet.release();

View File

@ -1,6 +1,8 @@
package net.i2p.router.transport.udp; package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -18,11 +20,11 @@ import net.i2p.util.Log;
* --zab * --zab
*/ */
public class UDPEndpointTestStandalone { public class UDPEndpointTestStandalone {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private UDPEndpoint _endpoints[]; private UDPEndpoint _endpoints[];
private boolean _beginTest; private volatile boolean _beginTest;
private List _sentNotReceived; private final List _sentNotReceived;
public UDPEndpointTestStandalone(RouterContext ctx) { public UDPEndpointTestStandalone(RouterContext ctx) {
_context = ctx; _context = ctx;
@ -33,7 +35,7 @@ public class UDPEndpointTestStandalone {
public void runTest(int numPeers) { public void runTest(int numPeers) {
_log.debug("Run test("+numPeers+")"); _log.debug("Run test("+numPeers+")");
_endpoints = new UDPEndpoint[numPeers]; _endpoints = new UDPEndpoint[numPeers];
int base = 2000 + _context.random().nextInt(10000); int base = 44000 + _context.random().nextInt(10000);
for (int i = 0; i < numPeers; i++) { for (int i = 0; i < numPeers; i++) {
_log.debug("Building " + i); _log.debug("Building " + i);
UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i, null); UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i, null);
@ -41,6 +43,7 @@ public class UDPEndpointTestStandalone {
try { try {
endpoint.startup(); endpoint.startup();
} catch (SocketException se) { } catch (SocketException se) {
_log.error("die", se);
throw new RuntimeException(se); throw new RuntimeException(se);
} }
I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i); I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i);
@ -55,7 +58,7 @@ public class UDPEndpointTestStandalone {
} }
private class TestRead implements Runnable { private class TestRead implements Runnable {
private UDPEndpoint _endpoint; private final UDPEndpoint _endpoint;
public TestRead(UDPEndpoint peer) { public TestRead(UDPEndpoint peer) {
_endpoint = peer; _endpoint = peer;
} }
@ -87,7 +90,7 @@ public class UDPEndpointTestStandalone {
} }
private class TestWrite implements Runnable { private class TestWrite implements Runnable {
private UDPEndpoint _endpoint; private final UDPEndpoint _endpoint;
public TestWrite(UDPEndpoint peer) { public TestWrite(UDPEndpoint peer) {
_endpoint = peer; _endpoint = peer;
} }
@ -96,8 +99,16 @@ public class UDPEndpointTestStandalone {
try { Thread.sleep(2000); } catch (InterruptedException ie) {} try { Thread.sleep(2000); } catch (InterruptedException ie) {}
} }
try { Thread.sleep(2000); } catch (InterruptedException ie) {} try { Thread.sleep(2000); } catch (InterruptedException ie) {}
PacketBuilder builder = new PacketBuilder(_context, null);
InetAddress localhost = null;
try {
localhost = InetAddress.getLocalHost();
} catch (UnknownHostException uhe) {
_log.error("die", uhe);
System.exit(0);
}
_log.debug("Beginning to write"); _log.debug("Beginning to write");
for (int curPacket = 0; curPacket < 10000; curPacket++) { for (int curPacket = 0; curPacket < 2000; curPacket++) {
byte data[] = new byte[1024]; byte data[] = new byte[1024];
_context.random().nextBytes(data); _context.random().nextBytes(data);
int curPeer = (curPacket % _endpoints.length); int curPeer = (curPacket % _endpoints.length);
@ -105,27 +116,21 @@ public class UDPEndpointTestStandalone {
curPeer++; curPeer++;
if (curPeer >= _endpoints.length) if (curPeer >= _endpoints.length)
curPeer = 0; curPeer = 0;
short priority = 1; UDPPacket packet = builder.buildPacket(data, localhost, _endpoints[curPeer].getListenPort());
long expiration = -1;
UDPPacket packet = UDPPacket.acquire(_context, true);
//try {
if (true) throw new RuntimeException("fixme");
//packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort());
// Following method is commented out in UDPPacket
//packet.writeData(data, 0, 1024);
packet.getPacket().setLength(1024);
int outstanding = _sentNotReceived.size() + 1; int outstanding = _sentNotReceived.size() + 1;
_sentNotReceived.add(new ByteArray(data, 0, 1024)); _sentNotReceived.add(new ByteArray(data, 0, 1024));
_log.debug("Sending packet " + curPacket + " with outstanding " + outstanding); _log.debug("Sending packet " + curPacket + " with outstanding " + outstanding);
try {
_endpoint.send(packet); _endpoint.send(packet);
//try { Thread.sleep(10); } catch (InterruptedException ie) {} } catch (Exception e) {
//} catch (UnknownHostException uhe) { _log.error("die", e);
// _log.error("foo!", uhe); break;
//}
//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort());
//}
} }
try { Thread.sleep(3); } catch (InterruptedException ie) {}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort());
}
_log.debug("Done sending packets");
try { Thread.sleep(10*1000); } catch (InterruptedException e) {} try { Thread.sleep(10*1000); } catch (InterruptedException e) {}
System.exit(0); System.exit(0);
} }
@ -137,7 +142,16 @@ public class UDPEndpointTestStandalone {
Properties props = new Properties(); Properties props = new Properties();
props.setProperty("stat.logFile", "udpEndpointTest.stats"); props.setProperty("stat.logFile", "udpEndpointTest.stats");
props.setProperty("stat.logFilters", "*"); props.setProperty("stat.logFilters", "*");
UDPEndpointTestStandalone test = new UDPEndpointTestStandalone(new RouterContext(null, props)); props.setProperty("i2p.dummyClientFacade", "true");
props.setProperty("i2p.dummyNetDb", "true");
props.setProperty("i2p.dummyPeerManager", "true");
props.setProperty("i2p.dummyTunnelManager", "true");
props.setProperty("i2p.vmCommSystem", "true");
props.setProperty("i2np.bandwidth.inboundKBytesPerSecond", "9999");
props.setProperty("i2np.bandwidth.outboundKBytesPerSecond", "9999");
RouterContext ctx = new RouterContext(null, props);
ctx.initAll();
UDPEndpointTestStandalone test = new UDPEndpointTestStandalone(ctx);
test.runTest(2); test.runTest(2);
} }
} }