* fix a reordering bug that can trim the end of a stream under heavy lag (thanks duck!)
* fix a pair of races on router crash
This commit is contained in:
@ -107,7 +107,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
|||||||
packet.setOptionalFrom(_connection.getSession().getMyDestination());
|
packet.setOptionalFrom(_connection.getSession().getMyDestination());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_connection.getOutputStream().getClosed()) {
|
// don't set the closed flag if this is a plain ACK and there are outstanding
|
||||||
|
// packets sent, otherwise the other side could receive the CLOSE prematurely,
|
||||||
|
// since this ACK could arrive before the unacked payload message.
|
||||||
|
if (_connection.getOutputStream().getClosed() &&
|
||||||
|
( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) ) ) {
|
||||||
packet.setFlag(Packet.FLAG_CLOSE);
|
packet.setFlag(Packet.FLAG_CLOSE);
|
||||||
_connection.setCloseSentOn(_context.clock().now());
|
_connection.setCloseSentOn(_context.clock().now());
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
@ -60,6 +60,7 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
// this is the only method that *adds* to the _buf, and all
|
// this is the only method that *adds* to the _buf, and all
|
||||||
// code that reads from it is synchronized
|
// code that reads from it is synchronized
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
|
if (_buf == null) throw new IOException("closed (buffer went away)");
|
||||||
if (_valid + remaining < _buf.length) {
|
if (_valid + remaining < _buf.length) {
|
||||||
// simply buffer the data, no flush
|
// simply buffer the data, no flush
|
||||||
System.arraycopy(b, cur, _buf, _valid, remaining);
|
System.arraycopy(b, cur, _buf, _valid, remaining);
|
||||||
@ -106,6 +107,7 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
WriteStatus ws = null;
|
WriteStatus ws = null;
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
|
if (_buf == null) throw new IOException("closed (buffer went away)");
|
||||||
ws = _dataReceiver.writeData(_buf, 0, _valid);
|
ws = _dataReceiver.writeData(_buf, 0, _valid);
|
||||||
_written += _valid;
|
_written += _valid;
|
||||||
_valid = 0;
|
_valid = 0;
|
||||||
@ -134,14 +136,22 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
_closed = true;
|
_closed = true;
|
||||||
flush();
|
flush();
|
||||||
_log.debug("Output stream closed after writing " + _written);
|
_log.debug("Output stream closed after writing " + _written);
|
||||||
if (_buf != null) {
|
ByteArray ba = null;
|
||||||
_dataCache.release(new ByteArray(_buf));
|
synchronized (_dataLock) {
|
||||||
_buf = null;
|
if (_buf != null) {
|
||||||
|
ba = new ByteArray(_buf);
|
||||||
|
_buf = null;
|
||||||
|
_valid = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ba != null) {
|
||||||
|
_dataCache.release(ba);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
public void closeInternal() {
|
public void closeInternal() {
|
||||||
_closed = true;
|
_closed = true;
|
||||||
_streamError = new IOException("Closed internally");
|
_streamError = new IOException("Closed internally");
|
||||||
|
ByteArray ba = null;
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
// flush any data, but don't wait for it
|
// flush any data, but don't wait for it
|
||||||
if (_valid > 0) {
|
if (_valid > 0) {
|
||||||
@ -149,11 +159,15 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
_written += _valid;
|
_written += _valid;
|
||||||
_valid = 0;
|
_valid = 0;
|
||||||
}
|
}
|
||||||
|
if (_buf != null) {
|
||||||
|
ba = new ByteArray(_buf);
|
||||||
|
_buf = null;
|
||||||
|
_valid = 0;
|
||||||
|
}
|
||||||
_dataLock.notifyAll();
|
_dataLock.notifyAll();
|
||||||
}
|
}
|
||||||
if (_buf != null) {
|
if (ba != null) {
|
||||||
_dataCache.release(new ByteArray(_buf));
|
_dataCache.release(ba);
|
||||||
_buf = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,6 +197,7 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
|
void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
|
||||||
WriteStatus ws = null;
|
WriteStatus ws = null;
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
|
if (_buf == null) throw new IOException("closed (buffer went away)");
|
||||||
ws = target.writeData(_buf, 0, _valid);
|
ws = target.writeData(_buf, 0, _valid);
|
||||||
_written += _valid;
|
_written += _valid;
|
||||||
_valid = 0;
|
_valid = 0;
|
||||||
|
@ -83,7 +83,9 @@ class PacketQueue {
|
|||||||
if (!sent) {
|
if (!sent) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Send failed for " + packet);
|
_log.warn("Send failed for " + packet);
|
||||||
packet.getConnection().disconnect(false);
|
Connection c = packet.getConnection();
|
||||||
|
if (c != null) // handle race on b0rk
|
||||||
|
c.disconnect(false);
|
||||||
} else {
|
} else {
|
||||||
packet.setKeyUsed(keyUsed);
|
packet.setKeyUsed(keyUsed);
|
||||||
packet.setTagsSent(tagsSent);
|
packet.setTagsSent(tagsSent);
|
||||||
|
Reference in New Issue
Block a user