diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
index 1d2ef075a..c4cdc9a7a 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
@@ -242,8 +242,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
+ ex.getMessage() + "\")");
} catch (IOException ex) {
if (!finished) {
- if (_log.shouldLog(Log.ERROR))
- _log.error(direction + ": Error forwarding", ex);
+ if (_log.shouldLog(Log.WARN))
+ _log.warn(direction + ": Error forwarding", ex);
}
//else
// _log.warn("You may ignore this", ex);
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
index e6238d28f..0a11db920 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
@@ -100,7 +100,8 @@ public class I2PSocketManagerFactory {
//p.setProperty("tunnels.depthInbound", "0");
}
- opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
+ if (i2cpHost != null)
+ opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort);
try {
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java
new file mode 100644
index 000000000..79ba6089c
--- /dev/null
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java
@@ -0,0 +1,218 @@
+package net.i2p.client.streaming;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import java.util.Map;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import net.i2p.I2PAppContext;
+import net.i2p.client.I2PClientFactory;
+import net.i2p.data.DataHelper;
+import net.i2p.data.Destination;
+import net.i2p.util.Log;
+import net.i2p.util.I2PThread;
+
+/**
+ * Sit around on a destination, receiving lots of data and sending lots of
+ * data to whomever talks to us.
+ *
+ * Usage: TestSwarm myKeyFile [peerDestFile ]*
+ *
+ */
+public class TestSwarm {
+ private I2PAppContext _context;
+ private Log _log;
+ private String _destFile;
+ private String _peerDestFiles[];
+ private String _conOptions;
+ private I2PSocketManager _manager;
+ private boolean _dead;
+
+ public static void main(String args[]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TestSwarm myDestFile [peerDestFile ]*");
+ return;
+ }
+ I2PAppContext ctx = new I2PAppContext();
+ String files[] = new String[args.length - 1];
+ System.arraycopy(args, 1, files, 0, files.length);
+ TestSwarm swarm = new TestSwarm(ctx, args[0], files);
+ swarm.startup();
+ }
+
+ public TestSwarm(I2PAppContext ctx, String destFile, String peerDestFiles[]) {
+ _context = ctx;
+ _log = ctx.logManager().getLog(TestSwarm.class);
+ _dead = false;
+ _destFile = destFile;
+ _peerDestFiles = peerDestFiles;
+ _conOptions = "";
+ }
+
+ public void startup() {
+ _log.debug("Starting up");
+ File keys = new File(_destFile);
+ if (!keys.exists()) {
+ try {
+ I2PClientFactory.createClient().createDestination(new FileOutputStream(keys));
+ } catch (Exception e) {
+ _log.error("Error creating a new destination on " + keys, e);
+ return;
+ }
+ }
+ try {
+ _manager = I2PSocketManagerFactory.createManager(new FileInputStream(_destFile), "localhost", 10001, null);
+ } catch (Exception e) {
+ _log.error("Error creatign the manager", e);
+ return;
+ }
+
+ I2PThread listener = new I2PThread(new Listener(), "Listener");
+ listener.start();
+
+ connectWithPeers();
+ }
+
+
+ private void connectWithPeers() {
+ if (_peerDestFiles != null) {
+ for (int i = 0; i < _peerDestFiles.length; i++) {
+ try {
+ FileInputStream fin = new FileInputStream(_peerDestFiles[i]);
+ Destination dest = new Destination();
+ dest.readBytes(fin);
+
+ I2PThread flooder = new I2PThread(new Flooder(dest), "Flooder+" + dest.calculateHash().toBase64().substring(0,4));
+ flooder.start();
+ } catch (Exception e) {
+ _log.error("Unable to read the peer from " + _peerDestFiles[i], e);
+ }
+ }
+ }
+ }
+
+ private class Listener implements Runnable {
+ public void run() {
+ try {
+ I2PServerSocket ss = _manager.getServerSocket();
+ I2PSocket s = null;
+ while ( (s = ss.accept()) != null) {
+ I2PThread flooder = new I2PThread(new Flooder(s), "Flooder-" + s.getPeerDestination().calculateHash().toBase64().substring(0,4));
+ flooder.start();
+ }
+ } catch (Exception e) {
+ _log.error("Error listening", e);
+ }
+ }
+ }
+
+ private static volatile long __conId = 0;
+ private class Flooder implements Runnable {
+ private Destination _remoteDestination;
+ private I2PSocket _socket;
+ private boolean _closed;
+ private long _started;
+ private long _totalSent;
+ private long _totalReceived;
+ private long _lastReceived;
+ private long _lastReceivedOn;
+ private long _connectionId;
+
+ public Flooder(Destination dest) {
+ _socket = null;
+ _remoteDestination = dest;
+ _connectionId = ++__conId;
+ _closed = false;
+ _lastReceived = -1;
+ _lastReceivedOn = _context.clock().now();
+ _context.statManager().createRateStat("swarm." + _connectionId + ".totalReceived", "Data size received", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
+ }
+
+ public Flooder(I2PSocket socket) {
+ _socket = socket;
+ _remoteDestination = socket.getPeerDestination();
+ _connectionId = ++__conId;
+ _closed = false;
+ _lastReceived = -1;
+ _lastReceivedOn = _context.clock().now();
+ _context.statManager().createRateStat("swarm." + _connectionId + ".totalReceived", "Data size received", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
+ }
+
+ public long getConnectionId() { return _connectionId; }
+ public Destination getDestination() { return _remoteDestination; }
+
+ public void run() {
+ _started = _context.clock().now();
+ _context.statManager().addRateData("swarm." + _connectionId + ".started", 1, 0);
+ byte data[] = new byte[32*1024];
+ long value = 0;
+ long lastSend = _context.clock().now();
+ if (_socket == null) {
+ try {
+ _socket = _manager.connect(_remoteDestination);
+ } catch (Exception e) {
+ _log.error("Error connecting to " + _remoteDestination.calculateHash().toBase64().substring(0,4));
+ return;
+ }
+ }
+
+ I2PThread floodListener = new I2PThread(new FloodListener(), "FloodListener" + _connectionId);
+ floodListener.start();
+
+ try {
+ OutputStream out = _socket.getOutputStream();
+ while (!_closed) {
+ out.write(data);
+ // out.flush();
+ _totalSent += data.length;
+ _context.statManager().addRateData("swarm." + _connectionId + ".totalSent", _totalSent, 0);
+ //try { Thread.sleep(100); } catch (InterruptedException ie) {}
+ long now = _context.clock().now();
+ _log.debug("Sending " + _connectionId + " after " + (now-lastSend));
+ lastSend = now;
+ try { Thread.sleep(20); } catch (InterruptedException ie) {}
+ }
+ } catch (Exception e) {
+ _log.error("Error sending", e);
+ }
+ }
+
+ private class FloodListener implements Runnable {
+ public void run() {
+ long lastRead = System.currentTimeMillis();
+ long now = lastRead;
+ try {
+ InputStream in = _socket.getInputStream();
+ byte buf[] = new byte[32*1024];
+ int read = 0;
+ while ( (read = in.read(buf)) != -1) {
+ now = System.currentTimeMillis();
+ _totalReceived += read;
+ _context.statManager().addRateData("swarm." + getConnectionId() + ".totalReceived", _totalReceived, 0);
+ _log.debug("Receiving " + _connectionId + " with " + read + " after " + (now-lastRead));
+ lastRead = now;
+ }
+ } catch (Exception e) {
+ _log.error("Error listening to the flood", e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/apps/sam/java/build.xml b/apps/sam/java/build.xml
index 065f0a9be..cdeb2d0ef 100644
--- a/apps/sam/java/build.xml
+++ b/apps/sam/java/build.xml
@@ -10,7 +10,7 @@
diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
index b98f5391d..630b0a224 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
@@ -28,8 +28,10 @@ import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.Base64;
+import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
+import net.i2p.util.ByteCache;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -199,7 +201,7 @@ public class SAMStreamSession {
*
* @return True if the data was sent, false otherwise
*/
- public boolean sendBytes(int id, byte[] data) {
+ public boolean sendBytes(int id, InputStream in, int size) {
Destination d = new Destination();
SAMStreamSessionSocketHandler handler = getSocketHandler(id);
@@ -208,7 +210,7 @@ public class SAMStreamSession {
return false;
}
- return handler.sendBytes(data);
+ return handler.sendBytes(in, size);
}
/**
@@ -313,8 +315,7 @@ public class SAMStreamSession {
}
if (removed == null) {
- _log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id);
- recv.stopStreamReceiving();
+ // ignore - likely the socket handler was already removed by removeAllSocketHandlers
} else {
removed.stopRunning();
_log.debug("Removed SAM STREAM session socket handler " + id);
@@ -462,17 +463,29 @@ public class SAMStreamSession {
*
* @return True if data has been sent without errors, false otherwise
*/
- public boolean sendBytes(byte[] data) {
+ public boolean sendBytes(InputStream in, int size) { // byte[] data) {
if (_log.shouldLog(Log.DEBUG)) {
- _log.debug("Handler " + id + ": sending " + data.length
+ _log.debug("Handler " + id + ": sending " + size
+ " bytes");
}
+ ByteCache cache = ByteCache.getInstance(1024, 4);
+ ByteArray ba = cache.acquire();
try {
- i2pSocketOS.write(data);
+ int sent = 0;
+ byte buf[] = ba.getData();
+ while (sent < size) {
+ int read = in.read(buf);
+ if (read == -1)
+ throw new IOException("Insufficient data from the SAM client (" + sent + "/" + size + ")");
+ i2pSocketOS.write(buf, 0, read);
+ sent += read;
+ }
//i2pSocketOS.flush();
} catch (IOException e) {
_log.error("Error sending data through I2P socket", e);
return false;
+ } finally {
+ cache.release(ba);
}
return true;
diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
index 822fa5c72..eb3807cbf 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
@@ -44,6 +44,9 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
private SAMDatagramSession datagramSession = null;
private SAMStreamSession streamSession = null;
+ private long _id;
+ private static volatile long __id = 0;
+
/**
* Create a new SAM version 1 handler. This constructor expects
* that the SAM HELLO message has been still answered (and
@@ -68,6 +71,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
*/
public SAMv1Handler(Socket s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
super(s, verMajor, verMinor, i2cpProps);
+ _id = ++__id;
_log.debug("SAM version 1 handler instantiated");
if ((this.verMajor != 1) || (this.verMinor != 0)) {
@@ -82,7 +86,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
StringTokenizer tok;
Properties props;
- this.thread.setName("SAMv1Handler");
+ this.thread.setName("SAMv1Handler " + _id);
_log.debug("SAM handling started");
try {
@@ -550,12 +554,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
- DataInputStream in = new DataInputStream(getClientSocketInputStream());
- byte[] data = new byte[size];
-
- in.readFully(data);
-
- if (!streamSession.sendBytes(id, data)) {
+ if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) {
_log.error("STREAM SEND failed");
boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
streamSession.closeConnection(id);
@@ -801,7 +800,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
public void stopStreamReceiving() {
- _log.debug("stopStreamReceiving() invoked");
+ _log.debug("stopStreamReceiving() invoked", new Exception("stopped"));
if (streamSession == null) {
_log.error("BUG! Got stream receiving stop, but session is null!");
diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java
new file mode 100644
index 000000000..5388f5dc2
--- /dev/null
+++ b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java
@@ -0,0 +1,18 @@
+package net.i2p.sam.client;
+
+import java.util.Properties;
+
+/**
+ * Basic noop client event listener
+ */
+public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListener {
+ public void destReplyReceived(String publicKey, String privateKey) {}
+ public void helloReplyReceived(boolean ok) {}
+ public void namingReplyReceived(String name, String result, String value, String message) {}
+ public void sessionStatusReceived(String result, String destination, String message) {}
+ public void streamClosedReceived(String result, int id, String message) {}
+ public void streamConnectedReceived(String remoteDestination, int id) {}
+ public void streamDataReceived(int id, byte[] data, int offset, int length) {}
+ public void streamStatusReceived(String result, int id, String message) {}
+ public void unknownMessageReceived(String major, String minor, Properties params) {}
+}
diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java
new file mode 100644
index 000000000..9641057a0
--- /dev/null
+++ b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java
@@ -0,0 +1,127 @@
+package net.i2p.sam.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import net.i2p.I2PAppContext;
+import net.i2p.util.Log;
+
+/**
+ * Simple helper implementation of a the SAMClientEventListener
+ *
+ */
+public class SAMEventHandler extends SAMClientEventListenerImpl {
+ private I2PAppContext _context;
+ private Log _log;
+ private Boolean _helloOk;
+ private Object _helloLock = new Object();
+ private Boolean _sessionCreateOk;
+ private Object _sessionCreateLock = new Object();
+ private Object _namingReplyLock = new Object();
+ private Map _namingReplies = new HashMap();
+
+ public SAMEventHandler(I2PAppContext ctx) {
+ _context = ctx;
+ _log = ctx.logManager().getLog(getClass());
+ }
+
+ public void helloReplyReceived(boolean ok) {
+ synchronized (_helloLock) {
+ if (ok)
+ _helloOk = Boolean.TRUE;
+ else
+ _helloOk = Boolean.FALSE;
+ _helloLock.notifyAll();
+ }
+ }
+
+ public void sessionStatusReceived(String result, String destination, String msg) {
+ synchronized (_sessionCreateLock) {
+ if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
+ _sessionCreateOk = Boolean.TRUE;
+ else
+ _sessionCreateOk = Boolean.FALSE;
+ _sessionCreateLock.notifyAll();
+ }
+ }
+
+ public void namingReplyReceived(String name, String result, String value, String msg) {
+ synchronized (_namingReplyLock) {
+ if (SAMReader.SAMClientEventListener.NAMING_REPLY_OK.equals(result))
+ _namingReplies.put(name, value);
+ else
+ _namingReplies.put(name, result);
+ _namingReplyLock.notifyAll();
+ }
+ }
+
+ public void unknownMessageReceived(String major, String minor, Properties params) {
+ _log.error("wrt, [" + major + "] [" + minor + "] [" + params + "]");
+ }
+
+
+ //
+ // blocking lookup calls below
+ //
+
+ /**
+ * Wait for the connection to be established, returning true if everything
+ * went ok
+ */
+ public boolean waitForHelloReply() {
+ while (true) {
+ try {
+ synchronized (_helloLock) {
+ if (_helloOk == null)
+ _helloLock.wait();
+ else
+ return _helloOk.booleanValue();
+ }
+ } catch (InterruptedException ie) {}
+ }
+ }
+
+ /**
+ * Wait for the session to be created, returning true if everything went ok
+ *
+ */
+ public boolean waitForSessionCreateReply() {
+ while (true) {
+ try {
+ synchronized (_sessionCreateLock) {
+ if (_sessionCreateOk == null)
+ _sessionCreateLock.wait();
+ else
+ return _sessionCreateOk.booleanValue();
+ }
+ } catch (InterruptedException ie) {}
+ }
+ }
+
+ /**
+ * Return the destination found matching the name, or null if the key was
+ * not able to be retrieved.
+ *
+ * @param name name to be looked for, or "ME"
+ */
+ public String waitForNamingReply(String name) {
+ while (true) {
+ try {
+ synchronized (_namingReplyLock) {
+ String val = (String)_namingReplies.remove(name);
+ if (val == null) {
+ _namingReplyLock.wait();
+ } else {
+ if (SAMReader.SAMClientEventListener.NAMING_REPLY_INVALID_KEY.equals(val))
+ return null;
+ else if (SAMReader.SAMClientEventListener.NAMING_REPLY_KEY_NOT_FOUND.equals(val))
+ return null;
+ else
+ return val;
+ }
+ }
+ } catch (InterruptedException ie) {}
+ }
+ }
+}
diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java
new file mode 100644
index 000000000..7e87bf643
--- /dev/null
+++ b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java
@@ -0,0 +1,253 @@
+package net.i2p.sam.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.DataHelper;
+import net.i2p.util.Log;
+import net.i2p.util.I2PThread;
+
+/**
+ * Read from a socket, producing events for any SAM message read
+ *
+ */
+public class SAMReader {
+ private Log _log;
+ private InputStream _inRaw;
+ private SAMClientEventListener _listener;
+ private boolean _live;
+
+ public SAMReader(I2PAppContext context, InputStream samIn, SAMClientEventListener listener) {
+ _log = context.logManager().getLog(SAMReader.class);
+ _inRaw = samIn;
+ _listener = listener;
+ }
+
+ public void startReading() {
+ _live = true;
+ I2PThread t = new I2PThread(new Runner(), "SAM reader");
+ t.start();
+ }
+ public void stopReading() { _live = false; }
+
+ /**
+ * Async event notification interface for SAM clients
+ *
+ */
+ public interface SAMClientEventListener {
+ public static final String SESSION_STATUS_OK = "OK";
+ public static final String SESSION_STATUS_DUPLICATE_DEST = "DUPLICATE_DEST";
+ public static final String SESSION_STATUS_I2P_ERROR = "I2P_ERROR";
+ public static final String SESSION_STATUS_INVALID_KEY = "INVALID_KEY";
+
+ public static final String STREAM_STATUS_OK = "OK";
+ public static final String STREAM_STATUS_CANT_REACH_PEER = "CANT_REACH_PEER";
+ public static final String STREAM_STATUS_I2P_ERROR = "I2P_ERROR";
+ public static final String STREAM_STATUS_INVALID_KEY = "INVALID_KEY";
+ public static final String STREAM_STATUS_TIMEOUT = "TIMEOUT";
+
+ public static final String STREAM_CLOSED_OK = "OK";
+ public static final String STREAM_CLOSED_CANT_REACH_PEER = "CANT_REACH_PEER";
+ public static final String STREAM_CLOSED_I2P_ERROR = "I2P_ERROR";
+ public static final String STREAM_CLOSED_PEER_NOT_FOUND = "PEER_NOT_FOUND";
+ public static final String STREAM_CLOSED_TIMEOUT = "CLOSED";
+
+ public static final String NAMING_REPLY_OK = "OK";
+ public static final String NAMING_REPLY_INVALID_KEY = "INVALID_KEY";
+ public static final String NAMING_REPLY_KEY_NOT_FOUND = "KEY_NOT_FOUND";
+
+ public void helloReplyReceived(boolean ok);
+ public void sessionStatusReceived(String result, String destination, String message);
+ public void streamStatusReceived(String result, int id, String message);
+ public void streamConnectedReceived(String remoteDestination, int id);
+ public void streamClosedReceived(String result, int id, String message);
+ public void streamDataReceived(int id, byte data[], int offset, int length);
+ public void namingReplyReceived(String name, String result, String value, String message);
+ public void destReplyReceived(String publicKey, String privateKey);
+
+ public void unknownMessageReceived(String major, String minor, Properties params);
+ }
+
+ private class Runner implements Runnable {
+ public void run() {
+ Properties params = new Properties();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(80);
+ while (_live) {
+
+ try {
+ int c = -1;
+ while ((c = _inRaw.read()) != -1) {
+ if (c == '\n') {
+ break;
+ }
+ baos.write(c);
+ }
+ if (c == -1) {
+ _log.error("Error reading from the SAM bridge");
+ return;
+ }
+ } catch (IOException ioe) {
+ _log.error("Error reading from SAM", ioe);
+ }
+
+ String line = new String(baos.toByteArray());
+ baos.reset();
+
+ if (line == null) {
+ _log.info("No more data from the SAM bridge");
+ break;
+ }
+
+ _log.debug("Line read from the bridge: " + line);
+
+ StringTokenizer tok = new StringTokenizer(line);
+
+ if (tok.countTokens() < 2) {
+ _log.error("Invalid SAM line: [" + line + "]");
+ _live = false;
+ return;
+ }
+
+ String major = tok.nextToken();
+ String minor = tok.nextToken();
+
+ params.clear();
+ while (tok.hasMoreTokens()) {
+ String pair = tok.nextToken();
+ int eq = pair.indexOf('=');
+ if ( (eq > 0) && (eq < pair.length() - 1) ) {
+ String name = pair.substring(0, eq);
+ String val = pair.substring(eq+1);
+ while ( (val.charAt(0) == '\"') && (val.length() > 0) )
+ val = val.substring(1);
+ while ( (val.length() > 0) && (val.charAt(val.length()-1) == '\"') )
+ val = val.substring(0, val.length()-1);
+ params.setProperty(name, val);
+ }
+ }
+
+ processEvent(major, minor, params);
+ }
+ }
+ }
+
+ /**
+ * Big ugly method parsing everything. If I cared, I'd factor this out into
+ * a dozen tiny methods.
+ *
+ */
+ private void processEvent(String major, String minor, Properties params) {
+ if ("HELLO".equals(major)) {
+ if ("REPLY".equals(minor)) {
+ String result = params.getProperty("RESULT");
+ if ("OK".equals(result))
+ _listener.helloReplyReceived(true);
+ else
+ _listener.helloReplyReceived(false);
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("SESSION".equals(major)) {
+ if ("STATUS".equals(minor)) {
+ String result = params.getProperty("RESULT");
+ String dest = params.getProperty("DESTINATION");
+ String msg = params.getProperty("MESSAGE");
+ _listener.sessionStatusReceived(result, dest, msg);
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("STREAM".equals(major)) {
+ if ("STATUS".equals(minor)) {
+ String result = params.getProperty("RESULT");
+ String id = params.getProperty("ID");
+ String msg = params.getProperty("MESSAGE");
+ if (id != null) {
+ try {
+ _listener.streamStatusReceived(result, Integer.parseInt(id), msg);
+ } catch (NumberFormatException nfe) {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("CONNECTED".equals(minor)) {
+ String dest = params.getProperty("DESTINATION");
+ String id = params.getProperty("ID");
+ if (id != null) {
+ try {
+ _listener.streamConnectedReceived(dest, Integer.parseInt(id));
+ } catch (NumberFormatException nfe) {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("CLOSED".equals(minor)) {
+ String result = params.getProperty("RESULT");
+ String id = params.getProperty("ID");
+ String msg = params.getProperty("MESSAGE");
+ if (id != null) {
+ try {
+ _listener.streamClosedReceived(result, Integer.parseInt(id), msg);
+ } catch (NumberFormatException nfe) {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("RECEIVED".equals(minor)) {
+ String id = params.getProperty("ID");
+ String size = params.getProperty("SIZE");
+ if (id != null) {
+ try {
+ int idVal = Integer.parseInt(id);
+ int sizeVal = Integer.parseInt(size);
+
+ byte data[] = new byte[sizeVal];
+ int read = DataHelper.read(_inRaw, data);
+ if (read != sizeVal) {
+ _listener.unknownMessageReceived(major, minor, params);
+ } else {
+ _listener.streamDataReceived(idVal, data, 0, sizeVal);
+ }
+ } catch (NumberFormatException nfe) {
+ _listener.unknownMessageReceived(major, minor, params);
+ } catch (IOException ioe) {
+ _live = false;
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("NAMING".equals(major)) {
+ if ("REPLY".equals(minor)) {
+ String name = params.getProperty("NAME");
+ String result = params.getProperty("RESULT");
+ String value = params.getProperty("VALUE");
+ String msg = params.getProperty("MESSAGE");
+ _listener.namingReplyReceived(name, result, value, msg);
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else if ("DEST".equals(major)) {
+ if ("REPLY".equals(minor)) {
+ String pub = params.getProperty("PUB");
+ String priv = params.getProperty("PRIV");
+ _listener.destReplyReceived(pub, priv);
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ } else {
+ _listener.unknownMessageReceived(major, minor, params);
+ }
+ }
+}
diff --git a/apps/sam/java/test/net/i2p/sam/TestSwarm.java b/apps/sam/java/test/net/i2p/sam/TestSwarm.java
new file mode 100644
index 000000000..7561e5c73
--- /dev/null
+++ b/apps/sam/java/test/net/i2p/sam/TestSwarm.java
@@ -0,0 +1,312 @@
+package net.i2p.sam;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import java.util.Map;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.DataHelper;
+import net.i2p.util.Log;
+import net.i2p.util.I2PThread;
+
+import net.i2p.sam.client.SAMEventHandler;
+import net.i2p.sam.client.SAMClientEventListenerImpl;
+import net.i2p.sam.client.SAMReader;
+
+/**
+ * Sit around on a SAM destination, receiving lots of data and sending lots of
+ * data to whomever talks to us.
+ *
+ * Usage: TestSwarm samHost samPort myKeyFile [peerDestFile ]*
+ *
+ */
+public class TestSwarm {
+ private I2PAppContext _context;
+ private Log _log;
+ private String _samHost;
+ private String _samPort;
+ private String _destFile;
+ private String _peerDestFiles[];
+ private String _conOptions;
+ private Socket _samSocket;
+ private OutputStream _samOut;
+ private InputStream _samIn;
+ private SAMReader _reader;
+ private boolean _dead;
+ private SAMEventHandler _eventHandler;
+ /** Connection id (Integer) to peer (Flooder) */
+ private Map _remotePeers;
+
+ public static void main(String args[]) {
+ if (args.length < 3) {
+ System.err.println("Usage: TestSwarm samHost samPort myDestFile [peerDestFile ]*");
+ return;
+ }
+ I2PAppContext ctx = new I2PAppContext();
+ String files[] = new String[args.length - 3];
+ System.arraycopy(args, 3, files, 0, files.length);
+ TestSwarm swarm = new TestSwarm(ctx, args[0], args[1], args[2], files);
+ swarm.startup();
+ }
+
+ public TestSwarm(I2PAppContext ctx, String samHost, String samPort, String destFile, String peerDestFiles[]) {
+ _context = ctx;
+ _log = ctx.logManager().getLog(TestSwarm.class);
+ _dead = false;
+ _samHost = samHost;
+ _samPort = samPort;
+ _destFile = destFile;
+ _peerDestFiles = peerDestFiles;
+ _conOptions = "";
+ _eventHandler = new SwarmEventHandler(_context);
+ _remotePeers = new HashMap();
+ }
+
+ public void startup() {
+ _log.debug("Starting up");
+ boolean ok = connect();
+ _log.debug("Connected: " + ok);
+ if (ok) {
+ _reader = new SAMReader(_context, _samIn, _eventHandler);
+ _reader.startReading();
+ _log.debug("Reader created");
+ String ourDest = handshake();
+ _log.debug("Handshake complete. we are " + ourDest);
+ if (ourDest != null) {
+ boolean written = writeDest(ourDest);
+ _log.debug("Dest written");
+ if (written) {
+ connectWithPeers();
+ _log.debug("connected with peers");
+ }
+ }
+ }
+ }
+
+ private class SwarmEventHandler extends SAMEventHandler {
+ public SwarmEventHandler(I2PAppContext ctx) { super(ctx); }
+ public void streamClosedReceived(String result, int id, String message) {
+ Flooder flooder = null;
+ synchronized (_remotePeers) {
+ flooder = (Flooder)_remotePeers.remove(new Integer(id));
+ }
+ if (flooder != null) {
+ flooder.closed();
+ _log.debug("Connection " + flooder.getConnectionId() + " closed to " + flooder.getDestination());
+ } else {
+ _log.error("wtf, not connected to " + id + " but we were just closed?");
+ }
+ }
+ public void streamDataReceived(int id, byte data[], int offset, int length) {
+ Flooder flooder = null;
+ synchronized (_remotePeers) {
+ flooder = (Flooder)_remotePeers.get(new Integer(id));
+ }
+ long value = DataHelper.fromLong(data, 0, 4);
+ if (flooder != null) {
+ flooder.received(length, value);
+ } else {
+ _log.error("wtf, not connected to " + id + " but we received " + value + "?");
+ }
+ }
+ public void streamConnectedReceived(String dest, int id) {
+ _log.debug("Connection " + id + " received from " + dest);
+
+ Flooder flooder = new Flooder(id, dest);
+ synchronized (_remotePeers) {
+ _remotePeers.put(new Integer(id), flooder);
+ }
+ I2PThread t = new I2PThread(flooder, "Flood " + id);
+ t.start();
+ }
+
+ }
+
+ private boolean connect() {
+ try {
+ _samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
+ _samOut = _samSocket.getOutputStream();
+ _samIn = _samSocket.getInputStream();
+ return true;
+ } catch (Exception e) {
+ _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
+ return false;
+ }
+ }
+
+ private String handshake() {
+ synchronized (_samOut) {
+ try {
+ _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
+ _samOut.flush();
+ _log.debug("Hello sent");
+ boolean ok = _eventHandler.waitForHelloReply();
+ _log.debug("Hello reply found: " + ok);
+ if (!ok)
+ throw new IOException("wtf, hello failed?");
+ String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + _destFile + " " + _conOptions + "\n";
+ _samOut.write(req.getBytes());
+ _samOut.flush();
+ _log.debug("Session create sent");
+ ok = _eventHandler.waitForSessionCreateReply();
+ _log.debug("Session create reply found: " + ok);
+
+ req = "NAMING LOOKUP NAME=ME\n";
+ _samOut.write(req.getBytes());
+ _samOut.flush();
+ _log.debug("Naming lookup sent");
+ String destination = _eventHandler.waitForNamingReply("ME");
+ _log.debug("Naming lookup reply found: " + destination);
+ if (destination == null) {
+ _log.error("No naming lookup reply found!");
+ return null;
+ } else {
+ _log.info(_destFile + " is located at " + destination);
+ }
+ return destination;
+ } catch (Exception e) {
+ _log.error("Error handshaking", e);
+ return null;
+ }
+ }
+ }
+
+ private boolean writeDest(String dest) {
+ try {
+ FileOutputStream fos = new FileOutputStream(_destFile);
+ fos.write(dest.getBytes());
+ fos.close();
+ return true;
+ } catch (Exception e) {
+ _log.error("Error writing to " + _destFile, e);
+ return false;
+ }
+ }
+
+ private void connectWithPeers() {
+ if (_peerDestFiles != null) {
+ for (int i = 0; i < _peerDestFiles.length; i++) {
+ try {
+ FileInputStream fin = new FileInputStream(_peerDestFiles[i]);
+ byte dest[] = new byte[1024];
+ int read = DataHelper.read(fin, dest);
+
+ String remDest = new String(dest, 0, read);
+ int con = 0;
+ Flooder flooder = null;
+ synchronized (_remotePeers) {
+ con = _remotePeers.size() + 1;
+ flooder = new Flooder(con, remDest);
+ _remotePeers.put(new Integer(con), flooder);
+ }
+
+ byte msg[] = ("STREAM CONNECT ID=" + con + " DESTINATION=" + remDest + "\n").getBytes();
+ synchronized (_samOut) {
+ _samOut.write(msg);
+ _samOut.flush();
+ }
+ I2PThread flood = new I2PThread(flooder, "Flood " + con);
+ flood.start();
+ _log.debug("Starting flooder with peer from " + _peerDestFiles[i] + ": " + con);
+ } catch (IOException ioe) {
+ _log.error("Unable to read the peer from " + _peerDestFiles[i]);
+ }
+ }
+ }
+ }
+
+ private class Flooder implements Runnable {
+ private int _connectionId;
+ private String _remoteDestination;
+ private boolean _closed;
+ private long _started;
+ private long _totalSent;
+ private long _totalReceived;
+ private long _lastReceived;
+ private long _lastReceivedOn;
+ private boolean _outOfSync;
+
+ public Flooder(int conId, String remDest) {
+ _connectionId = conId;
+ _remoteDestination = remDest;
+ _closed = false;
+ _outOfSync = false;
+ _lastReceived = -1;
+ _lastReceivedOn = _context.clock().now();
+ _context.statManager().createRateStat("swarm." + conId + ".totalReceived", "Data size received", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + conId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + conId + ".started", "When we start", "swarm", new long[] { 5*60*1000 });
+ _context.statManager().createRateStat("swarm." + conId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
+ }
+
+ public int getConnectionId() { return _connectionId; }
+ public String getDestination() { return _remoteDestination; }
+
+ public void closed() {
+ _closed = true;
+ long lifetime = _context.clock().now() - _started;
+ _context.statManager().addRateData("swarm." + _connectionId + ".lifetime", lifetime, lifetime);
+ }
+ public void run() {
+ _started = _context.clock().now();
+ _context.statManager().addRateData("swarm." + _connectionId + ".started", 1, 0);
+ byte data[] = new byte[32*1024];
+ long value = 0;
+ long lastSend = _context.clock().now();
+ while (!_closed) {
+ byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + data.length + "\n").getBytes();
+ DataHelper.toLong(data, 0, 4, value);
+ try {
+ synchronized (_samOut) {
+ _samOut.write(msg);
+ _samOut.write(data);
+ _samOut.flush();
+ }
+ } catch (IOException ioe) {
+ _log.error("Error talking to SAM", ioe);
+ return;
+ }
+ _totalSent += data.length;
+ _context.statManager().addRateData("swarm." + _connectionId + ".totalSent", _totalSent, 0);
+ value++;
+ try { Thread.sleep(20); } catch (InterruptedException ie) {}
+ long now = _context.clock().now();
+ _log.debug("Sending " + value + " on " + _connectionId + " after " + (now-lastSend));
+ lastSend = now;
+ }
+ }
+ public void received(int len, long value) {
+ _totalReceived += len;
+ if ( (!_outOfSync) && (len % 32*1024 != 0) ) {
+ _outOfSync = true;
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Out of sync (len=" + len + " after " + (_totalReceived-len) + ")");
+ }
+ _context.statManager().addRateData("swarm." + getConnectionId() + ".totalReceived", _totalReceived, 0);
+ if (value != _lastReceived + 1) {
+ if (!_outOfSync)
+ _log.error("Received " + value + " when expecting " + (_lastReceived+1) + " on "
+ + _connectionId + " with " + _remoteDestination.substring(0,6));
+ else
+ _log.debug("(out of sync) Received " + value + " when expecting " + (_lastReceived+1) + " on "
+ + _connectionId + " with " + _remoteDestination.substring(0,6));
+ } else {
+ _log.debug("Received " + value + " on " + _connectionId + " after " + (_context.clock().now()-_lastReceivedOn)
+ + "ms with " + _remoteDestination.substring(0,6));
+ }
+ _lastReceived = value;
+ _lastReceivedOn = _context.clock().now();
+ }
+ }
+}
\ No newline at end of file
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index b6558f252..850b530ae 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -112,6 +112,8 @@ public class Connection {
_connectLock = new Object();
_activeResends = 0;
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
public long getNextOutboundPacketNum() {
@@ -135,9 +137,14 @@ public class Connection {
boolean packetSendChoke(long timeoutMs) {
if (false) return true;
long writeExpire = timeoutMs;
+ long start = _context.clock().now();
+ boolean started = false;
while (true) {
long timeLeft = writeExpire - _context.clock().now();
synchronized (_outboundPackets) {
+ if (!started)
+ _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size(), timeoutMs);
+ started = true;
if (_outboundPackets.size() >= _options.getWindowSize()) {
if (writeExpire > 0) {
if (timeLeft <= 0) {
@@ -154,6 +161,7 @@ public class Connection {
try { _outboundPackets.wait(); } catch (InterruptedException ie) {}
}
} else {
+ _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start);
return true;
}
}
@@ -325,14 +333,19 @@ public class Connection {
_occurredEventCount++;
} else {
_occurredTime = now;
- if (_occurredEventCount > 100) {
- _log.log(Log.CRIT, "More than 100 events (" + _occurredEventCount + ") in a second on "
- + toString() + ": scheduler = " + sched);
+ if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) {
+ _log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on "
+ + toString() + ": scheduler = " + sched);
}
_occurredEventCount = 0;
}
+
+ long before = System.currentTimeMillis();
sched.eventOccurred(this);
+ long elapsed = System.currentTimeMillis() - before;
+ if ( (elapsed > 1000) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("Took " + elapsed + "ms to pump through " + sched);
}
void resetReceived() {
@@ -714,6 +727,7 @@ public class Connection {
public ResendPacketEvent(PacketLocal packet) {
_packet = packet;
_currentIsActiveResend = false;
+ packet.setResendPacketEvent(ResendPacketEvent.this);
}
public void timeReached() {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
index d8a0f7681..4acb1049a 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
@@ -64,7 +64,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (doSend) {
PacketLocal packet = send(buf, off, size);
//dont wait for non-acks
- if ( (packet.getPayloadSize() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
+ if ( (packet.getSequenceNum() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
return packet;
else
return _dummyStatus;
@@ -95,8 +95,16 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* @return the packet sent
*/
public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) {
+ long before = System.currentTimeMillis();
PacketLocal packet = buildPacket(buf, off, size, forceIncrement);
+ long built = System.currentTimeMillis();
_connection.sendPacket(packet);
+ long sent = System.currentTimeMillis();
+
+ if ( (built-before > 1000) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet);
+ if ( (sent-built> 1000) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet);
return packet;
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index fd928b691..c7acaac5b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -87,7 +87,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
- setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND));
+ setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE);
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index fdf5cc579..ecbfc95b3 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -30,12 +30,22 @@ public class ConnectionPacketHandler {
/** distribute a packet to the connection specified */
void receivePacket(Packet packet, Connection con) throws I2PException {
boolean ok = verifyPacket(packet, con);
- if (!ok) return;
+ if (!ok) {
+ if ( (!packet.isFlagSet(Packet.FLAG_RESET)) && (_log.shouldLog(Log.ERROR)) )
+ _log.error("Packet does NOT verify: " + packet);
+ return;
+ }
con.packetReceived();
- if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) {
+ long ready = con.getInputStream().getHighestReadyBockId();
+ int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize();
+ int allowedBlocks = available/con.getOptions().getMaxMessageSize();
+ if (packet.getSequenceNum() > ready + allowedBlocks) {
if (_log.shouldLog(Log.WARN))
- _log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet);
+ _log.warn("Inbound buffer exceeded on connection " + con + " ("
+ + ready + "/"+ (ready+allowedBlocks) + "/" + available
+ + ": dropping " + packet);
+ ack(con, packet.getAckThrough(), packet.getNacks(), null, false);
con.getOptions().setChoke(5*1000);
return;
}
@@ -95,8 +105,20 @@ public class ConnectionPacketHandler {
}
}
+ boolean fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew);
+ con.eventOccurred();
+ if (fastAck) {
+ if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Fast ack for dup " + packet);
+ con.ackImmediately();
+ }
+ }
+ }
+
+ private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew) {
int numResends = 0;
- List acked = con.ackPackets(packet.getAckThrough(), packet.getNacks());
+ List acked = con.ackPackets(ackThrough, nacks);
if ( (acked != null) && (acked.size() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(acked.size() + " of our packets acked with " + packet);
@@ -130,18 +152,15 @@ public class ConnectionPacketHandler {
_context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT);
}
- boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0));
- con.eventOccurred();
- if (fastAck) {
- if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Fast ack for dup " + packet);
- con.ackImmediately();
- }
- }
+ if (packet != null)
+ return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0));
+ else
+ return adjustWindow(con, false, -1, numResends, (acked != null ? acked.size() : 0));
}
+
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked) {
+ boolean congested = false;
if ( (!isNew) && (sequenceNum > 0) ) {
// dup real packet
int oldSize = con.getOptions().getWindowSize();
@@ -156,64 +175,38 @@ public class ConnectionPacketHandler {
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con);
- return true;
- //} else if (numResends > 0) {
- // window sizes are shrunk on resend, not on ack
- } else {
- if (acked > 0) {
- long lowest = con.getHighestAckedThrough();
- if (lowest >= con.getCongestionWindowEnd()) {
- // new packet that ack'ed uncongested data, or an empty ack
- int newWindowSize = con.getOptions().getWindowSize();
-
- if (numResends <= 0) {
- if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
- // congestion avoidance
+ congested = true;
+ }
+
+ long lowest = con.getHighestAckedThrough();
+ if (lowest >= con.getCongestionWindowEnd()) {
+ // new packet that ack'ed uncongested data, or an empty ack
+ int newWindowSize = con.getOptions().getWindowSize();
- // we can't use newWindowSize += 1/newWindowSize, since we're
- // integers, so lets use a random distribution instead
- int shouldIncrement = _context.random().nextInt(newWindowSize);
- if (shouldIncrement <= 0)
- newWindowSize += 1;
- } else {
- // slow start
- newWindowSize += 1;
- }
- }
-
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("New window size " + newWindowSize + " congestionSeenAt: "
- + con.getLastCongestionSeenAt() + " (#resends: " + numResends
- + ") for " + con);
- con.getOptions().setWindowSize(newWindowSize);
- con.setCongestionWindowEnd(newWindowSize + lowest);
+ if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
+ if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
+ // congestion avoidance
+
+ // we can't use newWindowSize += 1/newWindowSize, since we're
+ // integers, so lets use a random distribution instead
+ int shouldIncrement = _context.random().nextInt(newWindowSize);
+ if (shouldIncrement <= 0)
+ newWindowSize += 1;
+ } else {
+ // slow start
+ newWindowSize += 1;
}
- } else {
- // received a message that doesn't contain a new ack
-
- // ehh. cant do this, as we SACK and the acks may be
- // received out of order:
- // Alice: RECEIVE 2
- // Alice: SEND ack 2 nack 1
- // Alice: RECEIVE 1
- // Alice: SEND ack 2
- // Bob: RECEIVE ack 2
- // Bob: RECEIVE ack 2 nack 1 <-- NOT bad
-
- /*
- if (con.getUnackedPacketsSent() > 0) {
- // peer got a dup
- int oldSize = con.getOptions().getWindowSize();
- oldSize >>>= 1;
- if (oldSize <= 0)
- oldSize = 1;
- con.getOptions().setWindowSize(oldSize);
- return false;
- }
- */
}
+
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("New window size " + newWindowSize + " congestionSeenAt: "
+ + con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ + ") for " + con);
+ con.getOptions().setWindowSize(newWindowSize);
+ con.setCongestionWindowEnd(newWindowSize + lowest);
}
- return false;
+
+ return congested;
}
/**
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
index 0ff68c5a5..618916208 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
@@ -33,9 +33,9 @@ public class MessageInputStream extends InputStream {
private List _readyDataBlocks;
private int _readyDataBlockIndex;
/** highest message ID used in the readyDataBlocks */
- private long _highestReadyBlockId;
+ private volatile long _highestReadyBlockId;
/** highest overall message ID */
- private long _highestBlockId;
+ private volatile long _highestBlockId;
/**
* Message ID (Long) to ByteArray for blocks received
* out of order when there are lower IDs not yet
@@ -74,15 +74,13 @@ public class MessageInputStream extends InputStream {
/** What is the highest block ID we've completely received through? */
public long getHighestReadyBockId() {
- synchronized (_dataLock) {
- return _highestReadyBlockId;
- }
+ // not synchronized as it doesnt hurt to read a too-low value
+ return _highestReadyBlockId;
}
public long getHighestBlockId() {
- synchronized (_dataLock) {
- return _highestBlockId;
- }
+ // not synchronized as it doesnt hurt to read a too-low value
+ return _highestBlockId;
}
/**
@@ -394,6 +392,21 @@ public class MessageInputStream extends InputStream {
}
}
+ public int getTotalReadySize() {
+ synchronized (_dataLock) {
+ if (_locallyClosed) return 0;
+ int numBytes = 0;
+ for (int i = 0; i < _readyDataBlocks.size(); i++) {
+ ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
+ if (i == 0)
+ numBytes += cur.getData().length - _readyDataBlockIndex;
+ else
+ numBytes += cur.getData().length;
+ }
+ return numBytes;
+ }
+ }
+
public void close() {
synchronized (_dataLock) {
_readyDataBlocks.clear();
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
index 694c7b5bb..97268a0ed 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
@@ -47,7 +47,7 @@ public class MessageOutputStream extends OutputStream {
_written = 0;
_closed = false;
_writeTimeout = -1;
- _passiveFlushDelay = 5*1000;
+ _passiveFlushDelay = 2*1000;
_flusher = new Flusher();
if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created");
@@ -83,8 +83,7 @@ public class MessageOutputStream extends OutputStream {
remaining = 0;
_lastBuffered = _context.clock().now();
if (_passiveFlushDelay > 0) {
- // if it is already enqueued, this just pushes it further out
- SimpleTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
+ _flusher.enqueue();
}
} else {
// buffer whatever we can fit then flush,
@@ -115,9 +114,9 @@ public class MessageOutputStream extends OutputStream {
ws.waitForAccept(_writeTimeout);
if (!ws.writeAccepted()) {
if (_writeTimeout > 0)
- throw new InterruptedIOException("Write not accepted within timeout");
+ throw new InterruptedIOException("Write not accepted within timeout: " + ws);
else
- throw new IOException("Write not accepted into the queue");
+ throw new IOException("Write not accepted into the queue: " + ws);
}
} else {
if (_log.shouldLog(Log.DEBUG))
@@ -140,7 +139,24 @@ public class MessageOutputStream extends OutputStream {
* period of inactivity
*/
private class Flusher implements SimpleTimer.TimedEvent {
+ private boolean _enqueued;
+ public void enqueue() {
+ // no need to be overly worried about duplicates - it would just
+ // push it further out
+ if (!_enqueued)
+ SimpleTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
+ _enqueued = true;
+ }
public void timeReached() {
+ _enqueued = false;
+ long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now());
+ if (timeLeft > 0)
+ enqueue();
+ else
+ doFlush();
+ }
+
+ private void doFlush() {
boolean sent = false;
WriteStatus ws = null;
synchronized (_dataLock) {
@@ -159,7 +175,6 @@ public class MessageOutputStream extends OutputStream {
if (sent && _log.shouldLog(Log.DEBUG))
_log.debug("Passive flush of " + ws);
}
-
}
/**
@@ -275,6 +290,7 @@ public class MessageOutputStream extends OutputStream {
}
void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
WriteStatus ws = null;
+ long before = System.currentTimeMillis();
synchronized (_dataLock) {
// _buf may be null, but the data receiver can handle that just fine,
// deciding whether or not to send a packet
@@ -284,6 +300,10 @@ public class MessageOutputStream extends OutputStream {
_dataLock.notifyAll();
_lastFlushed = _context.clock().now();
}
+ long afterBuild = System.currentTimeMillis();
+ if ( (afterBuild - before > 1000) && (_log.shouldLog(Log.DEBUG)) )
+ _log.debug("Took " + (afterBuild-before) + "ms to build a packet? " + ws);
+
if (blocking && ws != null) {
ws.waitForAccept(_writeTimeout);
if (ws.writeFailed())
@@ -291,6 +311,9 @@ public class MessageOutputStream extends OutputStream {
else if (!ws.writeAccepted())
throw new InterruptedIOException("Flush available timed out");
}
+ long afterAccept = System.currentTimeMillis();
+ if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.DEBUG)) )
+ _log.debug("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws);
return;
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
index 08ad87da4..fae7cd0e3 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
@@ -104,7 +104,7 @@ public class PacketHandler {
}
}
- private static final SimpleDateFormat _fmt = new SimpleDateFormat("hh:mm:ss.SSS");
+ private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
void displayPacket(Packet packet, String prefix) {
String msg = null;
synchronized (_fmt) {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java
index 27c093f47..aeac85cbd 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java
@@ -5,6 +5,8 @@ import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
+import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer;
/**
* coordinate local attributes about a packet - send time, ack time, number of
@@ -12,6 +14,7 @@ import net.i2p.data.SessionKey;
*/
public class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private I2PAppContext _context;
+ private Log _log;
private Connection _connection;
private Destination _to;
private SessionKey _keyUsed;
@@ -22,6 +25,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
private long _acceptedOn;
private long _ackOn;
private long _cancelledOn;
+ private SimpleTimer.TimedEvent _resendEvent;
public PacketLocal(I2PAppContext ctx, Destination to) {
this(ctx, to, null);
@@ -29,6 +33,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public PacketLocal(I2PAppContext ctx, Destination to, Connection con) {
_context = ctx;
_createdOn = ctx.clock().now();
+ _log = ctx.logManager().getLog(PacketLocal.class);
_to = to;
_connection = con;
_lastSend = -1;
@@ -78,12 +83,16 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
_ackOn = _context.clock().now();
notifyAll();
}
+ SimpleTimer.getInstance().removeEvent(_resendEvent);
}
public void cancelled() {
synchronized (this) {
_cancelledOn = _context.clock().now();
notifyAll();
}
+ SimpleTimer.getInstance().removeEvent(_resendEvent);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Cancelled! " + toString(), new Exception("cancelled"));
}
/** how long after packet creation was it acked? */
@@ -97,6 +106,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public long getLastSend() { return _lastSend; }
public Connection getConnection() { return _connection; }
+ public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; }
+
public String toString() {
String str = super.toString();
if (_ackOn > 0)
@@ -108,12 +119,24 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public void waitForAccept(int maxWaitMs) {
if (_connection == null)
throw new IllegalStateException("Cannot wait for accept with no connection");
- long expiration = _context.clock().now()+maxWaitMs;
+ long before = _context.clock().now();
+ long expiration = before+maxWaitMs;
+ int queued = _connection.getUnackedPacketsSent();
+ int window = _connection.getOptions().getWindowSize();
boolean accepted = _connection.packetSendChoke(maxWaitMs);
+ long after = _context.clock().now();
if (accepted)
- _acceptedOn = _context.clock().now();
+ _acceptedOn = after;
else
_acceptedOn = -1;
+ int afterQueued = _connection.getUnackedPacketsSent();
+ if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) )
+ _log.debug("Took " + (after-before) + "ms to get "
+ + (accepted ? " accepted" : " rejected")
+ + (_cancelledOn > 0 ? " and CANCELLED" : "")
+ + ", queued behind " + queued +" with a window size of " + window
+ + ", finally accepted with " + afterQueued + " queued: "
+ + toString());
}
public void waitForCompletion(int maxWaitMs) {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
index fceb25e42..5947524ed 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
@@ -24,24 +24,17 @@ class PacketQueue {
private Log _log;
private I2PSession _session;
private ConnectionManager _connectionManager;
- private byte _buf[];
private ByteCache _cache = ByteCache.getInstance(64, 36*1024);
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context;
_session = session;
_connectionManager = mgr;
- _buf = _cache.acquire().getData(); // new byte[36*1024];
_log = context.logManager().getLog(PacketQueue.class);
_context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
- protected void finalize() throws Throwable {
- _cache.release(new ByteArray(_buf));
- super.finalize();
- }
-
/**
* Add a new packet to be sent out ASAP
*/
@@ -53,7 +46,7 @@ class PacketQueue {
keyUsed = new SessionKey();
Set tagsSent = packet.getTagsSent();
if (tagsSent == null)
- tagsSent = new HashSet();
+ tagsSent = new HashSet(0);
// cache this from before sendMessage
String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : "");
@@ -63,29 +56,36 @@ class PacketQueue {
} else {
_log.debug("Sending... " + packet);
}
+
+ ByteArray ba = _cache.acquire();
+ byte buf[] = ba.getData();
long begin = 0;
long end = 0;
boolean sent = false;
try {
int size = 0;
- synchronized (this) {
- Arrays.fill(_buf, (byte)0x0);
- if (packet.shouldSign())
- size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey());
- else
- size = packet.writePacket(_buf, 0);
+ long beforeWrite = System.currentTimeMillis();
+ if (packet.shouldSign())
+ size = packet.writeSignedPacket(buf, 0, _context, _session.getPrivateKey());
+ else
+ size = packet.writePacket(buf, 0);
+ long writeTime = System.currentTimeMillis() - beforeWrite;
+ if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("took " + writeTime + "ms to write the packet: " + packet);
- // this should not block!
- begin = _context.clock().now();
- sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
- end = _context.clock().now();
- }
+ // this should not block!
+ begin = _context.clock().now();
+ sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent);
+ end = _context.clock().now();
+
+ if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("Took " + (end-begin) + "ms to sendMessage(...) " + packet);
+
_context.statManager().addRateData("stream.con.sendMessageSize", size, packet.getLifetime());
if (packet.getNumSends() > 1)
_context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime());
-
Connection con = packet.getConnection();
if (con != null) {
con.incrementBytesSent(size);
@@ -97,6 +97,8 @@ class PacketQueue {
_log.warn("Unable to send the packet " + packet, ise);
}
+ _cache.release(ba);
+
if (!sent) {
if (_log.shouldLog(Log.WARN))
_log.warn("Send failed for " + packet);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java
index 617ca9ab5..f907bc173 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java
@@ -32,5 +32,6 @@ abstract class SchedulerImpl implements TaskScheduler {
// _log.debug("firing event on " + _connection, _addedBy);
_connection.eventOccurred();
}
+ public String toString() { return "event on " + _connection; }
}
}
diff --git a/build.xml b/build.xml
index 735481180..81cce7c46 100644
--- a/build.xml
+++ b/build.xml
@@ -239,8 +239,6 @@
-
-
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java
index 4c7466352..27494d8f7 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl2.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java
@@ -108,6 +108,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
throws I2PSessionException {
+ long begin = _context.clock().now();
+
SessionKey key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey());
if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
@@ -180,9 +182,17 @@ class I2PSessionImpl2 extends I2PSessionImpl {
+ " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
+
+ // since this is 'best effort', all we're waiting for is a status update
+ // saying that the router received it - in theory, that should come back
+ // immediately, but in practice can take up to a second (though usually
+ // much quicker). setting this to false will short-circuit that delay
+ boolean actuallyWait = true;
+
long beforeWaitFor = _context.clock().now();
- state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
- _context.clock().now() + getTimeout());
+ if (actuallyWait)
+ state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
+ _context.clock().now() + getTimeout());
long afterWaitFor = _context.clock().now();
long inRemovingSync = 0;
synchronized (_sendingStates) {
@@ -190,7 +200,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_sendingStates.remove(state);
}
long afterRemovingSync = _context.clock().now();
- boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
+ boolean found = !actuallyWait || state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
+ " / " + state.getNonce() + " found = " + found);
@@ -200,6 +210,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
}
+ if ( (afterRemovingSync - begin > 500) && (_log.shouldLog(Log.WARN) ) ) {
+ _log.warn("Took " + (afterRemovingSync-begin) + "ms to sendBestEffort, "
+ + (afterSendingSync-begin) + "ms to prepare, "
+ + (beforeWaitFor-afterSendingSync) + "ms to send, "
+ + (afterRemovingSync-beforeWaitFor) + "ms waiting for reply");
+ }
+
if (found) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "
diff --git a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java
index b217b1816..4902e9d40 100644
--- a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java
+++ b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java
@@ -16,6 +16,7 @@ import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.ReceiveMessageEndMessage;
+import net.i2p.util.Log;
/**
* Handle I2CP MessagePayloadMessages from the router delivering the contents
@@ -30,7 +31,8 @@ class MessagePayloadMessageHandler extends HandlerImpl {
}
public void handleMessage(I2CPMessage message, I2PSessionImpl session) {
- _log.debug("Handle message " + message);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Handle message " + message);
try {
MessagePayloadMessage msg = (MessagePayloadMessage) message;
MessageId id = msg.getMessageId();
@@ -55,9 +57,8 @@ class MessagePayloadMessageHandler extends HandlerImpl {
Payload payload = msg.getPayload();
byte[] data = _context.elGamalAESEngine().decrypt(payload.getEncryptedData(), session.getDecryptionKey());
if (data == null) {
- _log
- .error("Error decrypting the payload to public key "
- + session.getMyDestination().getPublicKey().toBase64() + "\nPayload: " + payload.calculateHash());
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Error decrypting the payload");
throw new DataFormatException("Unable to decrypt the payload");
}
payload.setUnencryptedData(data);
diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java
index dc7e92e37..d226004aa 100644
--- a/core/java/src/net/i2p/data/DataHelper.java
+++ b/core/java/src/net/i2p/data/DataHelper.java
@@ -735,7 +735,7 @@ public class DataHelper {
/** decompress the GZIP compressed data (returning null on error) */
public static byte[] decompress(byte orig[]) throws IOException {
- return decompress(orig, 0, orig.length);
+ return (orig != null ? decompress(orig, 0, orig.length) : null);
}
public static byte[] decompress(byte orig[], int offset, int length) throws IOException {
if ((orig == null) || (orig.length <= 0)) return orig;
diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java
index 0ce17b02e..7f71db6b4 100644
--- a/core/java/src/net/i2p/util/SimpleTimer.java
+++ b/core/java/src/net/i2p/util/SimpleTimer.java
@@ -18,20 +18,30 @@ import net.i2p.I2PAppContext;
public class SimpleTimer {
private static final SimpleTimer _instance = new SimpleTimer();
public static SimpleTimer getInstance() { return _instance; }
+ private I2PAppContext _context;
private Log _log;
/** event time (Long) to event (TimedEvent) mapping */
private TreeMap _events;
/** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes;
+ private List _readyEvents;
private SimpleTimer() {
- _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
+ _context = I2PAppContext.getGlobalContext();
+ _log = _context.logManager().getLog(SimpleTimer.class);
_events = new TreeMap();
_eventTimes = new HashMap();
+ _readyEvents = new ArrayList(4);
I2PThread runner = new I2PThread(new SimpleTimerRunner());
runner.setName("SimpleTimer");
runner.setDaemon(true);
runner.start();
+ for (int i = 0; i < 3; i++) {
+ I2PThread executor = new I2PThread(new Executor());
+ executor.setName("SimpleTimerExecutor " + i);
+ executor.setDaemon(true);
+ executor.start();
+ }
}
/**
@@ -40,18 +50,42 @@ public class SimpleTimer {
*/
public void addEvent(TimedEvent event, long timeoutMs) {
long eventTime = System.currentTimeMillis() + timeoutMs;
+ Long time = new Long(eventTime);
synchronized (_events) {
// remove the old scheduled position, then reinsert it
if (_eventTimes.containsKey(event))
_events.remove(_eventTimes.get(event));
- while (_events.containsKey(new Long(eventTime)))
- eventTime++;
- _events.put(new Long(eventTime), event);
- _eventTimes.put(event, new Long(eventTime));
+ while (_events.containsKey(time))
+ time = new Long(time.longValue() + 1);
+ _events.put(time, event);
+ _eventTimes.put(event, time);
+
+ if ( (_events.size() != _eventTimes.size()) ) {
+ _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
+ for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
+ TimedEvent evt = (TimedEvent)iter.next();
+ Long when = (Long)_eventTimes.get(evt);
+ TimedEvent cur = (TimedEvent)_events.get(when);
+ if (cur != evt) {
+ _log.error("event " + evt + " @ " + when + ": " + cur);
+ }
+ }
+ }
+
_events.notifyAll();
}
}
+ public boolean removeEvent(TimedEvent evt) {
+ if (evt == null) return false;
+ synchronized (_events) {
+ Long when = (Long)_eventTimes.remove(evt);
+ if (when != null)
+ _events.remove(when);
+ return null != when;
+ }
+ }
+
/**
* Simple interface for events to be queued up and notified on expiration
*/
@@ -82,8 +116,8 @@ public class SimpleTimer {
while (true) {
try {
synchronized (_events) {
- if (_events.size() <= 0)
- _events.wait();
+ //if (_events.size() <= 0)
+ // _events.wait();
//if (_events.size() > 100)
// _log.warn("> 100 events! " + _events.values());
long now = System.currentTimeMillis();
@@ -97,7 +131,7 @@ public class SimpleTimer {
if (evt != null) {
_eventTimes.remove(evt);
eventsToFire.add(evt);
- }
+ }
} else {
nextEventDelay = when.longValue() - now;
nextEvent = _events.get(when);
@@ -128,32 +162,20 @@ public class SimpleTimer {
long now = System.currentTimeMillis();
now = now - (now % 1000);
- for (int i = 0; i < eventsToFire.size(); i++) {
- TimedEvent evt = (TimedEvent)eventsToFire.get(i);
- try {
- evt.timeReached();
- } catch (Throwable t) {
- log("wtf, event borked: " + evt, t);
- }
- _recentEvents[4] = _recentEvents[3];
- _recentEvents[3] = _recentEvents[2];
- _recentEvents[2] = _recentEvents[1];
- _recentEvents[1] = _recentEvents[0];
- _recentEvents[0] = evt;
+ synchronized (_readyEvents) {
+ for (int i = 0; i < eventsToFire.size(); i++)
+ _readyEvents.add(eventsToFire.get(i));
+ _readyEvents.notifyAll();
}
if (_occurredTime == now) {
_occurredEventCount += eventsToFire.size();
} else {
_occurredTime = now;
- if (_occurredEventCount > 100) {
- StringBuffer buf = new StringBuffer(256);
+ if (_occurredEventCount > 1000) {
+ StringBuffer buf = new StringBuffer(128);
buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
- buf.append(") in a second! Last 5: \n");
- for (int i = 0; i < _recentEvents.length; i++) {
- if (_recentEvents[i] != null)
- buf.append(_recentEvents[i]).append('\n');
- }
+ buf.append(") in a second!");
_log.log(Log.CRIT, buf.toString());
}
_occurredEventCount = 0;
@@ -163,4 +185,30 @@ public class SimpleTimer {
}
}
}
+
+ private class Executor implements Runnable {
+ public void run() {
+ while (true) {
+ TimedEvent evt = null;
+ synchronized (_readyEvents) {
+ if (_readyEvents.size() <= 0)
+ try { _readyEvents.wait(); } catch (InterruptedException ie) {}
+ if (_readyEvents.size() > 0)
+ evt = (TimedEvent)_readyEvents.remove(0);
+ }
+
+ if (evt != null) {
+ long before = _context.clock().now();
+ try {
+ evt.timeReached();
+ } catch (Throwable t) {
+ log("wtf, event borked: " + evt, t);
+ }
+ long time = _context.clock().now() - before;
+ if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("wtf, event execution took " + time + ": " + evt);
+ }
+ }
+ }
+ }
}
diff --git a/history.txt b/history.txt
index 05d19cdc3..ae325f69b 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,33 @@
-$Id: history.txt,v 1.92 2004/12/01 19:27:27 jrandom Exp $
+$Id: history.txt,v 1.93 2004/12/01 22:20:04 jrandom Exp $
+
+2004-12-03 jrandom
+ * Toss in a small pool of threads (3) to execute the events queued up with
+ the SimpleTimer, as we do currently see the occational event
+ notification spiking up to a second or so.
+ * Implement a SAM client API in java, useful for event based streaming (or
+ for testing the SAM bridge)
+ * Added support to shut down the SAM bridge on OOM (useful if the SAM
+ bridge is being run outside of the router).
+ * Include the SAM test code in the sam.jar
+ * Remove an irrelevent warning message from SAM, which was caused by
+ perfectly normal operation due to a session being closed.
+ * Removed some unnecessary synchronization in the streaming lib's
+ PacketQueue
+ * More quickly clean up the memory used by the streaming lib by
+ immediately killing each packet's resend job as soon as it is ACKed (or
+ cancelled), so that there are no longer any valid pointers to the
+ (potentially 32KB) packet.
+ * Fixed the timestamps dumped to stdout when debugging the PacketHandler.
+ * Drop packets that would expand our inbound window beyond our maximum
+ buffer size (default 32 messages)
+ * Always read the ACK/NACK data from the verified packets received, even
+ if we are going to drop them
+ * Always adjust the window when there are messages ACKed, though do not
+ change its size except as before.
+ * Streamlined some synchronization in the router's I2CP handling
+ * Streamlined some memory allocation in the SAM bridge
+ * Default the streaming lib to disconnect on inactivity, rather than send
+ an empty message.
2004-12-01 jrandom
* Fix for a race in the streaming lib as caused by some odd SAM activity
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index b1ad56031..b366e74ed 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.97 $ $Date: 2004/12/01 19:27:27 $";
+ public final static String ID = "$Revision: 1.98 $ $Date: 2004/12/01 22:20:03 $";
public final static String VERSION = "0.4.2.2";
- public final static long BUILD = 1;
+ public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java
index b8bd8c6e8..57304bbeb 100644
--- a/router/java/src/net/i2p/router/client/ClientWriterRunner.java
+++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java
@@ -25,8 +25,6 @@ class ClientWriterRunner implements Runnable {
private static final long MAX_WAIT = 5*1000;
- /** notify this lock when there are messages to write */
- private Object _activityLock = new Object();
/** lock on this when updating the class level data structs */
private Object _dataLock = new Object();
@@ -47,9 +45,7 @@ class ClientWriterRunner implements Runnable {
synchronized (_dataLock) {
_messagesToWrite.add(msg);
_messagesToWriteTimes.add(new Long(_context.clock().now()));
- }
- synchronized (_activityLock) {
- _activityLock.notifyAll();
+ _dataLock.notifyAll();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("["+_id+"] addMessage completed for " + msg.getClass().getName());
@@ -60,8 +56,8 @@ class ClientWriterRunner implements Runnable {
*
*/
public void stopWriting() {
- synchronized (_activityLock) {
- _activityLock.notifyAll();
+ synchronized (_dataLock) {
+ _dataLock.notifyAll();
}
}
public void run() {
@@ -70,6 +66,9 @@ class ClientWriterRunner implements Runnable {
List messageTimes = null;
synchronized (_dataLock) {
+ if (_messagesToWrite.size() <= 0)
+ try { _dataLock.wait(); } catch (InterruptedException ie) {}
+
if (_messagesToWrite.size() > 0) {
messages = new ArrayList(_messagesToWrite.size());
messageTimes = new ArrayList(_messagesToWriteTimes.size());
@@ -80,16 +79,7 @@ class ClientWriterRunner implements Runnable {
}
}
- if (messages == null) {
- try {
- synchronized (_activityLock) {
- _activityLock.wait();
- }
- } catch (InterruptedException ie) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Interrupted while waiting for activity", ie);
- }
- } else {
+ if (messages != null) {
for (int i = 0; i < messages.size(); i++) {
I2CPMessage msg = (I2CPMessage)messages.get(i);
Long when = (Long)messageTimes.get(i);
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
index b12bf9588..63dfd1c47 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
@@ -769,8 +769,8 @@ public class TCPTransport extends TransportImpl {
/** Make this stuff pretty (only used in the old console) */
public String renderStatusHTML() {
StringBuffer buf = new StringBuffer(1024);
- buf.append("Connections:
\n");
synchronized (_connectionLock) {
+ buf.append("Connections (").append(_connectionsByIdent.size()).append("):\n");
for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) {
TCPConnection con = (TCPConnection)iter.next();
buf.append("- ");