Transport: More fixes for SSU stalling -

Don't skip further bandwidth allocations for SSU, since
it needs the entire allocation to proceed.
Log tweaks
More synchronization of requests
This commit is contained in:
zzz
2015-06-29 16:02:07 +00:00
parent 34f6f65104
commit fcdd8be7a7
3 changed files with 51 additions and 47 deletions

View File

@ -1,3 +1,9 @@
2015-06-29 zzz
* Transport: More fixes for SSU stalling
2015-06-28 zzz
* Apache Tomcat 6.0.44
2015-06-25 zzz
* Console: Use registered host/port for eepsite link (ticket #1604)
* Jetty starter: Register host/port when started

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 12;
public final static long BUILD = 13;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -433,7 +433,7 @@ public class FIFOBandwidthLimiter {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Still denying the " + _pendingInboundRequests.size()
+ " pending inbound requests (status: " + getStatus().toString()
+ ", longest waited " + locked_getLongestInboundWait() + " in)");
+ ", longest waited " + locked_getLongestInboundWait() + ')');
}
}
}
@ -505,7 +505,6 @@ public class FIFOBandwidthLimiter {
*/
private final void locked_satisfyInboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
if (_availableInbound.get() <= 0) break;
SimpleRequest req = _pendingInboundRequests.get(i);
long waited = now() - req.getRequestTime();
if (req.getAborted()) {
@ -520,17 +519,22 @@ public class FIFOBandwidthLimiter {
i--;
continue;
}
if ( (req.getAllocationsSinceWait() > 0) && (req.getCompleteListener() == null) ) {
int avi = _availableInbound.get();
if (avi <= 0) break;
// NO, don't do this, since SSU requires a full allocation to proceed.
// By stopping after a partial allocation, we stall SSU.
// This never affected NTCP (which also requires a full allocation)
// since it registers a CompleteListener, so getAllocationsSinceWait() was always zero.
//if (req.getAllocationsSinceWait() > 0) {
// we have already allocated some values to this request, but
// they haven't taken advantage of it yet (most likely they're
// IO bound)
// (also, the complete listener is only set for situations where
// waitForNextAllocation() is never called)
continue;
}
// continue;
//}
// ok, they are really waiting for us to give them stuff
int requested = req.getPendingRequested();
int avi = _availableInbound.get();
int allocated;
if (avi >= requested)
allocated = requested;
@ -579,7 +583,7 @@ public class FIFOBandwidthLimiter {
if (_log.shouldLog(Log.INFO))
_log.info("Still denying the " + _pendingOutboundRequests.size()
+ " pending outbound requests (status: " + getStatus().toString()
+ ", longest waited " + locked_getLongestOutboundWait() + " out)");
+ ", longest waited " + locked_getLongestOutboundWait() + ')');
}
}
}
@ -623,7 +627,6 @@ public class FIFOBandwidthLimiter {
*/
private final void locked_satisfyOutboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
if (_availableOutbound.get() <= 0) break;
SimpleRequest req = _pendingOutboundRequests.get(i);
long waited = now() - req.getRequestTime();
if (req.getAborted()) {
@ -638,17 +641,22 @@ public class FIFOBandwidthLimiter {
i--;
continue;
}
if (req.getAllocationsSinceWait() > 0) {
int avo = _availableOutbound.get();
if (avo <= 0) break;
// NO, don't do this, since SSU requires a full allocation to proceed.
// By stopping after a partial allocation, we stall SSU.
// This never affected NTCP (which also requires a full allocation)
// since it registers a CompleteListener, so getAllocationsSinceWait() was always zero.
//if (req.getAllocationsSinceWait() > 0) {
// we have already allocated some values to this request, but
// they haven't taken advantage of it yet (most likely they're
// IO bound)
if (_log.shouldLog(Log.WARN))
_log.warn("multiple allocations since wait... ntcp shouldn't do this: " + req);
continue;
}
// if (_log.shouldLog(Log.WARN))
// _log.warn("multiple allocations since wait... ntcp shouldn't do this: " + req);
// continue;
//}
// ok, they are really waiting for us to give them stuff
int requested = req.getPendingRequested();
int avo = _availableOutbound.get();
int allocated;
if (avo >= requested)
allocated = requested;
@ -659,15 +667,6 @@ public class FIFOBandwidthLimiter {
req.allocateBytes(allocated);
satisfied.add(req);
if (req.getPendingRequested() > 0) {
if (req.attachment() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocating " + allocated + " bytes outbound as a partial grant to "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out");
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocating " + allocated + " bytes outbound as a partial grant to "
+ req
@ -676,15 +675,6 @@ public class FIFOBandwidthLimiter {
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out");
} else {
if (req.attachment() != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to "
+ req
+ " waited "
+ waited
+ "ms) pending " + _pendingOutboundRequests.size()
+ ", longest waited " + locked_getLongestOutboundWait() + " out)");
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocating " + allocated + " bytes outbound to finish the partial grant to "
+ req
@ -807,9 +797,9 @@ public class FIFOBandwidthLimiter {
/** uses System clock, not context clock */
public long getRequestTime() { return _requestTime; }
public int getTotalRequested() { return _total; }
public int getPendingRequested() { return _total - _allocated; }
public synchronized int getPendingRequested() { return _total - _allocated; }
public boolean getAborted() { return _aborted; }
public void abort() {
public synchronized void abort() {
_aborted = true;
// so isComplete() will return true
_allocated = _total;
@ -817,6 +807,9 @@ public class FIFOBandwidthLimiter {
}
public CompleteListener getCompleteListener() { return _lsnr; }
/**
* Only used by NTCP.
*/
public void setCompleteListener(CompleteListener lsnr) {
boolean complete = false;
synchronized (this) {
@ -832,18 +825,19 @@ public class FIFOBandwidthLimiter {
}
}
private boolean isComplete() { return _allocated >= _total; }
private synchronized boolean isComplete() { return _allocated >= _total; }
/**
* Only used by SSU.
* May return without allocating.
* Check getPendingRequested() > 0 in a loop.
*/
public void waitForNextAllocation() {
_waited = true;
_allocationsSinceWait = 0;
boolean complete = false;
try {
synchronized (this) {
_waited = true;
_allocationsSinceWait = 0;
if (isComplete())
complete = true;
else
@ -854,17 +848,21 @@ public class FIFOBandwidthLimiter {
_lsnr.complete(this);
}
int getAllocationsSinceWait() { return _waited ? _allocationsSinceWait : 0; }
/**
* Only returns nonzero if there's no listener and waitForNextAllocation()
* has been called (i.e. SSU)
* Now unused.
*/
synchronized int getAllocationsSinceWait() { return _waited ? _allocationsSinceWait : 0; }
void allocateBytes(int bytes) {
/**
* Increments allocationsSinceWait only if there is a listener.
* Does not notify; caller must call notifyAllocation()
*/
synchronized void allocateBytes(int bytes) {
_allocated += bytes;
if (_lsnr == null)
_allocationsSinceWait++;
//if (isComplete()) {
// if (_log.shouldLog(Log.INFO))
// _log.info("allocate " + bytes + " completed, listener=" + _lsnr);
//}
//notifyAllocation(); // handled within the satisfy* methods
}
void notifyAllocation() {
@ -893,8 +891,8 @@ public class FIFOBandwidthLimiter {
@Override
public String toString() {
return "Req" + _requestId + " priority " + _priority +
_allocated + '/' + _total + " bytes";
return "Req: " + _requestId + " priority: " + _priority +
' ' + _allocated + '/' + _total + " bytes";
}
}