2004-12-15 jrandom

* Handle hard disconnects more gracefully within the streaming lib, and
      log unmonitored events more aggressively.
    * If we drop a peer after connection due to clock skew, log it to the
      /logs.jsp#connectionlogs with relevent info.  In addition, toss it in
      the stat 'tcp.disconnectAfterSkew'.
    * Fixed the formatting in the skew display
    * Added an ERROR message that is fired once after we run out of
      routerInfo files (thanks susi!)
    * Set the connect timeout equal to the streaming lib's disconnect timeout
      if not already specified (the I2PTunnel httpclient already enforces a
      60s connect timeout)
    * Fix for another connection startup problem in the streaming lib.
    * Fix for a stupid error in the probabalistic drop (rand <= P, not > P)
    * Adjust the capacity calculations so that tunnel failures alone in the
      last 10m will not trigger a 0 capacity rank.
This commit is contained in:
jrandom
2004-12-16 02:45:55 +00:00
committed by zzz
parent 5c72aca5ee
commit 66aa29e3d4
17 changed files with 164 additions and 22 deletions

View File

@ -33,6 +33,7 @@ public class Connection {
private long _lastSendId;
private boolean _resetReceived;
private boolean _connected;
private boolean _hardDisconnected;
private MessageInputStream _inputStream;
private MessageOutputStream _outputStream;
private SchedulerChooser _chooser;
@ -171,6 +172,22 @@ public class Connection {
void ackImmediately() {
_receiver.send(null, 0, 0);
}
/**
* got a packet we shouldn't have, send 'em a reset
*
*/
void sendReset() {
if ( (_remotePeer == null) || (_sendStreamId == null) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(_sendStreamId);
reply.setReceiveStreamId(_receiveStreamId);
reply.setOptionalFrom(_connectionManager.getSession().getMyDestination());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
}
/**
* Flush any data that we can
@ -362,6 +379,7 @@ public class Connection {
public boolean getResetReceived() { return _resetReceived; }
public boolean getIsConnected() { return _connected; }
public boolean getHardDisconnected() { return _hardDisconnected; }
void disconnect(boolean cleanDisconnect) {
disconnect(cleanDisconnect, true);
@ -371,6 +389,13 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Disconnecting " + toString(), new Exception("discon"));
if (!cleanDisconnect) {
_hardDisconnected = true;
if (_log.shouldLog(Log.WARN))
_log.warn("Hard disconnecting and sending a reset on " + toString(), new Exception("cause"));
sendReset();
}
if (cleanDisconnect && _connected) {
// send close packets and schedule stuff...
_outputStream.closeInternal();

View File

@ -89,6 +89,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE);
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
}
/**

View File

@ -25,6 +25,7 @@ public class ConnectionPacketHandler {
_context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
}
/** distribute a packet to the connection specified */
@ -35,6 +36,21 @@ public class ConnectionPacketHandler {
_log.error("Packet does NOT verify: " + packet);
return;
}
if (con.getHardDisconnected()) {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ||
(packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) || (packet.isFlagSet(Packet.FLAG_CLOSE)) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a data packet after hard disconnect: " + packet + " on " + con);
con.sendReset();
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a packet after hard disconnect, ignoring: " + packet + " on " + con);
}
return;
}
con.packetReceived();
long ready = con.getInputStream().getHighestReadyBockId();
@ -84,7 +100,8 @@ public class ConnectionPacketHandler {
_log.debug("Scheduling ack in " + delay + "ms for received packet " + packet);
}
} else {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ||
(packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0);
con.incrementDupMessagesReceived(1);
@ -267,6 +284,8 @@ public class ConnectionPacketHandler {
con.resetReceived();
con.eventOccurred();
_context.statManager().addRateData("stream.resetReceived", con.getHighestAckedThrough(), con.getLifetime());
// no further processing
return;
}

View File

@ -32,6 +32,8 @@ public class I2PSocketFull implements I2PSocket {
destroy();
}
Connection getConnection() { return _connection; }
public InputStream getInputStream() {
return _connection.getInputStream();
}

View File

@ -152,7 +152,8 @@ public class PacketHandler {
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive a syn packet with the wrong IDs: " + packet);
_log.warn("Receive a syn packet with the wrong IDs, sending reset: " + packet);
sendReset(packet);
}
} else {
// someone is sending us a packet on the wrong stream
@ -162,6 +163,17 @@ public class PacketHandler {
}
}
private void sendReset(Packet packet) {
PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom());
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(packet.getSendStreamId());
reply.setOptionalFrom(_manager.getSession().getMyDestination());
// this just sends the packet - no retries or whatnot
_manager.getPacketQueue().enqueue(reply);
}
private void receiveUnknownCon(Packet packet, byte sendId[]) {
if (packet.isFlagSet(Packet.FLAG_ECHO)) {
if (packet.getSendStreamId() != null) {

View File

@ -38,6 +38,7 @@ class SchedulerChooser {
private List createSchedulers() {
List rv = new ArrayList(8);
rv.add(new SchedulerHardDisconnected(_context));
rv.add(new SchedulerPreconnect(_context));
rv.add(new SchedulerConnecting(_context));
rv.add(new SchedulerReceived(_context));
@ -54,8 +55,7 @@ class SchedulerChooser {
}
public void eventOccurred(Connection con) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Event occurred on " + con, new Exception("source"));
_log.log(Log.CRIT, "Yell at jrandom: Event occurred on " + con, new Exception("source"));
}
public boolean accept(Connection con) { return true; }
};

View File

@ -37,7 +37,7 @@ class SchedulerConnecting extends SchedulerImpl {
public boolean accept(Connection con) {
if (con == null) return false;
boolean notYetConnected = (con.getIsConnected()) &&
(con.getSendStreamId() == null) &&
//(con.getSendStreamId() == null) && // not null on recv
(con.getLastSendId() >= 0) &&
(con.getAckedPackets() <= 0) &&
(!con.getResetReceived());
@ -55,6 +55,7 @@ class SchedulerConnecting extends SchedulerImpl {
_log.debug("waited too long: " + waited);
return;
} else {
// should we be doing a con.sendAvailable here?
if (con.getOptions().getConnectTimeout() > 0)
reschedule(con.getOptions().getConnectTimeout(), con);
}

View File

@ -0,0 +1,45 @@
package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/**
* <p>Scheduler used after we've locally done a hard disconnect,
* but the final timeout hasn't passed.</p>
*
* <h2>Entry conditions:</h2><ul>
* <li>Locally disconnected hard.</li>
* <li>Less than the final timeout period has passed since the last ACK.</li>
* </ul>
*
* <h2>Events:</h2><ul>
* <li>Packets received</li>
* <li>RESET received</li>
* <li>Message sending fails (error talking to the session)</li>
* </ul>
*
* <h2>Next states:</h2>
* <li>{@link SchedulerDead dead} - after the final timeout passes</li>
* </ul>
*
*
*/
class SchedulerHardDisconnected extends SchedulerImpl {
private Log _log;
public SchedulerHardDisconnected(I2PAppContext ctx) {
super(ctx);
_log = ctx.logManager().getLog(SchedulerHardDisconnected.class);
}
public boolean accept(Connection con) {
if (con == null) return false;
long timeSinceClose = _context.clock().now() - con.getCloseSentOn();
boolean ok = (con.getHardDisconnected()) &&
(timeSinceClose < Connection.DISCONNECT_TIMEOUT);
return ok;
}
public void eventOccurred(Connection con) {
// noop. we do the timeout through the simpleTimer anyway
}
}