refactored the cleanup job
logging
This commit is contained in:
@ -119,8 +119,8 @@ public class OutboundMessageRegistry {
|
|||||||
m = (OutNetMessage)_pendingMessages.remove(expiration);
|
m = (OutNetMessage)_pendingMessages.remove(expiration);
|
||||||
}
|
}
|
||||||
long diff = _context.clock().now() - before;
|
long diff = _context.clock().now() - before;
|
||||||
if (diff > 500)
|
if ( (diff > 500) && (_log.shouldLog(Log.WARN)) )
|
||||||
_log.error("Took too long syncing on remove (" + diff + "ms");
|
_log.warn("Took too long syncing on remove (" + diff + "ms");
|
||||||
|
|
||||||
if (m != null) {
|
if (m != null) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -186,12 +186,13 @@ public class OutboundMessageRegistry {
|
|||||||
long sync1 = afterSync1 - beforeSync;
|
long sync1 = afterSync1 - beforeSync;
|
||||||
long done = afterDone - afterSync1;
|
long done = afterDone - afterSync1;
|
||||||
String warn = delay + "ms (sync = " + sync1 + "ms, done = " + done + "ms)";
|
String warn = delay + "ms (sync = " + sync1 + "ms, done = " + done + "ms)";
|
||||||
if (delay > 1000) {
|
if ( (delay > 1000) && (_log.shouldLog(Log.WARN)) ) {
|
||||||
_log.error("Synchronizing in the registry.register took too long! " + warn);
|
_log.error("Synchronizing in the registry.register took too long! " + warn);
|
||||||
_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(),
|
_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(),
|
||||||
msg.getMessage().getClass().getName(),
|
msg.getMessage().getClass().getName(),
|
||||||
"RegisterPending took too long: " + warn);
|
"RegisterPending took too long: " + warn);
|
||||||
} else {
|
} else {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Synchronizing in the registry.register was quick: " + warn);
|
_log.debug("Synchronizing in the registry.register was quick: " + warn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -227,10 +228,11 @@ public class OutboundMessageRegistry {
|
|||||||
} finally {
|
} finally {
|
||||||
long delay = _context.clock().now() - beforeSync;
|
long delay = _context.clock().now() - beforeSync;
|
||||||
String warn = delay + "ms";
|
String warn = delay + "ms";
|
||||||
if (delay > 1000) {
|
if ( (delay > 1000) && (_log.shouldLog(Log.WARN)) ) {
|
||||||
_log.error("Synchronizing in the registry.unRegister took too long! " + warn);
|
_log.warn("Synchronizing in the registry.unRegister took too long! " + warn);
|
||||||
_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(), msg.getMessage().getClass().getName(), "Unregister took too long: " + warn);
|
_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(), msg.getMessage().getClass().getName(), "Unregister took too long: " + warn);
|
||||||
} else {
|
} else {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Synchronizing in the registry.unRegister was quick: " + warn);
|
_log.debug("Synchronizing in the registry.unRegister was quick: " + warn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -269,42 +271,14 @@ public class OutboundMessageRegistry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String getName() { return "Cleanup any messages that timed out"; }
|
public String getName() { return "Cleanup any messages that timed out"; }
|
||||||
|
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
List toRemove = new ArrayList();
|
List removed = removeMessages();
|
||||||
long now = CleanupPendingMessagesJob.this._context.clock().now();
|
|
||||||
Map messages = null;
|
|
||||||
synchronized (_pendingMessages) {
|
|
||||||
messages = (Map)_pendingMessages.clone();
|
|
||||||
}
|
|
||||||
long afterCreate = CleanupPendingMessagesJob.this._context.clock().now();
|
|
||||||
|
|
||||||
for (Iterator iter = messages.keySet().iterator(); iter.hasNext(); ) {
|
|
||||||
Long exp = (Long)iter.next();
|
|
||||||
OutNetMessage msg = (OutNetMessage)messages.get(exp);
|
|
||||||
if (msg.getExpiration() < now) {
|
|
||||||
toRemove.add(exp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
long findRemove = CleanupPendingMessagesJob.this._context.clock().now();
|
|
||||||
|
|
||||||
long removeTime = 0;
|
|
||||||
long loopTime = 0;
|
|
||||||
|
|
||||||
RouterContext ctx = OutboundMessageRegistry.this._context;
|
RouterContext ctx = OutboundMessageRegistry.this._context;
|
||||||
|
|
||||||
for (Iterator iter = toRemove.iterator(); iter.hasNext(); ) {
|
for (int i = 0; i < removed.size(); i++) {
|
||||||
long beforeRemove = ctx.clock().now();
|
OutNetMessage msg = (OutNetMessage)removed.get(i);
|
||||||
Long exp = (Long)iter.next();
|
|
||||||
OutNetMessage msg = null;
|
|
||||||
synchronized (_pendingMessages) {
|
|
||||||
msg = (OutNetMessage)_pendingMessages.remove(exp);
|
|
||||||
}
|
|
||||||
long afterRemove = ctx.clock().now();
|
|
||||||
long diff = afterRemove - beforeRemove;
|
|
||||||
|
|
||||||
if (diff > 500)
|
|
||||||
_log.error("Synchronize during remove took too long " + diff + "ms");
|
|
||||||
removeTime += diff;
|
|
||||||
|
|
||||||
if (msg != null) {
|
if (msg != null) {
|
||||||
ctx.messageHistory().replyTimedOut(msg);
|
ctx.messageHistory().replyTimedOut(msg);
|
||||||
@ -321,29 +295,40 @@ public class OutboundMessageRegistry {
|
|||||||
+ " and not firing any job");
|
+ " and not firing any job");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
long doneLoop = ctx.clock().now();
|
|
||||||
long ldiff = doneLoop - beforeRemove;
|
|
||||||
if (ldiff > 500)
|
|
||||||
_log.error("Loop took too long [" + ldiff + "ms]");
|
|
||||||
loopTime += ldiff;
|
|
||||||
}
|
|
||||||
|
|
||||||
long cleanupDelay = ctx.clock().now() - now;
|
|
||||||
long findTime = findRemove - afterCreate;
|
|
||||||
long syncTime = afterCreate - now;
|
|
||||||
String warn = cleanupDelay + "ms (syncTime = " + syncTime + "ms, findTime ="
|
|
||||||
+ findTime + "ms, removeTime = " + removeTime + "ms, loopTime = "
|
|
||||||
+ loopTime + ")";
|
|
||||||
if (cleanupDelay > 1000) {
|
|
||||||
_log.error("Cleanup took too long! " + warn);
|
|
||||||
// yes, the following is a kludge, as its not specific to a particular message but to a whole series of messages
|
|
||||||
ctx.messageHistory().messageProcessingError(-1, OutboundMessageRegistry.CleanupPendingMessagesJob.class.getName(),
|
|
||||||
"Cleanup took too long: " + warn);
|
|
||||||
} else {
|
|
||||||
_log.debug("Cleanup was quick: " + warn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
requeue(CLEANUP_DELAY);
|
requeue(CLEANUP_DELAY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove any messages whose expirations are in the past
|
||||||
|
*
|
||||||
|
* @return list of OutNetMessage objects that have expired
|
||||||
|
*/
|
||||||
|
private List removeMessages() {
|
||||||
|
long now = OutboundMessageRegistry.this._context.clock().now();
|
||||||
|
List removedMessages = new ArrayList(8);
|
||||||
|
List expirationsToRemove = new ArrayList(8);
|
||||||
|
synchronized (_pendingMessages) {
|
||||||
|
for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext();) {
|
||||||
|
Long expiration = (Long)iter.next();
|
||||||
|
if (expiration.longValue() < now) {
|
||||||
|
expirationsToRemove.add(expiration);
|
||||||
|
} else {
|
||||||
|
// its sorted
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = 0; i < expirationsToRemove.size(); i++) {
|
||||||
|
Long expiration = (Long)expirationsToRemove.get(i);
|
||||||
|
OutNetMessage msg = (OutNetMessage)_pendingMessages.remove(expiration);
|
||||||
|
if (msg != null)
|
||||||
|
removedMessages.add(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Removed " + removedMessages.size() + " messages");
|
||||||
|
return removedMessages;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user