* i2psnark:

- Remove static SnarkManager instance
   - Allow DHT-only torrents
   - DHT debugging
This commit is contained in:
zzz
2012-08-08 17:00:33 +00:00
parent 9c7f4cc604
commit 9cee0ee504
12 changed files with 110 additions and 28 deletions

View File

@ -40,4 +40,5 @@ interface CoordinatorListener
public boolean overUploadLimit(int uploaders);
public boolean overUpBWLimit();
public boolean overUpBWLimit(long total);
public void addMessage(String message);
}

View File

@ -87,7 +87,6 @@ public class I2PSnarkUtil {
//setProxy("127.0.0.1", 4444);
setI2CPConfig("127.0.0.1", 7654, null);
_shitlist = new ConcurrentHashSet();
_configured = false;
_maxUploaders = Snark.MAX_TOTAL_UPLOADERS;
_maxUpBW = DEFAULT_MAX_UP_BW;
_maxConnections = MAX_CONNECTIONS;
@ -248,7 +247,6 @@ public class I2PSnarkUtil {
_manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts);
_connecting = false;
}
// FIXME this only instantiates krpc once, left stuck with old manager
if (_shouldUseDHT && _manager != null && _dht == null)
_dht = new KRPC(_context, _manager.getSession());
return (_manager != null);

View File

@ -890,8 +890,10 @@ class PeerCoordinator implements PeerListener
snark.stopTorrent();
String msg = "Error reading the storage (piece " + piece + ") for " + metainfo.getName() + ": " + ioe;
_log.error(msg, ioe);
SnarkManager.instance().addMessage(msg);
SnarkManager.instance().addMessage("Fatal storage error: Stopping torrent " + metainfo.getName());
if (listener != null) {
listener.addMessage(msg);
listener.addMessage("Fatal storage error: Stopping torrent " + metainfo.getName());
}
throw new RuntimeException(msg, ioe);
}
}
@ -970,8 +972,10 @@ class PeerCoordinator implements PeerListener
snark.stopTorrent();
String msg = "Error writing storage (piece " + piece + ") for " + metainfo.getName() + ": " + ioe;
_log.error(msg, ioe);
SnarkManager.instance().addMessage(msg);
SnarkManager.instance().addMessage("Fatal storage error: Stopping torrent " + metainfo.getName());
if (listener != null) {
listener.addMessage(msg);
listener.addMessage("Fatal storage error: Stopping torrent " + metainfo.getName());
}
throw new RuntimeException(msg, ioe);
}
wantedPieces.remove(p);

View File

@ -1179,6 +1179,15 @@ public class Snark
//System.exit(0);
}
/**
* StorageListener and CoordinatorListener callback
* @since 0.9.2
*/
public void addMessage(String message) {
if (completeListener != null)
completeListener.addMessage(this, message);
}
public interface CompleteListener {
public void torrentComplete(Snark snark);
public void updateStatus(Snark snark);
@ -1198,6 +1207,11 @@ public class Snark
*/
public void fatal(Snark snark, String error);
/**
* @since 0.9.2
*/
public void addMessage(Snark snark, String message);
// not really listeners but the easiest way to get back to an optional SnarkManager
public long getSavedTorrentTime(Snark snark);
public BitField getSavedTorrentBitField(Snark snark);

View File

@ -123,15 +123,11 @@ public class SnarkManager implements Snark.CompleteListener {
/** comma delimited list of name=announceURL=baseURL for the trackers to be displayed */
public static final String PROP_TRACKERS = "i2psnark.trackers";
private static final SnarkManager _instance = new SnarkManager();
public static SnarkManager instance() { return _instance; }
private SnarkManager() {
public SnarkManager(I2PAppContext ctx) {
_snarks = new ConcurrentHashMap();
_magnets = new ConcurrentHashSet();
_addSnarkLock = new Object();
_context = I2PAppContext.getGlobalContext();
_context = ctx;
_log = _context.logManager().getLog(SnarkManager.class);
_messages = new LinkedBlockingQueue();
_util = new I2PSnarkUtil(_context);
@ -824,13 +820,13 @@ public class SnarkManager implements Snark.CompleteListener {
if (info.isPrivate()) {
addMessage(_("ERROR - No I2P trackers in private torrent \"{0}\"", info.getName()));
} else if (_util.shouldUseOpenTrackers() && _util.getOpenTrackers() != null) {
//addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName()));
addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers only.", info.getName()));
//} else if (_util.getDHT() != null) {
// addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName()));
addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName()));
//addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers only.", info.getName()));
} else if (_util.getDHT() != null) {
addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName()));
} else {
//addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName()));
addMessage(_("Warning - No I2P Trackers found in \"{0}\". Make sure Open Tracker is enabled before starting this torrent.", info.getName()));
addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName()));
//addMessage(_("Warning - No I2P Trackers found in \"{0}\". Make sure Open Tracker is enabled before starting this torrent.", info.getName()));
dontAutoStart = true;
}
}
@ -1456,6 +1452,14 @@ public class SnarkManager implements Snark.CompleteListener {
addMessage(_("Error on torrent {0}", snark.getName()) + ": " + error);
}
/**
* A Snark.CompleteListener method.
* @since 0.9.2
*/
public void addMessage(Snark snark, String message) {
addMessage(message);
}
// End Snark.CompleteListeners
/**

View File

@ -740,7 +740,8 @@ public class Storage
} else {
String msg = "File '" + names[i] + "' exists, but has wrong length (expected " +
lengths[i] + " but found " + length + ") - repairing corruption";
SnarkManager.instance().addMessage(msg);
if (listener != null)
listener.addMessage(msg);
_log.error(msg);
changed = true;
resume = true;

View File

@ -61,4 +61,6 @@ interface StorageListener
*
*/
void setWantedPieces(Storage storage);
void addMessage(String message);
}

