SSU: EstablishmentManager fixes (ticket #2397)

This commit is contained in:
zzz
2019-02-08 13:24:42 +00:00
parent 3ba0fcf6da
commit 5b78b53fe8
3 changed files with 45 additions and 45 deletions

View File

@ -477,8 +477,8 @@ class EstablishmentManager {
byte[] fromIP = from.getIP(); byte[] fromIP = from.getIP();
state = new InboundEstablishState(_context, fromIP, from.getPort(), state = new InboundEstablishState(_context, fromIP, from.getPort(),
_transport.getExternalPort(fromIP.length == 16), _transport.getExternalPort(fromIP.length == 16),
_transport.getDHBuilder()); _transport.getDHBuilder(),
state.receiveSessionRequest(reader.getSessionRequestReader()); reader.getSessionRequestReader());
if (_replayFilter.add(state.getReceivedX(), 0, 8)) { if (_replayFilter.add(state.getReceivedX(), 0, 8)) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -652,7 +652,6 @@ class EstablishmentManager {
} catch (IllegalStateException ise) { } catch (IllegalStateException ise) {
continue; continue;
} }
RemoteHostId to = entry.getKey();
List<OutNetMessage> allQueued = entry.getValue(); List<OutNetMessage> allQueued = entry.getValue();
List<OutNetMessage> queued = new ArrayList<OutNetMessage>(); List<OutNetMessage> queued = new ArrayList<OutNetMessage>();
long now = _context.clock().now(); long now = _context.clock().now();
@ -1134,11 +1133,11 @@ class EstablishmentManager {
/** /**
* Drive through the inbound establishment states, adjusting one of them * Drive through the inbound establishment states, adjusting one of them
* as necessary. Called from Establisher thread only. * as necessary. Called from Establisher thread only.
* @return next requested time or -1 * @return next requested time or Long.MAX_VALUE
*/ */
private long handleInbound() { private long handleInbound() {
long now = _context.clock().now(); long now = _context.clock().now();
long nextSendTime = -1; long nextSendTime = Long.MAX_VALUE;
InboundEstablishState inboundState = null; InboundEstablishState inboundState = null;
boolean expired = false; boolean expired = false;
@ -1164,7 +1163,9 @@ class EstablishmentManager {
iter.remove(); iter.remove();
//_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime()); //_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
} else { } else {
if (cur.getNextSendTime() <= now) { // this will always be > 0
long next = cur.getNextSendTime();
if (next <= now) {
// our turn... // our turn...
inboundState = cur; inboundState = cur;
// if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
@ -1174,14 +1175,8 @@ class EstablishmentManager {
// nothin to do but wait for them to send us // nothin to do but wait for them to send us
// stuff, so lets move on to the next one being // stuff, so lets move on to the next one being
// established // established
long when = -1; if (next < nextSendTime)
if (cur.getNextSendTime() <= 0) { nextSendTime = next;
when = cur.getEstablishBeginTime() + MAX_IB_ESTABLISH_TIME;
} else {
when = cur.getNextSendTime();
}
if (when < nextSendTime)
nextSendTime = when;
} }
} }
} }
@ -1251,11 +1246,11 @@ class EstablishmentManager {
/** /**
* Drive through the outbound establishment states, adjusting one of them * Drive through the outbound establishment states, adjusting one of them
* as necessary. Called from Establisher thread only. * as necessary. Called from Establisher thread only.
* @return next requested time or -1 * @return next requested time or Long.MAX_VALUE
*/ */
private long handleOutbound() { private long handleOutbound() {
long now = _context.clock().now(); long now = _context.clock().now();
long nextSendTime = -1; long nextSendTime = Long.MAX_VALUE;
OutboundEstablishState outboundState = null; OutboundEstablishState outboundState = null;
//int admitted = 0; //int admitted = 0;
//int remaining = 0; //int remaining = 0;
@ -1278,7 +1273,9 @@ class EstablishmentManager {
// _log.debug("Removing expired outbound: " + cur); // _log.debug("Removing expired outbound: " + cur);
break; break;
} else { } else {
if (cur.getNextSendTime() <= now) { // this will be 0 for a new OES that needs sending, > 0 for others
long next = cur.getNextSendTime();
if (next <= now) {
// our turn... // our turn...
outboundState = cur; outboundState = cur;
// if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
@ -1288,14 +1285,8 @@ class EstablishmentManager {
// nothin to do but wait for them to send us // nothin to do but wait for them to send us
// stuff, so lets move on to the next one being // stuff, so lets move on to the next one being
// established // established
long when = -1; if (next < nextSendTime)
if (cur.getNextSendTime() <= 0) { nextSendTime = next;
when = cur.getEstablishBeginTime() + MAX_OB_ESTABLISH_TIME;
} else {
when = cur.getNextSendTime();
}
if ( (nextSendTime <= 0) || (when < nextSendTime) )
nextSendTime = when;
// if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
// _log.debug("Outbound doesn't want activity: " + cur + " (next=" + (when-now) + ")"); // _log.debug("Outbound doesn't want activity: " + cur + " (next=" + (when-now) + ")");
} }
@ -1461,8 +1452,9 @@ class EstablishmentManager {
private static final long PRINT_INTERVAL = 5*1000; private static final long PRINT_INTERVAL = 5*1000;
private void doPass() { private void doPass() {
if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) { long now = _context.clock().now();
_lastPrinted = _context.clock().now(); if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < now) {
_lastPrinted = now;
int iactive = _inboundStates.size(); int iactive = _inboundStates.size();
int oactive = _outboundStates.size(); int oactive = _outboundStates.size();
if (iactive > 0 || oactive > 0) { if (iactive > 0 || oactive > 0) {
@ -1476,31 +1468,21 @@ class EstablishmentManager {
} }
} }
_activity = 0; _activity = 0;
long now = _context.clock().now(); if (_lastFailsafe + FAILSAFE_INTERVAL < now) {
if (_lastFailsafe + FAILSAFE_INTERVAL < _context.clock().now()) { _lastFailsafe = now;
_lastFailsafe = _context.clock().now();
doFailsafe(); doFailsafe();
} }
long nextSendTime = -1;
long nextSendInbound = handleInbound();
long nextSendOutbound = handleOutbound();
if (nextSendInbound > 0)
nextSendTime = nextSendInbound;
if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) )
nextSendTime = nextSendOutbound;
long nextSendTime = Math.min(handleInbound(), handleOutbound());
long delay = nextSendTime - now; long delay = nextSendTime - now;
if ( (nextSendTime == -1) || (delay > 0) ) { if (delay > 0) {
if (delay > 1000) if (delay > 1000)
delay = 1000; delay = 1000;
try { try {
synchronized (_activityLock) { synchronized (_activityLock) {
if (_activity > 0) if (_activity > 0)
return; return;
if (nextSendTime == -1) _activityLock.wait(delay);
_activityLock.wait(1000);
else
_activityLock.wait(delay);
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
} }

View File

@ -97,7 +97,7 @@ class InboundEstablishState {
* SessionCreated message will be bad if the external port != the internal port. * SessionCreated message will be bad if the external port != the internal port.
*/ */
public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort, public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort,
DHSessionKeyBuilder dh) { DHSessionKeyBuilder dh, UDPPacketReader.SessionRequestReader req) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(InboundEstablishState.class); _log = ctx.logManager().getLog(InboundEstablishState.class);
_aliceIP = remoteIP; _aliceIP = remoteIP;
@ -108,6 +108,7 @@ class InboundEstablishState {
_establishBegin = ctx.clock().now(); _establishBegin = ctx.clock().now();
_keyBuilder = dh; _keyBuilder = dh;
_queuedMessages = new LinkedBlockingQueue<OutNetMessage>(); _queuedMessages = new LinkedBlockingQueue<OutNetMessage>();
receiveSessionRequest(req);
} }
public synchronized InboundState getState() { return _currentState; } public synchronized InboundState getState() { return _currentState; }
@ -288,6 +289,11 @@ class InboundEstablishState {
/** how long have we been trying to establish this session? */ /** how long have we been trying to establish this session? */
public long getLifetime() { return _context.clock().now() - _establishBegin; } public long getLifetime() { return _context.clock().now() - _establishBegin; }
public long getEstablishBeginTime() { return _establishBegin; } public long getEstablishBeginTime() { return _establishBegin; }
/**
* @return rcv time after receiving a packet (including after constructor),
* send time + delay after sending a packet
*/
public synchronized long getNextSendTime() { return _nextSend; } public synchronized long getNextSendTime() { return _nextSend; }
/** RemoteHostId, uniquely identifies an attempt */ /** RemoteHostId, uniquely identifies an attempt */
@ -478,6 +484,9 @@ class InboundEstablishState {
} }
} }
/**
* Call from synchronized method only
*/
private void packetReceived() { private void packetReceived() {
_nextSend = _context.clock().now(); _nextSend = _context.clock().now();
} }

View File

@ -653,6 +653,12 @@ class OutboundEstablishState {
/** how long have we been trying to establish this session? */ /** how long have we been trying to establish this session? */
public long getLifetime() { return _context.clock().now() - _establishBegin; } public long getLifetime() { return _context.clock().now() - _establishBegin; }
public long getEstablishBeginTime() { return _establishBegin; } public long getEstablishBeginTime() { return _establishBegin; }
/**
* @return 0 at initialization (to force sending session request),
* rcv time after receiving a packet,
* send time + delay after sending a packet (including session request)
*/
public synchronized long getNextSendTime() { return _nextSend; } public synchronized long getNextSendTime() { return _nextSend; }
/** /**
@ -678,10 +684,13 @@ class OutboundEstablishState {
_currentState = OutboundState.OB_STATE_CONFIRMED_COMPLETELY; _currentState = OutboundState.OB_STATE_CONFIRMED_COMPLETELY;
} }
/**
* Call from synchronized method only
*/
private void packetReceived() { private void packetReceived() {
_nextSend = _context.clock().now(); _nextSend = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Got a packet, nextSend == now"); // _log.debug("Got a packet, nextSend == now");
} }
/** @since 0.8.9 */ /** @since 0.8.9 */