* UDP PacketBuilder:

- Again allow transmitting all new acks if there is room;
      only limit resend acks
    - Sanity checks to limit to 255 acks
This commit is contained in:
zzz
2011-12-07 00:51:31 +00:00
parent 3bd641abd0
commit 258effcc84
3 changed files with 61 additions and 18 deletions

View File

@ -372,6 +372,8 @@ class OutboundMessageFragments {
// ok, simplest possible thing is to always tack on the bitfields if // ok, simplest possible thing is to always tack on the bitfields if
List<Long> msgIds = peer.getCurrentFullACKs(); List<Long> msgIds = peer.getCurrentFullACKs();
int newFullAckCount = msgIds.size();
msgIds.addAll(peer.getCurrentResendACKs());
List<ACKBitfield> partialACKBitfields = new ArrayList(); List<ACKBitfield> partialACKBitfields = new ArrayList();
peer.fetchPartialACKs(partialACKBitfields); peer.fetchPartialACKs(partialACKBitfields);
int piggybackedPartialACK = partialACKBitfields.size(); int piggybackedPartialACK = partialACKBitfields.size();
@ -382,14 +384,17 @@ class OutboundMessageFragments {
UDPPacket rv[] = new UDPPacket[fragments]; //sparse UDPPacket rv[] = new UDPPacket[fragments]; //sparse
for (int i = 0; i < fragments; i++) { for (int i = 0; i < fragments; i++) {
if (state.needsSending(i)) { if (state.needsSending(i)) {
int before = remaining.size();
try { try {
rv[i] = _builder.buildPacket(state, i, peer, remaining, partialACKBitfields); rv[i] = _builder.buildPacket(state, i, peer, remaining, newFullAckCount, partialACKBitfields);
} catch (ArrayIndexOutOfBoundsException aioobe) { } catch (ArrayIndexOutOfBoundsException aioobe) {
_log.log(Log.CRIT, "Corrupt trying to build a packet - please tell jrandom: " + _log.log(Log.CRIT, "Corrupt trying to build a packet - please tell jrandom: " +
partialACKBitfields + " / " + remaining + " / " + msgIds); partialACKBitfields + " / " + remaining + " / " + msgIds);
sparseCount++; sparseCount++;
continue; continue;
} }
int after = remaining.size();
newFullAckCount = Math.max(0, newFullAckCount - (before - after));
if (rv[i] == null) { if (rv[i] == null) {
sparseCount++; sparseCount++;
continue; continue;

View File

@ -138,17 +138,20 @@ class PacketBuilder {
/** 74 */ /** 74 */
public static final int MIN_DATA_PACKET_OVERHEAD = IP_HEADER_SIZE + UDP_HEADER_SIZE + DATA_HEADER_SIZE; public static final int MIN_DATA_PACKET_OVERHEAD = IP_HEADER_SIZE + UDP_HEADER_SIZE + DATA_HEADER_SIZE;
/** /** one byte field */
* Only for data packets. No limit in ack-only packets. public static final int ABSOLUTE_MAX_ACKS = 255;
* This directly affects data packet overhead.
*/
private static final int MAX_EXPLICIT_ACKS_LARGE = 9;
/** /**
* Only for data packets. No limit in ack-only packets. * Only for data packets. No limit in ack-only packets.
* This directly affects data packet overhead. * This directly affects data packet overhead.
*/ */
private static final int MAX_EXPLICIT_ACKS_SMALL = 4; private static final int MAX_RESEND_ACKS_LARGE = 9;
/**
* Only for data packets. No limit in ack-only packets.
* This directly affects data packet overhead.
*/
private static final int MAX_RESEND_ACKS_SMALL = 4;
public PacketBuilder(I2PAppContext ctx, UDPTransport transport) { public PacketBuilder(I2PAppContext ctx, UDPTransport transport) {
_context = ctx; _context = ctx;
@ -202,6 +205,9 @@ class PacketBuilder {
* Not all message IDs will necessarily be sent, there may not be room. * Not all message IDs will necessarily be sent, there may not be room.
* non-null. * non-null.
* *
* @param newAckCount the number of ackIdsRemaining entries that are new. These must be the first
* ones in the list
*
* @param partialACKsRemaining list of messageIds (ACKBitfield) that should be acked by this packet. * @param partialACKsRemaining list of messageIds (ACKBitfield) that should be acked by this packet.
* The list itself is passed by reference, and if a messageId is * The list itself is passed by reference, and if a messageId is
* included, it should be removed from the list. * included, it should be removed from the list.
@ -212,7 +218,8 @@ class PacketBuilder {
* @return null on error * @return null on error
*/ */
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer,
List<Long> ackIdsRemaining, List<ACKBitfield> partialACKsRemaining) { List<Long> ackIdsRemaining, int newAckCount,
List<ACKBitfield> partialACKsRemaining) {
UDPPacket packet = buildPacketHeader((byte)(UDPPacket.PAYLOAD_TYPE_DATA << 4)); UDPPacket packet = buildPacketHeader((byte)(UDPPacket.PAYLOAD_TYPE_DATA << 4));
byte data[] = packet.getPacket().getData(); byte data[] = packet.getPacket().getData();
int off = HEADER_SIZE; int off = HEADER_SIZE;
@ -245,6 +252,8 @@ class PacketBuilder {
int partialAcksToSend = 0; int partialAcksToSend = 0;
if (availableForExplicitAcks >= 6 && !partialACKsRemaining.isEmpty()) { if (availableForExplicitAcks >= 6 && !partialACKsRemaining.isEmpty()) {
for (ACKBitfield bf : partialACKsRemaining) { for (ACKBitfield bf : partialACKsRemaining) {
if (partialAcksToSend >= ABSOLUTE_MAX_ACKS)
break; // ack count
if (bf.receivedComplete()) if (bf.receivedComplete())
continue; continue;
int acksz = 4 + (bf.fragmentCount() / 7) + 1; int acksz = 4 + (bf.fragmentCount() / 7) + 1;
@ -272,14 +281,17 @@ class PacketBuilder {
if (msg != null) { if (msg != null) {
msg.append(" data: ").append(dataSize).append(" bytes, mtu: ") msg.append(" data: ").append(dataSize).append(" bytes, mtu: ")
.append(currentMTU).append(", ") .append(currentMTU).append(", ")
.append(ackIdsRemaining.size()).append(" full acks requested, ") .append(newAckCount).append(" new full acks requested, ")
.append(ackIdsRemaining.size() - newAckCount).append(" resend acks requested, ")
.append(partialACKsRemaining.size()).append(" partial acks requested, ") .append(partialACKsRemaining.size()).append(" partial acks requested, ")
.append(availableForAcks).append(" avail. for all acks, ") .append(availableForAcks).append(" avail. for all acks, ")
.append(availableForExplicitAcks).append(" for full acks, "); .append(availableForExplicitAcks).append(" for full acks, ");
} }
int explicitToSend = Math.min(currentMTU > PeerState.MIN_MTU ? MAX_EXPLICIT_ACKS_LARGE : MAX_EXPLICIT_ACKS_SMALL, // always send all the new acks if we have room
Math.min((availableForExplicitAcks - 1) / 4, ackIdsRemaining.size())); int explicitToSend = Math.min(ABSOLUTE_MAX_ACKS,
Math.min(newAckCount + (currentMTU > PeerState.MIN_MTU ? MAX_RESEND_ACKS_LARGE : MAX_RESEND_ACKS_SMALL),
Math.min((availableForExplicitAcks - 1) / 4, ackIdsRemaining.size())));
if (explicitToSend > 0) { if (explicitToSend > 0) {
if (msg != null) if (msg != null)
msg.append(explicitToSend).append(" full acks included:"); msg.append(explicitToSend).append(" full acks included:");
@ -421,6 +433,9 @@ class PacketBuilder {
* An ack packet is just a data packet with no data. * An ack packet is just a data packet with no data.
* See buildPacket() for format. * See buildPacket() for format.
* *
* TODO MTU not enforced.
* TODO handle huge number of acks better
*
* @param ackBitfields list of ACKBitfield instances to either fully or partially ACK * @param ackBitfields list of ACKBitfield instances to either fully or partially ACK
*/ */
public UDPPacket buildACK(PeerState peer, List<ACKBitfield> ackBitfields) { public UDPPacket buildACK(PeerState peer, List<ACKBitfield> ackBitfields) {
@ -442,6 +457,12 @@ class PacketBuilder {
else else
partialACKCount++; partialACKCount++;
} }
// FIXME do better than this, we could still exceed MTU
if (fullACKCount > ABSOLUTE_MAX_ACKS ||
partialACKCount > ABSOLUTE_MAX_ACKS)
throw new IllegalArgumentException("Too many acks full/partial " + fullACKCount +
'/' + partialACKCount);
// ok, now for the body... // ok, now for the body...
if (fullACKCount > 0) if (fullACKCount > 0)
data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK; data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK;

View File

@ -714,8 +714,7 @@ class PeerState {
* "want to send" list. If the message id is transmitted to the peer, * "want to send" list. If the message id is transmitted to the peer,
* removeACKMessage(Long) should be called. * removeACKMessage(Long) should be called.
* *
* The returned list contains acks not yet sent, followed by * The returned list contains acks not yet sent only.
* a random assortment of acks already sent.
* The caller should NOT transmit all of them all the time, * The caller should NOT transmit all of them all the time,
* even if there is room, * even if there is room,
* or the packets will have way too much overhead. * or the packets will have way too much overhead.
@ -725,13 +724,31 @@ class PeerState {
public List<Long> getCurrentFullACKs() { public List<Long> getCurrentFullACKs() {
// no such element exception seen here // no such element exception seen here
List<Long> rv = new ArrayList(_currentACKs); List<Long> rv = new ArrayList(_currentACKs);
// include some for retransmission if (_log.shouldLog(Log.DEBUG))
_log.debug("Returning " + _currentACKs.size() + " current acks");
return rv;
}
/**
* Grab a list of message ids (Long) that we want to send to the remote
* peer, regardless of the packet size, but don't remove it from our
* "want to send" list.
*
* The returned list contains
* a random assortment of acks already sent.
* The caller should NOT transmit all of them all the time,
* even if there is room,
* or the packets will have way too much overhead.
*
* @return a new list, do as you like with it
* @since 0.8.12 was included in getCurrentFullACKs()
*/
public List<Long> getCurrentResendACKs() {
List<Long> randomResends = new ArrayList(_currentACKsResend); List<Long> randomResends = new ArrayList(_currentACKsResend);
Collections.shuffle(randomResends, _context.random()); Collections.shuffle(randomResends, _context.random());
rv.addAll(randomResends); if (_log.shouldLog(Log.DEBUG))
if (_log.shouldLog(Log.INFO)) _log.debug("Returning " + randomResends.size() + " resend acks");
_log.info("Returning " + _currentACKs.size() + " current and " + randomResends.size() + " resend acks"); return randomResends;
return rv;
} }
/** the ack was sent */ /** the ack was sent */