2004-11-30 jrandom

* Render the burst rate fields on /config.jsp properly (thanks ugha!)
    * Build in a simple timeout to flush data queued into the I2PSocket but
      not yet flushed.
    * Don't explicitly flush after each SAM stream write, but leave it up to
      the [nonblocking] passive flush.
    * Don't whine about 10-99 connection events occurring in a second
    * Don't wait for completion of packets that will not be ACKed (duh)
    * Adjust the congestion window, even if the packet was resent (duh)
    * Make sure to wake up any blocking read()'s when the MessageInputStream
      is close()ed (duh)
    * Never wait more than the disconnect timeout for a write to complete
This commit is contained in:
jrandom
2004-11-30 23:41:51 +00:00
committed by zzz
parent df61ae5c6f
commit 516d0b4db8
12 changed files with 127 additions and 21 deletions

View File

@ -110,6 +110,9 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
//i2pout.flush();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Initial data " + (initialData != null ? initialData.length : 0)
+ " written, starting forwarders");
Thread t1 = new StreamForwarder(in, i2pout, "toI2P");
Thread t2 = new StreamForwarder(i2pin, out, "fromI2P");
synchronized (finishLock) {
@ -117,6 +120,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
finishLock.wait();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("At least one forwarder completed, closing and joining");
// now one connection is dead - kill the other as well.
s.close();
i2ps.close();

View File

@ -120,10 +120,15 @@ public class ConfigNetHelper {
private static String getBurstFactor(int numSeconds, String name) {
StringBuffer buf = new StringBuffer(256);
buf.append("<select name=\"").append(name).append("\">\n");
boolean found = false;
for (int i = 10; i <= 60; i += 10) {
buf.append("<option value=\"").append(i).append("\" ");
if ( (i == numSeconds) || (i == 60) )
if (i == numSeconds) {
buf.append("selected ");
found = true;
} else if ( (i == 60) && (!found) ) {
buf.append("selected ");
}
buf.append(">");
buf.append(i).append(" seconds</option>\n");
}

View File

@ -391,6 +391,8 @@ public class SAMStreamSession {
while (stillRunning) {
try {
i2ps = serverSocket.accept();
if (i2ps == null)
break;
_log.debug("New incoming connection");
@ -467,6 +469,7 @@ public class SAMStreamSession {
}
try {
i2pSocketOS.write(data);
//i2pSocketOS.flush();
} catch (IOException e) {
_log.error("Error sending data through I2P socket", e);
return false;

View File

@ -70,7 +70,7 @@ public class Connection {
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000;
public static final long MIN_RESEND_DELAY = 40*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000;
@ -325,8 +325,8 @@ public class Connection {
_occurredEventCount++;
} else {
_occurredTime = now;
if (_occurredEventCount > 10) {
_log.log(Log.CRIT, "More than 10 events (" + _occurredEventCount + ") in a second on "
if (_occurredEventCount > 100) {
_log.log(Log.CRIT, "More than 100 events (" + _occurredEventCount + ") in a second on "
+ toString() + ": scheduler = " + sched);
}
_occurredEventCount = 0;

View File

@ -63,7 +63,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (doSend) {
PacketLocal packet = send(buf, off, size);
return packet;
//dont wait for non-acks
if ( (packet.getPayloadSize() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
return packet;
else
return _dummyStatus;
} else {
return _dummyStatus;
}

View File

@ -32,6 +32,8 @@ class ConnectionHandler {
}
public void setActive(boolean active) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("setActive(" + active + ") called");
synchronized (_synQueue) {
_active = active;
_synQueue.notifyAll(); // so we break from the accept()

View File

@ -157,7 +157,7 @@ public class ConnectionPacketHandler {
+ ") for " + con);
return true;
} else if (numResends > 0) {
//} else if (numResends > 0) {
// window sizes are shrunk on resend, not on ack
} else {
if (acked > 0) {
@ -166,17 +166,19 @@ public class ConnectionPacketHandler {
// new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize();
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're
// integers, so lets use a random distribution instead
int shouldIncrement = _context.random().nextInt(newWindowSize);
if (shouldIncrement <= 0)
if (numResends <= 0) {
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're
// integers, so lets use a random distribution instead
int shouldIncrement = _context.random().nextInt(newWindowSize);
if (shouldIncrement <= 0)
newWindowSize += 1;
} else {
// slow start
newWindowSize += 1;
} else {
// slow start
newWindowSize += 1;
}
}
if (_log.shouldLog(Log.DEBUG))

View File

@ -272,6 +272,9 @@ public class MessageInputStream extends InputStream {
// at least one byte
while (_readyDataBlocks.size() <= 0) {
if (_locallyClosed)
throw new IOException("Already closed, you wanker");
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset + ", " + length + ")[" + i
@ -402,6 +405,7 @@ public class MessageInputStream extends InputStream {
ba.setData(null);
}
_locallyClosed = true;
_dataLock.notifyAll();
}
}

View File

@ -8,6 +8,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* A stream that we can shove data into that fires off those bytes
@ -26,6 +27,11 @@ public class MessageOutputStream extends OutputStream {
private long _written;
private int _writeTimeout;
private ByteCache _dataCache;
private Flusher _flusher;
private long _lastFlushed;
private long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */
private int _passiveFlushDelay;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@ -41,6 +47,10 @@ public class MessageOutputStream extends OutputStream {
_written = 0;
_closed = false;
_writeTimeout = -1;
_passiveFlushDelay = 5*1000;
_flusher = new Flusher();
if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created");
}
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
@ -51,10 +61,11 @@ public class MessageOutputStream extends OutputStream {
}
public void write(byte b[], int off, int len) throws IOException {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(b[], " + off + ", " + len + ")");
if (_log.shouldLog(Log.DEBUG))
_log.debug("write(b[], " + off + ", " + len + ") ");
int cur = off;
int remaining = len;
long begin = _context.clock().now();
while (remaining > 0) {
WriteStatus ws = null;
// we do any waiting outside the synchronized() block because we
@ -70,6 +81,11 @@ public class MessageOutputStream extends OutputStream {
cur += remaining;
_written += remaining;
remaining = 0;
_lastBuffered = _context.clock().now();
if (_passiveFlushDelay > 0) {
// if it is already enqueued, this just pushes it further out
SimpleTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
}
} else {
// buffer whatever we can fit then flush,
// repeating until we've pushed all of the
@ -87,9 +103,12 @@ public class MessageOutputStream extends OutputStream {
_written += _valid;
_valid = 0;
throwAnyError();
_lastFlushed = _context.clock().now();
}
}
if (ws != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Waiting " + _writeTimeout + "ms for accept of " + ws);
// ok, we've actually added a new packet - lets wait until
// its accepted into the queue before moving on (so that we
// dont fill our buffer instantly)
@ -100,8 +119,14 @@ public class MessageOutputStream extends OutputStream {
else
throw new IOException("Write not accepted into the queue");
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queued " + len + " without sending to the receiver");
}
}
long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo"));
throwAnyError();
}
@ -110,6 +135,33 @@ public class MessageOutputStream extends OutputStream {
throwAnyError();
}
/**
* Flush data that has been enqued but not flushed after a certain
* period of inactivity
*/
private class Flusher implements SimpleTimer.TimedEvent {
public void timeReached() {
boolean sent = false;
WriteStatus ws = null;
synchronized (_dataLock) {
if ( (_valid > 0) && (_lastBuffered + _passiveFlushDelay > _context.clock().now()) ) {
if ( (_buf != null) && (_dataReceiver != null) ) {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
sent = true;
}
}
}
// ignore the ws
if (sent && _log.shouldLog(Log.DEBUG))
_log.debug("Passive flush of " + ws);
}
}
/**
* Flush the data already queued up, blocking until it has been
* delivered.
@ -118,6 +170,7 @@ public class MessageOutputStream extends OutputStream {
* @throws InterruptedIOException if the write times out
*/
public void flush() throws IOException {
long begin = _context.clock().now();
WriteStatus ws = null;
synchronized (_dataLock) {
if (_buf == null) throw new IOException("closed (buffer went away)");
@ -128,6 +181,7 @@ public class MessageOutputStream extends OutputStream {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
}
@ -137,6 +191,8 @@ public class MessageOutputStream extends OutputStream {
( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
(_writeTimeout <= 0) ) )
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) )
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
else
ws.waitForCompletion(_writeTimeout);
if (_log.shouldLog(Log.DEBUG))
@ -145,6 +201,10 @@ public class MessageOutputStream extends OutputStream {
throw new InterruptedIOException("Timed out during write");
else if (ws.writeFailed())
throw new IOException("Write failed");
long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("wtf, took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
throwAnyError();
}
@ -182,6 +242,7 @@ public class MessageOutputStream extends OutputStream {
_buf = null;
_valid = 0;
}
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
}
if (ba != null) {
@ -221,6 +282,7 @@ public class MessageOutputStream extends OutputStream {
_written += _valid;
_valid = 0;
_dataLock.notifyAll();
_lastFlushed = _context.clock().now();
}
if (blocking && ws != null) {
ws.waitForAccept(_writeTimeout);

View File

@ -194,6 +194,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
+ " / " + state.getNonce() + " found = " + found);
long timeToSend = afterRemovingSync - beforeSendingSync;
if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
_log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
}
if (found) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "

View File

@ -1,4 +1,17 @@
$Id: history.txt,v 1.88 2004/11/29 16:57:14 jrandom Exp $
$Id: history.txt,v 1.89 2004/11/29 18:24:50 jrandom Exp $
2004-11-30 jrandom
* Render the burst rate fields on /config.jsp properly (thanks ugha!)
* Build in a simple timeout to flush data queued into the I2PSocket but
not yet flushed.
* Don't explicitly flush after each SAM stream write, but leave it up to
the [nonblocking] passive flush.
* Don't whine about 10-99 connection events occurring in a second
* Don't wait for completion of packets that will not be ACKed (duh)
* Adjust the congestion window, even if the packet was resent (duh)
* Make sure to wake up any blocking read()'s when the MessageInputStream
is close()ed (duh)
* Never wait more than the disconnect timeout for a write to complete
2004-11-29 jrandom
* Minor fixes to avoid unnecessary errors on shutdown (thanks susi!)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.93 $ $Date: 2004/11/29 16:57:14 $";
public final static String ID = "$Revision: 1.94 $ $Date: 2004/11/29 18:24:49 $";
public final static String VERSION = "0.4.2";
public final static long BUILD = 6;
public final static long BUILD = 7;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);