disapproval of revision 'bd09bb36a90e766b3a406d78055d427a6200dd41'

This commit is contained in:
sponge
2008-09-25 23:31:57 +00:00
parent fa5c7219d3
commit ee2fd32a97
13 changed files with 1279 additions and 1673 deletions

View File

@ -12,7 +12,6 @@ import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
@ -220,9 +219,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting", ce); _log.error("Error accepting", ce);
// not killing the server.. // not killing the server..
} catch(SocketTimeoutException ste) { }
// ignored, we never set the timeout
}
} }
} }
} }

View File

@ -2,7 +2,6 @@ package net.i2p.client.streaming;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import net.i2p.I2PException; import net.i2p.I2PException;
/** /**
@ -10,40 +9,26 @@ import net.i2p.I2PException;
* *
*/ */
public interface I2PServerSocket { public interface I2PServerSocket {
/**
* Closes the socket.
*/
public void close() throws I2PException;
/** /**
* Closes the socket. * Waits for the next socket connecting. If a remote user tried to make a
*/ * connection and the local application wasn't .accept()ing new connections,
public void close() throws I2PException; * they should get refused (if .accept() doesnt occur in some small period)
*
* @return a connected I2PSocket
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
*/
public I2PSocket accept() throws I2PException, ConnectException;
/** /**
* Waits for the next socket connecting. If a remote user tried to make a * Access the manager which is coordinating the server socket
* connection and the local application wasn't .accept()ing new connections, */
* they should get refused (if .accept() doesnt occur in some small period) public I2PSocketManager getManager();
*
* @return a connected I2PSocket
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
* @throws SocketTimeoutException
*/
public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/**
* Set Sock Option accept timeout
* @param x
*/
public void setSoTimeout(long x);
/**
* Get Sock Option accept timeout
* @return timeout
*/
public long getSoTimeout();
/**
* Access the manager which is coordinating the server socket
*/
public I2PSocketManager getManager();
} }

View File

@ -17,159 +17,134 @@ import net.i2p.util.Log;
* *
*/ */
class I2PServerSocketImpl implements I2PServerSocket { class I2PServerSocketImpl implements I2PServerSocket {
private final static Log _log = new Log(I2PServerSocketImpl.class);
private I2PSocketManager mgr;
/** list of sockets waiting for the client to accept them */
private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
private final static Log _log = new Log(I2PServerSocketImpl.class); /** have we been closed */
private I2PSocketManager mgr; private volatile boolean closing = false;
/** list of sockets waiting for the client to accept them */
private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
/** have we been closed */
private volatile boolean closing = false;
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
private Object socketAcceptedLock = new Object();
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */
private Object socketAddedLock = new Object();
/** /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
* Set Sock Option accept timeout stub, does nothing private Object socketAcceptedLock = new Object();
* @param x /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
*/ private Object socketAddedLock = new Object();
public void setSoTimeout(long x) {
}
/** public I2PServerSocketImpl(I2PSocketManager mgr) {
* Get Sock Option accept timeout stub, does nothing this.mgr = mgr;
* @return timeout }
*/
public long getSoTimeout() {
return -1;
}
public I2PServerSocketImpl(I2PSocketManager mgr) { /**
this.mgr = mgr; * Waits for the next socket connecting. If a remote user tried to make a
} * connection and the local application wasn't .accept()ing new connections,
* they should get refused (if .accept() doesnt occur in some small period -
* currently 5 seconds)
*
* @return a connected I2PSocket
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
*/
public I2PSocket accept() throws I2PException, ConnectException {
if (_log.shouldLog(Log.DEBUG))
_log.debug("accept() called, pending: " + pendingSockets.size());
/** I2PSocket ret = null;
* Waits for the next socket connecting. If a remote user tried to make a
* connection and the local application wasn't .accept()ing new connections,
* they should get refused (if .accept() doesnt occur in some small period -
* currently 5 seconds)
*
* @return a connected I2PSocket
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
*/
public I2PSocket accept() throws I2PException, ConnectException {
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("accept() called, pending: " + pendingSockets.size());
}
I2PSocket ret = null;
while((ret == null) && (!closing)) { while ( (ret == null) && (!closing) ){
while(pendingSockets.size() <= 0) { while (pendingSockets.size() <= 0) {
if(closing) { if (closing) throw new ConnectException("I2PServerSocket closed");
throw new ConnectException("I2PServerSocket closed"); try {
} synchronized(socketAddedLock) {
try { socketAddedLock.wait();
synchronized(socketAddedLock) { }
socketAddedLock.wait(); } catch (InterruptedException ie) {}
} }
} catch(InterruptedException ie) { synchronized (pendingSockets) {
} if (pendingSockets.size() > 0) {
} ret = (I2PSocket)pendingSockets.remove(0);
synchronized(pendingSockets) { }
if(pendingSockets.size() > 0) { }
ret = (I2PSocket)pendingSockets.remove(0); if (ret != null) {
} synchronized (socketAcceptedLock) {
} socketAcceptedLock.notifyAll();
if(ret != null) { }
synchronized(socketAcceptedLock) { }
socketAcceptedLock.notifyAll(); }
}
}
}
if(_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG))
_log.debug("TIMING: handed out accept result " + ret.hashCode()); _log.debug("TIMING: handed out accept result " + ret.hashCode());
} return ret;
return ret; }
}
/** /**
* Make the socket available and wait until the client app accepts it, or until * Make the socket available and wait until the client app accepts it, or until
* the given timeout elapses. This doesn't have any limits on the queue size - * the given timeout elapses. This doesn't have any limits on the queue size -
* perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
* *
* @param timeoutMs how long to wait until accept * @param timeoutMs how long to wait until accept
* @return true if the socket was accepted, false if the timeout expired * @return true if the socket was accepted, false if the timeout expired
* or the socket was closed * or the socket was closed
*/ */
public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
if(_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG))
_log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
}
if(closing) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Already closing the socket");
}
return false;
}
Clock clock = I2PAppContext.getGlobalContext().clock(); if (closing) {
long start = clock.now(); if (_log.shouldLog(Log.WARN))
long end = start + timeoutMs; _log.warn("Already closing the socket");
pendingSockets.add(s); return false;
synchronized(socketAddedLock) { }
socketAddedLock.notifyAll();
}
// keep looping until the socket has been grabbed by the accept() Clock clock = I2PAppContext.getGlobalContext().clock();
// (or the expiration passes, or the socket is closed) long start = clock.now();
while(pendingSockets.contains(s)) { long end = start + timeoutMs;
long now = clock.now(); pendingSockets.add(s);
if(now >= end) { synchronized (socketAddedLock) {
if(_log.shouldLog(Log.INFO)) { socketAddedLock.notifyAll();
_log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); }
}
pendingSockets.remove(s);
return false;
}
if(closing) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Server socket closed while waiting for accept");
}
pendingSockets.remove(s);
return false;
}
long remaining = end - now;
try {
synchronized(socketAcceptedLock) {
socketAcceptedLock.wait(remaining);
}
} catch(InterruptedException ie) {
}
}
long now = clock.now();
if(_log.shouldLog(Log.DEBUG)) {
_log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString());
}
return true;
}
public void close() { // keep looping until the socket has been grabbed by the accept()
closing = true; // (or the expiration passes, or the socket is closed)
// let anyone .accept()ing know to fsck off while (pendingSockets.contains(s)) {
synchronized(socketAddedLock) { long now = clock.now();
socketAddedLock.notifyAll(); if (now >= end) {
} if (_log.shouldLog(Log.INFO))
// let anyone addWaitForAccept()ing know to fsck off _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
synchronized(socketAcceptedLock) { pendingSockets.remove(s);
socketAcceptedLock.notifyAll(); return false;
} }
} if (closing) {
if (_log.shouldLog(Log.WARN))
_log.warn("Server socket closed while waiting for accept");
pendingSockets.remove(s);
return false;
}
long remaining = end - now;
try {
synchronized (socketAcceptedLock) {
socketAcceptedLock.wait(remaining);
}
} catch (InterruptedException ie) {}
}
long now = clock.now();
if (_log.shouldLog(Log.DEBUG))
_log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
return true;
}
public I2PSocketManager getManager() { public void close() {
return mgr; closing = true;
} // let anyone .accept()ing know to fsck off
synchronized (socketAddedLock) {
socketAddedLock.notifyAll();
}
// let anyone addWaitForAccept()ing know to fsck off
synchronized (socketAcceptedLock) {
socketAcceptedLock.notifyAll();
}
}
public I2PSocketManager getManager() { return mgr; }
} }

