timeout gracefully even if the socket is stopped in odd places
This commit is contained in:
@ -66,7 +66,7 @@ public class Connection {
|
||||
public static final long MIN_RESEND_DELAY = 20*1000;
|
||||
|
||||
/** wait up to 5 minutes after disconnection so we can ack/close packets */
|
||||
public static long DISCONNECT_TIMEOUT = 5*60*1000;
|
||||
public static int DISCONNECT_TIMEOUT = 5*60*1000;
|
||||
|
||||
/** lets be sane.. no more than 32 packets in the air in each dir */
|
||||
public static final int MAX_WINDOW_SIZE = 32;
|
||||
|
@ -108,7 +108,16 @@ public class MessageOutputStream extends OutputStream {
|
||||
_dataLock.notifyAll();
|
||||
}
|
||||
|
||||
ws.waitForCompletion(_writeTimeout);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws);
|
||||
if (_closed &&
|
||||
( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
|
||||
(_writeTimeout <= 0) ) )
|
||||
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
|
||||
else
|
||||
ws.waitForCompletion(_writeTimeout);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("after waiting " + _writeTimeout + "ms for completion of " + ws);
|
||||
if (ws.writeFailed() && (_writeTimeout > 0) )
|
||||
throw new InterruptedIOException("Timed out during write");
|
||||
else if (ws.writeFailed())
|
||||
@ -117,6 +126,7 @@ public class MessageOutputStream extends OutputStream {
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if (_closed) return;
|
||||
_closed = true;
|
||||
flush();
|
||||
_log.debug("Output stream closed after writing " + _written);
|
||||
|
@ -117,14 +117,20 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
|
||||
|
||||
public void waitForCompletion(int maxWaitMs) {
|
||||
long expiration = _context.clock().now()+maxWaitMs;
|
||||
while ((maxWaitMs <= 0) || (expiration < _context.clock().now())) {
|
||||
synchronized (this) {
|
||||
if (_ackOn > 0)
|
||||
return;
|
||||
if (_cancelledOn > 0)
|
||||
return;
|
||||
try { wait(); } catch (InterruptedException ie) {}
|
||||
}
|
||||
while (true) {
|
||||
long timeRemaining = expiration - _context.clock().now();
|
||||
if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) return;
|
||||
try {
|
||||
synchronized (this) {
|
||||
if (_ackOn > 0) return;
|
||||
if (_cancelledOn > 0) return;
|
||||
if (timeRemaining > 60*1000)
|
||||
timeRemaining = 60*1000;
|
||||
else if (timeRemaining <= 0)
|
||||
timeRemaining = 10*1000;
|
||||
wait(timeRemaining);
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user