From ee2fd32a9736212b153d703f455afd375848b26e Mon Sep 17 00:00:00 2001 From: sponge Date: Thu, 25 Sep 2008 23:31:57 +0000 Subject: [PATCH] disapproval of revision 'bd09bb36a90e766b3a406d78055d427a6200dd41' --- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 5 +- .../i2p/client/streaming/I2PServerSocket.java | 55 +- .../client/streaming/I2PServerSocketImpl.java | 285 +++--- .../client/streaming/StreamSinkServer.java | 369 ++++---- .../client/streaming/ConnectionHandler.java | 1 - .../client/streaming/ConnectionManager.java | 836 ++++++++---------- .../client/streaming/I2PServerSocketFull.java | 58 +- .../i2p/client/streaming/I2PSocketFull.java | 250 +++--- .../streaming/I2PSocketManagerFull.java | 508 +++++------ .../client/streaming/RetransmissionTimer.java | 2 +- core/java/src/net/i2p/util/Executor.java | 91 +- core/java/src/net/i2p/util/SimpleStore.java | 35 - core/java/src/net/i2p/util/SimpleTimer.java | 457 +++++----- 13 files changed, 1279 insertions(+), 1673 deletions(-) delete mode 100644 core/java/src/net/i2p/util/SimpleStore.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 40566fe41..014d91e9b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -12,7 +12,6 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; -import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.Properties; @@ -220,9 +219,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ce); // not killing the server.. - } catch(SocketTimeoutException ste) { - // ignored, we never set the timeout - } + } } } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 7c9927395..726d462ce 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -2,7 +2,6 @@ package net.i2p.client.streaming; import java.net.ConnectException; -import java.net.SocketTimeoutException; import net.i2p.I2PException; /** @@ -10,40 +9,26 @@ import net.i2p.I2PException; * */ public interface I2PServerSocket { + /** + * Closes the socket. + */ + public void close() throws I2PException; - /** - * Closes the socket. - */ - public void close() throws I2PException; + /** + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period) + * + * @return a connected I2PSocket + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + public I2PSocket accept() throws I2PException, ConnectException; - /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period) - * - * @return a connected I2PSocket - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - * @throws SocketTimeoutException - */ - public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; - - /** - * Set Sock Option accept timeout - * @param x - */ - public void setSoTimeout(long x); - - /** - * Get Sock Option accept timeout - * @return timeout - */ - public long getSoTimeout(); - - /** - * Access the manager which is coordinating the server socket - */ - public I2PSocketManager getManager(); + /** + * Access the manager which is coordinating the server socket + */ + public I2PSocketManager getManager(); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 2e3dfdb6b..965ba31bf 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -17,159 +17,134 @@ import net.i2p.util.Log; * */ class I2PServerSocketImpl implements I2PServerSocket { - - private final static Log _log = new Log(I2PServerSocketImpl.class); - private I2PSocketManager mgr; - /** list of sockets waiting for the client to accept them */ - private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); - /** have we been closed */ - private volatile boolean closing = false; - /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ - private Object socketAcceptedLock = new Object(); - /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ - private Object socketAddedLock = new Object(); - - /** - * Set Sock Option accept timeout stub, does nothing - * @param x - */ - public void setSoTimeout(long x) { - } - - /** - * Get Sock Option accept timeout stub, does nothing - * @return timeout - */ - public long getSoTimeout() { - return -1; - } - - public I2PServerSocketImpl(I2PSocketManager mgr) { - this.mgr = mgr; - } - - /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period - - * currently 5 seconds) - * - * @return a connected I2PSocket - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - */ - public I2PSocket accept() throws I2PException, ConnectException { - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("accept() called, pending: " + pendingSockets.size()); - } - I2PSocket ret = null; - - while((ret == null) && (!closing)) { - while(pendingSockets.size() <= 0) { - if(closing) { - throw new ConnectException("I2PServerSocket closed"); - } - try { - synchronized(socketAddedLock) { - socketAddedLock.wait(); - } - } catch(InterruptedException ie) { - } - } - synchronized(pendingSockets) { - if(pendingSockets.size() > 0) { - ret = (I2PSocket)pendingSockets.remove(0); - } - } - if(ret != null) { - synchronized(socketAcceptedLock) { - socketAcceptedLock.notifyAll(); - } - } - } - - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("TIMING: handed out accept result " + ret.hashCode()); - } - return ret; - } - - /** - * Make the socket available and wait until the client app accepts it, or until - * the given timeout elapses. This doesn't have any limits on the queue size - - * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) - * - * @param timeoutMs how long to wait until accept - * @return true if the socket was accepted, false if the timeout expired - * or the socket was closed - */ - public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); - } - if(closing) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Already closing the socket"); - } - return false; - } - - Clock clock = I2PAppContext.getGlobalContext().clock(); - long start = clock.now(); - long end = start + timeoutMs; - pendingSockets.add(s); - synchronized(socketAddedLock) { - socketAddedLock.notifyAll(); - } - - // keep looping until the socket has been grabbed by the accept() - // (or the expiration passes, or the socket is closed) - while(pendingSockets.contains(s)) { - long now = clock.now(); - if(now >= end) { - if(_log.shouldLog(Log.INFO)) { - _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); - } - pendingSockets.remove(s); - return false; - } - if(closing) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Server socket closed while waiting for accept"); - } - pendingSockets.remove(s); - return false; - } - long remaining = end - now; - try { - synchronized(socketAcceptedLock) { - socketAcceptedLock.wait(remaining); - } - } catch(InterruptedException ie) { - } - } - long now = clock.now(); - if(_log.shouldLog(Log.DEBUG)) { - _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString()); - } - return true; - } - - public void close() { - closing = true; - // let anyone .accept()ing know to fsck off - synchronized(socketAddedLock) { - socketAddedLock.notifyAll(); - } - // let anyone addWaitForAccept()ing know to fsck off - synchronized(socketAcceptedLock) { - socketAcceptedLock.notifyAll(); - } - } - - public I2PSocketManager getManager() { - return mgr; - } + private final static Log _log = new Log(I2PServerSocketImpl.class); + private I2PSocketManager mgr; + /** list of sockets waiting for the client to accept them */ + private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); + + /** have we been closed */ + private volatile boolean closing = false; + + /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ + private Object socketAcceptedLock = new Object(); + /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ + private Object socketAddedLock = new Object(); + + public I2PServerSocketImpl(I2PSocketManager mgr) { + this.mgr = mgr; + } + + /** + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period - + * currently 5 seconds) + * + * @return a connected I2PSocket + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + public I2PSocket accept() throws I2PException, ConnectException { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("accept() called, pending: " + pendingSockets.size()); + + I2PSocket ret = null; + + while ( (ret == null) && (!closing) ){ + while (pendingSockets.size() <= 0) { + if (closing) throw new ConnectException("I2PServerSocket closed"); + try { + synchronized(socketAddedLock) { + socketAddedLock.wait(); + } + } catch (InterruptedException ie) {} + } + synchronized (pendingSockets) { + if (pendingSockets.size() > 0) { + ret = (I2PSocket)pendingSockets.remove(0); + } + } + if (ret != null) { + synchronized (socketAcceptedLock) { + socketAcceptedLock.notifyAll(); + } + } + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("TIMING: handed out accept result " + ret.hashCode()); + return ret; + } + + /** + * Make the socket available and wait until the client app accepts it, or until + * the given timeout elapses. This doesn't have any limits on the queue size - + * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) + * + * @param timeoutMs how long to wait until accept + * @return true if the socket was accepted, false if the timeout expired + * or the socket was closed + */ + public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); + + if (closing) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Already closing the socket"); + return false; + } + + Clock clock = I2PAppContext.getGlobalContext().clock(); + long start = clock.now(); + long end = start + timeoutMs; + pendingSockets.add(s); + synchronized (socketAddedLock) { + socketAddedLock.notifyAll(); + } + + // keep looping until the socket has been grabbed by the accept() + // (or the expiration passes, or the socket is closed) + while (pendingSockets.contains(s)) { + long now = clock.now(); + if (now >= end) { + if (_log.shouldLog(Log.INFO)) + _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); + pendingSockets.remove(s); + return false; + } + if (closing) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Server socket closed while waiting for accept"); + pendingSockets.remove(s); + return false; + } + long remaining = end - now; + try { + synchronized (socketAcceptedLock) { + socketAcceptedLock.wait(remaining); + } + } catch (InterruptedException ie) {} + } + long now = clock.now(); + if (_log.shouldLog(Log.DEBUG)) + _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); + return true; + } + + public void close() { + closing = true; + // let anyone .accept()ing know to fsck off + synchronized (socketAddedLock) { + socketAddedLock.notifyAll(); + } + // let anyone addWaitForAccept()ing know to fsck off + synchronized (socketAcceptedLock) { + socketAcceptedLock.notifyAll(); + } + } + + public I2PSocketManager getManager() { return mgr; } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index 9f12be6c4..c8b566190 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -5,7 +5,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.util.Properties; import net.i2p.I2PAppContext; @@ -21,203 +20,173 @@ import net.i2p.util.Log; * */ public class StreamSinkServer { - - private Log _log; - private String _sinkDir; - private String _destFile; - private String _i2cpHost; - private int _i2cpPort; - private int _handlers; - - /** - * Create but do not start the streaming server. - * - * @param sinkDir Directory to store received files in - * @param ourDestFile filename to write our binary destination to - */ - public StreamSinkServer(String sinkDir, String ourDestFile) { - this(sinkDir, ourDestFile, null, -1, 3); - } - - public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { - _sinkDir = sinkDir; - _destFile = ourDestFile; - _i2cpHost = i2cpHost; - _i2cpPort = i2cpPort; - _handlers = handlers; - _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); - } - - /** - * Actually fire up the server - this call blocks forever (or until the server - * socket closes) - * - */ - public void runServer() { - I2PSocketManager mgr = null; - if(_i2cpHost != null) { - mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); - } else { - mgr = I2PSocketManagerFactory.createManager(); - } - Destination dest = mgr.getSession().getMyDestination(); - if(_log.shouldLog(Log.INFO)) { - _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); - } - FileOutputStream fos = null; - try { - fos = new FileOutputStream(_destFile); - dest.writeBytes(fos); - } catch(IOException ioe) { - _log.error("Error writing out our destination to " + _destFile, ioe); - return; - } catch(DataFormatException dfe) { - _log.error("Error formatting the destination", dfe); - return; - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException ioe) { - } - } - } - - I2PServerSocket sock = mgr.getServerSocket(); - startup(sock); - } - - public void startup(I2PServerSocket sock) { - for(int i = 0; i < _handlers; i++) { - I2PThread t = new I2PThread(new ClientRunner(sock)); - t.setName("Handler " + i); - t.setDaemon(false); - t.start(); - } - } - - /** - * Actually deal with a client - pull anything they send us and write it to a file. - * - */ - private class ClientRunner implements Runnable { - - private I2PServerSocket _socket; - - public ClientRunner(I2PServerSocket socket) { - _socket = socket; - } - - public void run() { - while(true) { - try { - I2PSocket socket = _socket.accept(); - if(socket != null) { - handle(socket); - } - } catch(I2PException ie) { - _log.error("Error accepting connection", ie); - return; - } catch(ConnectException ce) { - _log.error("Connection already dropped", ce); - return; - } catch(SocketTimeoutException ste) { - // ignored - } - } - } - - private void handle(I2PSocket sock) { - FileOutputStream fos = null; - try { - File sink = new File(_sinkDir); - if(!sink.exists()) { - sink.mkdirs(); - } - File cur = File.createTempFile("clientSink", ".dat", sink); - fos = new FileOutputStream(cur); - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("Writing to " + cur.getAbsolutePath()); - } - } catch(IOException ioe) { - _log.error("Error creating sink", ioe); - return; - } - - long start = System.currentTimeMillis(); - try { - InputStream in = sock.getInputStream(); - byte buf[] = new byte[4096]; - long written = 0; - int read = 0; - while((read = in.read(buf)) != -1) { - //_fos.write(buf, 0, read); - written += read; - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("read and wrote " + read + " (" + written + ")"); - } - } - fos.write(("written: [" + written + "]\n").getBytes()); - long lifetime = System.currentTimeMillis() - start; - _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); - } catch(IOException ioe) { - _log.error("Error writing the sink", ioe); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException ioe) { - } - } - if(sock != null) { - try { - sock.close(); - } catch(IOException ioe) { - } - } - _log.debug("Client socket closed"); - } - } - } - - /** - * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
- * - */ - public static void main(String args[]) { - StreamSinkServer server = null; - switch(args.length) { - case 0: - server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3); - break; - case 2: - server = new StreamSinkServer(args[0], args[1]); - break; - case 4: - case 5: - int handlers = 3; - if(args.length == 5) { - try { - handlers = Integer.parseInt(args[4]); - } catch(NumberFormatException nfe) { - } - } - try { - int port = Integer.parseInt(args[1]); - server = new StreamSinkServer(args[2], args[3], args[0], port, handlers); - } catch(NumberFormatException nfe) { - System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); - } - break; - default: - System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); - } - if(server != null) { - server.runServer(); - } - } + private Log _log; + private String _sinkDir; + private String _destFile; + private String _i2cpHost; + private int _i2cpPort; + private int _handlers; + + /** + * Create but do not start the streaming server. + * + * @param sinkDir Directory to store received files in + * @param ourDestFile filename to write our binary destination to + */ + public StreamSinkServer(String sinkDir, String ourDestFile) { + this(sinkDir, ourDestFile, null, -1, 3); + } + public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { + _sinkDir = sinkDir; + _destFile = ourDestFile; + _i2cpHost = i2cpHost; + _i2cpPort = i2cpPort; + _handlers = handlers; + _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); + } + + /** + * Actually fire up the server - this call blocks forever (or until the server + * socket closes) + * + */ + public void runServer() { + I2PSocketManager mgr = null; + if (_i2cpHost != null) + mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); + else + mgr = I2PSocketManagerFactory.createManager(); + Destination dest = mgr.getSession().getMyDestination(); + if (_log.shouldLog(Log.INFO)) + _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(_destFile); + dest.writeBytes(fos); + } catch (IOException ioe) { + _log.error("Error writing out our destination to " + _destFile, ioe); + return; + } catch (DataFormatException dfe) { + _log.error("Error formatting the destination", dfe); + return; + } finally { + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + } + + I2PServerSocket sock = mgr.getServerSocket(); + startup(sock); + } + + public void startup(I2PServerSocket sock) { + for (int i = 0; i < _handlers; i++) { + I2PThread t = new I2PThread(new ClientRunner(sock)); + t.setName("Handler " + i); + t.setDaemon(false); + t.start(); + } + } + + /** + * Actually deal with a client - pull anything they send us and write it to a file. + * + */ + private class ClientRunner implements Runnable { + private I2PServerSocket _socket; + public ClientRunner(I2PServerSocket socket) { + _socket = socket; + } + public void run() { + while (true) { + try { + I2PSocket socket = _socket.accept(); + if (socket != null) + handle(socket); + } catch (I2PException ie) { + _log.error("Error accepting connection", ie); + return; + } catch (ConnectException ce) { + _log.error("Connection already dropped", ce); + return; + } + } + } + + private void handle(I2PSocket sock) { + FileOutputStream fos = null; + try { + File sink = new File(_sinkDir); + if (!sink.exists()) + sink.mkdirs(); + File cur = File.createTempFile("clientSink", ".dat", sink); + fos = new FileOutputStream(cur); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Writing to " + cur.getAbsolutePath()); + } catch (IOException ioe) { + _log.error("Error creating sink", ioe); + return; + } + + long start = System.currentTimeMillis(); + try { + InputStream in = sock.getInputStream(); + byte buf[] = new byte[4096]; + long written = 0; + int read = 0; + while ( (read = in.read(buf)) != -1) { + //_fos.write(buf, 0, read); + written += read; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read and wrote " + read + " (" + written + ")"); + } + fos.write(("written: [" + written + "]\n").getBytes()); + long lifetime = System.currentTimeMillis() - start; + _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); + } catch (IOException ioe) { + _log.error("Error writing the sink", ioe); + } finally { + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + if (sock != null) try { sock.close(); } catch (IOException ioe) {} + _log.debug("Client socket closed"); + } + } + } + + /** + * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
+ * + */ + public static void main(String args[]) { + StreamSinkServer server = null; + switch (args.length) { + case 0: + server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3); + break; + case 2: + server = new StreamSinkServer(args[0], args[1]); + break; + case 4: + case 5: + int handlers = 3; + if (args.length == 5) { + try { + handlers = Integer.parseInt(args[4]); + } catch (NumberFormatException nfe) {} + } + try { + int port = Integer.parseInt(args[1]); + server = new StreamSinkServer(args[2], args[3], args[0], port, handlers); + } catch (NumberFormatException nfe) { + System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); + } + break; + default: + System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); + } + if (server != null) + server.runServer(); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index f05ae1c8c..4960f1a22 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -1,6 +1,5 @@ package net.i2p.client.streaming; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 08d794877..dcc93c5ec 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,459 +21,393 @@ import net.i2p.util.SimpleTimer; * */ public class ConnectionManager { + private I2PAppContext _context; + private Log _log; + private I2PSession _session; + private MessageHandler _messageHandler; + private PacketHandler _packetHandler; + private ConnectionHandler _connectionHandler; + private PacketQueue _outboundQueue; + private SchedulerChooser _schedulerChooser; + private ConnectionPacketHandler _conPacketHandler; + /** Inbound stream ID (Long) to Connection map */ + private Map _connectionByInboundId; + /** Ping ID (Long) to PingRequest */ + private Map _pendingPings; + private boolean _allowIncoming; + private int _maxConcurrentStreams; + private ConnectionOptions _defaultOptions; + private volatile int _numWaiting; + private Object _connectionLock; + + public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { + _context = context; + _log = context.logManager().getLog(ConnectionManager.class); + _connectionByInboundId = new HashMap(32); + _pendingPings = new HashMap(4); + _connectionLock = new Object(); + _messageHandler = new MessageHandler(context, this); + _packetHandler = new PacketHandler(context, this); + _connectionHandler = new ConnectionHandler(context, this); + _schedulerChooser = new SchedulerChooser(context); + _conPacketHandler = new ConnectionPacketHandler(context); + _session = session; + session.setSessionListener(_messageHandler); + _outboundQueue = new PacketQueue(context, session, this); + _allowIncoming = false; + _maxConcurrentStreams = maxConcurrent; + _defaultOptions = defaultOptions; + _numWaiting = 0; + _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + } + + Connection getConnectionByInboundId(long id) { + synchronized (_connectionLock) { + return (Connection)_connectionByInboundId.get(new Long(id)); + } + } + /** + * not guaranteed to be unique, but in case we receive more than one packet + * on an inbound connection that we havent ack'ed yet... + */ + Connection getConnectionByOutboundId(long id) { + synchronized (_connectionLock) { + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (DataHelper.eq(con.getSendStreamId(), id)) + return con; + } + } + return null; + } + + public void setAllowIncomingConnections(boolean allow) { + _connectionHandler.setActive(allow); + } + /** should we acceot connections, or just reject everyone? */ + public boolean getAllowIncomingConnections() { + return _connectionHandler.getActive(); + } + + /** + * Create a new connection based on the SYN packet we received. + * + * @return created Connection with the packet's data already delivered to + * it, or null if the syn's streamId was already taken + */ + public Connection receiveConnection(Packet synPacket) { + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; + boolean reject = false; + int active = 0; + int total = 0; + synchronized (_connectionLock) { + total = _connectionByInboundId.size(); + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + if ( ((Connection)iter.next()).getIsConnected() ) + active++; + } + if (locked_tooManyStreams()) { + reject = true; + } else { + while (true) { + Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); + if (oldCon == null) { + break; + } else { + _connectionByInboundId.put(new Long(receiveId), oldCon); + // receiveId already taken, try another + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; + } + } + } + } + + _context.statManager().addRateData("stream.receiveActive", active, total); + + if (reject) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing connection since we have exceeded our max of " + + _maxConcurrentStreams + " connections"); + PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); + reply.setFlag(Packet.FLAG_RESET); + reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + reply.setAckThrough(synPacket.getSequenceNum()); + reply.setSendStreamId(synPacket.getReceiveStreamId()); + reply.setReceiveStreamId(0); + reply.setOptionalFrom(_session.getMyDestination()); + // this just sends the packet - no retries or whatnot + _outboundQueue.enqueue(reply); + return null; + } + + con.setReceiveStreamId(receiveId); + try { + con.getPacketHandler().receivePacket(synPacket, con); + } catch (I2PException ie) { + synchronized (_connectionLock) { + _connectionByInboundId.remove(new Long(receiveId)); + } + return null; + } + + _context.statManager().addRateData("stream.connectionReceived", 1, 0); + return con; + } + + private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000; + + /** + * Build a new connection to the given peer. This blocks if there is no + * connection delay, otherwise it returns immediately. + * + * @return new connection, or null if we have exceeded our limit + */ + public Connection connect(Destination peer, ConnectionOptions opts) { + Connection con = null; + long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; + long expiration = _context.clock().now() + opts.getConnectTimeout(); + if (opts.getConnectTimeout() <= 0) + expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; + _numWaiting++; + while (true) { + long remaining = expiration - _context.clock().now(); + if (remaining <= 0) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing to connect since we have exceeded our max of " + + _maxConcurrentStreams + " connections"); + _numWaiting--; + return null; + } + boolean reject = false; + synchronized (_connectionLock) { + if (locked_tooManyStreams()) { + // allow a full buffer of pending/waiting streams + if (_numWaiting > _maxConcurrentStreams) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing connection since we have exceeded our max of " + + _maxConcurrentStreams + " and there are " + _numWaiting + + " waiting already"); + _numWaiting--; + return null; + } + + // no remaining streams, lets wait a bit + try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} + } else { + con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + con.setRemotePeer(peer); + + while (_connectionByInboundId.containsKey(new Long(receiveId))) { + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; + } + _connectionByInboundId.put(new Long(receiveId), con); + break; // stop looping as a psuedo-wait + } + } + } - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private MessageHandler _messageHandler; - private PacketHandler _packetHandler; - private ConnectionHandler _connectionHandler; - private PacketQueue _outboundQueue; - private SchedulerChooser _schedulerChooser; - private ConnectionPacketHandler _conPacketHandler; - /** Inbound stream ID (Long) to Connection map */ - private Map _connectionByInboundId; - /** Ping ID (Long) to PingRequest */ - private Map _pendingPings; - private boolean _allowIncoming; - private int _maxConcurrentStreams; - private ConnectionOptions _defaultOptions; - private volatile int _numWaiting; - private Object _connectionLock; - private long SoTimeout; + // ok we're in... + con.setReceiveStreamId(receiveId); + con.eventOccurred(); + + _log.debug("Connect() conDelay = " + opts.getConnectDelay()); + if (opts.getConnectDelay() <= 0) { + con.waitForConnect(); + } + if (_numWaiting > 0) + _numWaiting--; + + _context.statManager().addRateData("stream.connectionCreated", 1, 0); + return con; + } - public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { - _context = context; - _log = context.logManager().getLog(ConnectionManager.class); - _connectionByInboundId = new HashMap(32); - _pendingPings = new HashMap(4); - _connectionLock = new Object(); - _messageHandler = new MessageHandler(context, this); - _packetHandler = new PacketHandler(context, this); - _connectionHandler = new ConnectionHandler(context, this); - _schedulerChooser = new SchedulerChooser(context); - _conPacketHandler = new ConnectionPacketHandler(context); - _session = session; - session.setSessionListener(_messageHandler); - _outboundQueue = new PacketQueue(context, session, this); - _allowIncoming = false; - _maxConcurrentStreams = maxConcurrent; - _defaultOptions = defaultOptions; - _numWaiting = 0; - /** Socket timeout for accept() */ - SoTimeout = -1; + private boolean locked_tooManyStreams() { + if (_maxConcurrentStreams <= 0) return false; + if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; + int active = 0; + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (con.getIsConnected()) + active++; + } + + if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) ) + _log.info("More than 100 connections! " + active + + " total: " + _connectionByInboundId.size()); - _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); - } + return (active >= _maxConcurrentStreams); + } + + public MessageHandler getMessageHandler() { return _messageHandler; } + public PacketHandler getPacketHandler() { return _packetHandler; } + public ConnectionHandler getConnectionHandler() { return _connectionHandler; } + public I2PSession getSession() { return _session; } + public PacketQueue getPacketQueue() { return _outboundQueue; } + + /** + * Something b0rked hard, so kill all of our connections without mercy. + * Don't bother sending close packets. + * + */ + public void disconnectAllHard() { + synchronized (_connectionLock) { + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + con.disconnect(false, false); + } + _connectionByInboundId.clear(); + _connectionLock.notifyAll(); + } + } + + /** + * Drop the (already closed) connection on the floor. + * + */ + public void removeConnection(Connection con) { + boolean removed = false; + synchronized (_connectionLock) { + Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); + removed = (o == con); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Connection removed? " + removed + " remaining: " + + _connectionByInboundId.size() + ": " + con); + if (!removed && _log.shouldLog(Log.DEBUG)) + _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); + _connectionLock.notifyAll(); + } + if (removed) { + _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime()); + } + } + + /** return a set of Connection objects */ + public Set listConnections() { + synchronized (_connectionLock) { + return new HashSet(_connectionByInboundId.values()); + } + } + + public boolean ping(Destination peer, long timeoutMs) { + return ping(peer, timeoutMs, true); + } + public boolean ping(Destination peer, long timeoutMs, boolean blocking) { + return ping(peer, timeoutMs, blocking, null, null, null); + } + public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { + Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); + PacketLocal packet = new PacketLocal(_context, peer); + packet.setSendStreamId(id.longValue()); + packet.setFlag(Packet.FLAG_ECHO); + packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + packet.setOptionalFrom(_session.getMyDestination()); + if ( (keyToUse != null) && (tagsToSend != null) ) { + packet.setKeyUsed(keyToUse); + packet.setTagsSent(tagsToSend); + } + + PingRequest req = new PingRequest(peer, packet, notifier); + + synchronized (_pendingPings) { + _pendingPings.put(id, req); + } + + _outboundQueue.enqueue(packet); + packet.releasePayload(); + + if (blocking) { + synchronized (req) { + if (!req.pongReceived()) + try { req.wait(timeoutMs); } catch (InterruptedException ie) {} + } + + synchronized (_pendingPings) { + _pendingPings.remove(id); + } + } else { + SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs); + } + + boolean ok = req.pongReceived(); + return ok; + } - Connection getConnectionByInboundId(long id) { - synchronized(_connectionLock) { - return (Connection)_connectionByInboundId.get(new Long(id)); - } - } - - /** - * not guaranteed to be unique, but in case we receive more than one packet - * on an inbound connection that we havent ack'ed yet... - */ - Connection getConnectionByOutboundId(long id) { - synchronized(_connectionLock) { - for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { - Connection con = (Connection)iter.next(); - if(DataHelper.eq(con.getSendStreamId(), id)) { - return con; - } - } - } - return null; - } - - /** - * Set the socket accept() timeout. - * @param x - */ - public void MsetSoTimeout(long x) { - SoTimeout = x; - } - - /** - * Get the socket accept() timeout. - * @return - */ - public long MgetSoTimeout() { - return SoTimeout; - } - - public void setAllowIncomingConnections(boolean allow) { - _connectionHandler.setActive(allow); - } - - /** should we acceot connections, or just reject everyone? */ - public boolean getAllowIncomingConnections() { - return _connectionHandler.getActive(); - } - - /** - * Create a new connection based on the SYN packet we received. - * - * @return created Connection with the packet's data already delivered to - * it, or null if the syn's streamId was already taken - */ - public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); - long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; - boolean reject = false; - int active = 0; - int total = 0; - synchronized(_connectionLock) { - total = _connectionByInboundId.size(); - for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { - if(((Connection)iter.next()).getIsConnected()) { - active++; - } - } - if(locked_tooManyStreams()) { - reject = true; - } else { - while(true) { - Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); - if(oldCon == null) { - break; - } else { - _connectionByInboundId.put(new Long(receiveId), oldCon); - // receiveId already taken, try another - receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; - } - } - } - } - - _context.statManager().addRateData("stream.receiveActive", active, total); - - if(reject) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections"); - } - PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); - reply.setFlag(Packet.FLAG_RESET); - reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); - reply.setAckThrough(synPacket.getSequenceNum()); - reply.setSendStreamId(synPacket.getReceiveStreamId()); - reply.setReceiveStreamId(0); - reply.setOptionalFrom(_session.getMyDestination()); - // this just sends the packet - no retries or whatnot - _outboundQueue.enqueue(reply); - return null; - } - - con.setReceiveStreamId(receiveId); - try { - con.getPacketHandler().receivePacket(synPacket, con); - } catch(I2PException ie) { - synchronized(_connectionLock) { - _connectionByInboundId.remove(new Long(receiveId)); - } - return null; - } - - _context.statManager().addRateData("stream.connectionReceived", 1, 0); - return con; - } - private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000; - - /** - * Build a new connection to the given peer. This blocks if there is no - * connection delay, otherwise it returns immediately. - * - * @return new connection, or null if we have exceeded our limit - */ - public Connection connect(Destination peer, ConnectionOptions opts) { - Connection con = null; - long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; - long expiration = _context.clock().now() + opts.getConnectTimeout(); - if(opts.getConnectTimeout() <= 0) { - expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; - } - _numWaiting++; - while(true) { - long remaining = expiration - _context.clock().now(); - if(remaining <= 0) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections"); - } - _numWaiting--; - return null; - } - boolean reject = false; - synchronized(_connectionLock) { - if(locked_tooManyStreams()) { - // allow a full buffer of pending/waiting streams - if(_numWaiting > _maxConcurrentStreams) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already"); - } - _numWaiting--; - return null; - } - - // no remaining streams, lets wait a bit - try { - _connectionLock.wait(remaining); - } catch(InterruptedException ie) { - } - } else { - con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); - con.setRemotePeer(peer); - - while(_connectionByInboundId.containsKey(new Long(receiveId))) { - receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; - } - _connectionByInboundId.put(new Long(receiveId), con); - break; // stop looping as a psuedo-wait - } - } - } - - // ok we're in... - con.setReceiveStreamId(receiveId); - con.eventOccurred(); - - _log.debug("Connect() conDelay = " + opts.getConnectDelay()); - if(opts.getConnectDelay() <= 0) { - con.waitForConnect(); - } - if(_numWaiting > 0) { - _numWaiting--; - } - _context.statManager().addRateData("stream.connectionCreated", 1, 0); - return con; - } - - private boolean locked_tooManyStreams() { - if(_maxConcurrentStreams <= 0) { - return false; - } - if(_connectionByInboundId.size() < _maxConcurrentStreams) { - return false; - } - int active = 0; - for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { - Connection con = (Connection)iter.next(); - if(con.getIsConnected()) { - active++; - } - } - - if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) { - _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); - } - return (active >= _maxConcurrentStreams); - } - - public MessageHandler getMessageHandler() { - return _messageHandler; - } - - public PacketHandler getPacketHandler() { - return _packetHandler; - } - - public ConnectionHandler getConnectionHandler() { - return _connectionHandler; - } - - public I2PSession getSession() { - return _session; - } - - public PacketQueue getPacketQueue() { - return _outboundQueue; - } - - /** - * Something b0rked hard, so kill all of our connections without mercy. - * Don't bother sending close packets. - * - */ - public void disconnectAllHard() { - synchronized(_connectionLock) { - for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { - Connection con = (Connection)iter.next(); - con.disconnect(false, false); - } - _connectionByInboundId.clear(); - _connectionLock.notifyAll(); - } - } - - /** - * Drop the (already closed) connection on the floor. - * - */ - public void removeConnection(Connection con) { - boolean removed = false; - synchronized(_connectionLock) { - Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); - removed = (o == con); - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con); - } - if(!removed && _log.shouldLog(Log.DEBUG)) { - _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values()); - } - _connectionLock.notifyAll(); - } - if(removed) { - _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime()); - } - } - - /** return a set of Connection objects */ - public Set listConnections() { - synchronized(_connectionLock) { - return new HashSet(_connectionByInboundId.values()); - } - } - - public boolean ping(Destination peer, long timeoutMs) { - return ping(peer, timeoutMs, true); - } - - public boolean ping(Destination peer, long timeoutMs, boolean blocking) { - return ping(peer, timeoutMs, blocking, null, null, null); - } - - public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { - Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1); - PacketLocal packet = new PacketLocal(_context, peer); - packet.setSendStreamId(id.longValue()); - packet.setFlag(Packet.FLAG_ECHO); - packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); - packet.setOptionalFrom(_session.getMyDestination()); - if((keyToUse != null) && (tagsToSend != null)) { - packet.setKeyUsed(keyToUse); - packet.setTagsSent(tagsToSend); - } - - PingRequest req = new PingRequest(peer, packet, notifier); - - synchronized(_pendingPings) { - _pendingPings.put(id, req); - } - - _outboundQueue.enqueue(packet); - packet.releasePayload(); - - if(blocking) { - synchronized(req) { - if(!req.pongReceived()) { - try { - req.wait(timeoutMs); - } catch(InterruptedException ie) { - } - } - } - - synchronized(_pendingPings) { - _pendingPings.remove(id); - } - } else { - SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs); - } - - boolean ok = req.pongReceived(); - return ok; - } - - interface PingNotifier { - - public void pingComplete(boolean ok); - } - - private class PingFailed implements SimpleTimer.TimedEvent { - - private Long _id; - private PingNotifier _notifier; - - public PingFailed(Long id, PingNotifier notifier) { - _id = id; - _notifier = notifier; - } - - public void timeReached() { - boolean removed = false; - synchronized(_pendingPings) { - Object o = _pendingPings.remove(_id); - if(o != null) { - removed = true; - } - } - if(removed) { - if(_notifier != null) { - _notifier.pingComplete(false); - } - if(_log.shouldLog(Log.INFO)) { - _log.info("Ping failed"); - } - } - } - } - - private class PingRequest { - - private boolean _ponged; - private Destination _peer; - private PacketLocal _packet; - private PingNotifier _notifier; - - public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { - _ponged = false; - _peer = peer; - _packet = packet; - _notifier = notifier; - } - - public void pong() { - _log.debug("Ping successful"); - _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); - synchronized(ConnectionManager.PingRequest.this) { - _ponged = true; - ConnectionManager.PingRequest.this.notifyAll(); - } - if(_notifier != null) { - _notifier.pingComplete(true); - } - } - - public boolean pongReceived() { - return _ponged; - } - } - - void receivePong(long pingId) { - PingRequest req = null; - synchronized(_pendingPings) { - req = (PingRequest)_pendingPings.remove(new Long(pingId)); - } - if(req != null) { - req.pong(); - } - } + interface PingNotifier { + public void pingComplete(boolean ok); + } + + private class PingFailed implements SimpleTimer.TimedEvent { + private Long _id; + private PingNotifier _notifier; + public PingFailed(Long id, PingNotifier notifier) { + _id = id; + _notifier = notifier; + } + + public void timeReached() { + boolean removed = false; + synchronized (_pendingPings) { + Object o = _pendingPings.remove(_id); + if (o != null) + removed = true; + } + if (removed) { + if (_notifier != null) + _notifier.pingComplete(false); + if (_log.shouldLog(Log.INFO)) + _log.info("Ping failed"); + } + } + } + + private class PingRequest { + private boolean _ponged; + private Destination _peer; + private PacketLocal _packet; + private PingNotifier _notifier; + public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { + _ponged = false; + _peer = peer; + _packet = packet; + _notifier = notifier; + } + public void pong() { + _log.debug("Ping successful"); + _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); + synchronized (ConnectionManager.PingRequest.this) { + _ponged = true; + ConnectionManager.PingRequest.this.notifyAll(); + } + if (_notifier != null) + _notifier.pingComplete(true); + } + public boolean pongReceived() { return _ponged; } + } + + void receivePong(long pingId) { + PingRequest req = null; + synchronized (_pendingPings) { + req = (PingRequest)_pendingPings.remove(new Long(pingId)); + } + if (req != null) + req.pong(); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index b85459f63..b1a4175f2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,8 +1,5 @@ package net.i2p.client.streaming; -import java.net.SocketTimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; import net.i2p.I2PException; /** @@ -10,46 +7,17 @@ import net.i2p.I2PException; * */ public class I2PServerSocketFull implements I2PServerSocket { - - private I2PSocketManagerFull _socketManager; - - /** - * - * @param mgr - */ - public I2PServerSocketFull(I2PSocketManagerFull mgr) { - _socketManager = mgr; - } - - /** - * - * @return - * @throws net.i2p.I2PException - * @throws SocketTimeoutException - */ - public I2PSocket accept() throws I2PException, SocketTimeoutException { - return _socketManager.receiveSocket(); - } - - public long getSoTimeout() { - return _socketManager.getConnectionManager().MgetSoTimeout(); - } - - public void setSoTimeout(long x) { - _socketManager.getConnectionManager().MsetSoTimeout(x); - } - /** - * Close the connection. - */ - public void close() { - _socketManager.getConnectionManager().setAllowIncomingConnections(false); - } - - /** - * - * @return _socketManager - */ - public I2PSocketManager getManager() { - return _socketManager; - } + private I2PSocketManagerFull _socketManager; + + public I2PServerSocketFull(I2PSocketManagerFull mgr) { + _socketManager = mgr; + } + + public I2PSocket accept() throws I2PException { + return _socketManager.receiveSocket(); + } + + public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } + + public I2PSocketManager getManager() { return _socketManager; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 842cf791b..61dd48757 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -11,139 +11,119 @@ import net.i2p.data.Destination; * */ public class I2PSocketFull implements I2PSocket { - - private Connection _connection; - private I2PSocket.SocketErrorListener _listener; - private Destination _remotePeer; - private Destination _localPeer; - - public I2PSocketFull(Connection con) { - _connection = con; - if(con != null) { - _remotePeer = con.getRemotePeer(); - _localPeer = con.getSession().getMyDestination(); - } - } - - - public void close() throws IOException { - Connection c = _connection; - if(c == null) { - return; - } - if(c.getIsConnected()) { - OutputStream out = c.getOutputStream(); - if(out != null) { - try { - out.close(); - } catch(IOException ioe) { - // ignore any write error, as we want to keep on and kill the - // con (thanks Complication!) - } - } - c.disconnect(true); - } else { - //throw new IOException("Not connected"); - } - destroy(); - } - - Connection getConnection() { - return _connection; - } - - public InputStream getInputStream() { - Connection c = _connection; - if(c != null) { - return c.getInputStream(); - } else { - return null; - } - } - - public I2PSocketOptions getOptions() { - Connection c = _connection; - if(c != null) { - return c.getOptions(); - } else { - return null; - } - } - - public OutputStream getOutputStream() throws IOException { - Connection c = _connection; - if(c != null) { - return c.getOutputStream(); - } else { - return null; - } - } - - public Destination getPeerDestination() { - return _remotePeer; - } - - public long getReadTimeout() { - I2PSocketOptions opts = getOptions(); - if(opts != null) { - return opts.getReadTimeout(); - } else { - return -1; - } - } - - public Destination getThisDestination() { - return _localPeer; - } - - public void setOptions(I2PSocketOptions options) { - Connection c = _connection; - if(c == null) { - return; - } - if(options instanceof ConnectionOptions) { - c.setOptions((ConnectionOptions)options); - } else { - c.setOptions(new ConnectionOptions(options)); - } - } - - public void setReadTimeout(long ms) { - Connection c = _connection; - if(c == null) { - return; - } - c.getInputStream().setReadTimeout((int)ms); - c.getOptions().setReadTimeout(ms); - } - - public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { - _listener = lsnr; - } - - public boolean isClosed() { - Connection c = _connection; - return ((c == null) || - (!c.getIsConnected()) || - (c.getResetReceived()) || - (c.getResetSent())); - } - - void destroy() { - Connection c = _connection; - _connection = null; - _listener = null; - if(c != null) { - c.disconnectComplete(); - } - } - - public String toString() { - Connection c = _connection; - if(c == null) { - return super.toString(); - } else { - return c.toString(); - } - } + private Connection _connection; + private I2PSocket.SocketErrorListener _listener; + private Destination _remotePeer; + private Destination _localPeer; + + public I2PSocketFull(Connection con) { + _connection = con; + if (con != null) { + _remotePeer = con.getRemotePeer(); + _localPeer = con.getSession().getMyDestination(); + } + } + + public void close() throws IOException { + Connection c = _connection; + if (c == null) return; + if (c.getIsConnected()) { + OutputStream out = c.getOutputStream(); + if (out != null) { + try { + out.close(); + } catch (IOException ioe) { + // ignore any write error, as we want to keep on and kill the + // con (thanks Complication!) + } + } + c.disconnect(true); + } else { + //throw new IOException("Not connected"); + } + destroy(); + } + + Connection getConnection() { return _connection; } + + public InputStream getInputStream() { + Connection c = _connection; + if (c != null) + return c.getInputStream(); + else + return null; + } + + public I2PSocketOptions getOptions() { + Connection c = _connection; + if (c != null) + return c.getOptions(); + else + return null; + } + + public OutputStream getOutputStream() throws IOException { + Connection c = _connection; + if (c != null) + return c.getOutputStream(); + else + return null; + } + + public Destination getPeerDestination() { return _remotePeer; } + + public long getReadTimeout() { + I2PSocketOptions opts = getOptions(); + if (opts != null) + return opts.getReadTimeout(); + else + return -1; + } + + public Destination getThisDestination() { return _localPeer; } + + public void setOptions(I2PSocketOptions options) { + Connection c = _connection; + if (c == null) return; + + if (options instanceof ConnectionOptions) + c.setOptions((ConnectionOptions)options); + else + c.setOptions(new ConnectionOptions(options)); + } + + public void setReadTimeout(long ms) { + Connection c = _connection; + if (c == null) return; + + c.getInputStream().setReadTimeout((int)ms); + c.getOptions().setReadTimeout(ms); + } + + public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { + _listener = lsnr; + } + + public boolean isClosed() { + Connection c = _connection; + return ((c == null) || + (!c.getIsConnected()) || + (c.getResetReceived()) || + (c.getResetSent())); + } + + void destroy() { + Connection c = _connection; + _connection = null; + _listener = null; + if (c != null) + c.disconnectComplete(); + } + public String toString() { + Connection c = _connection; + if (c == null) + return super.toString(); + else + return c.toString(); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index b0d1c841a..7384a4972 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -1,7 +1,6 @@ package net.i2p.client.streaming; import java.net.NoRouteToHostException; -import java.net.SocketTimeoutException; import java.util.HashSet; import java.util.Iterator; import java.util.Properties; @@ -14,6 +13,7 @@ import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Log; + /** * Centralize the coordination and multiplexing of the local client's streaming. * There should be one I2PSocketManager for each I2PSession, and if an application @@ -23,317 +23,219 @@ import net.i2p.util.Log; * */ public class I2PSocketManagerFull implements I2PSocketManager { + private I2PAppContext _context; + private Log _log; + private I2PSession _session; + private I2PServerSocketFull _serverSocket; + private ConnectionOptions _defaultOptions; + private long _acceptTimeout; + private String _name; + private int _maxStreams; + private static int __managerId = 0; + private ConnectionManager _connectionManager; + + /** + * How long to wait for the client app to accept() before sending back CLOSE? + * This includes the time waiting in the queue. Currently set to 5 seconds. + */ + private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; + + public I2PSocketManagerFull() { + _context = null; + _session = null; + } + public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { + this(); + init(context, session, opts, name); + } + + /** how many streams will we allow at once? */ + public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; + + /** + * + */ + public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { + _context = context; + _session = session; + _log = _context.logManager().getLog(I2PSocketManagerFull.class); + + _maxStreams = -1; + try { + String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); + _maxStreams = Integer.parseInt(num); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); + _maxStreams = -1; + } + _name = name + " " + (++__managerId); + _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; + _defaultOptions = new ConnectionOptions(opts); + _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); + _serverSocket = new I2PServerSocketFull(this); + + if (_log.shouldLog(Log.INFO)) { + _log.info("Socket manager created. \ndefault options: " + _defaultOptions + + "\noriginal properties: " + opts); + } + } - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private I2PServerSocketFull _serverSocket; - private ConnectionOptions _defaultOptions; - private long _acceptTimeout; - private String _name; - private int _maxStreams; - private static int __managerId = 0; - private ConnectionManager _connectionManager; - /** - * How long to wait for the client app to accept() before sending back CLOSE? - * This includes the time waiting in the queue. Currently set to 5 seconds. - */ - private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000; + public I2PSocketOptions buildOptions() { return buildOptions(null); } + public I2PSocketOptions buildOptions(Properties opts) { + ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); + curOpts.setProperties(opts); + return curOpts; + } + + public I2PSession getSession() { + return _session; + } + + public ConnectionManager getConnectionManager() { + return _connectionManager; + } - /** - * - */ - public I2PSocketManagerFull() { - _context = null; - _session = null; - } + public I2PSocket receiveSocket() throws I2PException { + verifySession(); + Connection con = _connectionManager.getConnectionHandler().accept(-1); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("receiveSocket() called: " + con); + if (con != null) { + I2PSocketFull sock = new I2PSocketFull(con); + con.setSocket(sock); + return sock; + } else { + return null; + } + } + + /** + * Ping the specified peer, returning true if they replied to the ping within + * the timeout specified, false otherwise. This call blocks. + * + */ + public boolean ping(Destination peer, long timeoutMs) { + return _connectionManager.ping(peer, timeoutMs); + } - /** - * - * @param context - * @param session - * @param opts - * @param name - */ - public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { - this(); - init(context, session, opts, name); - } - /** how many streams will we allow at once? */ - public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; + /** + * How long should we wait for the client to .accept() a socket before + * sending back a NACK/Close? + * + * @param ms milliseconds to wait, maximum + */ + public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } + public long getAcceptTimeout() { return _acceptTimeout; } - /** - * - * - * @param context - * @param session - * @param opts - * @param name - */ - public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { - _context = context; - _session = session; - _log = _context.logManager().getLog(I2PSocketManagerFull.class); + public void setDefaultOptions(I2PSocketOptions options) { + _defaultOptions = new ConnectionOptions((ConnectionOptions) options); + } - _maxStreams = -1; - try { - String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); - _maxStreams = Integer.parseInt(num); - } catch(NumberFormatException nfe) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); - } - _maxStreams = -1; - } - _name = name + " " + (++__managerId); - _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; - _defaultOptions = new ConnectionOptions(opts); - _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); - _serverSocket = new I2PServerSocketFull(this); + public I2PSocketOptions getDefaultOptions() { + return _defaultOptions; + } - if(_log.shouldLog(Log.INFO)) { - _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts); - } - } + public I2PServerSocket getServerSocket() { + _connectionManager.setAllowIncomingConnections(true); + return _serverSocket; + } - /** - * - * @return - */ - public I2PSocketOptions buildOptions() { - return buildOptions(null); - } + private void verifySession() throws I2PException { + if (!_connectionManager.getSession().isClosed()) + return; + _connectionManager.getSession().connect(); + } + + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * @param options I2P socket options to be used for connecting + * + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer, I2PSocketOptions options) + throws I2PException, NoRouteToHostException { + verifySession(); + if (options == null) + options = _defaultOptions; + ConnectionOptions opts = null; + if (options instanceof ConnectionOptions) + opts = new ConnectionOptions((ConnectionOptions)options); + else + opts = new ConnectionOptions(options); + + if (_log.shouldLog(Log.INFO)) + _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) + + " with options: " + opts); + Connection con = _connectionManager.connect(peer, opts); + if (con == null) + throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); + I2PSocketFull socket = new I2PSocketFull(con); + con.setSocket(socket); + if (con.getConnectionError() != null) { + con.disconnect(false); + throw new NoRouteToHostException(con.getConnectionError()); + } + return socket; + } - /** - * - * @param opts - * @return - */ - public I2PSocketOptions buildOptions(Properties opts) { - ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); - curOpts.setProperties(opts); - return curOpts; - } + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException { + return connect(peer, _defaultOptions); + } - /** - * - * @return - */ - public I2PSession getSession() { - return _session; - } + /** + * Destroy the socket manager, freeing all the associated resources. This + * method will block untill all the managed sockets are closed. + * + */ + public void destroySocketManager() { + _connectionManager.disconnectAllHard(); + _connectionManager.setAllowIncomingConnections(false); + // should we destroy the _session too? + // yes, since the old lib did (and SAM wants it to, and i dont know why not) + if ( (_session != null) && (!_session.isClosed()) ) { + try { + _session.destroySession(); + } catch (I2PSessionException ise) { + _log.warn("Unable to destroy the session", ise); + } + } + } - /** - * - * @return - */ - public ConnectionManager getConnectionManager() { - return _connectionManager; - } + /** + * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. + * + */ + public Set listSockets() { + Set connections = _connectionManager.listConnections(); + Set rv = new HashSet(connections.size()); + for (Iterator iter = connections.iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (con.getSocket() != null) + rv.add(con.getSocket()); + } + return rv; + } - /** - * - * @return - * @throws net.i2p.I2PException - * @throws java.net.SocketTimeoutException - */ - public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException { - verifySession(); - Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout()); - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("receiveSocket() called: " + con); - } - if(con != null) { - I2PSocketFull sock = new I2PSocketFull(con); - con.setSocket(sock); - return sock; - } else { - if(_connectionManager.MgetSoTimeout() == -1) { - return null; - } - throw new SocketTimeoutException("I2PSocket timed out"); - } - } - - /** - * Ping the specified peer, returning true if they replied to the ping within - * the timeout specified, false otherwise. This call blocks. - * - * - * @param peer - * @param timeoutMs - * @return - */ - public boolean ping(Destination peer, long timeoutMs) { - return _connectionManager.ping(peer, timeoutMs); - } - - /** - * How long should we wait for the client to .accept() a socket before - * sending back a NACK/Close? - * - * @param ms milliseconds to wait, maximum - */ - public void setAcceptTimeout(long ms) { - _acceptTimeout = ms; - } - - /** - * - * @return - */ - public long getAcceptTimeout() { - return _acceptTimeout; - } - - /** - * - * @param options - */ - public void setDefaultOptions(I2PSocketOptions options) { - _defaultOptions = new ConnectionOptions((ConnectionOptions)options); - } - - /** - * - * @return - */ - public I2PSocketOptions getDefaultOptions() { - return _defaultOptions; - } - - /** - * - * @return - */ - public I2PServerSocket getServerSocket() { - _connectionManager.setAllowIncomingConnections(true); - return _serverSocket; - } - - private void verifySession() throws I2PException { - if(!_connectionManager.getSession().isClosed()) { - return; - } - _connectionManager.getSession().connect(); - } - - /** - * Create a new connected socket (block until the socket is created) - * - * @param peer Destination to connect to - * @param options I2P socket options to be used for connecting - * - * @throws NoRouteToHostException if the peer is not found or not reachable - * @throws I2PException if there is some other I2P-related problem - */ - public I2PSocket connect(Destination peer, I2PSocketOptions options) - throws I2PException, NoRouteToHostException { - verifySession(); - if(options == null) { - options = _defaultOptions; - } - ConnectionOptions opts = null; - if(options instanceof ConnectionOptions) { - opts = new ConnectionOptions((ConnectionOptions)options); - } else { - opts = new ConnectionOptions(options); - } - if(_log.shouldLog(Log.INFO)) { - _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts); - } - Connection con = _connectionManager.connect(peer, opts); - if(con == null) { - throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); - } - I2PSocketFull socket = new I2PSocketFull(con); - con.setSocket(socket); - if(con.getConnectionError() != null) { - con.disconnect(false); - throw new NoRouteToHostException(con.getConnectionError()); - } - return socket; - } - - /** - * Create a new connected socket (block until the socket is created) - * - * @param peer Destination to connect to - * - * @return - * @throws NoRouteToHostException if the peer is not found or not reachable - * @throws I2PException if there is some other I2P-related problem - */ - public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException { - return connect(peer, _defaultOptions); - } - - /** - * Destroy the socket manager, freeing all the associated resources. This - * method will block untill all the managed sockets are closed. - * - */ - public void destroySocketManager() { - _connectionManager.disconnectAllHard(); - _connectionManager.setAllowIncomingConnections(false); - // should we destroy the _session too? - // yes, since the old lib did (and SAM wants it to, and i dont know why not) - if((_session != null) && (!_session.isClosed())) { - try { - _session.destroySession(); - } catch(I2PSessionException ise) { - _log.warn("Unable to destroy the session", ise); - } - } - } - - /** - * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. - * - * - * @return - */ - public Set listSockets() { - Set connections = _connectionManager.listConnections(); - Set rv = new HashSet(connections.size()); - for(Iterator iter = connections.iterator(); iter.hasNext();) { - Connection con = (Connection)iter.next(); - if(con.getSocket() != null) { - rv.add(con.getSocket()); - } - } - return rv; - } - - /** - * - * @return - */ - public String getName() { - return _name; - } - - /** - * - * @param name - */ - public void setName(String name) { - _name = name; - } - - /** - * - * @param lsnr - */ - public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - _connectionManager.getMessageHandler().addDisconnectListener(lsnr); - } - - /** - * - * @param lsnr - */ - public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - _connectionManager.getMessageHandler().removeDisconnectListener(lsnr); - } + public String getName() { return _name; } + public void setName(String name) { _name = name; } + + + public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { + _connectionManager.getMessageHandler().addDisconnectListener(lsnr); + } + public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { + _connectionManager.getMessageHandler().removeDisconnectListener(lsnr); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index c52c373b1..0ea0c83d7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer; /** * */ -public class RetransmissionTimer extends SimpleTimer { +class RetransmissionTimer extends SimpleTimer { private static final RetransmissionTimer _instance = new RetransmissionTimer(); public static final SimpleTimer getInstance() { return _instance; } protected RetransmissionTimer() { super("StreamingTimer"); } diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index c5955c999..e3c1b6fbf 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -5,59 +5,42 @@ import java.util.List; import net.i2p.I2PAppContext; class Executor implements Runnable { + private I2PAppContext _context; + private Log _log; + private List _readyEvents; + public Executor(I2PAppContext ctx, Log log, List events) { + _context = ctx; + _readyEvents = events; + } + public void run() { + while (true) { + SimpleTimer.TimedEvent evt = null; + synchronized (_readyEvents) { + if (_readyEvents.size() <= 0) + try { _readyEvents.wait(); } catch (InterruptedException ie) {} + if (_readyEvents.size() > 0) + evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); + } - private I2PAppContext _context; - private Log _log; - private List _readyEvents; - private SimpleStore runn; - - public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { - _context = ctx; - _readyEvents = events; - runn = x; - } - - public void run() { - while(runn.getAnswer()) { - SimpleTimer.TimedEvent evt = null; - synchronized(_readyEvents) { - if(_readyEvents.size() <= 0) { - try { - _readyEvents.wait(); - } catch(InterruptedException ie) { - } - } - if(_readyEvents.size() > 0) { - evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); - } - } - - if(evt != null) { - long before = _context.clock().now(); - try { - evt.timeReached(); - } catch(Throwable t) { - log("wtf, event borked: " + evt, t); - } - long time = _context.clock().now() - before; - if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) { - _log.warn("wtf, event execution took " + time + ": " + evt); - } - } - } - } - - /** - * - * @param msg - * @param t - */ - private void log(String msg, Throwable t) { - synchronized(this) { - if(_log == null) { - _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); - } - } - _log.log(Log.CRIT, msg, t); - } + if (evt != null) { + long before = _context.clock().now(); + try { + evt.timeReached(); + } catch (Throwable t) { + log("wtf, event borked: " + evt, t); + } + long time = _context.clock().now() - before; + if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) + _log.warn("wtf, event execution took " + time + ": " + evt); + } + } + } + + private void log(String msg, Throwable t) { + synchronized (this) { + if (_log == null) + _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); + } + _log.log(Log.CRIT, msg, t); + } } diff --git a/core/java/src/net/i2p/util/SimpleStore.java b/core/java/src/net/i2p/util/SimpleStore.java deleted file mode 100644 index b73a8e7eb..000000000 --- a/core/java/src/net/i2p/util/SimpleStore.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * This is free software, do as you please. - */ - -package net.i2p.util; - -/** - * - * @author sponge - */ -public class SimpleStore { - - private boolean answer; - - SimpleStore(boolean x) { - answer=x; - } - - /** - * set the answer - * - * @param x - */ - public void setAnswer(boolean x) { - answer = x; - } - /** - * - * @return boolean - */ - public boolean getAnswer() { - return answer; - } - -} diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 5595fbd5c..9543f72c5 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -16,262 +16,211 @@ import net.i2p.I2PAppContext; * */ public class SimpleTimer { + private static final SimpleTimer _instance = new SimpleTimer(); + public static SimpleTimer getInstance() { return _instance; } + private I2PAppContext _context; + private Log _log; + /** event time (Long) to event (TimedEvent) mapping */ + private TreeMap _events; + /** event (TimedEvent) to event time (Long) mapping */ + private Map _eventTimes; + private List _readyEvents; + + protected SimpleTimer() { this("SimpleTimer"); } + protected SimpleTimer(String name) { + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(SimpleTimer.class); + _events = new TreeMap(); + _eventTimes = new HashMap(256); + _readyEvents = new ArrayList(4); + I2PThread runner = new I2PThread(new SimpleTimerRunner()); + runner.setName(name); + runner.setDaemon(true); + runner.start(); + for (int i = 0; i < 3; i++) { + I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); + executor.setName(name + "Executor " + i); + executor.setDaemon(true); + executor.start(); + } + } + + public void reschedule(TimedEvent event, long timeoutMs) { + addEvent(event, timeoutMs, false); + } + + /** + * Queue up the given event to be fired no sooner than timeoutMs from now. + * However, if this event is already scheduled, the event will be scheduled + * for the earlier of the two timeouts, which may be before this stated + * timeout. If this is not the desired behavior, call removeEvent first. + * + */ + public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } + /** + * @param useEarliestTime if its already scheduled, use the earlier of the + * two timeouts, else use the later + */ + public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { + int totalEvents = 0; + long now = System.currentTimeMillis(); + long eventTime = now + timeoutMs; + Long time = new Long(eventTime); + synchronized (_events) { + // remove the old scheduled position, then reinsert it + Long oldTime = (Long)_eventTimes.get(event); + if (oldTime != null) { + if (useEarliestTime) { + if (oldTime.longValue() < eventTime) { + _events.notifyAll(); + return; // already scheduled for sooner than requested + } else { + _events.remove(oldTime); + } + } else { + if (oldTime.longValue() > eventTime) { + _events.notifyAll(); + return; // already scheduled for later than the given period + } else { + _events.remove(oldTime); + } + } + } + while (_events.containsKey(time)) + time = new Long(time.longValue() + 1); + _events.put(time, event); + _eventTimes.put(event, time); + + if ( (_events.size() != _eventTimes.size()) ) { + _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size()); + for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) { + TimedEvent evt = (TimedEvent)iter.next(); + Long when = (Long)_eventTimes.get(evt); + TimedEvent cur = (TimedEvent)_events.get(when); + if (cur != evt) { + _log.error("event " + evt + " @ " + when + ": " + cur); + } + } + } + + totalEvents = _events.size(); + _events.notifyAll(); + } + if (time.longValue() > eventTime + 100) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Lots of timer congestion, had to push " + event + " back " + + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); + } + long timeToAdd = System.currentTimeMillis() - now; + if (timeToAdd > 50) { + if (_log.shouldLog(Log.WARN)) + _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); + } + + } + + public boolean removeEvent(TimedEvent evt) { + if (evt == null) return false; + synchronized (_events) { + Long when = (Long)_eventTimes.remove(evt); + if (when != null) + _events.remove(when); + return null != when; + } + } + + /** + * Simple interface for events to be queued up and notified on expiration + */ + public interface TimedEvent { + /** + * the time requested has been reached (this call should NOT block, + * otherwise the whole SimpleTimer gets backed up) + * + */ + public void timeReached(); + } + + private long _occurredTime; + private long _occurredEventCount; + private TimedEvent _recentEvents[] = new TimedEvent[5]; + + private class SimpleTimerRunner implements Runnable { + public void run() { + List eventsToFire = new ArrayList(1); + while (true) { + try { + synchronized (_events) { + //if (_events.size() <= 0) + // _events.wait(); + //if (_events.size() > 100) + // _log.warn("> 100 events! " + _events.values()); + long now = System.currentTimeMillis(); + long nextEventDelay = -1; + Object nextEvent = null; + while (true) { + if (_events.size() <= 0) break; + Long when = (Long)_events.firstKey(); + if (when.longValue() <= now) { + TimedEvent evt = (TimedEvent)_events.remove(when); + if (evt != null) { + _eventTimes.remove(evt); + eventsToFire.add(evt); + } + } else { + nextEventDelay = when.longValue() - now; + nextEvent = _events.get(when); + break; + } + } + if (eventsToFire.size() <= 0) { + if (nextEventDelay != -1) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Next event in " + nextEventDelay + ": " + nextEvent); + _events.wait(nextEventDelay); + } else { + _events.wait(); + } + } + } + } catch (ThreadDeath td) { + return; // die + } catch (InterruptedException ie) { + // ignore + } catch (Throwable t) { + if (_log != null) { + _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t); + } else { + System.err.println("Uncaught exception in SimpleTimer"); + t.printStackTrace(); + } + } + + long now = System.currentTimeMillis(); + now = now - (now % 1000); - private static final SimpleTimer _instance = new SimpleTimer(); + synchronized (_readyEvents) { + for (int i = 0; i < eventsToFire.size(); i++) + _readyEvents.add(eventsToFire.get(i)); + _readyEvents.notifyAll(); + } - public static SimpleTimer getInstance() { - return _instance; - } - private I2PAppContext _context; - private Log _log; - /** event time (Long) to event (TimedEvent) mapping */ - private TreeMap _events; - /** event (TimedEvent) to event time (Long) mapping */ - private Map _eventTimes; - private List _readyEvents; - private SimpleStore runn; + if (_occurredTime == now) { + _occurredEventCount += eventsToFire.size(); + } else { + _occurredTime = now; + if (_occurredEventCount > 2500) { + StringBuffer buf = new StringBuffer(128); + buf.append("Too many simpleTimerJobs (").append(_occurredEventCount); + buf.append(") in a second!"); + _log.log(Log.WARN, buf.toString()); + } + _occurredEventCount = 0; + } - /** - * - */ - protected SimpleTimer() { - this("SimpleTimer"); - } - - /** - * - * @param name - */ - protected SimpleTimer(String name) { - runn = new SimpleStore(true); - _context = I2PAppContext.getGlobalContext(); - _log = _context.logManager().getLog(SimpleTimer.class); - _events = new TreeMap(); - _eventTimes = new HashMap(256); - _readyEvents = new ArrayList(4); - I2PThread runner = new I2PThread(new SimpleTimerRunner()); - runner.setName(name); - runner.setDaemon(true); - runner.start(); - for(int i = 0; i < 3; i++) { - I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); - executor.setName(name + "Executor " + i); - executor.setDaemon(true); - executor.start(); - } - } - - /** - * Removes the SimpleTimer. - */ - public void removeSimpleTimer() { - synchronized(_events) { - runn.setAnswer(false); - _events.notifyAll(); - } - } - - /** - * - * @param event - * @param timeoutMs - */ - public void reschedule(TimedEvent event, long timeoutMs) { - addEvent(event, timeoutMs, false); - } - - /** - * Queue up the given event to be fired no sooner than timeoutMs from now. - * However, if this event is already scheduled, the event will be scheduled - * for the earlier of the two timeouts, which may be before this stated - * timeout. If this is not the desired behavior, call removeEvent first. - * - * @param event - * @param timeoutMs - */ - public void addEvent(TimedEvent event, long timeoutMs) { - addEvent(event, timeoutMs, true); - } - - /** - * @param event - * @param timeoutMs - * @param useEarliestTime if its already scheduled, use the earlier of the - * two timeouts, else use the later - */ - public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { - int totalEvents = 0; - long now = System.currentTimeMillis(); - long eventTime = now + timeoutMs; - Long time = new Long(eventTime); - synchronized(_events) { - // remove the old scheduled position, then reinsert it - Long oldTime = (Long)_eventTimes.get(event); - if(oldTime != null) { - if(useEarliestTime) { - if(oldTime.longValue() < eventTime) { - _events.notifyAll(); - return; // already scheduled for sooner than requested - } else { - _events.remove(oldTime); - } - } else { - if(oldTime.longValue() > eventTime) { - _events.notifyAll(); - return; // already scheduled for later than the given period - } else { - _events.remove(oldTime); - } - } - } - while(_events.containsKey(time)) { - time = new Long(time.longValue() + 1); - } - _events.put(time, event); - _eventTimes.put(event, time); - - if((_events.size() != _eventTimes.size())) { - _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size()); - for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) { - TimedEvent evt = (TimedEvent)iter.next(); - Long when = (Long)_eventTimes.get(evt); - TimedEvent cur = (TimedEvent)_events.get(when); - if(cur != evt) { - _log.error("event " + evt + " @ " + when + ": " + cur); - } - } - } - - totalEvents = _events.size(); - _events.notifyAll(); - } - if(time.longValue() > eventTime + 100) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")"); - } - } - long timeToAdd = System.currentTimeMillis() - now; - if(timeToAdd > 50) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); - } - } - - } - - /** - * - * @param evt - * @return - */ - public boolean removeEvent(TimedEvent evt) { - if(evt == null) { - return false; - } - synchronized(_events) { - Long when = (Long)_eventTimes.remove(evt); - if(when != null) { - _events.remove(when); - } - return null != when; - } - } - - /** - * Simple interface for events to be queued up and notified on expiration - */ - public interface TimedEvent { - - /** - * the time requested has been reached (this call should NOT block, - * otherwise the whole SimpleTimer gets backed up) - * - */ - public void timeReached(); - } - private long _occurredTime; - private long _occurredEventCount; - // not used - // private TimedEvent _recentEvents[] = new TimedEvent[5]; - private class SimpleTimerRunner implements Runnable { - - public void run() { - List eventsToFire = new ArrayList(1); - while(runn.getAnswer()) { - try { - synchronized(_events) { - //if (_events.size() <= 0) - // _events.wait(); - //if (_events.size() > 100) - // _log.warn("> 100 events! " + _events.values()); - long now = System.currentTimeMillis(); - long nextEventDelay = -1; - Object nextEvent = null; - while(runn.getAnswer()) { - if(_events.size() <= 0) { - break; - } - Long when = (Long)_events.firstKey(); - if(when.longValue() <= now) { - TimedEvent evt = (TimedEvent)_events.remove(when); - if(evt != null) { - _eventTimes.remove(evt); - eventsToFire.add(evt); - } - } else { - nextEventDelay = when.longValue() - now; - nextEvent = _events.get(when); - break; - } - } - if(eventsToFire.size() <= 0) { - if(nextEventDelay != -1) { - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("Next event in " + nextEventDelay + ": " + nextEvent); - } - _events.wait(nextEventDelay); - } else { - _events.wait(); - } - } - } - } catch(InterruptedException ie) { - // ignore - } catch(Throwable t) { - if(_log != null) { - _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t); - } else { - System.err.println("Uncaught exception in SimpleTimer"); - t.printStackTrace(); - } - } - - long now = System.currentTimeMillis(); - now = now - (now % 1000); - - synchronized(_readyEvents) { - for(int i = 0; i < eventsToFire.size(); i++) { - _readyEvents.add(eventsToFire.get(i)); - } - _readyEvents.notifyAll(); - } - - if(_occurredTime == now) { - _occurredEventCount += eventsToFire.size(); - } else { - _occurredTime = now; - if(_occurredEventCount > 2500) { - StringBuffer buf = new StringBuffer(128); - buf.append("Too many simpleTimerJobs (").append(_occurredEventCount); - buf.append(") in a second!"); - _log.log(Log.WARN, buf.toString()); - } - _occurredEventCount = 0; - } - - eventsToFire.clear(); - } - } - } + eventsToFire.clear(); + } + } + } }