View File

@ -5,7 +5,6 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Properties; import java.util.Properties;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
@ -21,203 +20,173 @@ import net.i2p.util.Log;
* *
*/ */
public class StreamSinkServer { public class StreamSinkServer {
private Log _log;
private String _sinkDir;
private String _destFile;
private String _i2cpHost;
private int _i2cpPort;
private int _handlers;
private Log _log; /**
private String _sinkDir; * Create but do not start the streaming server.
private String _destFile; *
private String _i2cpHost; * @param sinkDir Directory to store received files in
private int _i2cpPort; * @param ourDestFile filename to write our binary destination to
private int _handlers; */
public StreamSinkServer(String sinkDir, String ourDestFile) {
this(sinkDir, ourDestFile, null, -1, 3);
}
public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
_sinkDir = sinkDir;
_destFile = ourDestFile;
_i2cpHost = i2cpHost;
_i2cpPort = i2cpPort;
_handlers = handlers;
_log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
}
/** /**
* Create but do not start the streaming server. * Actually fire up the server - this call blocks forever (or until the server
* * socket closes)
* @param sinkDir Directory to store received files in *
* @param ourDestFile filename to write our binary destination to */
*/ public void runServer() {
public StreamSinkServer(String sinkDir, String ourDestFile) { I2PSocketManager mgr = null;
this(sinkDir, ourDestFile, null, -1, 3); if (_i2cpHost != null)
} mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
else
mgr = I2PSocketManagerFactory.createManager();
Destination dest = mgr.getSession().getMyDestination();
if (_log.shouldLog(Log.INFO))
_log.info("Listening for connections on: " + dest.calculateHash().toBase64());
FileOutputStream fos = null;
try {
fos = new FileOutputStream(_destFile);
dest.writeBytes(fos);
} catch (IOException ioe) {
_log.error("Error writing out our destination to " + _destFile, ioe);
return;
} catch (DataFormatException dfe) {
_log.error("Error formatting the destination", dfe);
return;
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
}
public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { I2PServerSocket sock = mgr.getServerSocket();
_sinkDir = sinkDir; startup(sock);
_destFile = ourDestFile; }
_i2cpHost = i2cpHost;
_i2cpPort = i2cpPort;
_handlers = handlers;
_log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
}
/** public void startup(I2PServerSocket sock) {
* Actually fire up the server - this call blocks forever (or until the server for (int i = 0; i < _handlers; i++) {
* socket closes) I2PThread t = new I2PThread(new ClientRunner(sock));
* t.setName("Handler " + i);
*/ t.setDaemon(false);
public void runServer() { t.start();
I2PSocketManager mgr = null; }
if(_i2cpHost != null) { }
mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
} else {
mgr = I2PSocketManagerFactory.createManager();
}
Destination dest = mgr.getSession().getMyDestination();
if(_log.shouldLog(Log.INFO)) {
_log.info("Listening for connections on: " + dest.calculateHash().toBase64());
}
FileOutputStream fos = null;
try {
fos = new FileOutputStream(_destFile);
dest.writeBytes(fos);
} catch(IOException ioe) {
_log.error("Error writing out our destination to " + _destFile, ioe);
return;
} catch(DataFormatException dfe) {
_log.error("Error formatting the destination", dfe);
return;
} finally {
if(fos != null) {
try {
fos.close();
} catch(IOException ioe) {
}
}
}
I2PServerSocket sock = mgr.getServerSocket(); /**
startup(sock); * Actually deal with a client - pull anything they send us and write it to a file.
} *
*/
private class ClientRunner implements Runnable {
private I2PServerSocket _socket;
public ClientRunner(I2PServerSocket socket) {
_socket = socket;
}
public void run() {
while (true) {
try {
I2PSocket socket = _socket.accept();
if (socket != null)
handle(socket);
} catch (I2PException ie) {
_log.error("Error accepting connection", ie);
return;
} catch (ConnectException ce) {
_log.error("Connection already dropped", ce);
return;
}
}
}
public void startup(I2PServerSocket sock) { private void handle(I2PSocket sock) {
for(int i = 0; i < _handlers; i++) { FileOutputStream fos = null;
I2PThread t = new I2PThread(new ClientRunner(sock)); try {
t.setName("Handler " + i); File sink = new File(_sinkDir);
t.setDaemon(false); if (!sink.exists())
t.start(); sink.mkdirs();
} File cur = File.createTempFile("clientSink", ".dat", sink);
} fos = new FileOutputStream(cur);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Writing to " + cur.getAbsolutePath());
} catch (IOException ioe) {
_log.error("Error creating sink", ioe);
return;
}
/** long start = System.currentTimeMillis();
* Actually deal with a client - pull anything they send us and write it to a file. try {
* InputStream in = sock.getInputStream();
*/ byte buf[] = new byte[4096];
private class ClientRunner implements Runnable { long written = 0;
int read = 0;
while ( (read = in.read(buf)) != -1) {
//_fos.write(buf, 0, read);
written += read;
if (_log.shouldLog(Log.DEBUG))
_log.debug("read and wrote " + read + " (" + written + ")");
}
fos.write(("written: [" + written + "]\n").getBytes());
long lifetime = System.currentTimeMillis() - start;
_log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
} catch (IOException ioe) {
_log.error("Error writing the sink", ioe);
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
if (sock != null) try { sock.close(); } catch (IOException ioe) {}
_log.debug("Client socket closed");
}
}
}
private I2PServerSocket _socket; /**
* Fire up the streaming server. <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
public ClientRunner(I2PServerSocket socket) { * <ul>
_socket = socket; * <li><b>sinkDir</b>: Directory to store received files in</li>
} * <li><b>ourDestFile</b>: filename to write our binary destination to</li>
* <li><b>numHandlers</b>: how many concurrent connections to handle</li>
public void run() { * </ul>
while(true) { */
try { public static void main(String args[]) {
I2PSocket socket = _socket.accept(); StreamSinkServer server = null;
if(socket != null) { switch (args.length) {
handle(socket); case 0:
} server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
} catch(I2PException ie) { break;
_log.error("Error accepting connection", ie); case 2:
return; server = new StreamSinkServer(args[0], args[1]);
} catch(ConnectException ce) { break;
_log.error("Connection already dropped", ce); case 4:
return; case 5:
} catch(SocketTimeoutException ste) { int handlers = 3;
// ignored if (args.length == 5) {
} try {
} handlers = Integer.parseInt(args[4]);
} } catch (NumberFormatException nfe) {}
}
private void handle(I2PSocket sock) { try {
FileOutputStream fos = null; int port = Integer.parseInt(args[1]);
try { server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
File sink = new File(_sinkDir); } catch (NumberFormatException nfe) {
if(!sink.exists()) { System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
sink.mkdirs(); }
} break;
File cur = File.createTempFile("clientSink", ".dat", sink); default:
fos = new FileOutputStream(cur); System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
if(_log.shouldLog(Log.DEBUG)) { }
_log.debug("Writing to " + cur.getAbsolutePath()); if (server != null)
} server.runServer();
} catch(IOException ioe) { }
_log.error("Error creating sink", ioe);
return;
}
long start = System.currentTimeMillis();
try {
InputStream in = sock.getInputStream();
byte buf[] = new byte[4096];
long written = 0;
int read = 0;
while((read = in.read(buf)) != -1) {
//_fos.write(buf, 0, read);
written += read;
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("read and wrote " + read + " (" + written + ")");
}
}
fos.write(("written: [" + written + "]\n").getBytes());
long lifetime = System.currentTimeMillis() - start;
_log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
} catch(IOException ioe) {
_log.error("Error writing the sink", ioe);
} finally {
if(fos != null) {
try {
fos.close();
} catch(IOException ioe) {
}
}
if(sock != null) {
try {
sock.close();
} catch(IOException ioe) {
}
}
_log.debug("Client socket closed");
}
}
}
/**
* Fire up the streaming server. <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
* <ul>
* <li><b>sinkDir</b>: Directory to store received files in</li>
* <li><b>ourDestFile</b>: filename to write our binary destination to</li>
* <li><b>numHandlers</b>: how many concurrent connections to handle</li>
* </ul>
*/
public static void main(String args[]) {
StreamSinkServer server = null;
switch(args.length) {
case 0:
server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
break;
case 2:
server = new StreamSinkServer(args[0], args[1]);
break;
case 4:
case 5:
int handlers = 3;
if(args.length == 5) {
try {
handlers = Integer.parseInt(args[4]);
} catch(NumberFormatException nfe) {
}
}
try {
int port = Integer.parseInt(args[1]);
server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
} catch(NumberFormatException nfe) {
System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
}
break;
default:
System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
}
if(server != null) {
server.runServer();
}
}
} }

