2004-11-06 jrandom

* Fix for a long standing synchronization bug in the SDK that in rare
      instances can add a few seconds of lag.
This commit is contained in:
jrandom
2004-11-06 07:59:54 +00:00
committed by zzz
parent 9cf663063e
commit 314316cee0
6 changed files with 138 additions and 133 deletions

View File

@ -314,6 +314,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public abstract void receiveStatus(int msgId, long nonce, int status); public abstract void receiveStatus(int msgId, long nonce, int status);
protected boolean isGuaranteed() { protected boolean isGuaranteed() {
if (_log.shouldLog(Log.DEBUG)) {
String str = _options.getProperty(I2PClient.PROP_RELIABILITY);
if (str == null)
_log.debug("reliability is not specified, fallback");
else
_log.debug("reliability is specified: " + str);
}
String reliability = _options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); String reliability = _options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
return I2PClient.PROP_RELIABILITY_GUARANTEED.equals(reliability); return I2PClient.PROP_RELIABILITY_GUARANTEED.equals(reliability);
} }

View File

@ -168,7 +168,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
long afterSendingSync = _context.clock().now(); long afterSendingSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
+ state.getNonce() + state.getNonce() + " for best effort "
+ " sync took " + (inSendingSync-beforeSendingSync) + " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync)); + " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
@ -248,7 +248,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
long afterSendingSync = _context.clock().now(); long afterSendingSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
+ state.getNonce() + state.getNonce() + " for guaranteed "
+ " sync took " + (inSendingSync-beforeSendingSync) + " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync)); + " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);

View File

@ -29,7 +29,6 @@ class MessageState {
private Destination _to; private Destination _to;
private boolean _cancelled; private boolean _cancelled;
private long _created; private long _created;
private Object _lock = new Object();
private static long __stateId = 0; private static long __stateId = 0;
private long _stateId; private long _stateId;
@ -51,9 +50,7 @@ class MessageState {
public void receive(int status) { public void receive(int status) {
synchronized (_receivedStatus) { synchronized (_receivedStatus) {
_receivedStatus.add(new Integer(status)); _receivedStatus.add(new Integer(status));
} _receivedStatus.notifyAll();
synchronized (_lock) {
_lock.notifyAll();
} }
} }
@ -116,150 +113,140 @@ class MessageState {
_log.warn(_prefix + "Expired waiting for the status [" + status + "]"); _log.warn(_prefix + "Expired waiting for the status [" + status + "]");
return; return;
} }
if (isSuccess(status) || isFailure(status)) { synchronized (_receivedStatus) {
if (_log.shouldLog(Log.DEBUG)) if (locked_isSuccess(status) || locked_isFailure(status)) {
_log.debug(_prefix + "Received a confirm (one way or the other)"); if (_log.shouldLog(Log.DEBUG))
return; _log.debug(_prefix + "Received a confirm (one way or the other)");
} return;
if (timeToWait > 5000) { }
timeToWait = 5000; if (timeToWait > 5000) {
} timeToWait = 5000;
synchronized (_lock) { }
try { try {
_lock.wait(timeToWait); _receivedStatus.wait(timeToWait);
} catch (InterruptedException ie) { // nop } catch (InterruptedException ie) { // nop
} }
} }
} }
} }
private boolean isSuccess(int wantedStatus) { private boolean locked_isSuccess(int wantedStatus) {
List received = null;
synchronized (_receivedStatus) {
received = new ArrayList(_receivedStatus);
//_receivedStatus.clear();
}
boolean rv = false; boolean rv = false;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + received); _log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + _receivedStatus);
for (Iterator iter = received.iterator(); iter.hasNext();) { for (Iterator iter = _receivedStatus.iterator(); iter.hasNext();) {
Integer val = (Integer) iter.next(); Integer val = (Integer) iter.next();
int recv = val.intValue(); int recv = val.intValue();
switch (recv) { switch (recv) {
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
+ toString()); + toString());
rv = false; rv = false;
break; break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
+ toString()); + toString());
rv = false; rv = false;
break; break;
case MessageStatusMessage.STATUS_SEND_ACCEPTED: case MessageStatusMessage.STATUS_SEND_ACCEPTED:
if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it) return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it)
} }
// ignore accepted, as we want something better // ignore accepted, as we want something better
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString());
continue;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received best effort success after " + getElapsed()
+ " from " + toString());
if (wantedStatus == recv) {
rv = true;
} else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Not guaranteed success, but best effort after " _log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString());
+ getElapsed() + " will do... from " + toString()); continue;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received best effort success after " + getElapsed()
+ " from " + toString());
if (wantedStatus == recv) {
rv = true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Not guaranteed success, but best effort after "
+ getElapsed() + " will do... from " + toString());
rv = true;
}
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
+ toString());
// even if we're waiting for best effort success, guaranteed is good enough
rv = true; rv = true;
} break;
break; case -1:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: continue;
if (_log.shouldLog(Log.DEBUG)) default:
_log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " if (_log.shouldLog(Log.DEBUG))
+ toString()); _log.debug(_prefix + "Received something else [" + recv + "]...");
// even if we're waiting for best effort success, guaranteed is good enough
rv = true;
break;
case -1:
continue;
default:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received something else [" + recv + "]...");
} }
} }
return rv; return rv;
} }
private boolean isFailure(int wantedStatus) { private boolean locked_isFailure(int wantedStatus) {
List received = null;
synchronized (_receivedStatus) {
received = new ArrayList(_receivedStatus);
//_receivedStatus.clear();
}
boolean rv = false; boolean rv = false;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "isFailure(" + wantedStatus + "): " + received); _log.debug(_prefix + "isFailure(" + wantedStatus + "): " + _receivedStatus);
for (Iterator iter = received.iterator(); iter.hasNext();) {
for (Iterator iter = _receivedStatus.iterator(); iter.hasNext();) {
Integer val = (Integer) iter.next(); Integer val = (Integer) iter.next();
int recv = val.intValue(); int recv = val.intValue();
switch (recv) { switch (recv) {
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
if (_log.shouldLog(Log.DEBUG))
_log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
+ toString());
rv = true;
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
if (_log.shouldLog(Log.DEBUG))
_log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
+ toString());
rv = true;
break;
case MessageStatusMessage.STATUS_SEND_ACCEPTED:
if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
rv = false;
} else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Got accepted, but we're waiting for more from " _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
+ toString());
rv = true;
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
if (_log.shouldLog(Log.DEBUG))
_log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
+ toString());
rv = true;
break;
case MessageStatusMessage.STATUS_SEND_ACCEPTED:
if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
rv = false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Got accepted, but we're waiting for more from "
+ toString());
continue;
// ignore accepted, as we want something better
}
break;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received best effort success after " + getElapsed()
+ " from " + toString());
if (wantedStatus == recv) {
rv = false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Not guaranteed success, but best effort after "
+ getElapsed() + " will do... from " + toString());
rv = false;
}
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
+ toString()); + toString());
// even if we're waiting for best effort success, guaranteed is good enough
rv = false;
break;
case -1:
continue; continue;
// ignore accepted, as we want something better default:
}
break;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received best effort success after " + getElapsed()
+ " from " + toString());
if (wantedStatus == recv) {
rv = false;
} else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Not guaranteed success, but best effort after " _log.debug(_prefix + "Received something else [" + recv + "]...");
+ getElapsed() + " will do... from " + toString());
rv = false;
}
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
+ toString());
// even if we're waiting for best effort success, guaranteed is good enough
rv = false;
break;
case -1:
continue;
default:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received something else [" + recv + "]...");
} }
} }
return rv; return rv;
@ -267,13 +254,15 @@ class MessageState {
/** true if the given status (or an equivilant) was received */ /** true if the given status (or an equivilant) was received */
public boolean received(int status) { public boolean received(int status) {
return isSuccess(status); synchronized (_receivedStatus) {
return locked_isSuccess(status);
}
} }
public void cancel() { public void cancel() {
_cancelled = true; _cancelled = true;
synchronized (_lock) { synchronized (_receivedStatus) {
_lock.notifyAll(); _receivedStatus.notifyAll();
} }
} }
} }

