added stat - tunnel.unknownTunnelTimeLeft which gathers the time until expiration of tunnel messages we're dropping because we dont know about the tunnel Id.
formatting / logging
This commit is contained in:
@ -57,356 +57,378 @@ public class HandleTunnelMessageJob extends JobImpl {
|
|||||||
private final static int FORWARD_PRIORITY = 400;
|
private final static int FORWARD_PRIORITY = 400;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
StatManager.getInstance().createFrequencyStat("tunnel.unknownTunnelFrequency", "How often do we receive tunnel messages for unknown tunnels?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
StatManager.getInstance().createRateStat("tunnel.unknownTunnelTimeLeft", "How much time is left on tunnel messages we receive that are for unknown tunnels?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
StatManager.getInstance().createRateStat("tunnel.gatewayMessageSize", "How large are the messages we are forwarding on as an inbound gateway?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
StatManager.getInstance().createRateStat("tunnel.gatewayMessageSize", "How large are the messages we are forwarding on as an inbound gateway?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
StatManager.getInstance().createRateStat("tunnel.relayMessageSize", "How large are the messages we are forwarding on as a participant in a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
StatManager.getInstance().createRateStat("tunnel.relayMessageSize", "How large are the messages we are forwarding on as a participant in a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
StatManager.getInstance().createRateStat("tunnel.endpointMessageSize", "How large are the messages we are forwarding in as an outbound endpoint?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
StatManager.getInstance().createRateStat("tunnel.endpointMessageSize", "How large are the messages we are forwarding in as an outbound endpoint?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
}
|
}
|
||||||
|
|
||||||
public HandleTunnelMessageJob(TunnelMessage msg, RouterIdentity from, Hash fromHash) {
|
public HandleTunnelMessageJob(TunnelMessage msg, RouterIdentity from, Hash fromHash) {
|
||||||
super();
|
super();
|
||||||
_message = msg;
|
_message = msg;
|
||||||
_from = from;
|
_from = from;
|
||||||
_fromHash = fromHash;
|
_fromHash = fromHash;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() { return "Handle Inbound Tunnel Message"; }
|
public String getName() { return "Handle Inbound Tunnel Message"; }
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
TunnelId id = _message.getTunnelId();
|
TunnelId id = _message.getTunnelId();
|
||||||
TunnelInfo info = TunnelManagerFacade.getInstance().getTunnelInfo(id);
|
TunnelInfo info = TunnelManagerFacade.getInstance().getTunnelInfo(id);
|
||||||
|
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
Hash from = _fromHash;
|
Hash from = _fromHash;
|
||||||
if (_from != null)
|
if (_from != null)
|
||||||
from = _from.getHash();
|
from = _from.getHash();
|
||||||
MessageHistory.getInstance().droppedTunnelMessage(id, from);
|
MessageHistory.getInstance().droppedTunnelMessage(id, from);
|
||||||
_log.error("Received a message for an unknown tunnel [" + id.getTunnelId() + "], dropping it: " + _message, getAddedBy());
|
if (_log.shouldLog(Log.ERROR))
|
||||||
StatManager.getInstance().updateFrequency("tunnel.unknownTunnelFrequency");
|
_log.error("Received a message for an unknown tunnel [" + id.getTunnelId()
|
||||||
return;
|
+ "], dropping it: " + _message, getAddedBy());
|
||||||
}
|
long timeRemaining = _message.getMessageExpiration().getTime() - Clock.getInstance().now();
|
||||||
|
StatManager.getInstance().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
info = getUs(info);
|
info = getUs(info);
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
_log.error("We are not part of a known tunnel?? wtf! drop.", getAddedBy());
|
if (_log.shouldLog(Log.ERROR))
|
||||||
StatManager.getInstance().updateFrequency("tunnel.unknownTunnelFrequency");
|
_log.error("We are not part of a known tunnel?? wtf! drop.", getAddedBy());
|
||||||
return;
|
long timeRemaining = _message.getMessageExpiration().getTime() - Clock.getInstance().now();
|
||||||
} else {
|
StatManager.getInstance().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0);
|
||||||
_log.debug("Tunnel message received for tunnel: \n" + info);
|
return;
|
||||||
}
|
} else {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Tunnel message received for tunnel: \n" + info);
|
||||||
|
}
|
||||||
|
|
||||||
//if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) {
|
//if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) {
|
||||||
if (_message.getVerificationStructure() == null) {
|
if (_message.getVerificationStructure() == null) {
|
||||||
if (info.getSigningKey() != null) {
|
if (info.getSigningKey() != null) {
|
||||||
if (info.getNextHop() != null) {
|
if (info.getNextHop() != null) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("We are the gateway to tunnel " + id.getTunnelId());
|
_log.debug("We are the gateway to tunnel " + id.getTunnelId());
|
||||||
byte data[] = _message.getData();
|
byte data[] = _message.getData();
|
||||||
I2NPMessage msg = getBody(data);
|
I2NPMessage msg = getBody(data);
|
||||||
JobQueue.getInstance().addJob(new HandleGatewayMessageJob(msg, info, data.length));
|
JobQueue.getInstance().addJob(new HandleGatewayMessageJob(msg, info, data.length));
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("We are the gateway and the endpoint for tunnel " + id.getTunnelId());
|
_log.debug("We are the gateway and the endpoint for tunnel " + id.getTunnelId());
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Process locally");
|
_log.debug("Process locally");
|
||||||
if (info.getDestination() != null) {
|
if (info.getDestination() != null) {
|
||||||
if (!ClientManagerFacade.getInstance().isLocal(info.getDestination())) {
|
if (!ClientManagerFacade.getInstance().isLocal(info.getDestination())) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Received a message on a tunnel allocated to a client that has disconnected - dropping it!");
|
_log.warn("Received a message on a tunnel allocated to a client that has disconnected - dropping it!");
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Dropping message for disconnected client: " + _message);
|
_log.debug("Dropping message for disconnected client: " + _message);
|
||||||
|
|
||||||
MessageHistory.getInstance().droppedOtherMessage(_message);
|
MessageHistory.getInstance().droppedOtherMessage(_message);
|
||||||
MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Disconnected client");
|
MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(),
|
||||||
return;
|
_message.getClass().getName(),
|
||||||
}
|
"Disconnected client");
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
I2NPMessage body = getBody(_message.getData());
|
I2NPMessage body = getBody(_message.getData());
|
||||||
if (body != null) {
|
if (body != null) {
|
||||||
JobQueue.getInstance().addJob(new HandleLocallyJob(body, info));
|
JobQueue.getInstance().addJob(new HandleLocallyJob(body, info));
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Body is null! content of message.getData() = [" + DataHelper.toString(_message.getData()) + "]", getAddedBy());
|
_log.error("Body is null! content of message.getData() = [" +
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
DataHelper.toString(_message.getData()) + "]", getAddedBy());
|
||||||
_log.debug("Message that failed: " + _message, getAddedBy());
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
return;
|
_log.debug("Message that failed: " + _message, getAddedBy());
|
||||||
}
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
if (_log.shouldLog(Log.ERROR))
|
} else {
|
||||||
_log.error("Received a message that we are not the gateway for on tunnel "
|
if (_log.shouldLog(Log.ERROR))
|
||||||
+ id.getTunnelId() + " without a verification structure: " + _message, getAddedBy());
|
_log.error("Received a message that we are not the gateway for on tunnel "
|
||||||
return;
|
+ id.getTunnelId() + " without a verification structure: " + _message, getAddedBy());
|
||||||
}
|
return;
|
||||||
} else {
|
}
|
||||||
// participant
|
} else {
|
||||||
TunnelVerificationStructure struct = _message.getVerificationStructure();
|
// participant
|
||||||
boolean ok = struct.verifySignature(info.getVerificationKey().getKey());
|
TunnelVerificationStructure struct = _message.getVerificationStructure();
|
||||||
if (!ok) {
|
boolean ok = struct.verifySignature(info.getVerificationKey().getKey());
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (!ok) {
|
||||||
_log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy());
|
if (_log.shouldLog(Log.WARN))
|
||||||
return;
|
_log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy());
|
||||||
} else {
|
return;
|
||||||
if (info.getNextHop() != null) {
|
} else {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (info.getNextHop() != null) {
|
||||||
_log.info("Message for tunnel " + id.getTunnelId() + " received where we're not the gateway and there are remaining hops, so forward it on to "
|
if (_log.shouldLog(Log.INFO))
|
||||||
+ info.getNextHop().toBase64() + " via SendTunnelMessageJob");
|
_log.info("Message for tunnel " + id.getTunnelId()
|
||||||
|
+ " received where we're not the gateway and there are remaining hops, so forward it on to "
|
||||||
|
+ info.getNextHop().toBase64() + " via SendTunnelMessageJob");
|
||||||
|
|
||||||
StatManager.getInstance().addRateData("tunnel.relayMessageSize", _message.getData().length, 0);
|
StatManager.getInstance().addRateData("tunnel.relayMessageSize",
|
||||||
|
_message.getData().length, 0);
|
||||||
|
|
||||||
JobQueue.getInstance().addJob(new SendMessageDirectJob(_message, info.getNextHop(), Clock.getInstance().now() + FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
JobQueue.getInstance().addJob(new SendMessageDirectJob(_message, info.getNextHop(),
|
||||||
return;
|
Clock.getInstance().now() + FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
||||||
} else {
|
return;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
} else {
|
||||||
_log.debug("No more hops, unwrap and follow the instructions");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
JobQueue.getInstance().addJob(new HandleEndpointJob(info));
|
_log.debug("No more hops, unwrap and follow the instructions");
|
||||||
return;
|
JobQueue.getInstance().addJob(new HandleEndpointJob(info));
|
||||||
}
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processLocally(TunnelInfo ourPlace) {
|
private void processLocally(TunnelInfo ourPlace) {
|
||||||
if (ourPlace.getEncryptionKey() == null) {
|
if (ourPlace.getEncryptionKey() == null) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Argh, somehow we don't have the decryption key and we have no more steps", getAddedBy());
|
_log.error("Argh, somehow we don't have the decryption key and we have no more steps", getAddedBy());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DeliveryInstructions instructions = getInstructions(_message.getEncryptedDeliveryInstructions(), ourPlace.getEncryptionKey().getKey());
|
DeliveryInstructions instructions = getInstructions(_message.getEncryptedDeliveryInstructions(),
|
||||||
if (instructions == null) {
|
ourPlace.getEncryptionKey().getKey());
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (instructions == null) {
|
||||||
_log.error("We are the endpoint of a non-zero length tunnel and we don't have instructions. DROP.", getAddedBy());
|
if (_log.shouldLog(Log.ERROR))
|
||||||
return;
|
_log.error("We are the endpoint of a non-zero length tunnel and we don't have instructions. DROP.", getAddedBy());
|
||||||
} else {
|
return;
|
||||||
I2NPMessage body = null;
|
} else {
|
||||||
if (instructions.getEncrypted()) {
|
I2NPMessage body = null;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (instructions.getEncrypted()) {
|
||||||
_log.debug("Body in the tunnel IS encrypted");
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
body = decryptBody(_message.getData(), instructions.getEncryptionKey());
|
_log.debug("Body in the tunnel IS encrypted");
|
||||||
} else {
|
body = decryptBody(_message.getData(), instructions.getEncryptionKey());
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
} else {
|
||||||
_log.debug("Body in the tunnel is NOT encrypted: " + instructions + "\n" + _message, new Exception("Hmmm..."));
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
body = getBody(_message.getData());
|
_log.debug("Body in the tunnel is NOT encrypted: " + instructions
|
||||||
}
|
+ "\n" + _message, new Exception("Hmmm..."));
|
||||||
|
body = getBody(_message.getData());
|
||||||
|
}
|
||||||
|
|
||||||
if (body == null) {
|
if (body == null) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Unable to recover the body from the tunnel", getAddedBy());
|
_log.error("Unable to recover the body from the tunnel", getAddedBy());
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
JobQueue.getInstance().addJob(new ProcessBodyLocallyJob(body, instructions, ourPlace));
|
JobQueue.getInstance().addJob(new ProcessBodyLocallyJob(body, instructions, ourPlace));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void honorInstructions(DeliveryInstructions instructions, I2NPMessage body) {
|
private void honorInstructions(DeliveryInstructions instructions, I2NPMessage body) {
|
||||||
StatManager.getInstance().addRateData("tunnel.endpointMessageSize", _message.getData().length, 0);
|
StatManager.getInstance().addRateData("tunnel.endpointMessageSize", _message.getData().length, 0);
|
||||||
|
|
||||||
switch (instructions.getDeliveryMode()) {
|
switch (instructions.getDeliveryMode()) {
|
||||||
case DeliveryInstructions.DELIVERY_MODE_LOCAL:
|
case DeliveryInstructions.DELIVERY_MODE_LOCAL:
|
||||||
sendToLocal(body);
|
sendToLocal(body);
|
||||||
break;
|
break;
|
||||||
case DeliveryInstructions.DELIVERY_MODE_ROUTER:
|
case DeliveryInstructions.DELIVERY_MODE_ROUTER:
|
||||||
if (Router.getInstance().getRouterInfo().getIdentity().getHash().equals(instructions.getRouter())) {
|
if (Router.getInstance().getRouterInfo().getIdentity().getHash().equals(instructions.getRouter())) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Delivery instructions point at a router, but we're that router, so send to local");
|
_log.debug("Delivery instructions point at a router, but we're that router, so send to local");
|
||||||
sendToLocal(body);
|
sendToLocal(body);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Delivery instructions point at a router, and we're not that router, so forward it off");
|
_log.debug("Delivery instructions point at a router, and we're not that router, so forward it off");
|
||||||
sendToRouter(instructions.getRouter(), body);
|
sendToRouter(instructions.getRouter(), body);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DeliveryInstructions.DELIVERY_MODE_TUNNEL:
|
case DeliveryInstructions.DELIVERY_MODE_TUNNEL:
|
||||||
sendToTunnel(instructions.getRouter(), instructions.getTunnelId(), body);
|
sendToTunnel(instructions.getRouter(), instructions.getTunnelId(), body);
|
||||||
break;
|
break;
|
||||||
case DeliveryInstructions.DELIVERY_MODE_DESTINATION:
|
case DeliveryInstructions.DELIVERY_MODE_DESTINATION:
|
||||||
sendToDest(instructions.getDestination(), body);
|
sendToDest(instructions.getDestination(), body);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendToDest(Hash dest, I2NPMessage body) {
|
private void sendToDest(Hash dest, I2NPMessage body) {
|
||||||
if (body instanceof DataMessage) {
|
if (body instanceof DataMessage) {
|
||||||
boolean isLocal = ClientManagerFacade.getInstance().isLocal(dest);
|
boolean isLocal = ClientManagerFacade.getInstance().isLocal(dest);
|
||||||
if (isLocal) {
|
if (isLocal) {
|
||||||
deliverMessage(null, dest, (DataMessage)body);
|
deliverMessage(null, dest, (DataMessage)body);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Delivery to remote destinations is not yet supported", getAddedBy());
|
_log.error("Delivery to remote destinations is not yet supported", getAddedBy());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Deliver something other than a DataMessage to a Destination? I don't think so.");
|
_log.error("Deliver something other than a DataMessage to a Destination? I don't think so.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendToTunnel(Hash router, TunnelId id, I2NPMessage body) {
|
private void sendToTunnel(Hash router, TunnelId id, I2NPMessage body) {
|
||||||
// TODO: we may want to send it via a tunnel later on, but for now, direct will do.
|
// TODO: we may want to send it via a tunnel later on, but for now, direct will do.
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Sending on to requested tunnel " + id.getTunnelId() + " on router " + router.toBase64());
|
_log.debug("Sending on to requested tunnel " + id.getTunnelId() + " on router "
|
||||||
TunnelMessage msg = new TunnelMessage();
|
+ router.toBase64());
|
||||||
msg.setTunnelId(id);
|
TunnelMessage msg = new TunnelMessage();
|
||||||
try {
|
msg.setTunnelId(id);
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
|
try {
|
||||||
body.writeBytes(baos);
|
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
|
||||||
msg.setData(baos.toByteArray());
|
body.writeBytes(baos);
|
||||||
JobQueue.getInstance().addJob(new SendMessageDirectJob(msg, router, Clock.getInstance().now() + FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
msg.setData(baos.toByteArray());
|
||||||
|
long exp = Clock.getInstance().now() + FORWARD_TIMEOUT;
|
||||||
|
JobQueue.getInstance().addJob(new SendMessageDirectJob(msg, router, exp, FORWARD_PRIORITY));
|
||||||
|
|
||||||
String bodyType = body.getClass().getName();
|
String bodyType = body.getClass().getName();
|
||||||
MessageHistory.getInstance().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
MessageHistory.getInstance().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
||||||
} catch (DataFormatException dfe) {
|
} catch (DataFormatException dfe) {
|
||||||
_log.error("Error writing out the message to forward to the tunnel", dfe);
|
if (_log.shouldLog(Log.ERROR))
|
||||||
} catch (IOException ioe) {
|
_log.error("Error writing out the message to forward to the tunnel", dfe);
|
||||||
_log.error("Error writing out the message to forward to the tunnel", ioe);
|
} catch (IOException ioe) {
|
||||||
}
|
if (_log.shouldLog(Log.ERROR))
|
||||||
|
_log.error("Error writing out the message to forward to the tunnel", ioe);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendToRouter(Hash router, I2NPMessage body) {
|
private void sendToRouter(Hash router, I2NPMessage body) {
|
||||||
// TODO: we may want to send it via a tunnel later on, but for now, direct will do.
|
// TODO: we may want to send it via a tunnel later on, but for now, direct will do.
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Sending on to requested router " + router.toBase64());
|
_log.debug("Sending on to requested router " + router.toBase64());
|
||||||
JobQueue.getInstance().addJob(new SendMessageDirectJob(body, router, Clock.getInstance().now() + FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
long exp = Clock.getInstance().now() + FORWARD_TIMEOUT;
|
||||||
|
JobQueue.getInstance().addJob(new SendMessageDirectJob(body, router, exp, FORWARD_PRIORITY));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendToLocal(I2NPMessage body) {
|
private void sendToLocal(I2NPMessage body) {
|
||||||
InNetMessage msg = new InNetMessage();
|
InNetMessage msg = new InNetMessage();
|
||||||
msg.setMessage(body);
|
msg.setMessage(body);
|
||||||
msg.setFromRouter(_from);
|
msg.setFromRouter(_from);
|
||||||
msg.setFromRouterHash(_fromHash);
|
msg.setFromRouterHash(_fromHash);
|
||||||
InNetMessagePool.getInstance().add(msg);
|
InNetMessagePool.getInstance().add(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deliverMessage(Destination dest, Hash destHash, DataMessage msg) {
|
private void deliverMessage(Destination dest, Hash destHash, DataMessage msg) {
|
||||||
boolean valid = MessageValidator.getInstance().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime());
|
boolean valid = MessageValidator.getInstance().validateMessage(msg.getUniqueId(), msg.getMessageExpiration().getTime());
|
||||||
if (!valid) {
|
if (!valid) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]");
|
_log.warn("Duplicate data message received [" + msg.getUniqueId()
|
||||||
MessageHistory.getInstance().droppedOtherMessage(msg);
|
+ " expiring on " + msg.getMessageExpiration() + "]");
|
||||||
MessageHistory.getInstance().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate payload");
|
MessageHistory.getInstance().droppedOtherMessage(msg);
|
||||||
return;
|
MessageHistory.getInstance().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(),
|
||||||
}
|
"Duplicate payload");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
ClientMessage cmsg = new ClientMessage();
|
ClientMessage cmsg = new ClientMessage();
|
||||||
|
|
||||||
Payload payload = new Payload();
|
Payload payload = new Payload();
|
||||||
payload.setEncryptedData(msg.getData());
|
payload.setEncryptedData(msg.getData());
|
||||||
|
|
||||||
MessageReceptionInfo info = new MessageReceptionInfo();
|
MessageReceptionInfo info = new MessageReceptionInfo();
|
||||||
info.setFromPeer(_fromHash);
|
info.setFromPeer(_fromHash);
|
||||||
info.setFromTunnel(_message.getTunnelId());
|
info.setFromTunnel(_message.getTunnelId());
|
||||||
|
|
||||||
cmsg.setDestination(dest);
|
cmsg.setDestination(dest);
|
||||||
cmsg.setDestinationHash(destHash);
|
cmsg.setDestinationHash(destHash);
|
||||||
cmsg.setPayload(payload);
|
cmsg.setPayload(payload);
|
||||||
cmsg.setReceptionInfo(info);
|
cmsg.setReceptionInfo(info);
|
||||||
|
|
||||||
MessageHistory.getInstance().receivePayloadMessage(msg.getUniqueId());
|
MessageHistory.getInstance().receivePayloadMessage(msg.getUniqueId());
|
||||||
// if the destination isn't local, the ClientMessagePool forwards it off as an OutboundClientMessageJob
|
// if the destination isn't local, the ClientMessagePool forwards it off as an OutboundClientMessageJob
|
||||||
ClientMessagePool.getInstance().add(cmsg);
|
ClientMessagePool.getInstance().add(cmsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
private I2NPMessage getBody(byte body[]) {
|
private I2NPMessage getBody(byte body[]) {
|
||||||
try {
|
try {
|
||||||
return _handler.readMessage(new ByteArrayInputStream(body));
|
return _handler.readMessage(new ByteArrayInputStream(body));
|
||||||
} catch (I2NPMessageException ime) {
|
} catch (I2NPMessageException ime) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Error parsing the message body", ime);
|
_log.error("Error parsing the message body", ime);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Error reading the message body", ioe);
|
_log.error("Error reading the message body", ioe);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private I2NPMessage decryptBody(byte encryptedMessage[], SessionKey key) {
|
private I2NPMessage decryptBody(byte encryptedMessage[], SessionKey key) {
|
||||||
byte iv[] = new byte[16];
|
byte iv[] = new byte[16];
|
||||||
Hash h = SHA256Generator.getInstance().calculateHash(key.getData());
|
Hash h = SHA256Generator.getInstance().calculateHash(key.getData());
|
||||||
System.arraycopy(h.getData(), 0, iv, 0, iv.length);
|
System.arraycopy(h.getData(), 0, iv, 0, iv.length);
|
||||||
byte decrypted[] = AESEngine.getInstance().safeDecrypt(encryptedMessage, key, iv);
|
byte decrypted[] = AESEngine.getInstance().safeDecrypt(encryptedMessage, key, iv);
|
||||||
if (decrypted == null) {
|
if (decrypted == null) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Error decrypting the message", getAddedBy());
|
_log.error("Error decrypting the message", getAddedBy());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return getBody(decrypted);
|
return getBody(decrypted);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DeliveryInstructions getInstructions(byte encryptedInstructions[], SessionKey key) {
|
private DeliveryInstructions getInstructions(byte encryptedInstructions[], SessionKey key) {
|
||||||
try {
|
try {
|
||||||
byte iv[] = new byte[16];
|
byte iv[] = new byte[16];
|
||||||
Hash h = SHA256Generator.getInstance().calculateHash(key.getData());
|
Hash h = SHA256Generator.getInstance().calculateHash(key.getData());
|
||||||
System.arraycopy(h.getData(), 0, iv, 0, iv.length);
|
System.arraycopy(h.getData(), 0, iv, 0, iv.length);
|
||||||
byte decrypted[] = AESEngine.getInstance().safeDecrypt(encryptedInstructions, key, iv);
|
byte decrypted[] = AESEngine.getInstance().safeDecrypt(encryptedInstructions, key, iv);
|
||||||
if (decrypted == null) {
|
if (decrypted == null) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Error decrypting the instructions", getAddedBy());
|
_log.error("Error decrypting the instructions", getAddedBy());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
DeliveryInstructions instructions = new DeliveryInstructions();
|
DeliveryInstructions instructions = new DeliveryInstructions();
|
||||||
instructions.readBytes(new ByteArrayInputStream(decrypted));
|
instructions.readBytes(new ByteArrayInputStream(decrypted));
|
||||||
return instructions;
|
return instructions;
|
||||||
} catch (DataFormatException dfe) {
|
} catch (DataFormatException dfe) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Error parsing the decrypted instructions", dfe);
|
_log.error("Error parsing the decrypted instructions", dfe);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Error reading the decrypted instructions", ioe);
|
_log.error("Error reading the decrypted instructions", ioe);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private TunnelInfo getUs(TunnelInfo info) {
|
private TunnelInfo getUs(TunnelInfo info) {
|
||||||
Hash us = Router.getInstance().getRouterInfo().getIdentity().getHash();
|
Hash us = Router.getInstance().getRouterInfo().getIdentity().getHash();
|
||||||
while (info != null) {
|
while (info != null) {
|
||||||
if (us.equals(info.getThisHop()))
|
if (us.equals(info.getThisHop()))
|
||||||
return info;
|
return info;
|
||||||
info = info.getNextHopInfo();
|
info = info.getNextHopInfo();
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateMessage(TunnelMessage msg, TunnelInfo info) {
|
private boolean validateMessage(TunnelMessage msg, TunnelInfo info) {
|
||||||
TunnelVerificationStructure vstruct = _message.getVerificationStructure();
|
TunnelVerificationStructure vstruct = _message.getVerificationStructure();
|
||||||
if (vstruct == null) {
|
if (vstruct == null) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Verification structure missing. invalid");
|
_log.warn("Verification structure missing. invalid");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( (info.getVerificationKey() == null) || (info.getVerificationKey().getKey() == null) ) {
|
if ( (info.getVerificationKey() == null) || (info.getVerificationKey().getKey() == null) ) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("wtf, no verification key for the tunnel? " + info, getAddedBy());
|
_log.error("wtf, no verification key for the tunnel? " + info, getAddedBy());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!vstruct.verifySignature(info.getVerificationKey().getKey())) {
|
if (!vstruct.verifySignature(info.getVerificationKey().getKey())) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("Received a tunnel message with an invalid signature!");
|
_log.error("Received a tunnel message with an invalid signature!");
|
||||||
// shitlist the sender?
|
// shitlist the sender?
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// now validate the message
|
// now validate the message
|
||||||
Hash msgHash = SHA256Generator.getInstance().calculateHash(_message.getData());
|
Hash msgHash = SHA256Generator.getInstance().calculateHash(_message.getData());
|
||||||
if (msgHash.equals(vstruct.getMessageHash())) {
|
if (msgHash.equals(vstruct.getMessageHash())) {
|
||||||
// hash matches. good.
|
// hash matches. good.
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("validateMessage: Signed hash does not match real hash. Data has been tampered with!");
|
_log.error("validateMessage: Signed hash does not match real hash. Data has been tampered with!");
|
||||||
// shitlist the sender!
|
// shitlist the sender!
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void dropped() {
|
public void dropped() {
|
||||||
MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload");
|
MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(),
|
||||||
|
"Dropped due to overload");
|
||||||
}
|
}
|
||||||
|
|
||||||
////
|
////
|
||||||
@ -415,112 +437,114 @@ public class HandleTunnelMessageJob extends JobImpl {
|
|||||||
|
|
||||||
/** we're the gateway, lets deal */
|
/** we're the gateway, lets deal */
|
||||||
private class HandleGatewayMessageJob extends JobImpl {
|
private class HandleGatewayMessageJob extends JobImpl {
|
||||||
private I2NPMessage _body;
|
private I2NPMessage _body;
|
||||||
private int _length;
|
private int _length;
|
||||||
private TunnelInfo _info;
|
private TunnelInfo _info;
|
||||||
|
|
||||||
public HandleGatewayMessageJob(I2NPMessage body, TunnelInfo tunnel, int length) {
|
public HandleGatewayMessageJob(I2NPMessage body, TunnelInfo tunnel, int length) {
|
||||||
_body = body;
|
_body = body;
|
||||||
_length = length;
|
_length = length;
|
||||||
_info = tunnel;
|
_info = tunnel;
|
||||||
}
|
}
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if (_body != null) {
|
if (_body != null) {
|
||||||
StatManager.getInstance().addRateData("tunnel.gatewayMessageSize", _length, 0);
|
StatManager.getInstance().addRateData("tunnel.gatewayMessageSize", _length, 0);
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message for tunnel " + _info.getTunnelId() + " received at the gateway (us), and since its > 0 length, forward the "
|
_log.info("Message for tunnel " + _info.getTunnelId() + " received at the gateway (us), and since its > 0 length, forward the "
|
||||||
+ _body.getClass().getName() + " message on to " + _info.getNextHop().toBase64() + " via SendTunnelMessageJob");
|
+ _body.getClass().getName() + " message on to " + _info.getNextHop().toBase64() + " via SendTunnelMessageJob");
|
||||||
JobQueue.getInstance().addJob(new SendTunnelMessageJob(_body, _info.getTunnelId(), null, null, null, null, FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
JobQueue.getInstance().addJob(new SendTunnelMessageJob(_body, _info.getTunnelId(), null, null, null, null, FORWARD_TIMEOUT, FORWARD_PRIORITY));
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Body of the message for the tunnel could not be parsed");
|
_log.warn("Body of the message for the tunnel could not be parsed");
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Message that failed: " + _message);
|
_log.debug("Message that failed: " + _message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
public String getName() { return "Handle Tunnel Message (gateway)"; }
|
public String getName() { return "Handle Tunnel Message (gateway)"; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/** zero hop tunnel */
|
/** zero hop tunnel */
|
||||||
private class HandleLocallyJob extends JobImpl {
|
private class HandleLocallyJob extends JobImpl {
|
||||||
private I2NPMessage _body;
|
private I2NPMessage _body;
|
||||||
private TunnelInfo _info;
|
private TunnelInfo _info;
|
||||||
|
|
||||||
public HandleLocallyJob(I2NPMessage body, TunnelInfo tunnel) {
|
public HandleLocallyJob(I2NPMessage body, TunnelInfo tunnel) {
|
||||||
_body = body;
|
_body = body;
|
||||||
_info = tunnel;
|
_info = tunnel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if (_body instanceof DataMessage) {
|
if (_body instanceof DataMessage) {
|
||||||
// we know where to send it and its something a client can handle, so lets send 'er to the client
|
// we know where to send it and its something a client can handle, so lets send 'er to the client
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.debug("Deliver the message to a local client, as its a payload message and we know the destination");
|
_log.debug("Deliver the message to a local client, as its a payload message and we know the destination");
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message for tunnel " + _info.getTunnelId() + " received at the gateway (us), but its a 0 length tunnel and the message is a DataMessage, so send it to "
|
_log.info("Message for tunnel " + _info.getTunnelId() + " received at the gateway (us), but its a 0 length tunnel and the message is a DataMessage, so send it to "
|
||||||
+ _info.getDestination().calculateHash().toBase64());
|
+ _info.getDestination().calculateHash().toBase64());
|
||||||
deliverMessage(_info.getDestination(), null, (DataMessage)_body);
|
deliverMessage(_info.getDestination(), null, (DataMessage)_body);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message for tunnel " + _info.getTunnelId() +
|
_log.info("Message for tunnel " + _info.getTunnelId() +
|
||||||
" received at the gateway (us), but its a 0 length tunnel though it is a " + _body.getClass().getName() + ", so process it locally");
|
" received at the gateway (us), but its a 0 length tunnel though it is a "
|
||||||
InNetMessage msg = new InNetMessage();
|
+ _body.getClass().getName() + ", so process it locally");
|
||||||
msg.setFromRouter(_from);
|
InNetMessage msg = new InNetMessage();
|
||||||
msg.setFromRouterHash(_fromHash);
|
msg.setFromRouter(_from);
|
||||||
msg.setMessage(_body);
|
msg.setFromRouterHash(_fromHash);
|
||||||
InNetMessagePool.getInstance().add(msg);
|
msg.setMessage(_body);
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
InNetMessagePool.getInstance().add(msg);
|
||||||
_log.debug("Message added to Inbound network pool for local processing: " + _message);
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
}
|
_log.debug("Message added to Inbound network pool for local processing: " + _message);
|
||||||
}
|
}
|
||||||
public String getName() { return "Handle Tunnel Message (0 hop)"; }
|
}
|
||||||
|
public String getName() { return "Handle Tunnel Message (0 hop)"; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/** we're the endpoint of the inbound tunnel */
|
/** we're the endpoint of the inbound tunnel */
|
||||||
private class HandleEndpointJob extends JobImpl {
|
private class HandleEndpointJob extends JobImpl {
|
||||||
private TunnelInfo _info;
|
private TunnelInfo _info;
|
||||||
public HandleEndpointJob(TunnelInfo info) {
|
public HandleEndpointJob(TunnelInfo info) {
|
||||||
_info = info;
|
_info = info;
|
||||||
}
|
}
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
processLocally(_info);
|
processLocally(_info);
|
||||||
}
|
}
|
||||||
public String getName() { return "Handle Tunnel Message (inbound endpoint)"; }
|
public String getName() { return "Handle Tunnel Message (inbound endpoint)"; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/** endpoint of outbound 1+ hop tunnel with instructions */
|
/** endpoint of outbound 1+ hop tunnel with instructions */
|
||||||
private class ProcessBodyLocallyJob extends JobImpl {
|
private class ProcessBodyLocallyJob extends JobImpl {
|
||||||
private I2NPMessage _body;
|
private I2NPMessage _body;
|
||||||
private TunnelInfo _ourPlace;
|
private TunnelInfo _ourPlace;
|
||||||
private DeliveryInstructions _instructions;
|
private DeliveryInstructions _instructions;
|
||||||
public ProcessBodyLocallyJob(I2NPMessage body, DeliveryInstructions instructions, TunnelInfo ourPlace) {
|
public ProcessBodyLocallyJob(I2NPMessage body, DeliveryInstructions instructions, TunnelInfo ourPlace) {
|
||||||
_body = body;
|
_body = body;
|
||||||
_instructions = instructions;
|
_instructions = instructions;
|
||||||
_ourPlace = ourPlace;
|
_ourPlace = ourPlace;
|
||||||
}
|
}
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Body read: " + _body);
|
_log.debug("Body read: " + _body);
|
||||||
if ( (_ourPlace.getDestination() != null) && (_body instanceof DataMessage) ) {
|
if ( (_ourPlace.getDestination() != null) && (_body instanceof DataMessage) ) {
|
||||||
// we know where to send it and its something a client can handle, so lets send 'er to the client
|
// we know where to send it and its something a client can handle, so lets send 'er to the client
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Deliver the message to a local client, as its a payload message and we know the destination");
|
_log.debug("Deliver the message to a local client, as its a payload message and we know the destination");
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message for tunnel " + _ourPlace.getTunnelId().getTunnelId()
|
_log.info("Message for tunnel " + _ourPlace.getTunnelId().getTunnelId()
|
||||||
+ " received where we're the endpoint containing a DataMessage message, so deliver it to "
|
+ " received where we're the endpoint containing a DataMessage message, so deliver it to "
|
||||||
+ _ourPlace.getDestination().calculateHash().toBase64());
|
+ _ourPlace.getDestination().calculateHash().toBase64());
|
||||||
deliverMessage(_ourPlace.getDestination(), null, (DataMessage)_body);
|
deliverMessage(_ourPlace.getDestination(), null, (DataMessage)_body);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
// Honor the delivery instructions
|
// Honor the delivery instructions
|
||||||
//TunnelMonitor.endpointReceive(ourPlace.getTunnelId(), body.getClass().getName(), instructions, ourPlace.getDestination());
|
if (_log.shouldLog(Log.INFO))
|
||||||
if (_log.shouldLog(Log.INFO))
|
_log.info("Message for tunnel " + _ourPlace.getTunnelId().getTunnelId()
|
||||||
_log.info("Message for tunnel " + _ourPlace.getTunnelId().getTunnelId() + " received where we're the endpoint containing a "
|
+ " received where we're the endpoint containing a "
|
||||||
+ _body.getClass().getName() + " message, so honor the delivery instructions: " + _instructions.toString());
|
+ _body.getClass().getName() + " message, so honor the delivery instructions: "
|
||||||
honorInstructions(_instructions, _body);
|
+ _instructions.toString());
|
||||||
return;
|
honorInstructions(_instructions, _body);
|
||||||
}
|
return;
|
||||||
}
|
}
|
||||||
public String getName() { return "Handle Tunnel Message (outbound endpoint)"; }
|
}
|
||||||
|
public String getName() { return "Handle Tunnel Message (outbound endpoint)"; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user