From fa5c7219d3de5dbfd0bed1472daebba739de0b0b Mon Sep 17 00:00:00 2001 From: sponge Date: Thu, 25 Sep 2008 06:55:04 +0000 Subject: [PATCH] Added {get,set}SOTimeout() to the ServerSocket API, and fixed all the broken mainstream applications depending on it. Fixed a grave bug in SimpleTimer. Fixed Steraming Timer to be public. Fixed a pile of JavaDoc comments, and reformatted the files I touched. --- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 5 +- .../i2p/client/streaming/I2PServerSocket.java | 55 +- .../client/streaming/I2PServerSocketImpl.java | 285 +++--- .../client/streaming/StreamSinkServer.java | 369 ++++---- .../client/streaming/ConnectionHandler.java | 1 + .../client/streaming/ConnectionManager.java | 836 ++++++++++-------- .../client/streaming/I2PServerSocketFull.java | 58 +- .../i2p/client/streaming/I2PSocketFull.java | 250 +++--- .../streaming/I2PSocketManagerFull.java | 508 ++++++----- .../client/streaming/RetransmissionTimer.java | 2 +- core/java/src/net/i2p/util/Executor.java | 91 +- core/java/src/net/i2p/util/SimpleStore.java | 35 + core/java/src/net/i2p/util/SimpleTimer.java | 457 +++++----- 13 files changed, 1673 insertions(+), 1279 deletions(-) create mode 100644 core/java/src/net/i2p/util/SimpleStore.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 014d91e9b..40566fe41 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -12,6 +12,7 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.Properties; @@ -219,7 +220,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ce); // not killing the server.. - } + } catch(SocketTimeoutException ste) { + // ignored, we never set the timeout + } } } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 726d462ce..7c9927395 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -2,6 +2,7 @@ package net.i2p.client.streaming; import java.net.ConnectException; +import java.net.SocketTimeoutException; import net.i2p.I2PException; /** @@ -9,26 +10,40 @@ import net.i2p.I2PException; * */ public interface I2PServerSocket { - /** - * Closes the socket. - */ - public void close() throws I2PException; - /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period) - * - * @return a connected I2PSocket - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - */ - public I2PSocket accept() throws I2PException, ConnectException; + /** + * Closes the socket. + */ + public void close() throws I2PException; - /** - * Access the manager which is coordinating the server socket - */ - public I2PSocketManager getManager(); + /** + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period) + * + * @return a connected I2PSocket + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + * @throws SocketTimeoutException + */ + public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; + + /** + * Set Sock Option accept timeout + * @param x + */ + public void setSoTimeout(long x); + + /** + * Get Sock Option accept timeout + * @return timeout + */ + public long getSoTimeout(); + + /** + * Access the manager which is coordinating the server socket + */ + public I2PSocketManager getManager(); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 965ba31bf..2e3dfdb6b 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -17,134 +17,159 @@ import net.i2p.util.Log; * */ class I2PServerSocketImpl implements I2PServerSocket { - private final static Log _log = new Log(I2PServerSocketImpl.class); - private I2PSocketManager mgr; - /** list of sockets waiting for the client to accept them */ - private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); - - /** have we been closed */ - private volatile boolean closing = false; - - /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ - private Object socketAcceptedLock = new Object(); - /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ - private Object socketAddedLock = new Object(); - - public I2PServerSocketImpl(I2PSocketManager mgr) { - this.mgr = mgr; - } - - /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period - - * currently 5 seconds) - * - * @return a connected I2PSocket - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - */ - public I2PSocket accept() throws I2PException, ConnectException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("accept() called, pending: " + pendingSockets.size()); - - I2PSocket ret = null; - - while ( (ret == null) && (!closing) ){ - while (pendingSockets.size() <= 0) { - if (closing) throw new ConnectException("I2PServerSocket closed"); - try { - synchronized(socketAddedLock) { - socketAddedLock.wait(); - } - } catch (InterruptedException ie) {} - } - synchronized (pendingSockets) { - if (pendingSockets.size() > 0) { - ret = (I2PSocket)pendingSockets.remove(0); - } - } - if (ret != null) { - synchronized (socketAcceptedLock) { - socketAcceptedLock.notifyAll(); - } - } - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("TIMING: handed out accept result " + ret.hashCode()); - return ret; - } - - /** - * Make the socket available and wait until the client app accepts it, or until - * the given timeout elapses. This doesn't have any limits on the queue size - - * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) - * - * @param timeoutMs how long to wait until accept - * @return true if the socket was accepted, false if the timeout expired - * or the socket was closed - */ - public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); - - if (closing) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Already closing the socket"); - return false; - } - - Clock clock = I2PAppContext.getGlobalContext().clock(); - long start = clock.now(); - long end = start + timeoutMs; - pendingSockets.add(s); - synchronized (socketAddedLock) { - socketAddedLock.notifyAll(); - } - - // keep looping until the socket has been grabbed by the accept() - // (or the expiration passes, or the socket is closed) - while (pendingSockets.contains(s)) { - long now = clock.now(); - if (now >= end) { - if (_log.shouldLog(Log.INFO)) - _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); - pendingSockets.remove(s); - return false; - } - if (closing) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Server socket closed while waiting for accept"); - pendingSockets.remove(s); - return false; - } - long remaining = end - now; - try { - synchronized (socketAcceptedLock) { - socketAcceptedLock.wait(remaining); - } - } catch (InterruptedException ie) {} - } - long now = clock.now(); - if (_log.shouldLog(Log.DEBUG)) - _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); - return true; - } - - public void close() { - closing = true; - // let anyone .accept()ing know to fsck off - synchronized (socketAddedLock) { - socketAddedLock.notifyAll(); - } - // let anyone addWaitForAccept()ing know to fsck off - synchronized (socketAcceptedLock) { - socketAcceptedLock.notifyAll(); - } - } - - public I2PSocketManager getManager() { return mgr; } + + private final static Log _log = new Log(I2PServerSocketImpl.class); + private I2PSocketManager mgr; + /** list of sockets waiting for the client to accept them */ + private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); + /** have we been closed */ + private volatile boolean closing = false; + /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ + private Object socketAcceptedLock = new Object(); + /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ + private Object socketAddedLock = new Object(); + + /** + * Set Sock Option accept timeout stub, does nothing + * @param x + */ + public void setSoTimeout(long x) { + } + + /** + * Get Sock Option accept timeout stub, does nothing + * @return timeout + */ + public long getSoTimeout() { + return -1; + } + + public I2PServerSocketImpl(I2PSocketManager mgr) { + this.mgr = mgr; + } + + /** + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period - + * currently 5 seconds) + * + * @return a connected I2PSocket + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + public I2PSocket accept() throws I2PException, ConnectException { + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("accept() called, pending: " + pendingSockets.size()); + } + I2PSocket ret = null; + + while((ret == null) && (!closing)) { + while(pendingSockets.size() <= 0) { + if(closing) { + throw new ConnectException("I2PServerSocket closed"); + } + try { + synchronized(socketAddedLock) { + socketAddedLock.wait(); + } + } catch(InterruptedException ie) { + } + } + synchronized(pendingSockets) { + if(pendingSockets.size() > 0) { + ret = (I2PSocket)pendingSockets.remove(0); + } + } + if(ret != null) { + synchronized(socketAcceptedLock) { + socketAcceptedLock.notifyAll(); + } + } + } + + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("TIMING: handed out accept result " + ret.hashCode()); + } + return ret; + } + + /** + * Make the socket available and wait until the client app accepts it, or until + * the given timeout elapses. This doesn't have any limits on the queue size - + * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) + * + * @param timeoutMs how long to wait until accept + * @return true if the socket was accepted, false if the timeout expired + * or the socket was closed + */ + public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); + } + if(closing) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Already closing the socket"); + } + return false; + } + + Clock clock = I2PAppContext.getGlobalContext().clock(); + long start = clock.now(); + long end = start + timeoutMs; + pendingSockets.add(s); + synchronized(socketAddedLock) { + socketAddedLock.notifyAll(); + } + + // keep looping until the socket has been grabbed by the accept() + // (or the expiration passes, or the socket is closed) + while(pendingSockets.contains(s)) { + long now = clock.now(); + if(now >= end) { + if(_log.shouldLog(Log.INFO)) { + _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); + } + pendingSockets.remove(s); + return false; + } + if(closing) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Server socket closed while waiting for accept"); + } + pendingSockets.remove(s); + return false; + } + long remaining = end - now; + try { + synchronized(socketAcceptedLock) { + socketAcceptedLock.wait(remaining); + } + } catch(InterruptedException ie) { + } + } + long now = clock.now(); + if(_log.shouldLog(Log.DEBUG)) { + _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString()); + } + return true; + } + + public void close() { + closing = true; + // let anyone .accept()ing know to fsck off + synchronized(socketAddedLock) { + socketAddedLock.notifyAll(); + } + // let anyone addWaitForAccept()ing know to fsck off + synchronized(socketAcceptedLock) { + socketAcceptedLock.notifyAll(); + } + } + + public I2PSocketManager getManager() { + return mgr; + } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index c8b566190..9f12be6c4 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -5,6 +5,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.util.Properties; import net.i2p.I2PAppContext; @@ -20,173 +21,203 @@ import net.i2p.util.Log; * */ public class StreamSinkServer { - private Log _log; - private String _sinkDir; - private String _destFile; - private String _i2cpHost; - private int _i2cpPort; - private int _handlers; - - /** - * Create but do not start the streaming server. - * - * @param sinkDir Directory to store received files in - * @param ourDestFile filename to write our binary destination to - */ - public StreamSinkServer(String sinkDir, String ourDestFile) { - this(sinkDir, ourDestFile, null, -1, 3); - } - public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { - _sinkDir = sinkDir; - _destFile = ourDestFile; - _i2cpHost = i2cpHost; - _i2cpPort = i2cpPort; - _handlers = handlers; - _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); - } - - /** - * Actually fire up the server - this call blocks forever (or until the server - * socket closes) - * - */ - public void runServer() { - I2PSocketManager mgr = null; - if (_i2cpHost != null) - mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); - else - mgr = I2PSocketManagerFactory.createManager(); - Destination dest = mgr.getSession().getMyDestination(); - if (_log.shouldLog(Log.INFO)) - _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); - FileOutputStream fos = null; - try { - fos = new FileOutputStream(_destFile); - dest.writeBytes(fos); - } catch (IOException ioe) { - _log.error("Error writing out our destination to " + _destFile, ioe); - return; - } catch (DataFormatException dfe) { - _log.error("Error formatting the destination", dfe); - return; - } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} - } - - I2PServerSocket sock = mgr.getServerSocket(); - startup(sock); - } - - public void startup(I2PServerSocket sock) { - for (int i = 0; i < _handlers; i++) { - I2PThread t = new I2PThread(new ClientRunner(sock)); - t.setName("Handler " + i); - t.setDaemon(false); - t.start(); - } - } - - /** - * Actually deal with a client - pull anything they send us and write it to a file. - * - */ - private class ClientRunner implements Runnable { - private I2PServerSocket _socket; - public ClientRunner(I2PServerSocket socket) { - _socket = socket; - } - public void run() { - while (true) { - try { - I2PSocket socket = _socket.accept(); - if (socket != null) - handle(socket); - } catch (I2PException ie) { - _log.error("Error accepting connection", ie); - return; - } catch (ConnectException ce) { - _log.error("Connection already dropped", ce); - return; - } - } - } - - private void handle(I2PSocket sock) { - FileOutputStream fos = null; - try { - File sink = new File(_sinkDir); - if (!sink.exists()) - sink.mkdirs(); - File cur = File.createTempFile("clientSink", ".dat", sink); - fos = new FileOutputStream(cur); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing to " + cur.getAbsolutePath()); - } catch (IOException ioe) { - _log.error("Error creating sink", ioe); - return; - } - - long start = System.currentTimeMillis(); - try { - InputStream in = sock.getInputStream(); - byte buf[] = new byte[4096]; - long written = 0; - int read = 0; - while ( (read = in.read(buf)) != -1) { - //_fos.write(buf, 0, read); - written += read; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read and wrote " + read + " (" + written + ")"); - } - fos.write(("written: [" + written + "]\n").getBytes()); - long lifetime = System.currentTimeMillis() - start; - _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); - } catch (IOException ioe) { - _log.error("Error writing the sink", ioe); - } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} - if (sock != null) try { sock.close(); } catch (IOException ioe) {} - _log.debug("Client socket closed"); - } - } - } - - /** - * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
- * - */ - public static void main(String args[]) { - StreamSinkServer server = null; - switch (args.length) { - case 0: - server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3); - break; - case 2: - server = new StreamSinkServer(args[0], args[1]); - break; - case 4: - case 5: - int handlers = 3; - if (args.length == 5) { - try { - handlers = Integer.parseInt(args[4]); - } catch (NumberFormatException nfe) {} - } - try { - int port = Integer.parseInt(args[1]); - server = new StreamSinkServer(args[2], args[3], args[0], port, handlers); - } catch (NumberFormatException nfe) { - System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); - } - break; - default: - System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); - } - if (server != null) - server.runServer(); - } + + private Log _log; + private String _sinkDir; + private String _destFile; + private String _i2cpHost; + private int _i2cpPort; + private int _handlers; + + /** + * Create but do not start the streaming server. + * + * @param sinkDir Directory to store received files in + * @param ourDestFile filename to write our binary destination to + */ + public StreamSinkServer(String sinkDir, String ourDestFile) { + this(sinkDir, ourDestFile, null, -1, 3); + } + + public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { + _sinkDir = sinkDir; + _destFile = ourDestFile; + _i2cpHost = i2cpHost; + _i2cpPort = i2cpPort; + _handlers = handlers; + _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); + } + + /** + * Actually fire up the server - this call blocks forever (or until the server + * socket closes) + * + */ + public void runServer() { + I2PSocketManager mgr = null; + if(_i2cpHost != null) { + mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); + } else { + mgr = I2PSocketManagerFactory.createManager(); + } + Destination dest = mgr.getSession().getMyDestination(); + if(_log.shouldLog(Log.INFO)) { + _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); + } + FileOutputStream fos = null; + try { + fos = new FileOutputStream(_destFile); + dest.writeBytes(fos); + } catch(IOException ioe) { + _log.error("Error writing out our destination to " + _destFile, ioe); + return; + } catch(DataFormatException dfe) { + _log.error("Error formatting the destination", dfe); + return; + } finally { + if(fos != null) { + try { + fos.close(); + } catch(IOException ioe) { + } + } + } + + I2PServerSocket sock = mgr.getServerSocket(); + startup(sock); + } + + public void startup(I2PServerSocket sock) { + for(int i = 0; i < _handlers; i++) { + I2PThread t = new I2PThread(new ClientRunner(sock)); + t.setName("Handler " + i); + t.setDaemon(false); + t.start(); + } + } + + /** + * Actually deal with a client - pull anything they send us and write it to a file. + * + */ + private class ClientRunner implements Runnable { + + private I2PServerSocket _socket; + + public ClientRunner(I2PServerSocket socket) { + _socket = socket; + } + + public void run() { + while(true) { + try { + I2PSocket socket = _socket.accept(); + if(socket != null) { + handle(socket); + } + } catch(I2PException ie) { + _log.error("Error accepting connection", ie); + return; + } catch(ConnectException ce) { + _log.error("Connection already dropped", ce); + return; + } catch(SocketTimeoutException ste) { + // ignored + } + } + } + + private void handle(I2PSocket sock) { + FileOutputStream fos = null; + try { + File sink = new File(_sinkDir); + if(!sink.exists()) { + sink.mkdirs(); + } + File cur = File.createTempFile("clientSink", ".dat", sink); + fos = new FileOutputStream(cur); + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("Writing to " + cur.getAbsolutePath()); + } + } catch(IOException ioe) { + _log.error("Error creating sink", ioe); + return; + } + + long start = System.currentTimeMillis(); + try { + InputStream in = sock.getInputStream(); + byte buf[] = new byte[4096]; + long written = 0; + int read = 0; + while((read = in.read(buf)) != -1) { + //_fos.write(buf, 0, read); + written += read; + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("read and wrote " + read + " (" + written + ")"); + } + } + fos.write(("written: [" + written + "]\n").getBytes()); + long lifetime = System.currentTimeMillis() - start; + _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); + } catch(IOException ioe) { + _log.error("Error writing the sink", ioe); + } finally { + if(fos != null) { + try { + fos.close(); + } catch(IOException ioe) { + } + } + if(sock != null) { + try { + sock.close(); + } catch(IOException ioe) { + } + } + _log.debug("Client socket closed"); + } + } + } + + /** + * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
+ * + */ + public static void main(String args[]) { + StreamSinkServer server = null; + switch(args.length) { + case 0: + server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3); + break; + case 2: + server = new StreamSinkServer(args[0], args[1]); + break; + case 4: + case 5: + int handlers = 3; + if(args.length == 5) { + try { + handlers = Integer.parseInt(args[4]); + } catch(NumberFormatException nfe) { + } + } + try { + int port = Integer.parseInt(args[1]); + server = new StreamSinkServer(args[2], args[3], args[0], port, handlers); + } catch(NumberFormatException nfe) { + System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); + } + break; + default: + System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); + } + if(server != null) { + server.runServer(); + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 4960f1a22..f05ae1c8c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -1,5 +1,6 @@ package net.i2p.client.streaming; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index dcc93c5ec..08d794877 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,393 +21,459 @@ import net.i2p.util.SimpleTimer; * */ public class ConnectionManager { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private MessageHandler _messageHandler; - private PacketHandler _packetHandler; - private ConnectionHandler _connectionHandler; - private PacketQueue _outboundQueue; - private SchedulerChooser _schedulerChooser; - private ConnectionPacketHandler _conPacketHandler; - /** Inbound stream ID (Long) to Connection map */ - private Map _connectionByInboundId; - /** Ping ID (Long) to PingRequest */ - private Map _pendingPings; - private boolean _allowIncoming; - private int _maxConcurrentStreams; - private ConnectionOptions _defaultOptions; - private volatile int _numWaiting; - private Object _connectionLock; - - public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { - _context = context; - _log = context.logManager().getLog(ConnectionManager.class); - _connectionByInboundId = new HashMap(32); - _pendingPings = new HashMap(4); - _connectionLock = new Object(); - _messageHandler = new MessageHandler(context, this); - _packetHandler = new PacketHandler(context, this); - _connectionHandler = new ConnectionHandler(context, this); - _schedulerChooser = new SchedulerChooser(context); - _conPacketHandler = new ConnectionPacketHandler(context); - _session = session; - session.setSessionListener(_messageHandler); - _outboundQueue = new PacketQueue(context, session, this); - _allowIncoming = false; - _maxConcurrentStreams = maxConcurrent; - _defaultOptions = defaultOptions; - _numWaiting = 0; - _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - } - - Connection getConnectionByInboundId(long id) { - synchronized (_connectionLock) { - return (Connection)_connectionByInboundId.get(new Long(id)); - } - } - /** - * not guaranteed to be unique, but in case we receive more than one packet - * on an inbound connection that we havent ack'ed yet... - */ - Connection getConnectionByOutboundId(long id) { - synchronized (_connectionLock) { - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - if (DataHelper.eq(con.getSendStreamId(), id)) - return con; - } - } - return null; - } - - public void setAllowIncomingConnections(boolean allow) { - _connectionHandler.setActive(allow); - } - /** should we acceot connections, or just reject everyone? */ - public boolean getAllowIncomingConnections() { - return _connectionHandler.getActive(); - } - - /** - * Create a new connection based on the SYN packet we received. - * - * @return created Connection with the packet's data already delivered to - * it, or null if the syn's streamId was already taken - */ - public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); - long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - boolean reject = false; - int active = 0; - int total = 0; - synchronized (_connectionLock) { - total = _connectionByInboundId.size(); - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - if ( ((Connection)iter.next()).getIsConnected() ) - active++; - } - if (locked_tooManyStreams()) { - reject = true; - } else { - while (true) { - Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); - if (oldCon == null) { - break; - } else { - _connectionByInboundId.put(new Long(receiveId), oldCon); - // receiveId already taken, try another - receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - } - } - } - } - - _context.statManager().addRateData("stream.receiveActive", active, total); - - if (reject) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refusing connection since we have exceeded our max of " - + _maxConcurrentStreams + " connections"); - PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); - reply.setFlag(Packet.FLAG_RESET); - reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); - reply.setAckThrough(synPacket.getSequenceNum()); - reply.setSendStreamId(synPacket.getReceiveStreamId()); - reply.setReceiveStreamId(0); - reply.setOptionalFrom(_session.getMyDestination()); - // this just sends the packet - no retries or whatnot - _outboundQueue.enqueue(reply); - return null; - } - - con.setReceiveStreamId(receiveId); - try { - con.getPacketHandler().receivePacket(synPacket, con); - } catch (I2PException ie) { - synchronized (_connectionLock) { - _connectionByInboundId.remove(new Long(receiveId)); - } - return null; - } - - _context.statManager().addRateData("stream.connectionReceived", 1, 0); - return con; - } - - private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000; - - /** - * Build a new connection to the given peer. This blocks if there is no - * connection delay, otherwise it returns immediately. - * - * @return new connection, or null if we have exceeded our limit - */ - public Connection connect(Destination peer, ConnectionOptions opts) { - Connection con = null; - long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - long expiration = _context.clock().now() + opts.getConnectTimeout(); - if (opts.getConnectTimeout() <= 0) - expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; - _numWaiting++; - while (true) { - long remaining = expiration - _context.clock().now(); - if (remaining <= 0) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refusing to connect since we have exceeded our max of " - + _maxConcurrentStreams + " connections"); - _numWaiting--; - return null; - } - boolean reject = false; - synchronized (_connectionLock) { - if (locked_tooManyStreams()) { - // allow a full buffer of pending/waiting streams - if (_numWaiting > _maxConcurrentStreams) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refusing connection since we have exceeded our max of " - + _maxConcurrentStreams + " and there are " + _numWaiting - + " waiting already"); - _numWaiting--; - return null; - } - - // no remaining streams, lets wait a bit - try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} - } else { - con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); - con.setRemotePeer(peer); - - while (_connectionByInboundId.containsKey(new Long(receiveId))) { - receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - } - _connectionByInboundId.put(new Long(receiveId), con); - break; // stop looping as a psuedo-wait - } - } - } - // ok we're in... - con.setReceiveStreamId(receiveId); - con.eventOccurred(); - - _log.debug("Connect() conDelay = " + opts.getConnectDelay()); - if (opts.getConnectDelay() <= 0) { - con.waitForConnect(); - } - if (_numWaiting > 0) - _numWaiting--; - - _context.statManager().addRateData("stream.connectionCreated", 1, 0); - return con; - } + private I2PAppContext _context; + private Log _log; + private I2PSession _session; + private MessageHandler _messageHandler; + private PacketHandler _packetHandler; + private ConnectionHandler _connectionHandler; + private PacketQueue _outboundQueue; + private SchedulerChooser _schedulerChooser; + private ConnectionPacketHandler _conPacketHandler; + /** Inbound stream ID (Long) to Connection map */ + private Map _connectionByInboundId; + /** Ping ID (Long) to PingRequest */ + private Map _pendingPings; + private boolean _allowIncoming; + private int _maxConcurrentStreams; + private ConnectionOptions _defaultOptions; + private volatile int _numWaiting; + private Object _connectionLock; + private long SoTimeout; - private boolean locked_tooManyStreams() { - if (_maxConcurrentStreams <= 0) return false; - if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; - int active = 0; - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - if (con.getIsConnected()) - active++; - } - - if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) ) - _log.info("More than 100 connections! " + active - + " total: " + _connectionByInboundId.size()); + public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { + _context = context; + _log = context.logManager().getLog(ConnectionManager.class); + _connectionByInboundId = new HashMap(32); + _pendingPings = new HashMap(4); + _connectionLock = new Object(); + _messageHandler = new MessageHandler(context, this); + _packetHandler = new PacketHandler(context, this); + _connectionHandler = new ConnectionHandler(context, this); + _schedulerChooser = new SchedulerChooser(context); + _conPacketHandler = new ConnectionPacketHandler(context); + _session = session; + session.setSessionListener(_messageHandler); + _outboundQueue = new PacketQueue(context, session, this); + _allowIncoming = false; + _maxConcurrentStreams = maxConcurrent; + _defaultOptions = defaultOptions; + _numWaiting = 0; + /** Socket timeout for accept() */ + SoTimeout = -1; - return (active >= _maxConcurrentStreams); - } - - public MessageHandler getMessageHandler() { return _messageHandler; } - public PacketHandler getPacketHandler() { return _packetHandler; } - public ConnectionHandler getConnectionHandler() { return _connectionHandler; } - public I2PSession getSession() { return _session; } - public PacketQueue getPacketQueue() { return _outboundQueue; } - - /** - * Something b0rked hard, so kill all of our connections without mercy. - * Don't bother sending close packets. - * - */ - public void disconnectAllHard() { - synchronized (_connectionLock) { - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - con.disconnect(false, false); - } - _connectionByInboundId.clear(); - _connectionLock.notifyAll(); - } - } - - /** - * Drop the (already closed) connection on the floor. - * - */ - public void removeConnection(Connection con) { - boolean removed = false; - synchronized (_connectionLock) { - Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); - removed = (o == con); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Connection removed? " + removed + " remaining: " - + _connectionByInboundId.size() + ": " + con); - if (!removed && _log.shouldLog(Log.DEBUG)) - _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); - _connectionLock.notifyAll(); - } - if (removed) { - _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime()); - } - } - - /** return a set of Connection objects */ - public Set listConnections() { - synchronized (_connectionLock) { - return new HashSet(_connectionByInboundId.values()); - } - } - - public boolean ping(Destination peer, long timeoutMs) { - return ping(peer, timeoutMs, true); - } - public boolean ping(Destination peer, long timeoutMs, boolean blocking) { - return ping(peer, timeoutMs, blocking, null, null, null); - } - public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { - Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); - PacketLocal packet = new PacketLocal(_context, peer); - packet.setSendStreamId(id.longValue()); - packet.setFlag(Packet.FLAG_ECHO); - packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); - packet.setOptionalFrom(_session.getMyDestination()); - if ( (keyToUse != null) && (tagsToSend != null) ) { - packet.setKeyUsed(keyToUse); - packet.setTagsSent(tagsToSend); - } - - PingRequest req = new PingRequest(peer, packet, notifier); - - synchronized (_pendingPings) { - _pendingPings.put(id, req); - } - - _outboundQueue.enqueue(packet); - packet.releasePayload(); - - if (blocking) { - synchronized (req) { - if (!req.pongReceived()) - try { req.wait(timeoutMs); } catch (InterruptedException ie) {} - } - - synchronized (_pendingPings) { - _pendingPings.remove(id); - } - } else { - SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs); - } - - boolean ok = req.pongReceived(); - return ok; - } + _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); + } - interface PingNotifier { - public void pingComplete(boolean ok); - } - - private class PingFailed implements SimpleTimer.TimedEvent { - private Long _id; - private PingNotifier _notifier; - public PingFailed(Long id, PingNotifier notifier) { - _id = id; - _notifier = notifier; - } - - public void timeReached() { - boolean removed = false; - synchronized (_pendingPings) { - Object o = _pendingPings.remove(_id); - if (o != null) - removed = true; - } - if (removed) { - if (_notifier != null) - _notifier.pingComplete(false); - if (_log.shouldLog(Log.INFO)) - _log.info("Ping failed"); - } - } - } - - private class PingRequest { - private boolean _ponged; - private Destination _peer; - private PacketLocal _packet; - private PingNotifier _notifier; - public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { - _ponged = false; - _peer = peer; - _packet = packet; - _notifier = notifier; - } - public void pong() { - _log.debug("Ping successful"); - _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); - synchronized (ConnectionManager.PingRequest.this) { - _ponged = true; - ConnectionManager.PingRequest.this.notifyAll(); - } - if (_notifier != null) - _notifier.pingComplete(true); - } - public boolean pongReceived() { return _ponged; } - } - - void receivePong(long pingId) { - PingRequest req = null; - synchronized (_pendingPings) { - req = (PingRequest)_pendingPings.remove(new Long(pingId)); - } - if (req != null) - req.pong(); - } + Connection getConnectionByInboundId(long id) { + synchronized(_connectionLock) { + return (Connection)_connectionByInboundId.get(new Long(id)); + } + } + + /** + * not guaranteed to be unique, but in case we receive more than one packet + * on an inbound connection that we havent ack'ed yet... + */ + Connection getConnectionByOutboundId(long id) { + synchronized(_connectionLock) { + for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { + Connection con = (Connection)iter.next(); + if(DataHelper.eq(con.getSendStreamId(), id)) { + return con; + } + } + } + return null; + } + + /** + * Set the socket accept() timeout. + * @param x + */ + public void MsetSoTimeout(long x) { + SoTimeout = x; + } + + /** + * Get the socket accept() timeout. + * @return + */ + public long MgetSoTimeout() { + return SoTimeout; + } + + public void setAllowIncomingConnections(boolean allow) { + _connectionHandler.setActive(allow); + } + + /** should we acceot connections, or just reject everyone? */ + public boolean getAllowIncomingConnections() { + return _connectionHandler.getActive(); + } + + /** + * Create a new connection based on the SYN packet we received. + * + * @return created Connection with the packet's data already delivered to + * it, or null if the syn's streamId was already taken + */ + public Connection receiveConnection(Packet synPacket) { + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; + boolean reject = false; + int active = 0; + int total = 0; + synchronized(_connectionLock) { + total = _connectionByInboundId.size(); + for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { + if(((Connection)iter.next()).getIsConnected()) { + active++; + } + } + if(locked_tooManyStreams()) { + reject = true; + } else { + while(true) { + Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); + if(oldCon == null) { + break; + } else { + _connectionByInboundId.put(new Long(receiveId), oldCon); + // receiveId already taken, try another + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; + } + } + } + } + + _context.statManager().addRateData("stream.receiveActive", active, total); + + if(reject) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections"); + } + PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); + reply.setFlag(Packet.FLAG_RESET); + reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + reply.setAckThrough(synPacket.getSequenceNum()); + reply.setSendStreamId(synPacket.getReceiveStreamId()); + reply.setReceiveStreamId(0); + reply.setOptionalFrom(_session.getMyDestination()); + // this just sends the packet - no retries or whatnot + _outboundQueue.enqueue(reply); + return null; + } + + con.setReceiveStreamId(receiveId); + try { + con.getPacketHandler().receivePacket(synPacket, con); + } catch(I2PException ie) { + synchronized(_connectionLock) { + _connectionByInboundId.remove(new Long(receiveId)); + } + return null; + } + + _context.statManager().addRateData("stream.connectionReceived", 1, 0); + return con; + } + private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000; + + /** + * Build a new connection to the given peer. This blocks if there is no + * connection delay, otherwise it returns immediately. + * + * @return new connection, or null if we have exceeded our limit + */ + public Connection connect(Destination peer, ConnectionOptions opts) { + Connection con = null; + long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; + long expiration = _context.clock().now() + opts.getConnectTimeout(); + if(opts.getConnectTimeout() <= 0) { + expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; + } + _numWaiting++; + while(true) { + long remaining = expiration - _context.clock().now(); + if(remaining <= 0) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections"); + } + _numWaiting--; + return null; + } + boolean reject = false; + synchronized(_connectionLock) { + if(locked_tooManyStreams()) { + // allow a full buffer of pending/waiting streams + if(_numWaiting > _maxConcurrentStreams) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already"); + } + _numWaiting--; + return null; + } + + // no remaining streams, lets wait a bit + try { + _connectionLock.wait(remaining); + } catch(InterruptedException ie) { + } + } else { + con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + con.setRemotePeer(peer); + + while(_connectionByInboundId.containsKey(new Long(receiveId))) { + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; + } + _connectionByInboundId.put(new Long(receiveId), con); + break; // stop looping as a psuedo-wait + } + } + } + + // ok we're in... + con.setReceiveStreamId(receiveId); + con.eventOccurred(); + + _log.debug("Connect() conDelay = " + opts.getConnectDelay()); + if(opts.getConnectDelay() <= 0) { + con.waitForConnect(); + } + if(_numWaiting > 0) { + _numWaiting--; + } + _context.statManager().addRateData("stream.connectionCreated", 1, 0); + return con; + } + + private boolean locked_tooManyStreams() { + if(_maxConcurrentStreams <= 0) { + return false; + } + if(_connectionByInboundId.size() < _maxConcurrentStreams) { + return false; + } + int active = 0; + for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { + Connection con = (Connection)iter.next(); + if(con.getIsConnected()) { + active++; + } + } + + if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) { + _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); + } + return (active >= _maxConcurrentStreams); + } + + public MessageHandler getMessageHandler() { + return _messageHandler; + } + + public PacketHandler getPacketHandler() { + return _packetHandler; + } + + public ConnectionHandler getConnectionHandler() { + return _connectionHandler; + } + + public I2PSession getSession() { + return _session; + } + + public PacketQueue getPacketQueue() { + return _outboundQueue; + } + + /** + * Something b0rked hard, so kill all of our connections without mercy. + * Don't bother sending close packets. + * + */ + public void disconnectAllHard() { + synchronized(_connectionLock) { + for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { + Connection con = (Connection)iter.next(); + con.disconnect(false, false); + } + _connectionByInboundId.clear(); + _connectionLock.notifyAll(); + } + } + + /** + * Drop the (already closed) connection on the floor. + * + */ + public void removeConnection(Connection con) { + boolean removed = false; + synchronized(_connectionLock) { + Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); + removed = (o == con); + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con); + } + if(!removed && _log.shouldLog(Log.DEBUG)) { + _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values()); + } + _connectionLock.notifyAll(); + } + if(removed) { + _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime()); + } + } + + /** return a set of Connection objects */ + public Set listConnections() { + synchronized(_connectionLock) { + return new HashSet(_connectionByInboundId.values()); + } + } + + public boolean ping(Destination peer, long timeoutMs) { + return ping(peer, timeoutMs, true); + } + + public boolean ping(Destination peer, long timeoutMs, boolean blocking) { + return ping(peer, timeoutMs, blocking, null, null, null); + } + + public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { + Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1); + PacketLocal packet = new PacketLocal(_context, peer); + packet.setSendStreamId(id.longValue()); + packet.setFlag(Packet.FLAG_ECHO); + packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + packet.setOptionalFrom(_session.getMyDestination()); + if((keyToUse != null) && (tagsToSend != null)) { + packet.setKeyUsed(keyToUse); + packet.setTagsSent(tagsToSend); + } + + PingRequest req = new PingRequest(peer, packet, notifier); + + synchronized(_pendingPings) { + _pendingPings.put(id, req); + } + + _outboundQueue.enqueue(packet); + packet.releasePayload(); + + if(blocking) { + synchronized(req) { + if(!req.pongReceived()) { + try { + req.wait(timeoutMs); + } catch(InterruptedException ie) { + } + } + } + + synchronized(_pendingPings) { + _pendingPings.remove(id); + } + } else { + SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs); + } + + boolean ok = req.pongReceived(); + return ok; + } + + interface PingNotifier { + + public void pingComplete(boolean ok); + } + + private class PingFailed implements SimpleTimer.TimedEvent { + + private Long _id; + private PingNotifier _notifier; + + public PingFailed(Long id, PingNotifier notifier) { + _id = id; + _notifier = notifier; + } + + public void timeReached() { + boolean removed = false; + synchronized(_pendingPings) { + Object o = _pendingPings.remove(_id); + if(o != null) { + removed = true; + } + } + if(removed) { + if(_notifier != null) { + _notifier.pingComplete(false); + } + if(_log.shouldLog(Log.INFO)) { + _log.info("Ping failed"); + } + } + } + } + + private class PingRequest { + + private boolean _ponged; + private Destination _peer; + private PacketLocal _packet; + private PingNotifier _notifier; + + public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { + _ponged = false; + _peer = peer; + _packet = packet; + _notifier = notifier; + } + + public void pong() { + _log.debug("Ping successful"); + _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); + synchronized(ConnectionManager.PingRequest.this) { + _ponged = true; + ConnectionManager.PingRequest.this.notifyAll(); + } + if(_notifier != null) { + _notifier.pingComplete(true); + } + } + + public boolean pongReceived() { + return _ponged; + } + } + + void receivePong(long pingId) { + PingRequest req = null; + synchronized(_pendingPings) { + req = (PingRequest)_pendingPings.remove(new Long(pingId)); + } + if(req != null) { + req.pong(); + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index b1a4175f2..b85459f63 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,5 +1,8 @@ package net.i2p.client.streaming; +import java.net.SocketTimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; import net.i2p.I2PException; /** @@ -7,17 +10,46 @@ import net.i2p.I2PException; * */ public class I2PServerSocketFull implements I2PServerSocket { - private I2PSocketManagerFull _socketManager; - - public I2PServerSocketFull(I2PSocketManagerFull mgr) { - _socketManager = mgr; - } - - public I2PSocket accept() throws I2PException { - return _socketManager.receiveSocket(); - } - - public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } - - public I2PSocketManager getManager() { return _socketManager; } + + private I2PSocketManagerFull _socketManager; + + /** + * + * @param mgr + */ + public I2PServerSocketFull(I2PSocketManagerFull mgr) { + _socketManager = mgr; + } + + /** + * + * @return + * @throws net.i2p.I2PException + * @throws SocketTimeoutException + */ + public I2PSocket accept() throws I2PException, SocketTimeoutException { + return _socketManager.receiveSocket(); + } + + public long getSoTimeout() { + return _socketManager.getConnectionManager().MgetSoTimeout(); + } + + public void setSoTimeout(long x) { + _socketManager.getConnectionManager().MsetSoTimeout(x); + } + /** + * Close the connection. + */ + public void close() { + _socketManager.getConnectionManager().setAllowIncomingConnections(false); + } + + /** + * + * @return _socketManager + */ + public I2PSocketManager getManager() { + return _socketManager; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 61dd48757..842cf791b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -11,119 +11,139 @@ import net.i2p.data.Destination; * */ public class I2PSocketFull implements I2PSocket { - private Connection _connection; - private I2PSocket.SocketErrorListener _listener; - private Destination _remotePeer; - private Destination _localPeer; - - public I2PSocketFull(Connection con) { - _connection = con; - if (con != null) { - _remotePeer = con.getRemotePeer(); - _localPeer = con.getSession().getMyDestination(); - } - } - - public void close() throws IOException { - Connection c = _connection; - if (c == null) return; - if (c.getIsConnected()) { - OutputStream out = c.getOutputStream(); - if (out != null) { - try { - out.close(); - } catch (IOException ioe) { - // ignore any write error, as we want to keep on and kill the - // con (thanks Complication!) - } - } - c.disconnect(true); - } else { - //throw new IOException("Not connected"); - } - destroy(); - } - - Connection getConnection() { return _connection; } - - public InputStream getInputStream() { - Connection c = _connection; - if (c != null) - return c.getInputStream(); - else - return null; - } - - public I2PSocketOptions getOptions() { - Connection c = _connection; - if (c != null) - return c.getOptions(); - else - return null; - } - - public OutputStream getOutputStream() throws IOException { - Connection c = _connection; - if (c != null) - return c.getOutputStream(); - else - return null; - } - - public Destination getPeerDestination() { return _remotePeer; } - - public long getReadTimeout() { - I2PSocketOptions opts = getOptions(); - if (opts != null) - return opts.getReadTimeout(); - else - return -1; - } - - public Destination getThisDestination() { return _localPeer; } - - public void setOptions(I2PSocketOptions options) { - Connection c = _connection; - if (c == null) return; - - if (options instanceof ConnectionOptions) - c.setOptions((ConnectionOptions)options); - else - c.setOptions(new ConnectionOptions(options)); - } - - public void setReadTimeout(long ms) { - Connection c = _connection; - if (c == null) return; - - c.getInputStream().setReadTimeout((int)ms); - c.getOptions().setReadTimeout(ms); - } - - public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { - _listener = lsnr; - } - - public boolean isClosed() { - Connection c = _connection; - return ((c == null) || - (!c.getIsConnected()) || - (c.getResetReceived()) || - (c.getResetSent())); - } - - void destroy() { - Connection c = _connection; - _connection = null; - _listener = null; - if (c != null) - c.disconnectComplete(); - } - public String toString() { - Connection c = _connection; - if (c == null) - return super.toString(); - else - return c.toString(); - } + + private Connection _connection; + private I2PSocket.SocketErrorListener _listener; + private Destination _remotePeer; + private Destination _localPeer; + + public I2PSocketFull(Connection con) { + _connection = con; + if(con != null) { + _remotePeer = con.getRemotePeer(); + _localPeer = con.getSession().getMyDestination(); + } + } + + + public void close() throws IOException { + Connection c = _connection; + if(c == null) { + return; + } + if(c.getIsConnected()) { + OutputStream out = c.getOutputStream(); + if(out != null) { + try { + out.close(); + } catch(IOException ioe) { + // ignore any write error, as we want to keep on and kill the + // con (thanks Complication!) + } + } + c.disconnect(true); + } else { + //throw new IOException("Not connected"); + } + destroy(); + } + + Connection getConnection() { + return _connection; + } + + public InputStream getInputStream() { + Connection c = _connection; + if(c != null) { + return c.getInputStream(); + } else { + return null; + } + } + + public I2PSocketOptions getOptions() { + Connection c = _connection; + if(c != null) { + return c.getOptions(); + } else { + return null; + } + } + + public OutputStream getOutputStream() throws IOException { + Connection c = _connection; + if(c != null) { + return c.getOutputStream(); + } else { + return null; + } + } + + public Destination getPeerDestination() { + return _remotePeer; + } + + public long getReadTimeout() { + I2PSocketOptions opts = getOptions(); + if(opts != null) { + return opts.getReadTimeout(); + } else { + return -1; + } + } + + public Destination getThisDestination() { + return _localPeer; + } + + public void setOptions(I2PSocketOptions options) { + Connection c = _connection; + if(c == null) { + return; + } + if(options instanceof ConnectionOptions) { + c.setOptions((ConnectionOptions)options); + } else { + c.setOptions(new ConnectionOptions(options)); + } + } + + public void setReadTimeout(long ms) { + Connection c = _connection; + if(c == null) { + return; + } + c.getInputStream().setReadTimeout((int)ms); + c.getOptions().setReadTimeout(ms); + } + + public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { + _listener = lsnr; + } + + public boolean isClosed() { + Connection c = _connection; + return ((c == null) || + (!c.getIsConnected()) || + (c.getResetReceived()) || + (c.getResetSent())); + } + + void destroy() { + Connection c = _connection; + _connection = null; + _listener = null; + if(c != null) { + c.disconnectComplete(); + } + } + + public String toString() { + Connection c = _connection; + if(c == null) { + return super.toString(); + } else { + return c.toString(); + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 7384a4972..b0d1c841a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -1,6 +1,7 @@ package net.i2p.client.streaming; import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; import java.util.HashSet; import java.util.Iterator; import java.util.Properties; @@ -13,7 +14,6 @@ import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Log; - /** * Centralize the coordination and multiplexing of the local client's streaming. * There should be one I2PSocketManager for each I2PSession, and if an application @@ -23,219 +23,317 @@ import net.i2p.util.Log; * */ public class I2PSocketManagerFull implements I2PSocketManager { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private I2PServerSocketFull _serverSocket; - private ConnectionOptions _defaultOptions; - private long _acceptTimeout; - private String _name; - private int _maxStreams; - private static int __managerId = 0; - private ConnectionManager _connectionManager; - - /** - * How long to wait for the client app to accept() before sending back CLOSE? - * This includes the time waiting in the queue. Currently set to 5 seconds. - */ - private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; - - public I2PSocketManagerFull() { - _context = null; - _session = null; - } - public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { - this(); - init(context, session, opts, name); - } - - /** how many streams will we allow at once? */ - public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; - - /** - * - */ - public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { - _context = context; - _session = session; - _log = _context.logManager().getLog(I2PSocketManagerFull.class); - - _maxStreams = -1; - try { - String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); - _maxStreams = Integer.parseInt(num); - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); - _maxStreams = -1; - } - _name = name + " " + (++__managerId); - _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; - _defaultOptions = new ConnectionOptions(opts); - _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); - _serverSocket = new I2PServerSocketFull(this); - - if (_log.shouldLog(Log.INFO)) { - _log.info("Socket manager created. \ndefault options: " + _defaultOptions - + "\noriginal properties: " + opts); - } - } - public I2PSocketOptions buildOptions() { return buildOptions(null); } - public I2PSocketOptions buildOptions(Properties opts) { - ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); - curOpts.setProperties(opts); - return curOpts; - } - - public I2PSession getSession() { - return _session; - } - - public ConnectionManager getConnectionManager() { - return _connectionManager; - } + private I2PAppContext _context; + private Log _log; + private I2PSession _session; + private I2PServerSocketFull _serverSocket; + private ConnectionOptions _defaultOptions; + private long _acceptTimeout; + private String _name; + private int _maxStreams; + private static int __managerId = 0; + private ConnectionManager _connectionManager; + /** + * How long to wait for the client app to accept() before sending back CLOSE? + * This includes the time waiting in the queue. Currently set to 5 seconds. + */ + private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000; - public I2PSocket receiveSocket() throws I2PException { - verifySession(); - Connection con = _connectionManager.getConnectionHandler().accept(-1); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("receiveSocket() called: " + con); - if (con != null) { - I2PSocketFull sock = new I2PSocketFull(con); - con.setSocket(sock); - return sock; - } else { - return null; - } - } - - /** - * Ping the specified peer, returning true if they replied to the ping within - * the timeout specified, false otherwise. This call blocks. - * - */ - public boolean ping(Destination peer, long timeoutMs) { - return _connectionManager.ping(peer, timeoutMs); - } + /** + * + */ + public I2PSocketManagerFull() { + _context = null; + _session = null; + } - /** - * How long should we wait for the client to .accept() a socket before - * sending back a NACK/Close? - * - * @param ms milliseconds to wait, maximum - */ - public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } - public long getAcceptTimeout() { return _acceptTimeout; } + /** + * + * @param context + * @param session + * @param opts + * @param name + */ + public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { + this(); + init(context, session, opts, name); + } + /** how many streams will we allow at once? */ + public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; - public void setDefaultOptions(I2PSocketOptions options) { - _defaultOptions = new ConnectionOptions((ConnectionOptions) options); - } + /** + * + * + * @param context + * @param session + * @param opts + * @param name + */ + public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { + _context = context; + _session = session; + _log = _context.logManager().getLog(I2PSocketManagerFull.class); - public I2PSocketOptions getDefaultOptions() { - return _defaultOptions; - } + _maxStreams = -1; + try { + String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); + _maxStreams = Integer.parseInt(num); + } catch(NumberFormatException nfe) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); + } + _maxStreams = -1; + } + _name = name + " " + (++__managerId); + _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; + _defaultOptions = new ConnectionOptions(opts); + _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); + _serverSocket = new I2PServerSocketFull(this); - public I2PServerSocket getServerSocket() { - _connectionManager.setAllowIncomingConnections(true); - return _serverSocket; - } + if(_log.shouldLog(Log.INFO)) { + _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts); + } + } - private void verifySession() throws I2PException { - if (!_connectionManager.getSession().isClosed()) - return; - _connectionManager.getSession().connect(); - } - - /** - * Create a new connected socket (block until the socket is created) - * - * @param peer Destination to connect to - * @param options I2P socket options to be used for connecting - * - * @throws NoRouteToHostException if the peer is not found or not reachable - * @throws I2PException if there is some other I2P-related problem - */ - public I2PSocket connect(Destination peer, I2PSocketOptions options) - throws I2PException, NoRouteToHostException { - verifySession(); - if (options == null) - options = _defaultOptions; - ConnectionOptions opts = null; - if (options instanceof ConnectionOptions) - opts = new ConnectionOptions((ConnectionOptions)options); - else - opts = new ConnectionOptions(options); - - if (_log.shouldLog(Log.INFO)) - _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) - + " with options: " + opts); - Connection con = _connectionManager.connect(peer, opts); - if (con == null) - throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); - I2PSocketFull socket = new I2PSocketFull(con); - con.setSocket(socket); - if (con.getConnectionError() != null) { - con.disconnect(false); - throw new NoRouteToHostException(con.getConnectionError()); - } - return socket; - } + /** + * + * @return + */ + public I2PSocketOptions buildOptions() { + return buildOptions(null); + } - /** - * Create a new connected socket (block until the socket is created) - * - * @param peer Destination to connect to - * - * @throws NoRouteToHostException if the peer is not found or not reachable - * @throws I2PException if there is some other I2P-related problem - */ - public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException { - return connect(peer, _defaultOptions); - } + /** + * + * @param opts + * @return + */ + public I2PSocketOptions buildOptions(Properties opts) { + ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); + curOpts.setProperties(opts); + return curOpts; + } - /** - * Destroy the socket manager, freeing all the associated resources. This - * method will block untill all the managed sockets are closed. - * - */ - public void destroySocketManager() { - _connectionManager.disconnectAllHard(); - _connectionManager.setAllowIncomingConnections(false); - // should we destroy the _session too? - // yes, since the old lib did (and SAM wants it to, and i dont know why not) - if ( (_session != null) && (!_session.isClosed()) ) { - try { - _session.destroySession(); - } catch (I2PSessionException ise) { - _log.warn("Unable to destroy the session", ise); - } - } - } + /** + * + * @return + */ + public I2PSession getSession() { + return _session; + } - /** - * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. - * - */ - public Set listSockets() { - Set connections = _connectionManager.listConnections(); - Set rv = new HashSet(connections.size()); - for (Iterator iter = connections.iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - if (con.getSocket() != null) - rv.add(con.getSocket()); - } - return rv; - } + /** + * + * @return + */ + public ConnectionManager getConnectionManager() { + return _connectionManager; + } - public String getName() { return _name; } - public void setName(String name) { _name = name; } - - - public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - _connectionManager.getMessageHandler().addDisconnectListener(lsnr); - } - public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - _connectionManager.getMessageHandler().removeDisconnectListener(lsnr); - } + /** + * + * @return + * @throws net.i2p.I2PException + * @throws java.net.SocketTimeoutException + */ + public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException { + verifySession(); + Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout()); + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("receiveSocket() called: " + con); + } + if(con != null) { + I2PSocketFull sock = new I2PSocketFull(con); + con.setSocket(sock); + return sock; + } else { + if(_connectionManager.MgetSoTimeout() == -1) { + return null; + } + throw new SocketTimeoutException("I2PSocket timed out"); + } + } + + /** + * Ping the specified peer, returning true if they replied to the ping within + * the timeout specified, false otherwise. This call blocks. + * + * + * @param peer + * @param timeoutMs + * @return + */ + public boolean ping(Destination peer, long timeoutMs) { + return _connectionManager.ping(peer, timeoutMs); + } + + /** + * How long should we wait for the client to .accept() a socket before + * sending back a NACK/Close? + * + * @param ms milliseconds to wait, maximum + */ + public void setAcceptTimeout(long ms) { + _acceptTimeout = ms; + } + + /** + * + * @return + */ + public long getAcceptTimeout() { + return _acceptTimeout; + } + + /** + * + * @param options + */ + public void setDefaultOptions(I2PSocketOptions options) { + _defaultOptions = new ConnectionOptions((ConnectionOptions)options); + } + + /** + * + * @return + */ + public I2PSocketOptions getDefaultOptions() { + return _defaultOptions; + } + + /** + * + * @return + */ + public I2PServerSocket getServerSocket() { + _connectionManager.setAllowIncomingConnections(true); + return _serverSocket; + } + + private void verifySession() throws I2PException { + if(!_connectionManager.getSession().isClosed()) { + return; + } + _connectionManager.getSession().connect(); + } + + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * @param options I2P socket options to be used for connecting + * + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer, I2PSocketOptions options) + throws I2PException, NoRouteToHostException { + verifySession(); + if(options == null) { + options = _defaultOptions; + } + ConnectionOptions opts = null; + if(options instanceof ConnectionOptions) { + opts = new ConnectionOptions((ConnectionOptions)options); + } else { + opts = new ConnectionOptions(options); + } + if(_log.shouldLog(Log.INFO)) { + _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts); + } + Connection con = _connectionManager.connect(peer, opts); + if(con == null) { + throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); + } + I2PSocketFull socket = new I2PSocketFull(con); + con.setSocket(socket); + if(con.getConnectionError() != null) { + con.disconnect(false); + throw new NoRouteToHostException(con.getConnectionError()); + } + return socket; + } + + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * + * @return + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException { + return connect(peer, _defaultOptions); + } + + /** + * Destroy the socket manager, freeing all the associated resources. This + * method will block untill all the managed sockets are closed. + * + */ + public void destroySocketManager() { + _connectionManager.disconnectAllHard(); + _connectionManager.setAllowIncomingConnections(false); + // should we destroy the _session too? + // yes, since the old lib did (and SAM wants it to, and i dont know why not) + if((_session != null) && (!_session.isClosed())) { + try { + _session.destroySession(); + } catch(I2PSessionException ise) { + _log.warn("Unable to destroy the session", ise); + } + } + } + + /** + * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. + * + * + * @return + */ + public Set listSockets() { + Set connections = _connectionManager.listConnections(); + Set rv = new HashSet(connections.size()); + for(Iterator iter = connections.iterator(); iter.hasNext();) { + Connection con = (Connection)iter.next(); + if(con.getSocket() != null) { + rv.add(con.getSocket()); + } + } + return rv; + } + + /** + * + * @return + */ + public String getName() { + return _name; + } + + /** + * + * @param name + */ + public void setName(String name) { + _name = name; + } + + /** + * + * @param lsnr + */ + public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { + _connectionManager.getMessageHandler().addDisconnectListener(lsnr); + } + + /** + * + * @param lsnr + */ + public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { + _connectionManager.getMessageHandler().removeDisconnectListener(lsnr); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index 0ea0c83d7..c52c373b1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer; /** * */ -class RetransmissionTimer extends SimpleTimer { +public class RetransmissionTimer extends SimpleTimer { private static final RetransmissionTimer _instance = new RetransmissionTimer(); public static final SimpleTimer getInstance() { return _instance; } protected RetransmissionTimer() { super("StreamingTimer"); } diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index e3c1b6fbf..c5955c999 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -5,42 +5,59 @@ import java.util.List; import net.i2p.I2PAppContext; class Executor implements Runnable { - private I2PAppContext _context; - private Log _log; - private List _readyEvents; - public Executor(I2PAppContext ctx, Log log, List events) { - _context = ctx; - _readyEvents = events; - } - public void run() { - while (true) { - SimpleTimer.TimedEvent evt = null; - synchronized (_readyEvents) { - if (_readyEvents.size() <= 0) - try { _readyEvents.wait(); } catch (InterruptedException ie) {} - if (_readyEvents.size() > 0) - evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); - } - if (evt != null) { - long before = _context.clock().now(); - try { - evt.timeReached(); - } catch (Throwable t) { - log("wtf, event borked: " + evt, t); - } - long time = _context.clock().now() - before; - if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) - _log.warn("wtf, event execution took " + time + ": " + evt); - } - } - } - - private void log(String msg, Throwable t) { - synchronized (this) { - if (_log == null) - _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); - } - _log.log(Log.CRIT, msg, t); - } + private I2PAppContext _context; + private Log _log; + private List _readyEvents; + private SimpleStore runn; + + public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { + _context = ctx; + _readyEvents = events; + runn = x; + } + + public void run() { + while(runn.getAnswer()) { + SimpleTimer.TimedEvent evt = null; + synchronized(_readyEvents) { + if(_readyEvents.size() <= 0) { + try { + _readyEvents.wait(); + } catch(InterruptedException ie) { + } + } + if(_readyEvents.size() > 0) { + evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); + } + } + + if(evt != null) { + long before = _context.clock().now(); + try { + evt.timeReached(); + } catch(Throwable t) { + log("wtf, event borked: " + evt, t); + } + long time = _context.clock().now() - before; + if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) { + _log.warn("wtf, event execution took " + time + ": " + evt); + } + } + } + } + + /** + * + * @param msg + * @param t + */ + private void log(String msg, Throwable t) { + synchronized(this) { + if(_log == null) { + _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); + } + } + _log.log(Log.CRIT, msg, t); + } } diff --git a/core/java/src/net/i2p/util/SimpleStore.java b/core/java/src/net/i2p/util/SimpleStore.java new file mode 100644 index 000000000..b73a8e7eb --- /dev/null +++ b/core/java/src/net/i2p/util/SimpleStore.java @@ -0,0 +1,35 @@ +/* + * This is free software, do as you please. + */ + +package net.i2p.util; + +/** + * + * @author sponge + */ +public class SimpleStore { + + private boolean answer; + + SimpleStore(boolean x) { + answer=x; + } + + /** + * set the answer + * + * @param x + */ + public void setAnswer(boolean x) { + answer = x; + } + /** + * + * @return boolean + */ + public boolean getAnswer() { + return answer; + } + +} diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 9543f72c5..5595fbd5c 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -16,211 +16,262 @@ import net.i2p.I2PAppContext; * */ public class SimpleTimer { - private static final SimpleTimer _instance = new SimpleTimer(); - public static SimpleTimer getInstance() { return _instance; } - private I2PAppContext _context; - private Log _log; - /** event time (Long) to event (TimedEvent) mapping */ - private TreeMap _events; - /** event (TimedEvent) to event time (Long) mapping */ - private Map _eventTimes; - private List _readyEvents; - - protected SimpleTimer() { this("SimpleTimer"); } - protected SimpleTimer(String name) { - _context = I2PAppContext.getGlobalContext(); - _log = _context.logManager().getLog(SimpleTimer.class); - _events = new TreeMap(); - _eventTimes = new HashMap(256); - _readyEvents = new ArrayList(4); - I2PThread runner = new I2PThread(new SimpleTimerRunner()); - runner.setName(name); - runner.setDaemon(true); - runner.start(); - for (int i = 0; i < 3; i++) { - I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); - executor.setName(name + "Executor " + i); - executor.setDaemon(true); - executor.start(); - } - } - - public void reschedule(TimedEvent event, long timeoutMs) { - addEvent(event, timeoutMs, false); - } - - /** - * Queue up the given event to be fired no sooner than timeoutMs from now. - * However, if this event is already scheduled, the event will be scheduled - * for the earlier of the two timeouts, which may be before this stated - * timeout. If this is not the desired behavior, call removeEvent first. - * - */ - public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } - /** - * @param useEarliestTime if its already scheduled, use the earlier of the - * two timeouts, else use the later - */ - public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { - int totalEvents = 0; - long now = System.currentTimeMillis(); - long eventTime = now + timeoutMs; - Long time = new Long(eventTime); - synchronized (_events) { - // remove the old scheduled position, then reinsert it - Long oldTime = (Long)_eventTimes.get(event); - if (oldTime != null) { - if (useEarliestTime) { - if (oldTime.longValue() < eventTime) { - _events.notifyAll(); - return; // already scheduled for sooner than requested - } else { - _events.remove(oldTime); - } - } else { - if (oldTime.longValue() > eventTime) { - _events.notifyAll(); - return; // already scheduled for later than the given period - } else { - _events.remove(oldTime); - } - } - } - while (_events.containsKey(time)) - time = new Long(time.longValue() + 1); - _events.put(time, event); - _eventTimes.put(event, time); - - if ( (_events.size() != _eventTimes.size()) ) { - _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size()); - for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) { - TimedEvent evt = (TimedEvent)iter.next(); - Long when = (Long)_eventTimes.get(evt); - TimedEvent cur = (TimedEvent)_events.get(when); - if (cur != evt) { - _log.error("event " + evt + " @ " + when + ": " + cur); - } - } - } - - totalEvents = _events.size(); - _events.notifyAll(); - } - if (time.longValue() > eventTime + 100) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Lots of timer congestion, had to push " + event + " back " - + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); - } - long timeToAdd = System.currentTimeMillis() - now; - if (timeToAdd > 50) { - if (_log.shouldLog(Log.WARN)) - _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); - } - - } - - public boolean removeEvent(TimedEvent evt) { - if (evt == null) return false; - synchronized (_events) { - Long when = (Long)_eventTimes.remove(evt); - if (when != null) - _events.remove(when); - return null != when; - } - } - - /** - * Simple interface for events to be queued up and notified on expiration - */ - public interface TimedEvent { - /** - * the time requested has been reached (this call should NOT block, - * otherwise the whole SimpleTimer gets backed up) - * - */ - public void timeReached(); - } - - private long _occurredTime; - private long _occurredEventCount; - private TimedEvent _recentEvents[] = new TimedEvent[5]; - - private class SimpleTimerRunner implements Runnable { - public void run() { - List eventsToFire = new ArrayList(1); - while (true) { - try { - synchronized (_events) { - //if (_events.size() <= 0) - // _events.wait(); - //if (_events.size() > 100) - // _log.warn("> 100 events! " + _events.values()); - long now = System.currentTimeMillis(); - long nextEventDelay = -1; - Object nextEvent = null; - while (true) { - if (_events.size() <= 0) break; - Long when = (Long)_events.firstKey(); - if (when.longValue() <= now) { - TimedEvent evt = (TimedEvent)_events.remove(when); - if (evt != null) { - _eventTimes.remove(evt); - eventsToFire.add(evt); - } - } else { - nextEventDelay = when.longValue() - now; - nextEvent = _events.get(when); - break; - } - } - if (eventsToFire.size() <= 0) { - if (nextEventDelay != -1) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Next event in " + nextEventDelay + ": " + nextEvent); - _events.wait(nextEventDelay); - } else { - _events.wait(); - } - } - } - } catch (ThreadDeath td) { - return; // die - } catch (InterruptedException ie) { - // ignore - } catch (Throwable t) { - if (_log != null) { - _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t); - } else { - System.err.println("Uncaught exception in SimpleTimer"); - t.printStackTrace(); - } - } - - long now = System.currentTimeMillis(); - now = now - (now % 1000); - synchronized (_readyEvents) { - for (int i = 0; i < eventsToFire.size(); i++) - _readyEvents.add(eventsToFire.get(i)); - _readyEvents.notifyAll(); - } + private static final SimpleTimer _instance = new SimpleTimer(); - if (_occurredTime == now) { - _occurredEventCount += eventsToFire.size(); - } else { - _occurredTime = now; - if (_occurredEventCount > 2500) { - StringBuffer buf = new StringBuffer(128); - buf.append("Too many simpleTimerJobs (").append(_occurredEventCount); - buf.append(") in a second!"); - _log.log(Log.WARN, buf.toString()); - } - _occurredEventCount = 0; - } + public static SimpleTimer getInstance() { + return _instance; + } + private I2PAppContext _context; + private Log _log; + /** event time (Long) to event (TimedEvent) mapping */ + private TreeMap _events; + /** event (TimedEvent) to event time (Long) mapping */ + private Map _eventTimes; + private List _readyEvents; + private SimpleStore runn; - eventsToFire.clear(); - } - } - } + /** + * + */ + protected SimpleTimer() { + this("SimpleTimer"); + } + + /** + * + * @param name + */ + protected SimpleTimer(String name) { + runn = new SimpleStore(true); + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(SimpleTimer.class); + _events = new TreeMap(); + _eventTimes = new HashMap(256); + _readyEvents = new ArrayList(4); + I2PThread runner = new I2PThread(new SimpleTimerRunner()); + runner.setName(name); + runner.setDaemon(true); + runner.start(); + for(int i = 0; i < 3; i++) { + I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); + executor.setName(name + "Executor " + i); + executor.setDaemon(true); + executor.start(); + } + } + + /** + * Removes the SimpleTimer. + */ + public void removeSimpleTimer() { + synchronized(_events) { + runn.setAnswer(false); + _events.notifyAll(); + } + } + + /** + * + * @param event + * @param timeoutMs + */ + public void reschedule(TimedEvent event, long timeoutMs) { + addEvent(event, timeoutMs, false); + } + + /** + * Queue up the given event to be fired no sooner than timeoutMs from now. + * However, if this event is already scheduled, the event will be scheduled + * for the earlier of the two timeouts, which may be before this stated + * timeout. If this is not the desired behavior, call removeEvent first. + * + * @param event + * @param timeoutMs + */ + public void addEvent(TimedEvent event, long timeoutMs) { + addEvent(event, timeoutMs, true); + } + + /** + * @param event + * @param timeoutMs + * @param useEarliestTime if its already scheduled, use the earlier of the + * two timeouts, else use the later + */ + public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { + int totalEvents = 0; + long now = System.currentTimeMillis(); + long eventTime = now + timeoutMs; + Long time = new Long(eventTime); + synchronized(_events) { + // remove the old scheduled position, then reinsert it + Long oldTime = (Long)_eventTimes.get(event); + if(oldTime != null) { + if(useEarliestTime) { + if(oldTime.longValue() < eventTime) { + _events.notifyAll(); + return; // already scheduled for sooner than requested + } else { + _events.remove(oldTime); + } + } else { + if(oldTime.longValue() > eventTime) { + _events.notifyAll(); + return; // already scheduled for later than the given period + } else { + _events.remove(oldTime); + } + } + } + while(_events.containsKey(time)) { + time = new Long(time.longValue() + 1); + } + _events.put(time, event); + _eventTimes.put(event, time); + + if((_events.size() != _eventTimes.size())) { + _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size()); + for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) { + TimedEvent evt = (TimedEvent)iter.next(); + Long when = (Long)_eventTimes.get(evt); + TimedEvent cur = (TimedEvent)_events.get(when); + if(cur != evt) { + _log.error("event " + evt + " @ " + when + ": " + cur); + } + } + } + + totalEvents = _events.size(); + _events.notifyAll(); + } + if(time.longValue() > eventTime + 100) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")"); + } + } + long timeToAdd = System.currentTimeMillis() - now; + if(timeToAdd > 50) { + if(_log.shouldLog(Log.WARN)) { + _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); + } + } + + } + + /** + * + * @param evt + * @return + */ + public boolean removeEvent(TimedEvent evt) { + if(evt == null) { + return false; + } + synchronized(_events) { + Long when = (Long)_eventTimes.remove(evt); + if(when != null) { + _events.remove(when); + } + return null != when; + } + } + + /** + * Simple interface for events to be queued up and notified on expiration + */ + public interface TimedEvent { + + /** + * the time requested has been reached (this call should NOT block, + * otherwise the whole SimpleTimer gets backed up) + * + */ + public void timeReached(); + } + private long _occurredTime; + private long _occurredEventCount; + // not used + // private TimedEvent _recentEvents[] = new TimedEvent[5]; + private class SimpleTimerRunner implements Runnable { + + public void run() { + List eventsToFire = new ArrayList(1); + while(runn.getAnswer()) { + try { + synchronized(_events) { + //if (_events.size() <= 0) + // _events.wait(); + //if (_events.size() > 100) + // _log.warn("> 100 events! " + _events.values()); + long now = System.currentTimeMillis(); + long nextEventDelay = -1; + Object nextEvent = null; + while(runn.getAnswer()) { + if(_events.size() <= 0) { + break; + } + Long when = (Long)_events.firstKey(); + if(when.longValue() <= now) { + TimedEvent evt = (TimedEvent)_events.remove(when); + if(evt != null) { + _eventTimes.remove(evt); + eventsToFire.add(evt); + } + } else { + nextEventDelay = when.longValue() - now; + nextEvent = _events.get(when); + break; + } + } + if(eventsToFire.size() <= 0) { + if(nextEventDelay != -1) { + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("Next event in " + nextEventDelay + ": " + nextEvent); + } + _events.wait(nextEventDelay); + } else { + _events.wait(); + } + } + } + } catch(InterruptedException ie) { + // ignore + } catch(Throwable t) { + if(_log != null) { + _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t); + } else { + System.err.println("Uncaught exception in SimpleTimer"); + t.printStackTrace(); + } + } + + long now = System.currentTimeMillis(); + now = now - (now % 1000); + + synchronized(_readyEvents) { + for(int i = 0; i < eventsToFire.size(); i++) { + _readyEvents.add(eventsToFire.get(i)); + } + _readyEvents.notifyAll(); + } + + if(_occurredTime == now) { + _occurredEventCount += eventsToFire.size(); + } else { + _occurredTime = now; + if(_occurredEventCount > 2500) { + StringBuffer buf = new StringBuffer(128); + buf.append("Too many simpleTimerJobs (").append(_occurredEventCount); + buf.append(") in a second!"); + _log.log(Log.WARN, buf.toString()); + } + _occurredEventCount = 0; + } + + eventsToFire.clear(); + } + } + } }