Streaming: Throw exception on read timeout (ticket #2292)

Log tweaks
This commit is contained in:
zzz
2018-07-28 21:44:56 +00:00
parent f12dbba3d6
commit b5ed39f10d
5 changed files with 39 additions and 39 deletions

View File

@ -37,7 +37,8 @@ public interface I2PSocketOptions {
/**
* What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws
* InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* SocketTimeoutException as of 0.9.36.
* Prior to that, the read() returned -1 or 0.
*
* WARNING: Default -1 (unlimited), which is probably not what you want.
*
@ -48,7 +49,8 @@ public interface I2PSocketOptions {
/**
* What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws
* InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* SocketTimeoutException as of 0.9.36.
* Prior to that, the read() returned -1 or 0.
*
* WARNING: Default -1 (unlimited), which is probably not what you want.
*

View File

@ -144,7 +144,8 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/**
* What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws
* InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* SocketTimeoutException as of 0.9.36.
* Prior to that, the read() returned -1 or 0.
*
* WARNING: Default -1 (unlimited), which is probably not what you want.
*
@ -157,7 +158,8 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/**
* What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws
* InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* SocketTimeoutException as of 0.9.36.
* Prior to that, the read() returned -1 or 0.
*
* WARNING: Default -1 (unlimited), which is probably not what you want.
*

View File

@ -3,6 +3,7 @@ package net.i2p.client.streaming.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -352,10 +353,6 @@ class MessageInputStream extends InputStream {
cur++;
_highestReadyBlockId++;
}
// FIXME Javadocs for setReadTimeout() say we will throw
// an InterruptedIOException.
// Java throws a SocketTimeoutException.
// We do neither.
} else {
// _notYetReadyBlocks size is limited in canAccept()
if (_locallyClosed) {
@ -375,9 +372,8 @@ class MessageInputStream extends InputStream {
}
/**
* On a read timeout, this returns -1
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
* On a read timeout, this throws a SocketTimeoutException
* as of 0.9.36. Prior to that, returned -1.
*/
public int read() throws IOException {
int read = read(_oneByte, 0, 1);
@ -387,9 +383,8 @@ class MessageInputStream extends InputStream {
}
/**
* On a read timeout, this returns 0
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
* On a read timeout, this throws a SocketTimeoutException
* as of 0.9.36. Prior to that, returned 0.
*/
@Override
public int read(byte target[]) throws IOException {
@ -397,9 +392,8 @@ class MessageInputStream extends InputStream {
}
/**
* On a read timeout, this returns 0
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
* On a read timeout, this throws a SocketTimeoutException
* as of 0.9.36. Prior to that, returned 0.
*/
@Override
public int read(byte target[], int offset, int length) throws IOException {
@ -409,6 +403,8 @@ class MessageInputStream extends InputStream {
expiration = readTimeout + System.currentTimeMillis();
else
expiration = -1;
// for speed
final boolean shouldDebug = _log.shouldDebug();
synchronized (_dataLock) {
if (_locallyClosed) throw new IOException("Input stream closed");
throwAnyError();
@ -424,13 +420,13 @@ class MessageInputStream extends InputStream {
if ( (_notYetReadyBlocks.isEmpty()) && (_closeReceived) ) {
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset + ", " + length + ")[" + i
+ "] got EOF after " + _readTotal + " " + toString());
+ "] got EOF after " + _readTotal + ": " + hashCode());
return -1;
} else {
if (readTimeout < 0) {
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] with no timeout: " + toString());
+ "] wait w/o timeout: " + hashCode());
try {
_dataLock.wait();
} catch (InterruptedException ie) {
@ -438,14 +434,14 @@ class MessageInputStream extends InputStream {
ioe2.initCause(ie);
throw ioe2;
}
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] with no timeout complete: " + toString());
+ "] wait w/o timeout complete: " + hashCode());
throwAnyError();
} else if (readTimeout > 0) {
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] with timeout: " + readTimeout + ": " + toString());
+ "] wait: " + readTimeout + ": " + hashCode());
try {
_dataLock.wait(readTimeout);
} catch (InterruptedException ie) {
@ -453,29 +449,25 @@ class MessageInputStream extends InputStream {
ioe2.initCause(ie);
throw ioe2;
}
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] with timeout complete: " + readTimeout + ": " + toString());
+ "] wait complete: " + readTimeout + ": " + hashCode());
throwAnyError();
} else { // readTimeout == 0
// noop, don't block
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] with nonblocking setup: " + toString());
return i;
+ "] nonblocking return: " + hashCode());
return 0;
}
if (_readyDataBlocks.isEmpty()) {
if (readTimeout > 0) {
long remaining = expiration - System.currentTimeMillis();
if (remaining <= 0) {
// FIXME Javadocs for setReadTimeout() say we will throw
// an InterruptedIOException.
// Java throws a SocketTimeoutException.
// We do neither.
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
+ "] expired: " + toString());
return i;
+ "] timed out: " + hashCode());
throw new SocketTimeoutException();
} else {
readTimeout = (int) remaining;
}
@ -486,7 +478,7 @@ class MessageInputStream extends InputStream {
// we looped a few times then got data, so this pass doesnt count
i--;
} else if (_readyDataBlocks.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] no more ready blocks, returning");
return i;
@ -502,7 +494,7 @@ class MessageInputStream extends InputStream {
_readTotal++;
target[offset + i] = rv; // rv < 0 ? rv + 256 : rv
if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getValid() - 5) ) {
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex
+ " readyBlocks=" + _readyDataBlocks.size()
@ -514,7 +506,7 @@ class MessageInputStream extends InputStream {
} // for (int i = 0; i < length; i++) {
} // synchronized (_dataLock)
if (_log.shouldLog(Log.DEBUG))
if (shouldDebug)
_log.debug("read(byte[]," + offset + ',' + length + ") read fully; total read: " +_readTotal);
return length;

View File

@ -1,3 +1,7 @@
2018-07-28 zzz
* Console: Catch ISE in get/setAttribute() (ticket #1529)
* Streaming: Throw exception on read timeout (ticket #2292)
2018-07-27 zzz
* Console: Split netdb output into pages
* Router: Implement router.rejectStartupTime config (ticket #2285)

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 18;
public final static long BUILD = 19;
/** for example "-test" */
public final static String EXTRA = "";