diff --git a/history.txt b/history.txt
index 4fcb50aaf1..0092e5f22d 100644
--- a/history.txt
+++ b/history.txt
@@ -1,3 +1,7 @@
+2015-07-05 zzz
+ * SSU: Compete better with NTCP for outbound bandwidth allocations
+ * Transport: Adjust thread priorities to prevent I/O stalling
+
2015-06-29 zzz
* Transport: More fixes for SSU stalling
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 7f4edb9752..31c35f8c3e 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
- public final static long BUILD = 13;
+ public final static long BUILD = 14;
/** for example "-test" */
public final static String EXTRA = "";
diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
index 35f2bb7630..3029f96c39 100644
--- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
+++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
@@ -9,13 +9,13 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
+import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -99,7 +99,7 @@ around briefly, to address packet loss and reordering.
*
*/
class PacketBuilder {
- private final I2PAppContext _context;
+ private final RouterContext _context;
private final Log _log;
private final UDPTransport _transport;
@@ -169,7 +169,7 @@ class PacketBuilder {
/**
* @param transport may be null for unit testing only
*/
- public PacketBuilder(I2PAppContext ctx, UDPTransport transport) {
+ public PacketBuilder(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_transport = transport;
_log = ctx.logManager().getLog(PacketBuilder.class);
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
index 33eb2d2e67..e2e1b3f592 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
@@ -6,11 +6,11 @@ import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
-import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
+import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -22,7 +22,7 @@ import net.i2p.util.SystemVersion;
*
*/
class UDPPacket implements CDQEntry {
- private I2PAppContext _context;
+ private RouterContext _context;
private final DatagramPacket _packet;
private volatile short _priority;
private volatile long _initializeTime;
@@ -43,6 +43,7 @@ class UDPPacket implements CDQEntry {
//private long _afterHandlingTime;
private int _validateCount;
// private boolean _isInbound;
+ private FIFOBandwidthLimiter.Request _bandwidthRequest;
// Warning - this mixes contexts in a multi-router JVM
private static final Queue _packetCache;
@@ -102,7 +103,7 @@ class UDPPacket implements CDQEntry {
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
- private UDPPacket(I2PAppContext ctx) {
+ private UDPPacket(RouterContext ctx) {
//ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES);
// the data buffer is clobbered on init(..), but we need it to bootstrap
_data = new byte[MAX_PACKET_SIZE];
@@ -112,7 +113,7 @@ class UDPPacket implements CDQEntry {
init(ctx);
}
- private synchronized void init(I2PAppContext ctx) {
+ private synchronized void init(RouterContext ctx) {
_context = ctx;
//_dataBuf = _dataCache.acquire();
Arrays.fill(_data, (byte)0);
@@ -231,7 +232,7 @@ class UDPPacket implements CDQEntry {
str.append("\n\tCalc HMAC: ").append(Base64.encode(calc, 0, MAC_SIZE));
str.append("\n\tRead HMAC: ").append(Base64.encode(_data, _packet.getOffset(), MAC_SIZE));
str.append("\n\tUsing key: ").append(macKey.toBase64());
- if (DataHelper.eq(macKey.getData(), 0, ((RouterContext)_context).routerHash().getData(), 0, 32))
+ if (DataHelper.eq(macKey.getData(), 0, _context.routerHash().getData(), 0, 32))
str.append(" (Intro)");
else
str.append(" (Session)");
@@ -300,6 +301,40 @@ class UDPPacket implements CDQEntry {
/** a packet handler has finished parsing out the good bits */
//long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); }
+ /**
+ * So that we can compete with NTCP, we want to request bandwidth
+ * in parallel, on the way into the queue, not on the way out.
+ * Call before enqueueing.
+ * @since 0.9.21
+ * @deprecated unused
+ */
+ public synchronized void requestInboundBandwidth() {
+ verifyNotReleased();
+ _bandwidthRequest = _context.bandwidthLimiter().requestInbound(_packet.getLength(), "UDP receiver");
+ }
+
+ /**
+ * So that we can compete with NTCP, we want to request bandwidth
+ * in parallel, on the way into the queue, not on the way out.
+ * Call before enqueueing.
+ * @since 0.9.21
+ */
+ public synchronized void requestOutboundBandwidth() {
+ verifyNotReleased();
+ _bandwidthRequest = _context.bandwidthLimiter().requestOutbound(_packet.getLength(), 0, "UDP sender");
+ }
+
+ /**
+ * So that we can compete with NTCP, we want to request bandwidth
+ * in parallel, on the way into the queue, not on the way out.
+ * Call after dequeueing.
+ * @since 0.9.21
+ */
+ public synchronized FIFOBandwidthLimiter.Request getBandwidthRequest() {
+ verifyNotReleased();
+ return _bandwidthRequest;
+ }
+
// Following 5: All used only for stats in PacketHandler, commented out
/** when it was pulled off the endpoint receive queue */
@@ -339,7 +374,7 @@ class UDPPacket implements CDQEntry {
/**
* @param inbound unused
*/
- public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
+ public static UDPPacket acquire(RouterContext ctx, boolean inbound) {
UDPPacket rv = null;
if (CACHE) {
rv = _packetCache.poll();
@@ -375,6 +410,13 @@ class UDPPacket implements CDQEntry {
//_acquiredBy = null;
//
//_dataCache.release(_dataBuf);
+ if (_bandwidthRequest != null) {
+ synchronized(_bandwidthRequest) {
+ if (_bandwidthRequest.getPendingRequested() > 0)
+ _bandwidthRequest.abort();
+ }
+ _bandwidthRequest = null;
+ }
if (!CACHE)
return;
_packetCache.offer(this);
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
index 7091b4ceda..ebb2de795e 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
@@ -32,6 +32,9 @@ class UDPSender {
private static final int TYPE_POISON = 99999;
+ // Queue needs to be big enough that we can compete with NTCP for
+ // bandwidth requests, and so CoDel can work well.
+ // When full, packets back up into the PacketPusher thread, pre-CoDel.
private static final int MIN_QUEUE_SIZE = 64;
private static final int MAX_QUEUE_SIZE = 384;
@@ -195,9 +198,11 @@ class UDPSender {
packet.release();
return;
}
+ packet.requestOutboundBandwidth();
try {
_outboundQueue.put(packet);
} catch (InterruptedException ie) {
+ packet.release();
return;
}
//size = _outboundQueue.size();
@@ -229,17 +234,18 @@ class UDPSender {
// ?? int size2 = packet.getPacket().getLength();
if (size > 0) {
//_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
- FIFOBandwidthLimiter.Request req =
- _context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender");
- // failsafe, don't wait forever
- int waitCount = 0;
- while (req.getPendingRequested() > 0 && waitCount++ < 5) {
- req.waitForNextAllocation();
- }
- if (waitCount >= 5) {
- // tell FBL we didn't send it, but send it anyway
- req.abort();
- _context.statManager().addRateData("udp.sendFailsafe", 1);
+ FIFOBandwidthLimiter.Request req = packet.getBandwidthRequest();
+ if (req != null) {
+ // failsafe, don't wait forever
+ int waitCount = 0;
+ while (req.getPendingRequested() > 0 && waitCount++ < 5) {
+ req.waitForNextAllocation();
+ }
+ if (waitCount >= 5) {
+ // tell FBL we didn't send it, but send it anyway
+ req.abort();
+ _context.statManager().addRateData("udp.sendFailsafe", 1);
+ }
}
}