Streaming: Increase MTU for ratchet (proposal 155)

Set MTU in receiveConnection() for inbound connections
Cleanup CDR.isAckOnly()
Only call now() in MOS for debugging
Set receive.streamActive stat to 1, not 0
This commit is contained in:
zzz
2020-05-30 16:24:25 +00:00
parent d8b308dd9d
commit 370f96acfb
9 changed files with 159 additions and 44 deletions

View File

@ -144,7 +144,8 @@ class Connection {
_inputStream = new MessageInputStream(_context, _options.getMaxMessageSize(),
_options.getMaxWindowSize(), _options.getInboundBufferSize());
// FIXME pass through a passive flush delay setting as the 4th arg
_outputStream = new MessageOutputStream(_context, timer, _receiver, _options.getMaxMessageSize());
_outputStream = new MessageOutputStream(_context, timer, _receiver,
_options.getMaxMessageSize(), _options.getMaxInitialMessageSize());
_timer = timer;
_outboundPackets = new TreeMap<Long, PacketLocal>();
if (opts != null) {

View File

@ -146,12 +146,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
return packet;
}
private static boolean isAckOnly(Connection con, int size) {
boolean ackOnly = ( (size <= 0) && // no data
(con.getLastSendId() >= 0) && // not a SYN
( (!con.getOutputStream().getClosed()) || // not a CLOSE
(con.getOutputStream().getClosed() &&
con.getCloseSentOn() > 0) )); // or it is a dup CLOSE
private boolean isAckOnly(int size) {
boolean ackOnly = size <= 0 && // no data
_connection.getLastSendId() >= 0 && // not a SYN
(!_connection.getOutputStream().getClosed() || // not a CLOSE
_connection.getCloseSentOn() > 0); // or it is a dup CLOSE
return ackOnly;
}
@ -170,7 +169,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
*/
private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")");
boolean ackOnly = isAckOnly(_connection, size);
boolean ackOnly = isAckOnly(size);
boolean isFirst = (_connection.getAckedPackets() <= 0) && (_connection.getUnackedPacketsSent() <= 0);
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);

View File

@ -230,23 +230,13 @@ class ConnectionManager {
* Create a new connection based on the SYN packet we received.
*
* @param synPacket SYN packet to process
* @return created Connection with the packet's data already delivered to
* it, or null if the syn's streamId was already taken
* @return created Connection with the packet's data already delivered to it,
* or null if the syn's streamId was already taken,
* or if the connection was rejected
*/
public Connection receiveConnection(Packet synPacket) {
ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort());
boolean reject = false;
int active = 0;
int total = 0;
// just for the stat
//total = _connectionByInboundId.size();
//for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
// if ( ((Connection)iter.next()).getIsConnected() )
// active++;
//}
if (locked_tooManyStreams()) {
if ((!_defaultOptions.getDisableRejectLogging()) || _log.shouldLog(Log.WARN))
_log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
@ -263,7 +253,7 @@ class ConnectionManager {
}
}
_context.statManager().addRateData("stream.receiveActive", active, total);
_context.statManager().addRateData("stream.receiveActive", 1);
if (reject) {
Destination from = synPacket.getOptionalFrom();
@ -331,6 +321,40 @@ class ConnectionManager {
return null;
}
ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort());
// set up the MTU for the connection
int size;
if (synPacket.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
size = synPacket.getOptionalMaxSize();
if (size < ConnectionOptions.MIN_MESSAGE_SIZE) {
// log.error? connection reset?
size = ConnectionOptions.MIN_MESSAGE_SIZE;
}
} else {
// specs not clear if MTU may be omitted from SYN
size = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE;
}
int mtu = opts.getMaxMessageSize();
if (size < mtu) {
if (_log.shouldInfo())
_log.info("Reducing MTU for IB conn to " + size
+ " from " + mtu);
opts.setMaxMessageSize(size);
opts.setMaxInitialMessageSize(size);
} else if (size > opts.getMaxInitialMessageSize()) {
if (size > mtu)
size = mtu;
if (_log.shouldInfo())
_log.info("Increasing MTU for IB conn to " + size
+ " from " + mtu);
if (size != mtu)
opts.setMaxMessageSize(size);
opts.setMaxInitialMessageSize(size);
}
Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
_timer, _outboundQueue, _conPacketHandler, opts, true);
_tcbShare.updateOptsFromShare(con);

