I2CP Multisession Work in progress:

Fix NPE in requestLeaseSet()
Fix setting new session ID in SessionStatusMessage
Fix subsession support detection
Streaming: one socket manager, multiple connection managers.
Change data structure for subessions in socket manager
Subsession cleanup on destroy
I2PTunnel: add DSA subsession for non-DSA shared client
Javadocs
This commit is contained in:
zzz
2015-04-18 19:01:23 +00:00
parent 6a644dd0e5
commit 91e98ba447
9 changed files with 120 additions and 23 deletions

View File

@ -31,6 +31,7 @@ import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.client.streaming.I2PSocketOptions; import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.crypto.SigType;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.EventDispatcher; import net.i2p.util.EventDispatcher;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
@ -291,6 +292,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
socketManager.destroySocketManager(); socketManager.destroySocketManager();
// We could be here a LONG time, holding the lock // We could be here a LONG time, holding the lock
socketManager = buildSocketManager(tunnel, pkf); socketManager = buildSocketManager(tunnel, pkf);
// FIXME may not be the right place for this
I2PSession sub = addSubsession(tunnel);
if (sub != null && _log.shouldLog(Log.WARN))
_log.warn("Added subsession " + sub);
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Not building a new socket manager since the old one is open [s=" + s + "]"); _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Not building a new socket manager since the old one is open [s=" + s + "]");
@ -303,10 +308,41 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since there is no other one"); _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since there is no other one");
socketManager = buildSocketManager(tunnel, pkf); socketManager = buildSocketManager(tunnel, pkf);
I2PSession sub = addSubsession(tunnel);
if (sub != null && _log.shouldLog(Log.WARN))
_log.warn("Added subsession " + sub);
} }
return socketManager; return socketManager;
} }
/**
* Add a subsession to a shared client if necessary.
*
* @since 0.9.20
*/
protected static synchronized I2PSession addSubsession(I2PTunnel tunnel) {
I2PSession sess = socketManager.getSession();
if (sess.getMyDestination().getSigType() == SigType.DSA_SHA1)
return null;
Properties props = new Properties();
props.putAll(tunnel.getClientOptions());
String name = props.getProperty("inbound.nickname");
if (name != null)
props.setProperty("inbound.nickname", name + " (DSA)");
name = props.getProperty("outbound.nickname");
if (name != null)
props.setProperty("outbound.nickname", name + " (DSA)");
// TODO set sig type in props?
try {
return socketManager.addSubsession(null, props);
} catch (I2PSessionException ise) {
Log log = tunnel.getContext().logManager().getLog(I2PTunnelClientBase.class);
if (log.shouldLog(Log.WARN))
log.warn("Failed to add subssession", ise);
return null;
}
}
/** /**
* Kill the shared client, so that on restart in android * Kill the shared client, so that on restart in android
* we won't latch onto the old one * we won't latch onto the old one

View File

@ -429,7 +429,8 @@ class ConnectionManager {
// try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
} else { } else {
con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false); con = new Connection(_context, this, _schedulerChooser, _timer,
_outboundQueue, _conPacketHandler, opts, false);
con.setRemotePeer(peer); con.setRemotePeer(peer);
assignReceiveStreamId(con); assignReceiveStreamId(con);
break; // stop looping as a psuedo-wait break; // stop looping as a psuedo-wait

View File

@ -11,7 +11,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -40,7 +40,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private final I2PAppContext _context; private final I2PAppContext _context;
private final Log _log; private final Log _log;
private final I2PSession _session; private final I2PSession _session;
private final List<I2PSession> _subsessions; private final ConcurrentHashMap<I2PSession, ConnectionManager> _subsessions;
private final I2PServerSocketFull _serverSocket; private final I2PServerSocketFull _serverSocket;
private StandardServerSocket _realServerSocket; private StandardServerSocket _realServerSocket;
private final ConnectionOptions _defaultOptions; private final ConnectionOptions _defaultOptions;
@ -84,7 +84,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context; _context = context;
_session = session; _session = session;
_subsessions = new CopyOnWriteArrayList<I2PSession>(); _subsessions = new ConcurrentHashMap<I2PSession, ConnectionManager>(4);
_log = _context.logManager().getLog(I2PSocketManagerFull.class); _log = _context.logManager().getLog(I2PSocketManagerFull.class);
_name = name + " " + (__managerId.incrementAndGet()); _name = name + " " + (__managerId.incrementAndGet());
@ -125,7 +125,6 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return _session; return _session;
} }
//////////// gahhh we want a socket manager, not a session
/** /**
* @return a new subsession, non-null * @return a new subsession, non-null
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
@ -135,7 +134,17 @@ public class I2PSocketManagerFull implements I2PSocketManager {
*/ */
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
I2PSession rv = _session.addSubsession(privateKeyStream, opts); I2PSession rv = _session.addSubsession(privateKeyStream, opts);
_subsessions.add(rv); ConnectionOptions defaultOptions = new ConnectionOptions(opts);
ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions);
ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager);
if (old != null) {
// shouldn't happen
_session.removeSubsession(rv);
connectionManager.shutdown();
throw new I2PSessionException("dup");
}
if (_log.shouldLog(Log.WARN))
_log.warn("Added subsession " + rv);
return rv; return rv;
} }
@ -146,8 +155,15 @@ public class I2PSocketManagerFull implements I2PSocketManager {
*/ */
public void removeSubsession(I2PSession session) { public void removeSubsession(I2PSession session) {
_session.removeSubsession(session); _session.removeSubsession(session);
_subsessions.remove(session); ConnectionManager cm = _subsessions.remove(session);
// ... if (cm != null) {
cm.shutdown();
if (_log.shouldLog(Log.WARN))
_log.warn("Removeed subsession " + session);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Subsession not found to remove " + session);
}
} }
/** /**
@ -335,6 +351,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
+ " with options: " + opts); + " with options: " + opts);
// fixme pick the subsession here
// the following blocks unless connect delay > 0 // the following blocks unless connect delay > 0
Connection con = _connectionManager.connect(peer, opts); Connection con = _connectionManager.connect(peer, opts);
if (con == null) if (con == null)
@ -419,6 +436,12 @@ public class I2PSocketManagerFull implements I2PSocketManager {
} }
_connectionManager.setAllowIncomingConnections(false); _connectionManager.setAllowIncomingConnections(false);
_connectionManager.shutdown(); _connectionManager.shutdown();
if (!_subsessions.isEmpty()) {
for (I2PSession sess : _subsessions.keySet()) {
removeSubsession(sess);
}
}
// should we destroy the _session too? // should we destroy the _session too?
// yes, since the old lib did (and SAM wants it to, and i dont know why not) // yes, since the old lib did (and SAM wants it to, and i dont know why not)
if ( (_session != null) && (!_session.isClosed()) ) { if ( (_session != null) && (!_session.isClosed()) ) {

View File

@ -267,6 +267,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} }
_routerSupportsFastReceive = _context.isRouterContext(); _routerSupportsFastReceive = _context.isRouterContext();
_routerSupportsHostLookup = _context.isRouterContext(); _routerSupportsHostLookup = _context.isRouterContext();
_routerSupportsSubsessions = _context.isRouterContext();
} }
/** /**
@ -338,7 +339,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (id != null) if (id != null)
_subsessionMap.remove(id); _subsessionMap.remove(id);
/// tell the subsession /// tell the subsession
///.... try {
// doesn't really throw
session.destroySession();
} catch (I2PSessionException ise) {}
} }
} }
@ -893,7 +897,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
synchronized (_subsessionLock) { synchronized (_subsessionLock) {
for (SubSession sess : _subsessions) { for (SubSession sess : _subsessions) {
if (sess.getSessionId() == null) { if (sess.getSessionId() == null) {
sub.messageReceived(reader, message); sess.messageReceived(reader, message);
id = sess.getSessionId(); id = sess.getSessionId();
if (id != null) { if (id != null) {
if (id.equals(_sessionId)) { if (id.equals(_sessionId)) {

View File

@ -723,13 +723,15 @@ class ClientConnectionRunner {
* within the timeout specified, queue up the onFailedJob. This call does not * within the timeout specified, queue up the onFailedJob. This call does not
* block. * block.
* *
* @param h the Destination's hash
* @param set LeaseSet with requested leases - this object must be updated to contain the * @param set LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases) * signed version (as well as any changed/added/removed Leases)
* The LeaseSet contains Leases and destination only, it is unsigned.
* @param expirationTime ms to wait before failing * @param expirationTime ms to wait before failing
* @param onCreateJob Job to run after the LeaseSet is authorized, null OK * @param onCreateJob Job to run after the LeaseSet is authorized, null OK
* @param onFailedJob Job to run after the timeout passes without receiving authorization, null OK * @param onFailedJob Job to run after the timeout passes without receiving authorization, null OK
*/ */
void requestLeaseSet(LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) { void requestLeaseSet(Hash h, LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) {
if (_dead) { if (_dead) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Requesting leaseSet from a dead client: " + set); _log.warn("Requesting leaseSet from a dead client: " + set);
@ -737,6 +739,12 @@ class ClientConnectionRunner {
_context.jobQueue().addJob(onFailedJob); _context.jobQueue().addJob(onFailedJob);
return; return;
} }
SessionParams sp = _sessions.get(h);
if (sp == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Requesting leaseSet for an unknown sesssion");
return;
}
// We can't use LeaseSet.equals() here because the dest, keys, and sig on // We can't use LeaseSet.equals() here because the dest, keys, and sig on
// the new LeaseSet are null. So we compare leases one by one. // the new LeaseSet are null. So we compare leases one by one.
// In addition, the client rewrites the expiration time of all the leases to // In addition, the client rewrites the expiration time of all the leases to
@ -748,10 +756,9 @@ class ClientConnectionRunner {
int leases = set.getLeaseCount(); int leases = set.getLeaseCount();
// synch so _currentLeaseSet isn't changed out from under us // synch so _currentLeaseSet isn't changed out from under us
LeaseSet current = null; LeaseSet current = null;
Destination dest = sp.dest;
synchronized (this) { synchronized (this) {
Destination dest = set.getDestination(); current = sp.currentLeaseSet;
if (dest != null)
current = getLeaseSet(dest.calculateHash());
if (current != null && current.getLeaseCount() == leases) { if (current != null && current.getLeaseCount() == leases) {
for (int i = 0; i < leases; i++) { for (int i = 0; i < leases; i++) {
if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId()))
@ -770,10 +777,6 @@ class ClientConnectionRunner {
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Current leaseSet " + current + "\nNew leaseSet " + set); _log.info("Current leaseSet " + current + "\nNew leaseSet " + set);
Hash h = set.getDestination().calculateHash();
SessionParams sp = _sessions.get(h);
if (sp == null)
return;
LeaseRequestState state; LeaseRequestState state;
synchronized (this) { synchronized (this) {
state = sp.leaseRequest; state = sp.leaseRequest;
@ -788,12 +791,15 @@ class ClientConnectionRunner {
// theirs is newer // theirs is newer
} else { } else {
// ours is newer, so wait a few secs and retry // ours is newer, so wait a few secs and retry
set.setDestination(dest);
_context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000); _context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000);
} }
// fire onCreated? // fire onCreated?
return; // already requesting return; // already requesting
} else { } else {
sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); set.setDestination(dest);
sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob,
_context.clock().now() + expirationTime, set);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("New request: " + state); _log.debug("New request: " + state);
} }
@ -807,6 +813,7 @@ class ClientConnectionRunner {
private final Job _onCreate; private final Job _onCreate;
private final Job _onFailed; private final Job _onFailed;
/** @param ls dest must be set */
public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) { public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) {
_ls = ls; _ls = ls;
_expirationTime = expirationTime; _expirationTime = expirationTime;
@ -815,7 +822,7 @@ class ClientConnectionRunner {
} }
public void timeReached() { public void timeReached() {
requestLeaseSet(_ls, _expirationTime, _onCreate, _onFailed); requestLeaseSet(_ls.getDestination().calculateHash(), _ls, _expirationTime, _onCreate, _onFailed);
} }
} }

View File

@ -392,7 +392,8 @@ class ClientManager {
* *
* @param dest Destination from which the LeaseSet's authorization should be requested * @param dest Destination from which the LeaseSet's authorization should be requested
* @param set LeaseSet with requested leases - this object must be updated to contain the * @param set LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases) * signed version (as well as any changed/added/removed Leases).
* The LeaseSet contains Leases only; it is unsigned and does not have the destination set.
* @param timeout ms to wait before failing * @param timeout ms to wait before failing
* @param onCreateJob Job to run after the LeaseSet is authorized * @param onCreateJob Job to run after the LeaseSet is authorized
* @param onFailedJob Job to run after the timeout passes without receiving authorization * @param onFailedJob Job to run after the timeout passes without receiving authorization
@ -405,15 +406,24 @@ class ClientManager {
+ dest.calculateHash().toBase64() + ". disconnected?"); + dest.calculateHash().toBase64() + ". disconnected?");
_ctx.jobQueue().addJob(onFailedJob); _ctx.jobQueue().addJob(onFailedJob);
} else { } else {
runner.requestLeaseSet(set, timeout, onCreateJob, onFailedJob); runner.requestLeaseSet(dest.calculateHash(), set, timeout, onCreateJob, onFailedJob);
} }
} }
/**
* Request that a particular client authorize the Leases contained in the
* LeaseSet.
*
* @param dest Destination from which the LeaseSet's authorization should be requested
* @param ls LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases).
* The LeaseSet contains Leases only; it is unsigned and does not have the destination set.
*/
public void requestLeaseSet(Hash dest, LeaseSet ls) { public void requestLeaseSet(Hash dest, LeaseSet ls) {
ClientConnectionRunner runner = getRunner(dest); ClientConnectionRunner runner = getRunner(dest);
if (runner != null) { if (runner != null) {
// no need to fire off any jobs... // no need to fire off any jobs...
runner.requestLeaseSet(ls, REQUEST_LEASESET_TIMEOUT, null, null); runner.requestLeaseSet(dest, ls, REQUEST_LEASESET_TIMEOUT, null, null);
} }
} }

