diff --git a/apps/BOB/src/net/i2p/BOB/I2Plistener.java b/apps/BOB/src/net/i2p/BOB/I2Plistener.java index 22b59fcc3..a077346bf 100644 --- a/apps/BOB/src/net/i2p/BOB/I2Plistener.java +++ b/apps/BOB/src/net/i2p/BOB/I2Plistener.java @@ -40,7 +40,7 @@ import net.i2p.util.Log; */ public class I2Plistener implements Runnable { - private nickname info; + private nickname info, database; private Log _log; private int tgwatch; public I2PSocketManager socketManager; @@ -50,9 +50,11 @@ public class I2Plistener implements Runnable { * Constructor * @param S * @param info + * @param database * @param _log */ - I2Plistener(I2PSocketManager S, nickname info, Log _log) { + I2Plistener(I2PSocketManager S, nickname info, nickname database, Log _log) { + this.database = database; this.info = info; this._log = _log; this.socketManager = S; @@ -62,38 +64,48 @@ public class I2Plistener implements Runnable { /** * Simply listen on I2P port, and thread connections - * - * @throws RuntimeException + * + * @throws RuntimeException */ public void run() throws RuntimeException { boolean g = false; I2PSocket sessSocket = null; - // needed to hack in this method :-/ serverSocket.setSoTimeout(1000); + database.getReadLock(); + info.getReadLock(); if(info.exists("INPORT")) { tgwatch = 2; } - while(info.get("RUNNING").equals(Boolean.TRUE)) { + info.releaseReadLock(); + database.releaseReadLock(); + boolean spin = true; + while(spin) { + + database.getReadLock(); + info.getReadLock(); + spin = info.get("RUNNING").equals(Boolean.TRUE); + info.releaseReadLock(); + database.releaseReadLock(); try { try { sessSocket = serverSocket.accept(); g = true; } catch(ConnectException ce) { g = false; - } catch (SocketTimeoutException ste) { + } catch(SocketTimeoutException ste) { g = false; } if(g) { g = false; // toss the connection to a new thread. - I2PtoTCP conn_c = new I2PtoTCP(sessSocket, info); + I2PtoTCP conn_c = new I2PtoTCP(sessSocket, info, database); Thread t = new Thread(conn_c, "BOBI2PtoTCP"); t.start(); } } catch(I2PException e) { - System.out.println("Exception "+e); + // System.out.println("Exception " + e); } } diff --git a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java index d7aa79f10..7e68c6a2b 100644 --- a/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java +++ b/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java @@ -36,18 +36,20 @@ import net.i2p.client.streaming.I2PSocket; public class I2PtoTCP implements Runnable { private I2PSocket I2P; - private nickname info; + private nickname info, database; private Socket sock; /** * Constructor * * @param I2Psock - * @param db + * @param info + * @param database */ - I2PtoTCP(I2PSocket I2Psock, nickname db) { + I2PtoTCP(I2PSocket I2Psock, nickname info, nickname database) { this.I2P = I2Psock; - this.info = db; + this.info = info; + this.database = database; } /** @@ -57,7 +59,14 @@ public class I2PtoTCP implements Runnable { public void run() { try { - sock = new Socket(info.get("OUTHOST").toString(), Integer.parseInt(info.get("OUTPORT").toString())); + database.getReadLock(); + info.getReadLock(); + String host = info.get("OUTHOST").toString(); + int port = Integer.parseInt(info.get("OUTPORT").toString()); + boolean tell = info.get("QUIET").equals(Boolean.FALSE); + info.releaseReadLock(); + database.releaseReadLock(); + sock = new Socket(host, port); // make readers/writers InputStream in = sock.getInputStream(); OutputStream out = sock.getOutputStream(); @@ -65,15 +74,15 @@ public class I2PtoTCP implements Runnable { OutputStream Iout = I2P.getOutputStream(); I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default - if(info.get("QUIET").equals(Boolean.FALSE)) { + if(tell) { // tell who is connecting out.write(I2P.getPeerDestination().toBase64().getBytes()); out.write(10); // nl out.flush(); // not really needed, but... } // setup to cross the streams - TCPio conn_c = new TCPio(in, Iout, info); // app -> I2P - TCPio conn_a = new TCPio(Iin, out, info); // I2P -> app + TCPio conn_c = new TCPio(in, Iout, info, database); // app -> I2P + TCPio conn_a = new TCPio(Iin, out, info, database); // I2P -> app Thread t = new Thread(conn_c, "TCPioA"); Thread q = new Thread(conn_a, "TCPioB"); // Fire! diff --git a/apps/BOB/src/net/i2p/BOB/MUXlisten.java b/apps/BOB/src/net/i2p/BOB/MUXlisten.java index 29ae003c3..c640a81d4 100644 --- a/apps/BOB/src/net/i2p/BOB/MUXlisten.java +++ b/apps/BOB/src/net/i2p/BOB/MUXlisten.java @@ -39,7 +39,7 @@ import net.i2p.util.Log; */ public class MUXlisten implements Runnable { - private nickname info; + private nickname database, info; private Log _log; private I2PSocketManager socketManager; private ByteArrayInputStream prikey; @@ -50,18 +50,31 @@ public class MUXlisten implements Runnable { * Constructor * * @param info + * @param database * @param _log * @throws net.i2p.I2PException * @throws java.io.IOException */ - MUXlisten(nickname info, Log _log) throws I2PException, IOException { + MUXlisten(nickname database, nickname info, Log _log) throws I2PException, IOException { + this.database = database; this.info = info; this._log = _log; - this.info.add("STARTING", Boolean.TRUE); + this.database.getReadLock(); + this.info.getReadLock(); N = this.info.get("NICKNAME").toString(); prikey = new ByteArrayInputStream((byte[])info.get("KEYS")); - socketManager = I2PSocketManagerFactory.createManager(prikey, (Properties)info.get("PROPERTIES")); + Properties Q = (Properties)info.get("PROPERTIES"); + this.database.releaseReadLock(); + this.info.releaseReadLock(); + + this.database.getWriteLock(); + this.info.getWriteLock(); + this.info.add("STARTING", Boolean.TRUE); + this.info.releaseWriteLock(); + this.database.releaseWriteLock(); + + socketManager = I2PSocketManagerFactory.createManager(prikey, Q); } /** @@ -70,49 +83,82 @@ public class MUXlisten implements Runnable { */ public void run() { - tg = new ThreadGroup(N); + this.database.getWriteLock(); + this.info.getWriteLock(); info.add("RUNNING", Boolean.TRUE); info.add("STARTING", Boolean.FALSE); + this.info.releaseWriteLock(); + this.database.releaseWriteLock(); - // toss the connections to a new threads. - // will wrap with TCP and UDP when UDP works - if(info.exists("OUTPORT")) { - // I2P -> TCP - I2Plistener conn = new I2Plistener(socketManager, info, _log); - Thread t = new Thread(tg, conn, "BOBI2Plistener " + N); - t.start(); - } - if(info.exists("INPORT")) { - // TCP -> I2P - TCPlistener conn = new TCPlistener(socketManager, info, _log); - Thread q = new Thread(tg, conn,"BOBTCPlistener" + N); - q.start(); - } + try { + tg = new ThreadGroup(N); - while(info.get("STOPPING").equals(Boolean.FALSE)) { - try { - Thread.sleep(1000); //sleep for 1000 ms (One second) - } catch(InterruptedException e) { - // nop + // toss the connections to a new threads. + // will wrap with TCP and UDP when UDP works + this.database.getReadLock(); + this.info.getReadLock(); + boolean go_out = info.exists("OUTPORT"); + boolean come_in = info.exists("INPORT"); + this.database.releaseReadLock(); + this.info.releaseReadLock(); + + if(go_out) { + // I2P -> TCP + I2Plistener conn = new I2Plistener(socketManager, info, database, _log); + Thread t = new Thread(tg, conn, "BOBI2Plistener " + N); + t.start(); } + + if(come_in) { + // TCP -> I2P + TCPlistener conn = new TCPlistener(socketManager, info, database, _log); + Thread q = new Thread(tg, conn, "BOBTCPlistener" + N); + q.start(); + } + + boolean spin = true; + while(spin) { + try { + Thread.sleep(1000); //sleep for 1000 ms (One second) + } catch(InterruptedException e) { + // nop + } + + this.database.getReadLock(); + this.info.getReadLock(); + spin = info.get("STOPPING").equals(Boolean.FALSE); + this.database.releaseReadLock(); + this.info.releaseReadLock(); + } + + this.database.getWriteLock(); + this.info.getWriteLock(); + info.add("RUNNING", Boolean.FALSE); + this.info.releaseWriteLock(); + this.database.releaseWriteLock(); + + // wait for child threads and thread groups to die + while(tg.activeCount() + tg.activeGroupCount() != 0) { + try { + Thread.sleep(1000); //sleep for 1000 ms (One second) + } catch(InterruptedException ex) { + // nop + } + } + tg.destroy(); + // Zap reference to the ThreadGroup so the JVM can GC it. + tg = null; + } catch(Exception e) { } - info.add("RUNNING", Boolean.FALSE); - // wait for child threads and thread groups to die - while (tg.activeCount() + tg.activeGroupCount() != 0) { - try { - Thread.sleep(1000); //sleep for 1000 ms (One second) - } catch(InterruptedException ex) { - // nop - } - } - socketManager.destroySocketManager(); - tg.destroy(); - // Zap reference to the ThreadGroup so the JVM can GC it. - tg = null; - info.add("STOPPING", Boolean.FALSE); + // zero out everything, just incase. + this.database.getWriteLock(); + this.info.getWriteLock(); info.add("STARTING", Boolean.FALSE); - + info.add("STOPPING", Boolean.FALSE); + info.add("RUNNING", Boolean.FALSE); + this.info.releaseWriteLock(); + this.database.releaseWriteLock(); } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPio.java b/apps/BOB/src/net/i2p/BOB/TCPio.java index 7a7a15e86..bce2cdde1 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPio.java +++ b/apps/BOB/src/net/i2p/BOB/TCPio.java @@ -36,19 +36,21 @@ public class TCPio implements Runnable { private InputStream Ain; private OutputStream Aout; - private nickname info; + private nickname info, database; /** * Constructor * * @param Ain * @param Aout - * @param db + * @param info + * @param database */ - TCPio(InputStream Ain, OutputStream Aout, nickname db) { + TCPio(InputStream Ain, OutputStream Aout, nickname info, nickname database) { this.Ain = Ain; this.Aout = Aout; - this.info = db; + this.info = info; + this.database = database; } /** @@ -63,13 +65,20 @@ public class TCPio implements Runnable { public void run() { int b; byte a[] = new byte[1]; + boolean spin = true; try { - while(info.get("RUNNING").equals(Boolean.TRUE)) { + while(spin) { + database.getReadLock(); + info.getReadLock(); + spin = info.get("RUNNING").equals(Boolean.TRUE); + info.releaseReadLock(); + database.releaseReadLock(); + b = Ain.read(a, 0, 1); // System.out.println(info.get("NICKNAME").toString() + " " + b); if(b > 0) { - Aout.write(a,0,1); - // Aout.flush(); too slow! + Aout.write(a, 0, 1); + // Aout.flush(); too slow! } else if(b == 0) { try { // Thread.yield(); @@ -87,7 +96,7 @@ public class TCPio implements Runnable { return; } } - } catch(IOException e) { + } catch(Exception e) { } } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPlistener.java b/apps/BOB/src/net/i2p/BOB/TCPlistener.java index 88ce9eb9c..f06b1da27 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPlistener.java +++ b/apps/BOB/src/net/i2p/BOB/TCPlistener.java @@ -41,7 +41,7 @@ import net.i2p.util.Log; */ public class TCPlistener implements Runnable { - private nickname info; + private nickname info, database; private Log _log; private int tgwatch; public I2PSocketManager socketManager; @@ -52,9 +52,11 @@ public class TCPlistener implements Runnable { * Constructor * @param S * @param info + * @param database * @param _log */ - TCPlistener(I2PSocketManager S, nickname info, Log _log) { + TCPlistener(I2PSocketManager S, nickname info, nickname database, Log _log) { + this.database = database; this.info = info; this._log = _log; this.socketManager = S; @@ -67,6 +69,8 @@ public class TCPlistener implements Runnable { */ public void run() throws RuntimeException { boolean g = false; + database.getReadLock(); + info.getReadLock(); if(info.exists("OUTPORT")) { tgwatch = 2; } @@ -75,7 +79,15 @@ public class TCPlistener implements Runnable { ServerSocket listener = new ServerSocket(Integer.parseInt(info.get("INPORT").toString()), backlog, InetAddress.getByName(info.get("INHOST").toString())); Socket server = new Socket(); listener.setSoTimeout(1000); - while(info.get("RUNNING").equals(Boolean.TRUE)) { + info.releaseReadLock(); + database.releaseReadLock(); + boolean spin = true; + while(spin) { + database.getReadLock(); + info.getReadLock(); + spin = info.get("RUNNING").equals(Boolean.TRUE); + info.releaseReadLock(); + database.releaseReadLock(); // System.out.println("Thread count " + Thread.activeCount()); try { server = listener.accept(); @@ -85,7 +97,7 @@ public class TCPlistener implements Runnable { } if(g) { // toss the connection to a new thread. - TCPtoI2P conn_c = new TCPtoI2P(socketManager, server, info); + TCPtoI2P conn_c = new TCPtoI2P(socketManager, server, info, database); Thread t = new Thread(conn_c, "BOBTCPtoI2P"); t.start(); g = false; diff --git a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java index ee21f462e..00b306fca 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java +++ b/apps/BOB/src/net/i2p/BOB/TCPtoI2P.java @@ -45,7 +45,7 @@ import net.i2p.i2ptunnel.I2PTunnel; public class TCPtoI2P implements Runnable { private I2PSocket I2P; - private nickname info; + private nickname info, database; private Socket sock; private I2PSocketManager socketManager; @@ -84,11 +84,13 @@ public class TCPtoI2P implements Runnable { * Constructor * @param i2p * @param socket - * @param db + * @param info + * @param database */ - TCPtoI2P(I2PSocketManager i2p, Socket socket, nickname db) { + TCPtoI2P(I2PSocketManager i2p, Socket socket, nickname info, nickname database) { this.sock = socket; - this.info = db; + this.info = info; + this.database = database; this.socketManager = i2p; } @@ -136,8 +138,8 @@ public class TCPtoI2P implements Runnable { InputStream Iin = I2P.getInputStream(); OutputStream Iout = I2P.getOutputStream(); // setup to cross the streams - TCPio conn_c = new TCPio(in, Iout, info); // app -> I2P - TCPio conn_a = new TCPio(Iin, out, info); // I2P -> app + TCPio conn_c = new TCPio(in, Iout, info, database); // app -> I2P + TCPio conn_a = new TCPio(Iin, out, info, database); // I2P -> app Thread t = new Thread(conn_c, "TCPioA"); Thread q = new Thread(conn_a, "TCPioB"); // Fire! diff --git a/apps/BOB/src/net/i2p/BOB/doCMDS.java b/apps/BOB/src/net/i2p/BOB/doCMDS.java index 3e2631359..b1527997e 100644 --- a/apps/BOB/src/net/i2p/BOB/doCMDS.java +++ b/apps/BOB/src/net/i2p/BOB/doCMDS.java @@ -46,7 +46,7 @@ public class doCMDS implements Runnable { // FIX ME // I need a better way to do versioning, but this will do for now. - public static final String BMAJ = "00", BMIN = "00", BREV = "01", BEXT = "-8"; + public static final String BMAJ = "00", BMIN = "00", BREV = "01", BEXT = "-9"; public static final String BOBversion = BMAJ + "." + BMIN + "." + BREV + BEXT; private Socket server; private Properties props; @@ -153,6 +153,42 @@ public class doCMDS implements Runnable { this._log = _log; } + private void rlock() { + rlock(nickinfo); + } + + private void rlock(nickname Arg) { + database.getReadLock(); + Arg.getReadLock(); + } + + private void runlock() { + runlock(nickinfo); + } + + private void runlock(nickname Arg) { + Arg.releaseReadLock(); + database.releaseReadLock(); + } + + private void wlock() { + wlock(nickinfo); + } + + private void wlock(nickname Arg) { + database.getWriteLock(); + Arg.getWriteLock(); + } + + private void wunlock() { + wunlock(nickinfo); + } + + private void wunlock(nickname Arg) { + Arg.releaseWriteLock(); + database.releaseWriteLock(); + } + /** * Try to print info from the database * @@ -161,12 +197,17 @@ public class doCMDS implements Runnable { * @param key */ public void trypnt(PrintStream out, nickname info, Object key) { - out.print(" " + key + ": "); - if(info.exists(key)) { - out.print(info.get(key)); - } else { - out.print("not_set"); + rlock(info); + try { + out.print(" " + key + ": "); + if(info.exists(key)) { + out.print(info.get(key)); + } else { + out.print("not_set"); + } + } catch(Exception e) { } + runlock(info); } /** @@ -177,8 +218,13 @@ public class doCMDS implements Runnable { * @param key */ public void tfpnt(PrintStream out, nickname info, Object key) { - out.print(" " + key + ": "); - out.print(info.exists(key)); + rlock(info); + try { + out.print(" " + key + ": "); + out.print(info.exists(key)); + } catch(Exception e) { + } + runlock(info); } /** @@ -197,6 +243,7 @@ public class doCMDS implements Runnable { * @param info */ public void nickprint(PrintStream out, nickname info) { + rlock(info); trypnt(out, info, P_NICKNAME); trypnt(out, info, P_STARTING); trypnt(out, info, P_RUNNING); @@ -207,21 +254,28 @@ public class doCMDS implements Runnable { trypnt(out, info, P_INHOST); trypnt(out, info, P_OUTPORT); trypnt(out, info, P_OUTHOST); - out.println(); - + try { + out.println(); + } catch(Exception e) { + } + runlock(info); } /** * Print information on a specific record, indicated by nickname * @param out - * @param database * @param Arg */ - public void ttlpnt(PrintStream out, nickname database, Object Arg) { + public void ttlpnt(PrintStream out, Object Arg) { + database.getReadLock(); if(database.exists(Arg)) { - out.print("DATA"); + try { + out.print("DATA"); + } catch(Exception e) { + } nickprint(out, (nickname)database.get(Arg)); } + database.releaseReadLock(); } /** @@ -231,10 +285,12 @@ public class doCMDS implements Runnable { * @return true if the tunnel is active */ public boolean tunnelactive(nickname Arg) { - return (Arg.get(P_STARTING).equals(Boolean.TRUE) || + rlock(Arg); + boolean retval = (Arg.get(P_STARTING).equals(Boolean.TRUE) || Arg.get(P_STOPPING).equals(Boolean.TRUE) || Arg.get(P_RUNNING).equals(Boolean.TRUE)); - + runlock(Arg); + return retval; } /** @@ -267,7 +323,6 @@ public class doCMDS implements Runnable { out.println("BOB " + BOBversion); out.println("OK"); while((line = in.readLine()) != null) { - System.gc(); // yes, this does make a huge difference... StringTokenizer token = new StringTokenizer(line, " "); // use a space as a delimiter String Command = ""; String Arg = ""; @@ -293,7 +348,12 @@ public class doCMDS implements Runnable { } else if(Command.equals(C_getdest)) { if(ns) { if(dk) { - out.println("OK " + nickinfo.get(P_DEST)); + rlock(); + try { + out.println("OK " + nickinfo.get(P_DEST)); + } catch(Exception e) { + } + runlock(); } else { out.println("ERROR keys not set."); } @@ -302,16 +362,23 @@ public class doCMDS implements Runnable { } } else if(Command.equals(C_list)) { // Produce a formatted list of all nicknames + database.getReadLock(); for(int i = 0; i < database.getcount(); i++) { try { info = (nickname)database.getnext(i); - } catch(RuntimeException b) { + } catch(Exception b) { break; // something bad happened. } + try { - out.print("DATA"); + out.print("DATA"); + } catch(Exception e) { + } + info.getReadLock(); nickprint(out, info); + info.releaseReadLock(); } + database.releaseReadLock(); out.println("OK Listing done"); } else if(Command.equals(C_quit)) { // End the command session @@ -325,11 +392,17 @@ public class doCMDS implements Runnable { // Make a new PublicKey and PrivateKey prikey = new ByteArrayOutputStream(); d = I2PClientFactory.createClient().createDestination(prikey); - dk = true; + wlock(); nickinfo.add(P_KEYS, prikey.toByteArray()); nickinfo.add(P_DEST, d.toBase64()); - // System.out.println(prikey.toByteArray().length); - out.println("OK " + nickinfo.get(P_DEST)); + dk = true; + wunlock(); + rlock(); + try { + out.println("OK " + nickinfo.get(P_DEST)); + } catch(Exception e) { + } + runlock(); } catch(IOException ioe) { BOB.error("Error generating keys" + ioe); out.println("ERROR generating keys"); @@ -345,7 +418,9 @@ public class doCMDS implements Runnable { // Return public key if(dk) { prikey = new ByteArrayOutputStream(); + rlock(); prikey.write(((byte[])nickinfo.get(P_KEYS))); + runlock(); out.println("OK " + net.i2p.data.Base64.encode(prikey.toByteArray())); } else { out.println("ERROR no public key has been set"); @@ -355,7 +430,9 @@ public class doCMDS implements Runnable { if(tunnelactive(nickinfo)) { out.println("ERROR tunnel is active"); } else { + wlock(); nickinfo.add(P_QUIET, new Boolean(Boolean.parseBoolean(Arg) == true)); + wunlock(); out.println("OK Quiet set"); } } else { @@ -375,10 +452,17 @@ public class doCMDS implements Runnable { Arg = ""; } if((Arg.length() == 884) && is64ok(Arg)) { + wlock(); nickinfo.add(P_KEYS, prikey.toByteArray()); nickinfo.add(P_DEST, d.toBase64()); - out.println("OK " + nickinfo.get(P_DEST)); dk = true; + wunlock(); + rlock(); + try { + out.println("OK " + nickinfo.get(P_DEST)); + } catch(Exception e) { + } + runlock(); } else { out.println("ERROR not in BASE64 format"); } @@ -388,20 +472,22 @@ public class doCMDS implements Runnable { } } else if(Command.equals(C_setnick)) { ns = dk = ip = op = false; + database.getReadLock(); try { nickinfo = (nickname)database.get(Arg); if(!tunnelactive(nickinfo)) { nickinfo = null; ns = true; } - } catch(RuntimeException b) { + } catch(Exception b) { nickinfo = null; ns = true; } - + database.releaseReadLock(); // Clears and Sets the initial nickname structure to work with if(ns) { nickinfo = new nickname(); + wlock(); database.add(Arg, nickinfo); nickinfo.add(P_NICKNAME, Arg); nickinfo.add(P_STARTING, Boolean.FALSE); @@ -410,10 +496,11 @@ public class doCMDS implements Runnable { nickinfo.add(P_QUIET, Boolean.FALSE); nickinfo.add(P_INHOST, "localhost"); nickinfo.add(P_OUTHOST, "localhost"); - Properties Q = props; - Q.setProperty("inbound.nickname", (String)nickinfo.get(P_NICKNAME)); - Q.setProperty("outbound.nickname", (String)nickinfo.get(P_NICKNAME)); + Properties Q = new Properties(props); + Q.setProperty("inbound.nickname", Arg); + Q.setProperty("outbound.nickname", Arg); nickinfo.add(P_PROPERTIES, Q); + wunlock(); out.println("OK Nickname set to " + Arg); } else { out.println("ERROR tunnel is active"); @@ -429,9 +516,13 @@ public class doCMDS implements Runnable { } else { String pname = otoken.nextToken(); String pval = otoken.nextToken(); + rlock(); Properties Q = (Properties)nickinfo.get(P_PROPERTIES); + runlock(); Q.setProperty(pname, pval); + wlock(); nickinfo.add(P_PROPERTIES, Q); + wunlock(); out.println("OK " + pname + " set to " + pval); } } @@ -440,16 +531,20 @@ public class doCMDS implements Runnable { } } else if(Command.equals(C_getnick)) { // Get the nickname to work with... + database.getReadLock(); try { nickinfo = (nickname)database.get(Arg); ns = true; } catch(RuntimeException b) { nns(out); } + database.releaseReadLock(); if(ns) { + rlock(); dk = nickinfo.exists(P_KEYS); ip = nickinfo.exists(P_INPORT); op = nickinfo.exists(P_OUTPORT); + runlock(); // Finally say OK. out.println("OK Nickname set to " + Arg); } @@ -461,6 +556,7 @@ public class doCMDS implements Runnable { out.println("ERROR tunnel is active"); } else { int prt; + wlock(); nickinfo.kill(P_INPORT); try { prt = Integer.parseInt(Arg); @@ -470,7 +566,10 @@ public class doCMDS implements Runnable { } catch(NumberFormatException nfe) { out.println("ERROR not a number"); } + wunlock(); + rlock(); ip = nickinfo.exists(P_INPORT); + runlock(); if(ip) { out.println("OK inbound port set"); } else { @@ -488,6 +587,7 @@ public class doCMDS implements Runnable { out.println("ERROR tunnel is active"); } else { int prt; + wlock(); nickinfo.kill(P_OUTPORT); try { prt = Integer.parseInt(Arg); @@ -497,7 +597,10 @@ public class doCMDS implements Runnable { } catch(NumberFormatException nfe) { out.println("ERROR not a number"); } + wunlock(); + rlock(); ip = nickinfo.exists(P_OUTPORT); + runlock(); if(ip) { out.println("OK outbound port set"); } else { @@ -512,7 +615,9 @@ public class doCMDS implements Runnable { if(tunnelactive(nickinfo)) { out.println("ERROR tunnel is active"); } else { + wlock(); nickinfo.add(P_INHOST, Arg); + wunlock(); out.println("OK inhost set"); } } else { @@ -523,7 +628,9 @@ public class doCMDS implements Runnable { if(tunnelactive(nickinfo)) { out.println("ERROR tunnel is active"); } else { + wlock(); nickinfo.add(P_OUTHOST, Arg); + wunlock(); out.println("OK outhost set"); } } else { @@ -533,7 +640,9 @@ public class doCMDS implements Runnable { // Get the current nickname properties if(ns) { out.print("OK"); + rlock(); nickprint(out, nickinfo); + runlock(); } else { nns(out); } @@ -545,9 +654,8 @@ public class doCMDS implements Runnable { } else { MUXlisten tunnel; try { - tunnel = new MUXlisten(nickinfo, _log); + tunnel = new MUXlisten(database, nickinfo, _log); Thread t = new Thread(tunnel); - nickinfo.add(P_STARTING, Boolean.TRUE); t.start(); out.println("OK tunnel starting"); } catch(I2PException e) { @@ -562,10 +670,15 @@ public class doCMDS implements Runnable { } else if(Command.equals(C_stop)) { // Stop the tunnel, if it is running if(ns) { - if(nickinfo.get(P_RUNNING).equals(Boolean.TRUE) && nickinfo.get(P_STOPPING).equals(Boolean.FALSE)) { + rlock(); + if(nickinfo.get(P_RUNNING).equals(Boolean.TRUE) && nickinfo.get(P_STOPPING).equals(Boolean.FALSE) && nickinfo.get(P_STARTING).equals(Boolean.FALSE)) { + runlock(); + wlock(); nickinfo.add(P_STOPPING, Boolean.TRUE); + wunlock(); out.println("OK tunnel stopping"); } else { + runlock(); out.println("ERROR tunnel is inactive"); } } else { @@ -577,7 +690,9 @@ public class doCMDS implements Runnable { if(tunnelactive(nickinfo)) { out.println("ERROR tunnel is active"); } else { + database.getWriteLock(); database.kill(nickinfo.get(P_NICKNAME)); + database.releaseWriteLock(); dk = ns = ip = op = false; out.println("OK cleared"); } @@ -588,7 +703,7 @@ public class doCMDS implements Runnable { if(database.exists(Arg)) { // Show status of a nickname out.print("OK "); - ttlpnt(out, database, Arg); + ttlpnt(out, Arg); } else { nns(out); } diff --git a/apps/BOB/src/net/i2p/BOB/nickname.java b/apps/BOB/src/net/i2p/BOB/nickname.java index b7b76b9ce..cef5950d9 100644 --- a/apps/BOB/src/net/i2p/BOB/nickname.java +++ b/apps/BOB/src/net/i2p/BOB/nickname.java @@ -21,7 +21,6 @@ * * ...for any additional details and liscense questions. */ - package net.i2p.BOB; /** @@ -29,17 +28,54 @@ package net.i2p.BOB; * * @author sponge */ -public class nickname { - - private Object[][] data; - private int index = 0; +public class nickname { + private static final int maxWritersWaiting = 2; + private volatile Object[][] data; + private volatile int index, writersWaiting, readers; + private volatile boolean writingInProgress; /** * make initial NULL object * */ public nickname() { - data = new Object[1][2]; + this.data = new Object[1][2]; + this.index = this.writersWaiting = this.readers = 0; + this.writingInProgress = false; + } + + synchronized public void getReadLock() { + while(writingInProgress | (writersWaiting >= maxWritersWaiting)) { + try { + wait(); + } catch(InterruptedException ie) { + } + readers++; + } + } + + synchronized public void releaseReadLock() { + readers--; + if((readers == 0) & (writersWaiting > 0)) { + notifyAll(); + } + } + + synchronized public void getWriteLock() { + writersWaiting++; + while((readers > 0) | writingInProgress) { + try { + wait(); + } catch(InterruptedException ie) { + } + } + writersWaiting--; + writingInProgress = true; + } + + synchronized public void releaseWriteLock() { + writingInProgress = false; + notifyAll(); } /** @@ -74,7 +110,7 @@ public class nickname { } olddata = new Object[index + 2][2]; // copy to olddata, skipping 'k' - for(i = 0 , l = 0; l < index; i++, l++) { + for(i = 0 , l = 0; l < index; i++, l++) { if(i == k) { l++; didsomething++; diff --git a/history.txt b/history.txt index a0f82cff0..5599a0613 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,7 @@ +2008-10-09 sponge + * Update version to -3 + * BOB database threadlocking fixes + 2008-10-08 sponge * Update version to -2 * Bugfixes and additions to BOB diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index a7c7152d8..88ab2c4ad 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = "0.6.4"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);