- Rework traffic counters

- Record the metadata bandwidth
- More null announce handling
- Callbacks for got MetaInfo event
- Cleanups
This commit is contained in:
zzz
2010-12-21 23:43:13 +00:00
parent f15b329874
commit 690aea255b
7 changed files with 129 additions and 52 deletions

View File

@ -31,6 +31,12 @@ public interface CoordinatorListener
*/ */
void peerChange(PeerCoordinator coordinator, Peer peer); void peerChange(PeerCoordinator coordinator, Peer peer);
/**
* Called when the PeerCoordinator got the MetaInfo via magnet.
* @since 0.8.4
*/
void gotMetaInfo(PeerCoordinator coordinator, MetaInfo metainfo);
public boolean overUploadLimit(int uploaders); public boolean overUploadLimit(int uploaders);
public boolean overUpBWLimit(); public boolean overUpBWLimit();
public boolean overUpBWLimit(long total); public boolean overUpBWLimit(long total);

View File

@ -26,7 +26,8 @@ abstract class ExtensionHandler {
public static final int ID_METADATA = 3; public static final int ID_METADATA = 3;
private static final String TYPE_METADATA = "ut_metadata"; private static final String TYPE_METADATA = "ut_metadata";
private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 32 * 5 / 4; /** Pieces * SHA1 Hash length, + 25% extra for file names, benconding overhead, etc */
private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 20 * 5 / 4;
private static final int PARALLEL_REQUESTS = 3; private static final int PARALLEL_REQUESTS = 3;
@ -49,16 +50,16 @@ abstract class ExtensionHandler {
return BEncoder.bencode(handshake); return BEncoder.bencode(handshake);
} }
public static void handleMessage(Peer peer, int id, byte[] bs) { public static void handleMessage(Peer peer, PeerListener listener, int id, byte[] bs) {
if (id == 0) if (id == 0)
handleHandshake(peer, bs); handleHandshake(peer, listener, bs);
else if (id == ID_METADATA) else if (id == ID_METADATA)
handleMetadata(peer, bs); handleMetadata(peer, listener, bs);
else if (_log.shouldLog(Log.INFO)) else if (_log.shouldLog(Log.INFO))
_log.info("Unknown extension msg " + id + " from " + peer); _log.info("Unknown extension msg " + id + " from " + peer);
} }
private static void handleHandshake(Peer peer, byte[] bs) { private static void handleHandshake(Peer peer, PeerListener listener, byte[] bs) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got handshake msg from " + peer); _log.debug("Got handshake msg from " + peer);
try { try {
@ -67,11 +68,11 @@ abstract class ExtensionHandler {
BDecoder dec = new BDecoder(is); BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap(); BEValue bev = dec.bdecodeMap();
Map<String, BEValue> map = bev.getMap(); Map<String, BEValue> map = bev.getMap();
Map<String, BEValue> msgmap = map.get("m").getMap();
peer.setHandshakeMap(map); peer.setHandshakeMap(map);
Map<String, BEValue> msgmap = map.get("m").getMap();
// not used, just to throw out of here // rv not used, just to throw an NPE to get out of here
int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_METADATA).getInt(); msgmap.get(TYPE_METADATA).getInt();
int metaSize = map.get("metadata_size").getInt(); int metaSize = map.get("metadata_size").getInt();
MagnetState state = peer.getMagnetState(); MagnetState state = peer.getMagnetState();
@ -121,13 +122,12 @@ abstract class ExtensionHandler {
private static final int TYPE_REJECT = 2; private static final int TYPE_REJECT = 2;
private static final int CHUNK_SIZE = 16*1024; private static final int CHUNK_SIZE = 16*1024;
/** 25% extra for file names, benconding overhead, etc */
/** /**
* REF: BEP 9 * REF: BEP 9
* @since 0.8.4 * @since 0.8.4
*/ */
private static void handleMetadata(Peer peer, byte[] bs) { private static void handleMetadata(Peer peer, PeerListener listener, byte[] bs) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got metadata msg from " + peer); _log.debug("Got metadata msg from " + peer);
try { try {
@ -145,6 +145,9 @@ abstract class ExtensionHandler {
pc = state.getChunk(piece); pc = state.getChunk(piece);
} }
sendPiece(peer, piece, pc); sendPiece(peer, piece, pc);
// Do this here because PeerConnectionOut only reports for PIECE messages
peer.uploaded(pc.length);
listener.uploaded(peer, pc.length);
} else if (type == TYPE_DATA) { } else if (type == TYPE_DATA) {
int size = map.get("total_size").getInt(); int size = map.get("total_size").getInt();
boolean done; boolean done;
@ -153,6 +156,8 @@ abstract class ExtensionHandler {
if (state.isComplete()) if (state.isComplete())
return; return;
int len = is.available(); int len = is.available();
peer.downloaded(len);
listener.downloaded(peer, len);
done = state.saveChunk(piece, bs, bs.length - len, len); done = state.saveChunk(piece, bs, bs.length - len, len);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got chunk " + piece + " from " + peer); _log.info("Got chunk " + piece + " from " + peer);
@ -189,8 +194,17 @@ abstract class ExtensionHandler {
} }
private static void sendRequest(Peer peer, int piece) { private static void sendRequest(Peer peer, int piece) {
sendMessage(peer, TYPE_REQUEST, piece);
}
private static void sendReject(Peer peer, int piece) {
sendMessage(peer, TYPE_REJECT, piece);
}
/** REQUEST and REJECT are the same except for message type */
private static void sendMessage(Peer peer, int type, int piece) {
Map<String, Object> map = new HashMap(); Map<String, Object> map = new HashMap();
map.put("msg_type", TYPE_REQUEST); map.put("msg_type", Integer.valueOf(type));
map.put("piece", Integer.valueOf(piece)); map.put("piece", Integer.valueOf(piece));
byte[] payload = BEncoder.bencode(map); byte[] payload = BEncoder.bencode(map);
try { try {
@ -205,7 +219,7 @@ abstract class ExtensionHandler {
private static void sendPiece(Peer peer, int piece, byte[] data) { private static void sendPiece(Peer peer, int piece, byte[] data) {
Map<String, Object> map = new HashMap(); Map<String, Object> map = new HashMap();
map.put("msg_type", TYPE_REQUEST); map.put("msg_type", Integer.valueOf(TYPE_REQUEST));
map.put("piece", Integer.valueOf(piece)); map.put("piece", Integer.valueOf(piece));
map.put("total_size", Integer.valueOf(data.length)); map.put("total_size", Integer.valueOf(data.length));
byte[] dict = BEncoder.bencode(map); byte[] dict = BEncoder.bencode(map);

View File

@ -53,6 +53,10 @@ public class Peer implements Comparable
private DataInputStream din; private DataInputStream din;
private DataOutputStream dout; private DataOutputStream dout;
/** running counters */
private long downloaded;
private long uploaded;
// Keeps state for in/out connections. Non-null when the handshake // Keeps state for in/out connections. Non-null when the handshake
// was successful, the connection setup and runs // was successful, the connection setup and runs
PeerState state; PeerState state;
@ -410,10 +414,10 @@ public class Peer implements Comparable
* Switch from magnet mode to normal mode * Switch from magnet mode to normal mode
* @since 0.8.4 * @since 0.8.4
*/ */
public void gotMetaInfo(MetaInfo meta) { public void setMetaInfo(MetaInfo meta) {
PeerState s = state; PeerState s = state;
if (s != null) if (s != null)
s.gotMetaInfo(meta); s.setMetaInfo(meta);
} }
public boolean isConnected() public boolean isConnected()
@ -577,14 +581,29 @@ public class Peer implements Comparable
return (s == null) || s.choked; return (s == null) || s.choked;
} }
/**
* Increment the counter.
* @since 0.8.4
*/
public void downloaded(int size) {
downloaded += size;
}
/**
* Increment the counter.
* @since 0.8.4
*/
public void uploaded(int size) {
uploaded += size;
}
/** /**
* Returns the number of bytes that have been downloaded. * Returns the number of bytes that have been downloaded.
* Can be reset to zero with <code>resetCounters()</code>/ * Can be reset to zero with <code>resetCounters()</code>/
*/ */
public long getDownloaded() public long getDownloaded()
{ {
PeerState s = state; return downloaded;
return (s != null) ? s.downloaded : 0;
} }
/** /**
@ -593,8 +612,7 @@ public class Peer implements Comparable
*/ */
public long getUploaded() public long getUploaded()
{ {
PeerState s = state; return uploaded;
return (s != null) ? s.uploaded : 0;
} }
/** /**
@ -602,12 +620,8 @@ public class Peer implements Comparable
*/ */
public void resetCounters() public void resetCounters()
{ {
PeerState s = state; downloaded = 0;
if (s != null) uploaded = 0;
{
s.downloaded = 0;
s.uploaded = 0;
}
} }
public long getInactiveTime() { public long getInactiveTime() {

View File

@ -45,13 +45,15 @@ public class PeerCoordinator implements PeerListener
/** /**
* External use by PeerMonitorTask only. * External use by PeerMonitorTask only.
* Will be null when in magnet mode.
*/ */
final MetaInfo metainfo; MetaInfo metainfo;
/** /**
* External use by PeerMonitorTask only. * External use by PeerMonitorTask only.
* Will be null when in magnet mode.
*/ */
final Storage storage; Storage storage;
private final Snark snark; private final Snark snark;
// package local for access by CheckDownLoadersTask // package local for access by CheckDownLoadersTask
@ -1150,7 +1152,10 @@ public class PeerCoordinator implements PeerListener
} }
} }
/** @since 0.8.4 */ /**
* PeerListener callback
* @since 0.8.4
*/
public void gotExtension(Peer peer, int id, byte[] bs) { public void gotExtension(Peer peer, int id, byte[] bs) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got extension message " + id + " from " + peer); _log.debug("Got extension message " + id + " from " + peer);
@ -1160,18 +1165,29 @@ public class PeerCoordinator implements PeerListener
if (magnetState.isComplete()) { if (magnetState.isComplete()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Got completed metainfo via extension"); _log.warn("Got completed metainfo via extension");
MetaInfo newinfo = magnetState.getMetaInfo(); metainfo = magnetState.getMetaInfo();
// more validation listener.gotMetaInfo(this, metainfo);
// set global for (Peer p : peers) {
// instantiate storage p.setMetaInfo(metainfo);
// tell Snark listener }
// tell all peers
} }
} }
} }
} }
/** @since 0.8.4 */ /**
* Sets the storage after transition out of magnet mode
* Snark calls this after we call gotMetaInfo()
* @since 0.8.4
*/
public void setStorage(Storage stg) {
storage = stg;
}
/**
* PeerListener callback
* @since 0.8.4
*/
public void gotPort(Peer peer, int port) { public void gotPort(Peer peer, int port) {
// send to DHT // send to DHT
} }

View File

@ -36,8 +36,9 @@ class PeerState implements DataLoader
{ {
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class); private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class);
private final Peer peer; private final Peer peer;
/** Fixme, used by Peer.disconnect() to get to the coordinator */
final PeerListener listener; final PeerListener listener;
private final MetaInfo metainfo; private MetaInfo metainfo;
// Interesting and choking describes whether we are interested in or // Interesting and choking describes whether we are interested in or
// are choking the other side. // are choking the other side.
@ -49,10 +50,6 @@ class PeerState implements DataLoader
boolean interested = false; boolean interested = false;
boolean choked = true; boolean choked = true;
// Package local for use by Peer.
long downloaded;
long uploaded;
/** the pieces the peer has */ /** the pieces the peer has */
BitField bitfield; BitField bitfield;
@ -285,7 +282,7 @@ class PeerState implements DataLoader
*/ */
void uploaded(int size) void uploaded(int size)
{ {
uploaded += size; peer.uploaded(size);
listener.uploaded(peer, size); listener.uploaded(peer, size);
} }
@ -305,7 +302,7 @@ class PeerState implements DataLoader
void pieceMessage(Request req) void pieceMessage(Request req)
{ {
int size = req.len; int size = req.len;
downloaded += size; peer.downloaded(size);
listener.downloaded(peer, size); listener.downloaded(peer, size);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -326,9 +323,6 @@ class PeerState implements DataLoader
{ {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Got BAD " + req.piece + " from " + peer); _log.warn("Got BAD " + req.piece + " from " + peer);
// XXX ARGH What now !?!
// FIXME Why would we set downloaded to 0?
downloaded = 0;
} }
} }
@ -372,7 +366,6 @@ class PeerState implements DataLoader
_log.info("Unrequested 'piece: " + piece + ", " _log.info("Unrequested 'piece: " + piece + ", "
+ begin + ", " + length + "' received from " + begin + ", " + length + "' received from "
+ peer); + peer);
downloaded = 0; // XXX - punishment?
return null; return null;
} }
@ -397,7 +390,6 @@ class PeerState implements DataLoader
+ begin + ", " + begin + ", "
+ length + "' received from " + length + "' received from "
+ peer); + peer);
downloaded = 0; // XXX - punishment?
return null; return null;
} }
@ -497,7 +489,7 @@ class PeerState implements DataLoader
/** @since 0.8.2 */ /** @since 0.8.2 */
void extensionMessage(int id, byte[] bs) void extensionMessage(int id, byte[] bs)
{ {
ExtensionHandler.handleMessage(peer, id, bs); ExtensionHandler.handleMessage(peer, listener, id, bs);
// Peer coord will get metadata from MagnetState, // Peer coord will get metadata from MagnetState,
// verify, and then call gotMetaInfo() // verify, and then call gotMetaInfo()
listener.gotExtension(peer, id, bs); listener.gotExtension(peer, id, bs);
@ -507,9 +499,18 @@ class PeerState implements DataLoader
* Switch from magnet mode to normal mode * Switch from magnet mode to normal mode
* @since 0.8.4 * @since 0.8.4
*/ */
public void gotMetaInfo(MetaInfo meta) { public void setMetaInfo(MetaInfo meta) {
// set metainfo BitField oldBF = bitfield;
// fix bitfield if (oldBF != null) {
if (oldBF.size() != meta.getPieces())
// fix bitfield, it was too big by 1-7 bits
bitfield = new BitField(oldBF.getFieldBytes(), meta.getPieces());
// else no extra
} else {
// it will be initialized later
//bitfield = new BitField(meta.getPieces());
}
metainfo = meta;
} }
/** @since 0.8.4 */ /** @since 0.8.4 */

View File

@ -1004,6 +1004,30 @@ public class Snark
// System.out.println(peer.toString()); // System.out.println(peer.toString());
} }
/**
* Called when the PeerCoordinator got the MetaInfo via magnet.
* CoordinatorListener.
* Create the storage, tell SnarkManager, and give the storage
* back to the coordinator.
*
* @throws RuntimeException via fatal()
* @since 0.8.4
*/
public void gotMetaInfo(PeerCoordinator coordinator, MetaInfo metainfo) {
meta = metainfo;
try {
storage = new Storage(_util, meta, this);
if (completeListener != null)
completeListener.gotMetaInfo(this);
coordinator.setStorage(storage);
} catch (IOException ioe) {
if (storage != null) {
try { storage.close(); } catch (IOException ioee) {}
}
fatal("Could not check or create storage", ioe);
}
}
private boolean allocating = false; private boolean allocating = false;
public void storageCreateFile(Storage storage, String name, long length) public void storageCreateFile(Storage storage, String name, long length)
{ {

View File

@ -573,9 +573,11 @@ public class SnarkManager implements Snark.CompleteListener {
if (!TrackerClient.isValidAnnounce(info.getAnnounce())) { if (!TrackerClient.isValidAnnounce(info.getAnnounce())) {
if (_util.shouldUseOpenTrackers() && _util.getOpenTrackers() != null) { if (_util.shouldUseOpenTrackers() && _util.getOpenTrackers() != null) {
addMessage(_("Warning - Ignoring non-i2p tracker in \"{0}\", will announce to i2p open trackers only", info.getName())); addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName()));
} else if (_util.getDHT() != null) {
addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName()));
} else { } else {
addMessage(_("Warning - Ignoring non-i2p tracker in \"{0}\", and open trackers are disabled, you must enable open trackers before starting the torrent!", info.getName())); addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName()));
dontAutoStart = true; dontAutoStart = true;
} }
} }