forked from I2P_Developers/i2p.i2p
new bandwidth allocation policy and usage to include support for partial allocations (and in turn, partial write(...)) while still keeping the FIFO ordering
this will give a much smoother traffic pattern, as instead of waiting 6 seconds to write a 32KB message under a 6KB rate, it'll write 6KB for each of the first 5 seconds, and 2KB the next this also allows people to have small buckets (but again, bucket sizes smaller than the rate just don't make sense)
This commit is contained in:
@ -38,36 +38,50 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
FIFOBandwidthLimiter.Request req = null;
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().requestOutbound(1, _peerSource);
|
||||
req = _context.bandwidthLimiter().requestOutbound(1, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().requestInbound(1, _peerSource);
|
||||
req = _context.bandwidthLimiter().requestInbound(1, _peerSource);
|
||||
|
||||
// since its only a single byte, we dont need to loop
|
||||
// or check how much was allocated
|
||||
req.waitForNextAllocation();
|
||||
return in.read();
|
||||
}
|
||||
|
||||
public int read(byte dest[]) throws IOException {
|
||||
int read = in.read(dest);
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().requestOutbound(read, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().requestInbound(read, _peerSource);
|
||||
return read;
|
||||
return read(dest, 0, dest.length);
|
||||
}
|
||||
|
||||
public int read(byte dest[], int off, int len) throws IOException {
|
||||
int read = in.read(dest, off, len);
|
||||
FIFOBandwidthLimiter.Request req = null;
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().requestOutbound(read, _peerSource);
|
||||
req = _context.bandwidthLimiter().requestOutbound(read, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().requestInbound(read, _peerSource);
|
||||
req = _context.bandwidthLimiter().requestInbound(read, _peerSource);
|
||||
|
||||
while ( (req.getPendingInboundRequested() > 0) ||
|
||||
(req.getPendingOutboundRequested() > 0) ) {
|
||||
// we still haven't been authorized for everything, keep on waiting
|
||||
req.waitForNextAllocation();
|
||||
}
|
||||
return read;
|
||||
}
|
||||
public long skip(long numBytes) throws IOException {
|
||||
long skip = in.skip(numBytes);
|
||||
FIFOBandwidthLimiter.Request req = null;
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().requestOutbound((int)skip, _peerSource);
|
||||
req = _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().requestInbound((int)skip, _peerSource);
|
||||
req = _context.bandwidthLimiter().requestInbound((int)skip, _peerSource);
|
||||
|
||||
while ( (req.getPendingInboundRequested() > 0) ||
|
||||
(req.getPendingOutboundRequested() > 0) ) {
|
||||
// we still haven't been authorized for everything, keep on waiting
|
||||
req.waitForNextAllocation();
|
||||
}
|
||||
return skip;
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,9 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
|
||||
public void write(int val) throws IOException {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Writing a single byte!", new Exception("Single byte from..."));
|
||||
_context.bandwidthLimiter().requestOutbound(1, _peerTarget);
|
||||
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(1, _peerTarget);
|
||||
// only a single byte, no need to loop
|
||||
req.waitForNextAllocation();
|
||||
out.write(val);
|
||||
}
|
||||
public void write(byte src[]) throws IOException {
|
||||
@ -47,7 +49,17 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
|
||||
if (len + off > src.length)
|
||||
throw new IllegalArgumentException("wtf are you thinking? len=" + len
|
||||
+ ", off=" + off + ", data=" + src.length);
|
||||
_context.bandwidthLimiter().requestOutbound(len, _peerTarget);
|
||||
out.write(src, off, len);
|
||||
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, _peerTarget);
|
||||
|
||||
int written = 0;
|
||||
while (written < len) {
|
||||
int allocated = len - req.getPendingOutboundRequested();
|
||||
int toWrite = allocated - written;
|
||||
if (toWrite > 0) {
|
||||
out.write(src, off + written, toWrite);
|
||||
written += toWrite;
|
||||
}
|
||||
req.waitForNextAllocation();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,14 +15,14 @@ public class FIFOBandwidthLimiter {
|
||||
private I2PAppContext _context;
|
||||
private List _pendingInboundRequests;
|
||||
private List _pendingOutboundRequests;
|
||||
private volatile long _availableInboundBytes;
|
||||
private volatile long _availableOutboundBytes;
|
||||
private volatile int _availableInboundBytes;
|
||||
private volatile int _availableOutboundBytes;
|
||||
private boolean _outboundUnlimited;
|
||||
private boolean _inboundUnlimited;
|
||||
private volatile long _totalAllocatedInboundBytes;
|
||||
private volatile long _totalAllocatedOutboundBytes;
|
||||
private long _maxInboundBytes;
|
||||
private long _maxOutboundBytes;
|
||||
private int _maxInboundBytes;
|
||||
private int _maxOutboundBytes;
|
||||
private FIFOBandwidthRefiller _refiller;
|
||||
|
||||
private static int __id = 0;
|
||||
@ -45,9 +45,9 @@ public class FIFOBandwidthLimiter {
|
||||
public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes; }
|
||||
public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes; }
|
||||
public long getMaxInboundBytes() { return _maxInboundBytes; }
|
||||
public void setMaxInboundBytes(long numBytes) { _maxInboundBytes = numBytes; }
|
||||
public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; }
|
||||
public long getMaxOutboundBytes() { return _maxOutboundBytes; }
|
||||
public void setMaxOutboundBytes(long numBytes) { _maxOutboundBytes = numBytes; }
|
||||
public void setMaxOutboundBytes(int numBytes) { _maxOutboundBytes = numBytes; }
|
||||
public boolean getInboundUnlimited() { return _inboundUnlimited; }
|
||||
public void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; }
|
||||
public boolean getOutboundUnlimited() { return _outboundUnlimited; }
|
||||
@ -67,81 +67,25 @@ public class FIFOBandwidthLimiter {
|
||||
* Request some bytes, blocking until they become available
|
||||
*
|
||||
*/
|
||||
public void requestInbound(int bytesIn, String purpose) {
|
||||
addInboundRequest(new SimpleRequest(bytesIn, 0, purpose));
|
||||
public Request requestInbound(int bytesIn, String purpose) {
|
||||
SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose);
|
||||
synchronized (_pendingInboundRequests) {
|
||||
_pendingInboundRequests.add(req);
|
||||
}
|
||||
satisfyInboundRequests();
|
||||
return req;
|
||||
}
|
||||
/**
|
||||
* Request some bytes, blocking until they become available
|
||||
*
|
||||
*/
|
||||
public void requestOutbound(int bytesOut, String purpose) {
|
||||
addOutboundRequest(new SimpleRequest(0, bytesOut, purpose));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the request to the queue, blocking the requesting thread until
|
||||
* bandwidth is available (and all requests for bandwidth ahead of it have
|
||||
* been granted). Once sufficient bandwidth is available, this call will
|
||||
* return and request.grantRequest() will have been called.
|
||||
*
|
||||
*/
|
||||
private final void addInboundRequest(BandwidthRequest request) {
|
||||
synchronized (_pendingInboundRequests) {
|
||||
if ( (_pendingInboundRequests.size() <= 0) &&
|
||||
( (request.getRequestedInboundBytes() <= _availableInboundBytes) || (_inboundUnlimited) ) ) {
|
||||
// the queue is empty and there are sufficient bytes, grant 'em
|
||||
if (!_inboundUnlimited)
|
||||
_availableInboundBytes -= request.getRequestedInboundBytes();
|
||||
_totalAllocatedInboundBytes += request.getRequestedInboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting inbound request " + request.getRequestName() + " immediately for "
|
||||
+ request.getRequestedInboundBytes());
|
||||
request.grantRequest();
|
||||
return;
|
||||
} else {
|
||||
_pendingInboundRequests.add(request);
|
||||
}
|
||||
}
|
||||
synchronized (request.getAvailabilityMonitor()) {
|
||||
while (!request.alreadyGranted()) {
|
||||
try {
|
||||
request.getAvailabilityMonitor().wait();
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the request to the queue, blocking the requesting thread until
|
||||
* bandwidth is available (and all requests for bandwidth ahead of it have
|
||||
* been granted). Once sufficient bandwidth is available, this call will
|
||||
* return and request.grantRequest() will have been called.
|
||||
*
|
||||
*/
|
||||
private final void addOutboundRequest(BandwidthRequest request) {
|
||||
public Request requestOutbound(int bytesOut, String purpose) {
|
||||
SimpleRequest req = new SimpleRequest(0, bytesOut, purpose);
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
if ( (_pendingOutboundRequests.size() <= 0) &&
|
||||
( (request.getRequestedOutboundBytes() <= _availableOutboundBytes) || (_outboundUnlimited) ) ) {
|
||||
// the queue is empty and there are sufficient bytes, grant 'em
|
||||
if (!_outboundUnlimited)
|
||||
_availableOutboundBytes -= request.getRequestedOutboundBytes();
|
||||
_totalAllocatedOutboundBytes += request.getRequestedOutboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting outbound request " + request.getRequestName() + " immediately for "
|
||||
+ request.getRequestedOutboundBytes());
|
||||
request.grantRequest();
|
||||
return;
|
||||
} else {
|
||||
_pendingOutboundRequests.add(request);
|
||||
}
|
||||
}
|
||||
synchronized (request.getAvailabilityMonitor()) {
|
||||
while (!request.alreadyGranted()) {
|
||||
try {
|
||||
request.getAvailabilityMonitor().wait();
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
_pendingOutboundRequests.add(req);
|
||||
}
|
||||
satisfyOutboundRequests();
|
||||
return req;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -170,80 +114,140 @@ public class FIFOBandwidthLimiter {
|
||||
}
|
||||
|
||||
private final void satisfyInboundRequests() {
|
||||
List satisfied = null;
|
||||
synchronized (_pendingInboundRequests) {
|
||||
while (_pendingInboundRequests.size() > 0) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingInboundRequests.get(0);
|
||||
if ( (req.getRequestedInboundBytes() <= _availableInboundBytes) || (_inboundUnlimited) ) {
|
||||
_pendingInboundRequests.remove(0);
|
||||
if (!_inboundUnlimited)
|
||||
_availableInboundBytes -= req.getRequestedInboundBytes();
|
||||
_totalAllocatedInboundBytes += req.getRequestedInboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting inbound request " + req.getRequestName() + " for "
|
||||
+ req.getRequestedInboundBytes() + " bytes (waited "
|
||||
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(0);
|
||||
if (_inboundUnlimited) {
|
||||
int allocated = req.getPendingInboundRequested();
|
||||
_totalAllocatedInboundBytes += allocated;
|
||||
req.allocateBytes(allocated, 0);
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting inbound request " + req.getRequestName() + " fully for "
|
||||
+ req.getTotalInboundRequested() + " bytes (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
// i hate nested synchronization
|
||||
synchronized (req.getAvailabilityMonitor()) {
|
||||
req.grantRequest();
|
||||
req.getAvailabilityMonitor().notifyAll();
|
||||
}
|
||||
// if we're unlimited, we always grant it fully, so there's no need to keep it around
|
||||
_pendingInboundRequests.remove(0);
|
||||
} else if (_availableInboundBytes > 0) {
|
||||
int requested = req.getPendingInboundRequested();
|
||||
int allocated = 0;
|
||||
if (_availableInboundBytes > requested)
|
||||
allocated = requested;
|
||||
else
|
||||
allocated = _availableInboundBytes;
|
||||
_availableInboundBytes -= allocated;
|
||||
req.allocateBytes(allocated, 0);
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
if (req.getPendingInboundRequested() > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes inbound as a partial grant to "
|
||||
+ req.getRequestName() + " (wanted "
|
||||
+ req.getTotalInboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes inbound to finish the partial grant to "
|
||||
+ req.getRequestName() + " (total "
|
||||
+ req.getTotalInboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
_pendingInboundRequests.remove(0);
|
||||
}
|
||||
} else {
|
||||
// there isn't sufficient bandwidth for the first request,
|
||||
// so since we're a FIFO limiter, everyone waits. If we were a
|
||||
// best fit or ASAP limiter, we'd continue on iterating to see
|
||||
// if anyone would be satisfied with the current availability
|
||||
// no bandwidth available
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Still denying the first inbound request (" + req.getRequestName()
|
||||
+ " for "
|
||||
+ req.getRequestedInboundBytes() + " bytes (available "
|
||||
+ req.getTotalInboundRequested() + " bytes (available "
|
||||
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms so far) pending " + (_pendingInboundRequests.size()));
|
||||
return;
|
||||
break;
|
||||
}
|
||||
}
|
||||
//if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Nothing pending");
|
||||
}
|
||||
|
||||
if (satisfied != null) {
|
||||
for (int i = 0; i < satisfied.size(); i++) {
|
||||
SimpleRequest req = (SimpleRequest)satisfied.get(i);
|
||||
req.notifyAllocation();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final void satisfyOutboundRequests() {
|
||||
List satisfied = null;
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
while (_pendingOutboundRequests.size() > 0) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingOutboundRequests.get(0);
|
||||
if ( (req.getRequestedOutboundBytes() <= _availableOutboundBytes) || (_outboundUnlimited) ) {
|
||||
_pendingOutboundRequests.remove(0);
|
||||
if (!_outboundUnlimited)
|
||||
_availableOutboundBytes -= req.getRequestedOutboundBytes();
|
||||
_totalAllocatedOutboundBytes += req.getRequestedOutboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting outbound request " + req.getRequestName() + " for "
|
||||
+ req.getRequestedOutboundBytes() + " bytes (waited "
|
||||
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(0);
|
||||
if (_outboundUnlimited) {
|
||||
int allocated = req.getPendingOutboundRequested();
|
||||
_totalAllocatedOutboundBytes += allocated;
|
||||
req.allocateBytes(0, allocated);
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting outbound request " + req.getRequestName() + " fully for "
|
||||
+ req.getTotalOutboundRequested() + " bytes (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + (_pendingOutboundRequests.size()-1));
|
||||
// i hate nested synchronization
|
||||
synchronized (req.getAvailabilityMonitor()) {
|
||||
req.grantRequest();
|
||||
req.getAvailabilityMonitor().notifyAll();
|
||||
}
|
||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||
// if we're unlimited, we always grant it fully, so there's no need to keep it around
|
||||
_pendingOutboundRequests.remove(0);
|
||||
} else if (_availableOutboundBytes > 0) {
|
||||
int requested = req.getPendingOutboundRequested();
|
||||
int allocated = 0;
|
||||
if (_availableOutboundBytes > requested)
|
||||
allocated = requested;
|
||||
else
|
||||
allocated = _availableOutboundBytes;
|
||||
_availableOutboundBytes -= allocated;
|
||||
req.allocateBytes(0, allocated);
|
||||
if (satisfied == null)
|
||||
satisfied = new ArrayList(2);
|
||||
satisfied.add(req);
|
||||
if (req.getPendingOutboundRequested() > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
|
||||
+ req.getRequestName() + " (wanted "
|
||||
+ req.getTotalOutboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to "
|
||||
+ req.getRequestName() + " (total "
|
||||
+ req.getTotalOutboundRequested() + " bytes, waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + _pendingOutboundRequests.size());
|
||||
_pendingOutboundRequests.remove(0);
|
||||
}
|
||||
} else {
|
||||
// there isn't sufficient bandwidth for the first request,
|
||||
// so since we're a FIFO limiter, everyone waits. If we were a
|
||||
// best fit or ASAP limiter, we'd continue on iterating to see
|
||||
// if anyone would be satisfied with the current availability
|
||||
// no bandwidth available
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Still denying the first outbound request (" + req.getRequestName()
|
||||
+ " for "
|
||||
+ req.getRequestedOutboundBytes() + " bytes (available "
|
||||
+ req.getTotalOutboundRequested() + " bytes (available "
|
||||
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms so far) pending " + (_pendingOutboundRequests.size()));
|
||||
return;
|
||||
break;
|
||||
}
|
||||
}
|
||||
//if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Nothing pending");
|
||||
}
|
||||
|
||||
if (satisfied != null) {
|
||||
for (int i = 0; i < satisfied.size(); i++) {
|
||||
SimpleRequest req = (SimpleRequest)satisfied.get(i);
|
||||
req.notifyAllocation();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -256,20 +260,22 @@ public class FIFOBandwidthLimiter {
|
||||
buf.append("<li>Inbound requests: <ol>");
|
||||
synchronized (_pendingInboundRequests) {
|
||||
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingInboundRequests.get(i);
|
||||
Request req = (Request)_pendingInboundRequests.get(i);
|
||||
buf.append("<li>").append(req.getRequestName()).append(" for ");
|
||||
buf.append(req.getRequestedInboundBytes()).append(" bytes ");
|
||||
buf.append("requested ").append(now-req.getRequestTime());
|
||||
buf.append(req.getTotalInboundRequested()).append(" bytes ");
|
||||
buf.append("requested (").append(req.getPendingInboundRequested()).append(" pending) as of ");
|
||||
buf.append(now-req.getRequestTime());
|
||||
buf.append("ms ago</li>\n");
|
||||
}
|
||||
}
|
||||
buf.append("</ol></li><li>Outbound requests: <ol>\n");
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingOutboundRequests.get(i);
|
||||
Request req = (Request)_pendingOutboundRequests.get(i);
|
||||
buf.append("<li>").append(req.getRequestName()).append(" for ");
|
||||
buf.append(req.getRequestedOutboundBytes()).append(" bytes ");
|
||||
buf.append("requested ").append(now-req.getRequestTime());
|
||||
buf.append(req.getTotalOutboundRequested()).append(" bytes ");
|
||||
buf.append("requested (").append(req.getPendingOutboundRequested()).append(" pending) as of ");
|
||||
buf.append(now-req.getRequestTime());
|
||||
buf.append("ms ago</li>\n");
|
||||
}
|
||||
}
|
||||
@ -278,67 +284,66 @@ public class FIFOBandwidthLimiter {
|
||||
}
|
||||
|
||||
private static long __requestId = 0;
|
||||
private final class SimpleRequest implements BandwidthRequest {
|
||||
private boolean _alreadyGranted;
|
||||
private int _in;
|
||||
private int _out;
|
||||
private final class SimpleRequest implements Request {
|
||||
private int _inAllocated;
|
||||
private int _inTotal;
|
||||
private int _outAllocated;
|
||||
private int _outTotal;
|
||||
private long _requestId;
|
||||
private long _requestTime;
|
||||
private String _target;
|
||||
|
||||
public SimpleRequest(int in, int out, String target) {
|
||||
_in = in;
|
||||
_out = out;
|
||||
_inTotal = in;
|
||||
_outTotal = out;
|
||||
_inAllocated = 0;
|
||||
_outAllocated = 0;
|
||||
_target = target;
|
||||
_alreadyGranted = false;
|
||||
_requestId = ++__requestId;
|
||||
_requestTime = _context.clock().now();
|
||||
}
|
||||
public boolean alreadyGranted() { return _alreadyGranted; }
|
||||
public Object getAvailabilityMonitor() { return SimpleRequest.this; }
|
||||
public String getRequestName() { return "Req" + _requestId + " to " + _target; }
|
||||
public int getRequestedInboundBytes() { return _in; }
|
||||
public int getRequestedOutboundBytes() { return _out; }
|
||||
public void grantRequest() { _alreadyGranted = true; }
|
||||
public long getRequestTime() { return _requestTime; }
|
||||
public int getTotalOutboundRequested() { return _outTotal; }
|
||||
public int getPendingOutboundRequested() { return _outTotal - _outAllocated; }
|
||||
public int getTotalInboundRequested() { return _inTotal; }
|
||||
public int getPendingInboundRequested() { return _inTotal - _inAllocated; }
|
||||
public void waitForNextAllocation() {
|
||||
if ( (_outAllocated >= _outTotal) &&
|
||||
(_inAllocated >= _inTotal) )
|
||||
return;
|
||||
try {
|
||||
synchronized (SimpleRequest.this) {
|
||||
SimpleRequest.this.wait();
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
void allocateBytes(int in, int out) {
|
||||
_inAllocated += in;
|
||||
_outAllocated += out;
|
||||
}
|
||||
void notifyAllocation() {
|
||||
synchronized (SimpleRequest.this) {
|
||||
SimpleRequest.this.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a request for bandwidth allocation
|
||||
*/
|
||||
private interface BandwidthRequest {
|
||||
/**
|
||||
* how can we summarize this request (in case we want to display a list
|
||||
* of 'whats pending')
|
||||
*/
|
||||
|
||||
public interface Request {
|
||||
/** describe this particular request */
|
||||
public String getRequestName();
|
||||
/**
|
||||
* How many bytes are we going to send away from the router
|
||||
*/
|
||||
public int getRequestedOutboundBytes();
|
||||
/**
|
||||
* How many bytes are we going to read from the network
|
||||
*/
|
||||
public int getRequestedInboundBytes();
|
||||
/**
|
||||
* Lock unique to this request that will be wait() & notified upon
|
||||
* during the queueing
|
||||
*/
|
||||
public Object getAvailabilityMonitor();
|
||||
/**
|
||||
* When was the bandwidth requested?
|
||||
*/
|
||||
/** when was the request made? */
|
||||
public long getRequestTime();
|
||||
/**
|
||||
* must return true only if grantRequest has been called, else
|
||||
* false
|
||||
*/
|
||||
public boolean alreadyGranted();
|
||||
/**
|
||||
* flag this request to tell it that it has been or is about to be
|
||||
* allocated sufficient bytes. This should NOT be used as the notification
|
||||
* itself
|
||||
*/
|
||||
public void grantRequest();
|
||||
/** how many outbound bytes were requested? */
|
||||
public int getTotalOutboundRequested();
|
||||
/** how many outbound bytes were requested and haven't yet been allocated? */
|
||||
public int getPendingOutboundRequested();
|
||||
/** how many inbound bytes were requested? */
|
||||
public int getTotalInboundRequested();
|
||||
/** how many inbound bytes were requested and haven't yet been allocated? */
|
||||
public int getPendingInboundRequested();
|
||||
/** block until we are allocated some more bytes */
|
||||
public void waitForNextAllocation();
|
||||
}
|
||||
}
|
||||
|
@ -8,9 +8,9 @@ class FIFOBandwidthRefiller implements Runnable {
|
||||
private I2PAppContext _context;
|
||||
private FIFOBandwidthLimiter _limiter;
|
||||
/** how many KBps do we want to allow? */
|
||||
private long _inboundKBytesPerSecond;
|
||||
private int _inboundKBytesPerSecond;
|
||||
/** how many KBps do we want to allow? */
|
||||
private long _outboundKBytesPerSecond;
|
||||
private int _outboundKBytesPerSecond;
|
||||
/** how frequently do we want to replenish the available queues? */
|
||||
private long _replenishFrequency;
|
||||
/** when did we last replenish the queue? */
|
||||
@ -27,13 +27,13 @@ class FIFOBandwidthRefiller implements Runnable {
|
||||
public static final String PROP_REPLENISH_FREQUENCY = "i2np.bandwidth.replenishFrequencyMs";
|
||||
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at 6KBps inbound */
|
||||
public static final long MIN_INBOUND_BANDWIDTH = 6;
|
||||
public static final int MIN_INBOUND_BANDWIDTH = 6;
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at 6KBps outbound */
|
||||
public static final long MIN_OUTBOUND_BANDWIDTH = 6;
|
||||
public static final int MIN_OUTBOUND_BANDWIDTH = 6;
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
|
||||
public static final long MIN_INBOUND_BANDWIDTH_PEAK = 60;
|
||||
public static final int MIN_INBOUND_BANDWIDTH_PEAK = 6;
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
|
||||
public static final long MIN_OUTBOUND_BANDWIDTH_PEAK = 60;
|
||||
public static final int MIN_OUTBOUND_BANDWIDTH_PEAK = 6;
|
||||
/** Updating the bandwidth more than once a second is silly. once every 2 or 5 seconds is less so. */
|
||||
public static final long MIN_REPLENISH_FREQUENCY = 1000;
|
||||
|
||||
@ -134,7 +134,7 @@ class FIFOBandwidthRefiller implements Runnable {
|
||||
(!(inBwStr.equals(String.valueOf(_inboundKBytesPerSecond)))) ) {
|
||||
// bandwidth was specified *and* changed
|
||||
try {
|
||||
long in = Long.parseLong(inBwStr);
|
||||
int in = Integer.parseInt(inBwStr);
|
||||
if ( (in <= 0) || (in > MIN_INBOUND_BANDWIDTH) )
|
||||
_inboundKBytesPerSecond = in;
|
||||
else
|
||||
@ -157,7 +157,7 @@ class FIFOBandwidthRefiller implements Runnable {
|
||||
(!(outBwStr.equals(String.valueOf(_outboundKBytesPerSecond)))) ) {
|
||||
// bandwidth was specified *and* changed
|
||||
try {
|
||||
long out = Long.parseLong(outBwStr);
|
||||
int out = Integer.parseInt(outBwStr);
|
||||
if ( (out <= 0) || (out >= MIN_OUTBOUND_BANDWIDTH) )
|
||||
_outboundKBytesPerSecond = out;
|
||||
else
|
||||
@ -180,11 +180,18 @@ class FIFOBandwidthRefiller implements Runnable {
|
||||
(!(inBwStr.equals(String.valueOf(_limiter.getMaxInboundBytes())))) ) {
|
||||
// peak bw was specified *and* changed
|
||||
try {
|
||||
long in = Long.parseLong(inBwStr);
|
||||
if (in >= MIN_INBOUND_BANDWIDTH_PEAK)
|
||||
_limiter.setMaxInboundBytes(in * 1024);
|
||||
else
|
||||
_limiter.setMaxInboundBytes(MIN_INBOUND_BANDWIDTH_PEAK * 1024);
|
||||
int in = Integer.parseInt(inBwStr);
|
||||
if (in >= MIN_INBOUND_BANDWIDTH_PEAK) {
|
||||
if (in < _inboundKBytesPerSecond)
|
||||
_limiter.setMaxInboundBytes(_inboundKBytesPerSecond * 1024);
|
||||
else
|
||||
_limiter.setMaxInboundBytes(in * 1024);
|
||||
} else {
|
||||
if (MIN_INBOUND_BANDWIDTH_PEAK < _inboundKBytesPerSecond)
|
||||
_limiter.setMaxInboundBytes(_inboundKBytesPerSecond * 1024);
|
||||
else
|
||||
_limiter.setMaxInboundBytes(MIN_INBOUND_BANDWIDTH_PEAK * 1024);
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid inbound bandwidth burst limit [" + inBwStr
|
||||
@ -203,11 +210,18 @@ class FIFOBandwidthRefiller implements Runnable {
|
||||
(!(outBwStr.equals(String.valueOf(_limiter.getMaxOutboundBytes())))) ) {
|
||||
// peak bw was specified *and* changed
|
||||
try {
|
||||
long out = Long.parseLong(outBwStr);
|
||||
if (out >= MIN_OUTBOUND_BANDWIDTH_PEAK)
|
||||
_limiter.setMaxOutboundBytes(out * 1024);
|
||||
else
|
||||
_limiter.setMaxOutboundBytes(MIN_OUTBOUND_BANDWIDTH_PEAK * 1024);
|
||||
int out = Integer.parseInt(outBwStr);
|
||||
if (out >= MIN_OUTBOUND_BANDWIDTH_PEAK) {
|
||||
if (out < _outboundKBytesPerSecond)
|
||||
_limiter.setMaxOutboundBytes(_outboundKBytesPerSecond * 1024);
|
||||
else
|
||||
_limiter.setMaxOutboundBytes(out * 1024);
|
||||
} else {
|
||||
if (MIN_OUTBOUND_BANDWIDTH_PEAK < _outboundKBytesPerSecond)
|
||||
_limiter.setMaxOutboundBytes(_outboundKBytesPerSecond * 1024);
|
||||
else
|
||||
_limiter.setMaxOutboundBytes(MIN_OUTBOUND_BANDWIDTH_PEAK * 1024);
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid outbound bandwidth burst limit [" + outBwStr
|
||||
|
Reference in New Issue
Block a user