* I2CP: Implement an internal "socket" class that

allows clients in the same JVM to connect to the
      router without going through the kernel
This commit is contained in:
zzz
2009-12-04 00:20:43 +00:00
parent 9f7bd99051
commit 7262c014c0
9 changed files with 584 additions and 20 deletions

View File

@ -39,6 +39,7 @@ import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.util.I2PThread;
import net.i2p.util.InternalSocket;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
@ -268,12 +269,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
long startConnect = _context.clock().now();
try {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum);
_socket = new Socket(_hostname, _portNum);
// If we are in the router JVM, connect using the interal pseudo-socket
_socket = InternalSocket.getSocket(_hostname, _portNum);
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
_out = _socket.getOutputStream();
synchronized (_out) {
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
}
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);

View File

@ -20,12 +20,12 @@ import net.i2p.data.Destination;
* just used to talk to the router.
*/
public class I2PSimpleClient implements I2PClient {
/** Don't do this */
/** @deprecated Don't do this */
public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException {
return null;
}
/** or this */
/** @deprecated or this */
public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException {
return null;
}

View File

@ -20,6 +20,7 @@ import net.i2p.data.i2cp.DestReplyMessage;
import net.i2p.data.i2cp.GetBandwidthLimitsMessage;
import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.util.I2PThread;
import net.i2p.util.InternalSocket;
/**
* Create a new session for doing naming and bandwidth queries only. Do not create a Destination.
@ -71,10 +72,12 @@ class I2PSimpleSession extends I2PSessionImpl2 {
notifier.start();
try {
_socket = new Socket(_hostname, _portNum);
// If we are in the router JVM, connect using the interal pseudo-socket
_socket = InternalSocket.getSocket(_hostname, _portNum);
_out = _socket.getOutputStream();
synchronized (_out) {
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
}
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);

View File

@ -0,0 +1,199 @@
package net.i2p.util;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
/**
* A simple in-JVM ServerSocket using Piped Streams.
* We use port numbers just like regular sockets.
* Can only be connected by InternalSocket.
*/
public class InternalServerSocket extends ServerSocket {
private static final ConcurrentHashMap<Integer, InternalServerSocket> _sockets = new ConcurrentHashMap(4);
private BlockingQueue<InternalSocket> _acceptQueue;
private Integer _port;
private boolean _running;
private static Log _log = I2PAppContext.getGlobalContext().logManager().getLog(InternalServerSocket.class);
public InternalServerSocket(int port) throws IOException {
if (port <= 0)
throw new IOException("Bad port: " + port);
_port = Integer.valueOf(port);
InternalServerSocket previous = _sockets.putIfAbsent(_port, this);
if (previous != null)
throw new IOException("Internal port in use: " + port);
_running = true;
_acceptQueue = new LinkedBlockingQueue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Registered " + _port);
}
@Override
public void close() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closing " + _port);
_running = false;
_sockets.remove(_port);
_acceptQueue.clear();
try {
// use null streams as a poison
_acceptQueue.put(new InternalSocket(null, null));
} catch (InterruptedException ie) {}
}
@Override
public Socket accept() throws IOException {
InternalSocket serverSock = null;
while (_running) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accepting " + _port);
try {
serverSock = _acceptQueue.take();
} catch (InterruptedException ie) {
continue;
}
if (serverSock.getInputStream() == null) // poison
throw new IOException("closed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accepted " + _port);
break;
}
return serverSock;
}
@Override
public String toString() {
return ("Internal server socket on port " + _port);
}
/**
* This is how the client connects.
*
* Todo: Java 1.5 PipedInputStream buffers are only 1024 bytes; our I2CP messages are typically 1730 bytes,
* thus causing thread blockage before the whole message is transferred.
* We can specify buffer size in 1.6 but not in 1.5.
* Does wrapping the PipedOutputStreams in BufferedOutputStreams gain capacity?
* No?
*/
static void internalConnect(int port, InternalSocket clientSock) throws IOException {
InternalServerSocket iss = _sockets.get(Integer.valueOf(port));
if (iss == null)
throw new IOException("No server for port: " + port);
PipedInputStream cis = new BigPipedInputStream();
PipedInputStream sis = new BigPipedInputStream();
PipedOutputStream cos = new PipedOutputStream(sis);
PipedOutputStream sos = new PipedOutputStream(cis);
clientSock.setInputStream(cis);
clientSock.setOutputStream(cos);
iss.queueConnection(new InternalSocket(sis, sos));
}
/**
* Until we switch to Java 1.6
* http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/
*/
private static class BigPipedInputStream extends PipedInputStream {
protected static int PIPE_SIZE = 64*1024;
public BigPipedInputStream() {
super();
buffer = new byte[PIPE_SIZE];
}
}
private void queueConnection(InternalSocket sock) throws IOException {
if (!_running)
throw new IOException("Server closed for port: " + _port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queueing " + _port);
try {
_acceptQueue.put(sock);
} catch (InterruptedException ie) {}
}
@Override
public int getLocalPort() {
return _port.intValue();
}
// ignored stuff
/** warning - unsupported */
@Override
public void setSoTimeout(int timeout) {}
@Override
public int getSoTimeout () {
return 0;
}
// everything below here unsupported
/** @deprecated unsupported */
@Override
public void bind(SocketAddress endpoint) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void bind(SocketAddress endpoint, int backlog) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public ServerSocketChannel getChannel() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public InetAddress getInetAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public SocketAddress getLocalSocketAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getReceiveBufferSize() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean getReuseAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isBound() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isClosed() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setReceiveBufferSize(int size) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setReuseAddress(boolean on) {
throw new IllegalArgumentException("unsupported");
}
}

