factor out the clientWriterRunner and have it deal with multiple i2cp messages being enqueued really fast (at least, more efficiently, by pulling them all off at once and handling them in one pass)

This commit is contained in:
jrandom
2004-07-01 15:21:32 +00:00
committed by zzz
parent 15b1cbd762
commit 148dcc084d
2 changed files with 128 additions and 98 deletions

View File

@ -89,7 +89,7 @@ public class ClientConnectionRunner {
_dead = false;
}
private static int __id = 0;
private static volatile int __id = 0;
/**
* Actually run the connection - listen for I2CP messages and respond. This
* is the main driver for this class, though it gets all its meat from the
@ -99,7 +99,7 @@ public class ClientConnectionRunner {
public void startRunning() {
try {
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this));
_writer = new ClientWriterRunner();
_writer = new ClientWriterRunner(_context, this);
I2PThread t = new I2PThread(_writer);
t.setName("Writer " + ++__id);
t.setDaemon(true);
@ -352,103 +352,34 @@ public class ClientConnectionRunner {
////
////
/**
* Async writer class so that if a client app hangs, they wont take down the
* whole router with them (since otherwise the JobQueue would block until
* the client reads from their i2cp socket, causing all sorts of bad shit to
* happen)
*
*/
private class ClientWriterRunner implements Runnable {
private List _messagesToWrite;
private long _lastAdded;
public ClientWriterRunner() {
_messagesToWrite = new ArrayList(4);
}
/**
* Add this message to the writer's queue
*
*/
public void addMessage(I2CPMessage msg) {
synchronized (_messagesToWrite) {
_messagesToWrite.add(msg);
_lastAdded = _context.clock().now();
_messagesToWrite.notifyAll();
boolean getIsDead() { return _dead; }
void writeMessage(I2CPMessage msg) {
long before = _context.clock().now();
try {
synchronized (_out) {
msg.writeMessage(_out);
_out.flush();
}
}
/**
* No more messages - dont even try to send what we have
*
*/
public void stopWriting() {
synchronized (_messagesToWrite) {
_messagesToWrite.notifyAll();
}
}
public void run() {
while (!_dead) {
I2CPMessage msg = null;
long beforeCheckSync = _context.clock().now();
long inCheckSync = 0;
synchronized (_messagesToWrite) {
inCheckSync = _context.clock().now();
if (_messagesToWrite.size() > 0) {
msg = (I2CPMessage)_messagesToWrite.remove(0);
} else {
try {
_messagesToWrite.wait();
} catch (InterruptedException ie) {
if (_messagesToWrite.size() > 0)
msg = (I2CPMessage)_messagesToWrite.remove(0);
}
}
}
long afterCheckSync = _context.clock().now();
if (msg != null) {
writeMessage(msg);
long afterWriteMessage = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("writeMessage: check sync took "
+ (inCheckSync-beforeCheckSync) + "ms, writemessage took "
+ (afterWriteMessage-afterCheckSync)
+ "ms, time since addMessage(): " +
+ (afterCheckSync-_lastAdded));
}
}
}
private void writeMessage(I2CPMessage msg) {
long before = _context.clock().now();
try {
synchronized (_out) {
msg.writeMessage(_out);
_out.flush();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("after writeMessage("+ msg.getClass().getName() + "): "
+ (_context.clock().now()-before) + "ms");;
} catch (I2CPMessageException ime) {
_log.error("Message exception sending I2CP message", ime);
stopRunning();
} catch (IOException ioe) {
_log.error("IO exception sending I2CP message", ioe);
stopRunning();
} catch (Throwable t) {
_log.log(Log.CRIT, "Unhandled exception sending I2CP message", t);
stopRunning();
} finally {
long after = _context.clock().now();
long lag = after - before;
if (lag > 300) {
if (_log.shouldLog(Log.WARN))
_log.warn("synchronization on the i2cp message send took too long (" + lag
+ "ms): " + msg);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("after writeMessage("+ msg.getClass().getName() + "): "
+ (_context.clock().now()-before) + "ms");;
} catch (I2CPMessageException ime) {
_log.error("Message exception sending I2CP message", ime);
stopRunning();
} catch (IOException ioe) {
_log.error("IO exception sending I2CP message", ioe);
stopRunning();
} catch (Throwable t) {
_log.log(Log.CRIT, "Unhandled exception sending I2CP message", t);
stopRunning();
} finally {
long after = _context.clock().now();
long lag = after - before;
if (lag > 300) {
if (_log.shouldLog(Log.WARN))
_log.warn("synchronization on the i2cp message send took too long (" + lag
+ "ms): " + msg);
}
}
}

View File

@ -0,0 +1,99 @@
package net.i2p.router.client;
import java.util.List;
import java.util.ArrayList;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.data.i2cp.I2CPMessage;
/**
* Async writer class so that if a client app hangs, they wont take down the
* whole router with them (since otherwise the JobQueue would block until
* the client reads from their i2cp socket, causing all sorts of bad shit to
* happen)
*
*/
class ClientWriterRunner implements Runnable {
private List _messagesToWrite;
private volatile long _lastAdded;
private ClientConnectionRunner _runner;
private RouterContext _context;
private Log _log;
public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {
_context = context;
_log = context.logManager().getLog(ClientWriterRunner.class);
_messagesToWrite = new ArrayList(4);
_runner = runner;
}
/**
* Add this message to the writer's queue
*
*/
public void addMessage(I2CPMessage msg) {
synchronized (_messagesToWrite) {
_messagesToWrite.add(msg);
_lastAdded = _context.clock().now();
_messagesToWrite.notifyAll();
}
}
/**
* No more messages - dont even try to send what we have
*
*/
public void stopWriting() {
synchronized (_messagesToWrite) {
_messagesToWrite.notifyAll();
}
}
public void run() {
while (!_runner.getIsDead()) {
List messages = null;
long beforeCheckSync = _context.clock().now();
long inCheckSync = 0;
int remaining = 0;
synchronized (_messagesToWrite) {
inCheckSync = _context.clock().now();
if (_messagesToWrite.size() > 0) {
messages = new ArrayList(_messagesToWrite.size());
messages.addAll(_messagesToWrite);
_messagesToWrite.clear();
} else {
try {
_messagesToWrite.wait();
} catch (InterruptedException ie) {}
if (_messagesToWrite.size() > 0) {
messages = new ArrayList(_messagesToWrite.size());
messages.addAll(_messagesToWrite);
_messagesToWrite.clear();
}
}
remaining = _messagesToWrite.size();
}
long afterCheckSync = _context.clock().now();
if (messages != null) {
for (int i = 0; i < messages.size(); i++) {
I2CPMessage msg = (I2CPMessage)messages.get(i);
_runner.writeMessage(msg);
long afterWriteMessage = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("writeMessage: check sync took "
+ (inCheckSync-beforeCheckSync) + "ms, writemessage took "
+ (afterWriteMessage-afterCheckSync)
+ "ms, time since addMessage(): "
+ (afterCheckSync-_lastAdded) + " for "
+ msg.getClass().getName() + " remaining - " + remaining);
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("dont writeMessage: check sync took "
+ (inCheckSync-beforeCheckSync) + "ms, "
+ "time since addMessage(): "
+ (afterCheckSync-_lastAdded) + " remaining - " + remaining);
}
}
}
}