Pass session in connect();

Store the session in Connection;
Don't create a new ConnectionManager for a subsession,
now that all components track the session properly.
@since updates
This commit is contained in:
zzz
2015-06-10 19:14:33 +00:00
parent 7b84676f4a
commit f341e5566b
3 changed files with 62 additions and 33 deletions

View File

@ -28,6 +28,7 @@ class Connection {
private final I2PAppContext _context; private final I2PAppContext _context;
private final Log _log; private final Log _log;
private final ConnectionManager _connectionManager; private final ConnectionManager _connectionManager;
private final I2PSession _session;
private Destination _remotePeer; private Destination _remotePeer;
private final AtomicLong _sendStreamId = new AtomicLong(); private final AtomicLong _sendStreamId = new AtomicLong();
private final AtomicLong _receiveStreamId = new AtomicLong(); private final AtomicLong _receiveStreamId = new AtomicLong();
@ -112,12 +113,14 @@ class Connection {
/** /**
* @param opts may be null * @param opts may be null
*/ */
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, public Connection(I2PAppContext ctx, ConnectionManager manager,
I2PSession session, SchedulerChooser chooser,
SimpleTimer2 timer, SimpleTimer2 timer,
PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts,
boolean isInbound) { boolean isInbound) {
_context = ctx; _context = ctx;
_connectionManager = manager; _connectionManager = manager;
_session = session;
_chooser = chooser; _chooser = chooser;
_outboundQueue = queue; _outboundQueue = queue;
_handler = handler; _handler = handler;
@ -877,7 +880,7 @@ class Connection {
/** @since 0.9.21 */ /** @since 0.9.21 */
public ConnectionManager getConnectionManager() { return _connectionManager; } public ConnectionManager getConnectionManager() { return _connectionManager; }
public I2PSession getSession() { return _connectionManager.getSession(); } public I2PSession getSession() { return _session; }
public I2PSocketFull getSocket() { return _socket; } public I2PSocketFull getSocket() { return _socket; }
public void setSocket(I2PSocketFull socket) { _socket = socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; }

View File

@ -214,7 +214,8 @@ class ConnectionManager {
ConnectionOptions opts = new ConnectionOptions(_defaultOptions); ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort()); opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort()); opts.setLocalPort(synPacket.getLocalPort());
Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true); Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
_timer, _outboundQueue, _conPacketHandler, opts, true);
_tcbShare.updateOptsFromShare(con); _tcbShare.updateOptsFromShare(con);
boolean reject = false; boolean reject = false;
int active = 0; int active = 0;
@ -393,9 +394,10 @@ class ConnectionManager {
* *
* @param peer Destination to contact * @param peer Destination to contact
* @param opts Connection's options * @param opts Connection's options
* @param session generally the session from the constructor, but could be a subsession
* @return new connection, or null if we have exceeded our limit * @return new connection, or null if we have exceeded our limit
*/ */
public Connection connect(Destination peer, ConnectionOptions opts) { public Connection connect(Destination peer, ConnectionOptions opts, I2PSession session) {
Connection con = null; Connection con = null;
long expiration = _context.clock().now(); long expiration = _context.clock().now();
long tmout = opts.getConnectTimeout(); long tmout = opts.getConnectTimeout();
@ -429,7 +431,7 @@ class ConnectionManager {
// try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
} else { } else {
con = new Connection(_context, this, _schedulerChooser, _timer, con = new Connection(_context, this, session, _schedulerChooser, _timer,
_outboundQueue, _conPacketHandler, opts, false); _outboundQueue, _conPacketHandler, opts, false);
con.setRemotePeer(peer); con.setRemotePeer(peer);
assignReceiveStreamId(con); assignReceiveStreamId(con);
@ -591,7 +593,12 @@ class ConnectionManager {
public MessageHandler getMessageHandler() { return _messageHandler; } public MessageHandler getMessageHandler() { return _messageHandler; }
public PacketHandler getPacketHandler() { return _packetHandler; } public PacketHandler getPacketHandler() { return _packetHandler; }
/**
* This is the primary session only
*/
public I2PSession getSession() { return _session; } public I2PSession getSession() { return _session; }
public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); } public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); }
public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); } public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); }
// Both of these methods are // Both of these methods are

View File