View File

@ -297,12 +297,11 @@ public class TrackerClient implements Runnable {
}
}
if (trackers.isEmpty()) {
if (trackers.isEmpty() && _util.getDHT() == null) {
stop = true;
// FIXME translate
SnarkManager.instance().addMessage("No valid trackers for " + this.snark.getBaseName() + " - enable opentrackers?");
this.snark.addMessage(_util.getString("No valid trackers for {0} - enable opentrackers or DHT?",
this.snark.getBaseName()));
_log.error("No valid trackers for " + this.snark.getBaseName());
// FIXME keep going if DHT enabled
this.snark.stopTorrent();
return;
}

View File

@ -94,4 +94,9 @@ public interface DHT {
* Known nodes, not estimated total network size.
*/
public int size();
/**
* Debug info, HTML formatted
*/
public String renderStatusHTML();
}

View File

@ -27,6 +27,10 @@ class DHTTracker {
private long _expireTime;
private final Log _log;
private volatile boolean _isRunning;
/** not current, updated by cleaner */
private int _peerCount;
/** not current, updated by cleaner */
private int _torrentCount;
/** stagger with other cleaners */
private static final long CLEAN_TIME = 199*1000;
@ -97,6 +101,15 @@ class DHTTracker {
return rv;
}
/**
* Debug info, HTML formatted
*/
public void renderStatusHTML(StringBuilder buf) {
buf.append("DHT tracker: ").append(_torrentCount).append(" torrents ")
.append(_peerCount).append(" peers ")
.append(DataHelper.formatDuration(_expireTime)).append(" expiration<br>");
}
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
@ -137,6 +150,8 @@ class DHTTracker {
torrentCount + " torrents, " +
peerCount + " peers, " +
DataHelper.formatDuration(_expireTime) + " expiration");
_peerCount = peerCount;
_torrentCount = torrentCount;
schedule(CLEAN_TIME);
}
}

View File

