diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index f4cf4fabbc..c1baaad8d2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -318,6 +318,12 @@ public class Peer implements Comparable PeerState s = state; if (s != null) { + // try to save partial piece + if (this.deregister) { + PeerListener p = state.listener; + if (p != null) + p.savePeerPartial(state); + } state = null; PeerConnectionIn in = s.in; @@ -458,4 +464,15 @@ public class Peer implements Comparable return -1; //"no state"; } } + + /** + * Send keepalive + */ + public void keepAlive() + { + PeerState s = state; + if (s != null) + s.keepAlive(); + } + } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java index 8827189710..1efb25b4c2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java @@ -164,6 +164,7 @@ class PeerCheckerTask extends TimerTask worstDownloader = peer; } } + peer.keepAlive(); } // Resync actual uploaders value diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 2fef412c77..ede9151583 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -259,7 +259,13 @@ class PeerConnectionOut implements Runnable { Message m = new Message(); m.type = Message.KEEP_ALIVE; - addMessage(m); +// addMessage(m); + synchronized(sendQueue) + { + if(sendQueue.isEmpty()) + sendQueue.add(m); + sendQueue.notifyAll(); + } } void sendChoke(boolean choke) diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 154de22beb..0317f2be63 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -37,7 +37,7 @@ public class PeerCoordinator implements PeerListener final Snark snark; // package local for access by CheckDownLoadersTask - final static long CHECK_PERIOD = 20*1000; // 20 seconds + final static long CHECK_PERIOD = 40*1000; // 40 seconds final static int MAX_CONNECTIONS = 24; final static int MAX_UPLOADERS = 4; @@ -159,7 +159,7 @@ public class PeerCoordinator implements PeerListener } /** - * Returns the 2-minute-average rate in Bps + * Returns the 4-minute-average rate in Bps */ public long getDownloadRate() { @@ -171,7 +171,7 @@ public class PeerCoordinator implements PeerListener } /** - * Returns the 2-minute-average rate in Bps + * Returns the 4-minute-average rate in Bps */ public long getUploadRate() { @@ -233,6 +233,8 @@ public class PeerCoordinator implements PeerListener Peer old = peerIDInList(peer.getPeerID(), peers); if ( (old != null) && (old.getInactiveTime() > 2*60*1000) ) { // idle for 2 minutes, kill the old con + if (_log.shouldLog(Log.WARN)) + _log.warn("Remomving old peer: " + peer + ": " + old + ", inactive for " + old.getInactiveTime()); peers.remove(old); toDisconnect = old; old = null; @@ -449,6 +451,14 @@ public class PeerCoordinator implements PeerListener _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested + " wanted = " + wantedPieces + " peerHas = " + havePieces); return -1; //If we still can't find a piece we want, so be it. + } else { + // Should be a lot smarter here - limit # of parallel attempts and + // share data rather than starting from 0 with each peer. + // And could share-as-we-go too. + // This is where the flaws of the snark data model are really exposed. + // Could also randomize within the duplicate set rather than strict rarest-first + if (_log.shouldLog(Log.DEBUG)) + _log.debug("parallel request (end game?) for " + peer + ": piece = " + piece); } } piece.setRequested(true); @@ -625,4 +635,68 @@ public class PeerCoordinator implements PeerListener } } } + + + /** Simple method to save a partial piece on peer disconnection + * and hopefully restart it later. + * Only one partial piece is saved at a time. + * Replace it if a new one is bigger or the old one is too old. + * Storage method is private so we can expand to save multiple partials + * if we wish. + */ + private Request savedRequest = null; + private long savedRequestTime = 0; + public void savePeerPartial(PeerState state) + { + Request req = state.getPartialRequest(); + if (req == null) + return; + if (savedRequest == null || + req.off > savedRequest.off || + System.currentTimeMillis() > savedRequestTime + (15 * 60 * 1000)) { + savedRequest = req; + savedRequestTime = System.currentTimeMillis(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(" Saving orphaned partial piece " + req); + if (savedRequest != null) + if (_log.shouldLog(Log.DEBUG)) + _log.debug(" (Discarding previously saved orphan) " + savedRequest); + } else { + if (req.piece != savedRequest.piece) + if (_log.shouldLog(Log.DEBUG)) + _log.debug(" Discarding orphaned partial piece " + req); + } + } + + /** Return partial piece if it's still wanted and peer has it. + */ + public Request getPeerPartial(BitField havePieces) { + if (savedRequest == null) + return null; + if (! havePieces.get(savedRequest.piece)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer doesn't have orphaned piece " + savedRequest); + return null; + } + synchronized(wantedPieces) + { + for(Iterator iter = wantedPieces.iterator(); iter.hasNext(); ) { + Piece piece = (Piece)iter.next(); + if (piece.getId() == savedRequest.piece) { + Request req = savedRequest; + piece.setRequested(true); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Restoring orphaned partial piece " + req); + savedRequest = null; + return req; + } + } + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We no longer want orphaned piece " + savedRequest); + savedRequest = null; + return null; + } + } + diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index 959b42fa14..016e4442b7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -142,4 +142,22 @@ public interface PeerListener * we are no longer interested in the peer. */ int wantPiece(Peer peer, BitField bitfield); + + /** + * Called when the peer has disconnected and the peer task may have a partially + * downloaded piece that the PeerCoordinator can save + * + * @param state the PeerState for the peer + */ + void savePeerPartial(PeerState state); + + /** + * Called when a peer has connected and there may be a partially + * downloaded piece that the coordinatorator can give the peer task + * + * @param havePieces the have-pieces bitmask for the peer + * + * @return request (contains the partial data and valid length) + */ + Request getPeerPartial(BitField havePieces); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 5bbefbadff..ae42a1a021 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -221,6 +221,10 @@ class PeerState listener.uploaded(peer, size); } + // This is used to flag that we have to back up from the firstOutstandingRequest + // when calculating how far we've gotten + Request pendingRequest = null; + /** * Called when a partial piece request has been handled by * PeerConnectionIn. @@ -231,6 +235,8 @@ class PeerState downloaded += size; listener.downloaded(peer, size); + pendingRequest = null; + // Last chunk needed for this piece? if (getFirstOutstandingRequest(req.piece) == -1) { @@ -318,14 +324,8 @@ class PeerState { Request dropReq = (Request)outstandingRequests.remove(0); outstandingRequests.add(dropReq); - // We used to rerequest the missing chunks but that mostly - // just confuses the other side. So now we just keep - // waiting for them. They will be rerequested when we get - // choked/unchoked again. - /* - if (!choked) + if (!choked) out.sendRequest(dropReq); - */ if (_log.shouldLog(Log.WARN)) _log.warn("dropped " + dropReq + " with peer " + peer); } @@ -336,10 +336,33 @@ class PeerState // Request more if necessary to keep the pipeline filled. addRequest(); + pendingRequest = req; return req; } + // get longest partial piece + Request getPartialRequest() + { + Request req = null; + for (int i = 0; i < outstandingRequests.size(); i++) { + Request r1 = (Request)outstandingRequests.get(i); + int j = getFirstOutstandingRequest(r1.piece); + if (j == -1) + continue; + Request r2 = (Request)outstandingRequests.get(j); + if (r2.off > 0 && ((req == null) || (r2.off > req.off))) + req = r2; + } + if (pendingRequest != null && req != null && pendingRequest.off < req.off) { + if (pendingRequest.off != 0) + req = pendingRequest; + else + req = null; + } + return req; + } + void cancelMessage(int piece, int begin, int length) { if (_log.shouldLog(Log.DEBUG)) @@ -472,6 +495,15 @@ class PeerState // Check that we already know what the other side has. if (bitfield != null) { + // Check for adopting an orphaned partial piece + Request r = listener.getPeerPartial(bitfield); + if (r != null) { + outstandingRequests.add(r); + if (!choked) + out.sendRequest(r); + lastRequest = r; + return true; + } int nextPiece = listener.wantPiece(peer, bitfield); if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " want piece " + nextPiece); @@ -523,4 +555,9 @@ class PeerState out.sendChoke(choke); } } + + synchronized void keepAlive() + { + out.sendAlive(); + } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 66505c2a0e..04fdc16a48 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -353,8 +353,12 @@ public class I2PSnarkServlet extends HttpServlet { out.write("