Crappy fix for incorrect Total Bytes Sent/Total Bytes Received via BandwidthLimiter.... ah.. just read the FIXME there.

shendaras
This commit is contained in:
shendaras
2004-06-30 13:16:05 +00:00
committed by zzz
parent dcdcb7521a
commit d5bd22040c
7 changed files with 30 additions and 17 deletions

View File

@ -36,7 +36,7 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
public int read() throws IOException { public int read() throws IOException {
if (_pullFromOutbound) if (_pullFromOutbound)
_context.bandwidthLimiter().delayOutbound(_peer, 1); _context.bandwidthLimiter().delayOutbound(_peer, 1, true);
else else
_context.bandwidthLimiter().delayInbound(_peer, 1); _context.bandwidthLimiter().delayInbound(_peer, 1);
return in.read(); return in.read();
@ -45,7 +45,7 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
public int read(byte dest[]) throws IOException { public int read(byte dest[]) throws IOException {
int read = in.read(dest); int read = in.read(dest);
if (_pullFromOutbound) if (_pullFromOutbound)
_context.bandwidthLimiter().delayOutbound(_peer, read); _context.bandwidthLimiter().delayOutbound(_peer, read, true);
else else
_context.bandwidthLimiter().delayInbound(_peer, read); _context.bandwidthLimiter().delayInbound(_peer, read);
return read; return read;
@ -54,7 +54,7 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
public int read(byte dest[], int off, int len) throws IOException { public int read(byte dest[], int off, int len) throws IOException {
int read = in.read(dest, off, len); int read = in.read(dest, off, len);
if (_pullFromOutbound) if (_pullFromOutbound)
_context.bandwidthLimiter().delayOutbound(_peer, read); _context.bandwidthLimiter().delayOutbound(_peer, read, true);
else else
_context.bandwidthLimiter().delayInbound(_peer, read); _context.bandwidthLimiter().delayInbound(_peer, read);
return read; return read;
@ -62,7 +62,7 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
public long skip(long numBytes) throws IOException { public long skip(long numBytes) throws IOException {
long skip = in.skip(numBytes); long skip = in.skip(numBytes);
if (_pullFromOutbound) if (_pullFromOutbound)
_context.bandwidthLimiter().delayOutbound(_peer, (int)skip); _context.bandwidthLimiter().delayOutbound(_peer, (int)skip, true);
else else
_context.bandwidthLimiter().delayInbound(_peer, (int)skip); _context.bandwidthLimiter().delayInbound(_peer, (int)skip);
return skip; return skip;

View File

@ -33,7 +33,7 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
public void write(int val) throws IOException { public void write(int val) throws IOException {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Writing a single byte!", new Exception("Single byte from...")); _log.debug("Writing a single byte!", new Exception("Single byte from..."));
_context.bandwidthLimiter().delayOutbound(_peer, 1); _context.bandwidthLimiter().delayOutbound(_peer, 1, false);
out.write(val); out.write(val);
} }
public void write(byte src[]) throws IOException { public void write(byte src[]) throws IOException {
@ -48,18 +48,18 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
throw new IllegalArgumentException("wtf are you thinking? len=" + len throw new IllegalArgumentException("wtf are you thinking? len=" + len
+ ", off=" + off + ", data=" + src.length); + ", off=" + off + ", data=" + src.length);
if (len <= CHUNK_SIZE) { if (len <= CHUNK_SIZE) {
_context.bandwidthLimiter().delayOutbound(_peer, len); _context.bandwidthLimiter().delayOutbound(_peer, len, false);
out.write(src, off, len); out.write(src, off, len);
} else { } else {
int i = 0; int i = 0;
while (i+CHUNK_SIZE < len) { while (i+CHUNK_SIZE < len) {
_context.bandwidthLimiter().delayOutbound(_peer, CHUNK_SIZE); _context.bandwidthLimiter().delayOutbound(_peer, CHUNK_SIZE, false);
out.write(src, off+i, CHUNK_SIZE); out.write(src, off+i, CHUNK_SIZE);
i += CHUNK_SIZE; i += CHUNK_SIZE;
} }
int remainder = (len % CHUNK_SIZE); int remainder = (len % CHUNK_SIZE);
if (remainder > 0) { if (remainder > 0) {
_context.bandwidthLimiter().delayOutbound(_peer, remainder); _context.bandwidthLimiter().delayOutbound(_peer, remainder, false);
out.write(src, i, remainder); out.write(src, i, remainder);
} }
} }

View File

@ -26,8 +26,13 @@ public interface BandwidthLimiter {
/** /**
* Delay the required amount of time before returning so that sending numBytes * Delay the required amount of time before returning so that sending numBytes
* to the peer will not violate the bandwidth limits * to the peer will not violate the bandwidth limits
*
* FIXME: Added 'pulled' to fix an oversight with regards to getTotalReceiveBytes().
* BandwidthLimitedInputStream can pull from the outbound bandwidth, but
* this leads to an incorrect value from getTotalReceiveBytes() with
* TrivialBandwidthLimited. This is an inelegant solution, so fix it! =)
*/ */
public void delayOutbound(RouterIdentity peer, int numBytes); public void delayOutbound(RouterIdentity peer, int numBytes, boolean pulled);
public long getTotalSendBytes(); public long getTotalSendBytes();
public long getTotalReceiveBytes(); public long getTotalReceiveBytes();

View File

@ -116,8 +116,10 @@ public class TrivialBandwidthLimiter implements BandwidthLimiter {
/** /**
* Delay the required amount of time before returning so that sending numBytes * Delay the required amount of time before returning so that sending numBytes
* to the peer will not violate the bandwidth limits * to the peer will not violate the bandwidth limits
*
* FIXME: 'pulled' was added. See FIXME in BandwidthLimiter
*/ */
public void delayOutbound(RouterIdentity peer, int numBytes) { public void delayOutbound(RouterIdentity peer, int numBytes, boolean pulled) {
long delay = 0; long delay = 0;
while ( (delay = calculateDelayOutbound(peer, numBytes)) > 0) { while ( (delay = calculateDelayOutbound(peer, numBytes)) > 0) {
try { try {
@ -127,7 +129,7 @@ public class TrivialBandwidthLimiter implements BandwidthLimiter {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
synchronized (_outboundWaitLock) { _outboundWaitLock.notify(); } synchronized (_outboundWaitLock) { _outboundWaitLock.notify(); }
consumeOutbound(peer, numBytes); consumeOutbound(peer, numBytes, pulled);
} }
public long getTotalSendBytes() { return _totalOutboundBytes; } public long getTotalSendBytes() { return _totalOutboundBytes; }
@ -198,9 +200,15 @@ public class TrivialBandwidthLimiter implements BandwidthLimiter {
/** /**
* Note that numBytes have been sent to the peer * Note that numBytes have been sent to the peer
*/ */
private void consumeOutbound(RouterIdentity peer, int numBytes) { private void consumeOutbound(RouterIdentity peer, int numBytes, boolean pulled) {
if (numBytes > 0) if (numBytes > 0)
{
if (pulled) { // FIXME: fix to give the correct value from getTotalReceiveBytes()
_totalInboundBytes += numBytes;
} else {
_totalOutboundBytes += numBytes; _totalOutboundBytes += numBytes;
}
}
if (_outboundKBytesPerSecond > 0) if (_outboundKBytesPerSecond > 0)
_outboundAvailable -= numBytes; _outboundAvailable -= numBytes;
} }

View File

@ -128,7 +128,7 @@ class PHTTPPoller {
byte authData[] = getAuthData(); byte authData[] = getAuthData();
if (authData == null) return 0; if (authData == null) return 0;
_context.bandwidthLimiter().delayOutbound(null, authData.length + 512); // HTTP overhead _context.bandwidthLimiter().delayOutbound(null, authData.length + 512, false); // HTTP overhead
try { try {
_log.debug("Before opening " + _pollURL.toExternalForm()); _log.debug("Before opening " + _pollURL.toExternalForm());

View File

@ -113,7 +113,7 @@ class PHTTPSender {
byte data[] = getData(msg); byte data[] = getData(msg);
if (data == null) return false; if (data == null) return false;
_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), data.length+512); // HTTP overhead _context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), data.length+512, false); // HTTP overhead
con.setRequestProperty("Content-length", ""+data.length); con.setRequestProperty("Content-length", ""+data.length);
OutputStream out = con.getOutputStream(); OutputStream out = con.getOutputStream();
@ -176,7 +176,7 @@ class PHTTPSender {
URL checkStatusURL = new URL(checkURLStr); URL checkStatusURL = new URL(checkURLStr);
long delay = RECHECK_DELAY; long delay = RECHECK_DELAY;
do { do {
_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), 512); // HTTP overhead _context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), 512, false); // HTTP overhead
_context.bandwidthLimiter().delayInbound(msg.getTarget().getIdentity(), 512); // HTTP overhead _context.bandwidthLimiter().delayInbound(msg.getTarget().getIdentity(), 512); // HTTP overhead
_log.debug("Checking delivery at " + checkURLStr); _log.debug("Checking delivery at " + checkURLStr);

View File

@ -160,7 +160,7 @@ public class PHTTPTransport extends TransportImpl {
_context.router().getRouterInfo().getIdentity().writeBytes(baos); _context.router().getRouterInfo().getIdentity().writeBytes(baos);
int postLength = baos.size(); int postLength = baos.size();
_context.bandwidthLimiter().delayOutbound(null, postLength+512); // HTTP overhead _context.bandwidthLimiter().delayOutbound(null, postLength+512, false); // HTTP overhead
_context.bandwidthLimiter().delayInbound(null, 2048+512); // HTTP overhead _context.bandwidthLimiter().delayInbound(null, 2048+512); // HTTP overhead
long now = _context.clock().now(); long now = _context.clock().now();