diff --git a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java index e67466f4a5..abde95e801 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java @@ -28,6 +28,9 @@ abstract class ExtensionHandler { public static final int ID_PEX = 2; /** not ut_pex since the compact format is different */ public static final String TYPE_PEX = "i2p_pex"; + public static final int ID_DHT = 3; + /** not using the option bit since the compact format is different */ + public static final String TYPE_DHT = "i2p_dht"; /** Pieces * SHA1 Hash length, + 25% extra for file names, benconding overhead, etc */ private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 20 * 5 / 4; private static final int PARALLEL_REQUESTS = 3; @@ -36,9 +39,10 @@ abstract class ExtensionHandler { /** * @param metasize -1 if unknown * @param pexAndMetadata advertise these capabilities + * @param dht advertise DHT capability * @return bencoded outgoing handshake message */ - public static byte[] getHandshake(int metasize, boolean pexAndMetadata) { + public static byte[] getHandshake(int metasize, boolean pexAndMetadata, boolean dht) { Map handshake = new HashMap(); Map m = new HashMap(); if (pexAndMetadata) { @@ -47,6 +51,9 @@ abstract class ExtensionHandler { if (metasize >= 0) handshake.put("metadata_size", Integer.valueOf(metasize)); } + if (dht) { + m.put(TYPE_DHT, Integer.valueOf(ID_DHT)); + } // include the map even if empty so the far-end doesn't NPE handshake.put("m", m); handshake.put("p", Integer.valueOf(6881)); @@ -65,6 +72,8 @@ abstract class ExtensionHandler { handleMetadata(peer, listener, bs, log); else if (id == ID_PEX) handlePEX(peer, listener, bs, log); + else if (id == ID_DHT) + handleDHT(peer, listener, bs, log); else if (log.shouldLog(Log.INFO)) log.info("Unknown extension msg " + id + " from " + peer); } @@ -87,6 +96,12 @@ abstract class ExtensionHandler { // peer state calls peer listener calls sendPEX() } + if (msgmap.get(TYPE_DHT) != null) { + if (log.shouldLog(Log.DEBUG)) + log.debug("Peer supports DHT extension: " + peer); + // peer state calls peer listener calls sendDHT() + } + MagnetState state = peer.getMagnetState(); if (msgmap.get(TYPE_METADATA) == null) { @@ -332,6 +347,28 @@ abstract class ExtensionHandler { } } + /** + * Receive the DHT port numbers + * @since DHT + */ + private static void handleDHT(Peer peer, PeerListener listener, byte[] bs, Log log) { + if (log.shouldLog(Log.DEBUG)) + log.debug("Got DHT msg from " + peer); + try { + InputStream is = new ByteArrayInputStream(bs); + BDecoder dec = new BDecoder(is); + BEValue bev = dec.bdecodeMap(); + Map map = bev.getMap(); + int qport = map.get("port").getInt(); + int rport = map.get("rport").getInt(); + listener.gotPort(peer, qport, rport); + } catch (Exception e) { + if (log.shouldLog(Log.INFO)) + log.info("DHT msg exception from " + peer, e); + //peer.disconnect(false); + } + } + /** * added.f and dropped unsupported * @param pList non-null @@ -359,4 +396,22 @@ abstract class ExtensionHandler { } } + /** + * Send the DHT port numbers + * @since DHT + */ + public static void sendDHT(Peer peer, int qport, int rport) { + Map map = new HashMap(); + map.put("port", Integer.valueOf(qport)); + map.put("rport", Integer.valueOf(rport)); + byte[] payload = BEncoder.bencode(map); + try { + int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_DHT).getInt(); + peer.sendExtension(hisMsgCode, payload); + } catch (Exception e) { + // NPE, no DHT caps + //if (log.shouldLog(Log.INFO)) + // log.info("DHT msg exception to " + peer, e); + } + } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 320b5005a2..dca9fbaf2b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -36,7 +36,7 @@ import net.i2p.util.SimpleTimer; import net.i2p.util.Translate; import org.klomp.snark.dht.DHT; -//import org.klomp.snark.dht.KRPC; +import org.klomp.snark.dht.KRPC; /** * I2P specific helpers for I2PSnark @@ -63,6 +63,7 @@ public class I2PSnarkUtil { private final File _tmpDir; private int _startupDelay; private boolean _shouldUseOT; + private boolean _shouldUseDHT; private boolean _areFilesPublic; private String _openTrackerString; private DHT _dht; @@ -73,7 +74,7 @@ public class I2PSnarkUtil { public static final int DEFAULT_MAX_UP_BW = 8; //KBps public static final int MAX_CONNECTIONS = 16; // per torrent public static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond"; - //private static final boolean ENABLE_DHT = true; + public static final boolean DEFAULT_USE_DHT = true; public I2PSnarkUtil(I2PAppContext ctx) { _context = ctx; @@ -88,6 +89,7 @@ public class I2PSnarkUtil { _maxConnections = MAX_CONNECTIONS; _startupDelay = DEFAULT_STARTUP_DELAY; _shouldUseOT = DEFAULT_USE_OPENTRACKERS; + _shouldUseDHT = DEFAULT_USE_DHT; // This is used for both announce replies and .torrent file downloads, // so it must be available even if not connected to I2CP. // so much for multiple instances @@ -234,8 +236,8 @@ public class I2PSnarkUtil { _manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts); } // FIXME this only instantiates krpc once, left stuck with old manager - //if (ENABLE_DHT && _manager != null && _dht == null) - // _dht = new KRPC(_context, _manager.getSession()); + if (_shouldUseDHT && _manager != null && _dht == null) + _dht = new KRPC(_context, _manager.getSession()); return (_manager != null); } @@ -250,7 +252,11 @@ public class I2PSnarkUtil { /** * Destroy the destination itself */ - public void disconnect() { + public synchronized void disconnect() { + if (_dht != null) { + _dht.stop(); + _dht = null; + } I2PSocketManager mgr = _manager; // FIXME this can cause race NPEs elsewhere _manager = null; @@ -490,6 +496,22 @@ public class I2PSnarkUtil { public boolean shouldUseOpenTrackers() { return _shouldUseOT; } + + /** @since DHT */ + public synchronized void setUseDHT(boolean yes) { + _shouldUseDHT = yes; + if (yes && _manager != null && _dht == null) { + _dht = new KRPC(_context, _manager.getSession()); + } else if (!yes && _dht != null) { + _dht.stop(); + _dht = null; + } + } + + /** @since DHT */ + public boolean shouldUseDHT() { + return _shouldUseDHT; + } /** * Like DataHelper.toHexString but ensures no loss of leading zero bytes diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index 02fb635605..8f33c01a72 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -80,7 +80,9 @@ public class Peer implements Comparable static final long OPTION_FAST = 0x0000000000000004l; static final long OPTION_DHT = 0x0000000000000001l; /** we use a different bit since the compact format is different */ +/* no, let's use an extension message static final long OPTION_I2P_DHT = 0x0000000040000000l; +*/ static final long OPTION_AZMP = 0x1000000000000000l; private long options; @@ -269,15 +271,17 @@ public class Peer implements Comparable _log.debug("Peer supports extensions, sending reply message"); int metasize = metainfo != null ? metainfo.getInfoBytes().length : -1; boolean pexAndMetadata = metainfo == null || !metainfo.isPrivate(); - out.sendExtension(0, ExtensionHandler.getHandshake(metasize, pexAndMetadata)); + boolean dht = util.getDHT() != null; + out.sendExtension(0, ExtensionHandler.getHandshake(metasize, pexAndMetadata, dht)); } - if ((options & OPTION_I2P_DHT) != 0 && util.getDHT() != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Peer supports DHT, sending PORT message"); - int port = util.getDHT().getPort(); - out.sendPort(port); - } + // Old DHT PORT message + //if ((options & OPTION_I2P_DHT) != 0 && util.getDHT() != null) { + // if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Peer supports DHT, sending PORT message"); + // int port = util.getDHT().getPort(); + // out.sendPort(port); + //} // Send our bitmap if (bitfield != null) diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 45b6ef82ac..845b989046 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -1273,6 +1273,7 @@ class PeerCoordinator implements PeerListener } } else if (id == ExtensionHandler.ID_HANDSHAKE) { sendPeers(peer); + sendDHT(peer); } } @@ -1301,6 +1302,26 @@ class PeerCoordinator implements PeerListener } catch (InvalidBEncodingException ibee) {} } + /** + * Send a DHT message to the peer, if we both support DHT. + * @since DHT + */ + void sendDHT(Peer peer) { + DHT dht = _util.getDHT(); + if (dht == null) + return; + Map handshake = peer.getHandshakeMap(); + if (handshake == null) + return; + BEValue bev = handshake.get("m"); + if (bev == null) + return; + try { + if (bev.getMap().get(ExtensionHandler.TYPE_DHT) != null) + ExtensionHandler.sendDHT(peer, dht.getPort(), dht.getRPort()); + } catch (InvalidBEncodingException ibee) {} + } + /** * Sets the storage after transition out of magnet mode * Snark calls this after we call gotMetaInfo() @@ -1318,11 +1339,13 @@ class PeerCoordinator implements PeerListener /** * PeerListener callback * Tell the DHT to ping it, this will get back the node info + * @param rport must be port + 1 * @since 0.8.4 */ - public void gotPort(Peer peer, int port) { + public void gotPort(Peer peer, int port, int rport) { DHT dht = _util.getDHT(); - if (dht != null) + if (dht != null && + port > 0 && port < 65535 && rport == port + 1) dht.ping(peer.getDestination(), port); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index f573d44552..ee2de562d2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -190,13 +190,14 @@ interface PeerListener void gotExtension(Peer peer, int id, byte[] bs); /** - * Called when a port message is received. + * Called when a DHT port message is received. * * @param peer the Peer that got the message. - * @param port the port + * @param port the query port + * @param port the response port * @since 0.8.4 */ - void gotPort(Peer peer, int port); + void gotPort(Peer peer, int port, int qport); /** * Called when peers are received via PEX diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index f41bf1b575..2ed9c373df 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -526,10 +526,14 @@ class PeerState implements DataLoader setInteresting(true); } - /** @since 0.8.4 */ + /** + * Unused + * @since 0.8.4 + */ void portMessage(int port) { - listener.gotPort(peer, port); + // for compatibility with old DHT PORT message + listener.gotPort(peer, port, port + 1); } void unknownMessage(int type, byte[] bs) diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 7c1c421a8c..56c6ce431a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -85,6 +85,7 @@ public class SnarkManager implements Snark.CompleteListener { public static final String DEFAULT_THEME = "ubergine"; private static final String PROP_USE_OPENTRACKERS = "i2psnark.useOpentrackers"; public static final String PROP_OPENTRACKERS = "i2psnark.opentrackers"; + private static final String PROP_USE_DHT = "i2psnark.enableDHT"; public static final int MIN_UP_BW = 2; public static final int DEFAULT_MAX_UP_BW = 10; @@ -273,6 +274,8 @@ public class SnarkManager implements Snark.CompleteListener { _config.setProperty(PROP_STARTUP_DELAY, Integer.toString(DEFAULT_STARTUP_DELAY)); if (!_config.containsKey(PROP_THEME)) _config.setProperty(PROP_THEME, DEFAULT_THEME); + if (!_config.containsKey(PROP_USE_DHT)) + _config.setProperty(PROP_USE_DHT, Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT)); updateConfig(); } /** @@ -347,6 +350,7 @@ public class SnarkManager implements Snark.CompleteListener { String useOT = _config.getProperty(PROP_USE_OPENTRACKERS); boolean bOT = useOT == null || Boolean.valueOf(useOT).booleanValue(); _util.setUseOpenTrackers(bOT); + _util.setUseDHT(Boolean.valueOf(PROP_USE_DHT).booleanValue()); getDataDir().mkdirs(); initTrackerMap(); } @@ -365,7 +369,7 @@ public class SnarkManager implements Snark.CompleteListener { public void updateConfig(String dataDir, boolean filesPublic, boolean autoStart, String refreshDelay, String startDelay, String seedPct, String eepHost, String eepPort, String i2cpHost, String i2cpPort, String i2cpOpts, - String upLimit, String upBW, boolean useOpenTrackers, String theme) { + String upLimit, String upBW, boolean useOpenTrackers, boolean useDHT, String theme) { boolean changed = false; //if (eepHost != null) { // // unused, we use socket eepget @@ -549,6 +553,15 @@ public class SnarkManager implements Snark.CompleteListener { _util.setUseOpenTrackers(useOpenTrackers); changed = true; } + if (_util.shouldUseDHT() != useDHT) { + _config.setProperty(PROP_USE_DHT, Boolean.toString(useDHT)); + if (useDHT) + addMessage(_("Enabled DHT.")); + else + addMessage(_("Disabled DHT.")); + _util.setUseDHT(useDHT); + changed = true; + } if (theme != null) { if(!theme.equals(_config.getProperty(PROP_THEME))) { _config.setProperty(PROP_THEME, theme); diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java index 6a16e4e605..2e401f060d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java @@ -17,10 +17,15 @@ public interface DHT { /** - * @return The UDP port that should be included in a PORT message. + * @return The UDP query port */ public int getPort(); + /** + * @return The UDP response port + */ + public int getRPort(); + /** * Ping. We don't have a NID yet so the node is presumed * to be absent from our DHT. @@ -79,4 +84,9 @@ public interface DHT { * @return the number of successful announces, not counting ourselves. */ public int announce(byte[] ih, int max, long maxWait); + + /** + * Stop everything. + */ + public void stop(); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index ef6757a831..fc81f5479b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -151,7 +151,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { NID myNID = new NID(myID); _myNodeInfo = new NodeInfo(myNID, session.getMyDestination(), _qPort); - session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _rPort); + session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort); session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort); // can't be stopped SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME); @@ -183,12 +183,19 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } /** - * @return The UDP port that should be included in a PORT message. + * @return The UDP query port */ public int getPort() { return _qPort; } + /** + * @return The UDP response port + */ + public int getRPort() { + return _rPort; + } + /** * Ping. We don't have a NID yet so the node is presumed * to be absent from our DHT. @@ -481,12 +488,19 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } /** - * Does nothing yet. + * Stop everything. */ public void stop() { - // stop the explore thread + // FIXME stop the explore thread // unregister port listeners - // does not clear the DHT or tracker yet. + _session.removeListener(I2PSession.PROTO_DATAGRAM, _qPort); + _session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort); + // clear the DHT and tracker + _tracker.stop(); + _knownNodes.clear(); + _sentQueries.clear(); + _outgoingTokens.clear(); + _incomingTokens.clear(); } /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index dd9b14ba3d..515ee18bf2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -693,11 +693,12 @@ public class I2PSnarkServlet extends DefaultServlet { String refreshDel = req.getParameter("refreshDelay"); String startupDel = req.getParameter("startupDelay"); boolean useOpenTrackers = req.getParameter("useOpenTrackers") != null; + boolean useDHT = req.getParameter("useDHT") != null; //String openTrackers = req.getParameter("openTrackers"); String theme = req.getParameter("theme"); _manager.updateConfig(dataDir, filesPublic, autoStart, refreshDel, startupDel, seedPct, eepHost, eepPort, i2cpHost, i2cpPort, i2cpOpts, - upLimit, upBW, useOpenTrackers, theme); + upLimit, upBW, useOpenTrackers, useDHT, theme); } else if ("Save2".equals(action)) { String taction = req.getParameter("taction"); if (taction != null) @@ -1438,6 +1439,7 @@ public class I2PSnarkServlet extends DefaultServlet { boolean autoStart = _manager.shouldAutoStart(); boolean useOpenTrackers = _manager.util().shouldUseOpenTrackers(); //String openTrackers = _manager.util().getOpenTrackerString(); + boolean useDHT = _manager.util().shouldUseDHT(); //int seedPct = 0; out.write("
\n" + @@ -1551,6 +1553,14 @@ public class I2PSnarkServlet extends DefaultServlet { + (useOpenTrackers ? "checked " : "") + "title=\""); out.write(_("If checked, announce torrents to open trackers as well as the tracker listed in the torrent file")); + out.write("\" >\n" + + + ""); + out.write(_("Enable DHT")); + out.write(": \n"); // ""); diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index 06b094488c..d91c3f20d3 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -252,5 +252,16 @@ public interface I2PSession { public static final int PROTO_ANY = 0; public static final int PROTO_UNSPECIFIED = 0; public static final int PROTO_STREAMING = 6; + + /** + * Generally a signed datagram, but could + * also be a raw datagram, depending on the application + */ public static final int PROTO_DATAGRAM = 17; + + /** + * A raw (unsigned) datagram + * @since 0.9.1 + */ + public static final int PROTO_DATAGRAM_RAW = 18; }