diff --git a/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java b/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java index d9aa56cf5..a2b625e3f 100644 --- a/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java +++ b/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java @@ -26,7 +26,10 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream { super(source); _context = context; _peer = peer; - _peerTarget = peer.getHash().toBase64(); + if (peer != null) + _peerTarget = peer.getHash().toBase64(); + else + _peerTarget = "unknown"; _log = context.logManager().getLog(BandwidthLimitedOutputStream.class); } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index aadb8a322..e6a62ab76 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -30,6 +30,10 @@ public class FIFOBandwidthLimiter { public FIFOBandwidthLimiter(I2PAppContext context) { _context = context; _log = context.logManager().getLog(FIFOBandwidthLimiter.class); + _context.statManager().createRateStat("bwLimiter.pendingOutboundRequests", "How many outbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l }); + _context.statManager().createRateStat("bwLimiter.pendingInboundRequests", "How many inbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l }); + _context.statManager().createRateStat("bwLimiter.outboundDelayedTime", "How long it takes to honor an outbound request (ignoring ones with that go instantly)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l }); + _context.statManager().createRateStat("bwLimiter.inboundDelayedTime", "How long it takes to honor an inbound request (ignoring ones with that go instantly)?", "BandwidthLimiter", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l }); _pendingInboundRequests = new ArrayList(16); _pendingOutboundRequests = new ArrayList(16); _refiller = new FIFOBandwidthRefiller(_context, this); @@ -69,10 +73,14 @@ public class FIFOBandwidthLimiter { */ public Request requestInbound(int bytesIn, String purpose) { SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose); + int pending = 0; synchronized (_pendingInboundRequests) { + pending = _pendingInboundRequests.size(); _pendingInboundRequests.add(req); } satisfyInboundRequests(); + if (pending > 0) + _context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending); return req; } /** @@ -81,10 +89,14 @@ public class FIFOBandwidthLimiter { */ public Request requestOutbound(int bytesOut, String purpose) { SimpleRequest req = new SimpleRequest(0, bytesOut, purpose); + int pending = 0; synchronized (_pendingOutboundRequests) { + pending = _pendingOutboundRequests.size(); _pendingOutboundRequests.add(req); } satisfyOutboundRequests(); + if (pending > 0) + _context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending); return req; } @@ -125,13 +137,16 @@ public class FIFOBandwidthLimiter { if (satisfied == null) satisfied = new ArrayList(2); satisfied.add(req); + long waited = _context.clock().now() - req.getRequestTime(); if (_log.shouldLog(Log.INFO)) _log.info("Granting inbound request " + req.getRequestName() + " fully for " + req.getTotalInboundRequested() + " bytes (waited " - + (_context.clock().now() - req.getRequestTime()) + + waited + "ms) pending " + _pendingInboundRequests.size()); // if we're unlimited, we always grant it fully, so there's no need to keep it around _pendingInboundRequests.remove(0); + if (waited > 10) + _context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited); } else if (_availableInboundBytes > 0) { int requested = req.getPendingInboundRequested(); int allocated = 0; @@ -140,25 +155,29 @@ public class FIFOBandwidthLimiter { else allocated = _availableInboundBytes; _availableInboundBytes -= allocated; + _totalAllocatedInboundBytes += allocated; req.allocateBytes(allocated, 0); if (satisfied == null) satisfied = new ArrayList(2); satisfied.add(req); + long waited = _context.clock().now() - req.getRequestTime(); if (req.getPendingInboundRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes inbound as a partial grant to " + req.getRequestName() + " (wanted " + req.getTotalInboundRequested() + " bytes, waited " - + (_context.clock().now() - req.getRequestTime()) + + waited + "ms) pending " + _pendingInboundRequests.size()); } else { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes inbound to finish the partial grant to " + req.getRequestName() + " (total " + req.getTotalInboundRequested() + " bytes, waited " - + (_context.clock().now() - req.getRequestTime()) + + waited + "ms) pending " + _pendingInboundRequests.size()); _pendingInboundRequests.remove(0); + if (waited > 10) + _context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited); } } else { // no bandwidth available @@ -194,13 +213,16 @@ public class FIFOBandwidthLimiter { if (satisfied == null) satisfied = new ArrayList(2); satisfied.add(req); + long waited = _context.clock().now() - req.getRequestTime(); if (_log.shouldLog(Log.INFO)) _log.info("Granting outbound request " + req.getRequestName() + " fully for " + req.getTotalOutboundRequested() + " bytes (waited " - + (_context.clock().now() - req.getRequestTime()) + + waited + "ms) pending " + _pendingOutboundRequests.size()); // if we're unlimited, we always grant it fully, so there's no need to keep it around _pendingOutboundRequests.remove(0); + if (waited > 10) + _context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited); } else if (_availableOutboundBytes > 0) { int requested = req.getPendingOutboundRequested(); int allocated = 0; @@ -209,25 +231,29 @@ public class FIFOBandwidthLimiter { else allocated = _availableOutboundBytes; _availableOutboundBytes -= allocated; + _totalAllocatedOutboundBytes += allocated; req.allocateBytes(0, allocated); if (satisfied == null) satisfied = new ArrayList(2); satisfied.add(req); + long waited = _context.clock().now() - req.getRequestTime(); if (req.getPendingOutboundRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes outbound as a partial grant to " + req.getRequestName() + " (wanted " + req.getTotalOutboundRequested() + " bytes, waited " - + (_context.clock().now() - req.getRequestTime()) + + waited + "ms) pending " + _pendingOutboundRequests.size()); } else { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to " + req.getRequestName() + " (total " + req.getTotalOutboundRequested() + " bytes, waited " - + (_context.clock().now() - req.getRequestTime()) + + waited + "ms) pending " + _pendingOutboundRequests.size()); _pendingOutboundRequests.remove(0); + if (waited > 10) + _context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited); } } else { // no bandwidth available diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 8e61804c6..2a282861b 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -11,7 +11,7 @@ package net.i2p.router.transport; import java.util.Date; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -41,11 +41,14 @@ public abstract class TransportImpl implements Transport { _context = context; _log = _context.logManager().getLog(TransportImpl.class); - _context.statManager().createFrequencyStat("transport.sendMessageFailureFrequency", "How often do we fail to send messages?", "Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("transport.sendMessageFailureLifetime", "How long the lifetime of messages that fail are?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("transport.sendMessageSize", "How large are the messages sent?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("transport.receiveMessageSize", "How large are the messages received?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - _context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - _sendPool = new LinkedList(); + _context.statManager().createRateStat("transport.receiveMessageTime", "How long it takes to read a message?", "Transport", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("transport.receiveMessageTimeSlow", "How long it takes to read a message (when it takes more than a second)?", "Transport", new long[] { 60*1000l, 5*60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); + _sendPool = new ArrayList(16); _currentAddresses = new HashSet(); } @@ -93,6 +96,9 @@ public abstract class TransportImpl implements Transport { _log.info("Failed to send message " + msg.getMessageType() + " to " + msg.getTarget().getIdentity().getHash().toBase64() + " with transport " + getStyle() + " (details: " + msg + ")"); + if (msg.getExpiration() < _context.clock().now()) + _context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime); + if (allowRequeue) { if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) { // this may not be the last transport available - keep going @@ -163,7 +169,7 @@ public abstract class TransportImpl implements Transport { _context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime); } else { _context.profileManager().messageFailed(msg.getTarget().getIdentity().getHash(), getStyle()); - _context.statManager().updateFrequency("transport.sendMessageFailureFrequency"); + _context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime, lifetime); } } @@ -227,6 +233,10 @@ public abstract class TransportImpl implements Transport { _context.profileManager().messageReceived(remoteIdentHash, getStyle(), msToReceive, bytesReceived); _context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive); } + + _context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive); + if (msToReceive > 1000) + _context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive); //// this functionality is built into the InNetMessagePool //String type = inMsg.getClass().getName(); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index b9cd6319b..3d1d959f9 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -71,8 +71,14 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException { _context = context; _log = context.logManager().getLog(TCPConnection.class); - _context.statManager().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?", + _context.statManager().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added (only when it wasnt empty)?", "TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("tcp.writeTimeLarge", "How long it takes to write a message that is over 2K?", + "TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("tcp.writeTimeSmall", "How long it takes to write a message that is under 2K?", + "TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("tcp.writeTimeSlow", "How long it takes to write a message (ignoring messages transferring in under a second)?", + "TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _id = ++_idCounter; _weInitiated = locallyInitiated; _closed = false; @@ -287,7 +293,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { } long afterAdd = _context.clock().now(); - _context.statManager().addRateData("tcp.queueSize", totalPending-1, 0); + if (totalPending >= 2) + _context.statManager().addRateData("tcp.queueSize", totalPending-1, 0); if (removed != null) { if (_log.shouldLog(Log.WARN)) @@ -562,6 +569,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { + "ms) sending " + data.length + " bytes to " + _remoteIdentity.getHash().toBase64()); } + if (data.length > 2*1024) + _context.statManager().addRateData("tcp.writeTimeLarge", end - beforeWrite, end - beforeWrite); + else + _context.statManager().addRateData("tcp.writeTimeSmall", end - beforeWrite, end - beforeWrite); + if (end-beforeWrite > 1*1024) + _context.statManager().addRateData("tcp.writeTimeSlow", end - beforeWrite, end - beforeWrite); return true; }