View File

@ -115,6 +115,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
* @param dest Destination from which the LeaseSet's authorization should be requested * @param dest Destination from which the LeaseSet's authorization should be requested
* @param set LeaseSet with requested leases - this object must be updated to contain the * @param set LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases) * signed version (as well as any changed/added/removed Leases)
* The LeaseSet contains Leases only; it is unsigned and does not have the destination set.
* @param timeout ms to wait before failing * @param timeout ms to wait before failing
* @param onCreateJob Job to run after the LeaseSet is authorized * @param onCreateJob Job to run after the LeaseSet is authorized
* @param onFailedJob Job to run after the timeout passes without receiving authorization * @param onFailedJob Job to run after the timeout passes without receiving authorization
@ -126,6 +127,15 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
_log.error("Null manager on requestLeaseSet!"); _log.error("Null manager on requestLeaseSet!");
} }
/**
* Request that a particular client authorize the Leases contained in the
* LeaseSet.
*
* @param dest Destination from which the LeaseSet's authorization should be requested
* @param ls LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases).
* The LeaseSet contains Leases only; it is unsigned and does not have the destination set.
*/
public void requestLeaseSet(Hash dest, LeaseSet set) { public void requestLeaseSet(Hash dest, LeaseSet set) {
if (_manager != null) if (_manager != null)
_manager.requestLeaseSet(dest, set); _manager.requestLeaseSet(dest, set);

View File

@ -251,6 +251,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
props.putAll(in.getOptions()); props.putAll(in.getOptions());
cfg.setOptions(props); cfg.setOptions(props);
boolean isPrimary = _runner.getSessionIds().isEmpty(); boolean isPrimary = _runner.getSessionIds().isEmpty();
// this sets the session id
int status = _runner.sessionEstablished(cfg); int status = _runner.sessionEstablished(cfg);
if (status != SessionStatusMessage.STATUS_CREATED) { if (status != SessionStatusMessage.STATUS_CREATED) {
// For now, we do NOT send a SessionStatusMessage - see javadoc above // For now, we do NOT send a SessionStatusMessage - see javadoc above
@ -266,6 +267,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_runner.disconnectClient(msg); _runner.disconnectClient(msg);
return; return;
} }
// get the new session ID
id = _runner.getSessionId(dest.calculateHash());
sendStatusMessage(id, status); sendStatusMessage(id, status);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))

View File

@ -30,6 +30,9 @@ class LeaseRequestState {
/** /**
* @param expiration absolute time, when the request expires (not when the LS expires) * @param expiration absolute time, when the request expires (not when the LS expires)
* @param requested LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases)
* The LeaseSet contains Leases and destination only, it is unsigned.
*/ */
public LeaseRequestState(Job onGranted, Job onFailed, long expiration, LeaseSet requested) { public LeaseRequestState(Job onGranted, Job onFailed, long expiration, LeaseSet requested) {
_onGranted = onGranted; _onGranted = onGranted;