- Continue work to use priorities in FIFOBandwidthLimiter

- Log tweaks
This commit is contained in:
zzz
2012-10-28 12:10:24 +00:00
parent 6868047ee4
commit c4fa8fabb2
7 changed files with 30 additions and 53 deletions

View File

@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.router.util.PQEntry;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -59,7 +60,6 @@ public class FIFOBandwidthLimiter {
private final AtomicLong _totalAllocatedInboundBytes = new AtomicLong();
/** lifetime counter of bytes sent */
private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong();
private static final AtomicLong __requestId = new AtomicLong();
/** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */
//private final AtomicLong _totalWastedInboundBytes = new AtomicLong();
@ -190,8 +190,6 @@ public class FIFOBandwidthLimiter {
return _refiller.getCurrentParticipatingBandwidth();
}
public Request createRequest() { return new SimpleRequest(); }
/**
* Request some bytes. Does not block.
*/
@ -199,11 +197,7 @@ public class FIFOBandwidthLimiter {
// try to satisfy without grabbing the global lock
if (shortcutSatisfyInboundRequest(bytesIn))
return _noop;
return requestInbound(bytesIn, purpose, null, null);
}
public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) {
SimpleRequest req = new SimpleRequest(bytesIn, purpose, lsnr, attachment);
SimpleRequest req = new SimpleRequest(bytesIn, 0);
requestInbound(req, bytesIn, purpose);
return req;
}
@ -230,15 +224,11 @@ public class FIFOBandwidthLimiter {
/**
* Request some bytes. Does not block.
*/
public Request requestOutbound(int bytesOut, String purpose) {
public Request requestOutbound(int bytesOut, int priority, String purpose) {
// try to satisfy without grabbing the global lock
if (shortcutSatisfyOutboundRequest(bytesOut))
return _noop;
return requestOutbound(bytesOut, purpose, null, null);
}
public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) {
SimpleRequest req = new SimpleRequest(bytesOut, purpose, lsnr, attachment);
SimpleRequest req = new SimpleRequest(bytesOut, priority);
requestOutbound(req, bytesOut, purpose);
return req;
}
@ -789,44 +779,24 @@ public class FIFOBandwidthLimiter {
private static class SimpleRequest implements Request {
private int _allocated;
private int _total;
private final int _total;
private long _requestId;
private long _requestTime;
private String _target;
private int _allocationsSinceWait;
private boolean _aborted;
private boolean _waited;
final List<Request> satisfiedBuffer;
private CompleteListener _lsnr;
private Object _attachment;
private final int _priority;
public SimpleRequest() {
satisfiedBuffer = new ArrayList(1);
init(0, null);
}
/**
* @param target for debugging, to be removed
*/
public SimpleRequest(int bytes, String target, CompleteListener lsnr, Object attachment) {
public SimpleRequest(int bytes, int priority) {
satisfiedBuffer = new ArrayList(1);
_lsnr = lsnr;
_attachment = attachment;
init(bytes, target);
}
/**
* @param target for debugging, to be removed
*/
public void init(int bytes, String target) {
_waited = false;
_total = bytes;
_allocated = 0;
_aborted = false;
_target = target;
satisfiedBuffer.clear();
_requestId = __requestId.incrementAndGet();
_requestTime = System.currentTimeMillis();
_priority = priority;
}
public long getRequestTime() { return _requestTime; }
@ -900,17 +870,22 @@ public class FIFOBandwidthLimiter {
public void attach(Object obj) { _attachment = obj; }
public Object attachment() { return _attachment; }
// PQEntry methods
public int getPriority() { return _priority; };
public void setSeqNum(long num) { _requestId = num; };
public long getSeqNum() { return _requestId; };
@Override
public String toString() {
return "Req" + _requestId + " to " + _target +
_allocated + '/' + _total;
return "Req" + _requestId + " priority " + _priority +
_allocated + '/' + _total + " bytes";
}
}
/**
* A bandwidth request, either inbound or outbound.
*/
public interface Request {
public interface Request extends PQEntry {
/** when was the request made? */
public long getRequestTime();
/** how many bytes were requested? */
@ -923,8 +898,6 @@ public class FIFOBandwidthLimiter {
public void abort();
/** was this request aborted? */
public boolean getAborted();
/** thar be dragons */
public void init(int bytes, String target);
public void setCompleteListener(CompleteListener lsnr);
/** Only supported if the request is not satisfied */
public void attach(Object obj);
@ -947,7 +920,6 @@ public class FIFOBandwidthLimiter {
public long getRequestTime() { return 0; }
public int getTotalRequested() { return 0; }
public void waitForNextAllocation() {}
public void init(int bytes, String target) {}
public CompleteListener getCompleteListener() { return null; }
public void setCompleteListener(CompleteListener lsnr) {
lsnr.complete(NoopRequest.this);
@ -956,5 +928,9 @@ public class FIFOBandwidthLimiter {
throw new UnsupportedOperationException("Don't attach to a satisfied request");
}
public Object attachment() { return null; }
// PQEntry methods
public int getPriority() { return 0; };
public void setSeqNum(long num) {};
public long getSeqNum() { return 0; };
}
}

View File

@ -375,7 +375,7 @@ class EventPumper implements Runnable {
*/
public void wantsWrite(NTCPConnection con, byte data[]) {
ByteBuffer buf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, 0, "NTCP write");//con, buf);
if (req.getPendingRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("queued write on " + con + " for " + data.length);

View File

@ -824,8 +824,8 @@ class EstablishmentManager {
try {
state.generateSessionKey();
} catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer " + state + " sent us an invalid DH parameter (or were spoofed)", ippe);
if (_log.shouldLog(Log.WARN))
_log.warn("Peer " + state + " sent us an invalid DH parameter", ippe);
_inboundStates.remove(state.getRemoteHostId());
return;
}

View File

@ -265,8 +265,8 @@ class OutboundEstablishState {
try {
generateSessionKey();
} catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer " + getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe);
if (_log.shouldLog(Log.WARN))
_log.warn("Peer " + getRemoteHostId() + " sent us an invalid DH parameter", ippe);
valid = false;
}
if (valid)

View File

@ -253,7 +253,6 @@ class UDPReceiver {
public void run() {
//_socketChanged = false;
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest();
while (_keepRunning) {
//if (_socketChanged) {
// Thread.currentThread().setName(_name + "." + _id);
@ -292,7 +291,8 @@ class UDPReceiver {
if (size > 0) {
//FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
//_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver");
req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
FIFOBandwidthLimiter.Request req =
_context.bandwidthLimiter().requestInbound(size, "UDP receiver");
while (req.getPendingRequested() > 0)
req.waitForNextAllocation();

View File

@ -191,7 +191,6 @@ class UDPSender {
private class Runner implements Runnable {
//private volatile boolean _socketChanged;
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest();
public void run() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running the UDP sender");
@ -211,7 +210,8 @@ class UDPSender {
// ?? int size2 = packet.getPacket().getLength();
if (size > 0) {
//_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");
FIFOBandwidthLimiter.Request req =
_context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender");
while (req.getPendingRequested() > 0)
req.waitForNextAllocation();
}

View File

@ -2595,6 +2595,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// if old != unsolicited && now - lastUpdated > STATUS_GRACE_PERIOD)
//
// fall through...
case CommSystemFacade.STATUS_DISCONNECTED:
case CommSystemFacade.STATUS_HOSED:
_reachabilityStatus = status;
_reachabilityStatusLastUpdated = now;