SSU: Request outbound bandwidth on the way into the

sender queue, not on the way out, so that SSU requests
bandwidth allocations for each packet in parallel
and competes more effectively with NTCP for bandwidth.
Inbound stubbed-out only.
This commit is contained in:
zzz
2015-07-05 12:30:01 +00:00
parent 113a8a52f3
commit 05959d5199
5 changed files with 73 additions and 21 deletions

View File

@ -1,3 +1,7 @@
2015-07-05 zzz
* SSU: Compete better with NTCP for outbound bandwidth allocations
* Transport: Adjust thread priorities to prevent I/O stalling
2015-06-29 zzz
* Transport: More fixes for SSU stalling

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 13;
public final static long BUILD = 14;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -9,13 +9,13 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@ -99,7 +99,7 @@ around briefly, to address packet loss and reordering.</p>
*
*/
class PacketBuilder {
private final I2PAppContext _context;
private final RouterContext _context;
private final Log _log;
private final UDPTransport _transport;
@ -169,7 +169,7 @@ class PacketBuilder {
/**
* @param transport may be null for unit testing only
*/
public PacketBuilder(I2PAppContext ctx, UDPTransport transport) {
public PacketBuilder(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_transport = transport;
_log = ctx.logManager().getLog(PacketBuilder.class);

View File

@ -6,11 +6,11 @@ import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@ -22,7 +22,7 @@ import net.i2p.util.SystemVersion;
*
*/
class UDPPacket implements CDQEntry {
private I2PAppContext _context;
private RouterContext _context;
private final DatagramPacket _packet;
private volatile short _priority;
private volatile long _initializeTime;
@ -43,6 +43,7 @@ class UDPPacket implements CDQEntry {
//private long _afterHandlingTime;
private int _validateCount;
// private boolean _isInbound;
private FIFOBandwidthLimiter.Request _bandwidthRequest;
// Warning - this mixes contexts in a multi-router JVM
private static final Queue<UDPPacket> _packetCache;
@ -102,7 +103,7 @@ class UDPPacket implements CDQEntry {
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
private UDPPacket(I2PAppContext ctx) {
private UDPPacket(RouterContext ctx) {
//ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES);
// the data buffer is clobbered on init(..), but we need it to bootstrap
_data = new byte[MAX_PACKET_SIZE];
@ -112,7 +113,7 @@ class UDPPacket implements CDQEntry {
init(ctx);
}
private synchronized void init(I2PAppContext ctx) {
private synchronized void init(RouterContext ctx) {
_context = ctx;
//_dataBuf = _dataCache.acquire();
Arrays.fill(_data, (byte)0);
@ -231,7 +232,7 @@ class UDPPacket implements CDQEntry {
str.append("\n\tCalc HMAC: ").append(Base64.encode(calc, 0, MAC_SIZE));
str.append("\n\tRead HMAC: ").append(Base64.encode(_data, _packet.getOffset(), MAC_SIZE));
str.append("\n\tUsing key: ").append(macKey.toBase64());
if (DataHelper.eq(macKey.getData(), 0, ((RouterContext)_context).routerHash().getData(), 0, 32))
if (DataHelper.eq(macKey.getData(), 0, _context.routerHash().getData(), 0, 32))
str.append(" (Intro)");
else
str.append(" (Session)");
@ -300,6 +301,40 @@ class UDPPacket implements CDQEntry {
/** a packet handler has finished parsing out the good bits */
//long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); }
/**
* So that we can compete with NTCP, we want to request bandwidth
* in parallel, on the way into the queue, not on the way out.
* Call before enqueueing.
* @since 0.9.21
* @deprecated unused
*/
public synchronized void requestInboundBandwidth() {
verifyNotReleased();
_bandwidthRequest = _context.bandwidthLimiter().requestInbound(_packet.getLength(), "UDP receiver");
}
/**
* So that we can compete with NTCP, we want to request bandwidth
* in parallel, on the way into the queue, not on the way out.
* Call before enqueueing.
* @since 0.9.21
*/
public synchronized void requestOutboundBandwidth() {
verifyNotReleased();
_bandwidthRequest = _context.bandwidthLimiter().requestOutbound(_packet.getLength(), 0, "UDP sender");
}
/**
* So that we can compete with NTCP, we want to request bandwidth
* in parallel, on the way into the queue, not on the way out.
* Call after dequeueing.
* @since 0.9.21
*/
public synchronized FIFOBandwidthLimiter.Request getBandwidthRequest() {
verifyNotReleased();
return _bandwidthRequest;
}
// Following 5: All used only for stats in PacketHandler, commented out
/** when it was pulled off the endpoint receive queue */
@ -339,7 +374,7 @@ class UDPPacket implements CDQEntry {
/**
* @param inbound unused
*/
public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
public static UDPPacket acquire(RouterContext ctx, boolean inbound) {
UDPPacket rv = null;
if (CACHE) {
rv = _packetCache.poll();
@ -375,6 +410,13 @@ class UDPPacket implements CDQEntry {
//_acquiredBy = null;
//
//_dataCache.release(_dataBuf);
if (_bandwidthRequest != null) {
synchronized(_bandwidthRequest) {
if (_bandwidthRequest.getPendingRequested() > 0)
_bandwidthRequest.abort();
}
_bandwidthRequest = null;
}
if (!CACHE)
return;
_packetCache.offer(this);

View File

@ -32,6 +32,9 @@ class UDPSender {
private static final int TYPE_POISON = 99999;
// Queue needs to be big enough that we can compete with NTCP for
// bandwidth requests, and so CoDel can work well.
// When full, packets back up into the PacketPusher thread, pre-CoDel.
private static final int MIN_QUEUE_SIZE = 64;
private static final int MAX_QUEUE_SIZE = 384;
@ -195,9 +198,11 @@ class UDPSender {
packet.release();
return;
}
packet.requestOutboundBandwidth();
try {
_outboundQueue.put(packet);
} catch (InterruptedException ie) {
packet.release();
return;
}
//size = _outboundQueue.size();
@ -229,17 +234,18 @@ class UDPSender {
// ?? int size2 = packet.getPacket().getLength();
if (size > 0) {
//_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
FIFOBandwidthLimiter.Request req =
_context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender");
// failsafe, don't wait forever
int waitCount = 0;
while (req.getPendingRequested() > 0 && waitCount++ < 5) {
req.waitForNextAllocation();
}
if (waitCount >= 5) {
// tell FBL we didn't send it, but send it anyway
req.abort();
_context.statManager().addRateData("udp.sendFailsafe", 1);
FIFOBandwidthLimiter.Request req = packet.getBandwidthRequest();
if (req != null) {
// failsafe, don't wait forever
int waitCount = 0;
while (req.getPendingRequested() > 0 && waitCount++ < 5) {
req.waitForNextAllocation();
}
if (waitCount >= 5) {
// tell FBL we didn't send it, but send it anyway
req.abort();
_context.statManager().addRateData("udp.sendFailsafe", 1);
}
}
}