From c7541f819a01fd374b8462e18e4ed18b26aa0f86 Mon Sep 17 00:00:00 2001 From: sponge Date: Tue, 30 Jun 2009 04:44:13 +0000 Subject: [PATCH] 2009-06-30 sponge * General cleanup on streaming and ministreaming. This fixes some compile warnings, and prepares for a larger fix. There is no code-flow changes, just lint. One warning remains as I am unsure exactly how to solve the problem yet. --- apps/BOB/nbproject/private/private.xml | 3 ++ .../streaming/I2PSocketOptionsImpl.java | 2 +- .../net/i2p/client/streaming/Connection.java | 21 +++++----- .../client/streaming/ConnectionHandler.java | 12 +++--- .../client/streaming/ConnectionManager.java | 38 ++++++++++--------- .../client/streaming/ConnectionOptions.java | 7 ++-- .../i2p/client/streaming/MessageHandler.java | 2 +- .../client/streaming/MessageInputStream.java | 2 +- .../client/streaming/MessageOutputStream.java | 3 +- .../i2p/client/streaming/PacketHandler.java | 1 - .../net/i2p/client/streaming/PacketQueue.java | 2 +- .../client/streaming/SchedulerChooser.java | 2 +- .../i2p/client/streaming/TaskScheduler.java | 2 +- history.txt | 6 +++ .../src/net/i2p/router/RouterVersion.java | 2 +- 15 files changed, 59 insertions(+), 46 deletions(-) diff --git a/apps/BOB/nbproject/private/private.xml b/apps/BOB/nbproject/private/private.xml index c1f155a78..2482568bf 100644 --- a/apps/BOB/nbproject/private/private.xml +++ b/apps/BOB/nbproject/private/private.xml @@ -1,4 +1,7 @@ + + file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/MUXlisten.java + diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index eb58b6871..334c86af0 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -6,7 +6,7 @@ import java.util.Properties; * Define the configuration for streaming and verifying data on the socket. * */ -class I2PSocketOptionsImpl implements I2PSocketOptions { +public class I2PSocketOptionsImpl implements I2PSocketOptions { private long _connectTimeout; private long _readTimeout; private long _writeTimeout; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index f73632683..e6d99c473 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -49,7 +49,7 @@ public class Connection { private boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ - private Map _outboundPackets; + private final Map _outboundPackets; private PacketQueue _outboundQueue; private ConnectionPacketHandler _handler; private ConnectionOptions _options; @@ -66,7 +66,7 @@ public class Connection { private long _lastCongestionHighestUnacked; private boolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ - private Object _connectLock; + private final Object _connectLock; /** how many messages have been resent and not yet ACKed? */ private int _activeResends; private ConEvent _connectionEvent; @@ -90,21 +90,22 @@ public class Connection { } public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { _context = ctx; - _log = ctx.logManager().getLog(Connection.class); - _receiver = new ConnectionDataReceiver(ctx, this); - _inputStream = new MessageInputStream(ctx); - _outputStream = new MessageOutputStream(ctx, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); + _connectionManager = manager; _chooser = chooser; - _outboundPackets = new TreeMap(); _outboundQueue = queue; _handler = handler; + _log = _context.logManager().getLog(Connection.class); + _receiver = new ConnectionDataReceiver(_context, this); + _inputStream = new MessageInputStream(_context); + _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); + _outboundPackets = new TreeMap(); _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); _lastSendId = -1; _nextSendTime = -1; _ackedPackets = 0; - _createdOn = ctx.clock().now(); + _createdOn = _context.clock().now(); _closeSentOn = -1; _closeReceivedOn = -1; _unackedPacketsReceived = 0; @@ -113,7 +114,6 @@ public class Connection { _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; - _connectionManager = manager; _resetReceived = false; _connected = true; _disconnectScheduledOn = -1; @@ -126,7 +126,7 @@ public class Connection { _isInbound = false; _updatedShareOpts = false; _connectionEvent = new ConEvent(); - _hardDisconnected = false; + _hardDisconnected = false; _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -1018,6 +1018,7 @@ public class Connection { // _log.debug("firing event on " + _connection, _addedBy); eventOccurred(); } + @Override public String toString() { return "event on " + Connection.this.toString(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index d989a2367..382c984d9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -16,7 +16,7 @@ import net.i2p.util.SimpleTimer; * * @author zzz modded to use concurrent and bound queue size */ -class ConnectionHandler { +public class ConnectionHandler { private I2PAppContext _context; private Log _log; private ConnectionManager _manager; @@ -109,7 +109,7 @@ class ConnectionHandler { // fail all the ones we had queued up while(true) { Packet packet = _synQueue.poll(); // fails immediately if empty - if (packet == null || packet.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST) + if (packet == null || packet.getOptionalDelay() == PoisonPacket.POISON_MAX_DELAY_REQUEST) break; sendReset(packet); } @@ -142,7 +142,7 @@ class ConnectionHandler { } if (syn != null) { - if (syn.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST) + if (syn.getOptionalDelay() == PoisonPacket.POISON_MAX_DELAY_REQUEST) return null; // deal with forged / invalid syn packets @@ -226,14 +226,14 @@ class ConnectionHandler { /** * Simple end-of-queue marker. - * The standard class limits the delay to MAX_DELAY_REQUEST so + * The standard class limits the delay to POISON_MAX_DELAY_REQUEST so * an evil user can't use this to shut us down */ private static class PoisonPacket extends Packet { - public static final int MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1; + public static final int POISON_MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1; public PoisonPacket() { - setOptionalDelay(MAX_DELAY_REQUEST); + setOptionalDelay(POISON_MAX_DELAY_REQUEST); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 7826ba2a8..b978ff3be 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -34,32 +34,32 @@ public class ConnectionManager { /** Inbound stream ID (Long) to Connection map */ private Map _connectionByInboundId; /** Ping ID (Long) to PingRequest */ - private Map _pendingPings; + private final Map _pendingPings; private boolean _allowIncoming; private int _maxConcurrentStreams; private ConnectionOptions _defaultOptions; private volatile int _numWaiting; - private Object _connectionLock; + private final Object _connectionLock; private long SoTimeout; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { _context = context; - _log = context.logManager().getLog(ConnectionManager.class); + _session = session; + _maxConcurrentStreams = maxConcurrent; + _defaultOptions = defaultOptions; + _log = _context.logManager().getLog(ConnectionManager.class); _connectionByInboundId = new HashMap(32); _pendingPings = new HashMap(4); _connectionLock = new Object(); - _messageHandler = new MessageHandler(context, this); - _packetHandler = new PacketHandler(context, this); - _connectionHandler = new ConnectionHandler(context, this); - _schedulerChooser = new SchedulerChooser(context); - _conPacketHandler = new ConnectionPacketHandler(context); - _tcbShare = new TCBShare(context); - _session = session; - session.setSessionListener(_messageHandler); - _outboundQueue = new PacketQueue(context, session, this); + _messageHandler = new MessageHandler(_context, this); + _packetHandler = new PacketHandler(_context, this); + _connectionHandler = new ConnectionHandler(_context, this); + _schedulerChooser = new SchedulerChooser(_context); + _conPacketHandler = new ConnectionPacketHandler(_context); + _tcbShare = new TCBShare(_context); + _session.setSessionListener(_messageHandler); + _outboundQueue = new PacketQueue(_context, _session, this); _allowIncoming = false; - _maxConcurrentStreams = maxConcurrent; - _defaultOptions = defaultOptions; _numWaiting = 0; /** Socket timeout for accept() */ SoTimeout = -1; @@ -277,11 +277,13 @@ public class ConnectionManager { public MessageHandler getMessageHandler() { return _messageHandler; } public PacketHandler getPacketHandler() { return _packetHandler; } - public ConnectionHandler getConnectionHandler() { return _connectionHandler; } public I2PSession getSession() { return _session; } - public PacketQueue getPacketQueue() { return _outboundQueue; } public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); } public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); } + // Both of these methods are + // exporting non-public type through public API, this is a potential bug. + public ConnectionHandler getConnectionHandler() { return _connectionHandler; } + public PacketQueue getPacketQueue() { return _outboundQueue; } /** * Something b0rked hard, so kill all of our connections without mercy. @@ -345,13 +347,13 @@ public class ConnectionManager { return new HashSet(_connectionByInboundId.values()); } } - public boolean ping(Destination peer, long timeoutMs) { return ping(peer, timeoutMs, true); } public boolean ping(Destination peer, long timeoutMs, boolean blocking) { return ping(peer, timeoutMs, blocking, null, null, null); } + public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); PacketLocal packet = new PacketLocal(_context, peer); @@ -390,7 +392,7 @@ public class ConnectionManager { return ok; } - interface PingNotifier { + public interface PingNotifier { public void pingComplete(boolean ok); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 4363e3f49..740fdcf82 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -15,7 +15,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { private int _rtt; private int _rttDev; private int _rto; - private int _trend[]; private int _resendDelay; private int _sendAckDelay; private int _maxMessageSize; @@ -58,7 +57,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { static final int DEFAULT_MAX_SENDS = 8; public static final int DEFAULT_INITIAL_RTT = 8*1000; static final int MIN_WINDOW_SIZE = 1; - + // Syncronization fix, but doing it this way causes NPE... + // private final int _trend[] = new int[TREND_COUNT]; + private int _trend[]; + /** * OK, here is the calculation on the message size to fit in a single * tunnel message without fragmentation. @@ -203,7 +205,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { protected void init(Properties opts) { super.init(opts); _trend = new int[TREND_COUNT]; - setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 1ff65248d..98165cf7d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -18,7 +18,7 @@ public class MessageHandler implements I2PSessionListener { private ConnectionManager _manager; private I2PAppContext _context; private Log _log; - private List _listeners; + private final List _listeners; public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { _manager = mgr; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 77a5014cf..7b87e5583 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -55,7 +55,7 @@ public class MessageInputStream extends InputStream { private byte[] _oneByte = new byte[1]; - private Object _dataLock; + private final Object _dataLock; public MessageInputStream(I2PAppContext ctx) { _context = ctx; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 4a810d565..ab7c9374a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -20,7 +20,7 @@ public class MessageOutputStream extends OutputStream { private Log _log; private byte _buf[]; private int _valid; - private Object _dataLock; + private final Object _dataLock; private DataReceiver _dataReceiver; private IOException _streamError; private boolean _closed; @@ -319,6 +319,7 @@ public class MessageOutputStream extends OutputStream { throwAnyError(); } + @Override public void close() throws IOException { if (_closed) { synchronized (_dataLock) { _dataLock.notifyAll(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 1d26d7b8c..ff43293a0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -9,7 +9,6 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.data.DataHelper; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; /** * receive a packet and dispatch it correctly to the connection specified, diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index e91cbdb7d..db4adb27c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -18,7 +18,7 @@ import net.i2p.util.Log; * mode=bestEffort doesnt block in the SDK. * */ -class PacketQueue { +public class PacketQueue { private I2PAppContext _context; private Log _log; private I2PSession _session; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java index a2aacf82e..be62f1766 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java @@ -10,7 +10,7 @@ import net.i2p.util.Log; * Examine a connection's state and pick the right scheduler for it. * */ -class SchedulerChooser { +public class SchedulerChooser { private I2PAppContext _context; private Log _log; private TaskScheduler _nullScheduler; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java b/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java index 8657a2053..c998c8425 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java @@ -5,7 +5,7 @@ package net.i2p.client.streaming; * selected based upon its current state. * */ -interface TaskScheduler { +public interface TaskScheduler { /** * An event has occurred (timeout, message sent, or message received), * so schedule what to do next based on our current state. diff --git a/history.txt b/history.txt index dfd639beb..dc745e8e9 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,9 @@ +2009-06-30 sponge + * General cleanup on streaming and ministreaming. + This fixes some compile warnings, and prepares for a larger fix. + There is no code-flow changes, just lint. One warning remains as I am + unsure exactly how to solve the problem yet. + * 2009-06-29 0.7.5 released 2009-06-29 Complication diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 8aa072fcf..b74a3a021 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 0; + public final static long BUILD = 1; /** for example "-test" */ public final static String EXTRA = ""; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;