View File

@ -0,0 +1,259 @@
package net.i2p.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
/**
* A simple in-JVM Socket using Piped Streams.
* We use port numbers just like regular sockets.
* Can only connect to InternalServerSocket.
*/
public class InternalSocket extends Socket {
private InputStream _is;
private OutputStream _os;
/** server side */
InternalSocket(InputStream is, OutputStream os) {
_is = is;
_os = os;
}
/** client side */
public InternalSocket(int port) throws IOException {
if (port <= 0)
throw new IOException("bad port number");
InternalServerSocket.internalConnect(port, this);
}
/**
* Convenience method to return either a Socket or an InternalSocket
*/
public static Socket getSocket(String host, int port) throws IOException {
if (System.getProperty("router.version") != null &&
(host.equals("127.0.0.1") || host.equals("localhost"))) {
return new InternalSocket(port);
} else {
return new Socket(host, port);
}
}
@Override
public InputStream getInputStream() {
return _is;
}
@Override
public OutputStream getOutputStream() {
return _os;
}
void setInputStream(InputStream is) {
_is = is;
}
void setOutputStream(OutputStream os) {
_os = os;
}
@Override
public void close() {
try {
if (_is != null) _is.close();
} catch (IOException ie) {}
try {
if (_os != null) _os.close();
} catch (IOException ie) {}
}
@Override
public String toString() {
return ("Internal socket");
}
// ignored stuff
/** warning - unsupported */
@Override
public void setSoTimeout(int timeout) {}
@Override
public int getSoTimeout () {
return 0;
}
// everything below here unsupported
/** @deprecated unsupported */
@Override
public void bind(SocketAddress endpoint) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void connect(SocketAddress endpoint) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void connect(SocketAddress endpoint, int timeout) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public SocketChannel getChannel() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public InetAddress getInetAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean getKeepAlive() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public InetAddress getLocalAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getLocalPort() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public SocketAddress getLocalSocketAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean getOOBInline() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getPort() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getReceiveBufferSize() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public SocketAddress getRemoteSocketAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean getReuseAddress() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getSendBufferSize() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getSoLinger() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean getTcpNoDelay() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public int getTrafficClass() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isBound() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isClosed() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isConnected() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isInputShutdown() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public boolean isOutputShutdown() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void sendUrgentData(int data) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setKeepAlive(boolean on) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setOOBInline(boolean on) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setReceiveBufferSize(int size) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setReuseAddress(boolean on) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setSendBufferSize(int size) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setSoLinger(boolean on, int linger) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setTcpNoDelay(boolean on) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void setTrafficClass(int cize) {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void shutdownInput() {
throw new IllegalArgumentException("unsupported");
}
/** @deprecated unsupported */
@Override
public void shutdownOutput() {
throw new IllegalArgumentException("unsupported");
}
}