propagate from branch 'i2p.i2p.zzz.test' (head 28f0df7ddfdda1df49f30016359dcb77836b06df)

to branch 'i2p.i2p' (head 40618503ea9748aedee73aaf002482424adc1f72)
This commit is contained in:
zzz
2009-04-27 14:59:15 +00:00
5 changed files with 110 additions and 30 deletions

View File

@ -1263,11 +1263,11 @@ public class DoCMDS implements Runnable {
tunnel = new MUXlisten(database, nickinfo, _log); tunnel = new MUXlisten(database, nickinfo, _log);
Thread t = new Thread(tunnel); Thread t = new Thread(tunnel);
t.start(); t.start();
try { // try {
Thread.sleep(1000 * 10); // Slow down the startup. // Thread.sleep(1000 * 10); // Slow down the startup.
} catch(InterruptedException ie) { // } catch(InterruptedException ie) {
// ignore it // // ignore it
} // }
out.println("OK tunnel starting"); out.println("OK tunnel starting");
} catch (I2PException e) { } catch (I2PException e) {
out.println("ERROR starting tunnel: " + e); out.println("ERROR starting tunnel: " + e);

View File

@ -133,7 +133,7 @@ public class MUXlisten implements Runnable {
*/ */
public void run() { public void run() {
I2PServerSocket SS = null; I2PServerSocket SS = null;
int ticks = 1200; // Allow 120 seconds, no more. int ticks = 100; // Allow 10 seconds, no more.
try { try {
wlock(); wlock();
try { try {
@ -267,7 +267,6 @@ die:
// System.out.println("MUXlisten: waiting for children"); // System.out.println("MUXlisten: waiting for children");
if (tg.activeCount() + tg.activeGroupCount() != 0) { if (tg.activeCount() + tg.activeGroupCount() != 0) {
while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
tg.interrupt(); // unwedge any blocking threads.
ticks--; ticks--;
try { try {
Thread.sleep(100); //sleep for 100 ms (One tenth second) Thread.sleep(100); //sleep for 100 ms (One tenth second)
@ -308,7 +307,7 @@ die:
} catch (Exception e) { } catch (Exception e) {
// nop // nop
} }
ticks = 600; // 60 seconds ticks = 100; // 10 seconds
if (tg.activeCount() + tg.activeGroupCount() != 0) { if (tg.activeCount() + tg.activeGroupCount() != 0) {
while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) {
tg.interrupt(); // unwedge any blocking threads. tg.interrupt(); // unwedge any blocking threads.
@ -325,10 +324,14 @@ die:
// Zap reference to the ThreadGroup so the JVM can GC it. // Zap reference to the ThreadGroup so the JVM can GC it.
tg = null; tg = null;
} else { } else {
System.out.println("BOB: MUXlisten: Can't kill threads. Please send the following dump to sponge@mail.i2p"); System.out.println("BOB: MUXlisten: Forcibly killing threads.");
System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN"); System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN");
visit(tg, 0); visit(tg, 0);
System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n"); System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n");
nuke(tg,0);
tg.destroy();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
} }
} }
@ -354,7 +357,6 @@ die:
wunlock(); wunlock();
} catch (Exception e) { } catch (Exception e) {
} }
} }
@ -402,4 +404,37 @@ die:
visit(groups[i], level + 1); visit(groups[i], level + 1);
} }
} }
private static void nuke(ThreadGroup group, int level) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
// Enumerate each thread in `group' and stop it.
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
try {
if(thread.isAlive()) thread.stop();
} catch(SecurityException se) {
//nop
}
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
nuke(groups[i], level + 1);
}
try {
group.destroy();
} catch (IllegalThreadStateException IE) {
//nop
} catch(SecurityException se) {
//nop
}
}
} }

View File

