forked from I2P_Developers/i2p.i2p
propagate from branch 'i2p.i2p' (head d32b82100cf6076e8f3de30b6a0edfbb034caac7)
to branch 'i2p.i2p.zzz.pcap' (head 551957edb05526df88ff3a2b3c717faed4aac906)
This commit is contained in:
@ -191,9 +191,9 @@ public class Connection {
|
||||
+ _activeResends + "), waiting " + timeLeft);
|
||||
try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
|
||||
+ "), waiting indefinitely");
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
|
||||
// + "), waiting indefinitely");
|
||||
try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000
|
||||
}
|
||||
} else {
|
||||
@ -299,37 +299,48 @@ public class Connection {
|
||||
|
||||
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
|
||||
ackOnly = true;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("No resend for " + packet);
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("No resend for " + packet);
|
||||
} else {
|
||||
int remaining = 0;
|
||||
int windowSize;
|
||||
int remaining;
|
||||
synchronized (_outboundPackets) {
|
||||
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
|
||||
remaining = _options.getWindowSize() - _outboundPackets.size() ;
|
||||
windowSize = _options.getWindowSize();
|
||||
remaining = windowSize - _outboundPackets.size() ;
|
||||
_outboundPackets.notifyAll();
|
||||
}
|
||||
if (remaining < 0)
|
||||
remaining = 0;
|
||||
if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
|
||||
// the other end has no idea what our window size is, so
|
||||
// help him out by requesting acks below the 1/3 point,
|
||||
// if remaining < 3, and every 8 minimum.
|
||||
if (packet.isFlagSet(Packet.FLAG_CLOSE) ||
|
||||
(remaining < (windowSize + 2) / 3) ||
|
||||
(remaining < 3) ||
|
||||
(packet.getSequenceNum() % 8 == 0)) {
|
||||
packet.setOptionalDelay(0);
|
||||
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Requesting no ack delay for packet " + packet);
|
||||
} else {
|
||||
int delay = _options.getRTO() / 2;
|
||||
// This is somewhat of a waste of time, unless the RTT < 4000,
|
||||
// since the other end limits it to getSendAckDelay()
|
||||
// which is always 2000, but it's good for diagnostics to see what the other end thinks
|
||||
// the RTT is.
|
||||
int delay = _options.getRTT() / 2;
|
||||
packet.setOptionalDelay(delay);
|
||||
if (delay > 0)
|
||||
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
|
||||
}
|
||||
// WHY always set?
|
||||
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
|
||||
|
||||
long timeout = _options.getRTO();
|
||||
if (timeout > MAX_RESEND_DELAY)
|
||||
timeout = MAX_RESEND_DELAY;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
|
||||
_log.debug("Resend in " + timeout + " for " + packet);
|
||||
|
||||
// schedules itself
|
||||
ResendPacketEvent rpe = new ResendPacketEvent(packet, timeout);
|
||||
@ -372,6 +383,10 @@ public class Connection {
|
||||
}
|
||||
*********/
|
||||
|
||||
/**
|
||||
* Process the acks and nacks received in a packet
|
||||
* @return List of packets acked or null
|
||||
*/
|
||||
List ackPackets(long ackThrough, long nacks[]) {
|
||||
if (ackThrough < _highestAckedThrough) {
|
||||
// dupack which won't tell us anything
|
||||
@ -687,6 +702,14 @@ public class Connection {
|
||||
* @return the next time the scheduler will want to send a packet, or -1 if never.
|
||||
*/
|
||||
public long getNextSendTime() { return _nextSendTime; }
|
||||
|
||||
/**
|
||||
* If the next send time is currently >= 0 (i.e. not "never"),
|
||||
* this may make the next time sooner but will not make it later.
|
||||
* If the next send time is currently < 0 (i.e. "never"),
|
||||
* this will set it to the time specified, but not later than
|
||||
* options.getSendAckDelay() from now (2000 ms)
|
||||
*/
|
||||
public void setNextSendTime(long when) {
|
||||
if (_nextSendTime >= 0) {
|
||||
if (when < _nextSendTime)
|
||||
@ -701,12 +724,12 @@ public class Connection {
|
||||
_nextSendTime = max;
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG) && false) {
|
||||
if (_nextSendTime <= 0)
|
||||
_log.debug("set next send time to an unknown time", new Exception(toString()));
|
||||
else
|
||||
_log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
|
||||
}
|
||||
//if (_log.shouldLog(Log.DEBUG) && false) {
|
||||
// if (_nextSendTime <= 0)
|
||||
// _log.debug("set next send time to an unknown time", new Exception(toString()));
|
||||
// else
|
||||
// _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
|
||||
//}
|
||||
}
|
||||
|
||||
/** how many packets have we sent and the other side has ACKed?
|
||||
@ -839,8 +862,8 @@ public class Connection {
|
||||
}
|
||||
long howLong = _options.getInactivityTimeout();
|
||||
howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString()));
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Resetting the inactivity timer to " + howLong);
|
||||
// this will get rescheduled, and rescheduled, and rescheduled...
|
||||
_activityTimer.reschedule(howLong, false); // use the later of current and previous timeout
|
||||
}
|
||||
@ -1091,6 +1114,8 @@ public class Connection {
|
||||
// we want to resend this packet, but there are already active
|
||||
// resends in the air and we dont want to make a bad situation
|
||||
// worse. wait another second
|
||||
// BUG? seq# = 0, activeResends = 0, loop forever - why?
|
||||
// also seen with seq# > 0. Is the _activeResends count reliable?
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Delaying resend of " + _packet + " as there are "
|
||||
+ _activeResends + " active resends already in play");
|
||||
@ -1108,6 +1133,7 @@ public class Connection {
|
||||
_packet.setOptionalDelay(choke);
|
||||
if (choke > 0)
|
||||
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
|
||||
// this seems unnecessary to send the MSS again:
|
||||
_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
|
||||
// bugfix release 0.7.8, we weren't dividing by 1000
|
||||
_packet.setResendDelay(getOptions().getResendDelay() / 1000);
|
||||
|
@ -166,6 +166,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
||||
packet.setReceiveStreamId(con.getReceiveStreamId());
|
||||
|
||||
con.getInputStream().updateAcks(packet);
|
||||
// note that the optional delay is usually rewritten in Connection.sendPacket()
|
||||
int choke = con.getOptions().getChoke();
|
||||
packet.setOptionalDelay(choke);
|
||||
if (choke > 0)
|
||||
@ -197,12 +198,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
||||
( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
|
||||
packet.setFlag(Packet.FLAG_CLOSE);
|
||||
con.setCloseSentOn(_context.clock().now());
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Closed is set for a new packet on " + con + ": " + packet);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("New outbound packet on " + _connection + ": " + packet);
|
||||
return packet;
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay";
|
||||
public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay";
|
||||
public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize";
|
||||
/** unused */
|
||||
public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow";
|
||||
public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout";
|
||||
public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction";
|
||||
@ -58,6 +59,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
static final int INITIAL_WINDOW_SIZE = 6;
|
||||
static final int DEFAULT_MAX_SENDS = 8;
|
||||
public static final int DEFAULT_INITIAL_RTT = 8*1000;
|
||||
public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000;
|
||||
static final int MIN_WINDOW_SIZE = 1;
|
||||
private static final boolean DEFAULT_ANSWER_PINGS = true;
|
||||
|
||||
@ -217,7 +219,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT));
|
||||
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
|
||||
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
|
||||
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
|
||||
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
|
||||
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
|
||||
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
|
||||
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
|
||||
@ -249,7 +251,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
|
||||
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
|
||||
if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
|
||||
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
|
||||
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
|
||||
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
|
||||
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
|
||||
if (opts.containsKey(PROP_MAX_RESENDS))
|
||||
@ -295,6 +297,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
* @return if we want signatures on all packets.
|
||||
*/
|
||||
public boolean getRequireFullySigned() { return _fullySigned; }
|
||||
/** unused, see above */
|
||||
public void setRequireFullySigned(boolean sign) { _fullySigned = sign; }
|
||||
|
||||
/**
|
||||
@ -325,7 +328,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
}
|
||||
|
||||
/** after how many consecutive messages should we ack?
|
||||
* This doesn't appear to be used.
|
||||
* @deprecated This doesn't appear to be used.
|
||||
* @return receive window size.
|
||||
*/
|
||||
public int getReceiveWindow() { return _receiveWindow; }
|
||||
@ -405,6 +408,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
* @return ACK delay in ms
|
||||
*/
|
||||
public int getSendAckDelay() { return _sendAckDelay; }
|
||||
/**
|
||||
* Unused except here, so expect the default initial delay of 2000 ms unless set by the user
|
||||
* to remain constant.
|
||||
*/
|
||||
public void setSendAckDelay(int delayMs) { _sendAckDelay = delayMs; }
|
||||
|
||||
/** What is the largest message we want to send or receive?
|
||||
|
@ -131,11 +131,14 @@ public class ConnectionPacketHandler {
|
||||
isNew = false;
|
||||
}
|
||||
|
||||
if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
|
||||
+ " packet: " + packet + " con: " + con);
|
||||
}
|
||||
//if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
|
||||
// if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
|
||||
// + " packet: " + packet + " con: " + con);
|
||||
//}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug((isNew ? "New" : "Dup or ack-only") + " inbound packet on " + con + ": " + packet);
|
||||
|
||||
// close *after* receiving the data, as well as after verifying the signatures / etc
|
||||
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
|
||||
@ -151,7 +154,9 @@ public class ConnectionPacketHandler {
|
||||
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Scheduling immediate ack for " + packet);
|
||||
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
|
||||
//con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
|
||||
// honor request "almost" immediately
|
||||
con.setNextSendTime(_context.clock().now() + 250);
|
||||
} else {
|
||||
int delay = con.getOptions().getSendAckDelay();
|
||||
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested
|
||||
@ -222,6 +227,10 @@ public class ConnectionPacketHandler {
|
||||
// con.fastRetransmit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the acks in a received packet, and adjust our window and RTT
|
||||
* @return are we congested?
|
||||
*/
|
||||
private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) {
|
||||
if (ackThrough < 0) return false;
|
||||
//if ( (nacks != null) && (nacks.length > 0) )
|
||||
@ -287,7 +296,7 @@ public class ConnectionPacketHandler {
|
||||
return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke);
|
||||
}
|
||||
|
||||
|
||||
/** @return are we congested? */
|
||||
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) {
|
||||
boolean congested = false;
|
||||
if ( (!isNew) && (sequenceNum > 0) ) {
|
||||
|
@ -99,9 +99,9 @@ public class MessageInputStream extends InputStream {
|
||||
}
|
||||
}
|
||||
private long[] locked_getNacks() {
|
||||
List ids = null;
|
||||
List<Long> ids = null;
|
||||
for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) {
|
||||
Long l = new Long(i);
|
||||
Long l = Long.valueOf(i);
|
||||
if (_notYetReadyBlocks.containsKey(l)) {
|
||||
// ACK
|
||||
} else {
|
||||
@ -113,7 +113,7 @@ public class MessageInputStream extends InputStream {
|
||||
if (ids != null) {
|
||||
long rv[] = new long[ids.size()];
|
||||
for (int i = 0; i < rv.length; i++)
|
||||
rv[i] = ((Long)ids.get(i)).longValue();
|
||||
rv[i] = ids.get(i).longValue();
|
||||
return rv;
|
||||
} else {
|
||||
return null;
|
||||
|
Reference in New Issue
Block a user