* if we timeout connecting or otherwise need to cancel tags that we've sent, go one step
further and cancel all of the tags we're using for that peer so that we can react to their potential restart / tag loss quicker. * use the minimum resend delay as the base to be exponentiated if our RTT is too low (so we resend less) * dont be such a wuss when flushing a closed stream
This commit is contained in:
@ -336,14 +336,19 @@ public class Connection {
|
|||||||
_inputStream.close();
|
_inputStream.close();
|
||||||
} else {
|
} else {
|
||||||
doClose();
|
doClose();
|
||||||
|
boolean tagsCancelled = false;
|
||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
|
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
|
||||||
PacketLocal pl = (PacketLocal)iter.next();
|
PacketLocal pl = (PacketLocal)iter.next();
|
||||||
|
if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
|
||||||
|
tagsCancelled = true;
|
||||||
pl.cancelled();
|
pl.cancelled();
|
||||||
}
|
}
|
||||||
_outboundPackets.clear();
|
_outboundPackets.clear();
|
||||||
_outboundPackets.notifyAll();
|
_outboundPackets.notifyAll();
|
||||||
}
|
}
|
||||||
|
if (tagsCancelled)
|
||||||
|
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
|
||||||
}
|
}
|
||||||
if (removeFromConMgr) {
|
if (removeFromConMgr) {
|
||||||
if (!_disconnectScheduled) {
|
if (!_disconnectScheduled) {
|
||||||
@ -379,15 +384,21 @@ public class Connection {
|
|||||||
+ toString());
|
+ toString());
|
||||||
_connectionManager.removeConnection(this);
|
_connectionManager.removeConnection(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean tagsCancelled = false;
|
||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
|
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
|
||||||
PacketLocal pl = (PacketLocal)iter.next();
|
PacketLocal pl = (PacketLocal)iter.next();
|
||||||
|
if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
|
||||||
|
tagsCancelled = true;
|
||||||
pl.cancelled();
|
pl.cancelled();
|
||||||
}
|
}
|
||||||
_outboundPackets.clear();
|
_outboundPackets.clear();
|
||||||
_outboundPackets.notifyAll();
|
_outboundPackets.notifyAll();
|
||||||
}
|
}
|
||||||
|
if (tagsCancelled)
|
||||||
|
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class DisconnectEvent implements SimpleTimer.TimedEvent {
|
private class DisconnectEvent implements SimpleTimer.TimedEvent {
|
||||||
@ -672,7 +683,13 @@ public class Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void timeReached() {
|
public void timeReached() {
|
||||||
if (!_connected) return;
|
if (_packet.getAckTime() > 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (!_connected) {
|
||||||
|
_packet.cancelled();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
//if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
// _log.debug("Resend period reached for " + _packet);
|
// _log.debug("Resend period reached for " + _packet);
|
||||||
@ -732,12 +749,14 @@ public class Connection {
|
|||||||
if (numSends > _options.getMaxResends()) {
|
if (numSends > _options.getMaxResends()) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Too many resends");
|
_log.debug("Too many resends");
|
||||||
|
_packet.cancelled();
|
||||||
disconnect(false);
|
disconnect(false);
|
||||||
} else {
|
} else {
|
||||||
//long timeout = _options.getResendDelay() << numSends;
|
//long timeout = _options.getResendDelay() << numSends;
|
||||||
long timeout = _options.getRTT() << (numSends-1);
|
long rtt = _options.getRTT();
|
||||||
if (timeout < MIN_RESEND_DELAY)
|
if (rtt < MIN_RESEND_DELAY)
|
||||||
timeout = MIN_RESEND_DELAY;
|
rtt = MIN_RESEND_DELAY;
|
||||||
|
long timeout = rtt << (numSends-1);
|
||||||
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
|
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
|
||||||
timeout = MAX_RESEND_DELAY;
|
timeout = MAX_RESEND_DELAY;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
@ -197,13 +197,14 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
|
void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
|
||||||
WriteStatus ws = null;
|
WriteStatus ws = null;
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
if (_buf == null) throw new IOException("closed (buffer went away)");
|
// _buf may be null, but the data receiver can handle that just fine,
|
||||||
|
// deciding whether or not to send a packet
|
||||||
ws = target.writeData(_buf, 0, _valid);
|
ws = target.writeData(_buf, 0, _valid);
|
||||||
_written += _valid;
|
_written += _valid;
|
||||||
_valid = 0;
|
_valid = 0;
|
||||||
_dataLock.notifyAll();
|
_dataLock.notifyAll();
|
||||||
}
|
}
|
||||||
if (blocking) {
|
if (blocking && ws != null) {
|
||||||
ws.waitForAccept(_writeTimeout);
|
ws.waitForAccept(_writeTimeout);
|
||||||
if (ws.writeFailed())
|
if (ws.writeFailed())
|
||||||
throw new IOException("Flush available failed");
|
throw new IOException("Flush available failed");
|
||||||
|
Reference in New Issue
Block a user