* i2psnark: Implement basic partial-piece saves across connections
    * i2psnark: Implement keep-alive sending. This will keep non-i2psnark clients
      from dropping us for inactivity but also renders the 2-minute transmit-inactivity
      code in i2psnark ineffective. Will have to research why there is transmit but
      not receive inactivity code. With the current connection limit of 24 peers
      we aren't in any danger of keeping out new peers by keeping inactive ones.
    * i2psnark: Increase CHECK_PERIOD from 20 to 40 since nothing happens in 20 seconds
    * i2psnark: Fix dropped chunk handling
    * i2psnark: Web rate report cleanup
This commit is contained in:
zzz
2006-09-06 06:32:53 +00:00
committed by zzz
parent b92ee364bc
commit 678f7d8f72
9 changed files with 186 additions and 17 deletions

View File

@ -318,6 +318,12 @@ public class Peer implements Comparable
PeerState s = state; PeerState s = state;
if (s != null) if (s != null)
{ {
// try to save partial piece
if (this.deregister) {
PeerListener p = state.listener;
if (p != null)
p.savePeerPartial(state);
}
state = null; state = null;
PeerConnectionIn in = s.in; PeerConnectionIn in = s.in;
@ -458,4 +464,15 @@ public class Peer implements Comparable
return -1; //"no state"; return -1; //"no state";
} }
} }
/**
* Send keepalive
*/
public void keepAlive()
{
PeerState s = state;
if (s != null)
s.keepAlive();
}
} }

View File

@ -164,6 +164,7 @@ class PeerCheckerTask extends TimerTask
worstDownloader = peer; worstDownloader = peer;
} }
} }
peer.keepAlive();
} }
// Resync actual uploaders value // Resync actual uploaders value

View File

@ -259,7 +259,13 @@ class PeerConnectionOut implements Runnable
{ {
Message m = new Message(); Message m = new Message();
m.type = Message.KEEP_ALIVE; m.type = Message.KEEP_ALIVE;
addMessage(m); // addMessage(m);
synchronized(sendQueue)
{
if(sendQueue.isEmpty())
sendQueue.add(m);
sendQueue.notifyAll();
}
} }
void sendChoke(boolean choke) void sendChoke(boolean choke)

View File

