Added {get,set}SOTimeout() to the ServerSocket API,

and fixed all the broken mainstream applications depending on it.
Fixed a grave bug in SimpleTimer.
Fixed Steraming Timer to be public.
Fixed a pile of JavaDoc comments, and reformatted the files I touched.
This commit is contained in:
sponge
2008-09-25 06:55:04 +00:00
parent 8d78a77a8c
commit fa5c7219d3
13 changed files with 1673 additions and 1279 deletions

View File

@ -12,6 +12,7 @@ import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
@ -219,6 +220,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting", ce); _log.error("Error accepting", ce);
// not killing the server.. // not killing the server..
} catch(SocketTimeoutException ste) {
// ignored, we never set the timeout
} }
} }
} }

View File

@ -2,6 +2,7 @@ package net.i2p.client.streaming;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import net.i2p.I2PException; import net.i2p.I2PException;
/** /**
@ -9,6 +10,7 @@ import net.i2p.I2PException;
* *
*/ */
public interface I2PServerSocket { public interface I2PServerSocket {
/** /**
* Closes the socket. * Closes the socket.
*/ */
@ -24,8 +26,21 @@ public interface I2PServerSocket {
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws SocketTimeoutException
*/ */
public I2PSocket accept() throws I2PException, ConnectException; public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/**
* Set Sock Option accept timeout
* @param x
*/
public void setSoTimeout(long x);
/**
* Get Sock Option accept timeout
* @return timeout
*/
public long getSoTimeout();
/** /**
* Access the manager which is coordinating the server socket * Access the manager which is coordinating the server socket

View File

@ -17,19 +17,33 @@ import net.i2p.util.Log;
* *
*/ */
class I2PServerSocketImpl implements I2PServerSocket { class I2PServerSocketImpl implements I2PServerSocket {
private final static Log _log = new Log(I2PServerSocketImpl.class); private final static Log _log = new Log(I2PServerSocketImpl.class);
private I2PSocketManager mgr; private I2PSocketManager mgr;
/** list of sockets waiting for the client to accept them */ /** list of sockets waiting for the client to accept them */
private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
/** have we been closed */ /** have we been closed */
private volatile boolean closing = false; private volatile boolean closing = false;
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
private Object socketAcceptedLock = new Object(); private Object socketAcceptedLock = new Object();
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */ /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
private Object socketAddedLock = new Object(); private Object socketAddedLock = new Object();
/**
* Set Sock Option accept timeout stub, does nothing
* @param x
*/
public void setSoTimeout(long x) {
}
/**
* Get Sock Option accept timeout stub, does nothing
* @return timeout
*/
public long getSoTimeout() {
return -1;
}
public I2PServerSocketImpl(I2PSocketManager mgr) { public I2PServerSocketImpl(I2PSocketManager mgr) {
this.mgr = mgr; this.mgr = mgr;
} }
@ -47,19 +61,22 @@ class I2PServerSocketImpl implements I2PServerSocket {
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
*/ */
public I2PSocket accept() throws I2PException, ConnectException { public I2PSocket accept() throws I2PException, ConnectException {
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("accept() called, pending: " + pendingSockets.size()); _log.debug("accept() called, pending: " + pendingSockets.size());
}
I2PSocket ret = null; I2PSocket ret = null;
while((ret == null) && (!closing)) { while((ret == null) && (!closing)) {
while(pendingSockets.size() <= 0) { while(pendingSockets.size() <= 0) {
if (closing) throw new ConnectException("I2PServerSocket closed"); if(closing) {
throw new ConnectException("I2PServerSocket closed");
}
try { try {
synchronized(socketAddedLock) { synchronized(socketAddedLock) {
socketAddedLock.wait(); socketAddedLock.wait();
} }
} catch (InterruptedException ie) {} } catch(InterruptedException ie) {
}
} }
synchronized(pendingSockets) { synchronized(pendingSockets) {
if(pendingSockets.size() > 0) { if(pendingSockets.size() > 0) {
@ -73,8 +90,9 @@ class I2PServerSocketImpl implements I2PServerSocket {
} }
} }
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("TIMING: handed out accept result " + ret.hashCode()); _log.debug("TIMING: handed out accept result " + ret.hashCode());
}
return ret; return ret;
} }
@ -88,12 +106,13 @@ class I2PServerSocketImpl implements I2PServerSocket {
* or the socket was closed * or the socket was closed
*/ */
public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
}
if(closing) { if(closing) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Already closing the socket"); _log.warn("Already closing the socket");
}
return false; return false;
} }
@ -110,14 +129,16 @@ class I2PServerSocketImpl implements I2PServerSocket {
while(pendingSockets.contains(s)) { while(pendingSockets.contains(s)) {
long now = clock.now(); long now = clock.now();
if(now >= end) { if(now >= end) {
if (_log.shouldLog(Log.INFO)) if(_log.shouldLog(Log.INFO)) {
_log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
}
pendingSockets.remove(s); pendingSockets.remove(s);
return false; return false;
} }
if(closing) { if(closing) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Server socket closed while waiting for accept"); _log.warn("Server socket closed while waiting for accept");
}
pendingSockets.remove(s); pendingSockets.remove(s);
return false; return false;
} }
@ -126,11 +147,13 @@ class I2PServerSocketImpl implements I2PServerSocket {
synchronized(socketAcceptedLock) { synchronized(socketAcceptedLock) {
socketAcceptedLock.wait(remaining); socketAcceptedLock.wait(remaining);
} }
} catch (InterruptedException ie) {} } catch(InterruptedException ie) {
}
} }
long now = clock.now(); long now = clock.now();
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString()); _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString());
}
return true; return true;
} }
@ -146,5 +169,7 @@ class I2PServerSocketImpl implements I2PServerSocket {
} }
} }
public I2PSocketManager getManager() { return mgr; } public I2PSocketManager getManager() {
return mgr;
}
} }

