i2psnark:

- Refactor to allow running a single Snark without a SnarkManager again,
    by moving some things from SnarkManager to I2PSnarkUtil,
    having Snark call completeListener callbacks,
    and having Storage call storageListener callbacks.
    This is in preparation for using Snark for router updates.
    Step 2 is to allow multiple I2PSnarkUtil instances.
  - Big rewrite of Storage to open file descriptors on demand, and
    close them when unused, so we can support large numbers of torrents.
This commit is contained in:
zzz
2008-11-15 23:52:40 +00:00
parent de21a5ec48
commit fa23a7b066
7 changed files with 237 additions and 98 deletions

View File

@ -2,12 +2,15 @@ package org.klomp.snark;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
@ -44,6 +47,12 @@ public class I2PSnarkUtil {
private int _maxUploaders;
private int _maxUpBW;
public static final String PROP_USE_OPENTRACKERS = "i2psnark.useOpentrackers";
public static final boolean DEFAULT_USE_OPENTRACKERS = true;
public static final String PROP_OPENTRACKERS = "i2psnark.opentrackers";
public static final String DEFAULT_OPENTRACKERS = "http://tracker.welterde.i2p/a";
public static final int DEFAULT_MAX_UP_BW = 8; //KBps
private I2PSnarkUtil() {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(Snark.class);
@ -53,6 +62,7 @@ public class I2PSnarkUtil {
_shitlist = new HashSet(64);
_configured = false;
_maxUploaders = Snark.MAX_TOTAL_UPLOADERS;
_maxUpBW = DEFAULT_MAX_UP_BW;
}
/**
@ -267,6 +277,36 @@ public class I2PSnarkUtil {
return rv;
}
public String getOpenTrackerString() {
String rv = (String) _opts.get(PROP_OPENTRACKERS);
if (rv == null)
return DEFAULT_OPENTRACKERS;
return rv;
}
/** comma delimited list open trackers to use as backups */
/** sorted map of name to announceURL=baseURL */
public List getOpenTrackers() {
if (!shouldUseOpenTrackers())
return null;
List rv = new ArrayList(1);
String trackers = getOpenTrackerString();
StringTokenizer tok = new StringTokenizer(trackers, ", ");
while (tok.hasMoreTokens())
rv.add(tok.nextToken());
if (rv.size() <= 0)
return null;
return rv;
}
public boolean shouldUseOpenTrackers() {
String rv = (String) _opts.get(PROP_USE_OPENTRACKERS);
if (rv == null)
return DEFAULT_USE_OPENTRACKERS;
return Boolean.valueOf(rv).booleanValue();
}
/** hook between snark's logger and an i2p log */
void debug(String msg, int snarkDebugLevel, Throwable t) {
if (t instanceof OutOfMemoryError) {

View File

@ -32,7 +32,7 @@ import java.util.TimerTask;
*/
class PeerCheckerTask extends TimerTask
{
private final long KILOPERSECOND = 1024*(PeerCoordinator.CHECK_PERIOD/1000);
private static final long KILOPERSECOND = 1024*(PeerCoordinator.CHECK_PERIOD/1000);
private final PeerCoordinator coordinator;
@ -247,6 +247,10 @@ class PeerCheckerTask extends TimerTask
// store the rates
coordinator.setRateHistory(uploaded, downloaded);
// close out unused files, but we don't need to do it every time
if (random.nextInt(4) == 0)
coordinator.getStorage().cleanRAFs();
}
}
}

View File

@ -247,10 +247,11 @@ public class Snark
Snark(String torrent, String ip, int user_port,
StorageListener slistener, CoordinatorListener clistener) {
this(torrent, ip, user_port, slistener, clistener, true, ".");
this(torrent, ip, user_port, slistener, clistener, null, true, ".");
}
Snark(String torrent, String ip, int user_port,
StorageListener slistener, CoordinatorListener clistener, boolean start, String rootDir)
public Snark(String torrent, String ip, int user_port,
StorageListener slistener, CoordinatorListener clistener,
CompleteListener complistener, boolean start, String rootDir)
{
if (slistener == null)
slistener = this;
@ -258,6 +259,8 @@ public class Snark
//if (clistener == null)
// clistener = this;
completeListener = complistener;
this.torrent = torrent;
this.rootDataDir = rootDir;
@ -379,7 +382,13 @@ public class Snark
{
activity = "Checking storage";
storage = new Storage(meta, slistener);
if (completeListener != null) {
storage.check(rootDataDir,
completeListener.getSavedTorrentTime(this),
completeListener.getSavedTorrentBitField(this));
} else {
storage.check(rootDataDir);
}
// have to figure out when to reopen
// if (!start)
// storage.close();
@ -480,14 +489,15 @@ public class Snark
pc.halt();
Storage st = storage;
if (st != null) {
if (storage.changed)
SnarkManager.instance().saveTorrentStatus(storage.getMetaInfo(), storage.getBitField());
boolean changed = storage.changed;
try {
storage.close();
} catch (IOException ioe) {
System.out.println("Error closing " + torrent);
ioe.printStackTrace();
}
if (changed && completeListener != null)
completeListener.updateStatus(this);
}
if (pc != null)
PeerCoordinatorSet.instance().remove(pc);
@ -743,6 +753,8 @@ public class Snark
allChecked = true;
checking = false;
if (storage.changed && completeListener != null)
completeListener.updateStatus(this);
}
public void storageCompleted(Storage storage)
@ -768,6 +780,10 @@ public class Snark
public interface CompleteListener {
public void torrentComplete(Snark snark);
public void updateStatus(Snark snark);
// not really listeners but the easiest way to get back to an optional SnarkManager
public long getSavedTorrentTime(Snark snark);
public BitField getSavedTorrentBitField(Snark snark);
}
/** Maintain a configurable total uploader cap

View File

@ -51,10 +51,6 @@ public class SnarkManager implements Snark.CompleteListener {
public static final String PROP_AUTO_START = "i2snark.autoStart"; // oops
public static final String DEFAULT_AUTO_START = "false";
public static final String PROP_USE_OPENTRACKERS = "i2psnark.useOpentrackers";
public static final String DEFAULT_USE_OPENTRACKERS = "true";
public static final String PROP_OPENTRACKERS = "i2psnark.opentrackers";
public static final String DEFAULT_OPENTRACKERS = "http://tracker.welterde.i2p/a";
public static final String PROP_LINK_PREFIX = "i2psnark.linkPrefix";
public static final String DEFAULT_LINK_PREFIX = "file:///";
@ -98,9 +94,6 @@ public class SnarkManager implements Snark.CompleteListener {
public boolean shouldAutoStart() {
return Boolean.valueOf(_config.getProperty(PROP_AUTO_START, DEFAULT_AUTO_START+"")).booleanValue();
}
public boolean shouldUseOpenTrackers() {
return Boolean.valueOf(_config.getProperty(PROP_USE_OPENTRACKERS, DEFAULT_USE_OPENTRACKERS)).booleanValue();
}
public String linkPrefix() {
return _config.getProperty(PROP_LINK_PREFIX, DEFAULT_LINK_PREFIX + getDataDir().getAbsolutePath() + File.separatorChar);
}
@ -322,14 +315,14 @@ public class SnarkManager implements Snark.CompleteListener {
addMessage("Adjusted autostart to " + autoStart);
changed = true;
}
if (shouldUseOpenTrackers() != useOpenTrackers) {
_config.setProperty(PROP_USE_OPENTRACKERS, useOpenTrackers + "");
if (I2PSnarkUtil.instance().shouldUseOpenTrackers() != useOpenTrackers) {
_config.setProperty(I2PSnarkUtil.PROP_USE_OPENTRACKERS, useOpenTrackers + "");
addMessage((useOpenTrackers ? "En" : "Dis") + "abled open trackers - torrent restart required to take effect");
changed = true;
}
if (openTrackers != null) {
if (openTrackers.trim().length() > 0 && !openTrackers.trim().equals(getOpenTrackerString())) {
_config.setProperty(PROP_OPENTRACKERS, openTrackers.trim());
if (openTrackers.trim().length() > 0 && !openTrackers.trim().equals(I2PSnarkUtil.instance().getOpenTrackerString())) {
_config.setProperty(I2PSnarkUtil.PROP_OPENTRACKERS, openTrackers.trim());
addMessage("Open Tracker list changed - torrent restart required to take effect");
changed = true;
}
@ -407,7 +400,7 @@ public class SnarkManager implements Snark.CompleteListener {
addMessage(rejectMessage);
return;
} else {
torrent = new Snark(filename, null, -1, null, null, false, dataDir.getPath());
torrent = new Snark(filename, null, -1, null, null, this, false, dataDir.getPath());
torrent.completeListener = this;
synchronized (_snarks) {
_snarks.put(filename, torrent);
@ -438,7 +431,8 @@ public class SnarkManager implements Snark.CompleteListener {
/**
* Get the timestamp for a torrent from the config file
*/
public long getSavedTorrentTime(MetaInfo metainfo) {
public long getSavedTorrentTime(Snark snark) {
MetaInfo metainfo = snark.meta;
byte[] ih = metainfo.getInfoHash();
String infohash = Base64.encode(ih);
infohash = infohash.replace('=', '$');
@ -457,7 +451,8 @@ public class SnarkManager implements Snark.CompleteListener {
* Get the saved bitfield for a torrent from the config file.
* Convert "." to a full bitfield.
*/
public BitField getSavedTorrentBitField(MetaInfo metainfo) {
public BitField getSavedTorrentBitField(Snark snark) {
MetaInfo metainfo = snark.meta;
byte[] ih = metainfo.getInfoHash();
String infohash = Base64.encode(ih);
infohash = infohash.replace('=', '$');
@ -618,11 +613,17 @@ public class SnarkManager implements Snark.CompleteListener {
}
}
/** two listeners */
public void torrentComplete(Snark snark) {
File f = new File(snark.torrent);
long len = snark.meta.getTotalLength();
addMessage("Download complete of " + f.getName()
+ (len < 5*1024*1024 ? " (size: " + (len/1024) + "KB)" : " (size: " + (len/(1024*1024l)) + "MB)"));
updateStatus(snark);
}
public void updateStatus(Snark snark) {
saveTorrentStatus(snark.meta, snark.storage.getBitField());
}
private void monitorTorrents(File dir) {
@ -706,26 +707,6 @@ public class SnarkManager implements Snark.CompleteListener {
return trackerMap;
}
public String getOpenTrackerString() {
return _config.getProperty(PROP_OPENTRACKERS, DEFAULT_OPENTRACKERS);
}
/** comma delimited list open trackers to use as backups */
/** sorted map of name to announceURL=baseURL */
public List getOpenTrackers() {
if (!shouldUseOpenTrackers())
return null;
List rv = new ArrayList(1);
String trackers = getOpenTrackerString();
StringTokenizer tok = new StringTokenizer(trackers, ", ");
while (tok.hasMoreTokens())
rv.add(tok.nextToken());
if (rv.size() <= 0)
return null;
return rv;
}
private static class TorrentFilenameFilter implements FilenameFilter {
private static final TorrentFilenameFilter _filter = new TorrentFilenameFilter();
public static TorrentFilenameFilter instance() { return _filter; }

View File

@ -39,11 +39,15 @@ public class Storage
private long[] lengths;
private RandomAccessFile[] rafs;
private String[] names;
private Object[] RAFlock; // lock on RAF access
private long[] RAFtime; // when was RAF last accessed, or 0 if closed
private File[] RAFfile; // File to make it easier to reopen
private final StorageListener listener;
private BitField bitfield; // BitField to represent the pieces
private int needed; // Number of pieces needed
private boolean _probablyComplete; // use this to decide whether to open files RO
// XXX - Not always set correctly
int piece_size;
@ -68,6 +72,7 @@ public class Storage
this.metainfo = metainfo;
this.listener = listener;
needed = metainfo.getPieces();
_probablyComplete = false;
bitfield = new BitField(needed);
}
@ -119,7 +124,6 @@ public class Storage
files.add(file);
}
String name = baseFile.getName();
if (files.size() == 1) // FIXME: ...and if base file not a directory or should this be the only check?
// this makes a bad metainfo if the directory has only one file in it
{
@ -133,7 +137,8 @@ public class Storage
}
// Creates piece hases for a new storage.
// Creates piece hashes for a new storage.
// This does NOT create the files, just the hashes
public void create() throws IOException
{
// if (true) {
@ -197,14 +202,8 @@ public class Storage
piece_hashes[20 * i + j] = hash[j];
bitfield.set(i);
if (listener != null)
listener.storageChecked(this, i, true);
}
if (listener != null)
listener.storageAllChecked(this);
// Reannounce to force recalculating the info_hash.
metainfo = metainfo.reannounce(metainfo.getAnnounce());
}
@ -218,6 +217,9 @@ public class Storage
names = new String[size];
lengths = new long[size];
rafs = new RandomAccessFile[size];
RAFlock = new Object[size];
RAFtime = new long[size];
RAFfile = new File[size];
int i = 0;
Iterator it = files.iterator();
@ -228,7 +230,8 @@ public class Storage
if (base.isDirectory() && names[i].startsWith(base.getPath()))
names[i] = names[i].substring(base.getPath().length() + 1);
lengths[i] = f.length();
rafs[i] = new RandomAccessFile(f, "r");
RAFlock[i] = new Object();
RAFfile[i] = f;
i++;
}
}
@ -288,11 +291,14 @@ public class Storage
* Creates (and/or checks) all files from the metainfo file list.
*/
public void check(String rootDir) throws IOException
{
check(rootDir, 0, null);
}
/** use a saved bitfield and timestamp from a config file */
public void check(String rootDir, long savedTime, BitField savedBitField) throws IOException
{
File base = new File(rootDir, filterName(metainfo.getName()));
// look for saved bitfield and timestamp in the config file
long savedTime = SnarkManager.instance().getSavedTorrentTime(metainfo);
BitField savedBitField = SnarkManager.instance().getSavedTorrentBitField(metainfo);
boolean useSavedBitField = savedTime > 0 && savedBitField != null;
List files = metainfo.getFiles();
@ -306,16 +312,17 @@ public class Storage
lengths = new long[1];
rafs = new RandomAccessFile[1];
names = new String[1];
RAFlock = new Object[1];
RAFtime = new long[1];
RAFfile = new File[1];
lengths[0] = metainfo.getTotalLength();
RAFlock[0] = new Object();
RAFfile[0] = base;
if (useSavedBitField) {
long lm = base.lastModified();
if (lm <= 0 || lm > savedTime)
useSavedBitField = false;
}
if (base.exists() && ((useSavedBitField && savedBitField.complete()) || !base.canWrite()))
rafs[0] = new RandomAccessFile(base, "r");
else
rafs[0] = new RandomAccessFile(base, "rw");
names[0] = base.getName();
}
else
@ -331,20 +338,21 @@ public class Storage
lengths = new long[size];
rafs = new RandomAccessFile[size];
names = new String[size];
RAFlock = new Object[size];
RAFtime = new long[size];
RAFfile = new File[size];
for (int i = 0; i < size; i++)
{
File f = createFileFromNames(base, (List)files.get(i));
lengths[i] = ((Long)ls.get(i)).longValue();
RAFlock[i] = new Object();
RAFfile[i] = f;
total += lengths[i];
if (useSavedBitField) {
long lm = base.lastModified();
if (lm <= 0 || lm > savedTime)
useSavedBitField = false;
}
if (f.exists() && ((useSavedBitField && savedBitField.complete()) || !f.canWrite()))
rafs[i] = new RandomAccessFile(f, "r");
else
rafs[i] = new RandomAccessFile(f, "rw");
names[i] = f.getName();
}
@ -357,11 +365,17 @@ public class Storage
if (useSavedBitField) {
bitfield = savedBitField;
needed = metainfo.getPieces() - bitfield.count();
_probablyComplete = complete();
Snark.debug("Found saved state and files unchanged, skipping check", Snark.NOTICE);
} else {
// the following sets the needed variable
changed = true;
checkCreateFiles();
SnarkManager.instance().saveTorrentStatus(metainfo, bitfield);
}
if (complete())
Snark.debug("Torrent is complete", Snark.NOTICE);
else
Snark.debug("Still need " + needed + " out of " + metainfo.getPieces() + " pieces", Snark.NOTICE);
}
/**
@ -379,11 +393,6 @@ public class Storage
Snark.debug("Reopening file: " + base, Snark.NOTICE);
if (!base.exists())
throw new IOException("Could not reopen file " + base);
if (complete() || !base.canWrite()) // hope we can get away with this, if we are only seeding...
rafs[0] = new RandomAccessFile(base, "r");
else
rafs[0] = new RandomAccessFile(base, "rw");
}
else
{
@ -398,10 +407,6 @@ public class Storage
File f = getFileFromNames(base, (List)files.get(i));
if (!f.exists())
throw new IOException("Could not reopen file " + f);
if (complete() || !f.canWrite()) // see above re: only seeding
rafs[i] = new RandomAccessFile(f, "r");
else
rafs[i] = new RandomAccessFile(f, "rw");
}
}
@ -453,28 +458,44 @@ public class Storage
return base;
}
/**
* This is called at the beginning, and at presumed completion,
* so we have to be careful about locking.
*/
private void checkCreateFiles() throws IOException
{
// Whether we are resuming or not,
// if any of the files already exists we assume we are resuming.
boolean resume = false;
_probablyComplete = true;
needed = metainfo.getPieces();
// Make sure all files are available and of correct length
for (int i = 0; i < rafs.length; i++)
{
long length = rafs[i].length();
if(length == lengths[i])
long length = RAFfile[i].length();
if(RAFfile[i].exists() && length == lengths[i])
{
if (listener != null)
listener.storageAllocated(this, length);
resume = true; // XXX Could dynamicly check
}
else if (length == 0)
else if (length == 0) {
changed = true;
synchronized(RAFlock[i]) {
allocateFile(i);
else {
}
} else {
Snark.debug("File '" + names[i] + "' exists, but has wrong length - repairing corruption", Snark.ERROR);
changed = true;
_probablyComplete = false; // to force RW
synchronized(RAFlock[i]) {
checkRAF(i);
rafs[i].setLength(lengths[i]);
}
// will be closed below
}
}
// Check which pieces match and which don't
@ -497,6 +518,17 @@ public class Storage
}
}
_probablyComplete = complete();
// close all the files so we don't end up with a zillion open ones;
// we will reopen as needed
for (int i = 0; i < rafs.length; i++) {
synchronized(RAFlock[i]) {
try {
closeRAF(i);
} catch (IOException ioe) {}
}
}
if (listener != null) {
listener.storageAllChecked(this);
if (needed <= 0)
@ -506,6 +538,8 @@ public class Storage
private void allocateFile(int nr) throws IOException
{
// caller synchronized
openRAF(nr, false); // RW
// XXX - Is this the best way to make sure we have enough space for
// the whole file?
listener.storageCreateFile(this, names[nr], lengths[nr]);
@ -520,6 +554,7 @@ public class Storage
}
int size = (int)(lengths[nr] - i*ZEROBLOCKSIZE);
rafs[nr].write(zeros, 0, size);
// caller will close rafs[nr]
if (listener != null)
listener.storageAllocated(this, size);
}
@ -535,12 +570,11 @@ public class Storage
for (int i = 0; i < rafs.length; i++)
{
try {
synchronized(rafs[i])
{
rafs[i].close();
synchronized(RAFlock[i]) {
closeRAF(i);
}
} catch (IOException ioe) {
I2PSnarkUtil.instance().debug("Error closing " + rafs[i], Snark.ERROR, ioe);
I2PSnarkUtil.instance().debug("Error closing " + RAFfile[i], Snark.ERROR, ioe);
// gobble gobble
}
}
@ -587,18 +621,10 @@ public class Storage
if (!correctHash)
return false;
boolean complete;
synchronized(bitfield)
{
if (bitfield.get(piece))
return true; // No need to store twice.
else
{
bitfield.set(piece);
needed--;
complete = needed == 0;
}
}
// Early typecast, avoid possibly overflowing a temp integer
@ -618,8 +644,9 @@ public class Storage
{
int need = length - written;
int len = (start + need < raflen) ? need : (int)(raflen - start);
synchronized(rafs[i])
synchronized(RAFlock[i])
{
checkRAF(i);
rafs[i].seek(start);
rafs[i].write(bs, off + written, len);
}
@ -633,8 +660,21 @@ public class Storage
}
changed = true;
// do this after the write, so we know it succeeded, and we don't set the
// needed count to zero, which would cause checkRAF() to open the file readonly.
boolean complete = false;
synchronized(bitfield)
{
if (!bitfield.get(piece))
{
bitfield.set(piece);
needed--;
complete = needed == 0;
}
}
if (complete) {
// listener.storageCompleted(this);
// do we also need to close all of the files and reopen
// them readonly?
@ -649,11 +689,13 @@ public class Storage
bitfield = new BitField(needed);
checkCreateFiles();
if (needed > 0) {
if (listener != null)
listener.setWantedPieces(this);
Snark.debug("WARNING: Not really done, missing " + needed
+ " pieces", Snark.WARNING);
} else {
SnarkManager.instance().saveTorrentStatus(metainfo, bitfield);
if (listener != null)
listener.storageCompleted(this);
}
}
@ -688,8 +730,9 @@ public class Storage
{
int need = length - read;
int len = (start + need < raflen) ? need : (int)(raflen - start);
synchronized(rafs[i])
synchronized(RAFlock[i])
{
checkRAF(i);
rafs[i].seek(start);
rafs[i].readFully(bs, read, len);
}
@ -704,4 +747,59 @@ public class Storage
return length;
}
/**
* Close unused RAFs - call periodically
*/
private static final long RAFCloseDelay = 7*60*1000;
public void cleanRAFs() {
long cutoff = System.currentTimeMillis() - RAFCloseDelay;
for (int i = 0; i < RAFlock.length; i++) {
synchronized(RAFlock[i]) {
if (RAFtime[i] > 0 && RAFtime[i] < cutoff) {
try {
closeRAF(i);
} catch (IOException ioe) {}
}
}
}
}
/*
* For each of the following,
* caller must synchronize on RAFlock[i]
* ... except at the beginning if you're careful
*/
/**
* This must be called before using the RAF to ensure it is open
*/
private void checkRAF(int i) throws IOException {
if (RAFtime[i] > 0) {
RAFtime[i] = System.currentTimeMillis();
return;
}
openRAF(i);
}
private void openRAF(int i) throws IOException {
openRAF(i, _probablyComplete);
}
private void openRAF(int i, boolean readonly) throws IOException {
rafs[i] = new RandomAccessFile(RAFfile[i], (readonly || !RAFfile[i].canWrite()) ? "r" : "rw");
RAFtime[i] = System.currentTimeMillis();
}
/**
* Can be called even if not open
*/
private void closeRAF(int i) throws IOException {
RAFtime[i] = 0;
if (rafs[i] == null)
return;
rafs[i].close();
rafs[i] = null;
}
}

View File

@ -121,7 +121,7 @@ public class TrackerClient extends I2PThread
// the primary tracker, that we don't add it twice.
trackers = new ArrayList(2);
trackers.add(new Tracker(meta.getAnnounce(), true));
List tlist = SnarkManager.instance().getOpenTrackers();
List tlist = I2PSnarkUtil.instance().getOpenTrackers();
if (tlist != null) {
for (int i = 0; i < tlist.size(); i++) {
String url = (String)tlist.get(i);

View File

@ -690,8 +690,8 @@ public class I2PSnarkServlet extends HttpServlet {
String uri = req.getRequestURI();
String dataDir = _manager.getDataDir().getAbsolutePath();
boolean autoStart = _manager.shouldAutoStart();
boolean useOpenTrackers = _manager.shouldUseOpenTrackers();
String openTrackers = _manager.getOpenTrackerString();
boolean useOpenTrackers = I2PSnarkUtil.instance().shouldUseOpenTrackers();
String openTrackers = I2PSnarkUtil.instance().getOpenTrackerString();
//int seedPct = 0;
out.write("<form action=\"" + uri + "\" method=\"POST\">\n");