forked from I2P_Developers/i2p.i2p
Ratchet: More updates
- Reply callback from ratchet to OCMOSJ (ES TODO) - Store key ID in tagset (prep for next key) - Move debug tagset ID from engine to tagset - OCMOSJ minor cleanups
This commit is contained in:
@ -304,7 +304,7 @@ public final class ECIESAEADEngine {
|
||||
|
||||
// tell the SKM
|
||||
PublicKey bob = new PublicKey(EncType.ECIES_X25519, bobPK);
|
||||
keyManager.createSession(bob, state);
|
||||
keyManager.createSession(bob, state, null);
|
||||
|
||||
if (pc.cloveSet.isEmpty()) {
|
||||
if (_log.shouldWarn())
|
||||
@ -430,7 +430,7 @@ public final class ECIESAEADEngine {
|
||||
|
||||
// tell the SKM
|
||||
PublicKey bob = new PublicKey(EncType.ECIES_X25519, bobPK);
|
||||
keyManager.updateSession(bob, oldState, state);
|
||||
keyManager.updateSession(bob, oldState, state, null);
|
||||
|
||||
if (pc.cloveSet.isEmpty()) {
|
||||
if (_log.shouldWarn())
|
||||
@ -558,13 +558,15 @@ public final class ECIESAEADEngine {
|
||||
* @param target public key to which the data should be encrypted.
|
||||
* @param priv local private key to encrypt with, from the leaseset
|
||||
* @param replyDI non-null to request an ack, or null
|
||||
* @param callback may be null
|
||||
* @return encrypted data or null on failure
|
||||
*
|
||||
*/
|
||||
public byte[] encrypt(CloveSet cloves, PublicKey target, PrivateKey priv,
|
||||
RatchetSKM keyManager, DeliveryInstructions replyDI) {
|
||||
RatchetSKM keyManager, DeliveryInstructions replyDI,
|
||||
ReplyCallback callback) {
|
||||
try {
|
||||
return x_encrypt(cloves, target, priv, keyManager, replyDI);
|
||||
return x_encrypt(cloves, target, priv, keyManager, replyDI, callback);
|
||||
} catch (Exception e) {
|
||||
_log.error("ECIES encrypt error", e);
|
||||
return null;
|
||||
@ -572,7 +574,8 @@ public final class ECIESAEADEngine {
|
||||
}
|
||||
|
||||
private byte[] x_encrypt(CloveSet cloves, PublicKey target, PrivateKey priv,
|
||||
RatchetSKM keyManager, DeliveryInstructions replyDI) {
|
||||
RatchetSKM keyManager, DeliveryInstructions replyDI,
|
||||
ReplyCallback callback) {
|
||||
if (target.getType() != EncType.ECIES_X25519)
|
||||
throw new IllegalArgumentException();
|
||||
if (Arrays.equals(target.getData(), NULLPK)) {
|
||||
@ -586,7 +589,7 @@ public final class ECIESAEADEngine {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Encrypting as NS to " + target);
|
||||
// no ack in NS
|
||||
return encryptNewSession(cloves, target, priv, keyManager, null);
|
||||
return encryptNewSession(cloves, target, priv, keyManager, null, callback);
|
||||
}
|
||||
|
||||
HandshakeState state = re.key.getHandshakeState();
|
||||
@ -601,11 +604,11 @@ public final class ECIESAEADEngine {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Encrypting as NSR to " + target + " with tag " + re.tag.toBase64());
|
||||
// no ack in NSR
|
||||
return encryptNewSessionReply(cloves, target, state, re.tag, keyManager, null);
|
||||
return encryptNewSessionReply(cloves, target, state, re.tag, keyManager, null, callback);
|
||||
}
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Encrypting as ES to " + target + " with key " + re.key + " and tag " + re.tag.toBase64());
|
||||
byte rv[] = encryptExistingSession(cloves, target, re, replyDI);
|
||||
byte rv[] = encryptExistingSession(cloves, target, re, replyDI, callback);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -625,10 +628,12 @@ public final class ECIESAEADEngine {
|
||||
* </pre>
|
||||
*
|
||||
* @param replyDI non-null to request an ack, or null
|
||||
* @param callback may be null
|
||||
* @return encrypted data or null on failure
|
||||
*/
|
||||
private byte[] encryptNewSession(CloveSet cloves, PublicKey target, PrivateKey priv,
|
||||
RatchetSKM keyManager, DeliveryInstructions replyDI) {
|
||||
RatchetSKM keyManager, DeliveryInstructions replyDI,
|
||||
ReplyCallback callback) {
|
||||
HandshakeState state;
|
||||
try {
|
||||
state = new HandshakeState(HandshakeState.PATTERN_ID_IK, HandshakeState.INITIATOR, _edhThread);
|
||||
@ -667,7 +672,7 @@ public final class ECIESAEADEngine {
|
||||
_log.debug("Elligator2 encoded eph. key: " + Base64.encode(enc, 0, 32));
|
||||
|
||||
// tell the SKM
|
||||
keyManager.createSession(target, state);
|
||||
keyManager.createSession(target, state, callback);
|
||||
return enc;
|
||||
}
|
||||
|
||||
@ -689,11 +694,12 @@ public final class ECIESAEADEngine {
|
||||
*
|
||||
* @param state must have already been cloned
|
||||
* @param replyDI non-null to request an ack, or null
|
||||
* @param callback may be null
|
||||
* @return encrypted data or null on failure
|
||||
*/
|
||||
private byte[] encryptNewSessionReply(CloveSet cloves, PublicKey target, HandshakeState state,
|
||||
RatchetSessionTag currentTag, RatchetSKM keyManager,
|
||||
DeliveryInstructions replyDI) {
|
||||
DeliveryInstructions replyDI, ReplyCallback callback) {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("State before encrypt new session reply: " + state);
|
||||
byte[] tag = currentTag.getData();
|
||||
@ -746,7 +752,7 @@ public final class ECIESAEADEngine {
|
||||
return null;
|
||||
}
|
||||
// tell the SKM
|
||||
keyManager.updateSession(target, null, state);
|
||||
keyManager.updateSession(target, null, state, callback);
|
||||
|
||||
return enc;
|
||||
}
|
||||
@ -765,7 +771,7 @@ public final class ECIESAEADEngine {
|
||||
* @return encrypted data or null on failure
|
||||
*/
|
||||
private byte[] encryptExistingSession(CloveSet cloves, PublicKey target, RatchetEntry re,
|
||||
DeliveryInstructions replyDI) {
|
||||
DeliveryInstructions replyDI, ReplyCallback callback) {
|
||||
//
|
||||
if (ACKREQ_IN_ES && replyDI == null)
|
||||
replyDI = new DeliveryInstructions();
|
||||
@ -774,6 +780,9 @@ public final class ECIESAEADEngine {
|
||||
SessionKeyAndNonce key = re.key;
|
||||
byte encr[] = encryptAEADBlock(rawTag, payload, key, key.getNonce());
|
||||
System.arraycopy(rawTag, 0, encr, 0, TAGLEN);
|
||||
if (callback != null) {
|
||||
// TODO
|
||||
}
|
||||
return encr;
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.southernstorm.noise.protocol.HandshakeState;
|
||||
|
||||
@ -48,9 +47,6 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
private final ConcurrentHashMap<RatchetSessionTag, RatchetTagSet> _inboundTagSets;
|
||||
protected final I2PAppContext _context;
|
||||
private volatile boolean _alive;
|
||||
/** for debugging */
|
||||
private final AtomicInteger _rcvTagSetID = new AtomicInteger();
|
||||
private final AtomicInteger _sentTagSetID = new AtomicInteger();
|
||||
private final HKDF _hkdf;
|
||||
|
||||
/**
|
||||
@ -168,15 +164,16 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
* For inbound (NS rcvd), if no other pending outbound sessions, creates one
|
||||
* and returns true, or false if one already exists.
|
||||
*
|
||||
* @param callback null for inbound, may be null for outbound
|
||||
*/
|
||||
boolean createSession(PublicKey target, HandshakeState state) {
|
||||
boolean createSession(PublicKey target, HandshakeState state, ReplyCallback callback) {
|
||||
EncType type = target.getType();
|
||||
if (type != EncType.ECIES_X25519)
|
||||
throw new IllegalArgumentException("Bad public key type " + type);
|
||||
OutboundSession sess = new OutboundSession(target, null, state, callback);
|
||||
boolean isInbound = state.getRole() == HandshakeState.RESPONDER;
|
||||
if (isInbound) {
|
||||
// we are Bob, NS received
|
||||
OutboundSession sess = new OutboundSession(target, null, state);
|
||||
boolean rv = addSession(sess, true);
|
||||
if (_log.shouldInfo()) {
|
||||
if (rv)
|
||||
@ -187,7 +184,6 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
return rv;
|
||||
} else {
|
||||
// we are Alice, NS sent
|
||||
OutboundSession sess = new OutboundSession(target, null, state);
|
||||
synchronized (_pendingOutboundSessions) {
|
||||
List<OutboundSession> pending = _pendingOutboundSessions.get(target);
|
||||
if (pending != null) {
|
||||
@ -215,7 +211,7 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
* @param oldState null for inbound, pre-clone for outbound
|
||||
* @return true if this was the first NSR received
|
||||
*/
|
||||
boolean updateSession(PublicKey target, HandshakeState oldState, HandshakeState state) {
|
||||
boolean updateSession(PublicKey target, HandshakeState oldState, HandshakeState state, ReplyCallback callback) {
|
||||
EncType type = target.getType();
|
||||
if (type != EncType.ECIES_X25519)
|
||||
throw new IllegalArgumentException("Bad public key type " + type);
|
||||
@ -231,7 +227,7 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
// TODO can we recover?
|
||||
return false;
|
||||
}
|
||||
sess.updateSession(state);
|
||||
sess.updateSession(state, callback);
|
||||
} else {
|
||||
// we are Alice, NSR received
|
||||
if (_log.shouldInfo())
|
||||
@ -251,7 +247,7 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
if (oldState.equals(pstate)) {
|
||||
if (!found) {
|
||||
found = true;
|
||||
sess.updateSession(state);
|
||||
sess.updateSession(state, null);
|
||||
boolean ok = addSession(sess, false);
|
||||
if (_log.shouldDebug()) {
|
||||
if (ok)
|
||||
@ -407,13 +403,13 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
if (sess == null) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("No session for delivered RatchetTagSet to target: " + toString(target));
|
||||
///////////
|
||||
// TODO
|
||||
createSession(target, key);
|
||||
} else {
|
||||
sess.setCurrentKey(key);
|
||||
}
|
||||
///////////
|
||||
RatchetTagSet set = new RatchetTagSet(_hkdf, key, key, _context.clock().now(), _sentTagSetID.incrementAndGet());
|
||||
// TODO
|
||||
RatchetTagSet set = new RatchetTagSet(_hkdf, key, key, _context.clock().now(), 0);
|
||||
sess.addTags(set);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Tags delivered: " + set +
|
||||
@ -812,6 +808,8 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
private class OutboundSession {
|
||||
private final PublicKey _target;
|
||||
private final HandshakeState _state;
|
||||
private final ReplyCallback _NScallback;
|
||||
private ReplyCallback _NSRcallback;
|
||||
private SessionKey _currentKey;
|
||||
private final long _established;
|
||||
private long _lastUsed;
|
||||
@ -843,10 +841,17 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
private int _consecutiveFailures;
|
||||
|
||||
private static final int MAX_FAILS = 2;
|
||||
private static final int DEBUG_OB_NSR = 0x10001;
|
||||
private static final int DEBUG_IB_NSR = 0x10002;
|
||||
|
||||
public OutboundSession(PublicKey target, SessionKey key, HandshakeState state) {
|
||||
/**
|
||||
* @param key may be null
|
||||
* @param callback may be null. Always null for IB.
|
||||
*/
|
||||
public OutboundSession(PublicKey target, SessionKey key, HandshakeState state, ReplyCallback callback) {
|
||||
_target = target;
|
||||
_currentKey = key;
|
||||
_NScallback = callback;
|
||||
_established = _context.clock().now();
|
||||
_lastUsed = _established;
|
||||
_unackedTagSets = new HashSet<RatchetTagSet>(4);
|
||||
@ -863,7 +868,7 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
// This is an INBOUND NS, we make an OUTBOUND tagset for the NSR
|
||||
RatchetTagSet tagset = new RatchetTagSet(_hkdf, state,
|
||||
rk, tk,
|
||||
_established, _sentTagSetID.getAndIncrement());
|
||||
_established, DEBUG_OB_NSR);
|
||||
_tagSets.add(tagset);
|
||||
_state = null;
|
||||
if (_log.shouldDebug())
|
||||
@ -873,7 +878,7 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
// This is an OUTBOUND NS, we make an INBOUND tagset for the NSR
|
||||
RatchetTagSet tagset = new RatchetTagSet(_hkdf, RatchetSKM.this, state,
|
||||
rk, tk,
|
||||
_established, _rcvTagSetID.getAndIncrement(),
|
||||
_established, DEBUG_IB_NSR,
|
||||
MIN_RCV_WINDOW_NSR, MAX_RCV_WINDOW_NSR);
|
||||
// store the state so we can find the right session when we receive the NSR
|
||||
_state = state;
|
||||
@ -888,8 +893,9 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
* For inbound (NSR sent by Bob), sets up inbound ES tagset.
|
||||
*
|
||||
* @param state current state
|
||||
* @param callback only for inbound (NSR sent by Bob), may be null
|
||||
*/
|
||||
void updateSession(HandshakeState state) {
|
||||
void updateSession(HandshakeState state, ReplyCallback callback) {
|
||||
byte[] ck = state.getChainingKey();
|
||||
byte[] k_ab = new byte[32];
|
||||
byte[] k_ba = new byte[32];
|
||||
@ -901,26 +907,27 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
// We are Bob
|
||||
// This is an OUTBOUND NSR, we make an INBOUND tagset for ES
|
||||
RatchetTagSet tagset_ab = new RatchetTagSet(_hkdf, RatchetSKM.this, _target, rk, new SessionKey(k_ab),
|
||||
now, _rcvTagSetID.getAndIncrement(),
|
||||
now, 0,
|
||||
MIN_RCV_WINDOW_ES, MAX_RCV_WINDOW_ES);
|
||||
// and a pending outbound one
|
||||
RatchetTagSet tagset_ba = new RatchetTagSet(_hkdf, rk, new SessionKey(k_ba),
|
||||
now, _sentTagSetID.getAndIncrement());
|
||||
now, 0);
|
||||
if (_log.shouldDebug()) {
|
||||
_log.debug("Update IB Session, rk = " + rk + " tk = " + Base64.encode(k_ab) + " ES tagset: " + tagset_ab);
|
||||
_log.debug("Pending OB Session, rk = " + rk + " tk = " + Base64.encode(k_ba) + " ES tagset: " + tagset_ba);
|
||||
}
|
||||
synchronized (_tagSets) {
|
||||
_unackedTagSets.add(tagset_ba);
|
||||
_NSRcallback = callback;
|
||||
}
|
||||
} else {
|
||||
// We are Alice
|
||||
// This is an INBOUND NSR, we make an OUTBOUND tagset for ES
|
||||
RatchetTagSet tagset_ab = new RatchetTagSet(_hkdf, rk, new SessionKey(k_ab),
|
||||
now, _sentTagSetID.getAndIncrement());
|
||||
now, 0);
|
||||
// and an inbound one
|
||||
RatchetTagSet tagset_ba = new RatchetTagSet(_hkdf, RatchetSKM.this, _target, rk, new SessionKey(k_ba),
|
||||
now, _rcvTagSetID.getAndIncrement(),
|
||||
now, 0,
|
||||
MIN_RCV_WINDOW_ES, MAX_RCV_WINDOW_ES);
|
||||
if (_log.shouldDebug()) {
|
||||
_log.debug("Update OB Session, rk = " + rk + " tk = " + Base64.encode(k_ab) + " ES tagset: " + tagset_ab);
|
||||
@ -932,6 +939,9 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
}
|
||||
// We can't destroy the original state, as more NSRs may come in
|
||||
//_state.destroy();
|
||||
// Bob received the NS, call the callback
|
||||
if (_NScallback != null)
|
||||
_NScallback.onReply();
|
||||
}
|
||||
// kills the keys for future NSRs
|
||||
//state.destroy();
|
||||
@ -955,6 +965,10 @@ public class RatchetSKM extends SessionKeyManager implements SessionTagListener
|
||||
_unackedTagSets.clear();
|
||||
_tagSets.clear();
|
||||
_tagSets.add(obSet);
|
||||
if (_NSRcallback != null) {
|
||||
_NSRcallback.onReply();
|
||||
_NSRcallback = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.southernstorm.noise.protocol.DHState;
|
||||
import com.southernstorm.noise.protocol.HandshakeState;
|
||||
@ -60,6 +61,9 @@ class RatchetTagSet implements TagSetHandle {
|
||||
private KeyPair _nextKeys;
|
||||
private NextSessionKey _nextKey;
|
||||
private boolean _nextKeyAcked;
|
||||
/** for debugging */
|
||||
private static final AtomicInteger __tagSetID = new AtomicInteger();
|
||||
private final int _tagSetID = __tagSetID.incrementAndGet();
|
||||
|
||||
private static final String INFO_1 = "KDFDHRatchetStep";
|
||||
private static final String INFO_2 = "TagAndKeyGenKeys";
|
||||
@ -432,7 +436,7 @@ class RatchetTagSet implements TagSetHandle {
|
||||
*/
|
||||
public boolean getAcked() { return _acked; }
|
||||
|
||||
/** for debugging */
|
||||
/** the Key ID */
|
||||
public int getID() {
|
||||
return _id;
|
||||
}
|
||||
@ -448,7 +452,8 @@ class RatchetTagSet implements TagSetHandle {
|
||||
buf.append("NSR ").append(_state.hashCode()).append(' ');
|
||||
else
|
||||
buf.append("ES ");
|
||||
buf.append("TagSet #").append(_id)
|
||||
buf.append("TagSet #").append(_tagSetID)
|
||||
.append(" keyID #").append(_id)
|
||||
.append("\nCreated: ").append(DataHelper.formatTime(_created))
|
||||
.append("\nLast use: ").append(DataHelper.formatTime(_date));
|
||||
PublicKey pk = getRemoteKey();
|
||||
|
@ -0,0 +1,20 @@
|
||||
package net.i2p.router.crypto.ratchet;
|
||||
|
||||
/**
|
||||
* ECIES will call this back if an ack was requested and received.
|
||||
*
|
||||
* @since 0.9.46
|
||||
*/
|
||||
public interface ReplyCallback {
|
||||
|
||||
/**
|
||||
* When does this callback expire?
|
||||
* @return java time
|
||||
*/
|
||||
public long getExpiration();
|
||||
|
||||
/**
|
||||
* A reply was received.
|
||||
*/
|
||||
public void onReply();
|
||||
}
|
@ -1,7 +1,5 @@
|
||||
package net.i2p.router.crypto.ratchet;
|
||||
|
||||
import net.i2p.data.SessionTag;
|
||||
|
||||
/**
|
||||
* Something that looks for SessionTags.
|
||||
*
|
||||
|
@ -32,6 +32,7 @@ import net.i2p.router.LeaseSetKeys;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.crypto.ratchet.MuxedSKM;
|
||||
import net.i2p.router.crypto.ratchet.RatchetSKM;
|
||||
import net.i2p.router.crypto.ratchet.ReplyCallback;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@ -246,20 +247,21 @@ public class GarlicMessageBuilder {
|
||||
|
||||
/**
|
||||
* ECIES_X25519 only.
|
||||
* Called by GarlicMessageBuilder only.
|
||||
* Called by OCMJH only.
|
||||
*
|
||||
* @param ctx scope
|
||||
* @param config how/what to wrap
|
||||
* @param target public key of the location being garlic routed to (may be null if we
|
||||
* know the encryptKey and encryptTag)
|
||||
* @param replyDI non-null to request an ack, or null
|
||||
* @param callback may be null
|
||||
* @return null if expired or on other errors
|
||||
* @throws IllegalArgumentException on error
|
||||
* @since 0.9.44
|
||||
*/
|
||||
static GarlicMessage buildECIESMessage(RouterContext ctx, GarlicConfig config,
|
||||
PublicKey target, Hash from, SessionKeyManager skm,
|
||||
DeliveryInstructions replyDI) {
|
||||
DeliveryInstructions replyDI, ReplyCallback callback) {
|
||||
PublicKey key = config.getRecipientPublicKey();
|
||||
if (key.getType() != EncType.ECIES_X25519)
|
||||
throw new IllegalArgumentException();
|
||||
@ -289,7 +291,7 @@ public class GarlicMessageBuilder {
|
||||
log.warn("No SKM for " + from.toBase32());
|
||||
return null;
|
||||
}
|
||||
byte encData[] = ctx.eciesEngine().encrypt(cloveSet, target, priv, rskm, replyDI);
|
||||
byte encData[] = ctx.eciesEngine().encrypt(cloveSet, target, priv, rskm, replyDI, callback);
|
||||
if (encData == null) {
|
||||
if (log.shouldWarn())
|
||||
log.warn("Encrypt fail for " + from.toBase32());
|
||||
|
@ -30,6 +30,7 @@ import net.i2p.data.i2np.I2NPMessage;
|
||||
import net.i2p.router.LeaseSetKeys;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelInfo;
|
||||
import net.i2p.router.crypto.ratchet.ReplyCallback;
|
||||
import net.i2p.router.networkdb.kademlia.MessageWrapper;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
@ -108,12 +109,14 @@ class OutboundClientMessageJobHelper {
|
||||
* @param replyTunnel non-null if requireAck is true or bundledReplyLeaseSet is non-null
|
||||
* @param requireAck if true, bundle replyToken in an ack clove
|
||||
* @param bundledReplyLeaseSet may be null; if non-null, put it in a clove
|
||||
* @param callback only for ECIES, may be null
|
||||
* @return garlic, or null if no tunnels were found (or other errors)
|
||||
*/
|
||||
static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK,
|
||||
PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel,
|
||||
int tagsToSendOverride, int lowTagsOverride, SessionKey wrappedKey,
|
||||
Set<SessionTag> wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) {
|
||||
Set<SessionTag> wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet,
|
||||
ReplyCallback callback) {
|
||||
|
||||
SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(from);
|
||||
if (skm == null)
|
||||
@ -147,7 +150,7 @@ class OutboundClientMessageJobHelper {
|
||||
} else {
|
||||
di = null;
|
||||
}
|
||||
msg = GarlicMessageBuilder.buildECIESMessage(ctx, config, recipientPK, from, skm, di);
|
||||
msg = GarlicMessageBuilder.buildECIESMessage(ctx, config, recipientPK, from, skm, di, callback);
|
||||
} else {
|
||||
// no use sending tags unless we have a reply token set up already
|
||||
int tagsToSend = replyToken >= 0 ? (tagsToSendOverride > 0 ? tagsToSendOverride : skm.getTagsToSend()) : 0;
|
||||
|
@ -38,6 +38,7 @@ import net.i2p.router.ReplyJob;
|
||||
import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelInfo;
|
||||
import net.i2p.router.crypto.ratchet.ReplyCallback;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@ -655,12 +656,18 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
|
||||
// Per-message flag > 0 overrides per-session option
|
||||
int tagsToSend = SendMessageOptions.getTagsToSend(sendFlags);
|
||||
ReplyCallback callback;
|
||||
if (wantACK && _encryptionKey.getType() == EncType.ECIES_X25519) {
|
||||
callback = new ECIESReplyCallback(replyLeaseSet);
|
||||
} else {
|
||||
callback = null;
|
||||
}
|
||||
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
|
||||
_overallExpiration, _encryptionKey,
|
||||
clove, _from.calculateHash(),
|
||||
_to, _inTunnel, tagsToSend,
|
||||
tagsRequired, sessKey, tags,
|
||||
wantACK, replyLeaseSet);
|
||||
wantACK, replyLeaseSet, callback);
|
||||
if (msg == null) {
|
||||
// set to null if there are no tunnels to ack the reply back through
|
||||
// (should we always fail for this? or should we send it anyway, even if
|
||||
@ -675,9 +682,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString);
|
||||
|
||||
SendSuccessJob onReply = null;
|
||||
SendTimeoutJob onFail = null;
|
||||
ReplySelector selector = null;
|
||||
SendSuccessJob onReply;
|
||||
SendTimeoutJob onFail;
|
||||
ReplySelector selector;
|
||||
|
||||
if (wantACK && _encryptionKey.getType() == EncType.ELGAMAL_2048) {
|
||||
TagSetHandle tsh = null;
|
||||
@ -686,10 +693,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
if (skm != null)
|
||||
tsh = skm.tagsDelivered(_encryptionKey, sessKey, tags);
|
||||
}
|
||||
onFail = new SendTimeoutJob(getContext(), sessKey, tsh);
|
||||
onReply = new SendSuccessJob(getContext(), sessKey, tsh, replyLeaseSet, onFail);
|
||||
onFail = new SendTimeoutJob(sessKey, tsh);
|
||||
onReply = new SendSuccessJob(sessKey, tsh, replyLeaseSet, onFail);
|
||||
long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN);
|
||||
selector = new ReplySelector(token, expiration);
|
||||
} else {
|
||||
onReply = null;
|
||||
onFail = null;
|
||||
selector = null;
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -698,7 +709,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
+ _lease.getTunnelId() + " on "
|
||||
+ _lease.getGateway());
|
||||
|
||||
DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail);
|
||||
DispatchJob dispatchJob = new DispatchJob(msg, selector, onReply, onFail);
|
||||
//if (false) // dispatch may take 100+ms, so toss it in its own job
|
||||
// getContext().jobQueue().addJob(dispatchJob);
|
||||
//else
|
||||
@ -723,9 +734,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* @param success non-null if sel non-null
|
||||
* @param timeout non-null if sel non-null
|
||||
*/
|
||||
public DispatchJob(RouterContext ctx, GarlicMessage msg, ReplySelector sel,
|
||||
public DispatchJob(GarlicMessage msg, ReplySelector sel,
|
||||
SendSuccessJob success, SendTimeoutJob timeout) {
|
||||
super(ctx);
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
_msg = msg;
|
||||
_selector = sel;
|
||||
_replyFound = success;
|
||||
@ -1014,11 +1025,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* @param key may be null
|
||||
* @param tags may be null
|
||||
* @param ls the delivered leaseset or null
|
||||
* @param timeout will be cancelled when this is run
|
||||
* @param timeout will be cancelled when this is run, may be null
|
||||
*/
|
||||
public SendSuccessJob(RouterContext enclosingContext, SessionKey key,
|
||||
public SendSuccessJob(SessionKey key,
|
||||
TagSetHandle tags, LeaseSet ls, SendTimeoutJob timeout) {
|
||||
super(enclosingContext);
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
_key = key;
|
||||
_tags = tags;
|
||||
_deliveredLS = ls;
|
||||
@ -1064,6 +1075,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
skm.tagsAcked(_encryptionKey, _key, _tags);
|
||||
}
|
||||
}
|
||||
if (_replyTimeout != null)
|
||||
getContext().jobQueue().removeJob(_replyTimeout);
|
||||
|
||||
long sendTime = getContext().clock().now() - _start;
|
||||
@ -1111,6 +1123,26 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
public void setMessage(I2NPMessage msg) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* For ECIES only.
|
||||
*
|
||||
* @since 0.9.46
|
||||
*/
|
||||
private class ECIESReplyCallback extends SendSuccessJob implements ReplyCallback {
|
||||
public ECIESReplyCallback(LeaseSet ls) {
|
||||
super(null, null, ls, null);
|
||||
}
|
||||
|
||||
public long getExpiration() {
|
||||
// same as SendTimeoutJob
|
||||
return Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN);
|
||||
}
|
||||
|
||||
public void onReply() {
|
||||
runJob();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fired after the basic timeout for sending through the given tunnel has been reached.
|
||||
* We'll accept successes later, but won't expect them
|
||||
@ -1127,8 +1159,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* @param key may be null
|
||||
* @param tags may be null
|
||||
*/
|
||||
public SendTimeoutJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
|
||||
super(enclosingContext);
|
||||
public SendTimeoutJob(SessionKey key, TagSetHandle tags) {
|
||||
super(OutboundClientMessageOneShotJob.this.getContext());
|
||||
_key = key;
|
||||
_tags = tags;
|
||||
}
|
||||
|
Reference in New Issue
Block a user