* Streaming:

- Add new real sockets for easier porting of apps.
        See http://zzz.i2p/topics/792 for info.
        Untested.
      - de-SpongeCase
      - Javadoc
This commit is contained in:
zzz
2011-01-05 16:41:41 +00:00
parent 532c9d3fc5
commit 226cb7fdb9
9 changed files with 637 additions and 21 deletions

View File

@ -4,6 +4,7 @@ package net.i2p.client.streaming;
* Like a StringBuffer, but for bytes. This class is not internally synchronized,
* so care should be taken when using in a multithreaded environment.
*
* @deprecated Only used by deprecated I2PSocketImpl
*/
class ByteCollector {
byte[] contents;
@ -294,4 +295,4 @@ class ByteCollector {
size = 0;
return bb;
}
}
}

View File

@ -4,9 +4,12 @@
*/
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import java.util.Set;
@ -84,7 +87,7 @@ public interface I2PSocketManager {
*
* @return a set of currently connected I2PSockets
*/
public Set listSockets();
public Set<I2PSocket> listSockets();
/**
* Ping the specified peer, returning true if they replied to the ping within
@ -107,4 +110,25 @@ public interface I2PSocketManager {
public static interface DisconnectListener {
public void sessionDisconnected();
}
/**
* Like getServerSocket but returns a real ServerSocket for easier porting of apps.
* @since 0.8.4
*/
public ServerSocket getStandardServerSocket() throws IOException;
/**
* Like connect() but returns a real Socket, and throws only IOE,
* for easier porting of apps.
* @since 0.8.4
*/
public Socket connectToSocket(Destination peer) throws IOException;
/**
* Like connect() but returns a real Socket, and throws only IOE,
* for easier porting of apps.
* @param timeout ms if > 0, forces blocking (disables connectDelay)
* @since 0.8.4
*/
public Socket connectToSocket(Destination peer, int timeout) throws IOException;
}

View File

