2005-09-12 jrandom

* Removed guaranteed delivery mode entirely (so existing i2phex clients
      using it can get the benefits of mode=best_effort).  Guaranteed delivery
      is offered at the streaming lib level.
    * Improve the peer selection code for peer testing, as everyone now
      supports tests.
    * Give the watchdog its fangs - if it detects obscene job lag or if
      clients have been unable to get a leaseSet for more than 5 minutes,
      restart the router.  This was disabled a year ago due to spurious
      restarts, and can be disabled by "watchdog.haltOnHang=false", but the
      cause of the spurious restarts should be gone.
This commit is contained in:
jrandom
2005-09-13 03:32:29 +00:00
committed by zzz
parent c8c109093d
commit 9865af4174
9 changed files with 53 additions and 232 deletions

View File

@ -233,6 +233,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
notifier.setName("Notifier " + _myDestination.calculateHash().toBase64().substring(0,4));
notifier.setDaemon(true);
notifier.start();
if ( (_options != null) &&
(I2PClient.PROP_RELIABILITY_GUARANTEED.equals(_options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT))) ) {
if (_log.shouldLog(Log.ERROR))
_log.error("I2CP guaranteed delivery mode has been removed, using best effort.");
}
long startConnect = _context.clock().now();
try {
@ -322,18 +328,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public abstract void receiveStatus(int msgId, long nonce, int status);
protected boolean isGuaranteed() {
if (_log.shouldLog(Log.DEBUG)) {
String str = _options.getProperty(I2PClient.PROP_RELIABILITY);
if (str == null)
_log.debug("reliability is not specified, fallback");
else
_log.debug("reliability is specified: " + str);
}
String reliability = _options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
return I2PClient.PROP_RELIABILITY_GUARANTEED.equals(reliability);
}
protected static final Set createNewTags(int num) {
Set tags = new HashSet();
for (int i = 0; i < num; i++)

View File

@ -85,16 +85,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if (isClosed()) throw new I2PSessionException("Already closed");
if (SHOULD_COMPRESS) payload = DataHelper.compress(payload, offset, size);
else throw new IllegalStateException("we need to update sendGuaranteed to support partial send");
if (_log.shouldLog(Log.DEBUG)) _log.debug("message compressed");
// we always send as guaranteed (so we get the session keys/tags acked),
// but only block until the appropriate event has been reached (guaranteed
// success or accepted). we may want to break this out into a seperate
// attribute, allowing both nonblocking sends and transparently managed keys,
// as well as the nonblocking sends with application managed keys. Later.
if (isGuaranteed() || false) {
//_log.error("sendGuaranteed");
return sendGuaranteed(dest, payload, keyUsed, tagsSent);
}
return sendBestEffort(dest, payload, keyUsed, tagsSent);
}
@ -269,142 +259,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
return found;
}
private boolean sendGuaranteed(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
throws I2PSessionException {
SessionKey key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey());
if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
Set sentTags = null;
if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) {
sentTags = createNewTags(NUM_TAGS);
} else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 2 * 60 * 1000) {
// if we have > 10 tags, but they expire in under 30 seconds, we want more
sentTags = createNewTags(NUM_TAGS);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding " + NUM_TAGS + " new ones");
}
SessionKey newKey = null;
if (false) // rekey
newKey = _context.keyGenerator().generateSessionKey();
long nonce = _context.random().nextInt(Integer.MAX_VALUE);
MessageState state = new MessageState(_context, nonce, getPrefix());
state.setKey(key);
state.setTags(sentTags);
state.setNewKey(newKey);
state.setTo(dest);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key);
if (keyUsed != null) {
if (newKey != null)
keyUsed.setData(newKey.getData());
else
keyUsed.setData(key.getData());
}
if (tagsSent != null) {
if (sentTags != null) {
tagsSent.addAll(sentTags);
}
}
long beforeSendingSync = _context.clock().now();
long inSendingSync = 0;
synchronized (_sendingStates) {
inSendingSync = _context.clock().now();
_sendingStates.add(state);
}
long afterSendingSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
+ state.getNonce() + " for guaranteed "
+ " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
long beforeWaitFor = _context.clock().now();
if (isGuaranteed())
state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS,
_context.clock().now() + SEND_TIMEOUT);
else
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
_context.clock().now() + SEND_TIMEOUT);
long afterWaitFor = _context.clock().now();
long inRemovingSync = 0;
synchronized (_sendingStates) {
inRemovingSync = _context.clock().now();
_sendingStates.remove(state);
}
long afterRemovingSync = _context.clock().now();
boolean guaranteed = isGuaranteed();
boolean found = false;
boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
if (guaranteed)
found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
else
found = accepted;
if ((!accepted) || (state.getMessageId() == null)) {
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, getPrefix() + "State with nonce " + state.getNonce()
+ " was not accepted? (no messageId!! found=" + found
+ " msgId=" + state.getMessageId()
+ ", sendingSync=" + (afterSendingSync-beforeSendingSync)
+ ", sendMessage=" + (beforeWaitFor-afterSendingSync)
+ ", waitFor=" + (afterWaitFor-beforeWaitFor)
+ ", removingSync=" + (afterRemovingSync-afterWaitFor)
+ ")");
//if (true)
// throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up");
if (guaranteed)
nackTags(state);
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId()
+ " / " + state.getNonce() + " found = " + found);
// the 'found' value is only useful for mode=Guaranteed, as mode=BestEffort
// doesn't block
if (guaranteed) {
if (found) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "
+ payload.length + " bytes");
ackTags(state);
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with "
+ payload.length + " bytes");
nackTags(state);
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message send enqueued after " + state.getElapsed() + "ms with "
+ payload.length + " bytes");
}
return found;
}
private void ackTags(MessageState state) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "ack tags for msgId " + state.getMessageId() + " / "
+ state.getNonce() + " key = " + state.getKey() + ", tags = "
+ state.getTags());
if ((state.getTags() != null) && (state.getTags().size() > 0)) {
if (state.getNewKey() == null)
_context.sessionKeyManager().tagsDelivered(state.getTo().getPublicKey(), state.getKey(), state.getTags());
else
_context.sessionKeyManager().tagsDelivered(state.getTo().getPublicKey(), state.getNewKey(), state.getTags());
}
}
private void nackTags(MessageState state) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "nack tags for msgId " + state.getMessageId() + " / " + state.getNonce()
+ " key = " + state.getKey());
_context.sessionKeyManager().failTags(state.getTo().getPublicKey());
}
public void receiveStatus(int msgId, long nonce, int status) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
MessageState state = null;

View File

@ -28,9 +28,6 @@ class MessageStatusMessageHandler extends HandlerImpl {
public void handleMessage(I2CPMessage message, I2PSessionImpl session) {
boolean skipStatus = true;
if (I2PClient.PROP_RELIABILITY_GUARANTEED.equals(session.getOptions().getProperty(I2PClient.PROP_RELIABILITY,
I2PClient.PROP_RELIABILITY_BEST_EFFORT)))
skipStatus = false;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle message " + message);
MessageStatusMessage msg = (MessageStatusMessage) message;