2009-04-27 sponge

* more BOB fixes, complete with warnings when things go wrong, and
      success messages when things turn around and go right. Terminates
      early so that applications wait no more than 10 seconds or so.
    * Reversed a few earlier patches that caused some odd behavior.
    * Changed some core println()'s to debugging messages.
This commit is contained in:
sponge
2009-04-27 17:09:47 +00:00
parent 542e0f2ed3
commit c1d7562331
12 changed files with 124 additions and 202 deletions

View File

@ -82,18 +82,6 @@ public class I2Plistener implements Runnable {
die: {
serverSocket.setSoTimeout(50);
// try {
// if (info.exists("INPORT")) {
// tgwatch = 2;
// }
// } catch (Exception e) {
// try {
// runlock();
// } catch (Exception e2) {
// break die;
// }
// break die;
// }
boolean spin = true;
while (spin) {
@ -129,40 +117,12 @@ die: {
t.start();
}
} catch (I2PException e) {
} catch (Exception e) {
// System.out.println("Exception " + e);
}
}
}
// System.out.println("I2Plistener: Close");
// Previous level does this cleanup now.
//
// try {
// serverSocket.close();
// } catch (I2PException e) {
// nop
//}
// need to kill off the socket manager too.
// I2PSession session = socketManager.getSession();
// if (session != null) {
// System.out.println("I2Plistener: destroySession");
// try {
// session.destroySession();
// } catch (I2PSessionException ex) {
// nop
// }
//}
// System.out.println("I2Plistener: Waiting for children");
// while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish
// try {
// Thread.sleep(100); //sleep for 100 ms (One tenth second)
// } catch (Exception e) {
// nop
// }
//}
// System.out.println("I2Plistener: Done.");
}
}

View File

@ -23,7 +23,6 @@
*/
package net.i2p.BOB;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

View File

