merge of '40606168f4086dbe122e96b533df4f24b5e4d87d'

and '44a775450f24ec5d2e921ab01b94546968f81851'
This commit is contained in:
mkvore-commit
2009-05-05 09:24:26 +00:00
45 changed files with 2922 additions and 229 deletions

View File

@ -21,6 +21,7 @@ class ConnectionHandler {
private Log _log;
private ConnectionManager _manager;
private LinkedBlockingQueue<Packet> _synQueue;
private Object _synSignal;
private boolean _active;
private int _acceptTimeout;
@ -39,7 +40,8 @@ class ConnectionHandler {
_context = context;
_log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr;
_synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
_synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
_synSignal= new Object();
_active = false;
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
}
@ -79,6 +81,10 @@ class ConnectionHandler {
boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) {
SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
// advertise the new syn packet to threads that could be waiting
// (by calling waitSyn(long)
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
synchronized (this._synSignal) {this._synSignal.notifyAll();}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full");
@ -87,6 +93,33 @@ class ConnectionHandler {
}
}
/**
* Wait until some SYN packet is available
* @param ms max amount of time to wait for a connection (if negative or null,
* wait indefinitely)
* @throws InterruptedException
*/
public void waitSyn( long ms ) throws InterruptedException {
synchronized (this._synSignal)
{
long now = this._context.clock().now() ;
long expiration = now + ms ;
while ( expiration > now || ms<=0 ) {
// check if there is a SYN packet in the queue
for ( Packet p : this._synQueue ) {
if ( p.isFlagSet(Packet.FLAG_SYNCHRONIZE) ) return ;
}
// wait until a SYN is signaled
if ( ms == 0) {
this._synSignal.wait();
} else {
this._synSignal.wait(expiration-now);
now = this._context.clock().now();
}
}
}
}
/**
* Receive an incoming connection (built from a received SYN)
* Non-SYN packets with a zero SendStreamID may also be queued here so

View File

@ -1,6 +1,8 @@
package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
/**
@ -45,4 +47,43 @@ public class I2PServerSocketFull implements I2PServerSocket {
public I2PSocketManager getManager() {
return _socketManager;
}
/**
* accept(timeout) waits timeout ms for a socket connecting. If a socket is
* not available during the timeout, return null. accept(0) behaves like accept()
*
* @param timeout in ms
*
* @return a connected I2PSocket, or null
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
*/
public I2PSocket accept(long timeout) throws I2PException {
long reset_timeout = this.getSoTimeout();
try {
this.setSoTimeout(timeout);
return this.accept();
} catch (SocketTimeoutException e) {
return null ;
} finally {
this.setSoTimeout(reset_timeout);
}
}
/**
* block until a SYN packet is detected or the timeout is reached. If timeout is 0,
* block until a SYN packet is detected.
*
* @param timeoutMs
* @throws InterruptedException
* @throws I2PException
*/
public void waitIncoming(long timeoutMs) throws I2PException, InterruptedException {
if (this._socketManager.getConnectionManager().getSession().isClosed())
throw new I2PException("Session is closed");
this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs);
}
}