View File

@ -1,6 +1,5 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -21,459 +21,393 @@ import net.i2p.util.SimpleTimer;
* *
*/ */
public class ConnectionManager { public class ConnectionManager {
private I2PAppContext _context;
private Log _log;
private I2PSession _session;
private MessageHandler _messageHandler;
private PacketHandler _packetHandler;
private ConnectionHandler _connectionHandler;
private PacketQueue _outboundQueue;
private SchedulerChooser _schedulerChooser;
private ConnectionPacketHandler _conPacketHandler;
/** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
/** Ping ID (Long) to PingRequest */
private Map _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private Object _connectionLock;
private I2PAppContext _context; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
private Log _log; _context = context;
private I2PSession _session; _log = context.logManager().getLog(ConnectionManager.class);
private MessageHandler _messageHandler; _connectionByInboundId = new HashMap(32);
private PacketHandler _packetHandler; _pendingPings = new HashMap(4);
private ConnectionHandler _connectionHandler; _connectionLock = new Object();
private PacketQueue _outboundQueue; _messageHandler = new MessageHandler(context, this);
private SchedulerChooser _schedulerChooser; _packetHandler = new PacketHandler(context, this);
private ConnectionPacketHandler _conPacketHandler; _connectionHandler = new ConnectionHandler(context, this);
/** Inbound stream ID (Long) to Connection map */ _schedulerChooser = new SchedulerChooser(context);
private Map _connectionByInboundId; _conPacketHandler = new ConnectionPacketHandler(context);
/** Ping ID (Long) to PingRequest */ _session = session;
private Map _pendingPings; session.setSessionListener(_messageHandler);
private boolean _allowIncoming; _outboundQueue = new PacketQueue(context, session, this);
private int _maxConcurrentStreams; _allowIncoming = false;
private ConnectionOptions _defaultOptions; _maxConcurrentStreams = maxConcurrent;
private volatile int _numWaiting; _defaultOptions = defaultOptions;
private Object _connectionLock; _numWaiting = 0;
private long SoTimeout; _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
}
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { Connection getConnectionByInboundId(long id) {
_context = context; synchronized (_connectionLock) {
_log = context.logManager().getLog(ConnectionManager.class); return (Connection)_connectionByInboundId.get(new Long(id));
_connectionByInboundId = new HashMap(32); }
_pendingPings = new HashMap(4); }
_connectionLock = new Object(); /**
_messageHandler = new MessageHandler(context, this); * not guaranteed to be unique, but in case we receive more than one packet
_packetHandler = new PacketHandler(context, this); * on an inbound connection that we havent ack'ed yet...
_connectionHandler = new ConnectionHandler(context, this); */
_schedulerChooser = new SchedulerChooser(context); Connection getConnectionByOutboundId(long id) {
_conPacketHandler = new ConnectionPacketHandler(context); synchronized (_connectionLock) {
_session = session; for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
session.setSessionListener(_messageHandler); Connection con = (Connection)iter.next();
_outboundQueue = new PacketQueue(context, session, this); if (DataHelper.eq(con.getSendStreamId(), id))
_allowIncoming = false; return con;
_maxConcurrentStreams = maxConcurrent; }
_defaultOptions = defaultOptions; }
_numWaiting = 0; return null;
/** Socket timeout for accept() */ }
SoTimeout = -1;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); public void setAllowIncomingConnections(boolean allow) {
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); _connectionHandler.setActive(allow);
_context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); }
_context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); /** should we acceot connections, or just reject everyone? */
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); public boolean getAllowIncomingConnections() {
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); return _connectionHandler.getActive();
_context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); }
_context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
}
Connection getConnectionByInboundId(long id) { /**
synchronized(_connectionLock) { * Create a new connection based on the SYN packet we received.
return (Connection)_connectionByInboundId.get(new Long(id)); *
} * @return created Connection with the packet's data already delivered to
} * it, or null if the syn's streamId was already taken
*/
public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false;
int active = 0;
int total = 0;
synchronized (_connectionLock) {
total = _connectionByInboundId.size();
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
if ( ((Connection)iter.next()).getIsConnected() )
active++;
}
if (locked_tooManyStreams()) {
reject = true;
} else {
while (true) {
Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
if (oldCon == null) {
break;
} else {
_connectionByInboundId.put(new Long(receiveId), oldCon);
// receiveId already taken, try another
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
}
}
}
/** _context.statManager().addRateData("stream.receiveActive", active, total);
* not guaranteed to be unique, but in case we receive more than one packet
* on an inbound connection that we havent ack'ed yet...
*/
Connection getConnectionByOutboundId(long id) {
synchronized(_connectionLock) {
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
Connection con = (Connection)iter.next();
if(DataHelper.eq(con.getSendStreamId(), id)) {
return con;
}
}
}
return null;
}
/** if (reject) {
* Set the socket accept() timeout. if (_log.shouldLog(Log.WARN))
* @param x _log.warn("Refusing connection since we have exceeded our max of "
*/ + _maxConcurrentStreams + " connections");
public void MsetSoTimeout(long x) { PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
SoTimeout = x; reply.setFlag(Packet.FLAG_RESET);
} reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(synPacket.getSequenceNum());
reply.setSendStreamId(synPacket.getReceiveStreamId());
reply.setReceiveStreamId(0);
reply.setOptionalFrom(_session.getMyDestination());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
return null;
}
/** con.setReceiveStreamId(receiveId);
* Get the socket accept() timeout. try {
* @return con.getPacketHandler().receivePacket(synPacket, con);
*/ } catch (I2PException ie) {
public long MgetSoTimeout() { synchronized (_connectionLock) {
return SoTimeout; _connectionByInboundId.remove(new Long(receiveId));
} }
return null;
}
public void setAllowIncomingConnections(boolean allow) { _context.statManager().addRateData("stream.connectionReceived", 1, 0);
_connectionHandler.setActive(allow); return con;
} }
/** should we acceot connections, or just reject everyone? */ private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
public boolean getAllowIncomingConnections() {
return _connectionHandler.getActive();
}
/** /**
* Create a new connection based on the SYN packet we received. * Build a new connection to the given peer. This blocks if there is no
* * connection delay, otherwise it returns immediately.
* @return created Connection with the packet's data already delivered to *
* it, or null if the syn's streamId was already taken * @return new connection, or null if we have exceeded our limit
*/ */
public Connection receiveConnection(Packet synPacket) { public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); Connection con = null;
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false; long expiration = _context.clock().now() + opts.getConnectTimeout();
int active = 0; if (opts.getConnectTimeout() <= 0)
int total = 0; expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
synchronized(_connectionLock) { _numWaiting++;
total = _connectionByInboundId.size(); while (true) {
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { long remaining = expiration - _context.clock().now();
if(((Connection)iter.next()).getIsConnected()) { if (remaining <= 0) {
active++; if (_log.shouldLog(Log.WARN))
} _log.warn("Refusing to connect since we have exceeded our max of "
} + _maxConcurrentStreams + " connections");
if(locked_tooManyStreams()) { _numWaiting--;
reject = true; return null;
} else { }
while(true) { boolean reject = false;
Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); synchronized (_connectionLock) {
if(oldCon == null) { if (locked_tooManyStreams()) {
break; // allow a full buffer of pending/waiting streams
} else { if (_numWaiting > _maxConcurrentStreams) {
_connectionByInboundId.put(new Long(receiveId), oldCon); if (_log.shouldLog(Log.WARN))
// receiveId already taken, try another _log.warn("Refusing connection since we have exceeded our max of "
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; + _maxConcurrentStreams + " and there are " + _numWaiting
} + " waiting already");
} _numWaiting--;
} return null;
} }
_context.statManager().addRateData("stream.receiveActive", active, total); // no remaining streams, lets wait a bit
try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
} else {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
if(reject) { while (_connectionByInboundId.containsKey(new Long(receiveId))) {
if(_log.shouldLog(Log.WARN)) { receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
_log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections"); }
} _connectionByInboundId.put(new Long(receiveId), con);
PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); break; // stop looping as a psuedo-wait
reply.setFlag(Packet.FLAG_RESET); }
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); }
reply.setAckThrough(synPacket.getSequenceNum()); }
reply.setSendStreamId(synPacket.getReceiveStreamId());
reply.setReceiveStreamId(0);
reply.setOptionalFrom(_session.getMyDestination());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
return null;
}
con.setReceiveStreamId(receiveId); // ok we're in...
try { con.setReceiveStreamId(receiveId);
con.getPacketHandler().receivePacket(synPacket, con); con.eventOccurred();
} catch(I2PException ie) {
synchronized(_connectionLock) {
_connectionByInboundId.remove(new Long(receiveId));
}
return null;
}
_context.statManager().addRateData("stream.connectionReceived", 1, 0); _log.debug("Connect() conDelay = " + opts.getConnectDelay());
return con; if (opts.getConnectDelay() <= 0) {
} con.waitForConnect();
private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000; }
if (_numWaiting > 0)
_numWaiting--;
/** _context.statManager().addRateData("stream.connectionCreated", 1, 0);
* Build a new connection to the given peer. This blocks if there is no return con;
* connection delay, otherwise it returns immediately. }
*
* @return new connection, or null if we have exceeded our limit
*/
public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = null;
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
long expiration = _context.clock().now() + opts.getConnectTimeout();
if(opts.getConnectTimeout() <= 0) {
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
}
_numWaiting++;
while(true) {
long remaining = expiration - _context.clock().now();
if(remaining <= 0) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
}
_numWaiting--;
return null;
}
boolean reject = false;
synchronized(_connectionLock) {
if(locked_tooManyStreams()) {
// allow a full buffer of pending/waiting streams
if(_numWaiting > _maxConcurrentStreams) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
}
_numWaiting--;
return null;
}
// no remaining streams, lets wait a bit private boolean locked_tooManyStreams() {
try { if (_maxConcurrentStreams <= 0) return false;
_connectionLock.wait(remaining); if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
} catch(InterruptedException ie) { int active = 0;
} for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
} else { Connection con = (Connection)iter.next();
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); if (con.getIsConnected())
con.setRemotePeer(peer); active++;
}
while(_connectionByInboundId.containsKey(new Long(receiveId))) { if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; _log.info("More than 100 connections! " + active
} + " total: " + _connectionByInboundId.size());
_connectionByInboundId.put(new Long(receiveId), con);
break; // stop looping as a psuedo-wait
}
}
}
// ok we're in... return (active >= _maxConcurrentStreams);
con.setReceiveStreamId(receiveId); }
con.eventOccurred();
_log.debug("Connect() conDelay = " + opts.getConnectDelay()); public MessageHandler getMessageHandler() { return _messageHandler; }
if(opts.getConnectDelay() <= 0) { public PacketHandler getPacketHandler() { return _packetHandler; }
con.waitForConnect(); public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
} public I2PSession getSession() { return _session; }
if(_numWaiting > 0) { public PacketQueue getPacketQueue() { return _outboundQueue; }
_numWaiting--;
}
_context.statManager().addRateData("stream.connectionCreated", 1, 0);
return con;
}
private boolean locked_tooManyStreams() { /**
if(_maxConcurrentStreams <= 0) { * Something b0rked hard, so kill all of our connections without mercy.
return false; * Don't bother sending close packets.
} *
if(_connectionByInboundId.size() < _maxConcurrentStreams) { */
return false; public void disconnectAllHard() {
} synchronized (_connectionLock) {
int active = 0; for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { Connection con = (Connection)iter.next();
Connection con = (Connection)iter.next(); con.disconnect(false, false);
if(con.getIsConnected()) { }
active++; _connectionByInboundId.clear();
} _connectionLock.notifyAll();
} }
}
if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) { /**
_log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); * Drop the (already closed) connection on the floor.
} *
return (active >= _maxConcurrentStreams); */
} public void removeConnection(Connection con) {
boolean removed = false;
synchronized (_connectionLock) {
Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
removed = (o == con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection removed? " + removed + " remaining: "
+ _connectionByInboundId.size() + ": " + con);
if (!removed && _log.shouldLog(Log.DEBUG))
_log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
_connectionLock.notifyAll();
}
if (removed) {
_context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
}
}
public MessageHandler getMessageHandler() { /** return a set of Connection objects */
return _messageHandler; public Set listConnections() {
} synchronized (_connectionLock) {
return new HashSet(_connectionByInboundId.values());
}
}
public PacketHandler getPacketHandler() { public boolean ping(Destination peer, long timeoutMs) {
return _packetHandler; return ping(peer, timeoutMs, true);
} }
public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
return ping(peer, timeoutMs, blocking, null, null, null);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
packet.setSendStreamId(id.longValue());
packet.setFlag(Packet.FLAG_ECHO);
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination());
if ( (keyToUse != null) && (tagsToSend != null) ) {
packet.setKeyUsed(keyToUse);
packet.setTagsSent(tagsToSend);
}
public ConnectionHandler getConnectionHandler() { PingRequest req = new PingRequest(peer, packet, notifier);
return _connectionHandler;
}
public I2PSession getSession() { synchronized (_pendingPings) {
return _session; _pendingPings.put(id, req);
} }
public PacketQueue getPacketQueue() { _outboundQueue.enqueue(packet);
return _outboundQueue; packet.releasePayload();
}
/** if (blocking) {
* Something b0rked hard, so kill all of our connections without mercy. synchronized (req) {
* Don't bother sending close packets. if (!req.pongReceived())
* try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
*/ }
public void disconnectAllHard() {
synchronized(_connectionLock) {
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
Connection con = (Connection)iter.next();
con.disconnect(false, false);
}
_connectionByInboundId.clear();
_connectionLock.notifyAll();
}
}
/** synchronized (_pendingPings) {
* Drop the (already closed) connection on the floor. _pendingPings.remove(id);
* }
*/ } else {
public void removeConnection(Connection con) { SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
boolean removed = false; }
synchronized(_connectionLock) {
Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
removed = (o == con);
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
}
if(!removed && _log.shouldLog(Log.DEBUG)) {
_log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values());
}
_connectionLock.notifyAll();
}
if(removed) {
_context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
}
}
/** return a set of Connection objects */ boolean ok = req.pongReceived();
public Set listConnections() { return ok;
synchronized(_connectionLock) { }
return new HashSet(_connectionByInboundId.values());
}
}
public boolean ping(Destination peer, long timeoutMs) { interface PingNotifier {
return ping(peer, timeoutMs, true); public void pingComplete(boolean ok);
} }
public boolean ping(Destination peer, long timeoutMs, boolean blocking) { private class PingFailed implements SimpleTimer.TimedEvent {
return ping(peer, timeoutMs, blocking, null, null, null); private Long _id;
} private PingNotifier _notifier;
public PingFailed(Long id, PingNotifier notifier) {
_id = id;
_notifier = notifier;
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { public void timeReached() {
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1); boolean removed = false;
PacketLocal packet = new PacketLocal(_context, peer); synchronized (_pendingPings) {
packet.setSendStreamId(id.longValue()); Object o = _pendingPings.remove(_id);
packet.setFlag(Packet.FLAG_ECHO); if (o != null)
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); removed = true;
packet.setOptionalFrom(_session.getMyDestination()); }
if((keyToUse != null) && (tagsToSend != null)) { if (removed) {
packet.setKeyUsed(keyToUse); if (_notifier != null)
packet.setTagsSent(tagsToSend); _notifier.pingComplete(false);
} if (_log.shouldLog(Log.INFO))
_log.info("Ping failed");
}
}
}
PingRequest req = new PingRequest(peer, packet, notifier); private class PingRequest {
private boolean _ponged;
private Destination _peer;
private PacketLocal _packet;
private PingNotifier _notifier;
public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
_ponged = false;
_peer = peer;
_packet = packet;
_notifier = notifier;
}
public void pong() {
_log.debug("Ping successful");
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
synchronized (ConnectionManager.PingRequest.this) {
_ponged = true;
ConnectionManager.PingRequest.this.notifyAll();
}
if (_notifier != null)
_notifier.pingComplete(true);
}
public boolean pongReceived() { return _ponged; }
}
synchronized(_pendingPings) { void receivePong(long pingId) {
_pendingPings.put(id, req); PingRequest req = null;
} synchronized (_pendingPings) {
req = (PingRequest)_pendingPings.remove(new Long(pingId));
_outboundQueue.enqueue(packet); }
packet.releasePayload(); if (req != null)
req.pong();
if(blocking) { }
synchronized(req) {
if(!req.pongReceived()) {
try {
req.wait(timeoutMs);
} catch(InterruptedException ie) {
}
}
}
synchronized(_pendingPings) {
_pendingPings.remove(id);
}
} else {
SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
}
boolean ok = req.pongReceived();
return ok;
}
interface PingNotifier {
public void pingComplete(boolean ok);
}
private class PingFailed implements SimpleTimer.TimedEvent {
private Long _id;
private PingNotifier _notifier;
public PingFailed(Long id, PingNotifier notifier) {
_id = id;
_notifier = notifier;
}
public void timeReached() {
boolean removed = false;
synchronized(_pendingPings) {
Object o = _pendingPings.remove(_id);
if(o != null) {
removed = true;
}
}
if(removed) {
if(_notifier != null) {
_notifier.pingComplete(false);
}
if(_log.shouldLog(Log.INFO)) {
_log.info("Ping failed");
}
}
}
}
private class PingRequest {
private boolean _ponged;
private Destination _peer;
private PacketLocal _packet;
private PingNotifier _notifier;
public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
_ponged = false;
_peer = peer;
_packet = packet;
_notifier = notifier;
}
public void pong() {
_log.debug("Ping successful");
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
synchronized(ConnectionManager.PingRequest.this) {
_ponged = true;
ConnectionManager.PingRequest.this.notifyAll();
}
if(_notifier != null) {
_notifier.pingComplete(true);
}
}
public boolean pongReceived() {
return _ponged;
}
}
void receivePong(long pingId) {
PingRequest req = null;
synchronized(_pendingPings) {
req = (PingRequest)_pendingPings.remove(new Long(pingId));
}
if(req != null) {
req.pong();
}
}
} }

