diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java
index 014d91e9b..40566fe41 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java
@@ -12,6 +12,7 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Properties;
@@ -219,7 +220,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting", ce);
// not killing the server..
- }
+ } catch(SocketTimeoutException ste) {
+ // ignored, we never set the timeout
+ }
}
}
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java
index 726d462ce..7c9927395 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java
@@ -2,6 +2,7 @@ package net.i2p.client.streaming;
import java.net.ConnectException;
+import java.net.SocketTimeoutException;
import net.i2p.I2PException;
/**
@@ -9,26 +10,40 @@ import net.i2p.I2PException;
*
*/
public interface I2PServerSocket {
- /**
- * Closes the socket.
- */
- public void close() throws I2PException;
- /**
- * Waits for the next socket connecting. If a remote user tried to make a
- * connection and the local application wasn't .accept()ing new connections,
- * they should get refused (if .accept() doesnt occur in some small period)
- *
- * @return a connected I2PSocket
- *
- * @throws I2PException if there is a problem with reading a new socket
- * from the data available (aka the I2PSession closed, etc)
- * @throws ConnectException if the I2PServerSocket is closed
- */
- public I2PSocket accept() throws I2PException, ConnectException;
+ /**
+ * Closes the socket.
+ */
+ public void close() throws I2PException;
- /**
- * Access the manager which is coordinating the server socket
- */
- public I2PSocketManager getManager();
+ /**
+ * Waits for the next socket connecting. If a remote user tried to make a
+ * connection and the local application wasn't .accept()ing new connections,
+ * they should get refused (if .accept() doesnt occur in some small period)
+ *
+ * @return a connected I2PSocket
+ *
+ * @throws I2PException if there is a problem with reading a new socket
+ * from the data available (aka the I2PSession closed, etc)
+ * @throws ConnectException if the I2PServerSocket is closed
+ * @throws SocketTimeoutException
+ */
+ public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
+
+ /**
+ * Set Sock Option accept timeout
+ * @param x
+ */
+ public void setSoTimeout(long x);
+
+ /**
+ * Get Sock Option accept timeout
+ * @return timeout
+ */
+ public long getSoTimeout();
+
+ /**
+ * Access the manager which is coordinating the server socket
+ */
+ public I2PSocketManager getManager();
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java
index 965ba31bf..2e3dfdb6b 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java
@@ -17,134 +17,159 @@ import net.i2p.util.Log;
*
*/
class I2PServerSocketImpl implements I2PServerSocket {
- private final static Log _log = new Log(I2PServerSocketImpl.class);
- private I2PSocketManager mgr;
- /** list of sockets waiting for the client to accept them */
- private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
-
- /** have we been closed */
- private volatile boolean closing = false;
-
- /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
- private Object socketAcceptedLock = new Object();
- /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
- private Object socketAddedLock = new Object();
-
- public I2PServerSocketImpl(I2PSocketManager mgr) {
- this.mgr = mgr;
- }
-
- /**
- * Waits for the next socket connecting. If a remote user tried to make a
- * connection and the local application wasn't .accept()ing new connections,
- * they should get refused (if .accept() doesnt occur in some small period -
- * currently 5 seconds)
- *
- * @return a connected I2PSocket
- *
- * @throws I2PException if there is a problem with reading a new socket
- * from the data available (aka the I2PSession closed, etc)
- * @throws ConnectException if the I2PServerSocket is closed
- */
- public I2PSocket accept() throws I2PException, ConnectException {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("accept() called, pending: " + pendingSockets.size());
-
- I2PSocket ret = null;
-
- while ( (ret == null) && (!closing) ){
- while (pendingSockets.size() <= 0) {
- if (closing) throw new ConnectException("I2PServerSocket closed");
- try {
- synchronized(socketAddedLock) {
- socketAddedLock.wait();
- }
- } catch (InterruptedException ie) {}
- }
- synchronized (pendingSockets) {
- if (pendingSockets.size() > 0) {
- ret = (I2PSocket)pendingSockets.remove(0);
- }
- }
- if (ret != null) {
- synchronized (socketAcceptedLock) {
- socketAcceptedLock.notifyAll();
- }
- }
- }
-
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("TIMING: handed out accept result " + ret.hashCode());
- return ret;
- }
-
- /**
- * Make the socket available and wait until the client app accepts it, or until
- * the given timeout elapses. This doesn't have any limits on the queue size -
- * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
- *
- * @param timeoutMs how long to wait until accept
- * @return true if the socket was accepted, false if the timeout expired
- * or the socket was closed
- */
- public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
-
- if (closing) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Already closing the socket");
- return false;
- }
-
- Clock clock = I2PAppContext.getGlobalContext().clock();
- long start = clock.now();
- long end = start + timeoutMs;
- pendingSockets.add(s);
- synchronized (socketAddedLock) {
- socketAddedLock.notifyAll();
- }
-
- // keep looping until the socket has been grabbed by the accept()
- // (or the expiration passes, or the socket is closed)
- while (pendingSockets.contains(s)) {
- long now = clock.now();
- if (now >= end) {
- if (_log.shouldLog(Log.INFO))
- _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
- pendingSockets.remove(s);
- return false;
- }
- if (closing) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Server socket closed while waiting for accept");
- pendingSockets.remove(s);
- return false;
- }
- long remaining = end - now;
- try {
- synchronized (socketAcceptedLock) {
- socketAcceptedLock.wait(remaining);
- }
- } catch (InterruptedException ie) {}
- }
- long now = clock.now();
- if (_log.shouldLog(Log.DEBUG))
- _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
- return true;
- }
-
- public void close() {
- closing = true;
- // let anyone .accept()ing know to fsck off
- synchronized (socketAddedLock) {
- socketAddedLock.notifyAll();
- }
- // let anyone addWaitForAccept()ing know to fsck off
- synchronized (socketAcceptedLock) {
- socketAcceptedLock.notifyAll();
- }
- }
-
- public I2PSocketManager getManager() { return mgr; }
+
+ private final static Log _log = new Log(I2PServerSocketImpl.class);
+ private I2PSocketManager mgr;
+ /** list of sockets waiting for the client to accept them */
+ private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
+ /** have we been closed */
+ private volatile boolean closing = false;
+ /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
+ private Object socketAcceptedLock = new Object();
+ /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
+ private Object socketAddedLock = new Object();
+
+ /**
+ * Set Sock Option accept timeout stub, does nothing
+ * @param x
+ */
+ public void setSoTimeout(long x) {
+ }
+
+ /**
+ * Get Sock Option accept timeout stub, does nothing
+ * @return timeout
+ */
+ public long getSoTimeout() {
+ return -1;
+ }
+
+ public I2PServerSocketImpl(I2PSocketManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Waits for the next socket connecting. If a remote user tried to make a
+ * connection and the local application wasn't .accept()ing new connections,
+ * they should get refused (if .accept() doesnt occur in some small period -
+ * currently 5 seconds)
+ *
+ * @return a connected I2PSocket
+ *
+ * @throws I2PException if there is a problem with reading a new socket
+ * from the data available (aka the I2PSession closed, etc)
+ * @throws ConnectException if the I2PServerSocket is closed
+ */
+ public I2PSocket accept() throws I2PException, ConnectException {
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("accept() called, pending: " + pendingSockets.size());
+ }
+ I2PSocket ret = null;
+
+ while((ret == null) && (!closing)) {
+ while(pendingSockets.size() <= 0) {
+ if(closing) {
+ throw new ConnectException("I2PServerSocket closed");
+ }
+ try {
+ synchronized(socketAddedLock) {
+ socketAddedLock.wait();
+ }
+ } catch(InterruptedException ie) {
+ }
+ }
+ synchronized(pendingSockets) {
+ if(pendingSockets.size() > 0) {
+ ret = (I2PSocket)pendingSockets.remove(0);
+ }
+ }
+ if(ret != null) {
+ synchronized(socketAcceptedLock) {
+ socketAcceptedLock.notifyAll();
+ }
+ }
+ }
+
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("TIMING: handed out accept result " + ret.hashCode());
+ }
+ return ret;
+ }
+
+ /**
+ * Make the socket available and wait until the client app accepts it, or until
+ * the given timeout elapses. This doesn't have any limits on the queue size -
+ * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
+ *
+ * @param timeoutMs how long to wait until accept
+ * @return true if the socket was accepted, false if the timeout expired
+ * or the socket was closed
+ */
+ public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
+ }
+ if(closing) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Already closing the socket");
+ }
+ return false;
+ }
+
+ Clock clock = I2PAppContext.getGlobalContext().clock();
+ long start = clock.now();
+ long end = start + timeoutMs;
+ pendingSockets.add(s);
+ synchronized(socketAddedLock) {
+ socketAddedLock.notifyAll();
+ }
+
+ // keep looping until the socket has been grabbed by the accept()
+ // (or the expiration passes, or the socket is closed)
+ while(pendingSockets.contains(s)) {
+ long now = clock.now();
+ if(now >= end) {
+ if(_log.shouldLog(Log.INFO)) {
+ _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
+ }
+ pendingSockets.remove(s);
+ return false;
+ }
+ if(closing) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Server socket closed while waiting for accept");
+ }
+ pendingSockets.remove(s);
+ return false;
+ }
+ long remaining = end - now;
+ try {
+ synchronized(socketAcceptedLock) {
+ socketAcceptedLock.wait(remaining);
+ }
+ } catch(InterruptedException ie) {
+ }
+ }
+ long now = clock.now();
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString());
+ }
+ return true;
+ }
+
+ public void close() {
+ closing = true;
+ // let anyone .accept()ing know to fsck off
+ synchronized(socketAddedLock) {
+ socketAddedLock.notifyAll();
+ }
+ // let anyone addWaitForAccept()ing know to fsck off
+ synchronized(socketAcceptedLock) {
+ socketAcceptedLock.notifyAll();
+ }
+ }
+
+ public I2PSocketManager getManager() {
+ return mgr;
+ }
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java
index c8b566190..9f12be6c4 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java
@@ -5,6 +5,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
+import java.net.SocketTimeoutException;
import java.util.Properties;
import net.i2p.I2PAppContext;
@@ -20,173 +21,203 @@ import net.i2p.util.Log;
*
*/
public class StreamSinkServer {
- private Log _log;
- private String _sinkDir;
- private String _destFile;
- private String _i2cpHost;
- private int _i2cpPort;
- private int _handlers;
-
- /**
- * Create but do not start the streaming server.
- *
- * @param sinkDir Directory to store received files in
- * @param ourDestFile filename to write our binary destination to
- */
- public StreamSinkServer(String sinkDir, String ourDestFile) {
- this(sinkDir, ourDestFile, null, -1, 3);
- }
- public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
- _sinkDir = sinkDir;
- _destFile = ourDestFile;
- _i2cpHost = i2cpHost;
- _i2cpPort = i2cpPort;
- _handlers = handlers;
- _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
- }
-
- /**
- * Actually fire up the server - this call blocks forever (or until the server
- * socket closes)
- *
- */
- public void runServer() {
- I2PSocketManager mgr = null;
- if (_i2cpHost != null)
- mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
- else
- mgr = I2PSocketManagerFactory.createManager();
- Destination dest = mgr.getSession().getMyDestination();
- if (_log.shouldLog(Log.INFO))
- _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
- FileOutputStream fos = null;
- try {
- fos = new FileOutputStream(_destFile);
- dest.writeBytes(fos);
- } catch (IOException ioe) {
- _log.error("Error writing out our destination to " + _destFile, ioe);
- return;
- } catch (DataFormatException dfe) {
- _log.error("Error formatting the destination", dfe);
- return;
- } finally {
- if (fos != null) try { fos.close(); } catch (IOException ioe) {}
- }
-
- I2PServerSocket sock = mgr.getServerSocket();
- startup(sock);
- }
-
- public void startup(I2PServerSocket sock) {
- for (int i = 0; i < _handlers; i++) {
- I2PThread t = new I2PThread(new ClientRunner(sock));
- t.setName("Handler " + i);
- t.setDaemon(false);
- t.start();
- }
- }
-
- /**
- * Actually deal with a client - pull anything they send us and write it to a file.
- *
- */
- private class ClientRunner implements Runnable {
- private I2PServerSocket _socket;
- public ClientRunner(I2PServerSocket socket) {
- _socket = socket;
- }
- public void run() {
- while (true) {
- try {
- I2PSocket socket = _socket.accept();
- if (socket != null)
- handle(socket);
- } catch (I2PException ie) {
- _log.error("Error accepting connection", ie);
- return;
- } catch (ConnectException ce) {
- _log.error("Connection already dropped", ce);
- return;
- }
- }
- }
-
- private void handle(I2PSocket sock) {
- FileOutputStream fos = null;
- try {
- File sink = new File(_sinkDir);
- if (!sink.exists())
- sink.mkdirs();
- File cur = File.createTempFile("clientSink", ".dat", sink);
- fos = new FileOutputStream(cur);
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Writing to " + cur.getAbsolutePath());
- } catch (IOException ioe) {
- _log.error("Error creating sink", ioe);
- return;
- }
-
- long start = System.currentTimeMillis();
- try {
- InputStream in = sock.getInputStream();
- byte buf[] = new byte[4096];
- long written = 0;
- int read = 0;
- while ( (read = in.read(buf)) != -1) {
- //_fos.write(buf, 0, read);
- written += read;
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("read and wrote " + read + " (" + written + ")");
- }
- fos.write(("written: [" + written + "]\n").getBytes());
- long lifetime = System.currentTimeMillis() - start;
- _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
- } catch (IOException ioe) {
- _log.error("Error writing the sink", ioe);
- } finally {
- if (fos != null) try { fos.close(); } catch (IOException ioe) {}
- if (sock != null) try { sock.close(); } catch (IOException ioe) {}
- _log.debug("Client socket closed");
- }
- }
- }
-
- /**
- * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
- *
- * - sinkDir: Directory to store received files in
- * - ourDestFile: filename to write our binary destination to
- * - numHandlers: how many concurrent connections to handle
- *
- */
- public static void main(String args[]) {
- StreamSinkServer server = null;
- switch (args.length) {
- case 0:
- server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
- break;
- case 2:
- server = new StreamSinkServer(args[0], args[1]);
- break;
- case 4:
- case 5:
- int handlers = 3;
- if (args.length == 5) {
- try {
- handlers = Integer.parseInt(args[4]);
- } catch (NumberFormatException nfe) {}
- }
- try {
- int port = Integer.parseInt(args[1]);
- server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
- } catch (NumberFormatException nfe) {
- System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
- }
- break;
- default:
- System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
- }
- if (server != null)
- server.runServer();
- }
+
+ private Log _log;
+ private String _sinkDir;
+ private String _destFile;
+ private String _i2cpHost;
+ private int _i2cpPort;
+ private int _handlers;
+
+ /**
+ * Create but do not start the streaming server.
+ *
+ * @param sinkDir Directory to store received files in
+ * @param ourDestFile filename to write our binary destination to
+ */
+ public StreamSinkServer(String sinkDir, String ourDestFile) {
+ this(sinkDir, ourDestFile, null, -1, 3);
+ }
+
+ public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
+ _sinkDir = sinkDir;
+ _destFile = ourDestFile;
+ _i2cpHost = i2cpHost;
+ _i2cpPort = i2cpPort;
+ _handlers = handlers;
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
+ }
+
+ /**
+ * Actually fire up the server - this call blocks forever (or until the server
+ * socket closes)
+ *
+ */
+ public void runServer() {
+ I2PSocketManager mgr = null;
+ if(_i2cpHost != null) {
+ mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
+ } else {
+ mgr = I2PSocketManagerFactory.createManager();
+ }
+ Destination dest = mgr.getSession().getMyDestination();
+ if(_log.shouldLog(Log.INFO)) {
+ _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
+ }
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(_destFile);
+ dest.writeBytes(fos);
+ } catch(IOException ioe) {
+ _log.error("Error writing out our destination to " + _destFile, ioe);
+ return;
+ } catch(DataFormatException dfe) {
+ _log.error("Error formatting the destination", dfe);
+ return;
+ } finally {
+ if(fos != null) {
+ try {
+ fos.close();
+ } catch(IOException ioe) {
+ }
+ }
+ }
+
+ I2PServerSocket sock = mgr.getServerSocket();
+ startup(sock);
+ }
+
+ public void startup(I2PServerSocket sock) {
+ for(int i = 0; i < _handlers; i++) {
+ I2PThread t = new I2PThread(new ClientRunner(sock));
+ t.setName("Handler " + i);
+ t.setDaemon(false);
+ t.start();
+ }
+ }
+
+ /**
+ * Actually deal with a client - pull anything they send us and write it to a file.
+ *
+ */
+ private class ClientRunner implements Runnable {
+
+ private I2PServerSocket _socket;
+
+ public ClientRunner(I2PServerSocket socket) {
+ _socket = socket;
+ }
+
+ public void run() {
+ while(true) {
+ try {
+ I2PSocket socket = _socket.accept();
+ if(socket != null) {
+ handle(socket);
+ }
+ } catch(I2PException ie) {
+ _log.error("Error accepting connection", ie);
+ return;
+ } catch(ConnectException ce) {
+ _log.error("Connection already dropped", ce);
+ return;
+ } catch(SocketTimeoutException ste) {
+ // ignored
+ }
+ }
+ }
+
+ private void handle(I2PSocket sock) {
+ FileOutputStream fos = null;
+ try {
+ File sink = new File(_sinkDir);
+ if(!sink.exists()) {
+ sink.mkdirs();
+ }
+ File cur = File.createTempFile("clientSink", ".dat", sink);
+ fos = new FileOutputStream(cur);
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("Writing to " + cur.getAbsolutePath());
+ }
+ } catch(IOException ioe) {
+ _log.error("Error creating sink", ioe);
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+ try {
+ InputStream in = sock.getInputStream();
+ byte buf[] = new byte[4096];
+ long written = 0;
+ int read = 0;
+ while((read = in.read(buf)) != -1) {
+ //_fos.write(buf, 0, read);
+ written += read;
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("read and wrote " + read + " (" + written + ")");
+ }
+ }
+ fos.write(("written: [" + written + "]\n").getBytes());
+ long lifetime = System.currentTimeMillis() - start;
+ _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
+ } catch(IOException ioe) {
+ _log.error("Error writing the sink", ioe);
+ } finally {
+ if(fos != null) {
+ try {
+ fos.close();
+ } catch(IOException ioe) {
+ }
+ }
+ if(sock != null) {
+ try {
+ sock.close();
+ } catch(IOException ioe) {
+ }
+ }
+ _log.debug("Client socket closed");
+ }
+ }
+ }
+
+ /**
+ * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
+ *
+ * - sinkDir: Directory to store received files in
+ * - ourDestFile: filename to write our binary destination to
+ * - numHandlers: how many concurrent connections to handle
+ *
+ */
+ public static void main(String args[]) {
+ StreamSinkServer server = null;
+ switch(args.length) {
+ case 0:
+ server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
+ break;
+ case 2:
+ server = new StreamSinkServer(args[0], args[1]);
+ break;
+ case 4:
+ case 5:
+ int handlers = 3;
+ if(args.length == 5) {
+ try {
+ handlers = Integer.parseInt(args[4]);
+ } catch(NumberFormatException nfe) {
+ }
+ }
+ try {
+ int port = Integer.parseInt(args[1]);
+ server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
+ } catch(NumberFormatException nfe) {
+ System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
+ }
+ break;
+ default:
+ System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
+ }
+ if(server != null) {
+ server.runServer();
+ }
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
index 4960f1a22..f05ae1c8c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
@@ -1,5 +1,6 @@
package net.i2p.client.streaming;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index dcc93c5ec..08d794877 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -21,393 +21,459 @@ import net.i2p.util.SimpleTimer;
*
*/
public class ConnectionManager {
- private I2PAppContext _context;
- private Log _log;
- private I2PSession _session;
- private MessageHandler _messageHandler;
- private PacketHandler _packetHandler;
- private ConnectionHandler _connectionHandler;
- private PacketQueue _outboundQueue;
- private SchedulerChooser _schedulerChooser;
- private ConnectionPacketHandler _conPacketHandler;
- /** Inbound stream ID (Long) to Connection map */
- private Map _connectionByInboundId;
- /** Ping ID (Long) to PingRequest */
- private Map _pendingPings;
- private boolean _allowIncoming;
- private int _maxConcurrentStreams;
- private ConnectionOptions _defaultOptions;
- private volatile int _numWaiting;
- private Object _connectionLock;
-
- public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
- _context = context;
- _log = context.logManager().getLog(ConnectionManager.class);
- _connectionByInboundId = new HashMap(32);
- _pendingPings = new HashMap(4);
- _connectionLock = new Object();
- _messageHandler = new MessageHandler(context, this);
- _packetHandler = new PacketHandler(context, this);
- _connectionHandler = new ConnectionHandler(context, this);
- _schedulerChooser = new SchedulerChooser(context);
- _conPacketHandler = new ConnectionPacketHandler(context);
- _session = session;
- session.setSessionListener(_messageHandler);
- _outboundQueue = new PacketQueue(context, session, this);
- _allowIncoming = false;
- _maxConcurrentStreams = maxConcurrent;
- _defaultOptions = defaultOptions;
- _numWaiting = 0;
- _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
- }
-
- Connection getConnectionByInboundId(long id) {
- synchronized (_connectionLock) {
- return (Connection)_connectionByInboundId.get(new Long(id));
- }
- }
- /**
- * not guaranteed to be unique, but in case we receive more than one packet
- * on an inbound connection that we havent ack'ed yet...
- */
- Connection getConnectionByOutboundId(long id) {
- synchronized (_connectionLock) {
- for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
- Connection con = (Connection)iter.next();
- if (DataHelper.eq(con.getSendStreamId(), id))
- return con;
- }
- }
- return null;
- }
-
- public void setAllowIncomingConnections(boolean allow) {
- _connectionHandler.setActive(allow);
- }
- /** should we acceot connections, or just reject everyone? */
- public boolean getAllowIncomingConnections() {
- return _connectionHandler.getActive();
- }
-
- /**
- * Create a new connection based on the SYN packet we received.
- *
- * @return created Connection with the packet's data already delivered to
- * it, or null if the syn's streamId was already taken
- */
- public Connection receiveConnection(Packet synPacket) {
- Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
- long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
- boolean reject = false;
- int active = 0;
- int total = 0;
- synchronized (_connectionLock) {
- total = _connectionByInboundId.size();
- for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
- if ( ((Connection)iter.next()).getIsConnected() )
- active++;
- }
- if (locked_tooManyStreams()) {
- reject = true;
- } else {
- while (true) {
- Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
- if (oldCon == null) {
- break;
- } else {
- _connectionByInboundId.put(new Long(receiveId), oldCon);
- // receiveId already taken, try another
- receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
- }
- }
- }
- }
-
- _context.statManager().addRateData("stream.receiveActive", active, total);
-
- if (reject) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Refusing connection since we have exceeded our max of "
- + _maxConcurrentStreams + " connections");
- PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
- reply.setFlag(Packet.FLAG_RESET);
- reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
- reply.setAckThrough(synPacket.getSequenceNum());
- reply.setSendStreamId(synPacket.getReceiveStreamId());
- reply.setReceiveStreamId(0);
- reply.setOptionalFrom(_session.getMyDestination());
- // this just sends the packet - no retries or whatnot
- _outboundQueue.enqueue(reply);
- return null;
- }
-
- con.setReceiveStreamId(receiveId);
- try {
- con.getPacketHandler().receivePacket(synPacket, con);
- } catch (I2PException ie) {
- synchronized (_connectionLock) {
- _connectionByInboundId.remove(new Long(receiveId));
- }
- return null;
- }
-
- _context.statManager().addRateData("stream.connectionReceived", 1, 0);
- return con;
- }
-
- private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
-
- /**
- * Build a new connection to the given peer. This blocks if there is no
- * connection delay, otherwise it returns immediately.
- *
- * @return new connection, or null if we have exceeded our limit
- */
- public Connection connect(Destination peer, ConnectionOptions opts) {
- Connection con = null;
- long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
- long expiration = _context.clock().now() + opts.getConnectTimeout();
- if (opts.getConnectTimeout() <= 0)
- expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
- _numWaiting++;
- while (true) {
- long remaining = expiration - _context.clock().now();
- if (remaining <= 0) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Refusing to connect since we have exceeded our max of "
- + _maxConcurrentStreams + " connections");
- _numWaiting--;
- return null;
- }
- boolean reject = false;
- synchronized (_connectionLock) {
- if (locked_tooManyStreams()) {
- // allow a full buffer of pending/waiting streams
- if (_numWaiting > _maxConcurrentStreams) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Refusing connection since we have exceeded our max of "
- + _maxConcurrentStreams + " and there are " + _numWaiting
- + " waiting already");
- _numWaiting--;
- return null;
- }
-
- // no remaining streams, lets wait a bit
- try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
- } else {
- con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
- con.setRemotePeer(peer);
-
- while (_connectionByInboundId.containsKey(new Long(receiveId))) {
- receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
- }
- _connectionByInboundId.put(new Long(receiveId), con);
- break; // stop looping as a psuedo-wait
- }
- }
- }
- // ok we're in...
- con.setReceiveStreamId(receiveId);
- con.eventOccurred();
-
- _log.debug("Connect() conDelay = " + opts.getConnectDelay());
- if (opts.getConnectDelay() <= 0) {
- con.waitForConnect();
- }
- if (_numWaiting > 0)
- _numWaiting--;
-
- _context.statManager().addRateData("stream.connectionCreated", 1, 0);
- return con;
- }
+ private I2PAppContext _context;
+ private Log _log;
+ private I2PSession _session;
+ private MessageHandler _messageHandler;
+ private PacketHandler _packetHandler;
+ private ConnectionHandler _connectionHandler;
+ private PacketQueue _outboundQueue;
+ private SchedulerChooser _schedulerChooser;
+ private ConnectionPacketHandler _conPacketHandler;
+ /** Inbound stream ID (Long) to Connection map */
+ private Map _connectionByInboundId;
+ /** Ping ID (Long) to PingRequest */
+ private Map _pendingPings;
+ private boolean _allowIncoming;
+ private int _maxConcurrentStreams;
+ private ConnectionOptions _defaultOptions;
+ private volatile int _numWaiting;
+ private Object _connectionLock;
+ private long SoTimeout;
- private boolean locked_tooManyStreams() {
- if (_maxConcurrentStreams <= 0) return false;
- if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
- int active = 0;
- for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
- Connection con = (Connection)iter.next();
- if (con.getIsConnected())
- active++;
- }
-
- if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
- _log.info("More than 100 connections! " + active
- + " total: " + _connectionByInboundId.size());
+ public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
+ _context = context;
+ _log = context.logManager().getLog(ConnectionManager.class);
+ _connectionByInboundId = new HashMap(32);
+ _pendingPings = new HashMap(4);
+ _connectionLock = new Object();
+ _messageHandler = new MessageHandler(context, this);
+ _packetHandler = new PacketHandler(context, this);
+ _connectionHandler = new ConnectionHandler(context, this);
+ _schedulerChooser = new SchedulerChooser(context);
+ _conPacketHandler = new ConnectionPacketHandler(context);
+ _session = session;
+ session.setSessionListener(_messageHandler);
+ _outboundQueue = new PacketQueue(context, session, this);
+ _allowIncoming = false;
+ _maxConcurrentStreams = maxConcurrent;
+ _defaultOptions = defaultOptions;
+ _numWaiting = 0;
+ /** Socket timeout for accept() */
+ SoTimeout = -1;
- return (active >= _maxConcurrentStreams);
- }
-
- public MessageHandler getMessageHandler() { return _messageHandler; }
- public PacketHandler getPacketHandler() { return _packetHandler; }
- public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
- public I2PSession getSession() { return _session; }
- public PacketQueue getPacketQueue() { return _outboundQueue; }
-
- /**
- * Something b0rked hard, so kill all of our connections without mercy.
- * Don't bother sending close packets.
- *
- */
- public void disconnectAllHard() {
- synchronized (_connectionLock) {
- for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
- Connection con = (Connection)iter.next();
- con.disconnect(false, false);
- }
- _connectionByInboundId.clear();
- _connectionLock.notifyAll();
- }
- }
-
- /**
- * Drop the (already closed) connection on the floor.
- *
- */
- public void removeConnection(Connection con) {
- boolean removed = false;
- synchronized (_connectionLock) {
- Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
- removed = (o == con);
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Connection removed? " + removed + " remaining: "
- + _connectionByInboundId.size() + ": " + con);
- if (!removed && _log.shouldLog(Log.DEBUG))
- _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
- _connectionLock.notifyAll();
- }
- if (removed) {
- _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
- }
- }
-
- /** return a set of Connection objects */
- public Set listConnections() {
- synchronized (_connectionLock) {
- return new HashSet(_connectionByInboundId.values());
- }
- }
-
- public boolean ping(Destination peer, long timeoutMs) {
- return ping(peer, timeoutMs, true);
- }
- public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
- return ping(peer, timeoutMs, blocking, null, null, null);
- }
- public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
- Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
- PacketLocal packet = new PacketLocal(_context, peer);
- packet.setSendStreamId(id.longValue());
- packet.setFlag(Packet.FLAG_ECHO);
- packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
- packet.setOptionalFrom(_session.getMyDestination());
- if ( (keyToUse != null) && (tagsToSend != null) ) {
- packet.setKeyUsed(keyToUse);
- packet.setTagsSent(tagsToSend);
- }
-
- PingRequest req = new PingRequest(peer, packet, notifier);
-
- synchronized (_pendingPings) {
- _pendingPings.put(id, req);
- }
-
- _outboundQueue.enqueue(packet);
- packet.releasePayload();
-
- if (blocking) {
- synchronized (req) {
- if (!req.pongReceived())
- try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
- }
-
- synchronized (_pendingPings) {
- _pendingPings.remove(id);
- }
- } else {
- SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
- }
-
- boolean ok = req.pongReceived();
- return ok;
- }
+ _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
+ }
- interface PingNotifier {
- public void pingComplete(boolean ok);
- }
-
- private class PingFailed implements SimpleTimer.TimedEvent {
- private Long _id;
- private PingNotifier _notifier;
- public PingFailed(Long id, PingNotifier notifier) {
- _id = id;
- _notifier = notifier;
- }
-
- public void timeReached() {
- boolean removed = false;
- synchronized (_pendingPings) {
- Object o = _pendingPings.remove(_id);
- if (o != null)
- removed = true;
- }
- if (removed) {
- if (_notifier != null)
- _notifier.pingComplete(false);
- if (_log.shouldLog(Log.INFO))
- _log.info("Ping failed");
- }
- }
- }
-
- private class PingRequest {
- private boolean _ponged;
- private Destination _peer;
- private PacketLocal _packet;
- private PingNotifier _notifier;
- public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
- _ponged = false;
- _peer = peer;
- _packet = packet;
- _notifier = notifier;
- }
- public void pong() {
- _log.debug("Ping successful");
- _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
- synchronized (ConnectionManager.PingRequest.this) {
- _ponged = true;
- ConnectionManager.PingRequest.this.notifyAll();
- }
- if (_notifier != null)
- _notifier.pingComplete(true);
- }
- public boolean pongReceived() { return _ponged; }
- }
-
- void receivePong(long pingId) {
- PingRequest req = null;
- synchronized (_pendingPings) {
- req = (PingRequest)_pendingPings.remove(new Long(pingId));
- }
- if (req != null)
- req.pong();
- }
+ Connection getConnectionByInboundId(long id) {
+ synchronized(_connectionLock) {
+ return (Connection)_connectionByInboundId.get(new Long(id));
+ }
+ }
+
+ /**
+ * not guaranteed to be unique, but in case we receive more than one packet
+ * on an inbound connection that we havent ack'ed yet...
+ */
+ Connection getConnectionByOutboundId(long id) {
+ synchronized(_connectionLock) {
+ for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
+ Connection con = (Connection)iter.next();
+ if(DataHelper.eq(con.getSendStreamId(), id)) {
+ return con;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Set the socket accept() timeout.
+ * @param x
+ */
+ public void MsetSoTimeout(long x) {
+ SoTimeout = x;
+ }
+
+ /**
+ * Get the socket accept() timeout.
+ * @return
+ */
+ public long MgetSoTimeout() {
+ return SoTimeout;
+ }
+
+ public void setAllowIncomingConnections(boolean allow) {
+ _connectionHandler.setActive(allow);
+ }
+
+ /** should we acceot connections, or just reject everyone? */
+ public boolean getAllowIncomingConnections() {
+ return _connectionHandler.getActive();
+ }
+
+ /**
+ * Create a new connection based on the SYN packet we received.
+ *
+ * @return created Connection with the packet's data already delivered to
+ * it, or null if the syn's streamId was already taken
+ */
+ public Connection receiveConnection(Packet synPacket) {
+ Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
+ long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
+ boolean reject = false;
+ int active = 0;
+ int total = 0;
+ synchronized(_connectionLock) {
+ total = _connectionByInboundId.size();
+ for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
+ if(((Connection)iter.next()).getIsConnected()) {
+ active++;
+ }
+ }
+ if(locked_tooManyStreams()) {
+ reject = true;
+ } else {
+ while(true) {
+ Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
+ if(oldCon == null) {
+ break;
+ } else {
+ _connectionByInboundId.put(new Long(receiveId), oldCon);
+ // receiveId already taken, try another
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
+ }
+ }
+ }
+ }
+
+ _context.statManager().addRateData("stream.receiveActive", active, total);
+
+ if(reject) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections");
+ }
+ PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
+ reply.setFlag(Packet.FLAG_RESET);
+ reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
+ reply.setAckThrough(synPacket.getSequenceNum());
+ reply.setSendStreamId(synPacket.getReceiveStreamId());
+ reply.setReceiveStreamId(0);
+ reply.setOptionalFrom(_session.getMyDestination());
+ // this just sends the packet - no retries or whatnot
+ _outboundQueue.enqueue(reply);
+ return null;
+ }
+
+ con.setReceiveStreamId(receiveId);
+ try {
+ con.getPacketHandler().receivePacket(synPacket, con);
+ } catch(I2PException ie) {
+ synchronized(_connectionLock) {
+ _connectionByInboundId.remove(new Long(receiveId));
+ }
+ return null;
+ }
+
+ _context.statManager().addRateData("stream.connectionReceived", 1, 0);
+ return con;
+ }
+ private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000;
+
+ /**
+ * Build a new connection to the given peer. This blocks if there is no
+ * connection delay, otherwise it returns immediately.
+ *
+ * @return new connection, or null if we have exceeded our limit
+ */
+ public Connection connect(Destination peer, ConnectionOptions opts) {
+ Connection con = null;
+ long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
+ long expiration = _context.clock().now() + opts.getConnectTimeout();
+ if(opts.getConnectTimeout() <= 0) {
+ expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
+ }
+ _numWaiting++;
+ while(true) {
+ long remaining = expiration - _context.clock().now();
+ if(remaining <= 0) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
+ }
+ _numWaiting--;
+ return null;
+ }
+ boolean reject = false;
+ synchronized(_connectionLock) {
+ if(locked_tooManyStreams()) {
+ // allow a full buffer of pending/waiting streams
+ if(_numWaiting > _maxConcurrentStreams) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
+ }
+ _numWaiting--;
+ return null;
+ }
+
+ // no remaining streams, lets wait a bit
+ try {
+ _connectionLock.wait(remaining);
+ } catch(InterruptedException ie) {
+ }
+ } else {
+ con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
+ con.setRemotePeer(peer);
+
+ while(_connectionByInboundId.containsKey(new Long(receiveId))) {
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
+ }
+ _connectionByInboundId.put(new Long(receiveId), con);
+ break; // stop looping as a psuedo-wait
+ }
+ }
+ }
+
+ // ok we're in...
+ con.setReceiveStreamId(receiveId);
+ con.eventOccurred();
+
+ _log.debug("Connect() conDelay = " + opts.getConnectDelay());
+ if(opts.getConnectDelay() <= 0) {
+ con.waitForConnect();
+ }
+ if(_numWaiting > 0) {
+ _numWaiting--;
+ }
+ _context.statManager().addRateData("stream.connectionCreated", 1, 0);
+ return con;
+ }
+
+ private boolean locked_tooManyStreams() {
+ if(_maxConcurrentStreams <= 0) {
+ return false;
+ }
+ if(_connectionByInboundId.size() < _maxConcurrentStreams) {
+ return false;
+ }
+ int active = 0;
+ for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
+ Connection con = (Connection)iter.next();
+ if(con.getIsConnected()) {
+ active++;
+ }
+ }
+
+ if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) {
+ _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size());
+ }
+ return (active >= _maxConcurrentStreams);
+ }
+
+ public MessageHandler getMessageHandler() {
+ return _messageHandler;
+ }
+
+ public PacketHandler getPacketHandler() {
+ return _packetHandler;
+ }
+
+ public ConnectionHandler getConnectionHandler() {
+ return _connectionHandler;
+ }
+
+ public I2PSession getSession() {
+ return _session;
+ }
+
+ public PacketQueue getPacketQueue() {
+ return _outboundQueue;
+ }
+
+ /**
+ * Something b0rked hard, so kill all of our connections without mercy.
+ * Don't bother sending close packets.
+ *
+ */
+ public void disconnectAllHard() {
+ synchronized(_connectionLock) {
+ for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
+ Connection con = (Connection)iter.next();
+ con.disconnect(false, false);
+ }
+ _connectionByInboundId.clear();
+ _connectionLock.notifyAll();
+ }
+ }
+
+ /**
+ * Drop the (already closed) connection on the floor.
+ *
+ */
+ public void removeConnection(Connection con) {
+ boolean removed = false;
+ synchronized(_connectionLock) {
+ Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
+ removed = (o == con);
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
+ }
+ if(!removed && _log.shouldLog(Log.DEBUG)) {
+ _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values());
+ }
+ _connectionLock.notifyAll();
+ }
+ if(removed) {
+ _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
+ }
+ }
+
+ /** return a set of Connection objects */
+ public Set listConnections() {
+ synchronized(_connectionLock) {
+ return new HashSet(_connectionByInboundId.values());
+ }
+ }
+
+ public boolean ping(Destination peer, long timeoutMs) {
+ return ping(peer, timeoutMs, true);
+ }
+
+ public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
+ return ping(peer, timeoutMs, blocking, null, null, null);
+ }
+
+ public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
+ Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1);
+ PacketLocal packet = new PacketLocal(_context, peer);
+ packet.setSendStreamId(id.longValue());
+ packet.setFlag(Packet.FLAG_ECHO);
+ packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
+ packet.setOptionalFrom(_session.getMyDestination());
+ if((keyToUse != null) && (tagsToSend != null)) {
+ packet.setKeyUsed(keyToUse);
+ packet.setTagsSent(tagsToSend);
+ }
+
+ PingRequest req = new PingRequest(peer, packet, notifier);
+
+ synchronized(_pendingPings) {
+ _pendingPings.put(id, req);
+ }
+
+ _outboundQueue.enqueue(packet);
+ packet.releasePayload();
+
+ if(blocking) {
+ synchronized(req) {
+ if(!req.pongReceived()) {
+ try {
+ req.wait(timeoutMs);
+ } catch(InterruptedException ie) {
+ }
+ }
+ }
+
+ synchronized(_pendingPings) {
+ _pendingPings.remove(id);
+ }
+ } else {
+ SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
+ }
+
+ boolean ok = req.pongReceived();
+ return ok;
+ }
+
+ interface PingNotifier {
+
+ public void pingComplete(boolean ok);
+ }
+
+ private class PingFailed implements SimpleTimer.TimedEvent {
+
+ private Long _id;
+ private PingNotifier _notifier;
+
+ public PingFailed(Long id, PingNotifier notifier) {
+ _id = id;
+ _notifier = notifier;
+ }
+
+ public void timeReached() {
+ boolean removed = false;
+ synchronized(_pendingPings) {
+ Object o = _pendingPings.remove(_id);
+ if(o != null) {
+ removed = true;
+ }
+ }
+ if(removed) {
+ if(_notifier != null) {
+ _notifier.pingComplete(false);
+ }
+ if(_log.shouldLog(Log.INFO)) {
+ _log.info("Ping failed");
+ }
+ }
+ }
+ }
+
+ private class PingRequest {
+
+ private boolean _ponged;
+ private Destination _peer;
+ private PacketLocal _packet;
+ private PingNotifier _notifier;
+
+ public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
+ _ponged = false;
+ _peer = peer;
+ _packet = packet;
+ _notifier = notifier;
+ }
+
+ public void pong() {
+ _log.debug("Ping successful");
+ _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
+ synchronized(ConnectionManager.PingRequest.this) {
+ _ponged = true;
+ ConnectionManager.PingRequest.this.notifyAll();
+ }
+ if(_notifier != null) {
+ _notifier.pingComplete(true);
+ }
+ }
+
+ public boolean pongReceived() {
+ return _ponged;
+ }
+ }
+
+ void receivePong(long pingId) {
+ PingRequest req = null;
+ synchronized(_pendingPings) {
+ req = (PingRequest)_pendingPings.remove(new Long(pingId));
+ }
+ if(req != null) {
+ req.pong();
+ }
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java
index b1a4175f2..b85459f63 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java
@@ -1,5 +1,8 @@
package net.i2p.client.streaming;
+import java.net.SocketTimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import net.i2p.I2PException;
/**
@@ -7,17 +10,46 @@ import net.i2p.I2PException;
*
*/
public class I2PServerSocketFull implements I2PServerSocket {
- private I2PSocketManagerFull _socketManager;
-
- public I2PServerSocketFull(I2PSocketManagerFull mgr) {
- _socketManager = mgr;
- }
-
- public I2PSocket accept() throws I2PException {
- return _socketManager.receiveSocket();
- }
-
- public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); }
-
- public I2PSocketManager getManager() { return _socketManager; }
+
+ private I2PSocketManagerFull _socketManager;
+
+ /**
+ *
+ * @param mgr
+ */
+ public I2PServerSocketFull(I2PSocketManagerFull mgr) {
+ _socketManager = mgr;
+ }
+
+ /**
+ *
+ * @return
+ * @throws net.i2p.I2PException
+ * @throws SocketTimeoutException
+ */
+ public I2PSocket accept() throws I2PException, SocketTimeoutException {
+ return _socketManager.receiveSocket();
+ }
+
+ public long getSoTimeout() {
+ return _socketManager.getConnectionManager().MgetSoTimeout();
+ }
+
+ public void setSoTimeout(long x) {
+ _socketManager.getConnectionManager().MsetSoTimeout(x);
+ }
+ /**
+ * Close the connection.
+ */
+ public void close() {
+ _socketManager.getConnectionManager().setAllowIncomingConnections(false);
+ }
+
+ /**
+ *
+ * @return _socketManager
+ */
+ public I2PSocketManager getManager() {
+ return _socketManager;
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java
index 61dd48757..842cf791b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java
@@ -11,119 +11,139 @@ import net.i2p.data.Destination;
*
*/
public class I2PSocketFull implements I2PSocket {
- private Connection _connection;
- private I2PSocket.SocketErrorListener _listener;
- private Destination _remotePeer;
- private Destination _localPeer;
-
- public I2PSocketFull(Connection con) {
- _connection = con;
- if (con != null) {
- _remotePeer = con.getRemotePeer();
- _localPeer = con.getSession().getMyDestination();
- }
- }
-
- public void close() throws IOException {
- Connection c = _connection;
- if (c == null) return;
- if (c.getIsConnected()) {
- OutputStream out = c.getOutputStream();
- if (out != null) {
- try {
- out.close();
- } catch (IOException ioe) {
- // ignore any write error, as we want to keep on and kill the
- // con (thanks Complication!)
- }
- }
- c.disconnect(true);
- } else {
- //throw new IOException("Not connected");
- }
- destroy();
- }
-
- Connection getConnection() { return _connection; }
-
- public InputStream getInputStream() {
- Connection c = _connection;
- if (c != null)
- return c.getInputStream();
- else
- return null;
- }
-
- public I2PSocketOptions getOptions() {
- Connection c = _connection;
- if (c != null)
- return c.getOptions();
- else
- return null;
- }
-
- public OutputStream getOutputStream() throws IOException {
- Connection c = _connection;
- if (c != null)
- return c.getOutputStream();
- else
- return null;
- }
-
- public Destination getPeerDestination() { return _remotePeer; }
-
- public long getReadTimeout() {
- I2PSocketOptions opts = getOptions();
- if (opts != null)
- return opts.getReadTimeout();
- else
- return -1;
- }
-
- public Destination getThisDestination() { return _localPeer; }
-
- public void setOptions(I2PSocketOptions options) {
- Connection c = _connection;
- if (c == null) return;
-
- if (options instanceof ConnectionOptions)
- c.setOptions((ConnectionOptions)options);
- else
- c.setOptions(new ConnectionOptions(options));
- }
-
- public void setReadTimeout(long ms) {
- Connection c = _connection;
- if (c == null) return;
-
- c.getInputStream().setReadTimeout((int)ms);
- c.getOptions().setReadTimeout(ms);
- }
-
- public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
- _listener = lsnr;
- }
-
- public boolean isClosed() {
- Connection c = _connection;
- return ((c == null) ||
- (!c.getIsConnected()) ||
- (c.getResetReceived()) ||
- (c.getResetSent()));
- }
-
- void destroy() {
- Connection c = _connection;
- _connection = null;
- _listener = null;
- if (c != null)
- c.disconnectComplete();
- }
- public String toString() {
- Connection c = _connection;
- if (c == null)
- return super.toString();
- else
- return c.toString();
- }
+
+ private Connection _connection;
+ private I2PSocket.SocketErrorListener _listener;
+ private Destination _remotePeer;
+ private Destination _localPeer;
+
+ public I2PSocketFull(Connection con) {
+ _connection = con;
+ if(con != null) {
+ _remotePeer = con.getRemotePeer();
+ _localPeer = con.getSession().getMyDestination();
+ }
+ }
+
+
+ public void close() throws IOException {
+ Connection c = _connection;
+ if(c == null) {
+ return;
+ }
+ if(c.getIsConnected()) {
+ OutputStream out = c.getOutputStream();
+ if(out != null) {
+ try {
+ out.close();
+ } catch(IOException ioe) {
+ // ignore any write error, as we want to keep on and kill the
+ // con (thanks Complication!)
+ }
+ }
+ c.disconnect(true);
+ } else {
+ //throw new IOException("Not connected");
+ }
+ destroy();
+ }
+
+ Connection getConnection() {
+ return _connection;
+ }
+
+ public InputStream getInputStream() {
+ Connection c = _connection;
+ if(c != null) {
+ return c.getInputStream();
+ } else {
+ return null;
+ }
+ }
+
+ public I2PSocketOptions getOptions() {
+ Connection c = _connection;
+ if(c != null) {
+ return c.getOptions();
+ } else {
+ return null;
+ }
+ }
+
+ public OutputStream getOutputStream() throws IOException {
+ Connection c = _connection;
+ if(c != null) {
+ return c.getOutputStream();
+ } else {
+ return null;
+ }
+ }
+
+ public Destination getPeerDestination() {
+ return _remotePeer;
+ }
+
+ public long getReadTimeout() {
+ I2PSocketOptions opts = getOptions();
+ if(opts != null) {
+ return opts.getReadTimeout();
+ } else {
+ return -1;
+ }
+ }
+
+ public Destination getThisDestination() {
+ return _localPeer;
+ }
+
+ public void setOptions(I2PSocketOptions options) {
+ Connection c = _connection;
+ if(c == null) {
+ return;
+ }
+ if(options instanceof ConnectionOptions) {
+ c.setOptions((ConnectionOptions)options);
+ } else {
+ c.setOptions(new ConnectionOptions(options));
+ }
+ }
+
+ public void setReadTimeout(long ms) {
+ Connection c = _connection;
+ if(c == null) {
+ return;
+ }
+ c.getInputStream().setReadTimeout((int)ms);
+ c.getOptions().setReadTimeout(ms);
+ }
+
+ public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
+ _listener = lsnr;
+ }
+
+ public boolean isClosed() {
+ Connection c = _connection;
+ return ((c == null) ||
+ (!c.getIsConnected()) ||
+ (c.getResetReceived()) ||
+ (c.getResetSent()));
+ }
+
+ void destroy() {
+ Connection c = _connection;
+ _connection = null;
+ _listener = null;
+ if(c != null) {
+ c.disconnectComplete();
+ }
+ }
+
+ public String toString() {
+ Connection c = _connection;
+ if(c == null) {
+ return super.toString();
+ } else {
+ return c.toString();
+ }
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
index 7384a4972..b0d1c841a 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
@@ -1,6 +1,7 @@
package net.i2p.client.streaming;
import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
@@ -13,7 +14,6 @@ import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
-
/**
* Centralize the coordination and multiplexing of the local client's streaming.
* There should be one I2PSocketManager for each I2PSession, and if an application
@@ -23,219 +23,317 @@ import net.i2p.util.Log;
*
*/
public class I2PSocketManagerFull implements I2PSocketManager {
- private I2PAppContext _context;
- private Log _log;
- private I2PSession _session;
- private I2PServerSocketFull _serverSocket;
- private ConnectionOptions _defaultOptions;
- private long _acceptTimeout;
- private String _name;
- private int _maxStreams;
- private static int __managerId = 0;
- private ConnectionManager _connectionManager;
-
- /**
- * How long to wait for the client app to accept() before sending back CLOSE?
- * This includes the time waiting in the queue. Currently set to 5 seconds.
- */
- private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
-
- public I2PSocketManagerFull() {
- _context = null;
- _session = null;
- }
- public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
- this();
- init(context, session, opts, name);
- }
-
- /** how many streams will we allow at once? */
- public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
-
- /**
- *
- */
- public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
- _context = context;
- _session = session;
- _log = _context.logManager().getLog(I2PSocketManagerFull.class);
-
- _maxStreams = -1;
- try {
- String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
- _maxStreams = Integer.parseInt(num);
- } catch (NumberFormatException nfe) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
- _maxStreams = -1;
- }
- _name = name + " " + (++__managerId);
- _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
- _defaultOptions = new ConnectionOptions(opts);
- _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
- _serverSocket = new I2PServerSocketFull(this);
-
- if (_log.shouldLog(Log.INFO)) {
- _log.info("Socket manager created. \ndefault options: " + _defaultOptions
- + "\noriginal properties: " + opts);
- }
- }
- public I2PSocketOptions buildOptions() { return buildOptions(null); }
- public I2PSocketOptions buildOptions(Properties opts) {
- ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
- curOpts.setProperties(opts);
- return curOpts;
- }
-
- public I2PSession getSession() {
- return _session;
- }
-
- public ConnectionManager getConnectionManager() {
- return _connectionManager;
- }
+ private I2PAppContext _context;
+ private Log _log;
+ private I2PSession _session;
+ private I2PServerSocketFull _serverSocket;
+ private ConnectionOptions _defaultOptions;
+ private long _acceptTimeout;
+ private String _name;
+ private int _maxStreams;
+ private static int __managerId = 0;
+ private ConnectionManager _connectionManager;
+ /**
+ * How long to wait for the client app to accept() before sending back CLOSE?
+ * This includes the time waiting in the queue. Currently set to 5 seconds.
+ */
+ private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000;
- public I2PSocket receiveSocket() throws I2PException {
- verifySession();
- Connection con = _connectionManager.getConnectionHandler().accept(-1);
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("receiveSocket() called: " + con);
- if (con != null) {
- I2PSocketFull sock = new I2PSocketFull(con);
- con.setSocket(sock);
- return sock;
- } else {
- return null;
- }
- }
-
- /**
- * Ping the specified peer, returning true if they replied to the ping within
- * the timeout specified, false otherwise. This call blocks.
- *
- */
- public boolean ping(Destination peer, long timeoutMs) {
- return _connectionManager.ping(peer, timeoutMs);
- }
+ /**
+ *
+ */
+ public I2PSocketManagerFull() {
+ _context = null;
+ _session = null;
+ }
- /**
- * How long should we wait for the client to .accept() a socket before
- * sending back a NACK/Close?
- *
- * @param ms milliseconds to wait, maximum
- */
- public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
- public long getAcceptTimeout() { return _acceptTimeout; }
+ /**
+ *
+ * @param context
+ * @param session
+ * @param opts
+ * @param name
+ */
+ public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
+ this();
+ init(context, session, opts, name);
+ }
+ /** how many streams will we allow at once? */
+ public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
- public void setDefaultOptions(I2PSocketOptions options) {
- _defaultOptions = new ConnectionOptions((ConnectionOptions) options);
- }
+ /**
+ *
+ *
+ * @param context
+ * @param session
+ * @param opts
+ * @param name
+ */
+ public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
+ _context = context;
+ _session = session;
+ _log = _context.logManager().getLog(I2PSocketManagerFull.class);
- public I2PSocketOptions getDefaultOptions() {
- return _defaultOptions;
- }
+ _maxStreams = -1;
+ try {
+ String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
+ _maxStreams = Integer.parseInt(num);
+ } catch(NumberFormatException nfe) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
+ }
+ _maxStreams = -1;
+ }
+ _name = name + " " + (++__managerId);
+ _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
+ _defaultOptions = new ConnectionOptions(opts);
+ _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
+ _serverSocket = new I2PServerSocketFull(this);
- public I2PServerSocket getServerSocket() {
- _connectionManager.setAllowIncomingConnections(true);
- return _serverSocket;
- }
+ if(_log.shouldLog(Log.INFO)) {
+ _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts);
+ }
+ }
- private void verifySession() throws I2PException {
- if (!_connectionManager.getSession().isClosed())
- return;
- _connectionManager.getSession().connect();
- }
-
- /**
- * Create a new connected socket (block until the socket is created)
- *
- * @param peer Destination to connect to
- * @param options I2P socket options to be used for connecting
- *
- * @throws NoRouteToHostException if the peer is not found or not reachable
- * @throws I2PException if there is some other I2P-related problem
- */
- public I2PSocket connect(Destination peer, I2PSocketOptions options)
- throws I2PException, NoRouteToHostException {
- verifySession();
- if (options == null)
- options = _defaultOptions;
- ConnectionOptions opts = null;
- if (options instanceof ConnectionOptions)
- opts = new ConnectionOptions((ConnectionOptions)options);
- else
- opts = new ConnectionOptions(options);
-
- if (_log.shouldLog(Log.INFO))
- _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
- + " with options: " + opts);
- Connection con = _connectionManager.connect(peer, opts);
- if (con == null)
- throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
- I2PSocketFull socket = new I2PSocketFull(con);
- con.setSocket(socket);
- if (con.getConnectionError() != null) {
- con.disconnect(false);
- throw new NoRouteToHostException(con.getConnectionError());
- }
- return socket;
- }
+ /**
+ *
+ * @return
+ */
+ public I2PSocketOptions buildOptions() {
+ return buildOptions(null);
+ }
- /**
- * Create a new connected socket (block until the socket is created)
- *
- * @param peer Destination to connect to
- *
- * @throws NoRouteToHostException if the peer is not found or not reachable
- * @throws I2PException if there is some other I2P-related problem
- */
- public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
- return connect(peer, _defaultOptions);
- }
+ /**
+ *
+ * @param opts
+ * @return
+ */
+ public I2PSocketOptions buildOptions(Properties opts) {
+ ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
+ curOpts.setProperties(opts);
+ return curOpts;
+ }
- /**
- * Destroy the socket manager, freeing all the associated resources. This
- * method will block untill all the managed sockets are closed.
- *
- */
- public void destroySocketManager() {
- _connectionManager.disconnectAllHard();
- _connectionManager.setAllowIncomingConnections(false);
- // should we destroy the _session too?
- // yes, since the old lib did (and SAM wants it to, and i dont know why not)
- if ( (_session != null) && (!_session.isClosed()) ) {
- try {
- _session.destroySession();
- } catch (I2PSessionException ise) {
- _log.warn("Unable to destroy the session", ise);
- }
- }
- }
+ /**
+ *
+ * @return
+ */
+ public I2PSession getSession() {
+ return _session;
+ }
- /**
- * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
- *
- */
- public Set listSockets() {
- Set connections = _connectionManager.listConnections();
- Set rv = new HashSet(connections.size());
- for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
- Connection con = (Connection)iter.next();
- if (con.getSocket() != null)
- rv.add(con.getSocket());
- }
- return rv;
- }
+ /**
+ *
+ * @return
+ */
+ public ConnectionManager getConnectionManager() {
+ return _connectionManager;
+ }
- public String getName() { return _name; }
- public void setName(String name) { _name = name; }
-
-
- public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
- _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
- }
- public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
- _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
- }
+ /**
+ *
+ * @return
+ * @throws net.i2p.I2PException
+ * @throws java.net.SocketTimeoutException
+ */
+ public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
+ verifySession();
+ Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("receiveSocket() called: " + con);
+ }
+ if(con != null) {
+ I2PSocketFull sock = new I2PSocketFull(con);
+ con.setSocket(sock);
+ return sock;
+ } else {
+ if(_connectionManager.MgetSoTimeout() == -1) {
+ return null;
+ }
+ throw new SocketTimeoutException("I2PSocket timed out");
+ }
+ }
+
+ /**
+ * Ping the specified peer, returning true if they replied to the ping within
+ * the timeout specified, false otherwise. This call blocks.
+ *
+ *
+ * @param peer
+ * @param timeoutMs
+ * @return
+ */
+ public boolean ping(Destination peer, long timeoutMs) {
+ return _connectionManager.ping(peer, timeoutMs);
+ }
+
+ /**
+ * How long should we wait for the client to .accept() a socket before
+ * sending back a NACK/Close?
+ *
+ * @param ms milliseconds to wait, maximum
+ */
+ public void setAcceptTimeout(long ms) {
+ _acceptTimeout = ms;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public long getAcceptTimeout() {
+ return _acceptTimeout;
+ }
+
+ /**
+ *
+ * @param options
+ */
+ public void setDefaultOptions(I2PSocketOptions options) {
+ _defaultOptions = new ConnectionOptions((ConnectionOptions)options);
+ }
+
+ /**
+ *
+ * @return
+ */
+ public I2PSocketOptions getDefaultOptions() {
+ return _defaultOptions;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public I2PServerSocket getServerSocket() {
+ _connectionManager.setAllowIncomingConnections(true);
+ return _serverSocket;
+ }
+
+ private void verifySession() throws I2PException {
+ if(!_connectionManager.getSession().isClosed()) {
+ return;
+ }
+ _connectionManager.getSession().connect();
+ }
+
+ /**
+ * Create a new connected socket (block until the socket is created)
+ *
+ * @param peer Destination to connect to
+ * @param options I2P socket options to be used for connecting
+ *
+ * @throws NoRouteToHostException if the peer is not found or not reachable
+ * @throws I2PException if there is some other I2P-related problem
+ */
+ public I2PSocket connect(Destination peer, I2PSocketOptions options)
+ throws I2PException, NoRouteToHostException {
+ verifySession();
+ if(options == null) {
+ options = _defaultOptions;
+ }
+ ConnectionOptions opts = null;
+ if(options instanceof ConnectionOptions) {
+ opts = new ConnectionOptions((ConnectionOptions)options);
+ } else {
+ opts = new ConnectionOptions(options);
+ }
+ if(_log.shouldLog(Log.INFO)) {
+ _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
+ }
+ Connection con = _connectionManager.connect(peer, opts);
+ if(con == null) {
+ throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
+ }
+ I2PSocketFull socket = new I2PSocketFull(con);
+ con.setSocket(socket);
+ if(con.getConnectionError() != null) {
+ con.disconnect(false);
+ throw new NoRouteToHostException(con.getConnectionError());
+ }
+ return socket;
+ }
+
+ /**
+ * Create a new connected socket (block until the socket is created)
+ *
+ * @param peer Destination to connect to
+ *
+ * @return
+ * @throws NoRouteToHostException if the peer is not found or not reachable
+ * @throws I2PException if there is some other I2P-related problem
+ */
+ public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
+ return connect(peer, _defaultOptions);
+ }
+
+ /**
+ * Destroy the socket manager, freeing all the associated resources. This
+ * method will block untill all the managed sockets are closed.
+ *
+ */
+ public void destroySocketManager() {
+ _connectionManager.disconnectAllHard();
+ _connectionManager.setAllowIncomingConnections(false);
+ // should we destroy the _session too?
+ // yes, since the old lib did (and SAM wants it to, and i dont know why not)
+ if((_session != null) && (!_session.isClosed())) {
+ try {
+ _session.destroySession();
+ } catch(I2PSessionException ise) {
+ _log.warn("Unable to destroy the session", ise);
+ }
+ }
+ }
+
+ /**
+ * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
+ *
+ *
+ * @return
+ */
+ public Set listSockets() {
+ Set connections = _connectionManager.listConnections();
+ Set rv = new HashSet(connections.size());
+ for(Iterator iter = connections.iterator(); iter.hasNext();) {
+ Connection con = (Connection)iter.next();
+ if(con.getSocket() != null) {
+ rv.add(con.getSocket());
+ }
+ }
+ return rv;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public String getName() {
+ return _name;
+ }
+
+ /**
+ *
+ * @param name
+ */
+ public void setName(String name) {
+ _name = name;
+ }
+
+ /**
+ *
+ * @param lsnr
+ */
+ public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
+ _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
+ }
+
+ /**
+ *
+ * @param lsnr
+ */
+ public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
+ _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java
index 0ea0c83d7..c52c373b1 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java
@@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer;
/**
*
*/
-class RetransmissionTimer extends SimpleTimer {
+public class RetransmissionTimer extends SimpleTimer {
private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); }
diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java
index e3c1b6fbf..c5955c999 100644
--- a/core/java/src/net/i2p/util/Executor.java
+++ b/core/java/src/net/i2p/util/Executor.java
@@ -5,42 +5,59 @@ import java.util.List;
import net.i2p.I2PAppContext;
class Executor implements Runnable {
- private I2PAppContext _context;
- private Log _log;
- private List _readyEvents;
- public Executor(I2PAppContext ctx, Log log, List events) {
- _context = ctx;
- _readyEvents = events;
- }
- public void run() {
- while (true) {
- SimpleTimer.TimedEvent evt = null;
- synchronized (_readyEvents) {
- if (_readyEvents.size() <= 0)
- try { _readyEvents.wait(); } catch (InterruptedException ie) {}
- if (_readyEvents.size() > 0)
- evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
- }
- if (evt != null) {
- long before = _context.clock().now();
- try {
- evt.timeReached();
- } catch (Throwable t) {
- log("wtf, event borked: " + evt, t);
- }
- long time = _context.clock().now() - before;
- if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
- _log.warn("wtf, event execution took " + time + ": " + evt);
- }
- }
- }
-
- private void log(String msg, Throwable t) {
- synchronized (this) {
- if (_log == null)
- _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
- }
- _log.log(Log.CRIT, msg, t);
- }
+ private I2PAppContext _context;
+ private Log _log;
+ private List _readyEvents;
+ private SimpleStore runn;
+
+ public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
+ _context = ctx;
+ _readyEvents = events;
+ runn = x;
+ }
+
+ public void run() {
+ while(runn.getAnswer()) {
+ SimpleTimer.TimedEvent evt = null;
+ synchronized(_readyEvents) {
+ if(_readyEvents.size() <= 0) {
+ try {
+ _readyEvents.wait();
+ } catch(InterruptedException ie) {
+ }
+ }
+ if(_readyEvents.size() > 0) {
+ evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
+ }
+ }
+
+ if(evt != null) {
+ long before = _context.clock().now();
+ try {
+ evt.timeReached();
+ } catch(Throwable t) {
+ log("wtf, event borked: " + evt, t);
+ }
+ long time = _context.clock().now() - before;
+ if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
+ _log.warn("wtf, event execution took " + time + ": " + evt);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param msg
+ * @param t
+ */
+ private void log(String msg, Throwable t) {
+ synchronized(this) {
+ if(_log == null) {
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
+ }
+ }
+ _log.log(Log.CRIT, msg, t);
+ }
}
diff --git a/core/java/src/net/i2p/util/SimpleStore.java b/core/java/src/net/i2p/util/SimpleStore.java
new file mode 100644
index 000000000..b73a8e7eb
--- /dev/null
+++ b/core/java/src/net/i2p/util/SimpleStore.java
@@ -0,0 +1,35 @@
+/*
+ * This is free software, do as you please.
+ */
+
+package net.i2p.util;
+
+/**
+ *
+ * @author sponge
+ */
+public class SimpleStore {
+
+ private boolean answer;
+
+ SimpleStore(boolean x) {
+ answer=x;
+ }
+
+ /**
+ * set the answer
+ *
+ * @param x
+ */
+ public void setAnswer(boolean x) {
+ answer = x;
+ }
+ /**
+ *
+ * @return boolean
+ */
+ public boolean getAnswer() {
+ return answer;
+ }
+
+}
diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java
index 9543f72c5..5595fbd5c 100644
--- a/core/java/src/net/i2p/util/SimpleTimer.java
+++ b/core/java/src/net/i2p/util/SimpleTimer.java
@@ -16,211 +16,262 @@ import net.i2p.I2PAppContext;
*
*/
public class SimpleTimer {
- private static final SimpleTimer _instance = new SimpleTimer();
- public static SimpleTimer getInstance() { return _instance; }
- private I2PAppContext _context;
- private Log _log;
- /** event time (Long) to event (TimedEvent) mapping */
- private TreeMap _events;
- /** event (TimedEvent) to event time (Long) mapping */
- private Map _eventTimes;
- private List _readyEvents;
-
- protected SimpleTimer() { this("SimpleTimer"); }
- protected SimpleTimer(String name) {
- _context = I2PAppContext.getGlobalContext();
- _log = _context.logManager().getLog(SimpleTimer.class);
- _events = new TreeMap();
- _eventTimes = new HashMap(256);
- _readyEvents = new ArrayList(4);
- I2PThread runner = new I2PThread(new SimpleTimerRunner());
- runner.setName(name);
- runner.setDaemon(true);
- runner.start();
- for (int i = 0; i < 3; i++) {
- I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
- executor.setName(name + "Executor " + i);
- executor.setDaemon(true);
- executor.start();
- }
- }
-
- public void reschedule(TimedEvent event, long timeoutMs) {
- addEvent(event, timeoutMs, false);
- }
-
- /**
- * Queue up the given event to be fired no sooner than timeoutMs from now.
- * However, if this event is already scheduled, the event will be scheduled
- * for the earlier of the two timeouts, which may be before this stated
- * timeout. If this is not the desired behavior, call removeEvent first.
- *
- */
- public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
- /**
- * @param useEarliestTime if its already scheduled, use the earlier of the
- * two timeouts, else use the later
- */
- public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
- int totalEvents = 0;
- long now = System.currentTimeMillis();
- long eventTime = now + timeoutMs;
- Long time = new Long(eventTime);
- synchronized (_events) {
- // remove the old scheduled position, then reinsert it
- Long oldTime = (Long)_eventTimes.get(event);
- if (oldTime != null) {
- if (useEarliestTime) {
- if (oldTime.longValue() < eventTime) {
- _events.notifyAll();
- return; // already scheduled for sooner than requested
- } else {
- _events.remove(oldTime);
- }
- } else {
- if (oldTime.longValue() > eventTime) {
- _events.notifyAll();
- return; // already scheduled for later than the given period
- } else {
- _events.remove(oldTime);
- }
- }
- }
- while (_events.containsKey(time))
- time = new Long(time.longValue() + 1);
- _events.put(time, event);
- _eventTimes.put(event, time);
-
- if ( (_events.size() != _eventTimes.size()) ) {
- _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
- for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
- TimedEvent evt = (TimedEvent)iter.next();
- Long when = (Long)_eventTimes.get(evt);
- TimedEvent cur = (TimedEvent)_events.get(when);
- if (cur != evt) {
- _log.error("event " + evt + " @ " + when + ": " + cur);
- }
- }
- }
-
- totalEvents = _events.size();
- _events.notifyAll();
- }
- if (time.longValue() > eventTime + 100) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("Lots of timer congestion, had to push " + event + " back "
- + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");
- }
- long timeToAdd = System.currentTimeMillis() - now;
- if (timeToAdd > 50) {
- if (_log.shouldLog(Log.WARN))
- _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
- }
-
- }
-
- public boolean removeEvent(TimedEvent evt) {
- if (evt == null) return false;
- synchronized (_events) {
- Long when = (Long)_eventTimes.remove(evt);
- if (when != null)
- _events.remove(when);
- return null != when;
- }
- }
-
- /**
- * Simple interface for events to be queued up and notified on expiration
- */
- public interface TimedEvent {
- /**
- * the time requested has been reached (this call should NOT block,
- * otherwise the whole SimpleTimer gets backed up)
- *
- */
- public void timeReached();
- }
-
- private long _occurredTime;
- private long _occurredEventCount;
- private TimedEvent _recentEvents[] = new TimedEvent[5];
-
- private class SimpleTimerRunner implements Runnable {
- public void run() {
- List eventsToFire = new ArrayList(1);
- while (true) {
- try {
- synchronized (_events) {
- //if (_events.size() <= 0)
- // _events.wait();
- //if (_events.size() > 100)
- // _log.warn("> 100 events! " + _events.values());
- long now = System.currentTimeMillis();
- long nextEventDelay = -1;
- Object nextEvent = null;
- while (true) {
- if (_events.size() <= 0) break;
- Long when = (Long)_events.firstKey();
- if (when.longValue() <= now) {
- TimedEvent evt = (TimedEvent)_events.remove(when);
- if (evt != null) {
- _eventTimes.remove(evt);
- eventsToFire.add(evt);
- }
- } else {
- nextEventDelay = when.longValue() - now;
- nextEvent = _events.get(when);
- break;
- }
- }
- if (eventsToFire.size() <= 0) {
- if (nextEventDelay != -1) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
- _events.wait(nextEventDelay);
- } else {
- _events.wait();
- }
- }
- }
- } catch (ThreadDeath td) {
- return; // die
- } catch (InterruptedException ie) {
- // ignore
- } catch (Throwable t) {
- if (_log != null) {
- _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
- } else {
- System.err.println("Uncaught exception in SimpleTimer");
- t.printStackTrace();
- }
- }
-
- long now = System.currentTimeMillis();
- now = now - (now % 1000);
- synchronized (_readyEvents) {
- for (int i = 0; i < eventsToFire.size(); i++)
- _readyEvents.add(eventsToFire.get(i));
- _readyEvents.notifyAll();
- }
+ private static final SimpleTimer _instance = new SimpleTimer();
- if (_occurredTime == now) {
- _occurredEventCount += eventsToFire.size();
- } else {
- _occurredTime = now;
- if (_occurredEventCount > 2500) {
- StringBuffer buf = new StringBuffer(128);
- buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
- buf.append(") in a second!");
- _log.log(Log.WARN, buf.toString());
- }
- _occurredEventCount = 0;
- }
+ public static SimpleTimer getInstance() {
+ return _instance;
+ }
+ private I2PAppContext _context;
+ private Log _log;
+ /** event time (Long) to event (TimedEvent) mapping */
+ private TreeMap _events;
+ /** event (TimedEvent) to event time (Long) mapping */
+ private Map _eventTimes;
+ private List _readyEvents;
+ private SimpleStore runn;
- eventsToFire.clear();
- }
- }
- }
+ /**
+ *
+ */
+ protected SimpleTimer() {
+ this("SimpleTimer");
+ }
+
+ /**
+ *
+ * @param name
+ */
+ protected SimpleTimer(String name) {
+ runn = new SimpleStore(true);
+ _context = I2PAppContext.getGlobalContext();
+ _log = _context.logManager().getLog(SimpleTimer.class);
+ _events = new TreeMap();
+ _eventTimes = new HashMap(256);
+ _readyEvents = new ArrayList(4);
+ I2PThread runner = new I2PThread(new SimpleTimerRunner());
+ runner.setName(name);
+ runner.setDaemon(true);
+ runner.start();
+ for(int i = 0; i < 3; i++) {
+ I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
+ executor.setName(name + "Executor " + i);
+ executor.setDaemon(true);
+ executor.start();
+ }
+ }
+
+ /**
+ * Removes the SimpleTimer.
+ */
+ public void removeSimpleTimer() {
+ synchronized(_events) {
+ runn.setAnswer(false);
+ _events.notifyAll();
+ }
+ }
+
+ /**
+ *
+ * @param event
+ * @param timeoutMs
+ */
+ public void reschedule(TimedEvent event, long timeoutMs) {
+ addEvent(event, timeoutMs, false);
+ }
+
+ /**
+ * Queue up the given event to be fired no sooner than timeoutMs from now.
+ * However, if this event is already scheduled, the event will be scheduled
+ * for the earlier of the two timeouts, which may be before this stated
+ * timeout. If this is not the desired behavior, call removeEvent first.
+ *
+ * @param event
+ * @param timeoutMs
+ */
+ public void addEvent(TimedEvent event, long timeoutMs) {
+ addEvent(event, timeoutMs, true);
+ }
+
+ /**
+ * @param event
+ * @param timeoutMs
+ * @param useEarliestTime if its already scheduled, use the earlier of the
+ * two timeouts, else use the later
+ */
+ public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
+ int totalEvents = 0;
+ long now = System.currentTimeMillis();
+ long eventTime = now + timeoutMs;
+ Long time = new Long(eventTime);
+ synchronized(_events) {
+ // remove the old scheduled position, then reinsert it
+ Long oldTime = (Long)_eventTimes.get(event);
+ if(oldTime != null) {
+ if(useEarliestTime) {
+ if(oldTime.longValue() < eventTime) {
+ _events.notifyAll();
+ return; // already scheduled for sooner than requested
+ } else {
+ _events.remove(oldTime);
+ }
+ } else {
+ if(oldTime.longValue() > eventTime) {
+ _events.notifyAll();
+ return; // already scheduled for later than the given period
+ } else {
+ _events.remove(oldTime);
+ }
+ }
+ }
+ while(_events.containsKey(time)) {
+ time = new Long(time.longValue() + 1);
+ }
+ _events.put(time, event);
+ _eventTimes.put(event, time);
+
+ if((_events.size() != _eventTimes.size())) {
+ _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
+ for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) {
+ TimedEvent evt = (TimedEvent)iter.next();
+ Long when = (Long)_eventTimes.get(evt);
+ TimedEvent cur = (TimedEvent)_events.get(when);
+ if(cur != evt) {
+ _log.error("event " + evt + " @ " + when + ": " + cur);
+ }
+ }
+ }
+
+ totalEvents = _events.size();
+ _events.notifyAll();
+ }
+ if(time.longValue() > eventTime + 100) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")");
+ }
+ }
+ long timeToAdd = System.currentTimeMillis() - now;
+ if(timeToAdd > 50) {
+ if(_log.shouldLog(Log.WARN)) {
+ _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
+ }
+ }
+
+ }
+
+ /**
+ *
+ * @param evt
+ * @return
+ */
+ public boolean removeEvent(TimedEvent evt) {
+ if(evt == null) {
+ return false;
+ }
+ synchronized(_events) {
+ Long when = (Long)_eventTimes.remove(evt);
+ if(when != null) {
+ _events.remove(when);
+ }
+ return null != when;
+ }
+ }
+
+ /**
+ * Simple interface for events to be queued up and notified on expiration
+ */
+ public interface TimedEvent {
+
+ /**
+ * the time requested has been reached (this call should NOT block,
+ * otherwise the whole SimpleTimer gets backed up)
+ *
+ */
+ public void timeReached();
+ }
+ private long _occurredTime;
+ private long _occurredEventCount;
+ // not used
+ // private TimedEvent _recentEvents[] = new TimedEvent[5];
+ private class SimpleTimerRunner implements Runnable {
+
+ public void run() {
+ List eventsToFire = new ArrayList(1);
+ while(runn.getAnswer()) {
+ try {
+ synchronized(_events) {
+ //if (_events.size() <= 0)
+ // _events.wait();
+ //if (_events.size() > 100)
+ // _log.warn("> 100 events! " + _events.values());
+ long now = System.currentTimeMillis();
+ long nextEventDelay = -1;
+ Object nextEvent = null;
+ while(runn.getAnswer()) {
+ if(_events.size() <= 0) {
+ break;
+ }
+ Long when = (Long)_events.firstKey();
+ if(when.longValue() <= now) {
+ TimedEvent evt = (TimedEvent)_events.remove(when);
+ if(evt != null) {
+ _eventTimes.remove(evt);
+ eventsToFire.add(evt);
+ }
+ } else {
+ nextEventDelay = when.longValue() - now;
+ nextEvent = _events.get(when);
+ break;
+ }
+ }
+ if(eventsToFire.size() <= 0) {
+ if(nextEventDelay != -1) {
+ if(_log.shouldLog(Log.DEBUG)) {
+ _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
+ }
+ _events.wait(nextEventDelay);
+ } else {
+ _events.wait();
+ }
+ }
+ }
+ } catch(InterruptedException ie) {
+ // ignore
+ } catch(Throwable t) {
+ if(_log != null) {
+ _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
+ } else {
+ System.err.println("Uncaught exception in SimpleTimer");
+ t.printStackTrace();
+ }
+ }
+
+ long now = System.currentTimeMillis();
+ now = now - (now % 1000);
+
+ synchronized(_readyEvents) {
+ for(int i = 0; i < eventsToFire.size(); i++) {
+ _readyEvents.add(eventsToFire.get(i));
+ }
+ _readyEvents.notifyAll();
+ }
+
+ if(_occurredTime == now) {
+ _occurredEventCount += eventsToFire.size();
+ } else {
+ _occurredTime = now;
+ if(_occurredEventCount > 2500) {
+ StringBuffer buf = new StringBuffer(128);
+ buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
+ buf.append(") in a second!");
+ _log.log(Log.WARN, buf.toString());
+ }
+ _occurredEventCount = 0;
+ }
+
+ eventsToFire.clear();
+ }
+ }
+ }
}