* I2CP client session - improvements after review:

- Move more cleanups to finally block
   - Bounded wait
   - Don't ignore InterruptedExceptions, wrap in I2PSessionException and throw
   - More finals
   - Synch tweaks
This commit is contained in:
zzz
2013-07-17 18:56:26 +00:00
parent d31ce49e77
commit 3b46acc285
3 changed files with 67 additions and 42 deletions

View File

@ -92,7 +92,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected I2PSessionListener _sessionListener;
/** class that generates new messages */
protected I2CPMessageProducer _producer;
protected final I2CPMessageProducer _producer;
/** map of Long --> MessagePayloadMessage */
protected Map<Long, MessagePayloadMessage> _availableMessages;
@ -101,7 +101,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected final Object _bwReceivedLock = new Object();
protected volatile int[] _bwLimits;
protected I2PClientMessageHandlerMap _handlerMap;
protected final I2PClientMessageHandlerMap _handlerMap;
/** used to seperate things out so we can get rid of singletons */
protected final I2PAppContext _context;
@ -119,7 +119,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
CLOSED
}
protected State _state = State.CLOSED;
private State _state = State.CLOSED;
protected final Object _stateLock = new Object();
/** have we received the current date from the router yet? */
@ -172,16 +172,19 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/**
* for extension by SimpleSession (no dest)
*/
protected I2PSessionImpl(I2PAppContext context, Properties options) {
this(context, options, false);
protected I2PSessionImpl(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap) {
this(context, options, handlerMap, false);
}
/**
* Basic setup of finals
* @since 0.9.7
*/
private I2PSessionImpl(I2PAppContext context, Properties options, boolean hasDest) {
private I2PSessionImpl(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap, boolean hasDest) {
_context = context;
_handlerMap = handlerMap;
_log = context.logManager().getLog(getClass());
if (options == null)
options = (Properties) System.getProperties().clone();
@ -190,10 +193,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_portNum = getPort();
_fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE));
if (hasDest) {
_producer = new I2CPMessageProducer(context);
_availableMessages = new ConcurrentHashMap();
_myDestination = new Destination();
_privateKey = new PrivateKey();
_signingPrivateKey = new SigningPrivateKey();
} else {
_producer = null;
_availableMessages = null;
_myDestination = null;
_privateKey = null;
_signingPrivateKey = null;
@ -210,11 +217,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @throws I2PSessionException if there is a problem loading the private keys or
*/
public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException {
this(context, options, true);
_handlerMap = new I2PClientMessageHandlerMap(context);
_producer = new I2CPMessageProducer(context);
this(context, options, new I2PClientMessageHandlerMap(context), true);
_availabilityNotifier = new AvailabilityNotifier();
_availableMessages = new ConcurrentHashMap();
try {
readDestination(destKeyStream);
} catch (DataFormatException dfe) {
@ -394,8 +398,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
case OPENING:
wasOpening = true;
try {
_stateLock.wait();
} catch (InterruptedException ie) {}
_stateLock.wait(10*1000);
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
}
break;
case CLOSING:
throw new I2PSessionException("close in progress");
@ -442,8 +448,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_reader = new I2CPMessageReader(in, this);
}
}
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
notifier.start();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading");
_reader.startReading();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate");
@ -452,53 +456,60 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
int waitcount = 0;
while (!_dateReceived) {
if (waitcount++ > 30) {
closeSocket();
throw new IOException("No handshake received from the router");
}
try {
synchronized (_dateReceivedLock) {
_dateReceivedLock.wait(1000);
}
} catch (InterruptedException ie) { // nop
synchronized (_dateReceivedLock) {
// InterruptedException caught below
_dateReceivedLock.wait(1000);
}
}
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After received a SetDate response");
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before producer.connect()");
_producer.connect(this);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After producer.connect()");
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After producer.connect()");
// wait until we have created a lease set
waitcount = 0;
while (_leaseSet == null) {
if (waitcount++ > 5*60) {
try {
_producer.disconnect(this);
} catch (I2PSessionException ipe) {}
closeSocket();
throw new IOException("No tunnels built after waiting 5 minutes. Your network connection may be down, or there is severe network congestion.");
}
synchronized (_leaseSetWait) {
try {
_leaseSetWait.wait(1000);
} catch (InterruptedException ie) { // nop
}
// InterruptedException caught below
_leaseSetWait.wait(1000);
}
}
long connected = _context.clock().now();
if (_log.shouldLog(Log.INFO))
if (_log.shouldLog(Log.INFO)) {
long connected = _context.clock().now();
_log.info(getPrefix() + "Lease set created with inbound tunnels after "
+ (connected - startConnect)
+ "ms - ready to participate in the network!");
}
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
notifier.start();
startIdleMonitor();
startVerifyUsage();
success = true;
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
} catch (UnknownHostException uhe) {
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe);
} catch (IOException ioe) {
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe);
} finally {
changeState(success ? State.OPEN : State.CLOSED);
if (success) {
changeState(State.OPEN);
} else {
_availabilityNotifier.stopNotifying();
synchronized(_stateLock) {
changeState(State.CLOSING);
try {
_producer.disconnect(this);
} catch (I2PSessionException ipe) {}
closeSocket();
}
}
}
}
@ -723,7 +734,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* Has the session been closed (or not yet connected)?
* False when open and during transitions. Unsynchronized.
*/
public boolean isClosed() { return _state == State.CLOSED; }
public boolean isClosed() {
synchronized (_stateLock) {
return _state == State.CLOSED;
}
}
/**
* Deliver an I2CP message to the router
@ -740,7 +755,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (!_queue.offer(message, MAX_SEND_WAIT))
throw new I2PSessionException("Timed out waiting while write queue was full");
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted while write queue was full", ie);
throw new I2PSessionException("Interrupted", ie);
}
} else if (_writer == null) {
throw new I2PSessionException("Already closed");
@ -902,7 +917,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
i++;
if ( (delay > MAX_RECONNECT_DELAY) || (delay <= 0) )
delay = MAX_RECONNECT_DELAY;
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
return false;
}
try {
connect();
@ -1017,7 +1036,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
synchronized (waiter) {
waiter.wait(maxWait);
}
} catch (InterruptedException ie) {}
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
}
} finally {
_pendingLookups.remove(waiter);
}
@ -1040,7 +1061,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
synchronized (_bwReceivedLock) {
_bwReceivedLock.wait(5*1000);
}
} catch (InterruptedException ie) {}
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
}
return _bwLimits;
}

View File

@ -44,9 +44,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
/** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */
protected boolean _noEffort;
/** for extension */
protected I2PSessionImpl2(I2PAppContext context, Properties options) {
super(context, options);
/**
* for extension by SimpleSession (no dest)
*/
protected I2PSessionImpl2(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap) {
super(context, options, handlerMap);
}
/**

View File

@ -38,8 +38,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
* @throws I2PSessionException if there is a problem
*/
public I2PSimpleSession(I2PAppContext context, Properties options) throws I2PSessionException {
super(context, options);
_handlerMap = new SimpleMessageHandlerMap(context);
super(context, options, new SimpleMessageHandlerMap(context));
}
/**