cleaned up slice processing

reduced max queued messages per connection to 10 (additional ones are immediately marked as failed)
update the I2P_FLAG byte to '*' making this NOT BACKWARDS COMPATIBLE
formatting
This commit is contained in:
jrandom
2004-05-20 10:51:22 +00:00
committed by zzz
parent 242b9a6af9
commit 60e05e270a
2 changed files with 151 additions and 145 deletions

View File

@ -15,100 +15,100 @@ class SocketCreator implements Runnable {
private boolean _established; private boolean _established;
public SocketCreator(String host, int port) { public SocketCreator(String host, int port) {
this(host, port, true); this(host, port, true);
} }
public SocketCreator(String host, int port, boolean keepOpen) { public SocketCreator(String host, int port, boolean keepOpen) {
_host = host; _host = host;
_port = port; _port = port;
_socket = null; _socket = null;
_keepOpen = keepOpen; _keepOpen = keepOpen;
_established = false; _established = false;
} }
public Socket getSocket() { return _socket; } public Socket getSocket() { return _socket; }
public boolean couldEstablish() { return _established; } public boolean couldEstablish() { return _established; }
/** the first byte sent and received must be 0x22 */ /** the first byte sent and received must be 0x2A */
public final static int I2P_FLAG = 0x22; public final static int I2P_FLAG = 0x2A;
public void run() { public void run() {
if (_keepOpen) { if (_keepOpen) {
doEstablish(); doEstablish();
} else { } else {
checkEstablish(); checkEstablish();
} }
} }
private void doEstablish() { private void doEstablish() {
try { try {
_socket = new Socket(_host, _port); _socket = new Socket(_host, _port);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Socket created"); _log.debug("Socket created");
_socket.getOutputStream().write(I2P_FLAG); _socket.getOutputStream().write(I2P_FLAG);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P flag sent"); _log.debug("I2P flag sent");
int val = _socket.getInputStream().read(); int val = _socket.getInputStream().read();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]");
if (val != I2P_FLAG) { if (val != I2P_FLAG) {
_socket.close(); _socket.close();
_socket = null; _socket = null;
} }
return; return;
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port, uhe); _log.warn("Error establishing connection to " + _host + ':' + _port, uhe);
return; return;
} catch (IOException ioe) { } catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Error establishing", ioe); _log.debug("Error establishing", ioe);
_socket = null; _socket = null;
return; return;
} finally { } finally {
synchronized (this) { synchronized (this) {
notifyAll(); notifyAll();
} }
} }
} }
/** /**
* Try to establish the connection, but don't actually send the I2P flag. The * Try to establish the connection, but don't actually send the I2P flag. The
* other side will timeout waiting for it and consider it a dropped connection, * other side will timeout waiting for it and consider it a dropped connection,
* but since they will have sent us the I2P flag first we will still know they are * but since they will have sent us the I2P flag first we will still know they are
* reachable. * reachable.
* *
*/ */
private void checkEstablish() { private void checkEstablish() {
try { try {
_socket = new Socket(_host, _port); _socket = new Socket(_host, _port);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Socket created (but we're not sending the flag, since we're just testing them)"); _log.debug("Socket created (but we're not sending the flag, since we're just testing them)");
int val = _socket.getInputStream().read(); int val = _socket.getInputStream().read();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]");
_socket.close(); _socket.close();
_socket = null; _socket = null;
_established = (val == I2P_FLAG); _established = (val == I2P_FLAG);
return; return;
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port, uhe); _log.warn("Error establishing connection to " + _host + ':' + _port, uhe);
return; return;
} catch (IOException ioe) { } catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Error establishing", ioe); _log.debug("Error establishing", ioe);
_socket = null; _socket = null;
return; return;
} finally { } finally {
synchronized (this) { synchronized (this) {
notifyAll(); notifyAll();
} }
} }
} }
} }

View File