View File

@ -1,8 +1,5 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.I2PException; import net.i2p.I2PException;
/** /**
@ -10,46 +7,17 @@ import net.i2p.I2PException;
* *
*/ */
public class I2PServerSocketFull implements I2PServerSocket { public class I2PServerSocketFull implements I2PServerSocket {
private I2PSocketManagerFull _socketManager;
private I2PSocketManagerFull _socketManager; public I2PServerSocketFull(I2PSocketManagerFull mgr) {
_socketManager = mgr;
}
/** public I2PSocket accept() throws I2PException {
* return _socketManager.receiveSocket();
* @param mgr }
*/
public I2PServerSocketFull(I2PSocketManagerFull mgr) {
_socketManager = mgr;
}
/** public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); }
*
* @return
* @throws net.i2p.I2PException
* @throws SocketTimeoutException
*/
public I2PSocket accept() throws I2PException, SocketTimeoutException {
return _socketManager.receiveSocket();
}
public long getSoTimeout() { public I2PSocketManager getManager() { return _socketManager; }
return _socketManager.getConnectionManager().MgetSoTimeout();
}
public void setSoTimeout(long x) {
_socketManager.getConnectionManager().MsetSoTimeout(x);
}
/**
* Close the connection.
*/
public void close() {
_socketManager.getConnectionManager().setAllowIncomingConnections(false);
}
/**
*
* @return _socketManager
*/
public I2PSocketManager getManager() {
return _socketManager;
}
} }

