minor refactoring, javadoc
dont add an arbitrary extra Router.CLOCK_FUDGE_FACTOR to the expiration
This commit is contained in:
@ -97,31 +97,10 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_log.error("Someone br0ke us. where is this message supposed to go again?",
|
||||
getAddedBy());
|
||||
return;
|
||||
} else {
|
||||
forwardToGateway();
|
||||
return;
|
||||
}
|
||||
TunnelMessage msg = new TunnelMessage(_context);
|
||||
try {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
|
||||
_message.writeBytes(baos);
|
||||
msg.setData(baos.toByteArray());
|
||||
msg.setTunnelId(_tunnelId);
|
||||
msg.setMessageExpiration(new Date(_expiration));
|
||||
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg,
|
||||
_destRouter, _onSend,
|
||||
_onReply, _onFailure,
|
||||
_selector, _expiration,
|
||||
_priority));
|
||||
|
||||
String bodyType = _message.getClass().getName();
|
||||
_context.messageHistory().wrap(bodyType, _message.getUniqueId(),
|
||||
TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out the tunnel message to send to the tunnel", ioe);
|
||||
} catch (DataFormatException dfe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out the tunnel message to send to the tunnel", dfe);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
info.messageProcessed();
|
||||
@ -139,7 +118,43 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Forward this job's message to the gateway of the tunnel requested
|
||||
*
|
||||
*/
|
||||
private void forwardToGateway() {
|
||||
TunnelMessage msg = new TunnelMessage(_context);
|
||||
try {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
|
||||
_message.writeBytes(baos);
|
||||
msg.setData(baos.toByteArray());
|
||||
msg.setTunnelId(_tunnelId);
|
||||
msg.setMessageExpiration(new Date(_expiration));
|
||||
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg,
|
||||
_destRouter, _onSend,
|
||||
_onReply, _onFailure,
|
||||
_selector, _expiration,
|
||||
_priority));
|
||||
|
||||
String bodyType = _message.getClass().getName();
|
||||
_context.messageHistory().wrap(bodyType, _message.getUniqueId(),
|
||||
TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out the tunnel message to send to the tunnel", ioe);
|
||||
} catch (DataFormatException dfe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out the tunnel message to send to the tunnel", dfe);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the gateway for the tunnel this message is bound to,
|
||||
* so wrap it accordingly and send it on its way.
|
||||
*
|
||||
*/
|
||||
private void handleAsGateway(TunnelInfo info) {
|
||||
// since we are the gateway, we don't need to verify the data structures
|
||||
TunnelInfo us = getUs(info);
|
||||
@ -176,6 +191,11 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the participant in the tunnel, so verify the signature / data and
|
||||
* forward it to the next hop.
|
||||
*
|
||||
*/
|
||||
private void handleAsParticipant(TunnelInfo info) {
|
||||
// SendTunnelMessageJob shouldn't be used for participants!
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -251,6 +271,11 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
return (us.getSigningKey() != null); // only the gateway can sign
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the tunnel message with appropriate instructions for the
|
||||
* tunnel endpoint, then encrypt and sign it.
|
||||
*
|
||||
*/
|
||||
private TunnelMessage prepareMessage(TunnelInfo info) {
|
||||
TunnelMessage msg = new TunnelMessage(_context);
|
||||
|
||||
@ -316,6 +341,10 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and sign the verification structure, using the tunnel's signing key
|
||||
*
|
||||
*/
|
||||
private TunnelVerificationStructure createVerificationStructure(byte encryptedMessage[], TunnelInfo info) {
|
||||
TunnelVerificationStructure struct = new TunnelVerificationStructure();
|
||||
struct.setMessageHash(_context.sha().calculateHash(encryptedMessage));
|
||||
@ -323,6 +352,11 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
return struct;
|
||||
}
|
||||
|
||||
/**
|
||||
* encrypt the structure (the message or instructions)
|
||||
*
|
||||
* @param paddedSize minimum size to pad to
|
||||
*/
|
||||
private byte[] encrypt(DataStructure struct, SessionKey key, int paddedSize) {
|
||||
try {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(paddedSize);
|
||||
@ -342,6 +376,12 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* We are both the endpoint and gateway for the tunnel, so honor
|
||||
* what was requested of us (processing the message locally,
|
||||
* forwarding to a router, forwarding to a tunnel, etc)
|
||||
*
|
||||
*/
|
||||
private void honorInstructions(TunnelInfo info) {
|
||||
if (_selector != null)
|
||||
createFakeOutNetMessage();
|
||||
@ -357,93 +397,112 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
RouterIdentity ident = _context.router().getRouterInfo().getIdentity();
|
||||
|
||||
if (_destRouter != null) {
|
||||
I2NPMessage msg = null;
|
||||
if (_targetTunnelId != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Forward " + _message.getClass().getName()
|
||||
+ " message off to remote tunnel "
|
||||
+ _targetTunnelId.getTunnelId() + " on router "
|
||||
+ _destRouter.toBase64());
|
||||
TunnelMessage tmsg = new TunnelMessage(_context);
|
||||
tmsg.setEncryptedDeliveryInstructions(null);
|
||||
tmsg.setTunnelId(_targetTunnelId);
|
||||
tmsg.setVerificationStructure(null);
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
|
||||
try {
|
||||
_message.writeBytes(baos);
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out the message to be forwarded...??", ioe);
|
||||
} catch (DataFormatException dfe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing message to be forwarded...???", dfe);
|
||||
}
|
||||
tmsg.setData(baos.toByteArray());
|
||||
msg = tmsg;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Forward " + _message.getClass().getName()
|
||||
+ " message off to remote router " + _destRouter.toBase64());
|
||||
msg = _message;
|
||||
}
|
||||
long now = _context.clock().now();
|
||||
//if (_expiration < now) {
|
||||
_expiration = now + Router.CLOCK_FUDGE_FACTOR;
|
||||
//_log.info("Fudging the message send so it expires in the fudge factor...");
|
||||
//}
|
||||
|
||||
if (_expiration - 30*1000 < now) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Why are we trying to send a " + _message.getClass().getName()
|
||||
+ " message with " + (_expiration-now) + "ms left?", getAddedBy());
|
||||
}
|
||||
|
||||
String bodyType = _message.getClass().getName();
|
||||
_context.messageHistory().wrap(bodyType, _message.getUniqueId(),
|
||||
TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
|
||||
// don't specify a selector, since createFakeOutNetMessage already does that
|
||||
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter,
|
||||
_onSend, _onReply, _onFailure,
|
||||
null, _expiration, _priority));
|
||||
honorSendRemote(info, ident);
|
||||
} else {
|
||||
if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) {
|
||||
// its a network message targeting us...
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool");
|
||||
InNetMessage msg = new InNetMessage(_context);
|
||||
msg.setFromRouter(ident);
|
||||
msg.setFromRouterHash(ident.getHash());
|
||||
msg.setMessage(_message);
|
||||
msg.setReplyBlock(null);
|
||||
_context.inNetMessagePool().add(msg);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool");
|
||||
DataMessage msg = (DataMessage)_message;
|
||||
boolean valid = _context.messageValidator().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime());
|
||||
if (!valid) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]");
|
||||
_context.messageHistory().droppedOtherMessage(msg);
|
||||
_context.messageHistory().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate");
|
||||
return;
|
||||
}
|
||||
|
||||
Payload payload = new Payload();
|
||||
payload.setEncryptedData(msg.getData());
|
||||
|
||||
MessageReceptionInfo receptionInfo = new MessageReceptionInfo();
|
||||
receptionInfo.setFromPeer(ident.getHash());
|
||||
receptionInfo.setFromTunnel(_tunnelId);
|
||||
|
||||
ClientMessage clientMessage = new ClientMessage();
|
||||
clientMessage.setDestination(info.getDestination());
|
||||
clientMessage.setPayload(payload);
|
||||
clientMessage.setReceptionInfo(receptionInfo);
|
||||
_context.clientMessagePool().add(clientMessage);
|
||||
_context.messageHistory().receivePayloadMessage(msg.getUniqueId());
|
||||
honorSendLocal(info, ident);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the gateway and endpoint and we have been asked to forward the
|
||||
* message to a remote location (either a tunnel or a router).
|
||||
*
|
||||
*/
|
||||
private void honorSendRemote(TunnelInfo info, RouterIdentity ident) {
|
||||
I2NPMessage msg = null;
|
||||
if (_targetTunnelId != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Forward " + _message.getClass().getName()
|
||||
+ " message off to remote tunnel "
|
||||
+ _targetTunnelId.getTunnelId() + " on router "
|
||||
+ _destRouter.toBase64());
|
||||
TunnelMessage tmsg = new TunnelMessage(_context);
|
||||
tmsg.setEncryptedDeliveryInstructions(null);
|
||||
tmsg.setTunnelId(_targetTunnelId);
|
||||
tmsg.setVerificationStructure(null);
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
|
||||
try {
|
||||
_message.writeBytes(baos);
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out the message to be forwarded...??", ioe);
|
||||
} catch (DataFormatException dfe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing message to be forwarded...???", dfe);
|
||||
}
|
||||
tmsg.setData(baos.toByteArray());
|
||||
msg = tmsg;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Forward " + _message.getClass().getName()
|
||||
+ " message off to remote router " + _destRouter.toBase64());
|
||||
msg = _message;
|
||||
}
|
||||
long now = _context.clock().now();
|
||||
//if (_expiration < now) {
|
||||
//_expiration = now + Router.CLOCK_FUDGE_FACTOR;
|
||||
//_log.info("Fudging the message send so it expires in the fudge factor...");
|
||||
//}
|
||||
|
||||
if (_expiration - 10*1000 < now) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Why are we trying to send a " + _message.getClass().getName()
|
||||
+ " message with " + (_expiration-now) + "ms left?", getAddedBy());
|
||||
}
|
||||
|
||||
String bodyType = _message.getClass().getName();
|
||||
_context.messageHistory().wrap(bodyType, _message.getUniqueId(),
|
||||
TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
|
||||
// don't specify a selector, since createFakeOutNetMessage already does that
|
||||
_context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, _destRouter,
|
||||
_onSend, _onReply, _onFailure,
|
||||
null, _expiration, _priority));
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the gateway and endpoint, and the instructions say to forward the
|
||||
* message to, uh, us. The message may be a normal network message or they
|
||||
* may be a client DataMessage.
|
||||
*
|
||||
*/
|
||||
private void honorSendLocal(TunnelInfo info, RouterIdentity ident) {
|
||||
if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) {
|
||||
// its a network message targeting us...
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool");
|
||||
InNetMessage msg = new InNetMessage(_context);
|
||||
msg.setFromRouter(ident);
|
||||
msg.setFromRouterHash(ident.getHash());
|
||||
msg.setMessage(_message);
|
||||
msg.setReplyBlock(null);
|
||||
_context.inNetMessagePool().add(msg);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool");
|
||||
DataMessage msg = (DataMessage)_message;
|
||||
boolean valid = _context.messageValidator().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime());
|
||||
if (!valid) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]");
|
||||
_context.messageHistory().droppedOtherMessage(msg);
|
||||
_context.messageHistory().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate");
|
||||
return;
|
||||
}
|
||||
|
||||
Payload payload = new Payload();
|
||||
payload.setEncryptedData(msg.getData());
|
||||
|
||||
MessageReceptionInfo receptionInfo = new MessageReceptionInfo();
|
||||
receptionInfo.setFromPeer(ident.getHash());
|
||||
receptionInfo.setFromTunnel(_tunnelId);
|
||||
|
||||
ClientMessage clientMessage = new ClientMessage();
|
||||
clientMessage.setDestination(info.getDestination());
|
||||
clientMessage.setPayload(payload);
|
||||
clientMessage.setReceptionInfo(receptionInfo);
|
||||
_context.clientMessagePool().add(clientMessage);
|
||||
_context.messageHistory().receivePayloadMessage(msg.getUniqueId());
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user