@ -152,11 +152,11 @@ public class MUXlisten implements Runnable {
}
// socketManager.addDisconnectListener(new DisconnectListener());
quit:
quit:
{
try {
tg = new ThreadGroup(N);
die:
die:
{
// toss the connections to a new threads.
// will wrap with TCP and UDP when UDP works
@ -255,7 +255,7 @@ die:
try {
session.destroySession();
} catch (I2PSessionException ex) {
// nop
// nop
}
}
try {
@ -266,6 +266,7 @@ die:
// Wait for child threads and thread groups to die
// System.out.println("MUXlisten: waiting for children");
if (tg.activeCount() + tg.activeGroupCount() != 0) {
tg.interrupt(); // give my stuff a small smack.
while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
ticks--;
try {
@ -287,62 +288,6 @@ die:
}
} // quit
// This is here to catch when something fucks up REALLY bad.
if (tg != null) {
if (SS != null) {
try {
SS.close();
} catch (I2PException ex) {
//Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (this.come_in) {
try {
listener.close();
} catch (IOException e) {
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
ticks = 100; // 10 seconds
if (tg.activeCount() + tg.activeGroupCount() != 0) {
while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
tg.interrupt(); // unwedge any blocking threads.
ticks--;
try {
Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) {
// nop
}
}
}
if (tg.activeCount() + tg.activeGroupCount() == 0) {
tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
} else {
System.out.println("BOB: MUXlisten: Forcibly killing threads.");
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN");
visit(tg, 0);
System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n");
nuke(tg,0);
tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
}
}
// This is here to catch when something fucks up REALLY bad.
// if (tg != null) {
// System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!");
// System.out.println("BOB: MUXlisten: Please email the following dump to sponge@mail.i2p");
// WrapperManager.requestThreadDump();
// System.out.println("BOB: MUXlisten: Something fucked up REALLY bad!");
// System.out.println("BOB: MUXlisten: Please email the above dump to sponge@mail.i2p");
// }
// zero out everything.
try {
wlock();
@ -357,11 +302,102 @@ die:
wunlock();
} catch (Exception e) {
}
// This is here to catch when something fucks up REALLY bad, like those annoying stuck threads!
if (tg != null) {
tg.interrupt(); // give my stuff a small smack again.
if (SS != null) {
try {
SS.close();
} catch (I2PException ex) {
//Logger.getLogger(MUXlisten.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (this.come_in) {
try {
listener.close();
} catch (IOException e) {
}
}
I2PSession session = socketManager.getSession();
if (session != null) {
// System.out.println("I2Plistener: destroySession");
try {
session.destroySession();
} catch (I2PSessionException ex) {
// nop
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
// ticks = 100; // 10 seconds
if (tg.activeCount() + tg.activeGroupCount() != 0) {
int foo = tg.activeCount() + tg.activeGroupCount();
int bar = foo;
String boner = tg.getName();
System.out.println("BOB: MUXlisten: Waiting on threads for " + boner);
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
visit(tg, 0, boner);
System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n");
// Happily spin forever :-(
while ((tg.activeCount() + tg.activeGroupCount() != 0)) {
foo = tg.activeCount() + tg.activeGroupCount();
if (foo != bar) {
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
visit(tg, 0, boner);
System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n\n");
}
bar = foo;
try {
session = socketManager.getSession();
if (session != null) {
// System.out.println("I2Plistener: destroySession");
try {
session.destroySession();
} catch (I2PSessionException ex) {
// nop
}
}
try {
socketManager.destroySocketManager();
} catch (Exception e) {
// nop
}
} catch (Exception e) {
// nop
}
// tg.interrupt(); // unwedge any blocking threads.
// ticks--;
try {
Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) {
// nop
}
}
System.out.println("BOB: MUXlisten: Threads went away. Success: " + boner);
}
// if (tg.activeCount() + tg.activeGroupCount() == 0) {
tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
// } else {
// System.out.println("BOB: MUXlisten: Forcibly killing threads.");
// System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN");
// visit(tg, 0);
// System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n");
// nuke(tg,0);
// tg.destroy();
// // Zap reference to the ThreadGroup so the JVM can GC it.
// tg = null;
// }
}
}
// Debugging...
/**
* Find the root thread group and print them all.
*
@ -373,7 +409,7 @@ die:
}
// Visit each thread group
visit(root, 0);
visit(root, 0, root.getName());
}
/**
@ -381,7 +417,7 @@ die:
* @param group ThreadGroup to visit
* @param level Current level
*/
private static void visit(ThreadGroup group, int level) {
private static void visit(ThreadGroup group, int level, String tn) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
@ -391,7 +427,7 @@ die:
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
System.out.println("BOB: MUXlisten: " + indent + thread.toString());
System.out.println("BOB: MUXlisten: " + tn + ": " + indent + thread.toString());
}
// Get thread subgroups of `group'
@ -401,9 +437,10 @@ die:
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
visit(groups[i], level + 1);
visit(groups[i], level + 1, groups[i].getName());
}
}
private static void nuke(ThreadGroup group, int level) {
// Get threads in `group'
int numThreads = group.activeCount();
@ -414,8 +451,10 @@ die:
// Get thread
Thread thread = threads[i];
try {
if(thread.isAlive()) thread.stop();
} catch(SecurityException se) {
if (thread.isAlive()) {
thread.stop();
}
} catch (SecurityException se) {
//nop
}
}
@ -433,7 +472,7 @@ die:
group.destroy();
} catch (IllegalThreadStateException IE) {
//nop
} catch(SecurityException se) {
} catch (SecurityException se) {
//nop
}
}

View File

@ -87,23 +87,13 @@ public class TCPio implements Runnable {
boolean spin = true;
try {
while(spin) {
// database.getReadLock();
// info.getReadLock();
// spin = info.get("RUNNING").equals(Boolean.TRUE);
// info.releaseReadLock();
// database.releaseReadLock();
b = Ain.read(a, 0, 1);
// System.out.println(info.get("NICKNAME").toString() + " " + b);
if(b > 0) {
Aout.write(a, 0, b);
} else if(b == 0) {
Thread.yield(); // this should act like a mini sleep.
if(Ain.available() == 0) {
// try {
// Thread.yield();
Thread.sleep(10);
// } catch(InterruptedException ex) {
// }
}
} else {
/* according to the specs:
@ -119,19 +109,16 @@ public class TCPio implements Runnable {
return;
}
}
// System.out.println("TCPio: RUNNING = false");
} catch(Exception e) {
// Eject!!! Eject!!!
//System.out.println("TCPio: Caught an exception " + e);
try {
Ain.close();
} catch (IOException ex) {
// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex);
}
try {
Aout.close();
} catch (IOException ex) {
// Logger.getLogger(TCPio.class.getName()).log(Level.SEVERE, null, ex);
}
return;
}

View File

@ -73,16 +73,6 @@ public class TCPlistener implements Runnable {
info.releaseReadLock();
}
private void wlock() throws Exception {
database.getWriteLock();
info.getWriteLock();
}
private void wunlock() throws Exception {
info.releaseWriteLock();
database.releaseWriteLock();
}
/**
* Simply listen on TCP port, and thread connections
*
@ -116,7 +106,7 @@ die: {
}
try {
Socket server = new Socket();
listener.setSoTimeout(50); // Half of the expected time from MUXlisten
listener.setSoTimeout(50); // We don't block, we cycle and check.
while (spin) {
try {
rlock();
@ -141,73 +131,20 @@ die: {
}
if (g) {
// toss the connection to a new thread.
TCPtoI2P conn_c = new TCPtoI2P(socketManager, server /* , info, database */);
TCPtoI2P conn_c = new TCPtoI2P(socketManager, server);
Thread t = new Thread(conn_c, "BOBTCPtoI2P");
t.start();
g = false;
}
}
//System.out.println("TCPlistener: destroySession");
listener.close();
} catch (IOException ioe) {
try {
listener.close();
} catch (IOException e) {
}
// Fatal failure, cause a stop event
try {
rlock();
try {
spin = info.get("RUNNING").equals(Boolean.TRUE);
} catch (Exception e) {
runlock();
break die;
}
} catch (Exception e) {
break die;
}
if (spin) {
try {
wlock();
try {
info.add("STOPPING", new Boolean(true));
info.add("RUNNING", new Boolean(false));
} catch (Exception e) {
wunlock();
break die;
}
} catch (Exception e) {
break die;
}
try {
wunlock();
} catch (Exception e) {
break die;
}
}
}
}
// Previous level does this cleanup now.
//
// need to kill off the socket manager too.
// I2PSession session = socketManager.getSession();
// if (session != null) {
// try {
// session.destroySession();
// } catch (I2PSessionException ex) {
// nop
// }
//}
//System.out.println("TCPlistener: Waiting for children");
//while (Thread.activeCount() > tgwatch) { // wait for all threads in our threadgroup to finish
// try {
// Thread.sleep(100); //sleep for 100 ms (One tenth second)
// } catch (Exception e) {
// // nop
// }
//}
//System.out.println("TCPlistener: Done.");
}
}

