2004-12-14 jrandom
* Reenable the probabalistic drop on the TCP queues to deal with good old fashioned bandwidth limiting. However, by default the probability is rigged to reserve 0% of the queue free - meaning we just aggressively fail messages in the queue if we're transferring too slowly. That reservation factor can be increased with 'tcp.queueFreeFactor=0.25' (or whatever) and the drop code can be disabled with the parameter 'tcp.dropProbabalistically=false'. * Still penalize a peer on tunnel failure, but don't immediately drop their capacity to 0. * More aggressively ACK duplicates * Randomize the timestamper period * Display the clock skew on the connection logs when a peer sends it. * Allow the timestamper to fix skews of up to 10 minutes * Logging
This commit is contained in:
@ -547,7 +547,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
|
||||
|
||||
try {
|
||||
if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) {
|
||||
_log.error("STREAM SEND [" + size + "] failed");
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("STREAM SEND [" + size + "] failed");
|
||||
boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
|
||||
streamSession.closeConnection(id);
|
||||
return rv;
|
||||
@ -698,7 +699,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
|
||||
try {
|
||||
closeClientSocket();
|
||||
} catch (IOException e) {
|
||||
_log.error("Error closing socket: " + e.getMessage());
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error closing socket", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -733,7 +735,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
|
||||
try {
|
||||
closeClientSocket();
|
||||
} catch (IOException e) {
|
||||
_log.error("Error closing socket: " + e.getMessage());
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error closing socket", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -802,7 +805,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
|
||||
try {
|
||||
closeClientSocket();
|
||||
} catch (IOException e) {
|
||||
_log.error("Error closing socket: " + e.getMessage());
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error closing socket", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -366,7 +366,8 @@ public class ConnectionManager {
|
||||
if (removed) {
|
||||
if (_notifier != null)
|
||||
_notifier.pingComplete(false);
|
||||
_log.error("Ping failed");
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Ping failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -65,6 +65,8 @@ public class ConnectionPacketHandler {
|
||||
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
|
||||
con.closeReceived();
|
||||
|
||||
boolean fastAck = false;
|
||||
|
||||
if (isNew) {
|
||||
con.incrementUnackedPacketsReceived();
|
||||
con.incrementBytesReceived(packet.getPayloadSize());
|
||||
@ -93,7 +95,8 @@ public class ConnectionPacketHandler {
|
||||
_log.warn("congestion.. dup " + packet);
|
||||
SimpleTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay());
|
||||
//con.incrementUnackedPacketsReceived();
|
||||
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
|
||||
//con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
|
||||
fastAck = true;
|
||||
} else {
|
||||
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
|
||||
//con.incrementUnackedPacketsReceived();
|
||||
@ -105,10 +108,10 @@ public class ConnectionPacketHandler {
|
||||
}
|
||||
}
|
||||
|
||||
boolean fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew);
|
||||
fastAck = fastAck || ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew);
|
||||
con.eventOccurred();
|
||||
if (fastAck) {
|
||||
if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) {
|
||||
if (con.getLastSendTime() + 1000 < _context.clock().now()) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Fast ack for dup " + packet);
|
||||
con.ackImmediately();
|
||||
|
@ -124,7 +124,8 @@ public class Timestamper implements Runnable {
|
||||
alreadyBitched = true;
|
||||
}
|
||||
}
|
||||
try { Thread.sleep(_queryFrequency); } catch (InterruptedException ie) {}
|
||||
long sleepTime = _context.random().nextInt(_queryFrequency) + _queryFrequency;
|
||||
try { Thread.sleep(sleepTime); } catch (InterruptedException ie) {}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
_log.log(Log.CRIT, "Timestamper died!", t);
|
||||
|
@ -42,8 +42,8 @@ public class Clock implements Timestamper.UpdateListener {
|
||||
|
||||
/** if the clock is skewed by 3+ days, fuck 'em */
|
||||
public final static long MAX_OFFSET = 3 * 24 * 60 * 60 * 1000;
|
||||
/** after we've started up and shifted the clock, don't allow shifts of more than a minute */
|
||||
public final static long MAX_LIVE_OFFSET = 60 * 1000;
|
||||
/** after we've started up and shifted the clock, don't allow shifts of more than 10 minutes */
|
||||
public final static long MAX_LIVE_OFFSET = 10 * 60 * 1000;
|
||||
/** if the clock skewed changes by less than 1s, ignore the update (so we don't slide all over the place) */
|
||||
public final static long MIN_OFFSET_CHANGE = 10 * 1000;
|
||||
|
||||
|
18
history.txt
18
history.txt
@ -1,4 +1,20 @@
|
||||
$Id: history.txt,v 1.104 2004/12/11 04:26:24 jrandom Exp $
|
||||
$Id: history.txt,v 1.105 2004/12/13 08:45:52 jrandom Exp $
|
||||
|
||||
2004-12-14 jrandom
|
||||
* Reenable the probabalistic drop on the TCP queues to deal with good old
|
||||
fashioned bandwidth limiting. However, by default the probability is
|
||||
rigged to reserve 0% of the queue free - meaning we just aggressively
|
||||
fail messages in the queue if we're transferring too slowly. That
|
||||
reservation factor can be increased with 'tcp.queueFreeFactor=0.25'
|
||||
(or whatever) and the drop code can be disabled with the parameter
|
||||
'tcp.dropProbabalistically=false'.
|
||||
* Still penalize a peer on tunnel failure, but don't immediately drop
|
||||
their capacity to 0.
|
||||
* More aggressively ACK duplicates
|
||||
* Randomize the timestamper period
|
||||
* Display the clock skew on the connection logs when a peer sends it.
|
||||
* Allow the timestamper to fix skews of up to 10 minutes
|
||||
* Logging
|
||||
|
||||
2004-12-13 jrandom
|
||||
* Added some error checking on the new client send job (thanks duck!)
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.109 $ $Date: 2004/12/11 04:26:24 $";
|
||||
public final static String ID = "$Revision: 1.110 $ $Date: 2004/12/13 08:45:52 $";
|
||||
public final static String VERSION = "0.4.2.3";
|
||||
public final static long BUILD = 3;
|
||||
public final static long BUILD = 4;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -95,9 +95,9 @@ public class CapacityCalculator extends Calculator {
|
||||
if (curFailed != null)
|
||||
failed = curFailed.getCurrentEventCount() + curFailed.getLastEventCount();
|
||||
if (failed > 0) {
|
||||
if ( (period <= 10*60*1000) && (curFailed.getCurrentEventCount() > 0) )
|
||||
return 0.0d; // their tunnels have failed in the last 0-10 minutes
|
||||
else
|
||||
//if ( (period <= 10*60*1000) && (curFailed.getCurrentEventCount() > 0) )
|
||||
// return 0.0d; // their tunnels have failed in the last 0-10 minutes
|
||||
//else
|
||||
val -= failed * stretch;
|
||||
}
|
||||
|
||||
|
@ -177,6 +177,8 @@ public class TransportManager implements TransportEventListener {
|
||||
}
|
||||
|
||||
private List orderBids(HashSet bids, OutNetMessage msg) {
|
||||
if (bids.size() <= 1)
|
||||
return new ArrayList(bids);
|
||||
// db messages should go as fast as possible, while the others
|
||||
// should use as little bandwidth as possible.
|
||||
I2NPMessage message = msg.getMessage();
|
||||
|
@ -410,12 +410,13 @@ public class ConnectionBuilder {
|
||||
RouterInfo peer = new RouterInfo();
|
||||
peer.readBytes(_rawIn);
|
||||
int status = (int)_rawIn.read() & 0xFF;
|
||||
boolean ok = validateStatus(status);
|
||||
if (!ok) return false;
|
||||
|
||||
Properties props = DataHelper.readProperties(_rawIn);
|
||||
// ignore these now
|
||||
|
||||
boolean ok = validateStatus(status, props);
|
||||
if (!ok) return false;
|
||||
|
||||
Hash readHash = new Hash();
|
||||
readHash.readBytes(_rawIn);
|
||||
|
||||
@ -564,12 +565,13 @@ public class ConnectionBuilder {
|
||||
RouterInfo peer = new RouterInfo();
|
||||
peer.readBytes(_rawIn);
|
||||
int status = (int)_rawIn.read() & 0xFF;
|
||||
boolean ok = validateStatus(status);
|
||||
if (!ok) return false;
|
||||
|
||||
Properties props = DataHelper.readProperties(_rawIn);
|
||||
// ignore these now
|
||||
|
||||
boolean ok = validateStatus(status, props);
|
||||
if (!ok) return false;
|
||||
|
||||
Signature sig = new Signature();
|
||||
sig.readBytes(_rawIn);
|
||||
|
||||
@ -620,7 +622,7 @@ public class ConnectionBuilder {
|
||||
*
|
||||
* @return true if ok, false if fail()ed
|
||||
*/
|
||||
private boolean validateStatus(int status) {
|
||||
private boolean validateStatus(int status, Properties props) {
|
||||
switch (status) {
|
||||
case -1: // EOF
|
||||
fail("Error reading the status from "
|
||||
@ -636,7 +638,7 @@ public class ConnectionBuilder {
|
||||
case ConnectionHandler.STATUS_SKEWED:
|
||||
fail("According to "
|
||||
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
|
||||
+ ", our clock is off");
|
||||
+ ", our clock is off (they think it is " + props.getProperty("SKEW") + ")");
|
||||
return false;
|
||||
case ConnectionHandler.STATUS_SIGNATURE_FAILED: // (only for new sessions)
|
||||
fail("Signature failure talking to "
|
||||
|
@ -185,7 +185,7 @@ public class TCPConnection {
|
||||
}
|
||||
|
||||
private boolean shouldDropProbabalistically() {
|
||||
return Boolean.valueOf(_context.getProperty("tcp.dropProbabalistically", "false")).booleanValue();
|
||||
return Boolean.valueOf(_context.getProperty("tcp.dropProbabalistically", "true")).booleanValue();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -210,12 +210,34 @@ public class TCPConnection {
|
||||
long sendRate = getSendRate();
|
||||
long bytesSendableUntilFirstExpire = sendRate * (earliestExpiration - _context.clock().now()) / 1000;
|
||||
|
||||
// try to keep the queue less than half full
|
||||
long excessQueued = bytesQueued - (bytesSendableUntilFirstExpire/2);
|
||||
// pretend that instead of being able to push bytesSendableUntilFirstExpire,
|
||||
// that we can only push a fraction of that amount, causing us to probabalistically
|
||||
// drop more than is necessary (leaving a fraction of the queue 'free' for bursts)
|
||||
long excessQueued = (long)(bytesQueued - ((double)bytesSendableUntilFirstExpire * (1.0-getQueueFreeFactor())));
|
||||
if ( (excessQueued > 0) && (_pendingMessages.size() > 1) && (_transport != null) )
|
||||
locked_probabalisticDrop(excessQueued);
|
||||
}
|
||||
|
||||
/**
|
||||
* by default, try to keep the queue completely full, but this can be overridden
|
||||
* with the property 'tcp.queueFreeFactor'
|
||||
*
|
||||
*/
|
||||
public static final double DEFAULT_QUEUE_FREE_FACTOR = 0.0;
|
||||
|
||||
private double getQueueFreeFactor() {
|
||||
String factor = _context.getProperty("tcp.queueFreeFactor");
|
||||
if (factor != null) {
|
||||
try {
|
||||
return Double.parseDouble(factor);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Invalid tcp.queueFreeFactor [" + factor + "]", nfe);
|
||||
}
|
||||
}
|
||||
return DEFAULT_QUEUE_FREE_FACTOR;
|
||||
}
|
||||
|
||||
/** how many Bps we are sending data to the peer (or 2KBps if we don't know) */
|
||||
public long getSendRate() {
|
||||
if (_sendRate == null) return 2*1024;
|
||||
|
Reference in New Issue
Block a user