View File

@ -34,6 +34,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private int _resendDelay;
private int _sendAckDelay;
private int _maxMessageSize;
private int _maxInitialMessageSize;
private int _maxResends;
private int _inactivityTimeout;
private int _inactivityAction;
@ -304,6 +305,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
*/
public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730;
public static final int MIN_MESSAGE_SIZE = 512;
/**
*
* See analysis in proposal 144
* @since 0.9.48
*/
public static final int DEFAULT_MAX_MESSAGE_SIZE_RATCHET = 1812;
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
@ -774,7 +781,25 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
* @return Maximum message size (MTU/MRU)
*/
public int getMaxMessageSize() { return _maxMessageSize; }
public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); }
public void setMaxMessageSize(int bytes) {
_maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE);
_maxInitialMessageSize = Math.min(_maxMessageSize, DEFAULT_MAX_MESSAGE_SIZE);
}
/**
* What is the largest message to send in the SYN from Alice to Bob?
* @return the max
* @since 0.9.47
*/
public int getMaxInitialMessageSize() { return _maxInitialMessageSize; }
/**
* What is the largest message to send in the SYN from Alice to Bob?
* @since 0.9.47
*/
public void setMaxInitialMessageSize(int bytes) {
_maxInitialMessageSize = bytes;
}
/**
* What profile do we want to use for this connection?

View File

@ -87,14 +87,38 @@ class ConnectionPacketHandler {
}
if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
int size = packet.getOptionalMaxSize();
if (size < ConnectionOptions.MIN_MESSAGE_SIZE) {
// log.error? connection reset?
size = ConnectionOptions.MIN_MESSAGE_SIZE;
// inbound SYN handled in ConnectionManager.receiveConnection()
if (!(con.isInbound() && packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))) {
int size = packet.getOptionalMaxSize();
if (size < ConnectionOptions.MIN_MESSAGE_SIZE) {
// log.error? connection reset?
size = ConnectionOptions.MIN_MESSAGE_SIZE;
}
int mtu = con.getOptions().getMaxMessageSize();
if (size < mtu) {
if (_log.shouldInfo())
_log.info("Reducing MTU to " + size
+ " from " + mtu);
con.getOptions().setMaxMessageSize(size);
con.getOutputStream().setBufferSize(size);
} else if (size > con.getOptions().getMaxInitialMessageSize()) {
if (size > mtu)
size = mtu;
if (_log.shouldInfo())
_log.info("Increasing MTU to " + size
+ " from " + con.getOptions().getMaxInitialMessageSize());
if (size != mtu)
con.getOptions().setMaxMessageSize(size);
con.getOutputStream().setBufferSize(size);
}
}
} else if (!con.isInbound() && packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
// SYN ACK w/o MAX_PACKET_SIZE?
// specs not clear if this is allowed
final int size = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE;
if (size < con.getOptions().getMaxMessageSize()) {
if (_log.shouldLog(Log.INFO))
_log.info("Reducing our max message size to " + size
if (_log.shouldInfo())
_log.info("SYN ACK w/o MTU, Reducing MTU to " + size
+ " from " + con.getOptions().getMaxMessageSize());
con.getOptions().setMaxMessageSize(size);
con.getOutputStream().setBufferSize(size);

View File

@ -32,6 +32,7 @@ import net.i2p.client.streaming.IncomingConnectionFilter;
import net.i2p.crypto.SigAlgo;
import net.i2p.crypto.SigType;
import net.i2p.data.Certificate;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.PrivateKey;
@ -202,6 +203,27 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_name = name + " " + (__managerId.incrementAndGet());
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_defaultOptions = new ConnectionOptions(opts);
if (opts != null && opts.getProperty(ConnectionOptions.PROP_MAX_MESSAGE_SIZE) == null) {
// set higher MTU for ECIES
String senc = opts.getProperty("i2cp.leaseSetEncType");
if (senc != null && !senc.equals("0")) {
String[] senca = DataHelper.split(senc, ",");
boolean has0 = false;
boolean has4 = false;
for (int i = 0; i < senca.length; i++) {
if (senca[i].equals("0")) {
has0 = true;
} else if (senca[i].equals("4")) {
has4 = true;
}
}
if (has4) {
_defaultOptions.setMaxMessageSize(ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET);
if (!has0)
_defaultOptions.setMaxInitialMessageSize(ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET);
}
}
}
_connectionManager = new ConnectionManager(_context, _session, _defaultOptions, connectionFilter);
_serverSocket = new I2PServerSocketFull(this);

View File

@ -62,16 +62,24 @@ class MessageOutputStream extends OutputStream {
/** */
public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer,
DataReceiver receiver, int bufSize) {
this(ctx, timer, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
DataReceiver receiver, int bufSize, int initBufSize) {
this(ctx, timer, receiver, bufSize, initBufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
}
public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer,
DataReceiver receiver, int bufSize, int passiveFlushDelay) {
DataReceiver receiver, int bufSize, int initBufSize, int passiveFlushDelay) {
super();
// we only use two buffer sizes to prevent an attack
// where we end up with a thousand caches
if (bufSize < ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE) {
bufSize = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE;
} else if (bufSize > ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE &&
bufSize < ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET) {
bufSize = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET;
}
_dataCache = ByteCache.getInstance(128, bufSize);
_originalBufferSize = bufSize;
_currentBufferSize = bufSize;
_currentBufferSize = initBufSize;
_context = ctx;
_log = ctx.logManager().getLog(MessageOutputStream.class);
_buf = _dataCache.acquire().getData(); // new byte[bufSize];
@ -79,7 +87,6 @@ class MessageOutputStream extends OutputStream {
_dataLock = new Object();
_writeTimeout = -1;
_passiveFlushDelay = passiveFlushDelay;
_nextBufferSize = 0;
//_sendPeriodBeginTime = ctx.clock().now();
//_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_flusher = new Flusher(timer);
@ -119,7 +126,7 @@ class MessageOutputStream extends OutputStream {
_log.debug("write(b[], " + off + ", " + len + ") ");
int cur = off;
int remaining = len;
long begin = _context.clock().now();
long begin = _log.shouldDebug() ? _context.clock().now() : 0;
while (remaining > 0) {
WriteStatus ws = null;
if (_closed.get()) throw new IOException("Output stream closed");
@ -189,9 +196,11 @@ class MessageOutputStream extends OutputStream {
_log.debug("Queued " + len + " without sending to the receiver");
}
}
long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) )
_log.info("took " + elapsed + "ms to write to the stream?", new Exception("foo"));
if (_log.shouldDebug()) {
long elapsed = _context.clock().now() - begin;
if (elapsed > 10*1000)
_log.info("took " + elapsed + "ms to write to the stream?", new Exception("foo"));
}
throwAnyError();
//updateBps(len);
}
@ -244,9 +253,11 @@ class MessageOutputStream extends OutputStream {
*/
private class Flusher extends SimpleTimer2.TimedEvent {
private boolean _enqueued;
public Flusher(SimpleTimer2 timer) {
super(timer);
}
public void enqueue() {
// no need to be overly worried about duplicates - it would just
// push it further out
@ -258,12 +269,13 @@ class MessageOutputStream extends OutputStream {
forceReschedule(_passiveFlushDelay);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
_enqueued = true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("NOT enqueing the flusher");
}
_enqueued = true;
}
public void timeReached() {
if (_closed.get())
return;
@ -334,7 +346,7 @@ class MessageOutputStream extends OutputStream {
* @@since 0.8.1
*/
private void flush(boolean wait_for_accept_only) throws IOException {
long begin = _context.clock().now();
long begin = _log.shouldDebug() ? _context.clock().now() : 0;
WriteStatus ws = null;
if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("flush() valid = " + _valid);
@ -388,9 +400,11 @@ class MessageOutputStream extends OutputStream {
else if (ws.writeFailed())
throw new IOException("Write failed");
long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
if (_log.shouldDebug()) {
long elapsed = _context.clock().now() - begin;
if (elapsed > 10*1000)
_log.debug("took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
}
throwAnyError();
}