stats and stats and stats
track the total allocated bytes correctly (even if we're throttled)
This commit is contained in:
@ -26,7 +26,10 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
|
||||
super(source);
|
||||
_context = context;
|
||||
_peer = peer;
|
||||
_peerTarget = peer.getHash().toBase64();
|
||||
if (peer != null)
|
||||
_peerTarget = peer.getHash().toBase64();
|
||||
else
|
||||
_peerTarget = "unknown";
|
||||
_log = context.logManager().getLog(BandwidthLimitedOutputStream.class);
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,10 @@ public class FIFOBandwidthLimiter {
|
||||
public FIFOBandwidthLimiter(I2PAppContext context) {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(FIFOBandwidthLimiter.class);
|
||||
_context.statManager().createRateStat("bwLimiter.pendingOutboundRequests", "How many outbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l });
|
||||
_context.statManager().createRateStat("bwLimiter.pendingInboundRequests", "How many inbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l });
|
||||
_context.statManager().createRateStat("bwLimiter.outboundDelayedTime", "How long it takes to honor an outbound request (ignoring ones with that go instantly)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l });
|
||||
_context.statManager().createRateStat("bwLimiter.inboundDelayedTime", "How long it takes to honor an inbound request (ignoring ones with that go instantly)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l });
|
||||
_pendingInboundRequests = new ArrayList(16);
|
||||
_pendingOutboundRequests = new ArrayList(16);
|
||||
_refiller = new FIFOBandwidthRefiller(_context, this);
|
||||
@ -69,10 +73,14 @@ public class FIFOBandwidthLimiter {
|
||||
*/
|
||||
public Request requestInbound(int bytesIn, String purpose) {
|
||||
SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose);
|
||||
int pending = 0;
|
||||
synchronized (_pendingInboundRequests) {
|
||||
pending = _pendingInboundRequests.size();
|
||||
_pendingInboundRequests.add(req);
|
||||
}
|
||||
satisfyInboundRequests();
|
||||
if (pending > 0)
|
||||
_context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending);
|
||||
return req;
|
||||
}
|
||||
/**
|
||||
@ -81,10 +89,14 @@ public class FIFOBandwidthLimiter {
|
||||
*/
|
||||
public Request requestOutbound(int bytesOut, String purpose) {
|
||||
SimpleRequest req = new SimpleRequest(0, bytesOut, purpose);
|
||||
int pending = 0;
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
pending = _pendingOutboundRequests.size();
|
||||
_pendingOutboundRequests.add(req);
|
||||
}
|
||||
satisfyOutboundRequests();
|
||||
if (pending > 0)
|
||||
_context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending);
|
||||
return req;
|
||||
}
|
||||
|
||||
@ -125,13 +137,16 @@ public class FIFOBandwidthLimiter {
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
long waited = _context.clock().now() - req.getRequestTime();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting inbound request " + req.getRequestName() + " fully for "
|
||||
+ req.getTotalInboundRequested() + " bytes (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ waited
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
// if we're unlimited, we always grant it fully, so there's no need to keep it around
|
||||
_pendingInboundRequests.remove(0);
|
||||
if (waited > 10)
|
||||
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
|
||||
} else if (_availableInboundBytes > 0) {
|
||||
int requested = req.getPendingInboundRequested();
|
||||
int allocated = 0;
|
||||
@ -140,25 +155,29 @@ public class FIFOBandwidthLimiter {
|
||||
else
|
||||
allocated = _availableInboundBytes;
|
||||
_availableInboundBytes -= allocated;
|
||||
_totalAllocatedInboundBytes += allocated;
|
||||
req.allocateBytes(allocated, 0);
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
long waited = _context.clock().now() - req.getRequestTime();
|
||||
if (req.getPendingInboundRequested() > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes inbound as a partial grant to "
|
||||
+ req.getRequestName() + " (wanted "
|
||||
+ req.getTotalInboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ waited
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes inbound to finish the partial grant to "
|
||||
+ req.getRequestName() + " (total "
|
||||
+ req.getTotalInboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ waited
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
_pendingInboundRequests.remove(0);
|
||||
if (waited > 10)
|
||||
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
|
||||
}
|
||||
} else {
|
||||
// no bandwidth available
|
||||
@ -194,13 +213,16 @@ public class FIFOBandwidthLimiter {
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
long waited = _context.clock().now() - req.getRequestTime();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting outbound request " + req.getRequestName() + " fully for "
|
||||
+ req.getTotalOutboundRequested() + " bytes (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ waited
|
||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||
// if we're unlimited, we always grant it fully, so there's no need to keep it around
|
||||
_pendingOutboundRequests.remove(0);
|
||||
if (waited > 10)
|
||||
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
|
||||
} else if (_availableOutboundBytes > 0) {
|
||||
int requested = req.getPendingOutboundRequested();
|
||||
int allocated = 0;
|
||||
@ -209,25 +231,29 @@ public class FIFOBandwidthLimiter {
|
||||
else
|
||||
allocated = _availableOutboundBytes;
|
||||
_availableOutboundBytes -= allocated;
|
||||
_totalAllocatedOutboundBytes += allocated;
|
||||
req.allocateBytes(0, allocated);
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
long waited = _context.clock().now() - req.getRequestTime();
|
||||
if (req.getPendingOutboundRequested() > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
|
||||
+ req.getRequestName() + " (wanted "
|
||||
+ req.getTotalOutboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ waited
|
||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to "
|
||||
+ req.getRequestName() + " (total "
|
||||
+ req.getTotalOutboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ waited
|
||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||
_pendingOutboundRequests.remove(0);
|
||||
if (waited > 10)
|
||||
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
|
||||
}
|
||||
} else {
|
||||
// no bandwidth available
|
||||
|
@ -11,7 +11,7 @@ package net.i2p.router.transport;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@ -41,11 +41,14 @@ public abstract class TransportImpl implements Transport {
|
||||
_context = context;
|
||||
_log = _context.logManager().getLog(TransportImpl.class);
|
||||
|
||||
_context.statManager().createFrequencyStat("transport.sendMessageFailureFrequency", "How often do we fail to send messages?", "Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.sendMessageFailureLifetime", "How long the lifetime of messages that fail are?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.sendMessageSize", "How large are the messages sent?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.receiveMessageSize", "How large are the messages received?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_sendPool = new LinkedList();
|
||||
_context.statManager().createRateStat("transport.receiveMessageTime", "How long it takes to read a message?", "Transport", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.receiveMessageTimeSlow", "How long it takes to read a message (when it takes more than a second)?", "Transport", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
|
||||
_sendPool = new ArrayList(16);
|
||||
_currentAddresses = new HashSet();
|
||||
}
|
||||
|
||||
@ -93,6 +96,9 @@ public abstract class TransportImpl implements Transport {
|
||||
_log.info("Failed to send message " + msg.getMessageType()
|
||||
+ " to " + msg.getTarget().getIdentity().getHash().toBase64()
|
||||
+ " with transport " + getStyle() + " (details: " + msg + ")");
|
||||
if (msg.getExpiration() < _context.clock().now())
|
||||
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
|
||||
|
||||
if (allowRequeue) {
|
||||
if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) {
|
||||
// this may not be the last transport available - keep going
|
||||
@ -163,7 +169,7 @@ public abstract class TransportImpl implements Transport {
|
||||
_context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime);
|
||||
} else {
|
||||
_context.profileManager().messageFailed(msg.getTarget().getIdentity().getHash(), getStyle());
|
||||
_context.statManager().updateFrequency("transport.sendMessageFailureFrequency");
|
||||
_context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime, lifetime);
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,6 +233,10 @@ public abstract class TransportImpl implements Transport {
|
||||
_context.profileManager().messageReceived(remoteIdentHash, getStyle(), msToReceive, bytesReceived);
|
||||
_context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive);
|
||||
}
|
||||
|
||||
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
|
||||
if (msToReceive > 1000)
|
||||
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive);
|
||||
|
||||
//// this functionality is built into the InNetMessagePool
|
||||
//String type = inMsg.getClass().getName();
|
||||
|
@ -71,8 +71,14 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(TCPConnection.class);
|
||||
_context.statManager().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?",
|
||||
_context.statManager().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added (only when it wasnt empty)?",
|
||||
"TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("tcp.writeTimeLarge", "How long it takes to write a message that is over 2K?",
|
||||
"TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("tcp.writeTimeSmall", "How long it takes to write a message that is under 2K?",
|
||||
"TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("tcp.writeTimeSlow", "How long it takes to write a message (ignoring messages transferring in under a second)?",
|
||||
"TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_id = ++_idCounter;
|
||||
_weInitiated = locallyInitiated;
|
||||
_closed = false;
|
||||
@ -287,7 +293,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
}
|
||||
long afterAdd = _context.clock().now();
|
||||
|
||||
_context.statManager().addRateData("tcp.queueSize", totalPending-1, 0);
|
||||
if (totalPending >= 2)
|
||||
_context.statManager().addRateData("tcp.queueSize", totalPending-1, 0);
|
||||
|
||||
if (removed != null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
@ -562,6 +569,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
+ "ms) sending " + data.length + " bytes to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
}
|
||||
if (data.length > 2*1024)
|
||||
_context.statManager().addRateData("tcp.writeTimeLarge", end - beforeWrite, end - beforeWrite);
|
||||
else
|
||||
_context.statManager().addRateData("tcp.writeTimeSmall", end - beforeWrite, end - beforeWrite);
|
||||
if (end-beforeWrite > 1*1024)
|
||||
_context.statManager().addRateData("tcp.writeTimeSlow", end - beforeWrite, end - beforeWrite);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user