From b457001b42a39bac45491b52f1555257608c3e11 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 11 Nov 2004 09:37:46 +0000 Subject: [PATCH] timeout gracefully even if the socket is stopped in odd places --- .../net/i2p/client/streaming/Connection.java | 2 +- .../client/streaming/MessageOutputStream.java | 12 +++++++++- .../net/i2p/client/streaming/PacketLocal.java | 22 ++++++++++++------- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index c437c2b2a..af5b6bc59 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -66,7 +66,7 @@ public class Connection { public static final long MIN_RESEND_DELAY = 20*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ - public static long DISCONNECT_TIMEOUT = 5*60*1000; + public static int DISCONNECT_TIMEOUT = 5*60*1000; /** lets be sane.. no more than 32 packets in the air in each dir */ public static final int MAX_WINDOW_SIZE = 32; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 10e43c25a..aa29c6282 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -108,7 +108,16 @@ public class MessageOutputStream extends OutputStream { _dataLock.notifyAll(); } - ws.waitForCompletion(_writeTimeout); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws); + if (_closed && + ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) || + (_writeTimeout <= 0) ) ) + ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); + else + ws.waitForCompletion(_writeTimeout); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("after waiting " + _writeTimeout + "ms for completion of " + ws); if (ws.writeFailed() && (_writeTimeout > 0) ) throw new InterruptedIOException("Timed out during write"); else if (ws.writeFailed()) @@ -117,6 +126,7 @@ public class MessageOutputStream extends OutputStream { } public void close() throws IOException { + if (_closed) return; _closed = true; flush(); _log.debug("Output stream closed after writing " + _written); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index ea02dc64f..3b291f78d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -117,14 +117,20 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public void waitForCompletion(int maxWaitMs) { long expiration = _context.clock().now()+maxWaitMs; - while ((maxWaitMs <= 0) || (expiration < _context.clock().now())) { - synchronized (this) { - if (_ackOn > 0) - return; - if (_cancelledOn > 0) - return; - try { wait(); } catch (InterruptedException ie) {} - } + while (true) { + long timeRemaining = expiration - _context.clock().now(); + if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) return; + try { + synchronized (this) { + if (_ackOn > 0) return; + if (_cancelledOn > 0) return; + if (timeRemaining > 60*1000) + timeRemaining = 60*1000; + else if (timeRemaining <= 0) + timeRemaining = 10*1000; + wait(timeRemaining); + } + } catch (InterruptedException ie) {} } }