always send as a guaranteed message (but block as before) - this lets
udp-esque users get transparent sessionKey/sessionTag management. we'll probably refactor mode=guaranteed/best_effort into two concepts later, dealing with blocking and encryption seperately. logging and formatting fixes
This commit is contained in:
@ -67,10 +67,15 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent)
|
public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent)
|
||||||
throws I2PSessionException {
|
throws I2PSessionException {
|
||||||
if (isClosed()) throw new I2PSessionException("Already closed");
|
if (isClosed()) throw new I2PSessionException("Already closed");
|
||||||
if (SHOULD_COMPRESS) payload = DataHelper.compress(payload);
|
if (SHOULD_COMPRESS) payload = DataHelper.compress(payload);
|
||||||
if (isGuaranteed()) {
|
// we always send as guaranteed (so we get the session keys/tags acked),
|
||||||
|
// but only block until the appropriate event has been reached (guaranteed
|
||||||
|
// success or accepted). we may want to break this out into a seperate
|
||||||
|
// attribute, allowing both nonblocking sends and transparently managed keys,
|
||||||
|
// as well as the nonblocking sends with application managed keys. Later.
|
||||||
|
if (isGuaranteed() || true) {
|
||||||
return sendGuaranteed(dest, payload, keyUsed, tagsSent);
|
return sendGuaranteed(dest, payload, keyUsed, tagsSent);
|
||||||
} else {
|
} else {
|
||||||
return sendBestEffort(dest, payload, keyUsed, tagsSent);
|
return sendBestEffort(dest, payload, keyUsed, tagsSent);
|
||||||
@ -89,7 +94,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
|
private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
|
||||||
throws I2PSessionException {
|
throws I2PSessionException {
|
||||||
SessionKey key = SessionKeyManager.getInstance().getCurrentKey(dest.getPublicKey());
|
SessionKey key = SessionKeyManager.getInstance().getCurrentKey(dest.getPublicKey());
|
||||||
if (key == null) key = SessionKeyManager.getInstance().createSession(dest.getPublicKey());
|
if (key == null) key = SessionKeyManager.getInstance().createSession(dest.getPublicKey());
|
||||||
SessionTag tag = SessionKeyManager.getInstance().consumeNextAvailableTag(dest.getPublicKey(), key);
|
SessionTag tag = SessionKeyManager.getInstance().consumeNextAvailableTag(dest.getPublicKey(), key);
|
||||||
@ -129,8 +134,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
_sendingStates.add(state);
|
_sendingStates.add(state);
|
||||||
}
|
}
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Adding sending state " + state.getMessageId() + " / "
|
_log.debug("Adding sending state " + state.getMessageId() + " / "
|
||||||
+ state.getNonce());
|
+ state.getNonce());
|
||||||
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
|
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
|
||||||
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, Clock.getInstance().now() + getTimeout());
|
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, Clock.getInstance().now() + getTimeout());
|
||||||
synchronized (_sendingStates) {
|
synchronized (_sendingStates) {
|
||||||
@ -138,19 +143,18 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
}
|
}
|
||||||
boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
|
boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("After waitFor sending state " + state.getMessageId().getMessageId()
|
_log.debug("After waitFor sending state " + state.getMessageId().getMessageId()
|
||||||
+ " / " + state.getNonce() + " found = " + found);
|
+ " / " + state.getNonce() + " found = " + found);
|
||||||
if (found) {
|
if (found) {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message sent after " + state.getElapsed() + "ms with "
|
_log.info("Message sent after " + state.getElapsed() + "ms with "
|
||||||
+ payload.length + " bytes");
|
+ payload.length + " bytes");
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message send failed after " + state.getElapsed() + "ms with "
|
_log.info("Message send failed after " + state.getElapsed() + "ms with "
|
||||||
+ payload.length + " bytes");
|
+ payload.length + " bytes");
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log
|
_log.error("Never received *accepted* from the router! dropping and reconnecting");
|
||||||
.error("Never received *accepted* from the router! dropping and reconnecting");
|
|
||||||
disconnect();
|
disconnect();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -158,7 +162,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean sendGuaranteed(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
|
private boolean sendGuaranteed(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
|
||||||
throws I2PSessionException {
|
throws I2PSessionException {
|
||||||
SessionKey key = SessionKeyManager.getInstance().getCurrentKey(dest.getPublicKey());
|
SessionKey key = SessionKeyManager.getInstance().getCurrentKey(dest.getPublicKey());
|
||||||
if (key == null) key = SessionKeyManager.getInstance().createSession(dest.getPublicKey());
|
if (key == null) key = SessionKeyManager.getInstance().createSession(dest.getPublicKey());
|
||||||
SessionTag tag = SessionKeyManager.getInstance().consumeNextAvailableTag(dest.getPublicKey(), key);
|
SessionTag tag = SessionKeyManager.getInstance().consumeNextAvailableTag(dest.getPublicKey(), key);
|
||||||
@ -198,10 +202,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
_sendingStates.add(state);
|
_sendingStates.add(state);
|
||||||
}
|
}
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Adding sending state " + state.getMessageId() + " / "
|
_log.debug("Adding sending state " + state.getMessageId() + " / "
|
||||||
+ state.getNonce());
|
+ state.getNonce());
|
||||||
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
|
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
|
||||||
state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, Clock.getInstance().now() + SEND_TIMEOUT);
|
if (isGuaranteed())
|
||||||
|
state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, Clock.getInstance().now() + SEND_TIMEOUT);
|
||||||
|
else
|
||||||
|
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, Clock.getInstance().now() + SEND_TIMEOUT);
|
||||||
synchronized (_sendingStates) {
|
synchronized (_sendingStates) {
|
||||||
_sendingStates.remove(state);
|
_sendingStates.remove(state);
|
||||||
}
|
}
|
||||||
@ -210,28 +217,27 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
|
|
||||||
if ((!accepted) || (state.getMessageId() == null)) {
|
if ((!accepted) || (state.getMessageId() == null)) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("State with nonce " + state.getNonce()
|
_log.error("State with nonce " + state.getNonce()
|
||||||
+ " was not accepted? (no messageId!!)");
|
+ " was not accepted? (no messageId!!)");
|
||||||
nackTags(state);
|
nackTags(state);
|
||||||
if (_log.shouldLog(Log.CRIT))
|
if (_log.shouldLog(Log.CRIT))
|
||||||
_log.log(Log.CRIT,
|
_log.log(Log.CRIT, "Disconnecting/reconnecting because we never were accepted!");
|
||||||
"Disconnecting/reconnecting because we never were accepted!");
|
|
||||||
disconnect();
|
disconnect();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("After waitFor sending state " + state.getMessageId().getMessageId()
|
_log.debug("After waitFor sending state " + state.getMessageId().getMessageId()
|
||||||
+ " / " + state.getNonce() + " found = " + found);
|
+ " / " + state.getNonce() + " found = " + found);
|
||||||
if (found) {
|
if (found) {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message sent after " + state.getElapsed() + "ms with "
|
_log.info("Message sent after " + state.getElapsed() + "ms with "
|
||||||
+ payload.length + " bytes");
|
+ payload.length + " bytes");
|
||||||
ackTags(state);
|
ackTags(state);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message send failed after " + state.getElapsed() + "ms with "
|
_log.info("Message send failed after " + state.getElapsed() + "ms with "
|
||||||
+ payload.length + " bytes");
|
+ payload.length + " bytes");
|
||||||
nackTags(state);
|
nackTags(state);
|
||||||
}
|
}
|
||||||
return found;
|
return found;
|
||||||
@ -239,23 +245,21 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
|
|
||||||
private void ackTags(MessageState state) {
|
private void ackTags(MessageState state) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("ack tags for msgId " + state.getMessageId() + " / "
|
_log.debug("ack tags for msgId " + state.getMessageId() + " / "
|
||||||
+ state.getNonce() + " key = " + state.getKey() + ", tags = "
|
+ state.getNonce() + " key = " + state.getKey() + ", tags = "
|
||||||
+ state.getTags());
|
+ state.getTags());
|
||||||
if ((state.getTags() != null) && (state.getTags().size() > 0)) {
|
if ((state.getTags() != null) && (state.getTags().size() > 0)) {
|
||||||
if (state.getNewKey() == null)
|
if (state.getNewKey() == null)
|
||||||
SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getKey(),
|
SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getKey(), state.getTags());
|
||||||
state.getTags());
|
|
||||||
else
|
else
|
||||||
SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getNewKey(),
|
SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getNewKey(), state.getTags());
|
||||||
state.getTags());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void nackTags(MessageState state) {
|
private void nackTags(MessageState state) {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("nack tags for msgId " + state.getMessageId() + " / " + state.getNonce()
|
_log.info("nack tags for msgId " + state.getMessageId() + " / " + state.getNonce()
|
||||||
+ " key = " + state.getKey());
|
+ " key = " + state.getKey());
|
||||||
SessionKeyManager.getInstance().failTags(state.getTo().getPublicKey());
|
SessionKeyManager.getInstance().failTags(state.getTo().getPublicKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -288,8 +292,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
|||||||
state.receive(status);
|
state.receive(status);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("No matching state for messageId " + msgId + " / " + nonce
|
_log.info("No matching state for messageId " + msgId + " / " + nonce
|
||||||
+ " w/ status = " + status);
|
+ " w/ status = " + status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user