@ -21,6 +21,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PClient;
@ -111,6 +112,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private final File _dhtFile;
private volatile boolean _isRunning;
private volatile boolean _hasBootstrapped;
/** stats */
private final AtomicLong _rxPkts = new AtomicLong();
private final AtomicLong _txPkts = new AtomicLong();
private final AtomicLong _rxBytes = new AtomicLong();
private final AtomicLong _txBytes = new AtomicLong();
private long _started;
/** all-zero NID used for pings */
public static final NID FAKE_NID = new NID(new byte[NID.HASH_LENGTH]);
@ -519,6 +526,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
// no need to keep ref, it will eventually stop
new Cleaner();
new Explorer(5*1000);
_txPkts.set(0);
_rxPkts.set(0);
_txBytes.set(0);
_rxBytes.set(0);
_started = _context.clock().now();
}
/**
@ -548,6 +560,26 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_knownNodes.clear();
}
/**
* Debug info, HTML formatted
*/
public String renderStatusHTML() {
long uptime = Math.max(1000, _context.clock().now() - _started);
StringBuilder buf = new StringBuilder();
buf.append("<br><b>DHT DEBUG</b><br>TX: ").append(_txPkts.get()).append(" pkts / ")
.append(DataHelper.formatSize2(_txBytes.get())).append("B / ")
.append(DataHelper.formatSize2(_txBytes.get() * 1000 / uptime)).append("Bps<br>" +
"RX: ").append(_rxPkts.get()).append(" pkts / ")
.append(DataHelper.formatSize2(_rxBytes.get())).append("B / ")
.append(DataHelper.formatSize2(_rxBytes.get() * 1000 / uptime)).append("Bps<br>" +
"DHT Peers: ").append( _knownNodes.size()).append("<br>" +
"Sent tokens: ").append(_outgoingTokens.size()).append("<br>" +
"Rcvd tokens: ").append(_incomingTokens.size()).append("<br>" +
"Pending queries: ").append(_sentQueries.size()).append("<br>");
_tracker.renderStatusHTML(buf);
return buf.toString();
}
////////// All private below here /////////////////////////////////////
///// Sending.....
@ -862,7 +894,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
boolean success = _session.sendMessage(dest, payload, 0, payload.length, null, null, 60*1000,
repliable ? I2PSession.PROTO_DATAGRAM : I2PSession.PROTO_DATAGRAM_RAW,
fromPort, toPort);
if (!success) {
if (success) {
_txPkts.incrementAndGet();
_txBytes.addAndGet(payload.length);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("WTF sendMessage fail");
}
@ -880,7 +915,6 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @param from dest or null if it didn't come in on signed port
*/
private void receiveMessage(Destination from, int fromPort, byte[] payload) {
try {
InputStream is = new ByteArrayInputStream(payload);
BDecoder dec = new BDecoder(is);
@ -1352,14 +1386,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @param toPort 1-65535 or 0 for unspecified
*/
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
// TODO throttle
try {
byte[] payload = session.receiveMessage(msgId);
_rxPkts.incrementAndGet();
_rxBytes.addAndGet(payload.length);
if (toPort == _qPort) {
// repliable
I2PDatagramDissector dgDiss = new I2PDatagramDissector();
dgDiss.loadI2PDatagram(payload);
payload = dgDiss.getPayload();
Destination from = dgDiss.getSender();
// TODO per-dest throttle
receiveMessage(from, fromPort, payload);
} else if (toPort == _rPort) {
// raw

View File

@ -75,8 +75,7 @@ public class I2PSnarkServlet extends DefaultServlet {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(I2PSnarkServlet.class);
_nonce = _context.random().nextLong();
// FIXME instantiate new one every time
_manager = SnarkManager.instance();
_manager = new SnarkManager(_context);
String configFile = _context.getProperty(PROP_CONFIG_FILE);
if ( (configFile == null) || (configFile.trim().length() <= 0) )
configFile = "i2psnark.config";
@ -449,9 +448,9 @@ public class I2PSnarkServlet extends DefaultServlet {
}
out.write("</th></tr></thead>\n");
String uri = "/i2psnark/";
boolean showDebug = "2".equals(peerParam);
for (int i = 0; i < snarks.size(); i++) {
Snark snark = (Snark)snarks.get(i);
boolean showDebug = "2".equals(peerParam);
boolean showPeers = showDebug || "1".equals(peerParam) || Base64.encode(snark.getInfoHash()).equals(peerParam);
displaySnark(out, snark, uri, i, stats, showPeers, isDegraded, noThinsp, showDebug);
}
@ -478,6 +477,8 @@ public class I2PSnarkServlet extends DefaultServlet {
out.write(", ");
out.write(ngettext("1 DHT peer", "{0} DHT peers", dhts));
}
if (showDebug)
out.write(dht.renderStatusHTML());
}
out.write("</th>\n");
if (_manager.util().connected()) {