From 60e05e270a8c188a5f1a42cc525d240840ac701a Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 20 May 2004 10:51:22 +0000 Subject: [PATCH] cleaned up slice processing reduced max queued messages per connection to 10 (additional ones are immediately marked as failed) update the I2P_FLAG byte to '*' making this NOT BACKWARDS COMPATIBLE formatting --- .../router/transport/tcp/SocketCreator.java | 152 +++++++++--------- .../router/transport/tcp/TCPConnection.java | 144 +++++++++-------- 2 files changed, 151 insertions(+), 145 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java index b8c7ac14e..61497c0ab 100644 --- a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java +++ b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java @@ -15,100 +15,100 @@ class SocketCreator implements Runnable { private boolean _established; public SocketCreator(String host, int port) { - this(host, port, true); + this(host, port, true); } public SocketCreator(String host, int port, boolean keepOpen) { - _host = host; - _port = port; - _socket = null; - _keepOpen = keepOpen; - _established = false; + _host = host; + _port = port; + _socket = null; + _keepOpen = keepOpen; + _established = false; } - + public Socket getSocket() { return _socket; } - + public boolean couldEstablish() { return _established; } - /** the first byte sent and received must be 0x22 */ - public final static int I2P_FLAG = 0x22; + /** the first byte sent and received must be 0x2A */ + public final static int I2P_FLAG = 0x2A; public void run() { - if (_keepOpen) { - doEstablish(); - } else { - checkEstablish(); - } + if (_keepOpen) { + doEstablish(); + } else { + checkEstablish(); + } } private void doEstablish() { - try { - _socket = new Socket(_host, _port); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Socket created"); - _socket.getOutputStream().write(I2P_FLAG); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("I2P flag sent"); - int val = _socket.getInputStream().read(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); - if (val != I2P_FLAG) { - _socket.close(); - _socket = null; - } - return; - } catch (UnknownHostException uhe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); - return; - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Error establishing", ioe); - _socket = null; - return; - } finally { - synchronized (this) { - notifyAll(); - } - } + try { + _socket = new Socket(_host, _port); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Socket created"); + _socket.getOutputStream().write(I2P_FLAG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2P flag sent"); + int val = _socket.getInputStream().read(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); + if (val != I2P_FLAG) { + _socket.close(); + _socket = null; + } + return; + } catch (UnknownHostException uhe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); + return; + } catch (IOException ioe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Error establishing", ioe); + _socket = null; + return; + } finally { + synchronized (this) { + notifyAll(); + } + } } /** * Try to establish the connection, but don't actually send the I2P flag. The * other side will timeout waiting for it and consider it a dropped connection, * but since they will have sent us the I2P flag first we will still know they are - * reachable. - * + * reachable. + * */ private void checkEstablish() { - try { - _socket = new Socket(_host, _port); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Socket created (but we're not sending the flag, since we're just testing them)"); - int val = _socket.getInputStream().read(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); - - _socket.close(); - _socket = null; - _established = (val == I2P_FLAG); - return; - } catch (UnknownHostException uhe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); - return; - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Error establishing", ioe); - _socket = null; - return; - } finally { - synchronized (this) { - notifyAll(); - } - } + try { + _socket = new Socket(_host, _port); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Socket created (but we're not sending the flag, since we're just testing them)"); + int val = _socket.getInputStream().read(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); + + _socket.close(); + _socket = null; + _established = (val == I2P_FLAG); + return; + } catch (UnknownHostException uhe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); + return; + } catch (IOException ioe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Error establishing", ioe); + _socket = null; + return; + } finally { + synchronized (this) { + notifyAll(); + } + } } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 7cf428bd7..9477ffe00 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -69,7 +69,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { protected RouterContext _context; public final static String PARAM_MAX_QUEUED_MESSAGES = "i2np.tcp.maxQueuedMessages"; - private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20; + private final static int DEFAULT_MAX_QUEUED_MESSAGES = 10; public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) { _context = context; @@ -272,7 +272,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { if (fail) { if (_log.shouldLog(Log.ERROR)) - _log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64()); + _log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64() + ": " + totalPending); msg.timestamp("TCPConnection.addMessage exceeded max queued"); _transport.afterSend(msg, false); @@ -415,26 +415,24 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { private boolean _running; public void run() { _running = true; - try { - while (_running) { - long startSlice = _context.clock().now(); - _lastSliceRun = startSlice; - processSlice(); - long endSlice = _context.clock().now(); + while (_running) { + long startSlice = _context.clock().now(); + _lastSliceRun = startSlice; + boolean processOk = processSlice(); + if (!processOk) { + closeConnection(); + return; } - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Connection runner failed with an IO exception to " - + _remoteIdentity.getHash().toBase64(), ioe); - closeConnection(); - } catch (Throwable t) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Somehow we ran into an uncaught exception running the connection!", t); - closeConnection(); + long endSlice = _context.clock().now(); } } - private void processSlice() throws IOException { + /** + * Process a slice (push a message if available). + * + * @return true if the operation succeeded, false if there was a critical error + */ + private boolean processSlice() { long start = _context.clock().now(); OutNetMessage msg = null; @@ -451,7 +449,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { } catch (InterruptedException ie) {} } remaining = _toBeSent.size(); - if (remaining <= 0) return; + if (remaining <= 0) return true; msg = (OutNetMessage)_toBeSent.remove(0); remaining--; if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) { @@ -469,7 +467,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { + _remoteIdentity.getHash().toBase64() + ": " + msg); msg.timestamp("TCPConnection.runner.processSlice expired"); _transport.afterSend(msg, false); - return; + return true; } if (remaining > 0) { @@ -484,62 +482,70 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { msg.timestamp("TCPConnection.runner.processSlice fetched"); //_log.debug("Processing slice - msg to be sent"); - try { - byte data[] = msg.getMessageData(); - msg.timestamp("TCPConnection.runner.processSlice before sending " - + data.length + " bytes"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending " + data.length + " bytes in the slice... to " - + _remoteIdentity.getHash().toBase64()); + byte data[] = msg.getMessageData(); + if (data == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("wtf, for some reason, an I2NPMessage couldn't be serialized..."); + return true; + } + msg.timestamp("TCPConnection.runner.processSlice before sending " + + data.length + " bytes"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending " + data.length + " bytes in the slice... to " + + _remoteIdentity.getHash().toBase64()); + try { synchronized (_out) { _out.write(data); _out.flush(); } - - msg.timestamp("TCPConnection.runner.processSlice sent and flushed"); - long end = _context.clock().now(); - long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end; - if (_log.shouldLog(Log.INFO)) - _log.info("Message " + msg.getMessage().getClass().getName() - + " (expiring in " + timeLeft + "ms) sent to " - + _remoteIdentity.getHash().toBase64() + " from " - + _context.routerHash().toBase64() - + " over connection " + _id + " with " + data.length - + " bytes in " + (end - start) + "ms"); - if (timeLeft < 10*1000) { - if (_log.shouldLog(Log.DEBUG)) - _log.warn("Very little time left... time to send [" + (end-start) - + "] time left [" + timeLeft + "] to " - + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(), - msg.getCreatedBy()); - } - - long lifetime = msg.getLifetime(); - if (lifetime > 10*1000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("The processing of the message took way too long (" + lifetime - + "ms) - time to send (" + (end-start) + "), time left (" + timeLeft - + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString()); - } - _transport.afterSend(msg, true); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Processing slice - message sent completely: " - + msg.getMessage().getClass().getName() + " to " - + _remoteIdentity.getHash().toBase64()); - if (end - afterExpire > 1000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Actual sending took too long ( " + (end-afterExpire) - + "ms) sending " + data.length + " bytes to " - + _remoteIdentity.getHash().toBase64()); - } } catch (IOException ioe) { - msg.timestamp("TCPConnection.runner.processSlice failed to send/flushflushed"); - _transport.afterSend(msg, false); - throw ioe; + if (_log.shouldLog(Log.ERROR)) + _log.error("IO error writing out a " + data.length + " byte message to " + + _remoteIdentity.getHash().toBase64()); + return false; } - } + + msg.timestamp("TCPConnection.runner.processSlice sent and flushed"); + long end = _context.clock().now(); + + long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end; + if (_log.shouldLog(Log.INFO)) + _log.info("Message " + msg.getMessage().getClass().getName() + + " (expiring in " + timeLeft + "ms) sent to " + + _remoteIdentity.getHash().toBase64() + " from " + + _context.routerHash().toBase64() + + " over connection " + _id + " with " + data.length + + " bytes in " + (end - start) + "ms"); + if (timeLeft < 10*1000) { + if (_log.shouldLog(Log.DEBUG)) + _log.warn("Very little time left... time to send [" + (end-start) + + "] time left [" + timeLeft + "] to " + + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(), + msg.getCreatedBy()); + } + + long lifetime = msg.getLifetime(); + if (lifetime > 10*1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("The processing of the message took way too long (" + lifetime + + "ms) - time to send (" + (end-start) + "), time left (" + timeLeft + + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString()); + } + _transport.afterSend(msg, true); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Processing slice - message sent completely: " + + msg.getMessage().getClass().getName() + " to " + + _remoteIdentity.getHash().toBase64()); + if (end - afterExpire > 1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Actual sending took too long ( " + (end-afterExpire) + + "ms) sending " + data.length + " bytes to " + + _remoteIdentity.getHash().toBase64()); + } + } + return true; } public void stopRunning() {