From c6bb8f09ca9bd9cc20cbc0dd2e69fb15e5ec6190 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 29 Jul 2004 20:02:12 +0000 Subject: [PATCH] avoid the race that could corrupt local transfers by using a single thread to receive notifications of message availability (and in turn fetch that data) the old way fired off a new (very short lived) thread for each message received, and if two happened really really quickly, they'd both lock on the mutex and the order would be undefined this avoids that. thanks to oOo et al for pestering me and sending in logs :) --- .../src/net/i2p/client/I2PSessionImpl.java | 82 ++++++++++++++----- 1 file changed, 63 insertions(+), 19 deletions(-) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 9c648b4c5..84248a6d2 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -14,10 +14,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -85,8 +85,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** used to seperate things out so we can get rid of singletons */ protected I2PAppContext _context; - /** MessageStatusMessage status from the most recent send that hasn't been consumed */ - private List _receivedStatus; private int _totalReconnectAttempts; /** monitor for waiting until a lease set has been granted */ @@ -99,6 +97,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa private boolean _dateReceived; /** lock that we wait upon, that the SetDateMessageHandler notifies */ private Object _dateReceivedLock = new Object(); + + /** + * thread that we tell when new messages are available who then tells us + * to fetch them. The point of this is so that the fetch doesn't block the + * reading of other messages (in turn, potentially leading to deadlock) + * + */ + private AvailabilityNotifier _availabilityNotifier; void dateUpdated() { _dateReceived = true; @@ -119,6 +125,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _handlerMap = new I2PClientMessageHandlerMap(context); _closed = true; _producer = new I2CPMessageProducer(context); + _availabilityNotifier = new AvailabilityNotifier(); _availableMessages = new HashMap(); try { readDestination(destKeyStream); @@ -129,7 +136,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } loadConfig(options); _sessionId = null; - _receivedStatus = new LinkedList(); _leaseSet = null; _totalReconnectAttempts = 0; } @@ -220,6 +226,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ public void connect() throws I2PSessionException { _closed = false; + I2PThread notifier = new I2PThread(_availabilityNotifier); + notifier.setName("Notifier " + _myDestination.calculateHash().toBase64().substring(0,4)); + notifier.setDaemon(true); + notifier.start(); + long startConnect = _context.clock().now(); try { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum); @@ -317,36 +328,68 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa return tags; } - private static volatile long __notifierId = 0; - /** * Recieve a payload message and let the app know its available */ public void addNewMessage(MessagePayloadMessage msg) { _availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg); - final int id = msg.getMessageId().getMessageId(); + int id = msg.getMessageId().getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { if (_log.shouldLog(Log.ERROR)) _log.error(getPrefix() + "addNewMessage of a message with no unencrypted data", new Exception("Empty message")); } else { - final long size = data.length; - Thread notifier = new I2PThread(new Runnable() { - public void run() { - if (_sessionListener != null) - _sessionListener.messageAvailable(I2PSessionImpl.this, id, size); - } - }); - long nid = ++__notifierId; - notifier.setName("Notifier " + nid); - notifier.setDaemon(true); - notifier.start(); + int size = data.length; + _availabilityNotifier.available(id, size); if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Notifier " + nid + " is for session " + _sessionId + ", message " + id + "]"); + _log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id); } } + private class AvailabilityNotifier implements Runnable { + private List _pendingIds; + private List _pendingSizes; + private boolean _alive; + + public AvailabilityNotifier() { + _pendingIds = new ArrayList(2); + _pendingSizes = new ArrayList(2); + } + + public void stopNotifying() { _alive = false; } + + public void available(int msgId, int size) { + synchronized (AvailabilityNotifier.this) { + _pendingIds.add(new Integer(msgId)); + _pendingSizes.add(new Integer(size)); + AvailabilityNotifier.this.notifyAll(); + } + } + public void run() { + _alive = true; + while (_alive) { + Integer msgId = null; + Integer size = null; + synchronized (AvailabilityNotifier.this) { + if (_pendingIds.size() <= 0) { + try { + AvailabilityNotifier.this.wait(); + } catch (InterruptedException ie) {} + } + if (_pendingIds.size() > 0) { + msgId = (Integer)_pendingIds.remove(0); + size = (Integer)_pendingSizes.remove(0); + } + } + if ( (msgId != null) && (size != null) ) { + if (_sessionListener != null) + _sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue()); + } + } + } + } + /** * Recieve notification of some I2CP message and handle it if possible * @@ -466,6 +509,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa propogateError("Error destroying the session", ipe); } } + _availabilityNotifier.stopNotifying(); _closed = true; closeSocket(); if (_sessionListener != null) _sessionListener.disconnected(this);