Router: Implement ratchet-layer acks (proposal 144)

Store destination in outbound session
Allow sending null data through OCMOSJ for ratchet acks; omit data clove
Only call messageDeliveryStatusUpdate() for nonzero nonce
This commit is contained in:
zzz
2020-06-03 12:33:09 +00:00
parent 2af26f7d5b
commit 3f895d32dd
11 changed files with 231 additions and 36 deletions

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 3;
public final static long BUILD = 4;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -633,7 +633,7 @@ class ClientConnectionRunner {
*
* @param dest the client
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param messageNonce the client's ID for this message, greater than zero
* @param status see I2CP MessageStatusMessage for success/failure codes
*/
void updateMessageDeliveryStatus(Destination dest, MessageId id, long messageNonce, int status) {

View File

@ -669,7 +669,7 @@ class ClientManager {
/**
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param messageNonce the client's ID for this message, greater than zero
* @param status see I2CP MessageStatusMessage for success/failure codes
*/
public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long messageNonce, int status) {

View File

@ -191,7 +191,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
/**
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param messageNonce the client's ID for this message, greater than zero
* @param status see I2CP MessageStatusMessage for success/failure codes
*/
public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long messageNonce, int status) {

View File

@ -0,0 +1,60 @@
package net.i2p.router.crypto.ratchet;
import net.i2p.data.Destination;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.SessionConfig;
import net.i2p.router.ClientMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Send an empty message if the timer expires.
*
* This will be created for incoming NS, NSR,
* ACK request blocks, and forward next key blocks.
* The vast majority of these will be cancelled before firing,
* when streaming sends a response.
* This should only fire if streaming drops completely,
* and for certain datagram traffic patterns.
*
* @since 0.9.47
*/
class ACKTimer extends SimpleTimer2.TimedEvent {
private final RouterContext _context;
private final Log _log;
private final Destination _from, _to;
private static final long EXPIRATION = 60*1000;
/**
* Caller must schedule
*
* @param from local destination ACK will come from
* @param to remote destination ACK will go to
*
*/
public ACKTimer(RouterContext context, Destination from, Destination to) {
super(context.simpleTimer2());
_context = context;;
_log = context.logManager().getLog(ACKTimer.class);
_from = from;
_to = to;
}
public void timeReached() {
SessionConfig config = _context.clientManager().getClientSessionConfig(_from);
if (config == null) {
// Client gone
return;
}
long now = _context.clock().now();
long exp = now + EXPIRATION;
MessageId msgID = new MessageId();
// null payload, no nonce, no flags
ClientMessage cmsg = new ClientMessage(_to, null, config, _from, msgID, 0, exp, 0);
_context.clientMessagePool().add(cmsg, true);
if (_log.shouldInfo())
_log.info("Sent ratchet ack from " + _from.toBase32() + " to " + _to.toBase32());
}
}

View File

@ -443,7 +443,7 @@ public final class ECIESAEADEngine {
// tell the SKM
PublicKey alice = new PublicKey(EncType.ECIES_X25519, alicePK);
keyManager.createSession(alice, state, null);
keyManager.createSession(alice, null, state, null);
if (pc.cloveSet.isEmpty()) {
// this is legal
@ -455,8 +455,7 @@ public final class ECIESAEADEngine {
GarlicClove[] arr = new GarlicClove[num];
// msg id and expiration not checked in GarlicMessageReceiver
CloveSet rv = new CloveSet(pc.cloveSet.toArray(arr), Certificate.NULL_CERT, 0, pc.datetime);
// TODO
//setResponseTimer(alice, pc.cloveSet, keyManager);
setResponseTimerNS(alice, pc.cloveSet, keyManager);
return rv;
}
@ -597,8 +596,7 @@ public final class ECIESAEADEngine {
GarlicClove[] arr = new GarlicClove[num];
// msg id and expiration not checked in GarlicMessageReceiver
CloveSet rv = new CloveSet(pc.cloveSet.toArray(arr), Certificate.NULL_CERT, 0, pc.datetime);
// TODO
//setResponseTimer(bob, pc.cloveSet, keyManager);
setResponseTimer(bob, pc.cloveSet, keyManager);
return rv;
}
@ -647,13 +645,20 @@ public final class ECIESAEADEngine {
} catch (Exception e) {
throw new DataFormatException("ES payload error", e);
}
boolean shouldAck = false;
if (pc.nextKeys != null) {
for (NextSessionKey nextKey : pc.nextKeys) {
keyManager.nextKeyReceived(remote, nextKey);
if (!nextKey.isReverse())
shouldAck = true;
}
}
if (pc.ackRequested) {
keyManager.ackRequested(remote, key.getID(), nonce);
shouldAck = true;
}
if (shouldAck) {
setResponseTimer(remote, pc.cloveSet, keyManager);
}
if (pc.cloveSet.isEmpty()) {
// this is legal
@ -706,18 +711,18 @@ public final class ECIESAEADEngine {
* @return encrypted data or null on failure
*
*/
public byte[] encrypt(CloveSet cloves, PublicKey target, PrivateKey priv,
public byte[] encrypt(CloveSet cloves, PublicKey target, Destination to, PrivateKey priv,
RatchetSKM keyManager,
ReplyCallback callback) {
try {
return x_encrypt(cloves, target, priv, keyManager, callback);
return x_encrypt(cloves, target, to, priv, keyManager, callback);
} catch (Exception e) {
_log.error("ECIES encrypt error", e);
return null;
}
}
private byte[] x_encrypt(CloveSet cloves, PublicKey target, PrivateKey priv,
private byte[] x_encrypt(CloveSet cloves, PublicKey target, Destination to, PrivateKey priv,
RatchetSKM keyManager,
ReplyCallback callback) {
if (target.getType() != EncType.ECIES_X25519)
@ -732,7 +737,7 @@ public final class ECIESAEADEngine {
if (re == null) {
if (_log.shouldDebug())
_log.debug("Encrypting as NS to " + target);
return encryptNewSession(cloves, target, priv, keyManager, callback);
return encryptNewSession(cloves, target, to, priv, keyManager, callback);
}
HandshakeState state = re.key.getHandshakeState();
@ -772,7 +777,7 @@ public final class ECIESAEADEngine {
* @param callback may be null
* @return encrypted data or null on failure
*/
private byte[] encryptNewSession(CloveSet cloves, PublicKey target, PrivateKey priv,
private byte[] encryptNewSession(CloveSet cloves, PublicKey target, Destination to, PrivateKey priv,
RatchetSKM keyManager,
ReplyCallback callback) {
HandshakeState state;
@ -813,7 +818,7 @@ public final class ECIESAEADEngine {
_log.debug("Elligator2 encoded eph. key: " + Base64.encode(enc, 0, 32));
// tell the SKM
keyManager.createSession(target, state, callback);
keyManager.createSession(target, to, state, callback);
return enc;
}
@ -1222,10 +1227,11 @@ public final class ECIESAEADEngine {
/*
* Set a timer for a ratchet-layer reply if the application does not respond.
* NS only. CloveSet must include a LS for validation.
*
* @since 0.9.46
*/
private void setResponseTimer(PublicKey from, List<GarlicClove> cloveSet, RatchetSKM skm) {
private void setResponseTimerNS(PublicKey from, List<GarlicClove> cloveSet, RatchetSKM skm) {
for (GarlicClove clove : cloveSet) {
I2NPMessage msg = clove.getData();
if (msg.getType() != DatabaseStoreMessage.MESSAGE_TYPE)
@ -1246,13 +1252,39 @@ public final class ECIESAEADEngine {
Destination d = ls2.getDestination();
if (_log.shouldInfo())
_log.info("Validated NS sender: " + d.toBase32());
// TODO
Destination us = skm.getDestination();
ACKTimer ack = new ACKTimer(_context, us, d);
if (skm.registerTimer(from, d, ack)) {
ack.schedule(1000);
}
return;
}
if (_log.shouldInfo())
_log.info("Unvalidated NS sender: " + from);
}
/*
* Set a timer for a ratchet-layer reply if the application does not respond.
* NSR/ES only.
*
* @since 0.9.47
*/
private void setResponseTimer(PublicKey from, List<GarlicClove> cloveSet, RatchetSKM skm) {
Destination us = skm.getDestination();
Destination d = skm.getDestination(from);
if (d != null) {
ACKTimer ack = new ACKTimer(_context, us, d);
if (skm.registerTimer(from, null, ack)) {
ack.schedule(1000);
}
} else {
// we didn't get a LS in the original NS, but maybe we have one now
if (_log.shouldInfo())
_log.info("No full dest to ack to, looking for LS from: " + from);
setResponseTimerNS(from, cloveSet, skm);
}
}
/****
public static void main(String args[]) {

View File

@ -184,13 +184,14 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
* For inbound (NS rcvd), if no other pending outbound sessions, creates one
* and returns true, or false if one already exists.
*
* @param d null if unknown
* @param callback null for inbound, may be null for outbound
*/
boolean createSession(PublicKey target, HandshakeState state, ReplyCallback callback) {
boolean createSession(PublicKey target, Destination d, HandshakeState state, ReplyCallback callback) {
EncType type = target.getType();
if (type != EncType.ECIES_X25519)
throw new IllegalArgumentException("Bad public key type " + type);
OutboundSession sess = new OutboundSession(target, null, state, callback);
OutboundSession sess = new OutboundSession(target, d, null, state, callback);
boolean isInbound = state.getRole() == HandshakeState.RESPONDER;
if (isInbound) {
// we are Bob, NS received
@ -309,7 +310,7 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
/**
* @since 0.9.46
*/
public void nextKeyReceived(PublicKey target, NextSessionKey key) {
void nextKeyReceived(PublicKey target, NextSessionKey key) {
OutboundSession sess = getSession(target);
if (sess == null) {
if (_log.shouldWarn())
@ -319,6 +320,35 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
sess.nextKeyReceived(key);
}
/**
* Side effect - binds this session to the supplied destination.
*
* @param the far-end Destination for this PublicKey if known, or null
* @return true if registered
* @since 0.9.47
*/
boolean registerTimer(PublicKey target, Destination d, SimpleTimer2.TimedEvent timer) {
OutboundSession sess = getSession(target);
if (sess == null) {
if (_log.shouldWarn())
_log.warn("registerTimer() but no session found for " + target);
return false;
}
return sess.registerTimer(d, timer);
}
/**
* @return the far-end Destination for this PublicKey, or null
* @since 0.9.47
*/
Destination getDestination(PublicKey target) {
OutboundSession sess = getSession(target);
if (sess != null) {
return sess.getDestination();
}
return null;
}
/**
* @throws UnsupportedOperationException always
*/
@ -888,6 +918,8 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
private RatchetTagSet _tagSet;
private final ConcurrentHashMap<Integer, ReplyCallback> _callbacks;
private final LinkedBlockingQueue<Integer> _acksToSend;
private SimpleTimer2.TimedEvent _ackTimer;
private Destination _destination;
/**
* Set to true after first tagset is acked.
* Upon repeated failures, we may revert back to false.
@ -924,11 +956,13 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
private static final int MAX_SEND_REVERSE_KEY = 64;
/**
* @param d may be null
* @param key may be null
* @param callback may be null. Always null for IB.
*/
public OutboundSession(PublicKey target, SessionKey key, HandshakeState state, ReplyCallback callback) {
public OutboundSession(PublicKey target, Destination d, SessionKey key, HandshakeState state, ReplyCallback callback) {
_target = target;
_destination = d;
_currentKey = key;
_NScallback = callback;
_established = _context.clock().now();
@ -1376,6 +1410,13 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
}
synchronized (_unackedTagSets) {
if (_tagSet != null) {
if (_ackTimer != null) {
// cancel all ratchet-layer acks
_ackTimer.cancel();
_ackTimer = null;
//if (_log.shouldDebug())
// _log.debug("Cancelled the ack timer");
}
synchronized(_tagSet) {
// use even if expired, this will reset the expiration
RatchetSessionTag tag = _tagSet.consumeNext();
@ -1396,6 +1437,41 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
return null;
}
/**
* A timer that we will cancel when we send someting.
* Side effect - binds this session to the supplied destination.
*
* @param d the far-end Destination for this PublicKey if known, or null
* @return true if registered
* @since 0.9.47
*/
public boolean registerTimer(Destination d, SimpleTimer2.TimedEvent timer) {
synchronized (_unackedTagSets) {
if (_ackTimer != null)
return false;
if (d != null) {
if (_destination == null)
_destination = d;
else if (_log.shouldWarn() && !_destination.equals(d))
_log.warn("Destination mismatch? was: " + _destination.toBase32() + " now: " + d.toBase32());
}
_ackTimer = timer;
if (_log.shouldDebug())
_log.debug("Registered an ack timer to: " + (_destination != null ? _destination.toBase32() : _target.toString()));
}
return true;
}
/**
* @return the far-end Destination for this PublicKey, or null
* @since 0.9.47
*/
public Destination getDestination() {
synchronized (_unackedTagSets) {
return _destination;
}
}
/** @return the total number of tags in acked RatchetTagSets */
public int availableTags() {
long now = _context.clock().now();

View File

@ -19,6 +19,7 @@ import net.i2p.crypto.SessionKeyManager;
import net.i2p.data.Certificate;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.PrivateKey;
import net.i2p.data.PublicKey;
@ -283,7 +284,7 @@ public class GarlicMessageBuilder {
* @since 0.9.44
*/
static GarlicMessage buildECIESMessage(RouterContext ctx, GarlicConfig config,
PublicKey target, Hash from, SessionKeyManager skm,
PublicKey target, Hash from, Destination to, SessionKeyManager skm,
ReplyCallback callback) {
PublicKey key = config.getRecipientPublicKey();
if (key.getType() != EncType.ECIES_X25519)
@ -314,7 +315,7 @@ public class GarlicMessageBuilder {
log.warn("No SKM for " + from.toBase32());
return null;
}
byte encData[] = ctx.eciesEngine().encrypt(cloveSet, target, priv, rskm, callback);
byte encData[] = ctx.eciesEngine().encrypt(cloveSet, target, to, priv, rskm, callback);
if (encData == null) {
if (log.shouldWarn())
log.warn("Encrypt fail for " + from.toBase32());

View File

@ -101,6 +101,7 @@ class OutboundClientMessageJobHelper {
*
* This is called from OCMOSJ
*
* @param dataClove may be null for ECIES-layer ack
* @param tagsToSendOverride if &gt; 0, use this instead of skm's default
* @param lowTagsOverride if &gt; 0, use this instead of skm's default
* @param wrappedKey non-null with null data,
@ -130,7 +131,7 @@ class OutboundClientMessageJobHelper {
return null;
GarlicMessage msg;
if (isECIES) {
msg = GarlicMessageBuilder.buildECIESMessage(ctx, config, recipientPK, from, skm, callback);
msg = GarlicMessageBuilder.buildECIESMessage(ctx, config, recipientPK, from, dest, skm, callback);
} else {
// no use sending tags unless we have a reply token set up already
int tagsToSend = replyToken >= 0 ? (tagsToSendOverride > 0 ? tagsToSendOverride : skm.getTagsToSend()) : 0;
@ -145,7 +146,7 @@ class OutboundClientMessageJobHelper {
* Make the top-level config, with a data clove, an optional ack clove, and
* an optional leaseset clove.
*
* @param dataClove non-null
* @param dataClove may be null for ECIES-layer ack
* @param replyTunnel non-null if requireAck is true or bundledReplyLeaseSet is non-null
* @param requireAck if true, bundle replyToken in an ack clove
* @param bundledReplyLeaseSet may be null; if non-null, put it in a clove
@ -183,7 +184,8 @@ class OutboundClientMessageJobHelper {
// As of 0.9.2, since the receiver processes them in-order,
// put data clove last to speed up the ack,
// and get the leaseset stored before handling the data
config.addClove(dataClove);
if (dataClove != null)
config.addClove(dataClove);
config.setRecipientPublicKey(recipientPK);

View File

@ -187,7 +187,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private static final int REPLY_REQUEST_INTERVAL = 60*1000;
/**
* Send the sucker
* Send it.
*
* @param msg may have a null payload for ratchet-layer acks
*/
public OutboundClientMessageOneShotJob(RouterContext ctx, OutboundCache cache, ClientMessage msg) {
super(ctx);
@ -198,7 +200,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
_clientMessage = msg;
_clientMessageId = msg.getMessageId();
_clientMessageSize = msg.getPayload().getSize();
Payload payload = msg.getPayload();
_clientMessageSize = (payload != null) ? payload.getSize() : 0;
_from = msg.getFromDestination();
_to = msg.getDestination();
Hash toHash = _to.calculateHash();
@ -652,10 +655,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
token = -1;
}
PayloadGarlicConfig clove = buildClove();
if (clove == null) {
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION);
return;
PayloadGarlicConfig clove;
if (_clientMessageSize > 0) {
clove = buildClove();
if (clove == null) {
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION);
return;
}
} else {
// ratchet-layer acks
clove = null;
}
SessionKey sessKey = new SessionKey();
@ -941,8 +950,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
clearCaches();
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId,
_clientMessage.getMessageNonce(), status);
long nonce = _clientMessage.getMessageNonce();
if (nonce > 0)
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, nonce, status);
getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
}
@ -1112,8 +1122,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
//long dataMsgId = _cloveId; // fake ID 99999
getContext().messageHistory().sendPayloadMessage(99999, true, sendTime);
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, _clientMessage.getMessageNonce(),
MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
long nonce = _clientMessage.getMessageNonce();
if (nonce > 0)
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, nonce,
MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
// unused
//_lease.setNumSuccess(_lease.getNumSuccess()+1);