diff --git a/apps/BOB/src/net/i2p/BOB/DoCMDS.java b/apps/BOB/src/net/i2p/BOB/DoCMDS.java index 12d8d6156..16da28ce9 100644 --- a/apps/BOB/src/net/i2p/BOB/DoCMDS.java +++ b/apps/BOB/src/net/i2p/BOB/DoCMDS.java @@ -748,14 +748,12 @@ public class DoCMDS implements Runnable { nickinfo = (NamedDB) database.get(Arg); if (!tunnelactive(nickinfo)) { nickinfo = null; - ns = - true; + ns = true; } } catch (Exception b) { nickinfo = null; - ns = - true; + ns = true; } try { @@ -775,10 +773,10 @@ public class DoCMDS implements Runnable { try { database.add(Arg, nickinfo); nickinfo.add(P_NICKNAME, Arg); - nickinfo.add(P_STARTING, Boolean.FALSE); - nickinfo.add(P_RUNNING, Boolean.FALSE); - nickinfo.add(P_STOPPING, Boolean.FALSE); - nickinfo.add(P_QUIET, Boolean.FALSE); + nickinfo.add(P_STARTING, new Boolean(false)); + nickinfo.add(P_RUNNING, new Boolean(false)); + nickinfo.add(P_STOPPING, new Boolean(false)); + nickinfo.add(P_QUIET, new Boolean(false)); nickinfo.add(P_INHOST, "localhost"); nickinfo.add(P_OUTHOST, "localhost"); Properties Q = new Properties(); @@ -1265,13 +1263,17 @@ public class DoCMDS implements Runnable { tunnel = new MUXlisten(database, nickinfo, _log); Thread t = new Thread(tunnel); t.start(); + try { + Thread.sleep(1000 * 10); // Slow down the startup. + } catch(InterruptedException ie) { + // ignore it + } out.println("OK tunnel starting"); } catch (I2PException e) { out.println("ERROR starting tunnel: " + e); } catch (IOException e) { out.println("ERROR starting tunnel: " + e); } - } } catch (Exception ex) { break die; @@ -1304,7 +1306,7 @@ public class DoCMDS implements Runnable { break die; } - nickinfo.add(P_STOPPING, Boolean.TRUE); + nickinfo.add(P_STOPPING, new Boolean(true)); try { wunlock(); diff --git a/apps/BOB/src/net/i2p/BOB/I2Plistener.java b/apps/BOB/src/net/i2p/BOB/I2Plistener.java index 2dfa4bbb2..a8115893d 100644 --- a/apps/BOB/src/net/i2p/BOB/I2Plistener.java +++ b/apps/BOB/src/net/i2p/BOB/I2Plistener.java @@ -26,8 +26,6 @@ package net.i2p.BOB; import java.net.ConnectException; import java.net.SocketTimeoutException; import net.i2p.I2PException; -import net.i2p.client.I2PSession; -import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -42,7 +40,7 @@ public class I2Plistener implements Runnable { private NamedDB info, database; private Log _log; - private int tgwatch; +// private int tgwatch; public I2PSocketManager socketManager; public I2PServerSocket serverSocket; @@ -60,7 +58,7 @@ public class I2Plistener implements Runnable { this._log = _log; this.socketManager = S; serverSocket = SS; - tgwatch = 1; +// tgwatch = 1; } private void rlock() throws Exception { @@ -84,18 +82,18 @@ public class I2Plistener implements Runnable { die: { serverSocket.setSoTimeout(50); - try { - if (info.exists("INPORT")) { - tgwatch = 2; - } - } catch (Exception e) { - try { - runlock(); - } catch (Exception e2) { - break die; - } - break die; - } +// try { +// if (info.exists("INPORT")) { +// tgwatch = 2; +// } +// } catch (Exception e) { +// try { +// runlock(); +// } catch (Exception e2) { +// break die; +// } +// break die; +// } boolean spin = true; while (spin) { @@ -137,29 +135,33 @@ die: { } } // System.out.println("I2Plistener: Close"); - try { - serverSocket.close(); - } catch (I2PException e) { + + + // Previous level does this cleanup now. + // + // try { + // serverSocket.close(); + // } catch (I2PException e) { // nop - } + //} // need to kill off the socket manager too. - I2PSession session = socketManager.getSession(); - if (session != null) { + // I2PSession session = socketManager.getSession(); + // if (session != null) { // System.out.println("I2Plistener: destroySession"); - try { - session.destroySession(); - } catch (I2PSessionException ex) { + // try { + // session.destroySession(); + // } catch (I2PSessionException ex) { // nop - } - } + // } + //} // System.out.println("I2Plistener: Waiting for children"); - while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish - try { - Thread.sleep(100); //sleep for 100 ms (One tenth second) - } catch (Exception e) { + // while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish + // try { + // Thread.sleep(100); //sleep for 100 ms (One tenth second) + // } catch (Exception e) { // nop - } - } + // } + //} // System.out.println("I2Plistener: Done."); } diff --git a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java index 06c3131fe..0984823b6 100644 --- a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java +++ b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java @@ -23,6 +23,7 @@ */ package net.i2p.BOB; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; @@ -116,7 +117,22 @@ die: { try { Thread.sleep(10); //sleep for 10 ms } catch(InterruptedException e) { - // nop + try { + in.close(); + } catch(Exception ex) { + } + try { + out.close(); + } catch(Exception ex) { + } + try { + Iin.close(); + } catch(Exception ex) { + } + try { + Iout.close(); + } catch(Exception ex) { + } } } // System.out.println("I2PtoTCP: Going away..."); diff --git a/apps/BOB/src/net/i2p/BOB/MUXlisten.java b/apps/BOB/src/net/i2p/BOB/MUXlisten.java index 1987fbf6b..dc30c5445 100644 --- a/apps/BOB/src/net/i2p/BOB/MUXlisten.java +++ b/apps/BOB/src/net/i2p/BOB/MUXlisten.java @@ -28,14 +28,13 @@ import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.util.Properties; -import java.util.logging.Level; -import java.util.logging.Logger; import net.i2p.I2PException; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.util.Log; -import org.tanukisoftware.wrapper.WrapperManager; /** * @@ -59,8 +58,8 @@ public class MUXlisten implements Runnable { /** * Constructor Will fail if INPORT is occupied. * - * @param info - * @param database + * @param info DB entry for this tunnel + * @param database master database of tunnels * @param _log * @throws net.i2p.I2PException * @throws java.io.IOException @@ -134,11 +133,11 @@ public class MUXlisten implements Runnable { */ public void run() { I2PServerSocket SS = null; + int ticks = 1200; // Allow 120 seconds, no more. try { wlock(); try { info.add("RUNNING", new Boolean(true)); - info.add("STARTING", new Boolean(false)); } catch (Exception e) { wunlock(); return; @@ -177,12 +176,28 @@ die: q.start(); } + try { + wlock(); + try { + info.add("STARTING", new Boolean(false)); + } catch (Exception e) { + wunlock(); + break die; + } + } catch (Exception e) { + break die; + } + try { + wunlock(); + } catch (Exception e) { + break die; + } boolean spin = true; while (spin) { try { - Thread.sleep(200); //sleep for 200 ms (Two thenths second) + Thread.sleep(1000); //sleep for 1 second } catch (InterruptedException e) { - // nop + break die; } try { rlock(); @@ -220,22 +235,49 @@ die: } } // die -// try { -// Thread.sleep(500); //sleep for 500 ms (One half second) -// } catch (InterruptedException ex) { -// // nop -// } - // wait for child threads and thread groups to die + if (SS != null) { + try { + SS.close(); + } catch (I2PException ex) { + //Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex); + } + } + if (this.come_in) { + try { + listener.close(); + } catch (IOException e) { + } + } + + I2PSession session = socketManager.getSession(); + if (session != null) { + // System.out.println("I2Plistener: destroySession"); + try { + session.destroySession(); + } catch (I2PSessionException ex) { + // nop + } + } + try { + socketManager.destroySocketManager(); + } catch (Exception e) { + // nop + } + // Wait for child threads and thread groups to die // System.out.println("MUXlisten: waiting for children"); if (tg.activeCount() + tg.activeGroupCount() != 0) { - while (tg.activeCount() + tg.activeGroupCount() != 0) { + while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { tg.interrupt(); // unwedge any blocking threads. + ticks--; try { Thread.sleep(100); //sleep for 100 ms (One tenth second) } catch (InterruptedException ex) { - // NOP + break quit; } } + if (tg.activeCount() + tg.activeGroupCount() != 0) { + break quit; // Uh-oh. + } } tg.destroy(); // Zap reference to the ThreadGroup so the JVM can GC it. @@ -245,20 +287,32 @@ die: break quit; } } // quit - // This is here to catch when something fucks up REALLY bad. - if (tg != null) { - System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); - System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p"); - WrapperManager.requestThreadDump(); - System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); - System.out.println("BOB: MUXlisten: Please email the above dump to sponge@mail.i2p"); - } // This is here to catch when something fucks up REALLY bad. if (tg != null) { + if (SS != null) { + try { + SS.close(); + } catch (I2PException ex) { + //Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex); + } + } + if (this.come_in) { + try { + listener.close(); + } catch (IOException e) { + } + } + try { + socketManager.destroySocketManager(); + } catch (Exception e) { + // nop + } + ticks = 600; // 60 seconds if (tg.activeCount() + tg.activeGroupCount() != 0) { - tg.interrupt(); // unwedge any blocking threads. - while (tg.activeCount() + tg.activeGroupCount() != 0) { + while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { + tg.interrupt(); // unwedge any blocking threads. + ticks--; try { Thread.sleep(100); //sleep for 100 ms (One tenth second) } catch (InterruptedException ex) { @@ -266,30 +320,26 @@ die: } } } - tg.destroy(); - // Zap reference to the ThreadGroup so the JVM can GC it. - tg = null; + if (tg.activeCount() + tg.activeGroupCount() == 0) { + tg.destroy(); + // Zap reference to the ThreadGroup so the JVM can GC it. + tg = null; + } else { + System.out.println("BOB: MUXlisten: Can't kill threads. Please send the following dump to sponge@mail.i2p"); + System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN"); + visit(tg, 0); + System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n"); + } } - if (SS != null) { - try { - SS.close(); - } catch (I2PException ex) { - //Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex); - } - } - // Lastly try to close things again. - if (this.come_in) { - try { - listener.close(); - } catch (IOException e) { - } - } - try { - socketManager.destroySocketManager(); - } catch (Exception e) { - // nop - } + // This is here to catch when something fucks up REALLY bad. +// if (tg != null) { +// System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); +// System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p"); +// WrapperManager.requestThreadDump(); +// System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); +// System.out.println("BOB: MUXlisten: Please email the above dump to sponge@mail.i2p"); +// } // zero out everything. try { wlock(); @@ -307,13 +357,49 @@ die: } -// private class DisconnectListener implements I2PSocketManager.DisconnectListener { -// -// public void sessionDisconnected() { -// close(); -// } -// } -// public void close() { -// socketManager.destroySocketManager(); -// } + + // Debugging... + + /** + * Find the root thread group and print them all. + * + */ + private void visitAllThreads() { + ThreadGroup root = Thread.currentThread().getThreadGroup().getParent(); + while (root.getParent() != null) { + root = root.getParent(); + } + + // Visit each thread group + visit(root, 0); + } + + /** + * Recursively visits all thread groups under `group' and dumps them. + * @param group ThreadGroup to visit + * @param level Current level + */ + private static void visit(ThreadGroup group, int level) { + // Get threads in `group' + int numThreads = group.activeCount(); + Thread[] threads = new Thread[numThreads * 2]; + numThreads = group.enumerate(threads, false); + String indent = "------------------------------------".substring(0, level) + "-> "; + // Enumerate each thread in `group' and print it. + for (int i = 0; i < numThreads; i++) { + // Get thread + Thread thread = threads[i]; + System.out.println("BOB: MUXlisten: " + indent + thread.toString()); + } + + // Get thread subgroups of `group' + int numGroups = group.activeGroupCount(); + ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; + numGroups = group.enumerate(groups, false); + + // Recursively visit each subgroup + for (int i = 0; i < numGroups; i++) { + visit(groups[i], level + 1); + } + } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPio.java b/apps/BOB/src/net/i2p/BOB/TCPio.java index 109d8e8cb..d4b353c54 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPio.java +++ b/apps/BOB/src/net/i2p/BOB/TCPio.java @@ -23,6 +23,7 @@ */ package net.i2p.BOB; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -40,9 +41,8 @@ public class TCPio implements Runnable { /** * Constructor * - * @param Ain - * @param Aout - * @param info + * @param Ain InputStream + * @param Aout OutputStream * * param database */ @@ -99,11 +99,11 @@ public class TCPio implements Runnable { } else if(b == 0) { Thread.yield(); // this should act like a mini sleep. if(Ain.available() == 0) { - try { +// try { // Thread.yield(); Thread.sleep(10); - } catch(InterruptedException ex) { - } +// } catch(InterruptedException ex) { +// } } } else { /* according to the specs: @@ -114,13 +114,25 @@ public class TCPio implements Runnable { * */ // System.out.println("TCPio: End Of Stream"); + Ain.close(); + Aout.close(); return; } } // System.out.println("TCPio: RUNNING = false"); } catch(Exception e) { // Eject!!! Eject!!! - // System.out.println("TCPio: Caught an exception " + e); + //System.out.println("TCPio: Caught an exception " + e); + try { + Ain.close(); + } catch (IOException ex) { +// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex); + } + try { + Aout.close(); + } catch (IOException ex) { +// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex); + } return; } // System.out.println("TCPio: Leaving."); diff --git a/apps/BOB/src/net/i2p/BOB/TCPlistener.java b/apps/BOB/src/net/i2p/BOB/TCPlistener.java index 7e931768c..78155eb78 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPlistener.java +++ b/apps/BOB/src/net/i2p/BOB/TCPlistener.java @@ -27,8 +27,8 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; -import net.i2p.client.I2PSession; -import net.i2p.client.I2PSessionException; +// import net.i2p.client.I2PSession; +// import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.util.Log; @@ -187,23 +187,25 @@ die: { } } } + // Previous level does this cleanup now. + // // need to kill off the socket manager too. - I2PSession session = socketManager.getSession(); - if (session != null) { - try { - session.destroySession(); - } catch (I2PSessionException ex) { + // I2PSession session = socketManager.getSession(); + // if (session != null) { + // try { + // session.destroySession(); + // } catch (I2PSessionException ex) { // nop - } - } + // } + //} //System.out.println("TCPlistener: Waiting for children"); - while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish - try { - Thread.sleep(100); //sleep for 100 ms (One tenth second) - } catch (Exception e) { - // nop - } - } + //while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish + // try { + // Thread.sleep(100); //sleep for 100 ms (One tenth second) + // } catch (Exception e) { + // // nop + // } + //} //System.out.println("TCPlistener: Done."); } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java index fe1ca3278..c376e16fe 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java +++ b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java @@ -114,11 +114,15 @@ public class TCPtoI2P implements Runnable { */ public void run() { String line, input; + InputStream Iin = null; + OutputStream Iout = null; + InputStream in = null; + OutputStream out = null; try { - InputStream in = sock.getInputStream(); - OutputStream out = sock.getOutputStream(); + in = sock.getInputStream(); + out = sock.getOutputStream(); try { line = lnRead(in); input = line.toLowerCase(); @@ -136,8 +140,8 @@ public class TCPtoI2P implements Runnable { I2P = socketManager.connect(dest); I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default // make readers/writers - InputStream Iin = I2P.getInputStream(); - OutputStream Iout = I2P.getOutputStream(); + Iin = I2P.getInputStream(); + Iout = I2P.getOutputStream(); // setup to cross the streams TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P TCPio conn_a = new TCPio(Iin, out /*, info, database */); // I2P -> app @@ -147,11 +151,11 @@ public class TCPtoI2P implements Runnable { t.start(); q.start(); while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread - try { +// try { Thread.sleep(10); //sleep for 10 ms - } catch(InterruptedException e) { +// } catch(InterruptedException e) { // nop - } +// } } // System.out.println("TCPtoI2P: Going away..."); @@ -171,6 +175,22 @@ public class TCPtoI2P implements Runnable { } catch(Exception e) { // bail on anything else } + try { + in.close(); + } catch(Exception e) { + } + try { + out.close(); + } catch(Exception e) { + } + try { + Iin.close(); + } catch(Exception e) { + } + try { + Iout.close(); + } catch(Exception e) { + } try { // System.out.println("TCPtoI2P: Close I2P"); I2P.close(); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index f5f0df6b9..5b9aa23cb 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -100,7 +100,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } /** - * @param privKeyFile null to generate a transient key + * @param pkf null to generate a transient key * * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java index c92da6ae8..b592af5e4 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java @@ -34,7 +34,29 @@ import net.i2p.util.EventDispatcher; import net.i2p.util.I2PThread; import net.i2p.util.Log; -public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements Source, Sink { + /** + * Base client class that sets up an I2P Datagram client destination. + * The UDP side is not implemented here, as there are at least + * two possibilities: + * + * 1) UDP side is a "server" + * Example: Streamr Consumer + * - Configure a destination host and port + * - External application sends no data + * - Extending class must have a constructor with host and port arguments + * + * 2) UDP side is a client/server + * Example: SOCKS UDP (DNS requests?) + * - configure an inbound port and a destination host and port + * - External application sends and receives data + * - Extending class must have a constructor with host and 2 port arguments + * + * So the implementing class must create a UDPSource and/or UDPSink, + * and must call setSink(). + * + * @author zzz with portions from welterde's streamr + */ + public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements Source, Sink { private static final Log _log = new Log(I2PTunnelUDPClientBase.class); protected I2PAppContext _context; @@ -69,33 +91,11 @@ public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements So private Source _i2pSource; private Sink _i2pSink; private Destination _otherDest; - /** - * Base client class that sets up an I2P Datagram client destination. - * The UDP side is not implemented here, as there are at least - * two possibilities: - * - * 1) UDP side is a "server" - * Example: Streamr Consumer - * - Configure a destination host and port - * - External application sends no data - * - Extending class must have a constructor with host and port arguments - * - * 2) UDP side is a client/server - * Example: SOCKS UDP (DNS requests?) - * - configure an inbound port and a destination host and port - * - External application sends and receives data - * - Extending class must have a constructor with host and 2 port arguments - * - * So the implementing class must create a UDPSource and/or UDPSink, - * and must call setSink(). - * * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager - * - * @author zzz with portions from welterde's streamr */ - public I2PTunnelUDPClientBase(String destination, Logging l, EventDispatcher notifyThis, + public I2PTunnelUDPClientBase(String destination, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) throws IllegalArgumentException { super("UDPServer", notifyThis, tunnel); _clientId = ++__clientId; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java index 8dcd66a36..2ab2687ff 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java @@ -32,25 +32,6 @@ import net.i2p.util.EventDispatcher; import net.i2p.util.I2PThread; import net.i2p.util.Log; -public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sink { - - private final static Log _log = new Log(I2PTunnelUDPServerBase.class); - - private Object lock = new Object(); - protected Object slock = new Object(); - - private static volatile long __serverId = 0; - - protected Logging l; - - private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000; - /** default timeout to 3 minutes - override if desired */ - protected long readTimeout = DEFAULT_READ_TIMEOUT; - - private I2PSession _session; - private Source _i2pSource; - private Sink _i2pSink; - /** * Base client class that sets up an I2P Datagram server destination. * The UDP side is not implemented here, as there are at least @@ -70,11 +51,34 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin * * So the implementing class must create a UDPSource and/or UDPSink, * and must call setSink(). + * + * @author zzz with portions from welterde's streamr + */ + +public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sink { + + private final static Log _log = new Log(I2PTunnelUDPServerBase.class); + + private Object lock = new Object(); + protected Object slock = new Object(); + + private static volatile long __serverId = 0; + + protected Logging l; + + private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000; + /** default timeout to 3 minutes - override if desired */ + protected long readTimeout = DEFAULT_READ_TIMEOUT; + + private I2PSession _session; + private Source _i2pSource; + private Sink _i2pSink; + + /** * * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager * - * @author zzz with portions from welterde's streamr */ public I2PTunnelUDPServerBase(boolean verify, File privkey, String privkeyname, Logging l, diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 6b99fdc00..f73632683 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -187,12 +187,12 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" + _activeResends + "), waiting " + timeLeft); - try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) {} + try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + "), waiting indefinitely"); - try { _outboundPackets.wait(250); } catch (InterruptedException ie) {} //10*1000 + try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000 } } else { _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); @@ -810,7 +810,11 @@ public class Connection { synchronized (_connectLock) { _connectLock.wait(timeLeft); } - } catch (InterruptedException ie) {} + } catch (InterruptedException ie) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("waitForConnect(): InterruptedException"); + _connectionError = "InterruptedException"; + return; + } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index b96eaea63..2d198ad66 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -126,7 +126,7 @@ class ConnectionHandler { if (timeoutMs <= 0) { try { syn = _synQueue.take(); // waits forever - } catch (InterruptedException ie) {} + } catch (InterruptedException ie) { break;} } else { long remaining = expiration - _context.clock().now(); // (dont think this applies anymore for LinkedBlockingQueue) @@ -138,7 +138,7 @@ class ConnectionHandler { break; try { syn = _synQueue.poll(remaining, TimeUnit.MILLISECONDS); // waits the specified time max - } catch (InterruptedException ie) {} + } catch (InterruptedException ie) { } break; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index cbb89e79e..197b92754 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -213,7 +213,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat timeRemaining = 10*1000; wait(timeRemaining); } - } catch (InterruptedException ie) {} + } catch (InterruptedException ie) { break; } } if (!writeSuccessful()) releasePayload(); diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index b08d01c26..a3e5e57a7 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -146,8 +146,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * I2PSession.PROTO_STREAMING * I2PSession.PROTO_DATAGRAM * 255 disallowed - * @param fromport 1-65535 or 0 for unset - * @param toport 1-65535 or 0 for unset + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires, diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java index 13b01a67a..79066c4c6 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java @@ -178,7 +178,11 @@ public class I2CPMessageReader { // pause .5 secs when we're paused try { Thread.sleep(500); - } catch (InterruptedException ie) { // nop + } catch (InterruptedException ie) { + // we should break away here. + _log.warn("Breaking away stream", ie); + _listener.disconnected(I2CPMessageReader.this); + cancelRunner(); } } } diff --git a/core/java/src/net/i2p/util/ConvertToHash.java b/core/java/src/net/i2p/util/ConvertToHash.java index 087855640..28da87d21 100644 --- a/core/java/src/net/i2p/util/ConvertToHash.java +++ b/core/java/src/net/i2p/util/ConvertToHash.java @@ -16,12 +16,15 @@ import net.i2p.data.Hash; * Base32 desthash.b32.i2p * example.i2p * - * @return null on failure - * * @author zzz */ public class ConvertToHash { + /** + * Convert any kind of destination String to a hash + * + * @return null on failure + */ public static Hash getHash(String peer) { if (peer == null) return null; diff --git a/history.txt b/history.txt index 1e88a8c7d..807629d8c 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2009-04-10 sponge + * More BOB threadgroup fixes, plus debug dump when things go wrong. + * Fixes to streaminglib, I2CP, which are related to the TG problem. + * JavaDocs fixups. + 2009-04-08 sponge * More hopeful fixups to the infamous orpahned tunnel problem. *Sigh* diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index fe47169bc..6501cf5a7 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 14; + public final static long BUILD = 15; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);