2004-12-30 jrandom

* Revised the I2PTunnel client and httpclient connection establishment
      throttles.  There is now a pool of threads that build the I2PSocket
      connections with a default size of 5, configurable via the I2PTunnel
      client option 'i2ptunnel.numConnectionBuilders' (if set to 0, it will
      not throttle the number of concurrent builders, but will launch a thread
      per socket during establishment).  In addition, sockets accepted but
      not yet allocated to one of the connection builders will be destroyed
      after 30 seconds, configurable via 'i2ptunnel.maxWaitTime' (if set to
      0, it will wait indefinitely).
This commit is contained in:
jrandom
2004-12-30 22:51:16 +00:00
committed by zzz
parent 099f6a88c2
commit aec0b0c86a
4 changed files with 158 additions and 34 deletions

View File

@ -12,8 +12,10 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.i2p.I2PException;
@ -26,6 +28,7 @@ import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.Destination;
import net.i2p.util.EventDispatcher;
import net.i2p.util.I2PThread;
import net.i2p.util.SimpleTimer;
import net.i2p.util.Log;
public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runnable {
@ -58,7 +61,30 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
private String handlerName;
private Object conLock = new Object();
private int pendingConnections = 0;
/** List of Socket for those accept()ed but not yet started up */
private List _waitingSockets;
/** How many connections will we allow to be in the process of being built at once? */
private int _numConnectionBuilders;
/** How long will we allow sockets to sit in the _waitingSockets map before killing them? */
private int _maxWaitTime;
/**
* How many concurrent connections this I2PTunnel instance will allow to be
* in the process of connecting (or if less than 1, there is no limit)?
*/
public static final String PROP_NUM_CONNECTION_BUILDERS = "i2ptunnel.numConnectionBuilders";
/**
* How long will we let a socket wait after being accept()ed without getting
* pumped through a connection builder (in milliseconds). If this time is
* reached, the socket is unceremoniously closed and discarded. If the max
* wait time is less than 1, there is no limit.
*
*/
public static final String PROP_MAX_WAIT_TIME = "i2ptunnel.maxWaitTime";
private static final int DEFAULT_NUM_CONNECTION_BUILDERS = 5;
private static final int DEFAULT_MAX_WAIT_TIME = 30*1000;
//public I2PTunnelClientBase(int localPort, boolean ownDest,
// Logging l) {
@ -108,6 +134,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
}
configurePool(tunnel);
if (open && listenerReady) {
l.log("Ready! Port " + getLocalPort());
notifyEvent("openBaseClientResult", "ok");
@ -116,6 +144,37 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
notifyEvent("openBaseClientResult", "error");
}
}
/**
* build and configure the pool handling accept()ed but not yet
* established connections
*
*/
private void configurePool(I2PTunnel tunnel) {
_waitingSockets = new ArrayList(4);
Properties opts = tunnel.getClientOptions();
String maxWait = opts.getProperty(PROP_MAX_WAIT_TIME, DEFAULT_MAX_WAIT_TIME+"");
try {
_maxWaitTime = Integer.parseInt(maxWait);
} catch (NumberFormatException nfe) {
_maxWaitTime = DEFAULT_MAX_WAIT_TIME;
}
String numBuild = opts.getProperty(PROP_NUM_CONNECTION_BUILDERS, DEFAULT_NUM_CONNECTION_BUILDERS+"");
try {
_numConnectionBuilders = Integer.parseInt(numBuild);
} catch (NumberFormatException nfe) {
_numConnectionBuilders = DEFAULT_NUM_CONNECTION_BUILDERS;
}
for (int i = 0; i < _numConnectionBuilders; i++) {
String name = "ClientBuilder" + _clientId + '.' + i;
I2PThread b = new I2PThread(new TunnelConnectionBuilder(), name);
b.setDaemon(true);
b.start();
}
}
private static I2PSocketManager socketManager;
@ -250,6 +309,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
synchronized (this) {
notifyAll();
}
synchronized (_waitingSockets) { _waitingSockets.notifyAll(); }
return;
}
ss = new ServerSocket(localPort, 0, addr);
@ -292,6 +352,9 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
notifyAll();
}
}
synchronized (_waitingSockets) {
_waitingSockets.notifyAll();
}
}
/**
@ -300,16 +363,52 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
* @param s Socket to take care of
*/
protected void manageConnection(Socket s) {
boolean useBlocking = false;
synchronized (conLock) {
pendingConnections++;
if (pendingConnections > 5)
useBlocking = true;
if (s == null) return;
if (_numConnectionBuilders <= 0) {
new I2PThread(new BlockingRunner(s), "Clinet run").start();
return;
}
if (_maxWaitTime > 0)
SimpleTimer.getInstance().addEvent(new CloseEvent(s), _maxWaitTime);
synchronized (_waitingSockets) {
_waitingSockets.add(s);
_waitingSockets.notifyAll();
}
}
/**
* Blocking runner, used during the connection establishment whenever we
* are not using the queued builders.
*
*/
private class BlockingRunner implements Runnable {
private Socket _s;
public BlockingRunner(Socket s) { _s = s; }
public void run() {
clientConnectionRun(_s);
}
}
/**
* Remove and close the socket from the waiting list, if it is still there.
*
*/
private class CloseEvent implements SimpleTimer.TimedEvent {
private Socket _s;
public CloseEvent(Socket s) { _s = s; }
public void timeReached() {
boolean stillWaiting = false;
synchronized (_waitingSockets) {
stillWaiting = _waitingSockets.remove(_s);
}
if (stillWaiting) {
try { _s.close(); } catch (IOException ioe) {}
if (_log.shouldLog(Log.INFO))
_log.info("Closed a waiting socket because of backlog");
}
}
if (useBlocking)
clientConnectionRun(s);
else
new ClientConnectionRunner(s, handlerName);
}
public boolean close(boolean forced) {
@ -341,8 +440,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
l.log("Client closed.");
open = false;
return true;
}
synchronized (_waitingSockets) { _waitingSockets.notifyAll(); }
return true;
}
public static void closeSocket(Socket s) {
@ -352,22 +453,29 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
_log.error("Could not close socket", ex);
}
}
private static volatile long __runnerId = 0;
public class ClientConnectionRunner extends I2PThread {
private Socket s;
public ClientConnectionRunner(Socket s, String name) {
this.s = s;
setName(name + '.' + (++__runnerId));
start();
}
public void run() {
clientConnectionRun(s);
synchronized (conLock) {
pendingConnections--;
/**
* Pool runner pulling sockets off the waiting list and pushing them
* through clientConnectionRun. This dies when the I2PTunnel instance
* is closed.
*
*/
private class TunnelConnectionBuilder implements Runnable {
public void run() {
Socket s = null;
while (open) {
try {
synchronized (_waitingSockets) {
if (_waitingSockets.size() <= 0)
_waitingSockets.wait();
else
s = (Socket)_waitingSockets.remove(0);
}
} catch (InterruptedException ie) {}
if (s != null)
clientConnectionRun(s);
s = null;
}
}
}

View File

@ -1,4 +1,15 @@
$Id: history.txt,v 1.120 2004/12/29 15:06:43 jrandom Exp $
$Id: history.txt,v 1.121 2004/12/29 17:16:42 jrandom Exp $
2004-12-30 jrandom
* Revised the I2PTunnel client and httpclient connection establishment
throttles. There is now a pool of threads that build the I2PSocket
connections with a default size of 5, configurable via the I2PTunnel
client option 'i2ptunnel.numConnectionBuilders' (if set to 0, it will
not throttle the number of concurrent builders, but will launch a thread
per socket during establishment). In addition, sockets accepted but
not yet allocated to one of the connection builders will be destroyed
after 30 seconds, configurable via 'i2ptunnel.maxWaitTime' (if set to
0, it will wait indefinitely).
2004-12-29 jrandom
* Imported Ragnarok's addressbook source (2.0.2) which is built but not

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.125 $ $Date: 2004/12/29 15:06:44 $";
public final static String ID = "$Revision: 1.126 $ $Date: 2004/12/29 17:16:42 $";
public final static String VERSION = "0.4.2.5";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -163,11 +163,14 @@ class ConnectionRunner implements Runnable {
long timeSinceWrite = _context.clock().now() - _lastWrite;
if (timeSinceWrite > 5*TIME_SEND_FREQUENCY) {
TCPTransport t = _con.getTransport();
t.addConnectionErrorMessage("Connection closed with "
+ _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)
+ " due to " + DataHelper.formatDuration(timeSinceWrite)
+ " of inactivity after "
+ DataHelper.formatDuration(_con.getLifetime()));
String msg = "Connection closed with "
+ _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)
+ " due to " + DataHelper.formatDuration(timeSinceWrite)
+ " of inactivity after "
+ DataHelper.formatDuration(_con.getLifetime());
t.addConnectionErrorMessage(msg);
if (_log.shouldLog(Log.INFO))
_log.info(msg);
_con.closeConnection(false);
return;
}
@ -187,5 +190,7 @@ class ConnectionRunner implements Runnable {
msg.setMessage(buildTimeMessage());
msg.setPriority(100);
_con.addMessage(msg);
if (_log.shouldLog(Log.INFO))
_log.info("Enqueueing time message to " + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6));
}
}