propagate from branch 'i2p.i2p.zzz.test2' (head b6de226d1664089488ab2b438fe7457e9fb8e563)

to branch 'i2p.i2p' (head 0cf35c87b68a5360bd35257e36dfe7f740e53693)
This commit is contained in:
zzz
2015-04-17 13:18:22 +00:00
18 changed files with 775 additions and 371 deletions

View File

@ -41,7 +41,8 @@ class Connection {
private final MessageInputStream _inputStream;
private final MessageOutputStream _outputStream;
private final SchedulerChooser _chooser;
private volatile long _nextSendTime;
/** Locking: _nextSendLock */
private long _nextSendTime;
private long _ackedPackets;
private final long _createdOn;
private final AtomicLong _closeSentOn = new AtomicLong();
@ -70,6 +71,8 @@ class Connection {
private final AtomicBoolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private final Object _connectLock;
/** Locking for _nextSendTime */
private final Object _nextSendLock;
/** how many messages have been resent and not yet ACKed? */
private final AtomicInteger _activeResends = new AtomicInteger();
private final ConEvent _connectionEvent;
@ -145,6 +148,7 @@ class Connection {
_activityTimer = new ActivityTimer();
_ackSinceCongestion = new AtomicBoolean(true);
_connectLock = new Object();
_nextSendLock = new Object();
_connectionEvent = new ConEvent();
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
// all createRateStats in ConnectionManager
@ -907,7 +911,11 @@ class Connection {
* instance, or want to delay an ACK.
* @return the next time the scheduler will want to send a packet, or -1 if never.
*/
public long getNextSendTime() { return _nextSendTime; }
public long getNextSendTime() {
synchronized(_nextSendLock) {
return _nextSendTime;
}
}
/**
* If the next send time is currently >= 0 (i.e. not "never"),
@ -917,25 +925,20 @@ class Connection {
* options.getSendAckDelay() from now (1000 ms)
*/
public void setNextSendTime(long when) {
if (_nextSendTime >= 0) {
if (when < _nextSendTime)
_nextSendTime = when;
} else {
_nextSendTime = when;
}
synchronized(_nextSendLock) {
if (_nextSendTime >= 0) {
if (when < _nextSendTime)
_nextSendTime = when;
} else {
_nextSendTime = when;
}
if (_nextSendTime >= 0) {
long max = _context.clock().now() + _options.getSendAckDelay();
if (max < _nextSendTime)
_nextSendTime = max;
if (_nextSendTime >= 0) {
long max = _context.clock().now() + _options.getSendAckDelay();
if (max < _nextSendTime)
_nextSendTime = max;
}
}
//if (_log.shouldLog(Log.DEBUG) && false) {
// if (_nextSendTime <= 0)
// _log.debug("set next send time to an unknown time", new Exception(toString()));
// else
// _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
//}
}
/** how many packets have we sent and the other side has ACKed?
@ -1260,17 +1263,17 @@ class Connection {
*/
class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private final PacketLocal _packet;
private long _nextSendTime;
private long _nextSend;
public ResendPacketEvent(PacketLocal packet, long delay) {
super(_timer);
_packet = packet;
_nextSendTime = delay + _context.clock().now();
_nextSend = delay + _context.clock().now();
packet.setResendPacketEvent(ResendPacketEvent.this);
schedule(delay);
}
public long getNextSendTime() { return _nextSendTime; }
public long getNextSendTime() { return _nextSend; }
public void timeReached() { retransmit(); }
/**
* Retransmit the packet if we need to.
@ -1320,7 +1323,7 @@ class Connection {
+ _activeResends + " active resend, "
+ _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
forceReschedule(1333);
_nextSendTime = 1333 + _context.clock().now();
_nextSend = 1333 + _context.clock().now();
return false;
}
@ -1407,7 +1410,7 @@ class Connection {
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY;
// set this before enqueue() as it passes it on to the router
_nextSendTime = timeout + _context.clock().now();
_nextSend = timeout + _context.clock().now();
if (_outboundQueue.enqueue(_packet)) {
// first resend for this packet ?