2004-11-27 jrandom
* Removed the I2PTunnel inactivity timeout thread, since the new streaming lib can do that (without an additional per-connection thread). * Close the I2PTunnel forwarder threads more aggressively
This commit is contained in:
@ -184,7 +184,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
|||||||
* create the default options (using the default timeout, etc)
|
* create the default options (using the default timeout, etc)
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private I2PSocketOptions getDefaultOptions() {
|
protected I2PSocketOptions getDefaultOptions() {
|
||||||
Properties defaultOpts = getTunnel().getClientOptions();
|
Properties defaultOpts = getTunnel().getClientOptions();
|
||||||
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
|
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
|
||||||
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
|
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
|
||||||
@ -192,6 +192,19 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
|||||||
return opts;
|
return opts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create the default options (using the default timeout, etc)
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
protected I2PSocketOptions getDefaultOptions(Properties overrides) {
|
||||||
|
Properties defaultOpts = getTunnel().getClientOptions();
|
||||||
|
defaultOpts.putAll(overrides);
|
||||||
|
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
|
||||||
|
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
|
||||||
|
opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
|
||||||
|
return opts;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new I2PSocket towards to the specified destination,
|
* Create a new I2PSocket towards to the specified destination,
|
||||||
* adding it to the list of connections actually managed by this
|
* adding it to the list of connections actually managed by this
|
||||||
|
@ -14,6 +14,7 @@ import java.util.Date;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
import net.i2p.I2PAppContext;
|
||||||
import net.i2p.I2PException;
|
import net.i2p.I2PException;
|
||||||
@ -149,7 +150,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
|||||||
String targetRequest = null;
|
String targetRequest = null;
|
||||||
boolean usingWWWProxy = false;
|
boolean usingWWWProxy = false;
|
||||||
String currentProxy = null;
|
String currentProxy = null;
|
||||||
InactivityTimeoutThread timeoutThread = null;
|
|
||||||
long requestId = ++__requestId;
|
long requestId = ++__requestId;
|
||||||
try {
|
try {
|
||||||
out = s.getOutputStream();
|
out = s.getOutputStream();
|
||||||
@ -354,25 +354,26 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String remoteID;
|
String remoteID;
|
||||||
I2PSocket i2ps = createI2PSocket(dest);
|
|
||||||
|
Properties opts = new Properties();
|
||||||
|
opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000);
|
||||||
|
// 1 == disconnect. see ConnectionOptions in the new streaming lib, which i
|
||||||
|
// dont want to hard link to here
|
||||||
|
opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1);
|
||||||
|
I2PSocket i2ps = createI2PSocket(dest, getDefaultOptions(opts));
|
||||||
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
|
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
|
||||||
I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets);
|
I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets);
|
||||||
timeoutThread = new InactivityTimeoutThread(runner, out, targetRequest, usingWWWProxy, currentProxy, s, requestId);
|
|
||||||
timeoutThread.start();
|
|
||||||
} catch (SocketException ex) {
|
} catch (SocketException ex) {
|
||||||
if (timeoutThread != null) timeoutThread.disable();
|
|
||||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||||
l.log(ex.getMessage());
|
l.log(ex.getMessage());
|
||||||
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||||
closeSocket(s);
|
closeSocket(s);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
if (timeoutThread != null) timeoutThread.disable();
|
|
||||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||||
l.log(ex.getMessage());
|
l.log(ex.getMessage());
|
||||||
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||||
closeSocket(s);
|
closeSocket(s);
|
||||||
} catch (I2PException ex) {
|
} catch (I2PException ex) {
|
||||||
if (timeoutThread != null) timeoutThread.disable();
|
|
||||||
_log.info("getPrefix(requestId) + Error trying to connect", ex);
|
_log.info("getPrefix(requestId) + Error trying to connect", ex);
|
||||||
l.log(ex.getMessage());
|
l.log(ex.getMessage());
|
||||||
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||||
@ -380,91 +381,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final long INACTIVITY_TIMEOUT = 120 * 1000;
|
|
||||||
private static volatile long __timeoutId = 0;
|
|
||||||
|
|
||||||
private class InactivityTimeoutThread extends I2PThread {
|
|
||||||
|
|
||||||
private Socket s;
|
|
||||||
private I2PTunnelRunner _runner;
|
|
||||||
private OutputStream _out;
|
|
||||||
private String _targetRequest;
|
|
||||||
private boolean _useWWWProxy;
|
|
||||||
private String _currentProxy;
|
|
||||||
private long _requestId;
|
|
||||||
private boolean _disabled;
|
|
||||||
private Object _disableLock = new Object();
|
|
||||||
|
|
||||||
public InactivityTimeoutThread(I2PTunnelRunner runner, OutputStream out, String targetRequest,
|
|
||||||
boolean useWWWProxy, String currentProxy, Socket s, long requestId) {
|
|
||||||
this.s = s;
|
|
||||||
_runner = runner;
|
|
||||||
_out = out;
|
|
||||||
_targetRequest = targetRequest;
|
|
||||||
_useWWWProxy = useWWWProxy;
|
|
||||||
_currentProxy = currentProxy;
|
|
||||||
_disabled = false;
|
|
||||||
_requestId = requestId;
|
|
||||||
long timeoutId = ++__timeoutId;
|
|
||||||
setName("InactivityThread " + getPrefix(requestId) + timeoutId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void disable() {
|
|
||||||
_disabled = true;
|
|
||||||
synchronized (_disableLock) {
|
|
||||||
_disableLock.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
while (!_disabled) {
|
|
||||||
if (_runner.isFinished()) {
|
|
||||||
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix(_requestId) + "HTTP client request completed prior to timeout");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (_runner.getLastActivityOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) {
|
|
||||||
if (_runner.getStartedOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getPrefix(_requestId) + "HTTP client request timed out (lastActivity: "
|
|
||||||
+ new Date(_runner.getLastActivityOn()) + ", startedOn: "
|
|
||||||
+ new Date(_runner.getStartedOn()) + ")");
|
|
||||||
timeout();
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
// runner hasn't been going to long enough
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// there has been activity in the period
|
|
||||||
}
|
|
||||||
synchronized (_disableLock) {
|
|
||||||
try {
|
|
||||||
_disableLock.wait(INACTIVITY_TIMEOUT);
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void timeout() {
|
|
||||||
_log.info(getPrefix(_requestId) + "Inactivity timeout reached");
|
|
||||||
l.log("Inactivity timeout reached");
|
|
||||||
if (_out != null) {
|
|
||||||
try {
|
|
||||||
if (_runner.getLastActivityOn() > 0) {
|
|
||||||
// some data has been sent, so don't 404 it
|
|
||||||
} else {
|
|
||||||
writeErrorMessage(ERR_TIMEOUT, _out, _targetRequest, _useWWWProxy, _currentProxy);
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.warn(getPrefix(_requestId) + "Error writing out the 'timeout' message", ioe);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
_log.warn(getPrefix(_requestId) + "Client disconnected before we could say we timed out");
|
|
||||||
}
|
|
||||||
closeSocket(s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final static String getHostName(String host) {
|
private final static String getHostName(String host) {
|
||||||
if (host == null) return null;
|
if (host == null) return null;
|
||||||
try {
|
try {
|
||||||
|
@ -120,8 +120,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
|
|||||||
// now one connection is dead - kill the other as well.
|
// now one connection is dead - kill the other as well.
|
||||||
s.close();
|
s.close();
|
||||||
i2ps.close();
|
i2ps.close();
|
||||||
t1.join();
|
t1.join(30*1000);
|
||||||
t2.join();
|
t2.join(30*1000);
|
||||||
closedCleanly = true;
|
closedCleanly = true;
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
|
@ -1,4 +1,9 @@
|
|||||||
$Id: history.txt,v 1.83 2004/11/26 10:20:14 jrandom Exp $
|
$Id: history.txt,v 1.84 2004/11/26 22:54:18 jrandom Exp $
|
||||||
|
|
||||||
|
2004-11-27 jrandom
|
||||||
|
* Removed the I2PTunnel inactivity timeout thread, since the new streaming
|
||||||
|
lib can do that (without an additional per-connection thread).
|
||||||
|
* Close the I2PTunnel forwarder threads more aggressively
|
||||||
|
|
||||||
2004-11-27 jrandom
|
2004-11-27 jrandom
|
||||||
* Fix for a fast loop caused by a race in the new streaming library (thanks
|
* Fix for a fast loop caused by a race in the new streaming library (thanks
|
||||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class RouterVersion {
|
public class RouterVersion {
|
||||||
public final static String ID = "$Revision: 1.88 $ $Date: 2004/11/26 10:15:17 $";
|
public final static String ID = "$Revision: 1.89 $ $Date: 2004/11/26 22:54:17 $";
|
||||||
public final static String VERSION = "0.4.2";
|
public final static String VERSION = "0.4.2";
|
||||||
public final static long BUILD = 1;
|
public final static long BUILD = 2;
|
||||||
public static void main(String args[]) {
|
public static void main(String args[]) {
|
||||||
System.out.println("I2P Router version: " + VERSION);
|
System.out.println("I2P Router version: " + VERSION);
|
||||||
System.out.println("Router ID: " + RouterVersion.ID);
|
System.out.println("Router ID: " + RouterVersion.ID);
|
||||||
|
@ -143,6 +143,8 @@ public class StatisticsManager implements Service {
|
|||||||
//includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
//includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("client.sendAckTime", stats, new long[] { 60*60*1000 }, true);
|
includeRate("client.sendAckTime", stats, new long[] { 60*60*1000 }, true);
|
||||||
|
includeRate("stream.con.sendDuplicateSize", stats, new long[] { 60*60*1000 });
|
||||||
|
includeRate("stream.con.receiveDuplicateSize", stats, new long[] { 60*60*1000 });
|
||||||
//includeRate("client.sendsPerFailure", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
//includeRate("client.sendsPerFailure", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||||
//includeRate("client.timeoutCongestionTunnel", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
//includeRate("client.timeoutCongestionTunnel", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||||
//includeRate("client.timeoutCongestionMessage", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
//includeRate("client.timeoutCongestionMessage", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||||
|
Reference in New Issue
Block a user