* i2psnark:

- Defer piece loading until required
      - Stub out Extension message support
This commit is contained in:
zzz
2010-11-21 21:19:12 +00:00
parent b5ae626425
commit 6c19e7e399
6 changed files with 145 additions and 12 deletions

View File

@ -0,0 +1,14 @@
package org.klomp.snark;
/**
* Callback used to fetch data
* @since 0.8.2
*/
interface DataLoader
{
/**
* This is the callback that PeerConnectionOut calls to get the data from disk
* @return bytes or null for errors
*/
public byte[] loadData(int piece, int begin, int length);
}

View File

@ -39,23 +39,28 @@ class Message
final static byte REQUEST = 6;
final static byte PIECE = 7;
final static byte CANCEL = 8;
final static byte EXTENSION = 20;
// Not all fields are used for every message.
// KEEP_ALIVE doesn't have a real wire representation
byte type;
// Used for HAVE, REQUEST, PIECE and CANCEL messages.
// low byte used for EXTENSION message
int piece;
// Used for REQUEST, PIECE and CANCEL messages.
int begin;
int length;
// Used for PIECE and BITFIELD messages
// Used for PIECE and BITFIELD and EXTENSION messages
byte[] data;
int off;
int len;
// Used to do deferred fetch of data
DataLoader dataLoader;
SimpleTimer.TimedEvent expireEvent;
/** Utility method for sending a message through a DataStream. */
@ -68,6 +73,13 @@ class Message
return;
}
// Get deferred data
if (data == null && dataLoader != null) {
data = dataLoader.loadData(piece, begin, length);
if (data == null)
return; // hmm will get retried, but shouldn't happen
}
// Calculate the total length in bytes
// Type is one byte.
@ -85,8 +97,12 @@ class Message
if (type == REQUEST || type == CANCEL)
datalen += 4;
// length is 1 byte
if (type == EXTENSION)
datalen += 1;
// add length of data for piece or bitfield array.
if (type == BITFIELD || type == PIECE)
if (type == BITFIELD || type == PIECE || type == EXTENSION)
datalen += len;
// Send length
@ -105,8 +121,11 @@ class Message
if (type == REQUEST || type == CANCEL)
dos.writeInt(length);
if (type == EXTENSION)
dos.writeByte((byte) piece & 0xff);
// Send actual data
if (type == BITFIELD || type == PIECE)
if (type == BITFIELD || type == PIECE || type == EXTENSION)
dos.write(data, off, len);
}
@ -135,6 +154,8 @@ class Message
return "PIECE(" + piece + "," + begin + "," + length + ")";
case CANCEL:
return "CANCEL(" + piece + "," + begin + "," + length + ")";
case EXTENSION:
return "EXTENSION(" + piece + ',' + data.length + ')';
default:
return "<UNKNOWN>";
}

View File

