* hang around for 5m (er, 2.5msl, i suppose) after connection closure no matter what, so we

can respond apropriately
* optional inactivty timer with three possible results:
  disconnect, send a (blank) message (to be ACKed), or do nothing.
This commit is contained in:
jrandom
2004-11-08 15:05:13 +00:00
committed by zzz
parent 18ab9b80d2
commit 9ea603caf2
7 changed files with 146 additions and 14 deletions

View File

@ -53,10 +53,16 @@ public class Connection {
private I2PSocketFull _socket; private I2PSocketFull _socket;
/** set to an error cause if the connection could not be established */ /** set to an error cause if the connection could not be established */
private String _connectionError; private String _connectionError;
private boolean _disconnectScheduled;
private long _lastReceivedOn;
private ActivityTimer _activityTimer;
public static final long MAX_RESEND_DELAY = 60*1000; public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000; public static final long MIN_RESEND_DELAY = 20*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static long DISCONNECT_TIMEOUT = 5*60*1000;
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
this(ctx, manager, chooser, queue, handler, null); this(ctx, manager, chooser, queue, handler, null);
} }
@ -83,6 +89,9 @@ public class Connection {
_connectionManager = manager; _connectionManager = manager;
_resetReceived = false; _resetReceived = false;
_connected = true; _connected = true;
_disconnectScheduled = false;
_lastReceivedOn = -1;
_activityTimer = new ActivityTimer();
} }
public long getNextOutboundPacketNum() { public long getNextOutboundPacketNum() {
@ -190,6 +199,7 @@ public class Connection {
_lastSendTime = _context.clock().now(); _lastSendTime = _context.clock().now();
_outboundQueue.enqueue(packet); _outboundQueue.enqueue(packet);
resetActivityTimer();
if (ackOnly) { if (ackOnly) {
// ACK only, don't schedule this packet for retries // ACK only, don't schedule this packet for retries
@ -304,13 +314,39 @@ public class Connection {
_outboundPackets.clear(); _outboundPackets.clear();
_outboundPackets.notifyAll(); _outboundPackets.notifyAll();
} }
if (removeFromConMgr) if (removeFromConMgr) {
_connectionManager.removeConnection(this); if (!_disconnectScheduled) {
_disconnectScheduled = true;
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
}
} }
} }
void disconnectComplete() { void disconnectComplete() {
_connectionManager.removeConnection(this); _connected = false;
if (!_disconnectScheduled) {
_disconnectScheduled = true;
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect complete from dead, drop the con "
+ toString());
_connectionManager.removeConnection(this);
}
}
private class DisconnectEvent implements SimpleTimer.TimedEvent {
public DisconnectEvent() {
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect timer initiated: 5 minutes to drop "
+ Connection.this.toString());
}
public void timeReached() {
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect timer complete, drop the con "
+ Connection.this.toString());
_connectionManager.removeConnection(Connection.this);
}
} }
private void doClose() { private void doClose() {
@ -398,6 +434,64 @@ public class Connection {
public long getHighestAckedThrough() { return _highestAckedThrough; } public long getHighestAckedThrough() { return _highestAckedThrough; }
public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; }
public long getLastActivityOn() {
return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn);
}
void packetReceived() {
_lastReceivedOn = _context.clock().now();
resetActivityTimer();
}
private void resetActivityTimer() {
if (_options.getInactivityTimeout() <= 0) return;
long howLong = _activityTimer.getTimeLeft();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer to " + howLong);
// this will get rescheduled, and rescheduled, and rescheduled...
SimpleTimer.getInstance().addEvent(_activityTimer, howLong);
}
private class ActivityTimer implements SimpleTimer.TimedEvent {
public void timeReached() {
// uh, nothing more to do...
if (!_connected) return;
// we got rescheduled already
if (getTimeLeft() > 0) return;
// these are either going to time out or cause further rescheduling
if (getUnackedPacketsSent() > 0) return;
// wtf, this shouldn't have been scheduled
if (_options.getInactivityTimeout() <= 0) return;
// bugger it, might as well do the hard work now
switch (_options.getInactivityAction()) {
case ConnectionOptions.INACTIVITY_ACTION_SEND:
if (_log.shouldLog(Log.WARN))
_log.warn("Sending some data due to inactivity");
_receiver.send(null, 0, 0, true);
break;
case ConnectionOptions.INACTIVITY_ACTION_NOOP:
if (_log.shouldLog(Log.WARN))
_log.warn("Inactivity timer expired, but we aint doin' shit");
break;
case ConnectionOptions.INACTIVITY_ACTION_DISCONNECT:
// fall through
default:
if (_log.shouldLog(Log.WARN))
_log.warn("Closing connection due to inactivity");
disconnect(true);
break;
}
}
public final long getTimeLeft() {
if (getLastActivityOn() > 0)
return getLastActivityOn() + _options.getInactivityTimeout() - _context.clock().now();
else
return _createdOn + _options.getInactivityTimeout() - _context.clock().now();
}
}
/** stream that the local peer receives data on */ /** stream that the local peer receives data on */
public MessageInputStream getInputStream() { return _inputStream; } public MessageInputStream getInputStream() { return _inputStream; }
/** stream that the local peer sends data to the remote peer on */ /** stream that the local peer sends data to the remote peer on */

View File

@ -55,7 +55,14 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
public PacketLocal send(byte buf[], int off, int size) { public PacketLocal send(byte buf[], int off, int size) {
PacketLocal packet = buildPacket(buf, off, size); return send(buf, off, size, false);
}
/**
* @param forceIncrement even if the buffer is empty, increment the packetId
* so we get an ACK back
*/
public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) {
PacketLocal packet = buildPacket(buf, off, size, forceIncrement);
_connection.sendPacket(packet); _connection.sendPacket(packet);
return packet; return packet;
} }
@ -69,14 +76,14 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
return ackOnly; return ackOnly;
} }
private PacketLocal buildPacket(byte buf[], int off, int size) { private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
boolean ackOnly = isAckOnly(size); boolean ackOnly = isAckOnly(size);
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection); PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);
byte data[] = new byte[size]; byte data[] = new byte[size];
if (size > 0) if (size > 0)
System.arraycopy(buf, off, data, 0, size); System.arraycopy(buf, off, data, 0, size);
packet.setPayload(data); packet.setPayload(data);
if (ackOnly) if (ackOnly && !forceIncrement)
packet.setSequenceNum(0); packet.setSequenceNum(0);
else else
packet.setSequenceNum(_connection.getNextOutboundPacketNum()); packet.setSequenceNum(_connection.getNextOutboundPacketNum());

