* NTCP: Move NTCPConnection outbound queue to CoDelPriority

* SSU:
    - Separate PeerState outbound message list into a queue for unsent messages
      and a list for sent messages awaiting ack
    - Implement PeerState outbound queue as CoDelPriority
    - Implement backlogged indication like in NTCP
This commit is contained in:
zzz
2012-09-08 12:40:27 +00:00
parent ca91ad3188
commit 2c866e205b
6 changed files with 232 additions and 60 deletions

View File

@ -20,6 +20,7 @@ import java.util.Set;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.Log;
/**
@ -27,7 +28,7 @@ import net.i2p.util.Log;
* delivery and jobs to be fired off if particular events occur.
*
*/
public class OutNetMessage {
public class OutNetMessage implements CDPQEntry {
private final Log _log;
private final RouterContext _context;
private RouterInfo _target;
@ -49,6 +50,8 @@ public class OutNetMessage {
private long _sendBegin;
//private Exception _createdBy;
private final long _created;
private long _enqueueTime;
private long _seqNum;
/** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
private HashMap<String, Long> _timestamps;
/**
@ -283,6 +286,45 @@ public class OutNetMessage {
/** time the transport tries to send the message (including any queueing) */
public long getSendTime() { return _context.clock().now() - _sendBegin; }
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
}
/**
* For CDPQ
* @since 0.9.3
*/
public void setSeqNum(long num) {
_seqNum = num;
}
/**
* For CDPQ
* @since 0.9.3
*/
public long getSeqNum() {
return _seqNum;
}
/**
* We've done what we need to do with the data from this message, though
* we may keep the object around for a while to use its ID, jobs, etc.

View File

@ -4,7 +4,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -24,6 +26,7 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.HexDump;
import net.i2p.util.Log;
@ -83,7 +86,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/**
* pending unprepared OutNetMessage instances
*/
private final Queue<OutNetMessage> _outbound;
private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
/**
* current prepared OutNetMessage, or null - synchronize on _outbound to modify
* FIXME why do we need this???
@ -136,9 +139,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public static final int BUFFER_SIZE = 16*1024;
/** 2 bytes for length and 4 for CRC */
public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
/**
* Create an inbound connected (though not established) NTCP connection
*
@ -152,8 +155,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(2);
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = true;
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
@ -177,8 +179,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(8);
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = false;
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
@ -297,15 +298,16 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
EventPumper.releaseBuf(bb);
}
OutNetMessage msg;
while ((msg = _outbound.poll()) != null) {
List<OutNetMessage> pending = new ArrayList();
_outbound.drainAllTo(pending);
for (OutNetMessage msg : pending) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
msg = _currentOutbound;
OutNetMessage msg = _currentOutbound;
if (msg != null) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
@ -318,6 +320,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* toss the message onto the connection's send queue
*/
public void send(OutNetMessage msg) {
/****
always enqueue, let the queue do the dropping
if (tooBacklogged()) {
boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
boolean successful = false;
@ -337,20 +342,20 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
return;
}
_consecutiveBacklog = 0;
int enqueued = 0;
****/
//if (FAST_LARGE)
bufferedPrepare(msg);
boolean noOutbound = false;
_outbound.offer(msg);
enqueued = _outbound.size();
//int enqueued = _outbound.size();
// although stat description says ahead of this one, not including this one...
_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
noOutbound = (_currentOutbound == null);
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
//_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
boolean noOutbound = (_currentOutbound == null);
//if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued");
}
/****
private long queueTime() {
OutNetMessage msg = _currentOutbound;
if (msg == null) {
@ -360,29 +365,31 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
return msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
****/
public boolean tooBacklogged() {
long queueTime = queueTime();
if (queueTime <= 0) return false;
boolean currentOutboundSet = _currentOutbound != null;
//long queueTime = queueTime();
//if (queueTime <= 0) return false;
// perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration?
// ok, maybe later...
if (getUptime() < 10*1000) // allow some slack just after establishment
return false;
if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
//if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
if (_outbound.isBacklogged()) { // bloody arbitrary. well, its half the average message lifetime...
int size = _outbound.size();
if (_log.shouldLog(Log.WARN)) {
int writeBufs = _writeBufs.size();
boolean currentOutboundSet = _currentOutbound != null;
try {
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
_log.warn("Too backlogged: size is " + size
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet
+ ", writeBufs: " + writeBufs + " on " + toString());
} catch (Exception e) {} // java.nio.channels.CancelledKeyException
}
_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
//_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
return true;
//} else if (size > 32) { // another arbitrary limit.
// if (_log.shouldLog(Log.ERROR))
@ -651,11 +658,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
return;
}
/****
//throw new RuntimeException("We should not be preparing a write while we still have one pending");
if (queueTime() > 3*1000) { // don't stall low-priority messages
****/
msg = _outbound.poll();
if (msg == null)
return;
/****
} else {
// FIXME
// This is a linear search to implement a priority queue, O(n**2)
@ -681,6 +691,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if ((!removed) && _log.shouldLog(Log.WARN))
_log.warn("Already removed??? " + msg.getMessage().getType());
}
****/
_currentOutbound = msg;
}

View File

@ -165,9 +165,9 @@ class OutboundMessageFragments {
state.releaseResources();
return;
}
int active = peer.add(state);
peer.add(state);
add(peer);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Error initializing " + msg);
@ -182,9 +182,9 @@ class OutboundMessageFragments {
PeerState peer = state.getPeer();
if (peer == null)
throw new RuntimeException("wtf, null peer for " + state);
int active = peer.add(state);
peer.add(state);
add(peer);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
}
/**

View File

@ -7,6 +7,7 @@ import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
@ -14,7 +15,7 @@ import net.i2p.util.Log;
* Maintain the outbound fragmentation for resending, for a single message.
*
*/
class OutboundMessageState {
class OutboundMessageState implements CDPQEntry {
private final I2PAppContext _context;
private final Log _log;
/** may be null if we are part of the establishment */
@ -36,6 +37,9 @@ class OutboundMessageState {
/** for tracking use-after-free bugs */
private boolean _released;
private Exception _releasedBy;
// we can't use the ones in _message since it is null for injections
private long _enqueueTime;
private long _seqNum;
public static final int MAX_MSG_SIZE = 32 * 1024;
/** is this enough for a high-bandwidth router? */
@ -104,6 +108,7 @@ class OutboundMessageState {
/**
* Called from OutboundMessageFragments
* @param m null if msg is "injected"
* @return success
*/
private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
@ -128,8 +133,8 @@ class OutboundMessageState {
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
return true;
} catch (IllegalStateException ise) {
_cache.release(_messageBuf);
@ -368,6 +373,56 @@ class OutboundMessageState {
}
}
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
_peer.getTransport().failed(this, false);
releaseResources();
}
/**
* For CDPQ
* @since 0.9.3
*/
public void setSeqNum(long num) {
_seqNum = num;
}
/**
* For CDPQ
* @since 0.9.3
*/
public long getSeqNum() {
return _seqNum;
}
/**
* For CDPQ
* @return OutNetMessage priority or 1000 for injected
* @since 0.9.3
*/
public int getPriority() {
return _message != null ? _message.getPriority() : 1000;
}
@Override
public String toString() {
short sends[] = _fragmentSends;

View File

@ -16,6 +16,7 @@ import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.Log;
import net.i2p.util.ConcurrentHashSet;
@ -188,8 +189,19 @@ class PeerState {
/** list of InboundMessageState for active message */
private final Map<Long, InboundMessageState> _inboundMessages;
/** list of OutboundMessageState */
/**
* Mostly messages that have been transmitted and are awaiting acknowledgement,
* although there could be some that have not been sent yet.
*/
private final List<OutboundMessageState> _outboundMessages;
/**
* Priority queue of messages that have not yet been sent.
* They are taken from here and put in _outboundMessages.
*/
private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
/** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter;
@ -298,6 +310,7 @@ class PeerState {
_rttDeviation = _rtt;
_inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32);
_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
// all createRateStat() moved to EstablishmentManager
_remoteIP = remoteIP;
_remotePeer = remotePeer;
@ -726,8 +739,8 @@ class PeerState {
public List<Long> getCurrentFullACKs() {
// no such element exception seen here
List<Long> rv = new ArrayList(_currentACKs);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Returning " + _currentACKs.size() + " current acks");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Returning " + _currentACKs.size() + " current acks");
return rv;
}
@ -748,8 +761,8 @@ class PeerState {
public List<Long> getCurrentResendACKs() {
List<Long> randomResends = new ArrayList(_currentACKsResend);
Collections.shuffle(randomResends, _context.random());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Returning " + randomResends.size() + " resend acks");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Returning " + randomResends.size() + " resend acks");
return randomResends;
}
@ -1194,24 +1207,26 @@ class PeerState {
* TODO priority queue? (we don't implement priorities in SSU now)
* TODO backlog / pushback / block instead of dropping? Can't really block here.
* TODO SSU does not support isBacklogged() now
* @return total pending messages
*/
public int add(OutboundMessageState state) {
public void add(OutboundMessageState state) {
if (_dead) {
_transport.failed(state, false);
return 0;
return;
}
state.setPeer(this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
int rv = 0;
boolean fail = false;
// will never fail for CDPQ
boolean fail = !_outboundQueue.offer(state);
/****
synchronized (_outboundMessages) {
rv = _outboundMessages.size() + 1;
if (rv > MAX_SEND_MSGS_PENDING) {
// too many queued messages to one peer? nuh uh.
fail = true;
rv--;
****/
/******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
@ -1250,17 +1265,17 @@ class PeerState {
}
*******/
/****
} else {
_outboundMessages.add(state);
}
}
****/
if (fail) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping msg, OB queue full for " + toString());
_transport.failed(state, false);
}
return rv;
}
/** drop all outbound messages */
@ -1268,19 +1283,17 @@ class PeerState {
//if (_dead) return;
_dead = true;
//_outboundMessages = null;
_retransmitter = null;
int sz = 0;
List<OutboundMessageState> tempList = null;
List<OutboundMessageState> tempList;
synchronized (_outboundMessages) {
sz = _outboundMessages.size();
if (sz > 0) {
_retransmitter = null;
tempList = new ArrayList(_outboundMessages);
_outboundMessages.clear();
}
}
for (int i = 0; i < sz; i++)
_transport.failed(tempList.get(i), false);
_outboundQueue.drainAllTo(tempList);
for (OutboundMessageState oms : tempList) {
_transport.failed(oms, false);
}
// so the ACKSender will drop this peer from its queue
_wantACKSendSince = -1;
@ -1291,7 +1304,7 @@ class PeerState {
*/
public int getOutboundMessageCount() {
if (_dead) return 0;
return _outboundMessages.size();
return _outboundMessages.size() + _outboundQueue.size();
}
/**
@ -1305,7 +1318,7 @@ class PeerState {
public int finishMessages() {
// short circuit, unsynchronized
if (_outboundMessages.isEmpty())
return 0;
return _outboundQueue.size();
if (_dead) {
dropOutbound();
@ -1367,7 +1380,7 @@ class PeerState {
state.releaseResources();
}
return rv;
return rv + _outboundQueue.size();
}
/**
@ -1387,7 +1400,7 @@ class PeerState {
ShouldSend should = locked_shouldSend(state);
if (should == ShouldSend.YES) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
/*
while (iter.hasNext()) {
OutboundMessageState later = (OutboundMessageState)iter.next();
@ -1402,16 +1415,37 @@ class PeerState {
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
break;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return null;
} /* else {
OutNetMessage msg = state.getMessage();
if (msg != null)
msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
} */
}
// Peek at head of _outboundQueue and see if we can send it.
// If so, pull it off, put it in _outbundMessages, test
// again for bandwidth if necessary, and return it.
OutboundMessageState state = _outboundQueue.peek();
if (state != null && ShouldSend.YES == locked_shouldSend(state)) {
// we could get a different state, or null, when we poll,
// due to AQM drops, so we test again if necessary
OutboundMessageState dequeuedState = _outboundQueue.poll();
if (dequeuedState != null) {
_outboundMessages.add(dequeuedState);
if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(dequeuedState)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
return dequeuedState;
}
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + " remaining");
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return null;
}
@ -1441,9 +1475,19 @@ class PeerState {
rv = delay;
}
}
// failsafe... is this OK?
if (rv > 100 && !_outboundQueue.isEmpty())
rv = 100;
return rv;
}
/**
* @since 0.9.3
*/
public boolean isBacklogged() {
return _dead || _outboundQueue.isBacklogged();
}
/**
* If set to true, we should throttle retransmissions of all but the first message in
* flight to a peer. If set to false, we will only throttle the initial flight of a
@ -1521,8 +1565,8 @@ class PeerState {
int size = state.getUnackedSize();
if (allocateSendingBytes(size, state.getPushCount())) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " allowed with "
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocation of " + size + " allowed with "
+ getSendWindowBytesRemaining()
+ "/" + getSendWindowBytes()
+ " remaining"
@ -1566,7 +1610,7 @@ class PeerState {
/**
* A full ACK was received.
* TODO if messages awaiting ack were a HashSet this would be faster.
* TODO if messages awaiting ack were a HashMap<Long, OutboundMessageState> this would be faster.
*
* @return true if the message was acked for the first time
*/
@ -1620,8 +1664,8 @@ class PeerState {
state.releaseResources();
} else {
// dupack, likely
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + messageId);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Received an ACK for a message not pending: " + messageId);
}
return state != null;
}
@ -1767,6 +1811,14 @@ class PeerState {
}
}
/**
* Convenience for OutboundMessageState so it can fail itself
* @since 0.9.3
*/
public UDPTransport getTransport() {
return _transport;
}
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
/*

View File

@ -1678,7 +1678,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return getPeerState(dest) != null;
}
/**
* @since 0.9.3
*/
@Override
public boolean isBacklogged(Hash dest) {
PeerState peer = _peersByIdent.get(dest);
return peer != null && peer.isBacklogged();
}
public boolean allowConnection() {
return _peersByIdent.size() < getMaxConnections();
}
@ -2187,6 +2197,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(THINSP).append(peer.getConcurrentSends());
buf.append(THINSP).append(peer.getConcurrentSendWindow());
buf.append(THINSP).append(peer.getConsecutiveSendRejections());
if (peer.isBacklogged())
buf.append(' ').append(_("backlogged"));
buf.append("</td>");
buf.append("<td class=\"cells\" align=\"right\">");