forked from I2P_Developers/i2p.i2p
NetDB: Store Meta LS2 to floodfills (proposal #123)
This commit is contained in:
@ -13,6 +13,7 @@ import java.io.Writer;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import net.i2p.client.I2PSessionException;
|
||||
import net.i2p.crypto.SessionKeyManager;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.data.Hash;
|
||||
@ -104,4 +105,20 @@ public abstract class ClientManagerFacade implements Service {
|
||||
|
||||
/** @since 0.8.8 */
|
||||
public abstract void shutdown(String msg);
|
||||
|
||||
/**
|
||||
* Declare that we're going to publish a meta LS for this destination.
|
||||
* Must be called before publishing the leaseset.
|
||||
*
|
||||
* @throws I2PSessionException on duplicate dest
|
||||
* @since 0.9.41
|
||||
*/
|
||||
public void registerMetaDest(Destination dest) throws I2PSessionException {}
|
||||
|
||||
/**
|
||||
* Declare that we're no longer going to publish a meta LS for this destination.
|
||||
*
|
||||
* @since 0.9.41
|
||||
*/
|
||||
public void unregisterMetaDest(Destination dest) {}
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ import net.i2p.router.ClientMessage;
|
||||
import net.i2p.router.Job;
|
||||
import net.i2p.router.JobImpl;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer2;
|
||||
@ -67,6 +68,8 @@ class ClientManager {
|
||||
// ClientConnectionRunner for clients w/out a Dest yet
|
||||
private final Set<ClientConnectionRunner> _pendingRunners;
|
||||
private final Set<SessionId> _runnerSessionIds;
|
||||
private final Set<Destination> _metaDests;
|
||||
private final Set<Hash> _metaHashes;
|
||||
protected final RouterContext _ctx;
|
||||
protected final int _port;
|
||||
protected volatile boolean _isStarted;
|
||||
@ -100,11 +103,13 @@ class ClientManager {
|
||||
// "How large are messages received by the client?",
|
||||
// "ClientMessages",
|
||||
// new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_listeners = new ArrayList<ClientListenerRunner>();
|
||||
_runners = new ConcurrentHashMap<Destination, ClientConnectionRunner>();
|
||||
_runnersByHash = new ConcurrentHashMap<Hash, ClientConnectionRunner>();
|
||||
_pendingRunners = new HashSet<ClientConnectionRunner>();
|
||||
_runnerSessionIds = new HashSet<SessionId>();
|
||||
_listeners = new ArrayList<ClientListenerRunner>(4);
|
||||
_runners = new ConcurrentHashMap<Destination, ClientConnectionRunner>(4);
|
||||
_runnersByHash = new ConcurrentHashMap<Hash, ClientConnectionRunner>(4);
|
||||
_pendingRunners = new HashSet<ClientConnectionRunner>(4);
|
||||
_runnerSessionIds = new HashSet<SessionId>(4);
|
||||
_metaDests = new ConcurrentHashSet<Destination>(4);
|
||||
_metaHashes = new ConcurrentHashSet<Hash>(4);
|
||||
_port = port;
|
||||
_clientTimestamper = new ClientTimestamper();
|
||||
// following are for RequestLeaseSetJob
|
||||
@ -367,6 +372,37 @@ class ClientManager {
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare that we're going to publish a meta LS for this destination.
|
||||
* Must be called before publishing the leaseset.
|
||||
*
|
||||
* @throws I2PSessionException on duplicate dest
|
||||
* @since 0.9.41
|
||||
*/
|
||||
public void registerMetaDest(Destination dest) throws I2PSessionException {
|
||||
synchronized (_runners) {
|
||||
if (_runners.containsKey(dest) || _metaDests.contains(dest)) {
|
||||
String msg = "Client attempted to register duplicate destination " + dest.toBase32();
|
||||
_log.error(msg);
|
||||
throw new I2PSessionException(msg);
|
||||
}
|
||||
_metaDests.add(dest);
|
||||
_metaHashes.add(dest.calculateHash());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare that we're no longer going to publish a meta LS for this destination.
|
||||
*
|
||||
* @since 0.9.41
|
||||
*/
|
||||
public void unregisterMetaDest(Destination dest) {
|
||||
synchronized (_runners) {
|
||||
_metaDests.remove(dest);
|
||||
_metaHashes.remove(dest.calculateHash());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a new random, unused sessionId. Caller must synch on _runners.
|
||||
* @return null on failure
|
||||
@ -515,18 +551,20 @@ class ClientManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsynchronized
|
||||
* Unsynchronized.
|
||||
* DOES contain meta destinations.
|
||||
*/
|
||||
public boolean isLocal(Destination dest) {
|
||||
return _runners.containsKey(dest);
|
||||
return _runners.containsKey(dest) || _metaDests.contains(dest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsynchronized
|
||||
* Unsynchronized.
|
||||
* DOES contain meta destinations.
|
||||
*/
|
||||
public boolean isLocal(Hash destHash) {
|
||||
if (destHash == null) return false;
|
||||
return _runnersByHash.containsKey(destHash);
|
||||
return _runnersByHash.containsKey(destHash) || _metaHashes.contains(destHash);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -542,7 +580,8 @@ class ClientManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsynchronized
|
||||
* Unsynchronized.
|
||||
* Does NOT contain meta destinations.
|
||||
*/
|
||||
public Set<Destination> listClients() {
|
||||
Set<Destination> rv = new HashSet<Destination>();
|
||||
|
@ -266,4 +266,28 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
|
||||
return _manager.internalConnect();
|
||||
throw new I2PSessionException("No manager yet");
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare that we're going to publish a meta LS for this destination.
|
||||
* Must be called before publishing the leaseset.
|
||||
*
|
||||
* @throws I2PSessionException on duplicate dest
|
||||
* @since 0.9.41
|
||||
*/
|
||||
@Override
|
||||
public void registerMetaDest(Destination dest) throws I2PSessionException {
|
||||
if (_manager != null)
|
||||
_manager.registerMetaDest(dest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare that we're no longer going to publish a meta LS for this destination.
|
||||
*
|
||||
* @since 0.9.41
|
||||
*/
|
||||
@Override
|
||||
public void unregisterMetaDest(Destination dest) {
|
||||
if (_manager != null)
|
||||
_manager.unregisterMetaDest(dest);
|
||||
}
|
||||
}
|
||||
|
@ -346,7 +346,9 @@ abstract class StoreJob extends JobImpl {
|
||||
getContext().statManager().addRateData("netDb.storeLeaseSetSent", 1);
|
||||
// if it is an encrypted leaseset...
|
||||
if (getContext().keyRing().get(msg.getKey()) != null)
|
||||
sendStoreThroughGarlic(msg, peer, expiration);
|
||||
sendStoreThroughExploratory(msg, peer, expiration);
|
||||
else if (msg.getEntry().getType() == DatabaseEntry.KEY_TYPE_META_LS2)
|
||||
sendWrappedStoreThroughExploratory(msg, peer, expiration);
|
||||
else
|
||||
sendStoreThroughClient(msg, peer, expiration);
|
||||
} else {
|
||||
@ -355,7 +357,7 @@ abstract class StoreJob extends JobImpl {
|
||||
if (_connectChecker.canConnect(_connectMask, peer))
|
||||
sendDirect(msg, peer, expiration);
|
||||
else
|
||||
sendStoreThroughGarlic(msg, peer, expiration);
|
||||
sendStoreThroughExploratory(msg, peer, expiration);
|
||||
}
|
||||
}
|
||||
|
||||
@ -387,12 +389,13 @@ abstract class StoreJob extends JobImpl {
|
||||
}
|
||||
|
||||
/**
|
||||
* This is misnamed, it means sending it out through an exploratory tunnel,
|
||||
* Send it out through an exploratory tunnel,
|
||||
* with the reply to come back through an exploratory tunnel.
|
||||
* There is no garlic encryption added.
|
||||
*
|
||||
* @since 0.9.41 renamed from sendStoreThroughGarlic()
|
||||
*/
|
||||
private void sendStoreThroughGarlic(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
|
||||
private void sendStoreThroughExploratory(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
|
||||
long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
|
||||
|
||||
Hash to = peer.getIdentity().getHash();
|
||||
@ -514,6 +517,76 @@ abstract class StoreJob extends JobImpl {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a leaseset store message out an exploratory tunnel,
|
||||
* with the reply to come back through a exploratory tunnel.
|
||||
* Stores are garlic encrypted to hide the identity from the OBEP.
|
||||
*
|
||||
* Only for Meta LS2, for now.
|
||||
*
|
||||
* @param msg must contain a leaseset
|
||||
* @since 0.9.41
|
||||
*/
|
||||
private void sendWrappedStoreThroughExploratory(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
|
||||
long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
|
||||
Hash to = peer.getIdentity().getHash();
|
||||
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(to);
|
||||
if (replyTunnel == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No inbound expl. tunnels for reply - delaying...");
|
||||
// continueSending() above did an addPending() so remove it here.
|
||||
// This means we will skip the peer next time, can't be helped for now
|
||||
// without modding StoreState
|
||||
_state.replyTimeout(to);
|
||||
Job waiter = new WaitJob(getContext());
|
||||
waiter.getTiming().setStartAfter(getContext().clock().now() + 3*1000);
|
||||
getContext().jobQueue().addJob(waiter);
|
||||
return;
|
||||
}
|
||||
TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0);
|
||||
msg.setReplyToken(token);
|
||||
msg.setReplyTunnel(replyTunnelId);
|
||||
msg.setReplyGateway(replyTunnel.getPeer(0));
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": send(dbStore) w/ token expected " + token);
|
||||
|
||||
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(to);
|
||||
if (outTunnel != null) {
|
||||
I2NPMessage sent;
|
||||
// garlic encrypt using router SKM
|
||||
MessageWrapper.WrappedMessage wm = MessageWrapper.wrap(getContext(), msg, null, peer);
|
||||
if (wm == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Fail garlic encrypting");
|
||||
fail();
|
||||
return;
|
||||
}
|
||||
sent = wm.getMessage();
|
||||
_state.addPending(to, wm);
|
||||
|
||||
SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, sent.getMessageSize());
|
||||
FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
|
||||
StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(getJobId() + ": sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent);
|
||||
}
|
||||
getContext().messageRegistry().registerPending(selector, onReply, onFail);
|
||||
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No outbound expl. tunnels to send a dbStore out - delaying...");
|
||||
// continueSending() above did an addPending() so remove it here.
|
||||
// This means we will skip the peer next time, can't be helped for now
|
||||
// without modding StoreState
|
||||
_state.replyTimeout(to);
|
||||
Job waiter = new WaitJob(getContext());
|
||||
waiter.getTiming().setStartAfter(getContext().clock().now() + 3*1000);
|
||||
getContext().jobQueue().addJob(waiter);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called to wait a little while
|
||||
* @since 0.7.10
|
||||
|
Reference in New Issue
Block a user