* sliding windows w/ additive increase / multiplicitive decrease
* immediately send an ack on receiving a duplicate payload message (unless we've sent one within the last RTT) * only adjust the RTT when there have been no resends * added some (disabled) throttles - randomly injecting delays on received packets, as well as randomly dropping them * logging
This commit is contained in:
@ -93,16 +93,18 @@ public class Connection {
|
|||||||
* @return true if the packet should be sent
|
* @return true if the packet should be sent
|
||||||
*/
|
*/
|
||||||
boolean packetSendChoke() {
|
boolean packetSendChoke() {
|
||||||
if (true) return true;
|
if (false) return true;
|
||||||
long writeExpire = _options.getWriteTimeout();
|
long writeExpire = _options.getWriteTimeout();
|
||||||
if (writeExpire > 0)
|
|
||||||
writeExpire += _context.clock().now();
|
|
||||||
while (true) {
|
while (true) {
|
||||||
long timeLeft = writeExpire - _context.clock().now();
|
long timeLeft = writeExpire - _context.clock().now();
|
||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
if (_outboundPackets.size() >= _options.getWindowSize()) {
|
if (_outboundPackets.size() >= _options.getWindowSize()) {
|
||||||
if (writeExpire > 0) {
|
if (writeExpire > 0) {
|
||||||
if (timeLeft <= 0) return false;
|
if (timeLeft <= 0) {
|
||||||
|
_log.error("Outbound window is full of " + _outboundPackets.size()
|
||||||
|
+ " and we've waited too long (" + writeExpire + "ms)");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "), waiting " + timeLeft);
|
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "), waiting " + timeLeft);
|
||||||
try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {}
|
try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {}
|
||||||
@ -118,6 +120,10 @@ public class Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ackImmediately() {
|
||||||
|
_receiver.send(null, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush any data that we can
|
* Flush any data that we can
|
||||||
*/
|
*/
|
||||||
@ -147,7 +153,7 @@ public class Connection {
|
|||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
|
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
|
||||||
}
|
}
|
||||||
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getResendDelay());
|
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2);
|
||||||
}
|
}
|
||||||
|
|
||||||
_lastSendTime = _context.clock().now();
|
_lastSendTime = _context.clock().now();
|
||||||
@ -159,7 +165,7 @@ public class Connection {
|
|||||||
// something that will get a reply so that we can deliver some new tags -
|
// something that will get a reply so that we can deliver some new tags -
|
||||||
// ACKs don't get ACKed, but pings do.
|
// ACKs don't get ACKed, but pings do.
|
||||||
if (packet.getTagsSent().size() > 0) {
|
if (packet.getTagsSent().size() > 0) {
|
||||||
_log.error("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
|
_log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
|
||||||
_connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent());
|
_connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -327,6 +333,8 @@ public class Connection {
|
|||||||
buf.append(Base64.encode(_sendStreamId));
|
buf.append(Base64.encode(_sendStreamId));
|
||||||
else
|
else
|
||||||
buf.append("unknown");
|
buf.append("unknown");
|
||||||
|
buf.append(" wsize: ").append(_options.getWindowSize());
|
||||||
|
buf.append(" rtt: ").append(_options.getRTT());
|
||||||
buf.append(" unacked outbound: ");
|
buf.append(" unacked outbound: ");
|
||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
buf.append(_outboundPackets.size()).append(" [");
|
buf.append(_outboundPackets.size()).append(" [");
|
||||||
@ -367,8 +375,8 @@ public class Connection {
|
|||||||
|
|
||||||
int numSends = _packet.getNumSends() + 1;
|
int numSends = _packet.getNumSends() + 1;
|
||||||
|
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.error("Resend packet " + _packet + " time " + numSends + " on " + Connection.this);
|
_log.warn("Resend packet " + _packet + " time " + numSends + " on " + Connection.this);
|
||||||
_outboundQueue.enqueue(_packet);
|
_outboundQueue.enqueue(_packet);
|
||||||
|
|
||||||
if (numSends > _options.getMaxResends()) {
|
if (numSends > _options.getMaxResends()) {
|
||||||
@ -376,7 +384,8 @@ public class Connection {
|
|||||||
_log.debug("Too many resends");
|
_log.debug("Too many resends");
|
||||||
disconnect(false);
|
disconnect(false);
|
||||||
} else {
|
} else {
|
||||||
long timeout = _options.getResendDelay() << numSends;
|
//long timeout = _options.getResendDelay() << numSends;
|
||||||
|
long timeout = _options.getRTT() << numSends;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Scheduling resend in " + timeout + "ms");
|
_log.debug("Scheduling resend in " + timeout + "ms");
|
||||||
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
|
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
|
||||||
|
@ -39,17 +39,23 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
|||||||
if (_connection.getUnackedPacketsReceived() > 0)
|
if (_connection.getUnackedPacketsReceived() > 0)
|
||||||
doSend = true;
|
doSend = true;
|
||||||
|
|
||||||
//if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.ERROR) && !doSend)
|
||||||
// _log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by"));
|
_log.error("writeData called: size="+size + " doSend=" + doSend
|
||||||
|
+ " unackedReceived: " + _connection.getUnackedPacketsReceived()
|
||||||
|
+ " con: " + _connection, new Exception("write called by"));
|
||||||
|
|
||||||
if (doSend) {
|
if (doSend) {
|
||||||
PacketLocal packet = buildPacket(buf, off, size);
|
send(buf, off, size);
|
||||||
_connection.sendPacket(packet);
|
|
||||||
} else {
|
} else {
|
||||||
//_connection.flushPackets();
|
//_connection.flushPackets();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void send(byte buf[], int off, int size) {
|
||||||
|
PacketLocal packet = buildPacket(buf, off, size);
|
||||||
|
_connection.sendPacket(packet);
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isAckOnly(int size) {
|
private boolean isAckOnly(int size) {
|
||||||
boolean ackOnly = ( (size <= 0) && // no data
|
boolean ackOnly = ( (size <= 0) && // no data
|
||||||
(_connection.getLastSendId() >= 0) && // not a SYN
|
(_connection.getLastSendId() >= 0) && // not a SYN
|
||||||
@ -63,7 +69,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
|||||||
boolean ackOnly = isAckOnly(size);
|
boolean ackOnly = isAckOnly(size);
|
||||||
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer());
|
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer());
|
||||||
byte data[] = new byte[size];
|
byte data[] = new byte[size];
|
||||||
System.arraycopy(buf, off, data, 0, size);
|
if (size > 0)
|
||||||
|
System.arraycopy(buf, off, data, 0, size);
|
||||||
packet.setPayload(data);
|
packet.setPayload(data);
|
||||||
if (ackOnly)
|
if (ackOnly)
|
||||||
packet.setSequenceNum(0);
|
packet.setSequenceNum(0);
|
||||||
|
@ -218,7 +218,7 @@ public class ConnectionManager {
|
|||||||
_packet = packet;
|
_packet = packet;
|
||||||
}
|
}
|
||||||
public void pong() {
|
public void pong() {
|
||||||
_log.error("Ping successful");
|
_log.debug("Ping successful");
|
||||||
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
|
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
|
||||||
synchronized (ConnectionManager.PingRequest.this) {
|
synchronized (ConnectionManager.PingRequest.this) {
|
||||||
_ponged = true;
|
_ponged = true;
|
||||||
|
@ -48,11 +48,14 @@ public class ConnectionPacketHandler {
|
|||||||
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
|
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("congestion.. dup " + packet);
|
_log.warn("congestion.. dup " + packet);
|
||||||
|
con.incrementUnackedPacketsReceived();
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("ACK only packet received: " + packet);
|
_log.debug("ACK only packet received: " + packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int numResends = 0;
|
||||||
List acked = con.ackPackets(packet.getAckThrough(), packet.getNacks());
|
List acked = con.ackPackets(packet.getAckThrough(), packet.getNacks());
|
||||||
if ( (acked != null) && (acked.size() > 0) ) {
|
if ( (acked != null) && (acked.size() > 0) ) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -62,8 +65,13 @@ public class ConnectionPacketHandler {
|
|||||||
int lowestRtt = -1;
|
int lowestRtt = -1;
|
||||||
for (int i = 0; i < acked.size(); i++) {
|
for (int i = 0; i < acked.size(); i++) {
|
||||||
PacketLocal p = (PacketLocal)acked.get(i);
|
PacketLocal p = (PacketLocal)acked.get(i);
|
||||||
if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) )
|
if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) {
|
||||||
lowestRtt = p.getAckTime();
|
if (p.getNumSends() <= 1)
|
||||||
|
lowestRtt = p.getAckTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p.getNumSends() > 1)
|
||||||
|
numResends++;
|
||||||
|
|
||||||
// ACK the tags we delivered so we can use them
|
// ACK the tags we delivered so we can use them
|
||||||
if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null)
|
if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null)
|
||||||
@ -75,12 +83,51 @@ public class ConnectionPacketHandler {
|
|||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Packet acked: " + p);
|
_log.debug("Packet acked: " + p);
|
||||||
}
|
}
|
||||||
int oldRTT = con.getOptions().getRTT();
|
if (lowestRtt > 0) {
|
||||||
int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt);
|
int oldRTT = con.getOptions().getRTT();
|
||||||
con.getOptions().setRTT(newRTT);
|
int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt);
|
||||||
|
con.getOptions().setRTT(newRTT);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends);
|
||||||
con.eventOccurred();
|
con.eventOccurred();
|
||||||
|
if (fastAck) {
|
||||||
|
if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) {
|
||||||
|
_log.error("Fast ack for dup " + packet);
|
||||||
|
con.ackImmediately();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends) {
|
||||||
|
if ( (!isNew) && (sequenceNum > 0) ) {
|
||||||
|
// dup real packet
|
||||||
|
int oldSize = con.getOptions().getWindowSize();
|
||||||
|
oldSize >>>= 1;
|
||||||
|
if (oldSize <= 0)
|
||||||
|
oldSize = 1;
|
||||||
|
con.getOptions().setWindowSize(oldSize);
|
||||||
|
return true;
|
||||||
|
} else if (numResends > 0) {
|
||||||
|
int newWindowSize = con.getOptions().getWindowSize();
|
||||||
|
newWindowSize /= 2; // >>>= numResends;
|
||||||
|
if (newWindowSize <= 0)
|
||||||
|
newWindowSize = 1;
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Shrink the window to " + newWindowSize + " (#resends: " + numResends
|
||||||
|
+ ") for " + con);
|
||||||
|
con.getOptions().setWindowSize(newWindowSize);
|
||||||
|
} else {
|
||||||
|
// new packet that ack'ed uncongested data, or an empty ack
|
||||||
|
int newWindowSize = con.getOptions().getWindowSize();
|
||||||
|
newWindowSize += 1; //acked.size();
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("New window size " + newWindowSize + " (#resends: " + numResends
|
||||||
|
+ ") for " + con);
|
||||||
|
con.getOptions().setWindowSize(newWindowSize);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -26,7 +26,28 @@ public class PacketHandler {
|
|||||||
_log = ctx.logManager().getLog(PacketHandler.class);
|
_log = ctx.logManager().getLog(PacketHandler.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean choke(Packet packet) {
|
||||||
|
if (false) {
|
||||||
|
// artificial choke: 2% random drop and a 1s
|
||||||
|
// random delay
|
||||||
|
if (_context.random().nextInt(100) >= 98) {
|
||||||
|
_log.error("DROP: " + packet);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
int delay = _context.random().nextInt(1000);
|
||||||
|
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
|
||||||
|
_log.debug("OK : " + packet + " delay = " + delay);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void receivePacket(Packet packet) {
|
void receivePacket(Packet packet) {
|
||||||
|
boolean ok = choke(packet);
|
||||||
|
if (!ok) return;
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("packet received: " + packet);
|
_log.debug("packet received: " + packet);
|
||||||
|
|
||||||
@ -46,11 +67,11 @@ public class PacketHandler {
|
|||||||
|
|
||||||
private void displayPacket(Packet packet, Connection con) {
|
private void displayPacket(Packet packet, Connection con) {
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
if (_log.shouldLog(Log.DEBUG)) {
|
||||||
//SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS");
|
SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS");
|
||||||
//String now = fmt.format(new Date());
|
String now = fmt.format(new Date());
|
||||||
String msg = packet + (con != null ? " on " + con : " on unknown con");
|
String msg = packet + (con != null ? " on " + con : " on unknown con");
|
||||||
_log.debug(msg);
|
//_log.debug(msg);
|
||||||
// System.out.println(now + ": " + msg);
|
System.out.println(now + ": " + msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ class PacketQueue {
|
|||||||
packet.setTagsSent(tagsSent);
|
packet.setTagsSent(tagsSent);
|
||||||
packet.incrementSends();
|
packet.incrementSends();
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
if (_log.shouldLog(Log.DEBUG)) {
|
||||||
String msg = packet + " sent" + (tagsSent.size() > 0
|
String msg = "SEND " + packet + (tagsSent.size() > 0
|
||||||
? " with " + tagsSent.size() + " tags"
|
? " with " + tagsSent.size() + " tags"
|
||||||
: "")
|
: "")
|
||||||
+ " send # " + packet.getNumSends();
|
+ " send # " + packet.getNumSends();
|
||||||
|
Reference in New Issue
Block a user