diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index e7db251fa..6d6cfdc15 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -44,7 +44,7 @@ public interface I2PServerSocket { * @throws ConnectException if the I2PServerSocket is closed * @throws SocketTimeoutException */ - public I2PSocket accept(boolean blocking) throws I2PException, ConnectException, SocketTimeoutException; + public I2PSocket accept(long timeout) throws I2PException, ConnectException, SocketTimeoutException, InterruptedException; /** * Waits until there is a socket waiting for acception or the timeout is @@ -58,7 +58,7 @@ public interface I2PServerSocket { * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed */ - public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; + public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; /** * Set Sock Option accept timeout diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index a2c075e3e..59459286e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -60,28 +60,24 @@ class I2PServerSocketImpl implements I2PServerSocket { * * @param timeoutMs timeout in ms. A negative value waits forever. * - * @return true if a socket is available, false if not - * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed */ - public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException { + public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException { if (_log.shouldLog(Log.DEBUG)) _log.debug("waitIncoming() called, pending: " + pendingSockets.size()); - boolean isTimed = (timeoutMs>=0); + boolean isTimed = (timeoutMs>0); if (isTimed) { Clock clock = I2PAppContext.getGlobalContext().clock(); long now = clock.now(); long end = now + timeoutMs; while (pendingSockets.size() <= 0 && now0); } @@ -112,16 +107,20 @@ class I2PServerSocketImpl implements I2PServerSocket { * @throws ConnectException if the I2PServerSocket is closed */ - public I2PSocket accept(boolean blocking) throws I2PException, ConnectException { + public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException { I2PSocket ret = null; - if (blocking) { + if (timeout<=0) { ret = accept(); } else { + long now = I2PAppContext.getGlobalContext().clock().now(); + long expiration = timeout + now ; synchronized (pendingSockets) { - if (pendingSockets.size() > 0) { - ret = (I2PSocket)pendingSockets.remove(0); + while (pendingSockets.size() == 0 && expiration>now) { + pendingSockets.wait(expiration-now); + now = I2PAppContext.getGlobalContext().clock().now(); } + ret = (I2PSocket)pendingSockets.remove(0); } if (ret != null) { synchronized (socketAcceptedLock) { @@ -151,10 +150,12 @@ class I2PServerSocketImpl implements I2PServerSocket { I2PSocket ret = null; while ( (ret == null) && (!closing) ){ - - this.waitIncoming(-1); - - ret = accept(false); + try { + this.waitIncoming(0); + ret = accept(1); + } catch (InterruptedException e) { + throw new I2PException("Thread interrupted") ; + } } if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/sam/Demos/datagramTests/samForward.py b/apps/sam/Demos/datagramTests/samForward.py index 56590e7ef..1706b3fa5 100755 --- a/apps/sam/Demos/datagramTests/samForward.py +++ b/apps/sam/Demos/datagramTests/samForward.py @@ -16,7 +16,7 @@ else : if len(sys.argv)==3 : name = sys.argv[2] else : - name = "essaiSamForward" + name = "datagramSamForward" sess = socket.socket( socket.AF_INET, socket.SOCK_STREAM) diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java index b9ea710c6..ca83e8a1f 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -269,10 +269,9 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle { while (session.socketServer!=null) { - boolean available = false ; I2PSocket i2ps = null ; try { - available = session.socketServer.waitIncoming(-1); + session.socketServer.waitIncoming(0); } catch (ConnectException e) { _log.debug("ConnectException"); break ; @@ -283,7 +282,6 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle _log.debug("InterruptedException"); break ; } - if ( !available ) continue ; java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host,port); @@ -296,7 +294,7 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle } try { - i2ps = session.socketServer.accept(false); + i2ps = session.socketServer.accept(1); } catch (Exception e) {} if (i2ps==null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 37a1f0d7c..9dd6aee41 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -23,6 +23,7 @@ class ConnectionHandler { private Log _log; private ConnectionManager _manager; private LinkedBlockingQueue _synQueue; + private Object _synSignal; private boolean _active; private int _acceptTimeout; @@ -81,7 +82,13 @@ class ConnectionHandler { boolean success = _synQueue.offer(packet); // fail immediately if full if (success) { SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); - } else { + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + synchronized (this._synSignal) + { + this._synSignal.notifyAll(); + } + + } else { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping new SYN request, as the queue is full"); if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) @@ -89,8 +96,17 @@ class ConnectionHandler { } } - public boolean waitSyn( long ms ) throws InterruptedException { - throw new InterruptedException(); + /** + * Wait until some SYN packet is available + * @param ms max amount of time to wait for a connection (if negative or null, + * wait indefinitely) + * @throws InterruptedException + */ + public void waitSyn( long ms ) throws InterruptedException { + synchronized (this._synSignal) + { + this._synSignal.wait(ms); + } } /** @@ -120,6 +136,9 @@ class ConnectionHandler { return null; } + if ( (timeoutMs > 0) && (expiration < _context.clock().now()) ) + return null; + Packet syn = null; while ( _active && syn == null) { if (_log.shouldLog(Log.DEBUG)) @@ -162,8 +181,6 @@ class ConnectionHandler { } } // keep looping... - if ( (timeoutMs >= 0) && (expiration < _context.clock().now()) ) - return null; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index ab9cb1c9d..8fe7fa9ce 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -65,28 +65,20 @@ public class I2PServerSocketFull implements I2PServerSocket { * @throws SocketTimeoutException if the timeout has been reached */ - public I2PSocket accept(boolean blocking) throws I2PException, SocketTimeoutException { - long timeout = this.getSoTimeout(); + public I2PSocket accept(long timeout) throws I2PException { + long reset_timeout = this.getSoTimeout(); try { - if (blocking) - { - this.setSoTimeout(-1); - } else { - this.setSoTimeout(0); - } - try { - return this.accept(); - } catch (SocketTimeoutException e) { - if (blocking) throw e; - else return null ; - } - } finally { this.setSoTimeout(timeout); + return this.accept(); + } catch (SocketTimeoutException e) { + return null ; + } finally { + this.setSoTimeout(reset_timeout); } } - public boolean waitIncoming(long timeoutMs) throws InterruptedException { - return this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); + public void waitIncoming(long timeoutMs) throws InterruptedException { + this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); } }