- Add support for b64 conversion in destLookup()
  - Catch invalid message length sooner
I2Ping:
  - Extend I2PTunnelClientBase so non-shared-client,
    I2CP options, and other features will work
  - Fixes for fields and threading
Streaming:
  - Send LS with ping (broken since 0.9.2)
  - Set the NO_ACK flag on pings and pongs
This commit is contained in:
zzz
2013-12-21 18:10:59 +00:00
parent cc97a19d3c
commit 5219791673
9 changed files with 166 additions and 78 deletions

View File

@ -1508,20 +1508,19 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
*/
private void runPing(String allargs, Logging l) {
if (allargs.length() != 0) {
I2PTunnelTask task;
// pings always use the main destination
task = new I2Ping(allargs, l, false, this, this);
_clientOptions.setProperty(I2Ping.PROP_COMMAND, allargs);
I2PTunnelTask task = new I2Ping(l, ownDest, this, this);
addtask(task);
notifyEvent("pingTaskId", Integer.valueOf(task.getId()));
} else {
l.log("ping <opts> <dest>");
l.log("ping <opts> <b64dest|host>");
l.log("ping <opts> -h (pings all hosts in hosts.txt)");
l.log("ping <opts> -l <destlistfile> (pings a list of hosts in a file)");
l.log(" Options:\n" +
" -c (require 5 consecutive pings to report success)\n" +
" -m maxSimultaneousPings (default 10)\n" +
" -n numberOfPings (default 3)\n" +
" -t timeout (ms, default 5000)\n");
" -t timeout (ms, default 30000)\n");
l.log(" Tests communication with peers.\n");
notifyEvent("pingTaskId", Integer.valueOf(-1));
}
@ -1732,7 +1731,7 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
// we do it below instead so we can set the host and port,
// which we can't do with lookup()
d = inst.lookup(name);
if (d != null || ctx.isRouterContext())
if (d != null || ctx.isRouterContext() || name.length() >= 516)
return d;
}
// Outside router context only,

View File

