metadata handling - untested, still some stubs

This commit is contained in:
zzz
2010-12-21 03:04:10 +00:00
parent 8451610737
commit 8e40b35210
10 changed files with 572 additions and 79 deletions

View File

@ -0,0 +1,224 @@
package org.klomp.snark;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
import org.klomp.snark.bencode.BDecoder;
import org.klomp.snark.bencode.BEncoder;
import org.klomp.snark.bencode.BEValue;
import org.klomp.snark.bencode.InvalidBEncodingException;
/**
* REF: BEP 10 Extension Protocol
* @since 0.8.2
* @author zzz
*/
abstract class ExtensionHandler {
private static final byte[] _handshake = buildHandshake();
private static final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(ExtensionHandler.class);
public static final int ID_METADATA = 3;
private static final String TYPE_METADATA = "ut_metadata";
private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 32 * 5 / 4;
private static final int PARALLEL_REQUESTS = 3;
/**
* @return bencoded outgoing handshake message
*/
public static byte[] getHandshake() {
return _handshake;
}
/** outgoing handshake message */
private static byte[] buildHandshake() {
Map<String, Object> handshake = new HashMap();
Map<String, Integer> m = new HashMap();
m.put(TYPE_METADATA, Integer.valueOf(ID_METADATA));
handshake.put("m", m);
handshake.put("p", Integer.valueOf(6881));
handshake.put("v", "I2PSnark");
handshake.put("reqq", Integer.valueOf(5));
return BEncoder.bencode(handshake);
}
public static void handleMessage(Peer peer, int id, byte[] bs) {
if (id == 0)
handleHandshake(peer, bs);
else if (id == ID_METADATA)
handleMetadata(peer, bs);
else if (_log.shouldLog(Log.INFO))
_log.info("Unknown extension msg " + id + " from " + peer);
}
private static void handleHandshake(Peer peer, byte[] bs) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got handshake msg from " + peer);
try {
// this throws NPE on missing keys
InputStream is = new ByteArrayInputStream(bs);
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map<String, BEValue> map = bev.getMap();
Map<String, BEValue> msgmap = map.get("m").getMap();
peer.setHandshakeMap(map);
// not used, just to throw out of here
int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_METADATA).getInt();
int metaSize = map.get("metadata_size").getInt();
MagnetState state = peer.getMagnetState();
int remaining;
synchronized(state) {
if (state.isComplete())
return;
if (state.isInitialized()) {
if (state.getSize() != metaSize) {
peer.disconnect();
return;
}
} else {
// initialize it
if (metaSize > MAX_METADATA_SIZE) {
peer.disconnect(false);
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Initialized state, metadata size = " + metaSize + " from " + peer);
state.initialize(metaSize);
}
remaining = state.chunksRemaining();
}
// send requests for chunks
int count = Math.min(remaining, PARALLEL_REQUESTS);
for (int i = 0; i < count; i++) {
int chk;
synchronized(state) {
chk = state.getNextRequest();
}
if (_log.shouldLog(Log.INFO))
_log.info("Request chunk " + chk + " from " + peer);
sendRequest(peer, chk);
}
} catch (Exception e) {
if (_log.shouldLog(Log.WARN))
_log.info("Handshake exception from " + peer, e);
//peer.disconnect(false);
}
}
private static final int TYPE_REQUEST = 0;
private static final int TYPE_DATA = 1;
private static final int TYPE_REJECT = 2;
private static final int CHUNK_SIZE = 16*1024;
/** 25% extra for file names, benconding overhead, etc */
/**
* REF: BEP 9
* @since 0.8.4
*/
private static void handleMetadata(Peer peer, byte[] bs) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got metadata msg from " + peer);
try {
InputStream is = new ByteArrayInputStream(bs);
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map<String, BEValue> map = bev.getMap();
int type = map.get("msg_type").getInt();
int piece = map.get("piece").getInt();
MagnetState state = peer.getMagnetState();
if (type == TYPE_REQUEST) {
byte[] pc;
synchronized(state) {
pc = state.getChunk(piece);
}
sendPiece(peer, piece, pc);
} else if (type == TYPE_DATA) {
int size = map.get("total_size").getInt();
boolean done;
int chk = -1;
synchronized(state) {
if (state.isComplete())
return;
int len = is.available();
done = state.saveChunk(piece, bs, bs.length - len, len);
if (_log.shouldLog(Log.INFO))
_log.info("Got chunk " + piece + " from " + peer);
if (!done)
chk = state.getNextRequest();
}
// out of the lock
if (done) {
// Done!
// PeerState will call the listener (peer coord), who will
// check to see if the MagnetState has it
if (_log.shouldLog(Log.WARN))
_log.warn("Got last chunk from " + peer);
} else {
// get the next chunk
if (_log.shouldLog(Log.INFO))
_log.info("Request chunk " + chk + " from " + peer);
sendRequest(peer, chk);
}
} else if (type == TYPE_REJECT) {
if (_log.shouldLog(Log.WARN))
_log.warn("Got reject msg from " + peer);
peer.disconnect(false);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Got unknown metadata msg from " + peer);
peer.disconnect(false);
}
} catch (Exception e) {
if (_log.shouldLog(Log.WARN))
_log.info("Metadata ext. msg. exception from " + peer, e);
peer.disconnect(false);
}
}
private static void sendRequest(Peer peer, int piece) {
Map<String, Object> map = new HashMap();
map.put("msg_type", TYPE_REQUEST);
map.put("piece", Integer.valueOf(piece));
byte[] payload = BEncoder.bencode(map);
try {
int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_METADATA).getInt();
peer.sendExtension(hisMsgCode, payload);
} catch (Exception e) {
// NPE, no metadata capability
if (_log.shouldLog(Log.WARN))
_log.info("Metadata send req msg exception to " + peer, e);
}
}
private static void sendPiece(Peer peer, int piece, byte[] data) {
Map<String, Object> map = new HashMap();
map.put("msg_type", TYPE_REQUEST);
map.put("piece", Integer.valueOf(piece));
map.put("total_size", Integer.valueOf(data.length));
byte[] dict = BEncoder.bencode(map);
byte[] payload = new byte[dict.length + data.length];
System.arraycopy(dict, 0, payload, 0, dict.length);
System.arraycopy(data, 0, payload, dict.length, payload.length);
try {
int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get("METADATA").getInt();
peer.sendExtension(hisMsgCode, payload);
} catch (Exception e) {
// NPE, no metadata caps
if (_log.shouldLog(Log.WARN))
_log.info("Metadata send piece msg exception to " + peer, e);
}
}
}

