note that we've successfully processed a message (and as such drop its payload) ASAP, and only use safely cached snippets of it afterwards
This commit is contained in:
@ -15,6 +15,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import net.i2p.data.i2np.DeliveryStatusMessage;
|
||||
import net.i2p.data.i2np.I2NPMessage;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@ -56,35 +57,37 @@ public class InNetMessagePool {
|
||||
*
|
||||
*/
|
||||
public int add(InNetMessage msg) {
|
||||
Date exp = msg.getMessage().getMessageExpiration();
|
||||
boolean valid = _context.messageValidator().validateMessage(msg.getMessage().getUniqueId(), exp.getTime());
|
||||
I2NPMessage messageBody = msg.getMessage();
|
||||
msg.processingComplete();
|
||||
Date exp = messageBody.getMessageExpiration();
|
||||
boolean valid = _context.messageValidator().validateMessage(messageBody.getUniqueId(), exp.getTime());
|
||||
if (!valid) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Duplicate message received [" + msg.getMessage().getUniqueId()
|
||||
+ " expiring on " + exp + "]: " + msg.getMessage().getClass().getName());
|
||||
_log.warn("Duplicate message received [" + messageBody.getUniqueId()
|
||||
+ " expiring on " + exp + "]: " + messageBody.getClass().getName());
|
||||
_context.statManager().addRateData("inNetPool.dropped", 1, 0);
|
||||
_context.statManager().addRateData("inNetPool.duplicate", 1, 0);
|
||||
_context.messageHistory().droppedOtherMessage(msg.getMessage());
|
||||
_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(),
|
||||
msg.getMessage().getClass().getName(),
|
||||
_context.messageHistory().droppedOtherMessage(messageBody);
|
||||
_context.messageHistory().messageProcessingError(messageBody.getUniqueId(),
|
||||
messageBody.getClass().getName(),
|
||||
"Duplicate/expired");
|
||||
return -1;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Message received [" + msg.getMessage().getUniqueId()
|
||||
_log.debug("Message received [" + messageBody.getUniqueId()
|
||||
+ " expiring on " + exp + "] is NOT a duplicate or exipired");
|
||||
}
|
||||
|
||||
int size = -1;
|
||||
int type = msg.getMessage().getType();
|
||||
int type = messageBody.getType();
|
||||
HandlerJobBuilder builder = (HandlerJobBuilder)_handlerJobBuilders.get(new Integer(type));
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Add message to the inNetMessage pool - builder: " + builder
|
||||
+ " message class: " + msg.getMessage().getClass().getName());
|
||||
+ " message class: " + messageBody.getClass().getName());
|
||||
|
||||
if (builder != null) {
|
||||
Job job = builder.createJob(msg.getMessage(), msg.getFromRouter(),
|
||||
Job job = builder.createJob(messageBody, msg.getFromRouter(),
|
||||
msg.getFromRouterHash(), msg.getReplyBlock());
|
||||
if (job != null) {
|
||||
_context.jobQueue().addJob(job);
|
||||
@ -94,7 +97,7 @@ public class InNetMessagePool {
|
||||
}
|
||||
}
|
||||
|
||||
List origMessages = _context.messageRegistry().getOriginalMessages(msg.getMessage());
|
||||
List origMessages = _context.messageRegistry().getOriginalMessages(messageBody);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Original messages for inbound message: " + origMessages.size());
|
||||
if (origMessages.size() > 1) {
|
||||
@ -111,7 +114,7 @@ public class InNetMessagePool {
|
||||
+ " : " + omsg + ": reply job: " + job);
|
||||
|
||||
if (job != null) {
|
||||
job.setMessage(msg.getMessage());
|
||||
job.setMessage(messageBody);
|
||||
_context.jobQueue().addJob(job);
|
||||
}
|
||||
}
|
||||
@ -120,31 +123,31 @@ public class InNetMessagePool {
|
||||
// not handled as a reply
|
||||
if (size == -1) {
|
||||
// was not handled via HandlerJobBuilder
|
||||
_context.messageHistory().droppedOtherMessage(msg.getMessage());
|
||||
if (msg.getMessage().getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
|
||||
_context.messageHistory().droppedOtherMessage(messageBody);
|
||||
if (type == DeliveryStatusMessage.MESSAGE_TYPE) {
|
||||
long timeSinceSent = _context.clock().now() -
|
||||
((DeliveryStatusMessage)msg.getMessage()).getArrival().getTime();
|
||||
((DeliveryStatusMessage)messageBody).getArrival().getTime();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Dropping unhandled delivery status message created " + timeSinceSent + "ms ago: " + msg);
|
||||
_context.statManager().addRateData("inNetPool.droppedDeliveryStatusDelay", timeSinceSent, timeSinceSent);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Message " + msg.getMessage() + " was not handled by a HandlerJobBuilder - DROPPING: "
|
||||
_log.error("Message " + messageBody + " was not handled by a HandlerJobBuilder - DROPPING: "
|
||||
+ msg, new Exception("DROPPED MESSAGE"));
|
||||
_context.statManager().addRateData("inNetPool.dropped", 1, 0);
|
||||
}
|
||||
} else {
|
||||
String mtype = msg.getMessage().getClass().getName();
|
||||
_context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(),
|
||||
msg.getMessage().getMessageExpiration(),
|
||||
String mtype = messageBody.getClass().getName();
|
||||
_context.messageHistory().receiveMessage(mtype, messageBody.getUniqueId(),
|
||||
messageBody.getMessageExpiration(),
|
||||
msg.getFromRouterHash(), true);
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
String mtype = msg.getMessage().getClass().getName();
|
||||
_context.messageHistory().receiveMessage(mtype, msg.getMessage().getUniqueId(),
|
||||
msg.getMessage().getMessageExpiration(),
|
||||
String mtype = messageBody.getClass().getName();
|
||||
_context.messageHistory().receiveMessage(mtype, messageBody.getUniqueId(),
|
||||
messageBody.getMessageExpiration(),
|
||||
msg.getFromRouterHash(), true);
|
||||
return size;
|
||||
}
|
||||
|
Reference in New Issue
Block a user