- Limit PacketHandler threads to 1 (ticket #660)
   - Limit queue sizes between UDPReceiver and PacketHandler,
     and between PacketHandler and MessageReceiver, to prevent OOMs
     and/or excessive queue delays
   - Increase UDPPacket cache size based on max mem
   - Remove more stats
This commit is contained in:
zzz
2012-08-05 14:24:14 +00:00
parent 4efa87d6bf
commit 5ba5d537b5
7 changed files with 72 additions and 24 deletions

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2;
public final static long BUILD = 3;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -43,8 +43,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivedACKs", "How many messages were ACKed at a time", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.ignoreRecentDuplicate", "Take note that we received a packet for a recently completed message", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePiggyback", "How many acks were included in a packet with data fragments (time == # data fragments)", "udp", UDPTransport.RATES);
}
@ -71,15 +71,16 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
* Pull the fragments and ACKs out of the authenticated data packet
*/
public void receiveData(PeerState from, UDPPacketReader.DataReader data) {
long beforeMsgs = _context.clock().now();
//long beforeMsgs = _context.clock().now();
int fragmentsIncluded = receiveMessages(from, data);
long afterMsgs = _context.clock().now();
//long afterMsgs = _context.clock().now();
int acksIncluded = receiveACKs(from, data);
long afterACKs = _context.clock().now();
//long afterACKs = _context.clock().now();
from.packetReceived(data.getPacketSize());
_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
// each of these was less than 0.1 ms
//_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
//_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
if ( (fragmentsIncluded > 0) && (acksIncluded > 0) )
_context.statManager().addRateData("udp.receivePiggyback", acksIncluded, fragmentsIncluded);
}

View File

@ -28,8 +28,11 @@ class MessageReceiver {
private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive;
//private ByteCache _cache;
private static final int MIN_THREADS = 2; // unless < 32MB
private static final int MAX_THREADS = 5;
private static final int MIN_QUEUE_SIZE = 32; // unless < 32MB
private static final int MAX_QUEUE_SIZE = 128;
private final int _threadCount;
private static final long POISON_IMS = -99999999999l;
@ -37,17 +40,22 @@ class MessageReceiver {
_context = ctx;
_log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport;
_completeMessages = new LinkedBlockingQueue();
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
if (maxMemory < 32*1024*1024)
int qsize;
if (maxMemory < 32*1024*1024) {
_threadCount = 1;
else if (maxMemory < 64*1024*1024)
qsize = 16;
} else if (maxMemory < 64*1024*1024) {
_threadCount = 2;
else
qsize = 32;
} else {
_threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
}
_completeMessages = new LinkedBlockingQueue(qsize);
// the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
@ -56,7 +64,7 @@ class MessageReceiver {
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
_alive = true;
}
@ -93,12 +101,18 @@ class MessageReceiver {
/**
* This queues the message for processing.
* Processing will call state.releaseResources(), do not access state after calling this.
* BLOCKING if queue is full.
*/
public void receiveMessage(InboundMessageState state) {
//int total = 0;
//long lag = -1;
if (_alive)
_completeMessages.offer(state);
if (_alive) {
try {
_completeMessages.put(state);
} catch (InterruptedException ie) {
_alive = false;
}
}
//total = _completeMessages.size();
//if (total > 1)
// lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();

View File

@ -31,8 +31,8 @@ class PacketHandler {
private volatile boolean _keepReading;
private final Handler[] _handlers;
private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB
private static final int MAX_NUM_HANDLERS = 5;
private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB
private static final int MAX_NUM_HANDLERS = 1;
/** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;

View File

@ -42,12 +42,18 @@ class UDPPacket {
// Warning - this mixes contexts in a multi-router JVM
private static final Queue<UDPPacket> _packetCache;
private static final boolean CACHE = true;
private static final int CACHE_SIZE = 64;
private static final int MIN_CACHE_SIZE = 64;
private static final int MAX_CACHE_SIZE = 256;
static {
if (CACHE)
_packetCache = new LinkedBlockingQueue(CACHE_SIZE);
else
if (CACHE) {
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024)));
_packetCache = new LinkedBlockingQueue(csize);
} else {
_packetCache = null;
}
}
/**

View File

@ -30,14 +30,21 @@ class UDPReceiver {
private final UDPTransport _transport;
private static int __id;
private final int _id;
private static final int TYPE_POISON = -99999;
private static final int MIN_QUEUE_SIZE = 16;
private static final int MAX_QUEUE_SIZE = 192;
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPReceiver.class);
_id = ++__id;
_name = name;
_inboundQueue = new LinkedBlockingQueue();
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
_inboundQueue = new LinkedBlockingQueue(qsize);
_socket = socket;
_transport = transport;
_runner = new Runner();
@ -138,7 +145,11 @@ class UDPReceiver {
return doReceive(packet);
}
/** @return zero (was queue size) */
/**
* BLOCKING if queue between here and PacketHandler is full.
*
* @return zero (was queue size)
*/
private final int doReceive(UDPPacket packet) {
if (!_keepRunning)
return 0;
@ -168,7 +179,12 @@ class UDPReceiver {
}
}
if (!rejected) {
_inboundQueue.offer(packet);
try {
_inboundQueue.put(packet);
} catch (InterruptedException ie) {
packet.release();
_keepRunning = false;
}
//return queueSize + 1;
return 0;
}