View File

@ -1,4 +1,8 @@
$Id: history.txt,v 1.62 2004/11/02 06:57:08 jrandom Exp $ $Id: history.txt,v 1.63 2004/11/05 05:53:41 jrandom Exp $
2004-11-06 jrandom
* Fix for a long standing synchronization bug in the SDK that in rare
instances can add a few seconds of lag.
2004-11-05 jrandom 2004-11-05 jrandom
* Bugfixes and unit tests for the SAM bridge to handle quoted message * Bugfixes and unit tests for the SAM bridge to handle quoted message

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.68 $ $Date: 2004/11/02 06:57:08 $"; public final static String ID = "$Revision: 1.69 $ $Date: 2004/11/05 05:53:40 $";
public final static String VERSION = "0.4.1.3"; public final static String VERSION = "0.4.1.3";
public final static long BUILD = 9; public final static long BUILD = 10;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION); System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -186,6 +186,7 @@ public class SendTunnelMessageJob extends JobImpl {
_state = 12; _state = 12;
// we're the gateway, so sign, encrypt, and forward to info.getNextHop() // we're the gateway, so sign, encrypt, and forward to info.getNextHop()
TunnelMessage msg = prepareMessage(info); TunnelMessage msg = prepareMessage(info);
_state = 66;
if (msg == null) { if (msg == null) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("wtf, unable to prepare a tunnel message to the next hop, when we're the gateway and hops remain? tunnel: " + info); _log.error("wtf, unable to prepare a tunnel message to the next hop, when we're the gateway and hops remain? tunnel: " + info);
@ -212,13 +213,17 @@ public class SendTunnelMessageJob extends JobImpl {
_log.warn("Adding a tunnel message that will expire shortly [" _log.warn("Adding a tunnel message that will expire shortly ["
+ new Date(_expiration) + "]", getAddedBy()); + new Date(_expiration) + "]", getAddedBy());
} }
_state = 67;
msg.setMessageExpiration(new Date(_expiration)); msg.setMessageExpiration(new Date(_expiration));
getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, _state = 68;
info.getNextHop(), _onSend, Job j = new SendMessageDirectJob(getContext(), msg,
_onReply, _onFailure, info.getNextHop(), _onSend,
_selector, _onReply, _onFailure,
(int)(_expiration - getContext().clock().now()), _selector,
_priority)); (int)(_expiration - getContext().clock().now()),
_priority);
_state = 69;
getContext().jobQueue().addJob(j);
_state = 15; _state = 15;
} }
} }