View File

@ -11,139 +11,119 @@ import net.i2p.data.Destination;
* *
*/ */
public class I2PSocketFull implements I2PSocket { public class I2PSocketFull implements I2PSocket {
private Connection _connection;
private I2PSocket.SocketErrorListener _listener;
private Destination _remotePeer;
private Destination _localPeer;
private Connection _connection; public I2PSocketFull(Connection con) {
private I2PSocket.SocketErrorListener _listener; _connection = con;
private Destination _remotePeer; if (con != null) {
private Destination _localPeer; _remotePeer = con.getRemotePeer();
_localPeer = con.getSession().getMyDestination();
}
}
public I2PSocketFull(Connection con) { public void close() throws IOException {
_connection = con; Connection c = _connection;
if(con != null) { if (c == null) return;
_remotePeer = con.getRemotePeer(); if (c.getIsConnected()) {
_localPeer = con.getSession().getMyDestination(); OutputStream out = c.getOutputStream();
} if (out != null) {
} try {
out.close();
} catch (IOException ioe) {
// ignore any write error, as we want to keep on and kill the
// con (thanks Complication!)
}
}
c.disconnect(true);
} else {
//throw new IOException("Not connected");
}
destroy();
}
Connection getConnection() { return _connection; }
public void close() throws IOException { public InputStream getInputStream() {
Connection c = _connection; Connection c = _connection;
if(c == null) { if (c != null)
return; return c.getInputStream();
} else
if(c.getIsConnected()) { return null;
OutputStream out = c.getOutputStream(); }
if(out != null) {
try {
out.close();
} catch(IOException ioe) {
// ignore any write error, as we want to keep on and kill the
// con (thanks Complication!)
}
}
c.disconnect(true);
} else {
//throw new IOException("Not connected");
}
destroy();
}
Connection getConnection() { public I2PSocketOptions getOptions() {
return _connection; Connection c = _connection;
} if (c != null)
return c.getOptions();
else
return null;
}
public InputStream getInputStream() { public OutputStream getOutputStream() throws IOException {
Connection c = _connection; Connection c = _connection;
if(c != null) { if (c != null)
return c.getInputStream(); return c.getOutputStream();
} else { else
return null; return null;
} }
}
public I2PSocketOptions getOptions() { public Destination getPeerDestination() { return _remotePeer; }
Connection c = _connection;
if(c != null) {
return c.getOptions();
} else {
return null;
}
}
public OutputStream getOutputStream() throws IOException { public long getReadTimeout() {
Connection c = _connection; I2PSocketOptions opts = getOptions();
if(c != null) { if (opts != null)
return c.getOutputStream(); return opts.getReadTimeout();
} else { else
return null; return -1;
} }
}
public Destination getPeerDestination() { public Destination getThisDestination() { return _localPeer; }
return _remotePeer;
}
public long getReadTimeout() { public void setOptions(I2PSocketOptions options) {
I2PSocketOptions opts = getOptions(); Connection c = _connection;
if(opts != null) { if (c == null) return;
return opts.getReadTimeout();
} else {
return -1;
}
}
public Destination getThisDestination() { if (options instanceof ConnectionOptions)
return _localPeer; c.setOptions((ConnectionOptions)options);
} else
c.setOptions(new ConnectionOptions(options));
}
public void setOptions(I2PSocketOptions options) { public void setReadTimeout(long ms) {
Connection c = _connection; Connection c = _connection;
if(c == null) { if (c == null) return;
return;
}
if(options instanceof ConnectionOptions) {
c.setOptions((ConnectionOptions)options);
} else {
c.setOptions(new ConnectionOptions(options));
}
}
public void setReadTimeout(long ms) { c.getInputStream().setReadTimeout((int)ms);
Connection c = _connection; c.getOptions().setReadTimeout(ms);
if(c == null) { }
return;
}
c.getInputStream().setReadTimeout((int)ms);
c.getOptions().setReadTimeout(ms);
}
public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
_listener = lsnr; _listener = lsnr;
} }
public boolean isClosed() { public boolean isClosed() {
Connection c = _connection; Connection c = _connection;
return ((c == null) || return ((c == null) ||
(!c.getIsConnected()) || (!c.getIsConnected()) ||
(c.getResetReceived()) || (c.getResetReceived()) ||
(c.getResetSent())); (c.getResetSent()));
} }
void destroy() { void destroy() {
Connection c = _connection; Connection c = _connection;
_connection = null; _connection = null;
_listener = null; _listener = null;
if(c != null) { if (c != null)
c.disconnectComplete(); c.disconnectComplete();
} }
} public String toString() {
Connection c = _connection;
public String toString() { if (c == null)
Connection c = _connection; return super.toString();
if(c == null) { else
return super.toString(); return c.toString();
} else { }
return c.toString();
}
}
} }

View File

