2005-10-11 jrandom

* Piggyback the SSU explicit ACKs with data packets (partial ACKs aren't
      yet piggybacked).  This is backwards compatible.
    * SML parser cleanup in Syndie
This commit is contained in:
jrandom
2005-10-11 07:07:39 +00:00
committed by zzz
parent 197237aa32
commit 123e0ba589
10 changed files with 181 additions and 43 deletions

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.263 $ $Date: 2005/10/09 06:35:14 $";
public final static String ID = "$Revision: 1.264 $ $Date: 2005/10/10 17:58:18 $";
public final static String VERSION = "0.6.1.2";
public final static long BUILD = 4;
public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -34,6 +34,7 @@ public class ACKSender implements Runnable {
_context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.ackFrequency", "how long ago did we send an ACK to this peer?", "udp", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendACKRemaining", "when we ack a peer, how many peers are left waiting to ack?", "udp", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.abortACK", "How often do we schedule up an ACK send only to find it had already been sent (through piggyback)?", "udp", new long[] { 60*1000, 60*60*1000 });
}
public void ackPeer(PeerState peer) {
@ -121,6 +122,8 @@ public class ACKSender implements Runnable {
_log.warn("Rerequesting ACK for peer " + peer);
ackPeer(peer);
}
} else {
_context.statManager().addRateData("udp.abortACK", 1, 0);
}
}
}

View File

