From 100d30703794e1030f4b8481ee30aade7827b1d6 Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 2 Dec 2017 23:12:19 +0000 Subject: [PATCH] i2ptunnel: Propagate resets from streaming to Socket and vice versa (ticket #2071) --- .../i2p/i2ptunnel/I2PTunnelHTTPServer.java | 57 ++++++- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 140 ++++++++++++------ .../i2p/client/streaming/impl/Connection.java | 5 +- .../client/streaming/impl/PacketLocal.java | 6 +- history.txt | 8 + .../src/net/i2p/router/RouterVersion.java | 2 +- 6 files changed, 164 insertions(+), 54 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index 9dbac300cb..81fa555fcc 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -24,6 +24,7 @@ import java.util.zip.GZIPOutputStream; import javax.net.ssl.SSLException; import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketException; import net.i2p.I2PAppContext; import net.i2p.data.Base32; import net.i2p.data.ByteArray; @@ -597,6 +598,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { OutputStream browserout = null; InputStream browserin = null; InputStream serverin = null; + Sender s = null; + IOException ioex = null; try { serverout = _webserver.getOutputStream(); @@ -642,7 +645,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { String modifiedHeaders = formatHeaders(headers, command); compressedOut.write(DataHelper.getUTF8(modifiedHeaders)); - Sender s = new Sender(compressedOut, serverin, "server: server to browser", _log); + s = new Sender(compressedOut, serverin, "server: server to browser", _log); if (_log.shouldLog(Log.INFO)) _log.info("Before pumping the compressed response"); s.run(); // same thread @@ -658,7 +661,46 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error compressing", ioe); + ioex = ioe; } finally { + if (ioex == null && s != null) + ioex = s.getFailure(); + if (ioex != null) { + // Reset propagation, simplified from I2PTunnelRunner + boolean i2pReset = false; + if (ioex instanceof I2PSocketException) { + I2PSocketException ise = (I2PSocketException) ioex; + int status = ise.getStatus(); + i2pReset = status == I2PSocketException.STATUS_CONNECTION_RESET; + if (i2pReset) { + if (_log.shouldWarn()) + _log.warn("Server got I2P reset, resetting socket", ioex); + try { + _webserver.setSoLinger(true, 0); + } catch (IOException ioe) {} + try { + _webserver.close(); + } catch (IOException ioe) {} + try { + _browser.close(); + } catch (IOException ioe) {} + } + } + if (!i2pReset && ioex instanceof SocketException) { + String msg = ioex.getMessage(); + boolean sockReset = msg != null && msg.contains("reset"); + if (sockReset) { + if (_log.shouldWarn()) + _log.warn("Server got socket reset, resetting I2P socket"); + try { + _browser.reset(); + } catch (IOException ioe) {} + try { + _webserver.close(); + } catch (IOException ioe) {} + } + } + } if (browserout != null) try { browserout.close(); } catch (IOException ioe) {} if (serverout != null) try { serverout.close(); } catch (IOException ioe) {} if (browserin != null) try { browserin.close(); } catch (IOException ioe) {} @@ -673,6 +715,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { private final String _name; // shadows _log in super() private final Log _log; + private IOException _failure; public Sender(OutputStream out, InputStream in, String name, Log log) { _out = out; @@ -691,12 +734,22 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { //_out.flush(); } catch (IOException ioe) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Error sending", ioe); + _log.debug(_name + " Error sending", ioe); + synchronized(this) { + _failure = ioe; + } } finally { if (_out != null) try { _out.close(); } catch (IOException ioe) {} if (_in != null) try { _in.close(); } catch (IOException ioe) {} } } + + /** + * @since 0.9.33 + */ + public synchronized IOException getFailure() { + return _failure; + } } /** diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index a7f60819aa..c3059edc7a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -17,6 +17,7 @@ import javax.net.ssl.SSLException; import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketException; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.util.ByteCache; @@ -287,8 +288,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr if (_log.shouldLog(Log.DEBUG)) _log.debug("At least one forwarder completed, closing and joining"); + boolean i2pReset = false; + boolean sockReset = false; // this task is useful for the httpclient - if (onTimeout != null || _onFail != null) { + if ((onTimeout != null || _onFail != null) && totalReceived <= 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("runner has a timeout job, totalReceived = " + totalReceived + " totalSent = " + totalSent + " job = " + onTimeout); @@ -296,20 +299,65 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr // This will be run even if initialSocketData != null, it's the timeout job's // responsibility to know that and decide whether or not to write to the socket. // HTTPClient never sets initialSocketData. - if (totalReceived <= 0) { - if (_onFail != null) { - Exception e = fromI2P.getFailure(); - if (e == null) - e = toI2P.getFailure(); - _onFail.onFail(e); - } else { - onTimeout.run(); - } + if (_onFail != null) { + Exception e = fromI2P.getFailure(); + if (e == null) + e = toI2P.getFailure(); + _onFail.onFail(e); + } else { + onTimeout.run(); + } + } else { + // Detect a reset on one side, and propagate to the other + Exception e1 = fromI2P.getFailure(); + Exception e2 = toI2P.getFailure(); + Throwable c1 = e1 != null ? e1.getCause() : null; + Throwable c2 = e2 != null ? e2.getCause() : null; + if (c1 != null && c1 instanceof I2PSocketException) { + I2PSocketException ise = (I2PSocketException) c1; + int status = ise.getStatus(); + i2pReset = status == I2PSocketException.STATUS_CONNECTION_RESET; + } + if (!i2pReset && c2 != null && c2 instanceof I2PSocketException) { + I2PSocketException ise = (I2PSocketException) c2; + int status = ise.getStatus(); + i2pReset = status == I2PSocketException.STATUS_CONNECTION_RESET; + } + if (!i2pReset && e1 != null && e1 instanceof SocketException) { + String msg = e1.getMessage(); + sockReset = msg != null && msg.contains("reset"); + } + if (!sockReset && e2 != null && e2 instanceof SocketException) { + String msg = e2.getMessage(); + sockReset = msg != null && msg.contains("reset"); } } - - // now one connection is dead - kill the other as well, after making sure we flush - close(out, in, i2pout, i2pin, s, i2ps, toI2P, fromI2P); + + if (i2pReset) { + if (_log.shouldWarn()) + _log.warn("Got I2P reset, resetting socket"); + try { + s.setSoLinger(true, 0); + } catch (IOException ioe) {} + try { + s.close(); + } catch (IOException ioe) {} + try { + i2ps.close(); + } catch (IOException ioe) {} + } else if (sockReset) { + if (_log.shouldWarn()) + _log.warn("Got socket reset, resetting I2P socket"); + try { + i2ps.reset(); + } catch (IOException ioe) {} + try { + s.close(); + } catch (IOException ioe) {} + } else { + // now one connection is dead - kill the other as well, after making sure we flush + close(out, in, i2pout, i2pin, s, i2ps, toI2P, fromI2P); + } } catch (InterruptedException ex) { if (_log.shouldLog(Log.ERROR)) _log.error("Interrupted", ex); @@ -361,36 +409,24 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException { try { out.flush(); - } catch (IOException ioe) { - // ignore - } + } catch (IOException ioe) {} try { i2pout.flush(); - } catch (IOException ioe) { - // ignore - } + } catch (IOException ioe) {} try { in.close(); - } catch (IOException ioe) { - // ignore - } + } catch (IOException ioe) {} try { i2pin.close(); - } catch (IOException ioe) { - // ignore - } + } catch (IOException ioe) {} // ok, yeah, there's a race here in theory, if data comes in after flushing and before // closing, but its better than before... try { s.close(); - } catch (IOException ioe) { - // ignore - } + } catch (IOException ioe) {} try { i2ps.close(); - } catch (IOException ioe) { - // ignore - } + } catch (IOException ioe) {} t1.join(30*1000); // t2 = fromI2P now run inline //t2.join(30*1000); @@ -469,9 +505,9 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr if (in.available() == 0) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Flushing after sending " + len + " bytes through"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(direction + ": " + len + " bytes flushed through " + (_toI2P ? "to " : "from ") - + to); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(direction + ": " + len + " bytes flushed through " + (_toI2P ? "to " : "from ") + // + to); if (_toI2P) { try { Thread.sleep(5); @@ -486,29 +522,35 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr } } } + //if (_log.shouldDebug()) + // _log.debug(direction + ": Normal EOF on read"); //out.flush(); // close() flushes } catch (SocketException ex) { // this *will* occur when the other threads closes the socket - synchronized (finishLock) { - if (!finished) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(direction + ": Socket closed - error reading and writing", - ex); + if (_log.shouldDebug()) { + boolean fnshd; + synchronized (finishLock) { + fnshd = finished; + } + if (!fnshd) { + _log.debug(direction + ": IOE - error forwarding", ex); + } else { + _log.debug(direction + ": IOE caused by other direction", ex); } } _failure = ex; - } catch (InterruptedIOException ex) { - if (_log.shouldLog(Log.WARN)) - _log.warn(direction + ": Closing connection due to timeout (error: \"" - + ex.getMessage() + "\")"); - _failure = ex; } catch (IOException ex) { - if (!finished) { - if (_log.shouldLog(Log.WARN)) - _log.warn(direction + ": Error forwarding", ex); + if (_log.shouldWarn()) { + boolean fnshd; + synchronized (finishLock) { + fnshd = finished; + } + if (!fnshd) { + _log.warn(direction + ": IOE - error forwarding", ex); + } else if (_log.shouldDebug()) { + _log.debug(direction + ": IOE caused by other direction", ex); + } } - //else - // _log.warn("You may ignore this", ex); _failure = ex; } finally { _cache.release(ba); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 474e89bc30..f9bac0dc06 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -200,8 +200,11 @@ class Connection { // no need to wait until the other side has ACKed us before sending the first few wsize // packets through // Incorrect assumption, the constructor defaults _connected to true --Sponge - if (!_connected.get()) + if (!_connected.get()) { + if (getResetReceived()) + throw new I2PSocketException(I2PSocketException.STATUS_CONNECTION_RESET); throw new IOException("disconnected"); + } if (_outputStream.getClosed()) throw new IOException("output stream closed"); started = true; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java index 80b4d29204..b43018d6ca 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java @@ -11,6 +11,7 @@ import net.i2p.data.Destination; import net.i2p.data.SessionKey; import net.i2p.data.SessionTag; import net.i2p.data.SigningPrivateKey; +import net.i2p.client.streaming.I2PSocketException; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; @@ -321,8 +322,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break; synchronized (this) { if (_ackOn > 0) break; - if (!_connection.getIsConnected()) + if (!_connection.getIsConnected()) { + if (_connection.getResetReceived()) + throw new I2PSocketException(I2PSocketException.STATUS_CONNECTION_RESET); throw new IOException("disconnected"); + } if (_cancelledOn > 0) throw new IOException("cancelled"); if (timeRemaining > 60*1000) diff --git a/history.txt b/history.txt index f2bda4a7e3..a04b158dc0 100644 --- a/history.txt +++ b/history.txt @@ -1,7 +1,15 @@ +2017-12-02 zzz + * i2ptunnel: Propagate resets from streaming to Socket + and vice versa (ticket #2071) + * Streaming: Send reset when receiving more data after locally closed, + rather than acking (ticket #2071) + * Tests: Fix up deprecation warnings + 2017-12-01 zzz * Build: Split net.i2p.router.web into two packages * Console: Move /configkeyring HTML to console, fix deletion, don't truncate hashes, better form errors, tag for translation (ticket #2108) + * LeaseSet: Better error messages when decode fails (ticket #2108) * Streaming: Double the RTO on congestion (ticket #1939) 2017-11-27 zzz diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 7ca1a3ec2c..5a58b19bc2 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 10; + public final static long BUILD = 11; /** for example "-test" */ public final static String EXTRA = "";