propagate from branch 'i2p.i2p.zzz.test' (head 8424049f1510c378ac5c6d74a51fcc914f6082f5)

to branch 'i2p.i2p' (head d14d24978b11daeff7d37002b7ac3ec5b5535475)
This commit is contained in:
zzz
2009-02-07 21:18:06 +00:00
287 changed files with 1392 additions and 52957 deletions

View File

@ -124,6 +124,7 @@ public class Connection {
_isInbound = false;
_updatedShareOpts = false;
_connectionEvent = new ConEvent();
_hardDisconnected = false;
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -151,7 +152,7 @@ public class Connection {
* @return true if the packet should be sent
*/
boolean packetSendChoke(long timeoutMs) {
if (false) return true;
// if (false) return true; // <--- what the fuck??
long start = _context.clock().now();
long writeExpire = start + timeoutMs;
boolean started = false;
@ -165,9 +166,9 @@ public class Connection {
// no need to wait until the other side has ACKed us before sending the first few wsize
// packets through
// if (!_connected)
// return false;
// Incorrect assumption, the constructor defaults _connected to true --Sponge
if (!_connected)
return false;
started = true;
if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ||
(_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) {
@ -183,12 +184,12 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/"
+ _activeResends + "), waiting " + timeLeft);
try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {}
try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) {}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
+ "), waiting indefinitely");
try { _outboundPackets.wait(10*1000); } catch (InterruptedException ie) {}
try { _outboundPackets.wait(250); } catch (InterruptedException ie) {} //10*1000
}
} else {
_context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start);
@ -490,7 +491,6 @@ public class Connection {
synchronized (_connectLock) { _connectLock.notifyAll(); }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Disconnecting " + toString(), new Exception("discon"));
if (!cleanDisconnect) {
_hardDisconnected = true;
if (_log.shouldLog(Log.WARN))

View File

@ -97,6 +97,7 @@ public class MessageOutputStream extends OutputStream {
long begin = _context.clock().now();
while (remaining > 0) {
WriteStatus ws = null;
if (_closed) throw new IOException("closed underneath us");
// we do any waiting outside the synchronized() block because we
// want to allow other threads to flushAvailable() whenever they want.
// this is the only method that *adds* to the _buf, and all