@ -1,7 +1,6 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
@ -14,6 +13,7 @@ import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* Centralize the coordination and multiplexing of the local client's streaming. * Centralize the coordination and multiplexing of the local client's streaming.
* There should be one I2PSocketManager for each I2PSession, and if an application * There should be one I2PSocketManager for each I2PSession, and if an application
@ -23,317 +23,219 @@ import net.i2p.util.Log;
* *
*/ */
public class I2PSocketManagerFull implements I2PSocketManager { public class I2PSocketManagerFull implements I2PSocketManager {
private I2PAppContext _context;
private Log _log;
private I2PSession _session;
private I2PServerSocketFull _serverSocket;
private ConnectionOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
private int _maxStreams;
private static int __managerId = 0;
private ConnectionManager _connectionManager;
private I2PAppContext _context; /**
private Log _log; * How long to wait for the client app to accept() before sending back CLOSE?
private I2PSession _session; * This includes the time waiting in the queue. Currently set to 5 seconds.
private I2PServerSocketFull _serverSocket; */
private ConnectionOptions _defaultOptions; private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
private long _acceptTimeout;
private String _name;
private int _maxStreams;
private static int __managerId = 0;
private ConnectionManager _connectionManager;
/**
* How long to wait for the client app to accept() before sending back CLOSE?
* This includes the time waiting in the queue. Currently set to 5 seconds.
*/
private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000;
/** public I2PSocketManagerFull() {
* _context = null;
*/ _session = null;
public I2PSocketManagerFull() { }
_context = null; public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
_session = null; this();
} init(context, session, opts, name);
}
/** /** how many streams will we allow at once? */
* public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
* @param context
* @param session
* @param opts
* @param name
*/
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
this();
init(context, session, opts, name);
}
/** how many streams will we allow at once? */
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
/** /**
* *
* */
* @param context public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
* @param session _context = context;
* @param opts _session = session;
* @param name _log = _context.logManager().getLog(I2PSocketManagerFull.class);
*/
public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context;
_session = session;
_log = _context.logManager().getLog(I2PSocketManagerFull.class);
_maxStreams = -1; _maxStreams = -1;
try { try {
String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
_maxStreams = Integer.parseInt(num); _maxStreams = Integer.parseInt(num);
} catch(NumberFormatException nfe) { } catch (NumberFormatException nfe) {
if(_log.shouldLog(Log.WARN)) { if (_log.shouldLog(Log.WARN))
_log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
} _maxStreams = -1;
_maxStreams = -1; }
} _name = name + " " + (++__managerId);
_name = name + " " + (++__managerId); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _defaultOptions = new ConnectionOptions(opts);
_defaultOptions = new ConnectionOptions(opts); _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
_connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); _serverSocket = new I2PServerSocketFull(this);
_serverSocket = new I2PServerSocketFull(this);
if(_log.shouldLog(Log.INFO)) { if (_log.shouldLog(Log.INFO)) {
_log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts); _log.info("Socket manager created. \ndefault options: " + _defaultOptions
} + "\noriginal properties: " + opts);
} }
}
/** public I2PSocketOptions buildOptions() { return buildOptions(null); }
* public I2PSocketOptions buildOptions(Properties opts) {
* @return ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
*/ curOpts.setProperties(opts);
public I2PSocketOptions buildOptions() { return curOpts;
return buildOptions(null); }
}
/** public I2PSession getSession() {
* return _session;
* @param opts }
* @return
*/
public I2PSocketOptions buildOptions(Properties opts) {
ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
curOpts.setProperties(opts);
return curOpts;
}
/** public ConnectionManager getConnectionManager() {
* return _connectionManager;
* @return }
*/
public I2PSession getSession() {
return _session;
}
/** public I2PSocket receiveSocket() throws I2PException {
* verifySession();
* @return Connection con = _connectionManager.getConnectionHandler().accept(-1);
*/ if (_log.shouldLog(Log.DEBUG))
public ConnectionManager getConnectionManager() { _log.debug("receiveSocket() called: " + con);
return _connectionManager; if (con != null) {
} I2PSocketFull sock = new I2PSocketFull(con);
con.setSocket(sock);
return sock;
} else {
return null;
}
}
/** /**
* * Ping the specified peer, returning true if they replied to the ping within
* @return * the timeout specified, false otherwise. This call blocks.
* @throws net.i2p.I2PException *
* @throws java.net.SocketTimeoutException */
*/ public boolean ping(Destination peer, long timeoutMs) {
public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException { return _connectionManager.ping(peer, timeoutMs);
verifySession(); }
Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("receiveSocket() called: " + con);
}
if(con != null) {
I2PSocketFull sock = new I2PSocketFull(con);
con.setSocket(sock);
return sock;
} else {
if(_connectionManager.MgetSoTimeout() == -1) {
return null;
}
throw new SocketTimeoutException("I2PSocket timed out");
}
}
/** /**
* Ping the specified peer, returning true if they replied to the ping within * How long should we wait for the client to .accept() a socket before
* the timeout specified, false otherwise. This call blocks. * sending back a NACK/Close?
* *
* * @param ms milliseconds to wait, maximum
* @param peer */
* @param timeoutMs public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
* @return public long getAcceptTimeout() { return _acceptTimeout; }
*/
public boolean ping(Destination peer, long timeoutMs) {
return _connectionManager.ping(peer, timeoutMs);
}
/** public void setDefaultOptions(I2PSocketOptions options) {
* How long should we wait for the client to .accept() a socket before _defaultOptions = new ConnectionOptions((ConnectionOptions) options);
* sending back a NACK/Close? }
*
* @param ms milliseconds to wait, maximum
*/
public void setAcceptTimeout(long ms) {
_acceptTimeout = ms;
}
/** public I2PSocketOptions getDefaultOptions() {
* return _defaultOptions;
* @return }
*/
public long getAcceptTimeout() {
return _acceptTimeout;
}
/** public I2PServerSocket getServerSocket() {
* _connectionManager.setAllowIncomingConnections(true);
* @param options return _serverSocket;
*/ }
public void setDefaultOptions(I2PSocketOptions options) {
_defaultOptions = new ConnectionOptions((ConnectionOptions)options);
}
/** private void verifySession() throws I2PException {
* if (!_connectionManager.getSession().isClosed())
* @return return;
*/ _connectionManager.getSession().connect();
public I2PSocketOptions getDefaultOptions() { }
return _defaultOptions;
}
/** /**
* * Create a new connected socket (block until the socket is created)
* @return *
*/ * @param peer Destination to connect to
public I2PServerSocket getServerSocket() { * @param options I2P socket options to be used for connecting
_connectionManager.setAllowIncomingConnections(true); *
return _serverSocket; * @throws NoRouteToHostException if the peer is not found or not reachable
} * @throws I2PException if there is some other I2P-related problem
*/
public I2PSocket connect(Destination peer, I2PSocketOptions options)
throws I2PException, NoRouteToHostException {
verifySession();
if (options == null)
options = _defaultOptions;
ConnectionOptions opts = null;
if (options instanceof ConnectionOptions)
opts = new ConnectionOptions((ConnectionOptions)options);
else
opts = new ConnectionOptions(options);
private void verifySession() throws I2PException { if (_log.shouldLog(Log.INFO))
if(!_connectionManager.getSession().isClosed()) { _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
return; + " with options: " + opts);
} Connection con = _connectionManager.connect(peer, opts);
_connectionManager.getSession().connect(); if (con == null)
} throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
I2PSocketFull socket = new I2PSocketFull(con);
con.setSocket(socket);
if (con.getConnectionError() != null) {
con.disconnect(false);
throw new NoRouteToHostException(con.getConnectionError());
}
return socket;
}
/** /**
* Create a new connected socket (block until the socket is created) * Create a new connected socket (block until the socket is created)
* *
* @param peer Destination to connect to * @param peer Destination to connect to
* @param options I2P socket options to be used for connecting *
* * @throws NoRouteToHostException if the peer is not found or not reachable
* @throws NoRouteToHostException if the peer is not found or not reachable * @throws I2PException if there is some other I2P-related problem
* @throws I2PException if there is some other I2P-related problem */
*/ public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
public I2PSocket connect(Destination peer, I2PSocketOptions options) return connect(peer, _defaultOptions);
throws I2PException, NoRouteToHostException { }
verifySession();
if(options == null) {
options = _defaultOptions;
}
ConnectionOptions opts = null;
if(options instanceof ConnectionOptions) {
opts = new ConnectionOptions((ConnectionOptions)options);
} else {
opts = new ConnectionOptions(options);
}
if(_log.shouldLog(Log.INFO)) {
_log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
}
Connection con = _connectionManager.connect(peer, opts);
if(con == null) {
throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
}
I2PSocketFull socket = new I2PSocketFull(con);
con.setSocket(socket);
if(con.getConnectionError() != null) {
con.disconnect(false);
throw new NoRouteToHostException(con.getConnectionError());
}
return socket;
}
/** /**
* Create a new connected socket (block until the socket is created) * Destroy the socket manager, freeing all the associated resources. This
* * method will block untill all the managed sockets are closed.
* @param peer Destination to connect to *
* */
* @return public void destroySocketManager() {
* @throws NoRouteToHostException if the peer is not found or not reachable _connectionManager.disconnectAllHard();
* @throws I2PException if there is some other I2P-related problem _connectionManager.setAllowIncomingConnections(false);
*/ // should we destroy the _session too?
public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException { // yes, since the old lib did (and SAM wants it to, and i dont know why not)
return connect(peer, _defaultOptions); if ( (_session != null) && (!_session.isClosed()) ) {
} try {
_session.destroySession();
} catch (I2PSessionException ise) {
_log.warn("Unable to destroy the session", ise);
}
}
}
/** /**
* Destroy the socket manager, freeing all the associated resources. This * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
* method will block untill all the managed sockets are closed. *
* */
*/ public Set listSockets() {
public void destroySocketManager() { Set connections = _connectionManager.listConnections();
_connectionManager.disconnectAllHard(); Set rv = new HashSet(connections.size());
_connectionManager.setAllowIncomingConnections(false); for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
// should we destroy the _session too? Connection con = (Connection)iter.next();
// yes, since the old lib did (and SAM wants it to, and i dont know why not) if (con.getSocket() != null)
if((_session != null) && (!_session.isClosed())) { rv.add(con.getSocket());
try { }
_session.destroySession(); return rv;
} catch(I2PSessionException ise) { }
_log.warn("Unable to destroy the session", ise);
}
}
}
/** public String getName() { return _name; }
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. public void setName(String name) { _name = name; }
*
*
* @return
*/
public Set listSockets() {
Set connections = _connectionManager.listConnections();
Set rv = new HashSet(connections.size());
for(Iterator iter = connections.iterator(); iter.hasNext();) {
Connection con = (Connection)iter.next();
if(con.getSocket() != null) {
rv.add(con.getSocket());
}
}
return rv;
}
/**
*
* @return
*/
public String getName() {
return _name;
}
/** public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
* _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
* @param name }
*/ public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
public void setName(String name) { _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
_name = name; }
}
/**
*
* @param lsnr
*/
public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
_connectionManager.getMessageHandler().addDisconnectListener(lsnr);
}
/**
*
* @param lsnr
*/
public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
_connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
}
} }

