SSU: Implement bundling of multiple fragments in a single data message.

This has always been in the spec and implemented in the receive side
since the beginning, so it is compatible with all versions.
- Switch preparePackets() return value from a "sparse array" to a list
- UDPPacketReader cleanups
- UDPPacket javadocs
Author: zzz
Date: 2014-09-12 15:17:14 +00:00
Parent: 0817b58b9d
Commit: 42eb43f713
7 changed files with 356 additions and 164 deletions
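The core of the change is the new packing loop in preparePackets(): unsent fragments from up to two messages are sorted biggest-first, and each packet is then greedily topped up with whatever smaller fragments still fit. Below is a minimal, self-contained sketch of that strategy only; Frag and maxAdditional() are simplified stand-ins for PacketBuilder.Fragment and getMaxAdditionalFragmentSize(), with all per-packet overhead reduced to an assumed flat budget plus a 7-byte header per fragment.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class GreedyPackingSketch {

    /** Stand-in for PacketBuilder.Fragment: just a payload size here. */
    static class Frag {
        final int size;
        Frag(int size) { this.size = size; }
        @Override public String toString() { return size + "B"; }
    }

    /**
     * Stand-in for getMaxAdditionalFragmentSize(): what is left of the budget
     * after the data so far and a 7-byte header for each fragment already in
     * the packet plus the prospective new one.
     */
    static int maxAdditional(int budget, int numFragments, int curDataSize) {
        return budget - curDataSize - (numFragments + 1) * 7;
    }

    /** Pack fragments into packets, biggest first, filling each packet greedily. */
    static List<List<Frag>> pack(List<Frag> toSend, int budget) {
        List<Frag> work = new ArrayList<Frag>(toSend);
        Collections.sort(work, new Comparator<Frag>() {
            public int compare(Frag l, Frag r) { return r.size - l.size; } // biggest first
        });
        List<List<Frag>> packets = new ArrayList<List<Frag>>();
        while (!work.isEmpty()) {
            List<Frag> pkt = new ArrayList<Frag>();
            Frag first = work.remove(0);
            pkt.add(first);
            int cur = first.size;
            // now stuff in more fragments if they fit
            for (Iterator<Frag> it = work.iterator(); it.hasNext(); ) {
                Frag next = it.next();
                if (next.size <= maxAdditional(budget, pkt.size(), cur)) {
                    it.remove();
                    pkt.add(next);
                    cur += next.size;
                }
            }
            packets.add(pkt);
        }
        return packets;
    }

    public static void main(String[] args) {
        List<Frag> frags = new ArrayList<Frag>();
        frags.add(new Frag(900));
        frags.add(new Frag(500));
        frags.add(new Frag(300));
        frags.add(new Frag(100));
        // prints [[900B, 300B, 100B], [500B]] for a 1400-byte budget
        System.out.println(pack(frags, 1400));
    }
}

Sorting biggest-first is the classic first-fit-decreasing heuristic: large fragments claim packets first, and the small ones fill the leftover headroom.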

net/i2p/router/transport/udp/OutboundMessageFragments.java

@@ -1,14 +1,19 @@
package net.i2p.router.transport.udp;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.router.RouterInfo;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.udp.PacketBuilder.Fragment;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
@@ -74,6 +79,7 @@ class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendConfirmFragments", "How many fragments are included in a fully ACKed message", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendFragmentsPerPacket", "How many fragments are sent in a data packet", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendConfirmVolley", "How many times did fragments need to be sent before ACK", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES);
@@ -81,7 +87,7 @@ class OutboundMessageFragments {
_context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPiggyback", "How many acks were piggybacked on a data packet (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPiggybackPartial", "How many partial acks were piggybacked on a data packet (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRequiredRateStat("udp.packetsRetransmitted", "Lifetime of packets during retransmission (ms)", "udp", UDPTransport.RATES);
@@ -236,19 +242,17 @@ class OutboundMessageFragments {
/**
* Fetch all the packets for a message volley, blocking until there is a
* message which can be fully transmitted (or the transport is shut down).
* The returned array may be sparse, with null packets taking the place of
* already ACKed fragments.
*
* NOT thread-safe. Called by the PacketPusher thread only.
*
* @return null only on shutdown
*/
public UDPPacket[] getNextVolley() {
public List<UDPPacket> getNextVolley() {
PeerState peer = null;
OutboundMessageState state = null;
List<OutboundMessageState> states = null;
// Keep track of how many we've looked at, since we don't start the iterator at the beginning.
int peersProcessed = 0;
while (_alive && (state == null) ) {
while (_alive && (states == null) ) {
int nextSendDelay = Integer.MAX_VALUE;
// no, not every time - O(n**2) - do just before waiting below
//finishMessages();
@@ -275,8 +279,8 @@
continue;
}
peersProcessed++;
state = peer.allocateSend();
if (state != null) {
states = peer.allocateSend();
if (states != null) {
// we have something to send and we will be returning it
break;
} else if (peersProcessed >= _activePeers.size()) {
@@ -292,13 +296,13 @@
}
}
if (peer != null && _log.shouldLog(Log.DEBUG))
_log.debug("Done looping, next peer we are sending for: " +
peer.getRemotePeer());
//if (peer != null && _log.shouldLog(Log.DEBUG))
// _log.debug("Done looping, next peer we are sending for: " +
// peer.getRemotePeer());
// if we've gone all the way through the loop, wait
// ... unless nextSendDelay says we have more ready now
if (state == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
_isWaiting = true;
peersProcessed = 0;
// why? we do this in the loop one at a time
@@ -328,9 +332,9 @@
} // while alive && state == null
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + state);
_log.debug("Sending " + DataHelper.toString(states));
UDPPacket packets[] = preparePackets(state, peer);
List<UDPPacket> packets = preparePackets(states, peer);
/****
if ( (state != null) && (state.getMessage() != null) ) {
@@ -352,58 +356,108 @@
/**
* @return null if state or peer is null
*/
private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
if ( (state != null) && (peer != null) ) {
int fragments = state.getFragmentCount();
if (fragments < 0)
return null;
private List<UDPPacket> preparePackets(List<OutboundMessageState> states, PeerState peer) {
if (states == null || peer == null)
return null;
// ok, simplest possible thing is to always tack on the bitfields if
List<Long> msgIds = peer.getCurrentFullACKs();
int newFullAckCount = msgIds.size();
msgIds.addAll(peer.getCurrentResendACKs());
List<ACKBitfield> partialACKBitfields = new ArrayList<ACKBitfield>();
peer.fetchPartialACKs(partialACKBitfields);
int piggybackedPartialACK = partialACKBitfields.size();
// getCurrentFullACKs() already makes a copy, do we need to copy again?
// YES because buildPacket() now removes them (maybe)
List<Long> remaining = new ArrayList<Long>(msgIds);
int sparseCount = 0;
UDPPacket rv[] = new UDPPacket[fragments]; //sparse
// ok, simplest possible thing is to always tack on the bitfields if
List<Long> msgIds = peer.getCurrentFullACKs();
int newFullAckCount = msgIds.size();
msgIds.addAll(peer.getCurrentResendACKs());
List<ACKBitfield> partialACKBitfields = new ArrayList<ACKBitfield>();
peer.fetchPartialACKs(partialACKBitfields);
int piggybackedPartialACK = partialACKBitfields.size();
// getCurrentFullACKs() already makes a copy, do we need to copy again?
// YES because buildPacket() now removes them (maybe)
List<Long> remaining = new ArrayList<Long>(msgIds);
// build the list of fragments to send
List<Fragment> toSend = new ArrayList<Fragment>(8);
for (OutboundMessageState state : states) {
int fragments = state.getFragmentCount();
int queued = 0;
for (int i = 0; i < fragments; i++) {
if (state.needsSending(i)) {
int before = remaining.size();
try {
rv[i] = _builder.buildPacket(state, i, peer, remaining, newFullAckCount, partialACKBitfields);
} catch (ArrayIndexOutOfBoundsException aioobe) {
_log.log(Log.CRIT, "Corrupt trying to build a packet - please tell jrandom: " +
partialACKBitfields + " / " + remaining + " / " + msgIds);
sparseCount++;
continue;
}
int after = remaining.size();
newFullAckCount = Math.max(0, newFullAckCount - (before - after));
if (rv[i] == null) {
sparseCount++;
continue;
}
rv[i].setFragmentCount(fragments);
OutNetMessage msg = state.getMessage();
if (msg != null)
rv[i].setMessageType(msg.getMessageTypeId());
else
rv[i].setMessageType(-1);
} else {
sparseCount++;
toSend.add(new Fragment(state, i));
queued++;
}
}
if (sparseCount > 0)
remaining.clear();
// per-state stats
if (queued > 0 && state.getPushCount() > 1) {
peer.messageRetransmitted(queued);
// _packetsRetransmitted += toSend; // lifetime for the transport
_context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
_context.statManager().addRateData("udp.packetsRetransmitted", state.getLifetime(), peer.getPacketsTransmitted());
if (_log.shouldLog(Log.INFO))
_log.info("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), queued);
}
}
if (toSend.isEmpty())
return null;
int fragmentsToSend = toSend.size();
// sort by size, biggest first
// don't bother unless more than one state (fragments are already sorted within a state)
if (fragmentsToSend > 1 && states.size() > 1)
Collections.sort(toSend, new FragmentComparator());
List<Fragment> sendNext = new ArrayList<Fragment>(Math.min(toSend.size(), 4));
List<UDPPacket> rv = new ArrayList<UDPPacket>(toSend.size());
for (int i = 0; i < toSend.size(); i++) {
Fragment next = toSend.get(i);
sendNext.add(next);
OutboundMessageState state = next.state;
OutNetMessage msg = state.getMessage();
int msgType = (msg != null) ? msg.getMessageTypeId() : -1;
if (_log.shouldLog(Log.INFO))
_log.info("Building packet for " + next + " to " + peer);
int curTotalDataSize = state.fragmentSize(next.num);
// now stuff in more fragments if they fit
if (i +1 < toSend.size()) {
int maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
for (int j = i + 1; j < toSend.size(); j++) {
next = toSend.get(j);
int nextDataSize = next.state.fragmentSize(next.num);
//if (PacketBuilder.canFitAnotherFragment(peer, sendNext.size(), curTotalDataSize, nextDataSize)) {
//if (_builder.canFitAnotherFragment(peer, sendNext.size(), curTotalDataSize, nextDataSize)) {
if (nextDataSize <= maxAvail) {
// add it
toSend.remove(j);
j--;
sendNext.add(next);
curTotalDataSize += nextDataSize;
maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
if (_log.shouldLog(Log.INFO))
_log.info("Adding in additional " + next + " to " + peer);
} // else too big
}
}
int before = remaining.size();
UDPPacket pkt = _builder.buildPacket(sendNext, peer, remaining, newFullAckCount, partialACKBitfields);
if (pkt != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Built packet with " + sendNext.size() + " fragments totalling " + curTotalDataSize +
" data bytes to " + peer);
_context.statManager().addRateData("udp.sendFragmentsPerPacket", sendNext.size());
}
if (pkt == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Build packet FAIL for " + DataHelper.toString(sendNext) + " to " + peer);
sendNext.clear();
continue;
}
sendNext.clear();
rv.add(pkt);
int after = remaining.size();
newFullAckCount = Math.max(0, newFullAckCount - (before - after));
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int i = 0; i < msgIds.size(); i++) {
Long id = msgIds.get(i);
for (int j = 0; j < msgIds.size(); j++) {
Long id = msgIds.get(j);
if (!remaining.contains(id)) {
peer.removeACKMessage(id);
piggybackedAck++;
@@ -411,29 +465,36 @@ class OutboundMessageFragments {
}
}
if (sparseCount > 0)
_context.statManager().addRateData("udp.sendSparse", sparseCount, state.getLifetime());
if (piggybackedAck > 0)
_context.statManager().addRateData("udp.sendPiggyback", piggybackedAck, state.getLifetime());
_context.statManager().addRateData("udp.sendPiggyback", piggybackedAck);
if (piggybackedPartialACK - partialACKBitfields.size() > 0)
_context.statManager().addRateData("udp.sendPiggybackPartial", piggybackedPartialACK - partialACKBitfields.size(), state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Building packet for " + state + " to " + peer + " with sparse count: " + sparseCount);
peer.packetsTransmitted(fragments - sparseCount);
if (state.getPushCount() > 1) {
int toSend = fragments-sparseCount;
peer.messageRetransmitted(toSend);
// _packetsRetransmitted += toSend; // lifetime for the transport
_context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
_context.statManager().addRateData("udp.packetsRetransmitted", state.getLifetime(), peer.getPacketsTransmitted());
if (_log.shouldLog(Log.INFO))
_log.info("Retransmitting " + state + " to " + peer);
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), toSend);
}
return rv;
} else {
// !alive
return null;
// following for debugging and stats
pkt.setFragmentCount(sendNext.size());
pkt.setMessageType(msgType); //type of first fragment
}
int sent = rv.size();
peer.packetsTransmitted(sent);
if (_log.shouldLog(Log.INFO))
_log.info("Sent " + fragmentsToSend + " fragments of " + states.size() +
" messages in " + sent + " packets to " + peer);
return rv;
}
/**
* Biggest first
* @since 0.9.16
*/
private static class FragmentComparator implements Comparator<Fragment>, Serializable {
public int compare(Fragment l, Fragment r) {
// reverse
return r.state.fragmentSize(r.num) - l.state.fragmentSize(l.num);
}
}
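A subtlety in the preparePackets() hunk above: buildPacket() mutates the remaining ack-ID list, removing each ID it manages to piggyback, so the caller diffs the list size before and after the call to keep newFullAckCount accurate and to learn which acks actually went out. A toy model of that bookkeeping, with a stand-in buildPacket() that consumes a fixed count of IDs rather than fitting acks into a byte budget:

import java.util.ArrayList;
import java.util.List;

public class AckAccountingSketch {

    /** Stand-in builder: pretends exactly 'room' ack IDs fit in this packet. */
    static void buildPacket(List<Long> remaining, int room) {
        int consumed = Math.min(room, remaining.size());
        for (int i = 0; i < consumed; i++)
            remaining.remove(0);
    }

    public static void main(String[] args) {
        List<Long> msgIds = new ArrayList<Long>();
        for (long i = 1; i <= 5; i++)
            msgIds.add(Long.valueOf(i));
        // copy, because the builder removes what it sends
        List<Long> remaining = new ArrayList<Long>(msgIds);
        int newFullAckCount = 3; // first 3 are "new" acks, the rest are resends

        int before = remaining.size();
        buildPacket(remaining, 4); // this packet had room for 4 ack IDs
        int after = remaining.size();
        // 4 were piggybacked, so at most 3 of them can still count as "new"
        newFullAckCount = Math.max(0, newFullAckCount - (before - after));
        System.out.println("piggybacked: " + (before - after) + ", still new: " + newFullAckCount);

        // anything in msgIds but no longer in remaining was sent on the wire
        for (Long id : msgIds) {
            if (!remaining.contains(id))
                System.out.println("would call peer.removeACKMessage(" + id + ")");
        }
    }
}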

net/i2p/router/transport/udp/OutboundMessageState.java

@@ -30,7 +30,10 @@ class OutboundMessageState implements CDPQEntry {
private int _fragmentSize;
/** size of the I2NP message */
private int _totalSize;
/** sends[i] is how many times the fragment has been sent, or -1 if ACKed */
/** sends[i] is how many times the fragment has been sent, or -1 if ACKed
* TODO this may not accurately track the number of retransmissions per-fragment,
* and we don't make any use of it anyway, so we should just make it a bitfield.
*/
private short _fragmentSends[];
private final long _startedOn;
private long _nextSendTime;
@@ -205,7 +208,6 @@ class OutboundMessageState implements CDPQEntry {
}
public boolean needsSending(int fragment) {
short sends[] = _fragmentSends;
if ( (sends == null) || (fragment >= sends.length) || (fragment < 0) )
return false;
@@ -225,10 +227,12 @@ class OutboundMessageState implements CDPQEntry {
public boolean acked(ACKBitfield bitfield) {
// stupid brute force, but the cardinality should be trivial
short sends[] = _fragmentSends;
if (sends != null)
for (int i = 0; i < bitfield.fragmentCount() && i < sends.length; i++)
if (sends != null) {
for (int i = 0; i < bitfield.fragmentCount() && i < sends.length; i++) {
if (bitfield.received(i))
sends[i] = (short)-1;
}
}
boolean rv = isComplete();
/****
@@ -263,7 +267,10 @@ class OutboundMessageState implements CDPQEntry {
*/
public int getPushCount() { return _pushCount; }
/** note that we have pushed the message fragments */
/**
* Note that we have pushed the message fragments.
* Increments push count (and max sends... why?)
*/
public void push() {
// these will never be different...
_pushCount++;
@@ -272,7 +279,7 @@ class OutboundMessageState implements CDPQEntry {
if (_fragmentSends != null)
for (int i = 0; i < _fragmentSends.length; i++)
if (_fragmentSends[i] >= (short)0)
_fragmentSends[i] = (short)(1 + _fragmentSends[i]);
_fragmentSends[i]++;
}
@@ -342,12 +349,15 @@ class OutboundMessageState implements CDPQEntry {
* Throws NPE before then.
*
* Caller should synchronize
*
* @return true if fragment is not acked yet
*/
public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
/**
* This assumes fragment(int size) has been called
* @param fragmentNum the number of the fragment
*
* @return the size of the fragment specified by the number
*/
public int fragmentSize(int fragmentNum) {
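Per the javadocs above, _fragmentSends carries all per-fragment state: a value of zero or more is a send counter, and -1 means the fragment was ACKed. A small self-contained sketch of that encoding and of how needsSending(), acked(), and push() interact, with a plain boolean[] standing in for ACKBitfield:

public class FragmentSendsSketch {

    /** sends[i] >= 0 means still owed (counting transmissions); -1 means ACKed. */
    private final short[] sends;

    FragmentSendsSketch(int fragments) { sends = new short[fragments]; }

    boolean needsSending(int i) {
        return i >= 0 && i < sends.length && sends[i] >= 0;
    }

    /** Mark fragments received per an ack bitfield; true once all are ACKed. */
    boolean acked(boolean[] bitfield) {
        for (int i = 0; i < bitfield.length && i < sends.length; i++)
            if (bitfield[i])
                sends[i] = -1;
        for (short s : sends)
            if (s >= 0)
                return false;
        return true;
    }

    /** Bump the send count of every fragment still owed (cf. push() above). */
    void push() {
        for (int i = 0; i < sends.length; i++)
            if (sends[i] >= 0)
                sends[i]++;
    }

    public static void main(String[] args) {
        FragmentSendsSketch s = new FragmentSendsSketch(3);
        s.push(); // first volley: all three fragments sent
        System.out.println(s.acked(new boolean[] { true, false, true })); // false: frag 1 missing
        System.out.println(s.needsSending(1)); // true: only frag 1 gets retransmitted
        System.out.println(s.acked(new boolean[] { false, true }));      // true: all done
    }
}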

net/i2p/router/transport/udp/PacketBuilder.java

@@ -130,8 +130,10 @@ class PacketBuilder {
/** if no extended options or rekey data, which we don't support = 37 */
public static final int HEADER_SIZE = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE + 1 + 4;
/** 4 byte msg ID + 3 byte fragment info */
public static final int FRAGMENT_HEADER_SIZE = 7;
/** not including acks. 46 */
public static final int DATA_HEADER_SIZE = HEADER_SIZE + 9;
public static final int DATA_HEADER_SIZE = HEADER_SIZE + 2 + FRAGMENT_HEADER_SIZE;
/** IPv4 only */
public static final int IP_HEADER_SIZE = 20;
@@ -178,6 +180,49 @@
}
****/
/**
* Class for passing multiple fragments to buildPacket()
*
* @since 0.9.16
*/
public static class Fragment {
public final OutboundMessageState state;
public final int num;
public Fragment(OutboundMessageState state, int num) {
this.state = state;
this.num = num;
}
@Override
public String toString() {
return "Fragment " + num + " (" + state.fragmentSize(num) + " bytes) of " + state;
}
}
/**
* How large a fragment can still be added to a packet to 'peer' that
* already has 'numFragments' fragments totalling 'curDataSize' bytes?
*
* This doesn't leave anything for acks.
*
* @param numFragments >= 1
* @since 0.9.16
*/
public static int getMaxAdditionalFragmentSize(PeerState peer, int numFragments, int curDataSize) {
int available = peer.getMTU() - curDataSize;
if (peer.isIPv6())
available -= MIN_IPV6_DATA_PACKET_OVERHEAD;
else
available -= MIN_DATA_PACKET_OVERHEAD;
// OVERHEAD above includes 1 * FRAGMENT_HEADER_SIZE;
// this adds for the others, plus the new one.
available -= numFragments * FRAGMENT_HEADER_SIZE;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("now: " + numFragments + " / " + curDataSize + " avail: " + available);
return available;
}
/**
* This builds a data packet (PAYLOAD_TYPE_DATA).
* See the methods below for the other message types.
@@ -231,37 +276,65 @@
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer,
List<Long> ackIdsRemaining, int newAckCount,
List<ACKBitfield> partialACKsRemaining) {
List<Fragment> frags = Collections.singletonList(new Fragment(state, fragment));
return buildPacket(frags, peer, ackIdsRemaining, newAckCount, partialACKsRemaining);
}
/**
* Multiple fragments
*
* @since 0.9.16
*/
public UDPPacket buildPacket(List<Fragment> fragments, PeerState peer,
List<Long> ackIdsRemaining, int newAckCount,
List<ACKBitfield> partialACKsRemaining) {
StringBuilder msg = null;
if (_log.shouldLog(Log.INFO)) {
msg = new StringBuilder(256);
msg.append("Data pkt to ").append(peer.getRemotePeer().toBase64());
}
// calculate data size
int numFragments = fragments.size();
int dataSize = 0;
for (int i = 0; i < numFragments; i++) {
Fragment frag = fragments.get(i);
OutboundMessageState state = frag.state;
int fragment = frag.num;
int sz = state.fragmentSize(fragment);
dataSize += sz;
if (msg != null) {
msg.append(" Fragment ").append(i);
msg.append(": msg ").append(state.getMessageId()).append(' ').append(fragment);
msg.append('/').append(state.getFragmentCount());
msg.append(' ').append(sz);
}
}
if (dataSize < 0)
return null;
// calculate size available for acks
int currentMTU = peer.getMTU();
int availableForAcks = currentMTU - dataSize;
int ipHeaderSize;
if (peer.isIPv6()) {
availableForAcks -= MIN_IPV6_DATA_PACKET_OVERHEAD;
ipHeaderSize = IPV6_HEADER_SIZE;
} else {
availableForAcks -= MIN_DATA_PACKET_OVERHEAD;
ipHeaderSize = IP_HEADER_SIZE;
}
if (numFragments > 1)
availableForAcks -= (numFragments - 1) * FRAGMENT_HEADER_SIZE;
int availableForExplicitAcks = availableForAcks;
// make the packet
UDPPacket packet = buildPacketHeader((byte)(UDPPacket.PAYLOAD_TYPE_DATA << 4));
DatagramPacket pkt = packet.getPacket();
byte data[] = pkt.getData();
int off = HEADER_SIZE;
StringBuilder msg = null;
if (_log.shouldLog(Log.INFO)) {
msg = new StringBuilder(128);
msg.append("Data pkt to ").append(peer.getRemotePeer().toBase64());
msg.append(" msg ").append(state.getMessageId()).append(" frag:").append(fragment);
msg.append('/').append(state.getFragmentCount());
}
int dataSize = state.fragmentSize(fragment);
if (dataSize < 0) {
packet.release();
return null;
}
int currentMTU = peer.getMTU();
int availableForAcks = currentMTU - dataSize;
int ipHeaderSize;
if (peer.getRemoteIP().length == 4) {
availableForAcks -= MIN_DATA_PACKET_OVERHEAD;
ipHeaderSize = IP_HEADER_SIZE;
} else {
availableForAcks -= MIN_IPV6_DATA_PACKET_OVERHEAD;
ipHeaderSize = IPV6_HEADER_SIZE;
}
int availableForExplicitAcks = availableForAcks;
// ok, now for the body...
// just always ask for an ACK for now...
@@ -299,7 +372,7 @@
off++;
if (msg != null) {
msg.append(" data: ").append(dataSize).append(" bytes, mtu: ")
msg.append(" Total data: ").append(dataSize).append(" bytes, mtu: ")
.append(currentMTU).append(", ")
.append(newAckCount).append(" new full acks requested, ")
.append(ackIdsRemaining.size() - newAckCount).append(" resend acks requested, ")
@@ -325,7 +398,7 @@
DataHelper.toLong(data, off, 4, ackId.longValue());
off += 4;
if (msg != null) // logging it
msg.append(" full ack: ").append(ackId.longValue());
msg.append(' ').append(ackId.longValue());
}
//acksIncluded = true;
}
@@ -357,7 +430,7 @@
}
iter.remove();
if (msg != null) // logging it
msg.append(" partial ack: ").append(bitfield);
msg.append(' ').append(bitfield);
}
//acksIncluded = true;
// now jump back and fill in the number of bitfields *actually* included
@@ -367,30 +440,42 @@
//if ( (msg != null) && (acksIncluded) )
// _log.debug(msg.toString());
DataHelper.toLong(data, off, 1, 1); // only one fragment in this message
DataHelper.toLong(data, off, 1, numFragments);
off++;
DataHelper.toLong(data, off, 4, state.getMessageId());
off += 4;
// now write each fragment
int sizeWritten = 0;
for (int i = 0; i < numFragments; i++) {
Fragment frag = fragments.get(i);
OutboundMessageState state = frag.state;
int fragment = frag.num;
DataHelper.toLong(data, off, 4, state.getMessageId());
off += 4;
data[off] |= fragment << 1;
if (fragment == state.getFragmentCount() - 1)
data[off] |= 1; // isLast
off++;
data[off] |= fragment << 1;
if (fragment == state.getFragmentCount() - 1)
data[off] |= 1; // isLast
off++;
DataHelper.toLong(data, off, 2, dataSize);
data[off] &= (byte)0x3F; // 2 highest bits are reserved
off += 2;
int fragSize = state.fragmentSize(fragment);
DataHelper.toLong(data, off, 2, fragSize);
data[off] &= (byte)0x3F; // 2 highest bits are reserved
off += 2;
int sizeWritten = state.writeFragment(data, off, fragment);
int sz = state.writeFragment(data, off, fragment);
off += sz;
sizeWritten += sz;
}
if (sizeWritten != dataSize) {
if (sizeWritten < 0) {
// probably already freed from OutboundMessageState
if (_log.shouldLog(Log.WARN))
_log.warn("Write failed for fragment " + fragment + " of " + state.getMessageId());
_log.warn("Write failed for " + DataHelper.toString(fragments));
} else {
_log.error("Size written: " + sizeWritten + " but size: " + dataSize
+ " for fragment " + fragment + " of " + state.getMessageId());
_log.error("Size written: " + sizeWritten + " but size: " + dataSize +
" for " + DataHelper.toString(fragments));
}
packet.release();
return null;
@@ -398,31 +483,44 @@
// _log.debug("Size written: " + sizeWritten + " for fragment " + fragment
// + " of " + state.getMessageId());
}
// put this after writeFragment() since dataSize will be zero for use-after-free
if (dataSize == 0) {
// OK according to the protocol but if we send it, it's a bug
_log.error("Sending zero-size fragment " + fragment + " of " + state + " for " + peer);
_log.error("Sending zero-size fragment??? for " + DataHelper.toString(fragments));
}
off += dataSize;
// pad up so we're on the encryption boundary
off = pad1(data, off);
off = pad2(data, off, currentMTU - (ipHeaderSize + UDP_HEADER_SIZE));
pkt.setLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
if (_log.shouldLog(Log.INFO)) {
if (msg != null) {
// verify multi-fragment packet
//if (numFragments > 1) {
// msg.append("\nDataReader dump\n:");
// UDPPacketReader reader = new UDPPacketReader(_context);
// reader.initialize(packet);
// UDPPacketReader.DataReader dreader = reader.getDataReader();
// try {
// msg.append(dreader.toString());
// } catch (Exception e) {
// _log.info("blowup, dump follows", e);
// msg.append('\n');
// msg.append(net.i2p.util.HexDump.dump(data, 0, off));
// }
//}
msg.append(" pkt size ").append(off + (ipHeaderSize + UDP_HEADER_SIZE));
_log.info(msg.toString());
}
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
// the packet could have been built before the current mtu got lowered, so
// compare to LARGE_MTU
if (off + (ipHeaderSize + UDP_HEADER_SIZE) > PeerState.LARGE_MTU) {
_log.error("Size is " + off + " for " + packet +
" fragment " + fragment +
" data size " + dataSize +
" pkt size " + (off + (ipHeaderSize + UDP_HEADER_SIZE)) +
" MTU " + currentMTU +
@@ -430,7 +528,7 @@
availableForExplicitAcks + " for full acks " +
explicitToSend + " full acks included " +
partialAcksToSend + " partial acks included " +
" OMS " + state, new Exception());
" Fragments: " + DataHelper.toString(fragments), new Exception());
}
return packet;
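To make the getMaxAdditionalFragmentSize() budget concrete: FRAGMENT_HEADER_SIZE (7) and DATA_HEADER_SIZE (46) come from the hunks above, and this worked example assumes the IPv4 MIN_DATA_PACKET_OVERHEAD works out to 74 bytes (20 IP + 8 UDP + 46 data header); the 1484-byte MTU is just an illustrative figure.

public class FragmentBudgetSketch {

    static final int FRAGMENT_HEADER_SIZE = 7;                // 4-byte msg ID + 3-byte fragment info
    static final int MIN_DATA_PACKET_OVERHEAD = 20 + 8 + 46;  // IP + UDP + data header (assumed)

    /** Mirrors the IPv4 case of getMaxAdditionalFragmentSize(). */
    static int maxAdditionalFragmentSize(int mtu, int numFragments, int curDataSize) {
        int available = mtu - curDataSize - MIN_DATA_PACKET_OVERHEAD;
        // the overhead already includes one fragment header;
        // this subtracts for the others, plus the new one
        available -= numFragments * FRAGMENT_HEADER_SIZE;
        return available;
    }

    public static void main(String[] args) {
        // one 1000-byte fragment already in the packet, 1484-byte MTU:
        // 1484 - 1000 - 74 - 7 = 403 bytes left for a second fragment
        System.out.println(maxAdditionalFragmentSize(1484, 1, 1000));
    }
}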

net/i2p/router/transport/udp/PacketPusher.java

@@ -37,11 +37,10 @@ class PacketPusher implements Runnable {
public void run() {
while (_alive) {
try {
UDPPacket packets[] = _fragments.getNextVolley();
List<UDPPacket> packets = _fragments.getNextVolley();
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
send(packets[i]);
for (int i = 0; i < packets.size(); i++) {
send(packets.get(i));
}
}
} catch (Exception e) {

net/i2p/router/transport/udp/PeerState.java

@@ -242,6 +242,9 @@ class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
/** max number of msgs returned from allocateSend() */
private static final int MAX_ALLOCATE_SEND = 2;
/**
* Was 32 before 0.9.2, but since the streaming lib goes up to 128,
* we would just drop our own msgs right away during slow start.
@@ -1563,15 +1566,16 @@
}
/**
* Pick a message we want to send and allocate it out of our window
* Pick one or more messages we want to send and allocate them out of our window
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned > 0.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return allocated message to send, or null if no messages or no resources
* @return allocated messages to send (never empty), or null if no messages or no resources
*/
public OutboundMessageState allocateSend() {
public List<OutboundMessageState> allocateSend() {
if (_dead) return null;
List<OutboundMessageState> rv = null;
synchronized (_outboundMessages) {
for (OutboundMessageState state : _outboundMessages) {
// We have 3 return values, because if allocateSendingBytes() returns false,
@@ -1588,44 +1592,54 @@
msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
}
*/
return state;
if (rv == null)
rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
rv.add(state);
if (rv.size() >= MAX_ALLOCATE_SEND)
return rv;
} else if (should == ShouldSend.NO_BW) {
// no more bandwidth available
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
if (_log.shouldLog(Log.DEBUG))
if (rv == null && _log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return null;
return rv;
} /* else {
OutNetMessage msg = state.getMessage();
if (msg != null)
msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
} */
}
// Peek at head of _outboundQueue and see if we can send it.
// If so, pull it off, put it in _outboundMessages, test
// again for bandwidth if necessary, and return it.
OutboundMessageState state = _outboundQueue.peek();
if (state != null && ShouldSend.YES == locked_shouldSend(state)) {
OutboundMessageState state;
while ((state = _outboundQueue.peek()) != null &&
ShouldSend.YES == locked_shouldSend(state)) {
// we could get a different state, or null, when we poll,
// due to AQM drops, so we test again if necessary
OutboundMessageState dequeuedState = _outboundQueue.poll();
if (dequeuedState != null) {
_outboundMessages.add(dequeuedState);
if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(dequeuedState)) {
if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
return dequeuedState;
if (rv == null)
rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
rv.add(state);
if (rv.size() >= MAX_ALLOCATE_SEND)
return rv;
}
}
}
}
if (_log.shouldLog(Log.DEBUG))
if ( rv == null && _log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return null;
return rv;
}
/**
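The allocateSend() hunk above has a subtle queue dance: it peeks at the head, tests the send window, then polls, and because the head can change between peek() and poll() (e.g. AQM drops), it re-tests whenever the dequeued state differs from the peeked one. A toy model of that pattern and of the MAX_ALLOCATE_SEND batching, with shouldSend() standing in for the real bandwidth and window checks:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class AllocateSendSketch {

    static final int MAX_ALLOCATE_SEND = 2; // batch limit from the diff above

    /** Stand-in for the send-window test (bandwidth, retransmission timers, ...). */
    static boolean shouldSend(String msg) { return msg.length() <= 8; }

    static List<String> allocateSend(Queue<String> outboundQueue) {
        List<String> rv = null;
        String head;
        while ((head = outboundQueue.peek()) != null && shouldSend(head)) {
            // the head may change between peek() and poll(), e.g. due to
            // AQM drops, so re-test if we dequeued something different
            String dequeued = outboundQueue.poll();
            if (dequeued == null)
                continue;
            if (dequeued == head || shouldSend(dequeued)) {
                if (rv == null)
                    rv = new ArrayList<String>(MAX_ALLOCATE_SEND);
                rv.add(dequeued);
                if (rv.size() >= MAX_ALLOCATE_SEND)
                    return rv;
            }
        }
        return rv; // null if nothing was allocated, never an empty list
    }

    public static void main(String[] args) {
        Queue<String> q = new LinkedBlockingQueue<String>();
        q.add("msg-1");
        q.add("msg-2");
        q.add("msg-3");
        System.out.println(allocateSend(q)); // [msg-1, msg-2]: a batch of two
    }
}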

net/i2p/router/transport/udp/UDPPacket.java

@@ -166,7 +166,11 @@ class UDPPacket implements CDQEntry {
int getMessageType() { return _messageType; }
/** only for debugging and stats, does not go on the wire */
void setMessageType(int type) { _messageType = type; }
/** only for debugging and stats */
int getFragmentCount() { return _fragmentCount; }
/** only for debugging and stats */
void setFragmentCount(int count) { _fragmentCount = count; }
RemoteHostId getRemoteHost() {

net/i2p/router/transport/udp/UDPPacketReader.java

@@ -279,26 +279,33 @@ class UDPPacketReader {
public boolean readACKsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_EXPLICIT_ACK);
}
public boolean readACKBitfieldsIncluded() {
return flagSet(UDPPacket.DATA_FLAG_ACK_BITFIELDS);
}
public boolean readECN() {
return flagSet(UDPPacket.DATA_FLAG_ECN);
}
public boolean readWantPreviousACKs() {
return flagSet(UDPPacket.DATA_FLAG_WANT_ACKS);
}
public boolean readReplyRequested() {
return flagSet(UDPPacket.DATA_FLAG_WANT_REPLY);
}
public boolean readExtendedDataIncluded() {
return flagSet(UDPPacket.DATA_FLAG_EXTENDED);
}
public int readACKCount() {
if (!readACKsIncluded()) return 0;
int off = readBodyOffset() + 1;
return (int)DataHelper.fromLong(_message, off, 1);
}
public long readACK(int index) {
if (!readACKsIncluded()) return -1;
int off = readBodyOffset() + 1;
@@ -306,6 +313,7 @@ class UDPPacketReader {
off++;
return DataHelper.fromLong(_message, off + (4 * index), 4);
}
public ACKBitfield[] readACKBitfields() {
if (!readACKBitfieldsIncluded()) return null;
int off = readBodyOffset() + 1;
@@ -354,28 +362,29 @@ class UDPPacketReader {
int fragmentBegin = getFragmentBegin(fragmentNum);
return DataHelper.fromLong(_message, fragmentBegin, 4);
}
public int readMessageFragmentNum(int fragmentNum) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
return (_message[off] & 0xFF) >>> 1;
}
public boolean readMessageIsLast(int fragmentNum) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
return ((_message[off] & 1) != 0);
}
public int readMessageFragmentSize(int fragmentNum) {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
off++; // fragment info
off += 5; // messageId + fragment info
return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
}
public void readMessageFragment(int fragmentNum, byte target[], int targetOffset)
throws ArrayIndexOutOfBoundsException {
int off = getFragmentBegin(fragmentNum);
off += 4; // messageId
off++; // fragment info
off += 5; // messageId + fragment info
int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
off += 2;
System.arraycopy(_message, off, target, targetOffset, size);
@@ -405,16 +414,14 @@ class UDPPacketReader {
}
off++; // # fragments
if (fragmentNum == 0) {
return off;
} else {
if (fragmentNum > 0) {
for (int i = 0; i < fragmentNum; i++) {
off += 5; // messageId+info
off += ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
off += 2;
}
return off;
}
return off;
}
private boolean flagSet(byte flag) {
@@ -477,10 +484,9 @@ class UDPPacketReader {
buf.append(" isLast? ").append(isLast);
buf.append(" info ").append(_message[off-1]);
int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
buf.append(" with ").append(size).append(" bytes");
buf.append(' ');
off += size;
off += 2;
buf.append(" with ").append(size).append(" bytes; ");
off += size;
}
return buf.toString();
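For reference, the wire layout that the DataReader methods above walk is, per fragment: a 4-byte message ID, one info byte (fragment number in bits 1-7, isLast flag in bit 0), and a 2-byte size whose top two bits are reserved, followed by the payload, with a 1-byte fragment count in front of the whole section. A standalone parser sketch of just that section, fed a hand-built sample packet rather than real SSU traffic:

public class FragmentSectionParserSketch {

    static void parse(byte[] msg, int off) {
        int numFragments = msg[off++] & 0xFF;   // 1-byte fragment count
        for (int i = 0; i < numFragments; i++) {
            long messageId = ((msg[off] & 0xFFL) << 24) | ((msg[off + 1] & 0xFFL) << 16)
                           | ((msg[off + 2] & 0xFFL) << 8) | (msg[off + 3] & 0xFFL);
            off += 4;
            int info = msg[off++] & 0xFF;
            int fragNum = info >>> 1;           // bits 1-7: fragment number
            boolean isLast = (info & 1) != 0;   // bit 0: last fragment of the message
            int size = ((msg[off] & 0x3F) << 8) | (msg[off + 1] & 0xFF); // 2 high bits reserved
            off += 2;
            System.out.println("msg " + messageId + " frag " + fragNum +
                               (isLast ? " (last)" : "") + " size " + size);
            off += size;                        // skip the payload
        }
    }

    public static void main(String[] args) {
        // two fragments of message 42: frag 0 (3 bytes), then frag 1, last (1 byte)
        byte[] pkt = { 2,
                       0, 0, 0, 42, 0 << 1, 0, 3, 'a', 'b', 'c',
                       0, 0, 0, 42, (1 << 1) | 1, 0, 1, 'd' };
        parse(pkt, 0);
    }
}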