discard the payload of a message ASAP (even though we may need to hang on to the message for a while, for its replyJob, etc)

take note of the fact that the tunnel had activity
minor logging and formatting updates
This commit is contained in:
jrandom
2004-06-20 00:24:06 +00:00
committed by zzz
parent 1b7fb96ca8
commit bab7b8b9ed

View File

@ -76,7 +76,11 @@ public class SendTunnelMessageJob extends JobImpl {
_priority = priority; _priority = priority;
if (timeoutMs < 50*1000) { if (timeoutMs < 50*1000) {
_log.info("Sending tunnel message to expire in " + timeoutMs + "ms containing " + msg.getUniqueId() + " (a " + msg.getClass().getName() + ")", new Exception("SendTunnel from")); if (_log.shouldLog(Log.INFO))
_log.info("Sending tunnel message to expire in " + timeoutMs
+ "ms containing " + msg.getUniqueId() + " (a "
+ msg.getClass().getName() + ")",
new Exception("SendTunnel from"));
} }
//_log.info("Send tunnel message " + msg.getClass().getName() + " to " + _destRouter + " over " + _tunnelId + " targetting tunnel " + _targetTunnelId, new Exception("SendTunnel from")); //_log.info("Send tunnel message " + msg.getClass().getName() + " to " + _destRouter + " over " + _tunnelId + " targetting tunnel " + _targetTunnelId, new Exception("SendTunnel from"));
_expiration = _context.clock().now() + timeoutMs; _expiration = _context.clock().now() + timeoutMs;
@ -85,9 +89,13 @@ public class SendTunnelMessageJob extends JobImpl {
public void runJob() { public void runJob() {
TunnelInfo info = _context.tunnelManager().getTunnelInfo(_tunnelId); TunnelInfo info = _context.tunnelManager().getTunnelInfo(_tunnelId);
if (info == null) { if (info == null) {
_log.debug("Message for unknown tunnel [" + _tunnelId + "] received, forward to " + _destRouter); if (_log.shouldLog(Log.DEBUG))
_log.debug("Message for unknown tunnel [" + _tunnelId
+ "] received, forward to " + _destRouter);
if ( (_tunnelId == null) || (_destRouter == null) ) { if ( (_tunnelId == null) || (_destRouter == null) ) {
_log.error("Someone br0ke us. where is this message supposed to go again?", getAddedBy()); if (_log.shouldLog(Log.ERROR))
_log.error("Someone br0ke us. where is this message supposed to go again?",
getAddedBy());
return; return;
} }
TunnelMessage msg = new TunnelMessage(_context); TunnelMessage msg = new TunnelMessage(_context);
@ -97,20 +105,30 @@ public class SendTunnelMessageJob extends JobImpl {
msg.setData(baos.toByteArray()); msg.setData(baos.toByteArray());
msg.setTunnelId(_tunnelId); msg.setTunnelId(_tunnelId);
msg.setMessageExpiration(new Date(_expiration)); msg.setMessageExpiration(new Date(_expiration));
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter, _onSend, _onReply, _onFailure, _selector, _expiration, _priority)); _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg,
_destRouter, _onSend,
_onReply, _onFailure,
_selector, _expiration,
_priority));
String bodyType = _message.getClass().getName(); String bodyType = _message.getClass().getName();
_context.messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); _context.messageHistory().wrap(bodyType, _message.getUniqueId(),
TunnelMessage.class.getName(), msg.getUniqueId());
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error writing out the tunnel message to send to the tunnel", ioe); if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the tunnel message to send to the tunnel", ioe);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
_log.error("Error writing out the tunnel message to send to the tunnel", dfe); if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the tunnel message to send to the tunnel", dfe);
} }
return; return;
} }
info.messageProcessed();
if (isEndpoint(info)) { if (isEndpoint(info)) {
_log.info("Tunnel message where we're both the gateway and the endpoint - honor instructions"); if (_log.shouldLog(Log.INFO))
_log.info("Tunnel message where we're both the gateway and the endpoint - honor instructions");
honorInstructions(info); honorInstructions(info);
return; return;
} else if (isGateway(info)) { } else if (isGateway(info)) {
@ -126,7 +144,8 @@ public class SendTunnelMessageJob extends JobImpl {
// since we are the gateway, we don't need to verify the data structures // since we are the gateway, we don't need to verify the data structures
TunnelInfo us = getUs(info); TunnelInfo us = getUs(info);
if (us == null) { if (us == null) {
_log.error("We are not participating in this /known/ tunnel - was the router reset?"); if (_log.shouldLog(Log.ERROR))
_log.error("We are not participating in this /known/ tunnel - was the router reset?");
if (_onFailure != null) if (_onFailure != null)
_context.jobQueue().addJob(_onFailure); _context.jobQueue().addJob(_onFailure);
} else { } else {
@ -139,13 +158,21 @@ public class SendTunnelMessageJob extends JobImpl {
_context.jobQueue().addJob(_onFailure); _context.jobQueue().addJob(_onFailure);
return; return;
} }
_log.debug("Tunnel message created: " + msg + " out of encrypted message: " + _message); if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel message created: " + msg + " out of encrypted message: "
+ _message);
long now = _context.clock().now(); long now = _context.clock().now();
if (_expiration < now + 15*1000) { if (_expiration < now + 15*1000) {
_log.warn("Adding a tunnel message that will expire shortly [" + new Date(_expiration) + "]", getAddedBy()); if (_log.shouldLog(Log.WARN))
_log.warn("Adding a tunnel message that will expire shortly ["
+ new Date(_expiration) + "]", getAddedBy());
} }
msg.setMessageExpiration(new Date(_expiration)); msg.setMessageExpiration(new Date(_expiration));
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, info.getNextHop(), _onSend, _onReply, _onFailure, _selector, _expiration, _priority)); _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg,
info.getNextHop(), _onSend,
_onReply, _onFailure,
_selector, _expiration,
_priority));
} }
} }
@ -240,19 +267,24 @@ public class SendTunnelMessageJob extends JobImpl {
if (_destRouter != null) { if (_destRouter != null) {
instructions.setRouter(_destRouter); instructions.setRouter(_destRouter);
if (_targetTunnelId != null) { if (_targetTunnelId != null) {
_log.debug("Instructions target tunnel " + _targetTunnelId + " on router " + _destRouter.calculateHash()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Instructions target tunnel " + _targetTunnelId
+ " on router " + _destRouter.calculateHash());
instructions.setTunnelId(_targetTunnelId); instructions.setTunnelId(_targetTunnelId);
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_TUNNEL); instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_TUNNEL);
} else { } else {
_log.debug("Instructions target router " + _destRouter.toBase64()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Instructions target router " + _destRouter.toBase64());
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_ROUTER); instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_ROUTER);
} }
} else { } else {
if (_message instanceof DataMessage) { if (_message instanceof DataMessage) {
_log.debug("Instructions are for local message delivery at the endpoint with a DataMessage to be sent to a Destination"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Instructions are for local message delivery at the endpoint with a DataMessage to be sent to a Destination");
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL); instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
} else { } else {
_log.debug("Instructions are for local delivery at the endpoint targetting the now-local router"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Instructions are for local delivery at the endpoint targetting the now-local router");
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL); instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
} }
} }
@ -274,7 +306,8 @@ public class SendTunnelMessageJob extends JobImpl {
String bodyType = _message.getClass().getName(); String bodyType = _message.getClass().getName();
_context.messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); _context.messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
_log.debug("Tunnel message prepared: instructions = " + instructions); if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel message prepared: instructions = " + instructions);
msg.setData(encryptedMessage); msg.setData(encryptedMessage);
msg.setEncryptedDeliveryInstructions(encryptedInstructions); msg.setEncryptedDeliveryInstructions(encryptedInstructions);
@ -300,9 +333,11 @@ public class SendTunnelMessageJob extends JobImpl {
System.arraycopy(h.getData(), 0, iv, 0, iv.length); System.arraycopy(h.getData(), 0, iv, 0, iv.length);
return _context.AESEngine().safeEncrypt(baos.toByteArray(), key, iv, paddedSize); return _context.AESEngine().safeEncrypt(baos.toByteArray(), key, iv, paddedSize);
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error writing out data to encrypt", ioe); if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out data to encrypt", ioe);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
_log.error("Error formatting data to encrypt", dfe); if (_log.shouldLog(Log.ERROR))
_log.error("Error formatting data to encrypt", dfe);
} }
return null; return null;
} }
@ -312,7 +347,8 @@ public class SendTunnelMessageJob extends JobImpl {
createFakeOutNetMessage(); createFakeOutNetMessage();
if (_onSend != null) { if (_onSend != null) {
_log.debug("Firing onSend as we're honoring the instructions"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Firing onSend as we're honoring the instructions");
_context.jobQueue().addJob(_onSend); _context.jobQueue().addJob(_onSend);
} }
@ -323,7 +359,11 @@ public class SendTunnelMessageJob extends JobImpl {
if (_destRouter != null) { if (_destRouter != null) {
I2NPMessage msg = null; I2NPMessage msg = null;
if (_targetTunnelId != null) { if (_targetTunnelId != null) {
_log.debug("Forward " + _message.getClass().getName() + " message off to remote tunnel " + _targetTunnelId.getTunnelId() + " on router " + _destRouter.toBase64()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Forward " + _message.getClass().getName()
+ " message off to remote tunnel "
+ _targetTunnelId.getTunnelId() + " on router "
+ _destRouter.toBase64());
TunnelMessage tmsg = new TunnelMessage(_context); TunnelMessage tmsg = new TunnelMessage(_context);
tmsg.setEncryptedDeliveryInstructions(null); tmsg.setEncryptedDeliveryInstructions(null);
tmsg.setTunnelId(_targetTunnelId); tmsg.setTunnelId(_targetTunnelId);
@ -332,14 +372,18 @@ public class SendTunnelMessageJob extends JobImpl {
try { try {
_message.writeBytes(baos); _message.writeBytes(baos);
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error writing out the message to be forwarded...??", ioe); if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the message to be forwarded...??", ioe);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
_log.error("Error writing message to be forwarded...???", dfe); if (_log.shouldLog(Log.ERROR))
_log.error("Error writing message to be forwarded...???", dfe);
} }
tmsg.setData(baos.toByteArray()); tmsg.setData(baos.toByteArray());
msg = tmsg; msg = tmsg;
} else { } else {
_log.debug("Forward " + _message.getClass().getName() + " message off to remote router " + _destRouter.toBase64()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Forward " + _message.getClass().getName()
+ " message off to remote router " + _destRouter.toBase64());
msg = _message; msg = _message;
} }
long now = _context.clock().now(); long now = _context.clock().now();
@ -349,26 +393,33 @@ public class SendTunnelMessageJob extends JobImpl {
//} //}
if (_expiration - 30*1000 < now) { if (_expiration - 30*1000 < now) {
_log.error("Why are we trying to send a " + _message.getClass().getName() + " message with " + (_expiration-now) + "ms left?", getAddedBy()); if (_log.shouldLog(Log.ERROR))
_log.error("Why are we trying to send a " + _message.getClass().getName()
+ " message with " + (_expiration-now) + "ms left?", getAddedBy());
} }
String bodyType = _message.getClass().getName(); String bodyType = _message.getClass().getName();
_context.messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); _context.messageHistory().wrap(bodyType, _message.getUniqueId(),
TunnelMessage.class.getName(), msg.getUniqueId());
// don't specify a selector, since createFakeOutNetMessage already does that // don't specify a selector, since createFakeOutNetMessage already does that
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter, _onSend, _onReply, _onFailure, null, _expiration, _priority)); _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter,
_onSend, _onReply, _onFailure,
null, _expiration, _priority));
} else { } else {
if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) { if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) {
// its a network message targeting us... // its a network message targeting us...
_log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool"); if (_log.shouldLog(Log.DEBUG))
InNetMessage msg = new InNetMessage(); _log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool");
InNetMessage msg = new InNetMessage(_context);
msg.setFromRouter(ident); msg.setFromRouter(ident);
msg.setFromRouterHash(ident.getHash()); msg.setFromRouterHash(ident.getHash());
msg.setMessage(_message); msg.setMessage(_message);
msg.setReplyBlock(null); msg.setReplyBlock(null);
_context.inNetMessagePool().add(msg); _context.inNetMessagePool().add(msg);
} else { } else {
_log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool");
DataMessage msg = (DataMessage)_message; DataMessage msg = (DataMessage)_message;
boolean valid = _context.messageValidator().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime()); boolean valid = _context.messageValidator().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime());
if (!valid) { if (!valid) {
@ -398,7 +449,8 @@ public class SendTunnelMessageJob extends JobImpl {
private void createFakeOutNetMessage() { private void createFakeOutNetMessage() {
// now we create a fake outNetMessage to go onto the registry so we can select // now we create a fake outNetMessage to go onto the registry so we can select
_log.debug("Registering a fake outNetMessage for the message tunneled locally since we have a selector"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Registering a fake outNetMessage for the message tunneled locally since we have a selector");
OutNetMessage outM = new OutNetMessage(_context); OutNetMessage outM = new OutNetMessage(_context);
outM.setExpiration(_expiration); outM.setExpiration(_expiration);
outM.setMessage(_message); outM.setMessage(_message);
@ -410,6 +462,8 @@ public class SendTunnelMessageJob extends JobImpl {
outM.setReplySelector(_selector); outM.setReplySelector(_selector);
outM.setTarget(null); outM.setTarget(null);
_context.messageRegistry().registerPending(outM); _context.messageRegistry().registerPending(outM);
// we dont really need the data
outM.discardData();
} }
public String getName() { return "Send Tunnel Message"; } public String getName() { return "Send Tunnel Message"; }