- Fix leak if nonce = 0 but reliability != none
   - More work on failure codes (ticket #788)
   - Fix race with _finished indication in OCMOSJ
This commit is contained in:
zzz
2013-01-02 13:19:40 +00:00
parent 2ea9fc5d61
commit e375ffe8f1
3 changed files with 42 additions and 36 deletions

View File

@ -30,10 +30,12 @@ class MessageStatusMessageHandler extends HandlerImpl {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle message " + message); _log.debug("Handle message " + message);
MessageStatusMessage msg = (MessageStatusMessage) message; MessageStatusMessage msg = (MessageStatusMessage) message;
switch (msg.getStatus()) { int status = msg.getStatus();
long id = msg.getMessageId();
switch (status) {
case MessageStatusMessage.STATUS_AVAILABLE: case MessageStatusMessage.STATUS_AVAILABLE:
ReceiveMessageBeginMessage m = new ReceiveMessageBeginMessage(); ReceiveMessageBeginMessage m = new ReceiveMessageBeginMessage();
m.setMessageId(msg.getMessageId()); m.setMessageId(id);
m.setSessionId(msg.getSessionId()); m.setSessionId(msg.getSessionId());
try { try {
session.sendMessage(m); session.sendMessage(m);
@ -41,27 +43,23 @@ class MessageStatusMessageHandler extends HandlerImpl {
_log.error("Error asking for the message", ise); _log.error("Error asking for the message", ise);
} }
return; return;
case MessageStatusMessage.STATUS_SEND_ACCEPTED: case MessageStatusMessage.STATUS_SEND_ACCEPTED:
session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); session.receiveStatus((int)id, msg.getNonce(), status);
// noop // noop
return; return;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
if (_log.shouldLog(Log.INFO))
_log.info("Message delivery succeeded for message " + msg.getMessageId());
//if (!skipStatus)
session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus());
return;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
if (_log.shouldLog(Log.INFO))
_log.info("Message delivery FAILED for message " + msg.getMessageId());
//if (!skipStatus)
session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus());
return;
default: default:
if (_log.shouldLog(Log.ERROR)) if (msg.isSuccessful()) {
_log.error("Invalid message delivery status received: " + msg.getStatus()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Message delivery succeeded for message " + id);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Message delivery FAILED (" + status + ") for message " + id);
}
//if (!skipStatus)
session.receiveStatus((int)id, msg.getNonce(), status);
return;
} }
} }
} }

View File

@ -270,6 +270,8 @@ class ClientConnectionRunner {
* Note that this sends the Guaranteed status codes, even though we only support best effort. * Note that this sends the Guaranteed status codes, even though we only support best effort.
* Doesn't do anything if i2cp.messageReliability = "none" * Doesn't do anything if i2cp.messageReliability = "none"
* *
* Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that.
*
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
void updateMessageDeliveryStatus(MessageId id, int status) { void updateMessageDeliveryStatus(MessageId id, int status) {
@ -357,7 +359,7 @@ class ClientConnectionRunner {
expiration = msg.getExpirationTime(); expiration = msg.getExpirationTime();
flags = msg.getFlags(); flags = msg.getFlags();
} }
if (!_dontSendMSM) if (message.getNonce() != 0 && !_dontSendMSM)
_acceptedPending.add(id); _acceptedPending.add(id);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -382,6 +384,9 @@ class ClientConnectionRunner {
* for delivery (but not necessarily delivered) * for delivery (but not necessarily delivered)
* Doesn't do anything if i2cp.messageReliability = "none" * Doesn't do anything if i2cp.messageReliability = "none"
* or if the nonce is 0. * or if the nonce is 0.
*
* @param id OUR id for the message
* @param nonce HIS id for the message
*/ */
void ackSendMessage(MessageId id, long nonce) { void ackSendMessage(MessageId id, long nonce) {
if (_dontSendMSM || nonce == 0) if (_dontSendMSM || nonce == 0)
@ -630,6 +635,8 @@ class ClientConnectionRunner {
private long _lastTried; private long _lastTried;
/** /**
* Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that.
*
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
public MessageDeliveryStatusUpdate(MessageId id, int status) { public MessageDeliveryStatusUpdate(MessageId id, int status) {

View File

@ -6,6 +6,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.client.SendMessageOptions; import net.i2p.client.SendMessageOptions;
import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.SessionKeyManager;
@ -59,7 +60,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private PayloadGarlicConfig _clove; private PayloadGarlicConfig _clove;
private long _cloveId; private long _cloveId;
private final long _start; private final long _start;
private boolean _finished; private final AtomicBoolean _finished = new AtomicBoolean();
private long _leaseSetLookupBegin; private long _leaseSetLookupBegin;
private TunnelInfo _outTunnel; private TunnelInfo _outTunnel;
private TunnelInfo _inTunnel; private TunnelInfo _inTunnel;
@ -199,7 +200,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
public void runJob() { public void runJob() {
long now = getContext().clock().now(); long now = getContext().clock().now();
if (now >= _overallExpiration) { if (now >= _overallExpiration) {
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED);
return; return;
} }
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
@ -271,7 +272,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// shouldn't happen // shouldn't happen
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); _log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")");
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET);
} }
} }
} }
@ -403,12 +404,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime); getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime);
} }
if (!_finished) { if (!_finished.get()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send to " + _toString + " because we couldn't find their leaseSet"); _log.warn("Unable to send to " + _toString + " because we couldn't find their leaseSet");
} }
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET);
} }
} }
@ -421,10 +422,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* *
*/ */
private void send() { private void send() {
if (_finished) return; if (_finished.get()) return;
long now = getContext().clock().now(); long now = getContext().clock().now();
if (now >= _overallExpiration) { if (now >= _overallExpiration) {
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED);
return; return;
} }
@ -480,7 +481,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
boolean ok = buildClove(); boolean ok = buildClove();
if (!ok) { if (!ok) {
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION);
return; return;
} }
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
@ -501,7 +502,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString); _log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString);
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS);
return; return;
} }
@ -545,7 +546,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while"); _log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while");
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS);
} }
_clove = null; _clove = null;
getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0); getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0);
@ -681,8 +682,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
} }
private void dieFatal(int status) { private void dieFatal(int status) {
if (_finished) return; if (_finished.getAndSet(true))
_finished = true; return;
long sendTime = getContext().clock().now() - _start; long sendTime = getContext().clock().now() - _start;
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -811,8 +812,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
public void runJob() { public void runJob() {
// do we leak tags here? // do we leak tags here?
if (_finished) return; if (_finished.getAndSet(true))
_finished = true; return;
long sendTime = getContext().clock().now() - _start; long sendTime = getContext().clock().now() - _start;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(OutboundClientMessageOneShotJob.this.getJobId() _log.info(OutboundClientMessageOneShotJob.this.getJobId()
@ -883,7 +884,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (skm != null) if (skm != null)
skm.failTags(_leaseSet.getEncryptionKey(), _key, _tags); skm.failTags(_leaseSet.getEncryptionKey(), _key, _tags);
} }
dieFatal(); dieFatal(MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE);
} }
} }
} }