* deal with nondeferred connections (block the mgr.connect(..) until either success or failure)

* allow loading the connection options from the env (or another Properties specified)
This commit is contained in:
jrandom
2004-11-10 12:55:06 +00:00
committed by zzz
parent 881524a5e4
commit 299e5528bc
7 changed files with 136 additions and 42 deletions

View File

@ -59,6 +59,8 @@ public class Connection {
/** window size when we last saw congestion */ /** window size when we last saw congestion */
private int _lastCongestionSeenAt; private int _lastCongestionSeenAt;
private boolean _ackSinceCongestion; private boolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private Object _connectLock;
public static final long MAX_RESEND_DELAY = 60*1000; public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000; public static final long MIN_RESEND_DELAY = 20*1000;
@ -100,6 +102,7 @@ public class Connection {
_lastReceivedOn = -1; _lastReceivedOn = -1;
_activityTimer = new ActivityTimer(); _activityTimer = new ActivityTimer();
_ackSinceCongestion = true; _ackSinceCongestion = true;
_connectLock = new Object();
} }
public long getNextOutboundPacketNum() { public long getNextOutboundPacketNum() {
@ -111,6 +114,7 @@ public class Connection {
void closeReceived() { void closeReceived() {
setCloseReceivedOn(_context.clock().now()); setCloseReceivedOn(_context.clock().now());
_inputStream.closeReceived(); _inputStream.closeReceived();
synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
/** /**
@ -296,6 +300,8 @@ public class Connection {
_resetReceived = true; _resetReceived = true;
_outputStream.streamErrorOccurred(new IOException("Reset received")); _outputStream.streamErrorOccurred(new IOException("Reset received"));
_inputStream.streamErrorOccurred(new IOException("Reset received")); _inputStream.streamErrorOccurred(new IOException("Reset received"));
_connectionError = "Connection reset";
synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
public boolean getResetReceived() { return _resetReceived; } public boolean getResetReceived() { return _resetReceived; }
@ -307,6 +313,7 @@ public class Connection {
void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) {
if (!_connected) return; if (!_connected) return;
_connected = false; _connected = false;
synchronized (_connectLock) { _connectLock.notifyAll(); }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Disconnecting " + toString(), new Exception("discon")); _log.debug("Disconnecting " + toString(), new Exception("discon"));
@ -362,6 +369,7 @@ public class Connection {
private void doClose() { private void doClose() {
_outputStream.streamErrorOccurred(new IOException("Hard disconnect")); _outputStream.streamErrorOccurred(new IOException("Hard disconnect"));
_inputStream.closeReceived(); _inputStream.closeReceived();
synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
/** who are we talking with */ /** who are we talking with */
@ -374,7 +382,10 @@ public class Connection {
/** stream the peer sends data to us on. (may be null) */ /** stream the peer sends data to us on. (may be null) */
public byte[] getReceiveStreamId() { return _receiveStreamId; } public byte[] getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; } public void setReceiveStreamId(byte[] id) {
_receiveStreamId = id;
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
/** when did we last send anything to the peer? */ /** when did we last send anything to the peer? */
public long getLastSendTime() { return _lastSendTime; } public long getLastSendTime() { return _lastSendTime; }
@ -394,6 +405,8 @@ public class Connection {
public String getConnectionError() { return _connectionError; } public String getConnectionError() { return _connectionError; }
public void setConnectionError(String err) { _connectionError = err; } public void setConnectionError(String err) { _connectionError = err; }
public long getLifetime() { return _context.clock().now() - _createdOn; }
public ConnectionPacketHandler getPacketHandler() { return _handler; } public ConnectionPacketHandler getPacketHandler() { return _handler; }
/** /**
@ -462,6 +475,57 @@ public class Connection {
void packetReceived() { void packetReceived() {
_lastReceivedOn = _context.clock().now(); _lastReceivedOn = _context.clock().now();
resetActivityTimer(); resetActivityTimer();
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
/**
* wait until a connection is made or the connection fails within the
* timeout period, setting the error accordingly.
*/
void waitForConnect() {
long expiration = _context.clock().now() + _options.getConnectTimeout();
while (true) {
if (_connected && (_receiveStreamId != null) && (_sendStreamId != null) ) {
// w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): Connected and we have stream IDs");
return;
}
if (_connectionError != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): connection error found: " + _connectionError);
return;
}
if (!_connected) {
_connectionError = "Connection failed";
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): not connected");
return;
}
long timeLeft = expiration - _context.clock().now();
if ( (timeLeft <= 0) && (_options.getConnectTimeout() > 0) ) {
if (_connectionError == null) {
_connectionError = "Connection timed out";
disconnect(false);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): timed out: " + _connectionError);
return;
}
if (timeLeft > 60*1000)
timeLeft = 60*1000;
if (_options.getConnectTimeout() <= 0)
timeLeft = 60*1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): wait " + timeLeft);
try {
synchronized (_connectLock) {
_connectLock.wait(timeLeft);
}
} catch (InterruptedException ie) {}
}
} }
private void resetActivityTimer() { private void resetActivityTimer() {

View File

@ -103,7 +103,9 @@ public class ConnectionManager {
} }
/** /**
* Build a new connection to the given peer * Build a new connection to the given peer. This blocks if there is no
* connection delay, otherwise it returns immediately.
*
*/ */
public Connection connect(Destination peer, ConnectionOptions opts) { public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
@ -120,6 +122,11 @@ public class ConnectionManager {
con.setReceiveStreamId(receiveId); con.setReceiveStreamId(receiveId);
con.eventOccurred(); con.eventOccurred();
_log.debug("Connect() conDelay = " + opts.getConnectDelay());
if (opts.getConnectDelay() <= 0) {
con.waitForConnect();
}
return con; return con;
} }

View File

@ -32,22 +32,32 @@ public class ConnectionOptions extends I2PSocketOptions {
/** on inactivity timeout, send a payload message */ /** on inactivity timeout, send a payload message */
public static final int INACTIVITY_ACTION_SEND = 2; public static final int INACTIVITY_ACTION_SEND = 2;
public static final String PROP_CONNECT_DELAY = "i2p.streaming.connectDelay";
public static final String PROP_PROFILE = "i2p.streaming.profile";
public static final String PROP_MAX_MESSAGE_SIZE = "i2p.streaming.maxMessageSize";
public static final String PROP_MAX_RESENDS = "i2p.streaming.maxResends";
public static final String PROP_INITIAL_RTT = "i2p.streaming.initialRTT";
public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay";
public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay";
public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize";
public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow";
public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout";
public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction";
public ConnectionOptions() { public ConnectionOptions() {
super(); super();
init(null); }
public ConnectionOptions(Properties opts) {
super(opts);
} }
public ConnectionOptions(I2PSocketOptions opts) { public ConnectionOptions(I2PSocketOptions opts) {
super(opts); super(opts);
init(null);
} }
public ConnectionOptions(ConnectionOptions opts) { public ConnectionOptions(ConnectionOptions opts) {
super(opts); super(opts);
init(opts);
}
private void init(ConnectionOptions opts) {
if (opts != null) { if (opts != null) {
setConnectDelay(opts.getConnectDelay()); setConnectDelay(opts.getConnectDelay());
setProfile(opts.getProfile()); setProfile(opts.getProfile());
@ -61,26 +71,24 @@ public class ConnectionOptions extends I2PSocketOptions {
setInactivityTimeout(opts.getInactivityTimeout()); setInactivityTimeout(opts.getInactivityTimeout());
setInactivityAction(opts.getInactivityAction()); setInactivityAction(opts.getInactivityAction());
setInboundBufferSize(opts.getInboundBufferSize()); setInboundBufferSize(opts.getInboundBufferSize());
} else {
setConnectDelay(2*1000);
setProfile(PROFILE_BULK);
setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE);
setRTT(30*1000);
setReceiveWindow(1);
setResendDelay(5*1000);
setSendAckDelay(2*1000);
setWindowSize(1);
setMaxResends(5);
setWriteTimeout(-1);
setInactivityTimeout(5*60*1000);
setInactivityAction(INACTIVITY_ACTION_SEND);
setInboundBufferSize((Packet.MAX_PAYLOAD_SIZE + 2) * Connection.MAX_WINDOW_SIZE);
} }
} }
public ConnectionOptions(Properties opts) { protected void init(Properties opts) {
super(opts); super.init(opts);
// load the options; setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 5*1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2*1000));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND));
setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE);
} }
/** /**

View File

@ -134,7 +134,12 @@ public class I2PSocketManagerFull implements I2PSocketManager {
throws I2PException, NoRouteToHostException { throws I2PException, NoRouteToHostException {
if (_connectionManager.getSession().isClosed()) if (_connectionManager.getSession().isClosed())
throw new I2PException("Session is closed"); throw new I2PException("Session is closed");
Connection con = _connectionManager.connect(peer, new ConnectionOptions(options)); ConnectionOptions opts = null;
if (options instanceof ConnectionOptions)
opts = (ConnectionOptions)options;
else
opts = new ConnectionOptions(options);
Connection con = _connectionManager.connect(peer, opts);
I2PSocketFull socket = new I2PSocketFull(con); I2PSocketFull socket = new I2PSocketFull(con);
con.setSocket(socket); con.setSocket(socket);
if (con.getConnectionError() != null) { if (con.getConnectionError() != null) {

View File

@ -32,14 +32,18 @@ class SchedulerClosed extends SchedulerImpl {
} }
public boolean accept(Connection con) { public boolean accept(Connection con) {
boolean ok = (con != null) && if (con == null) return false;
(con.getCloseSentOn() > 0) && long timeSinceClose = _context.clock().now() - con.getCloseSentOn();
boolean ok = (con.getCloseSentOn() > 0) &&
(con.getCloseReceivedOn() > 0) && (con.getCloseReceivedOn() > 0) &&
(con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsReceived() <= 0) &&
(con.getUnackedPacketsSent() <= 0) && (con.getUnackedPacketsSent() <= 0) &&
(!con.getResetReceived()) && (!con.getResetReceived()) &&
(con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT > _context.clock().now()); (timeSinceClose < Connection.DISCONNECT_TIMEOUT);
return ok; boolean conTimeout = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
con.getSendStreamId() == null &&
con.getLifetime() < Connection.DISCONNECT_TIMEOUT;
return (ok || conTimeout);
} }
public void eventOccurred(Connection con) { public void eventOccurred(Connection con) {

View File

@ -35,10 +35,13 @@ class SchedulerConnecting extends SchedulerImpl {
} }
public boolean accept(Connection con) { public boolean accept(Connection con) {
return (con != null) && if (con == null) return false;
boolean notYetConnected = (con.getIsConnected()) &&
(con.getSendStreamId() == null) &&
(con.getLastSendId() >= 0) && (con.getLastSendId() >= 0) &&
(con.getAckedPackets() <= 0) && (con.getAckedPackets() <= 0) &&
(!con.getResetReceived()); (!con.getResetReceived());
return notYetConnected;
} }
public void eventOccurred(Connection con) { public void eventOccurred(Connection con) {

View File

@ -31,14 +31,17 @@ class SchedulerDead extends SchedulerImpl {
} }
public boolean accept(Connection con) { public boolean accept(Connection con) {
boolean ok = (con != null) && if (con == null) return false;
(con.getResetReceived()) || long timeSinceClose = _context.clock().now() - con.getCloseSentOn();
((con.getCloseSentOn() > 0) && boolean nothingLeftToDo = (con.getCloseSentOn() > 0) &&
(con.getCloseReceivedOn() > 0) && (con.getCloseReceivedOn() > 0) &&
(con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsReceived() <= 0) &&
(con.getUnackedPacketsSent() <= 0) && (con.getUnackedPacketsSent() <= 0) &&
(con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT <= _context.clock().now())); (timeSinceClose >= Connection.DISCONNECT_TIMEOUT);
return ok; boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
con.getSendStreamId() == null &&
con.getLifetime() >= Connection.DISCONNECT_TIMEOUT;
return con.getResetReceived() || nothingLeftToDo || timedOut;
} }
public void eventOccurred(Connection con) { public void eventOccurred(Connection con) {