* i2psnark:

- Improve torrent shutdown handling to maximize chance of
     announces getting to tracker
   - Clean up delete-torrent messages
   - Remove redundant shutdown hook
   - Avoid NPE in PEX message handling
   - Log tweaks
This commit is contained in:
zzz
2012-06-18 18:06:47 +00:00
parent 273d7399a0
commit 4dcfe3e434
7 changed files with 117 additions and 46 deletions

View File

@ -310,7 +310,10 @@ abstract class ExtensionHandler {
BDecoder dec = new BDecoder(is);
BEValue bev = dec.bdecodeMap();
Map<String, BEValue> map = bev.getMap();
byte[] ids = map.get("added").getBytes();
bev = map.get("added");
if (bev == null)
return;
byte[] ids = bev.getBytes();
if (ids.length < HASH_LENGTH)
return;
int len = Math.min(ids.length, (I2PSnarkUtil.MAX_CONNECTIONS - 1) * HASH_LENGTH);

View File

@ -269,7 +269,11 @@ public class I2PSnarkUtil {
// FIXME this can cause race NPEs elsewhere
_manager = null;
_shitlist.clear();
mgr.destroySocketManager();
if (mgr != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Disconnecting from I2P", new Exception("I did it"));
mgr.destroySocketManager();
}
// this will delete a .torrent file d/l in progress so don't do that...
FileUtil.rmdir(_tmpDir, false);
// in case the user will d/l a .torrent file next...

View File

@ -572,14 +572,24 @@ public class Snark
debug("NOT starting TrackerClient???", NOTICE);
}
}
/**
* Stop contacting the tracker and talking with peers
*/
public void stopTorrent() {
stopTorrent(false);
}
/**
* Stop contacting the tracker and talking with peers
* @param fast if true, limit the life of the unannounce threads
* @since 0.9.1
*/
public void stopTorrent(boolean fast) {
stopped = true;
TrackerClient tc = trackerclient;
if (tc != null)
tc.halt();
tc.halt(fast);
PeerCoordinator pc = coordinator;
if (pc != null)
pc.halt();

View File

@ -34,6 +34,8 @@ import net.i2p.util.Log;
import net.i2p.util.OrderedProperties;
import net.i2p.util.SecureDirectory;
import net.i2p.util.SecureFileOutputStream;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Manage multiple snarks
@ -145,14 +147,21 @@ public class SnarkManager implements Snark.CompleteListener {
_connectionAcceptor = new ConnectionAcceptor(_util);
_monitor = new I2PAppThread(new DirMonitor(), "Snark DirMonitor", true);
_monitor.start();
_context.addShutdownTask(new SnarkManagerShutdown());
// Not required, Jetty has a shutdown hook
//_context.addShutdownTask(new SnarkManagerShutdown());
}
/*
* Called by the webapp at Jetty shutdown.
* Stops all torrents. Does not close the tunnel, so the announces have a chance.
* Fix this so an individual webaapp stop will close the tunnel.
* Runs inline.
*/
public void stop() {
_running = false;
_monitor.interrupt();
_connectionAcceptor.halt();
(new SnarkManagerShutdown()).run();
stopAllTorrents(true);
}
/** hook to I2PSnarkUtil for the servlet */
@ -1590,20 +1599,59 @@ public class SnarkManager implements Snark.CompleteListener {
}
}
public class SnarkManagerShutdown extends I2PAppThread {
@Override
public void run() {
Set names = listTorrentFiles();
for (Iterator iter = names.iterator(); iter.hasNext(); ) {
Snark snark = getTorrent((String)iter.next());
if ( (snark != null) && (!snark.isStopped()) ) {
snark.stopTorrent();
try { Thread.sleep(50); } catch (InterruptedException ie) {}
/**
* Stop all running torrents, and close the tunnel after a delay
* to allow for announces.
* If called at router shutdown via Jetty shutdown hook -> webapp destroy() -> stop(),
* the tunnel won't actually be closed as the SimpleScheduler is already shutdown
* or will be soon, so we delay a few seconds inline.
* @param finalShutdown if true, sleep at the end if any torrents were running
* @since 0.9.1
*/
public void stopAllTorrents(boolean finalShutdown) {
if (finalShutdown && _log.shouldLog(Log.WARN))
_log.warn("SnarkManager final shutdown");
int count = 0;
for (Snark snark : _snarks.values()) {
if (!snark.isStopped()) {
if (count == 0)
addMessage(_("Stopping all torrents and closing the I2P tunnel."));
count++;
if (finalShutdown)
snark.stopTorrent(true);
else
stopTorrent(snark, false);
// Throttle since every unannounce is now threaded.
// How to do this without creating a ton of threads?
try { Thread.sleep(20); } catch (InterruptedException ie) {}
}
}
if (_util.connected()) {
if (count > 0) {
// Schedule this even for final shutdown, as there's a chance
// that it's just this webapp that is stopping.
SimpleScheduler.getInstance().addEvent(new Disconnector(), 60*1000);
addMessage(_("Closing I2P tunnel after announces to trackers."));
if (finalShutdown) {
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
}
} else {
_util.disconnect();
addMessage(_("I2P tunnel closed."));
}
}
}
/** @since 0.9.1 */
private class Disconnector implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_util.connected()) {
_util.disconnect();
addMessage(_("I2P tunnel closed."));
}
}
}
/**
* ignore case, current locale
* @since 0.9

View File

@ -26,6 +26,7 @@ import net.i2p.util.I2PAppThread;
/**
* Makes sure everything ends correctly when shutting down.
* @deprecated unused
*/
public class SnarkShutdown extends I2PAppThread
{
@ -61,7 +62,7 @@ public class SnarkShutdown extends I2PAppThread
//Snark.debug("Halting TrackerClient...", Snark.INFO);
if (trackerclient != null)
trackerclient.halt();
trackerclient.halt(true);
//Snark.debug("Halting PeerCoordinator...", Snark.INFO);
if (coordinator != null)

View File

@ -28,6 +28,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
@ -97,6 +98,7 @@ public class TrackerClient implements Runnable {
// these 2 used in loop()
private volatile boolean runStarted;
private volatile int consecutiveFails;
private volatile boolean _fastUnannounce;
private final List<Tracker> trackers;
@ -134,6 +136,7 @@ public class TrackerClient implements Runnable {
stop = false;
consecutiveFails = 0;
runStarted = false;
_fastUnannounce = false;
_thread = new I2PAppThread(this, _threadName + " #" + (++_runCount), true);
_thread.start();
started = true;
@ -144,8 +147,9 @@ public class TrackerClient implements Runnable {
/**
* Interrupts this Thread to stop it.
* @param fast if true, limit the life of the unannounce threads
*/
public synchronized void halt() {
public synchronized void halt(boolean fast) {
boolean wasStopped = stop;
if (wasStopped) {
if (_log.shouldLog(Log.WARN))
@ -168,6 +172,7 @@ public class TrackerClient implements Runnable {
_log.debug("Interrupting " + t.getName());
t.interrupt();
}
_fastUnannounce = true;
if (!wasStopped)
unannounce();
}
@ -415,6 +420,9 @@ public class TrackerClient implements Runnable {
tr.interval = LONG_SLEEP; // slow down
}
}
} else {
_util.debug("Not announcing to " + tr.announce + " last announce was " +
new Date(tr.lastRequestTime) + " interval is " + DataHelper.formatDuration(tr.interval), Snark.INFO);
}
if ((!tr.stop) && maxSeenPeers < tr.seenPeers)
maxSeenPeers = tr.seenPeers;
@ -439,6 +447,8 @@ public class TrackerClient implements Runnable {
}
}
}
} else {
_util.debug("Not getting PEX peers", Snark.INFO);
}
// Get peers from DHT
@ -475,6 +485,8 @@ public class TrackerClient implements Runnable {
}
}
}
} else {
_util.debug("Not getting DHT peers", Snark.INFO);
}
@ -533,7 +545,7 @@ public class TrackerClient implements Runnable {
if (_util.connected() &&
tr.started && (!tr.stop) && tr.trackerProblems == null) {
try {
(new I2PAppThread(new Unannouncer(tr), _threadName + " Unannounce " + (++i), true)).start();
(new I2PAppThread(new Unannouncer(tr), _threadName + " U" + (++i), true)).start();
} catch (OutOfMemoryError oom) {
// probably ran out of threads, ignore
tr.reset();
@ -610,8 +622,9 @@ public class TrackerClient implements Runnable {
_util.debug("Sending TrackerClient request: " + s, Snark.INFO);
tr.lastRequestTime = System.currentTimeMillis();
// Don't wait for a response to stopped.
File fetched = _util.get(s, true, event.equals(STOPPED_EVENT) ? -1 : 0);
// Don't wait for a response to stopped when shutting down
boolean fast = _fastUnannounce && event.equals(STOPPED_EVENT);
File fetched = _util.get(s, true, fast ? -1 : 0);
if (fetched == null) {
throw new IOException("Error fetching " + s);
}

View File

@ -74,6 +74,7 @@ public class I2PSnarkServlet extends DefaultServlet {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(I2PSnarkServlet.class);
_nonce = _context.random().nextLong();
// FIXME instantiate new one every time
_manager = SnarkManager.instance();
String configFile = _context.getProperty(PROP_CONFIG_FILE);
if ( (configFile == null) || (configFile.trim().length() <= 0) )
@ -322,7 +323,7 @@ public class I2PSnarkServlet extends DefaultServlet {
final long stats[] = {0,0,0,0,0,0};
String peerParam = req.getParameter("p");
List snarks = getSortedSnarks(req);
List<Snark> snarks = getSortedSnarks(req);
boolean isForm = _manager.util().connected() || !snarks.isEmpty();
if (isForm) {
out.write("<form action=\"_post\" method=\"POST\">\n");
@ -644,10 +645,11 @@ public class I2PSnarkServlet extends DefaultServlet {
// multifile torrents have the getFiles() return lists of lists of filenames, but
// each of those lists just contain a single file afaict...
File df = Storage.getFileFromNames(f, files.get(i));
if (df.delete())
_manager.addMessage(_("Data file deleted: {0}", df.getAbsolutePath()));
else
if (df.delete()) {
//_manager.addMessage(_("Data file deleted: {0}", df.getAbsolutePath()));
} else {
_manager.addMessage(_("Data file could not be deleted: {0}", df.getAbsolutePath()));
}
}
// step 2 make Set of dirs with reverse sort
Set<File> dirs = new TreeSet(Collections.reverseOrder());
@ -659,16 +661,20 @@ public class I2PSnarkServlet extends DefaultServlet {
// step 3 delete dirs bottom-up
for (File df : dirs) {
if (df.delete()) {
_manager.addMessage(_("Data dir deleted: {0}", df.getAbsolutePath()));
} else if (_log.shouldLog(Log.WARN)) {
_log.warn("Could not delete dir " + df);
//_manager.addMessage(_("Data dir deleted: {0}", df.getAbsolutePath()));
} else {
_manager.addMessage(_("Directory could not be deleted: {0}", df.getAbsolutePath()));
if (_log.shouldLog(Log.WARN))
_log.warn("Could not delete dir " + df);
}
}
// step 4 delete base
if (f.delete()) {
_manager.addMessage(_("Data dir deleted: {0}", f.getAbsolutePath()));
} else if (_log.shouldLog(Log.WARN)) {
_log.warn("Could not delete dir " + f);
_manager.addMessage(_("Directory deleted: {0}", f.getAbsolutePath()));
} else {
_manager.addMessage(_("Directory could not be deleted: {0}", f.getAbsolutePath()));
if (_log.shouldLog(Log.WARN))
_log.warn("Could not delete dir " + f);
}
break;
}
@ -739,26 +745,12 @@ public class I2PSnarkServlet extends DefaultServlet {
_manager.addMessage(_("Error creating torrent - you must enter a file or directory"));
}
} else if ("StopAll".equals(action)) {
_manager.addMessage(_("Stopping all torrents and closing the I2P tunnel."));
List snarks = getSortedSnarks(req);
for (int i = 0; i < snarks.size(); i++) {
Snark snark = (Snark)snarks.get(i);
if (!snark.isStopped()) {
_manager.stopTorrent(snark, false);
try { Thread.sleep(50); } catch (InterruptedException ie) {}
}
}
if (_manager.util().connected()) {
// Give the stopped announces time to get out
try { Thread.sleep(2000); } catch (InterruptedException ie) {}
_manager.util().disconnect();
_manager.addMessage(_("I2P tunnel closed."));
}
_manager.stopAllTorrents(false);
} else if ("StartAll".equals(action)) {
_manager.addMessage(_("Opening the I2P tunnel and starting all torrents."));
List snarks = getSortedSnarks(req);
List<Snark> snarks = getSortedSnarks(req);
for (int i = 0; i < snarks.size(); i++) {
Snark snark = (Snark)snarks.get(i);
Snark snark = snarks.get(i);
if (snark.isStopped())
snark.startTorrent();
}