SSU InboundMessageState -

Rewrite PartialBitfield for efficiency, less space and object churn
SSU ACKBitfield: Add ackCount()
PeerState.fetchPartialACKs() improvements
This commit is contained in:
zzz
2014-09-14 14:32:23 +00:00
parent a7763a08dc
commit 0a41052f3f
4 changed files with 77 additions and 33 deletions

View File

@ -5,12 +5,23 @@ package net.i2p.router.transport.udp;
* received messages
*/
interface ACKBitfield {
/** what message is this partially ACKing? */
public long getMessageId();
/** how many fragments are covered in this bitfield? */
public int fragmentCount();
/** has the given fragment been received? */
public boolean received(int fragmentNum);
/** has the entire message been received completely? */
public boolean receivedComplete();
/**
* Number of fragments acked in this bitfield.
* Faster than looping through received()
* @since 0.9.16
*/
public int ackCount();
}

View File

@ -220,47 +220,62 @@ class InboundMessageState implements CDQEntry {
}
public ACKBitfield createACKBitfield() {
return new PartialBitfield(_messageId, _fragments);
int sz = (_lastFragment >= 0) ? _lastFragment + 1 : _fragments.length;
return new PartialBitfield(_messageId, _fragments, sz);
}
/**
* A true partial bitfield that is not complete.
* A true partial bitfield that is probably not complete.
* fragmentCount() will return 64 if unknown.
*/
private static final class PartialBitfield implements ACKBitfield {
private final long _bitfieldMessageId;
private final boolean _fragmentsReceived[];
private final int _fragmentCount;
private final int _ackCount;
// bitfield, 1 for acked
private final long _fragmentAcks;
/**
* @param data each element is non-null or null for received or not
* @param lastFragment size of data to use
*/
public PartialBitfield(long messageId, Object data[]) {
public PartialBitfield(long messageId, Object data[], int size) {
if (size > MAX_FRAGMENTS)
throw new IllegalArgumentException();
_bitfieldMessageId = messageId;
boolean fragmentsRcvd[] = null;
for (int i = data.length - 1; i >= 0; i--) {
int ackCount = 0;
long acks = 0;
for (int i = 0; i < size; i++) {
if (data[i] != null) {
if (fragmentsRcvd == null)
fragmentsRcvd = new boolean[i+1];
fragmentsRcvd[i] = true;
acks |= mask(i);
ackCount++;
}
}
if (fragmentsRcvd == null)
_fragmentsReceived = new boolean[0];
else
_fragmentsReceived = fragmentsRcvd;
_fragmentAcks = acks;
_fragmentCount = size;
_ackCount = ackCount;
}
public int fragmentCount() { return _fragmentsReceived.length; }
/**
* @param fragment 0-63
*/
private static long mask(int fragment) {
return 1L << fragment;
}
public int fragmentCount() { return _fragmentCount; }
public int ackCount() { return _ackCount; }
public long getMessageId() { return _bitfieldMessageId; }
public boolean received(int fragmentNum) {
if ( (fragmentNum < 0) || (fragmentNum >= _fragmentsReceived.length) )
if (fragmentNum < 0 || fragmentNum >= _fragmentCount)
return false;
return _fragmentsReceived[fragmentNum];
return (_fragmentAcks & mask(fragmentNum)) != 0;
}
/** @return false always */
public boolean receivedComplete() { return false; }
public boolean receivedComplete() { return _ackCount == _fragmentCount; }
@Override
public String toString() {
@ -268,9 +283,11 @@ class InboundMessageState implements CDQEntry {
buf.append("Partial ACK of ");
buf.append(_bitfieldMessageId);
buf.append(" with ACKs for: ");
for (int i = 0; i < _fragmentsReceived.length; i++)
if (_fragmentsReceived[i])
for (int i = 0; i < _fragmentCount; i++) {
if (received(i))
buf.append(i).append(" ");
}
buf.append(" / ").append(_fragmentCount);
return buf.toString();
}
}

View File

@ -1035,7 +1035,7 @@ class PeerState {
* no full bitfields are included.
*/
void fetchPartialACKs(List<ACKBitfield> rv) {
InboundMessageState states[] = null;
List<InboundMessageState> states = null;
int curState = 0;
synchronized (_inboundMessages) {
int numMessages = _inboundMessages.size();
@ -1052,17 +1052,17 @@ class PeerState {
} else {
if (!state.isComplete()) {
if (states == null)
states = new InboundMessageState[numMessages];
states[curState++] = state;
states = new ArrayList<InboundMessageState>(numMessages);
states.add(state);
}
}
}
}
if (states != null) {
// _inboundMessages is a Map (unordered), so why bother going backwards?
for (int i = curState-1; i >= 0; i--) {
if (states[i] != null)
rv.add(states[i].createACKBitfield());
for (InboundMessageState ims : states) {
ACKBitfield abf = ims.createACKBitfield();
if (!abf.receivedComplete())
rv.add(abf);
}
}
}
@ -1076,6 +1076,7 @@ class PeerState {
public FullACKBitfield(long id) { _msgId = id; }
public int fragmentCount() { return 0; }
public int ackCount() { return 0; }
public long getMessageId() { return _msgId; }
public boolean received(int fragmentNum) { return true; }
public boolean receivedComplete() { return true; }
@ -1895,12 +1896,7 @@ class PeerState {
if (state != null) {
int numSends = state.getMaxSends();
int bits = bitfield.fragmentCount();
int numACKed = 0;
for (int i = 0; i < bits; i++)
if (bitfield.received(i))
numACKed++;
int numACKed = bitfield.ackCount();
_context.statManager().addRateData("udp.partialACKReceived", numACKed);
if (_log.shouldLog(Log.INFO))

View File

@ -529,6 +529,26 @@ class UDPPacketReader {
public int fragmentCount() { return _bitfieldSize * 7; }
public boolean receivedComplete() { return false; }
/**
* Number of fragments acked in this bitfield.
* Faster than looping through received()
* @since 0.9.16
*/
public int ackCount() {
int rv = 0;
for (int i = _bitfieldStart; i < _bitfieldStart + _bitfieldSize; i++) {
byte b = _message[i];
if ((b & 0x7f) != 0) {
for (int j = 0; j < 7; j++) {
if ((b & 0x01) != 0)
rv++;
b >>= 1;
}
}
}
return rv;
}
public boolean received(int fragmentNum) {
if ( (fragmentNum < 0) || (fragmentNum >= _bitfieldSize*7) )
return false;