* SendMessageOptions: Increase tag fields to 4 bits and use

table lookup for more flexibility
 * Streaming: Use packet type and current window size to adjust
              number of tags sent and tag threshold, to improve
              efficiency and reliability
This commit is contained in:
zzz
2012-08-26 13:02:11 +00:00
parent 9ba6c293ed
commit b01cf32321
5 changed files with 91 additions and 66 deletions

View File

@ -3,6 +3,7 @@ package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.SendMessageOptions;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
@ -22,6 +23,16 @@ class PacketQueue {
private final ConnectionManager _connectionManager;
private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
private static final int FLAGS_INITIAL_TAGS = Packet.FLAG_SYNCHRONIZE;
private static final int FLAGS_FINAL_TAGS = Packet.FLAG_CLOSE |
Packet.FLAG_RESET |
Packet.FLAG_ECHO;
private static final int INITIAL_TAGS_TO_SEND = 32;
private static final int MIN_TAG_THRESHOLD = 20;
private static final int TAG_WINDOW_FACTOR = 5;
private static final int FINAL_TAGS_TO_SEND = 4;
private static final int FINAL_TAG_THRESHOLD = 2;
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context;
_session = session;
@ -88,24 +99,34 @@ class PacketQueue {
// we want the router to expire it a little before we do,
// so if we retransmit it will use a new tunnel/lease combo
expires = rpe.getNextSendTime() - 500;
SendMessageOptions options = new SendMessageOptions();
if (expires > 0)
// I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires);
// I2PSessionMuxedImpl
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires,
// I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
// I2PSessionMuxedImpl no tags
sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort());
else
// I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0);
// I2PSessionMuxedImpl
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent,
// I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
// I2PSessionMuxedImpl no tags
sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort());
options.setDate(expires);
if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
Connection con = packet.getConnection();
if (con != null && con.isInbound())
options.setSendLeaseSet(false);
options.setTagsToSend(INITIAL_TAGS_TO_SEND);
options.setTagThreshold(MIN_TAG_THRESHOLD);
} else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
options.setSendLeaseSet(false);
options.setTagsToSend(FINAL_TAGS_TO_SEND);
options.setTagThreshold(FINAL_TAG_THRESHOLD);
} else {
Connection con = packet.getConnection();
if (con != null) {
if (con.isInbound() && con.getLifetime() < 2*60*1000)
options.setSendLeaseSet(false);
// increase threshold with higher window sizes to prevent stalls
// after tag delivery failure
int wdw = con.getOptions().getWindowSize();
int thresh = Math.max(MIN_TAG_THRESHOLD, wdw * TAG_WINDOW_FACTOR);
options.setTagThreshold(thresh);
}
}
sent = _session.sendMessage(packet.getTo(), buf, 0, size,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
options);
end = _context.clock().now();
if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )