From 097a4647a82299b74ad99b000f7728dd055f805c Mon Sep 17 00:00:00 2001 From: jrandom Date: Wed, 19 May 2004 15:20:55 +0000 Subject: [PATCH] handle i2ptunnel server connection .accept()s asynchronously so we don't refuse lots of requests, causing intermittent "failures" use the new async error listening interface of the ministreaming lib truckloads of logging --- .../java/src/net/i2p/i2ptunnel/I2PTunnel.java | 41 ++++++++------- .../i2p/i2ptunnel/I2PTunnelClientBase.java | 2 +- .../i2p/i2ptunnel/I2PTunnelHTTPClient.java | 52 ++++++++++--------- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 19 +++++-- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 50 ++++++++++++++---- 5 files changed, 106 insertions(+), 58 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java index 8e8497ac1..3fb439387 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java @@ -67,6 +67,8 @@ public class I2PTunnel implements Logging, EventDispatcher { private Log _log; private EventDispatcherImpl _event; private I2PAppContext _context; + private static long __tunnelId = 0; + private long _tunnelId; public static final int PACKET_DELAY = 100; @@ -98,7 +100,8 @@ public class I2PTunnel implements Logging, EventDispatcher { } public I2PTunnel(String[] args, ConnectionEventListener lsnr) { - _context = new I2PAppContext(); + _context = I2PAppContext.getGlobalContext(); // new I2PAppContext(); + _tunnelId = ++__tunnelId; _log = _context.logManager().getLog(I2PTunnel.class); _event = new EventDispatcherImpl(); addConnectionEventListener(lsnr); @@ -114,7 +117,7 @@ public class I2PTunnel implements Logging, EventDispatcher { checkRunByE = false; } else if (args[i].equals("-nogui")) { gui = false; - _log.warn("The `-nogui' option of I2PTunnel is deprecated.\n" + _log.warn(getPrefix() + "The `-nogui' option of I2PTunnel is deprecated.\n" + "Use `-cli', `-nocli' (aka `-wait') or `-die' instead."); } else if (args[i].equals("-cli")) { gui = false; @@ -280,7 +283,7 @@ public class I2PTunnel implements Logging, EventDispatcher { serverHost = InetAddress.getByName(args[0]); } catch (UnknownHostException uhe) { l.log("unknown host"); - _log.error("Error resolving " + args[0], uhe); + _log.error(getPrefix() + "Error resolving " + args[0], uhe); notifyEvent("serverTaskId", new Integer(-1)); return; } @@ -289,7 +292,7 @@ public class I2PTunnel implements Logging, EventDispatcher { portNum = Integer.parseInt(args[1]); } catch (NumberFormatException nfe) { l.log("invalid port"); - _log.error("Port specified is not valid: " + args[1], nfe); + _log.error(getPrefix() + "Port specified is not valid: " + args[1], nfe); notifyEvent("serverTaskId", new Integer(-1)); return; } @@ -297,7 +300,7 @@ public class I2PTunnel implements Logging, EventDispatcher { privKeyFile = new File(args[2]); if (!privKeyFile.canRead()) { l.log("private key file does not exist"); - _log.error("Private key file does not exist or is not readable: " + args[2]); + _log.error(getPrefix() + "Private key file does not exist or is not readable: " + args[2]); notifyEvent("serverTaskId", new Integer(-1)); return; } @@ -333,7 +336,7 @@ public class I2PTunnel implements Logging, EventDispatcher { serverHost = InetAddress.getByName(args[0]); } catch (UnknownHostException uhe) { l.log("unknown host"); - _log.error("Error resolving " + args[0], uhe); + _log.error(getPrefix() + "Error resolving " + args[0], uhe); notifyEvent("serverTaskId", new Integer(-1)); return; } @@ -342,7 +345,7 @@ public class I2PTunnel implements Logging, EventDispatcher { portNum = Integer.parseInt(args[1]); } catch (NumberFormatException nfe) { l.log("invalid port"); - _log.error("Port specified is not valid: " + args[1], nfe); + _log.error(getPrefix() + "Port specified is not valid: " + args[1], nfe); notifyEvent("serverTaskId", new Integer(-1)); return; } @@ -378,7 +381,7 @@ public class I2PTunnel implements Logging, EventDispatcher { port = Integer.parseInt(args[0]); } catch (NumberFormatException nfe) { l.log("invalid port"); - _log.error("Port specified is not valid: " + args[0], nfe); + _log.error(getPrefix() + "Port specified is not valid: " + args[0], nfe); notifyEvent("clientTaskId", new Integer(-1)); return; } @@ -410,7 +413,7 @@ public class I2PTunnel implements Logging, EventDispatcher { port = Integer.parseInt(args[0]); } catch (NumberFormatException nfe) { l.log("invalid port"); - _log.error("Port specified is not valid: " + args[0], nfe); + _log.error(getPrefix() + "Port specified is not valid: " + args[0], nfe); notifyEvent("httpclientTaskId", new Integer(-1)); return; } @@ -451,7 +454,7 @@ public class I2PTunnel implements Logging, EventDispatcher { port = Integer.parseInt(args[0]); } catch (NumberFormatException nfe) { l.log("invalid port"); - _log.error("Port specified is not valid: " + args[0], nfe); + _log.error(getPrefix() + "Port specified is not valid: " + args[0], nfe); notifyEvent("sockstunnelTaskId", new Integer(-1)); return; } @@ -565,7 +568,7 @@ public class I2PTunnel implements Logging, EventDispatcher { pubdest = new FileOutputStream(args[1]); } catch (IOException ioe) { l.log("Error opening output stream"); - _log.error("Error generating keys to out", ioe); + _log.error(getPrefix() + "Error generating keys to out", ioe); notifyEvent("genkeysResult", "error"); return; } @@ -588,7 +591,7 @@ public class I2PTunnel implements Logging, EventDispatcher { } catch (IOException ioe) { l.log("Error generating keys - " + ioe.getMessage()); notifyEvent("genkeysResult", "error"); - _log.error("Error generating keys", ioe); + _log.error(getPrefix() + "Error generating keys", ioe); } } @@ -722,7 +725,7 @@ public class I2PTunnel implements Logging, EventDispatcher { notifyEvent("runResult", "ok"); } catch (IOException ioe) { l.log("IO error running the file"); - _log.error("Error running the file", ioe); + _log.error(getPrefix() + "Error running the file", ioe); notifyEvent("runResult", "error"); } } else { @@ -796,12 +799,12 @@ public class I2PTunnel implements Logging, EventDispatcher { private boolean closetask(int num, boolean forced, Logging l) { boolean closed = false; - _log.debug("closetask(): looking for task " + num); + _log.debug(getPrefix() + "closetask(): looking for task " + num); synchronized (tasks) { for (Iterator it = tasks.iterator(); it.hasNext();) { I2PTunnelTask t = (I2PTunnelTask) it.next(); int id = t.getId(); - _log.debug("closetask(): parsing task " + id + " (" + t.toString() + ")"); + _log.debug(getPrefix() + "closetask(): parsing task " + id + " (" + t.toString() + ")"); if (id == num) { closed = closetask(t, forced, l); break; @@ -836,7 +839,7 @@ public class I2PTunnel implements Logging, EventDispatcher { for (Iterator it = tasks.iterator(); it.hasNext();) { I2PTunnelTask t = (I2PTunnelTask) it.next(); if (!t.isOpen()) { - _log.debug("Purging inactive tunnel: [" + t.getId() + "] " + t.toString()); + _log.debug(getPrefix() + "Purging inactive tunnel: [" + t.getId() + "] " + t.toString()); it.remove(); } } @@ -849,7 +852,7 @@ public class I2PTunnel implements Logging, EventDispatcher { */ public void log(String s) { System.out.println(s); - _log.info("Display: " + s); + _log.info(getPrefix() + "Display: " + s); } /** @@ -982,6 +985,8 @@ public class I2PTunnel implements Logging, EventDispatcher { listeners.remove(lsnr); } } + + private String getPrefix() { return '[' + _tunnelId + "]: "; } /** * Call this whenever we lose touch with the router involuntarily (aka the router @@ -989,7 +994,7 @@ public class I2PTunnel implements Logging, EventDispatcher { * */ void routerDisconnected() { - _log.error("Router disconnected - firing notification events"); + _log.error(getPrefix() + "Router disconnected - firing notification events"); synchronized (listeners) { for (Iterator iter = listeners.iterator(); iter.hasNext();) { ConnectionEventListener lsnr = (ConnectionEventListener) iter.next(); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 1dec0266a..41106a49b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -34,7 +34,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna private static final long DEFAULT_CONNECT_TIMEOUT = 60 * 1000; private static volatile long __clientId = 0; - private long _clientId; + protected long _clientId; protected Object sockLock = new Object(); // Guards sockMgr and mySockets private I2PSocketManager sockMgr; private List mySockets = new ArrayList(); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index 1985a9feb..09b9cbf4b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -98,6 +98,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable notifyEvent("openHTTPClientResult", "ok"); } + private String getPrefix() { return "Client[" + _clientId + "]: "; } + protected void clientConnectionRun(Socket s) { OutputStream out = null; String targetRequest = null; @@ -110,7 +112,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable StringBuffer newRequest = new StringBuffer(); while ((line = br.readLine()) != null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Line=[" + line + "]"); + _log.debug(getPrefix() + "Line=[" + line + "]"); if (line.startsWith("Connection: ") || line.startsWith("Keep-Alive: ") || @@ -119,7 +121,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable if (method == null) { // first line (GET /base64/realaddr) if (_log.shouldLog(Log.DEBUG)) - _log.debug("Method is null for [" + line + "]"); + _log.debug(getPrefix() + "Method is null for [" + line + "]"); int pos = line.indexOf(" "); if (pos == -1) break; @@ -155,7 +157,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable destination = wwwProxy; usingWWWProxy = true; if (_log.shouldLog(Log.DEBUG)) - _log.debug("Host doesnt end with .i2p and it contains a period [" + host + "]: wwwProxy!"); + _log.debug(getPrefix() + "Host doesnt end with .i2p and it contains a period [" + host + "]: wwwProxy!"); } else { request = request.substring(pos + 1); pos = request.indexOf("/"); @@ -165,27 +167,27 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable boolean isValid = usingWWWProxy || isSupportedAddress(host, protocol); if (!isValid) { - if (_log.shouldLog(Log.INFO)) _log.info("notValid(" + host + ")"); + if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "notValid(" + host + ")"); method = null; destination = null; break; } else if (!usingWWWProxy) { - if (_log.shouldLog(Log.INFO)) _log.info("host=getHostName(" + destination + ")"); + if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "host=getHostName(" + destination + ")"); host = getHostName(destination); // hide original host } if (_log.shouldLog(Log.DEBUG)) { - _log.debug("METHOD:" + method + ":"); - _log.debug("PROTOC:" + protocol + ":"); - _log.debug("HOST :" + host + ":"); - _log.debug("DEST :" + destination + ":"); + _log.debug(getPrefix() + "METHOD:" + method + ":"); + _log.debug(getPrefix() + "PROTOC:" + protocol + ":"); + _log.debug(getPrefix() + "HOST :" + host + ":"); + _log.debug(getPrefix() + "DEST :" + destination + ":"); } } else { if (line.startsWith("Host: ") && !usingWWWProxy) { line = "Host: " + host; if (_log.shouldLog(Log.INFO)) - _log.info("Setting host = " + host); + _log.info(getPrefix() + "Setting host = " + host); } } @@ -197,7 +199,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable } } if (_log.shouldLog(Log.DEBUG)) - _log.debug("NewRequest header: [" + newRequest.toString() + "]"); + _log.debug(getPrefix() + "NewRequest header: [" + newRequest.toString() + "]"); while (br.ready()) { // empty the buffer (POST requests) int i = br.read(); @@ -219,7 +221,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destination: " + destination); + _log.debug(getPrefix() + "Destination: " + destination); Destination dest = I2PTunnel.destFromName(destination); if (dest == null) { @@ -236,19 +238,19 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable timeoutThread.start(); } catch (SocketException ex) { if (timeoutThread != null) timeoutThread.disable(); - _log.info("Error trying to connect", ex); + _log.info(getPrefix() + "Error trying to connect", ex); l.log(ex.getMessage()); handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, wwwProxy); closeSocket(s); } catch (IOException ex) { if (timeoutThread != null) timeoutThread.disable(); - _log.info("Error trying to connect", ex); + _log.info(getPrefix() + "Error trying to connect", ex); l.log(ex.getMessage()); handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, wwwProxy); closeSocket(s); } catch (I2PException ex) { if (timeoutThread != null) timeoutThread.disable(); - _log.info("Error trying to connect", ex); + _log.info("getPrefix() + Error trying to connect", ex); l.log(ex.getMessage()); handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, wwwProxy); closeSocket(s); @@ -277,7 +279,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable _useWWWProxy = useWWWProxy; _disabled = false; long timeoutId = ++__timeoutId; - setName("InactivityThread " + timeoutId); + setName("InactivityThread " + getPrefix() + timeoutId); } public void disable() { @@ -290,15 +292,15 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable public void run() { while (!_disabled) { if (_runner.isFinished()) { - if (_log.shouldLog(Log.INFO)) _log.info("HTTP client request completed prior to timeout"); + if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "HTTP client request completed prior to timeout"); return; } if (_runner.getLastActivityOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) { if (_runner.getStartedOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) { if (_log.shouldLog(Log.WARN)) - _log.warn("HTTP client request timed out (lastActivity: " + _log.warn(getPrefix() + "HTTP client request timed out (lastActivity: " + new Date(_runner.getLastActivityOn()) + ", startedOn: " - + new Date(_runner.getLastActivityOn()) + ")"); + + new Date(_runner.getStartedOn()) + ")"); timeout(); return; } else { @@ -317,7 +319,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable } private void timeout() { - _log.info("Inactivity timeout reached"); + _log.info(getPrefix() + "Inactivity timeout reached"); l.log("Inactivity timeout reached"); if (_out != null) { try { @@ -327,10 +329,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable writeErrorMessage(ERR_TIMEOUT, _out, _targetRequest, _useWWWProxy, wwwProxy); } } catch (IOException ioe) { - _log.warn("Error writing out the 'timeout' message", ioe); + _log.warn(getPrefix() + "Error writing out the 'timeout' message", ioe); } } else { - _log.warn("Client disconnected before we could say we timed out"); + _log.warn(getPrefix() + "Client disconnected before we could say we timed out"); } closeSocket(s); } @@ -361,16 +363,16 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable } } - private static void handleHTTPClientException(Exception ex, OutputStream out, String targetRequest, + private void handleHTTPClientException(Exception ex, OutputStream out, String targetRequest, boolean usingWWWProxy, String wwwProxy) { if (out != null) { try { writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, wwwProxy); } catch (IOException ioe) { - _log.warn("Error writing out the 'destination was unknown' " + "message", ioe); + _log.warn(getPrefix() + "Error writing out the 'destination was unknown' " + "message", ioe); } } else { - _log.warn("Client disconnected before we could say that destination " + "was unknown", ex); + _log.warn(getPrefix() + "Client disconnected before we could say that destination " + "was unknown", ex); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 6394e925e..9f08e7156 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -18,7 +18,7 @@ import net.i2p.util.Clock; import net.i2p.util.I2PThread; import net.i2p.util.Log; -public class I2PTunnelRunner extends I2PThread { +public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorListener { private final static Log _log = new Log(I2PTunnelRunner.class); private static volatile long __runnerId; @@ -51,8 +51,9 @@ public class I2PTunnelRunner extends I2PThread { this.slock = slock; this.initialData = initialData; lastActivityOn = -1; - startedOn = -1; - _log.info("I2PTunnelRunner started"); + startedOn = Clock.getInstance().now(); + if (_log.shouldLog(Log.INFO)) + _log.info("I2PTunnelRunner started"); _runnerId = ++__runnerId; setName("I2PTunnelRunner " + _runnerId); start(); @@ -90,10 +91,10 @@ public class I2PTunnelRunner extends I2PThread { } public void run() { - startedOn = Clock.getInstance().now(); try { InputStream in = s.getInputStream(); OutputStream out = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); + i2ps.setSocketErrorListener(this); InputStream i2pin = i2ps.getInputStream(); OutputStream i2pout = new BufferedOutputStream(i2ps.getOutputStream(), MAX_PACKET_SIZE); if (initialData != null) { @@ -134,6 +135,13 @@ public class I2PTunnelRunner extends I2PThread { } } + public void errorOccurred() { + synchronized (finishLock) { + finished = true; + finishLock.notifyAll(); + } + } + private volatile long __forwarderId = 0; private class StreamForwarder extends I2PThread { @@ -189,7 +197,8 @@ public class I2PTunnelRunner extends I2PThread { out.close(); in.close(); } catch (IOException ex) { - _log.error("Error closing streams", ex); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error closing streams", ex); } synchronized (finishLock) { finished = true; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 81aef1680..1776571d5 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -15,6 +15,7 @@ import java.util.Iterator; import java.util.Properties; import net.i2p.I2PException; +import net.i2p.I2PAppContext; import net.i2p.client.I2PClient; import net.i2p.client.I2PClientFactory; import net.i2p.client.streaming.I2PServerSocket; @@ -144,15 +145,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { I2PServerSocket i2pss = sockMgr.getServerSocket(); while (true) { I2PSocket i2ps = i2pss.accept(); - //local is fast, so synchronously. Does not need that many - //threads. - try { - i2ps.setReadTimeout(readTimeout); - Socket s = new Socket(remoteHost, remotePort); - new I2PTunnelRunner(s, i2ps, slock, null); - } catch (SocketException ex) { - i2ps.close(); - } + I2PThread t = new I2PThread(new Handler(i2ps)); + t.start(); } } catch (I2PException ex) { _log.error("Error while waiting for I2PConnections", ex); @@ -160,5 +154,43 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { _log.error("Error while waiting for I2PConnections", ex); } } + + /** + * Async handler to keep .accept() from blocking too long. + * todo: replace with a thread pool so we dont get overrun by threads if/when + * receiving a lot of connection requests concurrently. + * + */ + private class Handler implements Runnable { + private I2PSocket _handleSocket; + public Handler(I2PSocket socket) { + _handleSocket = socket; + } + public void run() { + long afterAccept = I2PAppContext.getGlobalContext().clock().now(); + long afterSocket = -1; + //local is fast, so synchronously. Does not need that many + //threads. + try { + _handleSocket.setReadTimeout(readTimeout); + Socket s = new Socket(remoteHost, remotePort); + afterSocket = I2PAppContext.getGlobalContext().clock().now(); + new I2PTunnelRunner(s, _handleSocket, slock, null); + } catch (SocketException ex) { + try { + _handleSocket.close(); + } catch (IOException ioe) { + _log.error("Error while closing the received i2p con", ex); + } + } catch (IOException ex) { + _log.error("Error while waiting for I2PConnections", ex); + } + + long afterHandle = I2PAppContext.getGlobalContext().clock().now(); + long timeToHandle = afterHandle - afterAccept; + if (timeToHandle > 1000) + _log.warn("Took a while to handle the request [" + timeToHandle + ", socket create: " + (afterSocket-afterAccept) + "]"); + } + } }