diff --git a/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java b/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java index 4762c4b558..0464386bf8 100644 --- a/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java +++ b/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java @@ -14,12 +14,15 @@ import java.io.InputStream; import net.i2p.data.RouterIdentity; import net.i2p.router.RouterContext; +import net.i2p.util.Log; public class BandwidthLimitedInputStream extends FilterInputStream { + private Log _log; private RouterIdentity _peer; private String _peerSource; private RouterContext _context; private boolean _pullFromOutbound; + private FIFOBandwidthLimiter.Request _currentRequest; public BandwidthLimitedInputStream(RouterContext context, InputStream source, RouterIdentity peer) { this(context, source, peer, false); @@ -35,18 +38,21 @@ public class BandwidthLimitedInputStream extends FilterInputStream { if (peer != null) _peerSource = peer.getHash().toBase64(); _pullFromOutbound = pullFromOutbound; + _log = context.logManager().getLog(BandwidthLimitedInputStream.class); } public int read() throws IOException { - FIFOBandwidthLimiter.Request req = null; if (_pullFromOutbound) - req = _context.bandwidthLimiter().requestOutbound(1, _peerSource); + _currentRequest = _context.bandwidthLimiter().requestOutbound(1, _peerSource); else - req = _context.bandwidthLimiter().requestInbound(1, _peerSource); + _currentRequest = _context.bandwidthLimiter().requestInbound(1, _peerSource); // since its only a single byte, we dont need to loop // or check how much was allocated - req.waitForNextAllocation(); + _currentRequest.waitForNextAllocation(); + synchronized (this) { + _currentRequest = null; + } return in.read(); } @@ -56,32 +62,51 @@ public class BandwidthLimitedInputStream extends FilterInputStream { public int read(byte dest[], int off, int len) throws IOException { int read = in.read(dest, off, len); - FIFOBandwidthLimiter.Request req = null; if (_pullFromOutbound) - req = _context.bandwidthLimiter().requestOutbound(read, _peerSource); + _currentRequest = _context.bandwidthLimiter().requestOutbound(read, _peerSource); else - req = _context.bandwidthLimiter().requestInbound(read, _peerSource); + _currentRequest = _context.bandwidthLimiter().requestInbound(read, _peerSource); - while ( (req.getPendingInboundRequested() > 0) || - (req.getPendingOutboundRequested() > 0) ) { + while ( (_currentRequest.getPendingInboundRequested() > 0) || + (_currentRequest.getPendingOutboundRequested() > 0) ) { // we still haven't been authorized for everything, keep on waiting - req.waitForNextAllocation(); + _currentRequest.waitForNextAllocation(); + if (_currentRequest.getAborted()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Request aborted while trying to read " + len + " (actually read " + read + ")"); + break; + } + } + synchronized (this) { + _currentRequest = null; } return read; } public long skip(long numBytes) throws IOException { long skip = in.skip(numBytes); - FIFOBandwidthLimiter.Request req = null; if (_pullFromOutbound) - req = _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource); + _currentRequest = _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource); else - req = _context.bandwidthLimiter().requestInbound((int)skip, _peerSource); + _currentRequest = _context.bandwidthLimiter().requestInbound((int)skip, _peerSource); - while ( (req.getPendingInboundRequested() > 0) || - (req.getPendingOutboundRequested() > 0) ) { + while ( (_currentRequest.getPendingInboundRequested() > 0) || + (_currentRequest.getPendingOutboundRequested() > 0) ) { // we still haven't been authorized for everything, keep on waiting - req.waitForNextAllocation(); + _currentRequest.waitForNextAllocation(); + if (_currentRequest.getAborted()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Request aborted while trying to skip " + numBytes); + break; + } } return skip; } + + public void close() throws IOException { + synchronized (this) { + if (_currentRequest != null) + _currentRequest.abort(); + } + super.close(); + } } diff --git a/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java b/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java index a2b625e3fc..504ef1c121 100644 --- a/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java +++ b/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java @@ -21,6 +21,7 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream { private String _peerTarget; private RouterContext _context; private Log _log; + private FIFOBandwidthLimiter.Request _currentRequest; public BandwidthLimitedOutputStream(RouterContext context, OutputStream source, RouterIdentity peer) { super(source); @@ -31,6 +32,7 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream { else _peerTarget = "unknown"; _log = context.logManager().getLog(BandwidthLimitedOutputStream.class); + _currentRequest = null; } public void write(int val) throws IOException { @@ -52,17 +54,33 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream { if (len + off > src.length) throw new IllegalArgumentException("wtf are you thinking? len=" + len + ", off=" + off + ", data=" + src.length); - FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, _peerTarget); + _currentRequest = _context.bandwidthLimiter().requestOutbound(len, _peerTarget); int written = 0; while (written < len) { - int allocated = len - req.getPendingOutboundRequested(); + int allocated = len - _currentRequest.getPendingOutboundRequested(); int toWrite = allocated - written; if (toWrite > 0) { - out.write(src, off + written, toWrite); + try { + out.write(src, off + written, toWrite); + } catch (IOException ioe) { + _currentRequest.abort(); + throw ioe; + } written += toWrite; } - req.waitForNextAllocation(); + _currentRequest.waitForNextAllocation(); + } + synchronized (this) { + _currentRequest = null; } } + + public void close() throws IOException { + synchronized (this) { + if (_currentRequest != null) + _currentRequest.abort(); + } + super.close(); + } } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index 55a9ee342f..9d3e6a5463 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -191,6 +191,19 @@ public class FIFOBandwidthLimiter { for (int i = 0; i < _pendingInboundRequests.size(); i++) { if (_availableInboundBytes <= 0) break; SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i); + long waited = _context.clock().now() - req.getRequestTime(); + if (req.getAborted()) { + // connection decided they dont want the data anymore + if (_log.shouldLog(Log.INFO)) + _log.info("Aborting inbound request to " + + req.getRequestName() + " (total " + + req.getTotalInboundRequested() + " bytes, waited " + + waited + + "ms) pending " + _pendingInboundRequests.size()); + _pendingInboundRequests.remove(i); + i--; + continue; + } if (req.getAllocationsSinceWait() > 0) { // we have already allocated some values to this request, but // they haven't taken advantage of it yet (most likely they're @@ -210,7 +223,6 @@ public class FIFOBandwidthLimiter { if (satisfied == null) satisfied = new ArrayList(2); satisfied.add(req); - long waited = _context.clock().now() - req.getRequestTime(); if (req.getPendingInboundRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes inbound as a partial grant to " @@ -300,6 +312,19 @@ public class FIFOBandwidthLimiter { for (int i = 0; i < _pendingOutboundRequests.size(); i++) { if (_availableOutboundBytes <= 0) break; SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i); + long waited = _context.clock().now() - req.getRequestTime(); + if (req.getAborted()) { + // connection decided they dont want the data anymore + if (_log.shouldLog(Log.INFO)) + _log.info("Aborting outbound request to " + + req.getRequestName() + " (total " + + req.getTotalOutboundRequested() + " bytes, waited " + + waited + + "ms) pending " + _pendingOutboundRequests.size()); + _pendingOutboundRequests.remove(i); + i--; + continue; + } if (req.getAllocationsSinceWait() > 0) { // we have already allocated some values to this request, but // they haven't taken advantage of it yet (most likely they're @@ -319,7 +344,6 @@ public class FIFOBandwidthLimiter { if (satisfied == null) satisfied = new ArrayList(2); satisfied.add(req); - long waited = _context.clock().now() - req.getRequestTime(); if (req.getPendingOutboundRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes outbound as a partial grant to " @@ -385,12 +409,14 @@ public class FIFOBandwidthLimiter { private long _requestTime; private String _target; private int _allocationsSinceWait; + private boolean _aborted; public SimpleRequest(int in, int out, String target) { _inTotal = in; _outTotal = out; _inAllocated = 0; _outAllocated = 0; + _aborted = false; _target = target; _requestId = ++__requestId; _requestTime = _context.clock().now(); @@ -402,6 +428,9 @@ public class FIFOBandwidthLimiter { public int getPendingOutboundRequested() { return _outTotal - _outAllocated; } public int getTotalInboundRequested() { return _inTotal; } public int getPendingInboundRequested() { return _inTotal - _inAllocated; } + public boolean getAborted() { return _aborted; } + public void abort() { _aborted = true; } + public void waitForNextAllocation() { _allocationsSinceWait = 0; if ( (_outAllocated >= _outTotal) && @@ -441,5 +470,10 @@ public class FIFOBandwidthLimiter { public int getPendingInboundRequested(); /** block until we are allocated some more bytes */ public void waitForNextAllocation(); + /** we no longer want the data requested (e.g. the connection closed */ + public void abort(); + /** was this request aborted? */ + public boolean getAborted(); + } }