* Streaming - Fix several bugs and improve performance

when the initial data is larger than one MTU,
      e.g. HTTP GETs with large URLs, CGI params or cookies,
      or large HTTP POSTS:
      - Don't reject additional packets received without a
        send stream ID (i.e. sent before the SYN ACK was received)
      - Put unknown non-SYN packets on the SYN queue also
        so they won't be rejected
      - Reduce flusher delay to 250ms (was 500)
      - Flush unless window is full (was window is non-empty)
This commit is contained in:
zzz
2008-11-10 20:30:14 +00:00
parent 6ce2767514
commit b0ec6a0870
7 changed files with 105 additions and 26 deletions

View File

@ -25,12 +25,22 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
_connection = con;
}
/**
* This tells the flusher in MessageOutputStream whether to flush.
* It won't flush if this returns true.
* It was: return con.getUnackedPacketsSent() > 0;
* But then, for data that fills more than one packet, the last part of
* the data isn't sent until all the previous packets are acked. Which is very slow.
*
* So let's send data along unless the outbound window is full.
*
* @return !flush
*/
public boolean writeInProcess() {
Connection con = _connection;
if (con != null)
return con.getUnackedPacketsSent() > 0;
else
return false;
return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize();
return false;
}
/**

View File

@ -41,6 +41,10 @@ class ConnectionHandler {
}
public boolean getActive() { return _active; }
/**
* Non-SYN packets with a zero SendStreamID may also be queued here so
* that they don't get thrown away while the SYN packet before it is queued.
*/
public void receiveNewSyn(Packet packet) {
if (!_active) {
if (_log.shouldLog(Log.WARN))
@ -59,6 +63,8 @@ class ConnectionHandler {
/**
* Receive an incoming connection (built from a received SYN)
* Non-SYN packets with a zero SendStreamID may also be queued here so
* that they don't get thrown away while the SYN packet before it is queued.
*
* @param timeoutMs max amount of time to wait for a connection (if less
* than 1ms, wait indefinitely)
@ -111,14 +117,40 @@ class ConnectionHandler {
if (syn != null) {
// deal with forged / invalid syn packets
Connection con = _manager.receiveConnection(syn);
if (con != null)
return con;
// Handle both SYN and non-SYN packets in the queue
if (syn.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
Connection con = _manager.receiveConnection(syn);
if (con != null)
return con;
} else {
reReceivePacket(syn);
// ... and keep looping
}
}
// keep looping...
}
}
/**
* We found a non-SYN packet that was queued in the syn queue,
* check to see if it has a home now, else drop it ...
*/
private void reReceivePacket(Packet packet) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
// Send it through the packet handler again
if (_log.shouldLog(Log.WARN))
_log.warn("Found con for queued non-syn packet: " + packet);
_manager.getPacketHandler().receivePacket(packet);
} else {
// goodbye
if (_log.shouldLog(Log.WARN))
_log.warn("Did not find con for queued non-syn packet, dropping: " + packet);
packet.releasePayload();
}
}
private void sendReset(Packet packet) {
boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null);
if (!ok) {
@ -152,8 +184,12 @@ class ConnectionHandler {
}
if (removed) {
// timeout - send RST
sendReset(_synPacket);
if (_synPacket.isFlagSet(Packet.FLAG_SYNCHRONIZE))
// timeout - send RST
sendReset(_synPacket);
else
// non-syn packet got stranded on the syn queue, send it to the con
reReceivePacket(_synPacket);
} else {
// handled. noop
}

View File

@ -108,9 +108,12 @@ public class ConnectionPacketHandler {
boolean isNew = false;
boolean allowAck = true;
// We allow the SendStreamID to be 0 so that the originator can send
// multiple packets before he gets the first ACK back.
// If we want to limit the number of packets we receive without a
// SendStreamID, do it in PacketHandler.receiveUnknownCon().
if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) &&
( (packet.getSendStreamId() <= 0) ||
(packet.getReceiveStreamId() <= 0) ) )
(packet.getReceiveStreamId() <= 0) )
allowAck = false;
if (allowAck) {

View File

@ -43,10 +43,15 @@ public class MessageOutputStream extends OutputStream {
private long _sendPeriodBytes;
private int _sendBps;
private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
}
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) {
this(ctx, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
}
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize, int passiveFlushDelay) {
super();
_dataCache = ByteCache.getInstance(128, bufSize);
_context = ctx;
@ -57,7 +62,7 @@ public class MessageOutputStream extends OutputStream {
_written = 0;
_closed = false;
_writeTimeout = -1;
_passiveFlushDelay = 500;
_passiveFlushDelay = passiveFlushDelay;
_nextBufferSize = -1;
_sendPeriodBeginTime = ctx.clock().now();
_sendPeriodBytes = 0;

View File

@ -31,6 +31,8 @@ public class PacketHandler {
_lastDelay = _context.random().nextInt(30*1000);
}
/** what is the point of this ? */
/*****
private boolean choke(Packet packet) {
if (true) return true;
//if ( (_dropped == 0) && true ) { //&& (_manager.getSent() <= 0) ) {
@ -45,9 +47,7 @@ public class PacketHandler {
return false;
} else {
// if (true) return true; // no lag, just drop
/*
int delay = _context.random().nextInt(5*1000);
*/
// int delay = _context.random().nextInt(5*1000);
int delay = _context.random().nextInt(1*1000);
int delayFactor = _context.random().nextInt(100);
if (delayFactor > 80) {
@ -85,10 +85,11 @@ public class PacketHandler {
receivePacketDirect(_packet);
}
}
*****/
void receivePacket(Packet packet) {
boolean ok = choke(packet);
if (ok)
//boolean ok = choke(packet);
//if (ok)
receivePacketDirect(packet);
}
@ -239,19 +240,20 @@ public class PacketHandler {
}
packet.releasePayload();
} else {
//if (_log.shouldLog(Log.DEBUG) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
// _log.debug("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
if (_log.shouldLog(Log.WARN) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
_log.warn("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
if (sendId <= 0) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
if ( (con.getHighestAckedThrough() <= 5) && (packet.getSequenceNum() <= 5) ) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Received additional packets before the syn on " + con + ": " + packet);
if (_log.shouldLog(Log.WARN))
_log.warn("Received additional packet w/o SendStreamID after the syn on " + con + ": " + packet);
receiveKnownCon(con, packet);
return;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets());
if (_log.shouldLog(Log.WARN))
_log.warn("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets());
// allow unlimited packets without a SendStreamID for now
receiveKnownCon(con, packet);
return;
}
@ -261,8 +263,17 @@ public class PacketHandler {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
_manager.getConnectionHandler().receiveNewSyn(packet);
} else {
// We can get here on the 2nd+ packet if the 1st (SYN) packet
// is still on the _synQueue in the ConnectionHandler, and
// ConnectionManager.receiveConnection() hasn't run yet to put
// the StreamID on the getConnectionByOutboundId list.
// Then the 2nd packet gets discarded and has to be retransmitted.
//
// We fix this by putting this packet on the syn queue too!
// Then ConnectionHandler.accept() will check the connection list
// and call receivePacket() above instead of receiveConnection().
if (_log.shouldLog(Log.WARN)) {
_log.warn("Packet belongs to no other cons: " + packet);
_log.warn("Packet belongs to no other cons, putting on the syn queue: " + packet);
}
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(128);
@ -274,7 +285,8 @@ public class PacketHandler {
_log.debug("connections: " + buf.toString() + " sendId: "
+ (sendId > 0 ? Packet.toId(sendId) : " unknown"));
}
packet.releasePayload();
//packet.releasePayload();
_manager.getConnectionHandler().receiveNewSyn(packet);
}
}
}

View File

@ -1,3 +1,16 @@
2008-11-11 zzz
* Streaming - Fix several bugs and improve performance
when the initial data is larger than one MTU,
e.g. HTTP GETs with large URLs, CGI params or cookies,
or large HTTP POSTS:
- Don't reject additional packets received without a
send stream ID (i.e. sent before the SYN ACK was received)
- Put unknown non-SYN packets on the SYN queue also
so they won't be rejected
- Reduce flusher delay to 250ms (was 500)
- Flush unless window is full (was window is non-empty)
* NetDb: Fix a deadlock caused by last checkin
2008-11-09 zzz
* build.xml: Build speedups:
- Don't distclean in the updaterRouter target

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = "0.6.4";
public final static long BUILD = 9;
public final static long BUILD = 10;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);