Making FIFOBandwithLimiter.Request unidirectional, static,

remove logging, other cleanups (ticket #719)
This commit is contained in:
zzz
2012-10-09 14:15:04 +00:00
parent 56574c41be
commit 6f509967bf
4 changed files with 99 additions and 102 deletions

View File

@ -59,6 +59,7 @@ public class FIFOBandwidthLimiter {
private final AtomicLong _totalAllocatedInboundBytes = new AtomicLong();
/** lifetime counter of bytes sent */
private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong();
private static final AtomicLong __requestId = new AtomicLong();
/** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */
//private final AtomicLong _totalWastedInboundBytes = new AtomicLong();
@ -202,7 +203,7 @@ public class FIFOBandwidthLimiter {
}
public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) {
SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose, lsnr, attachment);
SimpleRequest req = new SimpleRequest(bytesIn, purpose, lsnr, attachment);
requestInbound(req, bytesIn, purpose);
return req;
}
@ -237,7 +238,7 @@ public class FIFOBandwidthLimiter {
}
public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) {
SimpleRequest req = new SimpleRequest(0, bytesOut, purpose, lsnr, attachment);
SimpleRequest req = new SimpleRequest(bytesOut, purpose, lsnr, attachment);
requestOutbound(req, bytesOut, purpose);
return req;
}
@ -489,14 +490,13 @@ public class FIFOBandwidthLimiter {
private final void locked_satisfyInboundUnlimited(List<Request> satisfied) {
while (!_pendingInboundRequests.isEmpty()) {
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0);
int allocated = req.getPendingInboundRequested();
int allocated = req.getPendingRequested();
_totalAllocatedInboundBytes.addAndGet(allocated);
req.allocateBytes(allocated, 0);
req.allocateBytes(allocated);
satisfied.add(req);
long waited = now() - req.getRequestTime();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Granting inbound request " + req.getRequestName() + " fully for "
+ req.getTotalInboundRequested() + " bytes (waited "
_log.debug("Granting inbound request " + req + " fully (waited "
+ waited
+ "ms) pending " + _pendingInboundRequests.size());
if (waited > 10)
@ -520,8 +520,8 @@ public class FIFOBandwidthLimiter {
// connection decided they dont want the data anymore
if (_log.shouldLog(Log.DEBUG))
_log.debug("Aborting inbound request to "
+ req.getRequestName() + " (total "
+ req.getTotalInboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingInboundRequests.size());
_pendingInboundRequests.remove(i);
@ -537,7 +537,7 @@ public class FIFOBandwidthLimiter {
continue;
}
// ok, they are really waiting for us to give them stuff
int requested = req.getPendingInboundRequested();
int requested = req.getPendingRequested();
int avi = _availableInbound.get();
int allocated;
if (avi >= requested)
@ -546,21 +546,21 @@ public class FIFOBandwidthLimiter {
allocated = avi;
_availableInbound.addAndGet(0 - allocated);
_totalAllocatedInboundBytes.addAndGet(allocated);
req.allocateBytes(allocated, 0);
req.allocateBytes(allocated);
satisfied.add(req);
if (req.getPendingInboundRequested() > 0) {
if (req.getPendingRequested() > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocating " + allocated + " bytes inbound as a partial grant to "
+ req.getRequestName() + " (wanted "
+ req.getTotalInboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingInboundRequests.size()
+ ", longest waited " + locked_getLongestInboundWait() + " in");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocating " + allocated + " bytes inbound to finish the partial grant to "
+ req.getRequestName() + " (total "
+ req.getTotalInboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingInboundRequests.size()
+ ", longest waited " + locked_getLongestInboundWait() + " out");
@ -607,14 +607,13 @@ public class FIFOBandwidthLimiter {
private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) {
while (!_pendingOutboundRequests.isEmpty()) {
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0);
int allocated = req.getPendingOutboundRequested();
int allocated = req.getPendingRequested();
_totalAllocatedOutboundBytes.addAndGet(allocated);
req.allocateBytes(0, allocated);
req.allocateBytes(allocated);
satisfied.add(req);
long waited = now() - req.getRequestTime();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Granting outbound request " + req.getRequestName() + " fully for "
+ req.getTotalOutboundRequested() + " bytes (waited "
_log.debug("Granting outbound request " + req + " fully (waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out");
@ -639,8 +638,8 @@ public class FIFOBandwidthLimiter {
// connection decided they dont want the data anymore
if (_log.shouldLog(Log.DEBUG))
_log.debug("Aborting outbound request to "
+ req.getRequestName() + " (total "
+ req.getTotalOutboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size());
_pendingOutboundRequests.remove(i);
@ -652,11 +651,11 @@ public class FIFOBandwidthLimiter {
// they haven't taken advantage of it yet (most likely they're
// IO bound)
if (_log.shouldLog(Log.WARN))
_log.warn("multiple allocations since wait... ntcp shouldn't do this: " + req.getRequestName());
_log.warn("multiple allocations since wait... ntcp shouldn't do this: " + req);
continue;
}
// ok, they are really waiting for us to give them stuff
int requested = req.getPendingOutboundRequested();
int requested = req.getPendingRequested();
int avo = _availableOutbound.get();
int allocated;
if (avo >= requested)
@ -665,22 +664,22 @@ public class FIFOBandwidthLimiter {
allocated = avo;
_availableOutbound.addAndGet(0 - allocated);
_totalAllocatedOutboundBytes.addAndGet(allocated);
req.allocateBytes(0, allocated);
req.allocateBytes(allocated);
satisfied.add(req);
if (req.getPendingOutboundRequested() > 0) {
if (req.getPendingRequested() > 0) {
if (req.attachment() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
+ req.getRequestName() + " (wanted "
+ req.getTotalOutboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out");
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocating " + allocated + " bytes outbound as a partial grant to "
+ req.getRequestName() + " (wanted "
+ req.getTotalOutboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out");
@ -688,16 +687,16 @@ public class FIFOBandwidthLimiter {
if (req.attachment() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to "
+ req.getRequestName() + " (total "
+ req.getTotalOutboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out)");
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocating " + allocated + " bytes outbound to finish the partial grant to "
+ req.getRequestName() + " (total "
+ req.getTotalOutboundRequested() + " bytes, waited "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out)");
@ -788,12 +787,9 @@ public class FIFOBandwidthLimiter {
******/
}
private static long __requestId = 0;
private final class SimpleRequest implements Request {
private int _inAllocated;
private int _inTotal;
private int _outAllocated;
private int _outTotal;
private static class SimpleRequest implements Request {
private int _allocated;
private int _total;
private long _requestId;
private long _requestTime;
private String _target;
@ -806,119 +802,121 @@ public class FIFOBandwidthLimiter {
public SimpleRequest() {
satisfiedBuffer = new ArrayList(1);
init(0, 0, null);
init(0, null);
}
public SimpleRequest(int in, int out, String target, CompleteListener lsnr, Object attachment) {
/**
* @param target for debugging, to be removed
*/
public SimpleRequest(int bytes, String target, CompleteListener lsnr, Object attachment) {
satisfiedBuffer = new ArrayList(1);
_lsnr = lsnr;
_attachment = attachment;
init(in, out, target);
init(bytes, target);
}
public void init(int in, int out, String target) {
/**
* @param target for debugging, to be removed
*/
public void init(int bytes, String target) {
_waited = false;
_inTotal = in;
_outTotal = out;
_inAllocated = 0;
_outAllocated = 0;
_total = bytes;
_allocated = 0;
_aborted = false;
_target = target;
satisfiedBuffer.clear();
_requestId = ++__requestId;
_requestTime = now();
_requestId = __requestId.incrementAndGet();
_requestTime = System.currentTimeMillis();
}
public String getRequestName() { return "Req" + _requestId + " to " + _target; }
public long getRequestTime() { return _requestTime; }
public int getTotalOutboundRequested() { return _outTotal; }
public int getPendingOutboundRequested() { return _outTotal - _outAllocated; }
public int getTotalInboundRequested() { return _inTotal; }
public int getPendingInboundRequested() { return _inTotal - _inAllocated; }
public int getTotalRequested() { return _total; }
public int getPendingRequested() { return _total - _allocated; }
public boolean getAborted() { return _aborted; }
public void abort() { _aborted = true; }
public CompleteListener getCompleteListener() { return _lsnr; }
public void setCompleteListener(CompleteListener lsnr) {
boolean complete = false;
synchronized (SimpleRequest.this) {
synchronized (this) {
_lsnr = lsnr;
if (isComplete()) {
complete = true;
}
}
if (complete && lsnr != null) {
if (_log.shouldLog(Log.INFO))
_log.info("complete listener set AND completed: " + lsnr);
lsnr.complete(SimpleRequest.this);
//if (_log.shouldLog(Log.INFO))
// _log.info("complete listener set AND completed: " + lsnr);
lsnr.complete(this);
}
}
private boolean isComplete() { return (_outAllocated >= _outTotal) && (_inAllocated >= _inTotal); }
private boolean isComplete() { return _allocated >= _total; }
public void waitForNextAllocation() {
_waited = true;
_allocationsSinceWait = 0;
boolean complete = false;
try {
synchronized (SimpleRequest.this) {
synchronized (this) {
if (isComplete())
complete = true;
else
SimpleRequest.this.wait();
wait();
}
} catch (InterruptedException ie) {}
if (complete && _lsnr != null)
_lsnr.complete(SimpleRequest.this);
_lsnr.complete(this);
}
int getAllocationsSinceWait() { return _waited ? _allocationsSinceWait : 0; }
void allocateBytes(int in, int out) {
_inAllocated += in;
_outAllocated += out;
void allocateBytes(int bytes) {
_allocated += bytes;
if (_lsnr == null)
_allocationsSinceWait++;
if (isComplete()) {
if (_log.shouldLog(Log.INFO))
_log.info("allocate " + in +"/"+ out + " completed, listener=" + _lsnr);
}
//if (isComplete()) {
// if (_log.shouldLog(Log.INFO))
// _log.info("allocate " + bytes + " completed, listener=" + _lsnr);
//}
//notifyAllocation(); // handled within the satisfy* methods
}
void notifyAllocation() {
boolean complete = false;
synchronized (SimpleRequest.this) {
synchronized (this) {
if (isComplete())
complete = true;
SimpleRequest.this.notifyAll();
notifyAll();
}
if (complete && _lsnr != null) {
_lsnr.complete(SimpleRequest.this);
if (_log.shouldLog(Log.INFO))
_log.info("at completion for " + _inTotal + "/" + _outTotal
+ ", recvBps=" + _recvBps + "/"+ _recvBps15s + " listener is " + _lsnr);
_lsnr.complete(this);
//if (_log.shouldLog(Log.INFO))
// _log.info("at completion for " + _total
// + ", recvBps=" + _recvBps + "/"+ _recvBps15s + " listener is " + _lsnr);
}
}
public void attach(Object obj) { _attachment = obj; }
public Object attachment() { return _attachment; }
@Override
public String toString() { return getRequestName(); }
public String toString() {
return "Req" + _requestId + " to " + _target +
_allocated + '/' + _total;
}
}
/**
* This is somewhat complicated by having both
* inbound and outbound in a single request.
* Making a request unidirectional would
* be a good simplification.
* But NTCP would have to be changed as it puts them on one queue.
* A bandwidth request, either inbound or outbound.
*/
public interface Request {
/** describe this particular request */
public String getRequestName();
/** when was the request made? */
public long getRequestTime();
/** how many outbound bytes were requested? */
public int getTotalOutboundRequested();
/** how many outbound bytes were requested and haven't yet been allocated? */
public int getPendingOutboundRequested();
/** how many inbound bytes were requested? */
public int getTotalInboundRequested();
/** how many inbound bytes were requested and haven't yet been allocated? */
public int getPendingInboundRequested();
/** how many bytes were requested? */
public int getTotalRequested();
/** how many bytes were requested and haven't yet been allocated? */
public int getPendingRequested();
/** block until we are allocated some more bytes */
public void waitForNextAllocation();
/** we no longer want the data requested (the connection closed) */
@ -926,7 +924,7 @@ public class FIFOBandwidthLimiter {
/** was this request aborted? */
public boolean getAborted();
/** thar be dragons */
public void init(int in, int out, String target);
public void init(int bytes, String target);
public void setCompleteListener(CompleteListener lsnr);
/** Only supported if the request is not satisfied */
public void attach(Object obj);
@ -943,14 +941,13 @@ public class FIFOBandwidthLimiter {
private static class NoopRequest implements Request {
public void abort() {}
public boolean getAborted() { return false; }
public int getPendingInboundRequested() { return 0; }
public int getPendingOutboundRequested() { return 0; }
public String getRequestName() { return "noop"; }
public int getPendingRequested() { return 0; }
@Override
public String toString() { return "noop"; }
public long getRequestTime() { return 0; }
public int getTotalInboundRequested() { return 0; }
public int getTotalOutboundRequested() { return 0; }
public int getTotalRequested() { return 0; }
public void waitForNextAllocation() {}
public void init(int in, int out, String target) {}
public void init(int bytes, String target) {}
public CompleteListener getCompleteListener() { return null; }
public void setCompleteListener(CompleteListener lsnr) {
lsnr.complete(NoopRequest.this);

View File

@ -376,7 +376,7 @@ class EventPumper implements Runnable {
public void wantsWrite(NTCPConnection con, byte data[]) {
ByteBuffer buf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf);
if (req.getPendingOutboundRequested() > 0) {
if (req.getPendingRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("queued write on " + con + " for " + data.length);
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1);
@ -584,7 +584,7 @@ class EventPumper implements Runnable {
// ZERO COPY. The buffer will be returned in Reader.processRead()
buf.flip();
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
if (req.getPendingInboundRequested() > 0) {
if (req.getPendingRequested() > 0) {
// rare since we generally don't throttle inbound
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
//if (_log.shouldLog(Log.DEBUG))

View File

@ -293,7 +293,7 @@ class UDPReceiver {
//FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
//_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver");
req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
while (req.getPendingInboundRequested() > 0)
while (req.getPendingRequested() > 0)
req.waitForNextAllocation();
int queued = receive(packet);

View File

@ -212,7 +212,7 @@ class UDPSender {
if (size > 0) {
//_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");
while (req.getPendingOutboundRequested() > 0)
while (req.getPendingRequested() > 0)
req.waitForNextAllocation();
}