@ -14,7 +14,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -35,6 +34,7 @@ import net.i2p.data.PrivateKey;
import net.i2p.data.PublicKey; import net.i2p.data.PublicKey;
import net.i2p.data.SimpleDataStructure; import net.i2p.data.SimpleDataStructure;
import net.i2p.util.ConvertToHash; import net.i2p.util.ConvertToHash;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -51,7 +51,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private final I2PAppContext _context; private final I2PAppContext _context;
private final Log _log; private final Log _log;
private final I2PSession _session; private final I2PSession _session;
private final ConcurrentHashMap<I2PSession, ConnectionManager> _subsessions; private final Set<I2PSession> _subsessions;
private final I2PServerSocketFull _serverSocket; private final I2PServerSocketFull _serverSocket;
private StandardServerSocket _realServerSocket; private StandardServerSocket _realServerSocket;
private final ConnectionOptions _defaultOptions; private final ConnectionOptions _defaultOptions;
@ -61,7 +61,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private final ConnectionManager _connectionManager; private final ConnectionManager _connectionManager;
private final AtomicBoolean _isDestroyed = new AtomicBoolean(); private final AtomicBoolean _isDestroyed = new AtomicBoolean();
/** @since 0.9.20 */ /** @since 0.9.21 */
private static final Set<Hash> _dsaOnly = new HashSet<Hash>(16); private static final Set<Hash> _dsaOnly = new HashSet<Hash>(16);
private static final String[] DSA_ONLY_HASHES = { private static final String[] DSA_ONLY_HASHES = {
// list from http://zzz.i2p/topics/1682?page=1#p8414 // list from http://zzz.i2p/topics/1682?page=1#p8414
@ -140,7 +140,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context; _context = context;
_session = session; _session = session;
_subsessions = new ConcurrentHashMap<I2PSession, ConnectionManager>(4); _subsessions = new ConcurrentHashSet<I2PSession>(4);
_log = _context.logManager().getLog(I2PSocketManagerFull.class); _log = _context.logManager().getLog(I2PSocketManagerFull.class);
_name = name + " " + (__managerId.incrementAndGet()); _name = name + " " + (__managerId.incrementAndGet());
@ -186,7 +186,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
* and different signing keys * and different signing keys
* @param opts subsession options if any, may be null * @param opts subsession options if any, may be null
* @since 0.9.19 * @since 0.9.21
*/ */
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
if (privateKeyStream == null) { if (privateKeyStream == null) {
@ -214,15 +214,15 @@ public class I2PSocketManagerFull implements I2PSocketManager {
privateKeyStream = new ByteArrayInputStream(keyStream.toByteArray()); privateKeyStream = new ByteArrayInputStream(keyStream.toByteArray());
} }
I2PSession rv = _session.addSubsession(privateKeyStream, opts); I2PSession rv = _session.addSubsession(privateKeyStream, opts);
ConnectionOptions defaultOptions = new ConnectionOptions(opts); boolean added = _subsessions.add(rv);
ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions); if (!added) {
ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager);
if (old != null) {
// shouldn't happen // shouldn't happen
_session.removeSubsession(rv); _session.removeSubsession(rv);
connectionManager.shutdown();
throw new I2PSessionException("dup"); throw new I2PSessionException("dup");
} }
ConnectionOptions defaultOptions = new ConnectionOptions(opts);
int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
rv.addMuxedSessionListener(_connectionManager.getMessageHandler(), protocol, defaultOptions.getLocalPort());
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Added subsession " + rv); _log.warn("Added subsession " + rv);
return rv; return rv;
@ -230,7 +230,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* @param opts may be null * @param opts may be null
* @since 0.9.20 copied from I2PSocketManagerFactory * @since 0.9.21 copied from I2PSocketManagerFactory
*/ */
private SigType getSigType(Properties opts) { private SigType getSigType(Properties opts) {
if (opts != null) { if (opts != null) {
@ -252,13 +252,12 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* Remove the subsession * Remove the subsession
* *
* @since 0.9.19 * @since 0.9.21
*/ */
public void removeSubsession(I2PSession session) { public void removeSubsession(I2PSession session) {
_session.removeSubsession(session); _session.removeSubsession(session);
ConnectionManager cm = _subsessions.remove(session); boolean removed = _subsessions.remove(session);
if (cm != null) { if (removed) {
cm.shutdown();
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Removeed subsession " + session); _log.warn("Removeed subsession " + session);
} else { } else {
@ -269,7 +268,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* @return a list of subsessions, non-null, does not include the primary session * @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19 * @since 0.9.21
*/ */
public List<I2PSession> getSubsessions() { public List<I2PSession> getSubsessions() {
return _session.getSubsessions(); return _session.getSubsessions();
@ -282,6 +281,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* The accept() call. * The accept() call.
* *
* This only listens on the primary session. There is no way to get
* incoming connections on a subsession.
*
* @return connected I2PSocket, or null through 0.9.16, non-null as of 0.9.17 * @return connected I2PSocket, or null through 0.9.16, non-null as of 0.9.17
* @throws I2PException if session is closed * @throws I2PException if session is closed
* @throws ConnectException (since 0.9.17; I2PServerSocket interface always declared it) * @throws ConnectException (since 0.9.17; I2PServerSocket interface always declared it)
@ -301,6 +303,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* *
* Uses the ports from the default options. * Uses the ports from the default options.
* *
* TODO There is no way to ping on a subsession.
*
* @param peer * @param peer
* @param timeoutMs timeout in ms, greater than zero * @param timeoutMs timeout in ms, greater than zero
* @return true on success, false on failure * @return true on success, false on failure
@ -319,6 +323,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* *
* Uses the ports specified. * Uses the ports specified.
* *
* TODO There is no way to ping on a subsession.
*
* @param peer Destination to ping * @param peer Destination to ping
* @param localPort 0 - 65535 * @param localPort 0 - 65535
* @param remotePort 0 - 65535 * @param remotePort 0 - 65535
@ -342,6 +348,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* *
* Uses the ports specified. * Uses the ports specified.
* *
* TODO There is no way to ping on a subsession.
*
* @param peer Destination to ping * @param peer Destination to ping
* @param localPort 0 - 65535 * @param localPort 0 - 65535
* @param remotePort 0 - 65535 * @param remotePort 0 - 65535
@ -375,6 +383,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* with the setters; no need to use this method for those. * with the setters; no need to use this method for those.
* This does NOT update the underlying I2CP or tunnel options; use getSession().updateOptions() for that. * This does NOT update the underlying I2CP or tunnel options; use getSession().updateOptions() for that.
* *
* TODO There is no way to update the options on a subsession.
*
* @param options as created from a call to buildOptions(properties), non-null * @param options as created from a call to buildOptions(properties), non-null
*/ */
public void setDefaultOptions(I2PSocketOptions options) { public void setDefaultOptions(I2PSocketOptions options) {
@ -388,6 +398,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* Current options, not a copy, setters may be used to make changes. * Current options, not a copy, setters may be used to make changes.
*
* TODO There is no facility to specify the session.
*/ */
public I2PSocketOptions getDefaultOptions() { public I2PSocketOptions getDefaultOptions() {
return _defaultOptions; return _defaultOptions;
@ -398,6 +410,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* This method does not throw exceptions, but methods on the returned socket * This method does not throw exceptions, but methods on the returned socket
* may throw exceptions if the socket or socket manager is closed. * may throw exceptions if the socket or socket manager is closed.
* *
* This only listens on the primary session. There is no way to get
* incoming connections on a subsession.
*
* @return non-null * @return non-null
*/ */
public I2PServerSocket getServerSocket() { public I2PServerSocket getServerSocket() {
@ -407,6 +422,10 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* Like getServerSocket but returns a real ServerSocket for easier porting of apps. * Like getServerSocket but returns a real ServerSocket for easier porting of apps.
*
* This only listens on the primary session. There is no way to get
* incoming connections on a subsession.
*
* @since 0.8.4 * @since 0.8.4
*/ */
public synchronized ServerSocket getStandardServerSocket() throws IOException { public synchronized ServerSocket getStandardServerSocket() throws IOException {
@ -417,16 +436,16 @@ public class I2PSocketManagerFull implements I2PSocketManager {
} }
private void verifySession() throws I2PException { private void verifySession() throws I2PException {
verifySession(_connectionManager); verifySession(_connectionManager.getSession());
} }
/** @since 0.9.20 */ /** @since 0.9.21 */
private void verifySession(ConnectionManager cm) throws I2PException { private void verifySession(I2PSession session) throws I2PException {
if (_isDestroyed.get()) if (_isDestroyed.get())
throw new I2PException("Session was closed"); throw new I2PException("Session was closed");
if (!cm.getSession().isClosed()) if (!session.isClosed())
return; return;
cm.getSession().connect(); session.connect();
} }
/** /**
@ -457,22 +476,22 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
+ " with options: " + opts); + " with options: " + opts);
// pick the subsession here // pick the subsession here
ConnectionManager cm = _connectionManager; I2PSession session = _session;
if (!_subsessions.isEmpty()) { if (!_subsessions.isEmpty()) {
Hash h = peer.calculateHash(); Hash h = peer.calculateHash();
if (_dsaOnly.contains(h)) { if (_dsaOnly.contains(h)) {
// FIXME just taking the first one for now // FIXME just taking the first one for now
for (Map.Entry<I2PSession, ConnectionManager> e : _subsessions.entrySet()) { for (I2PSession sess : _subsessions) {
if (e.getKey().getMyDestination().getSigType() == SigType.DSA_SHA1) { if (sess.getMyDestination().getSigType() == SigType.DSA_SHA1) {
cm = e.getValue(); session = sess;
break; break;
} }
} }
} }
} }
verifySession(cm); verifySession(session);
// the following blocks unless connect delay > 0 // the following blocks unless connect delay > 0
Connection con = cm.connect(peer, opts); Connection con = _connectionManager.connect(peer, opts, session);
if (con == null) if (con == null)
throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns()); throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns());
I2PSocketFull socket = new I2PSocketFull(con,_context); I2PSocketFull socket = new I2PSocketFull(con,_context);
@ -556,7 +575,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_connectionManager.setAllowIncomingConnections(false); _connectionManager.setAllowIncomingConnections(false);
_connectionManager.shutdown(); _connectionManager.shutdown();
if (!_subsessions.isEmpty()) { if (!_subsessions.isEmpty()) {
for (I2PSession sess : _subsessions.keySet()) { for (I2PSession sess : _subsessions) {
removeSubsession(sess); removeSubsession(sess);
} }
} }