* i2psnark:

- Drop queued outbound requests when choked
      - Redo some data structures and locking to hopefully prevent deadlock
      - Memory reduction part 3: Return partial pieces to PeerCoordinator when choked
This commit is contained in:
zzz
2010-11-27 14:34:08 +00:00
parent ff828e6417
commit 3c45b038c6
8 changed files with 223 additions and 132 deletions

View File

@ -68,6 +68,7 @@ class PeerState implements DataLoader
/** the tail (NOT the head) of the request queue */
private Request lastRequest = null;
// FIXME if piece size < PARTSIZE, pipeline could be bigger
private final static int MAX_PIPELINE = 5; // this is for outbound requests
private final static int MAX_PIPELINE_BYTES = 128*1024; // this is for inbound requests
public final static int PARTSIZE = 16*1024; // outbound request
@ -107,9 +108,14 @@ class PeerState implements DataLoader
request(resend);
if (choked) {
// TODO
// savePartialPieces
// clear request list
out.cancelRequestMessages();
// old Roberts thrash us here, choke+unchoke right together
List<PartialPiece> pcs = returnPartialPieces();
if (!pcs.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " got choked, returning partial pieces to the PeerCoordinator: " + pcs);
listener.savePartialPieces(this.peer, pcs);
}
}
}
@ -432,7 +438,8 @@ class PeerState implements DataLoader
}
/**
* get partial pieces, give them back to PeerCoordinator
* Get partial pieces, give them back to PeerCoordinator.
* Clears the request queue.
* @return List of PartialPieces, even those with an offset == 0, or empty list
* @since 0.8.2
*/
@ -445,6 +452,9 @@ class PeerState implements DataLoader
if (req != null)
rv.add(new PartialPiece(req));
}
outstandingRequests.clear();
pendingRequest = null;
lastRequest = null;
return rv;
}
@ -513,12 +523,14 @@ class PeerState implements DataLoader
// Request something else if necessary.
addRequest();
/**** taken care of in addRequest()
synchronized(this)
{
// Is the peer still interesting?
if (lastRequest == null)
setInteresting(false);
}
****/
}
/**
@ -568,9 +580,11 @@ class PeerState implements DataLoader
if (resend)
{
synchronized (this) {
out.sendRequests(outstandingRequests);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resending requests to " + peer + outstandingRequests);
if (!outstandingRequests.isEmpty()) {
out.sendRequests(outstandingRequests);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resending requests to " + peer + outstandingRequests);
}
}
}
@ -583,6 +597,15 @@ class PeerState implements DataLoader
* Then send interested if we weren't.
* Then send new requests if not choked.
* If nothing to request, send not interested if we were.
*
* This is called from several places:
*<pre>
* By getOustandingRequest() when the first part of a chunk comes in
* By havePiece() when somebody got a new piece completed
* By chokeMessage() when we receive an unchoke
* By setInteresting() when we are now interested
* By PeerCoordinator.updatePiecePriorities()
*</pre>
*/
synchronized void addRequest()
{
@ -591,9 +614,30 @@ class PeerState implements DataLoader
{
more_pieces = outstandingRequests.size() < MAX_PIPELINE;
// We want something and we don't have outstanding requests?
if (more_pieces && lastRequest == null)
if (more_pieces && lastRequest == null) {
// we have nothing in the queue right now
if (!interesting) {
// If we need something, set interesting but delay pulling
// a request from the PeerCoordinator until unchoked.
if (listener.needPiece(this.peer, bitfield)) {
setInteresting(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we need something, setting interesting, delaying requestNextPiece()");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() needs nothing");
}
return;
}
if (choked) {
// If choked, delay pulling
// a request from the PeerCoordinator until unchoked.
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we are choked, delaying requestNextPiece()");
return;
}
more_pieces = requestNextPiece();
else if (more_pieces) // We want something
} else if (more_pieces) // We want something
{
int pieceLength;
boolean isLastChunk;
@ -621,6 +665,10 @@ class PeerState implements DataLoader
}
}
// failsafe
if (interesting && lastRequest == null && outstandingRequests.isEmpty())
setInteresting(false);
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " requests " + outstandingRequests);
}
@ -633,8 +681,7 @@ class PeerState implements DataLoader
private boolean requestNextPiece()
{
// Check that we already know what the other side has.
if (bitfield != null)
{
if (bitfield != null) {
// Check for adopting an orphaned partial piece
PartialPiece pp = listener.getPartialPiece(peer, bitfield);
if (pp != null) {
@ -649,6 +696,7 @@ class PeerState implements DataLoader
}
}
/******* getPartialPiece() does it all now
// Note that in addition to the bitfield, PeerCoordinator uses
// its request tracking and isRequesting() to determine
// what piece to give us next.
@ -683,11 +731,12 @@ class PeerState implements DataLoader
out.sendRequest(req);
lastRequest = req;
return true;
} else {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " no more pieces to request");
}
}
}
*******/
}
// failsafe
if (outstandingRequests.isEmpty())
@ -707,11 +756,10 @@ class PeerState implements DataLoader
synchronized void setInteresting(boolean interest)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setInteresting(" + interest + ")");
if (interest != interesting)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setInteresting(" + interest + ")");
interesting = interest;
out.sendInterest(interest);
@ -722,11 +770,10 @@ class PeerState implements DataLoader
synchronized void setChoking(boolean choke)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setChoking(" + choke + ")");
if (choking != choke)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setChoking(" + choke + ")");
choking = choke;
out.sendChoke(choke);
}