View File

@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer;
/** /**
* *
*/ */
public class RetransmissionTimer extends SimpleTimer { class RetransmissionTimer extends SimpleTimer {
private static final RetransmissionTimer _instance = new RetransmissionTimer(); private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; } public static final SimpleTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); } protected RetransmissionTimer() { super("StreamingTimer"); }

View File

@ -5,59 +5,42 @@ import java.util.List;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
class Executor implements Runnable { class Executor implements Runnable {
private I2PAppContext _context;
private Log _log;
private List _readyEvents;
public Executor(I2PAppContext ctx, Log log, List events) {
_context = ctx;
_readyEvents = events;
}
public void run() {
while (true) {
SimpleTimer.TimedEvent evt = null;
synchronized (_readyEvents) {
if (_readyEvents.size() <= 0)
try { _readyEvents.wait(); } catch (InterruptedException ie) {}
if (_readyEvents.size() > 0)
evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
}
private I2PAppContext _context; if (evt != null) {
private Log _log; long before = _context.clock().now();
private List _readyEvents; try {
private SimpleStore runn; evt.timeReached();
} catch (Throwable t) {
log("wtf, event borked: " + evt, t);
}
long time = _context.clock().now() - before;
if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, event execution took " + time + ": " + evt);
}
}
}
public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { private void log(String msg, Throwable t) {
_context = ctx; synchronized (this) {
_readyEvents = events; if (_log == null)
runn = x; _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
} }
_log.log(Log.CRIT, msg, t);
public void run() { }
while(runn.getAnswer()) {
SimpleTimer.TimedEvent evt = null;
synchronized(_readyEvents) {
if(_readyEvents.size() <= 0) {
try {
_readyEvents.wait();
} catch(InterruptedException ie) {
}
}
if(_readyEvents.size() > 0) {
evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
}
}
if(evt != null) {
long before = _context.clock().now();
try {
evt.timeReached();
} catch(Throwable t) {
log("wtf, event borked: " + evt, t);
}
long time = _context.clock().now() - before;
if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
_log.warn("wtf, event execution took " + time + ": " + evt);
}
}
}
}
/**
*
* @param msg
* @param t
*/
private void log(String msg, Throwable t) {
synchronized(this) {
if(_log == null) {
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
}
}
_log.log(Log.CRIT, msg, t);
}
} }

View File

@ -1,35 +0,0 @@
/*
* This is free software, do as you please.
*/
package net.i2p.util;
/**
*
* @author sponge
*/
public class SimpleStore {
private boolean answer;
SimpleStore(boolean x) {
answer=x;
}
/**
* set the answer
*
* @param x
*/
public void setAnswer(boolean x) {
answer = x;
}
/**
*
* @return boolean
*/
public boolean getAnswer() {
return answer;
}
}

View File

