NTCP2: Send termination on idle timeout

Use timer to delay close after sending termination
Prevent sending more data after termination
This commit is contained in:
zzz
2018-08-02 10:56:14 +00:00
parent 726d2f4752
commit 1b6c002883
4 changed files with 75 additions and 15 deletions

View File

@ -1,3 +1,8 @@
2018-08-02 zzz
* i2psnark: Don't disconnect seeds if comments enabled (ticket #2288)
* NTCP2: Send termination on idle timeout
* Streaming: More efficient copying in MessageInputStream
2018-07-28 zzz 2018-07-28 zzz
* Console: Catch ISE in get/setAttribute() (ticket #1529) * Console: Catch ISE in get/setAttribute() (ticket #1529)
* Streaming: Throw exception on read timeout (ticket #2292) * Streaming: Throw exception on read timeout (ticket #2292)

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 19; public final static long BUILD = 20;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -296,7 +296,7 @@ class EventPumper implements Runnable {
if ( con.getTimeSinceSend() > expire && if ( con.getTimeSinceSend() > expire &&
con.getTimeSinceReceive() > expire) { con.getTimeSinceReceive() > expire) {
// we haven't sent or received anything in a really long time, so lets just close 'er up // we haven't sent or received anything in a really long time, so lets just close 'er up
con.close(); con.sendTerminationAndClose();
if (_log.shouldInfo()) if (_log.shouldInfo())
_log.info("Failsafe or expire close for " + con); _log.info("Failsafe or expire close for " + con);
failsafeCloses++; failsafeCloses++;

View File

@ -629,8 +629,8 @@ public class NTCPConnection implements Closeable {
*/ */
private void enqueueInfoMessageNTCP1() { private void enqueueInfoMessageNTCP1() {
int priority = INFO_PRIORITY; int priority = INFO_PRIORITY;
if (_log.shouldLog(Log.INFO)) if (_log.shouldDebug())
_log.info("SENDING INFO message pri. " + priority + ": " + toString()); _log.debug("SENDING INFO message pri. " + priority + ": " + toString());
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setEntry(_context.router().getRouterInfo()); dsm.setEntry(_context.router().getRouterInfo());
// We are injecting directly, so we can use a null target. // We are injecting directly, so we can use a null target.
@ -992,7 +992,26 @@ public class NTCPConnection implements Closeable {
} }
/** /**
* NTCP2 only * NTCP 1 or 2.
* For NTCP1, sends termination and then closes the connection after a brief delay.
* For NTCP2, simply closes the connection immediately.
*
* @since 0.9.36
*/
void sendTerminationAndClose() {
ReadState rs = null;
synchronized (this) {
if (_version == 2 && isEstablished())
rs = _curReadState;
}
if (rs != null)
sendTermination(REASON_TIMEOUT, rs.getFramesReceived());
else
close();
}
/**
* NTCP2 only. Also closes the connection after a brief delay.
* *
* @since 0.9.36 * @since 0.9.36
*/ */
@ -1015,7 +1034,16 @@ public class NTCPConnection implements Closeable {
} }
// use a "read buf" for the temp array // use a "read buf" for the temp array
ByteArray dataBuf = acquireReadBuf(); ByteArray dataBuf = acquireReadBuf();
sendNTCP2(dataBuf.getData(), blocks); synchronized(this) {
if (_sender != null) {
sendNTCP2(dataBuf.getData(), blocks);
_sender.destroy();
// this "plugs" the NTCP2 sender, so sendNTCP2()
// won't send any more after the termination.
_sender = null;
new DelayedCloser();
}
}
releaseReadBuf(dataBuf); releaseReadBuf(dataBuf);
} }
@ -1239,8 +1267,8 @@ public class NTCPConnection implements Closeable {
// push through the bw limiter to reach _writeBufs // push through the bw limiter to reach _writeBufs
if (!_outbound.isEmpty()) if (!_outbound.isEmpty())
_transport.getWriter().wantsWrite(this, "write completed"); _transport.getWriter().wantsWrite(this, "write completed");
if (_log.shouldLog(Log.INFO)) if (_log.shouldDebug())
_log.info("I2NP meta message sent completely"); _log.debug("I2NP meta message sent completely");
// need to increment as EventPumper will close conn if not completed // need to increment as EventPumper will close conn if not completed
_messagesWritten.incrementAndGet(); _messagesWritten.incrementAndGet();
} }
@ -1427,6 +1455,7 @@ public class NTCPConnection implements Closeable {
private interface ReadState { private interface ReadState {
public void receive(ByteBuffer buf); public void receive(ByteBuffer buf);
public void destroy(); public void destroy();
public int getFramesReceived();
} }
/** /**
@ -1733,6 +1762,13 @@ public class NTCPConnection implements Closeable {
// get it ready for the next I2NP message // get it ready for the next I2NP message
init(); init();
} }
/*
* Dummy.
* @return 0 always
* @since 0.9.36
*/
public int getFramesReceived() { return 0; }
} }
//// NTCP2 below here //// NTCP2 below here
@ -1819,8 +1855,6 @@ public class NTCPConnection implements Closeable {
_nextInfoTime = Long.MAX_VALUE; _nextInfoTime = Long.MAX_VALUE;
_paddingConfig = OUR_PADDING; _paddingConfig = OUR_PADDING;
sendTermination(reason, 0); sendTermination(reason, 0);
try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
close();
} }
/** /**
@ -2031,6 +2065,8 @@ public class NTCPConnection implements Closeable {
_terminated = true; _terminated = true;
} }
public int getFramesReceived() { return _frameCount; }
//// PayloadCallbacks //// PayloadCallbacks
public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException { public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException {
@ -2139,8 +2175,6 @@ public class NTCPConnection implements Closeable {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("immediate close after AEAD failure and reading " + toRead); _log.warn("immediate close after AEAD failure and reading " + toRead);
sendTermination(REASON_AEAD, validFramesRcvd); sendTermination(REASON_AEAD, validFramesRcvd);
try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
close();
} }
} }
@ -2172,11 +2206,11 @@ public class NTCPConnection implements Closeable {
_read += buf.remaining(); _read += buf.remaining();
if (_read >= _toRead) { if (_read >= _toRead) {
cancel(); cancel();
// only do this once
_read = Integer.MIN_VALUE;
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("close after AEAD failure and reading " + _toRead); _log.warn("close after AEAD failure and reading " + _toRead);
sendTermination(REASON_AEAD, _validFramesRcvd); sendTermination(REASON_AEAD, _validFramesRcvd);
try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
close();
} }
} }
@ -2185,10 +2219,31 @@ public class NTCPConnection implements Closeable {
} }
public void timeReached() { public void timeReached() {
// only do this once
_read = Integer.MIN_VALUE;
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("timeout after AEAD failure waiting for more data"); _log.warn("timeout after AEAD failure waiting for more data");
sendTermination(REASON_AEAD, _validFramesRcvd); sendTermination(REASON_AEAD, _validFramesRcvd);
try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {} }
public int getFramesReceived() { return 0; }
}
/**
* Close after a brief timeout.
* Construct after sending termination.
*
* @since 0.9.36
*/
private class DelayedCloser extends SimpleTimer2.TimedEvent {
/** schedules itself */
public DelayedCloser() {
super(_context.simpleTimer2());
schedule(NTCP2_TERMINATION_CLOSE_DELAY);
}
public void timeReached() {
close(); close();
} }
} }