View File

@ -5,6 +5,7 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Properties; import java.util.Properties;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
@ -20,6 +21,7 @@ import net.i2p.util.Log;
* *
*/ */
public class StreamSinkServer { public class StreamSinkServer {
private Log _log; private Log _log;
private String _sinkDir; private String _sinkDir;
private String _destFile; private String _destFile;
@ -36,6 +38,7 @@ public class StreamSinkServer {
public StreamSinkServer(String sinkDir, String ourDestFile) { public StreamSinkServer(String sinkDir, String ourDestFile) {
this(sinkDir, ourDestFile, null, -1, 3); this(sinkDir, ourDestFile, null, -1, 3);
} }
public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
_sinkDir = sinkDir; _sinkDir = sinkDir;
_destFile = ourDestFile; _destFile = ourDestFile;
@ -52,13 +55,15 @@ public class StreamSinkServer {
*/ */
public void runServer() { public void runServer() {
I2PSocketManager mgr = null; I2PSocketManager mgr = null;
if (_i2cpHost != null) if(_i2cpHost != null) {
mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
else } else {
mgr = I2PSocketManagerFactory.createManager(); mgr = I2PSocketManagerFactory.createManager();
}
Destination dest = mgr.getSession().getMyDestination(); Destination dest = mgr.getSession().getMyDestination();
if (_log.shouldLog(Log.INFO)) if(_log.shouldLog(Log.INFO)) {
_log.info("Listening for connections on: " + dest.calculateHash().toBase64()); _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
}
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
fos = new FileOutputStream(_destFile); fos = new FileOutputStream(_destFile);
@ -70,7 +75,12 @@ public class StreamSinkServer {
_log.error("Error formatting the destination", dfe); _log.error("Error formatting the destination", dfe);
return; return;
} finally { } finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {} if(fos != null) {
try {
fos.close();
} catch(IOException ioe) {
}
}
} }
I2PServerSocket sock = mgr.getServerSocket(); I2PServerSocket sock = mgr.getServerSocket();
@ -91,22 +101,28 @@ public class StreamSinkServer {
* *
*/ */
private class ClientRunner implements Runnable { private class ClientRunner implements Runnable {
private I2PServerSocket _socket; private I2PServerSocket _socket;
public ClientRunner(I2PServerSocket socket) { public ClientRunner(I2PServerSocket socket) {
_socket = socket; _socket = socket;
} }
public void run() { public void run() {
while(true) { while(true) {
try { try {
I2PSocket socket = _socket.accept(); I2PSocket socket = _socket.accept();
if (socket != null) if(socket != null) {
handle(socket); handle(socket);
}
} catch(I2PException ie) { } catch(I2PException ie) {
_log.error("Error accepting connection", ie); _log.error("Error accepting connection", ie);
return; return;
} catch(ConnectException ce) { } catch(ConnectException ce) {
_log.error("Connection already dropped", ce); _log.error("Connection already dropped", ce);
return; return;
} catch(SocketTimeoutException ste) {
// ignored
} }
} }
} }
@ -115,12 +131,14 @@ public class StreamSinkServer {
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
File sink = new File(_sinkDir); File sink = new File(_sinkDir);
if (!sink.exists()) if(!sink.exists()) {
sink.mkdirs(); sink.mkdirs();
}
File cur = File.createTempFile("clientSink", ".dat", sink); File cur = File.createTempFile("clientSink", ".dat", sink);
fos = new FileOutputStream(cur); fos = new FileOutputStream(cur);
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Writing to " + cur.getAbsolutePath()); _log.debug("Writing to " + cur.getAbsolutePath());
}
} catch(IOException ioe) { } catch(IOException ioe) {
_log.error("Error creating sink", ioe); _log.error("Error creating sink", ioe);
return; return;
@ -135,17 +153,28 @@ public class StreamSinkServer {
while((read = in.read(buf)) != -1) { while((read = in.read(buf)) != -1) {
//_fos.write(buf, 0, read); //_fos.write(buf, 0, read);
written += read; written += read;
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("read and wrote " + read + " (" + written + ")"); _log.debug("read and wrote " + read + " (" + written + ")");
} }
}
fos.write(("written: [" + written + "]\n").getBytes()); fos.write(("written: [" + written + "]\n").getBytes());
long lifetime = System.currentTimeMillis() - start; long lifetime = System.currentTimeMillis() - start;
_log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
} catch(IOException ioe) { } catch(IOException ioe) {
_log.error("Error writing the sink", ioe); _log.error("Error writing the sink", ioe);
} finally { } finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {} if(fos != null) {
if (sock != null) try { sock.close(); } catch (IOException ioe) {} try {
fos.close();
} catch(IOException ioe) {
}
}
if(sock != null) {
try {
sock.close();
} catch(IOException ioe) {
}
}
_log.debug("Client socket closed"); _log.debug("Client socket closed");
} }
} }
@ -174,7 +203,8 @@ public class StreamSinkServer {
if(args.length == 5) { if(args.length == 5) {
try { try {
handlers = Integer.parseInt(args[4]); handlers = Integer.parseInt(args[4]);
} catch (NumberFormatException nfe) {} } catch(NumberFormatException nfe) {
}
} }
try { try {
int port = Integer.parseInt(args[1]); int port = Integer.parseInt(args[1]);
@ -186,7 +216,8 @@ public class StreamSinkServer {
default: default:
System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
} }
if (server != null) if(server != null) {
server.runServer(); server.runServer();
} }
} }
}

View File

