I2CP: Take 2 of fix, so a newly created session

isn't destroyed and immediately replaced by i2ptunnel,
which caused dup shared clients in a race at startup;
Clarify session exception text if not open
This commit is contained in:
zzz
2015-05-24 00:14:32 +00:00
parent 3d07e1a10b
commit 4ea99b8a10
6 changed files with 37 additions and 11 deletions

View File

@ -77,6 +77,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
private volatile ThreadPoolExecutor _executor; private volatile ThreadPoolExecutor _executor;
/** this is ONLY for shared clients */
private static I2PSocketManager socketManager;
/**
* Only destroy and replace a static shared client socket manager if it's been connected before
* @since 0.9.20
*/
private enum SocketManagerState { INIT, CONNECTED }
private static SocketManagerState _socketManagerState = SocketManagerState.INIT;
public static final String PROP_USE_SSL = I2PTunnelServer.PROP_USE_SSL; public static final String PROP_USE_SSL = I2PTunnelServer.PROP_USE_SSL;
/** /**
@ -239,10 +249,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
connectManager(); connectManager();
} }
/** this is ONLY for shared clients */
private static I2PSocketManager socketManager;
/** /**
* This is ONLY for shared clients. * This is ONLY for shared clients.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router. * As of 0.9.20 this is fast, and does NOT connect the manager to the router.
@ -283,12 +289,13 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
Log _log = tunnel.getContext().logManager().getLog(I2PTunnelClientBase.class); Log _log = tunnel.getContext().logManager().getLog(I2PTunnelClientBase.class);
if (socketManager != null && !socketManager.isDestroyed()) { if (socketManager != null && !socketManager.isDestroyed()) {
I2PSession s = socketManager.getSession(); I2PSession s = socketManager.getSession();
if (s.isClosed()) { if (s.isClosed() && _socketManagerState != SocketManagerState.INIT) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since the old one closed [s=" + s + "]"); _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since the old one closed [s=" + s + "]");
tunnel.removeSession(s); tunnel.removeSession(s);
// make sure the old one is closed // make sure the old one is closed
socketManager.destroySocketManager(); socketManager.destroySocketManager();
_socketManagerState = SocketManagerState.INIT;
// We could be here a LONG time, holding the lock // We could be here a LONG time, holding the lock
socketManager = buildSocketManager(tunnel, pkf); socketManager = buildSocketManager(tunnel, pkf);
} else { } else {
@ -424,6 +431,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
while (sockMgr.getSession().isClosed()) { while (sockMgr.getSession().isClosed()) {
try { try {
sockMgr.getSession().connect(); sockMgr.getSession().connect();
synchronized(I2PTunnelClientBase.class) {
if (sockMgr == socketManager)
_socketManagerState = SocketManagerState.CONNECTED;
}
} catch (I2PSessionException ise) { } catch (I2PSessionException ise) {
// shadows instance _log // shadows instance _log
Log _log = getTunnel().getContext().logManager().getLog(I2PTunnelClientBase.class); Log _log = getTunnel().getContext().logManager().getLog(I2PTunnelClientBase.class);

View File

@ -257,7 +257,7 @@ public interface I2PSession {
/** /**
* Have we closed the session? * Have we closed the session?
* *
* @return true if the session is closed * @return true if the session is closed, OR connect() has not been called yet
*/ */
public boolean isClosed(); public boolean isClosed();

View File

@ -135,7 +135,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
CLOSED CLOSED
} }
private State _state = State.INIT; protected State _state = State.INIT;
protected final Object _stateLock = new Object(); protected final Object _stateLock = new Object();
/** /**
@ -614,7 +614,12 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
* Report abuse with regards to the given messageId * Report abuse with regards to the given messageId
*/ */
public void reportAbuse(int msgId, int severity) throws I2PSessionException { public void reportAbuse(int msgId, int severity) throws I2PSessionException {
if (isClosed()) throw new I2PSessionException(getPrefix() + "Already closed"); synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
}
_producer.reportAbuse(this, msgId, severity); _producer.reportAbuse(this, msgId, severity);
} }

View File

@ -247,7 +247,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires) public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires)
throws I2PSessionException { throws I2PSessionException {
if (_log.shouldLog(Log.DEBUG)) _log.debug("sending message"); if (_log.shouldLog(Log.DEBUG)) _log.debug("sending message");
if (isClosed()) throw new I2PSessionException("Already closed"); synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
}
updateActivity(); updateActivity();
// Sadly there is no way to send something completely uncompressed in a backward-compatible way, // Sadly there is no way to send something completely uncompressed in a backward-compatible way,

View File

@ -256,7 +256,12 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
* @since 0.9.14 * @since 0.9.14
*/ */
private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException { private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed"); synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
}
updateActivity(); updateActivity();
if (shouldCompress(size)) if (shouldCompress(size))

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 29; public final static long BUILD = 30;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = "-rc"; public final static String EXTRA = "-rc";