i2ptunnel: Close input stream when HTTP client decompressor terminates (ticket #1506)

streaming: Minor cleanups, log tweaks
This commit is contained in:
zzz
2015-05-04 14:43:54 +00:00
parent 2226936737
commit f7b7a98b9d
4 changed files with 31 additions and 23 deletions

View File

@ -309,7 +309,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
_log.error("OOM in HTTP Decompressor", oom); _log.error("OOM in HTTP Decompressor", oom);
} finally { } finally {
if (_log.shouldLog(Log.INFO) && (_in != null)) if (_log.shouldInfo())
_log.info("After decompression, written=" + written + _log.info("After decompression, written=" + written +
" read=" + _in.getTotalRead() " read=" + _in.getTotalRead()
+ ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining() + ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
@ -319,19 +319,20 @@ class HTTPResponseOutputStream extends FilterOutputStream {
if (_out != null) try { if (_out != null) try {
_out.close(); _out.close();
} catch (IOException ioe) {} } catch (IOException ioe) {}
try {
_in.close();
} catch (IOException ioe) {}
} }
if (_in != null) { double compressed = _in.getTotalRead();
double compressed = _in.getTotalRead(); double expanded = _in.getTotalExpanded();
double expanded = _in.getTotalExpanded(); ReusableGZIPInputStream.release(_in);
ReusableGZIPInputStream.release(_in); if (compressed > 0 && expanded > 0) {
if (compressed > 0 && expanded > 0) { // only update the stats if we did something
// only update the stats if we did something double ratio = compressed/expanded;
double ratio = compressed/expanded; _context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio));
_context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio)); _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed);
_context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed); _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded);
_context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded);
}
} }
} }
} }

View File

@ -1517,7 +1517,7 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
if (tasks.isEmpty()) { if (tasks.isEmpty()) {
System.exit(0); System.exit(0);
} }
l.log("There are running tasks. Try 'list'."); l.log("There are running tasks. Try 'list' or 'close all'.");
//notifyEvent("quitResult", "error"); //notifyEvent("quitResult", "error");
} }

View File

@ -52,7 +52,7 @@ class I2PSocketFull implements I2PSocket {
Connection c = _connection; Connection c = _connection;
if (c == null) return; if (c == null) return;
if (log.shouldLog(Log.INFO)) if (log.shouldLog(Log.INFO))
log.info("close() called, connected? " + c.getIsConnected() + " : " + c); log.info("close() called, connected? " + c.getIsConnected() + " : " + c, new Exception());
if (c.getIsConnected()) { if (c.getIsConnected()) {
MessageInputStream in = c.getInputStream(); MessageInputStream in = c.getInputStream();
in.close(); in.close();

View File

@ -62,6 +62,9 @@ class MessageInputStream extends InputStream {
private final byte[] _oneByte = new byte[1]; private final byte[] _oneByte = new byte[1];
private final Object _dataLock; private final Object _dataLock;
/** only in _notYetReadyBlocks, never in _readyDataBlocks */
private static final ByteArray DUMMY_BA = new ByteArray(null);
private static final int MIN_READY_BUFFERS = 16; private static final int MIN_READY_BUFFERS = 16;
@ -320,9 +323,9 @@ class MessageInputStream extends InputStream {
_highestReadyBlockId = messageId; _highestReadyBlockId = messageId;
long cur = _highestReadyBlockId + 1; long cur = _highestReadyBlockId + 1;
// now pull in any previously pending blocks // now pull in any previously pending blocks
while (_notYetReadyBlocks.containsKey(Long.valueOf(cur))) { ByteArray ba;
ByteArray ba = _notYetReadyBlocks.remove(Long.valueOf(cur)); while ((ba = _notYetReadyBlocks.remove(Long.valueOf(cur))) != null) {
if ( (ba != null) && (ba.getData() != null) && (ba.getValid() > 0) ) { if (ba.getData() != null && ba.getValid() > 0) {
_readyDataBlocks.add(ba); _readyDataBlocks.add(ba);
} }
@ -336,13 +339,17 @@ class MessageInputStream extends InputStream {
// Java throws a SocketTimeoutException. // Java throws a SocketTimeoutException.
// We do neither. // We do neither.
} else { } else {
if (_log.shouldLog(Log.INFO)) // _notYetReadyBlocks size is limited in canAccept()
_log.info("Message is out of order: " + messageId); if (_locallyClosed) {
// _notYetReadyBlocks size is limited in ConnectionPacketHandler. if (_log.shouldInfo())
if (_locallyClosed) // dont need the payload, just the msgId in order _log.info("Message received on closed stream: " + messageId);
_notYetReadyBlocks.put(Long.valueOf(messageId), new ByteArray(null)); // dont need the payload, just the msgId in order
else _notYetReadyBlocks.put(Long.valueOf(messageId), DUMMY_BA);
} else {
if (_log.shouldInfo())
_log.info("Message is out of order: " + messageId);
_notYetReadyBlocks.put(Long.valueOf(messageId), payload); _notYetReadyBlocks.put(Long.valueOf(messageId), payload);
}
} }
_dataLock.notifyAll(); _dataLock.notifyAll();
} }