2009-05-06 sponge

* Hopefully the last fixes for BOB.
    * Fixes to prevent race in client-side I2CP and Notifier.
This commit is contained in:
sponge
2009-05-06 05:34:33 +00:00
parent 7e3bda9d4d
commit eba6ca5430
9 changed files with 430 additions and 421 deletions

View File

@ -25,6 +25,8 @@ package net.i2p.BOB;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.I2PException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
@ -78,8 +80,10 @@ public class I2Plistener implements Runnable {
public void run() {
boolean g = false;
I2PSocket sessSocket = null;
die: {
int conn = 0;
try {
die:
{
serverSocket.setSoTimeout(50);
boolean spin = true;
@ -111,9 +115,10 @@ die: {
}
if (g) {
g = false;
conn++;
// toss the connection to a new thread.
I2PtoTCP conn_c = new I2PtoTCP(sessSocket, info, database);
Thread t = new Thread(conn_c, "BOBI2PtoTCP");
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " I2PtoTCP " + conn);
t.start();
}
@ -122,7 +127,12 @@ die: {
}
}
}
} finally {
try {
serverSocket.close();
} catch (I2PException ex) {
}
// System.out.println("I2Plistener: Close");
// System.out.println("I2Plistener: Done.");
}
}
}

View File

@ -70,7 +70,13 @@ public class I2PtoTCP implements Runnable {
String host;
int port;
boolean tell;
die: {
InputStream in = null;
OutputStream out = null;
InputStream Iin = null;
OutputStream Iout = null;
try {
die:
{
try {
try {
rlock();
@ -92,10 +98,10 @@ die: {
}
sock = new Socket(host, port);
// make readers/writers
InputStream in = sock.getInputStream();
OutputStream out = sock.getOutputStream();
InputStream Iin = I2P.getInputStream();
OutputStream Iout = I2P.getOutputStream();
in = sock.getInputStream();
out = sock.getOutputStream();
Iin = I2P.getInputStream();
Iout = I2P.getOutputStream();
I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default
if (tell) {
@ -107,8 +113,8 @@ die: {
// setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /* , info, database */); // I2P -> app
Thread t = new Thread(conn_c, "TCPioA");
Thread q = new Thread(conn_a, "TCPioB");
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
Thread q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
// Fire!
t.start();
q.start();
@ -116,6 +122,16 @@ die: {
try {
Thread.sleep(10); //sleep for 10 ms
} catch (InterruptedException e) {
break die;
}
}
// System.out.println("I2PtoTCP: Going away...");
} catch (Exception e) {
// System.out.println("I2PtoTCP: Owch! damn!");
break die;
}
} // die
} finally {
try {
in.close();
} catch (Exception ex) {
@ -132,14 +148,6 @@ die: {
Iout.close();
} catch (Exception ex) {
}
}
}
// System.out.println("I2PtoTCP: Going away...");
} catch(Exception e) {
// System.out.println("I2PtoTCP: Owch! damn!");
break die;
}
} // die
try {
// System.out.println("I2PtoTCP: Close I2P");
I2P.close();
@ -157,3 +165,4 @@ die: {
}
}
}

View File

@ -29,8 +29,6 @@ import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Properties;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
@ -50,7 +48,7 @@ public class MUXlisten implements Runnable {
private ByteArrayInputStream prikey;
private ThreadGroup tg;
private String N;
private ServerSocket listener;
private ServerSocket listener = null;
private int backlog = 50; // should this be more? less?
boolean go_out;
boolean come_in;
@ -133,7 +131,9 @@ public class MUXlisten implements Runnable {
*/
public void run() {
I2PServerSocket SS = null;
int ticks = 100; // Allow 10 seconds, no more.
Thread t = null;
Thread q = null;
try {
try {
wlock();
try {
@ -165,14 +165,14 @@ public class MUXlisten implements Runnable {
// I2P -> TCP
SS = socketManager.getServerSocket();
I2Plistener conn = new I2Plistener(SS, socketManager, info, database, _log);
Thread t = new Thread(tg, conn, "BOBI2Plistener " + N);
t = new Thread(tg, conn, "BOBI2Plistener " + N);
t.start();
}
if (come_in) {
// TCP -> I2P
TCPlistener conn = new TCPlistener(listener, socketManager, info, database, _log);
Thread q = new Thread(tg, conn, "BOBTCPlistener" + N);
q = new Thread(tg, conn, "BOBTCPlistener " + N);
q.start();
}
@ -235,59 +235,31 @@ public class MUXlisten implements Runnable {
}
} // die
if (SS != null) {
try {
SS.close();
} catch (I2PException ex) {
//Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (this.come_in) {
try {
listener.close();
} catch (IOException e) {
}
}
I2PSession session = socketManager.getSession();
if (session != null) {
// I2PSession session = socketManager.getSession();
// if (session != null) {
// System.out.println("I2Plistener: destroySession");
try {
session.destroySession();
} catch (I2PSessionException ex) {
// nop
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
// Wait for child threads and thread groups to die
// System.out.println("MUXlisten: waiting for children");
// if (tg.activeCount() + tg.activeGroupCount() != 0) {
// tg.interrupt(); // give my stuff a small smack.
// while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
// ticks--;
// try {
// Thread.sleep(100); //sleep for 100 ms (One tenth second)
// } catch (InterruptedException ex) {
// break quit;
// session.destroySession();
// } catch (I2PSessionException ex) {
// nop
// }
// }
// if (tg.activeCount() + tg.activeGroupCount() != 0) {
// break quit; // Uh-oh.
// try {
// socketManager.destroySocketManager();
//} catch (Exception e) {
// nop
//}
//}
//tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
//tg = null;
} catch (Exception e) {
// System.out.println("MUXlisten: Caught an exception" + e);
break quit;
}
} // quit
} finally {
// allow threads above this one to catch the stop signal.
try {
Thread.sleep(250);
} catch (InterruptedException ex) {
}
// zero out everything.
try {
wlock();
@ -303,17 +275,18 @@ public class MUXlisten implements Runnable {
} catch (Exception e) {
}
// This is here to catch when something fucks up REALLY bad, like those annoying stuck threads!
if (tg != null) {
// tg.interrupt(); // give my stuff a small smack again.
//try {
// Thread.sleep(1000 * 20); // how long?? is this even needed??
//} catch (InterruptedException ex) {
//}
if (SS != null) {
try {
SS.close();
} catch (I2PException ex) {
//Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (this.come_in) {
if (listener != null) {
try {
listener.close();
} catch (IOException e) {
@ -324,45 +297,41 @@ public class MUXlisten implements Runnable {
} catch (Exception e) {
// nop
}
// This is here to catch when something fucks up REALLY bad, like those annoying stuck threads!
if (tg != null) {
String boner = tg.getName();
// tg.interrupt(); // give my stuff a small smack again.
if (tg.activeCount() + tg.activeGroupCount() != 0) {
int foo = tg.activeCount() + tg.activeGroupCount();
int bar = foo;
String boner = tg.getName();
System.out.println("BOB: MUXlisten: Waiting on threads for " + boner);
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
visit(tg, 0, boner);
System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n");
// hopefully no longer needed!
// System.out.println("BOB: MUXlisten: Waiting on threads for " + boner);
// System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
// visit(tg, 0, boner);
// System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n");
// Happily spin forever :-(
while ((tg.activeCount() + tg.activeGroupCount() != 0)) {
foo = tg.activeCount() + tg.activeGroupCount();
if (foo != bar) {
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
visit(tg, 0, boner);
System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n");
}
// if (foo != bar) {
// System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
// visit(tg, 0, boner);
// System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n");
// }
bar = foo;
try {
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
} catch (Exception e) {
// nop
}
try {
Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) {
// nop
}
}
System.out.println("BOB: MUXlisten: Threads went away. Success: " + boner);
}
System.out.println("BOB: MUXlisten: Threads went away. Success: " + boner);
tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
}
}
}
// Debugging...

View File

@ -29,6 +29,8 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
// import net.i2p.client.I2PSession;
// import net.i2p.client.I2PSessionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.util.Log;
@ -80,8 +82,10 @@ public class TCPlistener implements Runnable {
public void run() {
boolean g = false;
boolean spin = true;
die: {
int conn = 0;
try {
die:
{
try {
rlock();
} catch (Exception e) {
@ -130,9 +134,10 @@ die: {
g = false;
}
if (g) {
conn++;
// toss the connection to a new thread.
TCPtoI2P conn_c = new TCPtoI2P(socketManager, server);
Thread t = new Thread(conn_c, "BOBTCPtoI2P");
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPtoI2P " + conn);
t.start();
g = false;
}
@ -145,6 +150,12 @@ die: {
}
}
}
//System.out.println("TCPlistener: Done.");
} finally {
try {
listener.close();
} catch (IOException ex) {
}
//System.out.println("TCPlistener: " + Thread.currentThread().getName() + "Done.");
}
}
}

View File

@ -118,7 +118,7 @@ public class TCPtoI2P implements Runnable {
OutputStream Iout = null;
InputStream in = null;
OutputStream out = null;
try {
try {
in = sock.getInputStream();
@ -145,8 +145,8 @@ public class TCPtoI2P implements Runnable {
// setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /*, info, database */); // I2P -> app
Thread t = new Thread(conn_c, "TCPioA");
Thread q = new Thread(conn_a, "TCPioB");
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
Thread q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
// Fire!
t.start();
q.start();
@ -170,6 +170,7 @@ public class TCPtoI2P implements Runnable {
} catch (Exception e) {
// bail on anything else
}
} finally {
try {
in.close();
} catch (Exception e) {
@ -197,6 +198,7 @@ public class TCPtoI2P implements Runnable {
sock.close();
} catch (Exception e) {
}
}
// System.out.println("TCPtoI2P: Done.");
}
}

View File

@ -202,6 +202,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
private LinkedBlockingQueue<MsgData> _msgs;
private AtomicBoolean _alive = new AtomicBoolean(false);
private static final int POISON_SIZE = -99999;
private AtomicBoolean stopping = new AtomicBoolean(false);
public MuxedAvailabilityNotifier() {
_msgs = new LinkedBlockingQueue();
@ -209,13 +210,15 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
@Override
public void stopNotifying() {
if(stopping.get()) return;
stopping.set(true);
boolean again = true;
_msgs.clear();
// _msgs.clear();
// Thread.yield();
if (_alive.get()) {
// System.out.println("I2PSessionMuxedImpl.stopNotifying()");
while(again) {
_msgs.clear();
while(again) {
try {
_msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0));
again = false;
@ -226,6 +229,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
}
}
_alive.set(false);
stopping.set(false); // Do we need this?
}
/** unused */

View File

@ -115,8 +115,8 @@ public class I2CPMessageReader {
}
private class I2CPMessageReaderRunner implements Runnable {
private boolean _doRun;
private boolean _stayAlive;
private volatile boolean _doRun;
private volatile boolean _stayAlive;
public I2CPMessageReaderRunner() {
_doRun = true;

View File

@ -1,3 +1,7 @@
2009-05-06 sponge
* Hopefully the last fixes for BOB.
* Fixes to prevent race in client-side I2CP and Notifier.
2009-05-03 sponge
* More hopeful fixes for BOB.
* Added new Robert ID to snark

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 7;
public final static long BUILD = 8;
/** for example "-test" */
public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;