2004-11-14 jrandom

* Fix a long standing leak in I2PTunnel (hanging on to i2psocket objects)
    * Fix a leak injected into the SimpleTimer
    * Fix a race condition in the tunnel message handling
This commit is contained in:
jrandom
2004-11-15 14:35:16 +00:00
committed by zzz
parent ad7dc66f90
commit 3780d290fa
10 changed files with 62 additions and 19 deletions

View File

@ -56,14 +56,20 @@ public class I2PTunnelClient extends I2PTunnelClientBase {
public long getReadTimeout() { return readTimeout; } public long getReadTimeout() { return readTimeout; }
protected void clientConnectionRun(Socket s) { protected void clientConnectionRun(Socket s) {
I2PSocket i2ps = null;
try { try {
I2PSocket i2ps = createI2PSocket(dest); i2ps = createI2PSocket(dest);
i2ps.setReadTimeout(readTimeout); i2ps.setReadTimeout(readTimeout);
new I2PTunnelRunner(s, i2ps, sockLock, null); new I2PTunnelRunner(s, i2ps, sockLock, null, mySockets);
} catch (Exception ex) { } catch (Exception ex) {
_log.info("Error connecting", ex); _log.info("Error connecting", ex);
l.log(ex.getMessage()); l.log(ex.getMessage());
closeSocket(s); closeSocket(s);
if (i2ps != null) {
synchronized (sockLock) {
mySockets.remove(sockLock);
}
}
} }
} }
} }

View File

@ -39,7 +39,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
protected long _clientId; protected long _clientId;
protected Object sockLock = new Object(); // Guards sockMgr and mySockets protected Object sockLock = new Object(); // Guards sockMgr and mySockets
private I2PSocketManager sockMgr; private I2PSocketManager sockMgr;
private List mySockets = new ArrayList(); protected List mySockets = new ArrayList();
protected Destination dest = null; protected Destination dest = null;
private int localPort; private int localPort;
@ -263,6 +263,9 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} catch (IOException ex) { } catch (IOException ex) {
_log.error("Error listening for connections", ex); _log.error("Error listening for connections", ex);
notifyEvent("openBaseClientResult", "error"); notifyEvent("openBaseClientResult", "error");
synchronized (sockLock) {
mySockets.clear();
}
} }
} }

View File

