2004-11-08 jrandom

* Remove spurious flush calls from I2PTunnel, and work with the
      I2PSocket's output stream directly (as the various implementations
      do their own buffering).
    * Another pass at a long standing JobQueue bug - dramatically simplify
      the job management synchronization since we dont need to deal with
      high contention (unlike last year when we had dozens of queue runners
      going at once).
    * Logging
This commit is contained in:
jrandom
2004-11-08 05:40:20 +00:00
committed by zzz
parent 096b807c37
commit 0c049f39d9
5 changed files with 131 additions and 107 deletions

View File

@ -12,7 +12,6 @@ import java.net.Socket;
import java.net.SocketException;
import java.util.HashMap;
import net.i2p.client.I2PSession;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.util.Clock;
import net.i2p.util.I2PThread;
@ -38,13 +37,14 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
Object slock, finishLock = new Object();
boolean finished = false;
HashMap ostreams, sockets;
I2PSession session;
byte[] initialData;
/** when the last data was sent/received (or -1 if never) */
private long lastActivityOn;
/** when the runner started up */
private long startedOn;
private volatile long __forwarderId;
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialData) {
this.s = s;
this.i2ps = i2ps;
@ -55,6 +55,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
if (_log.shouldLog(Log.INFO))
_log.info("I2PTunnelRunner started");
_runnerId = ++__runnerId;
__forwarderId = i2ps.hashCode();
setName("I2PTunnelRunner " + _runnerId);
start();
}
@ -96,15 +97,15 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
OutputStream out = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE);
i2ps.setSocketErrorListener(this);
InputStream i2pin = i2ps.getInputStream();
OutputStream i2pout = new BufferedOutputStream(i2ps.getOutputStream(), MAX_PACKET_SIZE);
OutputStream i2pout = i2ps.getOutputStream(); //new BufferedOutputStream(i2ps.getOutputStream(), MAX_PACKET_SIZE);
if (initialData != null) {
synchronized (slock) {
i2pout.write(initialData);
i2pout.flush();
//i2pout.flush();
}
}
Thread t1 = new StreamForwarder(in, i2pout);
Thread t2 = new StreamForwarder(i2pin, out);
Thread t1 = new StreamForwarder(in, i2pout, "toI2P");
Thread t2 = new StreamForwarder(i2pin, out, "fromI2P");
synchronized (finishLock) {
while (!finished) {
finishLock.wait();
@ -118,19 +119,21 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
t1.join();
t2.join();
} catch (InterruptedException ex) {
_log.error("Interrupted", ex);
if (_log.shouldLog(Log.ERROR))
_log.error("Interrupted", ex);
} catch (IOException ex) {
ex.printStackTrace();
_log.debug("Error forwarding", ex);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error forwarding", ex);
} catch (Exception e) {
_log.error("Internal error", e);
if (_log.shouldLog(Log.ERROR))
_log.error("Internal error", e);
} finally {
try {
if (s != null) s.close();
if (i2ps != null) i2ps.close();
} catch (IOException ex) {
ex.printStackTrace();
_log.error("Could not close socket", ex);
if (_log.shouldLog(Log.ERROR))
_log.error("Could not close socket", ex);
}
}
}
@ -142,21 +145,31 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
}
}
private volatile long __forwarderId = 0;
private class StreamForwarder extends I2PThread {
InputStream in;
OutputStream out;
String direction;
private StreamForwarder(InputStream in, OutputStream out) {
private StreamForwarder(InputStream in, OutputStream out, String dir) {
this.in = in;
this.out = out;
direction = dir;
setName("StreamForwarder " + _runnerId + "." + (++__forwarderId));
start();
}
public void run() {
if (_log.shouldLog(Log.DEBUG)) {
String from = i2ps.getThisDestination().calculateHash().toBase64().substring(0,6);
String to = i2ps.getPeerDestination().calculateHash().toBase64().substring(0,6);
_log.debug(direction + ": Forwarding between "
+ from
+ " and "
+ to);
}
byte[] buffer = new byte[NETWORK_BUFFER_SIZE];
try {
int len;
@ -166,30 +179,39 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
if (len > 0) updateActivity();
if (in.available() == 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Flushing after sending " + len + " bytes through");
if (_log.shouldLog(Log.DEBUG))
_log.debug(direction + ": " + len + " bytes flushed through to "
+ i2ps.getPeerDestination().calculateHash().toBase64().substring(0,6));
try {
Thread.sleep(I2PTunnel.PACKET_DELAY);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (in.available() == 0) {
out.flush(); // make sure the data get though
if (in.available() <= 0)
out.flush(); // make sure the data get though
}
}
} catch (SocketException ex) {
// this *will* occur when the other threads closes the socket
synchronized (finishLock) {
if (!finished) {
_log.debug("Socket closed - error reading and writing",
ex);
if (_log.shouldLog(Log.DEBUG))
_log.debug(direction + ": Socket closed - error reading and writing",
ex);
}
}
} catch (InterruptedIOException ex) {
_log.warn("Closing connection due to timeout (error: \""
+ ex.getMessage() + "\")");
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Closing connection due to timeout (error: \""
+ ex.getMessage() + "\")");
} catch (IOException ex) {
if (!finished)
_log.error("Error forwarding", ex);
if (!finished) {
if (_log.shouldLog(Log.ERROR))
_log.error(direction + ": Error forwarding", ex);
}
//else
// _log.warn("You may ignore this", ex);
} finally {
@ -198,7 +220,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
in.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error closing streams", ex);
_log.warn(direction + ": Error closing streams", ex);
}
synchronized (finishLock) {
finished = true;

View File

@ -118,11 +118,15 @@ public class StreamSinkServer {
try {
InputStream in = _sock.getInputStream();
byte buf[] = new byte[4096];
long written = 0;
int read = 0;
while ( (read = in.read(buf)) != -1) {
_fos.write(buf, 0, read);
written += read;
if (_log.shouldLog(Log.DEBUG))
_log.debug("read and wrote " + read);
}
_log.error("Got EOF from client socket");
_log.error("Got EOF from client socket [written=" + written + "]");
} catch (IOException ioe) {
_log.error("Error writing the sink", ioe);
} finally {
@ -143,6 +147,9 @@ public class StreamSinkServer {
public static void main(String args[]) {
StreamSinkServer server = null;
switch (args.length) {
case 0:
server = new StreamSinkServer("dataDir", "server.key", "localhost", 10001);
break;
case 2:
server = new StreamSinkServer(args[0], args[1]);
break;

View File

@ -1,4 +1,14 @@
$Id: history.txt,v 1.66 2004/11/06 22:00:56 jrandom Exp $
$Id: history.txt,v 1.67 2004/11/07 22:18:01 jrandom Exp $
2004-11-08 jrandom
* Remove spurious flush calls from I2PTunnel, and work with the
I2PSocket's output stream directly (as the various implementations
do their own buffering).
* Another pass at a long standing JobQueue bug - dramatically simplify
the job management synchronization since we dont need to deal with
high contention (unlike last year when we had dozens of queue runners
going at once).
* Logging
2004-11-08 jrandom
* Make the SAM bridge more resiliant to bad handshakes (thanks duck!)

View File

@ -52,6 +52,8 @@ public class JobQueue {
/** have we been killed or are we alive? */
private boolean _alive;
private Object _jobLock;
/** default max # job queue runners operating */
private final static int DEFAULT_MAX_RUNNERS = 1;
/** router.config parameter to override the max runners */
@ -109,8 +111,9 @@ public class JobQueue {
new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_alive = true;
_readyJobs = new ArrayList();
_timedJobs = new ArrayList();
_readyJobs = new ArrayList(16);
_timedJobs = new ArrayList(64);
_jobLock = new Object();
_queueRunners = new HashMap();
_jobStats = Collections.synchronizedSortedMap(new TreeMap());
_allowParallelOperation = false;
@ -134,68 +137,59 @@ public class JobQueue {
long numReady = 0;
boolean alreadyExists = false;
synchronized (_readyJobs) {
synchronized (_jobLock) {
if (_readyJobs.contains(job))
alreadyExists = true;
numReady = _readyJobs.size();
}
if (!alreadyExists) {
synchronized (_timedJobs) {
if (!alreadyExists) {
if (_timedJobs.contains(job))
alreadyExists = true;
}
}
_context.statManager().addRateData("jobQueue.readyJobs", numReady, 0);
if (shouldDrop(job, numReady)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping job due to overload! # ready jobs: "
+ numReady + ": job = " + job);
job.dropped();
_context.statManager().addRateData("jobQueue.droppedJobs", 1, 1);
synchronized (_readyJobs) {
_readyJobs.notifyAll();
_context.statManager().addRateData("jobQueue.readyJobs", numReady, 0);
if (shouldDrop(job, numReady)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping job due to overload! # ready jobs: "
+ numReady + ": job = " + job);
job.dropped();
_context.statManager().addRateData("jobQueue.droppedJobs", 1, 1);
_jobLock.notifyAll();
return;
}
return;
}
if (!alreadyExists) {
if (job.getTiming().getStartAfter() <= _context.clock().now()) {
// don't skew us - its 'start after' its been queued, or later
job.getTiming().setStartAfter(_context.clock().now());
if (job instanceof JobImpl)
((JobImpl)job).madeReady();
synchronized (_readyJobs) {
if (!alreadyExists) {
if (job.getTiming().getStartAfter() <= _context.clock().now()) {
// don't skew us - its 'start after' its been queued, or later
job.getTiming().setStartAfter(_context.clock().now());
if (job instanceof JobImpl)
((JobImpl)job).madeReady();
_readyJobs.add(job);
_readyJobs.notifyAll();
_jobLock.notifyAll();
} else {
_timedJobs.add(job);
_jobLock.notifyAll();
}
} else {
synchronized (_timedJobs) {
_timedJobs.add(job);
_timedJobs.notifyAll();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not adding already enqueued job " + job.getName());
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not adding already enqueued job " + job.getName());
}
return;
}
public void timingUpdated() {
synchronized (_timedJobs) {
_timedJobs.notifyAll();
synchronized (_jobLock) {
_jobLock.notifyAll();
}
}
public int getReadyCount() {
synchronized (_readyJobs) {
synchronized (_jobLock) {
return _readyJobs.size();
}
}
public long getMaxLag() {
synchronized (_readyJobs) {
synchronized (_jobLock) {
if (_readyJobs.size() <= 0) return 0;
// first job is the one that has been waiting the longest
long startAfter = ((Job)_readyJobs.get(0)).getTiming().getStartAfter();
@ -237,19 +231,17 @@ public class JobQueue {
public void allowParallelOperation() { _allowParallelOperation = true; }
public void restart() {
synchronized (_timedJobs) {
synchronized (_jobLock) {
_timedJobs.clear();
}
synchronized (_readyJobs) {
_readyJobs.clear();
_readyJobs.notifyAll();
_jobLock.notifyAll();
}
}
void shutdown() {
_alive = false;
synchronized (_readyJobs) {
_readyJobs.notifyAll();
synchronized (_jobLock) {
_jobLock.notifyAll();
}
if (_log.shouldLog(Log.WARN)) {
StringBuffer buf = new StringBuffer(1024);
@ -339,11 +331,11 @@ public class JobQueue {
*/
Job getNext() {
while (_alive) {
synchronized (_readyJobs) {
synchronized (_jobLock) {
if (_readyJobs.size() > 0) {
return (Job)_readyJobs.remove(0);
} else {
try { _readyJobs.wait(); } catch (InterruptedException ie) {}
try { _jobLock.wait(); } catch (InterruptedException ie) {}
}
}
}
@ -413,7 +405,7 @@ public class JobQueue {
long now = _context.clock().now();
long timeToWait = 0;
ArrayList toAdd = null;
synchronized (_timedJobs) {
synchronized (_jobLock) {
for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i);
// find jobs due to start before now
@ -431,13 +423,10 @@ public class JobQueue {
timeToWait = timeLeft;
}
}
}
if (toAdd != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not waiting - we have " + toAdd.size() + " newly ready jobs");
synchronized (_readyJobs) {
if (toAdd != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not waiting - we have " + toAdd.size() + " newly ready jobs");
// rather than addAll, which allocs a byte array rv before adding,
// we iterate, since toAdd is usually going to only be 1 or 2 entries
// and since readyJobs will often have the space, we can avoid the
@ -445,22 +434,20 @@ public class JobQueue {
// on some profiling data ;)
for (int i = 0; i < toAdd.size(); i++)
_readyJobs.add(toAdd.get(i));
_readyJobs.notifyAll();
_jobLock.notifyAll();
} else {
if (timeToWait < 100)
timeToWait = 100;
if (timeToWait > 10*1000)
timeToWait = 10*1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Waiting " + timeToWait + " before rechecking the timed queue");
try {
_jobLock.wait(timeToWait);
} catch (InterruptedException ie) {}
}
} else {
if (timeToWait < 100)
timeToWait = 100;
if (timeToWait > 10*1000)
timeToWait = 10*1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Waiting " + timeToWait + " before rechecking the timed queue");
try {
synchronized (_timedJobs) {
_timedJobs.wait(timeToWait);
}
} catch (InterruptedException ie) {}
}
}
} // synchronize (_jobLock)
} // while (_alive)
} catch (Throwable t) {
_context.clock().removeUpdateListener(this);
if (_log.shouldLog(Log.ERROR))
@ -470,8 +457,8 @@ public class JobQueue {
public void offsetChanged(long delta) {
updateJobTimings(delta);
synchronized (_timedJobs) {
_timedJobs.notifyAll();
synchronized (_jobLock) {
_jobLock.notifyAll();
}
}
@ -482,13 +469,11 @@ public class JobQueue {
* completion.
*/
private void updateJobTimings(long delta) {
synchronized (_timedJobs) {
synchronized (_jobLock) {
for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i);
j.getTiming().offsetChanged(delta);
}
}
synchronized (_readyJobs) {
for (int i = 0; i < _readyJobs.size(); i++) {
Job j = (Job)_readyJobs.get(i);
j.getTiming().offsetChanged(delta);
@ -605,11 +590,11 @@ public class JobQueue {
out.write(str.toString());
out.flush();
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
out.write("<!-- jobQueue rendering: after readyJobs sync -->\n");
out.flush();
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
out.write("<!-- jobQueue rendering: after timedJobs sync -->\n");
synchronized (_jobLock) {
readyJobs = new ArrayList(_readyJobs);
timedJobs = new ArrayList(_timedJobs);
}
out.write("<!-- jobQueue rendering: after jobLock sync -->\n");
out.flush();
StringBuffer buf = new StringBuffer(32*1024);

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.71 $ $Date: 2004/11/06 21:25:13 $";
public final static String ID = "$Revision: 1.72 $ $Date: 2004/11/06 22:00:57 $";
public final static String VERSION = "0.4.1.4";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);