if you want to get this data and you're running outside the router, just toss on a
"-Dstat.logFilters=* -Dstat.logFile=proxy.stats" to the java command line.  the resulting
file can be parsed w/ java -cp lib/i2p.jar net.i2p.stat.StatLogSplitter proxy.stats, then fed
into gnuplot or whatever
This commit is contained in:
jrandom
2004-11-19 00:00:05 +00:00
committed by zzz
parent 4a029b7853
commit ed8eced9dd
6 changed files with 79 additions and 2 deletions

View File

@ -64,6 +64,11 @@ public class Connection {
/** how many messages have been resent and not yet ACKed? */ /** how many messages have been resent and not yet ACKed? */
private int _activeResends; private int _activeResends;
private long _lifetimeBytesSent;
private long _lifetimeBytesReceived;
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000; public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000; public static final long MIN_RESEND_DELAY = 20*1000;
@ -106,6 +111,7 @@ public class Connection {
_ackSinceCongestion = true; _ackSinceCongestion = true;
_connectLock = new Object(); _connectLock = new Object();
_activeResends = 0; _activeResends = 0;
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
} }
public long getNextOutboundPacketNum() { public long getNextOutboundPacketNum() {
@ -454,10 +460,24 @@ public class Connection {
public String getConnectionError() { return _connectionError; } public String getConnectionError() { return _connectionError; }
public void setConnectionError(String err) { _connectionError = err; } public void setConnectionError(String err) { _connectionError = err; }
public long getLifetime() { return _context.clock().now() - _createdOn; } public long getLifetime() {
if (_closeSentOn <= 0)
return _context.clock().now() - _createdOn;
else
return _closeSentOn - _createdOn;
}
public ConnectionPacketHandler getPacketHandler() { return _handler; } public ConnectionPacketHandler getPacketHandler() { return _handler; }
public long getLifetimeBytesSent() { return _lifetimeBytesSent; }
public long getLifetimeBytesReceived() { return _lifetimeBytesReceived; }
public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent; }
public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived; }
public void incrementBytesSent(int bytes) { _lifetimeBytesSent += bytes; }
public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent += msgs; }
public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived += bytes; }
public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived += msgs; }
/** /**
* Time when the scheduler next want to send a packet, or -1 if * Time when the scheduler next want to send a packet, or -1 if
* never. This should be set when we want to send on timeout, for * never. This should be set when we want to send on timeout, for
@ -720,6 +740,7 @@ public class Connection {
// shrink the window // shrink the window
int newWindowSize = getOptions().getWindowSize(); int newWindowSize = getOptions().getWindowSize();
congestionOccurred(); congestionOccurred();
_context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
newWindowSize /= 2; newWindowSize /= 2;
if (newWindowSize <= 0) if (newWindowSize <= 0)
newWindowSize = 1; newWindowSize = 1;

View File

@ -57,6 +57,15 @@ public class ConnectionManager {
_allowIncoming = false; _allowIncoming = false;
_maxConcurrentStreams = maxConcurrent; _maxConcurrentStreams = maxConcurrent;
_numWaiting = 0; _numWaiting = 0;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
} }
Connection getConnectionByInboundId(byte[] id) { Connection getConnectionByInboundId(byte[] id) {
@ -140,6 +149,8 @@ public class ConnectionManager {
} }
return null; return null;
} }
_context.statManager().addRateData("stream.connectionReceived", 1, 0);
return con; return con;
} }
@ -207,6 +218,8 @@ public class ConnectionManager {
} }
if (_numWaiting > 0) if (_numWaiting > 0)
_numWaiting--; _numWaiting--;
_context.statManager().addRateData("stream.connectionCreated", 1, 0);
return con; return con;
} }
@ -261,6 +274,17 @@ public class ConnectionManager {
_log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
_connectionLock.notifyAll(); _connectionLock.notifyAll();
} }
if (removed) {
_context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
}
} }
public Set listConnections() { public Set listConnections() {

View File

@ -21,6 +21,10 @@ public class ConnectionPacketHandler {
public ConnectionPacketHandler(I2PAppContext context) { public ConnectionPacketHandler(I2PAppContext context) {
_context = context; _context = context;
_log = context.logManager().getLog(ConnectionPacketHandler.class); _log = context.logManager().getLog(ConnectionPacketHandler.class);
_context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
} }
/** distribute a packet to the connection specified */ /** distribute a packet to the connection specified */
@ -28,6 +32,7 @@ public class ConnectionPacketHandler {
boolean ok = verifyPacket(packet, con); boolean ok = verifyPacket(packet, con);
if (!ok) return; if (!ok) return;
con.packetReceived(); con.packetReceived();
if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) { if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet); _log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet);
@ -35,6 +40,9 @@ public class ConnectionPacketHandler {
return; return;
} }
con.getOptions().setChoke(0); con.getOptions().setChoke(0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0);
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
@ -49,6 +57,8 @@ public class ConnectionPacketHandler {
if (isNew) { if (isNew) {
con.incrementUnackedPacketsReceived(); con.incrementUnackedPacketsReceived();
con.incrementBytesReceived(packet.getPayloadSize());
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) { if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling immediate ack for " + packet); _log.debug("Scheduling immediate ack for " + packet);
@ -63,6 +73,9 @@ public class ConnectionPacketHandler {
} }
} else { } else {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) { if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) {
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0);
con.incrementDupMessagesReceived(1);
// take note of congestion // take note of congestion
//con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); //con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
@ -96,6 +109,7 @@ public class ConnectionPacketHandler {
//if (p.getNumSends() <= 1) //if (p.getNumSends() <= 1)
highestRTT = p.getAckTime(); highestRTT = p.getAckTime();
} }
_context.statManager().addRateData("stream.sendsBeforeAck", p.getNumSends(), p.getAckTime());
if (p.getNumSends() > 1) if (p.getNumSends() > 1)
numResends++; numResends++;
@ -113,6 +127,7 @@ public class ConnectionPacketHandler {
if (highestRTT > 0) { if (highestRTT > 0) {
con.getOptions().updateRTT(highestRTT); con.getOptions().updateRTT(highestRTT);
} }
_context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT);
} }
boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0)); boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0));

