From 9f433b2e6bafefab2cc277d9cea8c446ef9717a4 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 14 Jul 2011 18:53:10 +0000 Subject: [PATCH 1/6] * Streaming: - Hook I2CP ports through to I2PSocket - Javadocs, init cleanups, final --- .../net/i2p/client/streaming/I2PSocket.java | 15 ++++ .../i2p/client/streaming/I2PSocketImpl.java | 21 +++++- .../streaming/I2PSocketManagerFactory.java | 6 +- .../client/streaming/I2PSocketOptions.java | 30 +++++++- .../streaming/I2PSocketOptionsImpl.java | 64 ++++++++++++++++- .../net/i2p/client/streaming/Connection.java | 70 ++++++++++++------- .../streaming/ConnectionDataReceiver.java | 20 +++--- .../client/streaming/ConnectionHandler.java | 9 ++- .../client/streaming/ConnectionManager.java | 37 +++++----- .../client/streaming/ConnectionOptions.java | 40 ++++++++--- .../streaming/ConnectionPacketHandler.java | 4 +- .../client/streaming/I2PServerSocketFull.java | 2 +- .../i2p/client/streaming/I2PSocketFull.java | 24 ++++++- .../i2p/client/streaming/MessageHandler.java | 24 +++++-- .../client/streaming/MessageOutputStream.java | 4 -- .../src/net/i2p/client/streaming/Packet.java | 47 +++++++++++++ .../i2p/client/streaming/PacketHandler.java | 6 +- .../net/i2p/client/streaming/PacketLocal.java | 14 ++-- .../net/i2p/client/streaming/PacketQueue.java | 14 ++-- .../i2p/client/streaming/StandardSocket.java | 8 +-- .../net/i2p/client/streaming/TCBShare.java | 8 +-- 21 files changed, 361 insertions(+), 106 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index fee9fe6678..595385cc47 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -70,6 +70,21 @@ public interface I2PSocket { public boolean isClosed(); public void setSocketErrorListener(SocketErrorListener lsnr); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + /** * Allow notification of underlying errors communicating across I2P without * waiting for any sort of cleanup process. For example, if some data could diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 98c0a2cbfc..05ff0631e5 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -7,6 +7,7 @@ import java.io.OutputStream; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Clock; @@ -301,6 +302,24 @@ class I2PSocketImpl implements I2PSocket { public long getCreatedOn() { return _createdOn; } public long getClosedOn() { return _closedOn; } + /** + * The remote port. + * @return 0 always + * @since 0.8.9 + */ + public int getPort() { + return I2PSession.PORT_UNSPECIFIED; + } + + /** + * The local port. + * @return 0 always + * @since 0.8.9 + */ + public int getLocalPort() { + return I2PSession.PORT_UNSPECIFIED; + } + private String getPrefix() { return "[" + _socketId + "]: "; } @@ -671,7 +690,7 @@ class I2PSocketImpl implements I2PSocket { return sent; } } - + @Override public String toString() { return "" + hashCode(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 34dc8ac593..57f5f7bb4d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -90,7 +90,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream) { @@ -101,7 +101,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param opts I2CP options * @return the newly created socket manager, or null if there were errors */ @@ -114,7 +114,7 @@ public class I2PSocketManagerFactory { * stream and connected to the I2CP router on the specified machine on the given * port * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param i2cpHost I2CP host * @param i2cpPort I2CP port * @param opts I2CP options diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 94532e51b1..d926eb8313 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -2,7 +2,7 @@ package net.i2p.client.streaming; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ public interface I2PSocketOptions { /** How much data will we accept that hasn't been written out yet. */ @@ -81,4 +81,32 @@ public interface I2PSocketOptions { * @param ms wait time to block on the output stream while waiting for the data to flush. */ public void setWriteTimeout(long ms); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index b1fedcea79..cb66b1486e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -4,22 +4,32 @@ import java.util.Properties; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ class I2PSocketOptionsImpl implements I2PSocketOptions { private long _connectTimeout; private long _readTimeout; private long _writeTimeout; private int _maxBufferSize; + private int _localPort; + private int _remotePort; public static final int DEFAULT_BUFFER_SIZE = 1024*64; public static final int DEFAULT_WRITE_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public I2PSocketOptionsImpl() { this(System.getProperties()); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public I2PSocketOptionsImpl(I2PSocketOptions opts) { this(System.getProperties()); if (opts != null) { @@ -27,13 +37,25 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _readTimeout = opts.getReadTimeout(); _writeTimeout = opts.getWriteTimeout(); _maxBufferSize = opts.getMaxBufferSize(); + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); } } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public I2PSocketOptionsImpl(Properties opts) { init(opts); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public void setProperties(Properties opts) { if (opts == null) return; if (opts.containsKey(PROP_BUFFER_SIZE)) @@ -46,6 +68,10 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + */ protected void init(Properties opts) { _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); @@ -144,4 +170,40 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { public void setWriteTimeout(long ms) { _writeTimeout = ms; } + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port) { + _remotePort = port; + } + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 0eb8fe66ff..20105da772 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -23,25 +23,25 @@ import net.i2p.util.SimpleTimer2; * */ class Connection { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _connectionManager; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _connectionManager; private Destination _remotePeer; private long _sendStreamId; private long _receiveStreamId; private long _lastSendTime; - private AtomicLong _lastSendId; + private final AtomicLong _lastSendId; private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; private boolean _connected; private boolean _hardDisconnected; - private MessageInputStream _inputStream; - private MessageOutputStream _outputStream; - private SchedulerChooser _chooser; + private final MessageInputStream _inputStream; + private final MessageOutputStream _outputStream; + private final SchedulerChooser _chooser; private long _nextSendTime; private long _ackedPackets; - private long _createdOn; + private final long _createdOn; private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; @@ -51,10 +51,10 @@ class Connection { private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private final Map _outboundPackets; - private PacketQueue _outboundQueue; - private ConnectionPacketHandler _handler; + private final PacketQueue _outboundQueue; + private final ConnectionPacketHandler _handler; private ConnectionOptions _options; - private ConnectionDataReceiver _receiver; + private final ConnectionDataReceiver _receiver; private I2PSocketFull _socket; /** set to an error cause if the connection could not be established */ private String _connectionError; @@ -70,8 +70,10 @@ class Connection { private final Object _connectLock; /** how many messages have been resent and not yet ACKed? */ private int _activeResends; - private ConEvent _connectionEvent; - private int _randomWait; + private final ConEvent _connectionEvent; + private final int _randomWait; + private int _localPort; + private int _remotePort; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -86,10 +88,13 @@ class Connection { public static final int MAX_WINDOW_SIZE = 128; - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); } - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { + + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { _context = ctx; _connectionManager = manager; _chooser = chooser; @@ -101,34 +106,31 @@ class Connection { // FIXME pass through a passive flush delay setting as the 4th arg _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _outboundPackets = new TreeMap(); + if (opts != null) { + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); + } _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); _lastSendId = new AtomicLong(-1); _nextSendTime = -1; - _ackedPackets = 0; _createdOn = _context.clock().now(); _closeSentOn = -1; _closeReceivedOn = -1; - _unackedPacketsReceived = 0; _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; - _resetReceived = false; _connected = true; _disconnectScheduledOn = -1; _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; _connectLock = new Object(); - _activeResends = 0; _resetSentOn = -1; - _isInbound = false; - _updatedShareOpts = false; _connectionEvent = new ConEvent(); - _hardDisconnected = false; _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -678,6 +680,23 @@ class Connection { public I2PSocketFull getSocket() { return _socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; } + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } @@ -781,7 +800,7 @@ class Connection { } public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; } - + void congestionOccurred() { // if we hit congestion and e.g. 5 packets are resent, // dont set the size to (winSize >> 4). only set the @@ -962,12 +981,13 @@ class Connection { * @return the inbound message stream */ public MessageInputStream getInputStream() { return _inputStream; } + /** stream that the local peer sends data to the remote peer on * @return the outbound message stream */ public MessageOutputStream getOutputStream() { return _outputStream; } - - @Override + + @Override public String toString() { StringBuilder buf = new StringBuilder(128); buf.append("[Connection "); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 07247e670e..900fb96267 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -13,11 +13,14 @@ import net.i2p.util.Log; * */ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); + /** + * @param con non-null + */ public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); @@ -41,10 +44,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return !flush */ public boolean writeInProcess() { - Connection con = _connection; - if (con != null) - return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize(); - return false; + return _connection.getUnackedPacketsSent() >= _connection.getOptions().getWindowSize(); } /** @@ -60,7 +60,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { Connection con = _connection; - if (con == null) return _dummyStatus; + //if (con == null) return _dummyStatus; boolean doSend = true; if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if (con.getOutputStream().getClosed()) { @@ -121,7 +121,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { Connection con = _connection; - if (con == null) return null; + //if (con == null) return null; long before = System.currentTimeMillis(); PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); long built = System.currentTimeMillis(); @@ -185,6 +185,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); + packet.setLocalPort(con.getLocalPort()); + packet.setRemotePort(con.getPort()); } if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { packet.setFlag(Packet.FLAG_NO_ACK); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 6ba876dd92..e2c9d3f556 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -18,10 +18,10 @@ import net.i2p.util.SimpleTimer; * @author zzz modded to use concurrent and bound queue size */ class ConnectionHandler { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _manager; - private LinkedBlockingQueue _synQueue; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _manager; + private final LinkedBlockingQueue _synQueue; private boolean _active; private int _acceptTimeout; @@ -41,7 +41,6 @@ class ConnectionHandler { _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); - _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 10ff388564..95a1923677 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,24 +21,24 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionManager { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private MessageHandler _messageHandler; - private PacketHandler _packetHandler; - private ConnectionHandler _connectionHandler; - private PacketQueue _outboundQueue; - private SchedulerChooser _schedulerChooser; - private ConnectionPacketHandler _conPacketHandler; - private TCBShare _tcbShare; + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final MessageHandler _messageHandler; + private final PacketHandler _packetHandler; + private final ConnectionHandler _connectionHandler; + private final PacketQueue _outboundQueue; + private final SchedulerChooser _schedulerChooser; + private final ConnectionPacketHandler _conPacketHandler; + private final TCBShare _tcbShare; /** Inbound stream ID (Long) to Connection map */ - private ConcurrentHashMap _connectionByInboundId; + private final ConcurrentHashMap _connectionByInboundId; /** Ping ID (Long) to PingRequest */ private final Map _pendingPings; private boolean _allowIncoming; private boolean _throttlersInitialized; private int _maxConcurrentStreams; - private ConnectionOptions _defaultOptions; + private final ConnectionOptions _defaultOptions; private volatile int _numWaiting; private long _soTimeout; private ConnThrottler _minuteThrottler; @@ -59,10 +59,12 @@ class ConnectionManager { _schedulerChooser = new SchedulerChooser(_context); _conPacketHandler = new ConnectionPacketHandler(_context); _tcbShare = new TCBShare(_context); - _session.setSessionListener(_messageHandler); + // PROTO_ANY is for backward compatibility (pre-0.7.1) + // TODO change proto to PROTO_STREAMING someday. + // Right now we get everything, and rely on Datagram to specify PROTO_UDP. + // PacketQueue has sent PROTO_STREAMING since the beginning of mux support (0.7.1) + _session.addMuxedSessionListener(_messageHandler, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); _outboundQueue = new PacketQueue(_context, _session, this); - _allowIncoming = false; - _numWaiting = 0; /** Socket timeout for accept() */ _soTimeout = -1; @@ -141,7 +143,10 @@ class ConnectionManager { * it, or null if the syn's streamId was already taken */ public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + ConnectionOptions opts = new ConnectionOptions(_defaultOptions); + opts.setPort(synPacket.getRemotePort()); + opts.setLocalPort(synPacket.getLocalPort()); + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); _tcbShare.updateOptsFromShare(con); con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index ae14daa15c..e6ec6fa423 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -106,6 +106,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * This is based on documentation, the code, and logging, however there are still * some parts that could use more research. * + *
      *  1024 Tunnel Message
      *  - 21 Header (see router/tunnel/BatchedPreprocessor.java)
      * -----
@@ -169,7 +170,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      * Similarly:
      *   3 msgs: 2722
      *   4 msgs: 3714
-     *
+     *
* * Before release 0.6.1.14 this was 4096. * From release 0.6.1.14 through release 0.6.4, this was 960. @@ -205,18 +206,35 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730; public static final int MIN_MESSAGE_SIZE = 512; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public ConnectionOptions() { super(); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public ConnectionOptions(Properties opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(I2PSocketOptions opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(ConnectionOptions opts) { super(opts); if (opts != null) { @@ -235,8 +253,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setInboundBufferSize(opts.getInboundBufferSize()); setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); - setWriteTimeout(opts.getWriteTimeout()); - setReadTimeout(opts.getReadTimeout()); + // handled in super() + // not clear why added by jr 12/22/2005 + //setWriteTimeout(opts.getWriteTimeout()); + //setReadTimeout(opts.getReadTimeout()); setAnswerPings(opts.getAnswerPings()); initLists(opts); _maxConnsPerMinute = opts.getMaxConnsPerMinute(); @@ -248,7 +268,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { } } - @Override + @Override protected void init(Properties opts) { super.init(opts); _trend = new int[TREND_COUNT]; @@ -262,12 +282,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); initLists(opts); @@ -279,7 +301,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); } - @Override + @Override public void setProperties(Properties opts) { super.setProperties(opts); if (opts == null) return; @@ -303,8 +325,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); if (opts.containsKey(PROP_MAX_RESENDS)) setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - if (opts.containsKey(PROP_WRITE_TIMEOUT)) - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //if (opts.containsKey(PROP_WRITE_TIMEOUT)) + // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) @@ -316,6 +339,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); if (opts.containsKey(PROP_CONNECT_TIMEOUT)) // wow 5 minutes!!! FIXME!! + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); if (opts.containsKey(PROP_ANSWER_PINGS)) setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 526fae3722..53ebb17e1b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionPacketHandler { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; public ConnectionPacketHandler(I2PAppContext context) { _context = context; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 71e1dd3ac7..f40dbd0c79 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -8,7 +8,7 @@ import net.i2p.I2PException; * */ class I2PServerSocketFull implements I2PServerSocket { - private I2PSocketManagerFull _socketManager; + private final I2PSocketManagerFull _socketManager; public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index f8dbe74ea6..5cd76a864f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import net.i2p.client.I2PSession; import net.i2p.data.Destination; /** @@ -127,7 +128,28 @@ class I2PSocketFull implements I2PSocket { if (c != null) c.disconnectComplete(); } - @Override + + /** + * The remote port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getPort(); + } + + /** + * The local port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getLocalPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getLocalPort(); + } + + @Override public String toString() { Connection c = _connection; if (c == null) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 632b904b74..d9ca691b4d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -7,7 +7,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; -import net.i2p.client.I2PSessionListener; +import net.i2p.client.I2PSessionMuxedListener; import net.i2p.util.Log; /** @@ -15,10 +15,10 @@ import net.i2p.util.Log; * Packets, if we can. * */ -class MessageHandler implements I2PSessionListener { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; +class MessageHandler implements I2PSessionMuxedListener { + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; private final Set _listeners; public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { @@ -31,11 +31,23 @@ class MessageHandler implements I2PSessionListener { /** Instruct the client that the given session has received a message with * size # of bytes. + * This shouldn't be called anymore since we are registering as a muxed listener. * @param session session to notify * @param msgId message number available * @param size size of the message */ public void messageAvailable(I2PSession session, int msgId, long size) { + messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED, + I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + } + + /** Instruct the client that the given session has received a message with + * size # of bytes. + * @param session session to notify + * @param msgId message number available + * @param size size of the message + */ + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { byte data[] = null; try { data = session.receiveMessage(msgId); @@ -49,6 +61,8 @@ class MessageHandler implements I2PSessionListener { Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); + packet.setRemotePort(fromPort); + packet.setLocalPort(toPort); _manager.getPacketHandler().receivePacket(packet); } catch (IllegalArgumentException iae) { _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 71c9ebce9f..737e0f7b2d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -63,14 +63,10 @@ class MessageOutputStream extends OutputStream { _buf = _dataCache.acquire().getData(); // new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); - _written = 0; - _closed = false; _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; _sendPeriodBeginTime = ctx.clock().now(); - _sendPeriodBytes = 0; - _sendBps = 0; _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(); if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index faff2ff722..5f25c3cd96 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -13,6 +13,13 @@ import net.i2p.data.SigningPrivateKey; import net.i2p.util.Log; /** + * This contains solely the data that goes out on the wire, + * including the local and remote port which is embedded in + * the I2CP overhead, not in the packet itself. + * For local state saved for outbound packets, see PacketLocal. + * + *

+ * * Contain a single packet transferred as part of a streaming connection. * The data format is as follows:

    *
  • {@link #getSendStreamId sendStreamId} [4 byte value]
  • @@ -67,6 +74,8 @@ class Packet { private Destination _optionFrom; private int _optionDelay; private int _optionMaxSize; + private int _localPort; + private int _remotePort; /** * The receiveStreamId will be set to this when the packet doesn't know @@ -148,6 +157,10 @@ class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; protected static final int MAX_DELAY_REQUEST = 65535; + /** + * Does no initialization. + * See readPacket() for inbound packets, and the setters for outbound packets. + */ public Packet() { } private boolean _sendStreamIdSet = false; @@ -316,6 +329,40 @@ class Packet { _optionMaxSize = numBytes; } + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getRemotePort() { + return _remotePort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setRemotePort(int port) { + _remotePort = port; + } + /** * Write the packet to the buffer (starting at the offset) and return * the number of bytes written. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 4f977712c3..ef145179c3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -16,9 +16,9 @@ import net.i2p.util.Log; * */ class PacketHandler { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; //private int _lastDelay; //private int _dropped; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index dd5fe1ceb4..ca2e25d42d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -13,13 +13,13 @@ import net.i2p.util.SimpleTimer2; * retries, etc. */ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private Destination _to; private SessionKey _keyUsed; private Set _tagsSent; - private long _createdOn; + private final long _createdOn; private int _numSends; private long _lastSend; private long _acceptedOn; @@ -29,9 +29,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private volatile boolean _retransmitted; private SimpleTimer2.TimedEvent _resendEvent; + /** not bound to a connection */ public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); } + public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { _context = ctx; _createdOn = ctx.clock().now(); @@ -40,8 +42,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _connection = con; _lastSend = -1; _cancelledOn = -1; - _nackCount = 0; - _retransmitted = false; } public Destination getTo() { return _to; } @@ -138,6 +138,8 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } + + /** @return null if not bound */ public Connection getConnection() { return _connection; } public void incrementNACKs() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index fa0aa87ce7..4de4c6e163 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -19,11 +19,11 @@ import net.i2p.util.Log; * */ class PacketQueue { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private ConnectionManager _connectionManager; - private ByteCache _cache = ByteCache.getInstance(64, 36*1024); + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final ConnectionManager _connectionManager; + private final ByteCache _cache = ByteCache.getInstance(64, 36*1024); public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; @@ -98,7 +98,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); else // I2PSessionImpl2 //sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0); @@ -107,7 +107,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); end = _context.clock().now(); if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java index 6ba78bfd49..b0ffbf2504 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java @@ -107,11 +107,11 @@ class StandardSocket extends Socket { } /** - * @return -1 always + * @return the port or 0 if unknown */ @Override public int getLocalPort() { - return -1; + return _socket.getLocalPort(); } /** @@ -139,11 +139,11 @@ class StandardSocket extends Socket { } /** - * @return 0 always + * @return the port or 0 if unknown */ @Override public int getPort() { - return 0; + return _socket.getPort(); } @Override diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index d2d02021a0..d33e7a741e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -21,10 +21,10 @@ import net.i2p.util.SimpleTimer2; * */ class TCBShare { - private I2PAppContext _context; - private Log _log; - private Map _cache; - private CleanEvent _cleaner; + private final I2PAppContext _context; + private final Log _log; + private final Map _cache; + private final CleanEvent _cleaner; private static final long EXPIRE_TIME = 30*60*1000; private static final long CLEAN_TIME = 10*60*1000; From 252f1047e56fc80df8325135aa6afe03a2a85803 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 14 Jul 2011 18:53:58 +0000 Subject: [PATCH 2/6] javadocs and final --- core/java/src/net/i2p/client/I2PClient.java | 9 ++++-- .../src/net/i2p/client/I2PClientImpl.java | 13 +++++++++ core/java/src/net/i2p/client/I2PSession.java | 29 +++++++++++++++++-- .../i2p/client/I2PSessionDemultiplexer.java | 7 +++-- .../src/net/i2p/client/I2PSessionImpl.java | 2 ++ .../src/net/i2p/client/I2PSessionImpl2.java | 2 ++ .../net/i2p/client/I2PSessionMuxedImpl.java | 13 +++++---- 7 files changed, 62 insertions(+), 13 deletions(-) diff --git a/core/java/src/net/i2p/client/I2PClient.java b/core/java/src/net/i2p/client/I2PClient.java index adc03bb273..6521cf258f 100644 --- a/core/java/src/net/i2p/client/I2PClient.java +++ b/core/java/src/net/i2p/client/I2PClient.java @@ -44,7 +44,9 @@ public interface I2PClient { * using the specified options to both connect to the router, to instruct * the router how to handle the new session, and to configure the end to end * encryption. - * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from + * + * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties * @return new session allowing a Destination to recieve all of its messages and send messages to any other Destination. */ @@ -52,8 +54,10 @@ public interface I2PClient { /** Create a new destination with the default certificate creation properties and store * it, along with the private encryption and signing keys at the specified location + * * @param destKeyStream create a new destination and write out the object to the given stream, * formatted as Destination, PrivateKey, and SigningPrivateKey + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @return new destination */ public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException; @@ -61,7 +65,8 @@ public interface I2PClient { /** Create a new destination with the given certificate and store it, along with the private * encryption and signing keys at the specified location * - * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey + * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param cert certificate to tie to the destination * @return newly created destination */ diff --git a/core/java/src/net/i2p/client/I2PClientImpl.java b/core/java/src/net/i2p/client/I2PClientImpl.java index d0147f0d77..23033dbf0f 100644 --- a/core/java/src/net/i2p/client/I2PClientImpl.java +++ b/core/java/src/net/i2p/client/I2PClientImpl.java @@ -30,8 +30,12 @@ import net.i2p.data.SigningPublicKey; * @author jrandom */ class I2PClientImpl implements I2PClient { + /** * Create the destination with a null payload + * + * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} */ public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException { Certificate cert = new Certificate(); @@ -44,6 +48,8 @@ class I2PClientImpl implements I2PClient { * Create the destination with the given payload and write it out along with * the PrivateKey and SigningPrivateKey to the destKeyStream * + * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} */ public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException { Destination d = new Destination(); @@ -67,13 +73,20 @@ class I2PClientImpl implements I2PClient { /** * Create a new session (though do not connect it yet) + * + * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties */ public I2PSession createSession(InputStream destKeyStream, Properties options) throws I2PSessionException { return createSession(I2PAppContext.getGlobalContext(), destKeyStream, options); } + /** * Create a new session (though do not connect it yet) + * + * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties */ public I2PSession createSession(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index d21c858d71..06b094488c 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -217,11 +217,34 @@ public interface I2PSession { */ public int[] bandwidthLimits() throws I2PSessionException; - /** See I2PSessionMuxedImpl for details */ + /** + * Listen on specified protocol and port. + * + * An existing listener with the same proto and port is replaced. + * Only the listener with the best match is called back for each message. + * + * @param proto 1-254 or PROTO_ANY (0) for all; recommended: + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param port 1-65535 or PORT_ANY (0) for all + * @since 0.7.1 + */ public void addSessionListener(I2PSessionListener lsnr, int proto, int port); - /** See I2PSessionMuxedImpl for details */ + + /** + * Listen on specified protocol and port, and receive notification + * of proto, fromPort, and toPort for every message. + * @param proto 1-254 or PROTO_ANY (0) for all; 255 disallowed + * @param port 1-65535 or PORT_ANY (0) for all + * @since 0.7.1 + */ public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port); - /** See I2PSessionMuxedImpl for details */ + + /** + * removes the specified listener (only) + * @since 0.7.1 + */ public void removeListener(int proto, int port); public static final int PORT_ANY = 0; diff --git a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java index 28d211fa55..5d28d1a213 100644 --- a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java +++ b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java @@ -16,13 +16,14 @@ import net.i2p.util.Log; * depending on whether they want to hear about the * protocol, from port, and to port for every received message. * - * This only calls one listener, not all that apply. + * messageAvailable() only calls one listener, not all that apply. + * The others call all listeners. * * @author zzz */ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener { - private Log _log; - private Map _listeners; + private final Log _log; + private final Map _listeners; public I2PSessionDemultiplexer(I2PAppContext ctx) { _log = ctx.logManager().getLog(I2PSessionDemultiplexer.class); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 0a29ff9d81..04dde2050e 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -159,6 +159,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties * @throws I2PSessionException if there is a problem loading the private keys or */ diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 3af551eaaf..0f397f68e2 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -49,6 +49,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties * @throws I2PSessionException if there is a problem loading the private keys or */ diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index fcf11d0daf..e125fc49b1 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -65,9 +65,12 @@ import net.i2p.util.SimpleScheduler; * @author zzz */ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { - private I2PSessionDemultiplexer _demultiplexer; + + private final I2PSessionDemultiplexer _demultiplexer; /* + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties */ public I2PSessionMuxedImpl(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { @@ -92,11 +95,11 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * An existing listener with the same proto and port is replaced. * Only the listener with the best match is called back for each message. * - * @param proto 1-254 or PROTO_ANY for all; recommended: + * @param proto 1-254 or PROTO_ANY (0) for all; recommended: * I2PSession.PROTO_STREAMING * I2PSession.PROTO_DATAGRAM * 255 disallowed - * @param port 1-65535 or PORT_ANY for all + * @param port 1-65535 or PORT_ANY (0) for all */ @Override public void addSessionListener(I2PSessionListener lsnr, int proto, int port) { @@ -106,8 +109,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { /** * Listen on specified protocol and port, and receive notification * of proto, fromPort, and toPort for every message. - * @param proto 1-254 or 0 for all; 255 disallowed - * @param port 1-65535 or 0 for all + * @param proto 1-254 or PROTO_ANY (0) for all; 255 disallowed + * @param port 1-65535 or PORT_ANY (0) for all */ @Override public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) { From 79ac955b33d4b462ddb5479990dad3acfc7e3935 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 14 Jul 2011 20:06:31 +0000 Subject: [PATCH 3/6] * I2PTunnelIRCClient: - Big refactoring into multiple class files - Allow AWAY and CAP messages - First cut at DCC support - not for SOCKS (yet) --- .../i2p/i2ptunnel/I2PTunnelClientBase.java | 5 +- .../net/i2p/i2ptunnel/I2PTunnelIRCClient.java | 441 +++--------------- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 29 +- .../i2p/i2ptunnel/irc/DCCClientManager.java | 105 +++++ .../src/net/i2p/i2ptunnel/irc/DCCHelper.java | 37 ++ .../i2p/i2ptunnel/irc/I2PTunnelDCCClient.java | 77 +++ .../i2p/i2ptunnel/irc/I2PTunnelDCCServer.java | 173 +++++++ .../src/net/i2p/i2ptunnel/irc/IRCFilter.java | 428 +++++++++++++++++ .../i2p/i2ptunnel/irc/IrcInboundFilter.java | 101 ++++ .../i2p/i2ptunnel/irc/IrcOutboundFilter.java | 101 ++++ .../i2ptunnel/socks/I2PSOCKSIRCTunnel.java | 7 +- 11 files changed, 1124 insertions(+), 380 deletions(-) create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index dadb2a58d1..1d98798bc4 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -193,7 +193,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna // no need to load the netDb with leaseSets for destinations that will never // be looked up - tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true"); + boolean dccEnabled = (this instanceof I2PTunnelIRCClient) && + Boolean.valueOf(tunnel.getClientOptions().getProperty(I2PTunnelIRCClient.PROP_DCC)).booleanValue(); + if (!dccEnabled) + tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true"); boolean openNow = !Boolean.valueOf(tunnel.getClientOptions().getProperty("i2cp.delayOpen")).booleanValue(); if (openNow) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index ebf6cc0a14..dc128d1c4a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -10,7 +10,13 @@ import java.util.List; import java.util.StringTokenizer; import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.Base32; import net.i2p.data.Destination; +import net.i2p.i2ptunnel.irc.DCCClientManager; +import net.i2p.i2ptunnel.irc.DCCHelper; +import net.i2p.i2ptunnel.irc.I2PTunnelDCCServer; +import net.i2p.i2ptunnel.irc.IrcInboundFilter; +import net.i2p.i2ptunnel.irc.IrcOutboundFilter; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; @@ -18,7 +24,7 @@ import net.i2p.util.Log; /** * Todo: Can we extend I2PTunnelClient instead and remove some duplicated code? */ -public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable { +public class I2PTunnelIRCClient extends I2PTunnelClientBase implements DCCHelper { /** used to assign unique IDs to the threads / clients. no logic or functionality */ private static volatile long __clientId = 0; @@ -27,6 +33,14 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable protected List dests; private static final long DEFAULT_READ_TIMEOUT = 5*60*1000; // -1 protected long readTimeout = DEFAULT_READ_TIMEOUT; + private final boolean _dccEnabled; + private I2PTunnelDCCServer _DCCServer; + private DCCClientManager _DCCClientManager; + + /** + * @since 0.8.9 + */ + public static final String PROP_DCC = "i2ptunnel.ircclient.enableDCC"; /** * @throws IllegalArgumentException if the I2PTunnel does not contain @@ -75,6 +89,9 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable setName("IRC Client on " + tunnel.listenHost + ':' + localPort); + _dccEnabled = Boolean.valueOf(tunnel.getClientOptions().getProperty(PROP_DCC)).booleanValue(); + // TODO add some prudent tunnel options (or is it too late?) + startRunning(); notifyEvent("openIRCClientResult", "ok"); @@ -89,9 +106,9 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable i2ps = createI2PSocket(clientDest); i2ps.setReadTimeout(readTimeout); StringBuffer expectedPong = new StringBuffer(); - Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log), "IRC Client " + __clientId + " in", true); + Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log, this), "IRC Client " + __clientId + " in", true); in.start(); - Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log), "IRC Client " + __clientId + " out", true); + Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, this), "IRC Client " + __clientId + " out", true); out.start(); } catch (Exception ex) { if (_log.shouldLog(Log.ERROR)) @@ -120,388 +137,62 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable return dests.get(index); } - /************************************************************************* - * - */ - public static class IrcInboundFilter implements Runnable { - - private final Socket local; - private final I2PSocket remote; - private final StringBuffer expectedPong; - private final Log _log; - - public IrcInboundFilter(Socket _local, I2PSocket _remote, StringBuffer pong, Log log) { - local=_local; - remote=_remote; - expectedPong=pong; - _log = log; - } - - public void run() { - // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... - BufferedReader in; - OutputStream output; - try { - in = new BufferedReader(new InputStreamReader(remote.getInputStream(), "ISO-8859-1")); - output=local.getOutputStream(); - } catch (IOException e) { - if (_log.shouldLog(Log.ERROR)) - _log.error("IrcInboundFilter: no streams",e); - return; - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("IrcInboundFilter: Running."); - try { - while(true) - { - try { - String inmsg = in.readLine(); - if(inmsg==null) - break; - if(inmsg.endsWith("\r")) - inmsg=inmsg.substring(0,inmsg.length()-1); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("in: [" + inmsg + "]"); - String outmsg = inboundFilter(inmsg, expectedPong); - if(outmsg!=null) - { - if(!inmsg.equals(outmsg)) { - if (_log.shouldLog(Log.WARN)) { - _log.warn("inbound FILTERED: "+outmsg); - _log.warn(" - inbound was: "+inmsg); - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("inbound: "+outmsg); - } - outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 - output.write(outmsg.getBytes("ISO-8859-1")); - // probably doesn't do much but can't hurt - output.flush(); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("inbound BLOCKED: "+inmsg); - } - } catch (IOException e1) { - if (_log.shouldLog(Log.WARN)) - _log.warn("IrcInboundFilter: disconnected",e1); - break; - } - } - } catch (RuntimeException re) { - _log.error("Error filtering inbound data", re); - } finally { - try { local.close(); } catch (IOException e) {} - } - if(_log.shouldLog(Log.DEBUG)) - _log.debug("IrcInboundFilter: Done."); - } - - } - - /************************************************************************* - * - */ - public static class IrcOutboundFilter implements Runnable { - - private final Socket local; - private final I2PSocket remote; - private final StringBuffer expectedPong; - private final Log _log; - - public IrcOutboundFilter(Socket _local, I2PSocket _remote, StringBuffer pong, Log log) { - local=_local; - remote=_remote; - expectedPong=pong; - _log = log; - } - - public void run() { - // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... - BufferedReader in; - OutputStream output; - try { - in = new BufferedReader(new InputStreamReader(local.getInputStream(), "ISO-8859-1")); - output=remote.getOutputStream(); - } catch (IOException e) { - if (_log.shouldLog(Log.ERROR)) - _log.error("IrcOutboundFilter: no streams",e); - return; - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("IrcOutboundFilter: Running."); - try { - while(true) - { - try { - String inmsg = in.readLine(); - if(inmsg==null) - break; - if(inmsg.endsWith("\r")) - inmsg=inmsg.substring(0,inmsg.length()-1); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("out: [" + inmsg + "]"); - String outmsg = outboundFilter(inmsg, expectedPong); - if(outmsg!=null) - { - if(!inmsg.equals(outmsg)) { - if (_log.shouldLog(Log.WARN)) { - _log.warn("outbound FILTERED: "+outmsg); - _log.warn(" - outbound was: "+inmsg); - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("outbound: "+outmsg); - } - outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 - output.write(outmsg.getBytes("ISO-8859-1")); - // save 250 ms in streaming - output.flush(); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); - } - } catch (IOException e1) { - if (_log.shouldLog(Log.WARN)) - _log.warn("IrcOutboundFilter: disconnected",e1); - break; - } - } - } catch (RuntimeException re) { - _log.error("Error filtering outbound data", re); - } finally { - try { remote.close(); } catch (IOException e) {} - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("IrcOutboundFilter: Done."); + @Override + public boolean close(boolean forced) { + synchronized(this) { + if (_DCCServer != null) { + _DCCServer.close(forced); + _DCCServer = null; } } - - - /************************************************************************* - * - */ - - public static String inboundFilter(String s, StringBuffer expectedPong) { - - String field[]=s.split(" ",4); - String command; - int idx=0; - final String[] allowedCommands = - { - // "NOTICE", // can contain CTCP - //"PING", - //"PONG", - "MODE", - "JOIN", - "NICK", - "QUIT", - "PART", - "WALLOPS", - "ERROR", - "KICK", - "H", // "hide operator status" (after kicking an op) - "TOPIC" - }; - - if(field[0].charAt(0)==':') - idx++; - - try { command = field[idx++]; } - catch (IndexOutOfBoundsException ioobe) // wtf, server sent borked command? - { - //_log.warn("Dropping defective message: index out of bounds while extracting command."); - return null; - } - - idx++; //skip victim - - // Allow numerical responses - try { - new Integer(command); - return s; - } catch(NumberFormatException nfe){} - - - if ("PING".equalsIgnoreCase(command)) - return "PING 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works - if ("PONG".equalsIgnoreCase(command)) { - // Turn the received ":irc.freshcoffee.i2p PONG irc.freshcoffee.i2p :127.0.0.1" - // into ":127.0.0.1 PONG 127.0.0.1 " so that the caller can append the client's extra parameter - // though, does 127.0.0.1 work for irc clients connecting remotely? and for all of them? sure would - // be great if irc clients actually followed the RFCs here, but i guess thats too much to ask. - // If we haven't PINGed them, or the PING we sent isn't something we know how to filter, this - // is blank. - // - // String pong = expectedPong.length() > 0 ? expectedPong.toString() : null; - // If we aren't going to rewrite it, pass it through - String pong = expectedPong.length() > 0 ? expectedPong.toString() : s; - expectedPong.setLength(0); - return pong; - } - - // Allow all allowedCommands - for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' - { - // CTCP - msg=msg.substring(2); - if(msg.startsWith("ACTION ")) { - // /me says hello - return s; - } - return null; // Block all other ctcp - } - return s; - } - - // Block the rest - return null; + return super.close(forced); } - - public static String outboundFilter(String s, StringBuffer expectedPong) { - - String field[]=s.split(" ",3); - String command; - final String[] allowedCommands = - { - // "NOTICE", // can contain CTCP - "MODE", - "JOIN", - "NICK", - "WHO", - "WHOIS", - "LIST", - "NAMES", - "NICK", - // "QUIT", // replace with a filtered QUIT to hide client quit messages - "SILENCE", - "MAP", // seems safe enough, the ircd should protect themselves though - // "PART", // replace with filtered PART to hide client part messages - "OPER", - // "PONG", // replaced with a filtered PING/PONG since some clients send the server IP (thanks aardvax!) - // "PING", - "KICK", - "HELPME", - "RULES", - "TOPIC", - "ISON", // jIRCii uses this for a ping (response is 303) - "INVITE" - }; - if(field[0].length()==0) - return null; // W T F? - - - if(field[0].charAt(0)==':') - return null; // wtf - - command = field[0].toUpperCase(); + // + // Start of the DCCHelper interface + // - if ("PING".equals(command)) { - // Most clients just send a PING and are happy with any old PONG. Others, - // like BitchX, actually expect certain behavior. It sends two different pings: - // "PING :irc.freshcoffee.i2p" and "PING 1234567890 127.0.0.1" (where the IP is the proxy) - // the PONG to the former seems to be "PONG 127.0.0.1", while the PONG to the later is - // ":irc.freshcoffee.i2p PONG irc.freshcoffe.i2p :1234567890". - // We don't want to send them our proxy's IP address, so we need to rewrite the PING - // sent to the server, but when we get a PONG back, use what we expected, rather than - // what they sent. - // - // Yuck. + public boolean isEnabled() { + return _dccEnabled; + } - String rv = null; - expectedPong.setLength(0); - if (field.length == 1) { // PING - rv = "PING"; - // If we aren't rewriting the PING don't rewrite the PONG - // expectedPong.append("PONG 127.0.0.1"); - } else if (field.length == 2) { // PING nonce - rv = "PING " + field[1]; - // If we aren't rewriting the PING don't rewrite the PONG - // expectedPong.append("PONG ").append(field[1]); - } else if (field.length == 3) { // PING nonce serverLocation - rv = "PING " + field[1]; - expectedPong.append("PONG ").append(field[2]).append(" :").append(field[1]); // PONG serverLocation nonce - } else { - //if (_log.shouldLog(Log.ERROR)) - // _log.error("IRC client sent a PING we don't understand, filtering it (\"" + s + "\")"); - rv = null; + public String getB32Hostname() { + return Base32.encode(sockMgr.getSession().getMyDestination().calculateHash().getData()) + ".b32.i2p"; + } + + + public int newOutgoing(byte[] ip, int port, String type) { + I2PTunnelDCCServer server; + synchronized(this) { + if (_DCCServer == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Starting DCC Server"); + _DCCServer = new I2PTunnelDCCServer(sockMgr, l, this, getTunnel()); + // TODO add some prudent tunnel options (or is it too late?) + _DCCServer.startRunning(); } - - //if (_log.shouldLog(Log.WARN)) - // _log.warn("sending ping [" + rv + "], waiting for [" + expectedPong + "] orig was [" + s + "]"); - - return rv; + server = _DCCServer; } - if ("PONG".equals(command)) - return "PONG 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works + int rv = server.newOutgoing(ip, port, type); + if (_log.shouldLog(Log.INFO)) + _log.info("New outgoing " + type + ' ' + port + " returns " + rv); + return rv; + } - // Allow all allowedCommands - for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' - { - // CTCP - msg=msg.substring(2); - if(msg.startsWith("ACTION ")) { - // /me says hello - return s; - } - return null; // Block all other ctcp + public int newIncoming(String b32, int port, String type) { + DCCClientManager tracker; + synchronized(this) { + if (_DCCClientManager == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Starting DCC Client"); + _DCCClientManager = new DCCClientManager(sockMgr, l, this, getTunnel()); } - return s; + tracker = _DCCClientManager; } - - if("USER".equals(command)) { - int idx = field[2].lastIndexOf(":"); - if(idx<0) - return "USER user hostname localhost :realname"; - String realname = field[2].substring(idx+1); - String ret = "USER "+field[1]+" hostname localhost :"+realname; - return ret; - } - - if ("PART".equals(command)) { - // hide client message - return "PART " + field[1] + " :leaving"; - } - - if ("QUIT".equals(command)) { - return "QUIT :leaving"; - } - - // Block the rest - return null; + // The tracker starts our client + int rv = tracker.newIncoming(b32, port, type); + if (_log.shouldLog(Log.INFO)) + _log.info("New incoming " + type + ' ' + b32 + ' ' + port + " returns " + rv); + return rv; } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 6947b520f7..94edfac46a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -47,7 +47,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected int remotePort; private boolean _usePool; - private Logging l; + protected Logging l; private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000; /** default timeout to 3 minutes - override if desired */ @@ -69,10 +69,13 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected boolean bidir = false; private ThreadPoolExecutor _executor; + /** unused? port should always be specified */ private int DEFAULT_LOCALPORT = 4488; protected int localPort = DEFAULT_LOCALPORT; /** + * @param privData Base64-encoded private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -84,6 +87,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * @param privkey file containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -105,6 +111,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * @param privData stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -114,10 +123,28 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { init(host, port, privData, privkeyname, l); } + /** + * @param sktMgr the existing socket manager + * @since 0.8.9 + */ + public I2PTunnelServer(InetAddress host, int port, I2PSocketManager sktMgr, + Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { + super("Server at " + host + ':' + port, notifyThis, tunnel); + this.l = l; + this.remoteHost = host; + this.remotePort = port; + _log = tunnel.getContext().logManager().getLog(getClass()); + sockMgr = sktMgr; + open = true; + } + private static final int RETRY_DELAY = 20*1000; private static final int MAX_RETRIES = 4; /** + * @param privData stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java new file mode 100644 index 0000000000..ac9ed249b2 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java @@ -0,0 +1,105 @@ +package net.i2p.i2ptunnel.irc; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.i2ptunnel.I2PTunnel; +import net.i2p.i2ptunnel.Logging; +import net.i2p.util.EventDispatcher; +import net.i2p.util.Log; + +/** + * Start, track, and expire the I2PTunnelDCCClients. + * + *
    + *
    + *                <---  I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
    + *   originating                                                                     responding
    + *   chat client                                                                     chat client
    + *                ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
    + *
    + * 
    + * + * @since 0.8.9 + */ +public class DCCClientManager { + + private final I2PSocketManager sockMgr; + private final EventDispatcher _dispatch; + private final Logging l; + private final I2PTunnel _tunnel; + private final Log _log; + + private final ConcurrentHashMap _incoming; + // list of client tunnels? + private static long _id; + + private static final int MAX_INCOMING_PENDING = 10; + private static final int MAX_INCOMING_ACTIVE = 10; + private static final long INBOUND_EXPIRE = 30*60*1000; + + public DCCClientManager(I2PSocketManager sktMgr, Logging logging, + EventDispatcher dispatch, I2PTunnel tunnel) { + sockMgr = sktMgr; + l = logging; + _dispatch = dispatch; + _tunnel = tunnel; + _log = tunnel.getContext().logManager().getLog(DCCClientManager.class); + _incoming = new ConcurrentHashMap(8); + } + + /** + * An incoming DCC request + * + * @param b32 remote dcc server address + * @param port remote dcc server port + * @param type ignored + * @return local server port or -1 on error + */ + public int newIncoming(String b32, int port, String type) { + expireInbound(); + if (_incoming.size() >= MAX_INCOMING_PENDING) { + _log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING); + return -1; + } + I2PAddress client = new I2PAddress(b32, port, _tunnel.getContext().clock().now() + INBOUND_EXPIRE); + try { + // Transparent tunnel used for all types... + // Do we need to do any filtering for chat? + I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr, + _dispatch, _tunnel, ++_id); + int lport = cTunnel.getLocalPort(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Opened client tunnel at port " + lport + + " pointing to " + b32 + ':' + port); + _incoming.put(Integer.valueOf(lport), client); + return lport; + } catch (IllegalArgumentException uhe) { + l.log("Could not find listen host to bind to [" + _tunnel.host + "]"); + _log.error("Error finding host to bind", uhe); + return -1; + } + } + + private void expireInbound() { + for (Iterator iter = _incoming.values().iterator(); iter.hasNext(); ) { + I2PAddress a = iter.next(); + if (a.expire < _tunnel.getContext().clock().now()) + iter.remove(); + } + } + + private static class I2PAddress { + public final String dest; + public final int port; + public final long expire; + + public I2PAddress(String b32, int p, long exp) { + dest = b32; + port = p; + expire = exp; + } + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java new file mode 100644 index 0000000000..c5162f148e --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java @@ -0,0 +1,37 @@ +package net.i2p.i2ptunnel.irc; + +/** + * Hooks to create and maintain DCC client and server tunnels + * + * @since 0.8.9 + */ +public interface DCCHelper { + + public boolean isEnabled(); + + /** + * String to put in the outgoing DCC + */ + public String getB32Hostname(); + + /** + * An outgoing DCC request + * + * @param ip local irc client IP + * @param port local irc client port + * @param type string + * @return i2p port or -1 on error + */ + public int newOutgoing(byte[] ip, int port, String type); + + /** + * An incoming DCC request + * + * @param b32 remote dcc server address + * @param port remote dcc server port + * @param type string + * @return local server port or -1 on error + */ + public int newIncoming(String b32, int port, String type); + +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java new file mode 100644 index 0000000000..e6a9c96cd7 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java @@ -0,0 +1,77 @@ +/* I2PTunnel is GPL'ed (with the exception mentioned in I2PTunnel.java) + * (c) 2003 - 2004 mihi + */ +package net.i2p.i2ptunnel.irc; + +import java.net.Socket; +import java.io.IOException; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.data.Destination; +import net.i2p.i2ptunnel.I2PTunnel; +import net.i2p.i2ptunnel.I2PTunnelClientBase; +import net.i2p.i2ptunnel.I2PTunnelRunner; +import net.i2p.i2ptunnel.Logging; +import net.i2p.util.EventDispatcher; +import net.i2p.util.Log; + +/** + * A standard client, using an existing socket manager. + * Targets a single destination and port. + * Naming resolution is delayed until connect time. + * + * @since 0.8.9 + */ +public class I2PTunnelDCCClient extends I2PTunnelClientBase { + + // delay resolution until connect time + private final String _dest; + private final int _remotePort; + + /** + * @param dest the target, presumably b32 + * @throws IllegalArgumentException if the I2PTunnel does not contain + * valid config to contact the router + */ + public I2PTunnelDCCClient(String dest, int remotePort, Logging l, + I2PSocketManager sktMgr, EventDispatcher notifyThis, + I2PTunnel tunnel, long clientId) throws IllegalArgumentException { + super(0, l, sktMgr, tunnel, notifyThis, clientId); + _dest = dest; + _remotePort = remotePort; + + setName("DCC send -> " + dest + ':' + remotePort); + + startRunning(); + + notifyEvent("openClientResult", "ok"); + } + + protected void clientConnectionRun(Socket s) { + I2PSocket i2ps = null; + if (_log.shouldLog(Log.INFO)) + _log.info("Opening DCC connection to " + _dest + ':' + _remotePort); + Destination dest = _context.namingService().lookup(_dest); + if (dest == null) { + _log.error("Could not find leaseset for DCC connection to " + _dest + ':' + _remotePort); + closeSocket(s); + // shutdown? + return; + } + + I2PSocketOptions opts = sockMgr.buildOptions(); + opts.setPort(_remotePort); + try { + i2ps = createI2PSocket(dest, opts); + new I2PTunnelRunner(s, i2ps, sockLock, null, mySockets); + } catch (Exception ex) { + _log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex); + closeSocket(s); + if (i2ps != null) { + try { i2ps.close(); } catch (IOException ioe) {} + } + } + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java new file mode 100644 index 0000000000..d2a16cee9a --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java @@ -0,0 +1,173 @@ +package net.i2p.i2ptunnel.irc; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.i2ptunnel.I2PTunnel; +import net.i2p.i2ptunnel.I2PTunnelRunner; +import net.i2p.i2ptunnel.I2PTunnelServer; +import net.i2p.i2ptunnel.Logging; +import net.i2p.util.EventDispatcher; +import net.i2p.util.Log; + +/** + * A standard server that only answers for registered ports, + * and each port can only be used once. + * + *
    + *
    + *                <---  I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
    + *   originating                                                                     responding
    + *   chat client                                                                     chat client
    + *                ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
    + *
    + * 
    + * + * @since 0.8.9 + */ +public class I2PTunnelDCCServer extends I2PTunnelServer { + + private final ConcurrentHashMap _outgoing; + // list of client tunnels? + private static long _id; + + /** just to keep super() happy */ + private static final InetAddress DUMMY; + static { + InetAddress dummy = null; + try { + dummy = InetAddress.getByAddress(new byte[4]); + } catch (UnknownHostException uhe) {} + DUMMY = dummy; + } + + private static final int MIN_I2P_PORT = 1; + private static final int MAX_I2P_PORT = 65535; + private static final int MAX_OUTGOING_PENDING = 20; + private static final int MAX_OUTGOING_ACTIVE = 20; + private static final long OUTBOUND_EXPIRE = 30*60*1000; + + /** + * There's no support for unsolicited incoming I2P connections, + * so there's no server host or port parameters. + * + * @param sktMgr an existing socket manager + * @throws IllegalArgumentException if the I2PTunnel does not contain + * valid config to contact the router + */ + public I2PTunnelDCCServer(I2PSocketManager sktMgr, Logging l, + EventDispatcher notifyThis, I2PTunnel tunnel) { + super(DUMMY, 0, sktMgr, l, notifyThis, tunnel); + _outgoing = new ConcurrentHashMap(8); + } + + /** + * An incoming DCC connection, only accept for a known port. + * Passed through without filtering. + */ + @Override + protected void blockingHandle(I2PSocket socket) { + if (_log.shouldLog(Log.INFO)) + _log.info("Incoming connection to '" + toString() + "' from: " + socket.getPeerDestination().calculateHash().toBase64()); + + try { + expireOutbound(); + int myPort = socket.getLocalPort(); + // TODO remove, add to active + LocalAddress local = _outgoing.get(Integer.valueOf(myPort)); + if (local == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Incoming DCC connection for unknown port " + myPort); + try { + socket.close(); + } catch (IOException ioe) {} + return; + } + if (_log.shouldLog(Log.WARN)) + _log.warn("Incoming DCC connection for I2P port " + myPort + + " sending to " + local.ia + ':' + local.port); + Socket s = new Socket(local.ia, local.port); + new I2PTunnelRunner(s, socket, slock, null, null); + } catch (SocketException ex) { + try { + socket.close(); + } catch (IOException ioe) {} + if (_log.shouldLog(Log.ERROR)) + _log.error("Error connecting to server " + remoteHost + ':' + remotePort, ex); + } catch (IOException ex) { + _log.error("Error while waiting for I2PConnections", ex); + } + } + + /** + * An outgoing DCC request + * + * @param ip local irc client IP + * @param port local irc client port + * @param type ignored + * @return i2p port or -1 on error + */ + public int newOutgoing(byte[] ip, int port, String type) { + expireOutbound(); + if (_outgoing.size() >= MAX_OUTGOING_PENDING) { + _log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING); + return -1; + } + InetAddress ia; + try { + ia = InetAddress.getByAddress(ip); + } catch (UnknownHostException uhe) { + return -1; + } + LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE); + for (int i = 0; i < 10; i++) { + int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT); + LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client); + if (old != null) + continue; + // TODO expire in a few minutes + return iport; + } + // couldn't find an unused i2p port + return -1; + } + + private InetAddress getListenHost(Logging l) { + try { + return InetAddress.getByName(getTunnel().listenHost); + } catch (UnknownHostException uhe) { + l.log("Could not find listen host to bind to [" + getTunnel().host + "]"); + _log.error("Error finding host to bind", uhe); + notifyEvent("openBaseClientResult", "error"); + return null; + } + } + + private void expireOutbound() { + for (Iterator iter = _outgoing.values().iterator(); iter.hasNext(); ) { + LocalAddress a = iter.next(); + if (a.expire < getTunnel().getContext().clock().now()) + iter.remove(); + } + } + + private static class LocalAddress { + public final InetAddress ia; + public final int port; + public final long expire; + + public LocalAddress(InetAddress a, int p, long exp) { + ia = a; + port = p; + expire = exp; + } + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java new file mode 100644 index 0000000000..2f386225de --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java @@ -0,0 +1,428 @@ +package net.i2p.i2ptunnel.irc; + +import net.i2p.data.DataHelper; + + +/** + * Static methods to filter individual lines. + * Moved from I2PTunnelIRCClient.java + * + * @since 0.8.9 + */ +abstract class IRCFilter { + + private static final boolean ALLOW_ALL_DCC_IN = false; + private static final boolean ALLOW_ALL_DCC_OUT = false; + /** does not override DCC handling */ + private static final boolean ALLOW_ALL_CTCP_IN = false; + /** does not override DCC handling */ + private static final boolean ALLOW_ALL_CTCP_OUT = false; + + /************************************************************************* + * + * Modify or filter a single inbound line. + * + * @param helper may be null + * @return the original or modified line, or null if it should be dropped. + */ + public static String inboundFilter(String s, StringBuffer expectedPong, DCCHelper helper) { + + String field[]=s.split(" ",4); + String command; + int idx=0; + final String[] allowedCommands = + { + // "NOTICE", // can contain CTCP + //"PING", + //"PONG", + "MODE", + "JOIN", + "NICK", + "QUIT", + "PART", + "WALLOPS", + "ERROR", + "KICK", + "H", // "hide operator status" (after kicking an op) + "TOPIC", + // http://tools.ietf.org/html/draft-mitchell-irc-capabilities-01 + "CAP" + }; + + if(field[0].charAt(0)==':') + idx++; + + try { command = field[idx++]; } + catch (IndexOutOfBoundsException ioobe) // wtf, server sent borked command? + { + //_log.warn("Dropping defective message: index out of bounds while extracting command."); + return null; + } + + idx++; //skip victim + + // Allow numerical responses + try { + new Integer(command); + return s; + } catch(NumberFormatException nfe){} + + + if ("PING".equalsIgnoreCase(command)) + return "PING 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works + if ("PONG".equalsIgnoreCase(command)) { + // Turn the received ":irc.freshcoffee.i2p PONG irc.freshcoffee.i2p :127.0.0.1" + // into ":127.0.0.1 PONG 127.0.0.1 " so that the caller can append the client's extra parameter + // though, does 127.0.0.1 work for irc clients connecting remotely? and for all of them? sure would + // be great if irc clients actually followed the RFCs here, but i guess thats too much to ask. + // If we haven't PINGed them, or the PING we sent isn't something we know how to filter, this + // is blank. + // + // String pong = expectedPong.length() > 0 ? expectedPong.toString() : null; + // If we aren't going to rewrite it, pass it through + String pong = expectedPong.length() > 0 ? expectedPong.toString() : s; + expectedPong.setLength(0); + return pong; + } + + // Allow all allowedCommands + for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' + { + // CTCP + + // don't even try to parse multiple CTCP in the same message + int count = 0; + for (int i = 0; i < msg.length(); i++) { + if (msg.charAt(i) == 0x01) + count++; + } + if (count != 2) + return null; + + msg=msg.substring(2); + if(msg.startsWith("ACTION ")) { + // /me says hello + return s; + } + if (msg.startsWith("DCC ")) { + StringBuilder buf = new StringBuilder(128); + for (int i = 0; i <= idx - 2; i++) { + buf.append(field[i]).append(' '); + } + buf.append(":\001DCC "); + return filterDCCIn(buf.toString(), msg.substring(4), helper); + } + if (ALLOW_ALL_CTCP_IN) + return s; + return null; // Block all other ctcp + } + return s; + } + + // Block the rest + return null; + } + + /************************************************************************* + * + * Modify or filter a single outbound line. + * + * @param helper may be null + * @return the original or modified line, or null if it should be dropped. + */ + public static String outboundFilter(String s, StringBuffer expectedPong, DCCHelper helper) { + + String field[]=s.split(" ",3); + String command; + final String[] allowedCommands = + { + // "NOTICE", // can contain CTCP + "MODE", + "JOIN", + "NICK", + "WHO", + "WHOIS", + "LIST", + "NAMES", + "NICK", + // "QUIT", // replace with a filtered QUIT to hide client quit messages + "SILENCE", + "MAP", // seems safe enough, the ircd should protect themselves though + // "PART", // replace with filtered PART to hide client part messages + "OPER", + // "PONG", // replaced with a filtered PING/PONG since some clients send the server IP (thanks aardvax!) + // "PING", + "KICK", + "HELPME", + "RULES", + "TOPIC", + "ISON", // jIRCii uses this for a ping (response is 303) + "INVITE", + "AWAY", // should be harmless + // http://tools.ietf.org/html/draft-mitchell-irc-capabilities-01 + "CAP" + }; + + if(field[0].length()==0) + return null; // W T F? + + + if(field[0].charAt(0)==':') + return null; // wtf + + command = field[0].toUpperCase(); + + if ("PING".equals(command)) { + // Most clients just send a PING and are happy with any old PONG. Others, + // like BitchX, actually expect certain behavior. It sends two different pings: + // "PING :irc.freshcoffee.i2p" and "PING 1234567890 127.0.0.1" (where the IP is the proxy) + // the PONG to the former seems to be "PONG 127.0.0.1", while the PONG to the later is + // ":irc.freshcoffee.i2p PONG irc.freshcoffe.i2p :1234567890". + // We don't want to send them our proxy's IP address, so we need to rewrite the PING + // sent to the server, but when we get a PONG back, use what we expected, rather than + // what they sent. + // + // Yuck. + + String rv = null; + expectedPong.setLength(0); + if (field.length == 1) { // PING + rv = "PING"; + // If we aren't rewriting the PING don't rewrite the PONG + // expectedPong.append("PONG 127.0.0.1"); + } else if (field.length == 2) { // PING nonce + rv = "PING " + field[1]; + // If we aren't rewriting the PING don't rewrite the PONG + // expectedPong.append("PONG ").append(field[1]); + } else if (field.length == 3) { // PING nonce serverLocation + rv = "PING " + field[1]; + expectedPong.append("PONG ").append(field[2]).append(" :").append(field[1]); // PONG serverLocation nonce + } else { + //if (_log.shouldLog(Log.ERROR)) + // _log.error("IRC client sent a PING we don't understand, filtering it (\"" + s + "\")"); + rv = null; + } + + //if (_log.shouldLog(Log.WARN)) + // _log.warn("sending ping [" + rv + "], waiting for [" + expectedPong + "] orig was [" + s + "]"); + + return rv; + } + if ("PONG".equals(command)) + return "PONG 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works + + // Allow all allowedCommands + for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' + { + // CTCP + + // don't even try to parse multiple CTCP in the same message + int count = 0; + for (int i = 0; i < msg.length(); i++) { + if (msg.charAt(i) == 0x01) + count++; + } + if (count != 2) + return null; + + msg=msg.substring(2); + if(msg.startsWith("ACTION ")) { + // /me says hello + return s; + } + if (msg.startsWith("DCC ")) + return filterDCCOut(field[0] + ' ' + field[1] + " :\001DCC ", msg.substring(4), helper); + if (ALLOW_ALL_CTCP_OUT) + return s; + return null; // Block all other ctcp + } + return s; + } + + if("USER".equals(command)) { + int idx = field[2].lastIndexOf(":"); + if(idx<0) + return "USER user hostname localhost :realname"; + String realname = field[2].substring(idx+1); + String ret = "USER "+field[1]+" hostname localhost :"+realname; + return ret; + } + + if ("PART".equals(command)) { + // hide client message + return "PART " + field[1] + " :leaving"; + } + + if ("QUIT".equals(command)) { + return "QUIT :leaving"; + } + + // Block the rest + return null; + } + + /** + * @param pfx the message through the "DCC " part + * @param msg the message after the "DCC " part + * @param helper may be null + * @return the sanitized message or null to block + * @since 0.8.9 + */ + private static String filterDCCIn(String pfx, String msg, DCCHelper helper) { + // strip trailing ctcp (other one is in pfx) + int ctcp = msg.indexOf(0x01); + if (ctcp > 0) + msg = msg.substring(0, ctcp); + String[] args = msg.split(" ", 5); + if (args.length <= 0) + return null; + String type = args[0]; + // no IP in these but port needs to be fixed still + //if (type == "RESUME" || type == "ACCEPT") + // return msg; + if (!(type.equals("CHAT") || type.equals("SEND"))) { + if (ALLOW_ALL_DCC_IN) { + if (ctcp > 0) + return pfx + msg + (char) 0x01; + return pfx + msg; + } + return null; + } + if (helper == null || !helper.isEnabled()) + return null; + if (args.length < 4) + return null; + String arg = args[1]; + String b32 = args[2]; + int cPort; + try { + String cp = args[3]; + cPort = Integer.parseInt(cp); + } catch (NumberFormatException nfe) { + return null; + } + + int port = helper.newIncoming(b32, cPort, type); + if (port < 0) + return null; + StringBuilder buf = new StringBuilder(256); + // fixme what is our address? + byte[] myIP = { 127, 0, 0, 1 }; + buf.append(pfx) + .append(type).append(' ').append(arg).append(' ') + .append(DataHelper.fromLong(myIP, 0, myIP.length)).append(' ') + .append(port); + if (args.length > 4) + buf.append(' ').append(args[4]); + if (pfx.indexOf(0x01) >= 0) + buf.append((char) 0x01); + return buf.toString(); + } + + /** + * @param pfx the message through the "DCC " part + * @param msg the message after the "DCC " part + * @param helper may be null + * @return the sanitized message or null to block + * @since 0.8.9 + */ + private static String filterDCCOut(String pfx, String msg, DCCHelper helper) { + // strip trailing ctcp (other one is in pfx) + int ctcp = msg.indexOf(0x01); + if (ctcp > 0) + msg = msg.substring(0, ctcp); + String[] args = msg.split(" ", 5); + if (args.length <= 0) + return null; + String type = args[0]; + // no IP in these but port needs to be fixed still + //if (type == "RESUME" || type == "ACCEPT") + // return msg; + if (!(type.equals("CHAT") || type.equals("SEND"))) { + if (ALLOW_ALL_DCC_OUT) { + if (ctcp > 0) + return pfx + msg + (char) 0x01; + return pfx + msg; + } + } + if (helper == null || !helper.isEnabled()) + return null; + if (args.length < 4) + return null; + String arg = args[1]; + byte[] ip; + try { + String ips = args[2]; + long ipl = Long.parseLong(ips); + if (ipl < 0x01000000) { + // "reverse/firewall DCC" + // http://en.wikipedia.org/wiki/Direct_Client-to-Client + // xchat sends an IP of 199 and a port of 0 + System.err.println("Reverse / Firewall DCC not supported IP = 0x" + Long.toHexString(ipl)); + return null; + } + ip = DataHelper.toLong(4, ipl); + } catch (NumberFormatException nfe) { + return null; + } + int cPort; + try { + String cp = args[3]; + cPort = Integer.parseInt(cp); + } catch (NumberFormatException nfe) { + return null; + } + if (cPort <= 0) { + // "reverse/firewall DCC" + // http://en.wikipedia.org/wiki/Direct_Client-to-Client + System.err.println("Reverse / Firewall DCC not supported"); + return null; + } + int port = helper.newOutgoing(ip, cPort, type); + if (port < 0) + return null; + StringBuilder buf = new StringBuilder(256); + buf.append(pfx) + .append(type).append(' ').append(arg).append(' ') + .append(helper.getB32Hostname()).append(' ') + .append(port); + if (args.length > 4) + buf.append(' ').append(args[4]); + if (pfx.indexOf(0x01) >= 0) + buf.append((char) 0x01); + return buf.toString(); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java new file mode 100644 index 0000000000..ce301a48e8 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java @@ -0,0 +1,101 @@ +package net.i2p.i2ptunnel.irc; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.util.Log; + +/** + * Thread to do inbound filtering. + * Moved from I2PTunnelIRCClient.java + * + * @since 0.8.9 + */ +public class IrcInboundFilter implements Runnable { + + private final Socket local; + private final I2PSocket remote; + private final StringBuffer expectedPong; + private final Log _log; + private final DCCHelper _dccHelper; + + public IrcInboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log) { + this(lcl, rem, pong, log, null); + } + + /** + * @param helper may be null + * @since 0.8.9 + */ + public IrcInboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log, DCCHelper helper) { + local = lcl; + remote = rem; + expectedPong = pong; + _log = log; + _dccHelper = helper; + } + + public void run() { + // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... + BufferedReader in; + OutputStream output; + try { + in = new BufferedReader(new InputStreamReader(remote.getInputStream(), "ISO-8859-1")); + output=local.getOutputStream(); + } catch (IOException e) { + if (_log.shouldLog(Log.ERROR)) + _log.error("IrcInboundFilter: no streams",e); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IrcInboundFilter: Running."); + try { + while(true) + { + try { + String inmsg = in.readLine(); + if(inmsg==null) + break; + if(inmsg.endsWith("\r")) + inmsg=inmsg.substring(0,inmsg.length()-1); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("in: [" + inmsg + "]"); + String outmsg = IRCFilter.inboundFilter(inmsg, expectedPong, _dccHelper); + if(outmsg!=null) + { + if(!inmsg.equals(outmsg)) { + if (_log.shouldLog(Log.WARN)) { + _log.warn("inbound FILTERED: "+outmsg); + _log.warn(" - inbound was: "+inmsg); + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("inbound: "+outmsg); + } + outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 + output.write(outmsg.getBytes("ISO-8859-1")); + // probably doesn't do much but can't hurt + output.flush(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("inbound BLOCKED: "+inmsg); + } + } catch (IOException e1) { + if (_log.shouldLog(Log.WARN)) + _log.warn("IrcInboundFilter: disconnected",e1); + break; + } + } + } catch (RuntimeException re) { + _log.error("Error filtering inbound data", re); + } finally { + try { local.close(); } catch (IOException e) {} + } + if(_log.shouldLog(Log.DEBUG)) + _log.debug("IrcInboundFilter: Done."); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java new file mode 100644 index 0000000000..ce68835c2e --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java @@ -0,0 +1,101 @@ +package net.i2p.i2ptunnel.irc; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.util.Log; + +/** + * Thread to do inbound filtering. + * Moved from I2PTunnelIRCClient.java + * + * @since 0.8.9 + */ +public class IrcOutboundFilter implements Runnable { + + private final Socket local; + private final I2PSocket remote; + private final StringBuffer expectedPong; + private final Log _log; + private final DCCHelper _dccHelper; + + public IrcOutboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log) { + this(lcl, rem, pong, log, null); + } + + /** + * @param helper may be null + * @since 0.8.9 + */ + public IrcOutboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log, DCCHelper helper) { + local = lcl; + remote = rem; + expectedPong = pong; + _log = log; + _dccHelper = helper; + } + + public void run() { + // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... + BufferedReader in; + OutputStream output; + try { + in = new BufferedReader(new InputStreamReader(local.getInputStream(), "ISO-8859-1")); + output=remote.getOutputStream(); + } catch (IOException e) { + if (_log.shouldLog(Log.ERROR)) + _log.error("IrcOutboundFilter: no streams",e); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IrcOutboundFilter: Running."); + try { + while(true) + { + try { + String inmsg = in.readLine(); + if(inmsg==null) + break; + if(inmsg.endsWith("\r")) + inmsg=inmsg.substring(0,inmsg.length()-1); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("out: [" + inmsg + "]"); + String outmsg = IRCFilter.outboundFilter(inmsg, expectedPong, _dccHelper); + if(outmsg!=null) + { + if(!inmsg.equals(outmsg)) { + if (_log.shouldLog(Log.WARN)) { + _log.warn("outbound FILTERED: "+outmsg); + _log.warn(" - outbound was: "+inmsg); + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("outbound: "+outmsg); + } + outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 + output.write(outmsg.getBytes("ISO-8859-1")); + // save 250 ms in streaming + output.flush(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); + } + } catch (IOException e1) { + if (_log.shouldLog(Log.WARN)) + _log.warn("IrcOutboundFilter: disconnected",e1); + break; + } + } + } catch (RuntimeException re) { + _log.error("Error filtering outbound data", re); + } finally { + try { remote.close(); } catch (IOException e) {} + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IrcOutboundFilter: Done."); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java index a197636781..ee5156f8db 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java @@ -11,7 +11,8 @@ import java.net.Socket; import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; import net.i2p.i2ptunnel.I2PTunnel; -import net.i2p.i2ptunnel.I2PTunnelIRCClient; +import net.i2p.i2ptunnel.irc.IrcInboundFilter; +import net.i2p.i2ptunnel.irc.IrcOutboundFilter; import net.i2p.i2ptunnel.Logging; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PAppThread; @@ -50,10 +51,10 @@ public class I2PSOCKSIRCTunnel extends I2PSOCKSTunnel { Socket clientSock = serv.getClientSocket(); I2PSocket destSock = serv.getDestinationI2PSocket(this); StringBuffer expectedPong = new StringBuffer(); - Thread in = new I2PAppThread(new I2PTunnelIRCClient.IrcInboundFilter(clientSock, destSock, expectedPong, _log), + Thread in = new I2PAppThread(new IrcInboundFilter(clientSock, destSock, expectedPong, _log), "SOCKS IRC Client " + (++__clientId) + " in", true); in.start(); - Thread out = new I2PAppThread(new I2PTunnelIRCClient.IrcOutboundFilter(clientSock, destSock, expectedPong, _log), + Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log), "SOCKS IRC Client " + __clientId + " out", true); out.start(); } catch (SOCKSException e) { From 55bfd6aa2d90806e484ee5df37500175a1b09291 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 15 Jul 2011 20:47:49 +0000 Subject: [PATCH 4/6] concurrentify --- .../src/net/i2p/util/EventDispatcherImpl.java | 84 +++++-------------- 1 file changed, 21 insertions(+), 63 deletions(-) diff --git a/core/java/src/net/i2p/util/EventDispatcherImpl.java b/core/java/src/net/i2p/util/EventDispatcherImpl.java index 306925aff5..700cf8e0f5 100644 --- a/core/java/src/net/i2p/util/EventDispatcherImpl.java +++ b/core/java/src/net/i2p/util/EventDispatcherImpl.java @@ -9,13 +9,13 @@ package net.i2p.util; * */ -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.ListIterator; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; /** * An implementation of the EventDispatcher interface. Since Java @@ -34,11 +34,9 @@ import java.util.Set; */ public class EventDispatcherImpl implements EventDispatcher { - //private final static Log _log = new Log(EventDispatcherImpl.class); - private boolean _ignore = false; - private final HashMap _events = new HashMap(4); - private final ArrayList _attached = new ArrayList(); + private final Map _events = new ConcurrentHashMap(4); + private final List _attached = new CopyOnWriteArrayList(); public EventDispatcher getEventDispatcher() { return this; @@ -46,23 +44,12 @@ public class EventDispatcherImpl implements EventDispatcher { public void attachEventDispatcher(EventDispatcher ev) { if (ev == null) return; - synchronized (_attached) { - //_log.debug(this.hashCode() + ": attaching EventDispatcher " + ev.hashCode()); - _attached.add(ev); - } + _attached.add(ev); } public void detachEventDispatcher(EventDispatcher ev) { if (ev == null) return; - synchronized (_attached) { - ListIterator it = _attached.listIterator(); - while (it.hasNext()) { - if (((EventDispatcher) it.next()) == ev) { - it.remove(); - break; - } - } - } + _attached.remove(ev); } public void notifyEvent(String eventName, Object args) { @@ -70,50 +57,28 @@ public class EventDispatcherImpl implements EventDispatcher { if (args == null) { args = "[null value]"; } - //_log.debug(this.hashCode() + ": got notification [" + eventName + "] = [" + args + "]"); + _events.put(eventName, args); synchronized (_events) { - _events.put(eventName, args); _events.notifyAll(); - synchronized (_attached) { - Iterator it = _attached.iterator(); - EventDispatcher e; - while (it.hasNext()) { - e = (EventDispatcher) it.next(); - //_log.debug(this.hashCode() + ": notifying attached EventDispatcher " + e.hashCode() + ": [" - // + eventName + "] = [" + args + "]"); - e.notifyEvent(eventName, args); - } - } + } + for (EventDispatcher e : _attached) { + e.notifyEvent(eventName, args); } } public Object getEventValue(String name) { if (_ignore) return null; - Object val; - - synchronized (_events) { - val = _events.get(name); - } - - return val; + return _events.get(name); } - public Set getEvents() { + public Set getEvents() { if (_ignore) return Collections.EMPTY_SET; - Set set; - - synchronized (_events) { - set = new HashSet(_events.keySet()); - } - - return set; + return new HashSet(_events.keySet()); } public void ignoreEvents() { _ignore = true; - synchronized (_events) { - _events.clear(); - } + _events.clear(); } public void unIgnoreEvents() { @@ -122,22 +87,15 @@ public class EventDispatcherImpl implements EventDispatcher { public Object waitEventValue(String name) { if (_ignore) return null; - Object val; - - //_log.debug(this.hashCode() + ": waiting for [" + name + "]"); do { synchronized (_events) { - if (_events.containsKey(name)) { - val = _events.get(name); - break; - } + Object val = _events.get(name); + if (val != null) + return val; try { - _events.wait(1 * 1000); - } catch (InterruptedException e) { // nop - } + _events.wait(5 * 1000); + } catch (InterruptedException e) {} } } while (true); - - return val; } } From 7fa874f6256a3905d849b78955105af6d01416d2 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 15 Jul 2011 20:52:18 +0000 Subject: [PATCH 5/6] - Tracking, expiration, closing of DCC tunnels - I2PTunnelRunner cleanups --- .../i2p/i2ptunnel/I2PTunnelClientBase.java | 2 +- .../net/i2p/i2ptunnel/I2PTunnelIRCClient.java | 4 + .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 43 +++++---- .../i2p/i2ptunnel/irc/DCCClientManager.java | 89 ++++++++++++++----- .../net/i2p/i2ptunnel/irc/EventReceiver.java | 76 ++++++++++++++++ .../i2p/i2ptunnel/irc/I2PTunnelDCCClient.java | 51 ++++++++++- .../i2p/i2ptunnel/irc/I2PTunnelDCCServer.java | 31 +++++-- 7 files changed, 248 insertions(+), 48 deletions(-) create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 1d98798bc4..ea29c7ed88 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -59,7 +59,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna private boolean listenerReady = false; - private ServerSocket ss; + protected ServerSocket ss; private final Object startLock = new Object(); private boolean startRunning = false; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index dc128d1c4a..bcead946b6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -144,6 +144,10 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements DCCHelper _DCCServer.close(forced); _DCCServer = null; } + if (_DCCClientManager != null) { + _DCCClientManager.close(forced); + _DCCClientManager = null; + } } return super.close(forced); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 0e1c9049f8..cd7c330ee6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -20,10 +20,10 @@ import net.i2p.util.I2PAppThread; import net.i2p.util.Log; public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener { - private final static Log _log = new Log(I2PTunnelRunner.class); + private final Log _log = new Log(I2PTunnelRunner.class); private static volatile long __runnerId; - private long _runnerId; + private final long _runnerId; /** * max bytes streamed in a packet - smaller ones might be filled * up to this size. Larger ones are not split (at least not on @@ -34,20 +34,20 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE; - private Socket s; - private I2PSocket i2ps; - final Object slock, finishLock = new Object(); + private final Socket s; + private final I2PSocket i2ps; + private final Object slock, finishLock = new Object(); boolean finished = false; - HashMap ostreams, sockets; - byte[] initialI2PData; - byte[] initialSocketData; + private HashMap ostreams, sockets; + private final byte[] initialI2PData; + private final byte[] initialSocketData; /** when the last data was sent/received (or -1 if never) */ private long lastActivityOn; /** when the runner started up */ - private long startedOn; - private List sockList; + private final long startedOn; + private final List sockList; /** if we die before receiving any data, run this job */ - private Runnable onTimeout; + private final Runnable onTimeout; private long totalSent; private long totalReceived; @@ -56,12 +56,23 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList) { this(s, i2ps, slock, initialI2PData, null, sockList, null); } + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList) { this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null); } + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) { this(s, i2ps, slock, initialI2PData, null, sockList, onTimeout); } + + /** + * Starts itself + * + * @param initialI2PData may be null + * @param initialSocketData may be null + * @param sockList may be null + * @param onTImeout may be null + */ public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList, Runnable onTimeout) { this.sockList = sockList; this.s = s; @@ -237,11 +248,11 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr private class StreamForwarder extends I2PAppThread { - InputStream in; - OutputStream out; - String direction; - private boolean _toI2P; - private ByteCache _cache; + private final InputStream in; + private final OutputStream out; + private final String direction; + private final boolean _toI2P; + private final ByteCache _cache; private StreamForwarder(InputStream in, OutputStream out, boolean toI2P) { this.in = in; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java index ac9ed249b2..1b24bc0dcd 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java @@ -24,21 +24,22 @@ import net.i2p.util.Log; * * @since 0.8.9 */ -public class DCCClientManager { - +public class DCCClientManager extends EventReceiver { private final I2PSocketManager sockMgr; private final EventDispatcher _dispatch; private final Logging l; private final I2PTunnel _tunnel; private final Log _log; - private final ConcurrentHashMap _incoming; + private final ConcurrentHashMap _incoming; + private final ConcurrentHashMap _active; + // list of client tunnels? private static long _id; private static final int MAX_INCOMING_PENDING = 10; private static final int MAX_INCOMING_ACTIVE = 10; - private static final long INBOUND_EXPIRE = 30*60*1000; + private static final long ACTIVE_EXPIRE = 60*60*1000; public DCCClientManager(I2PSocketManager sktMgr, Logging logging, EventDispatcher dispatch, I2PTunnel tunnel) { @@ -48,6 +49,19 @@ public class DCCClientManager { _tunnel = tunnel; _log = tunnel.getContext().logManager().getLog(DCCClientManager.class); _incoming = new ConcurrentHashMap(8); + _active = new ConcurrentHashMap(8); + } + + public boolean close(boolean forced) { + for (I2PTunnelDCCClient c : _incoming.values()) { + c.stop(); + } + _incoming.clear(); + for (I2PTunnelDCCClient c : _active.values()) { + c.stop(); + } + _active.clear(); + return true; } /** @@ -60,21 +74,23 @@ public class DCCClientManager { */ public int newIncoming(String b32, int port, String type) { expireInbound(); - if (_incoming.size() >= MAX_INCOMING_PENDING) { - _log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING); + if (_incoming.size() >= MAX_INCOMING_PENDING || + _active.size() >= MAX_INCOMING_PENDING) { + _log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING + + '/' + MAX_INCOMING_ACTIVE + " pending/active"); return -1; } - I2PAddress client = new I2PAddress(b32, port, _tunnel.getContext().clock().now() + INBOUND_EXPIRE); try { // Transparent tunnel used for all types... // Do we need to do any filtering for chat? I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr, _dispatch, _tunnel, ++_id); + cTunnel.attachEventDispatcher(this); int lport = cTunnel.getLocalPort(); if (_log.shouldLog(Log.WARN)) _log.warn("Opened client tunnel at port " + lport + " pointing to " + b32 + ':' + port); - _incoming.put(Integer.valueOf(lport), client); + _incoming.put(Integer.valueOf(lport), cTunnel); return lport; } catch (IllegalArgumentException uhe) { l.log("Could not find listen host to bind to [" + _tunnel.host + "]"); @@ -83,23 +99,52 @@ public class DCCClientManager { } } - private void expireInbound() { - for (Iterator iter = _incoming.values().iterator(); iter.hasNext(); ) { - I2PAddress a = iter.next(); - if (a.expire < _tunnel.getContext().clock().now()) - iter.remove(); + /** + * The EventReceiver callback + */ + public void notifyEvent(String eventName, Object args) { + if (eventName.equals(I2PTunnelDCCClient.CONNECT_START_EVENT)) { + try { + I2PTunnelDCCClient client = (I2PTunnelDCCClient) args; + connStarted(client); + } catch (ClassCastException cce) {} + } else if (eventName.equals(I2PTunnelDCCClient.CONNECT_STOP_EVENT)) { + try { + Integer port = (Integer) args; + connStopped(port); + } catch (ClassCastException cce) {} } } - private static class I2PAddress { - public final String dest; - public final int port; - public final long expire; - - public I2PAddress(String b32, int p, long exp) { - dest = b32; - port = p; - expire = exp; + private void connStarted(I2PTunnelDCCClient client) { + Integer lport = Integer.valueOf(client.getLocalPort()); + I2PTunnelDCCClient c = _incoming.remove(lport); + if (c != null) { + _active.put(lport, client); + if (_log.shouldLog(Log.WARN)) + _log.warn("Added client tunnel for port " + lport + + " pending count now: " + _incoming.size() + + " active count now: " + _active.size()); } } + + private void connStopped(Integer lport) { + _incoming.remove(lport); + _active.remove(lport); + if (_log.shouldLog(Log.WARN)) + _log.warn("Removed client tunnel for port " + lport + + " pending count now: " + _incoming.size() + + " active count now: " + _active.size()); + } + + private void expireInbound() { + for (Iterator iter = _incoming.values().iterator(); iter.hasNext(); ) { + I2PTunnelDCCClient c = iter.next(); + if (c.getExpires() < _tunnel.getContext().clock().now()) { + iter.remove(); + c.stop(); + } + } + // shouldn't need to expire active + } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java new file mode 100644 index 0000000000..f03c8b90c2 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java @@ -0,0 +1,76 @@ +package net.i2p.i2ptunnel.irc; + +/* + * free (adj.): unencumbered; not under the control of others Written + * by human & jrandom in 2004 and released into the public domain with + * no warranty of any kind, either expressed or implied. It probably + * won't make your computer catch on fire, or eat your children, but + * it might. Use at your own risk. + * + */ + +import java.util.Set; + +import net.i2p.util.EventDispatcher; + +/** + * An implementation of the EventDispatcher interface for + * receiving events via in-line notifyEvent() only. + * Does not support chaining to additional dispatchers. + * Does not support waitEventValue(). + * Does not support ignoring. + * + * @since 0.8.9 + */ +public abstract class EventReceiver implements EventDispatcher { + + public EventDispatcher getEventDispatcher() { + return this; + } + + /** + * @throws UnsupportedOperationException always + */ + public void attachEventDispatcher(EventDispatcher ev) { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException always + */ + public void detachEventDispatcher(EventDispatcher ev) { + throw new UnsupportedOperationException(); + } + + public abstract void notifyEvent(String eventName, Object args); + + /** + * @throws UnsupportedOperationException always + */ + public Object getEventValue(String name) { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException always + */ + public Set getEvents() { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException always + */ + public void ignoreEvents() { + throw new UnsupportedOperationException(); + } + + public void unIgnoreEvents() {} + + /** + * @throws UnsupportedOperationException always + */ + public Object waitEventValue(String name) { + throw new UnsupportedOperationException(); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java index e6a9c96cd7..88d462926d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java @@ -29,6 +29,11 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { // delay resolution until connect time private final String _dest; private final int _remotePort; + private final long _expires; + + private static final long INBOUND_EXPIRE = 30*60*1000; + public static final String CONNECT_START_EVENT = "connectionStarted"; + public static final String CONNECT_STOP_EVENT = "connectionStopped"; /** * @param dest the target, presumably b32 @@ -41,14 +46,16 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { super(0, l, sktMgr, tunnel, notifyThis, clientId); _dest = dest; _remotePort = remotePort; + _expires = tunnel.getContext().clock().now() + INBOUND_EXPIRE; setName("DCC send -> " + dest + ':' + remotePort); startRunning(); - - notifyEvent("openClientResult", "ok"); } + /** + * Accept one connection only. + */ protected void clientConnectionRun(Socket s) { I2PSocket i2ps = null; if (_log.shouldLog(Log.INFO)) @@ -57,7 +64,8 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { if (dest == null) { _log.error("Could not find leaseset for DCC connection to " + _dest + ':' + _remotePort); closeSocket(s); - // shutdown? + stop(); + notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); return; } @@ -65,13 +73,48 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { opts.setPort(_remotePort); try { i2ps = createI2PSocket(dest, opts); - new I2PTunnelRunner(s, i2ps, sockLock, null, mySockets); + new Runner(s, i2ps); } catch (Exception ex) { _log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex); closeSocket(s); if (i2ps != null) { try { i2ps.close(); } catch (IOException ioe) {} } + notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); + } + stop(); + } + + public long getExpires() { + return _expires; + } + + /** + * Stop listening for new sockets. + * We can't call super.close() as it kills all sockets in the sockMgr + */ + public void stop() { + open = false; + try { + ss.close(); + } catch (IOException ioe) {} + } + + /** + * Just so we can do the callbacks + */ + private class Runner extends I2PTunnelRunner { + + public Runner(Socket s, I2PSocket i2ps) { + // super calls start() + super(s, i2ps, sockLock, null, mySockets); + } + + @Override + public void run() { + notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this); + super.run(); + notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); } } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java index d2a16cee9a..ee0432208b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java @@ -36,6 +36,8 @@ import net.i2p.util.Log; public class I2PTunnelDCCServer extends I2PTunnelServer { private final ConcurrentHashMap _outgoing; + private final ConcurrentHashMap _active; + // list of client tunnels? private static long _id; @@ -54,6 +56,7 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { private static final int MAX_OUTGOING_PENDING = 20; private static final int MAX_OUTGOING_ACTIVE = 20; private static final long OUTBOUND_EXPIRE = 30*60*1000; + private static final long ACTIVE_EXPIRE = 60*60*1000; /** * There's no support for unsolicited incoming I2P connections, @@ -67,6 +70,7 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { EventDispatcher notifyThis, I2PTunnel tunnel) { super(DUMMY, 0, sktMgr, l, notifyThis, tunnel); _outgoing = new ConcurrentHashMap(8); + _active = new ConcurrentHashMap(8); } /** @@ -81,11 +85,11 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { try { expireOutbound(); int myPort = socket.getLocalPort(); - // TODO remove, add to active - LocalAddress local = _outgoing.get(Integer.valueOf(myPort)); + // Port is a one-time-use only + LocalAddress local = _outgoing.remove(Integer.valueOf(myPort)); if (local == null) { if (_log.shouldLog(Log.WARN)) - _log.warn("Incoming DCC connection for unknown port " + myPort); + _log.warn("Rejecting incoming DCC connection for unknown port " + myPort); try { socket.close(); } catch (IOException ioe) {} @@ -96,6 +100,7 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { " sending to " + local.ia + ':' + local.port); Socket s = new Socket(local.ia, local.port); new I2PTunnelRunner(s, socket, slock, null, null); + _active.put(Integer.valueOf(myPort), socket); } catch (SocketException ex) { try { socket.close(); @@ -107,6 +112,13 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { } } + @Override + public boolean close(boolean forced) { + _outgoing.clear(); + _active.clear(); + return super.close(forced); + } + /** * An outgoing DCC request * @@ -117,8 +129,10 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { */ public int newOutgoing(byte[] ip, int port, String type) { expireOutbound(); - if (_outgoing.size() >= MAX_OUTGOING_PENDING) { - _log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING); + if (_outgoing.size() >= MAX_OUTGOING_PENDING || + _active.size() >= MAX_OUTGOING_ACTIVE) { + _log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING + + '/' + MAX_OUTGOING_ACTIVE + " pending/active"); return -1; } InetAddress ia; @@ -130,6 +144,8 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE); for (int i = 0; i < 10; i++) { int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT); + if (_active.containsKey(Integer.valueOf(iport))) + continue; LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client); if (old != null) continue; @@ -157,6 +173,11 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { if (a.expire < getTunnel().getContext().clock().now()) iter.remove(); } + for (Iterator iter = _active.values().iterator(); iter.hasNext(); ) { + I2PSocket s = iter.next(); + if (s.isClosed()) + iter.remove(); + } } private static class LocalAddress { From 7ba6f5a7553bd47b19b6486078c981834aabb551 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 15 Jul 2011 21:41:38 +0000 Subject: [PATCH 6/6] add gui option --- .../java/src/net/i2p/i2ptunnel/web/EditBean.java | 6 ++++++ .../src/net/i2p/i2ptunnel/web/IndexBean.java | 16 +++++++++++++++- apps/i2ptunnel/jsp/editClient.jsp | 9 +++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java index 63f119bca8..100e87bb64 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java @@ -21,6 +21,7 @@ import net.i2p.data.Signature; import net.i2p.data.SigningPrivateKey; import net.i2p.i2ptunnel.I2PTunnelHTTPClient; import net.i2p.i2ptunnel.I2PTunnelHTTPClientBase; +import net.i2p.i2ptunnel.I2PTunnelIRCClient; import net.i2p.i2ptunnel.TunnelController; import net.i2p.i2ptunnel.TunnelControllerGroup; import net.i2p.util.Addresses; @@ -170,6 +171,11 @@ public class EditBean extends IndexBean { return getBooleanProperty(tunnel, "i2cp.encryptLeaseSet"); } + /** @since 0.8.9 */ + public boolean getDCC(int tunnel) { + return getBooleanProperty(tunnel, I2PTunnelIRCClient.PROP_DCC); + } + public String getEncryptKey(int tunnel) { return getProperty(tunnel, "i2cp.leaseSetKey", ""); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java index eb36a64bc4..c69a03e09b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java @@ -27,6 +27,7 @@ import net.i2p.data.PrivateKeyFile; import net.i2p.data.SessionKey; import net.i2p.i2ptunnel.I2PTunnelHTTPClient; import net.i2p.i2ptunnel.I2PTunnelHTTPClientBase; +import net.i2p.i2ptunnel.I2PTunnelIRCClient; import net.i2p.i2ptunnel.TunnelController; import net.i2p.i2ptunnel.TunnelControllerGroup; import net.i2p.util.ConcurrentHashSet; @@ -675,6 +676,11 @@ public class IndexBean { _booleanOptions.add("i2cp.encryptLeaseSet"); } + /** @since 0.8.9 */ + public void setDCC(String moo) { + _booleanOptions.add(I2PTunnelIRCClient.PROP_DCC); + } + protected static final String PROP_ENABLE_ACCESS_LIST = "i2cp.enableAccessList"; protected static final String PROP_ENABLE_BLACKLIST = "i2cp.enableBlackList"; @@ -980,13 +986,20 @@ public class IndexBean { else config.setProperty("interface", ""); } + + if ("ircclient".equals(_type)) { + config.setProperty("option." + I2PTunnelIRCClient.PROP_DCC, + "" + _booleanOptions.contains(I2PTunnelIRCClient.PROP_DCC)); + } + return config; } private static final String _noShowOpts[] = { "inbound.length", "outbound.length", "inbound.lengthVariance", "outbound.lengthVariance", "inbound.backupQuantity", "outbound.backupQuantity", "inbound.quantity", "outbound.quantity", - "inbound.nickname", "outbound.nickname", "i2p.streaming.connectDelay", "i2p.streaming.maxWindowSize" + "inbound.nickname", "outbound.nickname", "i2p.streaming.connectDelay", "i2p.streaming.maxWindowSize", + I2PTunnelIRCClient.PROP_DCC }; private static final String _booleanClientOpts[] = { "i2cp.reduceOnIdle", "i2cp.closeOnIdle", "i2cp.newDestOnResume", "persistentClientKey", "i2cp.delayOpen" @@ -1008,6 +1021,7 @@ public class IndexBean { PROP_MAX_TOTAL_CONNS_MIN, PROP_MAX_TOTAL_CONNS_HOUR, PROP_MAX_TOTAL_CONNS_DAY, PROP_MAX_STREAMS }; + protected static final Set _noShowSet = new HashSet(64); static { _noShowSet.addAll(Arrays.asList(_noShowOpts)); diff --git a/apps/i2ptunnel/jsp/editClient.jsp b/apps/i2ptunnel/jsp/editClient.jsp index b95452b129..ad2d14aa2c 100644 --- a/apps/i2ptunnel/jsp/editClient.jsp +++ b/apps/i2ptunnel/jsp/editClient.jsp @@ -173,6 +173,15 @@ class="tickbox" /> <%=intl._("(Check the Box for 'YES')")%> + <% if ("ircclient".equals(tunnelType)) { %> +
    + + class="tickbox" /> + <%=intl._("(Check the Box for 'YES')")%> +
    + <% } // ircclient %>