i2ptunnel: Propagate resets from streaming to Socket and vice versa (ticket #2071)

This commit is contained in:
zzz
2017-12-02 23:12:19 +00:00
parent 3d0e15aaed
commit 100d307037
6 changed files with 164 additions and 54 deletions

View File

@ -24,6 +24,7 @@ import java.util.zip.GZIPOutputStream;
import javax.net.ssl.SSLException;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.I2PAppContext;
import net.i2p.data.Base32;
import net.i2p.data.ByteArray;
@ -597,6 +598,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
OutputStream browserout = null;
InputStream browserin = null;
InputStream serverin = null;
Sender s = null;
IOException ioex = null;
try {
serverout = _webserver.getOutputStream();
@ -642,7 +645,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
String modifiedHeaders = formatHeaders(headers, command);
compressedOut.write(DataHelper.getUTF8(modifiedHeaders));
Sender s = new Sender(compressedOut, serverin, "server: server to browser", _log);
s = new Sender(compressedOut, serverin, "server: server to browser", _log);
if (_log.shouldLog(Log.INFO))
_log.info("Before pumping the compressed response");
s.run(); // same thread
@ -658,7 +661,46 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("error compressing", ioe);
ioex = ioe;
} finally {
if (ioex == null && s != null)
ioex = s.getFailure();
if (ioex != null) {
// Reset propagation, simplified from I2PTunnelRunner
boolean i2pReset = false;
if (ioex instanceof I2PSocketException) {
I2PSocketException ise = (I2PSocketException) ioex;
int status = ise.getStatus();
i2pReset = status == I2PSocketException.STATUS_CONNECTION_RESET;
if (i2pReset) {
if (_log.shouldWarn())
_log.warn("Server got I2P reset, resetting socket", ioex);
try {
_webserver.setSoLinger(true, 0);
} catch (IOException ioe) {}
try {
_webserver.close();
} catch (IOException ioe) {}
try {
_browser.close();
} catch (IOException ioe) {}
}
}
if (!i2pReset && ioex instanceof SocketException) {
String msg = ioex.getMessage();
boolean sockReset = msg != null && msg.contains("reset");
if (sockReset) {
if (_log.shouldWarn())
_log.warn("Server got socket reset, resetting I2P socket");
try {
_browser.reset();
} catch (IOException ioe) {}
try {
_webserver.close();
} catch (IOException ioe) {}
}
}
}
if (browserout != null) try { browserout.close(); } catch (IOException ioe) {}
if (serverout != null) try { serverout.close(); } catch (IOException ioe) {}
if (browserin != null) try { browserin.close(); } catch (IOException ioe) {}
@ -673,6 +715,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
private final String _name;
// shadows _log in super()
private final Log _log;
private IOException _failure;
public Sender(OutputStream out, InputStream in, String name, Log log) {
_out = out;
@ -691,12 +734,22 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
//_out.flush();
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error sending", ioe);
_log.debug(_name + " Error sending", ioe);
synchronized(this) {
_failure = ioe;
}
} finally {
if (_out != null) try { _out.close(); } catch (IOException ioe) {}
if (_in != null) try { _in.close(); } catch (IOException ioe) {}
}
}
/**
* @since 0.9.33
*/
public synchronized IOException getFailure() {
return _failure;
}
}
/**

View File

@ -17,6 +17,7 @@ import javax.net.ssl.SSLException;
import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.util.ByteCache;
@ -287,8 +288,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
if (_log.shouldLog(Log.DEBUG))
_log.debug("At least one forwarder completed, closing and joining");
boolean i2pReset = false;
boolean sockReset = false;
// this task is useful for the httpclient
if (onTimeout != null || _onFail != null) {
if ((onTimeout != null || _onFail != null) && totalReceived <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("runner has a timeout job, totalReceived = " + totalReceived
+ " totalSent = " + totalSent + " job = " + onTimeout);
@ -296,20 +299,65 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// This will be run even if initialSocketData != null, it's the timeout job's
// responsibility to know that and decide whether or not to write to the socket.
// HTTPClient never sets initialSocketData.
if (totalReceived <= 0) {
if (_onFail != null) {
Exception e = fromI2P.getFailure();
if (e == null)
e = toI2P.getFailure();
_onFail.onFail(e);
} else {
onTimeout.run();
}
if (_onFail != null) {
Exception e = fromI2P.getFailure();
if (e == null)
e = toI2P.getFailure();
_onFail.onFail(e);
} else {
onTimeout.run();
}
} else {
// Detect a reset on one side, and propagate to the other
Exception e1 = fromI2P.getFailure();
Exception e2 = toI2P.getFailure();
Throwable c1 = e1 != null ? e1.getCause() : null;
Throwable c2 = e2 != null ? e2.getCause() : null;
if (c1 != null && c1 instanceof I2PSocketException) {
I2PSocketException ise = (I2PSocketException) c1;
int status = ise.getStatus();
i2pReset = status == I2PSocketException.STATUS_CONNECTION_RESET;
}
if (!i2pReset && c2 != null && c2 instanceof I2PSocketException) {
I2PSocketException ise = (I2PSocketException) c2;
int status = ise.getStatus();
i2pReset = status == I2PSocketException.STATUS_CONNECTION_RESET;
}
if (!i2pReset && e1 != null && e1 instanceof SocketException) {
String msg = e1.getMessage();
sockReset = msg != null && msg.contains("reset");
}
if (!sockReset && e2 != null && e2 instanceof SocketException) {
String msg = e2.getMessage();
sockReset = msg != null && msg.contains("reset");
}
}
// now one connection is dead - kill the other as well, after making sure we flush
close(out, in, i2pout, i2pin, s, i2ps, toI2P, fromI2P);
if (i2pReset) {
if (_log.shouldWarn())
_log.warn("Got I2P reset, resetting socket");
try {
s.setSoLinger(true, 0);
} catch (IOException ioe) {}
try {
s.close();
} catch (IOException ioe) {}
try {
i2ps.close();
} catch (IOException ioe) {}
} else if (sockReset) {
if (_log.shouldWarn())
_log.warn("Got socket reset, resetting I2P socket");
try {
i2ps.reset();
} catch (IOException ioe) {}
try {
s.close();
} catch (IOException ioe) {}
} else {
// now one connection is dead - kill the other as well, after making sure we flush
close(out, in, i2pout, i2pin, s, i2ps, toI2P, fromI2P);
}
} catch (InterruptedException ex) {
if (_log.shouldLog(Log.ERROR))
_log.error("Interrupted", ex);
@ -361,36 +409,24 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
try {
out.flush();
} catch (IOException ioe) {
// ignore
}
} catch (IOException ioe) {}
try {
i2pout.flush();
} catch (IOException ioe) {
// ignore
}
} catch (IOException ioe) {}
try {
in.close();
} catch (IOException ioe) {
// ignore
}
} catch (IOException ioe) {}
try {
i2pin.close();
} catch (IOException ioe) {
// ignore
}
} catch (IOException ioe) {}
// ok, yeah, there's a race here in theory, if data comes in after flushing and before
// closing, but its better than before...
try {
s.close();
} catch (IOException ioe) {
// ignore
}
} catch (IOException ioe) {}
try {
i2ps.close();
} catch (IOException ioe) {
// ignore
}
} catch (IOException ioe) {}
t1.join(30*1000);
// t2 = fromI2P now run inline
//t2.join(30*1000);
@ -469,9 +505,9 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
if (in.available() == 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Flushing after sending " + len + " bytes through");
if (_log.shouldLog(Log.DEBUG))
_log.debug(direction + ": " + len + " bytes flushed through " + (_toI2P ? "to " : "from ")
+ to);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(direction + ": " + len + " bytes flushed through " + (_toI2P ? "to " : "from ")
// + to);
if (_toI2P) {
try {
Thread.sleep(5);
@ -486,29 +522,35 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
}
}
}
//if (_log.shouldDebug())
// _log.debug(direction + ": Normal EOF on read");
//out.flush(); // close() flushes
} catch (SocketException ex) {
// this *will* occur when the other threads closes the socket
synchronized (finishLock) {
if (!finished) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(direction + ": Socket closed - error reading and writing",
ex);
if (_log.shouldDebug()) {
boolean fnshd;
synchronized (finishLock) {
fnshd = finished;
}
if (!fnshd) {
_log.debug(direction + ": IOE - error forwarding", ex);
} else {
_log.debug(direction + ": IOE caused by other direction", ex);
}
}
_failure = ex;
} catch (InterruptedIOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Closing connection due to timeout (error: \""
+ ex.getMessage() + "\")");
_failure = ex;
} catch (IOException ex) {
if (!finished) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error forwarding", ex);
if (_log.shouldWarn()) {
boolean fnshd;
synchronized (finishLock) {
fnshd = finished;
}
if (!fnshd) {
_log.warn(direction + ": IOE - error forwarding", ex);
} else if (_log.shouldDebug()) {
_log.debug(direction + ": IOE caused by other direction", ex);
}
}
//else
// _log.warn("You may ignore this", ex);
_failure = ex;
} finally {
_cache.release(ba);

View File

@ -200,8 +200,11 @@ class Connection {
// no need to wait until the other side has ACKed us before sending the first few wsize
// packets through
// Incorrect assumption, the constructor defaults _connected to true --Sponge
if (!_connected.get())
if (!_connected.get()) {
if (getResetReceived())
throw new I2PSocketException(I2PSocketException.STATUS_CONNECTION_RESET);
throw new IOException("disconnected");
}
if (_outputStream.getClosed())
throw new IOException("output stream closed");
started = true;

View File

@ -11,6 +11,7 @@ import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
import net.i2p.data.SessionTag;
import net.i2p.data.SigningPrivateKey;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
@ -321,8 +322,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break;
synchronized (this) {
if (_ackOn > 0) break;
if (!_connection.getIsConnected())
if (!_connection.getIsConnected()) {
if (_connection.getResetReceived())
throw new I2PSocketException(I2PSocketException.STATUS_CONNECTION_RESET);
throw new IOException("disconnected");
}
if (_cancelledOn > 0)
throw new IOException("cancelled");
if (timeRemaining > 60*1000)

View File

@ -1,7 +1,15 @@
2017-12-02 zzz
* i2ptunnel: Propagate resets from streaming to Socket
and vice versa (ticket #2071)
* Streaming: Send reset when receiving more data after locally closed,
rather than acking (ticket #2071)
* Tests: Fix up deprecation warnings
2017-12-01 zzz
* Build: Split net.i2p.router.web into two packages
* Console: Move /configkeyring HTML to console, fix deletion,
don't truncate hashes, better form errors, tag for translation (ticket #2108)
* LeaseSet: Better error messages when decode fails (ticket #2108)
* Streaming: Double the RTO on congestion (ticket #1939)
2017-11-27 zzz

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 10;
public final static long BUILD = 11;
/** for example "-test" */
public final static String EXTRA = "";