Streaming: Move remaining timers from the context to streaming's SimpleTimer2;

these were the ones migrated from SimpleScheduler earlier
This commit is contained in:
zzz
2015-09-18 14:36:49 +00:00
parent bfc6534b20
commit 64889b2bc2
6 changed files with 33 additions and 14 deletions

View File

@ -793,7 +793,7 @@ class Connection {
private boolean scheduleDisconnectEvent() { private boolean scheduleDisconnectEvent() {
if (!_disconnectScheduledOn.compareAndSet(0, _context.clock().now())) if (!_disconnectScheduledOn.compareAndSet(0, _context.clock().now()))
return false; return false;
_context.simpleTimer2().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); schedule(new DisconnectEvent(), DISCONNECT_TIMEOUT);
return true; return true;
} }
@ -808,6 +808,24 @@ class Connection {
} }
} }
/**
* Called from SchedulerImpl
*
* @since 0.9.23 moved here so we can use our timer
*/
public void scheduleConnectionEvent(long msToWait) {
schedule(_connectionEvent, msToWait);
}
/**
* Schedule something on our timer.
*
* @since 0.9.23
*/
public void schedule(SimpleTimer.TimedEvent event, long msToWait) {
_timer.addEvent(event, msToWait);
}
private boolean _remotePeerSet = false; private boolean _remotePeerSet = false;
/** who are we talking with /** who are we talking with
* @return peer Destination * @return peer Destination
@ -1254,8 +1272,6 @@ class Connection {
return buf.toString(); return buf.toString();
} }
public SimpleTimer.TimedEvent getConnectionEvent() { return _connectionEvent; }
/** /**
* fired to reschedule event notification * fired to reschedule event notification
*/ */

View File

