more on resume/accept, untested

This commit is contained in:
zzz
2011-07-16 20:22:00 +00:00
parent c826f7fb48
commit f87e3b52e3
4 changed files with 204 additions and 24 deletions

View File

@ -201,18 +201,30 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements DCCHelper
}
public int resumeOutgoing(int port) {
DCCClientManager tracker = _DCCClientManager;
if (tracker != null)
return tracker.resumeOutgoing(port);
return -1;
}
public int resumeIncoming(int port) {
I2PTunnelDCCServer server = _DCCServer;
if (server != null)
return server.resumeIncoming(port);
return -1;
}
public int acceptOutgoing(int port) {
I2PTunnelDCCServer server = _DCCServer;
if (server != null)
return server.acceptOutgoing(port);
return -1;
}
public int acceptIncoming(int port) {
DCCClientManager tracker = _DCCClientManager;
if (tracker != null)
return tracker.acceptIncoming(port);
return -1;
}
}

View File

@ -15,10 +15,14 @@ import net.i2p.util.Log;
*
* <pre>
*
* <--- I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
* direct conn
* <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
* originating responding
* chat client chat client
* ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
* CHAT ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
* SEND ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
* RESUME <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
* ACCEPT ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
*
* </pre>
*
@ -31,8 +35,12 @@ public class DCCClientManager extends EventReceiver {
private final I2PTunnel _tunnel;
private final Log _log;
/** key is the DCC client's local port */
private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _incoming;
/** key is the DCC client's local port */
private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _active;
/** key is the DCC client's local port */
private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _complete;
// list of client tunnels?
private static long _id;
@ -50,6 +58,7 @@ public class DCCClientManager extends EventReceiver {
_log = tunnel.getContext().logManager().getLog(DCCClientManager.class);
_incoming = new ConcurrentHashMap(8);
_active = new ConcurrentHashMap(8);
_complete = new ConcurrentHashMap(8);
}
public boolean close(boolean forced) {
@ -61,18 +70,26 @@ public class DCCClientManager extends EventReceiver {
c.stop();
}
_active.clear();
_complete.clear();
return true;
}
/**
* An incoming DCC request
*
* @param b32 remote dcc server address
* @param port remote dcc server port
* @param b32 remote dcc server b32 address
* @param port remote dcc server I2P port
* @param type ignored
* @return local server port or -1 on error
* @return local DCC client tunnel port or -1 on error
*/
public int newIncoming(String b32, int port, String type) {
return newIncoming(b32, port, type, 0);
}
/**
* @param localPort bind to port or 0; if nonzero it will be the rv
*/
private int newIncoming(String b32, int port, String type, int localPort) {
expireInbound();
if (_incoming.size() >= MAX_INCOMING_PENDING ||
_active.size() >= MAX_INCOMING_PENDING) {
@ -83,7 +100,7 @@ public class DCCClientManager extends EventReceiver {
try {
// Transparent tunnel used for all types...
// Do we need to do any filtering for chat?
I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr,
I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, localPort, port, l, sockMgr,
_dispatch, _tunnel, ++_id);
cTunnel.attachEventDispatcher(this);
int lport = cTunnel.getLocalPort();
@ -99,6 +116,54 @@ public class DCCClientManager extends EventReceiver {
}
}
/**
* An outgoing RESUME request
*
* @param port local DCC client tunnel port
* @return remote DCC server i2p port or -1 on error
*/
public int resumeOutgoing(int port) {
Integer lport = Integer.valueOf(port);
I2PTunnelDCCClient tun = _complete.get(lport);
if (tun == null) {
tun = _active.get(lport);
if (tun == null)
// shouldn't happen
tun = _incoming.get(lport);
}
if (tun != null) {
tun.stop();
return tun.getLocalPort();
}
return -1;
}
/**
* An incoming ACCEPT response
*
* @param port remote dcc server I2P port
* @return local DCC client tunnel port or -1 on error
*/
public int acceptIncoming(int port) {
// do a reverse lookup
for (I2PTunnelDCCClient tun : _complete.values()) {
if (tun.getRemotePort() == port)
return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort());
}
for (I2PTunnelDCCClient tun : _active.values()) {
if (tun.getRemotePort() == port)
return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort());
}
for (I2PTunnelDCCClient tun : _incoming.values()) {
if (tun.getRemotePort() == port) {
// shouldn't happen
tun.stop();
return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort());
}
}
return -1;
}
/**
* The EventReceiver callback
*/
@ -124,17 +189,23 @@ public class DCCClientManager extends EventReceiver {
if (_log.shouldLog(Log.WARN))
_log.warn("Added client tunnel for port " + lport +
" pending count now: " + _incoming.size() +
" active count now: " + _active.size());
" active count now: " + _active.size() +
" complete count now: " + _complete.size());
}
}
private void connStopped(Integer lport) {
_incoming.remove(lport);
_active.remove(lport);
I2PTunnelDCCClient tun = _incoming.remove(lport);
if (tun != null)
_complete.put(lport, tun);
tun = _active.remove(lport);
if (tun != null)
_complete.put(lport, tun);
if (_log.shouldLog(Log.WARN))
_log.warn("Removed client tunnel for port " + lport +
" pending count now: " + _incoming.size() +
" active count now: " + _active.size());
" active count now: " + _active.size() +
" complete count now: " + _complete.size());
}
private void expireInbound() {
@ -146,5 +217,12 @@ public class DCCClientManager extends EventReceiver {
}
}
// shouldn't need to expire active
for (Iterator<I2PTunnelDCCClient> iter = _complete.values().iterator(); iter.hasNext(); ) {
I2PTunnelDCCClient c = iter.next();
if (c.getExpires() < _tunnel.getContext().clock().now()) {
iter.remove();
c.stop();
}
}
}
}

