I2CP: Return local delivery failure on queue overflow (ticket #1939)

This commit is contained in:
zzz
2017-02-08 15:22:41 +00:00
parent 36ec4de9c7
commit a11bd7cbe7
4 changed files with 34 additions and 18 deletions

View File

@ -250,6 +250,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS: case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
// probably took a long time to open the tunnel, allow retx // probably took a long time to open the tunnel, allow retx
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED: case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
// overflow in router-side I2CP queue, sent as of 0.9.29, will be retried
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con); _log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con);
_messageStatusMap.remove(id); _messageStatusMap.remove(id);
@ -269,7 +271,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
break; break;
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER: case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK: case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION: case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:

View File

@ -720,40 +720,44 @@ class ClientConnectionRunner {
} }
/** /**
* Asynchronously deliver the message to the current runner * Synchronously deliver the message to the current runner
* *
* Note that no failure indication is available. * Failure indication is available as of 0.9.29.
* Fails silently on e.g. queue overflow to client, client dead, etc. * Fails on e.g. queue overflow to client, client dead, etc.
* *
* @param toDest non-null * @param toDest non-null
* @param fromDest generally null when from remote, non-null if from local * @param fromDest generally null when from remote, non-null if from local
* @return success
*/ */
void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { boolean receiveMessage(Destination toDest, Destination fromDest, Payload payload) {
if (_dead) return; if (_dead)
return false;
MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload, _dontSendMSMOnReceive); MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload, _dontSendMSMOnReceive);
// This is fast and non-blocking, run in-line // This is fast and non-blocking, run in-line
//_context.jobQueue().addJob(j); //_context.jobQueue().addJob(j);
j.runJob(); //j.runJob();
return j.receiveMessage();
} }
/** /**
* Asynchronously deliver the message to the current runner * Synchronously deliver the message to the current runner
* *
* Note that no failure indication is available. * Failure indication is available as of 0.9.29.
* Fails silently on e.g. queue overflow to client, client dead, etc. * Fails on e.g. queue overflow to client, client dead, etc.
* *
* @param toHash non-null * @param toHash non-null
* @param fromDest generally null when from remote, non-null if from local * @param fromDest generally null when from remote, non-null if from local
* @return success
* @since 0.9.21 * @since 0.9.21
*/ */
void receiveMessage(Hash toHash, Destination fromDest, Payload payload) { boolean receiveMessage(Hash toHash, Destination fromDest, Payload payload) {
SessionParams sp = _sessions.get(toHash); SessionParams sp = _sessions.get(toHash);
if (sp == null) { if (sp == null) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("No session found for receiveMessage()"); _log.warn("No session found for receiveMessage()");
return; return false;
} }
receiveMessage(sp.dest, fromDest, payload); return receiveMessage(sp.dest, fromDest, payload);
} }
/** /**

View File

@ -417,11 +417,10 @@ class ClientManager {
public String getName() { return "Distribute local message"; } public String getName() { return "Distribute local message"; }
public void runJob() { public void runJob() {
_to.receiveMessage(_toDest, _fromDest, _payload); boolean ok = _to.receiveMessage(_toDest, _fromDest, _payload);
// note that receiveMessage() does not indicate a failure,
// so a queue overflow is not recognized. we always return success.
if (_from != null) { if (_from != null) {
_from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); int rc = ok ? MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL : MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL;
_from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, rc);
} }
} }
} }

View File

@ -48,7 +48,17 @@ class MessageReceivedJob extends JobImpl {
public String getName() { return "Deliver New Message"; } public String getName() { return "Deliver New Message"; }
public void runJob() { public void runJob() {
if (_runner.isDead()) return; receiveMessage();
}
/**
* Same as runJob() but with a return value
* @return success
* @since 0.9.29
*/
public boolean receiveMessage() {
if (_runner.isDead())
return false;
MessageId id = null; MessageId id = null;
try { try {
long nextID = _runner.getNextMessageId(); long nextID = _runner.getNextMessageId();
@ -59,6 +69,7 @@ class MessageReceivedJob extends JobImpl {
_runner.setPayload(id, _payload); _runner.setPayload(id, _payload);
messageAvailable(id, _payload.getSize()); messageAvailable(id, _payload.getSize());
} }
return true;
} catch (I2CPMessageException ime) { } catch (I2CPMessageException ime) {
String msg = "Error sending data to client " + _runner.getDestHash(); String msg = "Error sending data to client " + _runner.getDestHash();
if (_log.shouldWarn()) if (_log.shouldWarn())
@ -67,6 +78,7 @@ class MessageReceivedJob extends JobImpl {
_log.logAlways(Log.WARN, msg); _log.logAlways(Log.WARN, msg);
if (id != null && !_sendDirect) if (id != null && !_sendDirect)
_runner.removePayload(id); _runner.removePayload(id);
return false;
} }
} }