- Run HandleJob inline for speed

- Remove payload from message map if availability announce fails
- Cleanups
This commit is contained in:
zzz
2012-09-08 15:10:27 +00:00
parent 98da06cd83
commit e02d82981a
4 changed files with 40 additions and 21 deletions

View File

@ -119,13 +119,14 @@ class ClientConnectionRunner {
}
private static volatile int __id = 0;
/**
* Actually run the connection - listen for I2CP messages and respond. This
* is the main driver for this class, though it gets all its meat from the
* {@link net.i2p.data.i2cp.I2CPMessageReader I2CPMessageReader}
*
*/
public void startRunning() {
public synchronized void startRunning() {
try {
_reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE),
new ClientMessageEventListener(_context, this, true));
@ -137,13 +138,14 @@ class ClientConnectionRunner {
t.start();
_out = _socket.getOutputStream(); // FIXME OWCH! needs a better way so it can be final. FIXME
_reader.startReading();
// TODO need a cleaner for unclaimed items in _messages, but we have no timestamps...
} catch (IOException ioe) {
_log.error("Error starting up the runner", ioe);
}
}
/** die a horrible death */
void stopRunning() {
public synchronized void stopRunning() {
if (_dead) return;
if (_context.router().isAlive() && _log.shouldLog(Log.WARN))
_log.warn("Stop the I2CP connection! current leaseSet: "
@ -171,16 +173,20 @@ class ClientConnectionRunner {
public SessionConfig getConfig() { return _config; }
/** current client's sessionkeymanager */
public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; }
/** currently allocated leaseSet */
public LeaseSet getLeaseSet() { return _currentLeaseSet; }
void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; }
public Hash getDestHash() { return _destHashCache; }
/** current client's sessionId */
SessionId getSessionId() { return _sessionId; }
void setSessionId(SessionId id) { if (id != null) _sessionId = id; }
/** data for the current leaseRequest, or null if there is no active leaseSet request */
LeaseRequestState getLeaseRequest() { return _leaseRequest; }
void setLeaseRequest(LeaseRequestState req) {
synchronized (this) {
if ( (_leaseRequest != null) && (req != _leaseRequest) )
@ -188,6 +194,7 @@ class ClientConnectionRunner {
_leaseRequest = req;
}
}
/** already closed? */
boolean isDead() { return _dead; }
@ -469,12 +476,14 @@ class ClientConnectionRunner {
private final long _expirationTime;
private final Job _onCreate;
private final Job _onFailed;
public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) {
_ls = ls;
_expirationTime = expirationTime;
_onCreate = onCreate;
_onFailed = onFailed;
}
public void timeReached() {
requestLeaseSet(_ls, _expirationTime, _onCreate, _onFailed);
}
@ -579,14 +588,14 @@ class ClientConnectionRunner {
private final static long REQUEUE_DELAY = 500;
private class MessageDeliveryStatusUpdate extends JobImpl {
private MessageId _messageId;
private boolean _success;
private final MessageId _messageId;
private final boolean _success;
private long _lastTried;
public MessageDeliveryStatusUpdate(MessageId id, boolean success) {
super(ClientConnectionRunner.this._context);
_messageId = id;
_success = success;
_lastTried = 0;
}
public String getName() { return "Update Delivery Status"; }

View File

@ -238,7 +238,9 @@ class ClientManager {
_payload = payload;
_msgId = id;
}
public String getName() { return "Distribute local message"; }
public void runJob() {
_to.receiveMessage(_toDest, _fromDest, _payload);
if (_from != null) {
@ -274,6 +276,7 @@ class ClientManager {
}
private static final int REQUEST_LEASESET_TIMEOUT = 120*1000;
public void requestLeaseSet(Hash dest, LeaseSet ls) {
ClientConnectionRunner runner = getRunner(dest);
if (runner != null) {
@ -298,6 +301,7 @@ class ClientManager {
}
return rv;
}
public boolean isLocal(Hash destHash) {
if (destHash == null) return false;
synchronized (_runners) {
@ -480,18 +484,23 @@ class ClientManager {
}
public void messageReceived(ClientMessage msg) {
_ctx.jobQueue().addJob(new HandleJob(msg));
// This is fast and non-blocking, run in-line
//_ctx.jobQueue().addJob(new HandleJob(msg));
(new HandleJob(msg)).runJob();
}
private class HandleJob extends JobImpl {
private ClientMessage _msg;
private final ClientMessage _msg;
public HandleJob(ClientMessage msg) {
super(_ctx);
_msg = msg;
}
public String getName() { return "Handle Inbound Client Messages"; }
public void runJob() {
ClientConnectionRunner runner = null;
ClientConnectionRunner runner;
if (_msg.getDestination() != null)
runner = getRunner(_msg.getDestination());
else

View File

@ -40,16 +40,22 @@ class MessageReceivedJob extends JobImpl {
MessageId id = new MessageId();
id.setMessageId(_runner.getNextMessageId());
_runner.setPayload(id, _payload);
messageAvailable(id, _payload.getSize());
try {
messageAvailable(id, _payload.getSize());
} catch (I2CPMessageException ime) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the message status message", ime);
_runner.removePayload(id);
}
}
/**
* Deliver notification to the client that the given message is available.
*/
private void messageAvailable(MessageId id, long size) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId()
+ " (with nonce=1)", new Exception("available"));
private void messageAvailable(MessageId id, long size) throws I2CPMessageException {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId()
// + " (with nonce=1)", new Exception("available"));
MessageStatusMessage msg = new MessageStatusMessage();
msg.setMessageId(id.getMessageId());
msg.setSessionId(_runner.getSessionId().getSessionId());
@ -57,11 +63,6 @@ class MessageReceivedJob extends JobImpl {
// has to be >= 0, it is initialized to -1
msg.setNonce(1);
msg.setStatus(MessageStatusMessage.STATUS_AVAILABLE);
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the message status message", ime);
}
_runner.doSend(msg);
}
}

View File

@ -31,7 +31,7 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner {
* Starts the reader thread. Does not call super().
*/
@Override
public void startRunning() {
public synchronized void startRunning() {
_reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this, false));
_reader.startReading();
}
@ -40,7 +40,7 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner {
* Calls super() to stop the reader, and sends a poison message to the client.
*/
@Override
void stopRunning() {
public synchronized void stopRunning() {
super.stopRunning();
queue.close();
// queue = null;