View File

@ -29,21 +29,23 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
// delay resolution until connect time
private final String _dest;
private final int _remotePort;
private final long _expires;
private long _expires;
private static final long INBOUND_EXPIRE = 30*60*1000;
private static final long INBOUND_STOP_EXPIRE = 30*60*1000;
public static final String CONNECT_START_EVENT = "connectionStarted";
public static final String CONNECT_STOP_EVENT = "connectionStopped";
/**
* @param dest the target, presumably b32
* @param localPort if 0, use any port, get actual port selected with getLocalPort()
* @throws IllegalArgumentException if the I2PTunnel does not contain
* valid config to contact the router
*/
public I2PTunnelDCCClient(String dest, int remotePort, Logging l,
public I2PTunnelDCCClient(String dest, int localPort, int remotePort, Logging l,
I2PSocketManager sktMgr, EventDispatcher notifyThis,
I2PTunnel tunnel, long clientId) throws IllegalArgumentException {
super(0, l, sktMgr, tunnel, notifyThis, clientId);
super(localPort, l, sktMgr, tunnel, notifyThis, clientId);
_dest = dest;
_remotePort = remotePort;
_expires = tunnel.getContext().clock().now() + INBOUND_EXPIRE;
@ -89,6 +91,14 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
return _expires;
}
public String getDest() {
return _dest;
}
public int getRemotePort() {
return _remotePort;
}
/**
* Stop listening for new sockets.
* We can't call super.close() as it kills all sockets in the sockMgr
@ -112,8 +122,10 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
@Override
public void run() {
_expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE;
notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this);
super.run();
_expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE;
notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
}
}

View File

@ -6,8 +6,10 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
@ -24,10 +26,14 @@ import net.i2p.util.Log;
*
* <pre>
*
* <--- I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
* direct conn
* <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
* originating responding
* chat client chat client
* ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
* CHAT ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
* SEND ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
* RESUME <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
* ACCEPT ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
*
* </pre>
*
@ -35,8 +41,13 @@ import net.i2p.util.Log;
*/
public class I2PTunnelDCCServer extends I2PTunnelServer {
/** key is the server's local I2P port */
private final ConcurrentHashMap<Integer, LocalAddress> _outgoing;
private final ConcurrentHashMap<Integer, I2PSocket> _active;
/** key is the server's local I2P port */
private final ConcurrentHashMap<Integer, LocalAddress> _active;
/** key is the server's local I2P port */
private final ConcurrentHashMap<Integer, LocalAddress> _resume;
private final List<I2PSocket> _sockList;
// list of client tunnels?
private static long _id;
@ -71,6 +82,8 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
super(DUMMY, 0, sktMgr, l, notifyThis, tunnel);
_outgoing = new ConcurrentHashMap(8);
_active = new ConcurrentHashMap(8);
_resume = new ConcurrentHashMap(8);
_sockList = new CopyOnWriteArrayList();
}
/**
@ -99,8 +112,11 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
_log.warn("Incoming DCC connection for I2P port " + myPort +
" sending to " + local.ia + ':' + local.port);
Socket s = new Socket(local.ia, local.port);
new I2PTunnelRunner(s, socket, slock, null, null);
_active.put(Integer.valueOf(myPort), socket);
_sockList.add(socket);
new I2PTunnelRunner(s, socket, slock, null, _sockList);
local.socket = socket;
local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
_active.put(Integer.valueOf(myPort), local);
} catch (SocketException ex) {
try {
socket.close();
@ -116,6 +132,12 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
public boolean close(boolean forced) {
_outgoing.clear();
_active.clear();
for (I2PSocket s : _sockList) {
try {
s.close();
} catch (IOException ioe) {}
}
_sockList.clear();
return super.close(forced);
}
@ -128,6 +150,13 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
* @return i2p port or -1 on error
*/
public int newOutgoing(byte[] ip, int port, String type) {
return newOutgoing(ip, port, type, 0);
}
/**
* @param port local dcc server I2P port or 0 to pick one at random
*/
private int newOutgoing(byte[] ip, int port, String type, int i2pPort) {
expireOutbound();
if (_outgoing.size() >= MAX_OUTGOING_PENDING ||
_active.size() >= MAX_OUTGOING_ACTIVE) {
@ -141,9 +170,14 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
} catch (UnknownHostException uhe) {
return -1;
}
int limit = i2pPort > 0 ? 10 : 1;
LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE);
for (int i = 0; i < 10; i++) {
int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT);
for (int i = 0; i < limit; i++) {
int iport;
if (i2pPort > 0)
iport = i2pPort;
else
iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT);
if (_active.containsKey(Integer.valueOf(iport)))
continue;
LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client);
@ -156,6 +190,48 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
return -1;
}
/**
* An incoming RESUME request
*
* @param port local dcc server I2P port
* @return local IRC client DCC port or -1 on error
*/
public int resumeIncoming(int port) {
Integer iport = Integer.valueOf(port);
LocalAddress local = _active.remove(iport);
if (local != null) {
local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
_resume.put(Integer.valueOf(local.port), local);
return local.port;
}
local = _outgoing.get(iport);
if (local != null) {
// shouldn't happen
local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
return local.port;
}
return -1;
}
/**
* An outgoing ACCEPT response
*
* @param port local irc client DCC port
* @return local DCC server i2p port or -1 on error
*/
public int acceptOutgoing(int port) {
// do a reverse lookup
for (Iterator<Map.Entry<Integer, LocalAddress>> iter = _resume.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Integer, LocalAddress> e = iter.next();
LocalAddress local = e.getValue();
if (local.port == port) {
iter.remove();
return newOutgoing(local.ia.getAddress(), port, "ACCEPT", e.getKey().intValue());
}
}
return -1;
}
private InetAddress getListenHost(Logging l) {
try {
return InetAddress.getByName(getTunnel().listenHost);
@ -173,9 +249,10 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
if (a.expire < getTunnel().getContext().clock().now())
iter.remove();
}
for (Iterator<I2PSocket> iter = _active.values().iterator(); iter.hasNext(); ) {
I2PSocket s = iter.next();
if (s.isClosed())
for (Iterator<LocalAddress> iter = _active.values().iterator(); iter.hasNext(); ) {
LocalAddress a = iter.next();
I2PSocket s = a.socket;
if (s != null && s.isClosed())
iter.remove();
}
}
@ -183,7 +260,8 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
private static class LocalAddress {
public final InetAddress ia;
public final int port;
public final long expire;
public long expire;
public I2PSocket socket;
public LocalAddress(InetAddress a, int p, long exp) {
ia = a;