2005-12-30 jrandom

* Small streaming lib bugfixes for the modified timeouts
    * Minor Syndie/Sucker RSS html fix
    * Small synchronization fix in I2PSnark (thanks fsm!)
This commit is contained in:
jrandom
2005-12-30 20:57:53 +00:00
committed by zzz
parent 5b1a6391f3
commit 8e87ae08fb
11 changed files with 47 additions and 25 deletions

View File

@ -123,8 +123,8 @@ public class Connection {
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
if (_log.shouldLog(Log.DEBUG))
_log.debug("New connection created with options: " + _options);
if (_log.shouldLog(Log.INFO))
_log.info("New connection created with options: " + _options);
}
public long getNextOutboundPacketNum() {
@ -164,11 +164,13 @@ public class Connection {
started = true;
if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ||
(_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) {
if (writeExpire > 0) {
if (timeoutMs > 0) {
if (timeLeft <= 0) {
_log.error("Outbound window is full of " + _outboundPackets.size()
+ " with " + _activeResends + " active resends"
+ " and we've waited too long (" + writeExpire + "ms)");
if (_log.shouldLog(Log.INFO))
_log.info("Outbound window is full of " + _outboundPackets.size()
+ " with " + _activeResends + " active resends"
+ " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): "
+ toString());
return false;
}
if (_log.shouldLog(Log.DEBUG))
@ -387,8 +389,8 @@ public class Connection {
_ackedPackets++;
if (p.getNumSends() > 1) {
_activeResends--;
if (_log.shouldLog(Log.INFO))
_log.info("Active resend of " + p + " successful, # active left: " + _activeResends);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Active resend of " + p + " successful, # active left: " + _activeResends);
}
}
}

View File

@ -355,6 +355,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
buf.append(" cwin=").append(_windowSize);
buf.append(" maxResends=").append(_maxResends);
buf.append(" writeTimeout=").append(getWriteTimeout());
buf.append(" readTimeout=").append(getReadTimeout());
buf.append(" inactivityTimeout=").append(_inactivityTimeout);
buf.append(" inboundBuffer=").append(_inboundBufferSize);
buf.append(" maxWindowSize=").append(_maxWindowSize);

View File

@ -158,7 +158,11 @@ public class MessageInputStream extends InputStream {
* but if it is 0, do not block at all)
*/
public int getReadTimeout() { return _readTimeout; }
public void setReadTimeout(int timeout) { _readTimeout = timeout; }
public void setReadTimeout(int timeout) {
if (_log.shouldLog(Log.INFO))
_log.info("Changing read timeout from " + _readTimeout + " to " + timeout);
_readTimeout = timeout;
}
public void closeReceived() {
synchronized (_dataLock) {
@ -302,8 +306,8 @@ public class MessageInputStream extends InputStream {
throwAnyError();
} else { // readTimeout == 0
// noop, don't block
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with nonblocking setup: " + toString());
return i;
}
@ -320,8 +324,8 @@ public class MessageInputStream extends InputStream {
// we looped a few times then got data, so this pass doesnt count
i--;
} else if (_readyDataBlocks.size() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] no more ready blocks, returning");
return i;
} else {
@ -351,7 +355,7 @@ public class MessageInputStream extends InputStream {
} // synchronized (_dataLock)
if (_log.shouldLog(Log.DEBUG))
_log.info("read(...," + offset+", " + length+ ") read fully total read: " +_readTotal);
_log.debug("read(...," + offset+", " + length+ ") read fully total read: " +_readTotal);
return length;
}
@ -370,7 +374,7 @@ public class MessageInputStream extends InputStream {
}
}
if (_log.shouldLog(Log.DEBUG))
_log.info("available(): " + numBytes + " " + toString());
_log.debug("available(): " + numBytes + " " + toString());
return numBytes;
}

View File

@ -68,7 +68,12 @@ public class MessageOutputStream extends OutputStream {
_log.debug("MessageOutputStream created");
}
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
public void setWriteTimeout(int ms) {
if (_log.shouldLog(Log.INFO))
_log.info("Changing write timeout from " + _writeTimeout + " to " + ms);
_writeTimeout = ms;
}
public int getWriteTimeout() { return _writeTimeout; }
public void setBufferSize(int size) { _nextBufferSize = size; }

View File

@ -516,7 +516,8 @@ public class Packet {
boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
if (!ok) {
Log l = ctx.logManager().getLog(Packet.class);
l.error("Signature failed on " + toString(), new Exception("moo"));
if (l.shouldLog(Log.WARN))
l.warn("Signature failed on " + toString(), new Exception("moo"));
if (false) {
l.error(Base64.encode(buffer, 0, size));
l.error("Signature: " + Base64.encode(_optionSignature.getData()));