diff --git a/apps/BOB/src/net/i2p/BOB/I2Plistener.java b/apps/BOB/src/net/i2p/BOB/I2Plistener.java index a8115893d..3bb94e0b3 100644 --- a/apps/BOB/src/net/i2p/BOB/I2Plistener.java +++ b/apps/BOB/src/net/i2p/BOB/I2Plistener.java @@ -82,18 +82,6 @@ public class I2Plistener implements Runnable { die: { serverSocket.setSoTimeout(50); -// try { -// if (info.exists("INPORT")) { -// tgwatch = 2; -// } -// } catch (Exception e) { -// try { -// runlock(); -// } catch (Exception e2) { -// break die; -// } -// break die; -// } boolean spin = true; while (spin) { @@ -129,40 +117,12 @@ die: { t.start(); } - } catch (I2PException e) { + } catch (Exception e) { // System.out.println("Exception " + e); } } } // System.out.println("I2Plistener: Close"); - - - // Previous level does this cleanup now. - // - // try { - // serverSocket.close(); - // } catch (I2PException e) { - // nop - //} - // need to kill off the socket manager too. - // I2PSession session = socketManager.getSession(); - // if (session != null) { - // System.out.println("I2Plistener: destroySession"); - // try { - // session.destroySession(); - // } catch (I2PSessionException ex) { - // nop - // } - //} - // System.out.println("I2Plistener: Waiting for children"); - // while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish - // try { - // Thread.sleep(100); //sleep for 100 ms (One tenth second) - // } catch (Exception e) { - // nop - // } - //} - // System.out.println("I2Plistener: Done."); } } diff --git a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java index 0984823b6..ad5e2701b 100644 --- a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java +++ b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java @@ -23,7 +23,6 @@ */ package net.i2p.BOB; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; diff --git a/apps/BOB/src/net/i2p/BOB/MUXlisten.java b/apps/BOB/src/net/i2p/BOB/MUXlisten.java index cf65a0b12..2c7a64924 100644 --- a/apps/BOB/src/net/i2p/BOB/MUXlisten.java +++ b/apps/BOB/src/net/i2p/BOB/MUXlisten.java @@ -152,11 +152,11 @@ public class MUXlisten implements Runnable { } // socketManager.addDisconnectListener(new DisconnectListener()); -quit: + quit: { try { tg = new ThreadGroup(N); -die: + die: { // toss the connections to a new threads. // will wrap with TCP and UDP when UDP works @@ -255,7 +255,7 @@ die: try { session.destroySession(); } catch (I2PSessionException ex) { - // nop + // nop } } try { @@ -266,6 +266,7 @@ die: // Wait for child threads and thread groups to die // System.out.println("MUXlisten: waiting for children"); if (tg.activeCount() + tg.activeGroupCount() != 0) { + tg.interrupt(); // give my stuff a small smack. while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { ticks--; try { @@ -287,62 +288,6 @@ die: } } // quit - // This is here to catch when something fucks up REALLY bad. - if (tg != null) { - if (SS != null) { - try { - SS.close(); - } catch (I2PException ex) { - //Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex); - } - } - if (this.come_in) { - try { - listener.close(); - } catch (IOException e) { - } - } - try { - socketManager.destroySocketManager(); - } catch (Exception e) { - // nop - } - ticks = 100; // 10 seconds - if (tg.activeCount() + tg.activeGroupCount() != 0) { - while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { - tg.interrupt(); // unwedge any blocking threads. - ticks--; - try { - Thread.sleep(100); //sleep for 100 ms (One tenth second) - } catch (InterruptedException ex) { - // nop - } - } - } - if (tg.activeCount() + tg.activeGroupCount() == 0) { - tg.destroy(); - // Zap reference to the ThreadGroup so the JVM can GC it. - tg = null; - } else { - System.out.println("BOB: MUXlisten: Forcibly killing threads."); - System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN"); - visit(tg, 0); - System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n"); - nuke(tg,0); - tg.destroy(); - // Zap reference to the ThreadGroup so the JVM can GC it. - tg = null; - } - } - - // This is here to catch when something fucks up REALLY bad. -// if (tg != null) { -// System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); -// System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p"); -// WrapperManager.requestThreadDump(); -// System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); -// System.out.println("BOB: MUXlisten: Please email the above dump to sponge@mail.i2p"); -// } // zero out everything. try { wlock(); @@ -357,11 +302,102 @@ die: wunlock(); } catch (Exception e) { } + + // This is here to catch when something fucks up REALLY bad, like those annoying stuck threads! + if (tg != null) { + tg.interrupt(); // give my stuff a small smack again. + if (SS != null) { + try { + SS.close(); + } catch (I2PException ex) { + //Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex); + } + } + if (this.come_in) { + try { + listener.close(); + } catch (IOException e) { + } + } + I2PSession session = socketManager.getSession(); + if (session != null) { + // System.out.println("I2Plistener: destroySession"); + try { + session.destroySession(); + } catch (I2PSessionException ex) { + // nop + } + } + try { + socketManager.destroySocketManager(); + } catch (Exception e) { + // nop + } + // ticks = 100; // 10 seconds + if (tg.activeCount() + tg.activeGroupCount() != 0) { + int foo = tg.activeCount() + tg.activeGroupCount(); + int bar = foo; + String boner = tg.getName(); + System.out.println("BOB: MUXlisten: Waiting on threads for " + boner); + System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner); + visit(tg, 0, boner); + System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n"); + // Happily spin forever :-( + while ((tg.activeCount() + tg.activeGroupCount() != 0)) { + foo = tg.activeCount() + tg.activeGroupCount(); + if (foo != bar) { + System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner); + visit(tg, 0, boner); + System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n"); + } + bar = foo; + try { + session = socketManager.getSession(); + if (session != null) { + // System.out.println("I2Plistener: destroySession"); + try { + session.destroySession(); + } catch (I2PSessionException ex) { + // nop + } + } + try { + socketManager.destroySocketManager(); + } catch (Exception e) { + // nop + } + } catch (Exception e) { + // nop + } + // tg.interrupt(); // unwedge any blocking threads. + // ticks--; + try { + Thread.sleep(100); //sleep for 100 ms (One tenth second) + } catch (InterruptedException ex) { + // nop + } + } + System.out.println("BOB: MUXlisten: Threads went away. Success: " + boner); + } +// if (tg.activeCount() + tg.activeGroupCount() == 0) { + tg.destroy(); + // Zap reference to the ThreadGroup so the JVM can GC it. + tg = null; +// } else { +// System.out.println("BOB: MUXlisten: Forcibly killing threads."); +// System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN"); +// visit(tg, 0); +// System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n"); +// nuke(tg,0); +// tg.destroy(); +// // Zap reference to the ThreadGroup so the JVM can GC it. +// tg = null; +// } + } } // Debugging... - /** * Find the root thread group and print them all. * @@ -373,7 +409,7 @@ die: } // Visit each thread group - visit(root, 0); + visit(root, 0, root.getName()); } /** @@ -381,7 +417,7 @@ die: * @param group ThreadGroup to visit * @param level Current level */ - private static void visit(ThreadGroup group, int level) { + private static void visit(ThreadGroup group, int level, String tn) { // Get threads in `group' int numThreads = group.activeCount(); Thread[] threads = new Thread[numThreads * 2]; @@ -391,7 +427,7 @@ die: for (int i = 0; i < numThreads; i++) { // Get thread Thread thread = threads[i]; - System.out.println("BOB: MUXlisten: " + indent + thread.toString()); + System.out.println("BOB: MUXlisten: " + tn + ": " + indent + thread.toString()); } // Get thread subgroups of `group' @@ -401,9 +437,10 @@ die: // Recursively visit each subgroup for (int i = 0; i < numGroups; i++) { - visit(groups[i], level + 1); + visit(groups[i], level + 1, groups[i].getName()); } } + private static void nuke(ThreadGroup group, int level) { // Get threads in `group' int numThreads = group.activeCount(); @@ -414,8 +451,10 @@ die: // Get thread Thread thread = threads[i]; try { - if(thread.isAlive()) thread.stop(); - } catch(SecurityException se) { + if (thread.isAlive()) { + thread.stop(); + } + } catch (SecurityException se) { //nop } } @@ -433,7 +472,7 @@ die: group.destroy(); } catch (IllegalThreadStateException IE) { //nop - } catch(SecurityException se) { + } catch (SecurityException se) { //nop } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPio.java b/apps/BOB/src/net/i2p/BOB/TCPio.java index d4b353c54..d92a5cef0 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPio.java +++ b/apps/BOB/src/net/i2p/BOB/TCPio.java @@ -87,23 +87,13 @@ public class TCPio implements Runnable { boolean spin = true; try { while(spin) { - // database.getReadLock(); - // info.getReadLock(); - // spin = info.get("RUNNING").equals(Boolean.TRUE); - // info.releaseReadLock(); - // database.releaseReadLock(); b = Ain.read(a, 0, 1); - // System.out.println(info.get("NICKNAME").toString() + " " + b); if(b > 0) { Aout.write(a, 0, b); } else if(b == 0) { Thread.yield(); // this should act like a mini sleep. if(Ain.available() == 0) { -// try { - // Thread.yield(); Thread.sleep(10); -// } catch(InterruptedException ex) { -// } } } else { /* according to the specs: @@ -119,19 +109,16 @@ public class TCPio implements Runnable { return; } } - // System.out.println("TCPio: RUNNING = false"); } catch(Exception e) { // Eject!!! Eject!!! //System.out.println("TCPio: Caught an exception " + e); try { Ain.close(); } catch (IOException ex) { -// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex); } try { Aout.close(); } catch (IOException ex) { -// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex); } return; } diff --git a/apps/BOB/src/net/i2p/BOB/TCPlistener.java b/apps/BOB/src/net/i2p/BOB/TCPlistener.java index 78155eb78..4ce888090 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPlistener.java +++ b/apps/BOB/src/net/i2p/BOB/TCPlistener.java @@ -73,16 +73,6 @@ public class TCPlistener implements Runnable { info.releaseReadLock(); } - private void wlock() throws Exception { - database.getWriteLock(); - info.getWriteLock(); - } - - private void wunlock() throws Exception { - info.releaseWriteLock(); - database.releaseWriteLock(); - } - /** * Simply listen on TCP port, and thread connections * @@ -116,7 +106,7 @@ die: { } try { Socket server = new Socket(); - listener.setSoTimeout(50); // Half of the expected time from MUXlisten + listener.setSoTimeout(50); // We don't block, we cycle and check. while (spin) { try { rlock(); @@ -141,73 +131,20 @@ die: { } if (g) { // toss the connection to a new thread. - TCPtoI2P conn_c = new TCPtoI2P(socketManager, server /* , info, database */); + TCPtoI2P conn_c = new TCPtoI2P(socketManager, server); Thread t = new Thread(conn_c, "BOBTCPtoI2P"); t.start(); g = false; } } - //System.out.println("TCPlistener: destroySession"); listener.close(); } catch (IOException ioe) { try { listener.close(); } catch (IOException e) { } - // Fatal failure, cause a stop event - try { - rlock(); - try { - spin = info.get("RUNNING").equals(Boolean.TRUE); - } catch (Exception e) { - runlock(); - break die; - } - } catch (Exception e) { - break die; - } - if (spin) { - try { - wlock(); - try { - info.add("STOPPING", new Boolean(true)); - info.add("RUNNING", new Boolean(false)); - } catch (Exception e) { - wunlock(); - break die; - } - } catch (Exception e) { - break die; - } - try { - wunlock(); - } catch (Exception e) { - break die; - } - } } } - // Previous level does this cleanup now. - // - // need to kill off the socket manager too. - // I2PSession session = socketManager.getSession(); - // if (session != null) { - // try { - // session.destroySession(); - // } catch (I2PSessionException ex) { - // nop - // } - //} - //System.out.println("TCPlistener: Waiting for children"); - //while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish - // try { - // Thread.sleep(100); //sleep for 100 ms (One tenth second) - // } catch (Exception e) { - // // nop - // } - //} //System.out.println("TCPlistener: Done."); } } - - diff --git a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java index c376e16fe..5fefae017 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java +++ b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java @@ -151,13 +151,8 @@ public class TCPtoI2P implements Runnable { t.start(); q.start(); while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread -// try { Thread.sleep(10); //sleep for 10 ms -// } catch(InterruptedException e) { - // nop -// } } - // System.out.println("TCPtoI2P: Going away..."); } catch(I2PException e) { Emsg("ERROR " + e.toString(), out); @@ -166,7 +161,7 @@ public class TCPtoI2P implements Runnable { } catch(NoRouteToHostException e) { Emsg("ERROR " + e.toString(), out); } catch(InterruptedIOException e) { - Emsg("ERROR " + e.toString(), out); + // We're breaking away. } } catch(Exception e) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 2d198ad66..f4b00c1b5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -2,8 +2,6 @@ package net.i2p.client.streaming; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.ArrayList; -import java.util.List; import net.i2p.I2PAppContext; import net.i2p.util.Log; @@ -126,7 +124,7 @@ class ConnectionHandler { if (timeoutMs <= 0) { try { syn = _synQueue.take(); // waits forever - } catch (InterruptedException ie) { break;} + } catch (InterruptedException ie) { } // { break;} } else { long remaining = expiration - _context.clock().now(); // (dont think this applies anymore for LinkedBlockingQueue) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 197b92754..827be5b04 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -213,7 +213,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat timeRemaining = 10*1000; wait(timeRemaining); } - } catch (InterruptedException ie) { break; } + } catch (InterruptedException ie) { }//{ break; } } if (!writeSuccessful()) releasePayload(); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 9e42eef5f..a5d8ed94d 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -16,7 +16,6 @@ import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -90,7 +89,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa protected I2PAppContext _context; /** monitor for waiting until a lease set has been granted */ - private Object _leaseSetWait = new Object(); + private final Object _leaseSetWait = new Object(); /** whether the session connection has already been closed (or not yet opened) */ protected boolean _closed; @@ -101,7 +100,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** have we received the current date from the router yet? */ private boolean _dateReceived; /** lock that we wait upon, that the SetDateMessageHandler notifies */ - private Object _dateReceivedLock = new Object(); + private final Object _dateReceivedLock = new Object(); /** * thread that we tell when new messages are available who then tells us @@ -253,6 +252,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa try { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum); _socket = new Socket(_hostname, _portNum); + // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. _out = _socket.getOutputStream(); synchronized (_out) { _out.write(I2PClient.PROTOCOL_BYTE); diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index a906dbcf8..eda48e754 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -246,7 +246,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { try { msg = _msgs.take(); } catch (InterruptedException ie) { - System.out.println("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive.toString()); + _log.debug("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive.toString()); continue; } if (msg.size == POISON_SIZE) { diff --git a/history.txt b/history.txt index 0972cc484..cfc2af853 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,10 @@ +2009-04-27 sponge + * more BOB fixes, complete with warnings when things go wrong, and + success messages when things turn around and go right. Terminates + early so that applications wait no more than 10 seconds or so. + * Reversed a few earlier patches that caused some odd behavior. + * Changed some core println()'s to debugging messages. + 2009-04-27 zzz * Build files: - New updaterWithJettyFixes target, build it for pkg diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 5f58c2cdd..5203f2360 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);