merge of '2b0329ad66b84d90d1b7e6e1a6247c6d826321f9'

and '77a4e2e18ab0e48d6f102388838e1367c1d56ebd'
This commit is contained in:
zzz
2010-11-27 16:34:15 +00:00
18 changed files with 280 additions and 160 deletions

View File

@ -21,7 +21,7 @@ class PartialPiece implements Comparable {
* Allocates the data. * Allocates the data.
* *
* @param piece Piece number requested. * @param piece Piece number requested.
* @param bs length must be equal to the piece length * @param len must be equal to the piece length
*/ */
public PartialPiece (int piece, int len) throws OutOfMemoryError { public PartialPiece (int piece, int len) throws OutOfMemoryError {
this.piece = piece; this.piece = piece;

View File

@ -49,10 +49,8 @@ class PeerCheckerTask extends TimerTask
public void run() public void run()
{ {
synchronized(coordinator.peers) List<Peer> peerList = coordinator.peerList();
{ if (peerList.isEmpty() || coordinator.halted()) {
Iterator it = coordinator.peers.iterator();
if ((!it.hasNext()) || coordinator.halted()) {
coordinator.peerCount = 0; coordinator.peerCount = 0;
coordinator.interestedAndChoking = 0; coordinator.interestedAndChoking = 0;
coordinator.setRateHistory(0, 0); coordinator.setRateHistory(0, 0);
@ -76,19 +74,18 @@ class PeerCheckerTask extends TimerTask
// Keep track of peers we remove now, // Keep track of peers we remove now,
// we will add them back to the end of the list. // we will add them back to the end of the list.
List removed = new ArrayList(); List<Peer> removed = new ArrayList();
int uploadLimit = coordinator.allowedUploaders(); int uploadLimit = coordinator.allowedUploaders();
boolean overBWLimit = coordinator.overUpBWLimit(); boolean overBWLimit = coordinator.overUpBWLimit();
while (it.hasNext()) for (Peer peer : peerList) {
{
Peer peer = (Peer)it.next();
// Remove dying peers // Remove dying peers
if (!peer.isConnected()) if (!peer.isConnected())
{ {
it.remove(); // This was just a failsafe, right?
coordinator.removePeerFromPieces(peer); //it.remove();
coordinator.peerCount = coordinator.peers.size(); //coordinator.removePeerFromPieces(peer);
//coordinator.peerCount = coordinator.peers.size();
continue; continue;
} }
@ -140,7 +137,6 @@ class PeerCheckerTask extends TimerTask
coordinator.uploaders--; coordinator.uploaders--;
// Put it at the back of the list // Put it at the back of the list
it.remove();
removed.add(peer); removed.add(peer);
} }
else if (overBWLimitChoke) else if (overBWLimitChoke)
@ -153,7 +149,6 @@ class PeerCheckerTask extends TimerTask
removedCount++; removedCount++;
// Put it at the back of the list for fairness, even though we won't be unchoking this time // Put it at the back of the list for fairness, even though we won't be unchoking this time
it.remove();
removed.add(peer); removed.add(peer);
} }
else if (peer.isInteresting() && peer.isChoked()) else if (peer.isInteresting() && peer.isChoked())
@ -166,7 +161,6 @@ class PeerCheckerTask extends TimerTask
removedCount++; removedCount++;
// Put it at the back of the list // Put it at the back of the list
it.remove();
removed.add(peer); removed.add(peer);
} }
else if (!peer.isInteresting() && !coordinator.completed()) else if (!peer.isInteresting() && !coordinator.completed())
@ -179,7 +173,6 @@ class PeerCheckerTask extends TimerTask
removedCount++; removedCount++;
// Put it at the back of the list // Put it at the back of the list
it.remove();
removed.add(peer); removed.add(peer);
} }
else if (peer.isInteresting() else if (peer.isInteresting()
@ -195,7 +188,6 @@ class PeerCheckerTask extends TimerTask
removedCount++; removedCount++;
// Put it at the back of the list // Put it at the back of the list
it.remove();
removed.add(peer); removed.add(peer);
} }
else if (peer.isInteresting() && !peer.isChoked() && else if (peer.isInteresting() && !peer.isChoked() &&
@ -234,8 +226,6 @@ class PeerCheckerTask extends TimerTask
removedCount++; removedCount++;
// Put it at the back of the list // Put it at the back of the list
coordinator.peers.remove(worstDownloader);
coordinator.peerCount = coordinator.peers.size();
removed.add(worstDownloader); removed.add(worstDownloader);
} }
@ -244,8 +234,12 @@ class PeerCheckerTask extends TimerTask
coordinator.unchokePeer(); coordinator.unchokePeer();
// Put peers back at the end of the list that we removed earlier. // Put peers back at the end of the list that we removed earlier.
coordinator.peers.addAll(removed); synchronized (coordinator.peers) {
coordinator.peerCount = coordinator.peers.size(); for(Peer peer : removed) {
if (coordinator.peers.remove(peer))
coordinator.peers.add(peer);
}
}
coordinator.interestedAndChoking += removedCount; coordinator.interestedAndChoking += removedCount;
// store the rates // store the rates
@ -255,6 +249,5 @@ class PeerCheckerTask extends TimerTask
if (random.nextInt(4) == 0) if (random.nextInt(4) == 0)
coordinator.getStorage().cleanRAFs(); coordinator.getStorage().cleanRAFs();
}
} }
} }

View File

