From b37bb9372e6e0fbc76398e416e545ca3a8a4cc79 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 15 Dec 2005 08:58:30 +0000 Subject: [PATCH] 2005-12-15 jrandom * Added multitorrent support to I2PSnark, accessible currently by running "i2psnark.jar --config i2psnark.config" (which may or may not exist). It then joins the swarm for any torrents in ./i2psnark/*.torrent, saving their data in that directory as well. Removing the .torrent file stops participation, and it is currently set to seed indefinitely. Completion is logged to the logger and standard output, with further UI interaction left to the (work in progress) web UI. --- .../src/org/klomp/snark/I2PSnarkUtil.java | 2 +- .../java/src/org/klomp/snark/Peer.java | 5 + .../src/org/klomp/snark/PeerAcceptor.java | 93 +++++- .../src/org/klomp/snark/PeerCoordinator.java | 10 + .../org/klomp/snark/PeerCoordinatorSet.java | 36 +++ .../java/src/org/klomp/snark/Snark.java | 113 ++++++-- .../src/org/klomp/snark/SnarkManager.java | 266 ++++++++++++++++++ .../java/src/org/klomp/snark/Storage.java | 17 +- .../src/org/klomp/snark/StorageListener.java | 6 + .../src/org/klomp/snark/TrackerClient.java | 35 ++- history.txt | 11 +- 11 files changed, 545 insertions(+), 49 deletions(-) create mode 100644 apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java create mode 100644 apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index c0ebf0fca2..25da3c2983 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -145,7 +145,7 @@ public class I2PSnarkUtil { int destEnd = origAnnounce.indexOf(".i2p"); int pathStart = origAnnounce.indexOf('/', destEnd); String rv = "http://i2p/" + origAnnounce.substring(destStart, destEnd) + origAnnounce.substring(pathStart); - _log.debug("Rewriting [" + origAnnounce + "] as [" + rv + "]"); + //_log.debug("Rewriting [" + origAnnounce + "] as [" + rv + "]"); return rv; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index e743a9312d..aafe529f43 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -28,9 +28,11 @@ import java.util.Map; import org.klomp.snark.bencode.*; import net.i2p.client.streaming.I2PSocket; +import net.i2p.util.Log; public class Peer implements Comparable { + private Log _log = new Log(Peer.class); // Identifying property, the peer id of the other side. private final PeerID peerID; @@ -58,6 +60,7 @@ public class Peer implements Comparable this.peerID = peerID; this.my_id = my_id; this.metainfo = metainfo; + _log.debug("Creating a new peer with " + peerID.getAddress().calculateHash().toBase64(), new Exception("creating")); } /** @@ -78,6 +81,7 @@ public class Peer implements Comparable byte[] id = handshake(bis, bos); this.peerID = new PeerID(id, sock.getPeerDestination()); + _log.debug("Creating a new peer with " + peerID.getAddress().calculateHash().toBase64(), new Exception("creating")); } /** @@ -145,6 +149,7 @@ public class Peer implements Comparable if (state != null) throw new IllegalStateException("Peer already started"); + _log.debug("Running connection to " + peerID.getAddress().calculateHash().toBase64(), new Exception("connecting")); try { // Do we need to handshake? diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java b/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java index 966a6a455f..c09f92b3d2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java @@ -22,8 +22,11 @@ package org.klomp.snark; import java.io.*; import java.net.*; +import java.util.Iterator; import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.Base64; +import net.i2p.data.DataHelper; /** * Accepts incomming connections from peers. The ConnectionAcceptor @@ -34,29 +37,91 @@ import net.i2p.client.streaming.I2PSocket; public class PeerAcceptor { private final PeerCoordinator coordinator; + private final PeerCoordinatorSet coordinators; public PeerAcceptor(PeerCoordinator coordinator) { this.coordinator = coordinator; + this.coordinators = null; } + + public PeerAcceptor(PeerCoordinatorSet coordinators) + { + this.coordinators = coordinators; + this.coordinator = null; + } + + private static final int LOOKAHEAD_SIZE = "19".length() + + "BitTorrent protocol".length() + + 8 + // blank, reserved + 20; // infohash public void connection(I2PSocket socket, BufferedInputStream bis, BufferedOutputStream bos) throws IOException { - if (coordinator.needPeers()) - { - // XXX: inside this Peer constructor's handshake is where you'd deal with the other - // side saying they want to communicate with another torrent - aka multitorrent - // support. you'd then want to grab the meta info /they/ want, look that up in - // our own list of active torrents, and put it on the right coordinator for it. - // this currently, however, throws an IOException if the metainfo doesn't match - // coodinator.getMetaInfo (Peer.java:242) - Peer peer = new Peer(socket, bis, bos, coordinator.getID(), - coordinator.getMetaInfo()); - coordinator.addPeer(peer); - } - else - socket.close(); + // inside this Peer constructor's handshake is where you'd deal with the other + // side saying they want to communicate with another torrent - aka multitorrent + // support, but because of how the protocol works, we can get away with just reading + // ahead the first $LOOKAHEAD_SIZE bytes to figure out which infohash they want to + // talk about, and we can just look for that in our list of active torrents. + bis.mark(LOOKAHEAD_SIZE); + byte peerInfoHash[] = readHash(bis); + bis.reset(); + if (coordinator != null) { + // single torrent capability + MetaInfo meta = coordinator.getMetaInfo(); + if (DataHelper.eq(meta.getInfoHash(), peerInfoHash)) { + if (coordinator.needPeers()) + { + Peer peer = new Peer(socket, bis, bos, coordinator.getID(), + coordinator.getMetaInfo()); + coordinator.addPeer(peer); + } + else + socket.close(); + } else { + // its for another infohash, but we are only single torrent capable. b0rk. + throw new IOException("Peer wants another torrent (" + Base64.encode(peerInfoHash) + + ") while we only support (" + Base64.encode(meta.getInfoHash()) + ")"); + } + } else { + // multitorrent capable, so lets see what we can handle + for (Iterator iter = coordinators.iterator(); iter.hasNext(); ) { + PeerCoordinator cur = (PeerCoordinator)iter.next(); + MetaInfo meta = cur.getMetaInfo(); + + if (DataHelper.eq(meta.getInfoHash(), peerInfoHash)) { + if (cur.needPeers()) + { + Peer peer = new Peer(socket, bis, bos, cur.getID(), + cur.getMetaInfo()); + cur.addPeer(peer); + return; + } + else + { + socket.close(); + return; + } + } + } + // this is only reached if none of the coordinators match the infohash + throw new IOException("Peer wants another torrent (" + Base64.encode(peerInfoHash) + + ") while we don't support that hash"); + } + } + + /** + * Read ahead to the infohash, throwing an exception if there isn't enough data + */ + private byte[] readHash(InputStream in) throws IOException { + byte buf[] = new byte[LOOKAHEAD_SIZE]; + int read = DataHelper.read(in, buf); + if (read != buf.length) + throw new IOException("Unable to read the hash (read " + read + ")"); + byte rv[] = new byte[20]; + System.arraycopy(buf, buf.length-rv.length-1, rv, 0, rv.length); + return rv; } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 51ea8ff028..7d8e77555c 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -23,11 +23,14 @@ package org.klomp.snark; import java.util.*; import java.io.IOException; +import net.i2p.util.Log; + /** * Coordinates what peer does what. */ public class PeerCoordinator implements PeerListener { + private final Log _log = new Log(PeerCoordinator.class); final MetaInfo metainfo; final Storage storage; @@ -80,6 +83,9 @@ public class PeerCoordinator implements PeerListener // Install a timer to check the uploaders. timer.schedule(new PeerCheckerTask(this), CHECK_PERIOD, CHECK_PERIOD); } + + public Storage getStorage() { return storage; } + public CoordinatorListener getListener() { return listener; } public byte[] getID() { @@ -137,6 +143,8 @@ public class PeerCoordinator implements PeerListener return !halted && peers.size() < MAX_CONNECTIONS; } } + + public boolean halted() { return halted; } public void halt() { @@ -215,6 +223,8 @@ public class PeerCoordinator implements PeerListener if (need_more) { + _log.debug("Addng a peer " + peer.getPeerID().getAddress().calculateHash().toBase64(), new Exception("add/run")); + // Run the peer with us as listener and the current bitfield. final PeerListener listener = this; final BitField bitfield = storage.getBitField(); diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java new file mode 100644 index 0000000000..8c89091cab --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java @@ -0,0 +1,36 @@ +package org.klomp.snark; + +import java.util.*; + +/** + * Hmm, any guesses as to what this is? Used by the multitorrent functionality + * in the PeerAcceptor to pick the right PeerCoordinator to accept the con for. + * Each PeerCoordinator is added to the set from within the Snark (and removed + * from it there too) + */ +public class PeerCoordinatorSet { + private static final PeerCoordinatorSet _instance = new PeerCoordinatorSet(); + public static final PeerCoordinatorSet instance() { return _instance; } + private Set _coordinators; + + private PeerCoordinatorSet() { + _coordinators = new HashSet(); + } + + public Iterator iterator() { + synchronized (_coordinators) { + return new ArrayList(_coordinators).iterator(); + } + } + + public void add(PeerCoordinator coordinator) { + synchronized (_coordinators) { + _coordinators.add(coordinator); + } + } + public void remove(PeerCoordinator coordinator) { + synchronized (_coordinators) { + _coordinators.remove(coordinator); + } + } +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index b09a0d4eb4..3f0768c9aa 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -85,13 +85,27 @@ public class Snark "Commands: 'info', 'list', 'quit'."; // String indicating main activity - static String activity = "Not started"; + String activity = "Not started"; public static void main(String[] args) { System.out.println(copyright); System.out.println(); + if ( (args.length > 0) && ("--config".equals(args[0])) ) { + SnarkManager sm = SnarkManager.instance(); + if (args.length > 1) + sm.loadConfig(args[1]); + System.out.println("Running in multitorrent mode"); + while (true) { + try { + synchronized (sm) { + sm.wait(); + } + } catch (InterruptedException ie) {} + } + } + // Parse debug, share/ip and torrent file options. Snark snark = parseArguments(args); @@ -101,7 +115,7 @@ public class Snark snark.acceptor, snark.trackerclient, snark); - Runtime.getRuntime().addShutdownHook(snarkhook); + //Runtime.getRuntime().addShutdownHook(snarkhook); Timer timer = new Timer(true); TimerTask monitor = new PeerMonitorTask(snark.coordinator); @@ -130,15 +144,15 @@ public class Snark quit = true; else if ("list".equals(line)) { - synchronized(coordinator.peers) + synchronized(snark.coordinator.peers) { - System.out.println(coordinator.peers.size() + System.out.println(snark.coordinator.peers.size() + " peers -" + " (i)nterested," + " (I)nteresting," + " (c)hoking," + " (C)hoked:"); - Iterator it = coordinator.peers.iterator(); + Iterator it = snark.coordinator.peers.iterator(); while (it.hasNext()) { Peer peer = (Peer)it.next(); @@ -152,18 +166,18 @@ public class Snark } else if ("info".equals(line)) { - System.out.println("Name: " + meta.getName()); - System.out.println("Torrent: " + torrent); - System.out.println("Tracker: " + meta.getAnnounce()); - List files = meta.getFiles(); + System.out.println("Name: " + snark.meta.getName()); + System.out.println("Torrent: " + snark.torrent); + System.out.println("Tracker: " + snark.meta.getAnnounce()); + List files = snark.meta.getFiles(); System.out.println("Files: " + ((files == null) ? 1 : files.size())); - System.out.println("Pieces: " + meta.getPieces()); + System.out.println("Pieces: " + snark.meta.getPieces()); System.out.println("Piece size: " - + meta.getPieceLength(0) / 1024 + + snark.meta.getPieceLength(0) / 1024 + " KB"); System.out.println("Total size: " - + meta.getTotalLength() / (1024 * 1024) + + snark.meta.getTotalLength() / (1024 * 1024) + " MB"); } else if ("".equals(line) || "help".equals(line)) @@ -195,15 +209,20 @@ public class Snark } } - static String torrent; - static MetaInfo meta; - static Storage storage; - static PeerCoordinator coordinator; - static ConnectionAcceptor acceptor; - static TrackerClient trackerclient; + String torrent; + MetaInfo meta; + Storage storage; + PeerCoordinator coordinator; + ConnectionAcceptor acceptor; + TrackerClient trackerclient; + String rootDataDir = "."; - private Snark(String torrent, String ip, int user_port, - StorageListener slistener, CoordinatorListener clistener) + Snark(String torrent, String ip, int user_port, + StorageListener slistener, CoordinatorListener clistener) { + this(torrent, ip, user_port, slistener, clistener, true, "."); + } + Snark(String torrent, String ip, int user_port, + StorageListener slistener, CoordinatorListener clistener, boolean start, String rootDir) { if (slistener == null) slistener = this; @@ -212,6 +231,7 @@ public class Snark clistener = this; this.torrent = torrent; + this.rootDataDir = rootDir; activity = "Network setup"; @@ -319,7 +339,7 @@ public class Snark { activity = "Checking storage"; storage = new Storage(meta, slistener); - storage.check(); + storage.check(rootDataDir); } catch (IOException ioe) { @@ -329,13 +349,53 @@ public class Snark activity = "Collecting pieces"; coordinator = new PeerCoordinator(id, meta, storage, clistener); - PeerAcceptor peeracceptor = new PeerAcceptor(coordinator); + PeerCoordinatorSet set = PeerCoordinatorSet.instance(); + set.add(coordinator); + PeerAcceptor peeracceptor = new PeerAcceptor(set); //coordinator); ConnectionAcceptor acceptor = new ConnectionAcceptor(serversocket, peeracceptor); trackerclient = new TrackerClient(meta, coordinator); + if (start) + startTorrent(); + } + /** + * Start up contacting peers and querying the tracker + */ + public void startTorrent() { + boolean coordinatorChanged = false; + if (coordinator.halted()) { + // ok, we have already started and stopped, but the coordinator seems a bit annoying to + // restart safely, so lets build a new one to replace the old + PeerCoordinatorSet set = PeerCoordinatorSet.instance(); + set.remove(coordinator); + PeerCoordinator newCoord = new PeerCoordinator(coordinator.getID(), coordinator.getMetaInfo(), + coordinator.getStorage(), coordinator.getListener()); + set.add(newCoord); + coordinator = newCoord; + coordinatorChanged = true; + } + if (trackerclient.halted() || coordinatorChanged) { + TrackerClient newClient = new TrackerClient(coordinator.getMetaInfo(), coordinator); + if (!trackerclient.halted()) + trackerclient.halt(); + trackerclient = newClient; + } trackerclient.start(); - + } + /** + * Stop contacting the tracker and talking with peers + */ + public void stopTorrent() { + trackerclient.halt(); + coordinator.halt(); + try { + storage.close(); + } catch (IOException ioe) { + System.out.println("Error closing " + torrent); + ioe.printStackTrace(); + } + PeerCoordinatorSet.instance().remove(coordinator); } static Snark parseArguments(String[] args) @@ -588,6 +648,13 @@ public class Snark allChecked = true; checking = false; } + + public void storageCompleted(Storage storage) + { + Snark.debug("Completely received " + torrent, Snark.INFO); + //storage.close(); + System.out.println("Completely received: " + torrent); + } public void shutdown() { diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java new file mode 100644 index 0000000000..17cd1db4a7 --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -0,0 +1,266 @@ +package org.klomp.snark; + +import java.io.*; +import java.util.*; +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.I2PThread; +import net.i2p.util.Log; + +/** + * Manage multiple snarks + */ +public class SnarkManager { + private static SnarkManager _instance = new SnarkManager(); + public static SnarkManager instance() { return _instance; } + + /** map of (canonical) filename to Snark instance (unsynchronized) */ + private Map _snarks; + private String _configFile; + private Properties _config; + private I2PAppContext _context; + private Log _log; + private List _messages; + + public static final String PROP_I2CP_HOST = "i2psnark.i2cpHost"; + public static final String PROP_I2CP_PORT = "i2psnark.i2cpPort"; + public static final String PROP_I2CP_OPTS = "i2psnark.i2cpOptions"; + public static final String PROP_EEP_HOST = "i2psnark.eepHost"; + public static final String PROP_EEP_PORT = "i2psnark.eepPort"; + public static final String PROP_DIR = "i2psnark.dir"; + + private SnarkManager() { + _snarks = new HashMap(); + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(SnarkManager.class); + _messages = new ArrayList(16); + loadConfig("i2psnark.config"); + I2PThread monitor = new I2PThread(new DirMonitor(), "Snark DirMonitor"); + monitor.setDaemon(true); + monitor.start(); + } + + private static final int MAX_MESSAGES = 10; + public void addMessage(String message) { + synchronized (_messages) { + _messages.add(message); + while (_messages.size() > MAX_MESSAGES) + _messages.remove(0); + } + _log.info("MSG: " + message); + } + + private boolean shouldAutoStart() { return true; } + private int getStartupDelayMinutes() { return 1; } + private File getDataDir() { + String dir = _config.getProperty(PROP_DIR); + if ( (dir == null) || (dir.trim().length() <= 0) ) + dir = "i2psnark"; + return new File(dir); + } + + public void loadConfig(String filename) { + _configFile = filename; + if (_config == null) + _config = new Properties(); + File cfg = new File(filename); + if (cfg.exists()) { + try { + DataHelper.loadProps(_config, cfg); + } catch (IOException ioe) { + _log.error("Error loading I2PSnark config '" + filename + "'", ioe); + } + } + // now add sane defaults + if (!_config.containsKey(PROP_I2CP_HOST)) + _config.setProperty(PROP_I2CP_HOST, "localhost"); + if (!_config.containsKey(PROP_I2CP_PORT)) + _config.setProperty(PROP_I2CP_PORT, "7654"); + if (!_config.containsKey(PROP_EEP_HOST)) + _config.setProperty(PROP_EEP_HOST, "localhost"); + if (!_config.containsKey(PROP_EEP_PORT)) + _config.setProperty(PROP_EEP_PORT, "4444"); + if (!_config.containsKey(PROP_DIR)) + _config.setProperty(PROP_DIR, "i2psnark"); + updateConfig(); + } + + private void updateConfig() { + String i2cpHost = _config.getProperty(PROP_I2CP_HOST); + int i2cpPort = getInt(PROP_I2CP_PORT, 7654); + String opts = _config.getProperty(PROP_I2CP_OPTS); + Properties i2cpOpts = new Properties(); + if (opts != null) { + StringTokenizer tok = new StringTokenizer(opts, " "); + while (tok.hasMoreTokens()) { + String pair = tok.nextToken(); + int split = pair.indexOf('='); + if (split > 0) + i2cpOpts.setProperty(pair.substring(0, split), pair.substring(split+1)); + } + } + if (i2cpHost != null) { + I2PSnarkUtil.instance().setI2CPConfig(i2cpHost, i2cpPort, i2cpOpts); + _log.debug("Configuring with I2CP options " + i2cpOpts); + } + //I2PSnarkUtil.instance().setI2CPConfig("66.111.51.110", 7654, new Properties()); + String eepHost = _config.getProperty(PROP_EEP_HOST); + int eepPort = getInt(PROP_EEP_PORT, 4444); + if (eepHost != null) + I2PSnarkUtil.instance().setProxy(eepHost, eepPort); + getDataDir().mkdirs(); + } + + private int getInt(String prop, int defaultVal) { + String p = _config.getProperty(prop); + try { + if ( (p != null) && (p.trim().length() > 0) ) + return Integer.parseInt(p.trim()); + } catch (NumberFormatException nfe) { + // ignore + } + return defaultVal; + } + + public void saveConfig() { + try { + DataHelper.storeProps(_config, new File(_configFile)); + } catch (IOException ioe) { + addMessage("Unable to save the config to '" + _configFile + "'"); + } + } + + /** set of filenames that we are dealing with */ + public Set listTorrentFiles() { synchronized (_snarks) { return new HashSet(_snarks.keySet()); } } + /** + * Grab the torrent given the (canonical) filename + */ + public Snark getTorrent(String filename) { synchronized (_snarks) { return (Snark)_snarks.get(filename); } } + public void addTorrent(String filename) { + File sfile = new File(filename); + try { + filename = sfile.getCanonicalPath(); + } catch (IOException ioe) { + _log.error("Unable to add the torrent " + filename, ioe); + addMessage("ERR: Could not add the torrent '" + filename + "': " + ioe.getMessage()); + return; + } + File dataDir = getDataDir(); + Snark torrent = null; + synchronized (_snarks) { + torrent = (Snark)_snarks.get(filename); + if (torrent == null) { + torrent = new Snark(filename, null, -1, null, null, false, dataDir.getPath()); + _snarks.put(filename, torrent); + } else { + return; + } + } + // ok, snark created, now lets start it up or configure it further + if (shouldAutoStart()) { + torrent.startTorrent(); + addMessage("Torrent added and started: '" + filename + "'"); + } else { + addMessage("Torrent added: '" + filename + "'"); + } + } + + /** + * Stop the torrent, leaving it on the list of torrents unless told to remove it + */ + public Snark stopTorrent(String filename, boolean shouldRemove) { + File sfile = new File(filename); + try { + filename = sfile.getCanonicalPath(); + } catch (IOException ioe) { + _log.error("Unable to remove the torrent " + filename, ioe); + addMessage("ERR: Could not remove the torrent '" + filename + "': " + ioe.getMessage()); + return null; + } + int remaining = 0; + Snark torrent = null; + synchronized (_snarks) { + if (shouldRemove) + torrent = (Snark)_snarks.remove(filename); + else + torrent = (Snark)_snarks.get(filename); + remaining = _snarks.size(); + } + if (torrent != null) { + torrent.stopTorrent(); + if (remaining == 0) { + // should we disconnect/reconnect here (taking care to deal with the other thread's + // I2PServerSocket.accept() call properly?) + ////I2PSnarkUtil.instance(). + } + addMessage("Torrent stopped: '" + filename + "'"); + } + return torrent; + } + /** + * Stop the torrent and delete the torrent file itself, but leaving the data + * behind. + */ + public void removeTorrent(String filename) { + Snark torrent = stopTorrent(filename, true); + if (torrent != null) { + File torrentFile = new File(filename); + torrentFile.delete(); + addMessage("Torrent removed: '" + filename + "'"); + } + } + + private class DirMonitor implements Runnable { + public void run() { + try { Thread.sleep(60*1000*getStartupDelayMinutes()); } catch (InterruptedException ie) {} + while (true) { + File dir = getDataDir(); + _log.debug("Directory Monitor loop over " + dir.getAbsolutePath()); + monitorTorrents(dir); + try { Thread.sleep(60*1000); } catch (InterruptedException ie) {} + } + } + } + + private void monitorTorrents(File dir) { + String fileNames[] = dir.list(TorrentFilenameFilter.instance()); + List foundNames = new ArrayList(0); + if (fileNames != null) { + for (int i = 0; i < fileNames.length; i++) { + try { + foundNames.add(new File(dir, fileNames[i]).getCanonicalPath()); + } catch (IOException ioe) { + _log.error("Error resolving '" + fileNames[i] + "' in '" + dir, ioe); + } + } + } + + Set existingNames = listTorrentFiles(); + // lets find new ones first... + for (int i = 0; i < foundNames.size(); i++) { + if (existingNames.contains(foundNames.get(i))) { + // already known. noop + } else { + addTorrent((String)foundNames.get(i)); + } + } + // now lets see which ones have been removed... + for (Iterator iter = existingNames.iterator(); iter.hasNext(); ) { + String name = (String)iter.next(); + if (foundNames.contains(name)) { + // known and still there. noop + } else { + // known, but removed. drop it + stopTorrent(name, true); + } + } + } + + private static class TorrentFilenameFilter implements FilenameFilter { + private static final TorrentFilenameFilter _filter = new TorrentFilenameFilter(); + public static TorrentFilenameFilter instance() { return _filter; } + public boolean accept(File dir, String name) { + return (name != null) && (name.endsWith(".torrent")); + } + } +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/Storage.java b/apps/i2psnark/java/src/org/klomp/snark/Storage.java index f4c53bb46d..09f702175a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Storage.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Storage.java @@ -127,7 +127,7 @@ public class Storage } // Creates piece hases for a new storage. - public void create() throws IOException + private void create() throws IOException { // Calculate piece_hashes MessageDigest digest = null; @@ -240,9 +240,9 @@ public class Storage /** * Creates (and/or checks) all files from the metainfo file list. */ - public void check() throws IOException + public void check(String rootDir) throws IOException { - File base = new File(filterName(metainfo.getName())); + File base = new File(rootDir, filterName(metainfo.getName())); List files = metainfo.getFiles(); if (files == null) @@ -368,8 +368,11 @@ public class Storage } } - if (listener != null) + if (listener != null) { listener.storageAllChecked(this); + if (needed <= 0) + listener.storageCompleted(this); + } } private void allocateFile(int nr) throws IOException @@ -483,6 +486,12 @@ public class Storage } } + if (complete) { + listener.storageCompleted(this); + // do we also need to close all of the files and reopen + // them readonly? + } + return true; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java b/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java index 1cb5c11939..c9fcf5ae65 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java @@ -49,4 +49,10 @@ public interface StorageListener * storage is known. */ void storageAllChecked(Storage storage); + + /** + * Called the one time when the data is completely received and checked. + * + */ + void storageCompleted(Storage storage); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index 5cf91973f6..9a07b476df 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -64,6 +64,13 @@ public class TrackerClient extends Thread stop = false; } + public void start() { + stop = false; + super.start(); + } + + public boolean halted() { return stop; } + /** * Interrupts this Thread to stop it. */ @@ -100,9 +107,16 @@ public class TrackerClient extends Thread TrackerInfo info = doRequest(announce, infoHash, peerID, uploaded, downloaded, left, STARTED_EVENT); - Iterator it = info.getPeers().iterator(); - while (it.hasNext()) - coordinator.addPeer((Peer)it.next()); + if (!completed) { + Iterator it = info.getPeers().iterator(); + while (it.hasNext()) { + Peer cur = (Peer)it.next(); + coordinator.addPeer(cur); + int delay = 3000; + int c = ((int)cur.getPeerID().getAddress().calculateHash().toBase64().charAt(0)) % 10; + try { Thread.sleep(delay * c); } catch (InterruptedException ie) {} + } + } started = true; } catch (IOException ioe) @@ -168,9 +182,18 @@ public class TrackerClient extends Thread uploaded, downloaded, left, event); - Iterator it = info.getPeers().iterator(); - while (it.hasNext()) - coordinator.addPeer((Peer)it.next()); + if ( (left > 0) && (!completed) ) { + // we only want to talk to new people if we need things + // from them (duh) + Iterator it = info.getPeers().iterator(); + while (it.hasNext()) { + Peer cur = (Peer)it.next(); + coordinator.addPeer(cur); + int delay = 3000; + int c = ((int)cur.getPeerID().getAddress().calculateHash().toBase64().charAt(0)) % 10; + try { Thread.sleep(delay * c); } catch (InterruptedException ie) {} + } + } } catch (IOException ioe) { diff --git a/history.txt b/history.txt index 8bee02610a..9f2506acad 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,13 @@ -$Id: history.txt,v 1.354 2005/12/13 16:56:41 jrandom Exp $ +$Id: history.txt,v 1.355 2005/12/14 04:32:52 jrandom Exp $ + +2005-12-15 jrandom + * Added multitorrent support to I2PSnark, accessible currently by running + "i2psnark.jar --config i2psnark.config" (which may or may not exist). + It then joins the swarm for any torrents in ./i2psnark/*.torrent, saving + their data in that directory as well. Removing the .torrent file stops + participation, and it is currently set to seed indefinitely. Completion + is logged to the logger and standard output, with further UI interaction + left to the (work in progress) web UI. 2005-12-14 jrandom * Fix to drop peer references when we shitlist people again (thanks zzz!)