avoid the race that could corrupt local transfers by using a single thread to receive notifications of message availability (and in turn fetch that data)

the old way fired off a new (very short lived) thread for each message received, and if two happened really really quickly, they'd both lock on the mutex and the order would be undefined
this avoids that.  thanks to oOo et al for pestering me and sending in logs :)
This commit is contained in:
jrandom
2004-07-29 20:02:12 +00:00
committed by zzz
parent 4a9bd84bf0
commit c6bb8f09ca

View File

@ -14,10 +14,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -85,8 +85,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** used to seperate things out so we can get rid of singletons */
protected I2PAppContext _context;
/** MessageStatusMessage status from the most recent send that hasn't been consumed */
private List _receivedStatus;
private int _totalReconnectAttempts;
/** monitor for waiting until a lease set has been granted */
@ -100,6 +98,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** lock that we wait upon, that the SetDateMessageHandler notifies */
private Object _dateReceivedLock = new Object();
/**
* thread that we tell when new messages are available who then tells us
* to fetch them. The point of this is so that the fetch doesn't block the
* reading of other messages (in turn, potentially leading to deadlock)
*
*/
private AvailabilityNotifier _availabilityNotifier;
void dateUpdated() {
_dateReceived = true;
synchronized (_dateReceivedLock) {
@ -119,6 +125,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_handlerMap = new I2PClientMessageHandlerMap(context);
_closed = true;
_producer = new I2CPMessageProducer(context);
_availabilityNotifier = new AvailabilityNotifier();
_availableMessages = new HashMap();
try {
readDestination(destKeyStream);
@ -129,7 +136,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
loadConfig(options);
_sessionId = null;
_receivedStatus = new LinkedList();
_leaseSet = null;
_totalReconnectAttempts = 0;
}
@ -220,6 +226,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*/
public void connect() throws I2PSessionException {
_closed = false;
I2PThread notifier = new I2PThread(_availabilityNotifier);
notifier.setName("Notifier " + _myDestination.calculateHash().toBase64().substring(0,4));
notifier.setDaemon(true);
notifier.start();
long startConnect = _context.clock().now();
try {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum);
@ -317,33 +328,65 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
return tags;
}
private static volatile long __notifierId = 0;
/**
* Recieve a payload message and let the app know its available
*/
public void addNewMessage(MessagePayloadMessage msg) {
_availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg);
final int id = msg.getMessageId().getMessageId();
int id = msg.getMessageId().getMessageId();
byte data[] = msg.getPayload().getUnencryptedData();
if ((data == null) || (data.length <= 0)) {
if (_log.shouldLog(Log.ERROR))
_log.error(getPrefix() + "addNewMessage of a message with no unencrypted data",
new Exception("Empty message"));
} else {
final long size = data.length;
Thread notifier = new I2PThread(new Runnable() {
public void run() {
if (_sessionListener != null)
_sessionListener.messageAvailable(I2PSessionImpl.this, id, size);
}
});
long nid = ++__notifierId;
notifier.setName("Notifier " + nid);
notifier.setDaemon(true);
notifier.start();
int size = data.length;
_availabilityNotifier.available(id, size);
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Notifier " + nid + " is for session " + _sessionId + ", message " + id + "]");
_log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id);
}
}
private class AvailabilityNotifier implements Runnable {
private List _pendingIds;
private List _pendingSizes;
private boolean _alive;
public AvailabilityNotifier() {
_pendingIds = new ArrayList(2);
_pendingSizes = new ArrayList(2);
}
public void stopNotifying() { _alive = false; }
public void available(int msgId, int size) {
synchronized (AvailabilityNotifier.this) {
_pendingIds.add(new Integer(msgId));
_pendingSizes.add(new Integer(size));
AvailabilityNotifier.this.notifyAll();
}
}
public void run() {
_alive = true;
while (_alive) {
Integer msgId = null;
Integer size = null;
synchronized (AvailabilityNotifier.this) {
if (_pendingIds.size() <= 0) {
try {
AvailabilityNotifier.this.wait();
} catch (InterruptedException ie) {}
}
if (_pendingIds.size() > 0) {
msgId = (Integer)_pendingIds.remove(0);
size = (Integer)_pendingSizes.remove(0);
}
}
if ( (msgId != null) && (size != null) ) {
if (_sessionListener != null)
_sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue());
}
}
}
}
@ -466,6 +509,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
propogateError("Error destroying the session", ipe);
}
}
_availabilityNotifier.stopNotifying();
_closed = true;
closeSocket();
if (_sessionListener != null) _sessionListener.disconnected(this);