diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 6d6cfdc15..733958736 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -31,32 +31,33 @@ public interface I2PServerSocket { public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; /** - * accept(true) has the same behaviour as accept(). - * accept(false) does not wait for a socket connecting. If a socket is - * available in the queue, it is accepted. Else, null is returned. + * accept(timeout) waits timeout ms for a socket connecting. If a socket is + * not available during the timeout, return null. accept(0) behaves like accept() * - * @param true if the call should block until a socket is available + * @param timeout in ms * * @return a connected I2PSocket, or null * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed - * @throws SocketTimeoutException + * @throws InterruptedException if thread is interrupted while waiting */ - public I2PSocket accept(long timeout) throws I2PException, ConnectException, SocketTimeoutException, InterruptedException; + public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException; /** - * Waits until there is a socket waiting for acception or the timeout is + * Wait until there is a socket waiting for acception or the timeout is * reached. * - * @param timeoutMs timeout in ms. A negative value waits forever. + * @param timeoutMs timeout in ms. If ms is 0, wait forever. * * @return true if a socket is available, false if not * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed + * @throws InterruptedException if the thread is interrupted before + * completion */ public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 59459286e..2da8320fc 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -1,6 +1,7 @@ package net.i2p.client.streaming; import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,11 +50,6 @@ class I2PServerSocketImpl implements I2PServerSocket { this.mgr = mgr; } - - - - - /** * Waits until there is a socket waiting for acception or the timeout is * reached. @@ -63,6 +59,7 @@ class I2PServerSocketImpl implements I2PServerSocket { * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed + * @throws InterruptedException if thread is interrupted while waiting */ public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException { if (_log.shouldLog(Log.DEBUG)) @@ -92,21 +89,19 @@ class I2PServerSocketImpl implements I2PServerSocket { } } - /** - * accept(true) has the same behaviour as accept(). - * accept(false) does not wait for a socket connecting. If a socket is - * available in the queue, it is accepted. Else, null is returned. + * accept(timeout) waits timeout ms for a socket connecting. If a socket is + * not available during the timeout, return null. accept(0) behaves like accept() * - * @param true if the call should block until a socket is available + * @param timeout in ms * * @return a connected I2PSocket, or null * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed + * @throws InterruptedException if thread is interrupted while waiting */ - public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException { I2PSocket ret = null; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 9dd6aee41..e53e701a1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -2,8 +2,6 @@ package net.i2p.client.streaming; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.ArrayList; -import java.util.List; import net.i2p.I2PAppContext; import net.i2p.util.Log; @@ -42,7 +40,8 @@ class ConnectionHandler { _context = context; _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; - _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); + _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); + _synSignal= new Object(); _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } @@ -83,12 +82,8 @@ class ConnectionHandler { if (success) { SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) - synchronized (this._synSignal) - { - this._synSignal.notifyAll(); - } - - } else { + synchronized (this._synSignal) {this._synSignal.notifyAll();} + } else { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping new SYN request, as the queue is full"); if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) @@ -103,9 +98,24 @@ class ConnectionHandler { * @throws InterruptedException */ public void waitSyn( long ms ) throws InterruptedException { - synchronized (this._synSignal) + synchronized (this._synSignal) { - this._synSignal.wait(ms); + long now = this._context.clock().now() ; + long expiration = now + ms ; + while ( expiration > now || ms<=0 ) { + // check we have not missed a SYN packet before entering + // the lock + for ( Packet p : this._synQueue ) { + if ( p.isFlagSet(Packet.FLAG_SYNCHRONIZE) ) return ; + } + // wait until a SYN is signaled + if ( ms == 0) { + this._synSignal.wait(); + } else { + this._synSignal.wait(expiration-now); + now = this._context.clock().now(); + } + } } } @@ -114,8 +124,8 @@ class ConnectionHandler { * Non-SYN packets with a zero SendStreamID may also be queued here so * that they don't get thrown away while the SYN packet before it is queued. * - * @param timeoutMs max amount of time to wait for a connection (if negative, - * wait indefinitely) + * @param timeoutMs max amount of time to wait for a connection (if less + * than 1ms, wait indefinitely) * @return connection received, or null if there was a timeout or the * handler was shut down */ @@ -125,6 +135,8 @@ class ConnectionHandler { long expiration = timeoutMs + _context.clock().now(); while (true) { + if ( (timeoutMs > 0) && (expiration < _context.clock().now()) ) + return null; if (!_active) { // fail all the ones we had queued up while(true) { @@ -136,9 +148,6 @@ class ConnectionHandler { return null; } - if ( (timeoutMs > 0) && (expiration < _context.clock().now()) ) - return null; - Packet syn = null; while ( _active && syn == null) { if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 8fe7fa9ce..0a183afef 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,12 +1,9 @@ package net.i2p.client.streaming; -import java.net.ConnectException; import java.net.SocketTimeoutException; import net.i2p.I2PAppContext; import net.i2p.I2PException; -import net.i2p.util.Clock; -import net.i2p.util.Log; /** * Bridge to allow accepting new connections @@ -52,17 +49,15 @@ public class I2PServerSocketFull implements I2PServerSocket { } /** - * accept(true) has the same behaviour as accept(). - * accept(false) does not wait for a socket connecting. If a socket is - * available in the queue, it is accepted. Else, null is returned. + * accept(timeout) waits timeout ms for a socket connecting. If a socket is + * not available during the timeout, return null. accept(0) behaves like accept() * - * @param true if the call should block until a socket is available + * @param timeout in ms * * @return a connected I2PSocket, or null * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) - * @throws SocketTimeoutException if the timeout has been reached */ public I2PSocket accept(long timeout) throws I2PException { @@ -78,7 +73,17 @@ public class I2PServerSocketFull implements I2PServerSocket { } } - public void waitIncoming(long timeoutMs) throws InterruptedException { + /** + * block until a SYN packet is detected or the timeout is reached. If timeout is 0, + * block until a SYN packet is detected. + * + * @param timeoutMs + * @throws InterruptedException + * @throws I2PException + */ + public void waitIncoming(long timeoutMs) throws I2PException, InterruptedException { + if (this._socketManager.getConnectionManager().getSession().isClosed()) + throw new I2PException("Session is closed"); this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); } }