From 701904d11948fcf61e0a111d2d07f30fafb93495 Mon Sep 17 00:00:00 2001 From: sponge Date: Fri, 5 Dec 2008 09:51:48 +0000 Subject: [PATCH] BUGFIX: streaming lib blocking on a write() will now fail when the socket is closed from under it. Enhancement: BOB can now clear a destination in under 1 second with the above fix. BOB also will do a thread dump when something really aweful happens, so that developers/users can help in debugging. --- apps/BOB/nbproject/private/private.properties | 1 + apps/BOB/nbproject/private/private.xml | 3 ++ apps/BOB/nbproject/project.properties | 20 ++++++++- apps/BOB/src/net/i2p/BOB/DoCMDS.java | 2 +- apps/BOB/src/net/i2p/BOB/I2Plistener.java | 27 ++++++------ apps/BOB/src/net/i2p/BOB/I2PtoTCP.java | 9 +++- apps/BOB/src/net/i2p/BOB/MUXlisten.java | 41 ++++++++++++++++--- apps/BOB/src/net/i2p/BOB/TCPio.java | 5 ++- apps/BOB/src/net/i2p/BOB/TCPlistener.java | 28 ++++++------- apps/BOB/src/net/i2p/BOB/TCPtoI2P.java | 8 +++- .../net/i2p/client/streaming/Connection.java | 14 +++---- .../client/streaming/MessageOutputStream.java | 1 + 12 files changed, 114 insertions(+), 45 deletions(-) diff --git a/apps/BOB/nbproject/private/private.properties b/apps/BOB/nbproject/private/private.properties index 979533fbd..416ecb6b1 100644 --- a/apps/BOB/nbproject/private/private.properties +++ b/apps/BOB/nbproject/private/private.properties @@ -1,3 +1,4 @@ +compile.on.save=false do.depend=false do.jar=true javac.debug=true diff --git a/apps/BOB/nbproject/private/private.xml b/apps/BOB/nbproject/private/private.xml index c1f155a78..2482568bf 100644 --- a/apps/BOB/nbproject/private/private.xml +++ b/apps/BOB/nbproject/private/private.xml @@ -1,4 +1,7 @@ + + file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/MUXlisten.java + diff --git a/apps/BOB/nbproject/project.properties b/apps/BOB/nbproject/project.properties index 89d7ad1da..aa2df5ffd 100644 --- a/apps/BOB/nbproject/project.properties +++ b/apps/BOB/nbproject/project.properties @@ -30,13 +30,31 @@ file.reference.mstreaming.jar-1=../ministreaming/java/build/mstreaming.jar file.reference.NetBeansProjects-i2p.i2p=../i2p.i2p/ file.reference.streaming.jar=../../bob/i2p/i2p.i2p/build/streaming.jar file.reference.streaming.jar-1=../streaming/java/build/streaming.jar +file.reference.wrapper-freebsd=../../installer/lib/wrapper/freebsd/ +file.reference.wrapper-linux=../../installer/lib/wrapper/linux/ +file.reference.wrapper-linux64=../../installer/lib/wrapper/linux64/ +file.reference.wrapper-macosx=../../installer/lib/wrapper/macosx/ +file.reference.wrapper-solaris=../../installer/lib/wrapper/solaris/ +file.reference.wrapper-win32=../../installer/lib/wrapper/win32/ +file.reference.wrapper.jar=../../installer/lib/wrapper/linux/wrapper.jar +file.reference.wrapper.jar-1=../../installer/lib/wrapper/freebsd/wrapper.jar +file.reference.wrapper.jar-2=../../installer/lib/wrapper/linux64/wrapper.jar +file.reference.wrapper.jar-3=../../installer/lib/wrapper/macosx/wrapper.jar +file.reference.wrapper.jar-4=../../installer/lib/wrapper/solaris/wrapper.jar +file.reference.wrapper.jar-5=../../installer/lib/wrapper/win32/wrapper.jar includes=** jar.compress=false javac.classpath=\ ${file.reference.i2p.jar-1}:\ ${file.reference.i2ptunnel.jar}:\ ${file.reference.mstreaming.jar-1}:\ - ${file.reference.streaming.jar-1} + ${file.reference.streaming.jar-1}:\ + ${file.reference.wrapper.jar-1}:\ + ${file.reference.wrapper.jar}:\ + ${file.reference.wrapper.jar-2}:\ + ${file.reference.wrapper.jar-3}:\ + ${file.reference.wrapper.jar-4}:\ + ${file.reference.wrapper.jar-5} # Space-separated list of extra javac options javac.compilerargs= javac.deprecation=false diff --git a/apps/BOB/src/net/i2p/BOB/DoCMDS.java b/apps/BOB/src/net/i2p/BOB/DoCMDS.java index 2aaeba9e9..a4fdba607 100644 --- a/apps/BOB/src/net/i2p/BOB/DoCMDS.java +++ b/apps/BOB/src/net/i2p/BOB/DoCMDS.java @@ -46,7 +46,7 @@ public class DoCMDS implements Runnable { // FIX ME // I need a better way to do versioning, but this will do for now. - public static final String BMAJ = "00", BMIN = "00", BREV = "01", BEXT = "-D"; + public static final String BMAJ = "00", BMIN = "00", BREV = "01", BEXT = "-E"; public static final String BOBversion = BMAJ + "." + BMIN + "." + BREV + BEXT; private Socket server; private Properties props; diff --git a/apps/BOB/src/net/i2p/BOB/I2Plistener.java b/apps/BOB/src/net/i2p/BOB/I2Plistener.java index c30e751f5..1561b7a22 100644 --- a/apps/BOB/src/net/i2p/BOB/I2Plistener.java +++ b/apps/BOB/src/net/i2p/BOB/I2Plistener.java @@ -70,7 +70,7 @@ public class I2Plistener implements Runnable { boolean g = false; I2PSocket sessSocket = null; - serverSocket.setSoTimeout(1000); + serverSocket.setSoTimeout(100); database.getReadLock(); info.getReadLock(); if(info.exists("INPORT")) { @@ -107,32 +107,31 @@ public class I2Plistener implements Runnable { // System.out.println("Exception " + e); } } - + // System.out.println("I2Plistener: Close"); try { serverSocket.close(); } catch(I2PException e) { // nop } - - while(Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish -// System.out.println("STOP Thread count " + Thread.activeCount()); - try { - Thread.sleep(1000); //sleep for 1000 ms (One second) - } catch(Exception e) { - // nop - } - } - - // System.out.println("STOP Thread count " + Thread.activeCount()); // need to kill off the socket manager too. I2PSession session = socketManager.getSession(); if(session != null) { + // System.out.println("I2Plistener: destroySession"); try { session.destroySession(); } catch(I2PSessionException ex) { // nop } -// System.out.println("destroySession Thread count " + Thread.activeCount()); } + // System.out.println("I2Plistener: Waiting for children"); + while(Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish + try { + Thread.sleep(100); //sleep for 100 ms (One tenth second) + } catch(Exception e) { + // nop + } + } + + // System.out.println("I2Plistener: Done."); } } diff --git a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java index dae9f3e0f..a9a7c981b 100644 --- a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java +++ b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java @@ -119,20 +119,27 @@ die: { // nop } } - + System.out.println("I2PtoTCP: Going away..."); } catch(Exception e) { + System.out.println("I2PtoTCP: Owch! damn!"); break die; } } // die try { + //System.out.println("I2PtoTCP: Close I2P"); I2P.close(); } catch(Exception e) { tell = false; } + //System.out.println("I2PtoTCP: Closed I2P"); try { + //System.out.println("I2PtoTCP: Close sock"); sock.close(); } catch(Exception e) { tell = false; } + //System.out.println("I2PtoTCP: Closed sock"); + System.out.println("I2PtoTCP: Done"); + } } diff --git a/apps/BOB/src/net/i2p/BOB/MUXlisten.java b/apps/BOB/src/net/i2p/BOB/MUXlisten.java index a16c830b1..7694803d6 100644 --- a/apps/BOB/src/net/i2p/BOB/MUXlisten.java +++ b/apps/BOB/src/net/i2p/BOB/MUXlisten.java @@ -32,6 +32,7 @@ import net.i2p.I2PException; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.util.Log; +import org.tanukisoftware.wrapper.WrapperManager; /** * @@ -64,6 +65,7 @@ public class MUXlisten implements Runnable { MUXlisten(NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException { int port = 0; InetAddress host = null; + this.tg = null; this.database = database; this.info = info; this._log = _log; @@ -207,23 +209,39 @@ die: { break die; } } // die + // wait for child threads and thread groups to die + // System.out.println("MUXlisten: waiting for children"); while(tg.activeCount() + tg.activeGroupCount() != 0) { + tg.interrupt(); // unwedge any blocking threads. try { - Thread.sleep(1000); //sleep for 1000 ms (One second) + Thread.sleep(100); //sleep for 100 ms (One tenth second) } catch(InterruptedException ex) { // nop - } + } } tg.destroy(); // Zap reference to the ThreadGroup so the JVM can GC it. tg = null; } catch(Exception e) { + // System.out.println("MUXlisten: Caught an exception" + e); break quit; } } // quit - socketManager.destroySocketManager(); - // zero out everything, just incase. + // This is here to catch when something fucks up REALLY bad. + if(tg != null) { + System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); + System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p"); + WrapperManager.requestThreadDump(); + System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!"); + System.out.println("BOB: MUXlisten: Please email the avove dump to sponge@mail.i2p"); + } + // zero out everything, just incase. + try { + socketManager.destroySocketManager(); + } catch(Exception e) { + // nop + } try { wlock(); try { @@ -236,7 +254,20 @@ die: { } wunlock(); } catch(Exception e) { - return; + } + // This is here to catch when something fucks up REALLY bad. + if(tg != null) { + while(tg.activeCount() + tg.activeGroupCount() != 0) { + tg.interrupt(); // unwedge any blocking threads. + try { + Thread.sleep(100); //sleep for 100 ms (One tenth second) + } catch(InterruptedException ex) { + // nop + } + } + tg.destroy(); + // Zap reference to the ThreadGroup so the JVM can GC it. + tg = null; } } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPio.java b/apps/BOB/src/net/i2p/BOB/TCPio.java index e9024105a..25290bcdc 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPio.java +++ b/apps/BOB/src/net/i2p/BOB/TCPio.java @@ -23,7 +23,6 @@ */ package net.i2p.BOB; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -91,12 +90,16 @@ public class TCPio implements Runnable { * the stream has been reached. * */ + // System.out.println("TCPio: End Of Stream"); return; } } + // System.out.println("TCPio: RUNNING = false"); } catch(Exception e) { // Eject!!! Eject!!! + // System.out.println("TCPio: Caught an exception " + e); return; } + // System.out.println("TCPio: Leaving."); } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPlistener.java b/apps/BOB/src/net/i2p/BOB/TCPlistener.java index 72b34eac8..99ae047d3 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPlistener.java +++ b/apps/BOB/src/net/i2p/BOB/TCPlistener.java @@ -76,7 +76,6 @@ public class TCPlistener implements Runnable { tgwatch = 2; } try { -// System.out.println("Starting thread count " + Thread.activeCount()); Socket server = new Socket(); listener.setSoTimeout(1000); info.releaseReadLock(); @@ -87,7 +86,6 @@ public class TCPlistener implements Runnable { spin = info.get("RUNNING").equals(Boolean.TRUE); info.releaseReadLock(); database.releaseReadLock(); -// System.out.println("Thread count " + Thread.activeCount()); try { server = listener.accept(); g = true; @@ -102,8 +100,13 @@ public class TCPlistener implements Runnable { g = false; } } + //System.out.println("TCPlistener: destroySession"); listener.close(); } catch(IOException ioe) { + try { + listener.close(); + } catch(IOException e) { + } // Fatal failure, cause a stop event database.getReadLock(); info.getReadLock(); @@ -120,17 +123,6 @@ public class TCPlistener implements Runnable { } } -//System.out.println("STOP!"); - - while(Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish -// System.out.println("STOP Thread count " + Thread.activeCount()); - try { - Thread.sleep(1000); //sleep for 1000 ms (One second) - } catch(Exception e) { - // nop - } - } -// System.out.println("STOP Thread count " + Thread.activeCount()); // need to kill off the socket manager too. I2PSession session = socketManager.getSession(); if(session != null) { @@ -139,8 +131,16 @@ public class TCPlistener implements Runnable { } catch(I2PSessionException ex) { // nop } -// System.out.println("destroySession Thread count " + Thread.activeCount()); } + //System.out.println("TCPlistener: Waiting for children"); + while(Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish + try { + Thread.sleep(100); //sleep for 100 ms (One tenth second) + } catch(Exception e) { + // nop + } + } + //System.out.println("TCPlistener: Done."); } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java index 3323d0450..2a0e6519e 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java +++ b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java @@ -152,6 +152,7 @@ public class TCPtoI2P implements Runnable { // nop } } + System.out.println("TCPtoI2P: Going away..."); } catch(I2PException e) { Emsg("ERROR " + e.toString(), out); @@ -169,14 +170,19 @@ public class TCPtoI2P implements Runnable { } catch(IOException ioe) { } try { + System.out.println("TCPtoI2P: Close I2P"); I2P.close(); } catch(Exception e) { } + //System.out.println("TCPtoI2P: Closed I2P"); try { + System.out.println("TCPtoI2P: Close sock"); sock.close(); } catch(Exception e) { } + //System.out.println("TCPtoI2P: Closed sock"); + System.out.println("TCPtoI2P: Done."); + } } - diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index e23ecb3df..100f59606 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -121,6 +121,7 @@ public class Connection { _resetSentOn = -1; _isInbound = false; _connectionEvent = new ConEvent(); + _hardDisconnected = false; _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -148,7 +149,7 @@ public class Connection { * @return true if the packet should be sent */ boolean packetSendChoke(long timeoutMs) { - if (false) return true; + // if (false) return true; // <--- what the fuck?? long start = _context.clock().now(); long writeExpire = start + timeoutMs; boolean started = false; @@ -162,9 +163,9 @@ public class Connection { // no need to wait until the other side has ACKed us before sending the first few wsize // packets through - // if (!_connected) - // return false; - + // Incorrect assumption, the constructor defaults _connected to true --Sponge + if (!_connected) + return false; started = true; if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) { @@ -180,12 +181,12 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" + _activeResends + "), waiting " + timeLeft); - try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {} + try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) {} } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + "), waiting indefinitely"); - try { _outboundPackets.wait(10*1000); } catch (InterruptedException ie) {} + try { _outboundPackets.wait(250); } catch (InterruptedException ie) {} //10*1000 } } else { _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); @@ -487,7 +488,6 @@ public class Connection { synchronized (_connectLock) { _connectLock.notifyAll(); } if (_log.shouldLog(Log.DEBUG)) _log.debug("Disconnecting " + toString(), new Exception("discon")); - if (!cleanDisconnect) { _hardDisconnected = true; if (_log.shouldLog(Log.WARN)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 4a19d5e09..3819f8999 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -97,6 +97,7 @@ public class MessageOutputStream extends OutputStream { long begin = _context.clock().now(); while (remaining > 0) { WriteStatus ws = null; + if (_closed) throw new IOException("closed underneath us"); // we do any waiting outside the synchronized() block because we // want to allow other threads to flushAvailable() whenever they want. // this is the only method that *adds* to the _buf, and all