Initial implementation of read() timeout on I2PSocket. Let's see whether it
could solve duck's problems with dangling threads... (human)
This commit is contained in:
@ -32,6 +32,20 @@ public interface I2PSocket {
|
|||||||
*/
|
*/
|
||||||
public OutputStream getOutputStream() throws IOException;
|
public OutputStream getOutputStream() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How long we will wait blocked on a read() operation.
|
||||||
|
*
|
||||||
|
* @return milliseconds to wait, or -1 if we will wait indefinitely
|
||||||
|
*/
|
||||||
|
public long getReadTimeout();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Define how long we will wait blocked on a read() operation (-1 will make
|
||||||
|
* the socket wait forever).
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void setReadTimeout(long ms);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the socket if not closed yet
|
* Closes the socket if not closed yet
|
||||||
*/
|
*/
|
||||||
|
@ -65,14 +65,14 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
synchronized (remoteIDWaiter) {
|
synchronized (remoteIDWaiter) {
|
||||||
if (wait) {
|
if (wait) {
|
||||||
try {
|
try {
|
||||||
if (maxWait > 0)
|
if (maxWait >= 0)
|
||||||
remoteIDWaiter.wait(maxWait);
|
remoteIDWaiter.wait(maxWait);
|
||||||
else
|
else
|
||||||
remoteIDWaiter.wait();
|
remoteIDWaiter.wait();
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((maxWait > 0) && (System.currentTimeMillis() > dieAfter))
|
if ((maxWait >= 0) && (System.currentTimeMillis() >= dieAfter))
|
||||||
throw new InterruptedIOException("Timed out waiting for remote ID");
|
throw new InterruptedIOException("Timed out waiting for remote ID");
|
||||||
|
|
||||||
_log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for "
|
_log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for "
|
||||||
@ -146,11 +146,29 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add);
|
return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getReadTimeout() {
|
||||||
|
return in.getReadTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadTimeout(long ms) {
|
||||||
|
in.setReadTimeout(ms);
|
||||||
|
}
|
||||||
|
|
||||||
//--------------------------------------------------
|
//--------------------------------------------------
|
||||||
public class I2PInputStream extends InputStream {
|
public class I2PInputStream extends InputStream {
|
||||||
|
|
||||||
private ByteCollector bc = new ByteCollector();
|
private ByteCollector bc = new ByteCollector();
|
||||||
|
|
||||||
|
private long readTimeout = -1;
|
||||||
|
|
||||||
|
public long getReadTimeout() {
|
||||||
|
return readTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadTimeout(long ms) {
|
||||||
|
readTimeout = ms;
|
||||||
|
}
|
||||||
|
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
byte[] b = new byte[1];
|
byte[] b = new byte[1];
|
||||||
int res = read(b);
|
int res = read(b);
|
||||||
@ -162,7 +180,10 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
||||||
_log.debug("Read called: " + this.hashCode());
|
_log.debug("Read called: " + this.hashCode());
|
||||||
if (len == 0) return 0;
|
if (len == 0) return 0;
|
||||||
|
long dieAfter = System.currentTimeMillis() + readTimeout;
|
||||||
byte[] read = bc.startToByteArray(len);
|
byte[] read = bc.startToByteArray(len);
|
||||||
|
boolean timedOut = false;
|
||||||
|
|
||||||
while (read.length == 0) {
|
while (read.length == 0) {
|
||||||
synchronized (flagLock) {
|
synchronized (flagLock) {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
@ -171,9 +192,18 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
if (readTimeout >= 0) {
|
||||||
|
wait(readTimeout);
|
||||||
|
} else {
|
||||||
wait();
|
wait();
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
}
|
}
|
||||||
|
} catch (InterruptedException ex) {}
|
||||||
|
|
||||||
|
if ((readTimeout >= 0)
|
||||||
|
&& (System.currentTimeMillis() >= dieAfter)) {
|
||||||
|
throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)");
|
||||||
|
}
|
||||||
|
|
||||||
read = bc.startToByteArray(len);
|
read = bc.startToByteArray(len);
|
||||||
}
|
}
|
||||||
if (read.length > len) throw new RuntimeException("BUG");
|
if (read.length > len) throw new RuntimeException("BUG");
|
||||||
@ -304,6 +334,8 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
manager.removeSocket(I2PSocketImpl.this);
|
manager.removeSocket(I2PSocketImpl.this);
|
||||||
|
} catch (InterruptedIOException ex) {
|
||||||
|
_log.error("BUG! read() operations should not timeout!", ex);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
// WHOEVER removes this event on inconsistent
|
// WHOEVER removes this event on inconsistent
|
||||||
// state before fixing the inconsistent state (a
|
// state before fixing the inconsistent state (a
|
||||||
|
@ -21,6 +21,10 @@ public class I2PSocketOptions {
|
|||||||
return _connectTimeout;
|
return _connectTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Define how long we will wait for the ACK from a SYN, in milliseconds.
|
||||||
|
*
|
||||||
|
*/
|
||||||
public void setConnectTimeout(long ms) {
|
public void setConnectTimeout(long ms) {
|
||||||
_connectTimeout = ms;
|
_connectTimeout = ms;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user