@ -356,7 +356,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
String remoteID; String remoteID;
I2PSocket i2ps = createI2PSocket(dest); I2PSocket i2ps = createI2PSocket(dest);
byte[] data = newRequest.toString().getBytes("ISO-8859-1"); byte[] data = newRequest.toString().getBytes("ISO-8859-1");
I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data); I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets);
timeoutThread = new InactivityTimeoutThread(runner, out, targetRequest, usingWWWProxy, currentProxy, s, requestId); timeoutThread = new InactivityTimeoutThread(runner, out, targetRequest, usingWWWProxy, currentProxy, s, requestId);
timeoutThread.start(); timeoutThread.start();
} catch (SocketException ex) { } catch (SocketException ex) {

View File

@ -11,6 +11,7 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
@ -44,10 +45,12 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
private long lastActivityOn; private long lastActivityOn;
/** when the runner started up */ /** when the runner started up */
private long startedOn; private long startedOn;
private List sockList;
private volatile long __forwarderId; private volatile long __forwarderId;
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialData) { public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialData, List sockList) {
this.sockList = sockList;
this.s = s; this.s = s;
this.i2ps = i2ps; this.i2ps = i2ps;
this.slock = slock; this.slock = slock;
@ -115,9 +118,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
} }
// now one connection is dead - kill the other as well. // now one connection is dead - kill the other as well.
s.close(); s.close();
s = null;
i2ps.close(); i2ps.close();
i2ps = null;
t1.join(); t1.join();
t2.join(); t2.join();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -130,9 +131,13 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Internal error", e); _log.error("Internal error", e);
} finally { } finally {
removeRef();
try { try {
if (s != null) s.close(); if (s != null) s.close();
if (i2ps != null) i2ps.close(); if (i2ps != null) {
i2ps.close();
i2ps.setSocketErrorListener(null);
}
} catch (IOException ex) { } catch (IOException ex) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Could not close socket", ex); _log.error("Could not close socket", ex);
@ -147,6 +152,16 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
} }
} }
private void removeRef() {
if (sockList != null) {
synchronized (slock) {
boolean removed = sockList.remove(i2ps);
System.out.println("Removal of i2psocket " + i2ps + " successful? "
+ removed + " remaining: " + sockList.size());
}
}
}
private class StreamForwarder extends I2PThread { private class StreamForwarder extends I2PThread {
InputStream in; InputStream in;

View File

@ -179,7 +179,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
_handleSocket.setReadTimeout(readTimeout); _handleSocket.setReadTimeout(readTimeout);
Socket s = new Socket(remoteHost, remotePort); Socket s = new Socket(remoteHost, remotePort);
afterSocket = I2PAppContext.getGlobalContext().clock().now(); afterSocket = I2PAppContext.getGlobalContext().clock().now();
new I2PTunnelRunner(s, _handleSocket, slock, null); new I2PTunnelRunner(s, _handleSocket, slock, null, null);
} catch (SocketException ex) { } catch (SocketException ex) {
try { try {
_handleSocket.close(); _handleSocket.close();

View File

@ -47,7 +47,7 @@ public class I2PSOCKSTunnel extends I2PTunnelClientBase {
SOCKSServer serv = SOCKSServerFactory.createSOCKSServer(s); SOCKSServer serv = SOCKSServerFactory.createSOCKSServer(s);
Socket clientSock = serv.getClientSocket(); Socket clientSock = serv.getClientSocket();
I2PSocket destSock = serv.getDestinationI2PSocket(); I2PSocket destSock = serv.getDestinationI2PSocket();
new I2PTunnelRunner(clientSock, destSock, sockLock, null); new I2PTunnelRunner(clientSock, destSock, sockLock, null, mySockets);
} catch (SOCKSException e) { } catch (SOCKSException e) {
_log.error("Error from SOCKS connection: " + e.getMessage()); _log.error("Error from SOCKS connection: " + e.getMessage());
closeSocket(s); closeSocket(s);

View File

@ -25,6 +25,7 @@ public class SimpleTimer {
private Map _eventTimes; private Map _eventTimes;
private SimpleTimer() { private SimpleTimer() {
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
_events = new TreeMap(); _events = new TreeMap();
_eventTimes = new HashMap(); _eventTimes = new HashMap();
I2PThread runner = new I2PThread(new SimpleTimerRunner()); I2PThread runner = new I2PThread(new SimpleTimerRunner());
@ -79,25 +80,34 @@ public class SimpleTimer {
synchronized (_events) { synchronized (_events) {
if (_events.size() <= 0) if (_events.size() <= 0)
_events.wait(); _events.wait();
if (_events.size() > 100)
_log.warn("> 100 events! " + _events.values());
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long nextEventDelay = -1; long nextEventDelay = -1;
Object nextEvent = null;
while (true) { while (true) {
if (_events.size() <= 0) break; if (_events.size() <= 0) break;
Long when = (Long)_events.firstKey(); Long when = (Long)_events.firstKey();
if (when.longValue() <= now) { if (when.longValue() <= now) {
TimedEvent evt = (TimedEvent)_events.remove(when); TimedEvent evt = (TimedEvent)_events.remove(when);
_eventTimes.remove(when); if (evt != null) {
eventsToFire.add(evt); _eventTimes.remove(evt);
eventsToFire.add(evt);
}
} else { } else {
nextEventDelay = when.longValue() - now; nextEventDelay = when.longValue() - now;
nextEvent = _events.get(when);
break; break;
} }
} }
if (eventsToFire.size() <= 0) { if (eventsToFire.size() <= 0) {
if (nextEventDelay != -1) if (nextEventDelay != -1) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
_events.wait(nextEventDelay); _events.wait(nextEventDelay);
else } else {
_events.wait(); _events.wait();
}
} }
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {

View File

@ -1,4 +1,9 @@
$Id: history.txt,v 1.69 2004/11/10 07:33:02 jrandom Exp $ $Id: history.txt,v 1.70 2004/11/13 04:43:35 jrandom Exp $
2004-11-14 jrandom
* Fix a long standing leak in I2PTunnel (hanging on to i2psocket objects)
* Fix a leak injected into the SimpleTimer
* Fix a race condition in the tunnel message handling
2004-11-13 jrandom 2004-11-13 jrandom
* Added throttles on how many I2PTunnel client connections we open at once * Added throttles on how many I2PTunnel client connections we open at once

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.74 $ $Date: 2004/11/10 07:33:01 $"; public final static String ID = "$Revision: 1.75 $ $Date: 2004/11/13 04:43:35 $";
public final static String VERSION = "0.4.1.4"; public final static String VERSION = "0.4.1.4";
public final static long BUILD = 3; public final static long BUILD = 4;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION); System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -331,7 +331,6 @@ public class HandleTunnelMessageJob extends JobImpl {
} }
private void sendToTunnel(Hash router, TunnelId id, I2NPMessage body) { private void sendToTunnel(Hash router, TunnelId id, I2NPMessage body) {
// TODO: we may want to send it via a tunnel later on, but for now, direct will do.
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending on to requested tunnel " + id.getTunnelId() + " on router " _log.debug("Sending on to requested tunnel " + id.getTunnelId() + " on router "
+ router.toBase64()); + router.toBase64());
@ -341,6 +340,11 @@ public class HandleTunnelMessageJob extends JobImpl {
timeoutMs = FORWARD_TIMEOUT; timeoutMs = FORWARD_TIMEOUT;
TunnelInfo curInfo = getContext().tunnelManager().getTunnelInfo(_message.getTunnelId()); TunnelInfo curInfo = getContext().tunnelManager().getTunnelInfo(_message.getTunnelId());
if (curInfo == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel went away (" + _message.getTunnelId() + ")");
return;
}
if (curInfo.getTunnelId().getType() != TunnelId.TYPE_INBOUND) { if (curInfo.getTunnelId().getType() != TunnelId.TYPE_INBOUND) {
// we are not processing a request at the end of an inbound tunnel, so // we are not processing a request at the end of an inbound tunnel, so
// there's no reason to hide our location. honor the request directly // there's no reason to hide our location. honor the request directly