@ -37,7 +37,7 @@ public class PeerCoordinator implements PeerListener
final Snark snark; final Snark snark;
// package local for access by CheckDownLoadersTask // package local for access by CheckDownLoadersTask
final static long CHECK_PERIOD = 20*1000; // 20 seconds final static long CHECK_PERIOD = 40*1000; // 40 seconds
final static int MAX_CONNECTIONS = 24; final static int MAX_CONNECTIONS = 24;
final static int MAX_UPLOADERS = 4; final static int MAX_UPLOADERS = 4;
@ -159,7 +159,7 @@ public class PeerCoordinator implements PeerListener
} }
/** /**
* Returns the 2-minute-average rate in Bps * Returns the 4-minute-average rate in Bps
*/ */
public long getDownloadRate() public long getDownloadRate()
{ {
@ -171,7 +171,7 @@ public class PeerCoordinator implements PeerListener
} }
/** /**
* Returns the 2-minute-average rate in Bps * Returns the 4-minute-average rate in Bps
*/ */
public long getUploadRate() public long getUploadRate()
{ {
@ -233,6 +233,8 @@ public class PeerCoordinator implements PeerListener
Peer old = peerIDInList(peer.getPeerID(), peers); Peer old = peerIDInList(peer.getPeerID(), peers);
if ( (old != null) && (old.getInactiveTime() > 2*60*1000) ) { if ( (old != null) && (old.getInactiveTime() > 2*60*1000) ) {
// idle for 2 minutes, kill the old con // idle for 2 minutes, kill the old con
if (_log.shouldLog(Log.WARN))
_log.warn("Remomving old peer: " + peer + ": " + old + ", inactive for " + old.getInactiveTime());
peers.remove(old); peers.remove(old);
toDisconnect = old; toDisconnect = old;
old = null; old = null;
@ -449,6 +451,14 @@ public class PeerCoordinator implements PeerListener
_log.warn("nothing to even rerequest from " + peer + ": requested = " + requested _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested
+ " wanted = " + wantedPieces + " peerHas = " + havePieces); + " wanted = " + wantedPieces + " peerHas = " + havePieces);
return -1; //If we still can't find a piece we want, so be it. return -1; //If we still can't find a piece we want, so be it.
} else {
// Should be a lot smarter here - limit # of parallel attempts and
// share data rather than starting from 0 with each peer.
// And could share-as-we-go too.
// This is where the flaws of the snark data model are really exposed.
// Could also randomize within the duplicate set rather than strict rarest-first
if (_log.shouldLog(Log.DEBUG))
_log.debug("parallel request (end game?) for " + peer + ": piece = " + piece);
} }
} }
piece.setRequested(true); piece.setRequested(true);
@ -625,4 +635,68 @@ public class PeerCoordinator implements PeerListener
} }
} }
} }
/** Simple method to save a partial piece on peer disconnection
* and hopefully restart it later.
* Only one partial piece is saved at a time.
* Replace it if a new one is bigger or the old one is too old.
* Storage method is private so we can expand to save multiple partials
* if we wish.
*/
private Request savedRequest = null;
private long savedRequestTime = 0;
public void savePeerPartial(PeerState state)
{
Request req = state.getPartialRequest();
if (req == null)
return;
if (savedRequest == null ||
req.off > savedRequest.off ||
System.currentTimeMillis() > savedRequestTime + (15 * 60 * 1000)) {
savedRequest = req;
savedRequestTime = System.currentTimeMillis();
if (_log.shouldLog(Log.DEBUG))
_log.debug(" Saving orphaned partial piece " + req);
if (savedRequest != null)
if (_log.shouldLog(Log.DEBUG))
_log.debug(" (Discarding previously saved orphan) " + savedRequest);
} else {
if (req.piece != savedRequest.piece)
if (_log.shouldLog(Log.DEBUG))
_log.debug(" Discarding orphaned partial piece " + req);
}
}
/** Return partial piece if it's still wanted and peer has it.
*/
public Request getPeerPartial(BitField havePieces) {
if (savedRequest == null)
return null;
if (! havePieces.get(savedRequest.piece)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer doesn't have orphaned piece " + savedRequest);
return null;
}
synchronized(wantedPieces)
{
for(Iterator iter = wantedPieces.iterator(); iter.hasNext(); ) {
Piece piece = (Piece)iter.next();
if (piece.getId() == savedRequest.piece) {
Request req = savedRequest;
piece.setRequested(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Restoring orphaned partial piece " + req);
savedRequest = null;
return req;
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("We no longer want orphaned piece " + savedRequest);
savedRequest = null;
return null;
}
} }

View File

@ -142,4 +142,22 @@ public interface PeerListener
* we are no longer interested in the peer. * we are no longer interested in the peer.
*/ */
int wantPiece(Peer peer, BitField bitfield); int wantPiece(Peer peer, BitField bitfield);
/**
* Called when the peer has disconnected and the peer task may have a partially
* downloaded piece that the PeerCoordinator can save
*
* @param state the PeerState for the peer
*/
void savePeerPartial(PeerState state);
/**
* Called when a peer has connected and there may be a partially
* downloaded piece that the coordinatorator can give the peer task
*
* @param havePieces the have-pieces bitmask for the peer
*
* @return request (contains the partial data and valid length)
*/
Request getPeerPartial(BitField havePieces);
} }

View File

@ -221,6 +221,10 @@ class PeerState
listener.uploaded(peer, size); listener.uploaded(peer, size);
} }
// This is used to flag that we have to back up from the firstOutstandingRequest
// when calculating how far we've gotten
Request pendingRequest = null;
/** /**
* Called when a partial piece request has been handled by * Called when a partial piece request has been handled by
* PeerConnectionIn. * PeerConnectionIn.
@ -231,6 +235,8 @@ class PeerState
downloaded += size; downloaded += size;
listener.downloaded(peer, size); listener.downloaded(peer, size);
pendingRequest = null;
// Last chunk needed for this piece? // Last chunk needed for this piece?
if (getFirstOutstandingRequest(req.piece) == -1) if (getFirstOutstandingRequest(req.piece) == -1)
{ {
@ -318,14 +324,8 @@ class PeerState
{ {
Request dropReq = (Request)outstandingRequests.remove(0); Request dropReq = (Request)outstandingRequests.remove(0);
outstandingRequests.add(dropReq); outstandingRequests.add(dropReq);
// We used to rerequest the missing chunks but that mostly
// just confuses the other side. So now we just keep
// waiting for them. They will be rerequested when we get
// choked/unchoked again.
/*
if (!choked) if (!choked)
out.sendRequest(dropReq); out.sendRequest(dropReq);
*/
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("dropped " + dropReq + " with peer " + peer); _log.warn("dropped " + dropReq + " with peer " + peer);
} }
@ -336,10 +336,33 @@ class PeerState
// Request more if necessary to keep the pipeline filled. // Request more if necessary to keep the pipeline filled.
addRequest(); addRequest();
pendingRequest = req;
return req; return req;
} }
// get longest partial piece
Request getPartialRequest()
{
Request req = null;
for (int i = 0; i < outstandingRequests.size(); i++) {
Request r1 = (Request)outstandingRequests.get(i);
int j = getFirstOutstandingRequest(r1.piece);
if (j == -1)
continue;
Request r2 = (Request)outstandingRequests.get(j);
if (r2.off > 0 && ((req == null) || (r2.off > req.off)))
req = r2;
}
if (pendingRequest != null && req != null && pendingRequest.off < req.off) {
if (pendingRequest.off != 0)
req = pendingRequest;
else
req = null;
}
return req;
}
void cancelMessage(int piece, int begin, int length) void cancelMessage(int piece, int begin, int length)
{ {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -472,6 +495,15 @@ class PeerState
// Check that we already know what the other side has. // Check that we already know what the other side has.
if (bitfield != null) if (bitfield != null)
{ {
// Check for adopting an orphaned partial piece
Request r = listener.getPeerPartial(bitfield);
if (r != null) {
outstandingRequests.add(r);
if (!choked)
out.sendRequest(r);
lastRequest = r;
return true;
}
int nextPiece = listener.wantPiece(peer, bitfield); int nextPiece = listener.wantPiece(peer, bitfield);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " want piece " + nextPiece); _log.debug(peer + " want piece " + nextPiece);
@ -523,4 +555,9 @@ class PeerState
out.sendChoke(choke); out.sendChoke(choke);
} }
} }
synchronized void keepAlive()
{
out.sendAlive();
}
} }