View File

@ -151,13 +151,8 @@ public class TCPtoI2P implements Runnable {
t.start();
q.start();
while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
// try {
Thread.sleep(10); //sleep for 10 ms
// } catch(InterruptedException e) {
// nop
// }
}
// System.out.println("TCPtoI2P: Going away...");
} catch(I2PException e) {
Emsg("ERROR " + e.toString(), out);
@ -166,7 +161,7 @@ public class TCPtoI2P implements Runnable {
} catch(NoRouteToHostException e) {
Emsg("ERROR " + e.toString(), out);
} catch(InterruptedIOException e) {
Emsg("ERROR " + e.toString(), out);
// We're breaking away.
}
} catch(Exception e) {

View File

@ -2,8 +2,6 @@ package net.i2p.client.streaming;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
@ -126,7 +124,7 @@ class ConnectionHandler {
if (timeoutMs <= 0) {
try {
syn = _synQueue.take(); // waits forever
} catch (InterruptedException ie) { break;}
} catch (InterruptedException ie) { } // { break;}
} else {
long remaining = expiration - _context.clock().now();
// (dont think this applies anymore for LinkedBlockingQueue)

View File

@ -213,7 +213,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
timeRemaining = 10*1000;
wait(timeRemaining);
}
} catch (InterruptedException ie) { break; }
} catch (InterruptedException ie) { }//{ break; }
}
if (!writeSuccessful())
releasePayload();

View File

@ -16,7 +16,6 @@ import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -90,7 +89,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected I2PAppContext _context;
/** monitor for waiting until a lease set has been granted */
private Object _leaseSetWait = new Object();
private final Object _leaseSetWait = new Object();
/** whether the session connection has already been closed (or not yet opened) */
protected boolean _closed;
@ -101,7 +100,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** have we received the current date from the router yet? */
private boolean _dateReceived;
/** lock that we wait upon, that the SetDateMessageHandler notifies */
private Object _dateReceivedLock = new Object();
private final Object _dateReceivedLock = new Object();
/**
* thread that we tell when new messages are available who then tells us
@ -253,6 +252,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
try {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum);
_socket = new Socket(_hostname, _portNum);
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
_out = _socket.getOutputStream();
synchronized (_out) {
_out.write(I2PClient.PROTOCOL_BYTE);

View File

@ -246,7 +246,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
try {
msg = _msgs.take();
} catch (InterruptedException ie) {
System.out.println("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive.toString());
_log.debug("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive.toString());
continue;
}
if (msg.size == POISON_SIZE) {

View File

@ -1,3 +1,10 @@
2009-04-27 sponge
* more BOB fixes, complete with warnings when things go wrong, and
success messages when things turn around and go right. Terminates
early so that applications wait no more than 10 seconds or so.
* Reversed a few earlier patches that caused some odd behavior.
* Changed some core println()'s to debugging messages.
2009-04-27 zzz
* Build files:
- New updaterWithJettyFixes target, build it for pkg

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 4;
public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);