update requests after changing priorities

This commit is contained in:
zzz
2010-10-29 20:31:07 +00:00
parent 7efb0fa7ed
commit 9baa6e7bc8
3 changed files with 92 additions and 35 deletions

View File

@ -392,6 +392,28 @@ public class Peer implements Comparable
s.havePiece(piece);
}
/**
* Tell the other side that we are no longer interested in any of
* the outstanding requests (if any) for this piece.
* @since 0.8.1
*/
void cancel(int piece) {
PeerState s = state;
if (s != null)
s.cancelPiece(piece);
}
/**
* Update the request queue.
* Call after adding wanted pieces.
* @since 0.8.1
*/
void request() {
PeerState s = state;
if (s != null)
s.addRequest();
}
/**
* Whether or not the peer is interested in pieces we have. Returns
* false if not connected.

View File

@ -460,7 +460,7 @@ public class PeerCoordinator implements PeerListener
}
/**
* Returns true if we don't have the given piece yet.
* @return true if we still want the given piece
*/
public boolean gotHave(Peer peer, int piece)
{
@ -586,8 +586,10 @@ public class PeerCoordinator implements PeerListener
*/
public void updatePiecePriorities() {
int[] pri = storage.getPiecePriorities();
if (pri == null)
if (pri == null) {
_log.debug("Updated piece priorities called but no priorities to set?");
return;
}
synchronized(wantedPieces) {
// Add incomplete and previously unwanted pieces to the list
// Temp to avoid O(n**2)
@ -619,14 +621,31 @@ public class PeerCoordinator implements PeerListener
// now set the new priorities and remove newly unwanted pieces
for (Iterator<Piece> iter = wantedPieces.iterator(); iter.hasNext(); ) {
Piece p = iter.next();
int id = pri[p.getId()];
if (id >= 0)
p.setPriority(pri[p.getId()]);
else
int priority = pri[p.getId()];
if (priority >= 0) {
p.setPriority(priority);
} else {
iter.remove();
// cancel all peers
synchronized(peers) {
for (Peer peer : peers) {
peer.cancel(p.getId());
}
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updated piece priorities, now wanted: " + wantedPieces);
// if we added pieces, they will be in-order unless we shuffle
Collections.shuffle(wantedPieces, _random);
// update request queues, in case we added wanted pieces
// and we were previously uninterested
synchronized(peers) {
for (Peer peer : peers) {
peer.request();
}
}
}
}

View File

@ -55,7 +55,7 @@ class PeerState
final PeerConnectionOut out;
// Outstanding request
private final List outstandingRequests = new ArrayList();
private final List<Request> outstandingRequests = new ArrayList();
private Request lastRequest = null;
private final static int MAX_PIPELINE = 5; // this is for outbound requests
@ -274,7 +274,7 @@ class PeerState
synchronized private int getFirstOutstandingRequest(int piece)
{
for (int i = 0; i < outstandingRequests.size(); i++)
if (((Request)outstandingRequests.get(i)).piece == piece)
if (outstandingRequests.get(i).piece == piece)
return i;
return -1;
}
@ -309,12 +309,12 @@ class PeerState
Request req;
synchronized(this)
{
req = (Request)outstandingRequests.get(r);
req = outstandingRequests.get(r);
while (req.piece == piece && req.off != begin
&& r < outstandingRequests.size() - 1)
{
r++;
req = (Request)outstandingRequests.get(r);
req = outstandingRequests.get(r);
}
// Something wrong?
@ -338,7 +338,7 @@ class PeerState
+ ", wanted for peer: " + peer);
for (int i = 0; i < r; i++)
{
Request dropReq = (Request)outstandingRequests.remove(0);
Request dropReq = outstandingRequests.remove(0);
outstandingRequests.add(dropReq);
if (!choked)
out.sendRequest(dropReq);
@ -362,11 +362,11 @@ class PeerState
{
Request req = null;
for (int i = 0; i < outstandingRequests.size(); i++) {
Request r1 = (Request)outstandingRequests.get(i);
Request r1 = outstandingRequests.get(i);
int j = getFirstOutstandingRequest(r1.piece);
if (j == -1)
continue;
Request r2 = (Request)outstandingRequests.get(j);
Request r2 = outstandingRequests.get(j);
if (r2.off > 0 && ((req == null) || (r2.off > req.off)))
req = r2;
}
@ -394,7 +394,7 @@ class PeerState
}
Request req = null;
for (int i = 0; i < size; i++) {
Request r1 = (Request)outstandingRequests.get(i);
Request r1 = outstandingRequests.get(i);
if (pc != r1.piece) {
pc = r1.piece;
arr[pos++] = pc;
@ -419,32 +419,19 @@ class PeerState
+ " length: " + bs.length);
}
/**
* We now have this piece.
* Tell the peer and cancel any requests for the piece.
*/
void havePiece(int piece)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tell " + peer + " havePiece(" + piece + ")");
synchronized(this)
{
// Tell the other side that we are no longer interested in any of
// the outstanding requests for this piece.
if (lastRequest != null && lastRequest.piece == piece)
lastRequest = null;
Iterator it = outstandingRequests.iterator();
while (it.hasNext())
{
Request req = (Request)it.next();
if (req.piece == piece)
{
it.remove();
// Send cancel even when we are choked to make sure that it is
// really never ever send.
out.sendCancel(req);
}
}
}
cancelPiece(piece);
// Tell the other side that we really have this piece.
out.sendHave(piece);
@ -459,7 +446,33 @@ class PeerState
}
}
// Starts or resumes requesting pieces.
/**
* Tell the other side that we are no longer interested in any of
* the outstanding requests (if any) for this piece.
* @since 0.8.1
*/
synchronized void cancelPiece(int piece) {
if (lastRequest != null && lastRequest.piece == piece)
lastRequest = null;
Iterator<Request> it = outstandingRequests.iterator();
while (it.hasNext())
{
Request req = it.next();
if (req.piece == piece)
{
it.remove();
// Send cancel even when we are choked to make sure that it is
// really never ever send.
out.sendCancel(req);
}
}
}
/**
* Starts or resumes requesting pieces.
* @param resend should we resend outstanding requests?
*/
private void request(boolean resend)
{
// Are there outstanding requests that have to be resend?
@ -476,8 +489,11 @@ class PeerState
/**
* Adds a new request to the outstanding requests list.
* Then send interested if we weren't.
* Then send new requests if not choked.
* If nothing to request, send not interested if we were.
*/
synchronized private void addRequest()
synchronized void addRequest()
{
boolean more_pieces = true;
while (more_pieces)