* min resend delay = 20s

* rework the messageInputStream to implement read(byte[], off, len), and fix some fencepost
  bugs in the byte retrieval
This commit is contained in:
jrandom
2004-11-08 05:42:57 +00:00
committed by zzz
parent 0c049f39d9
commit 71c1cb4e12
3 changed files with 104 additions and 53 deletions

View File

@ -55,6 +55,7 @@ public class Connection {
private String _connectionError; private String _connectionError;
public static final long MAX_RESEND_DELAY = 60*1000; public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000;
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
this(ctx, manager, chooser, queue, handler, null); this(ctx, manager, chooser, queue, handler, null);
@ -178,7 +179,7 @@ public class Connection {
} }
packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
long timeout = (_options.getRTT() < 10000 ? 10000 : _options.getRTT()); long timeout = (_options.getRTT() < MIN_RESEND_DELAY ? MIN_RESEND_DELAY : _options.getRTT());
if (timeout > MAX_RESEND_DELAY) if (timeout > MAX_RESEND_DELAY)
timeout = MAX_RESEND_DELAY; timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -491,6 +492,8 @@ public class Connection {
} else { } else {
//long timeout = _options.getResendDelay() << numSends; //long timeout = _options.getResendDelay() << numSends;
long timeout = _options.getRTT() << (numSends-1); long timeout = _options.getRTT() << (numSends-1);
if (timeout < MIN_RESEND_DELAY)
timeout = MIN_RESEND_DELAY;
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY; timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@ -40,8 +40,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (_connection.getUnackedPacketsReceived() > 0) if (_connection.getUnackedPacketsReceived() > 0)
doSend = true; doSend = true;
if (_log.shouldLog(Log.ERROR) && !doSend) if (_log.shouldLog(Log.INFO) && !doSend)
_log.error("writeData called: size="+size + " doSend=" + doSend _log.info("writeData called: size="+size + " doSend=" + doSend
+ " unackedReceived: " + _connection.getUnackedPacketsReceived() + " unackedReceived: " + _connection.getUnackedPacketsReceived()
+ " con: " + _connection, new Exception("write called by")); + " con: " + _connection, new Exception("write called by"));

View File

@ -53,6 +53,8 @@ public class MessageInputStream extends InputStream {
private IOException _streamError; private IOException _streamError;
private long _readTotal; private long _readTotal;
private byte[] _oneByte = new byte[1];
private Object _dataLock; private Object _dataLock;
public MessageInputStream(I2PAppContext ctx) { public MessageInputStream(I2PAppContext ctx) {
@ -205,6 +207,7 @@ public class MessageInputStream extends InputStream {
if (messageId <= _highestReadyBlockId) { if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("ignoring dup message " + messageId); _log.debug("ignoring dup message " + messageId);
_dataLock.notifyAll();
return false; // already received return false; // already received
} }
if (messageId > _highestBlockId) if (messageId > _highestBlockId)
@ -238,56 +241,86 @@ public class MessageInputStream extends InputStream {
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(null)); _notYetReadyBlocks.put(new Long(messageId), new ByteArray(null));
else else
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload)); _notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload));
_dataLock.notifyAll();
} }
} }
return true; return true;
} }
public int read() throws IOException { public int read() throws IOException {
int read = read(_oneByte, 0, 1);
if (read < 0)
return -1;
else
return _oneByte[0];
}
public int read(byte target[]) throws IOException {
return read(target, 0, target.length);
}
public int read(byte target[], int offset, int length) throws IOException {
if (_locallyClosed) throw new IOException("Already locally closed"); if (_locallyClosed) throw new IOException("Already locally closed");
throwAnyError(); throwAnyError();
long expiration = -1; long expiration = -1;
if (_readTimeout > 0) if (_readTimeout > 0)
expiration = _readTimeout + System.currentTimeMillis(); expiration = _readTimeout + System.currentTimeMillis();
synchronized (_dataLock) { synchronized (_dataLock) {
while (_readyDataBlocks.size() <= 0) { for (int i = 0; i < length; i++) {
//if (_log.shouldLog(Log.DEBUG)) if ( (_readyDataBlocks.size() <= 0) && (i == 0) ) {
// _log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString()); // ok, we havent found anything, so lets block until we get
// at least one byte
while (_readyDataBlocks.size() <= 0) {
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("read() got EOF after " + _readTotal + " " + toString()); _log.info("read(...," + offset + ", " + length + ")[" + i
+ "] got EOF after " + _readTotal + " " + toString());
return -1; return -1;
} else { } else {
if (_readTimeout < 0) { if (_readTimeout < 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read() with no timeout: " + toString()); _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout: " + toString());
try { _dataLock.wait(); } catch (InterruptedException ie) { } try { _dataLock.wait(); } catch (InterruptedException ie) { }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read() with no timeout complete: " + toString()); _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout complete: " + toString());
throwAnyError(); throwAnyError();
} else if (_readTimeout > 0) { } else if (_readTimeout > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read() with timeout: " + _readTimeout + ": " + toString()); _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout: " + _readTimeout + ": " + toString());
try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { } try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read() with timeout complete: " + _readTimeout + ": " + toString()); _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout complete: " + _readTimeout + ": " + toString());
throwAnyError(); throwAnyError();
} else { // readTimeout == 0 } else { // readTimeout == 0
// noop, don't block // noop, don't block
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("read() with nonblocking setup: " + toString()); _log.info("read(...," + offset+", " + length+ ")[" + i
+ ") with nonblocking setup: " + toString());
return i;
} }
if (_readyDataBlocks.size() <= 0) { if (_readyDataBlocks.size() <= 0) {
if ( (_readTimeout > 0) && (expiration > System.currentTimeMillis()) ) if ( (_readTimeout > 0) && (expiration < System.currentTimeMillis()) ) {
throw new InterruptedIOException("Timeout reading (timeout=" + _readTimeout + ")"); if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
+ ") expired: " + toString());
return i;
} }
} }
} }
}
//if (_log.shouldLog(Log.DEBUG)) // we looped a few times then got data, so this pass doesnt count
// _log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString()); i--;
} else if (_readyDataBlocks.size() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
+ "] no more ready blocks, returning");
return i;
} else {
// either was already ready, or we wait()ed and it arrived // either was already ready, or we wait()ed and it arrived
ByteArray cur = (ByteArray)_readyDataBlocks.get(0); ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
byte rv = cur.getData()[_readyDataBlockIndex]; byte rv = cur.getData()[_readyDataBlockIndex];
@ -297,17 +330,29 @@ public class MessageInputStream extends InputStream {
_readyDataBlocks.remove(0); _readyDataBlocks.remove(0);
} }
_readTotal++; _readTotal++;
return (rv < 0 ? rv + 256 : rv); target[offset + i] = rv; // rv < 0 ? rv + 256 : rv
if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getData().length - 5) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex
+ " readyBlocks=" + _readyDataBlocks.size()
+ " readTotal=" + _readTotal);
} }
} }
} // for (int i = 0; i < length; i++) {
} // synchronized (_dataLock)
if (_log.shouldLog(Log.DEBUG))
_log.info("read(...," + offset+", " + length+ ") read fully total read: " +_readTotal);
return length;
}
public int available() throws IOException { public int available() throws IOException {
if (_locallyClosed) throw new IOException("Already closed, you wanker"); if (_locallyClosed) throw new IOException("Already closed, you wanker");
throwAnyError(); throwAnyError();
synchronized (_dataLock) {
if (_readyDataBlocks.size() <= 0)
return 0;
int numBytes = 0; int numBytes = 0;
synchronized (_dataLock) {
for (int i = 0; i < _readyDataBlocks.size(); i++) { for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i); ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
if (i == 0) if (i == 0)
@ -315,8 +360,11 @@ public class MessageInputStream extends InputStream {
else else
numBytes += cur.getData().length; numBytes += cur.getData().length;
} }
return numBytes;
} }
if (_log.shouldLog(Log.DEBUG))
_log.info("available(): " + numBytes + " " + toString());
return numBytes;
} }
/** /**