diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java index 5350025c3..bf08cc8d4 100644 --- a/router/java/src/net/i2p/router/InNetMessagePool.java +++ b/router/java/src/net/i2p/router/InNetMessagePool.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import net.i2p.data.i2np.DeliveryStatusMessage; +import net.i2p.data.i2np.I2NPMessage; import net.i2p.util.Log; /** @@ -56,35 +57,37 @@ public class InNetMessagePool { * */ public int add(InNetMessage msg) { - Date exp = msg.getMessage().getMessageExpiration(); - boolean valid = _context.messageValidator().validateMessage(msg.getMessage().getUniqueId(), exp.getTime()); + I2NPMessage messageBody = msg.getMessage(); + msg.processingComplete(); + Date exp = messageBody.getMessageExpiration(); + boolean valid = _context.messageValidator().validateMessage(messageBody.getUniqueId(), exp.getTime()); if (!valid) { if (_log.shouldLog(Log.WARN)) - _log.warn("Duplicate message received [" + msg.getMessage().getUniqueId() - + " expiring on " + exp + "]: " + msg.getMessage().getClass().getName()); + _log.warn("Duplicate message received [" + messageBody.getUniqueId() + + " expiring on " + exp + "]: " + messageBody.getClass().getName()); _context.statManager().addRateData("inNetPool.dropped", 1, 0); _context.statManager().addRateData("inNetPool.duplicate", 1, 0); - _context.messageHistory().droppedOtherMessage(msg.getMessage()); - _context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(), - msg.getMessage().getClass().getName(), + _context.messageHistory().droppedOtherMessage(messageBody); + _context.messageHistory().messageProcessingError(messageBody.getUniqueId(), + messageBody.getClass().getName(), "Duplicate/expired"); return -1; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message received [" + msg.getMessage().getUniqueId() + _log.debug("Message received [" + messageBody.getUniqueId() + " expiring on " + exp + "] is NOT a duplicate or exipired"); } int size = -1; - int type = msg.getMessage().getType(); + int type = messageBody.getType(); HandlerJobBuilder builder = (HandlerJobBuilder)_handlerJobBuilders.get(new Integer(type)); if (_log.shouldLog(Log.DEBUG)) _log.debug("Add message to the inNetMessage pool - builder: " + builder - + " message class: " + msg.getMessage().getClass().getName()); + + " message class: " + messageBody.getClass().getName()); if (builder != null) { - Job job = builder.createJob(msg.getMessage(), msg.getFromRouter(), + Job job = builder.createJob(messageBody, msg.getFromRouter(), msg.getFromRouterHash(), msg.getReplyBlock()); if (job != null) { _context.jobQueue().addJob(job); @@ -94,7 +97,7 @@ public class InNetMessagePool { } } - List origMessages = _context.messageRegistry().getOriginalMessages(msg.getMessage()); + List origMessages = _context.messageRegistry().getOriginalMessages(messageBody); if (_log.shouldLog(Log.DEBUG)) _log.debug("Original messages for inbound message: " + origMessages.size()); if (origMessages.size() > 1) { @@ -111,7 +114,7 @@ public class InNetMessagePool { + " : " + omsg + ": reply job: " + job); if (job != null) { - job.setMessage(msg.getMessage()); + job.setMessage(messageBody); _context.jobQueue().addJob(job); } } @@ -120,31 +123,31 @@ public class InNetMessagePool { // not handled as a reply if (size == -1) { // was not handled via HandlerJobBuilder - _context.messageHistory().droppedOtherMessage(msg.getMessage()); - if (msg.getMessage().getType() == DeliveryStatusMessage.MESSAGE_TYPE) { + _context.messageHistory().droppedOtherMessage(messageBody); + if (type == DeliveryStatusMessage.MESSAGE_TYPE) { long timeSinceSent = _context.clock().now() - - ((DeliveryStatusMessage)msg.getMessage()).getArrival().getTime(); + ((DeliveryStatusMessage)messageBody).getArrival().getTime(); if (_log.shouldLog(Log.INFO)) _log.info("Dropping unhandled delivery status message created " + timeSinceSent + "ms ago: " + msg); _context.statManager().addRateData("inNetPool.droppedDeliveryStatusDelay", timeSinceSent, timeSinceSent); } else { if (_log.shouldLog(Log.ERROR)) - _log.error("Message " + msg.getMessage() + " was not handled by a HandlerJobBuilder - DROPPING: " + _log.error("Message " + messageBody + " was not handled by a HandlerJobBuilder - DROPPING: " + msg, new Exception("DROPPED MESSAGE")); _context.statManager().addRateData("inNetPool.dropped", 1, 0); } } else { - String mtype = msg.getMessage().getClass().getName(); - _context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(), - msg.getMessage().getMessageExpiration(), + String mtype = messageBody.getClass().getName(); + _context.messageHistory().receiveMessage(mtype, messageBody.getUniqueId(), + messageBody.getMessageExpiration(), msg.getFromRouterHash(), true); return size; } } - String mtype = msg.getMessage().getClass().getName(); - _context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(), - msg.getMessage().getMessageExpiration(), + String mtype = messageBody.getClass().getName(); + _context.messageHistory().receiveMessage(mtype, messageBody.getUniqueId(), + messageBody.getMessageExpiration(), msg.getFromRouterHash(), true); return size; }