@ -10,6 +10,8 @@ import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -461,6 +463,14 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
return _serverSocket;
}
/**
* @throws UnsupportedOperationException
* @since 0.8.4
*/
public ServerSocket getStandardServerSocket() {
throw new UnsupportedOperationException();
}
/**
* Create a new connected socket (block until the socket is created)
*
@ -601,6 +611,22 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
return connect(peer, null);
}
/**
* @throws UnsupportedOperationException
* @since 0.8.4
*/
public Socket connectToSocket(Destination peer) {
throw new UnsupportedOperationException();
}
/**
* @throws UnsupportedOperationException
* @since 0.8.4
*/
public Socket connectToSocket(Destination peer, int timeout) {
throw new UnsupportedOperationException();
}
/**
* Destroy the socket manager, freeing all the associated resources. This
* method will block untill all the managed sockets are closed.
@ -660,7 +686,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
*
*/
public Set listSockets() {
public Set<I2PSocket> listSockets() {
Set<I2PSocket> sockets = new HashSet<I2PSocket>(8);
synchronized (lock) {
sockets.addAll(_inSockets.values());

View File

@ -40,7 +40,7 @@ class ConnectionManager {
private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private long SoTimeout;
private long _soTimeout;
private ConnThrottler _minuteThrottler;
private ConnThrottler _hourThrottler;
private ConnThrottler _dayThrottler;
@ -64,7 +64,7 @@ class ConnectionManager {
_allowIncoming = false;
_numWaiting = 0;
/** Socket timeout for accept() */
SoTimeout = -1;
_soTimeout = -1;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
@ -97,16 +97,16 @@ class ConnectionManager {
* Set the socket accept() timeout.
* @param x
*/
public void MsetSoTimeout(long x) {
SoTimeout = x;
public void setSoTimeout(long x) {
_soTimeout = x;
}
/**
* Get the socket accept() timeout.
* @return accept timeout in ms.
*/
public long MgetSoTimeout() {
return SoTimeout;
public long getSoTimeout() {
return _soTimeout;
}
public void setAllowIncomingConnections(boolean allow) {

View File

@ -26,11 +26,11 @@ class I2PServerSocketFull implements I2PServerSocket {
}
public long getSoTimeout() {
return _socketManager.getConnectionManager().MgetSoTimeout();
return _socketManager.getConnectionManager().getSoTimeout();
}
public void setSoTimeout(long x) {
_socketManager.getConnectionManager().MsetSoTimeout(x);
_socketManager.getConnectionManager().setSoTimeout(x);
}
/**
* Close the connection.

View File

@ -46,6 +46,10 @@ class I2PSocketFull implements I2PSocket {
Connection getConnection() { return _connection; }
/**
* Warning, may return null instead of throwing IOE,
* which is not what the interface says.
*/
public InputStream getInputStream() {
Connection c = _connection;
if (c != null)
@ -62,6 +66,10 @@ class I2PSocketFull implements I2PSocket {
return null;
}
/**
* Warning, may return null instead of throwing IOE,
* which is not what the interface says.
*/
public OutputStream getOutputStream() throws IOException {
Connection c = _connection;
if (c != null)

View File

@ -1,6 +1,9 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Iterator;
@ -30,6 +33,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private Log _log;
private I2PSession _session;
private I2PServerSocketFull _serverSocket;
private StandardServerSocket _realServerSocket;
private ConnectionOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
@ -44,8 +48,6 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
public I2PSocketManagerFull() {
_context = null;
_session = null;
}
/**
@ -120,7 +122,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
*/
public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
verifySession();
Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.getSoTimeout());
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("receiveSocket() called: " + con);
}
@ -129,7 +131,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
con.setSocket(sock);
return sock;
} else {
if(_connectionManager.MgetSoTimeout() == -1) {
if(_connectionManager.getSoTimeout() == -1) {
return null;
}
throw new SocketTimeoutException("I2PSocket timed out");
@ -171,6 +173,17 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return _serverSocket;
}
/**
* Like getServerSocket but returns a real ServerSocket for easier porting of apps.
* @since 0.8.4
*/
public synchronized ServerSocket getStandardServerSocket() throws IOException {
if (_realServerSocket == null)
_realServerSocket = new StandardServerSocket(_serverSocket);
_connectionManager.setAllowIncomingConnections(true);
return _realServerSocket;
}
private void verifySession() throws I2PException {
if (!_connectionManager.getSession().isClosed())
return;
@ -185,7 +198,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* this data will be bundled in the SYN packet.
*
* @param peer Destination to connect to
* @param options I2P socket options to be used for connecting
* @param options I2P socket options to be used for connecting, may be null
*
* @return I2PSocket if successful
* @throws NoRouteToHostException if the peer is not found or not reachable
@ -235,6 +248,45 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return connect(peer, _defaultOptions);
}
/**
* Like connect() but returns a real Socket, and throws only IOE,
* for easier porting of apps.
* @since 0.8.4
*/
public Socket connectToSocket(Destination peer) throws IOException {
return connectToSocket(peer, _defaultOptions);
}
/**
* Like connect() but returns a real Socket, and throws only IOE,
* for easier porting of apps.
* @param timeout ms if > 0, forces blocking (disables connectDelay)
* @since 0.8.4
*/
public Socket connectToSocket(Destination peer, int timeout) throws IOException {
ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setConnectTimeout(timeout);
if (timeout > 0)
opts.setConnectDelay(-1);
return connectToSocket(peer, opts);
}
/**
* Like connect() but returns a real Socket, and throws only IOE,
* for easier porting of apps.
* @param options may be null
* @since 0.8.4
*/
private Socket connectToSocket(Destination peer, I2PSocketOptions options) throws IOException {
try {
I2PSocket sock = connect(peer, options);
return new StandardSocket(sock);
} catch (I2PException i2pe) {
// fixme in 1.6 change to cause
throw new IOException(i2pe.toString());
}
}
/**
* Destroy the socket manager, freeing all the associated resources. This
* method will block untill all the managed sockets are closed.
@ -259,11 +311,10 @@ public class I2PSocketManagerFull implements I2PSocketManager {
*
* @return set of currently connected I2PSockets
*/
public Set listSockets() {
Set connections = _connectionManager.listConnections();
Set rv = new HashSet(connections.size());
for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
public Set<I2PSocket> listSockets() {
Set<Connection> connections = _connectionManager.listConnections();
Set<I2PSocket> rv = new HashSet(connections.size());
for (Connection con : connections) {
if (con.getSocket() != null)
rv.add(con.getSocket());
}

View File

@ -0,0 +1,168 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.ServerSocketChannel;
import net.i2p.I2PException;
/**
* Bridge to I2PServerSocket.
*
* This extends ServerSocket to make porting apps easier.
* accept() returns a real Socket (a StandardSocket).
* accept() throws IOExceptions like ServerSockets do, rather than returning
* null or throwing I2PExceptions.
*
* StandardServerSockets are always bound.
* You may not create an unbound StandardServerSocket.
* Create this through the SocketManager.
*
* @since 0.8.4
*/
class StandardServerSocket extends ServerSocket {
private final I2PServerSocketFull _socket;
/**
* Doesn't really throw IOE but super() does
*/
StandardServerSocket(I2PServerSocketFull socket) throws IOException {
_socket = socket;
}
public Socket accept() throws IOException {
try {
I2PSocket sock = _socket.accept();
if (sock == null)
throw new IOException("No socket");
return new StandardSocket(sock);
} catch (I2PException i2pe) {
// fixme in 1.6 change to cause
throw new IOException(i2pe.toString());
}
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public void bind(SocketAddress endpoint) {
throw new UnsupportedOperationException();
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public void bind(SocketAddress endpoint, int backlog) {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
if (isClosed())
throw new IOException("Already closed");
_socket.close();
}
/**
* @return null always
*/
@Override
public ServerSocketChannel getChannel() {
return null;
}
/**
* @return null always
*/
@Override
public InetAddress getInetAddress() {
return null;
}
/**
* @return -1 always
*/
@Override
public int getLocalPort() {
return -1;
}
/**
* @return null always
*/
@Override
public SocketAddress getLocalSocketAddress() {
return null;
}
@Override
public int getReceiveBufferSize() {
ConnectionOptions opts = (ConnectionOptions) ((I2PSocketManagerFull)_socket.getManager()).getDefaultOptions();
if (opts == null)
return 64*1024;
return opts.getInboundBufferSize();
}
/**
* @return false always
*/
@Override
public boolean getReuseAddress() {
return false;
}
@Override
public int getSoTimeout() {
return (int) _socket.getSoTimeout();
}
/**
* @return true always
*/
@Override
public boolean isBound() {
return true;
}
@Override
public boolean isClosed() {
return ((I2PSocketManagerFull)_socket.getManager()).getConnectionManager().getAllowIncomingConnections();
}
/**
* Does nothing.
*/
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
}
/**
* Does nothing.
*/
@Override
public void setReceiveBufferSize(int size) {
}
/**
* Does nothing.
*/
@Override
public void setReuseAddress(boolean on) {
}
@Override
public void setSoTimeout(int timeout) throws SocketException {
_socket.setSoTimeout(timeout);
}
@Override
public String toString() {
return _socket.toString();
}
}

View File

@ -0,0 +1,338 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import net.i2p.I2PException;
/**
* Bridge to I2PSocket.
*
* This extends Socket to make porting apps easier.
* Methods throw IOExceptions like Sockets do, rather than returning
* null for some methods.
*
* StandardSockets are always bound, and always start out connected
* (unless connectDelay is > 0).
* You may not create an unbound StandardSocket.
* Create this through the SocketManager.
*
* @since 0.8.4
*/
class StandardSocket extends Socket {
private final I2PSocket _socket;
StandardSocket(I2PSocket socket) {
_socket = socket;
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public void bind(SocketAddress bindpoint) {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
if (_socket.isClosed())
throw new IOException("Already closed");
_socket.close();
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public void connect(SocketAddress endpoint) {
throw new UnsupportedOperationException();
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public void connect(SocketAddress endpoint, int timeout) {
throw new UnsupportedOperationException();
}
/**
* @return null always
*/
@Override
public SocketChannel getChannel() {
return null;
}
/**
* @return null always
*/
@Override
public InetAddress getInetAddress() {
return null;
}
@Override
public InputStream getInputStream() throws IOException {
InputStream rv = _socket.getInputStream();
if (rv != null)
return rv;
throw new IOException("No stream");
}
@Override
public boolean getKeepAlive() {
ConnectionOptions opts = (ConnectionOptions) _socket.getOptions();
if (opts == null)
return false;
return opts.getInactivityAction() == ConnectionOptions.INACTIVITY_ACTION_SEND;
}
/**
* @return null always
*/
@Override
public InetAddress getLocalAddress() {
return null;
}
/**
* @return -1 always
*/
@Override
public int getLocalPort() {
return -1;
}
/**
* @return null always
*/
@Override
public SocketAddress getLocalSocketAddress() {
return null;
}
/**
* @return false always
*/
@Override
public boolean getOOBInline() {
return false;
}
@Override
public OutputStream getOutputStream() throws IOException {
OutputStream rv = _socket.getOutputStream();
if (rv != null)
return rv;
throw new IOException("Mo stream");
}
/**
* @return 0 always
*/
@Override
public int getPort() {
return 0;
}
@Override
public int getReceiveBufferSize() {
ConnectionOptions opts = (ConnectionOptions) _socket.getOptions();
if (opts == null)
return 64*1024;
return opts.getInboundBufferSize();
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public SocketAddress getRemoteSocketAddress() {
throw new UnsupportedOperationException();
}
/**
* @return false always
*/
@Override
public boolean getReuseAddress() {
return false;
}
@Override
public int getSendBufferSize() {
ConnectionOptions opts = (ConnectionOptions) _socket.getOptions();
if (opts == null)
return 64*1024;
return opts.getInboundBufferSize();
}
@Override
public int getSoLinger() {
I2PSocketOptions opts = _socket.getOptions();
if (opts == null)
return -1;
return -1; // fixme really?
}
@Override
public int getSoTimeout() {
I2PSocketOptions opts = _socket.getOptions();
if (opts == null)
return 0;
return (int) opts.getReadTimeout();
}
/**
* @return false always
*/
@Override
public boolean getTcpNoDelay() {
// No option yet. See ConnectionDataReceiver
return false;
}
/**
* @return 0 always
*/
@Override
public int getTrafficClass() {
return 0;
}
/**
* @return true always
*/
@Override
public boolean isBound() {
return true;
}
@Override
public boolean isClosed() {
return _socket.isClosed();
}
@Override
public boolean isConnected() {
return !_socket.isClosed();
}
@Override
public boolean isInputShutdown() {
return _socket.isClosed();
}
@Override
public boolean isOutputShutdown() {
return _socket.isClosed();
}
/**
* @throws UnsupportedOperationException always
*/
@Override
public void sendUrgentData(int data) {
throw new UnsupportedOperationException();
}
@Override
public void setKeepAlive(boolean on) {
ConnectionOptions opts = (ConnectionOptions) _socket.getOptions();
if (opts == null)
return;
if (on)
opts.setInactivityAction(ConnectionOptions.INACTIVITY_ACTION_SEND);
else
opts.setInactivityAction(ConnectionOptions.INACTIVITY_ACTION_NOOP); // DISCONNECT?
}
/**
* @throws UnsupportedOperationException if on is true
*/
@Override
public void setOOBInline(boolean on) {
if (on)
throw new UnsupportedOperationException();
}
/**
* Does nothing.
*/
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
}
/**
* Does nothing.
*/
@Override
public void setReceiveBufferSize(int size) {
}
/**
* Does nothing.
*/
@Override
public void setReuseAddress(boolean on) {
}
/**
* Does nothing.
*/
@Override
public void setSendBufferSize(int size) {
}
/**
* Does nothing.
*/
@Override
public void setSoLinger(boolean on, int linger) {
}
@Override
public void setSoTimeout(int timeout) throws SocketException {
I2PSocketOptions opts = _socket.getOptions();
if (opts == null)
throw new SocketException("No options");
opts.setReadTimeout(timeout);
}
/**
* Does nothing.
*/
@Override
public void setTcpNoDelay(boolean on) {
}
/**
* Does nothing.
*/
@Override
public void setTrafficClass(int tc) {
}
@Override
public void shutdownInput() throws IOException {
close();
}
@Override
public void shutdownOutput() throws IOException {
close();
}
@Override
public String toString() {
return _socket.toString();
}
}