From 05959d519946f1bbbbdc68b41f2fddca8adfd38e Mon Sep 17 00:00:00 2001
From: zzz
Date: Sun, 5 Jul 2015 12:30:01 +0000
Subject: [PATCH] SSU: Request outbound bandwidth on the way into the sender
queue, not on the way out, so that SSU requests bandwidth allocations for
each packet in parallel and competes more effectively with NTCP for
bandwidth. Inbound stubbed-out only.
---
history.txt | 4 ++
.../src/net/i2p/router/RouterVersion.java | 2 +-
.../router/transport/udp/PacketBuilder.java | 6 +--
.../i2p/router/transport/udp/UDPPacket.java | 54 ++++++++++++++++---
.../i2p/router/transport/udp/UDPSender.java | 28 ++++++----
5 files changed, 73 insertions(+), 21 deletions(-)
diff --git a/history.txt b/history.txt
index 4fcb50aaf1..0092e5f22d 100644
--- a/history.txt
+++ b/history.txt
@@ -1,3 +1,7 @@
+2015-07-05 zzz
+ * SSU: Compete better with NTCP for outbound bandwidth allocations
+ * Transport: Adjust thread priorities to prevent I/O stalling
+
2015-06-29 zzz
* Transport: More fixes for SSU stalling
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 7f4edb9752..31c35f8c3e 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
- public final static long BUILD = 13;
+ public final static long BUILD = 14;
/** for example "-test" */
public final static String EXTRA = "";
diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
index 35f2bb7630..3029f96c39 100644
--- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
+++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
@@ -9,13 +9,13 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
+import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -99,7 +99,7 @@ around briefly, to address packet loss and reordering.
*
*/
class PacketBuilder {
- private final I2PAppContext _context;
+ private final RouterContext _context;
private final Log _log;
private final UDPTransport _transport;
@@ -169,7 +169,7 @@ class PacketBuilder {
/**
* @param transport may be null for unit testing only
*/
- public PacketBuilder(I2PAppContext ctx, UDPTransport transport) {
+ public PacketBuilder(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_transport = transport;
_log = ctx.logManager().getLog(PacketBuilder.class);
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
index 33eb2d2e67..e2e1b3f592 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
@@ -6,11 +6,11 @@ import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
-import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
+import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -22,7 +22,7 @@ import net.i2p.util.SystemVersion;
*
*/
class UDPPacket implements CDQEntry {
- private I2PAppContext _context;
+ private RouterContext _context;
private final DatagramPacket _packet;
private volatile short _priority;
private volatile long _initializeTime;
@@ -43,6 +43,7 @@ class UDPPacket implements CDQEntry {
//private long _afterHandlingTime;
private int _validateCount;
// private boolean _isInbound;
+ private FIFOBandwidthLimiter.Request _bandwidthRequest;
// Warning - this mixes contexts in a multi-router JVM
private static final Queue _packetCache;
@@ -102,7 +103,7 @@ class UDPPacket implements CDQEntry {
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
- private UDPPacket(I2PAppContext ctx) {
+ private UDPPacket(RouterContext ctx) {
//ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES);
// the data buffer is clobbered on init(..), but we need it to bootstrap
_data = new byte[MAX_PACKET_SIZE];
@@ -112,7 +113,7 @@ class UDPPacket implements CDQEntry {
init(ctx);
}
- private synchronized void init(I2PAppContext ctx) {
+ private synchronized void init(RouterContext ctx) {
_context = ctx;
//_dataBuf = _dataCache.acquire();
Arrays.fill(_data, (byte)0);
@@ -231,7 +232,7 @@ class UDPPacket implements CDQEntry {
str.append("\n\tCalc HMAC: ").append(Base64.encode(calc, 0, MAC_SIZE));
str.append("\n\tRead HMAC: ").append(Base64.encode(_data, _packet.getOffset(), MAC_SIZE));
str.append("\n\tUsing key: ").append(macKey.toBase64());
- if (DataHelper.eq(macKey.getData(), 0, ((RouterContext)_context).routerHash().getData(), 0, 32))
+ if (DataHelper.eq(macKey.getData(), 0, _context.routerHash().getData(), 0, 32))
str.append(" (Intro)");
else
str.append(" (Session)");
@@ -300,6 +301,40 @@ class UDPPacket implements CDQEntry {
/** a packet handler has finished parsing out the good bits */
//long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); }
+ /**
+ * So that we can compete with NTCP, we want to request bandwidth
+ * in parallel, on the way into the queue, not on the way out.
+ * Call before enqueueing.
+ * @since 0.9.21
+ * @deprecated unused
+ */
+ public synchronized void requestInboundBandwidth() {
+ verifyNotReleased();
+ _bandwidthRequest = _context.bandwidthLimiter().requestInbound(_packet.getLength(), "UDP receiver");
+ }
+
+ /**
+ * So that we can compete with NTCP, we want to request bandwidth
+ * in parallel, on the way into the queue, not on the way out.
+ * Call before enqueueing.
+ * @since 0.9.21
+ */
+ public synchronized void requestOutboundBandwidth() {
+ verifyNotReleased();
+ _bandwidthRequest = _context.bandwidthLimiter().requestOutbound(_packet.getLength(), 0, "UDP sender");
+ }
+
+ /**
+ * So that we can compete with NTCP, we want to request bandwidth
+ * in parallel, on the way into the queue, not on the way out.
+ * Call after dequeueing.
+ * @since 0.9.21
+ */
+ public synchronized FIFOBandwidthLimiter.Request getBandwidthRequest() {
+ verifyNotReleased();
+ return _bandwidthRequest;
+ }
+
// Following 5: All used only for stats in PacketHandler, commented out
/** when it was pulled off the endpoint receive queue */
@@ -339,7 +374,7 @@ class UDPPacket implements CDQEntry {
/**
* @param inbound unused
*/
- public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
+ public static UDPPacket acquire(RouterContext ctx, boolean inbound) {
UDPPacket rv = null;
if (CACHE) {
rv = _packetCache.poll();
@@ -375,6 +410,13 @@ class UDPPacket implements CDQEntry {
//_acquiredBy = null;
//
//_dataCache.release(_dataBuf);
+ if (_bandwidthRequest != null) {
+ synchronized(_bandwidthRequest) {
+ if (_bandwidthRequest.getPendingRequested() > 0)
+ _bandwidthRequest.abort();
+ }
+ _bandwidthRequest = null;
+ }
if (!CACHE)
return;
_packetCache.offer(this);
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
index 7091b4ceda..ebb2de795e 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
@@ -32,6 +32,9 @@ class UDPSender {
private static final int TYPE_POISON = 99999;
+ // Queue needs to be big enough that we can compete with NTCP for
+ // bandwidth requests, and so CoDel can work well.
+ // When full, packets back up into the PacketPusher thread, pre-CoDel.
private static final int MIN_QUEUE_SIZE = 64;
private static final int MAX_QUEUE_SIZE = 384;
@@ -195,9 +198,11 @@ class UDPSender {
packet.release();
return;
}
+ packet.requestOutboundBandwidth();
try {
_outboundQueue.put(packet);
} catch (InterruptedException ie) {
+ packet.release();
return;
}
//size = _outboundQueue.size();
@@ -229,17 +234,18 @@ class UDPSender {
// ?? int size2 = packet.getPacket().getLength();
if (size > 0) {
//_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
- FIFOBandwidthLimiter.Request req =
- _context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender");
- // failsafe, don't wait forever
- int waitCount = 0;
- while (req.getPendingRequested() > 0 && waitCount++ < 5) {
- req.waitForNextAllocation();
- }
- if (waitCount >= 5) {
- // tell FBL we didn't send it, but send it anyway
- req.abort();
- _context.statManager().addRateData("udp.sendFailsafe", 1);
+ FIFOBandwidthLimiter.Request req = packet.getBandwidthRequest();
+ if (req != null) {
+ // failsafe, don't wait forever
+ int waitCount = 0;
+ while (req.getPendingRequested() > 0 && waitCount++ < 5) {
+ req.waitForNextAllocation();
+ }
+ if (waitCount >= 5) {
+ // tell FBL we didn't send it, but send it anyway
+ req.abort();
+ _context.statManager().addRateData("udp.sendFailsafe", 1);
+ }
}
}