minimize differences with mainstream apps/streaming

This commit is contained in:
mkvore-commit
2009-04-04 12:37:19 +00:00
parent de6edc6a99
commit 2cf5221620
4 changed files with 54 additions and 44 deletions

View File

@ -31,32 +31,33 @@ public interface I2PServerSocket {
public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/** /**
* accept(true) has the same behaviour as accept(). * accept(timeout) waits timeout ms for a socket connecting. If a socket is
* accept(false) does not wait for a socket connecting. If a socket is * not available during the timeout, return null. accept(0) behaves like accept()
* available in the queue, it is accepted. Else, null is returned.
* *
* @param true if the call should block until a socket is available * @param timeout in ms
* *
* @return a connected I2PSocket, or null * @return a connected I2PSocket, or null
* *
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws SocketTimeoutException * @throws InterruptedException if thread is interrupted while waiting
*/ */
public I2PSocket accept(long timeout) throws I2PException, ConnectException, SocketTimeoutException, InterruptedException; public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException;
/** /**
* Waits until there is a socket waiting for acception or the timeout is * Wait until there is a socket waiting for acception or the timeout is
* reached. * reached.
* *
* @param timeoutMs timeout in ms. A negative value waits forever. * @param timeoutMs timeout in ms. If ms is 0, wait forever.
* *
* @return true if a socket is available, false if not * @return true if a socket is available, false if not
* *
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if the thread is interrupted before
* completion
*/ */
public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException;

View File

@ -1,6 +1,7 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -49,11 +50,6 @@ class I2PServerSocketImpl implements I2PServerSocket {
this.mgr = mgr; this.mgr = mgr;
} }
/** /**
* Waits until there is a socket waiting for acception or the timeout is * Waits until there is a socket waiting for acception or the timeout is
* reached. * reached.
@ -63,6 +59,7 @@ class I2PServerSocketImpl implements I2PServerSocket {
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if thread is interrupted while waiting
*/ */
public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException { public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -92,21 +89,19 @@ class I2PServerSocketImpl implements I2PServerSocket {
} }
} }
/** /**
* accept(true) has the same behaviour as accept(). * accept(timeout) waits timeout ms for a socket connecting. If a socket is
* accept(false) does not wait for a socket connecting. If a socket is * not available during the timeout, return null. accept(0) behaves like accept()
* available in the queue, it is accepted. Else, null is returned.
* *
* @param true if the call should block until a socket is available * @param timeout in ms
* *
* @return a connected I2PSocket, or null * @return a connected I2PSocket, or null
* *
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws InterruptedException if thread is interrupted while waiting
*/ */
public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException { public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException {
I2PSocket ret = null; I2PSocket ret = null;

View File

@ -2,8 +2,6 @@ package net.i2p.client.streaming;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -42,7 +40,8 @@ class ConnectionHandler {
_context = context; _context = context;
_log = context.logManager().getLog(ConnectionHandler.class); _log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr; _manager = mgr;
_synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
_synSignal= new Object();
_active = false; _active = false;
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
} }
@ -83,12 +82,8 @@ class ConnectionHandler {
if (success) { if (success) {
SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
synchronized (this._synSignal) synchronized (this._synSignal) {this._synSignal.notifyAll();}
{ } else {
this._synSignal.notifyAll();
}
} else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full"); _log.warn("Dropping new SYN request, as the queue is full");
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
@ -103,9 +98,24 @@ class ConnectionHandler {
* @throws InterruptedException * @throws InterruptedException
*/ */
public void waitSyn( long ms ) throws InterruptedException { public void waitSyn( long ms ) throws InterruptedException {
synchronized (this._synSignal) synchronized (this._synSignal)
{ {
this._synSignal.wait(ms); long now = this._context.clock().now() ;
long expiration = now + ms ;
while ( expiration > now || ms<=0 ) {
// check we have not missed a SYN packet before entering
// the lock
for ( Packet p : this._synQueue ) {
if ( p.isFlagSet(Packet.FLAG_SYNCHRONIZE) ) return ;
}
// wait until a SYN is signaled
if ( ms == 0) {
this._synSignal.wait();
} else {
this._synSignal.wait(expiration-now);
now = this._context.clock().now();
}
}
} }
} }
@ -114,8 +124,8 @@ class ConnectionHandler {
* Non-SYN packets with a zero SendStreamID may also be queued here so * Non-SYN packets with a zero SendStreamID may also be queued here so
* that they don't get thrown away while the SYN packet before it is queued. * that they don't get thrown away while the SYN packet before it is queued.
* *
* @param timeoutMs max amount of time to wait for a connection (if negative, * @param timeoutMs max amount of time to wait for a connection (if less
* wait indefinitely) * than 1ms, wait indefinitely)
* @return connection received, or null if there was a timeout or the * @return connection received, or null if there was a timeout or the
* handler was shut down * handler was shut down
*/ */
@ -125,6 +135,8 @@ class ConnectionHandler {
long expiration = timeoutMs + _context.clock().now(); long expiration = timeoutMs + _context.clock().now();
while (true) { while (true) {
if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
return null;
if (!_active) { if (!_active) {
// fail all the ones we had queued up // fail all the ones we had queued up
while(true) { while(true) {
@ -136,9 +148,6 @@ class ConnectionHandler {
return null; return null;
} }
if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
return null;
Packet syn = null; Packet syn = null;
while ( _active && syn == null) { while ( _active && syn == null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@ -1,12 +1,9 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.util.Clock;
import net.i2p.util.Log;
/** /**
* Bridge to allow accepting new connections * Bridge to allow accepting new connections
@ -52,17 +49,15 @@ public class I2PServerSocketFull implements I2PServerSocket {
} }
/** /**
* accept(true) has the same behaviour as accept(). * accept(timeout) waits timeout ms for a socket connecting. If a socket is
* accept(false) does not wait for a socket connecting. If a socket is * not available during the timeout, return null. accept(0) behaves like accept()
* available in the queue, it is accepted. Else, null is returned.
* *
* @param true if the call should block until a socket is available * @param timeout in ms
* *
* @return a connected I2PSocket, or null * @return a connected I2PSocket, or null
* *
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws SocketTimeoutException if the timeout has been reached
*/ */
public I2PSocket accept(long timeout) throws I2PException { public I2PSocket accept(long timeout) throws I2PException {
@ -78,7 +73,17 @@ public class I2PServerSocketFull implements I2PServerSocket {
} }
} }
public void waitIncoming(long timeoutMs) throws InterruptedException { /**
* block until a SYN packet is detected or the timeout is reached. If timeout is 0,
* block until a SYN packet is detected.
*
* @param timeoutMs
* @throws InterruptedException
* @throws I2PException
*/
public void waitIncoming(long timeoutMs) throws I2PException, InterruptedException {
if (this._socketManager.getConnectionManager().getSession().isClosed())
throw new I2PException("Session is closed");
this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs);
} }
} }