diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 9c648b4c52..84248a6d22 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -14,10 +14,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -85,8 +85,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** used to seperate things out so we can get rid of singletons */ protected I2PAppContext _context; - /** MessageStatusMessage status from the most recent send that hasn't been consumed */ - private List _receivedStatus; private int _totalReconnectAttempts; /** monitor for waiting until a lease set has been granted */ @@ -99,6 +97,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa private boolean _dateReceived; /** lock that we wait upon, that the SetDateMessageHandler notifies */ private Object _dateReceivedLock = new Object(); + + /** + * thread that we tell when new messages are available who then tells us + * to fetch them. The point of this is so that the fetch doesn't block the + * reading of other messages (in turn, potentially leading to deadlock) + * + */ + private AvailabilityNotifier _availabilityNotifier; void dateUpdated() { _dateReceived = true; @@ -119,6 +125,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _handlerMap = new I2PClientMessageHandlerMap(context); _closed = true; _producer = new I2CPMessageProducer(context); + _availabilityNotifier = new AvailabilityNotifier(); _availableMessages = new HashMap(); try { readDestination(destKeyStream); @@ -129,7 +136,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } loadConfig(options); _sessionId = null; - _receivedStatus = new LinkedList(); _leaseSet = null; _totalReconnectAttempts = 0; } @@ -220,6 +226,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ public void connect() throws I2PSessionException { _closed = false; + I2PThread notifier = new I2PThread(_availabilityNotifier); + notifier.setName("Notifier " + _myDestination.calculateHash().toBase64().substring(0,4)); + notifier.setDaemon(true); + notifier.start(); + long startConnect = _context.clock().now(); try { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum); @@ -317,36 +328,68 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa return tags; } - private static volatile long __notifierId = 0; - /** * Recieve a payload message and let the app know its available */ public void addNewMessage(MessagePayloadMessage msg) { _availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg); - final int id = msg.getMessageId().getMessageId(); + int id = msg.getMessageId().getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { if (_log.shouldLog(Log.ERROR)) _log.error(getPrefix() + "addNewMessage of a message with no unencrypted data", new Exception("Empty message")); } else { - final long size = data.length; - Thread notifier = new I2PThread(new Runnable() { - public void run() { - if (_sessionListener != null) - _sessionListener.messageAvailable(I2PSessionImpl.this, id, size); - } - }); - long nid = ++__notifierId; - notifier.setName("Notifier " + nid); - notifier.setDaemon(true); - notifier.start(); + int size = data.length; + _availabilityNotifier.available(id, size); if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Notifier " + nid + " is for session " + _sessionId + ", message " + id + "]"); + _log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id); } } + private class AvailabilityNotifier implements Runnable { + private List _pendingIds; + private List _pendingSizes; + private boolean _alive; + + public AvailabilityNotifier() { + _pendingIds = new ArrayList(2); + _pendingSizes = new ArrayList(2); + } + + public void stopNotifying() { _alive = false; } + + public void available(int msgId, int size) { + synchronized (AvailabilityNotifier.this) { + _pendingIds.add(new Integer(msgId)); + _pendingSizes.add(new Integer(size)); + AvailabilityNotifier.this.notifyAll(); + } + } + public void run() { + _alive = true; + while (_alive) { + Integer msgId = null; + Integer size = null; + synchronized (AvailabilityNotifier.this) { + if (_pendingIds.size() <= 0) { + try { + AvailabilityNotifier.this.wait(); + } catch (InterruptedException ie) {} + } + if (_pendingIds.size() > 0) { + msgId = (Integer)_pendingIds.remove(0); + size = (Integer)_pendingSizes.remove(0); + } + } + if ( (msgId != null) && (size != null) ) { + if (_sessionListener != null) + _sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue()); + } + } + } + } + /** * Recieve notification of some I2CP message and handle it if possible * @@ -466,6 +509,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa propogateError("Error destroying the session", ipe); } } + _availabilityNotifier.stopNotifying(); _closed = true; closeSocket(); if (_sessionListener != null) _sessionListener.disconnected(this);