- Uncomment DHT

- Change DHT from option bit to extension message
- Add DHT start/stop code
- Add UI for DHT enabling
- Add raw datagram protocol type and use for response port
This commit is contained in:
zzz
2012-06-02 18:52:46 +00:00
parent bec33cad87
commit 7b07eb89a3
11 changed files with 195 additions and 28 deletions

View File

@ -28,6 +28,9 @@ abstract class ExtensionHandler {
public static final int ID_PEX = 2;
/** not ut_pex since the compact format is different */
public static final String TYPE_PEX = "i2p_pex";
public static final int ID_DHT = 3;
/** not using the option bit since the compact format is different */
public static final String TYPE_DHT = "i2p_dht";
/** Pieces * SHA1 Hash length, + 25% extra for file names, benconding overhead, etc */
private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 20 * 5 / 4;
private static final int PARALLEL_REQUESTS = 3;
@ -36,9 +39,10 @@ abstract class ExtensionHandler {
/**
* @param metasize -1 if unknown
* @param pexAndMetadata advertise these capabilities
* @param dht advertise DHT capability
* @return bencoded outgoing handshake message
*/
public static byte[] getHandshake(int metasize, boolean pexAndMetadata) {
public static byte[] getHandshake(int metasize, boolean pexAndMetadata, boolean dht) {
Map<String, Object> handshake = new HashMap();
Map<String, Integer> m = new HashMap();
if (pexAndMetadata) {
@ -47,6 +51,9 @@ abstract class ExtensionHandler {
if (metasize >= 0)
handshake.put("metadata_size", Integer.valueOf(metasize));
}
if (dht) {
m.put(TYPE_DHT, Integer.valueOf(ID_DHT));
}
// include the map even if empty so the far-end doesn't NPE
handshake.put("m", m);
handshake.put("p", Integer.valueOf(6881));
@ -65,6 +72,8 @@ abstract class ExtensionHandler {
handleMetadata(peer, listener, bs, log);
else if (id == ID_PEX)
handlePEX(peer, listener, bs, log);
else if (id == ID_DHT)
handleDHT(peer, listener, bs, log);
else if (log.shouldLog(Log.INFO))
log.info("Unknown extension msg " + id + " from " + peer);
}
@ -87,6 +96,12 @@ abstract class ExtensionHandler {
// peer state calls peer listener calls sendPEX()
}
if (msgmap.get(TYPE_DHT) != null) {
if (log.shouldLog(Log.DEBUG))
log.debug("Peer supports DHT extension: " + peer);
// peer state calls peer listener calls sendDHT()
}
MagnetState state = peer.getMagnetState();
if (msgmap.get(TYPE_METADATA) == null) {
@ -332,6 +347,28 @@ abstract class ExtensionHandler {
}
}
/**
* Receive the DHT port numbers
* @since DHT
*/
private static void handleDHT(Peer peer, PeerListener listener, byte[] bs, Log log) {
if (log.shouldLog(Log.DEBUG))
log.debug("Got DHT msg from " + peer);
try {
InputStream is = new ByteArrayInputStream(bs);
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map<String, BEValue> map = bev.getMap();
int qport = map.get("port").getInt();
int rport = map.get("rport").getInt();
listener.gotPort(peer, qport, rport);
} catch (Exception e) {
if (log.shouldLog(Log.INFO))
log.info("DHT msg exception from " + peer, e);
//peer.disconnect(false);
}
}
/**
* added.f and dropped unsupported
* @param pList non-null
@ -359,4 +396,22 @@ abstract class ExtensionHandler {
}
}
/**
* Send the DHT port numbers
* @since DHT
*/
public static void sendDHT(Peer peer, int qport, int rport) {
Map<String, Object> map = new HashMap();
map.put("port", Integer.valueOf(qport));
map.put("rport", Integer.valueOf(rport));
byte[] payload = BEncoder.bencode(map);
try {
int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_DHT).getInt();
peer.sendExtension(hisMsgCode, payload);
} catch (Exception e) {
// NPE, no DHT caps
//if (log.shouldLog(Log.INFO))
// log.info("DHT msg exception to " + peer, e);
}
}
}

View File

@ -36,7 +36,7 @@ import net.i2p.util.SimpleTimer;
import net.i2p.util.Translate;
import org.klomp.snark.dht.DHT;
//import org.klomp.snark.dht.KRPC;
import org.klomp.snark.dht.KRPC;
/**
* I2P specific helpers for I2PSnark
@ -63,6 +63,7 @@ public class I2PSnarkUtil {
private final File _tmpDir;
private int _startupDelay;
private boolean _shouldUseOT;
private boolean _shouldUseDHT;
private boolean _areFilesPublic;
private String _openTrackerString;
private DHT _dht;
@ -73,7 +74,7 @@ public class I2PSnarkUtil {
public static final int DEFAULT_MAX_UP_BW = 8; //KBps
public static final int MAX_CONNECTIONS = 16; // per torrent
public static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond";
//private static final boolean ENABLE_DHT = true;
public static final boolean DEFAULT_USE_DHT = true;
public I2PSnarkUtil(I2PAppContext ctx) {
_context = ctx;
@ -88,6 +89,7 @@ public class I2PSnarkUtil {
_maxConnections = MAX_CONNECTIONS;
_startupDelay = DEFAULT_STARTUP_DELAY;
_shouldUseOT = DEFAULT_USE_OPENTRACKERS;
_shouldUseDHT = DEFAULT_USE_DHT;
// This is used for both announce replies and .torrent file downloads,
// so it must be available even if not connected to I2CP.
// so much for multiple instances
@ -234,8 +236,8 @@ public class I2PSnarkUtil {
_manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts);
}
// FIXME this only instantiates krpc once, left stuck with old manager
//if (ENABLE_DHT && _manager != null && _dht == null)
// _dht = new KRPC(_context, _manager.getSession());
if (_shouldUseDHT && _manager != null && _dht == null)
_dht = new KRPC(_context, _manager.getSession());
return (_manager != null);
}
@ -250,7 +252,11 @@ public class I2PSnarkUtil {
/**
* Destroy the destination itself
*/
public void disconnect() {
public synchronized void disconnect() {
if (_dht != null) {
_dht.stop();
_dht = null;
}
I2PSocketManager mgr = _manager;
// FIXME this can cause race NPEs elsewhere
_manager = null;
@ -490,6 +496,22 @@ public class I2PSnarkUtil {
public boolean shouldUseOpenTrackers() {
return _shouldUseOT;
}
/** @since DHT */
public synchronized void setUseDHT(boolean yes) {
_shouldUseDHT = yes;
if (yes && _manager != null && _dht == null) {
_dht = new KRPC(_context, _manager.getSession());
} else if (!yes && _dht != null) {
_dht.stop();
_dht = null;
}
}
/** @since DHT */
public boolean shouldUseDHT() {
return _shouldUseDHT;
}
/**
* Like DataHelper.toHexString but ensures no loss of leading zero bytes

View File

@ -80,7 +80,9 @@ public class Peer implements Comparable
static final long OPTION_FAST = 0x0000000000000004l;
static final long OPTION_DHT = 0x0000000000000001l;
/** we use a different bit since the compact format is different */
/* no, let's use an extension message
static final long OPTION_I2P_DHT = 0x0000000040000000l;
*/
static final long OPTION_AZMP = 0x1000000000000000l;
private long options;
@ -269,15 +271,17 @@ public class Peer implements Comparable
_log.debug("Peer supports extensions, sending reply message");
int metasize = metainfo != null ? metainfo.getInfoBytes().length : -1;
boolean pexAndMetadata = metainfo == null || !metainfo.isPrivate();
out.sendExtension(0, ExtensionHandler.getHandshake(metasize, pexAndMetadata));
boolean dht = util.getDHT() != null;
out.sendExtension(0, ExtensionHandler.getHandshake(metasize, pexAndMetadata, dht));
}
if ((options & OPTION_I2P_DHT) != 0 && util.getDHT() != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer supports DHT, sending PORT message");
int port = util.getDHT().getPort();
out.sendPort(port);
}
// Old DHT PORT message
//if ((options & OPTION_I2P_DHT) != 0 && util.getDHT() != null) {
// if (_log.shouldLog(Log.DEBUG))
// _log.debug("Peer supports DHT, sending PORT message");
// int port = util.getDHT().getPort();
// out.sendPort(port);
//}
// Send our bitmap
if (bitfield != null)

View File

@ -1273,6 +1273,7 @@ class PeerCoordinator implements PeerListener
}
} else if (id == ExtensionHandler.ID_HANDSHAKE) {
sendPeers(peer);
sendDHT(peer);
}
}
@ -1301,6 +1302,26 @@ class PeerCoordinator implements PeerListener
} catch (InvalidBEncodingException ibee) {}
}
/**
* Send a DHT message to the peer, if we both support DHT.
* @since DHT
*/
void sendDHT(Peer peer) {
DHT dht = _util.getDHT();
if (dht == null)
return;
Map<String, BEValue> handshake = peer.getHandshakeMap();
if (handshake == null)
return;
BEValue bev = handshake.get("m");
if (bev == null)
return;
try {
if (bev.getMap().get(ExtensionHandler.TYPE_DHT) != null)
ExtensionHandler.sendDHT(peer, dht.getPort(), dht.getRPort());
} catch (InvalidBEncodingException ibee) {}
}
/**
* Sets the storage after transition out of magnet mode
* Snark calls this after we call gotMetaInfo()
@ -1318,11 +1339,13 @@ class PeerCoordinator implements PeerListener
/**
* PeerListener callback
* Tell the DHT to ping it, this will get back the node info
* @param rport must be port + 1
* @since 0.8.4
*/
public void gotPort(Peer peer, int port) {
public void gotPort(Peer peer, int port, int rport) {
DHT dht = _util.getDHT();
if (dht != null)
if (dht != null &&
port > 0 && port < 65535 && rport == port + 1)
dht.ping(peer.getDestination(), port);
}

View File

@ -190,13 +190,14 @@ interface PeerListener
void gotExtension(Peer peer, int id, byte[] bs);
/**
* Called when a port message is received.
* Called when a DHT port message is received.
*
* @param peer the Peer that got the message.
* @param port the port
* @param port the query port
* @param port the response port
* @since 0.8.4
*/
void gotPort(Peer peer, int port);
void gotPort(Peer peer, int port, int qport);
/**
* Called when peers are received via PEX

View File

@ -526,10 +526,14 @@ class PeerState implements DataLoader
setInteresting(true);
}
/** @since 0.8.4 */
/**
* Unused
* @since 0.8.4
*/
void portMessage(int port)
{
listener.gotPort(peer, port);
// for compatibility with old DHT PORT message
listener.gotPort(peer, port, port + 1);
}
void unknownMessage(int type, byte[] bs)

View File

@ -85,6 +85,7 @@ public class SnarkManager implements Snark.CompleteListener {
public static final String DEFAULT_THEME = "ubergine";
private static final String PROP_USE_OPENTRACKERS = "i2psnark.useOpentrackers";
public static final String PROP_OPENTRACKERS = "i2psnark.opentrackers";
private static final String PROP_USE_DHT = "i2psnark.enableDHT";
public static final int MIN_UP_BW = 2;
public static final int DEFAULT_MAX_UP_BW = 10;
@ -273,6 +274,8 @@ public class SnarkManager implements Snark.CompleteListener {
_config.setProperty(PROP_STARTUP_DELAY, Integer.toString(DEFAULT_STARTUP_DELAY));
if (!_config.containsKey(PROP_THEME))
_config.setProperty(PROP_THEME, DEFAULT_THEME);
if (!_config.containsKey(PROP_USE_DHT))
_config.setProperty(PROP_USE_DHT, Boolean.toString(I2PSnarkUtil.DEFAULT_USE_DHT));
updateConfig();
}
/**
@ -347,6 +350,7 @@ public class SnarkManager implements Snark.CompleteListener {
String useOT = _config.getProperty(PROP_USE_OPENTRACKERS);
boolean bOT = useOT == null || Boolean.valueOf(useOT).booleanValue();
_util.setUseOpenTrackers(bOT);
_util.setUseDHT(Boolean.valueOf(PROP_USE_DHT).booleanValue());
getDataDir().mkdirs();
initTrackerMap();
}
@ -365,7 +369,7 @@ public class SnarkManager implements Snark.CompleteListener {
public void updateConfig(String dataDir, boolean filesPublic, boolean autoStart, String refreshDelay,
String startDelay, String seedPct, String eepHost,
String eepPort, String i2cpHost, String i2cpPort, String i2cpOpts,
String upLimit, String upBW, boolean useOpenTrackers, String theme) {
String upLimit, String upBW, boolean useOpenTrackers, boolean useDHT, String theme) {
boolean changed = false;
//if (eepHost != null) {
// // unused, we use socket eepget
@ -549,6 +553,15 @@ public class SnarkManager implements Snark.CompleteListener {
_util.setUseOpenTrackers(useOpenTrackers);
changed = true;
}
if (_util.shouldUseDHT() != useDHT) {
_config.setProperty(PROP_USE_DHT, Boolean.toString(useDHT));
if (useDHT)
addMessage(_("Enabled DHT."));
else
addMessage(_("Disabled DHT."));
_util.setUseDHT(useDHT);
changed = true;
}
if (theme != null) {
if(!theme.equals(_config.getProperty(PROP_THEME))) {
_config.setProperty(PROP_THEME, theme);

View File

@ -17,10 +17,15 @@ public interface DHT {
/**
* @return The UDP port that should be included in a PORT message.
* @return The UDP query port
*/
public int getPort();
/**
* @return The UDP response port
*/
public int getRPort();
/**
* Ping. We don't have a NID yet so the node is presumed
* to be absent from our DHT.
@ -79,4 +84,9 @@ public interface DHT {
* @return the number of successful announces, not counting ourselves.
*/
public int announce(byte[] ih, int max, long maxWait);
/**
* Stop everything.
*/
public void stop();
}

View File

@ -151,7 +151,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
NID myNID = new NID(myID);
_myNodeInfo = new NodeInfo(myNID, session.getMyDestination(), _qPort);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _rPort);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort);
session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort);
// can't be stopped
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
@ -183,12 +183,19 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
}
/**
* @return The UDP port that should be included in a PORT message.
* @return The UDP query port
*/
public int getPort() {
return _qPort;
}
/**
* @return The UDP response port
*/
public int getRPort() {
return _rPort;
}
/**
* Ping. We don't have a NID yet so the node is presumed
* to be absent from our DHT.
@ -481,12 +488,19 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
}
/**
* Does nothing yet.
* Stop everything.
*/
public void stop() {
// stop the explore thread
// FIXME stop the explore thread
// unregister port listeners
// does not clear the DHT or tracker yet.
_session.removeListener(I2PSession.PROTO_DATAGRAM, _qPort);
_session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort);
// clear the DHT and tracker
_tracker.stop();
_knownNodes.clear();
_sentQueries.clear();
_outgoingTokens.clear();
_incomingTokens.clear();
}
/**

View File

@ -693,11 +693,12 @@ public class I2PSnarkServlet extends DefaultServlet {
String refreshDel = req.getParameter("refreshDelay");
String startupDel = req.getParameter("startupDelay");
boolean useOpenTrackers = req.getParameter("useOpenTrackers") != null;
boolean useDHT = req.getParameter("useDHT") != null;
//String openTrackers = req.getParameter("openTrackers");
String theme = req.getParameter("theme");
_manager.updateConfig(dataDir, filesPublic, autoStart, refreshDel, startupDel,
seedPct, eepHost, eepPort, i2cpHost, i2cpPort, i2cpOpts,
upLimit, upBW, useOpenTrackers, theme);
upLimit, upBW, useOpenTrackers, useDHT, theme);
} else if ("Save2".equals(action)) {
String taction = req.getParameter("taction");
if (taction != null)
@ -1438,6 +1439,7 @@ public class I2PSnarkServlet extends DefaultServlet {
boolean autoStart = _manager.shouldAutoStart();
boolean useOpenTrackers = _manager.util().shouldUseOpenTrackers();
//String openTrackers = _manager.util().getOpenTrackerString();
boolean useDHT = _manager.util().shouldUseDHT();
//int seedPct = 0;
out.write("<form action=\"/i2psnark/configure\" method=\"POST\">\n" +
@ -1551,6 +1553,14 @@ public class I2PSnarkServlet extends DefaultServlet {
+ (useOpenTrackers ? "checked " : "")
+ "title=\"");
out.write(_("If checked, announce torrents to open trackers as well as the tracker listed in the torrent file"));
out.write("\" ></td></tr>\n" +
"<tr><td>");
out.write(_("Enable DHT"));
out.write(": <td><input type=\"checkbox\" class=\"optbox\" name=\"useDHT\" value=\"true\" "
+ (useDHT ? "checked " : "")
+ "title=\"");
out.write(_("If checked, use DHT"));
out.write("\" ></td></tr>\n");
// "<tr><td>");