diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java index 841a48905..ee56b243c 100644 --- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java @@ -97,31 +97,10 @@ public class SendTunnelMessageJob extends JobImpl { _log.error("Someone br0ke us. where is this message supposed to go again?", getAddedBy()); return; + } else { + forwardToGateway(); + return; } - TunnelMessage msg = new TunnelMessage(_context); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - _message.writeBytes(baos); - msg.setData(baos.toByteArray()); - msg.setTunnelId(_tunnelId); - msg.setMessageExpiration(new Date(_expiration)); - _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, - _destRouter, _onSend, - _onReply, _onFailure, - _selector, _expiration, - _priority)); - - String bodyType = _message.getClass().getName(); - _context.messageHistory().wrap(bodyType, _message.getUniqueId(), - TunnelMessage.class.getName(), msg.getUniqueId()); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", dfe); - } - return; } info.messageProcessed(); @@ -139,7 +118,43 @@ public class SendTunnelMessageJob extends JobImpl { return; } } + + /** + * Forward this job's message to the gateway of the tunnel requested + * + */ + private void forwardToGateway() { + TunnelMessage msg = new TunnelMessage(_context); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + _message.writeBytes(baos); + msg.setData(baos.toByteArray()); + msg.setTunnelId(_tunnelId); + msg.setMessageExpiration(new Date(_expiration)); + _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, + _destRouter, _onSend, + _onReply, _onFailure, + _selector, _expiration, + _priority)); + + String bodyType = _message.getClass().getName(); + _context.messageHistory().wrap(bodyType, _message.getUniqueId(), + TunnelMessage.class.getName(), msg.getUniqueId()); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the tunnel message to send to the tunnel", ioe); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the tunnel message to send to the tunnel", dfe); + } + return; + } + /** + * We are the gateway for the tunnel this message is bound to, + * so wrap it accordingly and send it on its way. + * + */ private void handleAsGateway(TunnelInfo info) { // since we are the gateway, we don't need to verify the data structures TunnelInfo us = getUs(info); @@ -176,6 +191,11 @@ public class SendTunnelMessageJob extends JobImpl { } } + /** + * We are the participant in the tunnel, so verify the signature / data and + * forward it to the next hop. + * + */ private void handleAsParticipant(TunnelInfo info) { // SendTunnelMessageJob shouldn't be used for participants! if (_log.shouldLog(Log.DEBUG)) @@ -251,6 +271,11 @@ public class SendTunnelMessageJob extends JobImpl { return (us.getSigningKey() != null); // only the gateway can sign } + /** + * Build the tunnel message with appropriate instructions for the + * tunnel endpoint, then encrypt and sign it. + * + */ private TunnelMessage prepareMessage(TunnelInfo info) { TunnelMessage msg = new TunnelMessage(_context); @@ -316,6 +341,10 @@ public class SendTunnelMessageJob extends JobImpl { return msg; } + /** + * Create and sign the verification structure, using the tunnel's signing key + * + */ private TunnelVerificationStructure createVerificationStructure(byte encryptedMessage[], TunnelInfo info) { TunnelVerificationStructure struct = new TunnelVerificationStructure(); struct.setMessageHash(_context.sha().calculateHash(encryptedMessage)); @@ -323,6 +352,11 @@ public class SendTunnelMessageJob extends JobImpl { return struct; } + /** + * encrypt the structure (the message or instructions) + * + * @param paddedSize minimum size to pad to + */ private byte[] encrypt(DataStructure struct, SessionKey key, int paddedSize) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(paddedSize); @@ -342,6 +376,12 @@ public class SendTunnelMessageJob extends JobImpl { return null; } + /** + * We are both the endpoint and gateway for the tunnel, so honor + * what was requested of us (processing the message locally, + * forwarding to a router, forwarding to a tunnel, etc) + * + */ private void honorInstructions(TunnelInfo info) { if (_selector != null) createFakeOutNetMessage(); @@ -357,93 +397,112 @@ public class SendTunnelMessageJob extends JobImpl { RouterIdentity ident = _context.router().getRouterInfo().getIdentity(); if (_destRouter != null) { - I2NPMessage msg = null; - if (_targetTunnelId != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Forward " + _message.getClass().getName() - + " message off to remote tunnel " - + _targetTunnelId.getTunnelId() + " on router " - + _destRouter.toBase64()); - TunnelMessage tmsg = new TunnelMessage(_context); - tmsg.setEncryptedDeliveryInstructions(null); - tmsg.setTunnelId(_targetTunnelId); - tmsg.setVerificationStructure(null); - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - try { - _message.writeBytes(baos); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the message to be forwarded...??", ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing message to be forwarded...???", dfe); - } - tmsg.setData(baos.toByteArray()); - msg = tmsg; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Forward " + _message.getClass().getName() - + " message off to remote router " + _destRouter.toBase64()); - msg = _message; - } - long now = _context.clock().now(); - //if (_expiration < now) { - _expiration = now + Router.CLOCK_FUDGE_FACTOR; - //_log.info("Fudging the message send so it expires in the fudge factor..."); - //} - - if (_expiration - 30*1000 < now) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Why are we trying to send a " + _message.getClass().getName() - + " message with " + (_expiration-now) + "ms left?", getAddedBy()); - } - - String bodyType = _message.getClass().getName(); - _context.messageHistory().wrap(bodyType, _message.getUniqueId(), - TunnelMessage.class.getName(), msg.getUniqueId()); - - // don't specify a selector, since createFakeOutNetMessage already does that - _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter, - _onSend, _onReply, _onFailure, - null, _expiration, _priority)); + honorSendRemote(info, ident); } else { - if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) { - // its a network message targeting us... - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool"); - InNetMessage msg = new InNetMessage(_context); - msg.setFromRouter(ident); - msg.setFromRouterHash(ident.getHash()); - msg.setMessage(_message); - msg.setReplyBlock(null); - _context.inNetMessagePool().add(msg); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool"); - DataMessage msg = (DataMessage)_message; - boolean valid = _context.messageValidator().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime()); - if (!valid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]"); - _context.messageHistory().droppedOtherMessage(msg); - _context.messageHistory().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate"); - return; - } - - Payload payload = new Payload(); - payload.setEncryptedData(msg.getData()); - - MessageReceptionInfo receptionInfo = new MessageReceptionInfo(); - receptionInfo.setFromPeer(ident.getHash()); - receptionInfo.setFromTunnel(_tunnelId); - - ClientMessage clientMessage = new ClientMessage(); - clientMessage.setDestination(info.getDestination()); - clientMessage.setPayload(payload); - clientMessage.setReceptionInfo(receptionInfo); - _context.clientMessagePool().add(clientMessage); - _context.messageHistory().receivePayloadMessage(msg.getUniqueId()); + honorSendLocal(info, ident); + } + } + + /** + * We are the gateway and endpoint and we have been asked to forward the + * message to a remote location (either a tunnel or a router). + * + */ + private void honorSendRemote(TunnelInfo info, RouterIdentity ident) { + I2NPMessage msg = null; + if (_targetTunnelId != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Forward " + _message.getClass().getName() + + " message off to remote tunnel " + + _targetTunnelId.getTunnelId() + " on router " + + _destRouter.toBase64()); + TunnelMessage tmsg = new TunnelMessage(_context); + tmsg.setEncryptedDeliveryInstructions(null); + tmsg.setTunnelId(_targetTunnelId); + tmsg.setVerificationStructure(null); + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + try { + _message.writeBytes(baos); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the message to be forwarded...??", ioe); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing message to be forwarded...???", dfe); } + tmsg.setData(baos.toByteArray()); + msg = tmsg; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Forward " + _message.getClass().getName() + + " message off to remote router " + _destRouter.toBase64()); + msg = _message; + } + long now = _context.clock().now(); + //if (_expiration < now) { + //_expiration = now + Router.CLOCK_FUDGE_FACTOR; + //_log.info("Fudging the message send so it expires in the fudge factor..."); + //} + + if (_expiration - 10*1000 < now) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Why are we trying to send a " + _message.getClass().getName() + + " message with " + (_expiration-now) + "ms left?", getAddedBy()); + } + + String bodyType = _message.getClass().getName(); + _context.messageHistory().wrap(bodyType, _message.getUniqueId(), + TunnelMessage.class.getName(), msg.getUniqueId()); + + // don't specify a selector, since createFakeOutNetMessage already does that + _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter, + _onSend, _onReply, _onFailure, + null, _expiration, _priority)); + } + + /** + * We are the gateway and endpoint, and the instructions say to forward the + * message to, uh, us. The message may be a normal network message or they + * may be a client DataMessage. + * + */ + private void honorSendLocal(TunnelInfo info, RouterIdentity ident) { + if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) { + // its a network message targeting us... + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool"); + InNetMessage msg = new InNetMessage(_context); + msg.setFromRouter(ident); + msg.setFromRouterHash(ident.getHash()); + msg.setMessage(_message); + msg.setReplyBlock(null); + _context.inNetMessagePool().add(msg); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool"); + DataMessage msg = (DataMessage)_message; + boolean valid = _context.messageValidator().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime()); + if (!valid) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]"); + _context.messageHistory().droppedOtherMessage(msg); + _context.messageHistory().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate"); + return; + } + + Payload payload = new Payload(); + payload.setEncryptedData(msg.getData()); + + MessageReceptionInfo receptionInfo = new MessageReceptionInfo(); + receptionInfo.setFromPeer(ident.getHash()); + receptionInfo.setFromTunnel(_tunnelId); + + ClientMessage clientMessage = new ClientMessage(); + clientMessage.setDestination(info.getDestination()); + clientMessage.setPayload(payload); + clientMessage.setReceptionInfo(receptionInfo); + _context.clientMessagePool().add(clientMessage); + _context.messageHistory().receivePayloadMessage(msg.getUniqueId()); } }