concurrentify _availableMessages

This commit is contained in:
zzz
2009-02-09 12:56:53 +00:00
parent f344c9e0be
commit cdab99bd25

View File

@ -14,6 +14,7 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -81,7 +82,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** class that generates new messages */ /** class that generates new messages */
protected I2CPMessageProducer _producer; protected I2CPMessageProducer _producer;
/** map of Long --> MessagePayloadMessage */ /** map of Long --> MessagePayloadMessage */
private Map _availableMessages; private Map<Long, MessagePayloadMessage> _availableMessages;
protected I2PClientMessageHandlerMap _handlerMap; protected I2PClientMessageHandlerMap _handlerMap;
@ -139,7 +140,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_closing = false; _closing = false;
_producer = new I2CPMessageProducer(context); _producer = new I2CPMessageProducer(context);
_availabilityNotifier = new AvailabilityNotifier(); _availabilityNotifier = new AvailabilityNotifier();
_availableMessages = new HashMap(); _availableMessages = new ConcurrentHashMap();
try { try {
readDestination(destKeyStream); readDestination(destKeyStream);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
@ -152,7 +153,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
loadConfig(options); loadConfig(options);
_sessionId = null; _sessionId = null;
_leaseSet = null; _leaseSet = null;
_context.statManager().createRateStat("client.availableMessages", "How many messages are available for the current client", "ClientMessages", new long[] { 60*1000, 10*60*1000 });
} }
/** /**
@ -309,15 +309,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* *
*/ */
public byte[] receiveMessage(int msgId) throws I2PSessionException { public byte[] receiveMessage(int msgId) throws I2PSessionException {
int remaining = 0; MessagePayloadMessage msg = _availableMessages.remove(new Long(msgId));
MessagePayloadMessage msg = null;
synchronized (_availableMessages) {
msg = (MessagePayloadMessage) _availableMessages.remove(new Long(msgId));
remaining = _availableMessages.size();
}
_context.statManager().addRateData("client.availableMessages", remaining, 0);
if (msg == null) { if (msg == null) {
_log.error("Receive message " + msgId + " had no matches, remaining=" + remaining); _log.error("Receive message " + msgId + " had no matches");
return null; return null;
} }
updateActivity(); updateActivity();
@ -357,12 +351,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*/ */
public void addNewMessage(MessagePayloadMessage msg) { public void addNewMessage(MessagePayloadMessage msg) {
Long mid = new Long(msg.getMessageId()); Long mid = new Long(msg.getMessageId());
int avail = 0; _availableMessages.put(mid, msg);
synchronized (_availableMessages) {
_availableMessages.put(mid, msg);
avail = _availableMessages.size();
}
_context.statManager().addRateData("client.availableMessages", avail, 0);
long id = msg.getMessageId(); long id = msg.getMessageId();
byte data[] = msg.getPayload().getUnencryptedData(); byte data[] = msg.getPayload().getUnencryptedData();
if ((data == null) || (data.length <= 0)) { if ((data == null) || (data.length <= 0)) {
@ -382,16 +371,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public VerifyUsage(Long id) { _msgId = id; } public VerifyUsage(Long id) { _msgId = id; }
public void timeReached() { public void timeReached() {
MessagePayloadMessage removed = null; MessagePayloadMessage removed = _availableMessages.remove(_msgId);
int remaining = 0; if (removed != null && !isClosed())
synchronized (_availableMessages) { _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed);
removed = (MessagePayloadMessage)_availableMessages.remove(_msgId);
remaining = _availableMessages.size();
}
if (removed != null) {
_log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed + ": remaining: " + remaining);
_context.statManager().addRateData("client.availableMessages", remaining, 0);
}
} }
} }