i2psnark ConnectionAcceptor:

- Fix ConnectionAcceptor not restarting after tunnel
    restart, preventing incoming connections
  - locking tweaks
  - cleaner reschedule tweaks
  - don't store server socket locally
  - constructor tweaks / finals
  - Stop acceptor when tunnel disconnects
  - javadocs
This commit is contained in:
zzz
2013-12-04 14:17:38 +00:00
parent 445e4301d5
commit 7fe8573df4
5 changed files with 87 additions and 62 deletions

View File

@ -41,51 +41,49 @@ import net.i2p.util.SimpleTimer2;
class ConnectionAcceptor implements Runnable
{
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(ConnectionAcceptor.class);
private I2PServerSocket serverSocket;
private PeerAcceptor peeracceptor;
private final PeerAcceptor peeracceptor;
private Thread thread;
private final I2PSnarkUtil _util;
private final ObjectCounter<Hash> _badCounter = new ObjectCounter<Hash>();
private final SimpleTimer2.TimedEvent _cleaner;
private volatile boolean stop;
private boolean socketChanged;
// protocol errors before blacklisting.
private static final int MAX_BAD = 1;
private static final long BAD_CLEAN_INTERVAL = 30*60*1000;
/**
* Multitorrent
* Multitorrent. Caller MUST call startAccepting()
*/
public ConnectionAcceptor(I2PSnarkUtil util) {
public ConnectionAcceptor(I2PSnarkUtil util, PeerCoordinatorSet set) {
_util = util;
_cleaner = new Cleaner();
peeracceptor = new PeerAcceptor(set);
}
public synchronized void startAccepting(PeerCoordinatorSet set, I2PServerSocket socket) {
if (serverSocket != socket) {
if ( (peeracceptor == null) || (peeracceptor.coordinators != set) )
peeracceptor = new PeerAcceptor(set);
serverSocket = socket;
/**
* May be called even when already running. May be called to start up again after halt().
*/
public synchronized void startAccepting() {
stop = false;
socketChanged = true;
if (_log.shouldLog(Log.WARN))
_log.warn("ConnectionAcceptor startAccepting new thread? " + (thread == null));
if (thread == null) {
thread = new I2PAppThread(this, "I2PSnark acceptor");
thread.setDaemon(true);
thread.start();
_cleaner.schedule(BAD_CLEAN_INTERVAL);
_cleaner.reschedule(BAD_CLEAN_INTERVAL, false);
}
}
}
/**
* Unused (single torrent)
* Unused (single torrent).
* Do NOT call startAccepting().
*/
public ConnectionAcceptor(I2PSnarkUtil util, I2PServerSocket serverSocket,
public ConnectionAcceptor(I2PSnarkUtil util,
PeerAcceptor peeracceptor)
{
this.serverSocket = serverSocket;
this.peeracceptor = peeracceptor;
_util = util;
@ -95,36 +93,49 @@ class ConnectionAcceptor implements Runnable
_cleaner = new Cleaner();
}
public void halt()
/**
* May be restarted later with startAccepting().
*/
public synchronized void halt()
{
if (stop) return;
stop = true;
locked_halt();
Thread t = thread;
if (t != null) {
t.interrupt();
thread = null;
}
}
I2PServerSocket ss = serverSocket;
if (ss != null)
/**
* Caller must synch
* @since 0.9.9
*/
private void locked_halt()
{
I2PServerSocket ss = _util.getServerSocket();
if (ss != null) {
try
{
ss.close();
}
catch(I2PException ioe) { }
Thread t = thread;
if (t != null)
t.interrupt();
}
_badCounter.clear();
_cleaner.cancel();
}
/**
* Effectively unused, would only be called if we changed
* I2CP host/port, which is hidden in the gui if in router context
* FIXME this only works if already running
*/
public void restart() {
serverSocket = _util.getServerSocket();
socketChanged = true;
public synchronized void restart() {
Thread t = thread;
if (t != null)
t.interrupt();
_cleaner.schedule(BAD_CLEAN_INTERVAL);
}
public int getPort()
@ -133,17 +144,28 @@ class ConnectionAcceptor implements Runnable
}
public void run()
{
try {
run2();
} finally {
synchronized(this) {
thread = null;
}
}
}
private void run2()
{
while(!stop)
{
if (socketChanged) {
// ok, already updated
socketChanged = false;
}
I2PServerSocket serverSocket = _util.getServerSocket();
while ( (serverSocket == null) && (!stop)) {
if (!(_util.isConnecting() || _util.connected())) {
stop = true;
break;
}
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
serverSocket = _util.getServerSocket();
if (serverSocket == null)
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
if(stop)
break;
@ -151,15 +173,7 @@ class ConnectionAcceptor implements Runnable
{
I2PSocket socket = serverSocket.accept();
if (socket == null) {
if (socketChanged) {
continue;
} else {
I2PServerSocket ss = _util.getServerSocket();
if (ss != serverSocket) {
serverSocket = ss;
socketChanged = true;
}
}
} else {
if (socket.getPeerDestination().equals(_util.getMyDestination())) {
_log.error("Incoming connection from myself");
@ -188,26 +202,34 @@ class ConnectionAcceptor implements Runnable
}
catch (I2PException ioe)
{
if (!socketChanged) {
_log.error("Error while accepting", ioe);
stop = true;
int level = stop ? Log.WARN : Log.ERROR;
if (_log.shouldLog(level))
_log.log(level, "Error while accepting", ioe);
synchronized(this) {
if (!stop) {
locked_halt();
thread = null;
stop = true;
}
}
}
catch (IOException ioe)
{
_log.error("Error while accepting", ioe);
stop = true;
int level = stop ? Log.WARN : Log.ERROR;
if (_log.shouldLog(level))
_log.log(level, "Error while accepting", ioe);
synchronized(this) {
if (!stop) {
locked_halt();
thread = null;
stop = true;
}
}
}
// catch oom?
}
try
{
if (serverSocket != null)
serverSocket.close();
}
catch (I2PException ignored) { }
if (_log.shouldLog(Log.WARN))
_log.warn("ConnectionAcceptor closed");
}
private class Handler implements Runnable {

View File

@ -525,18 +525,17 @@ public class Snark
if (_peerCoordinatorSet != null) {
// multitorrent
_peerCoordinatorSet.add(coordinator);
if (acceptor != null) {
acceptor.startAccepting(_peerCoordinatorSet, serversocket);
} else {
// error
}
} else {
// single torrent
acceptor = new ConnectionAcceptor(_util, serversocket, new PeerAcceptor(coordinator));
acceptor = new ConnectionAcceptor(_util, new PeerAcceptor(coordinator));
}
// TODO pass saved closest DHT nodes to the tracker? or direct to the coordinator?
trackerclient = new TrackerClient(_util, meta, additionalTrackerURL, coordinator, this);
}
// ensure acceptor is running when in multitorrent
if (_peerCoordinatorSet != null && acceptor != null) {
acceptor.startAccepting();
}
stopped = false;
if (coordinator.halted()) {

View File

@ -169,7 +169,7 @@ public class SnarkManager implements CompleteListener {
public void start() {
_running = true;
_peerCoordinatorSet = new PeerCoordinatorSet();
_connectionAcceptor = new ConnectionAcceptor(_util);
_connectionAcceptor = new ConnectionAcceptor(_util, _peerCoordinatorSet);
_monitor = new I2PAppThread(new DirMonitor(), "Snark DirMonitor", true);
_monitor.start();
// only if default instance