* i2psnark: Add file priority feature;

Use context random for shuffle; other cleanups
This commit is contained in:
zzz
2010-10-15 13:48:36 +00:00
parent 1aba324481
commit 9afff4f80a
6 changed files with 271 additions and 25 deletions

View File

@ -163,6 +163,10 @@ public class I2PSnarkUtil {
opts.setProperty("inbound.nickname", "I2PSnark");
if (opts.getProperty("outbound.nickname") == null)
opts.setProperty("outbound.nickname", "I2PSnark");
// Dont do this for now, it is set in I2PSocketEepGet for announces,
// we don't need fast handshake for peer connections.
//if (opts.getProperty("i2p.streaming.connectDelay") == null)
// opts.setProperty("i2p.streaming.connectDelay", "500");
if (opts.getProperty("i2p.streaming.inactivityTimeout") == null)
opts.setProperty("i2p.streaming.inactivityTimeout", "240000");
if (opts.getProperty("i2p.streaming.inactivityAction") == null)

View File

@ -78,6 +78,7 @@ public class PeerCoordinator implements PeerListener
private final CoordinatorListener listener;
public I2PSnarkUtil _util;
private static final Random _random = I2PAppContext.getGlobalContext().random();
public String trackerProblems = null;
public int trackerSeenPeers = 0;
@ -97,8 +98,7 @@ public class PeerCoordinator implements PeerListener
// Install a timer to check the uploaders.
// Randomize the first start time so multiple tasks are spread out,
// this will help the behavior with global limits
Random r = I2PAppContext.getGlobalContext().random();
timer.schedule(new PeerCheckerTask(_util, this), (CHECK_PERIOD / 2) + r.nextInt((int) CHECK_PERIOD), CHECK_PERIOD);
timer.schedule(new PeerCheckerTask(_util, this), (CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD), CHECK_PERIOD);
}
// only called externally from Storage after the double-check fails
@ -107,10 +107,16 @@ public class PeerCoordinator implements PeerListener
// Make a list of pieces
wantedPieces = new ArrayList();
BitField bitfield = storage.getBitField();
for(int i = 0; i < metainfo.getPieces(); i++)
if (!bitfield.get(i))
wantedPieces.add(new Piece(i));
Collections.shuffle(wantedPieces);
int[] pri = storage.getPiecePriorities();
for(int i = 0; i < metainfo.getPieces(); i++) {
if (!bitfield.get(i)) {
Piece p = new Piece(i);
if (pri != null)
p.setPriority(pri[i]);
wantedPieces.add(p);
}
}
Collections.shuffle(wantedPieces, _random);
}
public Storage getStorage() { return storage; }
@ -520,6 +526,9 @@ public class PeerCoordinator implements PeerListener
while (piece == null && it.hasNext())
{
Piece p = it.next();
// sorted by priority, so when we hit a disabled piece we are done
if (p.isDisabled())
break;
if (havePieces.get(p.getId()) && !p.isRequested())
{
piece = p;
@ -538,7 +547,7 @@ public class PeerCoordinator implements PeerListener
if (wantedPieces.size() > END_GAME_THRESHOLD)
return -1; // nothing to request and not in end game
// let's not all get on the same piece
Collections.shuffle(requested);
Collections.shuffle(requested, _random);
Iterator<Piece> it2 = requested.iterator();
while (piece == null && it2.hasNext())
{
@ -563,11 +572,64 @@ public class PeerCoordinator implements PeerListener
_log.debug("parallel request (end game?) for " + peer + ": piece = " + piece);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Now requesting: piece " + piece + " priority " + piece.getPriority());
piece.setRequested(true);
return piece.getId();
}
}
/**
* Maps file priorities to piece priorities.
* Call after updating file priorities Storage.setPriority()
* @since 0.8.1
*/
public void updatePiecePriorities() {
int[] pri = storage.getPiecePriorities();
if (pri == null)
return;
synchronized(wantedPieces) {
// Add incomplete and previously unwanted pieces to the list
// Temp to avoid O(n**2)
BitField want = new BitField(pri.length);
for (Piece p : wantedPieces) {
want.set(p.getId());
}
BitField bitfield = storage.getBitField();
for (int i = 0; i < pri.length; i++) {
if (pri[i] >= 0 && !bitfield.get(i)) {
if (!want.get(i)) {
Piece piece = new Piece(i);
wantedPieces.add(piece);
// As connections are already up, new Pieces will
// not have their PeerID list populated, so do that.
synchronized(peers) {
for (Peer p : peers) {
PeerState s = p.state;
if (s != null) {
BitField bf = s.bitfield;
if (bf != null && bf.get(i))
piece.addPeer(p);
}
}
}
}
}
}
// now set the new priorities and remove newly unwanted pieces
for (Iterator<Piece> iter = wantedPieces.iterator(); iter.hasNext(); ) {
Piece p = iter.next();
int id = pri[p.getId()];
if (id >= 0)
p.setPriority(pri[p.getId()]);
else
iter.remove();
}
// if we added pieces, they will be in-order unless we shuffle
Collections.shuffle(wantedPieces, _random);
}
}
/**
* Returns a byte array containing the requested piece or null of
* the piece is unknown.
@ -632,6 +694,9 @@ public class PeerCoordinator implements PeerListener
// No need to announce have piece to peers.
// Assume we got a good piece, we don't really care anymore.
// Well, this could be caused by a change in priorities, so
// only return true if we already have it, otherwise might as well keep it.
if (storage.getBitField().get(piece))
return true;
}
@ -639,6 +704,7 @@ public class PeerCoordinator implements PeerListener
{
if (storage.putPiece(piece, bs))
{
if (_log.shouldLog(Log.INFO))
_log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName());
}
else

View File

@ -1,22 +1,31 @@
package org.klomp.snark;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import net.i2p.util.ConcurrentHashSet;
public class Piece implements Comparable {
private int id;
private Set peers;
private Set<PeerID> peers;
private boolean requested;
/** @since 0.8.1 */
private int priority;
public Piece(int id) {
this.id = id;
this.peers = Collections.synchronizedSet(new HashSet());
this.requested = false;
this.peers = new ConcurrentHashSet();
}
/**
* Highest priority first,
* then rarest first
*/
public int compareTo(Object o) throws ClassCastException {
int pdiff = ((Piece)o).priority - this.priority; // reverse
if (pdiff != 0)
return pdiff;
return this.peers.size() - ((Piece)o).peers.size();
}
@ -37,12 +46,25 @@ public class Piece implements Comparable {
}
public int getId() { return this.id; }
public Set getPeers() { return this.peers; }
/** @deprecated unused */
public Set<PeerID> getPeers() { return this.peers; }
public boolean addPeer(Peer peer) { return this.peers.add(peer.getPeerID()); }
public boolean removePeer(Peer peer) { return this.peers.remove(peer.getPeerID()); }
public boolean isRequested() { return this.requested; }
public void setRequested(boolean requested) { this.requested = requested; }
/** @return default 0 @since 0.8.1 */
public int getPriority() { return this.priority; }
/** @since 0.8.1 */
public void setPriority(int p) { this.priority = p; }
/** @since 0.8.1 */
public boolean isDisabled() { return this.priority < 0; }
/** @since 0.8.1 */
public void setDisabled() { this.priority = -1; }
@Override
public String toString() {
return String.valueOf(id);

View File

@ -42,6 +42,8 @@ public class Storage
private Object[] RAFlock; // lock on RAF access
private long[] RAFtime; // when was RAF last accessed, or 0 if closed
private File[] RAFfile; // File to make it easier to reopen
/** priorities by file; default 0; may be null. @since 0.8.1 */
private int[] priorities;
private final StorageListener listener;
private I2PSnarkUtil _util;
@ -228,6 +230,8 @@ public class Storage
RAFlock = new Object[size];
RAFtime = new long[size];
RAFfile = new File[size];
priorities = new int[size];
int i = 0;
Iterator it = files.iterator();
@ -330,6 +334,83 @@ public class Storage
return -1;
}
/**
* @param file canonical path (non-directory)
* @since 0.8.1
*/
public int getPriority(String file) {
if (complete() || metainfo.getFiles() == null || priorities == null)
return 0;
for (int i = 0; i < rafs.length; i++) {
File f = RAFfile[i];
// use canonical in case snark dir or sub dirs are symlinked
if (f != null) {
try {
String canonical = f.getCanonicalPath();
if (canonical.equals(file))
return priorities[i];
} catch (IOException ioe) {}
}
}
return 0;
}
/**
* Must call setPiecePriorities() after calling this
* @param file canonical path (non-directory)
* @param priority default 0; <0 to disable
* @since 0.8.1
*/
public void setPriority(String file, int pri) {
if (complete() || metainfo.getFiles() == null || priorities == null)
return;
for (int i = 0; i < rafs.length; i++) {
File f = RAFfile[i];
// use canonical in case snark dir or sub dirs are symlinked
if (f != null) {
try {
String canonical = f.getCanonicalPath();
if (canonical.equals(file)) {
priorities[i] = pri;
return;
}
} catch (IOException ioe) {}
}
}
}
/**
* Call setPriority() for all changed files first,
* then call this.
* Set the piece priority to the highest priority
* of all files spanning the piece.
* Caller must pass array to the PeerCoordinator.
* @return null on error, if complete, or if only one file
* @since 0.8.1
*/
public int[] getPiecePriorities() {
if (complete() || metainfo.getFiles() == null)
return null;
int[] rv = new int[metainfo.getPieces()];
int file = 0;
long pcEnd = -1;
long fileEnd = lengths[0] - 1;
int psz = metainfo.getPieceLength(0);
for (int i = 0; i < rv.length; i++) {
pcEnd += psz;
int pri = priorities[file];
while (fileEnd <= pcEnd && file < lengths.length - 1) {
file++;
long oldFileEnd = fileEnd;
fileEnd += lengths[file];
if (priorities[file] > pri && pcEnd < oldFileEnd)
pri = priorities[file];
}
rv[i] = pri;
}
return rv;
}
/**
* The BitField that tells which pieces this storage contains.
* Do not change this since this is the current state of the storage.
@ -436,11 +517,15 @@ public class Storage
changed = true;
checkCreateFiles();
}
if (complete())
if (complete()) {
_util.debug("Torrent is complete", Snark.NOTICE);
else
} else {
// fixme saved priorities
if (files != null)
priorities = new int[files.size()];
_util.debug("Still need " + needed + " out of " + metainfo.getPieces() + " pieces", Snark.NOTICE);
}
}
/**
* Reopen the file descriptors for a restart
@ -624,7 +709,12 @@ public class Storage
// the whole file?
listener.storageCreateFile(this, names[nr], lengths[nr]);
final int ZEROBLOCKSIZE = metainfo.getPieceLength(0);
byte[] zeros = new byte[ZEROBLOCKSIZE];
byte[] zeros;
try {
zeros = new byte[ZEROBLOCKSIZE];
} catch (OutOfMemoryError oom) {
throw new IOException(oom.toString());
}
int i;
for (i = 0; i < lengths[nr]/ZEROBLOCKSIZE; i++)
{

View File

@ -266,7 +266,7 @@ public class TrackerClient extends I2PAppThread
// we only want to talk to new people if we need things
// from them (duh)
List ordered = new ArrayList(peers);
Collections.shuffle(ordered);
Collections.shuffle(ordered, r);
Iterator it = ordered.iterator();
while (it.hasNext()) {
Peer cur = (Peer)it.next();

View File

@ -133,6 +133,7 @@ public class I2PSnarkServlet extends Default {
// bypass the horrid Resource.getListHTML()
String pathInfo = req.getPathInfo();
String pathInContext = URI.addPaths(path, pathInfo);
req.setCharacterEncoding("UTF-8");
resp.setCharacterEncoding("UTF-8");
resp.setContentType("text/html; charset=UTF-8");
Resource resource = getResource(pathInContext);
@ -140,7 +141,7 @@ public class I2PSnarkServlet extends Default {
resp.sendError(HttpResponse.__404_Not_Found);
} else {
String base = URI.addPaths(req.getRequestURI(), "/");
String listing = getListHTML(resource, base, true);
String listing = getListHTML(resource, base, true, method.equals("POST") ? req.getParameterMap() : null);
if (listing != null)
resp.getWriter().write(listing);
else // shouldn't happen
@ -1252,10 +1253,11 @@ public class I2PSnarkServlet extends Default {
* @param r The Resource
* @param base The base URL
* @param parent True if the parent directory should be included
* @param postParams map of POST parameters or null if not a POST
* @return String of HTML
* @since 0.7.14
*/
private String getListHTML(Resource r, String base, boolean parent)
private String getListHTML(Resource r, String base, boolean parent, Map postParams)
throws IOException
{
if (!r.isDirectory())
@ -1280,6 +1282,10 @@ public class I2PSnarkServlet extends Default {
else
torrentName = title;
Snark snark = _manager.getTorrentByBaseName(torrentName);
if (snark != null && postParams != null)
savePriorities(snark, postParams);
if (title.endsWith("/"))
title = title.substring(0, title.length() - 1);
title = _("Torrent") + ": " + title;
@ -1297,12 +1303,19 @@ public class I2PSnarkServlet extends Default {
.append(_("Up to higher level directory")).append("</A>\n");
}
buf.append("</div><div class=\"page\"><div class=\"mainsection\">" +
"<TABLE BORDER=0 class=\"snarkTorrents\" cellpadding=\"5px 10px\">" +
buf.append("</div><div class=\"page\"><div class=\"mainsection\">");
boolean showPriority = snark != null && !snark.storage.complete();
if (showPriority)
buf.append("<form action=\"").append(base).append("\" method=\"POST\">\n");
buf.append("<TABLE BORDER=0 class=\"snarkTorrents\" cellpadding=\"5px 10px\">" +
"<thead><tr><th>").append(_("File")).append("</th><th>").append(_("Size"))
.append("</th><th>").append(_("Status")).append("</th></tr></thead>");
.append("</th><th>").append(_("Status")).append("</th>");
if (showPriority)
buf.append("<th>").append(_("Priority")).append("</th>");
buf.append("</tr></thead>\n");
//DateFormat dfmt=DateFormat.getDateTimeInstance(DateFormat.MEDIUM,
// DateFormat.MEDIUM);
boolean showSaveButton = false;
for (int i=0 ; i< ls.length ; i++)
{
String encoded=URI.encodePath(ls[i]);
@ -1340,7 +1353,8 @@ public class I2PSnarkServlet extends Default {
complete = true;
status = toImg("tick") + _("Complete");
} else {
status = toImg("clock") +
status =
(snark.storage.getPriority(f.getCanonicalPath()) < 0 ? toImg("cancel") : toImg("clock")) +
(100 * (length - remaining) / length) + "% " + _("complete") +
" (" + DataHelper.formatSize2(remaining) + _("bytes remaining") + ")";
}
@ -1384,9 +1398,40 @@ public class I2PSnarkServlet extends Default {
buf.append("</TD><TD class=\"").append(rowClass).append(" snarkFileStatus\">");
//buf.append(dfmt.format(new Date(item.lastModified())));
buf.append(status);
buf.append("</TD></TR>\n");
buf.append("</TD>");
if (showPriority) {
buf.append("<td>");
File f = item.getFile();
if ((!complete) && (!item.isDirectory()) && f != null) {
int pri = snark.storage.getPriority(f.getCanonicalPath());
buf.append("<input type=\"radio\" value=\"5\" name=\"pri.").append(f.getCanonicalPath()).append("\" ");
if (pri > 0)
buf.append("checked=\"true\"");
buf.append('>').append(_("High"));
buf.append("<input type=\"radio\" value=\"0\" name=\"pri.").append(f.getCanonicalPath()).append("\" ");
if (pri == 0)
buf.append("checked=\"true\"");
buf.append('>').append(_("Normal"));
buf.append("<input type=\"radio\" value=\"-9\" name=\"pri.").append(f.getCanonicalPath()).append("\" ");
if (pri < 0)
buf.append("checked=\"true\"");
buf.append('>').append(_("Do not download"));
showSaveButton = true;
}
buf.append("</td>");
}
buf.append("</TR>\n");
}
if (showSaveButton) {
buf.append("<thead><tr><th colspan=\"3\">&nbsp;</th><th align=\"center\"><input type=\"submit\" value=\"");
buf.append(_("Save priorities"));
buf.append("\" name=\"foo\" ></th></tr></thead>\n");
}
buf.append("</TABLE>\n");
if (showPriority)
buf.append("</form>");
buf.append("</div></div></BODY></HTML>\n");
return buf.toString();
@ -1452,6 +1497,25 @@ public class I2PSnarkServlet extends Default {
return "<img alt=\"\" height=\"16\" width=\"16\" src=\"/i2psnark/_icons/" + icon + ".png\"> ";
}
/** @since 0.8.1 */
private static void savePriorities(Snark snark, Map postParams) {
Set<Map.Entry> entries = postParams.entrySet();
for (Map.Entry entry : entries) {
String key = (String)entry.getKey();
if (key.startsWith("pri.")) {
try {
String file = key.substring(4);
String val = ((String[])entry.getValue())[0]; // jetty arrays
int pri = Integer.parseInt(val);
snark.storage.setPriority(file, pri);
//System.err.println("Priority now " + pri + " for " + file);
} catch (Throwable t) { t.printStackTrace(); }
}
}
if (snark.coordinator != null)
snark.coordinator.updatePiecePriorities();
}
/** inner class, don't bother reindenting */
private static class FetchAndAdd implements Runnable {