@ -1,5 +1,6 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -21,6 +21,7 @@ import net.i2p.util.SimpleTimer;
* *
*/ */
public class ConnectionManager { public class ConnectionManager {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private I2PSession _session; private I2PSession _session;
@ -39,6 +40,7 @@ public class ConnectionManager {
private ConnectionOptions _defaultOptions; private ConnectionOptions _defaultOptions;
private volatile int _numWaiting; private volatile int _numWaiting;
private Object _connectionLock; private Object _connectionLock;
private long SoTimeout;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context; _context = context;
@ -58,6 +60,9 @@ public class ConnectionManager {
_maxConcurrentStreams = maxConcurrent; _maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions; _defaultOptions = defaultOptions;
_numWaiting = 0; _numWaiting = 0;
/** Socket timeout for accept() */
SoTimeout = -1;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
_context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000}); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
@ -75,6 +80,7 @@ public class ConnectionManager {
return (Connection)_connectionByInboundId.get(new Long(id)); return (Connection)_connectionByInboundId.get(new Long(id));
} }
} }
/** /**
* not guaranteed to be unique, but in case we receive more than one packet * not guaranteed to be unique, but in case we receive more than one packet
* on an inbound connection that we havent ack'ed yet... * on an inbound connection that we havent ack'ed yet...
@ -83,16 +89,34 @@ public class ConnectionManager {
synchronized(_connectionLock) { synchronized(_connectionLock) {
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
Connection con = (Connection)iter.next(); Connection con = (Connection)iter.next();
if (DataHelper.eq(con.getSendStreamId(), id)) if(DataHelper.eq(con.getSendStreamId(), id)) {
return con; return con;
} }
} }
}
return null; return null;
} }
/**
* Set the socket accept() timeout.
* @param x
*/
public void MsetSoTimeout(long x) {
SoTimeout = x;
}
/**
* Get the socket accept() timeout.
* @return
*/
public long MgetSoTimeout() {
return SoTimeout;
}
public void setAllowIncomingConnections(boolean allow) { public void setAllowIncomingConnections(boolean allow) {
_connectionHandler.setActive(allow); _connectionHandler.setActive(allow);
} }
/** should we acceot connections, or just reject everyone? */ /** should we acceot connections, or just reject everyone? */
public boolean getAllowIncomingConnections() { public boolean getAllowIncomingConnections() {
return _connectionHandler.getActive(); return _connectionHandler.getActive();
@ -113,9 +137,10 @@ public class ConnectionManager {
synchronized(_connectionLock) { synchronized(_connectionLock) {
total = _connectionByInboundId.size(); total = _connectionByInboundId.size();
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
if ( ((Connection)iter.next()).getIsConnected() ) if(((Connection)iter.next()).getIsConnected()) {
active++; active++;
} }
}
if(locked_tooManyStreams()) { if(locked_tooManyStreams()) {
reject = true; reject = true;
} else { } else {
@ -135,9 +160,9 @@ public class ConnectionManager {
_context.statManager().addRateData("stream.receiveActive", active, total); _context.statManager().addRateData("stream.receiveActive", active, total);
if(reject) { if(reject) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Refusing connection since we have exceeded our max of " _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections");
+ _maxConcurrentStreams + " connections"); }
PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
@ -163,7 +188,6 @@ public class ConnectionManager {
_context.statManager().addRateData("stream.connectionReceived", 1, 0); _context.statManager().addRateData("stream.connectionReceived", 1, 0);
return con; return con;
} }
private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000; private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000;
/** /**
@ -176,15 +200,16 @@ public class ConnectionManager {
Connection con = null; Connection con = null;
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
long expiration = _context.clock().now() + opts.getConnectTimeout(); long expiration = _context.clock().now() + opts.getConnectTimeout();
if (opts.getConnectTimeout() <= 0) if(opts.getConnectTimeout() <= 0) {
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
}
_numWaiting++; _numWaiting++;
while(true) { while(true) {
long remaining = expiration - _context.clock().now(); long remaining = expiration - _context.clock().now();
if(remaining <= 0) { if(remaining <= 0) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Refusing to connect since we have exceeded our max of " _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
+ _maxConcurrentStreams + " connections"); }
_numWaiting--; _numWaiting--;
return null; return null;
} }
@ -193,16 +218,18 @@ public class ConnectionManager {
if(locked_tooManyStreams()) { if(locked_tooManyStreams()) {
// allow a full buffer of pending/waiting streams // allow a full buffer of pending/waiting streams
if(_numWaiting > _maxConcurrentStreams) { if(_numWaiting > _maxConcurrentStreams) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Refusing connection since we have exceeded our max of " _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
+ _maxConcurrentStreams + " and there are " + _numWaiting }
+ " waiting already");
_numWaiting--; _numWaiting--;
return null; return null;
} }
// no remaining streams, lets wait a bit // no remaining streams, lets wait a bit
try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} try {
_connectionLock.wait(remaining);
} catch(InterruptedException ie) {
}
} else { } else {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer); con.setRemotePeer(peer);
@ -224,35 +251,53 @@ public class ConnectionManager {
if(opts.getConnectDelay() <= 0) { if(opts.getConnectDelay() <= 0) {
con.waitForConnect(); con.waitForConnect();
} }
if (_numWaiting > 0) if(_numWaiting > 0) {
_numWaiting--; _numWaiting--;
}
_context.statManager().addRateData("stream.connectionCreated", 1, 0); _context.statManager().addRateData("stream.connectionCreated", 1, 0);
return con; return con;
} }
private boolean locked_tooManyStreams() { private boolean locked_tooManyStreams() {
if (_maxConcurrentStreams <= 0) return false; if(_maxConcurrentStreams <= 0) {
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; return false;
}
if(_connectionByInboundId.size() < _maxConcurrentStreams) {
return false;
}
int active = 0; int active = 0;
for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) { for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
Connection con = (Connection)iter.next(); Connection con = (Connection)iter.next();
if (con.getIsConnected()) if(con.getIsConnected()) {
active++; active++;
} }
}
if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) ) if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) {
_log.info("More than 100 connections! " + active _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size());
+ " total: " + _connectionByInboundId.size()); }
return (active >= _maxConcurrentStreams); return (active >= _maxConcurrentStreams);
} }
public MessageHandler getMessageHandler() { return _messageHandler; } public MessageHandler getMessageHandler() {
public PacketHandler getPacketHandler() { return _packetHandler; } return _messageHandler;
public ConnectionHandler getConnectionHandler() { return _connectionHandler; } }
public I2PSession getSession() { return _session; }
public PacketQueue getPacketQueue() { return _outboundQueue; } public PacketHandler getPacketHandler() {
return _packetHandler;
}
public ConnectionHandler getConnectionHandler() {
return _connectionHandler;
}
public I2PSession getSession() {
return _session;
}
public PacketQueue getPacketQueue() {
return _outboundQueue;
}
/** /**
* Something b0rked hard, so kill all of our connections without mercy. * Something b0rked hard, so kill all of our connections without mercy.
@ -279,11 +324,12 @@ public class ConnectionManager {
synchronized(_connectionLock) { synchronized(_connectionLock) {
Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
removed = (o == con); removed = (o == con);
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Connection removed? " + removed + " remaining: " _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
+ _connectionByInboundId.size() + ": " + con); }
if (!removed && _log.shouldLog(Log.DEBUG)) if(!removed && _log.shouldLog(Log.DEBUG)) {
_log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values()); _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values());
}
_connectionLock.notifyAll(); _connectionLock.notifyAll();
} }
if(removed) { if(removed) {
@ -309,9 +355,11 @@ public class ConnectionManager {
public boolean ping(Destination peer, long timeoutMs) { public boolean ping(Destination peer, long timeoutMs) {
return ping(peer, timeoutMs, true); return ping(peer, timeoutMs, true);
} }
public boolean ping(Destination peer, long timeoutMs, boolean blocking) { public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
return ping(peer, timeoutMs, blocking, null, null, null); return ping(peer, timeoutMs, blocking, null, null, null);
} }
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1); Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1);
PacketLocal packet = new PacketLocal(_context, peer); PacketLocal packet = new PacketLocal(_context, peer);
@ -335,8 +383,12 @@ public class ConnectionManager {
if(blocking) { if(blocking) {
synchronized(req) { synchronized(req) {
if (!req.pongReceived()) if(!req.pongReceived()) {
try { req.wait(timeoutMs); } catch (InterruptedException ie) {} try {
req.wait(timeoutMs);
} catch(InterruptedException ie) {
}
}
} }
synchronized(_pendingPings) { synchronized(_pendingPings) {
@ -351,12 +403,15 @@ public class ConnectionManager {
} }
interface PingNotifier { interface PingNotifier {
public void pingComplete(boolean ok); public void pingComplete(boolean ok);
} }
private class PingFailed implements SimpleTimer.TimedEvent { private class PingFailed implements SimpleTimer.TimedEvent {
private Long _id; private Long _id;
private PingNotifier _notifier; private PingNotifier _notifier;
public PingFailed(Long id, PingNotifier notifier) { public PingFailed(Long id, PingNotifier notifier) {
_id = id; _id = id;
_notifier = notifier; _notifier = notifier;
@ -366,29 +421,35 @@ public class ConnectionManager {
boolean removed = false; boolean removed = false;
synchronized(_pendingPings) { synchronized(_pendingPings) {
Object o = _pendingPings.remove(_id); Object o = _pendingPings.remove(_id);
if (o != null) if(o != null) {
removed = true; removed = true;
} }
}
if(removed) { if(removed) {
if (_notifier != null) if(_notifier != null) {
_notifier.pingComplete(false); _notifier.pingComplete(false);
if (_log.shouldLog(Log.INFO)) }
if(_log.shouldLog(Log.INFO)) {
_log.info("Ping failed"); _log.info("Ping failed");
} }
} }
} }
}
private class PingRequest { private class PingRequest {
private boolean _ponged; private boolean _ponged;
private Destination _peer; private Destination _peer;
private PacketLocal _packet; private PacketLocal _packet;
private PingNotifier _notifier; private PingNotifier _notifier;
public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
_ponged = false; _ponged = false;
_peer = peer; _peer = peer;
_packet = packet; _packet = packet;
_notifier = notifier; _notifier = notifier;
} }
public void pong() { public void pong() {
_log.debug("Ping successful"); _log.debug("Ping successful");
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
@ -396,10 +457,14 @@ public class ConnectionManager {
_ponged = true; _ponged = true;
ConnectionManager.PingRequest.this.notifyAll(); ConnectionManager.PingRequest.this.notifyAll();
} }
if (_notifier != null) if(_notifier != null) {
_notifier.pingComplete(true); _notifier.pingComplete(true);
} }
public boolean pongReceived() { return _ponged; } }
public boolean pongReceived() {
return _ponged;
}
} }
void receivePong(long pingId) { void receivePong(long pingId) {
@ -407,7 +472,8 @@ public class ConnectionManager {
synchronized(_pendingPings) { synchronized(_pendingPings) {
req = (PingRequest)_pendingPings.remove(new Long(pingId)); req = (PingRequest)_pendingPings.remove(new Long(pingId));
} }
if (req != null) if(req != null) {
req.pong(); req.pong();
} }
} }
}

View File

@ -1,5 +1,8 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.SocketTimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.I2PException; import net.i2p.I2PException;
/** /**
@ -7,17 +10,46 @@ import net.i2p.I2PException;
* *
*/ */
public class I2PServerSocketFull implements I2PServerSocket { public class I2PServerSocketFull implements I2PServerSocket {
private I2PSocketManagerFull _socketManager; private I2PSocketManagerFull _socketManager;
/**
*
* @param mgr
*/
public I2PServerSocketFull(I2PSocketManagerFull mgr) { public I2PServerSocketFull(I2PSocketManagerFull mgr) {
_socketManager = mgr; _socketManager = mgr;
} }
public I2PSocket accept() throws I2PException { /**
*
* @return
* @throws net.i2p.I2PException
* @throws SocketTimeoutException
*/
public I2PSocket accept() throws I2PException, SocketTimeoutException {
return _socketManager.receiveSocket(); return _socketManager.receiveSocket();
} }
public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } public long getSoTimeout() {
return _socketManager.getConnectionManager().MgetSoTimeout();
public I2PSocketManager getManager() { return _socketManager; } }
public void setSoTimeout(long x) {
_socketManager.getConnectionManager().MsetSoTimeout(x);
}
/**
* Close the connection.
*/
public void close() {
_socketManager.getConnectionManager().setAllowIncomingConnections(false);
}
/**
*
* @return _socketManager
*/
public I2PSocketManager getManager() {
return _socketManager;
}
} }

View File

@ -11,6 +11,7 @@ import net.i2p.data.Destination;
* *
*/ */
public class I2PSocketFull implements I2PSocket { public class I2PSocketFull implements I2PSocket {
private Connection _connection; private Connection _connection;
private I2PSocket.SocketErrorListener _listener; private I2PSocket.SocketErrorListener _listener;
private Destination _remotePeer; private Destination _remotePeer;
@ -24,9 +25,12 @@ public class I2PSocketFull implements I2PSocket {
} }
} }
public void close() throws IOException { public void close() throws IOException {
Connection c = _connection; Connection c = _connection;
if (c == null) return; if(c == null) {
return;
}
if(c.getIsConnected()) { if(c.getIsConnected()) {
OutputStream out = c.getOutputStream(); OutputStream out = c.getOutputStream();
if(out != null) { if(out != null) {
@ -44,58 +48,71 @@ public class I2PSocketFull implements I2PSocket {
destroy(); destroy();
} }
Connection getConnection() { return _connection; } Connection getConnection() {
return _connection;
}
public InputStream getInputStream() { public InputStream getInputStream() {
Connection c = _connection; Connection c = _connection;
if (c != null) if(c != null) {
return c.getInputStream(); return c.getInputStream();
else } else {
return null; return null;
} }
}
public I2PSocketOptions getOptions() { public I2PSocketOptions getOptions() {
Connection c = _connection; Connection c = _connection;
if (c != null) if(c != null) {
return c.getOptions(); return c.getOptions();
else } else {
return null; return null;
} }
}
public OutputStream getOutputStream() throws IOException { public OutputStream getOutputStream() throws IOException {
Connection c = _connection; Connection c = _connection;
if (c != null) if(c != null) {
return c.getOutputStream(); return c.getOutputStream();
else } else {
return null; return null;
} }
}
public Destination getPeerDestination() { return _remotePeer; } public Destination getPeerDestination() {
return _remotePeer;
}
public long getReadTimeout() { public long getReadTimeout() {
I2PSocketOptions opts = getOptions(); I2PSocketOptions opts = getOptions();
if (opts != null) if(opts != null) {
return opts.getReadTimeout(); return opts.getReadTimeout();
else } else {
return -1; return -1;
} }
}
public Destination getThisDestination() { return _localPeer; } public Destination getThisDestination() {
return _localPeer;
}
public void setOptions(I2PSocketOptions options) { public void setOptions(I2PSocketOptions options) {
Connection c = _connection; Connection c = _connection;
if (c == null) return; if(c == null) {
return;
if (options instanceof ConnectionOptions) }
if(options instanceof ConnectionOptions) {
c.setOptions((ConnectionOptions)options); c.setOptions((ConnectionOptions)options);
else } else {
c.setOptions(new ConnectionOptions(options)); c.setOptions(new ConnectionOptions(options));
} }
}
public void setReadTimeout(long ms) { public void setReadTimeout(long ms) {
Connection c = _connection; Connection c = _connection;
if (c == null) return; if(c == null) {
return;
}
c.getInputStream().setReadTimeout((int)ms); c.getInputStream().setReadTimeout((int)ms);
c.getOptions().setReadTimeout(ms); c.getOptions().setReadTimeout(ms);
} }
@ -116,14 +133,17 @@ public class I2PSocketFull implements I2PSocket {
Connection c = _connection; Connection c = _connection;
_connection = null; _connection = null;
_listener = null; _listener = null;
if (c != null) if(c != null) {
c.disconnectComplete(); c.disconnectComplete();
} }
}
public String toString() { public String toString() {
Connection c = _connection; Connection c = _connection;
if (c == null) if(c == null) {
return super.toString(); return super.toString();
else } else {
return c.toString(); return c.toString();
} }
} }
}

View File

@ -1,6 +1,7 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
@ -13,7 +14,6 @@ import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* Centralize the coordination and multiplexing of the local client's streaming. * Centralize the coordination and multiplexing of the local client's streaming.
* There should be one I2PSocketManager for each I2PSession, and if an application * There should be one I2PSocketManager for each I2PSession, and if an application
@ -23,6 +23,7 @@ import net.i2p.util.Log;
* *
*/ */
public class I2PSocketManagerFull implements I2PSocketManager { public class I2PSocketManagerFull implements I2PSocketManager {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private I2PSession _session; private I2PSession _session;
@ -33,27 +34,41 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private int _maxStreams; private int _maxStreams;
private static int __managerId = 0; private static int __managerId = 0;
private ConnectionManager _connectionManager; private ConnectionManager _connectionManager;
/** /**
* How long to wait for the client app to accept() before sending back CLOSE? * How long to wait for the client app to accept() before sending back CLOSE?
* This includes the time waiting in the queue. Currently set to 5 seconds. * This includes the time waiting in the queue. Currently set to 5 seconds.
*/ */
private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000; private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000;
/**
*
*/
public I2PSocketManagerFull() { public I2PSocketManagerFull() {
_context = null; _context = null;
_session = null; _session = null;
} }
/**
*
* @param context
* @param session
* @param opts
* @param name
*/
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
this(); this();
init(context, session, opts, name); init(context, session, opts, name);
} }
/** how many streams will we allow at once? */ /** how many streams will we allow at once? */
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
/** /**
* *
*
* @param context
* @param session
* @param opts
* @param name
*/ */
public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context; _context = context;
@ -65,8 +80,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
_maxStreams = Integer.parseInt(num); _maxStreams = Integer.parseInt(num);
} catch(NumberFormatException nfe) { } catch(NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
}
_maxStreams = -1; _maxStreams = -1;
} }
_name = name + " " + (++__managerId); _name = name + " " + (++__managerId);
@ -76,44 +92,77 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_serverSocket = new I2PServerSocketFull(this); _serverSocket = new I2PServerSocketFull(this);
if(_log.shouldLog(Log.INFO)) { if(_log.shouldLog(Log.INFO)) {
_log.info("Socket manager created. \ndefault options: " + _defaultOptions _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts);
+ "\noriginal properties: " + opts);
} }
} }
public I2PSocketOptions buildOptions() { return buildOptions(null); } /**
*
* @return
*/
public I2PSocketOptions buildOptions() {
return buildOptions(null);
}
/**
*
* @param opts
* @return
*/
public I2PSocketOptions buildOptions(Properties opts) { public I2PSocketOptions buildOptions(Properties opts) {
ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
curOpts.setProperties(opts); curOpts.setProperties(opts);
return curOpts; return curOpts;
} }
/**
*
* @return
*/
public I2PSession getSession() { public I2PSession getSession() {
return _session; return _session;
} }
/**
*
* @return
*/
public ConnectionManager getConnectionManager() { public ConnectionManager getConnectionManager() {
return _connectionManager; return _connectionManager;
} }
public I2PSocket receiveSocket() throws I2PException { /**
*
* @return
* @throws net.i2p.I2PException
* @throws java.net.SocketTimeoutException
*/
public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
verifySession(); verifySession();
Connection con = _connectionManager.getConnectionHandler().accept(-1); Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("receiveSocket() called: " + con); _log.debug("receiveSocket() called: " + con);
}
if(con != null) { if(con != null) {
I2PSocketFull sock = new I2PSocketFull(con); I2PSocketFull sock = new I2PSocketFull(con);
con.setSocket(sock); con.setSocket(sock);
return sock; return sock;
} else { } else {
if(_connectionManager.MgetSoTimeout() == -1) {
return null; return null;
} }
throw new SocketTimeoutException("I2PSocket timed out");
}
} }
/** /**
* Ping the specified peer, returning true if they replied to the ping within * Ping the specified peer, returning true if they replied to the ping within
* the timeout specified, false otherwise. This call blocks. * the timeout specified, false otherwise. This call blocks.
* *
*
* @param peer
* @param timeoutMs
* @return
*/ */
public boolean ping(Destination peer, long timeoutMs) { public boolean ping(Destination peer, long timeoutMs) {
return _connectionManager.ping(peer, timeoutMs); return _connectionManager.ping(peer, timeoutMs);
@ -125,25 +174,47 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* *
* @param ms milliseconds to wait, maximum * @param ms milliseconds to wait, maximum
*/ */
public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } public void setAcceptTimeout(long ms) {
public long getAcceptTimeout() { return _acceptTimeout; } _acceptTimeout = ms;
}
/**
*
* @return
*/
public long getAcceptTimeout() {
return _acceptTimeout;
}
/**
*
* @param options
*/
public void setDefaultOptions(I2PSocketOptions options) { public void setDefaultOptions(I2PSocketOptions options) {
_defaultOptions = new ConnectionOptions((ConnectionOptions)options); _defaultOptions = new ConnectionOptions((ConnectionOptions)options);
} }
/**
*
* @return
*/
public I2PSocketOptions getDefaultOptions() { public I2PSocketOptions getDefaultOptions() {
return _defaultOptions; return _defaultOptions;
} }
/**
*
* @return
*/
public I2PServerSocket getServerSocket() { public I2PServerSocket getServerSocket() {
_connectionManager.setAllowIncomingConnections(true); _connectionManager.setAllowIncomingConnections(true);
return _serverSocket; return _serverSocket;
} }
private void verifySession() throws I2PException { private void verifySession() throws I2PException {
if (!_connectionManager.getSession().isClosed()) if(!_connectionManager.getSession().isClosed()) {
return; return;
}
_connectionManager.getSession().connect(); _connectionManager.getSession().connect();
} }
@ -159,20 +230,22 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocket connect(Destination peer, I2PSocketOptions options) public I2PSocket connect(Destination peer, I2PSocketOptions options)
throws I2PException, NoRouteToHostException { throws I2PException, NoRouteToHostException {
verifySession(); verifySession();
if (options == null) if(options == null) {
options = _defaultOptions; options = _defaultOptions;
}
ConnectionOptions opts = null; ConnectionOptions opts = null;
if (options instanceof ConnectionOptions) if(options instanceof ConnectionOptions) {
opts = new ConnectionOptions((ConnectionOptions)options); opts = new ConnectionOptions((ConnectionOptions)options);
else } else {
opts = new ConnectionOptions(options); opts = new ConnectionOptions(options);
}
if (_log.shouldLog(Log.INFO)) if(_log.shouldLog(Log.INFO)) {
_log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
+ " with options: " + opts); }
Connection con = _connectionManager.connect(peer, opts); Connection con = _connectionManager.connect(peer, opts);
if (con == null) if(con == null) {
throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
}
I2PSocketFull socket = new I2PSocketFull(con); I2PSocketFull socket = new I2PSocketFull(con);
con.setSocket(socket); con.setSocket(socket);
if(con.getConnectionError() != null) { if(con.getConnectionError() != null) {
@ -187,6 +260,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* *
* @param peer Destination to connect to * @param peer Destination to connect to
* *
* @return
* @throws NoRouteToHostException if the peer is not found or not reachable * @throws NoRouteToHostException if the peer is not found or not reachable
* @throws I2PException if there is some other I2P-related problem * @throws I2PException if there is some other I2P-related problem
*/ */
@ -216,25 +290,49 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/** /**
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
* *
*
* @return
*/ */
public Set listSockets() { public Set listSockets() {
Set connections = _connectionManager.listConnections(); Set connections = _connectionManager.listConnections();
Set rv = new HashSet(connections.size()); Set rv = new HashSet(connections.size());
for(Iterator iter = connections.iterator(); iter.hasNext();) { for(Iterator iter = connections.iterator(); iter.hasNext();) {
Connection con = (Connection)iter.next(); Connection con = (Connection)iter.next();
if (con.getSocket() != null) if(con.getSocket() != null) {
rv.add(con.getSocket()); rv.add(con.getSocket());
} }
}
return rv; return rv;
} }
public String getName() { return _name; } /**
public void setName(String name) { _name = name; } *
* @return
*/
public String getName() {
return _name;
}
/**
*
* @param name
*/
public void setName(String name) {
_name = name;
}
/**
*
* @param lsnr
*/
public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
_connectionManager.getMessageHandler().addDisconnectListener(lsnr); _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
} }
/**
*
* @param lsnr
*/
public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
_connectionManager.getMessageHandler().removeDisconnectListener(lsnr); _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
} }