View File

@ -18,10 +18,19 @@ public class ConnectionOptions extends I2PSocketOptions {
private int _maxMessageSize; private int _maxMessageSize;
private int _choke; private int _choke;
private int _maxResends; private int _maxResends;
private int _inactivityTimeout;
private int _inactivityAction;
public static final int PROFILE_BULK = 1; public static final int PROFILE_BULK = 1;
public static final int PROFILE_INTERACTIVE = 2; public static final int PROFILE_INTERACTIVE = 2;
/** on inactivity timeout, do nothing */
public static final int INACTIVITY_ACTION_NOOP = 0;
/** on inactivity timeout, close the connection */
public static final int INACTIVITY_ACTION_DISCONNECT = 1;
/** on inactivity timeout, send a payload message */
public static final int INACTIVITY_ACTION_SEND = 2;
public ConnectionOptions() { public ConnectionOptions() {
super(); super();
init(null); init(null);
@ -48,6 +57,8 @@ public class ConnectionOptions extends I2PSocketOptions {
setMaxMessageSize(opts.getMaxMessageSize()); setMaxMessageSize(opts.getMaxMessageSize());
setChoke(opts.getChoke()); setChoke(opts.getChoke());
setMaxResends(opts.getMaxResends()); setMaxResends(opts.getMaxResends());
setInactivityTimeout(opts.getInactivityTimeout());
setInactivityAction(opts.getInactivityAction());
} else { } else {
setConnectDelay(2*1000); setConnectDelay(2*1000);
setProfile(PROFILE_BULK); setProfile(PROFILE_BULK);
@ -59,6 +70,8 @@ public class ConnectionOptions extends I2PSocketOptions {
setWindowSize(1); setWindowSize(1);
setMaxResends(5); setMaxResends(5);
setWriteTimeout(-1); setWriteTimeout(-1);
setInactivityTimeout(5*60*1000);
setInactivityAction(INACTIVITY_ACTION_SEND);
} }
} }
@ -151,7 +164,11 @@ public class ConnectionOptions extends I2PSocketOptions {
* *
*/ */
public int getProfile() { return _profile; } public int getProfile() { return _profile; }
public void setProfile(int profile) { _profile = profile; } public void setProfile(int profile) {
if (profile != PROFILE_BULK)
throw new IllegalArgumentException("Only bulk is supported so far");
_profile = profile;
}
/** /**
* How many times will we try to send a message before giving up? * How many times will we try to send a message before giving up?
@ -159,4 +176,14 @@ public class ConnectionOptions extends I2PSocketOptions {
*/ */
public int getMaxResends() { return _maxResends; } public int getMaxResends() { return _maxResends; }
public void setMaxResends(int numSends) { _maxResends = numSends; } public void setMaxResends(int numSends) { _maxResends = numSends; }
/**
* What period of inactivity qualifies as "too long"?
*
*/
public int getInactivityTimeout() { return _inactivityTimeout; }
public void setInactivityTimeout(int timeout) { _inactivityTimeout = timeout; }
public int getInactivityAction() { return _inactivityAction; }
public void setInactivityAction(int action) { _inactivityAction = action; }
} }

