* Streaming:

- Add more info to Connection.toString() for debugging
      - Fix lifetimeMessages{Sent,Received} stats
      - Reduce RTT damping to 0.875 (was 0.9)
      - Add a stream.con.initialRTT.{in,out} stats
This commit is contained in:
zzz
2008-11-12 20:10:39 +00:00
parent 98038e9282
commit 049d6b2fa8
4 changed files with 44 additions and 11 deletions

View File

@ -44,6 +44,7 @@ public class Connection {
private int _unackedPacketsReceived; private int _unackedPacketsReceived;
private long _congestionWindowEnd; private long _congestionWindowEnd;
private long _highestAckedThrough; private long _highestAckedThrough;
private boolean _isInbound;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */ /** Packet ID (Long) to PacketLocal for sent but unacked packets */
private Map _outboundPackets; private Map _outboundPackets;
private PacketQueue _outboundQueue; private PacketQueue _outboundQueue;
@ -118,6 +119,7 @@ public class Connection {
_connectLock = new Object(); _connectLock = new Object();
_activeResends = 0; _activeResends = 0;
_resetSentOn = -1; _resetSentOn = -1;
_isInbound = false;
_connectionEvent = new ConEvent(); _connectionEvent = new ConEvent();
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -470,6 +472,8 @@ public class Connection {
} }
public boolean getResetReceived() { return _resetReceived; } public boolean getResetReceived() { return _resetReceived; }
public void setInbound() { _isInbound = true; }
public boolean isInbound() { return _isInbound; }
public boolean getIsConnected() { return _connected; } public boolean getIsConnected() { return _connected; }
public boolean getHardDisconnected() { return _hardDisconnected; } public boolean getHardDisconnected() { return _hardDisconnected; }
public boolean getResetSent() { return _resetSent; } public boolean getResetSent() { return _resetSent; }
@ -908,6 +912,15 @@ public class Connection {
buf.append(Packet.toId(_sendStreamId)); buf.append(Packet.toId(_sendStreamId));
else else
buf.append("unknown"); buf.append("unknown");
if (_isInbound)
buf.append(" from ");
else
buf.append(" to ");
if (_remotePeerSet)
buf.append(_remotePeer.calculateHash().toBase64().substring(0,4));
else
buf.append("unknown");
buf.append(" up ").append(DataHelper.formatDuration(_context.clock().now() - _createdOn));
buf.append(" wsize: ").append(_options.getWindowSize()); buf.append(" wsize: ").append(_options.getWindowSize());
buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough); buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough);
buf.append(" rtt: ").append(_options.getRTT()); buf.append(" rtt: ").append(_options.getRTT());
@ -925,14 +938,13 @@ public class Connection {
} }
*/ */
buf.append("unacked in: ").append(getUnackedPacketsReceived()); buf.append("unacked in: ").append(getUnackedPacketsReceived());
int missing = 0;
if (_inputStream != null) { if (_inputStream != null) {
buf.append(" [high ");
buf.append(_inputStream.getHighestBlockId());
long nacks[] = _inputStream.getNacks(); long nacks[] = _inputStream.getNacks();
if (nacks != null) if (nacks != null) {
for (int i = 0; i < nacks.length; i++) missing = nacks.length;
buf.append(" ").append(nacks[i]); buf.append(" [").append(missing).append(" missing]");
buf.append("]"); }
} }
if (getResetSent()) if (getResetSent())
@ -947,7 +959,9 @@ public class Connection {
} }
if (getCloseReceivedOn() > 0) if (getCloseReceivedOn() > 0)
buf.append(" close received"); buf.append(" close received");
buf.append(" acked: ").append(getAckedPackets()); buf.append(" sent: ").append(1 + _lastSendId);
if (_inputStream != null)
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
buf.append(" MTU ").append(getOptions().getMaxMessageSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize());

View File

@ -127,6 +127,7 @@ public class ConnectionManager {
*/ */
public Connection receiveConnection(Packet synPacket) { public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
con.setInbound();
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false; boolean reject = false;
int active = 0; int active = 0;
@ -311,8 +312,15 @@ public class ConnectionManager {
_connectionLock.notifyAll(); _connectionLock.notifyAll();
} }
if (removed) { if (removed) {
_context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); MessageInputStream stream = con.getInputStream();
if (stream != null) {
long rcvd = 1 + stream.getHighestBlockId();
long nacks[] = stream.getNacks();
if (nacks != null)
rcvd -= nacks.length;
_context.statManager().addRateData("stream.con.lifetimeMessagesReceived", rcvd, con.getLifetime());
}
_context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());

View File

@ -355,7 +355,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
} }
/** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */
private static final double RTT_DAMPENING = 0.9; /** This is the value specified in RFC 2988, let's try it */
private static final double RTT_DAMPENING = 0.875;
public void updateRTT(int measuredValue) { public void updateRTT(int measuredValue) {
_rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev)); _rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev));

View File

@ -23,10 +23,12 @@ public class ConnectionPacketHandler {
_log = context.logManager().getLog(ConnectionPacketHandler.class); _log = context.logManager().getLog(ConnectionPacketHandler.class);
_context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Avg number of acks in a message", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.initialRTT.in", "What is the actual RTT for the first packet of an inbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.initialRTT.out", "What is the actual RTT for the first packet of an outbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
} }
/** distribute a packet to the connection specified */ /** distribute a packet to the connection specified */
@ -220,6 +222,8 @@ public class ConnectionPacketHandler {
//if ( (nacks != null) && (nacks.length > 0) ) //if ( (nacks != null) && (nacks.length > 0) )
// con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); // con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000);
boolean firstAck = isNew && con.getHighestAckedThrough() < 0;
int numResends = 0; int numResends = 0;
List acked = null; List acked = null;
// if we don't know the streamIds for both sides of the connection, there's no way we // if we don't know the streamIds for both sides of the connection, there's no way we
@ -265,6 +269,12 @@ public class ConnectionPacketHandler {
} }
if (highestRTT > 0) { if (highestRTT > 0) {
con.getOptions().updateRTT(highestRTT); con.getOptions().updateRTT(highestRTT);
if (firstAck) {
if (con.isInbound())
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0);
else
_context.statManager().addRateData("stream.con.initialRTT.out", highestRTT, 0);
}
} }
_context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT); _context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT);
} }