@ -9,6 +9,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/** /**
* Receive new connection attempts * Receive new connection attempts
@ -23,6 +24,7 @@ class ConnectionHandler {
private final Log _log; private final Log _log;
private final ConnectionManager _manager; private final ConnectionManager _manager;
private final LinkedBlockingQueue<Packet> _synQueue; private final LinkedBlockingQueue<Packet> _synQueue;
private final SimpleTimer2 _timer;
private volatile boolean _active; private volatile boolean _active;
private int _acceptTimeout; private int _acceptTimeout;
@ -37,10 +39,11 @@ class ConnectionHandler {
private static final int MAX_QUEUE_SIZE = 64; private static final int MAX_QUEUE_SIZE = 64;
/** Creates a new instance of ConnectionHandler */ /** Creates a new instance of ConnectionHandler */
public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) { public ConnectionHandler(I2PAppContext context, ConnectionManager mgr, SimpleTimer2 timer) {
_context = context; _context = context;
_log = context.logManager().getLog(ConnectionHandler.class); _log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr; _manager = mgr;
_timer = timer;
_synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE); _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
} }
@ -96,7 +99,7 @@ class ConnectionHandler {
// also check if expiration of the head is long past for overload detection with peek() ? // also check if expiration of the head is long past for overload detection with peek() ?
boolean success = _synQueue.offer(packet); // fail immediately if full boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) { if (success) {
_context.simpleTimer2().addEvent(new TimeoutSyn(packet), _acceptTimeout); _timer.addEvent(new TimeoutSyn(packet), _acceptTimeout);
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full"); _log.warn("Dropping new SYN request, as the queue is full");

View File

@ -74,11 +74,11 @@ class ConnectionManager {
_pendingPings = new ConcurrentHashMap<Long,PingRequest>(4); _pendingPings = new ConcurrentHashMap<Long,PingRequest>(4);
_messageHandler = new MessageHandler(_context, this); _messageHandler = new MessageHandler(_context, this);
_packetHandler = new PacketHandler(_context, this); _packetHandler = new PacketHandler(_context, this);
_connectionHandler = new ConnectionHandler(_context, this);
_schedulerChooser = new SchedulerChooser(_context); _schedulerChooser = new SchedulerChooser(_context);
_conPacketHandler = new ConnectionPacketHandler(_context); _conPacketHandler = new ConnectionPacketHandler(_context);
_timer = new RetransmissionTimer(_context, "Streaming Timer " + _timer = new RetransmissionTimer(_context, "Streaming Timer " +
session.getMyDestination().calculateHash().toBase64().substring(0, 4)); session.getMyDestination().calculateHash().toBase64().substring(0, 4));
_connectionHandler = new ConnectionHandler(_context, this, _timer);
_tcbShare = new TCBShare(_context, _timer); _tcbShare = new TCBShare(_context, _timer);
// PROTO_ANY is for backward compatibility (pre-0.7.1) // PROTO_ANY is for backward compatibility (pre-0.7.1)
// TODO change proto to PROTO_STREAMING someday. // TODO change proto to PROTO_STREAMING someday.
@ -88,7 +88,7 @@ class ConnectionManager {
// As of 0.9.1, listen on configured port (default 0 = all) // As of 0.9.1, listen on configured port (default 0 = all)
int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY; int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
_session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort()); _session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort());
_outboundQueue = new PacketQueue(_context); _outboundQueue = new PacketQueue(_context, _timer);
_recentlyClosed = new LHMCache<Long, Object>(32); _recentlyClosed = new LHMCache<Long, Object>(32);
/** Socket timeout for accept() */ /** Socket timeout for accept() */
_soTimeout = -1; _soTimeout = -1;
@ -839,7 +839,7 @@ class ConnectionManager {
private final PingNotifier _notifier; private final PingNotifier _notifier;
public PingFailed(Long id, PingNotifier notifier) { public PingFailed(Long id, PingNotifier notifier) {
super(_context.simpleTimer2()); super(_timer);
_id = id; _id = id;
_notifier = notifier; _notifier = notifier;
} }

View File

@ -207,7 +207,7 @@ class ConnectionPacketHandler {
final long delay = nextSendTime - now; final long delay = nextSendTime - now;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("scheduling ack in " + delay); _log.info("scheduling ack in " + delay);
_context.simpleTimer2().addEvent(new AckDup(con), delay); con.schedule(new AckDup(con), delay);
} }
} else { } else {

View File

@ -45,11 +45,11 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
private static final long REMOVE_EXPIRED_TIME = 67*1000; private static final long REMOVE_EXPIRED_TIME = 67*1000;
private static final boolean ENABLE_STATUS_LISTEN = true; private static final boolean ENABLE_STATUS_LISTEN = true;
public PacketQueue(I2PAppContext context) { public PacketQueue(I2PAppContext context, SimpleTimer2 timer) {
_context = context; _context = context;
_log = context.logManager().getLog(PacketQueue.class); _log = context.logManager().getLog(PacketQueue.class);
_messageStatusMap = new ConcurrentHashMap<Long, Connection>(16); _messageStatusMap = new ConcurrentHashMap<Long, Connection>(16);
new RemoveExpired(); new RemoveExpired(timer);
// all createRateStats in ConnectionManager // all createRateStats in ConnectionManager
} }
@ -328,8 +328,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
*/ */
private class RemoveExpired extends SimpleTimer2.TimedEvent { private class RemoveExpired extends SimpleTimer2.TimedEvent {
public RemoveExpired() { public RemoveExpired(SimpleTimer2 timer) {
super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME); super(timer, REMOVE_EXPIRED_TIME);
} }
public void timeReached() { public void timeReached() {

View File

@ -16,7 +16,7 @@ abstract class SchedulerImpl implements TaskScheduler {
} }
protected void reschedule(long msToWait, Connection con) { protected void reschedule(long msToWait, Connection con) {
_context.simpleTimer2().addEvent(con.getConnectionEvent(), msToWait); con.scheduleConnectionEvent(msToWait);
} }
@Override @Override