@ -42,7 +42,7 @@ class PeerConnectionOut implements Runnable
private boolean quit; private boolean quit;
// Contains Messages. // Contains Messages.
private final List sendQueue = new ArrayList(); private final List<Message> sendQueue = new ArrayList();
private static long __id = 0; private static long __id = 0;
private long _id; private long _id;
@ -496,6 +496,19 @@ class PeerConnectionOut implements Runnable
addMessage(m); addMessage(m);
} }
/**
* Remove all Request messages from the queue
* @since 0.8.2
*/
void cancelRequestMessages() {
synchronized(sendQueue) {
for (Iterator<Message> it = sendQueue.iterator(); it.hasNext(); ) {
if (it.next().type == Message.REQUEST)
it.remove();
}
}
}
// Called by the PeerState when the other side doesn't want this // Called by the PeerState when the other side doesn't want this
// request to be handled anymore. Removes any pending Piece Message // request to be handled anymore. Removes any pending Piece Message
// from out send queue. // from out send queue.

View File

@ -22,12 +22,15 @@ package org.klomp.snark;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.Random; import java.util.Random;
import java.util.Timer; import java.util.Timer;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
@ -62,7 +65,8 @@ public class PeerCoordinator implements PeerListener
private long downloaded_old[] = {-1,-1,-1}; private long downloaded_old[] = {-1,-1,-1};
// synchronize on this when changing peers or downloaders // synchronize on this when changing peers or downloaders
final List<Peer> peers = new ArrayList(); // This is a Queue, not a Set, because PeerCheckerTask keeps things in order for choking/unchoking
final Queue<Peer> peers;
/** estimate of the peers, without requiring any synchronization */ /** estimate of the peers, without requiring any synchronization */
volatile int peerCount; volatile int peerCount;
@ -72,9 +76,9 @@ public class PeerCoordinator implements PeerListener
private final byte[] id; private final byte[] id;
// Some random wanted pieces // Some random wanted pieces
private List<Piece> wantedPieces; private final List<Piece> wantedPieces;
/** partial pieces */ /** partial pieces - lock by synching on wantedPieces */
private final List<PartialPiece> partialPieces; private final List<PartialPiece> partialPieces;
private boolean halted = false; private boolean halted = false;
@ -96,8 +100,10 @@ public class PeerCoordinator implements PeerListener
this.listener = listener; this.listener = listener;
this.snark = torrent; this.snark = torrent;
wantedPieces = new ArrayList();
setWantedPieces(); setWantedPieces();
partialPieces = new ArrayList(getMaxConnections() + 1); partialPieces = new ArrayList(getMaxConnections() + 1);
peers = new LinkedBlockingQueue();
// Install a timer to check the uploaders. // Install a timer to check the uploaders.
// Randomize the first start time so multiple tasks are spread out, // Randomize the first start time so multiple tasks are spread out,
@ -109,22 +115,22 @@ public class PeerCoordinator implements PeerListener
public void setWantedPieces() public void setWantedPieces()
{ {
// Make a list of pieces // Make a list of pieces
// FIXME synchronize, clear and re-add instead? synchronized(wantedPieces) {
// Don't replace something we are synchronizing on. wantedPieces.clear();
wantedPieces = new ArrayList(); BitField bitfield = storage.getBitField();
BitField bitfield = storage.getBitField(); int[] pri = storage.getPiecePriorities();
int[] pri = storage.getPiecePriorities(); for (int i = 0; i < metainfo.getPieces(); i++) {
for(int i = 0; i < metainfo.getPieces(); i++) { // only add if we don't have and the priority is >= 0
// only add if we don't have and the priority is >= 0 if ((!bitfield.get(i)) &&
if ((!bitfield.get(i)) && (pri == null || pri[i] >= 0)) {
(pri == null || pri[i] >= 0)) { Piece p = new Piece(i);
Piece p = new Piece(i); if (pri != null)
if (pri != null) p.setPriority(pri[i]);
p.setPriority(pri[i]); wantedPieces.add(p);
wantedPieces.add(p); }
}
Collections.shuffle(wantedPieces, _random);
} }
}
Collections.shuffle(wantedPieces, _random);
} }
public Storage getStorage() { return storage; } public Storage getStorage() { return storage; }
@ -133,10 +139,7 @@ public class PeerCoordinator implements PeerListener
// for web page detailed stats // for web page detailed stats
public List<Peer> peerList() public List<Peer> peerList()
{ {
synchronized(peers)
{
return new ArrayList(peers); return new ArrayList(peers);
}
} }
public byte[] getID() public byte[] getID()
@ -155,12 +158,9 @@ public class PeerCoordinator implements PeerListener
/** should be right */ /** should be right */
public int getPeers() public int getPeers()
{ {
synchronized(peers)
{
int rv = peers.size(); int rv = peers.size();
peerCount = rv; peerCount = rv;
return rv; return rv;
}
} }
/** /**
@ -254,10 +254,7 @@ public class PeerCoordinator implements PeerListener
public boolean needPeers() public boolean needPeers()
{ {
synchronized(peers)
{
return !halted && peers.size() < getMaxConnections(); return !halted && peers.size() < getMaxConnections();
}
} }
/** /**
@ -344,7 +341,10 @@ public class PeerCoordinator implements PeerListener
// Add it to the beginning of the list. // Add it to the beginning of the list.
// And try to optimistically make it a uploader. // And try to optimistically make it a uploader.
peers.add(0, peer); // Can't add to beginning since we converted from a List to a Queue
// We can do this in Java 6 with a Deque
//peers.add(0, peer);
peers.add(peer);
peerCount = peers.size(); peerCount = peers.size();
unchokePeer(); unchokePeer();
@ -358,8 +358,10 @@ public class PeerCoordinator implements PeerListener
} }
} }
// caller must synchronize on peers /**
private static Peer peerIDInList(PeerID pid, List peers) * @return peer if peer id is in the collection, else null
*/
private static Peer peerIDInList(PeerID pid, Collection<Peer> peers)
{ {
Iterator<Peer> it = peers.iterator(); Iterator<Peer> it = peers.iterator();
while (it.hasNext()) { while (it.hasNext()) {
@ -429,7 +431,6 @@ public class PeerCoordinator implements PeerListener
// At the start are the peers that have us unchoked at the end the // At the start are the peers that have us unchoked at the end the
// other peer that are interested, but are choking us. // other peer that are interested, but are choking us.
List<Peer> interested = new LinkedList(); List<Peer> interested = new LinkedList();
synchronized (peers) {
int count = 0; int count = 0;
int unchokedCount = 0; int unchokedCount = 0;
int maxUploaders = allowedUploaders(); int maxUploaders = allowedUploaders();
@ -464,7 +465,6 @@ public class PeerCoordinator implements PeerListener
peerCount = peers.size(); peerCount = peers.size();
} }
interestedAndChoking = count; interestedAndChoking = count;
}
} }
public byte[] getBitMap() public byte[] getBitMap()
@ -528,8 +528,19 @@ public class PeerCoordinator implements PeerListener
* Returns one of pieces in the given BitField that is still wanted or * Returns one of pieces in the given BitField that is still wanted or
* -1 if none of the given pieces are wanted. * -1 if none of the given pieces are wanted.
*/ */
public int wantPiece(Peer peer, BitField havePieces) public int wantPiece(Peer peer, BitField havePieces) {
{ return wantPiece(peer, havePieces, true);
}
/**
* Returns one of pieces in the given BitField that is still wanted or
* -1 if none of the given pieces are wanted.
*
* @param record if true, actually record in our data structures that we gave the
* request to this peer. If false, do not update the data structures.
* @since 0.8.2
*/
private int wantPiece(Peer peer, BitField havePieces, boolean record) {
if (halted) { if (halted) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("We don't want anything from the peer, as we are halted! peer=" + peer); _log.warn("We don't want anything from the peer, as we are halted! peer=" + peer);
@ -539,7 +550,8 @@ public class PeerCoordinator implements PeerListener
synchronized(wantedPieces) synchronized(wantedPieces)
{ {
Piece piece = null; Piece piece = null;
Collections.sort(wantedPieces); // Sort in order of rarest first. if (record)
Collections.sort(wantedPieces); // Sort in order of rarest first.
List<Piece> requested = new ArrayList(); List<Piece> requested = new ArrayList();
Iterator<Piece> it = wantedPieces.iterator(); Iterator<Piece> it = wantedPieces.iterator();
while (piece == null && it.hasNext()) while (piece == null && it.hasNext())
@ -567,7 +579,8 @@ public class PeerCoordinator implements PeerListener
return -1; // nothing to request and not in end game return -1; // nothing to request and not in end game
// let's not all get on the same piece // let's not all get on the same piece
// Even better would be to sort by number of requests // Even better would be to sort by number of requests
Collections.shuffle(requested, _random); if (record)
Collections.shuffle(requested, _random);
Iterator<Piece> it2 = requested.iterator(); Iterator<Piece> it2 = requested.iterator();
while (piece == null && it2.hasNext()) while (piece == null && it2.hasNext())
{ {
@ -575,7 +588,6 @@ public class PeerCoordinator implements PeerListener
if (havePieces.get(p.getId())) { if (havePieces.get(p.getId())) {
// limit number of parallel requests // limit number of parallel requests
int requestedCount = 0; int requestedCount = 0;
synchronized(peers) {
for (Peer pr : peers) { for (Peer pr : peers) {
if (pr.isRequesting(p.getId())) { if (pr.isRequesting(p.getId())) {
if (pr.equals(peer)) { if (pr.equals(peer)) {
@ -587,7 +599,6 @@ public class PeerCoordinator implements PeerListener
break; break;
} }
} }
}
if (requestedCount >= MAX_PARALLEL_REQUESTS) if (requestedCount >= MAX_PARALLEL_REQUESTS)
continue; continue;
piece = p; piece = p;
@ -608,9 +619,11 @@ public class PeerCoordinator implements PeerListener
_log.debug("parallel request (end game?) for " + peer + ": piece = " + piece); _log.debug("parallel request (end game?) for " + peer + ": piece = " + piece);
} }
} }
if (_log.shouldLog(Log.DEBUG)) if (record) {
_log.debug("Now requesting: piece " + piece + " priority " + piece.getPriority()); if (_log.shouldLog(Log.DEBUG))
piece.setRequested(true); _log.debug("Now requesting: piece " + piece + " priority " + piece.getPriority());
piece.setRequested(true);
}
return piece.getId(); return piece.getId();
} }
} }
@ -641,7 +654,6 @@ public class PeerCoordinator implements PeerListener
wantedPieces.add(piece); wantedPieces.add(piece);
// As connections are already up, new Pieces will // As connections are already up, new Pieces will
// not have their PeerID list populated, so do that. // not have their PeerID list populated, so do that.
synchronized(peers) {
for (Peer p : peers) { for (Peer p : peers) {
PeerState s = p.state; PeerState s = p.state;
if (s != null) { if (s != null) {
@ -650,7 +662,6 @@ public class PeerCoordinator implements PeerListener
piece.addPeer(p); piece.addPeer(p);
} }
} }
}
} }
} }
} }
@ -663,11 +674,9 @@ public class PeerCoordinator implements PeerListener
} else { } else {
iter.remove(); iter.remove();
// cancel all peers // cancel all peers
synchronized(peers) {
for (Peer peer : peers) { for (Peer peer : peers) {
peer.cancel(p.getId()); peer.cancel(p.getId());
} }
}
} }
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -677,10 +686,8 @@ public class PeerCoordinator implements PeerListener
// update request queues, in case we added wanted pieces // update request queues, in case we added wanted pieces
// and we were previously uninterested // and we were previously uninterested
synchronized(peers) {
for (Peer peer : peers) { for (Peer peer : peers) {
peer.request(); peer.request();
}
} }
} }
} }
@ -784,8 +791,6 @@ public class PeerCoordinator implements PeerListener
// Announce to the world we have it! // Announce to the world we have it!
// Disconnect from other seeders when we get the last piece // Disconnect from other seeders when we get the last piece
synchronized(peers)
{
List<Peer> toDisconnect = new ArrayList(); List<Peer> toDisconnect = new ArrayList();
Iterator<Peer> it = peers.iterator(); Iterator<Peer> it = peers.iterator();
while (it.hasNext()) while (it.hasNext())
@ -805,11 +810,11 @@ public class PeerCoordinator implements PeerListener
Peer p = it.next(); Peer p = it.next();
p.disconnect(true); p.disconnect(true);
} }
}
return true; return true;
} }
/** this does nothing but logging */
public void gotChoke(Peer peer, boolean choke) public void gotChoke(Peer peer, boolean choke)
{ {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
@ -823,8 +828,6 @@ public class PeerCoordinator implements PeerListener
{ {
if (interest) if (interest)
{ {
synchronized(peers)
{
if (uploaders < allowedUploaders()) if (uploaders < allowedUploaders())
{ {
if(peer.isChoking()) if(peer.isChoking())
@ -835,7 +838,6 @@ public class PeerCoordinator implements PeerListener
_log.info("Unchoke: " + peer); _log.info("Unchoke: " + peer);
} }
} }
}
} }
if (listener != null) if (listener != null)
@ -893,7 +895,7 @@ public class PeerCoordinator implements PeerListener
return; return;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Partials received from " + peer + ": " + partials); _log.info("Partials received from " + peer + ": " + partials);
synchronized(partialPieces) { synchronized(wantedPieces) {
for (PartialPiece pp : partials) { for (PartialPiece pp : partials) {
if (pp.getDownloaded() > 0) { if (pp.getDownloaded() > 0) {
// PartialPiece.equals() only compares piece number, which is what we want // PartialPiece.equals() only compares piece number, which is what we want
@ -936,26 +938,23 @@ public class PeerCoordinator implements PeerListener
* @since 0.8.2 * @since 0.8.2
*/ */
public PartialPiece getPartialPiece(Peer peer, BitField havePieces) { public PartialPiece getPartialPiece(Peer peer, BitField havePieces) {
// do it in this order to avoid deadlock (same order as in savePartialPieces()) synchronized(wantedPieces) {
synchronized(partialPieces) { // sorts by remaining bytes, least first
synchronized(wantedPieces) { Collections.sort(partialPieces);
// sorts by remaining bytes, least first for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) {
Collections.sort(partialPieces); PartialPiece pp = iter.next();
for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) { int savedPiece = pp.getPiece();
PartialPiece pp = iter.next(); if (havePieces.get(savedPiece)) {
int savedPiece = pp.getPiece(); // this is just a double-check, it should be in there
if (havePieces.get(savedPiece)) { for(Piece piece : wantedPieces) {
// this is just a double-check, it should be in there if (piece.getId() == savedPiece) {
for(Piece piece : wantedPieces) { piece.setRequested(true);
if (piece.getId() == savedPiece) { iter.remove();
piece.setRequested(true); if (_log.shouldLog(Log.INFO)) {
iter.remove(); _log.info("Restoring orphaned partial piece " + pp +
if (_log.shouldLog(Log.INFO)) { " Partial list size now: " + partialPieces.size());
_log.info("Restoring orphaned partial piece " + pp + }
" Partial list size now: " + partialPieces.size()); return pp;
}
return pp;
}
} }
} }
} }
@ -977,13 +976,44 @@ public class PeerCoordinator implements PeerListener
return null; return null;
} }
/**
* Called when we are downloading from the peer and may need to ask for
* a new piece. Returns true if wantPiece() or getPartialPiece() would return a piece.
*
* @param peer the Peer that will be asked to provide the piece.
* @param havePieces a BitField containing the pieces that the other
* side has.
*
* @return if we want any of what the peer has
* @since 0.8.2
*/
public boolean needPiece(Peer peer, BitField havePieces) {
synchronized(wantedPieces) {
for (PartialPiece pp : partialPieces) {
int savedPiece = pp.getPiece();
if (havePieces.get(savedPiece)) {
// this is just a double-check, it should be in there
for(Piece piece : wantedPieces) {
if (piece.getId() == savedPiece) {
if (_log.shouldLog(Log.INFO)) {
_log.info("We could restore orphaned partial piece " + pp);
}
return true;
}
}
}
}
}
return wantPiece(peer, havePieces, false) > 0;
}
/** /**
* Remove saved state for this piece. * Remove saved state for this piece.
* Unless we are in the end game there shouldnt be anything in there. * Unless we are in the end game there shouldnt be anything in there.
* Do not call with wantedPieces lock held (deadlock) * Do not call with wantedPieces lock held (deadlock)
*/ */
private void removePartialPiece(int piece) { private void removePartialPiece(int piece) {
synchronized(partialPieces) { synchronized(wantedPieces) {
for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) { for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) {
PartialPiece pp = iter.next(); PartialPiece pp = iter.next();
if (pp.getPiece() == piece) { if (pp.getPiece() == piece) {
@ -1000,11 +1030,7 @@ public class PeerCoordinator implements PeerListener
private void markUnrequestedIfOnlyOne(Peer peer, int piece) private void markUnrequestedIfOnlyOne(Peer peer, int piece)
{ {
// see if anybody else is requesting // see if anybody else is requesting
synchronized (peers) for (Peer p : peers) {
{
Iterator<Peer> it = peers.iterator();
while (it.hasNext()) {
Peer p = it.next();
if (p.equals(peer)) if (p.equals(peer))
continue; continue;
if (p.state == null) if (p.state == null)
@ -1016,16 +1042,13 @@ public class PeerCoordinator implements PeerListener
return; return;
} }
} }
}
// nobody is, so mark unrequested // nobody is, so mark unrequested
synchronized(wantedPieces) synchronized(wantedPieces)
{ {
Iterator<Piece> it = wantedPieces.iterator(); for (Piece pc : wantedPieces) {
while (it.hasNext()) { if (pc.getId() == piece) {
Piece p = it.next(); pc.setRequested(false);
if (p.getId() == piece) {
p.setRequested(false);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing from request list piece " + piece); _log.debug("Removing from request list piece " + piece);
return; return;

View File

@ -147,11 +147,25 @@ interface PeerListener
*/ */
int wantPiece(Peer peer, BitField bitfield); int wantPiece(Peer peer, BitField bitfield);
/**
* Called when we are downloading from the peer and may need to ask for
* a new piece. Returns true if wantPiece() or getPartialPiece() would return a piece.
*
* @param peer the Peer that will be asked to provide the piece.
* @param bitfield a BitField containing the pieces that the other
* side has.
*
* @return if we want any of what the peer has
* @since 0.8.2
*/
boolean needPiece(Peer peer, BitField bitfield);
/** /**
* Called when the peer has disconnected and the peer task may have a partially * Called when the peer has disconnected and the peer task may have a partially
* downloaded piece that the PeerCoordinator can save * downloaded piece that the PeerCoordinator can save
* *
* @param state the PeerState for the peer * @param peer the peer
* @since 0.8.2
*/ */
void savePartialPieces(Peer peer, List<PartialPiece> pcs); void savePartialPieces(Peer peer, List<PartialPiece> pcs);
@ -162,6 +176,7 @@ interface PeerListener
* @param havePieces the have-pieces bitmask for the peer * @param havePieces the have-pieces bitmask for the peer
* *
* @return request (contains the partial data and valid length) * @return request (contains the partial data and valid length)
* @since 0.8.2
*/ */
PartialPiece getPartialPiece(Peer peer, BitField havePieces); PartialPiece getPartialPiece(Peer peer, BitField havePieces);
} }

View File

@ -68,6 +68,7 @@ class PeerState implements DataLoader
/** the tail (NOT the head) of the request queue */ /** the tail (NOT the head) of the request queue */
private Request lastRequest = null; private Request lastRequest = null;
// FIXME if piece size < PARTSIZE, pipeline could be bigger
private final static int MAX_PIPELINE = 5; // this is for outbound requests private final static int MAX_PIPELINE = 5; // this is for outbound requests
private final static int MAX_PIPELINE_BYTES = 128*1024; // this is for inbound requests private final static int MAX_PIPELINE_BYTES = 128*1024; // this is for inbound requests
public final static int PARTSIZE = 16*1024; // outbound request public final static int PARTSIZE = 16*1024; // outbound request
@ -107,9 +108,14 @@ class PeerState implements DataLoader
request(resend); request(resend);
if (choked) { if (choked) {
// TODO out.cancelRequestMessages();
// savePartialPieces // old Roberts thrash us here, choke+unchoke right together
// clear request list List<PartialPiece> pcs = returnPartialPieces();
if (!pcs.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " got choked, returning partial pieces to the PeerCoordinator: " + pcs);
listener.savePartialPieces(this.peer, pcs);
}
} }
} }
@ -432,7 +438,8 @@ class PeerState implements DataLoader
} }
/** /**
* get partial pieces, give them back to PeerCoordinator * Get partial pieces, give them back to PeerCoordinator.
* Clears the request queue.
* @return List of PartialPieces, even those with an offset == 0, or empty list * @return List of PartialPieces, even those with an offset == 0, or empty list
* @since 0.8.2 * @since 0.8.2
*/ */
@ -445,6 +452,9 @@ class PeerState implements DataLoader
if (req != null) if (req != null)
rv.add(new PartialPiece(req)); rv.add(new PartialPiece(req));
} }
outstandingRequests.clear();
pendingRequest = null;
lastRequest = null;
return rv; return rv;
} }
@ -513,12 +523,14 @@ class PeerState implements DataLoader
// Request something else if necessary. // Request something else if necessary.
addRequest(); addRequest();
/**** taken care of in addRequest()
synchronized(this) synchronized(this)
{ {
// Is the peer still interesting? // Is the peer still interesting?
if (lastRequest == null) if (lastRequest == null)
setInteresting(false); setInteresting(false);
} }
****/
} }
/** /**
@ -568,9 +580,11 @@ class PeerState implements DataLoader
if (resend) if (resend)
{ {
synchronized (this) { synchronized (this) {
out.sendRequests(outstandingRequests); if (!outstandingRequests.isEmpty()) {
if (_log.shouldLog(Log.DEBUG)) out.sendRequests(outstandingRequests);
_log.debug("Resending requests to " + peer + outstandingRequests); if (_log.shouldLog(Log.DEBUG))
_log.debug("Resending requests to " + peer + outstandingRequests);
}
} }
} }
@ -583,6 +597,15 @@ class PeerState implements DataLoader
* Then send interested if we weren't. * Then send interested if we weren't.
* Then send new requests if not choked. * Then send new requests if not choked.
* If nothing to request, send not interested if we were. * If nothing to request, send not interested if we were.
*
* This is called from several places:
*<pre>
* By getOustandingRequest() when the first part of a chunk comes in
* By havePiece() when somebody got a new piece completed
* By chokeMessage() when we receive an unchoke
* By setInteresting() when we are now interested
* By PeerCoordinator.updatePiecePriorities()
*</pre>
*/ */
synchronized void addRequest() synchronized void addRequest()
{ {
@ -591,9 +614,30 @@ class PeerState implements DataLoader
{ {
more_pieces = outstandingRequests.size() < MAX_PIPELINE; more_pieces = outstandingRequests.size() < MAX_PIPELINE;
// We want something and we don't have outstanding requests? // We want something and we don't have outstanding requests?
if (more_pieces && lastRequest == null) if (more_pieces && lastRequest == null) {
// we have nothing in the queue right now
if (!interesting) {
// If we need something, set interesting but delay pulling
// a request from the PeerCoordinator until unchoked.
if (listener.needPiece(this.peer, bitfield)) {
setInteresting(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we need something, setting interesting, delaying requestNextPiece()");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() needs nothing");
}
return;
}
if (choked) {
// If choked, delay pulling
// a request from the PeerCoordinator until unchoked.
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we are choked, delaying requestNextPiece()");
return;
}
more_pieces = requestNextPiece(); more_pieces = requestNextPiece();
else if (more_pieces) // We want something } else if (more_pieces) // We want something
{ {
int pieceLength; int pieceLength;
boolean isLastChunk; boolean isLastChunk;
@ -621,6 +665,10 @@ class PeerState implements DataLoader
} }
} }
// failsafe
if (interesting && lastRequest == null && outstandingRequests.isEmpty())
setInteresting(false);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " requests " + outstandingRequests); _log.debug(peer + " requests " + outstandingRequests);
} }
@ -633,8 +681,7 @@ class PeerState implements DataLoader
private boolean requestNextPiece() private boolean requestNextPiece()
{ {
// Check that we already know what the other side has. // Check that we already know what the other side has.
if (bitfield != null) if (bitfield != null) {
{
// Check for adopting an orphaned partial piece // Check for adopting an orphaned partial piece
PartialPiece pp = listener.getPartialPiece(peer, bitfield); PartialPiece pp = listener.getPartialPiece(peer, bitfield);
if (pp != null) { if (pp != null) {
@ -649,6 +696,7 @@ class PeerState implements DataLoader
} }
} }
/******* getPartialPiece() does it all now
// Note that in addition to the bitfield, PeerCoordinator uses // Note that in addition to the bitfield, PeerCoordinator uses
// its request tracking and isRequesting() to determine // its request tracking and isRequesting() to determine
// what piece to give us next. // what piece to give us next.
@ -683,11 +731,12 @@ class PeerState implements DataLoader
out.sendRequest(req); out.sendRequest(req);
lastRequest = req; lastRequest = req;
return true; return true;
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " no more pieces to request"); _log.debug(peer + " no more pieces to request");
} }
} *******/
}
// failsafe // failsafe
if (outstandingRequests.isEmpty()) if (outstandingRequests.isEmpty())
@ -707,11 +756,10 @@ class PeerState implements DataLoader
synchronized void setInteresting(boolean interest) synchronized void setInteresting(boolean interest)
{ {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setInteresting(" + interest + ")");
if (interest != interesting) if (interest != interesting)
{ {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setInteresting(" + interest + ")");
interesting = interest; interesting = interest;
out.sendInterest(interest); out.sendInterest(interest);
@ -722,11 +770,10 @@ class PeerState implements DataLoader
synchronized void setChoking(boolean choke) synchronized void setChoking(boolean choke)
{ {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setChoking(" + choke + ")");
if (choking != choke) if (choking != choke)
{ {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " setChoking(" + choke + ")");
choking = choke; choking = choke;
out.sendChoke(choke); out.sendChoke(choke);
} }

View File

@ -745,7 +745,7 @@ public class Snark
_util.debug(s, level, null); _util.debug(s, level, null);
} }
/** coordinatorListener */ /** CoordinatorListener - this does nothing */
public void peerChange(PeerCoordinator coordinator, Peer peer) public void peerChange(PeerCoordinator coordinator, Peer peer)
{ {
// System.out.println(peer.toString()); // System.out.println(peer.toString());

View File

@ -851,7 +851,7 @@ public class SnarkManager implements Snark.CompleteListener {
buf.append('/'); buf.append('/');
buf.append("\">").append(snark.storage.getBaseName()).append("</a>"); buf.append("\">").append(snark.storage.getBaseName()).append("</a>");
long len = snark.meta.getTotalLength(); long len = snark.meta.getTotalLength();
addMessage(_("Download finished: {0}", buf.toString()) + " (" + _("size: {0}B", DataHelper.formatSize2(len)) + ')'); addMessage(_("Download finished: {0}", buf.toString())); // + " (" + _("size: {0}B", DataHelper.formatSize2(len)) + ')');
updateStatus(snark); updateStatus(snark);
} }

View File

@ -93,10 +93,6 @@
<jar destfile="./build/routerconsole.jar" basedir="./build/obj" includes="**/*.class" update="true" /> <jar destfile="./build/routerconsole.jar" basedir="./build/obj" includes="**/*.class" update="true" />
</target> </target>
<target name="jarWithJavadoc" depends="jar">
<jar destfile="build/routerconsole.war" basedir="../../../build/" includes="javadoc/**/*" update="true" />
</target>
<target name="poupdate" depends="build"> <target name="poupdate" depends="build">
<ant target="war" /> <ant target="war" />
<!-- Update the messages_*.po files. <!-- Update the messages_*.po files.

View File

@ -5,7 +5,6 @@ import java.util.List;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import net.i2p.stat.StatManager; import net.i2p.stat.StatManager;
import net.i2p.util.Log;
/** /**
* Handler to deal with form submissions from the stats config form and act * Handler to deal with form submissions from the stats config form and act
@ -41,14 +40,10 @@ public class ConfigStatsHandler extends FormHandler {
if (stats != null) { if (stats != null) {
for (int i = 0; i < stats.length; i++) { for (int i = 0; i < stats.length; i++) {
String cur = stats[i].trim(); String cur = stats[i].trim();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stat: [" + cur + "]");
if ( (cur.length() > 0) && (!_stats.contains(cur)) ) if ( (cur.length() > 0) && (!_stats.contains(cur)) )
_stats.add(cur); _stats.add(cur);
} }
} }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updated stats: " + _stats);
} }
public void setGraphList(String stats[]) { public void setGraphList(String stats[]) {
@ -66,8 +61,6 @@ public class ConfigStatsHandler extends FormHandler {
} else { } else {
_graphs = ""; _graphs = "";
} }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updated graphs: " + _graphs);
} }
public void setExplicitFilter(String foo) { _explicitFilter = true; } public void setExplicitFilter(String foo) { _explicitFilter = true; }

View File

@ -55,7 +55,7 @@ public class FormHandler {
/** /**
* Call this to prevent changes using GET * Call this to prevent changes using GET
* *
* @param the request method * @param val the request method
* @since 0.8.2 * @since 0.8.2
*/ */
public void storeMethod(String val) { _method = val; } public void storeMethod(String val) { _method = val; }

View File

@ -366,8 +366,18 @@ public class PluginUpdateHandler extends UpdateHandler {
} else { } else {
// start everything // start everything
try { try {
if (PluginStarter.startPlugin(_context, appName)) if (PluginStarter.startPlugin(_context, appName)) {
statusDone("<b>" + _("Plugin {0} installed and started", appName) + "</b>"); String linkName = ConfigClientsHelper.stripHTML(props, "consoleLinkName_" + Messages.getLanguage(_context));
if (linkName == null)
linkName = ConfigClientsHelper.stripHTML(props, "consoleLinkName");
String linkURL = ConfigClientsHelper.stripHTML(props, "consoleLinkURL");
String link;
if (linkName != null && linkURL != null)
link = "<a target=\"_blank\" href=\"" + linkURL + "\"/>" + linkName + "</a>";
else
link = appName;
statusDone("<b>" + _("Plugin {0} installed and started", link) + "</b>");
}
else else
statusDone("<b>" + _("Plugin {0} installed but failed to start, check logs", appName) + "</b>"); statusDone("<b>" + _("Plugin {0} installed but failed to start, check logs", appName) + "</b>");
} catch (Throwable e) { } catch (Throwable e) {

View File

@ -147,8 +147,12 @@ public class SummaryBarRenderer {
.append(_("Local Destinations")) .append(_("Local Destinations"))
.append("\">") .append("\">")
.append(_("I2PTunnel")) .append(_("I2PTunnel"))
.append("</a>\n");
.append("</a></td></tr></table>\n"); File javadoc = new File(_context.getBaseDir(), "docs/javadoc/index.html");
if (javadoc.exists())
buf.append("<a href=\"/javadoc/index.html\" target=\"_blank\">Javadoc</a>\n");
buf.append("</td></tr></table>\n");
out.write(buf.toString()); out.write(buf.toString());
buf.setLength(0); buf.setLength(0);

View File

@ -12,6 +12,11 @@
<url-pattern>/themes/*</url-pattern> <url-pattern>/themes/*</url-pattern>
</servlet-mapping> </servlet-mapping>
<servlet-mapping>
<servlet-name>net.i2p.router.web.jsp.viewtheme_jsp</servlet-name>
<url-pattern>/javadoc/*</url-pattern>
</servlet-mapping>
<session-config> <session-config>
<session-timeout> <session-timeout>
30 30

View File

@ -128,13 +128,15 @@ public class MessageOutputStream extends OutputStream {
remaining -= toWrite; remaining -= toWrite;
cur += toWrite; cur += toWrite;
_valid = _buf.length; _valid = _buf.length;
if (_dataReceiver == null) { // avoid NPE from race with destroy()
DataReceiver rcvr = _dataReceiver;
if (rcvr == null) {
throwAnyError(); throwAnyError();
return; return;
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("write() direct valid = " + _valid); _log.info("write() direct valid = " + _valid);
ws = _dataReceiver.writeData(_buf, 0, _valid); ws = rcvr.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
throwAnyError(); throwAnyError();
@ -256,8 +258,10 @@ public class MessageOutputStream extends OutputStream {
if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("doFlush() valid = " + _valid); _log.info("doFlush() valid = " + _valid);
if ( (_buf != null) && (_dataReceiver != null) ) { // avoid NPE from race with destroy()
ws = _dataReceiver.writeData(_buf, 0, _valid); DataReceiver rcvr = _dataReceiver;
if ( (_buf != null) && (rcvr != null) ) {
ws = rcvr.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
_lastFlushed = _context.clock().now(); _lastFlushed = _context.clock().now();
@ -309,12 +313,16 @@ public class MessageOutputStream extends OutputStream {
WriteStatus ws = null; WriteStatus ws = null;
if (_log.shouldLog(Log.INFO) && _valid > 0) if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("flush() valid = " + _valid); _log.info("flush() valid = " + _valid);
// avoid NPE from race with destroy()
DataReceiver rcvr = _dataReceiver;
synchronized (_dataLock) { synchronized (_dataLock) {
if (_buf == null) { if (_buf == null) {
_dataLock.notifyAll(); _dataLock.notifyAll();
throw new IOException("closed (buffer went away)"); throw new IOException("closed (buffer went away)");
} }
if (_dataReceiver == null) {
if (rcvr == null) {
_dataLock.notifyAll(); _dataLock.notifyAll();
throwAnyError(); throwAnyError();
return; return;
@ -324,7 +332,7 @@ public class MessageOutputStream extends OutputStream {
// Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below
// (disabled) // (disabled)
if (!wait_for_accept_only) { if (!wait_for_accept_only) {
ws = _dataReceiver.writeData(_buf, 0, _valid); ws = rcvr.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
locked_updateBufferSize(); locked_updateBufferSize();
@ -336,7 +344,7 @@ public class MessageOutputStream extends OutputStream {
// Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1 // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1
// must do this outside the data lock // must do this outside the data lock
if (wait_for_accept_only) { if (wait_for_accept_only) {
flushAvailable(_dataReceiver, true); flushAvailable(rcvr, true);
return; return;
} }
@ -417,10 +425,13 @@ public class MessageOutputStream extends OutputStream {
ByteArray ba = null; ByteArray ba = null;
if (_log.shouldLog(Log.INFO) && _valid > 0) if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("clearData() valid = " + _valid); _log.info("clearData() valid = " + _valid);
// avoid NPE from race with destroy()
DataReceiver rcvr = _dataReceiver;
synchronized (_dataLock) { synchronized (_dataLock) {
// flush any data, but don't wait for it // flush any data, but don't wait for it
if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush) if ( (rcvr != null) && (_valid > 0) && shouldFlush)
_dataReceiver.writeData(_buf, 0, _valid); rcvr.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;

View File

@ -24,6 +24,7 @@
<echo message=" installer: build the GUI installer" /> <echo message=" installer: build the GUI installer" />
<echo message=" tarball: tar the full install into i2p.tar.bz2 (extracts to build a new clean install)" /> <echo message=" tarball: tar the full install into i2p.tar.bz2 (extracts to build a new clean install)" />
<echo message=" updater: tar the built i2p specific files into an i2pupdate.zip (extracts safely over existing installs)" /> <echo message=" updater: tar the built i2p specific files into an i2pupdate.zip (extracts safely over existing installs)" />
<echo message=" updaterWithJavadoc: updater including the javadocs, for display in the console" />
<echo message=" updaterWithJetty: tar the built i2p specific files and jetty into an i2pupdate.zip (extracts safely over existing installs)" /> <echo message=" updaterWithJetty: tar the built i2p specific files and jetty into an i2pupdate.zip (extracts safely over existing installs)" />
<echo message=" updaterWithJettyFixes: updater including local jetty patches" /> <echo message=" updaterWithJettyFixes: updater including local jetty patches" />
<echo message=" updaterWithGeoIP: updater including GeoIP Files" /> <echo message=" updaterWithGeoIP: updater including GeoIP Files" />
@ -360,6 +361,9 @@
<copy file="installer/resources/i2prouter" todir="pkg-temp/" /> <copy file="installer/resources/i2prouter" todir="pkg-temp/" />
<copy file="installer/resources/osid" todir="pkg-temp/" /> <copy file="installer/resources/osid" todir="pkg-temp/" />
<copy file="installer/resources/postinstall.sh" todir="pkg-temp/" /> <copy file="installer/resources/postinstall.sh" todir="pkg-temp/" />
<copy todir="pkg-temp/man/">
<fileset dir="installer/resources/man/" />
</copy>
<copy todir="pkg-temp/lib/wrapper/linux/"> <copy todir="pkg-temp/lib/wrapper/linux/">
<fileset dir="installer/lib/wrapper/linux/" /> <fileset dir="installer/lib/wrapper/linux/" />
</copy> </copy>
@ -493,9 +497,16 @@
<zip destfile="docs.zip" basedir="pkg-temp" whenempty="fail" /> <zip destfile="docs.zip" basedir="pkg-temp" whenempty="fail" />
</target> </target>
<target name="copyJavadoc" depends="javadoc">
<copy todir="pkg-temp/docs/javadoc" >
<fileset dir="build/javadoc/" />
</copy>
</target>
<target name="updater200" depends="prepupdate, preplicenses, pack200, zipit200" /> <target name="updater200" depends="prepupdate, preplicenses, pack200, zipit200" />
<target name="updater200WithJettyFixes" depends="prepjupdatefixes, preplicenses, pack200, zipit200" /> <target name="updater200WithJettyFixes" depends="prepjupdatefixes, preplicenses, pack200, zipit200" />
<target name="updater" depends="prepupdate, preplicenses, zipit" /> <target name="updater" depends="prepupdate, preplicenses, zipit" />
<target name="updaterWithJavadoc" depends="prepupdate, preplicenses, copyJavadoc, zipit" />
<target name="updaterWithGeoIP" depends="prepupdate, prepgeoupdate, preplicenses, zipit" /> <target name="updaterWithGeoIP" depends="prepupdate, prepgeoupdate, preplicenses, zipit" />
<target name="updaterWithJetty" depends="prepjupdate, preplicenses, zipit" /> <target name="updaterWithJetty" depends="prepjupdate, preplicenses, zipit" />
<target name="updaterWithJettyFixes" depends="prepjupdatefixes, preplicenses, zipit" /> <target name="updaterWithJettyFixes" depends="prepjupdatefixes, preplicenses, zipit" />

View File

@ -577,10 +577,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*/ */
void propogateError(String msg, Throwable error) { void propogateError(String msg, Throwable error) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage()); _log.error(getPrefix() + "Error occurred: " + msg, error);
if (_log.shouldLog(Log.ERROR))
_log.error(getPrefix() + " cause", error);
if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error); if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error);
} }

View File

@ -60,6 +60,8 @@ public class Reseeder {
public static final String PROP_PROXY_ENABLE = "router.reseedProxyEnable"; public static final String PROP_PROXY_ENABLE = "router.reseedProxyEnable";
/** @since 0.8.2 */ /** @since 0.8.2 */
public static final String PROP_SSL_DISABLE = "router.reseedSSLDisable"; public static final String PROP_SSL_DISABLE = "router.reseedSSLDisable";
/** @since 0.8.2 */
public static final String PROP_SSL_REQUIRED = "router.reseedSSLRequired";
private static final String RESEED_TIPS = private static final String RESEED_TIPS =
_x("Ensure that nothing blocks outbound HTTP, check <a target=\"_top\" href=\"logs.jsp\">logs</a> " + _x("Ensure that nothing blocks outbound HTTP, check <a target=\"_top\" href=\"logs.jsp\">logs</a> " +