From 6bd9e58ece41e72f9d41fbb7efb07bd452bcb66e Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 18 Nov 2004 13:47:27 +0000 Subject: [PATCH] * fix a reordering bug that can trim the end of a stream under heavy lag (thanks duck!) * fix a pair of races on router crash --- .../streaming/ConnectionDataReceiver.java | 6 ++++- .../client/streaming/MessageOutputStream.java | 27 ++++++++++++++----- .../net/i2p/client/streaming/PacketQueue.java | 4 ++- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 4d4177ff4..9219e5f57 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -107,7 +107,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setOptionalFrom(_connection.getSession().getMyDestination()); } - if (_connection.getOutputStream().getClosed()) { + // don't set the closed flag if this is a plain ACK and there are outstanding + // packets sent, otherwise the other side could receive the CLOSE prematurely, + // since this ACK could arrive before the unacked payload message. + if (_connection.getOutputStream().getClosed() && + ( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); _connection.setCloseSentOn(_context.clock().now()); if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 577125812..55b7684ca 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -60,6 +60,7 @@ public class MessageOutputStream extends OutputStream { // this is the only method that *adds* to the _buf, and all // code that reads from it is synchronized synchronized (_dataLock) { + if (_buf == null) throw new IOException("closed (buffer went away)"); if (_valid + remaining < _buf.length) { // simply buffer the data, no flush System.arraycopy(b, cur, _buf, _valid, remaining); @@ -106,6 +107,7 @@ public class MessageOutputStream extends OutputStream { public void flush() throws IOException { WriteStatus ws = null; synchronized (_dataLock) { + if (_buf == null) throw new IOException("closed (buffer went away)"); ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; @@ -134,14 +136,22 @@ public class MessageOutputStream extends OutputStream { _closed = true; flush(); _log.debug("Output stream closed after writing " + _written); - if (_buf != null) { - _dataCache.release(new ByteArray(_buf)); - _buf = null; + ByteArray ba = null; + synchronized (_dataLock) { + if (_buf != null) { + ba = new ByteArray(_buf); + _buf = null; + _valid = 0; + } + } + if (ba != null) { + _dataCache.release(ba); } } public void closeInternal() { _closed = true; _streamError = new IOException("Closed internally"); + ByteArray ba = null; synchronized (_dataLock) { // flush any data, but don't wait for it if (_valid > 0) { @@ -149,11 +159,15 @@ public class MessageOutputStream extends OutputStream { _written += _valid; _valid = 0; } + if (_buf != null) { + ba = new ByteArray(_buf); + _buf = null; + _valid = 0; + } _dataLock.notifyAll(); } - if (_buf != null) { - _dataCache.release(new ByteArray(_buf)); - _buf = null; + if (ba != null) { + _dataCache.release(ba); } } @@ -183,6 +197,7 @@ public class MessageOutputStream extends OutputStream { void flushAvailable(DataReceiver target, boolean blocking) throws IOException { WriteStatus ws = null; synchronized (_dataLock) { + if (_buf == null) throw new IOException("closed (buffer went away)"); ws = target.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 1841f797c..8c3d30a5d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -83,7 +83,9 @@ class PacketQueue { if (!sent) { if (_log.shouldLog(Log.WARN)) _log.warn("Send failed for " + packet); - packet.getConnection().disconnect(false); + Connection c = packet.getConnection(); + if (c != null) // handle race on b0rk + c.disconnect(false); } else { packet.setKeyUsed(keyUsed); packet.setTagsSent(tagsSent);