2009-06-30 sponge

* General cleanup on streaming and ministreaming.
      This fixes some compile warnings, and prepares for a larger fix.
      There is no code-flow changes, just lint. One warning remains as I am
      unsure exactly how to solve the problem yet.
This commit is contained in:
sponge
2009-06-30 04:44:13 +00:00
parent 9e1181900b
commit c7541f819a
15 changed files with 59 additions and 46 deletions

View File

@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project-private xmlns="http://www.netbeans.org/ns/project-private/1">
<editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/>
<open-files xmlns="http://www.netbeans.org/ns/projectui-open-files/1">
<file>file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/MUXlisten.java</file>
</open-files>
</project-private>

View File

@ -6,7 +6,7 @@ import java.util.Properties;
* Define the configuration for streaming and verifying data on the socket.
*
*/
class I2PSocketOptionsImpl implements I2PSocketOptions {
public class I2PSocketOptionsImpl implements I2PSocketOptions {
private long _connectTimeout;
private long _readTimeout;
private long _writeTimeout;

View File

@ -49,7 +49,7 @@ public class Connection {
private boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
private Map _outboundPackets;
private final Map _outboundPackets;
private PacketQueue _outboundQueue;
private ConnectionPacketHandler _handler;
private ConnectionOptions _options;
@ -66,7 +66,7 @@ public class Connection {
private long _lastCongestionHighestUnacked;
private boolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private Object _connectLock;
private final Object _connectLock;
/** how many messages have been resent and not yet ACKed? */
private int _activeResends;
private ConEvent _connectionEvent;
@ -90,21 +90,22 @@ public class Connection {
}
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
_context = ctx;
_log = ctx.logManager().getLog(Connection.class);
_receiver = new ConnectionDataReceiver(ctx, this);
_inputStream = new MessageInputStream(ctx);
_outputStream = new MessageOutputStream(ctx, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_connectionManager = manager;
_chooser = chooser;
_outboundPackets = new TreeMap();
_outboundQueue = queue;
_handler = handler;
_log = _context.logManager().getLog(Connection.class);
_receiver = new ConnectionDataReceiver(_context, this);
_inputStream = new MessageInputStream(_context);
_outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_outboundPackets = new TreeMap();
_options = (opts != null ? opts : new ConnectionOptions());
_outputStream.setWriteTimeout((int)_options.getWriteTimeout());
_inputStream.setReadTimeout((int)_options.getReadTimeout());
_lastSendId = -1;
_nextSendTime = -1;
_ackedPackets = 0;
_createdOn = ctx.clock().now();
_createdOn = _context.clock().now();
_closeSentOn = -1;
_closeReceivedOn = -1;
_unackedPacketsReceived = 0;
@ -113,7 +114,6 @@ public class Connection {
_lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
_lastCongestionTime = -1;
_lastCongestionHighestUnacked = -1;
_connectionManager = manager;
_resetReceived = false;
_connected = true;
_disconnectScheduledOn = -1;
@ -126,7 +126,7 @@ public class Connection {
_isInbound = false;
_updatedShareOpts = false;
_connectionEvent = new ConEvent();
_hardDisconnected = false;
_hardDisconnected = false;
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -1018,6 +1018,7 @@ public class Connection {
// _log.debug("firing event on " + _connection, _addedBy);
eventOccurred();
}
@Override
public String toString() { return "event on " + Connection.this.toString(); }
}

View File

@ -16,7 +16,7 @@ import net.i2p.util.SimpleTimer;
*
* @author zzz modded to use concurrent and bound queue size
*/
class ConnectionHandler {
public class ConnectionHandler {
private I2PAppContext _context;
private Log _log;
private ConnectionManager _manager;
@ -109,7 +109,7 @@ class ConnectionHandler {
// fail all the ones we had queued up
while(true) {
Packet packet = _synQueue.poll(); // fails immediately if empty
if (packet == null || packet.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST)
if (packet == null || packet.getOptionalDelay() == PoisonPacket.POISON_MAX_DELAY_REQUEST)
break;
sendReset(packet);
}
@ -142,7 +142,7 @@ class ConnectionHandler {
}
if (syn != null) {
if (syn.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST)
if (syn.getOptionalDelay() == PoisonPacket.POISON_MAX_DELAY_REQUEST)
return null;
// deal with forged / invalid syn packets
@ -226,14 +226,14 @@ class ConnectionHandler {
/**
* Simple end-of-queue marker.
* The standard class limits the delay to MAX_DELAY_REQUEST so
* The standard class limits the delay to POISON_MAX_DELAY_REQUEST so
* an evil user can't use this to shut us down
*/
private static class PoisonPacket extends Packet {
public static final int MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1;
public static final int POISON_MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1;
public PoisonPacket() {
setOptionalDelay(MAX_DELAY_REQUEST);
setOptionalDelay(POISON_MAX_DELAY_REQUEST);
}
}
}

View File

@ -34,32 +34,32 @@ public class ConnectionManager {
/** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
/** Ping ID (Long) to PingRequest */
private Map _pendingPings;
private final Map _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private Object _connectionLock;
private final Object _connectionLock;
private long SoTimeout;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context;
_log = context.logManager().getLog(ConnectionManager.class);
_session = session;
_maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions;
_log = _context.logManager().getLog(ConnectionManager.class);
_connectionByInboundId = new HashMap(32);
_pendingPings = new HashMap(4);
_connectionLock = new Object();
_messageHandler = new MessageHandler(context, this);
_packetHandler = new PacketHandler(context, this);
_connectionHandler = new ConnectionHandler(context, this);
_schedulerChooser = new SchedulerChooser(context);
_conPacketHandler = new ConnectionPacketHandler(context);
_tcbShare = new TCBShare(context);
_session = session;
session.setSessionListener(_messageHandler);
_outboundQueue = new PacketQueue(context, session, this);
_messageHandler = new MessageHandler(_context, this);
_packetHandler = new PacketHandler(_context, this);
_connectionHandler = new ConnectionHandler(_context, this);
_schedulerChooser = new SchedulerChooser(_context);
_conPacketHandler = new ConnectionPacketHandler(_context);
_tcbShare = new TCBShare(_context);
_session.setSessionListener(_messageHandler);
_outboundQueue = new PacketQueue(_context, _session, this);
_allowIncoming = false;
_maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions;
_numWaiting = 0;
/** Socket timeout for accept() */
SoTimeout = -1;
@ -277,11 +277,13 @@ public class ConnectionManager {
public MessageHandler getMessageHandler() { return _messageHandler; }
public PacketHandler getPacketHandler() { return _packetHandler; }
public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
public I2PSession getSession() { return _session; }
public PacketQueue getPacketQueue() { return _outboundQueue; }
public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); }
public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); }
// Both of these methods are
// exporting non-public type through public API, this is a potential bug.
public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
public PacketQueue getPacketQueue() { return _outboundQueue; }
/**
* Something b0rked hard, so kill all of our connections without mercy.
@ -345,13 +347,13 @@ public class ConnectionManager {
return new HashSet(_connectionByInboundId.values());
}
}
public boolean ping(Destination peer, long timeoutMs) {
return ping(peer, timeoutMs, true);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
return ping(peer, timeoutMs, blocking, null, null, null);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
@ -390,7 +392,7 @@ public class ConnectionManager {
return ok;
}
interface PingNotifier {
public interface PingNotifier {
public void pingComplete(boolean ok);
}

View File

@ -15,7 +15,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
private int _rtt;
private int _rttDev;
private int _rto;
private int _trend[];
private int _resendDelay;
private int _sendAckDelay;
private int _maxMessageSize;
@ -58,7 +57,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
static final int DEFAULT_MAX_SENDS = 8;
public static final int DEFAULT_INITIAL_RTT = 8*1000;
static final int MIN_WINDOW_SIZE = 1;
// Syncronization fix, but doing it this way causes NPE...
// private final int _trend[] = new int[TREND_COUNT];
private int _trend[];
/**
* OK, here is the calculation on the message size to fit in a single
* tunnel message without fragmentation.
@ -203,7 +205,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
protected void init(Properties opts) {
super.init(opts);
_trend = new int[TREND_COUNT];
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));

View File

@ -18,7 +18,7 @@ public class MessageHandler implements I2PSessionListener {
private ConnectionManager _manager;
private I2PAppContext _context;
private Log _log;
private List _listeners;
private final List _listeners;
public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) {
_manager = mgr;

View File

@ -55,7 +55,7 @@ public class MessageInputStream extends InputStream {
private byte[] _oneByte = new byte[1];
private Object _dataLock;
private final Object _dataLock;
public MessageInputStream(I2PAppContext ctx) {
_context = ctx;

View File

@ -20,7 +20,7 @@ public class MessageOutputStream extends OutputStream {
private Log _log;
private byte _buf[];
private int _valid;
private Object _dataLock;
private final Object _dataLock;
private DataReceiver _dataReceiver;
private IOException _streamError;
private boolean _closed;
@ -319,6 +319,7 @@ public class MessageOutputStream extends OutputStream {
throwAnyError();
}
@Override
public void close() throws IOException {
if (_closed) {
synchronized (_dataLock) { _dataLock.notifyAll(); }

View File

@ -9,7 +9,6 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* receive a packet and dispatch it correctly to the connection specified,

View File

@ -18,7 +18,7 @@ import net.i2p.util.Log;
* mode=bestEffort doesnt block in the SDK.
*
*/
class PacketQueue {
public class PacketQueue {
private I2PAppContext _context;
private Log _log;
private I2PSession _session;

View File

@ -10,7 +10,7 @@ import net.i2p.util.Log;
* Examine a connection's state and pick the right scheduler for it.
*
*/
class SchedulerChooser {
public class SchedulerChooser {
private I2PAppContext _context;
private Log _log;
private TaskScheduler _nullScheduler;

View File

@ -5,7 +5,7 @@ package net.i2p.client.streaming;
* selected based upon its current state.
*
*/
interface TaskScheduler {
public interface TaskScheduler {
/**
* An event has occurred (timeout, message sent, or message received),
* so schedule what to do next based on our current state.

View File

@ -1,3 +1,9 @@
2009-06-30 sponge
* General cleanup on streaming and ministreaming.
This fixes some compile warnings, and prepares for a larger fix.
There is no code-flow changes, just lint. One warning remains as I am
unsure exactly how to solve the problem yet.
* 2009-06-29 0.7.5 released
2009-06-29 Complication

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 0;
public final static long BUILD = 1;
/** for example "-test" */
public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;