View File

@ -1,36 +0,0 @@
package org.klomp.snark;
import java.util.HashMap;
import java.util.Map;
import org.klomp.snark.bencode.BEncoder;
import org.klomp.snark.bencode.BEValue;
/**
* REF: BEP 10 Extension Protocol
* @since 0.8.2
*/
class ExtensionHandshake {
private static final byte[] _payload = buildPayload();
/**
* @return bencoded data
*/
static byte[] getPayload() {
return _payload;
}
/** just a test for now */
private static byte[] buildPayload() {
Map<String, Object> handshake = new HashMap();
Map<String, Integer> m = new HashMap();
m.put("foo", Integer.valueOf(99));
m.put("bar", Integer.valueOf(101));
handshake.put("m", m);
handshake.put("p", Integer.valueOf(6881));
handshake.put("v", "I2PSnark");
handshake.put("reqq", Integer.valueOf(5));
return BEncoder.bencode(handshake);
}
}

View File

@ -0,0 +1,204 @@
package org.klomp.snark;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import org.klomp.snark.bencode.BDecoder;
import org.klomp.snark.bencode.BEValue;
/**
* Simple state for the download of the metainfo, shared between
* Peer and ExtensionHandler.
*
* Nothing is synchronized here!
* Caller must synchronize on this for everything!
*
* Reference: BEP 9
*
* @since 0.8.4
* author zzz
*/
class MagnetState {
public static final int CHUNK_SIZE = 16*1024;
private static final Random random = I2PAppContext.getGlobalContext().random();
private final byte[] infohash;
private boolean complete;
/** if false, nothing below is valid */
private boolean isInitialized;
private int metaSize;
private int totalChunks;
/** bitfield for the metainfo chunks - will remain null if we start out complete */
private BitField requested;
private BitField have;
/** bitfield for the metainfo */
private byte[] metainfoBytes;
/** only valid when finished */
private MetaInfo metainfo;
/**
* @param meta null for new magnet
*/
public MagnetState(byte[] iHash, MetaInfo meta) {
infohash = iHash;
if (meta != null) {
metainfo = meta;
initialize(meta.getInfoBytes().length);
complete = true;
} else {
metainfoBytes = new byte[metaSize];
}
}
/**
* @param call this for a new magnet when you have the size
* @throws IllegalArgumentException
*/
public void initialize(int size) {
if (isInitialized)
throw new IllegalArgumentException("already set");
isInitialized = true;
metaSize = size;
totalChunks = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE;
if (metainfo == null) {
// we don't need these if complete
have = new BitField(totalChunks);
requested = new BitField(totalChunks);
}
}
/**
* @param Call this for a new magnet when the download is complete.
* @throws IllegalArgumentException
*/
public void setMetaInfo(MetaInfo meta) {
metainfo = meta;
}
/**
* @throws IllegalArgumentException
*/
public MetaInfo getMetaInfo() {
if (!complete)
throw new IllegalArgumentException("not complete");
return metainfo;
}
/**
* @throws IllegalArgumentException
*/
public int getSize() {
if (!isInitialized)
throw new IllegalArgumentException("not initialized");
return metaSize;
}
public boolean isInitialized() {
return isInitialized;
}
public boolean isComplete() {
return complete;
}
public int chunkSize(int chunk) {
return Math.min(CHUNK_SIZE, metaSize - (chunk * CHUNK_SIZE));
}
/** @return chunk count */
public int chunksRemaining() {
if (!isInitialized)
throw new IllegalArgumentException("not initialized");
if (complete)
return 0;
return totalChunks - have.count();
}
/** @return chunk number */
public int getNextRequest() {
if (!isInitialized)
throw new IllegalArgumentException("not initialized");
if (complete)
throw new IllegalArgumentException("complete");
int rand = random.nextInt(totalChunks);
for (int i = 0; i < totalChunks; i++) {
int chk = (i + rand) % totalChunks;
if (!(have.get(chk) || requested.get(chk))) {
requested.set(chk);
return chk;
}
}
// all requested - end game
for (int i = 0; i < totalChunks; i++) {
int chk = (i + rand) % totalChunks;
if (!have.get(chk))
return chk;
}
throw new IllegalArgumentException("complete");
}
/**
* @throws IllegalArgumentException
*/
public byte[] getChunk(int chunk) {
if (!complete)
throw new IllegalArgumentException("not complete");
if (chunk < 0 || chunk >= totalChunks)
throw new IllegalArgumentException("bad chunk number");
int size = chunkSize(chunk);
byte[] rv = new byte[size];
System.arraycopy(metainfoBytes, chunk * CHUNK_SIZE, rv, 0, size);
// use meta.getInfoBytes() so we don't save it in memory
return rv;
}
/**
* @return true if this was the last piece
* @throws NPE, IllegalArgumentException, IOException, ...
*/
public boolean saveChunk(int chunk, byte[] data, int off, int length) throws Exception {
if (!isInitialized)
throw new IllegalArgumentException("not initialized");
if (chunk < 0 || chunk >= totalChunks)
throw new IllegalArgumentException("bad chunk number");
if (have.get(chunk))
return false; // shouldn't happen if synced
int size = chunkSize(chunk);
if (size != length)
throw new IllegalArgumentException("bad chunk length");
System.arraycopy(data, off, metainfoBytes, chunk * CHUNK_SIZE, size);
have.set(chunk);
boolean done = have.complete();
if (done) {
metainfo = buildMetaInfo();
complete = true;
}
return done;
}
/**
* @return true if this was the last piece
* @throws NPE, IllegalArgumentException, IOException, ...
*/
public MetaInfo buildMetaInfo() throws Exception {
// top map has nothing in it but the info map (no announce)
Map<String, Object> map = new HashMap();
InputStream is = new ByteArrayInputStream(metainfoBytes);
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map<String, BEValue> info = bev.getMap();
map.put("info", info);
MetaInfo newmeta = new MetaInfo(map);
if (!DataHelper.eq(newmeta.getInfoHash(), infohash))
throw new IOException("info hash mismatch");
return newmeta;
}
}

