2009-06-05 sponge

* BOB now cleans up tunnels, although they can take up to 5 minutes to
      disapear. This is due to the fact that the streaming lib doesn't
      actually remove the connections properly and kill them off when the
      manager is destroyed. I'm not certain if this is a bug, or a feature,
      but it sure is annoying, and you have to wait for the connections to
      time out. What should happen is the streaming lib should cause an IO
      error to the pending read or write.
This commit is contained in:
sponge
2009-06-05 19:46:26 +00:00
parent 7725b9e8a1
commit 2f6ae99452
7 changed files with 150 additions and 53 deletions

View File

@ -1,4 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project-private xmlns="http://www.netbeans.org/ns/project-private/1">
<editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/>
<open-files xmlns="http://www.netbeans.org/ns/projectui-open-files/1">
<file>file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/BOB.java</file>
<file>file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/DoCMDS.java</file>
<file>file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/MUXlisten.java</file>
</open-files>
</project-private>

View File

@ -36,7 +36,6 @@ import net.i2p.I2PException;
import net.i2p.client.I2PClientFactory;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleStore;
/**
* Simplistic command parser for BOB
@ -98,6 +97,7 @@ public class DoCMDS implements Runnable {
private static final String C_status = "status";
private static final String C_stop = "stop";
private static final String C_verify = "verify";
private static final String C_visit = "visit";
private static final String C_zap = "zap";
/* all the coomands available, plus description */
@ -124,6 +124,7 @@ public class DoCMDS implements Runnable {
{C_status, C_status + " nickname * Display status of a nicknamed tunnel."},
{C_stop, C_stop + " * Stops the current nicknamed tunnel."},
{C_verify, C_verify + " BASE64_key * Verifies BASE64 destination."},
{C_visit, C_visit + " * Thread dump to wrapper.log."},
{C_zap, C_zap + " * Shuts down BOB."},
{"", "COMMANDS: " + // this is ugly, but...
C_help + " " +
@ -148,13 +149,14 @@ public class DoCMDS implements Runnable {
C_status + " " +
C_stop + " " +
C_verify + " " +
C_visit + " " +
C_zap
},
{" ", " "} // end of list
};
/**
* @parm LIVE
* @param LIVE
* @param server
* @param props
* @param database
@ -438,6 +440,9 @@ public class DoCMDS implements Runnable {
}
}
} else if (Command.equals(C_visit)) {
visitAllThreads();
out.println("OK ");
} else if (Command.equals(C_getdest)) {
if (ns) {
if (dk) {
@ -1274,7 +1279,7 @@ public class DoCMDS implements Runnable {
} else {
MUXlisten tunnel;
try {
while(!lock.compareAndSet(false, true)) {
while (!lock.compareAndSet(false, true)) {
// wait
}
tunnel = new MUXlisten(lock, database, nickinfo, _log);
@ -1445,4 +1450,48 @@ public class DoCMDS implements Runnable {
ioe.printStackTrace();
}
}
// Debugging... None of this is normally used.
/**
* Find the root thread group and print them all.
*
*/
private void visitAllThreads() {
ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
while (root.getParent() != null) {
root = root.getParent();
}
// Visit each thread group
visit(root, 0, root.getName());
}
/**
* Recursively visits all thread groups under `group' and dumps them.
* @param group ThreadGroup to visit
* @param level Current level
*/
private static void visit(ThreadGroup group, int level, String tn) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
String indent = "------------------------------------".substring(0, level) + "-> ";
// Enumerate each thread in `group' and print it.
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
System.out.println("BOB: " + indent + tn + ": " +thread.toString());
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
visit(groups[i], level + 1, groups[i].getName());
}
}
}

View File