View File

@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer;
/** /**
* *
*/ */
class RetransmissionTimer extends SimpleTimer { public class RetransmissionTimer extends SimpleTimer {
private static final RetransmissionTimer _instance = new RetransmissionTimer(); private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; } public static final SimpleTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); } protected RetransmissionTimer() { super("StreamingTimer"); }

View File

@ -5,22 +5,32 @@ import java.util.List;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
class Executor implements Runnable { class Executor implements Runnable {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private List _readyEvents; private List _readyEvents;
public Executor(I2PAppContext ctx, Log log, List events) { private SimpleStore runn;
public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
_context = ctx; _context = ctx;
_readyEvents = events; _readyEvents = events;
runn = x;
} }
public void run() { public void run() {
while (true) { while(runn.getAnswer()) {
SimpleTimer.TimedEvent evt = null; SimpleTimer.TimedEvent evt = null;
synchronized(_readyEvents) { synchronized(_readyEvents) {
if (_readyEvents.size() <= 0) if(_readyEvents.size() <= 0) {
try { _readyEvents.wait(); } catch (InterruptedException ie) {} try {
if (_readyEvents.size() > 0) _readyEvents.wait();
} catch(InterruptedException ie) {
}
}
if(_readyEvents.size() > 0) {
evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
} }
}
if(evt != null) { if(evt != null) {
long before = _context.clock().now(); long before = _context.clock().now();
@ -30,17 +40,24 @@ class Executor implements Runnable {
log("wtf, event borked: " + evt, t); log("wtf, event borked: " + evt, t);
} }
long time = _context.clock().now() - before; long time = _context.clock().now() - before;
if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
_log.warn("wtf, event execution took " + time + ": " + evt); _log.warn("wtf, event execution took " + time + ": " + evt);
} }
} }
} }
}
/**
*
* @param msg
* @param t
*/
private void log(String msg, Throwable t) { private void log(String msg, Throwable t) {
synchronized(this) { synchronized(this) {
if (_log == null) if(_log == null) {
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
} }
}
_log.log(Log.CRIT, msg, t); _log.log(Log.CRIT, msg, t);
} }
} }

