HTTP client: Fix occasional truncation of compressed responses

log tweaks
This commit is contained in:
zzz
2015-05-29 17:37:58 +00:00
parent 4bf115b5f6
commit 3602f73497
3 changed files with 44 additions and 7 deletions

View File

@ -46,6 +46,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
protected boolean _gzip; protected boolean _gzip;
protected long _dataExpected; protected long _dataExpected;
protected String _contentType; protected String _contentType;
private PipedInputStream _pipedInputStream;
private static final int CACHE_SIZE = 8*1024; private static final int CACHE_SIZE = 8*1024;
private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE); private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
@ -155,6 +156,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
responseLine = new String(_headerBuffer.getData(), 0, i+1); // includes NL responseLine = new String(_headerBuffer.getData(), 0, i+1); // includes NL
responseLine = filterResponseLine(responseLine); responseLine = filterResponseLine(responseLine);
responseLine = (responseLine.trim() + "\r\n"); responseLine = (responseLine.trim() + "\r\n");
if (_log.shouldLog(Log.INFO))
_log.info("Response: " + responseLine.trim());
out.write(responseLine.getBytes()); out.write(responseLine.getBytes());
} else { } else {
for (int j = lastEnd+1; j < i; j++) { for (int j = lastEnd+1; j < i; j++) {
@ -245,7 +248,30 @@ class HTTPResponseOutputStream extends FilterOutputStream {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
out.close(); if (_log.shouldLog(Log.INFO))
_log.info("Closing " + out + " threaded?? " + shouldCompress(), new Exception("I did it"));
PipedInputStream pi;
synchronized(this) {
// synch with changing out field below
super.close();
pi = _pipedInputStream;
}
// Prevent truncation of gunzipped data as
// I2PTunnelHTTPClientRunner.close() closes the Socket after this.
// Closing pipe only notifies read end, doesn't wait.
// TODO switch to Java 6 InflaterOutputStream and get rid of Pusher thread
if (pi != null) {
for (int i = 0; i < 50; i++) {
if (pi.available() <= 0) {
if (i > 0 && _log.shouldWarn())
_log.warn("Waited " + (i*20) + " for read side to close");
break;
}
try {
Thread.sleep(20);
} catch (InterruptedException ie) {}
}
}
} }
protected void beginProcessing() throws IOException { protected void beginProcessing() throws IOException {
@ -253,7 +279,12 @@ class HTTPResponseOutputStream extends FilterOutputStream {
PipedInputStream pi = BigPipedInputStream.getInstance(); PipedInputStream pi = BigPipedInputStream.getInstance();
PipedOutputStream po = new PipedOutputStream(pi); PipedOutputStream po = new PipedOutputStream(pi);
Runnable r = new Pusher(pi, out); Runnable r = new Pusher(pi, out);
if (_log.shouldLog(Log.INFO))
_log.info("Starting threaded decompressing pusher to " + out);
synchronized(this) {
out = po; out = po;
_pipedInputStream = pi;
}
// TODO we should be able to do this inline somehow // TODO we should be able to do this inline somehow
TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); TunnelControllerGroup tcg = TunnelControllerGroup.getInstance();
if (tcg != null) { if (tcg != null) {
@ -285,6 +316,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} }
public void run() { public void run() {
if (_log.shouldLog(Log.INFO))
_log.info("Starting pusher from " + _inRaw + " to: " + _out);
ReusableGZIPInputStream _in = ReusableGZIPInputStream.acquire(); ReusableGZIPInputStream _in = ReusableGZIPInputStream.acquire();
long written = 0; long written = 0;
ByteArray ba = null; ByteArray ba = null;
@ -301,11 +334,11 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_out.flush(); _out.flush();
written += read; written += read;
} }
if (_log.shouldLog(Log.INFO))
_log.info("Decompressed: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded());
} catch (IOException ioe) { } catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded(), ioe); _log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() +
"/" + _in.getTotalExpanded() +
" from " + _inRaw + " to: " + _out, ioe);
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
_log.error("OOM in HTTP Decompressor", oom); _log.error("OOM in HTTP Decompressor", oom);
} finally { } finally {
@ -313,7 +346,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_log.info("After decompression, written=" + written + _log.info("After decompression, written=" + written +
" read=" + _in.getTotalRead() " read=" + _in.getTotalRead()
+ ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining() + ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
+ ", finished=" + _in.getFinished()); + ", finished=" + _in.getFinished() +
" from " + _inRaw + " to: " + _out);
if (ba != null) if (ba != null)
_cache.release(ba); _cache.release(ba);
if (_out != null) try { if (_out != null) try {

View File

@ -1,3 +1,6 @@
2015-05-29 zzz
* HTTP client: Fix occasional truncation of compressed responses
2015-05-27 zzz 2015-05-27 zzz
* Banlist: Ban all-zero hash * Banlist: Ban all-zero hash
* DataHelper: Add year output to formatDuration() * DataHelper: Add year output to formatDuration()

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 31; public final static long BUILD = 32;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = "-rc"; public final static String EXTRA = "-rc";