@ -74,6 +74,8 @@ public class I2PtoTCP implements Runnable {
OutputStream out = null;
InputStream Iin = null;
OutputStream Iout = null;
Thread t = null;
Thread q = null;
try {
die:
{
@ -113,17 +115,33 @@ public class I2PtoTCP implements Runnable {
// setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /* , info, database */); // I2P -> app
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
Thread q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
// Fire!
t.start();
q.start();
while (t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
boolean spin = true;
while (t.isAlive() && q.isAlive() && spin) { // AND is used here to kill off the other thread
try {
Thread.sleep(10); //sleep for 10 ms
} catch (InterruptedException e) {
break die;
}
try {
rlock();
} catch (Exception e) {
break die;
}
try {
spin = info.get("RUNNING").equals(Boolean.TRUE);
} catch (Exception e) {
try {
runlock();
} catch (Exception e2) {
break die;
}
break die;
}
}
// System.out.println("I2PtoTCP: Going away...");
} catch (Exception e) {
@ -132,6 +150,14 @@ public class I2PtoTCP implements Runnable {
}
} // die
} finally {
try {
t.interrupt();
} catch (Exception e) {
}
try {
q.interrupt();
} catch (Exception e) {
}
try {
in.close();
} catch (Exception ex) {

View File

@ -159,7 +159,6 @@ public class MUXlisten implements Runnable {
{
try {
tg = new ThreadGroup(N);
die:
{
// toss the connections to a new threads.
// will wrap with TCP and UDP when UDP works
@ -185,22 +184,22 @@ public class MUXlisten implements Runnable {
info.add("STARTING", new Boolean(false));
} catch (Exception e) {
wunlock();
break die;
break quit;
}
} catch (Exception e) {
break die;
break quit;
}
try {
wunlock();
} catch (Exception e) {
break die;
break quit;
}
boolean spin = true;
while (spin) {
try {
Thread.sleep(1000); //sleep for 1 second
} catch (InterruptedException e) {
break die;
break quit;
}
try {
rlock();
@ -208,35 +207,17 @@ public class MUXlisten implements Runnable {
spin = info.get("STOPPING").equals(Boolean.FALSE);
} catch (Exception e) {
runlock();
break die;
break quit;
}
} catch (Exception e) {
break die;
break quit;
}
try {
runlock();
} catch (Exception e) {
break die;
break quit;
}
}
/* cleared in the finally...
try {
wlock();
try {
info.add("RUNNING", new Boolean(false));
} catch (Exception e) {
wunlock();
break die;
}
} catch (Exception e) {
break die;
}
try {
wunlock();
} catch (Exception e) {
break die;
}
*/
} // die
} catch (Exception e) {
@ -278,11 +259,6 @@ public class MUXlisten implements Runnable {
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
// Some grace time.
try {
Thread.sleep(250);
@ -293,25 +269,27 @@ public class MUXlisten implements Runnable {
// Wait around till all threads are collected.
if (tg != null) {
String boner = tg.getName();
System.out.println("BOB: MUXlisten: Starting thread collection for: " + boner);
_log.warn("BOB: MUXlisten: Starting thread collection for: " + boner);
// tg.interrupt(); // give my stuff a small smack again.
if (tg.activeCount() + tg.activeGroupCount() != 0) {
visit(tg, 0, boner);
int foo = tg.activeCount() + tg.activeGroupCount();
// hopefully no longer needed!
// int bar = foo;
// System.out.println("BOB: MUXlisten: Waiting on threads for " + boner);
// System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
// visit(tg, 0, boner);
// System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n");
int bar = foo;
System.out.println("BOB: MUXlisten: Waiting on threads for " + boner);
System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
visit(tg, 0, boner);
System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n");
// Happily spin forever :-(
while (foo != 0) {
foo = tg.activeCount() + tg.activeGroupCount();
// if (foo != bar) {
// System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
// visit(tg, 0, boner);
// System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n");
// }
// bar = foo;
if (foo != bar && foo != 0) {
System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
visit(tg, 0, boner);
System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n");
}
bar = foo;
try {
Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) {
@ -319,11 +297,18 @@ public class MUXlisten implements Runnable {
}
}
}
System.out.println("BOB: MUXlisten: Threads went away. Success: " + boner);
_log.warn("BOB: MUXlisten: Threads went away. Success: " + boner);
tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
}
}

View File

@ -45,7 +45,7 @@ import net.i2p.i2ptunnel.I2PTunnel;
public class TCPtoI2P implements Runnable {
private I2PSocket I2P;
// private NamedDB info, database;
private NamedDB info, database;
private Socket sock;
private I2PSocketManager socketManager;
@ -108,6 +108,16 @@ public class TCPtoI2P implements Runnable {
out.flush();
}
private void rlock() throws Exception {
database.getReadLock();
info.getReadLock();
}
private void runlock() throws Exception {
database.releaseReadLock();
info.releaseReadLock();
}
/**
* TCP stream to I2P stream thread starter
*
@ -118,6 +128,8 @@ public class TCPtoI2P implements Runnable {
OutputStream Iout = null;
InputStream in = null;
OutputStream out = null;
Thread t = null;
Thread q = null;
try {
try {
@ -145,15 +157,18 @@ public class TCPtoI2P implements Runnable {
// setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /*, info, database */); // I2P -> app
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
Thread q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
// Fire!
t.start();
q.start();
boolean spin = true;
while (t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
Thread.sleep(10); //sleep for 10 ms
rlock();
spin = info.get("RUNNING").equals(Boolean.TRUE);
runlock();
}
} catch (I2PException e) {
Emsg("ERROR " + e.toString(), out);
} catch (ConnectException e) {
@ -171,6 +186,14 @@ public class TCPtoI2P implements Runnable {
// bail on anything else
}
} finally {
try {
t.interrupt();
} catch (Exception e) {
}
try {
q.interrupt();
} catch (Exception e) {
}
try {
in.close();
} catch (Exception e) {

View File

@ -1,3 +1,12 @@
2009-06-05 sponge
* BOB now cleans up tunnels, although they can take up to 5 minutes to
disapear. This is due to the fact that the streaming lib doesn't
actually remove the connections properly and kill them off when the
manager is destroyed. I'm not certain if this is a bug, or a feature,
but it sure is annoying, and you have to wait for the connections to
time out. What should happen is the streaming lib should cause an IO
error to the pending read or write.
2009-05-30 zzz
* Console:
- config.jsp now cause graceful restart

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 10;
public final static long BUILD = 11;
/** for example "-test" */
public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;