minor rewrite to make timing more precise (keeping a map of message add times, not just the 'last' add time)

This commit is contained in:
jrandom
2004-07-02 15:12:35 +00:00
committed by zzz
parent aae9f671f0
commit c636b0a0ec

View File

@ -15,15 +15,27 @@ import net.i2p.data.i2cp.I2CPMessage;
*/ */
class ClientWriterRunner implements Runnable { class ClientWriterRunner implements Runnable {
private List _messagesToWrite; private List _messagesToWrite;
private volatile long _lastAdded; private List _messagesToWriteTimes;
private ClientConnectionRunner _runner; private ClientConnectionRunner _runner;
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
private long _id;
private static long __id = 0;
private static final long MAX_WAIT = 5*1000;
/** notify this lock when there are messages to write */
private Object _activityLock = new Object();
/** lock on this when updating the class level data structs */
private Object _dataLock = new Object();
public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) { public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {
_context = context; _context = context;
_log = context.logManager().getLog(ClientWriterRunner.class); _log = context.logManager().getLog(ClientWriterRunner.class);
_messagesToWrite = new ArrayList(4); _messagesToWrite = new ArrayList(4);
_messagesToWriteTimes = new ArrayList(4);
_runner = runner; _runner = runner;
_id = ++__id;
} }
/** /**
@ -31,11 +43,15 @@ class ClientWriterRunner implements Runnable {
* *
*/ */
public void addMessage(I2CPMessage msg) { public void addMessage(I2CPMessage msg) {
synchronized (_messagesToWrite) { synchronized (_dataLock) {
_messagesToWrite.add(msg); _messagesToWrite.add(msg);
_lastAdded = _context.clock().now(); _messagesToWriteTimes.add(new Long(_context.clock().now()));
_messagesToWrite.notifyAll();
} }
synchronized (_activityLock) {
_activityLock.notifyAll();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("["+_id+"] addMessage completed for " + msg.getClass().getName());
} }
/** /**
@ -43,56 +59,45 @@ class ClientWriterRunner implements Runnable {
* *
*/ */
public void stopWriting() { public void stopWriting() {
synchronized (_messagesToWrite) { synchronized (_activityLock) {
_messagesToWrite.notifyAll(); _activityLock.notifyAll();
} }
} }
public void run() { public void run() {
while (!_runner.getIsDead()) { while (!_runner.getIsDead()) {
List messages = null; List messages = null;
long beforeCheckSync = _context.clock().now(); List messageTimes = null;
long inCheckSync = 0;
int remaining = 0; synchronized (_dataLock) {
synchronized (_messagesToWrite) {
inCheckSync = _context.clock().now();
if (_messagesToWrite.size() > 0) { if (_messagesToWrite.size() > 0) {
messages = new ArrayList(_messagesToWrite.size()); messages = new ArrayList(_messagesToWrite.size());
messageTimes = new ArrayList(_messagesToWriteTimes.size());
messages.addAll(_messagesToWrite); messages.addAll(_messagesToWrite);
messageTimes.addAll(_messagesToWriteTimes);
_messagesToWrite.clear(); _messagesToWrite.clear();
} else { _messagesToWriteTimes.clear();
}
}
if (messages == null) {
try { try {
_messagesToWrite.wait(); synchronized (_activityLock) {
} catch (InterruptedException ie) {} _activityLock.wait();
if (_messagesToWrite.size() > 0) {
messages = new ArrayList(_messagesToWrite.size());
messages.addAll(_messagesToWrite);
_messagesToWrite.clear();
} }
} catch (InterruptedException ie) {
if (_log.shouldLog(Log.WARN))
_log.warn("Interrupted while waiting for activity", ie);
} }
remaining = _messagesToWrite.size(); } else {
}
long afterCheckSync = _context.clock().now();
if (messages != null) {
for (int i = 0; i < messages.size(); i++) { for (int i = 0; i < messages.size(); i++) {
I2CPMessage msg = (I2CPMessage)messages.get(i); I2CPMessage msg = (I2CPMessage)messages.get(i);
Long when = (Long)messageTimes.get(i);
_runner.writeMessage(msg); _runner.writeMessage(msg);
long afterWriteMessage = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("writeMessage: check sync took " _log.debug("["+_id+"] writeMessage time since addMessage(): "
+ (inCheckSync-beforeCheckSync) + "ms, writemessage took " + (_context.clock().now()-when.longValue()) + " for "
+ (afterWriteMessage-afterCheckSync) + msg.getClass().getName());
+ "ms, time since addMessage(): "
+ (afterCheckSync-_lastAdded) + " for "
+ msg.getClass().getName() + " remaining - " + remaining);
} }
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("dont writeMessage: check sync took "
+ (inCheckSync-beforeCheckSync) + "ms, "
+ "time since addMessage(): "
+ (afterCheckSync-_lastAdded) + " remaining - " + remaining);
} }
} }
} }