forked from I2P_Developers/i2p.i2p
* I2CP: Buffer output streams
* ClientConnectionRunner: More cleanups and edge cases
This commit is contained in:
@ -9,6 +9,7 @@ package net.i2p.router.client;
|
||||
*/
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
@ -118,7 +119,7 @@ class ClientConnectionRunner {
|
||||
_messageId = new AtomicInteger(_context.random().nextInt());
|
||||
}
|
||||
|
||||
private static volatile int __id = 0;
|
||||
private static final AtomicInteger __id = new AtomicInteger();
|
||||
|
||||
/**
|
||||
* Actually run the connection - listen for I2CP messages and respond. This
|
||||
@ -126,25 +127,25 @@ class ClientConnectionRunner {
|
||||
* {@link net.i2p.data.i2cp.I2CPMessageReader I2CPMessageReader}
|
||||
*
|
||||
*/
|
||||
public synchronized void startRunning() {
|
||||
try {
|
||||
public synchronized void startRunning() throws IOException {
|
||||
if (_dead || _reader != null)
|
||||
throw new IllegalStateException();
|
||||
_reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE),
|
||||
new ClientMessageEventListener(_context, this, true));
|
||||
_writer = new ClientWriterRunner(_context, this);
|
||||
I2PThread t = new I2PThread(_writer);
|
||||
t.setName("I2CP Writer " + ++__id);
|
||||
t.setName("I2CP Writer " + __id.incrementAndGet());
|
||||
t.setDaemon(true);
|
||||
t.setPriority(I2PThread.MAX_PRIORITY);
|
||||
t.start();
|
||||
_out = _socket.getOutputStream(); // FIXME OWCH! needs a better way so it can be final. FIXME
|
||||
_out = new BufferedOutputStream(_socket.getOutputStream());
|
||||
_reader.startReading();
|
||||
// TODO need a cleaner for unclaimed items in _messages, but we have no timestamps...
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error starting up the runner", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/** die a horrible death */
|
||||
/**
|
||||
* Die a horrible death. Cannot be restarted.
|
||||
*/
|
||||
public synchronized void stopRunning() {
|
||||
if (_dead) return;
|
||||
if (_context.router().isAlive() && _log.shouldLog(Log.WARN))
|
||||
@ -156,6 +157,7 @@ class ClientConnectionRunner {
|
||||
if (_writer != null) _writer.stopWriting();
|
||||
if (_socket != null) try { _socket.close(); } catch (IOException ioe) { }
|
||||
_messages.clear();
|
||||
_acceptedPending.clear();
|
||||
if (_sessionKeyManager != null)
|
||||
_sessionKeyManager.shutdown();
|
||||
_manager.unregisterConnection(this);
|
||||
@ -499,17 +501,22 @@ class ClientConnectionRunner {
|
||||
////
|
||||
boolean getIsDead() { return _dead; }
|
||||
|
||||
/**
|
||||
* Not thread-safe. Blocking. Only used for external sockets.
|
||||
* ClientWriterRunner thread is the only caller.
|
||||
* Others must use doSend().
|
||||
*/
|
||||
void writeMessage(I2CPMessage msg) {
|
||||
long before = _context.clock().now();
|
||||
//long before = _context.clock().now();
|
||||
try {
|
||||
// We don't still need synchronization here? isn't ClientWriterRunner the only writer?
|
||||
synchronized (_out) {
|
||||
// We don't need synchronization here, ClientWriterRunner is the only writer.
|
||||
//synchronized (_out) {
|
||||
msg.writeMessage(_out);
|
||||
_out.flush();
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("after writeMessage("+ msg.getClass().getName() + "): "
|
||||
+ (_context.clock().now()-before) + "ms");
|
||||
//}
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("after writeMessage("+ msg.getClass().getName() + "): "
|
||||
// + (_context.clock().now()-before) + "ms");
|
||||
} catch (I2CPMessageException ime) {
|
||||
_log.error("Error sending I2CP message to client", ime);
|
||||
stopRunning();
|
||||
@ -525,14 +532,14 @@ class ClientConnectionRunner {
|
||||
} catch (Throwable t) {
|
||||
_log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t);
|
||||
stopRunning();
|
||||
} finally {
|
||||
long after = _context.clock().now();
|
||||
long lag = after - before;
|
||||
if (lag > 300) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("synchronization on the i2cp message send took too long (" + lag
|
||||
+ "ms): " + msg);
|
||||
}
|
||||
//} finally {
|
||||
// long after = _context.clock().now();
|
||||
// long lag = after - before;
|
||||
// if (lag > 300) {
|
||||
// if (_log.shouldLog(Log.WARN))
|
||||
// _log.warn("synchronization on the i2cp message send took too long (" + lag
|
||||
// + "ms): " + msg);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@ -543,25 +550,25 @@ class ClientConnectionRunner {
|
||||
void doSend(I2CPMessage msg) throws I2CPMessageException {
|
||||
if (_out == null) throw new I2CPMessageException("Output stream is not initialized");
|
||||
if (msg == null) throw new I2CPMessageException("Null message?!");
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
if ( (_config == null) || (_config.getDestination() == null) )
|
||||
_log.debug("before doSend of a "+ msg.getClass().getName()
|
||||
+ " message on for establishing i2cp con");
|
||||
else
|
||||
_log.debug("before doSend of a "+ msg.getClass().getName()
|
||||
+ " message on for "
|
||||
+ _config.getDestination().calculateHash().toBase64());
|
||||
}
|
||||
//if (_log.shouldLog(Log.DEBUG)) {
|
||||
// if ( (_config == null) || (_config.getDestination() == null) )
|
||||
// _log.debug("before doSend of a "+ msg.getClass().getName()
|
||||
// + " message on for establishing i2cp con");
|
||||
// else
|
||||
// _log.debug("before doSend of a "+ msg.getClass().getName()
|
||||
// + " message on for "
|
||||
// + _config.getDestination().calculateHash().toBase64());
|
||||
//}
|
||||
_writer.addMessage(msg);
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
if ( (_config == null) || (_config.getDestination() == null) )
|
||||
_log.debug("after doSend of a "+ msg.getClass().getName()
|
||||
+ " message on for establishing i2cp con");
|
||||
else
|
||||
_log.debug("after doSend of a "+ msg.getClass().getName()
|
||||
+ " message on for "
|
||||
+ _config.getDestination().calculateHash().toBase64());
|
||||
}
|
||||
//if (_log.shouldLog(Log.DEBUG)) {
|
||||
// if ( (_config == null) || (_config.getDestination() == null) )
|
||||
// _log.debug("after doSend of a "+ msg.getClass().getName()
|
||||
// + " message on for establishing i2cp con");
|
||||
// else
|
||||
// _log.debug("after doSend of a "+ msg.getClass().getName()
|
||||
// + " message on for "
|
||||
// + _config.getDestination().calculateHash().toBase64());
|
||||
//}
|
||||
}
|
||||
|
||||
public int getNextMessageId() {
|
||||
|
@ -141,10 +141,15 @@ class ClientManager {
|
||||
}
|
||||
|
||||
public void registerConnection(ClientConnectionRunner runner) {
|
||||
synchronized (_pendingRunners) {
|
||||
_pendingRunners.add(runner);
|
||||
try {
|
||||
runner.startRunning();
|
||||
synchronized (_pendingRunners) {
|
||||
_pendingRunners.add(runner);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error starting up the runner", ioe);
|
||||
runner.stopRunning();
|
||||
}
|
||||
runner.startRunning();
|
||||
}
|
||||
|
||||
public void unregisterConnection(ClientConnectionRunner runner) {
|
||||
|
@ -15,21 +15,22 @@ import net.i2p.util.Log;
|
||||
* the client reads from their i2cp socket, causing all sorts of bad things to
|
||||
* happen)
|
||||
*
|
||||
* For external I2CP connections only.
|
||||
*/
|
||||
class ClientWriterRunner implements Runnable {
|
||||
private final BlockingQueue<I2CPMessage> _messagesToWrite;
|
||||
private final ClientConnectionRunner _runner;
|
||||
private final Log _log;
|
||||
private final long _id;
|
||||
private static long __id = 0;
|
||||
//private final Log _log;
|
||||
//private final long _id;
|
||||
//private static long __id = 0;
|
||||
|
||||
private static final int QUEUE_SIZE = 256;
|
||||
|
||||
public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {
|
||||
_log = context.logManager().getLog(ClientWriterRunner.class);
|
||||
//_log = context.logManager().getLog(ClientWriterRunner.class);
|
||||
_messagesToWrite = new LinkedBlockingQueue(QUEUE_SIZE);
|
||||
_runner = runner;
|
||||
_id = ++__id;
|
||||
//_id = ++__id;
|
||||
}
|
||||
|
||||
/**
|
||||
|
Reference in New Issue
Block a user