View File

@ -353,8 +353,12 @@ public class I2PSnarkServlet extends HttpServlet {
out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentUploaded " + rowClass out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentUploaded " + rowClass
+ "\">" + formatSize(uploaded) + "</td>\n\t"); + "\">" + formatSize(uploaded) + "</td>\n\t");
out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentRate\">"); out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentRate\">");
if(isRunning && remaining > 0)
out.write(formatSize(downBps) + "ps");
out.write("</td>\n\t");
out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentRate\">");
if(isRunning) if(isRunning)
out.write(formatSize(downBps) + "ps/" + formatSize(upBps) + "ps"); out.write(formatSize(upBps) + "ps");
out.write("</td>\n\t"); out.write("</td>\n\t");
out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentAction " + rowClass + "\">"); out.write("<td valign=\"top\" align=\"left\" class=\"snarkTorrentAction " + rowClass + "\">");
if (isRunning) { if (isRunning) {
@ -580,7 +584,8 @@ public class I2PSnarkServlet extends HttpServlet {
" <th align=\"left\" valign=\"top\">Torrent</th>\n" + " <th align=\"left\" valign=\"top\">Torrent</th>\n" +
" <th align=\"left\" valign=\"top\">Downloaded</th>\n" + " <th align=\"left\" valign=\"top\">Downloaded</th>\n" +
" <th align=\"left\" valign=\"top\">Uploaded</th>\n" + " <th align=\"left\" valign=\"top\">Uploaded</th>\n" +
" <th align=\"left\" valign=\"top\">Rate Down/Up</th>\n" + " <th align=\"left\" valign=\"top\">Down Rate</th>\n" +
" <th align=\"left\" valign=\"top\">Up Rate</th>\n" +
" <th>&nbsp;</th></tr>\n" + " <th>&nbsp;</th></tr>\n" +
"</thead>\n"; "</thead>\n";

View File

@ -1,4 +1,15 @@
$Id: history.txt,v 1.509 2006-09-04 01:01:54 zzz Exp $ $Id: history.txt,v 1.510 2006-09-04 03:26:22 zzz Exp $
2006-09-05 zzz
* i2psnark: Implement basic partial-piece saves across connections
* i2psnark: Implement keep-alive sending. This will keep non-i2psnark clients
from dropping us for inactivity but also renders the 2-minute transmit-inactivity
code in i2psnark ineffective. Will have to research why there is transmit but
not receive inactivity code. With the current connection limit of 24 peers
we aren't in any danger of keeping out new peers by keeping inactive ones.
* i2psnark: Increase CHECK_PERIOD from 20 to 40 since nothing happens in 20 seconds
* i2psnark: Fix dropped chunk handling
* i2psnark: Web rate report cleanup
2006-09-04 zzz 2006-09-04 zzz
* i2psnark: Report cleared trackerErr immediately * i2psnark: Report cleared trackerErr immediately

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.449 $ $Date: 2006-09-04 01:01:53 $"; public final static String ID = "$Revision: 1.450 $ $Date: 2006-09-04 03:26:21 $";
public final static String VERSION = "0.6.1.24"; public final static String VERSION = "0.6.1.24";
public final static long BUILD = 8; public final static long BUILD = 9;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);