View File

@ -0,0 +1,35 @@
/*
* This is free software, do as you please.
*/
package net.i2p.util;
/**
*
* @author sponge
*/
public class SimpleStore {
private boolean answer;
SimpleStore(boolean x) {
answer=x;
}
/**
* set the answer
*
* @param x
*/
public void setAnswer(boolean x) {
answer = x;
}
/**
*
* @return boolean
*/
public boolean getAnswer() {
return answer;
}
}

View File

@ -16,8 +16,12 @@ import net.i2p.I2PAppContext;
* *
*/ */
public class SimpleTimer { public class SimpleTimer {
private static final SimpleTimer _instance = new SimpleTimer(); private static final SimpleTimer _instance = new SimpleTimer();
public static SimpleTimer getInstance() { return _instance; }
public static SimpleTimer getInstance() {
return _instance;
}
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
/** event time (Long) to event (TimedEvent) mapping */ /** event time (Long) to event (TimedEvent) mapping */
@ -25,9 +29,21 @@ public class SimpleTimer {
/** event (TimedEvent) to event time (Long) mapping */ /** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes; private Map _eventTimes;
private List _readyEvents; private List _readyEvents;
private SimpleStore runn;
protected SimpleTimer() { this("SimpleTimer"); } /**
*
*/
protected SimpleTimer() {
this("SimpleTimer");
}
/**
*
* @param name
*/
protected SimpleTimer(String name) { protected SimpleTimer(String name) {
runn = new SimpleStore(true);
_context = I2PAppContext.getGlobalContext(); _context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(SimpleTimer.class); _log = _context.logManager().getLog(SimpleTimer.class);
_events = new TreeMap(); _events = new TreeMap();
@ -38,13 +54,28 @@ public class SimpleTimer {
runner.setDaemon(true); runner.setDaemon(true);
runner.start(); runner.start();
for(int i = 0; i < 3; i++) { for(int i = 0; i < 3; i++) {
I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
executor.setName(name + "Executor " + i); executor.setName(name + "Executor " + i);
executor.setDaemon(true); executor.setDaemon(true);
executor.start(); executor.start();
} }
} }
/**
* Removes the SimpleTimer.
*/
public void removeSimpleTimer() {
synchronized(_events) {
runn.setAnswer(false);
_events.notifyAll();
}
}
/**
*
* @param event
* @param timeoutMs
*/
public void reschedule(TimedEvent event, long timeoutMs) { public void reschedule(TimedEvent event, long timeoutMs) {
addEvent(event, timeoutMs, false); addEvent(event, timeoutMs, false);
} }
@ -55,9 +86,16 @@ public class SimpleTimer {
* for the earlier of the two timeouts, which may be before this stated * for the earlier of the two timeouts, which may be before this stated
* timeout. If this is not the desired behavior, call removeEvent first. * timeout. If this is not the desired behavior, call removeEvent first.
* *
* @param event
* @param timeoutMs
*/ */
public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } public void addEvent(TimedEvent event, long timeoutMs) {
addEvent(event, timeoutMs, true);
}
/** /**
* @param event
* @param timeoutMs
* @param useEarliestTime if its already scheduled, use the earlier of the * @param useEarliestTime if its already scheduled, use the earlier of the
* two timeouts, else use the later * two timeouts, else use the later
*/ */
@ -86,8 +124,9 @@ public class SimpleTimer {
} }
} }
} }
while (_events.containsKey(time)) while(_events.containsKey(time)) {
time = new Long(time.longValue() + 1); time = new Long(time.longValue() + 1);
}
_events.put(time, event); _events.put(time, event);
_eventTimes.put(event, time); _eventTimes.put(event, time);
@ -107,24 +146,33 @@ public class SimpleTimer {
_events.notifyAll(); _events.notifyAll();
} }
if(time.longValue() > eventTime + 100) { if(time.longValue() > eventTime + 100) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("Lots of timer congestion, had to push " + event + " back " _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")");
+ (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); }
} }
long timeToAdd = System.currentTimeMillis() - now; long timeToAdd = System.currentTimeMillis() - now;
if(timeToAdd > 50) { if(timeToAdd > 50) {
if (_log.shouldLog(Log.WARN)) if(_log.shouldLog(Log.WARN)) {
_log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
} }
}
} }
/**
*
* @param evt
* @return
*/
public boolean removeEvent(TimedEvent evt) { public boolean removeEvent(TimedEvent evt) {
if (evt == null) return false; if(evt == null) {
return false;
}
synchronized(_events) { synchronized(_events) {
Long when = (Long)_eventTimes.remove(evt); Long when = (Long)_eventTimes.remove(evt);
if (when != null) if(when != null) {
_events.remove(when); _events.remove(when);
}
return null != when; return null != when;
} }
} }
@ -133,6 +181,7 @@ public class SimpleTimer {
* Simple interface for events to be queued up and notified on expiration * Simple interface for events to be queued up and notified on expiration
*/ */
public interface TimedEvent { public interface TimedEvent {
/** /**
* the time requested has been reached (this call should NOT block, * the time requested has been reached (this call should NOT block,
* otherwise the whole SimpleTimer gets backed up) * otherwise the whole SimpleTimer gets backed up)
@ -140,15 +189,15 @@ public class SimpleTimer {
*/ */
public void timeReached(); public void timeReached();
} }
private long _occurredTime; private long _occurredTime;
private long _occurredEventCount; private long _occurredEventCount;
private TimedEvent _recentEvents[] = new TimedEvent[5]; // not used
// private TimedEvent _recentEvents[] = new TimedEvent[5];
private class SimpleTimerRunner implements Runnable { private class SimpleTimerRunner implements Runnable {
public void run() { public void run() {
List eventsToFire = new ArrayList(1); List eventsToFire = new ArrayList(1);
while (true) { while(runn.getAnswer()) {
try { try {
synchronized(_events) { synchronized(_events) {
//if (_events.size() <= 0) //if (_events.size() <= 0)
@ -158,8 +207,10 @@ public class SimpleTimer {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long nextEventDelay = -1; long nextEventDelay = -1;
Object nextEvent = null; Object nextEvent = null;
while (true) { while(runn.getAnswer()) {
if (_events.size() <= 0) break; if(_events.size() <= 0) {
break;
}
Long when = (Long)_events.firstKey(); Long when = (Long)_events.firstKey();
if(when.longValue() <= now) { if(when.longValue() <= now) {
TimedEvent evt = (TimedEvent)_events.remove(when); TimedEvent evt = (TimedEvent)_events.remove(when);
@ -175,16 +226,15 @@ public class SimpleTimer {
} }
if(eventsToFire.size() <= 0) { if(eventsToFire.size() <= 0) {
if(nextEventDelay != -1) { if(nextEventDelay != -1) {
if (_log.shouldLog(Log.DEBUG)) if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Next event in " + nextEventDelay + ": " + nextEvent); _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
}
_events.wait(nextEventDelay); _events.wait(nextEventDelay);
} else { } else {
_events.wait(); _events.wait();
} }
} }
} }
} catch (ThreadDeath td) {
return; // die
} catch(InterruptedException ie) { } catch(InterruptedException ie) {
// ignore // ignore
} catch(Throwable t) { } catch(Throwable t) {
@ -200,8 +250,9 @@ public class SimpleTimer {
now = now - (now % 1000); now = now - (now % 1000);
synchronized(_readyEvents) { synchronized(_readyEvents) {
for (int i = 0; i < eventsToFire.size(); i++) for(int i = 0; i < eventsToFire.size(); i++) {
_readyEvents.add(eventsToFire.get(i)); _readyEvents.add(eventsToFire.get(i));
}
_readyEvents.notifyAll(); _readyEvents.notifyAll();
} }