use concurrent

This commit is contained in:
zzz
2009-02-02 18:03:16 +00:00
parent 8d7340500f
commit 7ec29b0c5a
2 changed files with 52 additions and 101 deletions

View File

@ -11,6 +11,7 @@ package net.i2p.router.client;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -59,7 +60,7 @@ public class ClientConnectionRunner {
/** user's config */ /** user's config */
private SessionConfig _config; private SessionConfig _config;
/** static mapping of MessageId to Payload, storing messages for retrieval */ /** static mapping of MessageId to Payload, storing messages for retrieval */
private Map _messages; private Map<MessageId, Payload> _messages;
/** lease set request state, or null if there is no request pending on at the moment */ /** lease set request state, or null if there is no request pending on at the moment */
private LeaseRequestState _leaseRequest; private LeaseRequestState _leaseRequest;
/** currently allocated leaseSet, or null if none is allocated */ /** currently allocated leaseSet, or null if none is allocated */
@ -88,7 +89,7 @@ public class ClientConnectionRunner {
_manager = manager; _manager = manager;
_socket = socket; _socket = socket;
_config = null; _config = null;
_messages = new HashMap(); _messages = new ConcurrentHashMap();
_alreadyProcessed = new ArrayList(); _alreadyProcessed = new ArrayList();
_acceptedPending = new HashSet(); _acceptedPending = new HashSet();
_dead = false; _dead = false;
@ -106,7 +107,7 @@ public class ClientConnectionRunner {
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this)); _reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this));
_writer = new ClientWriterRunner(_context, this); _writer = new ClientWriterRunner(_context, this);
I2PThread t = new I2PThread(_writer); I2PThread t = new I2PThread(_writer);
t.setName("Writer " + ++__id); t.setName("I2CP Writer " + ++__id);
t.setDaemon(true); t.setDaemon(true);
t.setPriority(I2PThread.MAX_PRIORITY); t.setPriority(I2PThread.MAX_PRIORITY);
t.start(); t.start();
@ -128,9 +129,7 @@ public class ClientConnectionRunner {
if (_reader != null) _reader.stopReading(); if (_reader != null) _reader.stopReading();
if (_writer != null) _writer.stopWriting(); if (_writer != null) _writer.stopWriting();
if (_socket != null) try { _socket.close(); } catch (IOException ioe) { } if (_socket != null) try { _socket.close(); } catch (IOException ioe) { }
synchronized (_messages) {
_messages.clear(); _messages.clear();
}
if (_manager != null) if (_manager != null)
_manager.unregisterConnection(this); _manager.unregisterConnection(this);
if (_currentLeaseSet != null) if (_currentLeaseSet != null)
@ -164,51 +163,19 @@ public class ClientConnectionRunner {
} }
/** already closed? */ /** already closed? */
boolean isDead() { return _dead; } boolean isDead() { return _dead; }
/** message body */ /** message body */
Payload getPayload(MessageId id) { Payload getPayload(MessageId id) {
Payload rv = null; return _messages.get(id);
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_messages) {
inLock = _context.clock().now();
rv = (Payload)_messages.get(id);
} }
long afterLock = _context.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
return rv;
}
void setPayload(MessageId id, Payload payload) { void setPayload(MessageId id, Payload payload) {
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_messages) {
inLock = _context.clock().now();
_messages.put(id, payload); _messages.put(id, payload);
} }
long afterLock = _context.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("setPayload.locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
}
void removePayload(MessageId id) { void removePayload(MessageId id) {
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_messages) {
inLock = _context.clock().now();
_messages.remove(id); _messages.remove(id);
} }
long afterLock = _context.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("removePayload.locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
}
void sessionEstablished(SessionConfig config) { void sessionEstablished(SessionConfig config) {
_destHashCache = config.getDestination().calculateHash(); _destHashCache = config.getDestination().calculateHash();

View File

@ -1,9 +1,13 @@
package net.i2p.router.client; package net.i2p.router.client;
import java.util.ArrayList; import java.io.IOException;
import java.util.List; import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageImpl;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -13,26 +17,18 @@ import net.i2p.util.Log;
* the client reads from their i2cp socket, causing all sorts of bad shit to * the client reads from their i2cp socket, causing all sorts of bad shit to
* happen) * happen)
* *
* @author zzz modded to use concurrent
*/ */
class ClientWriterRunner implements Runnable { class ClientWriterRunner implements Runnable {
private List _messagesToWrite; private BlockingQueue<I2CPMessage> _messagesToWrite;
private List _messagesToWriteTimes;
private ClientConnectionRunner _runner; private ClientConnectionRunner _runner;
private RouterContext _context;
private Log _log; private Log _log;
private long _id; private long _id;
private static long __id = 0; private static long __id = 0;
private static final long MAX_WAIT = 5*1000;
/** lock on this when updating the class level data structs */
private Object _dataLock = new Object();
public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) { public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {
_context = context;
_log = context.logManager().getLog(ClientWriterRunner.class); _log = context.logManager().getLog(ClientWriterRunner.class);
_messagesToWrite = new ArrayList(4); _messagesToWrite = new LinkedBlockingQueue();
_messagesToWriteTimes = new ArrayList(4);
_runner = runner; _runner = runner;
_id = ++__id; _id = ++__id;
} }
@ -42,11 +38,9 @@ class ClientWriterRunner implements Runnable {
* *
*/ */
public void addMessage(I2CPMessage msg) { public void addMessage(I2CPMessage msg) {
synchronized (_dataLock) { try {
_messagesToWrite.add(msg); _messagesToWrite.put(msg);
_messagesToWriteTimes.add(new Long(_context.clock().now())); } catch (InterruptedException ie) {}
_dataLock.notifyAll();
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("["+_id+"] addMessage completed for " + msg.getClass().getName()); _log.debug("["+_id+"] addMessage completed for " + msg.getClass().getName());
} }
@ -56,47 +50,37 @@ class ClientWriterRunner implements Runnable {
* *
*/ */
public void stopWriting() { public void stopWriting() {
synchronized (_dataLock) { _messagesToWrite.clear();
_dataLock.notifyAll(); try {
} _messagesToWrite.put(new PoisonMessage());
} catch (InterruptedException ie) {}
} }
public void run() { public void run() {
List messages = new ArrayList(64); I2CPMessage msg;
List messageTimes = new ArrayList(64);
List switchList = null;
while (!_runner.getIsDead()) { while (!_runner.getIsDead()) {
synchronized (_dataLock) { try {
if (_messagesToWrite.size() <= 0) msg = _messagesToWrite.take();
try { _dataLock.wait(); } catch (InterruptedException ie) {} } catch (InterruptedException ie) {
continue;
if (_messagesToWrite.size() > 0) {
switchList = _messagesToWrite;
_messagesToWrite = messages;
messages = switchList;
switchList = _messagesToWriteTimes;
_messagesToWriteTimes = messageTimes;
messageTimes = switchList;
} }
} if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
break;
if (messages.size() > 0) {
for (int i = 0; i < messages.size(); i++) {
I2CPMessage msg = (I2CPMessage)messages.get(i);
Long when = (Long)messageTimes.get(i);
if (_log.shouldLog(Log.DEBUG))
_log.debug("["+_id+"] writeMessage before writing "
+ msg.getClass().getName());
_runner.writeMessage(msg); _runner.writeMessage(msg);
if (_log.shouldLog(Log.DEBUG))
_log.debug("["+_id+"] writeMessage time since addMessage(): "
+ (_context.clock().now()-when.longValue()) + " for "
+ msg.getClass().getName());
} }
} }
messages.clear();
messageTimes.clear(); /**
} * End-of-stream msg used to stop the concurrent queue
* See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
*
*/
private static class PoisonMessage extends I2CPMessageImpl {
public static final int MESSAGE_TYPE = 999999;
public int getType() {
return MESSAGE_TYPE;
}
public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
} }
} }