2005-11-05 jrandom

* Include the most recent ACKs with packets, rather than only sending an
      ack exactly once.  SSU differs from TCP in this regard, as TCP has ever
      increasing sequence numbers, while each message ID in SSU is random, so
      we don't get the benefit of later ACKs implicitly ACKing earlier
      messages.
    * Reduced the max retransmission timeout for SSU
    * Don't try to send messages queued up for a long time waiting for
      establishment.
This commit is contained in:
jrandom
2005-11-05 21:12:57 +00:00
committed by zzz
parent 9050d7c218
commit 14cd469c6d
11 changed files with 147 additions and 29 deletions

View File

@ -1,4 +1,14 @@
$Id: history.txt,v 1.313 2005/11/03 20:20:18 jrandom Exp $
$Id: history.txt,v 1.314 2005/11/05 06:01:57 dust Exp $
2005-11-05 jrandom
* Include the most recent ACKs with packets, rather than only sending an
ack exactly once. SSU differs from TCP in this regard, as TCP has ever
increasing sequence numbers, while each message ID in SSU is random, so
we don't get the benefit of later ACKs implicitly ACKing earlier
messages.
* Reduced the max retransmission timeout for SSU
* Don't try to send messages queued up for a long time waiting for
establishment.
2005-11-05 dust
* Fix sucker to delete its temporary files.

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.282 $ $Date: 2005/11/03 20:20:17 $";
public final static String ID = "$Revision: 1.283 $ $Date: 2005/11/05 06:01:58 $";
public final static String VERSION = "0.6.1.4";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -42,6 +42,7 @@ public class GetBidsJob extends JobImpl {
static void getBids(RouterContext context, CommSystemFacadeImpl facade, OutNetMessage msg) {
Log log = context.logManager().getLog(GetBidsJob.class);
Hash to = msg.getTarget().getIdentity().getHash();
msg.timestamp("bid");
if (context.shitlist().isShitlisted(to)) {
if (log.shouldLog(Log.WARN))

View File

@ -132,15 +132,18 @@ public abstract class TransportImpl implements Transport {
if (msToSend > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("afterSend: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
_log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from "
+ _context.routerHash().toBase64().substring(0,6) + " took " + msToSend);
}
long lifetime = msg.getLifetime();
if (lifetime > 5000) {
if (_log.shouldLog(Log.WARN))
_log.warn("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
if (lifetime > 3000) {
int level = Log.WARN;
//if (!sendSuccessful)
// level = Log.INFO;
if (_log.shouldLog(level))
_log.log(level, "afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
+ " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString());
} else {
@ -219,7 +222,7 @@ public abstract class TransportImpl implements Transport {
_log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful
+ "): " + allTime + "ms " + " after failing on: "
+ msg.getFailedTransports() + " and succeeding on " + getStyle());
if (allTime > 60*1000) {
if ( (allTime > 60*1000) && (sendSuccessful) ) {
// WTF!!@#
if (_log.shouldLog(Log.WARN))
_log.warn("WTF, more than a minute slow? " + msg.getMessageType()
@ -271,6 +274,7 @@ public abstract class TransportImpl implements Transport {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message added to send pool");
msg.timestamp("send on " + getStyle());
outboundMessageReady();
if (_log.shouldLog(Log.INFO))
_log.debug("OutboundMessageReady called");

View File

@ -92,7 +92,7 @@ public class ACKSender implements Runnable {
if (peer != null) {
long lastSend = peer.getLastACKSend();
long wanted = peer.getWantedACKSendSince();
List ackBitfields = peer.retrieveACKBitfields();
List ackBitfields = peer.retrieveACKBitfields(false);
if (wanted < 0)
_log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);

View File

@ -17,6 +17,7 @@ import net.i2p.data.Signature;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -137,8 +138,10 @@ public class EstablishmentManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add outobund establish state to: " + to);
OutboundEstablishState state = null;
int deferred = 0;
synchronized (_outboundStates) {
OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(to);
state = (OutboundEstablishState)_outboundStates.get(to);
if (state == null) {
if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
List queued = (List)_queuedOutbound.get(to);
@ -147,6 +150,7 @@ public class EstablishmentManager {
_queuedOutbound.put(to, queued);
}
queued.add(msg);
deferred = _queuedOutbound.size();
} else {
state = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
@ -163,6 +167,10 @@ public class EstablishmentManager {
}
}
if (deferred > 0)
msg.timestamp("too many deferred establishers: " + deferred);
else if (state != null)
msg.timestamp("establish state already waiting " + state.getLifetime());
notifyActivity();
}
@ -312,8 +320,11 @@ public class EstablishmentManager {
new SessionKey(addr.getIntroKey()), addr);
_outboundStates.put(to, qstate);
for (int i = 0; i < queued.size(); i++)
qstate.addMessage((OutNetMessage)queued.get(i));
for (int i = 0; i < queued.size(); i++) {
OutNetMessage m = (OutNetMessage)queued.get(i);
m.timestamp("no longer deferred... establishing");
qstate.addMessage(m);
}
admitted++;
}
return admitted;
@ -388,11 +399,19 @@ public class EstablishmentManager {
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer);
int i = 0;
while (true) {
OutNetMessage msg = state.getNextQueuedMessage();
if (msg == null)
break;
_transport.send(msg);
if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) {
msg.timestamp("took too long but established...");
_transport.failed(msg);
} else {
msg.timestamp("session fully established and sent " + i);
_transport.send(msg);
}
i++;
}
return peer;
}

View File

@ -43,13 +43,24 @@ public class PacketBuilder {
/**
* @param ackIdsRemaining list of messageIds (Long) that should be acked by this packet.
* The list itself is passed by reference, and if a messageId is
* included, it should be removed from the list.
* transmitted and the sender does not want the ID to be included
* in subsequent acks, it should be removed from the list. NOTE:
* right now this does NOT remove the IDs, which means it assumes
* that the IDs will be transmitted potentially multiple times,
* and should otherwise be removed from the list.
* @param partialACKsRemaining list of messageIds (ACKBitfield) that should be acked by this packet.
* The list itself is passed by reference, and if a messageId is
* included, it should be removed from the list.
*/
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) {
UDPPacket packet = UDPPacket.acquire(_context);
StringBuffer msg = null;
boolean acksIncluded = false;
if (_log.shouldLog(Log.WARN)) {
msg = new StringBuffer(128);
msg.append("building data packet with acks to ").append(peer.getRemotePeer().toBase64().substring(0,6));
}
byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0);
@ -81,10 +92,14 @@ public class PacketBuilder {
if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) {
DataHelper.toLong(data, off, 1, ackIdsRemaining.size());
off++;
while (ackIdsRemaining.size() > 0) {
Long ackId = (Long)ackIdsRemaining.remove(0);
for (int i = 0; i < ackIdsRemaining.size(); i++) {
//while (ackIdsRemaining.size() > 0) {
Long ackId = (Long)ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0);
DataHelper.toLong(data, off, 4, ackId.longValue());
off += 4;
off += 4;
if (msg != null) // logging it
msg.append(" full ack: ").append(ackId.longValue());
acksIncluded = true;
}
}
@ -111,12 +126,18 @@ public class PacketBuilder {
off++;
}
partialACKsRemaining.remove(i);
if (msg != null) // logging it
msg.append(" partial ack: ").append(bitfield);
acksIncluded = true;
i--;
}
// now jump back and fill in the number of bitfields *actually* included
DataHelper.toLong(data, numPartialOffset, 1, origNumRemaining - partialACKsRemaining.size());
}
if ( (msg != null) && (acksIncluded) )
_log.warn(msg.toString());
DataHelper.toLong(data, off, 1, 1); // only one fragment in this message
off++;
@ -171,6 +192,12 @@ public class PacketBuilder {
public UDPPacket buildACK(PeerState peer, List ackBitfields) {
UDPPacket packet = UDPPacket.acquire(_context);
StringBuffer msg = null;
if (_log.shouldLog(Log.WARN)) {
msg = new StringBuffer(128);
msg.append("building ACK packet to ").append(peer.getRemotePeer().toBase64().substring(0,6));
}
byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
@ -207,6 +234,8 @@ public class PacketBuilder {
if (bf.receivedComplete()) {
DataHelper.toLong(data, off, 4, bf.getMessageId());
off += 4;
if (msg != null) // logging it
msg.append(" full ack: ").append(bf.getMessageId());
}
}
}
@ -231,12 +260,18 @@ public class PacketBuilder {
}
off++;
}
if (msg != null) // logging it
msg.append(" partial ack: ").append(bitfield);
}
}
DataHelper.toLong(data, off, 1, 0); // no fragments in this message
off++;
if (msg != null)
_log.warn(msg.toString());
// we can pad here if we want, maybe randomized?
// pad up so we're on the encryption boundary
@ -777,8 +812,8 @@ public class PacketBuilder {
System.arraycopy(ourIntroKey.getData(), 0, data, off, SessionKey.KEYSIZE_BYTES);
off += SessionKey.KEYSIZE_BYTES;
if (_log.shouldLog(Log.WARN))
_log.warn("wrote alice intro key: " + Base64.encode(data, off-SessionKey.KEYSIZE_BYTES, SessionKey.KEYSIZE_BYTES)
if (_log.shouldLog(Log.DEBUG))
_log.debug("wrote alice intro key: " + Base64.encode(data, off-SessionKey.KEYSIZE_BYTES, SessionKey.KEYSIZE_BYTES)
+ " with nonce " + introNonce + " size=" + (off+4 + (16 - (off+4)%16))
+ " and data: " + Base64.encode(data, 0, off));

View File

@ -75,6 +75,12 @@ public class PeerState {
private long _lastFailedSendPeriod;
/** list of messageIds (Long) that we have received but not yet sent */
private List _currentACKs;
/**
* list of the most recent messageIds (Long) that we have received and sent
* an ACK for. We keep a few of these around to retransmit with _currentACKs,
* hopefully saving some spurious retransmissions
*/
private List _currentACKsResend;
/** when did we last send ACKs to the peer? */
private volatile long _lastACKSend;
/** when did we decide we need to ACK to this peer? */
@ -173,7 +179,7 @@ public class PeerState {
*/
private static final int DEFAULT_MTU = 608;//600; //1500;
private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 5000; // 5000;
private static final int MAX_RTO = 1200; // 5000;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@ -191,6 +197,7 @@ public class PeerState {
_lastSendTime = -1;
_lastReceiveTime = -1;
_currentACKs = new ArrayList(8);
_currentACKsResend = new ArrayList(8);
_currentSecondECNReceived = false;
_remoteWantsPreviousACKs = false;
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
@ -536,16 +543,25 @@ public class PeerState {
*/
public List getCurrentFullACKs() {
synchronized (_currentACKs) {
return new ArrayList(_currentACKs);
ArrayList rv = new ArrayList(_currentACKs);
// include some for retransmission
rv.addAll(_currentACKsResend);
return rv;
}
}
public void removeACKMessage(Long messageId) {
synchronized (_currentACKs) {
_currentACKs.remove(messageId);
_currentACKsResend.add(messageId);
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
_currentACKsResend.remove(0);
}
_lastACKSend = _context.clock().now();
}
private static final int MAX_RESEND_ACKS = 8;
/**
* grab a list of ACKBitfield instances, some of which may fully
* ACK a message while others may only partially ACK a message.
@ -555,20 +571,34 @@ public class PeerState {
* will be unchanged if there are ACKs remaining.
*
*/
public List retrieveACKBitfields() {
public List retrieveACKBitfields() { return retrieveACKBitfields(true); }
public List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
List rv = null;
int bytesRemaining = countMaxACKData();
synchronized (_currentACKs) {
rv = new ArrayList(_currentACKs.size());
int oldIndex = _currentACKsResend.size();
while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) {
long id = ((Long)_currentACKs.remove(0)).longValue();
Long val = (Long)_currentACKs.remove(0);
long id = val.longValue();
rv.add(new FullACKBitfield(id));
_currentACKsResend.add(val);
bytesRemaining -= 4;
}
if (_currentACKs.size() <= 0)
_wantACKSendSince = -1;
if (alwaysIncludeRetransmissions || rv.size() > 0) {
// now repeat by putting in some old ACKs
for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) {
rv.add(new FullACKBitfield(((Long)_currentACKsResend.get(i)).longValue()));
bytesRemaining -= 4;
}
}
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
_currentACKsResend.remove(0);
}
int partialIncluded = 0;
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
@ -674,7 +704,7 @@ public class PeerState {
_messagesSent++;
if (numSends < 2)
recalculateTimeouts(lifetime);
else
else if (_log.shouldLog(Log.WARN))
_log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps, lifetime);

View File

@ -104,6 +104,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound
_addedSincePassBegan = true;
_nextLock.notifyAll();
}
message.timestamp("added to queue " + queue);
}
/**
@ -138,10 +139,14 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound
_messagesFlushed[currentQueue] = 0;
_nextQueue = (currentQueue + 1) % _queue.length;
}
_context.statManager().addRateData("udp.messageQueueSize", _queue[currentQueue].size(), currentQueue);
int sz = _queue[currentQueue].size();
_context.statManager().addRateData("udp.messageQueueSize", sz, currentQueue);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pulling a message off queue " + currentQueue + " with "
+ _queue[currentQueue].size() + " remaining");
+ sz + " remaining");
msg.timestamp("made active with remaining queue size " + sz);
return msg;
}
@ -247,6 +252,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound
for (int i = 0; i < removed.size(); i++) {
OutNetMessage m = (OutNetMessage)removed.get(i);
m.timestamp("expirer killed it");
_listener.failed(m);
}
removed.clear();

View File

@ -506,9 +506,12 @@ public class UDPPacketReader {
buf.append(getMessageId());
buf.append(" with ACKs for: ");
int numFrags = fragmentCount();
for (int i = 0; i < numFrags; i++)
for (int i = 0; i < numFrags; i++) {
if (received(i))
buf.append(i).append(" ");
else
buf.append('!').append(i).append(" ");
}
return buf.toString();
}
}

View File

@ -669,7 +669,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (msg == null) return;
if (msg.getTarget() == null) return;
if (msg.getTarget().getIdentity() == null) return;
msg.timestamp("sending on UDP transport");
Hash to = msg.getTarget().getIdentity().calculateHash();
PeerState peer = getPeerState(to);
if (peer != null) {
@ -682,13 +683,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
(peer.getConsecutiveFailedSends() > 0) ) {
// peer is waaaay idle, drop the con and queue it up as a new con
dropPeer(peer, false);
msg.timestamp("peer is really idle, dropping con and reestablishing");
_establisher.establish(msg);
_context.statManager().addRateData("udp.proactiveReestablish", now-lastSend, now-peer.getKeyEstablishedTime());
return;
}
}
msg.timestamp("enqueueing for an already established peer");
_outboundMessages.add(msg);
} else {
msg.timestamp("establishing a new connection");
_establisher.establish(msg);
}
}
@ -846,6 +850,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if ( (msg.getPeer() != null) &&
( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
(msg.isExpired())) ) {
OutNetMessage m = msg.getMessage();
if (m != null)
m.timestamp("message failure - volleys = " + msg.getMaxSends()
+ " lastReceived: " + (_context.clock().now() - msg.getPeer().getLastReceiveTime())
+ " lastSentFully: " + (_context.clock().now() - msg.getPeer().getLastSendFullyTime())
+ " expired? " + msg.isExpired());
consecutive = msg.getPeer().incrementConsecutiveFailedSends();
if (_log.shouldLog(Log.WARN))
_log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());