have a write() timeout after 60s instead of blocking forever by default (also used when injecting data into an input stream)
This commit is contained in:
@ -9,6 +9,7 @@ import net.i2p.I2PAppContext;
|
|||||||
import net.i2p.I2PException;
|
import net.i2p.I2PException;
|
||||||
import net.i2p.client.I2PSessionException;
|
import net.i2p.client.I2PSessionException;
|
||||||
import net.i2p.data.Destination;
|
import net.i2p.data.Destination;
|
||||||
|
import net.i2p.util.Clock;
|
||||||
import net.i2p.util.I2PThread;
|
import net.i2p.util.I2PThread;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
@ -414,20 +415,27 @@ class I2PSocketImpl implements I2PSocket {
|
|||||||
public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException {
|
public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
|
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
|
||||||
|
Clock clock = I2PAppContext.getGlobalContext().clock();
|
||||||
|
long endAfter = clock.now() + _options.getWriteTimeout();
|
||||||
synchronized (bc) {
|
synchronized (bc) {
|
||||||
if (_options.getMaxBufferSize() > 0) {
|
if (_options.getMaxBufferSize() > 0) {
|
||||||
int waited = 0;
|
while (bc.getCurrentSize() > _options.getMaxBufferSize()) {
|
||||||
while (bc.getCurrentSize() + len > _options.getMaxBufferSize()) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
|
_log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
|
||||||
if ( (_options.getWriteTimeout() > 0) && (waited > _options.getWriteTimeout()) ) {
|
if (_options.getWriteTimeout() > 0) {
|
||||||
throw new InterruptedIOException("Waited " + waited + "ms to write " + len + " with a buffer at " + bc.getCurrentSize());
|
long timeLeft = endAfter - clock.now();
|
||||||
|
if (timeLeft <= 0) {
|
||||||
|
long waited = _options.getWriteTimeout() - timeLeft;
|
||||||
|
throw new InterruptedIOException("Waited too long (" + waited + "ms) to write "
|
||||||
|
+ len + " with a buffer at " + bc.getCurrentSize());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (inStreamClosed)
|
if (inStreamClosed)
|
||||||
throw new IOException("Stream closed while writing");
|
throw new IOException("Stream closed while writing");
|
||||||
|
if (_closedOn > 0)
|
||||||
|
throw new IOException("I2PSocket closed while writing");
|
||||||
try {
|
try {
|
||||||
bc.wait(1000);
|
bc.wait(1000);
|
||||||
waited += 1000;
|
|
||||||
} catch (InterruptedException ie) {}
|
} catch (InterruptedException ie) {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,12 +10,13 @@ public class I2PSocketOptions {
|
|||||||
private long _writeTimeout;
|
private long _writeTimeout;
|
||||||
private int _maxBufferSize;
|
private int _maxBufferSize;
|
||||||
|
|
||||||
public static final int DEFAULT_BUFFER_SIZE = 1024*128;
|
public static final int DEFAULT_BUFFER_SIZE = 1024*64;
|
||||||
|
public static final int DEFAULT_WRITE_TIMEOUT = 60*1000;
|
||||||
|
|
||||||
public I2PSocketOptions() {
|
public I2PSocketOptions() {
|
||||||
_connectTimeout = -1;
|
_connectTimeout = -1;
|
||||||
_readTimeout = -1;
|
_readTimeout = -1;
|
||||||
_writeTimeout = -1;
|
_writeTimeout = DEFAULT_WRITE_TIMEOUT;
|
||||||
_maxBufferSize = DEFAULT_BUFFER_SIZE;
|
_maxBufferSize = DEFAULT_BUFFER_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user