@ -4,18 +4,16 @@ package net.i2p.client;
* public domain * public domain
*/ */
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.data.SessionTag;
import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
@ -97,6 +95,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
* 255 disallowed * 255 disallowed
* @param port 1-65535 or PORT_ANY for all * @param port 1-65535 or PORT_ANY for all
*/ */
@Override
public void addSessionListener(I2PSessionListener lsnr, int proto, int port) { public void addSessionListener(I2PSessionListener lsnr, int proto, int port) {
_demultiplexer.addListener(lsnr, proto, port); _demultiplexer.addListener(lsnr, proto, port);
} }
@ -107,11 +106,13 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
* @param proto 1-254 or 0 for all; 255 disallowed * @param proto 1-254 or 0 for all; 255 disallowed
* @param port 1-65535 or 0 for all * @param port 1-65535 or 0 for all
*/ */
@Override
public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) { public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) {
_demultiplexer.addMuxedListener(l, proto, port); _demultiplexer.addMuxedListener(l, proto, port);
} }
/** removes the specified listener (only) */ /** removes the specified listener (only) */
@Override
public void removeListener(int proto, int port) { public void removeListener(int proto, int port) {
_demultiplexer.removeListener(proto, port); _demultiplexer.removeListener(proto, port);
} }
@ -149,6 +150,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
* @param fromPort 1-65535 or 0 for unset * @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset * @param toPort 1-65535 or 0 for unset
*/ */
@Override
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
SessionKey keyUsed, Set tagsSent, long expires, SessionKey keyUsed, Set tagsSent, long expires,
int proto, int fromPort, int toPort) int proto, int fromPort, int toPort)
@ -198,24 +200,36 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
protected class MuxedAvailabilityNotifier extends AvailabilityNotifier { protected class MuxedAvailabilityNotifier extends AvailabilityNotifier {
private LinkedBlockingQueue<MsgData> _msgs; private LinkedBlockingQueue<MsgData> _msgs;
private boolean _alive; private AtomicBoolean _alive = new AtomicBoolean(false);
private static final int POISON_SIZE = -99999; private static final int POISON_SIZE = -99999;
public MuxedAvailabilityNotifier() { public MuxedAvailabilityNotifier() {
_msgs = new LinkedBlockingQueue(); _msgs = new LinkedBlockingQueue();
} }
@Override
public void stopNotifying() { public void stopNotifying() {
boolean again = true;
_msgs.clear();
// Thread.yield();
if (_alive.get()) {
// System.out.println("I2PSessionMuxedImpl.stopNotifying()");
while(again) {
_msgs.clear(); _msgs.clear();
if (_alive) {
_alive = false;
try { try {
_msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0)); _msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0));
} catch (InterruptedException ie) {} again = false;
// System.out.println("I2PSessionMuxedImpl.stopNotifying() success.");
} catch (InterruptedException ie) {
continue;
} }
} }
}
_alive.set(false);
}
/** unused */ /** unused */
@Override
public void available(long msgId, int size) { throw new IllegalArgumentException("no"); } public void available(long msgId, int size) { throw new IllegalArgumentException("no"); }
public void available(long msgId, int size, int proto, int fromPort, int toPort) { public void available(long msgId, int size, int proto, int fromPort, int toPort) {
@ -224,20 +238,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
@Override
public void run() { public void run() {
_alive = true;
while (true) {
MsgData msg; MsgData msg;
_alive.set(true);
while (_alive.get()) {
try { try {
msg = _msgs.take(); msg = _msgs.take();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
System.out.println("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive.toString());
continue; continue;
} }
if (msg.size == POISON_SIZE) if (msg.size == POISON_SIZE) {
// System.out.println("I2PSessionMuxedImpl.run() POISONED");
break; break;
}
try { try {
_demultiplexer.messageAvailable(I2PSessionMuxedImpl.this, msg.id, _demultiplexer.messageAvailable(I2PSessionMuxedImpl.this,
msg.size, msg.proto, msg.fromPort, msg.toPort); msg.id, msg.size, msg.proto, msg.fromPort, msg.toPort);
} catch (Exception e) { } catch (Exception e) {
_log.error("Error notifying app of message availability"); _log.error("Error notifying app of message availability");
} }

View File

@ -1,5 +1,32 @@
2009-04-25 sponge
* I2PSessionMuxedImpl atomic fixes
* BOB fixes. This should be the final bug wack. Good Luck to everybody!
2009-04-23 zzz
* Blocklist: cleanup
* eepget: handle -h, --help, bad options, etc.
(http://forum.i2p/viewtopic.php?p=16261#16261)
* Fragmenter: don't re-throw the corrupt fragment IllegalStateException,
to limit the damage - root cause still not found
* i2psnark: (http://forum.i2p/viewtopic.php?t=3317)
- Change file limit to 512 (was 256)
- Change size limit to 10GB (was 5GB)
- Change request size to 16KB (was 32KB)
- Change pipeline to 5 (was 3)
* logs.jsp: Move version info to the top
* Jetty: Fix temp dir name handling on windows, which was
causing susidns not to start
(http://forum.i2p/viewtopic.php?t=3364)
* NTCP: Prevent IllegalStateException
* PeerProfile:
- Replace a hot lock with concurrent RW lock
- Rewrite ugly IP Restriction code
- Also use transport IP in restriction code
* RouterConsole: Make summary bar a refreshing iframe
* Transport: Start the previously unused CleanupUnreachable
2009-04-21 sponge 2009-04-21 sponge
* Code janator work, basic corrections involving @Override, and * Code janitor work, basic corrections involving @Override, and
appling final where it is important. Also fixed some equals methods appling final where it is important. Also fixed some equals methods
and commented places that need fixing. and commented places that need fixing.

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1; public final static long BUILD = 3;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);