handle disconnect while there are still requests pending

This commit is contained in:
jrandom
2004-07-24 17:54:49 +00:00
committed by zzz
parent 3fd35a9c18
commit ce3e7e623c
3 changed files with 99 additions and 22 deletions

View File

@ -14,12 +14,15 @@ import java.io.InputStream;
import net.i2p.data.RouterIdentity;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
public class BandwidthLimitedInputStream extends FilterInputStream {
private Log _log;
private RouterIdentity _peer;
private String _peerSource;
private RouterContext _context;
private boolean _pullFromOutbound;
private FIFOBandwidthLimiter.Request _currentRequest;
public BandwidthLimitedInputStream(RouterContext context, InputStream source, RouterIdentity peer) {
this(context, source, peer, false);
@ -35,18 +38,21 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
if (peer != null)
_peerSource = peer.getHash().toBase64();
_pullFromOutbound = pullFromOutbound;
_log = context.logManager().getLog(BandwidthLimitedInputStream.class);
}
public int read() throws IOException {
FIFOBandwidthLimiter.Request req = null;
if (_pullFromOutbound)
req = _context.bandwidthLimiter().requestOutbound(1, _peerSource);
_currentRequest = _context.bandwidthLimiter().requestOutbound(1, _peerSource);
else
req = _context.bandwidthLimiter().requestInbound(1, _peerSource);
_currentRequest = _context.bandwidthLimiter().requestInbound(1, _peerSource);
// since its only a single byte, we dont need to loop
// or check how much was allocated
req.waitForNextAllocation();
_currentRequest.waitForNextAllocation();
synchronized (this) {
_currentRequest = null;
}
return in.read();
}
@ -56,32 +62,51 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
public int read(byte dest[], int off, int len) throws IOException {
int read = in.read(dest, off, len);
FIFOBandwidthLimiter.Request req = null;
if (_pullFromOutbound)
req = _context.bandwidthLimiter().requestOutbound(read, _peerSource);
_currentRequest = _context.bandwidthLimiter().requestOutbound(read, _peerSource);
else
req = _context.bandwidthLimiter().requestInbound(read, _peerSource);
_currentRequest = _context.bandwidthLimiter().requestInbound(read, _peerSource);
while ( (req.getPendingInboundRequested() > 0) ||
(req.getPendingOutboundRequested() > 0) ) {
while ( (_currentRequest.getPendingInboundRequested() > 0) ||
(_currentRequest.getPendingOutboundRequested() > 0) ) {
// we still haven't been authorized for everything, keep on waiting
req.waitForNextAllocation();
_currentRequest.waitForNextAllocation();
if (_currentRequest.getAborted()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Request aborted while trying to read " + len + " (actually read " + read + ")");
break;
}
}
synchronized (this) {
_currentRequest = null;
}
return read;
}
public long skip(long numBytes) throws IOException {
long skip = in.skip(numBytes);
FIFOBandwidthLimiter.Request req = null;
if (_pullFromOutbound)
req = _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource);
_currentRequest = _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource);
else
req = _context.bandwidthLimiter().requestInbound((int)skip, _peerSource);
_currentRequest = _context.bandwidthLimiter().requestInbound((int)skip, _peerSource);
while ( (req.getPendingInboundRequested() > 0) ||
(req.getPendingOutboundRequested() > 0) ) {
while ( (_currentRequest.getPendingInboundRequested() > 0) ||
(_currentRequest.getPendingOutboundRequested() > 0) ) {
// we still haven't been authorized for everything, keep on waiting
req.waitForNextAllocation();
_currentRequest.waitForNextAllocation();
if (_currentRequest.getAborted()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Request aborted while trying to skip " + numBytes);
break;
}
}
return skip;
}
public void close() throws IOException {
synchronized (this) {
if (_currentRequest != null)
_currentRequest.abort();
}
super.close();
}
}

View File