@ -49,6 +49,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
_context.statManager().createRateStat("udp.ignoreRecentDuplicate", "Take note that we received a packet for a recently completed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receivePiggyback", "How many acks were included in a packet with data fragments (time == # data fragments)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@ -75,14 +76,16 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
*/
public void receiveData(PeerState from, UDPPacketReader.DataReader data) {
long beforeMsgs = _context.clock().now();
receiveMessages(from, data);
int fragmentsIncluded = receiveMessages(from, data);
long afterMsgs = _context.clock().now();
receiveACKs(from, data);
int acksIncluded = receiveACKs(from, data);
long afterACKs = _context.clock().now();
from.packetReceived();
_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
if ( (fragmentsIncluded > 0) && (acksIncluded > 0) )
_context.statManager().addRateData("udp.receivePiggyback", acksIncluded, fragmentsIncluded);
}
/**
@ -90,10 +93,11 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
* Along the way, if any state expires, or a full message arrives, move it
* appropriately.
*
* @return number of data fragments included
*/
private void receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
private int receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
int fragments = data.readFragmentCount();
if (fragments <= 0) return;
if (fragments <= 0) return fragments;
Hash fromPeer = from.getRemotePeer();
Map messages = from.getInboundMessages();
@ -170,13 +174,16 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
break;
}
}
return fragments;
}
private void receiveACKs(PeerState from, UDPPacketReader.DataReader data) {
private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) {
int rv = 0;
if (data.readACKsIncluded()) {
int fragments = 0;
long acks[] = data.readACKs();
if (acks != null) {
rv += acks.length;
_context.statManager().addRateData("udp.receivedACKs", acks.length, 0);
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receiveACKCount", acks.length, 0);
@ -192,6 +199,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
if (data.readACKBitfieldsIncluded()) {
ACKBitfield bitfields[] = data.readACKBitfields();
if (bitfields != null) {
rv += bitfields.length;
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0);
for (int i = 0; i < bitfields.length; i++) {
@ -205,5 +213,6 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
from.ECNReceived();
else
from.dataReceived();
return rv;
}
}

View File

@ -61,6 +61,7 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendPiggyback", "How many acks were piggybacked on a data packet (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.activeDelay", "How often we wait blocking on the active queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
@ -354,16 +355,34 @@ public class OutboundMessageFragments {
if (fragments < 0)
return null;
// ok, simplest possible thing is to always tack on the bitfields if
List msgIds = peer.getCurrentFullACKs();
List partialACKBitfields = null; // fill in later...
List remaining = new ArrayList(msgIds);
int sparseCount = 0;
UDPPacket rv[] = new UDPPacket[fragments]; //sparse
for (int i = 0; i < fragments; i++) {
if (state.needsSending(i))
rv[i] = _builder.buildPacket(state, i, peer);
rv[i] = _builder.buildPacket(state, i, peer, remaining, partialACKBitfields);
else
sparseCount++;
}
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int i = 0; i < msgIds.size(); i++) {
Long id = (Long)msgIds.get(i);
if (!remaining.contains(id)) {
peer.removeACKMessage(id);
piggybackedAck++;
}
}
}
if (sparseCount > 0)
_context.statManager().addRateData("udp.sendSparse", sparseCount, state.getLifetime());
if (piggybackedAck > 0)
_context.statManager().addRateData("udp.sendPiggyback", piggybackedAck, state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Building packet for " + state + " to " + peer + " with sparse count: " + sparseCount);
peer.packetsTransmitted(fragments - sparseCount);

View File

@ -38,11 +38,23 @@ public class PacketBuilder {
}
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer) {
return buildPacket(state, fragment, peer, null, null);
}
/**
* @param ackIdsRemaining list of messageIds (Long) that should be acked by this packet.
* The list itself is passed by reference, and if a messageId is
* included, it should be removed from the list.
* @param partialACKsRemaining list of messageIds (ACKBitfield) that should be acked by this packet.
* The list itself is passed by reference, and if a messageId is
* included, it should be removed from the list.
*/
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) {
UDPPacket packet = UDPPacket.acquire(_context);
byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
int start = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
int off = start;
// header
data[off] |= (UDPPacket.PAYLOAD_TYPE_DATA << 4);
@ -56,7 +68,54 @@ public class PacketBuilder {
// just always ask for an ACK for now...
data[off] |= UDPPacket.DATA_FLAG_WANT_REPLY;
// we should in theory only include explicit ACKs if the expected packet size
// is under the MTU, but for now, since the # of packets acked is so few (usually
// just one or two), and since the packets are so small anyway, an additional five
// or ten bytes doesn't hurt.
if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) )
data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK;
if ( (partialACKsRemaining != null) && (partialACKsRemaining.size() > 0) )
data[off] |= UDPPacket.DATA_FLAG_ACK_BITFIELDS;
off++;
if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) {
DataHelper.toLong(data, off, 1, ackIdsRemaining.size());
off++;
while (ackIdsRemaining.size() > 0) {
Long ackId = (Long)ackIdsRemaining.remove(0);
DataHelper.toLong(data, off, 4, ackId.longValue());
off += 4;
}
}
if ( (partialACKsRemaining != null) && (partialACKsRemaining.size() > 0) ) {
int origNumRemaining = partialACKsRemaining.size();
int numPartialOffset = off;
// leave it blank for now, since we could skip some
off++;
for (int i = 0; i < partialACKsRemaining.size(); i++) {
ACKBitfield bitfield = (ACKBitfield)partialACKsRemaining.get(i);
if (bitfield.receivedComplete()) continue;
DataHelper.toLong(data, off, 4, bitfield.getMessageId());
off += 4;
int bits = bitfield.fragmentCount();
int size = (bits / 7) + 1;
for (int curByte = 0; curByte < size; curByte++) {
if (curByte + 1 < size)
data[off] |= (byte)(1 << 7);
for (int curBit = 0; curBit < 7; curBit++) {
if (bitfield.received(curBit + 7*curByte))
data[off] |= (byte)(1 << curBit);
}
off++;
}
partialACKsRemaining.remove(i);
i--;
}
// now jump back and fill in the number of bitfields *actually* included
DataHelper.toLong(data, numPartialOffset, 1, origNumRemaining - partialACKsRemaining.size());
}
DataHelper.toLong(data, off, 1, 1); // only one fragment in this message
off++;

View File

@ -523,6 +523,25 @@ public class PeerState {
return true;
}
/**
* Grab a list of message ids (Long) that we want to send to the remote
* peer, regardless of the packet size, but don't remove it from our
* "want to send" list. If the message id is transmitted to the peer,
* removeACKMessage(Long) should be called.
*
*/
public List getCurrentFullACKs() {
synchronized (_currentACKs) {
return new ArrayList(_currentACKs);
}
}
public void removeACKMessage(Long messageId) {
synchronized (_currentACKs) {
_currentACKs.remove(messageId);
}
_lastACKSend = _context.clock().now();
}
/**
* grab a list of ACKBitfield instances, some of which may fully
* ACK a message while others may only partially ACK a message.

View File

@ -65,10 +65,12 @@ public class RequestTunnelJob extends JobImpl {
ctx.statManager().createRateStat("tunnel.receiveRejectionTransient", "How often we are rejected due to transient overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.receiveRejectionBandwidth", "How often we are rejected due to bandwidth overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.receiveRejectionCritical", "How often we are rejected due to critical failure?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildFailure", "How often we fail to build a non-exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratoryFailure", "How often we fail to build an exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildFailure", "What hop was being requested when a nonexploratory tunnel request failed?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratoryFailure", "What hop was beiing requested when an exploratory tunnel request failed?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildSuccess", "How often we succeed building a non-exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratorySuccess", "How often we succeed building an exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildPartialTime", "How long a non-exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratoryPartialTime", "How long an exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting hop " + hop + " in " + cfg);
@ -262,14 +264,20 @@ public class RequestTunnelJob extends JobImpl {
if (_onFailed != null)
getContext().jobQueue().addJob(_onFailed);
if (_isExploratory)
getContext().statManager().addRateData("tunnel.buildExploratoryFailure", 1, 0);
getContext().statManager().addRateData("tunnel.buildExploratoryFailure", _currentHop, _config.getLength());
else
getContext().statManager().addRateData("tunnel.buildFailure", 1, 0);
getContext().statManager().addRateData("tunnel.buildFailure", _currentHop, _config.getLength());
}
private void peerSuccess() {
long now = getContext().clock().now();
getContext().profileManager().tunnelJoined(_currentPeer.getIdentity().calculateHash(),
getContext().clock().now() - _lastSendTime);
now - _lastSendTime);
if (_isExploratory)
getContext().statManager().addRateData("tunnel.buildExploratoryPartialTime", now - _lastSendTime, 0);
else
getContext().statManager().addRateData("tunnel.buildPartialTime", now - _lastSendTime, 0);
if (_currentHop > 0) {
RequestTunnelJob j = new RequestTunnelJob(getContext(), _config, _onCreated, _onFailed, _currentHop - 1, _isFake, _isExploratory);
getContext().jobQueue().addJob(j);