forked from I2P_Developers/i2p.i2p
implemented the FIFO bandwidth limiter which, suprisingly, chokes read/write operations
if they would exceed the currently available # of tokens, letting through those operations in the order they are called. like the old trivial bandwidth limiter, this uses a token bucket approach (keeping a pool of 'available' bytes, decrementing on their use and periodically refilling it [up to a max limit, to prevent absurd bursts]). on the other hand, it doesn't have the starvation issues the old one had, which would continue to let small operations go through (e.g. 8 byte write) and potentially block large operations indefinitely (e.g. 32KB write). However, this new version is, how shall I put it, context switch heavy? :) We'll revise with a scheduling / queueing algorithm once we're away from transports that require threads per connection The two directions (input and output) are managed on their own queues, and if/when things are backed up, you can see the details of what operations have been requested on the router console. Since we still need better router throttling code (to reject tunnels and back off more accurately), I've included a minimum KBps on the limiter, currently set to 6KBps both ways. Once there is good throttling code, we can drop that to 1-2KBps, and maybe even less after we do some bandwidth usage tuning. There were also a few minor touch ups to handle message data being discarded earlier than it had been before (since write/read operations can now take a long period of time in the face of contention) The five config properties for the bandwidth limiter are: * i2np.bandwidth.inboundKBytesPerSecond * i2np.bandwidth.outboundKBytesPerSecond (you can guess what those are) * i2np.bandwidth.inboundBurstKBytes * i2np.bandwidth.outboundBurstKBytes the burst KBytes specify how many bytes we'll let accumulate in the bucket, allowing us to burst after a period of inactivity. excess tokens greater than this limit are discarded. * i2np.bandwidth.replenishFrequencyMs this is an internal setting, used to specify how frequently to refil the buckets (min value of 1s, which is the default) You may want to hold off on using these parameters though until the next release, leaving it to the default of unlimited. They are read periodically from the config file however, so you can update them without restart / etc. (if you want to have no limit on the bandwidth, set the KBytesPerSecond to a value <= 0)
This commit is contained in:
@ -125,8 +125,11 @@ public class MultiRouterBuilder {
|
||||
baseDir.mkdirs();
|
||||
File cfgFile = new File(baseDir, "router.config");
|
||||
StringBuffer buf = new StringBuffer(8*1024);
|
||||
buf.append("i2np.bandwidth.inboundBytesPerMinute=-60\n");
|
||||
buf.append("i2np.bandwidth.outboundBytesPerMinute=-60\n");
|
||||
buf.append("i2np.bandwidth.inboundKBytesPerSecond=8\n");
|
||||
buf.append("i2np.bandwidth.outboundKBytesPerSecond=8\n");
|
||||
buf.append("i2np.bandwidth.inboundBurstKBytes=80\n");
|
||||
buf.append("i2np.bandwidth.outboundBurstKBytes=80\n");
|
||||
buf.append("i2np.bandwidth.replenishFrequencyMs=1000\n");
|
||||
buf.append("router.publishPeerRankings=true\n");
|
||||
buf.append("router.keepHistory=false\n");
|
||||
buf.append("router.submitHistory=false\n");
|
||||
|
@ -54,7 +54,8 @@ public class OutNetMessagePool {
|
||||
private boolean validate(OutNetMessage msg) {
|
||||
if (msg == null) return false;
|
||||
if (msg.getMessage() == null) {
|
||||
_log.error("Null message in the OutNetMessage: " + msg, new Exception("Someone fucked up"));
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Null message in the OutNetMessage - expired too soon");
|
||||
return false;
|
||||
}
|
||||
if (msg.getTarget() == null) {
|
||||
|
@ -261,8 +261,8 @@ public class Router {
|
||||
buf.append("<b><font color=\"red\">HIGHER VERSION SEEN</font><b> - please <a href=\"http://i2p.dnsalias.net/\">check</a> to see if there is a new release out<br />\n");
|
||||
|
||||
buf.append("<hr /><a name=\"bandwidth\"> </a><h2>Bandwidth</h2>\n");
|
||||
long sent = _context.bandwidthLimiter().getTotalSendBytes();
|
||||
long received = _context.bandwidthLimiter().getTotalReceiveBytes();
|
||||
long sent = _context.bandwidthLimiter().getTotalAllocatedOutboundBytes();
|
||||
long received = _context.bandwidthLimiter().getTotalAllocatedInboundBytes();
|
||||
buf.append("<ul>");
|
||||
|
||||
buf.append("<li> ").append(sent).append(" bytes sent, ");
|
||||
@ -351,6 +351,8 @@ public class Router {
|
||||
buf.append("while period averages count how fast the transfers go across the entire period, even when we're not ");
|
||||
buf.append("trying to transfer data. Lifetime averages count how many elephants there are on the moon [like anyone reads this text]</i>");
|
||||
buf.append("\n");
|
||||
|
||||
buf.append(_context.bandwidthLimiter().renderStatusHTML());
|
||||
|
||||
buf.append("<hr /><a name=\"clients\"> </a>\n");
|
||||
buf.append(_context.clientManager().renderStatusHTML());
|
||||
|
@ -14,10 +14,9 @@ import net.i2p.router.peermanager.ProfileManagerImpl;
|
||||
import net.i2p.router.peermanager.ProfileOrganizer;
|
||||
import net.i2p.router.peermanager.ReliabilityCalculator;
|
||||
import net.i2p.router.peermanager.SpeedCalculator;
|
||||
import net.i2p.router.transport.BandwidthLimiter;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||
import net.i2p.router.transport.CommSystemFacadeImpl;
|
||||
import net.i2p.router.transport.OutboundMessageRegistry;
|
||||
import net.i2p.router.transport.TrivialBandwidthLimiter;
|
||||
import net.i2p.router.transport.VMCommSystem;
|
||||
import net.i2p.router.tunnelmanager.PoolingTunnelManagerFacade;
|
||||
|
||||
@ -44,7 +43,7 @@ public class RouterContext extends I2PAppContext {
|
||||
private ProfileOrganizer _profileOrganizer;
|
||||
private PeerManagerFacade _peerManagerFacade;
|
||||
private ProfileManager _profileManager;
|
||||
private BandwidthLimiter _bandwidthLimiter;
|
||||
private FIFOBandwidthLimiter _bandwidthLimiter;
|
||||
private TunnelManagerFacade _tunnelManager;
|
||||
private StatisticsManager _statPublisher;
|
||||
private Shitlist _shitlist;
|
||||
@ -79,7 +78,7 @@ public class RouterContext extends I2PAppContext {
|
||||
_profileOrganizer = new ProfileOrganizer(this);
|
||||
_peerManagerFacade = new PeerManagerFacadeImpl(this);
|
||||
_profileManager = new ProfileManagerImpl(this);
|
||||
_bandwidthLimiter = new TrivialBandwidthLimiter(this);
|
||||
_bandwidthLimiter = new FIFOBandwidthLimiter(this);
|
||||
_tunnelManager = new PoolingTunnelManagerFacade(this);
|
||||
_statPublisher = new StatisticsManager(this);
|
||||
_shitlist = new Shitlist(this);
|
||||
@ -169,7 +168,7 @@ public class RouterContext extends I2PAppContext {
|
||||
/**
|
||||
* Coordinate this router's bandwidth limits
|
||||
*/
|
||||
public BandwidthLimiter bandwidthLimiter() { return _bandwidthLimiter; }
|
||||
public FIFOBandwidthLimiter bandwidthLimiter() { return _bandwidthLimiter; }
|
||||
/**
|
||||
* Coordinate this router's tunnels (its pools, participation, backup, etc).
|
||||
* Any configuration for the tunnels is rooted from the context's properties
|
||||
|
@ -17,6 +17,7 @@ import net.i2p.router.RouterContext;
|
||||
|
||||
public class BandwidthLimitedInputStream extends FilterInputStream {
|
||||
private RouterIdentity _peer;
|
||||
private String _peerSource;
|
||||
private RouterContext _context;
|
||||
private boolean _pullFromOutbound;
|
||||
|
||||
@ -31,40 +32,41 @@ public class BandwidthLimitedInputStream extends FilterInputStream {
|
||||
super(source);
|
||||
_context = context;
|
||||
_peer = peer;
|
||||
_peerSource = peer.getHash().toBase64();
|
||||
_pullFromOutbound = pullFromOutbound;
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, 1, true);
|
||||
_context.bandwidthLimiter().requestOutbound(1, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().delayInbound(_peer, 1);
|
||||
_context.bandwidthLimiter().requestInbound(1, _peerSource);
|
||||
return in.read();
|
||||
}
|
||||
|
||||
public int read(byte dest[]) throws IOException {
|
||||
int read = in.read(dest);
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, read, true);
|
||||
_context.bandwidthLimiter().requestOutbound(read, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().delayInbound(_peer, read);
|
||||
_context.bandwidthLimiter().requestInbound(read, _peerSource);
|
||||
return read;
|
||||
}
|
||||
|
||||
public int read(byte dest[], int off, int len) throws IOException {
|
||||
int read = in.read(dest, off, len);
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, read, true);
|
||||
_context.bandwidthLimiter().requestOutbound(read, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().delayInbound(_peer, read);
|
||||
_context.bandwidthLimiter().requestInbound(read, _peerSource);
|
||||
return read;
|
||||
}
|
||||
public long skip(long numBytes) throws IOException {
|
||||
long skip = in.skip(numBytes);
|
||||
if (_pullFromOutbound)
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, (int)skip, true);
|
||||
_context.bandwidthLimiter().requestOutbound((int)skip, _peerSource);
|
||||
else
|
||||
_context.bandwidthLimiter().delayInbound(_peer, (int)skip);
|
||||
_context.bandwidthLimiter().requestInbound((int)skip, _peerSource);
|
||||
return skip;
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import net.i2p.util.Log;
|
||||
|
||||
public class BandwidthLimitedOutputStream extends FilterOutputStream {
|
||||
private RouterIdentity _peer;
|
||||
private String _peerTarget;
|
||||
private RouterContext _context;
|
||||
private Log _log;
|
||||
|
||||
@ -25,15 +26,14 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
|
||||
super(source);
|
||||
_context = context;
|
||||
_peer = peer;
|
||||
_peerTarget = peer.getHash().toBase64();
|
||||
_log = context.logManager().getLog(BandwidthLimitedOutputStream.class);
|
||||
}
|
||||
|
||||
private final static int CHUNK_SIZE = 1024;
|
||||
|
||||
public void write(int val) throws IOException {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Writing a single byte!", new Exception("Single byte from..."));
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, 1, false);
|
||||
_context.bandwidthLimiter().requestOutbound(1, _peerTarget);
|
||||
out.write(val);
|
||||
}
|
||||
public void write(byte src[]) throws IOException {
|
||||
@ -47,21 +47,7 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream {
|
||||
if (len + off > src.length)
|
||||
throw new IllegalArgumentException("wtf are you thinking? len=" + len
|
||||
+ ", off=" + off + ", data=" + src.length);
|
||||
if (len <= CHUNK_SIZE) {
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, len, false);
|
||||
out.write(src, off, len);
|
||||
} else {
|
||||
int i = 0;
|
||||
while (i+CHUNK_SIZE < len) {
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, CHUNK_SIZE, false);
|
||||
out.write(src, off+i, CHUNK_SIZE);
|
||||
i += CHUNK_SIZE;
|
||||
}
|
||||
int remainder = (len % CHUNK_SIZE);
|
||||
if (remainder > 0) {
|
||||
_context.bandwidthLimiter().delayOutbound(_peer, remainder, false);
|
||||
out.write(src, i, remainder);
|
||||
}
|
||||
}
|
||||
_context.bandwidthLimiter().requestOutbound(len, _peerTarget);
|
||||
out.write(src, off, len);
|
||||
}
|
||||
}
|
||||
|
@ -1,45 +0,0 @@
|
||||
package net.i2p.router.transport;
|
||||
/*
|
||||
* free (adj.): unencumbered; not under the control of others
|
||||
* Written by jrandom in 2003 and released into the public domain
|
||||
* with no warranty of any kind, either expressed or implied.
|
||||
* It probably won't make your computer catch on fire, or eat
|
||||
* your children, but it might. Use at your own risk.
|
||||
*
|
||||
*/
|
||||
|
||||
import net.i2p.data.RouterIdentity;
|
||||
|
||||
/**
|
||||
* Coordinate the bandwidth limiting across all classes of peers.
|
||||
*
|
||||
*/
|
||||
public interface BandwidthLimiter {
|
||||
/**
|
||||
* Delay the required amount of time before returning so that receiving numBytes
|
||||
* from the peer will not violate the bandwidth limits
|
||||
*/
|
||||
public void delayInbound(RouterIdentity peer, int numBytes);
|
||||
|
||||
/**
|
||||
* Delay the required amount of time before returning so that sending numBytes
|
||||
* to the peer will not violate the bandwidth limits
|
||||
*
|
||||
* FIXME: Added 'pulled' to fix an oversight with regards to getTotalReceiveBytes().
|
||||
* BandwidthLimitedInputStream can pull from the outbound bandwidth, but
|
||||
* this leads to an incorrect value from getTotalReceiveBytes() with
|
||||
* TrivialBandwidthLimited. This is an inelegant solution, so fix it! =)
|
||||
*/
|
||||
public void delayOutbound(RouterIdentity peer, int numBytes, boolean pulled);
|
||||
|
||||
public long getTotalSendBytes();
|
||||
public long getTotalReceiveBytes();
|
||||
|
||||
|
||||
static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond";
|
||||
static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond";
|
||||
static final String PROP_INBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.inboundBurstKBytes";
|
||||
static final String PROP_OUTBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.outboundBurstKBytes";
|
||||
static final String PROP_REPLENISH_FREQUENCY = "i2np.bandwidth.replenishFrequency";
|
||||
static final String PROP_MIN_NON_ZERO_DELAY = "i2np.bandwidth.minimumNonZeroDelay";
|
||||
}
|
@ -0,0 +1,341 @@
|
||||
package net.i2p.router.transport;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.I2PThread;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class FIFOBandwidthLimiter {
|
||||
private Log _log;
|
||||
private I2PAppContext _context;
|
||||
private List _pendingInboundRequests;
|
||||
private List _pendingOutboundRequests;
|
||||
private volatile long _availableInboundBytes;
|
||||
private volatile long _availableOutboundBytes;
|
||||
private boolean _outboundUnlimited;
|
||||
private boolean _inboundUnlimited;
|
||||
private volatile long _totalAllocatedInboundBytes;
|
||||
private volatile long _totalAllocatedOutboundBytes;
|
||||
private long _maxInboundBytes;
|
||||
private long _maxOutboundBytes;
|
||||
private FIFOBandwidthRefiller _refiller;
|
||||
|
||||
private static int __id = 0;
|
||||
|
||||
public FIFOBandwidthLimiter(I2PAppContext context) {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(FIFOBandwidthLimiter.class);
|
||||
_pendingInboundRequests = new ArrayList(16);
|
||||
_pendingOutboundRequests = new ArrayList(16);
|
||||
_refiller = new FIFOBandwidthRefiller(_context, this);
|
||||
I2PThread t = new I2PThread(_refiller);
|
||||
t.setName("BWRefiller" + (++__id));
|
||||
t.setDaemon(true);
|
||||
t.setPriority(I2PThread.NORM_PRIORITY-1);
|
||||
t.start();
|
||||
}
|
||||
|
||||
public long getAvailableInboundBytes() { return _availableInboundBytes; }
|
||||
public long getAvailableOutboundBytes() { return _availableOutboundBytes; }
|
||||
public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes; }
|
||||
public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes; }
|
||||
public long getMaxInboundBytes() { return _maxInboundBytes; }
|
||||
public void setMaxInboundBytes(long numBytes) { _maxInboundBytes = numBytes; }
|
||||
public long getMaxOutboundBytes() { return _maxOutboundBytes; }
|
||||
public void setMaxOutboundBytes(long numBytes) { _maxOutboundBytes = numBytes; }
|
||||
public boolean getInboundUnlimited() { return _inboundUnlimited; }
|
||||
public void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; }
|
||||
public boolean getOutboundUnlimited() { return _outboundUnlimited; }
|
||||
public void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; }
|
||||
|
||||
public void reinitialize() {
|
||||
_pendingInboundRequests.clear();
|
||||
_pendingOutboundRequests.clear();
|
||||
_availableInboundBytes = 0;
|
||||
_availableOutboundBytes = 0;
|
||||
_inboundUnlimited = false;
|
||||
_outboundUnlimited = false;
|
||||
_refiller.reinitialize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Request some bytes, blocking until they become available
|
||||
*
|
||||
*/
|
||||
public void requestInbound(int bytesIn, String purpose) {
|
||||
addInboundRequest(new SimpleRequest(bytesIn, 0, purpose));
|
||||
}
|
||||
/**
|
||||
* Request some bytes, blocking until they become available
|
||||
*
|
||||
*/
|
||||
public void requestOutbound(int bytesOut, String purpose) {
|
||||
addOutboundRequest(new SimpleRequest(0, bytesOut, purpose));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the request to the queue, blocking the requesting thread until
|
||||
* bandwidth is available (and all requests for bandwidth ahead of it have
|
||||
* been granted). Once sufficient bandwidth is available, this call will
|
||||
* return and request.grantRequest() will have been called.
|
||||
*
|
||||
*/
|
||||
private final void addInboundRequest(BandwidthRequest request) {
|
||||
synchronized (_pendingInboundRequests) {
|
||||
if ( (_pendingInboundRequests.size() <= 0) &&
|
||||
( (request.getRequestedInboundBytes() <= _availableInboundBytes) || (_inboundUnlimited) ) ) {
|
||||
// the queue is empty and there are sufficient bytes, grant 'em
|
||||
if (!_inboundUnlimited)
|
||||
_availableInboundBytes -= request.getRequestedInboundBytes();
|
||||
_totalAllocatedInboundBytes += request.getRequestedInboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting inbound request " + request.getRequestName() + " immediately for "
|
||||
+ request.getRequestedInboundBytes());
|
||||
request.grantRequest();
|
||||
return;
|
||||
} else {
|
||||
_pendingInboundRequests.add(request);
|
||||
}
|
||||
}
|
||||
synchronized (request.getAvailabilityMonitor()) {
|
||||
while (!request.alreadyGranted()) {
|
||||
try {
|
||||
request.getAvailabilityMonitor().wait();
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the request to the queue, blocking the requesting thread until
|
||||
* bandwidth is available (and all requests for bandwidth ahead of it have
|
||||
* been granted). Once sufficient bandwidth is available, this call will
|
||||
* return and request.grantRequest() will have been called.
|
||||
*
|
||||
*/
|
||||
private final void addOutboundRequest(BandwidthRequest request) {
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
if ( (_pendingOutboundRequests.size() <= 0) &&
|
||||
( (request.getRequestedOutboundBytes() <= _availableOutboundBytes) || (_outboundUnlimited) ) ) {
|
||||
// the queue is empty and there are sufficient bytes, grant 'em
|
||||
if (!_outboundUnlimited)
|
||||
_availableOutboundBytes -= request.getRequestedOutboundBytes();
|
||||
_totalAllocatedOutboundBytes += request.getRequestedOutboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting outbound request " + request.getRequestName() + " immediately for "
|
||||
+ request.getRequestedOutboundBytes());
|
||||
request.grantRequest();
|
||||
return;
|
||||
} else {
|
||||
_pendingOutboundRequests.add(request);
|
||||
}
|
||||
}
|
||||
synchronized (request.getAvailabilityMonitor()) {
|
||||
while (!request.alreadyGranted()) {
|
||||
try {
|
||||
request.getAvailabilityMonitor().wait();
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* More bytes are available - add them to the queue and satisfy any requests
|
||||
* we can
|
||||
*/
|
||||
final void refillBandwidthQueues(long bytesInbound, long bytesOutbound) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound);
|
||||
_availableInboundBytes += bytesInbound;
|
||||
_availableOutboundBytes += bytesOutbound;
|
||||
if (_availableInboundBytes > _maxInboundBytes)
|
||||
_availableInboundBytes = _maxInboundBytes;
|
||||
if (_availableOutboundBytes > _maxOutboundBytes)
|
||||
_availableOutboundBytes = _maxOutboundBytes;
|
||||
satisfyRequests();
|
||||
}
|
||||
|
||||
/**
|
||||
* Go through the queue, satisfying as many requests as possible (notifying
|
||||
* each one satisfied that the request has been granted).
|
||||
*/
|
||||
private final void satisfyRequests() {
|
||||
satisfyInboundRequests();
|
||||
satisfyOutboundRequests();
|
||||
}
|
||||
|
||||
private final void satisfyInboundRequests() {
|
||||
synchronized (_pendingInboundRequests) {
|
||||
while (_pendingInboundRequests.size() > 0) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingInboundRequests.get(0);
|
||||
if ( (req.getRequestedInboundBytes() <= _availableInboundBytes) || (_inboundUnlimited) ) {
|
||||
_pendingInboundRequests.remove(0);
|
||||
if (!_inboundUnlimited)
|
||||
_availableInboundBytes -= req.getRequestedInboundBytes();
|
||||
_totalAllocatedInboundBytes += req.getRequestedInboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting inbound request " + req.getRequestName() + " for "
|
||||
+ req.getRequestedInboundBytes() + " bytes (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + _pendingInboundRequests.size());
|
||||
// i hate nested synchronization
|
||||
synchronized (req.getAvailabilityMonitor()) {
|
||||
req.grantRequest();
|
||||
req.getAvailabilityMonitor().notifyAll();
|
||||
}
|
||||
} else {
|
||||
// there isn't sufficient bandwidth for the first request,
|
||||
// so since we're a FIFO limiter, everyone waits. If we were a
|
||||
// best fit or ASAP limiter, we'd continue on iterating to see
|
||||
// if anyone would be satisfied with the current availability
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Still denying the first inbound request (" + req.getRequestName()
|
||||
+ " for "
|
||||
+ req.getRequestedInboundBytes() + " bytes (available "
|
||||
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms so far) pending " + (_pendingInboundRequests.size()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
//if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Nothing pending");
|
||||
}
|
||||
}
|
||||
|
||||
private final void satisfyOutboundRequests() {
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
while (_pendingOutboundRequests.size() > 0) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingOutboundRequests.get(0);
|
||||
if ( (req.getRequestedOutboundBytes() <= _availableOutboundBytes) || (_outboundUnlimited) ) {
|
||||
_pendingOutboundRequests.remove(0);
|
||||
if (!_outboundUnlimited)
|
||||
_availableOutboundBytes -= req.getRequestedOutboundBytes();
|
||||
_totalAllocatedOutboundBytes += req.getRequestedOutboundBytes();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Granting outbound request " + req.getRequestName() + " for "
|
||||
+ req.getRequestedOutboundBytes() + " bytes (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms) pending " + (_pendingOutboundRequests.size()-1));
|
||||
// i hate nested synchronization
|
||||
synchronized (req.getAvailabilityMonitor()) {
|
||||
req.grantRequest();
|
||||
req.getAvailabilityMonitor().notifyAll();
|
||||
}
|
||||
} else {
|
||||
// there isn't sufficient bandwidth for the first request,
|
||||
// so since we're a FIFO limiter, everyone waits. If we were a
|
||||
// best fit or ASAP limiter, we'd continue on iterating to see
|
||||
// if anyone would be satisfied with the current availability
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Still denying the first outbound request (" + req.getRequestName()
|
||||
+ " for "
|
||||
+ req.getRequestedOutboundBytes() + " bytes (available "
|
||||
+ _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited "
|
||||
+ (_context.clock().now() - req.getRequestTime())
|
||||
+ "ms so far) pending " + (_pendingOutboundRequests.size()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
//if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Nothing pending");
|
||||
}
|
||||
}
|
||||
|
||||
public String renderStatusHTML() {
|
||||
long now = _context.clock().now();
|
||||
StringBuffer buf = new StringBuffer(4096);
|
||||
buf.append("<br /><b>Pending bandwidth requests (with ");
|
||||
buf.append(_availableInboundBytes).append('/');
|
||||
buf.append(_availableOutboundBytes).append(" bytes inbound/outbound available):</b><ul>");
|
||||
buf.append("<li>Inbound requests: <ol>");
|
||||
synchronized (_pendingInboundRequests) {
|
||||
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingInboundRequests.get(i);
|
||||
buf.append("<li>").append(req.getRequestName()).append(" for ");
|
||||
buf.append(req.getRequestedInboundBytes()).append(" bytes ");
|
||||
buf.append("requested ").append(now-req.getRequestTime());
|
||||
buf.append("ms ago</li>\n");
|
||||
}
|
||||
}
|
||||
buf.append("</ol></li><li>Outbound requests: <ol>\n");
|
||||
synchronized (_pendingOutboundRequests) {
|
||||
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
|
||||
BandwidthRequest req = (BandwidthRequest)_pendingOutboundRequests.get(i);
|
||||
buf.append("<li>").append(req.getRequestName()).append(" for ");
|
||||
buf.append(req.getRequestedOutboundBytes()).append(" bytes ");
|
||||
buf.append("requested ").append(now-req.getRequestTime());
|
||||
buf.append("ms ago</li>\n");
|
||||
}
|
||||
}
|
||||
buf.append("</ol></li></ul>\n");
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
private static long __requestId = 0;
|
||||
private final class SimpleRequest implements BandwidthRequest {
|
||||
private boolean _alreadyGranted;
|
||||
private int _in;
|
||||
private int _out;
|
||||
private long _requestId;
|
||||
private long _requestTime;
|
||||
private String _target;
|
||||
|
||||
public SimpleRequest(int in, int out, String target) {
|
||||
_in = in;
|
||||
_out = out;
|
||||
_target = target;
|
||||
_alreadyGranted = false;
|
||||
_requestId = ++__requestId;
|
||||
_requestTime = _context.clock().now();
|
||||
}
|
||||
public boolean alreadyGranted() { return _alreadyGranted; }
|
||||
public Object getAvailabilityMonitor() { return SimpleRequest.this; }
|
||||
public String getRequestName() { return "Req" + _requestId + " to " + _target; }
|
||||
public int getRequestedInboundBytes() { return _in; }
|
||||
public int getRequestedOutboundBytes() { return _out; }
|
||||
public void grantRequest() { _alreadyGranted = true; }
|
||||
public long getRequestTime() { return _requestTime; }
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a request for bandwidth allocation
|
||||
*/
|
||||
private interface BandwidthRequest {
|
||||
/**
|
||||
* how can we summarize this request (in case we want to display a list
|
||||
* of 'whats pending')
|
||||
*/
|
||||
public String getRequestName();
|
||||
/**
|
||||
* How many bytes are we going to send away from the router
|
||||
*/
|
||||
public int getRequestedOutboundBytes();
|
||||
/**
|
||||
* How many bytes are we going to read from the network
|
||||
*/
|
||||
public int getRequestedInboundBytes();
|
||||
/**
|
||||
* Lock unique to this request that will be wait() & notified upon
|
||||
* during the queueing
|
||||
*/
|
||||
public Object getAvailabilityMonitor();
|
||||
/**
|
||||
* When was the bandwidth requested?
|
||||
*/
|
||||
public long getRequestTime();
|
||||
/**
|
||||
* must return true only if grantRequest has been called, else
|
||||
* false
|
||||
*/
|
||||
public boolean alreadyGranted();
|
||||
/**
|
||||
* flag this request to tell it that it has been or is about to be
|
||||
* allocated sufficient bytes. This should NOT be used as the notification
|
||||
* itself
|
||||
*/
|
||||
public void grantRequest();
|
||||
}
|
||||
}
|
@ -0,0 +1,246 @@
|
||||
package net.i2p.router.transport;
|
||||
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.I2PAppContext;
|
||||
|
||||
class FIFOBandwidthRefiller implements Runnable {
|
||||
private Log _log;
|
||||
private I2PAppContext _context;
|
||||
private FIFOBandwidthLimiter _limiter;
|
||||
/** how many KBps do we want to allow? */
|
||||
private long _inboundKBytesPerSecond;
|
||||
/** how many KBps do we want to allow? */
|
||||
private long _outboundKBytesPerSecond;
|
||||
/** how frequently do we want to replenish the available queues? */
|
||||
private long _replenishFrequency;
|
||||
/** when did we last replenish the queue? */
|
||||
private long _lastRefillTime;
|
||||
/** when did we last check the config for updates? */
|
||||
private long _lastCheckConfigTime;
|
||||
/** how frequently do we check the config for updates? */
|
||||
private long _configCheckPeriodMs = 60*1000;
|
||||
|
||||
public static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond";
|
||||
public static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond";
|
||||
public static final String PROP_INBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.inboundBurstKBytes";
|
||||
public static final String PROP_OUTBOUND_BANDWIDTH_PEAK = "i2np.bandwidth.outboundBurstKBytes";
|
||||
public static final String PROP_REPLENISH_FREQUENCY = "i2np.bandwidth.replenishFrequencyMs";
|
||||
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at 6KBps inbound */
|
||||
public static final long MIN_INBOUND_BANDWIDTH = 6;
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at 6KBps outbound */
|
||||
public static final long MIN_OUTBOUND_BANDWIDTH = 6;
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
|
||||
public static final long MIN_INBOUND_BANDWIDTH_PEAK = 60;
|
||||
/** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
|
||||
public static final long MIN_OUTBOUND_BANDWIDTH_PEAK = 60;
|
||||
/** Updating the bandwidth more than once a second is silly. once every 2 or 5 seconds is less so. */
|
||||
public static final long MIN_REPLENISH_FREQUENCY = 1000;
|
||||
|
||||
private static final long DEFAULT_REPLENISH_FREQUENCY = 10*1000;
|
||||
|
||||
public FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
|
||||
_limiter = limiter;
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
|
||||
reinitialize();
|
||||
}
|
||||
public void run() {
|
||||
// bootstrap 'em with nothing
|
||||
_lastRefillTime = _context.clock().now();
|
||||
while (true) {
|
||||
long now = _context.clock().now();
|
||||
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
|
||||
checkConfig();
|
||||
now = _context.clock().now();
|
||||
_lastCheckConfigTime = now;
|
||||
}
|
||||
|
||||
updateQueues(now);
|
||||
|
||||
_lastRefillTime = now;
|
||||
|
||||
try { Thread.sleep(_replenishFrequency); } catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
|
||||
public void reinitialize() {
|
||||
_lastRefillTime = _context.clock().now();
|
||||
checkConfig();
|
||||
_lastCheckConfigTime = _lastRefillTime;
|
||||
}
|
||||
|
||||
private void updateQueues(long now) {
|
||||
long numMs = (now - _lastRefillTime);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Updating bandwidth after " + numMs + " (available in="
|
||||
+ _limiter.getAvailableInboundBytes() + ", out="
|
||||
+ _limiter.getAvailableOutboundBytes()+ ", rate in="
|
||||
+ _inboundKBytesPerSecond + ", out="
|
||||
+ _outboundKBytesPerSecond +")");
|
||||
if (numMs > 1000) {
|
||||
long inboundToAdd = 1024*_inboundKBytesPerSecond * (numMs/1000);
|
||||
long outboundToAdd = 1024*_outboundKBytesPerSecond * (numMs/1000);
|
||||
|
||||
if (inboundToAdd < 0) inboundToAdd = 0;
|
||||
if (outboundToAdd < 0) outboundToAdd = 0;
|
||||
|
||||
if (_inboundKBytesPerSecond <= 0) {
|
||||
_limiter.setInboundUnlimited(true);
|
||||
inboundToAdd = 0;
|
||||
} else {
|
||||
_limiter.setInboundUnlimited(false);
|
||||
}
|
||||
if (_outboundKBytesPerSecond <= 0) {
|
||||
_limiter.setOutboundUnlimited(true);
|
||||
outboundToAdd = 0;
|
||||
} else {
|
||||
_limiter.setOutboundUnlimited(false);
|
||||
}
|
||||
|
||||
_limiter.refillBandwidthQueues(inboundToAdd, outboundToAdd);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");
|
||||
_log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkConfig() {
|
||||
updateInboundRate();
|
||||
updateOutboundRate();
|
||||
updateInboundPeak();
|
||||
updateOutboundPeak();
|
||||
updateReplenishFrequency();
|
||||
|
||||
if (_inboundKBytesPerSecond <= 0) {
|
||||
_limiter.setInboundUnlimited(true);
|
||||
} else {
|
||||
_limiter.setInboundUnlimited(false);
|
||||
}
|
||||
if (_outboundKBytesPerSecond <= 0) {
|
||||
_limiter.setOutboundUnlimited(true);
|
||||
} else {
|
||||
_limiter.setOutboundUnlimited(false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void updateInboundRate() {
|
||||
String inBwStr = _context.getProperty(PROP_INBOUND_BANDWIDTH);
|
||||
if ( (inBwStr != null) &&
|
||||
(inBwStr.trim().length() > 0) &&
|
||||
(!(inBwStr.equals(String.valueOf(_inboundKBytesPerSecond)))) ) {
|
||||
// bandwidth was specified *and* changed
|
||||
try {
|
||||
long in = Long.parseLong(inBwStr);
|
||||
if ( (in <= 0) || (in > MIN_INBOUND_BANDWIDTH) )
|
||||
_inboundKBytesPerSecond = in;
|
||||
else
|
||||
_inboundKBytesPerSecond = MIN_INBOUND_BANDWIDTH;
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid inbound bandwidth limit [" + inBwStr
|
||||
+ "], keeping as " + _inboundKBytesPerSecond);
|
||||
}
|
||||
} else {
|
||||
if ( (inBwStr == null) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("Inbound bandwidth limits not specified in the config via " + PROP_INBOUND_BANDWIDTH);
|
||||
}
|
||||
}
|
||||
private void updateOutboundRate() {
|
||||
String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH);
|
||||
|
||||
if ( (outBwStr != null) &&
|
||||
(outBwStr.trim().length() > 0) &&
|
||||
(!(outBwStr.equals(String.valueOf(_outboundKBytesPerSecond)))) ) {
|
||||
// bandwidth was specified *and* changed
|
||||
try {
|
||||
long out = Long.parseLong(outBwStr);
|
||||
if ( (out <= 0) || (out >= MIN_OUTBOUND_BANDWIDTH) )
|
||||
_outboundKBytesPerSecond = out;
|
||||
else
|
||||
_outboundKBytesPerSecond = MIN_OUTBOUND_BANDWIDTH;
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid outbound bandwidth limit [" + outBwStr
|
||||
+ "], keeping as " + _outboundKBytesPerSecond);
|
||||
}
|
||||
} else {
|
||||
if ( (outBwStr == null) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("Outbound bandwidth limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateInboundPeak() {
|
||||
String inBwStr = _context.getProperty(PROP_INBOUND_BANDWIDTH_PEAK);
|
||||
if ( (inBwStr != null) &&
|
||||
(inBwStr.trim().length() > 0) &&
|
||||
(!(inBwStr.equals(String.valueOf(_limiter.getMaxInboundBytes())))) ) {
|
||||
// peak bw was specified *and* changed
|
||||
try {
|
||||
long in = Long.parseLong(inBwStr);
|
||||
if (in >= MIN_INBOUND_BANDWIDTH_PEAK)
|
||||
_limiter.setMaxInboundBytes(in * 1024);
|
||||
else
|
||||
_limiter.setMaxInboundBytes(MIN_INBOUND_BANDWIDTH_PEAK * 1024);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid inbound bandwidth burst limit [" + inBwStr
|
||||
+ "]");
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Inbound bandwidth burst limits not specified in the config via "
|
||||
+ PROP_INBOUND_BANDWIDTH_PEAK);
|
||||
}
|
||||
}
|
||||
private void updateOutboundPeak() {
|
||||
String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH_PEAK);
|
||||
if ( (outBwStr != null) &&
|
||||
(outBwStr.trim().length() > 0) &&
|
||||
(!(outBwStr.equals(String.valueOf(_limiter.getMaxOutboundBytes())))) ) {
|
||||
// peak bw was specified *and* changed
|
||||
try {
|
||||
long out = Long.parseLong(outBwStr);
|
||||
if (out >= MIN_OUTBOUND_BANDWIDTH_PEAK)
|
||||
_limiter.setMaxOutboundBytes(out * 1024);
|
||||
else
|
||||
_limiter.setMaxOutboundBytes(MIN_OUTBOUND_BANDWIDTH_PEAK * 1024);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid outbound bandwidth burst limit [" + outBwStr
|
||||
+ "]");
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Outbound bandwidth burst limits not specified in the config via "
|
||||
+ PROP_OUTBOUND_BANDWIDTH_PEAK);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateReplenishFrequency() {
|
||||
String freqMs = _context.getProperty(PROP_REPLENISH_FREQUENCY);
|
||||
if ( (freqMs != null) &&
|
||||
(freqMs.trim().length() > 0) &&
|
||||
(!(freqMs.equals(String.valueOf(_replenishFrequency)))) ) {
|
||||
// frequency was specified *and* changed
|
||||
try {
|
||||
long ms = Long.parseLong(freqMs);
|
||||
if (ms >= MIN_REPLENISH_FREQUENCY)
|
||||
_replenishFrequency = ms;
|
||||
else
|
||||
_replenishFrequency = MIN_REPLENISH_FREQUENCY;
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid replenish frequency [" + freqMs
|
||||
+ "]");
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Replenish frequency not specified in the config via " + PROP_REPLENISH_FREQUENCY);
|
||||
_replenishFrequency = DEFAULT_REPLENISH_FREQUENCY;
|
||||
}
|
||||
}
|
||||
}
|
@ -9,6 +9,7 @@ package net.i2p.router.transport;
|
||||
*/
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -174,7 +175,9 @@ public class TransportManager implements TransportEventListener {
|
||||
private List orderBids(HashSet bids, OutNetMessage msg) {
|
||||
// db messages should go as fast as possible, while the others
|
||||
// should use as little bandwidth as possible.
|
||||
switch (msg.getMessage().getType()) {
|
||||
I2NPMessage message = msg.getMessage();
|
||||
if (message == null) return Collections.EMPTY_LIST;
|
||||
switch (message.getType()) {
|
||||
case DatabaseLookupMessage.MESSAGE_TYPE:
|
||||
case DatabaseSearchReplyMessage.MESSAGE_TYPE:
|
||||
case DatabaseStoreMessage.MESSAGE_TYPE:
|
||||
|
@ -1,424 +0,0 @@
|
||||
package net.i2p.router.transport;
|
||||
/*
|
||||
* free (adj.): unencumbered; not under the control of others
|
||||
* Written by jrandom in 2003 and released into the public domain
|
||||
* with no warranty of any kind, either expressed or implied.
|
||||
* It probably won't make your computer catch on fire, or eat
|
||||
* your children, but it might. Use at your own risk.
|
||||
*
|
||||
*/
|
||||
|
||||
import net.i2p.data.RouterIdentity;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Coordinate the bandwidth limiting across all classes of peers. Currently
|
||||
* treats everything as open (aka doesn't limit)
|
||||
*
|
||||
*/
|
||||
public class TrivialBandwidthLimiter implements BandwidthLimiter {
|
||||
private Log _log;
|
||||
private RouterContext _context;
|
||||
|
||||
/** how many bytes can we read from the network without blocking? */
|
||||
private volatile long _inboundAvailable;
|
||||
/** how many bytes can we write to the network without blocking? */
|
||||
private volatile long _outboundAvailable;
|
||||
/** how large will we let the inboundAvailable queue grow? */
|
||||
private volatile long _inboundBurstBytes;
|
||||
/** how large will we let the outboundAvailable queue grow? */
|
||||
private volatile long _outboundBurstBytes;
|
||||
/** how many bytes have we ever read from the network? */
|
||||
private volatile long _totalInboundBytes;
|
||||
/** how many bytes have we ever written to the network? */
|
||||
private volatile long _totalOutboundBytes;
|
||||
/** how many KBps do we want to allow? */
|
||||
private long _inboundKBytesPerSecond;
|
||||
/** how many KBps do we want to allow? */
|
||||
private long _outboundKBytesPerSecond;
|
||||
/** how frequently do we want to replenish the available queues? */
|
||||
private long _replenishFrequency;
|
||||
private long _minNonZeroDelay;
|
||||
/**
|
||||
* when did we last replenish the available queues (since it wont
|
||||
* likely exactly match the replenish frequency)?
|
||||
*/
|
||||
private volatile long _lastResync;
|
||||
/** when did we last update the limits? */
|
||||
private long _lastUpdateLimits;
|
||||
|
||||
/**
|
||||
* notify this object whenever we need bandwidth and we'll refresh the pool
|
||||
* (though not necessarily with sufficient or even any bytes)
|
||||
*
|
||||
*/
|
||||
private Object _updateBwLock = new Object();
|
||||
|
||||
/**
|
||||
* wait on this when we want outbound bytes, and notify
|
||||
* it when more bytes are available
|
||||
*/
|
||||
private Object _outboundWaitLock = new Object();
|
||||
/**
|
||||
* wait on this when we want inbound bytes, and notify
|
||||
* it when more bytes are available
|
||||
*/
|
||||
private Object _inboundWaitLock = new Object();
|
||||
|
||||
private static final long DEFAULT_REPLENISH_FREQUENCY = 1*1000;
|
||||
private static final long DEFAULT_MIN_NON_ZERO_DELAY = 1*1000;
|
||||
|
||||
private static final long UPDATE_LIMIT_FREQUENCY = 60*1000;
|
||||
|
||||
public TrivialBandwidthLimiter(RouterContext ctx) {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(TrivialBandwidthLimiter.class);
|
||||
_inboundAvailable = 0;
|
||||
_outboundAvailable = 0;
|
||||
_inboundBurstBytes = -1;
|
||||
_outboundBurstBytes = -1;
|
||||
_inboundKBytesPerSecond = -1;
|
||||
_outboundKBytesPerSecond = -1;
|
||||
_totalInboundBytes = 0;
|
||||
_totalOutboundBytes = 0;
|
||||
_replenishFrequency = DEFAULT_REPLENISH_FREQUENCY;
|
||||
_lastResync = ctx.clock().now();
|
||||
|
||||
updateLimits();
|
||||
I2PThread bwThread = new I2PThread(new UpdateBWRunner());
|
||||
bwThread.setDaemon(true);
|
||||
bwThread.setPriority(I2PThread.MIN_PRIORITY);
|
||||
bwThread.setName("BW Updater");
|
||||
bwThread.start();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Delay the required amount of time before returning so that receiving numBytes
|
||||
* from the peer will not violate the bandwidth limits
|
||||
*/
|
||||
public void delayInbound(RouterIdentity peer, int numBytes) {
|
||||
long delay = 0;
|
||||
while ( (delay = calculateDelayInbound(peer, numBytes)) > 0) {
|
||||
try {
|
||||
synchronized (_inboundWaitLock) {
|
||||
_inboundWaitLock.wait(delay);
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
synchronized (_inboundWaitLock) { _inboundWaitLock.notify(); }
|
||||
consumeInbound(peer, numBytes);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Delay the required amount of time before returning so that sending numBytes
|
||||
* to the peer will not violate the bandwidth limits
|
||||
*
|
||||
* FIXME: 'pulled' was added. See FIXME in BandwidthLimiter
|
||||
*/
|
||||
public void delayOutbound(RouterIdentity peer, int numBytes, boolean pulled) {
|
||||
long delay = 0;
|
||||
while ( (delay = calculateDelayOutbound(peer, numBytes)) > 0) {
|
||||
try {
|
||||
synchronized (_outboundWaitLock) {
|
||||
_outboundWaitLock.wait(delay);
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
synchronized (_outboundWaitLock) { _outboundWaitLock.notify(); }
|
||||
consumeOutbound(peer, numBytes, pulled);
|
||||
}
|
||||
|
||||
public long getTotalSendBytes() { return _totalOutboundBytes; }
|
||||
public long getTotalReceiveBytes() { return _totalInboundBytes; }
|
||||
|
||||
/**
|
||||
* Return how many milliseconds to wait before receiving/processing numBytes from the peer
|
||||
*/
|
||||
private long calculateDelayInbound(RouterIdentity peer, int numBytes) {
|
||||
if (_inboundKBytesPerSecond <= 0) return 0;
|
||||
if (_inboundAvailable - numBytes > 0) {
|
||||
// we have bytes available
|
||||
return 0;
|
||||
} else {
|
||||
// we don't have sufficient bytes.
|
||||
// the delay = 1000*(bytes needed/bytes per second)
|
||||
double val = 1000.0*(((double)numBytes-(double)_inboundAvailable)/((double)_inboundKBytesPerSecond*1024));
|
||||
long rv = (long)Math.ceil(val);
|
||||
if ( (rv > 0) && (rv < _minNonZeroDelay) )
|
||||
rv = _minNonZeroDelay;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("DelayInbound: " + rv + " for " + numBytes + " (avail="
|
||||
+ _inboundAvailable + ", max=" + _inboundBurstBytes + ", kbps=" + _inboundKBytesPerSecond + ")");
|
||||
// we will want to replenish before this requestor comes back for the data
|
||||
if (rv < _replenishFrequency)
|
||||
synchronized (_updateBwLock) { _updateBwLock.notify(); }
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return how many milliseconds to wait before sending numBytes to the peer
|
||||
*/
|
||||
private long calculateDelayOutbound(RouterIdentity peer, int numBytes) {
|
||||
if (_outboundKBytesPerSecond <= 0) return 0;
|
||||
if (_outboundAvailable - numBytes > 0) {
|
||||
// we have bytes available
|
||||
return 0;
|
||||
} else {
|
||||
// we don't have sufficient bytes.
|
||||
// lets make sure...
|
||||
// the delay = 1000*(bytes needed/bytes per second)
|
||||
long avail = _outboundAvailable;
|
||||
double val = 1000.0*(((double)numBytes-(double)avail)/((double)_outboundKBytesPerSecond*1024.0));
|
||||
long rv = (long)Math.ceil(val);
|
||||
if ( (rv > 0) && (rv < _minNonZeroDelay) )
|
||||
rv = _minNonZeroDelay;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("DelayOutbound: " + rv + " for " + numBytes + " (avail="
|
||||
+ avail + ", max=" + _outboundBurstBytes + ", kbps=" + _outboundKBytesPerSecond + ")");
|
||||
// we will want to replenish before this requestor comes back for the data
|
||||
if (rv < _replenishFrequency)
|
||||
synchronized (_updateBwLock) { _updateBwLock.notify(); }
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that numBytes have been read from the peer
|
||||
*/
|
||||
private void consumeInbound(RouterIdentity peer, int numBytes) {
|
||||
if (numBytes > 0)
|
||||
_totalInboundBytes += numBytes;
|
||||
if (_inboundKBytesPerSecond > 0)
|
||||
_inboundAvailable -= numBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that numBytes have been sent to the peer
|
||||
*/
|
||||
private void consumeOutbound(RouterIdentity peer, int numBytes, boolean pulled) {
|
||||
if (numBytes > 0)
|
||||
{
|
||||
if (pulled) { // FIXME: fix to give the correct value from getTotalReceiveBytes()
|
||||
_totalInboundBytes += numBytes;
|
||||
} else {
|
||||
_totalOutboundBytes += numBytes;
|
||||
}
|
||||
}
|
||||
if (_outboundKBytesPerSecond > 0)
|
||||
_outboundAvailable -= numBytes;
|
||||
}
|
||||
|
||||
private void updateLimits() {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Updating rates for the bw limiter");
|
||||
|
||||
_lastUpdateLimits = _context.clock().now();
|
||||
updateInboundRate();
|
||||
updateOutboundRate();
|
||||
updateInboundPeak();
|
||||
updateOutboundPeak();
|
||||
updateReplenishFrequency();
|
||||
updateMinNonZeroDelay();
|
||||
}
|
||||
|
||||
private void updateInboundRate() {
|
||||
String inBwStr = _context.getProperty(PROP_INBOUND_BANDWIDTH);
|
||||
if ( (inBwStr != null) &&
|
||||
(inBwStr.trim().length() > 0) &&
|
||||
(!(inBwStr.equals(String.valueOf(_inboundKBytesPerSecond)))) ) {
|
||||
// bandwidth was specified *and* changed
|
||||
try {
|
||||
long in = Long.parseLong(inBwStr);
|
||||
if (in >= 0) {
|
||||
_inboundKBytesPerSecond = in;
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid inbound bandwidth limit [" + inBwStr
|
||||
+ "], keeping as " + _inboundKBytesPerSecond);
|
||||
}
|
||||
} else {
|
||||
if ( (inBwStr == null) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("Inbound bandwidth limits not specified in the config via " + PROP_INBOUND_BANDWIDTH);
|
||||
}
|
||||
}
|
||||
private void updateOutboundRate() {
|
||||
String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH);
|
||||
|
||||
if ( (outBwStr != null) &&
|
||||
(outBwStr.trim().length() > 0) &&
|
||||
(!(outBwStr.equals(String.valueOf(_outboundKBytesPerSecond)))) ) {
|
||||
// bandwidth was specified *and* changed
|
||||
try {
|
||||
long out = Long.parseLong(outBwStr);
|
||||
_outboundKBytesPerSecond = out;
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid outbound bandwidth limit [" + outBwStr
|
||||
+ "], keeping as " + _outboundKBytesPerSecond);
|
||||
}
|
||||
} else {
|
||||
if ( (outBwStr == null) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("Outbound bandwidth limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateInboundPeak() {
|
||||
String inBwStr = _context.getProperty(PROP_INBOUND_BANDWIDTH_PEAK);
|
||||
if ( (inBwStr != null) &&
|
||||
(inBwStr.trim().length() > 0) &&
|
||||
(!(inBwStr.equals(String.valueOf(_inboundBurstBytes)))) ) {
|
||||
// peak bw was specified *and* changed
|
||||
try {
|
||||
long in = Long.parseLong(inBwStr);
|
||||
_inboundBurstBytes = in * 1024;
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid inbound bandwidth burst limit [" + inBwStr
|
||||
+ "], keeping as " + _inboundBurstBytes);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Inbound bandwidth burst limits not specified in the config via " + PROP_INBOUND_BANDWIDTH_PEAK);
|
||||
}
|
||||
}
|
||||
private void updateOutboundPeak() {
|
||||
String outBwStr = _context.getProperty(PROP_OUTBOUND_BANDWIDTH_PEAK);
|
||||
if ( (outBwStr != null) &&
|
||||
(outBwStr.trim().length() > 0) &&
|
||||
(!(outBwStr.equals(String.valueOf(_outboundBurstBytes)))) ) {
|
||||
// peak bw was specified *and* changed
|
||||
try {
|
||||
long out = Long.parseLong(outBwStr);
|
||||
if (out >= 0) {
|
||||
_outboundBurstBytes = out * 1024;
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid outbound bandwidth burst limit [" + outBwStr
|
||||
+ "], keeping as " + _outboundBurstBytes);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Outbound bandwidth burst limits not specified in the config via " + PROP_OUTBOUND_BANDWIDTH_PEAK);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateReplenishFrequency() {
|
||||
String freqMs = _context.getProperty(PROP_REPLENISH_FREQUENCY);
|
||||
if ( (freqMs != null) &&
|
||||
(freqMs.trim().length() > 0) &&
|
||||
(!(freqMs.equals(String.valueOf(_replenishFrequency)))) ) {
|
||||
// frequency was specified *and* changed
|
||||
try {
|
||||
long ms = Long.parseLong(freqMs);
|
||||
if (ms >= 0) {
|
||||
_replenishFrequency = ms;
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid replenish frequency [" + freqMs
|
||||
+ "], keeping as " + _replenishFrequency);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Replenish frequency not specified in the config via " + PROP_REPLENISH_FREQUENCY);
|
||||
_replenishFrequency = DEFAULT_REPLENISH_FREQUENCY;
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMinNonZeroDelay() {
|
||||
String delayMs = _context.getProperty(PROP_MIN_NON_ZERO_DELAY);
|
||||
if ( (delayMs != null) &&
|
||||
(delayMs.trim().length() > 0) &&
|
||||
(!(delayMs.equals(String.valueOf(_minNonZeroDelay)))) ) {
|
||||
// delay was specified *and* changed
|
||||
try {
|
||||
long ms = Long.parseLong(delayMs);
|
||||
if (ms >= 0) {
|
||||
_minNonZeroDelay = ms;
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid minimum nonzero delay [" + delayMs
|
||||
+ "], keeping as " + _minNonZeroDelay);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Minimum nonzero delay not specified in the config via " + PROP_MIN_NON_ZERO_DELAY);
|
||||
_minNonZeroDelay = DEFAULT_MIN_NON_ZERO_DELAY;
|
||||
}
|
||||
}
|
||||
|
||||
public void reinitialize() {
|
||||
_inboundAvailable = 0;
|
||||
_inboundBurstBytes = 0;
|
||||
_inboundKBytesPerSecond = -1;
|
||||
_lastResync = _context.clock().now();
|
||||
_lastUpdateLimits = -1;
|
||||
_minNonZeroDelay = DEFAULT_MIN_NON_ZERO_DELAY;
|
||||
_outboundAvailable = 0;
|
||||
_outboundBurstBytes = 0;
|
||||
_outboundKBytesPerSecond = -1;
|
||||
_replenishFrequency = DEFAULT_REPLENISH_FREQUENCY;
|
||||
_totalInboundBytes = 0;
|
||||
_totalOutboundBytes = 0;
|
||||
updateLimits();
|
||||
updateBW();
|
||||
}
|
||||
|
||||
private void updateBW() {
|
||||
long now = _context.clock().now();
|
||||
long numMs = (now - _lastResync);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Updating bandwidth after " + numMs + " (available in=" + _inboundAvailable + ", out=" + _outboundAvailable + ", rate in=" + _inboundKBytesPerSecond + ", out=" + _outboundKBytesPerSecond +")");
|
||||
if (numMs > 1000) {
|
||||
long inboundToAdd = 1024*_inboundKBytesPerSecond * (numMs/1000);
|
||||
long outboundToAdd = 1024*_outboundKBytesPerSecond * (numMs/1000);
|
||||
|
||||
if (inboundToAdd < 0) inboundToAdd = 0;
|
||||
if (outboundToAdd < 0) outboundToAdd = 0;
|
||||
|
||||
_inboundAvailable += inboundToAdd;
|
||||
_outboundAvailable += outboundToAdd;
|
||||
|
||||
if (_inboundAvailable > _inboundBurstBytes)
|
||||
_inboundAvailable = _inboundBurstBytes;
|
||||
if (_outboundAvailable > _outboundBurstBytes)
|
||||
_outboundAvailable = _outboundBurstBytes;
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable (current: " + _inboundAvailable + ")");
|
||||
_log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable (current: " + _outboundAvailable + ")");
|
||||
}
|
||||
|
||||
if (inboundToAdd > 0) synchronized (_inboundWaitLock) { _inboundWaitLock.notify(); }
|
||||
if (outboundToAdd > 0) synchronized (_outboundWaitLock) { _outboundWaitLock.notify(); }
|
||||
}
|
||||
_lastResync = now;
|
||||
}
|
||||
|
||||
private class UpdateBWRunner implements Runnable {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
synchronized (_updateBwLock) {
|
||||
_updateBwLock.wait(_replenishFrequency);
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
try {
|
||||
updateBW();
|
||||
if (_context.clock().now() > _lastUpdateLimits + UPDATE_LIMIT_FREQUENCY)
|
||||
updateLimits();
|
||||
} catch (Exception e) {
|
||||
_log.log(Log.CRIT, "Error updating bandwidth!", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -128,7 +128,7 @@ class PHTTPPoller {
|
||||
byte authData[] = getAuthData();
|
||||
if (authData == null) return 0;
|
||||
|
||||
_context.bandwidthLimiter().delayOutbound(null, authData.length + 512, false); // HTTP overhead
|
||||
//_context.bandwidthLimiter().delayOutbound(null, authData.length + 512, false); // HTTP overhead
|
||||
|
||||
try {
|
||||
_log.debug("Before opening " + _pollURL.toExternalForm());
|
||||
@ -226,7 +226,7 @@ class PHTTPPoller {
|
||||
}
|
||||
}
|
||||
|
||||
_context.bandwidthLimiter().delayInbound(null, bytesRead);
|
||||
//_context.bandwidthLimiter().delayInbound(null, bytesRead);
|
||||
|
||||
return numSuccessful;
|
||||
} catch (Throwable t) {
|
||||
|
@ -113,7 +113,7 @@ class PHTTPSender {
|
||||
byte data[] = getData(msg);
|
||||
if (data == null) return false;
|
||||
|
||||
_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), data.length+512, false); // HTTP overhead
|
||||
//_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), data.length+512, false); // HTTP overhead
|
||||
|
||||
con.setRequestProperty("Content-length", ""+data.length);
|
||||
OutputStream out = con.getOutputStream();
|
||||
@ -176,8 +176,8 @@ class PHTTPSender {
|
||||
URL checkStatusURL = new URL(checkURLStr);
|
||||
long delay = RECHECK_DELAY;
|
||||
do {
|
||||
_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), 512, false); // HTTP overhead
|
||||
_context.bandwidthLimiter().delayInbound(msg.getTarget().getIdentity(), 512); // HTTP overhead
|
||||
//_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), 512, false); // HTTP overhead
|
||||
//_context.bandwidthLimiter().delayInbound(msg.getTarget().getIdentity(), 512); // HTTP overhead
|
||||
|
||||
_log.debug("Checking delivery at " + checkURLStr);
|
||||
HttpURLConnection con = (HttpURLConnection)checkStatusURL.openConnection();
|
||||
|
@ -160,8 +160,8 @@ public class PHTTPTransport extends TransportImpl {
|
||||
_context.router().getRouterInfo().getIdentity().writeBytes(baos);
|
||||
int postLength = baos.size();
|
||||
|
||||
_context.bandwidthLimiter().delayOutbound(null, postLength+512, false); // HTTP overhead
|
||||
_context.bandwidthLimiter().delayInbound(null, 2048+512); // HTTP overhead
|
||||
//_context.bandwidthLimiter().delayOutbound(null, postLength+512, false); // HTTP overhead
|
||||
//_context.bandwidthLimiter().delayInbound(null, 2048+512); // HTTP overhead
|
||||
|
||||
long now = _context.clock().now();
|
||||
_log.debug("Before opening " + _myRegisterURL);
|
||||
|
@ -10,6 +10,8 @@ package net.i2p.router.transport.tcp;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.math.BigInteger;
|
||||
import java.net.Socket;
|
||||
@ -22,6 +24,8 @@ import net.i2p.data.DataHelper;
|
||||
import net.i2p.data.RouterIdentity;
|
||||
import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.BandwidthLimitedInputStream;
|
||||
import net.i2p.router.transport.BandwidthLimitedOutputStream;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
@ -43,7 +47,7 @@ class RestrictiveTCPConnection extends TCPConnection {
|
||||
private final static long PROTO_ID = 12;
|
||||
|
||||
/** read / write buffer size */
|
||||
private final static int BUF_SIZE = 4*1024;
|
||||
private final static int BUF_SIZE = 32*1024;
|
||||
|
||||
private boolean validateVersion() throws DataFormatException, IOException {
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before validating version");
|
||||
@ -291,12 +295,9 @@ class RestrictiveTCPConnection extends TCPConnection {
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("TCP connection " + _id + " established with " + _remoteIdentity.getHash().toBase64());
|
||||
//_in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv);
|
||||
//_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv);
|
||||
//_in = new BandwidthLimitedInputStream(_context, _in, _remoteIdentity);
|
||||
//_out = new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE);
|
||||
_in = new AESInputStream(_context, _in, _key, _iv);
|
||||
_out = new AESOutputStream(_context, _out, _key, _iv);
|
||||
|
||||
_in = new BandwidthLimitedInputStream(_context, new AESInputStream(_context, _in, _key, _iv), _remoteIdentity);
|
||||
_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv);
|
||||
_socket.setSoTimeout(0);
|
||||
success = _context.clock().now();
|
||||
established();
|
||||
|
@ -521,8 +521,9 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
|
||||
byte data[] = msg.getMessageData();
|
||||
if (data == null) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("wtf, for some reason, an I2NPMessage couldn't be serialized...");
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId() + "expired before it could be sent");
|
||||
_transport.afterSend(msg, false, false);
|
||||
return true;
|
||||
}
|
||||
msg.timestamp("TCPConnection.runner.processSlice before sending "
|
||||
@ -531,6 +532,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
_log.debug("Sending " + data.length + " bytes in the slice... to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
|
||||
long exp = msg.getMessage().getMessageExpiration().getTime();
|
||||
|
||||
long beforeWrite = 0;
|
||||
try {
|
||||
synchronized (_out) {
|
||||
@ -545,14 +548,13 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
return false;
|
||||
}
|
||||
|
||||
long exp = msg.getMessage().getMessageExpiration().getTime();
|
||||
long end = _context.clock().now();
|
||||
long timeLeft = exp - end;
|
||||
|
||||
msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Message " + msg.getMessage().getClass().getName()
|
||||
_log.info("Message " + msg.getMessageType()
|
||||
+ " (expiring in " + timeLeft + "ms) sent to "
|
||||
+ _remoteIdentity.getHash().toBase64() + " from "
|
||||
+ _context.routerHash().toBase64()
|
||||
|
Reference in New Issue
Block a user