@ -16,262 +16,211 @@ import net.i2p.I2PAppContext;
* *
*/ */
public class SimpleTimer { public class SimpleTimer {
private static final SimpleTimer _instance = new SimpleTimer();
public static SimpleTimer getInstance() { return _instance; }
private I2PAppContext _context;
private Log _log;
/** event time (Long) to event (TimedEvent) mapping */
private TreeMap _events;
/** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes;
private List _readyEvents;
private static final SimpleTimer _instance = new SimpleTimer(); protected SimpleTimer() { this("SimpleTimer"); }
protected SimpleTimer(String name) {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(SimpleTimer.class);
_events = new TreeMap();
_eventTimes = new HashMap(256);
_readyEvents = new ArrayList(4);
I2PThread runner = new I2PThread(new SimpleTimerRunner());
runner.setName(name);
runner.setDaemon(true);
runner.start();
for (int i = 0; i < 3; i++) {
I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
executor.setName(name + "Executor " + i);
executor.setDaemon(true);
executor.start();
}
}
public static SimpleTimer getInstance() { public void reschedule(TimedEvent event, long timeoutMs) {
return _instance; addEvent(event, timeoutMs, false);
} }
private I2PAppContext _context;
private Log _log;
/** event time (Long) to event (TimedEvent) mapping */
private TreeMap _events;
/** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes;
private List _readyEvents;
private SimpleStore runn;
/** /**
* * Queue up the given event to be fired no sooner than timeoutMs from now.
*/ * However, if this event is already scheduled, the event will be scheduled
protected SimpleTimer() { * for the earlier of the two timeouts, which may be before this stated
this("SimpleTimer"); * timeout. If this is not the desired behavior, call removeEvent first.
} *
*/
public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
/**
* @param useEarliestTime if its already scheduled, use the earlier of the
* two timeouts, else use the later
*/
public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
int totalEvents = 0;
long now = System.currentTimeMillis();
long eventTime = now + timeoutMs;
Long time = new Long(eventTime);
synchronized (_events) {
// remove the old scheduled position, then reinsert it
Long oldTime = (Long)_eventTimes.get(event);
if (oldTime != null) {
if (useEarliestTime) {
if (oldTime.longValue() < eventTime) {
_events.notifyAll();
return; // already scheduled for sooner than requested
} else {
_events.remove(oldTime);
}
} else {
if (oldTime.longValue() > eventTime) {
_events.notifyAll();
return; // already scheduled for later than the given period
} else {
_events.remove(oldTime);
}
}
}
while (_events.containsKey(time))
time = new Long(time.longValue() + 1);
_events.put(time, event);
_eventTimes.put(event, time);
/** if ( (_events.size() != _eventTimes.size()) ) {
* _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
* @param name for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
*/ TimedEvent evt = (TimedEvent)iter.next();
protected SimpleTimer(String name) { Long when = (Long)_eventTimes.get(evt);
runn = new SimpleStore(true); TimedEvent cur = (TimedEvent)_events.get(when);
_context = I2PAppContext.getGlobalContext(); if (cur != evt) {
_log = _context.logManager().getLog(SimpleTimer.class); _log.error("event " + evt + " @ " + when + ": " + cur);
_events = new TreeMap(); }
_eventTimes = new HashMap(256); }
_readyEvents = new ArrayList(4); }
I2PThread runner = new I2PThread(new SimpleTimerRunner());
runner.setName(name);
runner.setDaemon(true);
runner.start();
for(int i = 0; i < 3; i++) {
I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
executor.setName(name + "Executor " + i);
executor.setDaemon(true);
executor.start();
}
}
/** totalEvents = _events.size();
* Removes the SimpleTimer. _events.notifyAll();
*/ }
public void removeSimpleTimer() { if (time.longValue() > eventTime + 100) {
synchronized(_events) { if (_log.shouldLog(Log.WARN))
runn.setAnswer(false); _log.warn("Lots of timer congestion, had to push " + event + " back "
_events.notifyAll(); + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");
} }
} long timeToAdd = System.currentTimeMillis() - now;
if (timeToAdd > 50) {
if (_log.shouldLog(Log.WARN))
_log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
}
/** }
*
* @param event
* @param timeoutMs
*/
public void reschedule(TimedEvent event, long timeoutMs) {
addEvent(event, timeoutMs, false);
}
/** public boolean removeEvent(TimedEvent evt) {
* Queue up the given event to be fired no sooner than timeoutMs from now. if (evt == null) return false;
* However, if this event is already scheduled, the event will be scheduled synchronized (_events) {
* for the earlier of the two timeouts, which may be before this stated Long when = (Long)_eventTimes.remove(evt);
* timeout. If this is not the desired behavior, call removeEvent first. if (when != null)
* _events.remove(when);
* @param event return null != when;
* @param timeoutMs }
*/ }
public void addEvent(TimedEvent event, long timeoutMs) {
addEvent(event, timeoutMs, true);
}
/** /**
* @param event * Simple interface for events to be queued up and notified on expiration
* @param timeoutMs */
* @param useEarliestTime if its already scheduled, use the earlier of the public interface TimedEvent {
* two timeouts, else use the later /**
*/ * the time requested has been reached (this call should NOT block,
public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { * otherwise the whole SimpleTimer gets backed up)
int totalEvents = 0; *
long now = System.currentTimeMillis(); */
long eventTime = now + timeoutMs; public void timeReached();
Long time = new Long(eventTime); }
synchronized(_events) {
// remove the old scheduled position, then reinsert it
Long oldTime = (Long)_eventTimes.get(event);
if(oldTime != null) {
if(useEarliestTime) {
if(oldTime.longValue() < eventTime) {
_events.notifyAll();
return; // already scheduled for sooner than requested
} else {
_events.remove(oldTime);
}
} else {
if(oldTime.longValue() > eventTime) {
_events.notifyAll();
return; // already scheduled for later than the given period
} else {
_events.remove(oldTime);
}
}
}
while(_events.containsKey(time)) {
time = new Long(time.longValue() + 1);
}
_events.put(time, event);
_eventTimes.put(event, time);
if((_events.size() != _eventTimes.size())) { private long _occurredTime;
_log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size()); private long _occurredEventCount;
for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) { private TimedEvent _recentEvents[] = new TimedEvent[5];
TimedEvent evt = (TimedEvent)iter.next();
Long when = (Long)_eventTimes.get(evt);
TimedEvent cur = (TimedEvent)_events.get(when);
if(cur != evt) {
_log.error("event " + evt + " @ " + when + ": " + cur);
}
}
}
totalEvents = _events.size(); private class SimpleTimerRunner implements Runnable {
_events.notifyAll(); public void run() {
} List eventsToFire = new ArrayList(1);
if(time.longValue() > eventTime + 100) { while (true) {
if(_log.shouldLog(Log.WARN)) { try {
_log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")"); synchronized (_events) {
} //if (_events.size() <= 0)
} // _events.wait();
long timeToAdd = System.currentTimeMillis() - now; //if (_events.size() > 100)
if(timeToAdd > 50) { // _log.warn("> 100 events! " + _events.values());
if(_log.shouldLog(Log.WARN)) { long now = System.currentTimeMillis();
_log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); long nextEventDelay = -1;
} Object nextEvent = null;
} while (true) {
if (_events.size() <= 0) break;
Long when = (Long)_events.firstKey();
if (when.longValue() <= now) {
TimedEvent evt = (TimedEvent)_events.remove(when);
if (evt != null) {
_eventTimes.remove(evt);
eventsToFire.add(evt);
}
} else {
nextEventDelay = when.longValue() - now;
nextEvent = _events.get(when);
break;
}
}
if (eventsToFire.size() <= 0) {
if (nextEventDelay != -1) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
_events.wait(nextEventDelay);
} else {
_events.wait();
}
}
}
} catch (ThreadDeath td) {
return; // die
} catch (InterruptedException ie) {
// ignore
} catch (Throwable t) {
if (_log != null) {
_log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
} else {
System.err.println("Uncaught exception in SimpleTimer");
t.printStackTrace();
}
}
} long now = System.currentTimeMillis();
now = now - (now % 1000);
/** synchronized (_readyEvents) {
* for (int i = 0; i < eventsToFire.size(); i++)
* @param evt _readyEvents.add(eventsToFire.get(i));
* @return _readyEvents.notifyAll();
*/ }
public boolean removeEvent(TimedEvent evt) {
if(evt == null) {
return false;
}
synchronized(_events) {
Long when = (Long)_eventTimes.remove(evt);
if(when != null) {
_events.remove(when);
}
return null != when;
}
}
/** if (_occurredTime == now) {
* Simple interface for events to be queued up and notified on expiration _occurredEventCount += eventsToFire.size();
*/ } else {
public interface TimedEvent { _occurredTime = now;
if (_occurredEventCount > 2500) {
StringBuffer buf = new StringBuffer(128);
buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
buf.append(") in a second!");
_log.log(Log.WARN, buf.toString());
}
_occurredEventCount = 0;
}
/** eventsToFire.clear();
* the time requested has been reached (this call should NOT block, }
* otherwise the whole SimpleTimer gets backed up) }
* }
*/
public void timeReached();
}
private long _occurredTime;
private long _occurredEventCount;
// not used
// private TimedEvent _recentEvents[] = new TimedEvent[5];
private class SimpleTimerRunner implements Runnable {
public void run() {
List eventsToFire = new ArrayList(1);
while(runn.getAnswer()) {
try {
synchronized(_events) {
//if (_events.size() <= 0)
// _events.wait();
//if (_events.size() > 100)
// _log.warn("> 100 events! " + _events.values());
long now = System.currentTimeMillis();
long nextEventDelay = -1;
Object nextEvent = null;
while(runn.getAnswer()) {
if(_events.size() <= 0) {
break;
}
Long when = (Long)_events.firstKey();
if(when.longValue() <= now) {
TimedEvent evt = (TimedEvent)_events.remove(when);
if(evt != null) {
_eventTimes.remove(evt);
eventsToFire.add(evt);
}
} else {
nextEventDelay = when.longValue() - now;
nextEvent = _events.get(when);
break;
}
}
if(eventsToFire.size() <= 0) {
if(nextEventDelay != -1) {
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
}
_events.wait(nextEventDelay);
} else {
_events.wait();
}
}
}
} catch(InterruptedException ie) {
// ignore
} catch(Throwable t) {
if(_log != null) {
_log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
} else {
System.err.println("Uncaught exception in SimpleTimer");
t.printStackTrace();
}
}
long now = System.currentTimeMillis();
now = now - (now % 1000);
synchronized(_readyEvents) {
for(int i = 0; i < eventsToFire.size(); i++) {
_readyEvents.add(eventsToFire.get(i));
}
_readyEvents.notifyAll();
}
if(_occurredTime == now) {
_occurredEventCount += eventsToFire.size();
} else {
_occurredTime = now;
if(_occurredEventCount > 2500) {
StringBuffer buf = new StringBuffer(128);
buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
buf.append(") in a second!");
_log.log(Log.WARN, buf.toString());
}
_occurredEventCount = 0;
}
eventsToFire.clear();
}
}
}
} }