@ -51,15 +51,18 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
protected final List<I2PSocket> mySockets = new ArrayList<I2PSocket>();
protected boolean _ownDest;
protected Destination dest = null;
protected Destination dest;
private int localPort;
private boolean listenerReady = false;
/**
* Protected for I2Ping since 0.9.10. Not for use outside package.
*/
protected boolean listenerReady;
protected ServerSocket ss;
private final Object startLock = new Object();
private boolean startRunning = false;
private boolean startRunning;
// private Object closeLock = new Object();
@ -68,7 +71,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
private String privKeyFile;
// true if we are chained from a server.
private boolean chained = false;
private boolean chained;
/** how long to wait before dropping an idle thread */
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
@ -582,7 +585,11 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
return i2ps;
}
public final void run() {
/**
* Non-final since 0.9.10.
* Any overrides must set listenerReady = true.
*/
public void run() {
try {
InetAddress addr = getListenHost(l);
if (addr == null) {

View File

@ -6,67 +6,70 @@ package net.i2p.i2ptunnel;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.data.Destination;
import net.i2p.util.EventDispatcher;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
public class I2Ping extends I2PTunnelTask implements Runnable {
private final Log _log = new Log(I2Ping.class);
/**
* Warning - not necessarily a stable API.
* Used by I2PTunnel CLI only. Consider this sample code.
* Not for use outside this package.
*/
public class I2Ping extends I2PTunnelClientBase {
private int PING_COUNT = 3;
public static final String PROP_COMMAND = "command";
private static final int PING_COUNT = 3;
private static final int CPING_COUNT = 5;
private static final int PING_TIMEOUT = 5000;
private static final int PING_TIMEOUT = 30*1000;
private static final long PING_DISTANCE = 1000;
private int MAX_SIMUL_PINGS = 10; // not really final...
private boolean countPing;
private boolean reportTimes = true;
private final I2PSocketManager sockMgr;
private final Logging l;
private boolean finished;
private final String command;
private long timeout = PING_TIMEOUT;
private volatile boolean finished;
private final Object simulLock = new Object();
private int simulPings;
private long lastPingTime;
private final Object lock = new Object();
//public I2Ping(String cmd, Logging l,
// boolean ownDest) {
// I2Ping(cmd, l, (EventDispatcher)null);
//}
public I2Ping(String cmd, Logging l, boolean ownDest, EventDispatcher notifyThis, I2PTunnel tunnel) {
super("I2Ping [" + cmd + "]", notifyThis, tunnel);
this.l = l;
command = cmd;
if (ownDest) {
sockMgr = I2PTunnelClient.buildSocketManager(tunnel);
} else {
sockMgr = I2PTunnelClient.getSocketManager(tunnel);
/**
* tunnel.getOptions must contain "command".
* @throws IllegalArgumentException if it doesn't
*/
public I2Ping(Logging l, boolean ownDest, EventDispatcher notifyThis, I2PTunnel tunnel) {
super(-1, ownDest, l, notifyThis, "I2Ping", tunnel);
if (!tunnel.getClientOptions().containsKey(PROP_COMMAND)) {
// todo clean up
throw new IllegalArgumentException("Options does not contain " + PROP_COMMAND);
}
Thread t = new I2PAppThread(this);
t.setName("Client");
t.start();
open = true;
}
/**
* Overrides super. No client ServerSocket is created.
*/
@Override
public void run() {
// Notify constructor that port is ready
synchronized (this) {
listenerReady = true;
notify();
}
l.log("*** I2Ping results:");
try {
runCommand(command);
runCommand(getTunnel().getClientOptions().getProperty(PROP_COMMAND));
} catch (InterruptedException ex) {
l.log("*** Interrupted");
_log.error("Pinger interrupted", ex);
@ -74,13 +77,15 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
_log.error("Pinger exception", ex);
}
l.log("*** Finished.");
synchronized (lock) {
finished = true;
}
finished = true;
close(false);
}
public void runCommand(String cmd) throws InterruptedException, IOException {
long timeout = PING_TIMEOUT;
int count = PING_COUNT;
boolean countPing = false;
boolean reportTimes = true;
while (true) {
if (cmd.startsWith("-t ")) { // timeout
cmd = cmd.substring(3);
@ -90,6 +95,9 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
return;
} else {
timeout = Long.parseLong(cmd.substring(0, pos));
// convenience, convert msec to sec
if (timeout < 100)
timeout *= 1000;
cmd = cmd.substring(pos + 1);
}
} else if (cmd.startsWith("-m ")) { // max simultaneous pings
@ -109,11 +117,12 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
l.log("Syntax error");
return;
} else {
PING_COUNT = Integer.parseInt(cmd.substring(0, pos));
count = Integer.parseInt(cmd.substring(0, pos));
cmd = cmd.substring(pos + 1);
}
} else if (cmd.startsWith("-c ")) { // "count" ping
countPing = true;
count = CPING_COUNT;
cmd = cmd.substring(3);
} else if (cmd.equals("-h")) { // ping all hosts
cmd = "-l hosts.txt";
@ -129,7 +138,9 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
if (line.indexOf("=") != -1) { // maybe file is hosts.txt?
line = line.substring(0, line.indexOf("="));
}
pingHandlers.add(new PingHandler(line));
PingHandler ph = new PingHandler(line, count, timeout, countPing, reportTimes);
ph.start();
pingHandlers.add(ph);
if (++i > 1)
reportTimes = false;
}
@ -138,28 +149,28 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
t.join();
return;
} else {
Thread t = new PingHandler(cmd);
Thread t = new PingHandler(cmd, count, timeout, countPing, reportTimes);
t.start();
t.join();
return;
}
}
}
@Override
public boolean close(boolean forced) {
if (!open) return true;
synchronized (lock) {
if (!forced && !finished) {
l.log("There are still pings running!");
return false;
}
l.log("Closing pinger " + toString());
l.log("Pinger closed.");
open = false;
return true;
super.close(forced);
if (!forced && !finished) {
l.log("There are still pings running!");
return false;
}
l.log("Closing pinger " + toString());
l.log("Pinger closed.");
return true;
}
public boolean ping(Destination dest) throws I2PException {
private boolean ping(Destination dest, long timeout) throws I2PException {
try {
synchronized (simulLock) {
while (simulPings >= MAX_SIMUL_PINGS) {
@ -184,33 +195,48 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
}
}
public class PingHandler extends I2PAppThread {
private final String destination;
/**
* Does nothing.
* @since 0.9.10
*/
protected void clientConnectionRun(Socket s) {}
public PingHandler(String dest) {
private class PingHandler extends I2PAppThread {
private final String destination;
private final int cnt;
private final long timeout;
private final boolean countPing;
private final boolean reportTimes;
/**
* As of 0.9.10, does NOT start itself.
* Caller must call start()
* @param dest b64 or b32 or host name
*/
public PingHandler(String dest, int count, long timeout, boolean countPings, boolean report) {
this.destination = dest;
cnt = count;
this.timeout = timeout;
countPing = countPings;
reportTimes = report;
setName("PingHandler for " + dest);
start();
}
@Override
public void run() {
try {
Destination dest = I2PAppContext.getGlobalContext().namingService().lookup(destination);
Destination dest = lookup(destination);
if (dest == null) {
synchronized (lock) { // Logger is not thread safe
l.log("Unresolvable: " + destination + "");
}
l.log("Unresolvable: " + destination);
return;
}
int pass = 0;
int fail = 0;
long totalTime = 0;
int cnt = countPing ? CPING_COUNT : PING_COUNT;
StringBuilder pingResults = new StringBuilder(2 * cnt + destination.length() + 3);
for (int i = 0; i < cnt; i++) {
boolean sent;
sent = ping(dest);
sent = ping(dest, timeout);
if (countPing) {
if (!sent) {
pingResults.append(i).append(" ");
@ -242,12 +268,35 @@ public class I2Ping extends I2PTunnelTask implements Runnable {
pingResults.append("and ").append(fail).append(" lost for destination: ");
}
pingResults.append(" ").append(destination);
synchronized (lock) { // Logger is not thread safe
l.log(pingResults.toString());
}
l.log(pingResults.toString());
} catch (I2PException ex) {
_log.error("Error pinging " + destination, ex);
}
}
/**
* @param name b64 or b32 or host name
* @since 0.9.10
*/
private Destination lookup(String name) {
I2PAppContext ctx = I2PAppContext.getGlobalContext();
boolean b32 = name.length() == 60 && name.toLowerCase(Locale.US).endsWith(".b32.i2p");
if (ctx.isRouterContext() && !b32) {
// Local lookup.
// Even though we could do b32 outside router ctx here,
// we do it below instead so we can use the session,
// which we can't do with lookup()
Destination dest = ctx.namingService().lookup(name);
if (dest != null || ctx.isRouterContext() || name.length() >= 516)
return dest;
}
try {
I2PSession sess = sockMgr.getSession();
return sess.lookupDest(name);
} catch (I2PSessionException ise) {
_log.error("Error looking up " + name, ise);
return null;
}
}
}
}

View File

@ -593,8 +593,9 @@ class ConnectionManager {
Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
packet.setSendStreamId(id.longValue());
packet.setFlag(Packet.FLAG_ECHO);
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
packet.setFlag(Packet.FLAG_ECHO |
Packet.FLAG_NO_ACK |
Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination());
//if ( (keyToUse != null) && (tagsToSend != null) ) {
// packet.setKeyUsed(keyToUse);

View File

@ -295,8 +295,15 @@ class Packet {
*/
public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }
/**
* @param flag bitmask of any flag(s)
*/
public void setFlag(int flag) { _flags |= flag; }
/**
* @param flag bitmask of any flag(s)
* @param set true to set, false to clear
*/
public void setFlag(int flag, boolean set) {
if (set)
_flags |= flag;
@ -304,7 +311,7 @@ class Packet {
_flags &= ~flag;
}
public void setFlags(int flags) { _flags = flags; }
private void setFlags(int flags) { _flags = flags; }
/** the signature on the packet (only included if the flag for it is set)
* @return signature on the packet if the flag for signatures is set

View File

@ -348,8 +348,7 @@ class PacketHandler {
}
} else {
PacketLocal pong = new PacketLocal(_context, packet.getOptionalFrom());
pong.setFlag(Packet.FLAG_ECHO, true);
pong.setFlag(Packet.FLAG_SIGNATURE_INCLUDED, false);
pong.setFlag(Packet.FLAG_ECHO | Packet.FLAG_NO_ACK);
pong.setReceiveStreamId(packet.getSendStreamId());
_manager.getPacketQueue().enqueue(pong);
}

View File

@ -109,7 +109,13 @@ class PacketQueue {
options.setTagsToSend(INITIAL_TAGS_TO_SEND);
options.setTagThreshold(MIN_TAG_THRESHOLD);
} else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
options.setSendLeaseSet(false);
if (packet.isFlagSet(Packet.FLAG_ECHO)) {
// Send LS for PING, not for PONG
if (packet.getSendStreamId() <= 0) // pong
options.setSendLeaseSet(false);
} else {
options.setSendLeaseSet(false);
}
options.setTagsToSend(FINAL_TAGS_TO_SEND);
options.setTagThreshold(FINAL_TAG_THRESHOLD);
} else {

View File

@ -1209,6 +1209,19 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @return null on failure
*/
public Destination lookupDest(String name, long maxWait) throws I2PSessionException {
if (name.length() == 0)
return null;
// Shortcut for b64
if (name.length() >= 516) {
try {
return new Destination(name);
} catch (DataFormatException dfe) {
return null;
}
}
// won't fit in Mapping
if (name.length() >= 256 && !_context.isRouterContext())
return null;
synchronized (_lookupCache) {
Destination rv = _lookupCache.get(name);
if (rv != null)

View File

@ -21,6 +21,12 @@ import net.i2p.data.DataHelper;
*/
public class I2CPMessageHandler {
/**
* This is huge. Mainly to catch a completly bogus response, possibly not an I2CP socket.
* @since 0.9.10
*/
public static final int MAX_LENGTH = 128*1024;
/**
* Read an I2CPMessage from the stream and return the fully populated object.
*
@ -37,8 +43,9 @@ public class I2CPMessageHandler {
} catch (DataFormatException dfe) {
throw new IOException("Connection closed");
}
if (length > MAX_LENGTH)
throw new I2CPMessageException("Invalid message length specified");
try {
if (length < 0) throw new I2CPMessageException("Invalid message length specified");
int type = (int) DataHelper.readLong(in, 1);
I2CPMessage msg = createMessage(type);
// Note that the readMessage() calls don't, in general, read and discard