forked from I2P_Developers/i2p.i2p
add accept timeouts (default is that if the server doesnt .accept() in 5s, refuse the con)
add unique IDs to the various threads for logging / tracing purposes
This commit is contained in:
@ -4,71 +4,147 @@ import java.net.ConnectException;
|
||||
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.Clock;
|
||||
import net.i2p.I2PAppContext;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* Initial stub implementation for the server socket
|
||||
* Server socket implementation, allowing multiple threads to accept I2PSockets
|
||||
* and pull from a queue populated by various threads (each of whom have their own
|
||||
* timeout)
|
||||
*
|
||||
*/
|
||||
class I2PServerSocketImpl implements I2PServerSocket {
|
||||
private final static Log _log = new Log(I2PServerSocketImpl.class);
|
||||
private I2PSocketManager mgr;
|
||||
private I2PSocket cached = null; // buffer one socket here
|
||||
|
||||
private boolean closing = false; // Are we being closed?
|
||||
|
||||
private Object acceptLock = new Object();
|
||||
|
||||
/** list of sockets waiting for the client to accept them */
|
||||
private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
|
||||
|
||||
/** have we been closed */
|
||||
private volatile boolean closing = false;
|
||||
|
||||
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
|
||||
private Object socketAcceptedLock = new Object();
|
||||
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */
|
||||
private Object socketAddedLock = new Object();
|
||||
|
||||
public I2PServerSocketImpl(I2PSocketManager mgr) {
|
||||
this.mgr = mgr;
|
||||
}
|
||||
|
||||
public synchronized I2PSocket accept() throws I2PException, ConnectException {
|
||||
I2PSocket ret;
|
||||
|
||||
synchronized (acceptLock) {
|
||||
while ((cached == null) && !closing) {
|
||||
myWait();
|
||||
}
|
||||
|
||||
if (closing) {
|
||||
throw new ConnectException("I2PServerSocket closed");
|
||||
}
|
||||
|
||||
ret = cached;
|
||||
cached = null;
|
||||
acceptLock.notifyAll();
|
||||
}
|
||||
|
||||
_log.debug("TIMING: handed out accept result " + ret.hashCode());
|
||||
|
||||
/**
|
||||
* Waits for the next socket connecting. If a remote user tried to make a
|
||||
* connection and the local application wasn't .accept()ing new connections,
|
||||
* they should get refused (if .accept() doesnt occur in some small period)
|
||||
*
|
||||
* @return a connected I2PSocket
|
||||
*
|
||||
* @throws I2PException if there is a problem with reading a new socket
|
||||
* from the data available (aka the I2PSession closed, etc)
|
||||
* @throws ConnectException if the I2PServerSocket is closed
|
||||
*/
|
||||
public I2PSocket accept() throws I2PException, ConnectException {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("accept() called, pending: " + pendingSockets.size());
|
||||
|
||||
I2PSocket ret = null;
|
||||
|
||||
while ( (ret == null) && (!closing) ){
|
||||
while (pendingSockets.size() <= 0) {
|
||||
if (closing) throw new ConnectException("I2PServerSocket closed");
|
||||
try {
|
||||
synchronized(socketAddedLock) {
|
||||
socketAddedLock.wait();
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
synchronized (pendingSockets) {
|
||||
if (pendingSockets.size() > 0) {
|
||||
ret = (I2PSocket)pendingSockets.remove(0);
|
||||
}
|
||||
}
|
||||
if (ret != null) {
|
||||
synchronized (socketAcceptedLock) {
|
||||
socketAcceptedLock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("TIMING: handed out accept result " + ret.hashCode());
|
||||
return ret;
|
||||
}
|
||||
|
||||
public boolean getNewSocket(I2PSocket s) {
|
||||
synchronized (acceptLock) {
|
||||
while (cached != null) {
|
||||
myWait();
|
||||
}
|
||||
cached = s;
|
||||
acceptLock.notifyAll();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Make the socket available and wait until the client app accepts it, or until
|
||||
* the given timeout elapses. This doesn't have any limits on the queue size -
|
||||
* perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
|
||||
*
|
||||
* @param timeoutMs how long to wait until accept
|
||||
* @return true if the socket was accepted, false if the timeout expired
|
||||
* or the socket was closed
|
||||
*/
|
||||
public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("addWaitForAccept [new socket arrived, pending: " + pendingSockets.size());
|
||||
|
||||
if (closing) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Already closing the socket");
|
||||
return false;
|
||||
}
|
||||
|
||||
Clock clock = I2PAppContext.getGlobalContext().clock();
|
||||
long start = clock.now();
|
||||
long end = start + timeoutMs;
|
||||
pendingSockets.add(s);
|
||||
synchronized (socketAddedLock) {
|
||||
socketAddedLock.notifyAll();
|
||||
}
|
||||
|
||||
// keep looping until the socket has been grabbed by the accept()
|
||||
// (or the expiration passes, or the socket is closed)
|
||||
while (pendingSockets.contains(s)) {
|
||||
long now = clock.now();
|
||||
if (now >= end) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms");
|
||||
pendingSockets.remove(s);
|
||||
return false;
|
||||
}
|
||||
if (closing) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Server socket closed while waiting for accept");
|
||||
pendingSockets.remove(s);
|
||||
return false;
|
||||
}
|
||||
long remaining = end - now;
|
||||
try {
|
||||
synchronized (socketAcceptedLock) {
|
||||
socketAcceptedLock.wait(remaining);
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
long now = clock.now();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.info("Socket accepted after " + (now-start) + "ms");
|
||||
return true;
|
||||
}
|
||||
|
||||
public void close() throws I2PException {
|
||||
synchronized (acceptLock) {
|
||||
closing = true;
|
||||
acceptLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public I2PSocketManager getManager() {
|
||||
return mgr;
|
||||
}
|
||||
|
||||
private void myWait() {
|
||||
try {
|
||||
acceptLock.wait();
|
||||
} catch (InterruptedException ex) {}
|
||||
|
||||
public void close() {
|
||||
closing = true;
|
||||
// let anyone .accept()ing know to fsck off
|
||||
synchronized (socketAddedLock) {
|
||||
socketAddedLock.notifyAll();
|
||||
}
|
||||
// let anyone addWaitForAccept()ing know to fsck off
|
||||
synchronized (socketAcceptedLock) {
|
||||
socketAcceptedLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public I2PSocketManager getManager() { return mgr; }
|
||||
}
|
||||
|
@ -268,6 +268,7 @@ class I2PSocketImpl implements I2PSocket {
|
||||
}
|
||||
}
|
||||
|
||||
private static volatile long __runnerId = 0;
|
||||
public class I2PSocketRunner extends I2PThread {
|
||||
|
||||
public InputStream in;
|
||||
@ -276,7 +277,7 @@ class I2PSocketImpl implements I2PSocket {
|
||||
_log.debug("Runner's input stream is: " + in.hashCode());
|
||||
this.in = in;
|
||||
String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
|
||||
setName("SocketRunner from " + peer.substring(0, 4));
|
||||
setName("SocketRunner " + (++__runnerId) + " " + peer.substring(0, 4));
|
||||
start();
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,7 @@ public class I2PSocketManager implements I2PSessionListener {
|
||||
private HashMap _outSockets;
|
||||
private HashMap _inSockets;
|
||||
private I2PSocketOptions _defaultOptions;
|
||||
private long _acceptTimeout;
|
||||
|
||||
public static final short ACK = 0x51;
|
||||
public static final short CLOSE_OUT = 0x52;
|
||||
@ -50,10 +51,17 @@ public class I2PSocketManager implements I2PSessionListener {
|
||||
public static final short DATA_IN = 0xA0;
|
||||
public static final short CHAFF = 0xFF;
|
||||
|
||||
/**
|
||||
* How long to wait for the client app to accept() before sending back CLOSE?
|
||||
* This includes the time waiting in the queue. Currently set to 5 seconds.
|
||||
*/
|
||||
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
|
||||
|
||||
public I2PSocketManager() {
|
||||
_session = null;
|
||||
_inSockets = new HashMap(16);
|
||||
_outSockets = new HashMap(16);
|
||||
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
|
||||
}
|
||||
|
||||
public I2PSession getSession() {
|
||||
@ -64,6 +72,15 @@ public class I2PSocketManager implements I2PSessionListener {
|
||||
_session = session;
|
||||
if (session != null) session.setSessionListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* How long should we wait for the client to .accept() a socket before
|
||||
* sending back a NACK/Close?
|
||||
*
|
||||
* @param ms milliseconds to wait, maximum
|
||||
*/
|
||||
public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
|
||||
public long getAcceptTimeout() { return _acceptTimeout; }
|
||||
|
||||
public void disconnected(I2PSession session) {
|
||||
_log.info("Disconnected from the session");
|
||||
@ -260,23 +277,25 @@ public class I2PSocketManager implements I2PSessionListener {
|
||||
return;
|
||||
}
|
||||
|
||||
if (_serverSocket.getNewSocket(s)) {
|
||||
if (_serverSocket.addWaitForAccept(s, _acceptTimeout)) {
|
||||
_inSockets.put(newLocalID, s);
|
||||
byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID));
|
||||
boolean replySentOk = false;
|
||||
replySentOk = _session.sendMessage(d, packet);
|
||||
if (!replySentOk) {
|
||||
_log.error("Error sending reply to " + d.calculateHash().toBase64()
|
||||
+ " in response to a new con message",
|
||||
new Exception("Failed creation"));
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error sending reply to " + d.calculateHash().toBase64()
|
||||
+ " in response to a new con message",
|
||||
new Exception("Failed creation"));
|
||||
s.internalClose();
|
||||
}
|
||||
} else {
|
||||
// timed out or serverSocket closed
|
||||
byte[] packet = toBytes(" " + id);
|
||||
packet[0] = CLOSE_OUT;
|
||||
boolean nackSent = session.sendMessage(d, packet);
|
||||
if (!nackSent) {
|
||||
_log.error("Error sending NACK for session creation");
|
||||
_log.warn("Error sending NACK for session creation");
|
||||
}
|
||||
s.internalClose();
|
||||
}
|
||||
@ -461,14 +480,9 @@ public class I2PSocketManager implements I2PSessionListener {
|
||||
*
|
||||
*/
|
||||
public void destroySocketManager() {
|
||||
|
||||
try {
|
||||
if (_serverSocket != null) {
|
||||
_serverSocket.close();
|
||||
_serverSocket = null;
|
||||
}
|
||||
} catch (I2PException ex) {
|
||||
_log.error("Error closing I2PServerSocket", ex);
|
||||
if (_serverSocket != null) {
|
||||
_serverSocket.close();
|
||||
_serverSocket = null;
|
||||
}
|
||||
|
||||
synchronized (lock) {
|
||||
|
Reference in New Issue
Block a user