View File

@ -25,6 +25,7 @@ import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -59,10 +60,11 @@ public class MetaInfo
private final int piece_length;
private final byte[] piece_hashes;
private final long length;
private final Map infoMap;
private byte[] torrentdata;
private Map infoMap;
/**
* Called by Storage when creating a new torrent from local data
*/
MetaInfo(String announce, String name, String name_utf8, List files, List lengths,
int piece_length, byte[] piece_hashes, long length)
{
@ -77,7 +79,7 @@ public class MetaInfo
this.length = length;
this.info_hash = calculateInfoHash();
infoMap = null;
//infoMap = null;
}
/**
@ -104,7 +106,7 @@ public class MetaInfo
* Creates a new MetaInfo from a Map of BEValues and the SHA1 over
* the original bencoded info dictonary (this is a hack, we could
* reconstruct the bencoded stream and recalculate the hash). Will
* throw a InvalidBEncodingException if the given map does not
* NOT throw a InvalidBEncodingException if the given map does not
* contain a valid announce string or info dictonary.
*/
public MetaInfo(Map m) throws InvalidBEncodingException
@ -112,9 +114,13 @@ public class MetaInfo
if (_log.shouldLog(Log.DEBUG))
_log.debug("Creating a metaInfo: " + m, new Exception("source"));
BEValue val = (BEValue)m.get("announce");
if (val == null)
throw new InvalidBEncodingException("Missing announce string");
// Disabled check, we can get info from a magnet now
if (val == null) {
//throw new InvalidBEncodingException("Missing announce string");
this.announce = null;
} else {
this.announce = val.getString();
}
val = (BEValue)m.get("info");
if (val == null)
@ -215,6 +221,7 @@ public class MetaInfo
/**
* Returns the string representing the URL of the tracker for this torrent.
* @return may be null!
*/
public String getAnnounce()
{
@ -388,26 +395,34 @@ public class MetaInfo
piece_hashes, length);
}
public byte[] getTorrentData()
{
if (torrentdata == null)
/**
* Called by servlet to save a new torrent file generated from local data
*/
public synchronized byte[] getTorrentData()
{
Map m = new HashMap();
m.put("announce", announce);
Map info = createInfoMap();
m.put("info", info);
torrentdata = BEncoder.bencode(m);
}
return torrentdata;
// don't save this locally, we should only do this once
return BEncoder.bencode(m);
}
private Map createInfoMap()
{
Map info = new HashMap();
if (infoMap != null) {
info.putAll(infoMap);
return info;
/** @since 0.8.4 */
public synchronized byte[] getInfoBytes() {
if (infoMap == null)
createInfoMap();
return BEncoder.bencode(infoMap);
}
/** @return an unmodifiable view of the Map */
private Map<String, BEValue> createInfoMap()
{
// if we loaded this metainfo from a file, we have the map
if (infoMap != null)
return Collections.unmodifiableMap(infoMap);
// otherwise we must create it
Map info = new HashMap();
info.put("name", name);
if (name_utf8 != null)
info.put("name.utf-8", name_utf8);
@ -429,7 +444,8 @@ public class MetaInfo
}
info.put("files", l);
}
return info;
infoMap = info;
return Collections.unmodifiableMap(infoMap);
}
private byte[] calculateInfoHash()