View File

@ -18,6 +18,7 @@ public class MessageHandler implements I2PSessionListener {
_manager = mgr; _manager = mgr;
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(MessageHandler.class); _log = ctx.logManager().getLog(MessageHandler.class);
_context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
} }
/** Instruct the client that the given session has received a message with /** Instruct the client that the given session has received a message with
@ -31,6 +32,7 @@ public class MessageHandler implements I2PSessionListener {
try { try {
data = session.receiveMessage(msgId); data = session.receiveMessage(msgId);
} catch (I2PSessionException ise) { } catch (I2PSessionException ise) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error receiving the message", ise); _log.warn("Error receiving the message", ise);
return; return;
@ -40,6 +42,7 @@ public class MessageHandler implements I2PSessionListener {
packet.readPacket(data, 0, data.length); packet.readPacket(data, 0, data.length);
_manager.getPacketHandler().receivePacket(packet); _manager.getPacketHandler().receivePacket(packet);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Received an invalid packet", iae); _log.warn("Received an invalid packet", iae);
} }

View File

@ -67,6 +67,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
} }
public long getCreatedOn() { return _createdOn; } public long getCreatedOn() { return _createdOn; }
public long getLifetime() { return _context.clock().now() - _createdOn; }
public void incrementSends() { public void incrementSends() {
_numSends++; _numSends++;
_lastSend = _context.clock().now(); _lastSend = _context.clock().now();

View File

@ -29,6 +29,8 @@ class PacketQueue {
_connectionManager = mgr; _connectionManager = mgr;
_buf = _cache.acquire().getData(); // new byte[36*1024]; _buf = _cache.acquire().getData(); // new byte[36*1024];
_log = context.logManager().getLog(PacketQueue.class); _log = context.logManager().getLog(PacketQueue.class);
_context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
} }
protected void finalize() throws Throwable { protected void finalize() throws Throwable {
@ -62,9 +64,9 @@ class PacketQueue {
long end = 0; long end = 0;
boolean sent = false; boolean sent = false;
try { try {
int size = 0;
synchronized (this) { synchronized (this) {
Arrays.fill(_buf, (byte)0x0); Arrays.fill(_buf, (byte)0x0);
int size = 0;
if (packet.shouldSign()) if (packet.shouldSign())
size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey()); size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey());
else else
@ -75,6 +77,17 @@ class PacketQueue {
sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
end = _context.clock().now(); end = _context.clock().now();
} }
_context.statManager().addRateData("stream.con.sendMessageSize", size, packet.getLifetime());
if (packet.getNumSends() > 1)
_context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime());
Connection con = packet.getConnection();
if (con != null) {
con.incrementBytesSent(size);
if (packet.getNumSends() > 1)
con.incrementDupMessagesSent(1);
}
} catch (I2PSessionException ise) { } catch (I2PSessionException ise) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send the packet " + packet, ise); _log.warn("Unable to send the packet " + packet, ise);