@ -69,7 +69,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
protected RouterContext _context; protected RouterContext _context;
public final static String PARAM_MAX_QUEUED_MESSAGES = "i2np.tcp.maxQueuedMessages"; public final static String PARAM_MAX_QUEUED_MESSAGES = "i2np.tcp.maxQueuedMessages";
private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20; private final static int DEFAULT_MAX_QUEUED_MESSAGES = 10;
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) { public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) {
_context = context; _context = context;
@ -272,7 +272,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
if (fail) { if (fail) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64()); _log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64() + ": " + totalPending);
msg.timestamp("TCPConnection.addMessage exceeded max queued"); msg.timestamp("TCPConnection.addMessage exceeded max queued");
_transport.afterSend(msg, false); _transport.afterSend(msg, false);
@ -415,26 +415,24 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
private boolean _running; private boolean _running;
public void run() { public void run() {
_running = true; _running = true;
try { while (_running) {
while (_running) { long startSlice = _context.clock().now();
long startSlice = _context.clock().now(); _lastSliceRun = startSlice;
_lastSliceRun = startSlice; boolean processOk = processSlice();
processSlice(); if (!processOk) {
long endSlice = _context.clock().now(); closeConnection();
return;
} }
} catch (IOException ioe) { long endSlice = _context.clock().now();
if (_log.shouldLog(Log.ERROR))
_log.error("Connection runner failed with an IO exception to "
+ _remoteIdentity.getHash().toBase64(), ioe);
closeConnection();
} catch (Throwable t) {
if (_log.shouldLog(Log.ERROR))
_log.error("Somehow we ran into an uncaught exception running the connection!", t);
closeConnection();
} }
} }
private void processSlice() throws IOException { /**
* Process a slice (push a message if available).
*
* @return true if the operation succeeded, false if there was a critical error
*/
private boolean processSlice() {
long start = _context.clock().now(); long start = _context.clock().now();
OutNetMessage msg = null; OutNetMessage msg = null;
@ -451,7 +449,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
remaining = _toBeSent.size(); remaining = _toBeSent.size();
if (remaining <= 0) return; if (remaining <= 0) return true;
msg = (OutNetMessage)_toBeSent.remove(0); msg = (OutNetMessage)_toBeSent.remove(0);
remaining--; remaining--;
if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) { if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) {
@ -469,7 +467,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
+ _remoteIdentity.getHash().toBase64() + ": " + msg); + _remoteIdentity.getHash().toBase64() + ": " + msg);
msg.timestamp("TCPConnection.runner.processSlice expired"); msg.timestamp("TCPConnection.runner.processSlice expired");
_transport.afterSend(msg, false); _transport.afterSend(msg, false);
return; return true;
} }
if (remaining > 0) { if (remaining > 0) {
@ -484,62 +482,70 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
msg.timestamp("TCPConnection.runner.processSlice fetched"); msg.timestamp("TCPConnection.runner.processSlice fetched");
//_log.debug("Processing slice - msg to be sent"); //_log.debug("Processing slice - msg to be sent");
try { byte data[] = msg.getMessageData();
byte data[] = msg.getMessageData(); if (data == null) {
msg.timestamp("TCPConnection.runner.processSlice before sending " if (_log.shouldLog(Log.ERROR))
+ data.length + " bytes"); _log.error("wtf, for some reason, an I2NPMessage couldn't be serialized...");
if (_log.shouldLog(Log.DEBUG)) return true;
_log.debug("Sending " + data.length + " bytes in the slice... to " }
+ _remoteIdentity.getHash().toBase64()); msg.timestamp("TCPConnection.runner.processSlice before sending "
+ data.length + " bytes");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + data.length + " bytes in the slice... to "
+ _remoteIdentity.getHash().toBase64());
try {
synchronized (_out) { synchronized (_out) {
_out.write(data); _out.write(data);
_out.flush(); _out.flush();
} }
msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
long end = _context.clock().now();
long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end;
if (_log.shouldLog(Log.INFO))
_log.info("Message " + msg.getMessage().getClass().getName()
+ " (expiring in " + timeLeft + "ms) sent to "
+ _remoteIdentity.getHash().toBase64() + " from "
+ _context.routerHash().toBase64()
+ " over connection " + _id + " with " + data.length
+ " bytes in " + (end - start) + "ms");
if (timeLeft < 10*1000) {
if (_log.shouldLog(Log.DEBUG))
_log.warn("Very little time left... time to send [" + (end-start)
+ "] time left [" + timeLeft + "] to "
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(),
msg.getCreatedBy());
}
long lifetime = msg.getLifetime();
if (lifetime > 10*1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("The processing of the message took way too long (" + lifetime
+ "ms) - time to send (" + (end-start) + "), time left (" + timeLeft
+ ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
}
_transport.afterSend(msg, true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing slice - message sent completely: "
+ msg.getMessage().getClass().getName() + " to "
+ _remoteIdentity.getHash().toBase64());
if (end - afterExpire > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Actual sending took too long ( " + (end-afterExpire)
+ "ms) sending " + data.length + " bytes to "
+ _remoteIdentity.getHash().toBase64());
}
} catch (IOException ioe) { } catch (IOException ioe) {
msg.timestamp("TCPConnection.runner.processSlice failed to send/flushflushed"); if (_log.shouldLog(Log.ERROR))
_transport.afterSend(msg, false); _log.error("IO error writing out a " + data.length + " byte message to "
throw ioe; + _remoteIdentity.getHash().toBase64());
return false;
} }
}
msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
long end = _context.clock().now();
long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end;
if (_log.shouldLog(Log.INFO))
_log.info("Message " + msg.getMessage().getClass().getName()
+ " (expiring in " + timeLeft + "ms) sent to "
+ _remoteIdentity.getHash().toBase64() + " from "
+ _context.routerHash().toBase64()
+ " over connection " + _id + " with " + data.length
+ " bytes in " + (end - start) + "ms");
if (timeLeft < 10*1000) {
if (_log.shouldLog(Log.DEBUG))
_log.warn("Very little time left... time to send [" + (end-start)
+ "] time left [" + timeLeft + "] to "
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(),
msg.getCreatedBy());
}
long lifetime = msg.getLifetime();
if (lifetime > 10*1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("The processing of the message took way too long (" + lifetime
+ "ms) - time to send (" + (end-start) + "), time left (" + timeLeft
+ ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
}
_transport.afterSend(msg, true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing slice - message sent completely: "
+ msg.getMessage().getClass().getName() + " to "
+ _remoteIdentity.getHash().toBase64());
if (end - afterExpire > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Actual sending took too long ( " + (end-afterExpire)
+ "ms) sending " + data.length + " bytes to "
+ _remoteIdentity.getHash().toBase64());
}
}
return true;
} }
public void stopRunning() { public void stopRunning() {