@ -59,6 +59,11 @@ public class Peer implements Comparable
private long uploaded_old[] = {-1,-1,-1};
private long downloaded_old[] = {-1,-1,-1};
// bytes per bt spec: 0011223344556677
static final long OPTION_EXTENSION = 0x0000000000100000l;
static final long OPTION_FAST = 0x0000000000000004l;
private long options;
/**
* Creates a disconnected peer given a PeerID, your own id and the
* relevant MetaInfo.
@ -285,9 +290,8 @@ public class Peer implements Comparable
// Handshake write - header
dout.write(19);
dout.write("BitTorrent protocol".getBytes("UTF-8"));
// Handshake write - zeros
byte[] zeros = new byte[8];
dout.write(zeros);
// Handshake write - options
dout.writeLong(OPTION_EXTENSION);
// Handshake write - metainfo hash
byte[] shared_hash = metainfo.getInfoHash();
dout.write(shared_hash);
@ -312,8 +316,8 @@ public class Peer implements Comparable
+ "'Bittorrent protocol', got '"
+ bittorrentProtocol + "'");
// Handshake read - zeros
din.readFully(zeros);
// Handshake read - options
options = din.readLong();
// Handshake read - metainfo hash
bs = new byte[20];
@ -325,6 +329,15 @@ public class Peer implements Comparable
din.readFully(bs);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Read the remote side's hash and peerID fully from " + toString());
// if ((options & OPTION_EXTENSION) != 0) {
if (options != 0) {
// send them something
if (_log.shouldLog(Log.DEBUG))
//_log.debug("Peer supports extension message, what should we say? " + toString());
_log.debug("Peer supports options 0x" + Long.toString(options, 16) + ", what should we say? " + toString());
}
return bs;
}

View File

@ -171,6 +171,13 @@ class PeerConnectionIn implements Runnable
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received cancel(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName());
break;
case 20: // Extension message
int id = din.readUnsignedByte();
byte[] payload = new byte[i-2];
din.readFully(payload);
ps.extensionMessage(id, payload);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received extension message from " + peer + " on " + peer.metainfo.getName());
default:
byte[] bs = new byte[i-1];
din.readFully(bs);

View File

@ -430,6 +430,33 @@ class PeerConnectionOut implements Runnable
return total;
}
/** @since 0.8.2 */
void sendPiece(int piece, int begin, int length, DataLoader loader)
{
boolean sendNow = false;
// are there any cases where we should?
if (sendNow) {
// queue the real thing
byte[] bytes = loader.loadData(piece, begin, length);
if (bytes != null)
sendPiece(piece, begin, length, bytes);
return;
}
// queue a fake message... set everything up,
// except save the PeerState instead of the bytes.
Message m = new Message();
m.type = Message.PIECE;
m.piece = piece;
m.begin = begin;
m.length = length;
m.dataLoader = loader;
m.off = 0;
m.len = length;
addMessage(m);
}
void sendPiece(int piece, int begin, int length, byte[] bytes)
{
Message m = new Message();
@ -488,4 +515,16 @@ class PeerConnectionOut implements Runnable
}
}
}
/** @since 0.8.2 */
void sendExtension(int id, byte[] bytes) {
Message m = new Message();
m.type = Message.EXTENSION;
m.piece = id;
m.data = bytes;
m.begin = 0;
m.length = bytes.length;
addMessage(m);
}
}

View File

@ -20,14 +20,20 @@
package org.klomp.snark;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
class PeerState
import org.klomp.snark.bencode.BDecoder;
import org.klomp.snark.bencode.BEValue;
class PeerState implements DataLoader
{
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class);
final Peer peer;
@ -201,13 +207,28 @@ class PeerState
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Queueing (" + piece + ", " + begin + ", "
+ length + ")" + " to " + peer);
// don't load the data into mem now, let PeerConnectionOut do it
out.sendPiece(piece, begin, length, this);
}
/**
* This is the callback that PeerConnectionOut calls
*
* @return bytes or null for errors
* @since 0.8.2
*/
public byte[] loadData(int piece, int begin, int length) {
byte[] pieceBytes = listener.gotRequest(peer, piece, begin, length);
if (pieceBytes == null)
{
// XXX - Protocol error-> diconnect?
if (_log.shouldLog(Log.WARN))
_log.warn("Got request for unknown piece: " + piece);
return;
return null;
}
// More sanity checks
@ -219,13 +240,13 @@ class PeerState
+ ", " + begin
+ ", " + length
+ "' message from " + peer);
return;
return null;
}
if (_log.shouldLog(Log.INFO))
_log.info("Sending (" + piece + ", " + begin + ", "
+ length + ")" + " to " + peer);
out.sendPiece(piece, begin, length, pieceBytes);
return pieceBytes;
}
/**
@ -413,6 +434,24 @@ class PeerState
out.cancelRequest(piece, begin, length);
}
/** @since 0.8.2 */
void extensionMessage(int id, byte[] bs)
{
if (id == 0) {
InputStream is = new ByteArrayInputStream(bs);
try {
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map map = bev.getMap();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got extension handshake message " + bev.toString());
} catch (Exception e) {}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got extended message type: " + id + " length: " + bs.length);
}
}
void unknownMessage(int type, byte[] bs)
{
if (_log.shouldLog(Log.WARN))