* adjust the algorithm to deal with IO bound requests:
if more tokens become available while the first pending request is still blocked on read/write (aka after allocation and before next .waitForAllocation()), give the tokens to the next request * refactor the satisfy{In,Out}boundRequests methods into smaller logical units
This commit is contained in:
@ -128,67 +128,17 @@ public class FIFOBandwidthLimiter {
|
|||||||
private final void satisfyInboundRequests() {
|
private final void satisfyInboundRequests() {
|
||||||
List satisfied = null;
|
List satisfied = null;
|
||||||
synchronized (_pendingInboundRequests) {
|
synchronized (_pendingInboundRequests) {
|
||||||
while (_pendingInboundRequests.size() > 0) {
|
if (_inboundUnlimited) {
|
||||||
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(0);
|
satisfied = locked_satisfyInboundUnlimited();
|
||||||
if (_inboundUnlimited) {
|
} else {
|
||||||
int allocated = req.getPendingInboundRequested();
|
if (_availableInboundBytes > 0) {
|
||||||
_totalAllocatedInboundBytes += allocated;
|
satisfied = locked_satisfyInboundAvailable();
|
||||||
req.allocateBytes(allocated, 0);
|
|
||||||
if (satisfied == null)
|
|
||||||
satisfied = new ArrayList(2);
|
|
||||||
satisfied.add(req);
|
|
||||||
long waited = _context.clock().now() - req.getRequestTime();
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Granting inbound request " + req.getRequestName() + " fully for "
|
|
||||||
+ req.getTotalInboundRequested() + " bytes (waited "
|
|
||||||
+ waited
|
|
||||||
+ "ms) pending " + _pendingInboundRequests.size());
|
|
||||||
// if we're unlimited, we always grant it fully, so there's no need to keep it around
|
|
||||||
_pendingInboundRequests.remove(0);
|
|
||||||
if (waited > 10)
|
|
||||||
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
|
|
||||||
} else if (_availableInboundBytes > 0) {
|
|
||||||
int requested = req.getPendingInboundRequested();
|
|
||||||
int allocated = 0;
|
|
||||||
if (_availableInboundBytes > requested)
|
|
||||||
allocated = requested;
|
|
||||||
else
|
|
||||||
allocated = _availableInboundBytes;
|
|
||||||
_availableInboundBytes -= allocated;
|
|
||||||
_totalAllocatedInboundBytes += allocated;
|
|
||||||
req.allocateBytes(allocated, 0);
|
|
||||||
if (satisfied == null)
|
|
||||||
satisfied = new ArrayList(2);
|
|
||||||
satisfied.add(req);
|
|
||||||
long waited = _context.clock().now() - req.getRequestTime();
|
|
||||||
if (req.getPendingInboundRequested() > 0) {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Allocating " + allocated + " bytes inbound as a partial grant to "
|
|
||||||
+ req.getRequestName() + " (wanted "
|
|
||||||
+ req.getTotalInboundRequested() + " bytes, waited "
|
|
||||||
+ waited
|
|
||||||
+ "ms) pending " + _pendingInboundRequests.size());
|
|
||||||
} else {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Allocating " + allocated + " bytes inbound to finish the partial grant to "
|
|
||||||
+ req.getRequestName() + " (total "
|
|
||||||
+ req.getTotalInboundRequested() + " bytes, waited "
|
|
||||||
+ waited
|
|
||||||
+ "ms) pending " + _pendingInboundRequests.size());
|
|
||||||
_pendingInboundRequests.remove(0);
|
|
||||||
if (waited > 10)
|
|
||||||
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// no bandwidth available
|
// no bandwidth available
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Still denying the first inbound request (" + req.getRequestName()
|
_log.warn("Still denying the " + _pendingInboundRequests.size()
|
||||||
+ " for "
|
+ " pending inbound requests (available "
|
||||||
+ req.getTotalInboundRequested() + " bytes (available "
|
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out)");
|
||||||
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited "
|
|
||||||
+ (_context.clock().now() - req.getRequestTime())
|
|
||||||
+ "ms so far) pending " + (_pendingInboundRequests.size()));
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -201,70 +151,103 @@ public class FIFOBandwidthLimiter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* There are no limits, so just give every inbound request whatever they want
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private final List locked_satisfyInboundUnlimited() {
|
||||||
|
List satisfied = null;
|
||||||
|
|
||||||
|
while (_pendingInboundRequests.size() > 0) {
|
||||||
|
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0);
|
||||||
|
int allocated = req.getPendingInboundRequested();
|
||||||
|
_totalAllocatedInboundBytes += allocated;
|
||||||
|
req.allocateBytes(allocated, 0);
|
||||||
|
if (satisfied == null)
|
||||||
|
satisfied = new ArrayList(2);
|
||||||
|
satisfied.add(req);
|
||||||
|
long waited = _context.clock().now() - req.getRequestTime();
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Granting inbound request " + req.getRequestName() + " fully for "
|
||||||
|
+ req.getTotalInboundRequested() + " bytes (waited "
|
||||||
|
+ waited
|
||||||
|
+ "ms) pending " + _pendingInboundRequests.size());
|
||||||
|
if (waited > 10)
|
||||||
|
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
|
||||||
|
}
|
||||||
|
return satisfied;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ok, we have limits, so lets iterate through the requests, allocating as much
|
||||||
|
* bandwidth as we can to those who have used what we have given them and are waiting
|
||||||
|
* for more (giving priority to the first ones who requested it)
|
||||||
|
*
|
||||||
|
* @return list of requests that were completely satisfied
|
||||||
|
*/
|
||||||
|
private final List locked_satisfyInboundAvailable() {
|
||||||
|
List satisfied = null;
|
||||||
|
|
||||||
|
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
|
||||||
|
if (_availableInboundBytes <= 0) break;
|
||||||
|
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i);
|
||||||
|
if (req.getAllocationsSinceWait() > 0) {
|
||||||
|
// we have already allocated some values to this request, but
|
||||||
|
// they haven't taken advantage of it yet (most likely they're
|
||||||
|
// IO bound)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// ok, they are really waiting for us to give them stuff
|
||||||
|
int requested = req.getPendingInboundRequested();
|
||||||
|
int allocated = 0;
|
||||||
|
if (_availableInboundBytes > requested)
|
||||||
|
allocated = requested;
|
||||||
|
else
|
||||||
|
allocated = _availableInboundBytes;
|
||||||
|
_availableInboundBytes -= allocated;
|
||||||
|
_totalAllocatedInboundBytes += allocated;
|
||||||
|
req.allocateBytes(allocated, 0);
|
||||||
|
if (satisfied == null)
|
||||||
|
satisfied = new ArrayList(2);
|
||||||
|
satisfied.add(req);
|
||||||
|
long waited = _context.clock().now() - req.getRequestTime();
|
||||||
|
if (req.getPendingInboundRequested() > 0) {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Allocating " + allocated + " bytes inbound as a partial grant to "
|
||||||
|
+ req.getRequestName() + " (wanted "
|
||||||
|
+ req.getTotalInboundRequested() + " bytes, waited "
|
||||||
|
+ waited
|
||||||
|
+ "ms) pending " + _pendingInboundRequests.size());
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Allocating " + allocated + " bytes inbound to finish the partial grant to "
|
||||||
|
+ req.getRequestName() + " (total "
|
||||||
|
+ req.getTotalInboundRequested() + " bytes, waited "
|
||||||
|
+ waited
|
||||||
|
+ "ms) pending " + _pendingInboundRequests.size());
|
||||||
|
_pendingInboundRequests.remove(i);
|
||||||
|
i--;
|
||||||
|
if (waited > 10)
|
||||||
|
_context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return satisfied;
|
||||||
|
}
|
||||||
|
|
||||||
private final void satisfyOutboundRequests() {
|
private final void satisfyOutboundRequests() {
|
||||||
List satisfied = null;
|
List satisfied = null;
|
||||||
synchronized (_pendingOutboundRequests) {
|
synchronized (_pendingOutboundRequests) {
|
||||||
while (_pendingOutboundRequests.size() > 0) {
|
if (_outboundUnlimited) {
|
||||||
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(0);
|
satisfied = locked_satisfyOutboundUnlimited();
|
||||||
if (_outboundUnlimited) {
|
} else {
|
||||||
int allocated = req.getPendingOutboundRequested();
|
if (_availableOutboundBytes > 0) {
|
||||||
_totalAllocatedOutboundBytes += allocated;
|
satisfied = locked_satisfyOutboundAvailable();
|
||||||
req.allocateBytes(0, allocated);
|
|
||||||
if (satisfied == null)
|
|
||||||
satisfied = new ArrayList(2);
|
|
||||||
satisfied.add(req);
|
|
||||||
long waited = _context.clock().now() - req.getRequestTime();
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Granting outbound request " + req.getRequestName() + " fully for "
|
|
||||||
+ req.getTotalOutboundRequested() + " bytes (waited "
|
|
||||||
+ waited
|
|
||||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
|
||||||
// if we're unlimited, we always grant it fully, so there's no need to keep it around
|
|
||||||
_pendingOutboundRequests.remove(0);
|
|
||||||
if (waited > 10)
|
|
||||||
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
|
|
||||||
} else if (_availableOutboundBytes > 0) {
|
|
||||||
int requested = req.getPendingOutboundRequested();
|
|
||||||
int allocated = 0;
|
|
||||||
if (_availableOutboundBytes > requested)
|
|
||||||
allocated = requested;
|
|
||||||
else
|
|
||||||
allocated = _availableOutboundBytes;
|
|
||||||
_availableOutboundBytes -= allocated;
|
|
||||||
_totalAllocatedOutboundBytes += allocated;
|
|
||||||
req.allocateBytes(0, allocated);
|
|
||||||
if (satisfied == null)
|
|
||||||
satisfied = new ArrayList(2);
|
|
||||||
satisfied.add(req);
|
|
||||||
long waited = _context.clock().now() - req.getRequestTime();
|
|
||||||
if (req.getPendingOutboundRequested() > 0) {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
|
|
||||||
+ req.getRequestName() + " (wanted "
|
|
||||||
+ req.getTotalOutboundRequested() + " bytes, waited "
|
|
||||||
+ waited
|
|
||||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
|
||||||
} else {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to "
|
|
||||||
+ req.getRequestName() + " (total "
|
|
||||||
+ req.getTotalOutboundRequested() + " bytes, waited "
|
|
||||||
+ waited
|
|
||||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
|
||||||
_pendingOutboundRequests.remove(0);
|
|
||||||
if (waited > 10)
|
|
||||||
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// no bandwidth available
|
// no bandwidth available
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Still denying the first outbound request (" + req.getRequestName()
|
_log.warn("Still denying the " + _pendingOutboundRequests.size()
|
||||||
+ " for "
|
+ " pending outbound requests (available "
|
||||||
+ req.getTotalOutboundRequested() + " bytes (available "
|
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out)");
|
||||||
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited "
|
|
||||||
+ (_context.clock().now() - req.getRequestTime())
|
|
||||||
+ "ms so far) pending " + (_pendingOutboundRequests.size()));
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -277,6 +260,89 @@ public class FIFOBandwidthLimiter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* There are no limits, so just give every outbound request whatever they want
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private final List locked_satisfyOutboundUnlimited() {
|
||||||
|
List satisfied = null;
|
||||||
|
|
||||||
|
while (_pendingOutboundRequests.size() > 0) {
|
||||||
|
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0);
|
||||||
|
int allocated = req.getPendingOutboundRequested();
|
||||||
|
_totalAllocatedOutboundBytes += allocated;
|
||||||
|
req.allocateBytes(0, allocated);
|
||||||
|
if (satisfied == null)
|
||||||
|
satisfied = new ArrayList(2);
|
||||||
|
satisfied.add(req);
|
||||||
|
long waited = _context.clock().now() - req.getRequestTime();
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Granting outbound request " + req.getRequestName() + " fully for "
|
||||||
|
+ req.getTotalOutboundRequested() + " bytes (waited "
|
||||||
|
+ waited
|
||||||
|
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||||
|
if (waited > 10)
|
||||||
|
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
|
||||||
|
}
|
||||||
|
return satisfied;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ok, we have limits, so lets iterate through the requests, allocating as much
|
||||||
|
* bandwidth as we can to those who have used what we have given them and are waiting
|
||||||
|
* for more (giving priority to the first ones who requested it)
|
||||||
|
*
|
||||||
|
* @return list of requests that were completely satisfied
|
||||||
|
*/
|
||||||
|
private final List locked_satisfyOutboundAvailable() {
|
||||||
|
List satisfied = null;
|
||||||
|
|
||||||
|
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
|
||||||
|
if (_availableOutboundBytes <= 0) break;
|
||||||
|
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i);
|
||||||
|
if (req.getAllocationsSinceWait() > 0) {
|
||||||
|
// we have already allocated some values to this request, but
|
||||||
|
// they haven't taken advantage of it yet (most likely they're
|
||||||
|
// IO bound)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// ok, they are really waiting for us to give them stuff
|
||||||
|
int requested = req.getPendingOutboundRequested();
|
||||||
|
int allocated = 0;
|
||||||
|
if (_availableOutboundBytes > requested)
|
||||||
|
allocated = requested;
|
||||||
|
else
|
||||||
|
allocated = _availableOutboundBytes;
|
||||||
|
_availableOutboundBytes -= allocated;
|
||||||
|
_totalAllocatedOutboundBytes += allocated;
|
||||||
|
req.allocateBytes(0, allocated);
|
||||||
|
if (satisfied == null)
|
||||||
|
satisfied = new ArrayList(2);
|
||||||
|
satisfied.add(req);
|
||||||
|
long waited = _context.clock().now() - req.getRequestTime();
|
||||||
|
if (req.getPendingOutboundRequested() > 0) {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
|
||||||
|
+ req.getRequestName() + " (wanted "
|
||||||
|
+ req.getTotalOutboundRequested() + " bytes, waited "
|
||||||
|
+ waited
|
||||||
|
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to "
|
||||||
|
+ req.getRequestName() + " (total "
|
||||||
|
+ req.getTotalOutboundRequested() + " bytes, waited "
|
||||||
|
+ waited
|
||||||
|
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||||
|
_pendingOutboundRequests.remove(i);
|
||||||
|
i--;
|
||||||
|
if (waited > 10)
|
||||||
|
_context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return satisfied;
|
||||||
|
}
|
||||||
|
|
||||||
public void renderStatusHTML(OutputStream out) throws IOException {
|
public void renderStatusHTML(OutputStream out) throws IOException {
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
StringBuffer buf = new StringBuffer(4096);
|
StringBuffer buf = new StringBuffer(4096);
|
||||||
@ -318,6 +384,7 @@ public class FIFOBandwidthLimiter {
|
|||||||
private long _requestId;
|
private long _requestId;
|
||||||
private long _requestTime;
|
private long _requestTime;
|
||||||
private String _target;
|
private String _target;
|
||||||
|
private int _allocationsSinceWait;
|
||||||
|
|
||||||
public SimpleRequest(int in, int out, String target) {
|
public SimpleRequest(int in, int out, String target) {
|
||||||
_inTotal = in;
|
_inTotal = in;
|
||||||
@ -336,6 +403,7 @@ public class FIFOBandwidthLimiter {
|
|||||||
public int getTotalInboundRequested() { return _inTotal; }
|
public int getTotalInboundRequested() { return _inTotal; }
|
||||||
public int getPendingInboundRequested() { return _inTotal - _inAllocated; }
|
public int getPendingInboundRequested() { return _inTotal - _inAllocated; }
|
||||||
public void waitForNextAllocation() {
|
public void waitForNextAllocation() {
|
||||||
|
_allocationsSinceWait = 0;
|
||||||
if ( (_outAllocated >= _outTotal) &&
|
if ( (_outAllocated >= _outTotal) &&
|
||||||
(_inAllocated >= _inTotal) )
|
(_inAllocated >= _inTotal) )
|
||||||
return;
|
return;
|
||||||
@ -345,9 +413,11 @@ public class FIFOBandwidthLimiter {
|
|||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {}
|
} catch (InterruptedException ie) {}
|
||||||
}
|
}
|
||||||
|
int getAllocationsSinceWait() { return _allocationsSinceWait; }
|
||||||
void allocateBytes(int in, int out) {
|
void allocateBytes(int in, int out) {
|
||||||
_inAllocated += in;
|
_inAllocated += in;
|
||||||
_outAllocated += out;
|
_outAllocated += out;
|
||||||
|
_allocationsSinceWait++;
|
||||||
}
|
}
|
||||||
void notifyAllocation() {
|
void notifyAllocation() {
|
||||||
synchronized (SimpleRequest.this) {
|
synchronized (SimpleRequest.this) {
|
||||||
|
Reference in New Issue
Block a user