* allow the client subsystem to tell the clientMessagePool that a message is definitely remote (since the client subsystem should know). this reduces the churn of the message pool asking all over again
* add a new ClientWriterRunner thread (1 per I2CP connection) so that a client application that hangs or otherwise doesn't read from its i2cp socket quickly doesn't hang the whole router (since we've previously used the jobQueue for pushing I2CP messages). This may or may not clear the intermittent eepsite bug, but I'm not counting on it to (yet). * update various points to deal with the client writer's operation (aka doSend won't throw IOException) * logging * lots and lots of metrics (yeah i know some of them vary based on the compiler)
This commit is contained in:
@ -35,12 +35,26 @@ public class ClientMessagePool {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public void add(ClientMessage msg) {
|
public void add(ClientMessage msg) {
|
||||||
if ( (_context.clientManager().isLocal(msg.getDestination())) ||
|
add(msg, false);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* If we're coming from the client subsystem itself, we already know whether
|
||||||
|
* the target is definitely remote and as such don't need to recheck
|
||||||
|
* ourselves, but if we aren't certain, we want it to check for us.
|
||||||
|
*
|
||||||
|
* @param isDefinitelyRemote true if we know for sure that the target is not local
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void add(ClientMessage msg, boolean isDefinitelyRemote) {
|
||||||
|
if ( !isDefinitelyRemote ||
|
||||||
|
(_context.clientManager().isLocal(msg.getDestination())) ||
|
||||||
(_context.clientManager().isLocal(msg.getDestinationHash())) ) {
|
(_context.clientManager().isLocal(msg.getDestinationHash())) ) {
|
||||||
_log.debug("Adding message for local delivery");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Adding message for local delivery");
|
||||||
_context.clientManager().messageReceived(msg);
|
_context.clientManager().messageReceived(msg);
|
||||||
} else {
|
} else {
|
||||||
_log.debug("Adding message for remote delivery");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Adding message for remote delivery");
|
||||||
_context.jobQueue().addJob(new OutboundClientMessageJob(_context, msg));
|
_context.jobQueue().addJob(new OutboundClientMessageJob(_context, msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,7 @@ import java.io.OutputStream;
|
|||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -35,6 +35,7 @@ import net.i2p.router.JobImpl;
|
|||||||
import net.i2p.router.RouterContext;
|
import net.i2p.router.RouterContext;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
import net.i2p.util.RandomSource;
|
import net.i2p.util.RandomSource;
|
||||||
|
import net.i2p.util.I2PThread;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bridge the router and the client - managing state for a client.
|
* Bridge the router and the client - managing state for a client.
|
||||||
@ -68,6 +69,7 @@ public class ClientConnectionRunner {
|
|||||||
* delivered to the client (so that we can be sure only to update when necessary)
|
* delivered to the client (so that we can be sure only to update when necessary)
|
||||||
*/
|
*/
|
||||||
private List _alreadyProcessed;
|
private List _alreadyProcessed;
|
||||||
|
private ClientWriterRunner _writer;
|
||||||
/** are we, uh, dead */
|
/** are we, uh, dead */
|
||||||
private boolean _dead;
|
private boolean _dead;
|
||||||
|
|
||||||
@ -82,11 +84,12 @@ public class ClientConnectionRunner {
|
|||||||
_socket = socket;
|
_socket = socket;
|
||||||
_config = null;
|
_config = null;
|
||||||
_messages = new HashMap();
|
_messages = new HashMap();
|
||||||
_alreadyProcessed = new LinkedList();
|
_alreadyProcessed = new ArrayList();
|
||||||
_acceptedPending = new HashSet();
|
_acceptedPending = new HashSet();
|
||||||
_dead = false;
|
_dead = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int __id = 0;
|
||||||
/**
|
/**
|
||||||
* Actually run the connection - listen for I2CP messages and respond. This
|
* Actually run the connection - listen for I2CP messages and respond. This
|
||||||
* is the main driver for this class, though it gets all its meat from the
|
* is the main driver for this class, though it gets all its meat from the
|
||||||
@ -96,6 +99,12 @@ public class ClientConnectionRunner {
|
|||||||
public void startRunning() {
|
public void startRunning() {
|
||||||
try {
|
try {
|
||||||
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this));
|
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this));
|
||||||
|
_writer = new ClientWriterRunner();
|
||||||
|
I2PThread t = new I2PThread(_writer);
|
||||||
|
t.setName("Writer " + ++__id);
|
||||||
|
t.setDaemon(true);
|
||||||
|
t.setPriority(I2PThread.MIN_PRIORITY);
|
||||||
|
t.start();
|
||||||
_out = _socket.getOutputStream();
|
_out = _socket.getOutputStream();
|
||||||
_reader.startReading();
|
_reader.startReading();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
@ -112,6 +121,7 @@ public class ClientConnectionRunner {
|
|||||||
_dead = true;
|
_dead = true;
|
||||||
// we need these keys to unpublish the leaseSet
|
// we need these keys to unpublish the leaseSet
|
||||||
if (_reader != null) _reader.stopReading();
|
if (_reader != null) _reader.stopReading();
|
||||||
|
if (_writer != null) _writer.stopWriting();
|
||||||
if (_socket != null) try { _socket.close(); } catch (IOException ioe) { }
|
if (_socket != null) try { _socket.close(); } catch (IOException ioe) { }
|
||||||
synchronized (_messages) {
|
synchronized (_messages) {
|
||||||
_messages.clear();
|
_messages.clear();
|
||||||
@ -143,9 +153,50 @@ public class ClientConnectionRunner {
|
|||||||
/** already closed? */
|
/** already closed? */
|
||||||
boolean isDead() { return _dead; }
|
boolean isDead() { return _dead; }
|
||||||
/** message body */
|
/** message body */
|
||||||
Payload getPayload(MessageId id) { synchronized (_messages) { return (Payload)_messages.get(id); } }
|
Payload getPayload(MessageId id) {
|
||||||
void setPayload(MessageId id, Payload payload) { synchronized (_messages) { _messages.put(id, payload); } }
|
Payload rv = null;
|
||||||
void removePayload(MessageId id) { synchronized (_messages) { _messages.remove(id); } }
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
|
synchronized (_messages) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
|
rv = (Payload)_messages.get(id);
|
||||||
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
void setPayload(MessageId id, Payload payload) {
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
|
synchronized (_messages) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
|
_messages.put(id, payload);
|
||||||
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("setPayload.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void removePayload(MessageId id) {
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
|
synchronized (_messages) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
|
_messages.remove(id);
|
||||||
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("removePayload.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void sessionEstablished(SessionConfig config) {
|
void sessionEstablished(SessionConfig config) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -183,8 +234,6 @@ public class ClientConnectionRunner {
|
|||||||
doSend(msg);
|
doSend(msg);
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error writing out the disconnect message", ime);
|
_log.error("Error writing out the disconnect message", ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error writing out the disconnect message", ioe);
|
|
||||||
}
|
}
|
||||||
stopRunning();
|
stopRunning();
|
||||||
}
|
}
|
||||||
@ -200,15 +249,31 @@ public class ClientConnectionRunner {
|
|||||||
Destination dest = message.getDestination();
|
Destination dest = message.getDestination();
|
||||||
MessageId id = new MessageId();
|
MessageId id = new MessageId();
|
||||||
id.setMessageId(getNextMessageId());
|
id.setMessageId(getNextMessageId());
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_acceptedPending) {
|
synchronized (_acceptedPending) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
_acceptedPending.add(id);
|
_acceptedPending.add(id);
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("distributeMessage.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("** Recieving message [" + id.getMessageId() + "] with payload of size ["
|
_log.debug("** Recieving message [" + id.getMessageId() + "] with payload of size ["
|
||||||
+ payload.getSize() + "]" + " for session [" + _sessionId.getSessionId()
|
+ payload.getSize() + "]" + " for session [" + _sessionId.getSessionId()
|
||||||
+ "]");
|
+ "]");
|
||||||
|
long beforeDistribute = _context.clock().now();
|
||||||
// the following blocks as described above
|
// the following blocks as described above
|
||||||
_manager.distributeMessage(_config.getDestination(), message.getDestination(), message.getPayload(), id);
|
_manager.distributeMessage(_config.getDestination(), message.getDestination(), message.getPayload(), id);
|
||||||
|
long timeToDistribute = _context.clock().now() - beforeDistribute;
|
||||||
|
if (timeToDistribute > 50)
|
||||||
|
_log.warn("Took too long to distribute in the manager to "
|
||||||
|
+ message.getDestination().calculateHash().toBase64() + ": "
|
||||||
|
+ timeToDistribute);
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,13 +294,20 @@ public class ClientConnectionRunner {
|
|||||||
status.setStatus(MessageStatusMessage.STATUS_SEND_ACCEPTED);
|
status.setStatus(MessageStatusMessage.STATUS_SEND_ACCEPTED);
|
||||||
try {
|
try {
|
||||||
doSend(status);
|
doSend(status);
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_acceptedPending) {
|
synchronized (_acceptedPending) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
_acceptedPending.remove(id);
|
_acceptedPending.remove(id);
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("ackSendMessage.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error writing out the message status message", ime);
|
_log.error("Error writing out the message status message", ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error writing out the message status message", ioe);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -282,11 +354,96 @@ public class ClientConnectionRunner {
|
|||||||
////
|
////
|
||||||
////
|
////
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Async writer class so that if a client app hangs, they wont take down the
|
||||||
|
* whole router with them (since otherwise the JobQueue would block until
|
||||||
|
* the client reads from their i2cp socket, causing all sorts of bad shit to
|
||||||
|
* happen)
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private class ClientWriterRunner implements Runnable {
|
||||||
|
private List _messagesToWrite;
|
||||||
|
public ClientWriterRunner() {
|
||||||
|
_messagesToWrite = new ArrayList(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add this message to the writer's queue
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void addMessage(I2CPMessage msg) {
|
||||||
|
synchronized (_messagesToWrite) {
|
||||||
|
_messagesToWrite.add(msg);
|
||||||
|
_messagesToWrite.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* No more messages - dont even try to send what we have
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void stopWriting() {
|
||||||
|
synchronized (_messagesToWrite) {
|
||||||
|
_messagesToWrite.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void run() {
|
||||||
|
while (!_dead) {
|
||||||
|
I2CPMessage msg = null;
|
||||||
|
synchronized (_messagesToWrite) {
|
||||||
|
if (_messagesToWrite.size() > 0) {
|
||||||
|
// we do this test before and after wait, in case more than
|
||||||
|
// one message gets enqueued
|
||||||
|
msg = (I2CPMessage)_messagesToWrite.remove(0);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
_messagesToWrite.wait();
|
||||||
|
} catch (InterruptedException ie) {}
|
||||||
|
|
||||||
|
if (_messagesToWrite.size() > 0)
|
||||||
|
msg = (I2CPMessage)_messagesToWrite.remove(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (msg != null)
|
||||||
|
writeMessage(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeMessage(I2CPMessage msg) {
|
||||||
|
long before = _context.clock().now();
|
||||||
|
try {
|
||||||
|
synchronized (_out) {
|
||||||
|
msg.writeMessage(_out);
|
||||||
|
_out.flush();
|
||||||
|
}
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("after doSend of a "+ msg.getClass().getName() + " message");
|
||||||
|
} catch (I2CPMessageException ime) {
|
||||||
|
_log.error("Message exception sending I2CP message", ime);
|
||||||
|
stopRunning();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
_log.error("IO exception sending I2CP message", ioe);
|
||||||
|
stopRunning();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
_log.log(Log.CRIT, "Unhandled exception sending I2CP message", t);
|
||||||
|
stopRunning();
|
||||||
|
} finally {
|
||||||
|
long after = _context.clock().now();
|
||||||
|
long lag = after - before;
|
||||||
|
if (lag > 300) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("synchronization on the i2cp message send took too long (" + lag
|
||||||
|
+ "ms): " + msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actually send the I2CPMessage to the peer through the socket
|
* Actually send the I2CPMessage to the peer through the socket
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
void doSend(I2CPMessage msg) throws I2CPMessageException, IOException {
|
void doSend(I2CPMessage msg) throws I2CPMessageException {
|
||||||
if (_out == null) throw new I2CPMessageException("Output stream is not initialized");
|
if (_out == null) throw new I2CPMessageException("Output stream is not initialized");
|
||||||
if (msg == null) throw new I2CPMessageException("Null message?!");
|
if (msg == null) throw new I2CPMessageException("Null message?!");
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
if (_log.shouldLog(Log.DEBUG)) {
|
||||||
@ -298,32 +455,8 @@ public class ClientConnectionRunner {
|
|||||||
+ " message on for "
|
+ " message on for "
|
||||||
+ _config.getDestination().calculateHash().toBase64());
|
+ _config.getDestination().calculateHash().toBase64());
|
||||||
}
|
}
|
||||||
long before = _context.clock().now();
|
_writer.addMessage(msg);
|
||||||
try {
|
|
||||||
synchronized (_out) {
|
|
||||||
msg.writeMessage(_out);
|
|
||||||
_out.flush();
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("after doSend of a "+ msg.getClass().getName() + " message");
|
|
||||||
} catch (I2CPMessageException ime) {
|
|
||||||
_log.error("Message exception sending I2CP message", ime);
|
|
||||||
throw ime;
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("IO exception sending I2CP message", ioe);
|
|
||||||
throw ioe;
|
|
||||||
} catch (Throwable t) {
|
|
||||||
_log.log(Log.CRIT, "Unhandled exception sending I2CP message", t);
|
|
||||||
throw new IOException("Unhandled exception sending I2CP message: " + t.getMessage());
|
|
||||||
} finally {
|
|
||||||
long after = _context.clock().now();
|
|
||||||
long lag = after - before;
|
|
||||||
if (lag > 300) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn("synchronization on the i2cp message send took too long (" + lag
|
|
||||||
+ "ms): " + msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME
|
// this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME
|
||||||
@ -350,12 +483,21 @@ public class ClientConnectionRunner {
|
|||||||
boolean isPending = false;
|
boolean isPending = false;
|
||||||
int pending = 0;
|
int pending = 0;
|
||||||
String buf = null;
|
String buf = null;
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_acceptedPending) {
|
synchronized (_acceptedPending) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
if (_acceptedPending.contains(id))
|
if (_acceptedPending.contains(id))
|
||||||
isPending = true;
|
isPending = true;
|
||||||
pending = _acceptedPending.size();
|
pending = _acceptedPending.size();
|
||||||
buf = _acceptedPending.toString();
|
buf = _acceptedPending.toString();
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
if (pending >= 1) {
|
if (pending >= 1) {
|
||||||
_log.warn("Pending acks: " + pending + ": " + buf);
|
_log.warn("Pending acks: " + pending + ": " + buf);
|
||||||
}
|
}
|
||||||
@ -405,16 +547,28 @@ public class ClientConnectionRunner {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean alreadyProcessed = false;
|
||||||
|
long beforeLock = MessageDeliveryStatusUpdate.this._context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_alreadyProcessed) {
|
synchronized (_alreadyProcessed) {
|
||||||
|
inLock = MessageDeliveryStatusUpdate.this._context.clock().now();
|
||||||
if (_alreadyProcessed.contains(_messageId)) {
|
if (_alreadyProcessed.contains(_messageId)) {
|
||||||
_log.warn("Status already updated");
|
_log.warn("Status already updated");
|
||||||
return;
|
alreadyProcessed = true;
|
||||||
} else {
|
} else {
|
||||||
_alreadyProcessed.add(_messageId);
|
_alreadyProcessed.add(_messageId);
|
||||||
while (_alreadyProcessed.size() > 10)
|
while (_alreadyProcessed.size() > 10)
|
||||||
_alreadyProcessed.remove(0);
|
_alreadyProcessed.remove(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
long afterLock = MessageDeliveryStatusUpdate.this._context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("MessageDeliveryStatusUpdate.locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (alreadyProcessed) return;
|
||||||
|
|
||||||
if (_lastTried > 0) {
|
if (_lastTried > 0) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -435,8 +589,6 @@ public class ClientConnectionRunner {
|
|||||||
doSend(msg);
|
doSend(msg);
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.warn("Error updating the status for message ID " + _messageId, ime);
|
_log.warn("Error updating the status for message ID " + _messageId, ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.warn("Error updating the status for message ID " + _messageId, ioe);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,8 @@ public class ClientManager {
|
|||||||
// check if there is a runner for it
|
// check if there is a runner for it
|
||||||
ClientConnectionRunner runner = getRunner(toDest);
|
ClientConnectionRunner runner = getRunner(toDest);
|
||||||
if (runner != null) {
|
if (runner != null) {
|
||||||
_log.debug("Message " + msgId + " is targeting a local destination. distribute it as such");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Message " + msgId + " is targeting a local destination. distribute it as such");
|
||||||
runner.receiveMessage(toDest, fromDest, payload);
|
runner.receiveMessage(toDest, fromDest, payload);
|
||||||
if (fromDest != null) {
|
if (fromDest != null) {
|
||||||
ClientConnectionRunner sender = getRunner(fromDest);
|
ClientConnectionRunner sender = getRunner(fromDest);
|
||||||
@ -128,7 +129,8 @@ public class ClientManager {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// remote. w00t
|
// remote. w00t
|
||||||
_log.debug("Message " + msgId + " is targeting a REMOTE destination! Added to the client message pool");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Message " + msgId + " is targeting a REMOTE destination! Added to the client message pool");
|
||||||
runner = getRunner(fromDest);
|
runner = getRunner(fromDest);
|
||||||
ClientMessage msg = new ClientMessage();
|
ClientMessage msg = new ClientMessage();
|
||||||
msg.setDestination(toDest);
|
msg.setDestination(toDest);
|
||||||
@ -137,7 +139,7 @@ public class ClientManager {
|
|||||||
msg.setSenderConfig(runner.getConfig());
|
msg.setSenderConfig(runner.getConfig());
|
||||||
msg.setFromDestination(runner.getConfig().getDestination());
|
msg.setFromDestination(runner.getConfig().getDestination());
|
||||||
msg.setMessageId(msgId);
|
msg.setMessageId(msgId);
|
||||||
_context.clientMessagePool().add(msg);
|
_context.clientMessagePool().add(msg, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,16 +171,35 @@ public class ClientManager {
|
|||||||
|
|
||||||
|
|
||||||
public boolean isLocal(Destination dest) {
|
public boolean isLocal(Destination dest) {
|
||||||
|
boolean rv = false;
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_runners) {
|
synchronized (_runners) {
|
||||||
return (_runners.containsKey(dest));
|
inLock = _context.clock().now();
|
||||||
|
rv = _runners.containsKey(dest);
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("isLocal(Destination).locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
return rv;
|
||||||
}
|
}
|
||||||
public boolean isLocal(Hash destHash) {
|
public boolean isLocal(Hash destHash) {
|
||||||
if (destHash == null) return false;
|
if (destHash == null) return false;
|
||||||
Set dests = new HashSet();
|
Set dests = new HashSet();
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_runners) {
|
synchronized (_runners) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
dests.addAll(_runners.keySet());
|
dests.addAll(_runners.keySet());
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("isLocal(Hash).locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
for (Iterator iter = dests.iterator(); iter.hasNext();) {
|
for (Iterator iter = dests.iterator(); iter.hasNext();) {
|
||||||
Destination d = (Destination)iter.next();
|
Destination d = (Destination)iter.next();
|
||||||
if (d.calculateHash().equals(destHash)) return true;
|
if (d.calculateHash().equals(destHash)) return true;
|
||||||
@ -187,9 +208,19 @@ public class ClientManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ClientConnectionRunner getRunner(Destination dest) {
|
private ClientConnectionRunner getRunner(Destination dest) {
|
||||||
|
ClientConnectionRunner rv = null;
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_runners) {
|
synchronized (_runners) {
|
||||||
return (ClientConnectionRunner)_runners.get(dest);
|
inLock = _context.clock().now();
|
||||||
|
rv = (ClientConnectionRunner)_runners.get(dest);
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("getRunner(Dest).locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -208,9 +239,18 @@ public class ClientManager {
|
|||||||
if (destHash == null)
|
if (destHash == null)
|
||||||
return null;
|
return null;
|
||||||
Set dests = new HashSet();
|
Set dests = new HashSet();
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_runners) {
|
synchronized (_runners) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
dests.addAll(_runners.keySet());
|
dests.addAll(_runners.keySet());
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("getRunner(Hash).locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
|
||||||
for (Iterator iter = dests.iterator(); iter.hasNext(); ) {
|
for (Iterator iter = dests.iterator(); iter.hasNext(); ) {
|
||||||
Destination d = (Destination)iter.next();
|
Destination d = (Destination)iter.next();
|
||||||
if (d.calculateHash().equals(destHash))
|
if (d.calculateHash().equals(destHash))
|
||||||
@ -235,9 +275,18 @@ public class ClientManager {
|
|||||||
|
|
||||||
private Set getRunnerDestinations() {
|
private Set getRunnerDestinations() {
|
||||||
Set dests = new HashSet();
|
Set dests = new HashSet();
|
||||||
|
long beforeLock = _context.clock().now();
|
||||||
|
long inLock = 0;
|
||||||
synchronized (_runners) {
|
synchronized (_runners) {
|
||||||
|
inLock = _context.clock().now();
|
||||||
dests.addAll(_runners.keySet());
|
dests.addAll(_runners.keySet());
|
||||||
}
|
}
|
||||||
|
long afterLock = _context.clock().now();
|
||||||
|
if (afterLock - beforeLock > 50) {
|
||||||
|
_log.warn("getRunnerDestinations().locking took too long: " + (afterLock-beforeLock)
|
||||||
|
+ " overall, synchronized took " + (inLock - beforeLock));
|
||||||
|
}
|
||||||
|
|
||||||
return dests;
|
return dests;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,8 +104,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
|
|||||||
_runner.doSend(new SetDateMessage());
|
_runner.doSend(new SetDateMessage());
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error writing out the setDate message", ime);
|
_log.error("Error writing out the setDate message", ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error writing out the setDate message", ioe);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private void handleSetDate(I2CPMessageReader reader, SetDateMessage message) {
|
private void handleSetDate(I2CPMessageReader reader, SetDateMessage message) {
|
||||||
@ -119,7 +117,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
|
|||||||
*/
|
*/
|
||||||
private void handleCreateSession(I2CPMessageReader reader, CreateSessionMessage message) {
|
private void handleCreateSession(I2CPMessageReader reader, CreateSessionMessage message) {
|
||||||
if (message.getSessionConfig().verifySignature()) {
|
if (message.getSessionConfig().verifySignature()) {
|
||||||
_log.debug("Signature verified correctly on create session message");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Signature verified correctly on create session message");
|
||||||
} else {
|
} else {
|
||||||
_log.error("Signature verification *FAILED* on a create session message. Hijack attempt?");
|
_log.error("Signature verification *FAILED* on a create session message. Hijack attempt?");
|
||||||
_runner.disconnectClient("Invalid signature on CreateSessionMessage");
|
_runner.disconnectClient("Invalid signature on CreateSessionMessage");
|
||||||
@ -143,8 +142,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
|
|||||||
_log.debug("after sessionEstablished for " + message.getSessionConfig().getDestination().calculateHash().toBase64());
|
_log.debug("after sessionEstablished for " + message.getSessionConfig().getDestination().calculateHash().toBase64());
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error writing out the session status message", ime);
|
_log.error("Error writing out the session status message", ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error writing out the session status message", ioe);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.jobQueue().addJob(new CreateSessionJob(_context, _runner));
|
_context.jobQueue().addJob(new CreateSessionJob(_context, _runner));
|
||||||
@ -158,8 +155,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
|
|||||||
*/
|
*/
|
||||||
private void handleSendMessage(I2CPMessageReader reader, SendMessageMessage message) {
|
private void handleSendMessage(I2CPMessageReader reader, SendMessageMessage message) {
|
||||||
_log.debug("handleSendMessage called");
|
_log.debug("handleSendMessage called");
|
||||||
|
long beforeDistribute = _context.clock().now();
|
||||||
MessageId id = _runner.distributeMessage(message);
|
MessageId id = _runner.distributeMessage(message);
|
||||||
|
long timeToDistribute = _context.clock().now() - beforeDistribute;
|
||||||
_runner.ackSendMessage(id, message.getNonce());
|
_runner.ackSendMessage(id, message.getNonce());
|
||||||
|
if (timeToDistribute > 50)
|
||||||
|
_log.warn("Took too long to distribute the message (which holds up the ack): " + timeToDistribute);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -182,8 +183,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
|
|||||||
msg.setPayload(payload);
|
msg.setPayload(payload);
|
||||||
try {
|
try {
|
||||||
_runner.doSend(msg);
|
_runner.doSend(msg);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error delivering the payload", ioe);
|
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error delivering the payload", ime);
|
_log.error("Error delivering the payload", ime);
|
||||||
}
|
}
|
||||||
|
@ -65,8 +65,6 @@ class MessageReceivedJob extends JobImpl {
|
|||||||
_runner.doSend(msg);
|
_runner.doSend(msg);
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error writing out the message status message", ime);
|
_log.error("Error writing out the message status message", ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error writing out the message status message", ioe);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -51,8 +51,6 @@ class ReportAbuseJob extends JobImpl {
|
|||||||
_runner.doSend(msg);
|
_runner.doSend(msg);
|
||||||
} catch (I2CPMessageException ime) {
|
} catch (I2CPMessageException ime) {
|
||||||
_log.error("Error reporting abuse", ime);
|
_log.error("Error reporting abuse", ime);
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error reporting abuse", ioe);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,12 +85,6 @@ class RequestLeaseSetJob extends JobImpl {
|
|||||||
_runner.setLeaseRequest(null);
|
_runner.setLeaseRequest(null);
|
||||||
_runner.disconnectClient("I2CP error requesting leaseSet");
|
_runner.disconnectClient("I2CP error requesting leaseSet");
|
||||||
return;
|
return;
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.error("Error sending I2CP message requesting the lease set", ioe);
|
|
||||||
state.setIsSuccessful(false);
|
|
||||||
_runner.setLeaseRequest(null);
|
|
||||||
_runner.disconnectClient("IO error requesting leaseSet");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user