diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 425d51aeb6..0d71dcd346 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -261,7 +261,8 @@ public class I2PSnarkUtil { // FIXME this can cause race NPEs elsewhere _manager = null; _shitlist.clear(); - mgr.destroySocketManager(); + if (mgr != null) + mgr.destroySocketManager(); // this will delete a .torrent file d/l in progress so don't do that... FileUtil.rmdir(_tmpDir, false); // in case the user will d/l a .torrent file next... @@ -405,6 +406,7 @@ public class I2PSnarkUtil { byte[] b = Base32.decode(ip.substring(0, BASE32_HASH_LENGTH)); if (b != null) { Hash h = new Hash(b); + //Hash h = Hash.create(b); if (_log.shouldLog(Log.INFO)) _log.info("Using existing session for lookup of " + ip); try { diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 9bfb885cf3..42927df00f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -1514,15 +1514,23 @@ public class SnarkManager implements Snark.CompleteListener { } } - public class SnarkManagerShutdown extends I2PAppThread { + private class SnarkManagerShutdown extends I2PAppThread { @Override public void run() { Set names = listTorrentFiles(); + int running = 0; for (Iterator iter = names.iterator(); iter.hasNext(); ) { Snark snark = getTorrent((String)iter.next()); - if ( (snark != null) && (!snark.isStopped()) ) + if (snark != null && !snark.isStopped()) { snark.stopTorrent(); + running++; + } } + _snarks.clear(); + if (running > 0) { + try { sleep(1500); } catch (InterruptedException ie) {}; + } + _util.disconnect(); } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java index 0e57d1f26a..e18fca6fef 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java @@ -15,8 +15,7 @@ import net.i2p.I2PAppContext; import net.i2p.crypto.SHA1Hash; import net.i2p.data.DataHelper; import net.i2p.util.Log; -import net.i2p.util.SimpleScheduler; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * All the nodes we know about, stored as a mapping from @@ -33,6 +32,7 @@ class DHTNodes extends ConcurrentHashMap { private final I2PAppContext _context; private long _expireTime; private final Log _log; + private volatile boolean _isRunning; /** stagger with other cleaners */ private static final long CLEAN_TIME = 237*1000; @@ -46,7 +46,16 @@ class DHTNodes extends ConcurrentHashMap { _context = ctx; _expireTime = MAX_EXPIRE_TIME; _log = _context.logManager().getLog(DHTNodes.class); - SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME); + } + + public void start() { + _isRunning = true; + new Cleaner(); + } + + public void stop() { + clear(); + _isRunning = false; } /** @@ -82,9 +91,15 @@ class DHTNodes extends ConcurrentHashMap { ****/ /** */ - private class Cleaner implements SimpleTimer.TimedEvent { + private class Cleaner extends SimpleTimer2.TimedEvent { + + public Cleaner() { + super(SimpleTimer2.getInstance(), CLEAN_TIME); + } public void timeReached() { + if (!_isRunning) + return; long now = _context.clock().now(); int peerCount = 0; for (Iterator iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) { @@ -100,11 +115,12 @@ class DHTNodes extends ConcurrentHashMap { else _expireTime = Math.min(_expireTime + DELTA_EXPIRE_TIME, MAX_EXPIRE_TIME); - if (_log.shouldLog(Log.INFO)) - _log.info("DHT storage cleaner done, now with " + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("DHT storage cleaner done, now with " + peerCount + " peers, " + DataHelper.formatDuration(_expireTime) + " expiration"); + schedule(CLEAN_TIME); } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java index d7e9df93ed..97fbcef73d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java @@ -12,8 +12,7 @@ import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.util.Log; -import net.i2p.util.SimpleScheduler; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * The tracker stores peers, i.e. Dest hashes (not nodes). @@ -27,6 +26,7 @@ class DHTTracker { private final Torrents _torrents; private long _expireTime; private final Log _log; + private volatile boolean _isRunning; /** stagger with other cleaners */ private static final long CLEAN_TIME = 199*1000; @@ -41,12 +41,16 @@ class DHTTracker { _torrents = new Torrents(); _expireTime = MAX_EXPIRE_TIME; _log = _context.logManager().getLog(DHTTracker.class); - SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME); + } + + public void start() { + _isRunning = true; + new Cleaner(); } void stop() { _torrents.clear(); - // no way to stop the cleaner + _isRunning = false; } void announce(InfoHash ih, Hash hash) { @@ -93,9 +97,15 @@ class DHTTracker { return rv; } - private class Cleaner implements SimpleTimer.TimedEvent { + private class Cleaner extends SimpleTimer2.TimedEvent { + + public Cleaner() { + super(SimpleTimer2.getInstance(), CLEAN_TIME); + } public void timeReached() { + if (!_isRunning) + return; long now = _context.clock().now(); int torrentCount = 0; int peerCount = 0; @@ -122,11 +132,12 @@ class DHTTracker { else _expireTime = Math.min(_expireTime + DELTA_EXPIRE_TIME, MAX_EXPIRE_TIME); - if (_log.shouldLog(Log.INFO)) - _log.info("DHT tracker cleaner done, now with " + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("DHT tracker cleaner done, now with " + torrentCount + " torrents, " + peerCount + " peers, " + DataHelper.formatDuration(_expireTime) + " expiration"); + schedule(CLEAN_TIME); } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index f34a6a1dda..2c44deb68f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -5,6 +5,7 @@ package org.klomp.snark.dht; */ import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -36,8 +37,6 @@ import net.i2p.data.Destination; import net.i2p.data.Hash; import net.i2p.data.SimpleDataStructure; import net.i2p.util.Log; -import net.i2p.util.SimpleScheduler; -import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer2; import org.klomp.snark.bencode.BDecoder; @@ -108,6 +107,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private final int _rPort; /** signed dgrams */ private final int _qPort; + private final File _dhtFile; + private volatile boolean _isRunning; /** all-zero NID used for pings */ private static final NID _fakeNID = new NID(new byte[NID.HASH_LENGTH]); @@ -134,6 +135,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private static final long DEFAULT_QUERY_TIMEOUT = 75*1000; /** stagger with other cleaners */ private static final long CLEAN_TIME = 63*1000; + private static final String DHT_FILE = "i2psnark.dht.dat"; public KRPC (I2PAppContext ctx, I2PSession session) { _context = ctx; @@ -156,11 +158,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ctx.random().nextBytes(_myID); _myNID = new NID(_myID); _myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort); + _dhtFile = new File(ctx.getConfigDir(), DHT_FILE); session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort); session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM, _qPort); - // can't be stopped - SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME); + start(); } ///////////////// Public methods @@ -391,6 +393,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { public void announce(byte[] ih, byte[] peerHash) { InfoHash iHash = new InfoHash(ih); _tracker.announce(iHash, new Hash(peerHash)); +// _tracker.announce(iHash, Hash.create(peerHash)); } /** @@ -506,24 +509,32 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } /** - * Does nothing yet, everything is prestarted. + * Loads the DHT from file. * Can't be restarted after stopping? */ public void start() { + _knownNodes.start(); + _tracker.start(); + PersistDHT.loadDHT(this, _dhtFile); // start the explore thread + _isRunning = true; + // no need to keep ref, it will eventually stop + new Cleaner(); } /** * Stop everything. */ public void stop() { + _isRunning = false; // FIXME stop the explore thread // unregister port listeners _session.removeListener(I2PSession.PROTO_DATAGRAM, _qPort); _session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort); // clear the DHT and tracker _tracker.stop(); - _knownNodes.clear(); + PersistDHT.saveDHT(_knownNodes, _dhtFile); + _knownNodes.stop(); _sentQueries.clear(); _outgoingTokens.clear(); _incomingTokens.clear(); @@ -1175,6 +1186,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { for (BEValue bev : peers) { byte[] b = bev.getBytes(); Hash h = new Hash(b); + //Hash h = Hash.create(b); rv.add(h); } if (_log.shouldLog(Log.INFO)) @@ -1303,7 +1315,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (onTimeout != null) onTimeout.run(); if (_log.shouldLog(Log.INFO)) - _log.warn("timeout waiting for reply from " + this.toString()); + _log.warn("timeout waiting for reply from " + ReplyWaiter.this.toString()); } } } @@ -1370,9 +1382,15 @@ public class KRPC implements I2PSessionMuxedListener, DHT { /** * Cleaner-upper */ - private class Cleaner implements SimpleTimer.TimedEvent { + private class Cleaner extends SimpleTimer2.TimedEvent { + + public Cleaner() { + super(SimpleTimer2.getInstance(), CLEAN_TIME); + } public void timeReached() { + if (!_isRunning) + return; long now = _context.clock().now(); for (Iterator iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) { Token tok = iter.next(); @@ -1390,12 +1408,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (ni.lastSeen() < now - MAX_NODEINFO_AGE) iter.remove(); } - if (_log.shouldLog(Log.INFO)) - _log.info("KRPC cleaner done, now with " + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("KRPC cleaner done, now with " + _outgoingTokens.size() + " sent Tokens, " + _incomingTokens.size() + " rcvd Tokens, " + _knownNodes.size() + " known peers, " + _sentQueries.size() + " queries awaiting response"); + schedule(CLEAN_TIME); } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/NodeInfo.java b/apps/i2psnark/java/src/org/klomp/snark/dht/NodeInfo.java index 1d82b268a4..e4f52091bf 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/NodeInfo.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/NodeInfo.java @@ -3,6 +3,8 @@ package org.klomp.snark.dht; * From zzzot, modded and relicensed to GPLv2 */ +import net.i2p.data.Base64; +import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.data.Hash; @@ -41,7 +43,7 @@ class NodeInfo extends SimpleDataStructure { this.dest = dest; this.hash = dest.calculateHash(); this.port = port; - initialize(nID, this.hash, port); + initialize(); } /** @@ -53,7 +55,7 @@ class NodeInfo extends SimpleDataStructure { this.nID = nID; this.hash = hash; this.port = port; - initialize(nID, hash, port); + initialize(); } /** @@ -81,6 +83,36 @@ class NodeInfo extends SimpleDataStructure { initialize(d); } + /** + * Form persistent storage string. + * Format: NID:Hash:Destination:port + * First 3 in base 64; Destination may be empty string + * @throws IllegalArgumentException + */ + public NodeInfo(String s) throws DataFormatException { + super(); + String[] parts = s.split(":", 4); + if (parts.length != 4) + throw new DataFormatException("Bad format"); + byte[] nid = Base64.decode(parts[0]); + if (nid == null) + throw new DataFormatException("Bad NID"); + nID = new NID(nid); + byte[] h = Base64.decode(parts[1]); + if (h == null) + throw new DataFormatException("Bad hash"); + hash = new Hash(h); + //hash = Hash.create(h); + if (parts[2].length() > 0) + dest = new Destination(parts[2]); + try { + port = Integer.parseInt(parts[3]); + } catch (NumberFormatException nfe) { + throw new DataFormatException("Bad port", nfe); + } + initialize(); + } + /** * Creates data structures from the compact info * @throws IllegalArgumentException @@ -91,18 +123,22 @@ class NodeInfo extends SimpleDataStructure { byte[] ndata = new byte[NID.HASH_LENGTH]; System.arraycopy(compactInfo, 0, ndata, 0, NID.HASH_LENGTH); this.nID = new NID(ndata); + //3 lines or... byte[] hdata = new byte[Hash.HASH_LENGTH]; System.arraycopy(compactInfo, NID.HASH_LENGTH, hdata, 0, Hash.HASH_LENGTH); this.hash = new Hash(hdata); + //this.hash = Hash.create(compactInfo, NID.HASH_LENGTH); this.port = (int) DataHelper.fromLong(compactInfo, NID.HASH_LENGTH + Hash.HASH_LENGTH, 2); + if (port <= 0 || port >= 65535) + throw new IllegalArgumentException("Bad port"); } /** * Creates 54-byte compact info * @throws IllegalArgumentException */ - private void initialize(NID nID, Hash hash, int port) { - if (port < 0 || port > 65535) + private void initialize() { + if (port <= 0 || port >= 65535) throw new IllegalArgumentException("Bad port"); byte[] compactInfo = new byte[LENGTH]; System.arraycopy(nID.getData(), 0, compactInfo, 0, NID.HASH_LENGTH); @@ -173,7 +209,24 @@ class NodeInfo extends SimpleDataStructure { } } + @Override public String toString() { return "NodeInfo: " + nID + ' ' + hash + " port: " + port; } + + /** + * To persistent storage string. + * Format: NID:Hash:Destination:port + * First 3 in base 64; Destination may be empty string + */ + public String toPersistentString() { + StringBuilder buf = new StringBuilder(650); + buf.append(nID.toBase64()).append(':'); + buf.append(hash.toBase64()).append(':'); + if (dest != null) + buf.append(dest.toBase64()); + buf.append(':').append(port); + return buf.toString(); + } + } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/PersistDHT.java b/apps/i2psnark/java/src/org/klomp/snark/dht/PersistDHT.java new file mode 100644 index 0000000000..dde495a04d --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/PersistDHT.java @@ -0,0 +1,77 @@ +package org.klomp.snark.dht; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataFormatException; +import net.i2p.util.Log; +import net.i2p.util.SecureFileOutputStream; + +/** + * Retrieve / Store the local DHT in a file + * + */ +abstract class PersistDHT { + + public static synchronized void loadDHT(KRPC krpc, File file) { + Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class); + int count = 0; + FileInputStream in = null; + try { + in = new FileInputStream(file); + BufferedReader br = new BufferedReader(new InputStreamReader(in, "ISO-8859-1")); + String line = null; + while ( (line = br.readLine()) != null) { + if (line.startsWith("#")) + continue; + try { + krpc.addNode(new NodeInfo(line)); + count++; + // TODO limit number? this will flush the router's SDS caches + } catch (IllegalArgumentException iae) { + if (log.shouldLog(Log.WARN)) + log.warn("Error reading DHT entry", iae); + } catch (DataFormatException dfe) { + if (log.shouldLog(Log.WARN)) + log.warn("Error reading DHT entry", dfe); + } + } + } catch (IOException ioe) { + if (log.shouldLog(Log.WARN) && file.exists()) + log.warn("Error reading the DHT File", ioe); + } finally { + if (in != null) try { in.close(); } catch (IOException ioe) {} + } + if (log.shouldLog(Log.INFO)) + log.info("Loaded " + count + " nodes from " + file); + } + + public static synchronized void saveDHT(DHTNodes nodes, File file) { + Log log = I2PAppContext.getGlobalContext().logManager().getLog(PersistDHT.class); + int count = 0; + PrintWriter out = null; + try { + out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new SecureFileOutputStream(file), "ISO-8859-1"))); + out.println("# DHT nodes, format is NID:Hash:Destination:port"); + for (NodeInfo ni : nodes.values()) { + // DHTNodes shouldn't contain us, if that changes check here + out.println(ni.toPersistentString()); + count++; + } + } catch (IOException ioe) { + if (log.shouldLog(Log.WARN)) + log.warn("Error writing the DHT File", ioe); + } finally { + if (out != null) out.close(); + } + if (log.shouldLog(Log.INFO)) + log.info("Stored " + count + " nodes to " + file); + } +}