* i2psnark: Clean up and enhance the PeerCoordinator's partial piece handling,

in preparation for more improvements
This commit is contained in:
zzz
2010-11-26 00:44:00 +00:00
parent d37944e081
commit 7f1ace4dbe
7 changed files with 304 additions and 143 deletions

View File

@ -0,0 +1,102 @@
package org.klomp.snark;
/**
* This is the class passed from PeerCoordinator to PeerState so
* PeerState may start requests.
*
* It is also passed from PeerState to PeerCoordinator when
* a piece is not completely downloaded, for example
* when the Peer disconnects or chokes.
*/
class PartialPiece implements Comparable {
private final int piece;
private final byte[] bs;
private final int off;
private final long createdTime;
/**
* Used by PeerCoordinator.
* Creates a new PartialPiece, with no chunks yet downloaded.
* Allocates the data.
*
* @param piece Piece number requested.
* @param bs length must be equal to the piece length
*/
public PartialPiece (int piece, int len) throws OutOfMemoryError {
this.piece = piece;
this.bs = new byte[len];
this.off = 0;
this.createdTime = 0;
}
/**
* Used by PeerState.
* Creates a new PartialPiece, with chunks up to but not including
* firstOutstandingRequest already downloaded and stored in the Request byte array.
*
* Note that this cannot handle gaps; chunks after a missing chunk cannot be saved.
* That would be harder.
*
* @param firstOutstandingRequest the first request not fulfilled for the piece
*/
public PartialPiece (Request firstOutstandingRequest) {
this.piece = firstOutstandingRequest.piece;
this.bs = firstOutstandingRequest.bs;
this.off = firstOutstandingRequest.off;
this.createdTime = System.currentTimeMillis();
}
/**
* Convert this PartialPiece to a request for the next chunk.
* Used by PeerState only.
*/
public Request getRequest() {
return new Request(this.piece, this.bs, this.off, Math.min(this.bs.length - this.off, PeerState.PARTSIZE));
}
/** piece number */
public int getPiece() {
return this.piece;
}
/** how many bytes are good */
public int getDownloaded() {
return this.off;
}
public long getCreated() {
return this.createdTime;
}
/**
* Highest downloaded first
*/
public int compareTo(Object o) throws ClassCastException {
return ((PartialPiece)o).off - this.off; // reverse
}
@Override
public int hashCode() {
return piece * 7777;
}
/**
* Make this simple so PeerCoordinator can keep a List.
* Warning - compares piece number only!
*/
@Override
public boolean equals(Object o) {
if (o instanceof PartialPiece) {
PartialPiece pp = (PartialPiece)o;
return pp.piece == this.piece;
}
return false;
}
@Override
public String toString() {
return "Partial(" + piece + ',' + off + ',' + bs.length + ')';
}
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.util.Log;
@ -368,8 +369,11 @@ public class Peer implements Comparable
if (this.deregister) {
PeerListener p = s.listener;
if (p != null) {
p.savePeerPartial(s);
p.markUnrequested(this);
List<PartialPiece> pcs = s.returnPartialPieces();
if (!pcs.isEmpty())
p.savePartialPieces(this, pcs);
// now covered by savePartialPieces
//p.markUnrequested(this);
}
}
state = null;

View File

@ -74,6 +74,9 @@ public class PeerCoordinator implements PeerListener
// Some random wanted pieces
private List<Piece> wantedPieces;
/** partial pieces */
private final List<PartialPiece> partialPieces;
private boolean halted = false;
private final CoordinatorListener listener;
@ -94,6 +97,7 @@ public class PeerCoordinator implements PeerListener
this.snark = torrent;
setWantedPieces();
partialPieces = new ArrayList(getMaxConnections() + 1);
// Install a timer to check the uploaders.
// Randomize the first start time so multiple tasks are spread out,
@ -293,7 +297,9 @@ public class PeerCoordinator implements PeerListener
removePeerFromPieces(peer);
}
// delete any saved orphan partial piece
savedRequest = null;
synchronized (partialPieces) {
partialPieces.clear();
}
}
public void connected(Peer peer)
@ -773,6 +779,9 @@ public class PeerCoordinator implements PeerListener
wantedPieces.remove(p);
}
// just in case
removePartialPiece(piece);
// Announce to the world we have it!
// Disconnect from other seeders when we get the last piece
synchronized(peers)
@ -866,70 +875,123 @@ public class PeerCoordinator implements PeerListener
}
}
/** Simple method to save a partial piece on peer disconnection
/**
* Save partial pieces on peer disconnection
* and hopefully restart it later.
* Only one partial piece is saved at a time.
* Replace it if a new one is bigger or the old one is too old.
* Replace a partial piece in the List if the new one is bigger.
* Storage method is private so we can expand to save multiple partials
* if we wish.
*
* Also mark the piece unrequested if this peer was the only one.
*
* @param peer partials, must include the zero-offset (empty) ones too
* @since 0.8.2
*/
private Request savedRequest = null;
private long savedRequestTime = 0;
public void savePeerPartial(PeerState state)
public void savePartialPieces(Peer peer, List<PartialPiece> partials)
{
if (halted)
return;
Request req = state.getPartialRequest();
if (req == null)
return;
if (savedRequest == null ||
req.off > savedRequest.off ||
System.currentTimeMillis() > savedRequestTime + (15 * 60 * 1000)) {
if (savedRequest == null || (req.piece != savedRequest.piece && req.off != savedRequest.off)) {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(" Saving orphaned partial piece " + req);
if (savedRequest != null)
_log.debug(" (Discarding previously saved orphan) " + savedRequest);
}
if (halted)
return;
if (_log.shouldLog(Log.INFO))
_log.info("Partials received from " + peer + ": " + partials);
synchronized(partialPieces) {
for (PartialPiece pp : partials) {
if (pp.getDownloaded() > 0) {
// PartialPiece.equals() only compares piece number, which is what we want
int idx = partialPieces.indexOf(pp);
if (idx < 0) {
partialPieces.add(pp);
if (_log.shouldLog(Log.INFO))
_log.info("Saving orphaned partial piece (new) " + pp);
} else if (idx >= 0 && pp.getDownloaded() > partialPieces.get(idx).getDownloaded()) {
// replace what's there now
partialPieces.set(idx, pp);
if (_log.shouldLog(Log.INFO))
_log.info("Saving orphaned partial piece (bigger) " + pp);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Discarding partial piece (not bigger)" + pp);
}
int max = getMaxConnections();
if (partialPieces.size() > max) {
// sorts by remaining bytes, least first
Collections.sort(partialPieces);
PartialPiece gone = partialPieces.remove(max);
if (_log.shouldLog(Log.INFO))
_log.info("Discarding orphaned partial piece (list full)" + gone);
}
} // else drop the empty partial piece
// synchs on wantedPieces...
markUnrequestedIfOnlyOne(peer, pp.getPiece());
}
if (_log.shouldLog(Log.INFO))
_log.info("Partial list size now: " + partialPieces.size());
}
savedRequest = req;
savedRequestTime = System.currentTimeMillis();
} else {
if (req.piece != savedRequest.piece)
if (_log.shouldLog(Log.DEBUG))
_log.debug(" Discarding orphaned partial piece " + req);
}
}
/** Return partial piece if it's still wanted and peer has it.
/**
* Return partial piece to the PeerState if it's still wanted and peer has it.
* @param havePieces pieces the peer has, the rv will be one of these
*
* @return PartialPiece or null
* @since 0.8.2
*/
public Request getPeerPartial(BitField havePieces) {
if (savedRequest == null)
return null;
if (! havePieces.get(savedRequest.piece)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer doesn't have orphaned piece " + savedRequest);
return null;
}
synchronized(wantedPieces)
{
for(Iterator<Piece> iter = wantedPieces.iterator(); iter.hasNext(); ) {
Piece piece = iter.next();
if (piece.getId() == savedRequest.piece) {
Request req = savedRequest;
piece.setRequested(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Restoring orphaned partial piece " + req);
savedRequest = null;
return req;
public PartialPiece getPartialPiece(Peer peer, BitField havePieces) {
// do it in this order to avoid deadlock (same order as in savePartialPieces())
synchronized(partialPieces) {
synchronized(wantedPieces) {
// sorts by remaining bytes, least first
Collections.sort(partialPieces);
for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) {
PartialPiece pp = iter.next();
int savedPiece = pp.getPiece();
if (havePieces.get(savedPiece)) {
// this is just a double-check, it should be in there
for(Piece piece : wantedPieces) {
if (piece.getId() == savedPiece) {
piece.setRequested(true);
iter.remove();
if (_log.shouldLog(Log.INFO)) {
_log.info("Restoring orphaned partial piece " + pp +
" Partial list size now: " + partialPieces.size());
}
return pp;
}
}
}
}
}
}
// ...and this section turns this into the general move-requests-around code!
// Temporary? So PeerState never calls wantPiece() directly for now...
int piece = wantPiece(peer, havePieces);
if (piece >= 0) {
try {
return new PartialPiece(piece, metainfo.getPieceLength(piece));
} catch (OutOfMemoryError oom) {
if (_log.shouldLog(Log.WARN))
_log.warn("OOM creating new partial piece");
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("We have no partial piece to return");
return null;
}
/**
* Remove saved state for this piece.
* Unless we are in the end game there shouldnt be anything in there.
* Do not call with wantedPieces lock held (deadlock)
*/
private void removePartialPiece(int piece) {
synchronized(partialPieces) {
for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) {
PartialPiece pp = iter.next();
if (pp.getPiece() == piece) {
iter.remove();
// there should be only one but keep going to be sure
}
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("We no longer want orphaned piece " + savedRequest);
savedRequest = null;
return null;
}
/** Clear the requested flag for a piece if the peer
@ -947,13 +1009,12 @@ public class PeerCoordinator implements PeerListener
continue;
if (p.state == null)
continue;
int[] arr = p.state.getRequestedPieces();
for (int i = 0; arr[i] >= 0; i++)
if(arr[i] == piece) {
// FIXME don't go into the state
if (p.state.getRequestedPieces().contains(Integer.valueOf(piece))) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Another peer is requesting piece " + piece);
return;
}
}
}
}
@ -973,20 +1034,6 @@ public class PeerCoordinator implements PeerListener
}
}
/** Mark a peer's requested pieces unrequested when it is disconnected
** Once for each piece
** This is enough trouble, maybe would be easier just to regenerate
** the requested list from scratch instead.
*/
public void markUnrequested(Peer peer)
{
if (halted || peer.state == null)
return;
int[] arr = peer.state.getRequestedPieces();
for (int i = 0; arr[i] >= 0; i++)
markUnrequestedIfOnlyOne(peer, arr[i]);
}
/** Return number of allowed uploaders for this torrent.
** Check with Snark to see if we are over the total upload limit.
*/

View File

@ -20,10 +20,12 @@
package org.klomp.snark;
import java.util.List;
/**
* Listener for Peer events.
*/
public interface PeerListener
interface PeerListener
{
/**
* Called when the connection to the peer has started and the
@ -151,7 +153,7 @@ public interface PeerListener
*
* @param state the PeerState for the peer
*/
void savePeerPartial(PeerState state); /* FIXME Exporting non-public type through public API FIXME */
void savePartialPieces(Peer peer, List<PartialPiece> pcs);
/**
* Called when a peer has connected and there may be a partially
@ -161,12 +163,5 @@ public interface PeerListener
*
* @return request (contains the partial data and valid length)
*/
Request getPeerPartial(BitField havePieces); /* FIXME Exporting non-public type through public API FIXME */
/** Mark a peer's requested pieces unrequested when it is disconnected
* This prevents premature end game
*
* @param peer the peer that is disconnecting
*/
void markUnrequested(Peer peer);
PartialPiece getPartialPiece(Peer peer, BitField havePieces);
}

View File

@ -23,9 +23,11 @@ package org.klomp.snark;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
@ -36,9 +38,9 @@ import org.klomp.snark.bencode.BEValue;
class PeerState implements DataLoader
{
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class);
final Peer peer;
private final Peer peer;
final PeerListener listener;
final MetaInfo metainfo;
private final MetaInfo metainfo;
// Interesting and choking describes whether we are interested in or
// are choking the other side.
@ -54,6 +56,7 @@ class PeerState implements DataLoader
long downloaded;
long uploaded;
/** the pieces the peer has */
BitField bitfield;
// Package local for use by Peer.
@ -102,6 +105,12 @@ class PeerState implements DataLoader
if (interesting && !choked)
request(resend);
if (choked) {
// TODO
// savePartialPieces
// clear request list
}
}
void interestedMessage(boolean interest)
@ -308,8 +317,11 @@ class PeerState implements DataLoader
}
}
/**
* @return index in outstandingRequests or -1
*/
synchronized private int getFirstOutstandingRequest(int piece)
{
{
for (int i = 0; i < outstandingRequests.size(); i++)
if (outstandingRequests.get(i).piece == piece)
return i;
@ -397,54 +409,56 @@ class PeerState implements DataLoader
}
// get longest partial piece
synchronized Request getPartialRequest()
{
Request req = null;
for (int i = 0; i < outstandingRequests.size(); i++) {
Request r1 = outstandingRequests.get(i);
int j = getFirstOutstandingRequest(r1.piece);
if (j == -1)
continue;
Request r2 = outstandingRequests.get(j);
if (r2.off > 0 && ((req == null) || (r2.off > req.off)))
req = r2;
}
if (pendingRequest != null && req != null && pendingRequest.off < req.off) {
if (pendingRequest.off != 0)
req = pendingRequest;
else
req = null;
}
return req;
/**
* @return lowest offset of any request for the piece
* @since 0.8.2
*/
synchronized private Request getLowestOutstandingRequest(int piece) {
Request rv = null;
int lowest = Integer.MAX_VALUE;
for (Request r : outstandingRequests) {
if (r.piece == piece && r.off < lowest) {
lowest = r.off;
rv = r;
}
}
if (pendingRequest != null &&
pendingRequest.piece == piece && pendingRequest.off < lowest)
rv = pendingRequest;
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " lowest for " + piece + " is " + rv + " out of " + pendingRequest + " and " + outstandingRequests);
return rv;
}
/**
* return array of pieces terminated by -1
* remove most duplicates
* but still could be some duplicates, not guaranteed
* TODO rework this Java-style to return a Set or a List
* get partial pieces, give them back to PeerCoordinator
* @return List of PartialPieces, even those with an offset == 0, or empty list
* @since 0.8.2
*/
synchronized int[] getRequestedPieces()
synchronized List<PartialPiece> returnPartialPieces()
{
int size = outstandingRequests.size();
int[] arr = new int[size+2];
int pc = -1;
int pos = 0;
if (pendingRequest != null) {
pc = pendingRequest.piece;
arr[pos++] = pc;
}
Request req = null;
for (int i = 0; i < size; i++) {
Request r1 = outstandingRequests.get(i);
if (pc != r1.piece) {
pc = r1.piece;
arr[pos++] = pc;
Set<Integer> pcs = getRequestedPieces();
List<PartialPiece> rv = new ArrayList(pcs.size());
for (Integer p : pcs) {
Request req = getLowestOutstandingRequest(p.intValue());
if (req != null)
rv.add(new PartialPiece(req));
}
}
arr[pos] = -1;
return(arr);
return rv;
}
/**
* @return all pieces we are currently requesting, or empty Set
*/
synchronized Set<Integer> getRequestedPieces() {
Set<Integer> rv = new HashSet(outstandingRequests.size() + 1);
for (Request req : outstandingRequests) {
rv.add(Integer.valueOf(req.piece));
if (pendingRequest != null)
rv.add(Integer.valueOf(pendingRequest.piece));
}
return rv;
}
void cancelMessage(int piece, int begin, int length)
@ -555,6 +569,8 @@ class PeerState implements DataLoader
{
synchronized (this) {
out.sendRequests(outstandingRequests);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resending requests to " + peer + outstandingRequests);
}
}
@ -620,24 +636,17 @@ class PeerState implements DataLoader
if (bitfield != null)
{
// Check for adopting an orphaned partial piece
Request r = listener.getPeerPartial(bitfield);
if (r != null) {
// Check that r not already in outstandingRequests
int[] arr = getRequestedPieces();
boolean found = false;
for (int i = 0; arr[i] >= 0; i++) {
if (arr[i] == r.piece) {
found = true;
break;
}
}
if (!found) {
PartialPiece pp = listener.getPartialPiece(peer, bitfield);
if (pp != null) {
// Double-check that r not already in outstandingRequests
if (!getRequestedPieces().contains(Integer.valueOf(pp.getPiece()))) {
Request r = pp.getRequest();
outstandingRequests.add(r);
if (!choked)
out.sendRequest(r);
lastRequest = r;
return true;
}
}
}
// Note that in addition to the bitfield, PeerCoordinator uses

View File

@ -5,7 +5,10 @@ import java.util.Set;
import net.i2p.util.ConcurrentHashSet;
public class Piece implements Comparable {
/**
* This class is used solely by PeerCoordinator.
*/
class Piece implements Comparable {
private int id;
private Set<PeerID> peers;

View File

@ -22,6 +22,7 @@ package org.klomp.snark;
/**
* Holds all information needed for a partial piece request.
* This class should be used only by PeerState, PeerConnectionIn, and PeerConnectionOut.
*/
class Request
{