@ -21,6 +21,7 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
private String _peerTarget;
private RouterContext _context;
private Log _log;
private FIFOBandwidthLimiter.Request _currentRequest;
public BandwidthLimitedOutputStream(RouterContext context, OutputStream source, RouterIdentity peer) {
super(source);
@ -31,6 +32,7 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
else
_peerTarget = "unknown";
_log = context.logManager().getLog(BandwidthLimitedOutputStream.class);
_currentRequest = null;
}
public void write(int val) throws IOException {
@ -52,17 +54,33 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
if (len + off > src.length)
throw new IllegalArgumentException("wtf are you thinking? len=" + len
+ ", off=" + off + ", data=" + src.length);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, _peerTarget);
_currentRequest = _context.bandwidthLimiter().requestOutbound(len, _peerTarget);
int written = 0;
while (written < len) {
int allocated = len - req.getPendingOutboundRequested();
int allocated = len - _currentRequest.getPendingOutboundRequested();
int toWrite = allocated - written;
if (toWrite > 0) {
try {
out.write(src, off + written, toWrite);
} catch (IOException ioe) {
_currentRequest.abort();
throw ioe;
}
written += toWrite;
}
req.waitForNextAllocation();
_currentRequest.waitForNextAllocation();
}
synchronized (this) {
_currentRequest = null;
}
}
public void close() throws IOException {
synchronized (this) {
if (_currentRequest != null)
_currentRequest.abort();
}
super.close();
}
}

View File

@ -191,6 +191,19 @@ public class FIFOBandwidthLimiter {
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
if (_availableInboundBytes <= 0) break;
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i);
long waited = _context.clock().now() - req.getRequestTime();
if (req.getAborted()) {
// connection decided they dont want the data anymore
if (_log.shouldLog(Log.INFO))
_log.info("Aborting inbound request to "
+ req.getRequestName() + " (total "
+ req.getTotalInboundRequested() + " bytes, waited "
+ waited
+ "ms) pending " + _pendingInboundRequests.size());
_pendingInboundRequests.remove(i);
i--;
continue;
}
if (req.getAllocationsSinceWait() > 0) {
// we have already allocated some values to this request, but
// they haven't taken advantage of it yet (most likely they're
@ -210,7 +223,6 @@ public class FIFOBandwidthLimiter {
if (satisfied == null)
satisfied = new ArrayList(2);
satisfied.add(req);
long waited = _context.clock().now() - req.getRequestTime();
if (req.getPendingInboundRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocating " + allocated + " bytes inbound as a partial grant to "
@ -300,6 +312,19 @@ public class FIFOBandwidthLimiter {
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
if (_availableOutboundBytes <= 0) break;
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i);
long waited = _context.clock().now() - req.getRequestTime();
if (req.getAborted()) {
// connection decided they dont want the data anymore
if (_log.shouldLog(Log.INFO))
_log.info("Aborting outbound request to "
+ req.getRequestName() + " (total "
+ req.getTotalOutboundRequested() + " bytes, waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size());
_pendingOutboundRequests.remove(i);
i--;
continue;
}
if (req.getAllocationsSinceWait() > 0) {
// we have already allocated some values to this request, but
// they haven't taken advantage of it yet (most likely they're
@ -319,7 +344,6 @@ public class FIFOBandwidthLimiter {
if (satisfied == null)
satisfied = new ArrayList(2);
satisfied.add(req);
long waited = _context.clock().now() - req.getRequestTime();
if (req.getPendingOutboundRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
@ -385,12 +409,14 @@ public class FIFOBandwidthLimiter {
private long _requestTime;
private String _target;
private int _allocationsSinceWait;
private boolean _aborted;
public SimpleRequest(int in, int out, String target) {
_inTotal = in;
_outTotal = out;
_inAllocated = 0;
_outAllocated = 0;
_aborted = false;
_target = target;
_requestId = ++__requestId;
_requestTime = _context.clock().now();
@ -402,6 +428,9 @@ public class FIFOBandwidthLimiter {
public int getPendingOutboundRequested() { return _outTotal - _outAllocated; }
public int getTotalInboundRequested() { return _inTotal; }
public int getPendingInboundRequested() { return _inTotal - _inAllocated; }
public boolean getAborted() { return _aborted; }
public void abort() { _aborted = true; }
public void waitForNextAllocation() {
_allocationsSinceWait = 0;
if ( (_outAllocated >= _outTotal) &&
@ -441,5 +470,10 @@ public class FIFOBandwidthLimiter {
public int getPendingInboundRequested();
/** block until we are allocated some more bytes */
public void waitForNextAllocation();
/** we no longer want the data requested (e.g. the connection closed */
public void abort();
/** was this request aborted? */
public boolean getAborted();
}
}