View File

@ -28,11 +28,14 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import org.klomp.snark.bencode.BEValue;
public class Peer implements Comparable
{
private Log _log = new Log(Peer.class);
@ -41,7 +44,9 @@ public class Peer implements Comparable
private final byte[] my_id;
private final byte[] infohash;
final MetaInfo metainfo;
/** will start out null in magnet mode */
private MetaInfo metainfo;
private Map<String, BEValue> handshakeMap;
// The data in/output streams set during the handshake and used by
// the actual connections.
@ -52,6 +57,9 @@ public class Peer implements Comparable
// was successful, the connection setup and runs
PeerState state;
/** shared across all peers on this torrent */
MagnetState magnetState;
private I2PSocket sock;
private boolean deregister = true;
@ -197,7 +205,7 @@ public class Peer implements Comparable
* If the given BitField is non-null it is send to the peer as first
* message.
*/
public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField bitfield)
public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField bitfield, MagnetState mState)
{
if (state != null)
throw new IllegalStateException("Peer already started");
@ -255,7 +263,7 @@ public class Peer implements Comparable
if ((options & OPTION_EXTENSION) != 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer supports extensions, sending test message");
out.sendExtension(0, ExtensionHandshake.getPayload());
out.sendExtension(0, ExtensionHandler.getHandshake());
}
if ((options & OPTION_DHT) != 0 && util.getDHT() != null) {
@ -271,6 +279,7 @@ public class Peer implements Comparable
// We are up and running!
state = s;
magnetState = mState;
listener.connected(this);
if (_log.shouldLog(Log.DEBUG))
@ -371,6 +380,42 @@ public class Peer implements Comparable
return options;
}
/**
* Shared state across all peers, callers must sync on returned object
* @return non-null
* @since 0.8.4
*/
public MagnetState getMagnetState() {
return magnetState;
}
/** @return could be null @since 0.8.4 */
public Map<String, BEValue> getHandshakeMap() {
return handshakeMap;
}
/** @since 0.8.4 */
public void setHandshakeMap(Map<String, BEValue> map) {
handshakeMap = map;
}
/** @since 0.8.4 */
public void sendExtension(int type, byte[] payload) {
PeerState s = state;
if (s != null)
s.out.sendExtension(type, payload);
}
/**
* Switch from magnet mode to normal mode
* @since 0.8.4
*/
public void gotMetaInfo(MetaInfo meta) {
PeerState s = state;
if (s != null)
s.gotMetaInfo(meta);
}
public boolean isConnected()
{
return state != null;

View File

@ -90,7 +90,7 @@ class PeerConnectionIn implements Runnable
{
ps.keepAliveMessage();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received keepalive from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received keepalive from " + peer);
continue;
}
@ -102,35 +102,35 @@ class PeerConnectionIn implements Runnable
case 0:
ps.chokeMessage(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received choke from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received choke from " + peer);
break;
case 1:
ps.chokeMessage(false);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received unchoke from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received unchoke from " + peer);
break;
case 2:
ps.interestedMessage(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received interested from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received interested from " + peer);
break;
case 3:
ps.interestedMessage(false);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received not interested from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received not interested from " + peer);
break;
case 4:
piece = din.readInt();
ps.haveMessage(piece);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received havePiece(" + piece + ") from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received havePiece(" + piece + ") from " + peer);
break;
case 5:
byte[] bitmap = new byte[i-1];
din.readFully(bitmap);
ps.bitfieldMessage(bitmap);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received bitmap from " + peer + " on " + peer.metainfo.getName() + ": size=" + (i-1) /* + ": " + ps.bitfield */ );
_log.debug("Received bitmap from " + peer + ": size=" + (i-1) /* + ": " + ps.bitfield */ );
break;
case 6:
piece = din.readInt();
@ -138,7 +138,7 @@ class PeerConnectionIn implements Runnable
len = din.readInt();
ps.requestMessage(piece, begin, len);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received request(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received request(" + piece + "," + begin + ") from " + peer);
break;
case 7:
piece = din.readInt();
@ -152,7 +152,7 @@ class PeerConnectionIn implements Runnable
din.readFully(piece_bytes, begin, len);
ps.pieceMessage(req);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received data(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received data(" + piece + "," + begin + ") from " + peer);
}
else
{
@ -160,7 +160,7 @@ class PeerConnectionIn implements Runnable
piece_bytes = new byte[len];
din.readFully(piece_bytes);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer);
}
break;
case 8:
@ -169,27 +169,27 @@ class PeerConnectionIn implements Runnable
len = din.readInt();
ps.cancelMessage(piece, begin, len);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received cancel(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received cancel(" + piece + "," + begin + ") from " + peer);
break;
case 9: // PORT message
int port = din.readUnsignedShort();
ps.portMessage(port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received port message from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received port message from " + peer);
case 20: // Extension message
int id = din.readUnsignedByte();
byte[] payload = new byte[i-2];
din.readFully(payload);
ps.extensionMessage(id, payload);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received extension message from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received extension message from " + peer);
break;
default:
byte[] bs = new byte[i-1];
din.readFully(bs);
ps.unknownMessage(b, bs);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received unknown message from " + peer + " on " + peer.metainfo.getName());
_log.debug("Received unknown message from " + peer);
}
}
}

View File

@ -151,7 +151,7 @@ class PeerConnectionOut implements Runnable
if (m != null)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send " + peer + ": " + m + " on " + peer.metainfo.getName());
_log.debug("Send " + peer + ": " + m);
// This can block for quite a while.
// To help get slow peers going, and track the bandwidth better,

View File

@ -105,6 +105,7 @@ public class PeerCoordinator implements PeerListener
private boolean halted = false;
private final MagnetState magnetState;
private final CoordinatorListener listener;
private final I2PSnarkUtil _util;
private static final Random _random = I2PAppContext.getGlobalContext().random();
@ -128,6 +129,7 @@ public class PeerCoordinator implements PeerListener
setWantedPieces();
partialPieces = new ArrayList(getMaxConnections() + 1);
peers = new LinkedBlockingQueue();
magnetState = new MagnetState(infohash, metainfo);
// Install a timer to check the uploaders.
// Randomize the first start time so multiple tasks are spread out,
@ -484,7 +486,7 @@ public class PeerCoordinator implements PeerListener
{
public void run()
{
peer.runConnection(_util, listener, bitfield);
peer.runConnection(_util, listener, bitfield, magnetState);
}
};
String threadName = "Snark peer " + peer.toString();
@ -1149,10 +1151,30 @@ public class PeerCoordinator implements PeerListener
}
/** @since 0.8.4 */
public void gotExtension(Peer peer, int id, byte[] bs) {}
public void gotExtension(Peer peer, int id, byte[] bs) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got extension message " + id + " from " + peer);
// basic handling done in PeerState... here we just check if we are done
if (metainfo == null && id == ExtensionHandler.ID_METADATA) {
synchronized (magnetState) {
if (magnetState.isComplete()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Got completed metainfo via extension");
MetaInfo newinfo = magnetState.getMetaInfo();
// more validation
// set global
// instantiate storage
// tell Snark listener
// tell all peers
}
}
}
}
/** @since 0.8.4 */
public void gotPort(Peer peer, int port) {}
public void gotPort(Peer peer, int port) {
// send to DHT
}
/** Return number of allowed uploaders for this torrent.
** Check with Snark to see if we are over the total upload limit.

View File

@ -497,9 +497,21 @@ class PeerState implements DataLoader
/** @since 0.8.2 */
void extensionMessage(int id, byte[] bs)
{
ExtensionHandler.handleMessage(peer, id, bs);
// Peer coord will get metadata from MagnetState,
// verify, and then call gotMetaInfo()
listener.gotExtension(peer, id, bs);
}
/**
* Switch from magnet mode to normal mode
* @since 0.8.4
*/
public void gotMetaInfo(MetaInfo meta) {
// set metainfo
// fix bitfield
}
/** @since 0.8.4 */
void portMessage(int port)
{

View File

@ -594,9 +594,15 @@ public class SnarkManager implements Snark.CompleteListener {
_peerCoordinatorSet, _connectionAcceptor,
false, getDataDir().getPath());
synchronized (_snarks) {
for (Snark snark : _snarks.values()) {
if (DataHelper.eq(ih, snark.getInfoHash())) {
addMessage(_("Torrent already running: {0}", snark.getBaseName()));
return;
}
}
// Tell the dir monitor not to delete us
_magnets.add(name);
synchronized (_snarks) {
_snarks.put(name, torrent);
}
if (shouldAutoStart()) {