View File

@ -26,6 +26,7 @@ public class ConnectionPacketHandler {
void receivePacket(Packet packet, Connection con) throws I2PException { void receivePacket(Packet packet, Connection con) throws I2PException {
boolean ok = verifyPacket(packet, con); boolean ok = verifyPacket(packet, con);
if (!ok) return; if (!ok) return;
con.packetReceived();
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
// close *after* receiving the data, as well as after verifying the signatures / etc // close *after* receiving the data, as well as after verifying the signatures / etc

View File

@ -31,8 +31,6 @@ class SchedulerClosed extends SchedulerImpl {
_log = ctx.logManager().getLog(SchedulerClosed.class); _log = ctx.logManager().getLog(SchedulerClosed.class);
} }
static final long CLOSE_TIMEOUT = 30*1000;
public boolean accept(Connection con) { public boolean accept(Connection con) {
boolean ok = (con != null) && boolean ok = (con != null) &&
(con.getCloseSentOn() > 0) && (con.getCloseSentOn() > 0) &&
@ -40,12 +38,12 @@ class SchedulerClosed extends SchedulerImpl {
(con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsReceived() <= 0) &&
(con.getUnackedPacketsSent() <= 0) && (con.getUnackedPacketsSent() <= 0) &&
(!con.getResetReceived()) && (!con.getResetReceived()) &&
(con.getCloseSentOn() + CLOSE_TIMEOUT > _context.clock().now()); (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT > _context.clock().now());
return ok; return ok;
} }
public void eventOccurred(Connection con) { public void eventOccurred(Connection con) {
long timeLeft = con.getCloseSentOn() + CLOSE_TIMEOUT - _context.clock().now(); long timeLeft = con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT - _context.clock().now();
reschedule(timeLeft, con); reschedule(timeLeft, con);
} }
} }

View File

@ -45,9 +45,14 @@ class SchedulerClosing extends SchedulerImpl {
if (con.getNextSendTime() <= 0) if (con.getNextSendTime() <= 0)
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
long remaining = con.getNextSendTime() - _context.clock().now(); long remaining = con.getNextSendTime() - _context.clock().now();
if (remaining <= 0) if (remaining <= 0) {
con.sendAvailable(); con.sendAvailable();
else con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
//if (remaining < 5*1000)
// remaining = 5*1000;
//con.setNextSendTime(when
reschedule(remaining, con); reschedule(remaining, con);
}
} }
} }

View File

@ -37,7 +37,7 @@ class SchedulerDead extends SchedulerImpl {
(con.getCloseReceivedOn() > 0) && (con.getCloseReceivedOn() > 0) &&
(con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsReceived() <= 0) &&
(con.getUnackedPacketsSent() <= 0) && (con.getUnackedPacketsSent() <= 0) &&
(con.getCloseSentOn() + SchedulerClosed.CLOSE_TIMEOUT <= _context.clock().now())); (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT <= _context.clock().now()));
return ok; return ok;
} }