* added a way to control how large we let the buffers grow before we block,

or even whether to have the blocking action timeout and close the socket after
a certain delay
* refactored the I2PSocketOptions to be more actively used
* added a pair of ministreaming lib demo apps:
- StreamSinkServer listens to a destination and dumps any data it receives on a socket to a per-socket file
- StreamSinkClient sends a destination a specified number of random bytes, then disconnects
This commit is contained in:
jrandom
2004-08-01 18:34:02 +00:00
committed by zzz
parent 8101fa1c92
commit f85ce180ed
7 changed files with 436 additions and 12 deletions

View File

@ -32,8 +32,18 @@ public interface I2PSocket {
*/
public OutputStream getOutputStream() throws IOException;
/**
* Retrieve this socket's configuration
*/
public I2PSocketOptions getOptions();
/**
* Configure the socket
*/
public void setOptions(I2PSocketOptions options);
/**
* How long we will wait blocked on a read() operation.
* How long we will wait blocked on a read() operation. This is simply a
* helper to query the I2PSocketOptions
*
* @return milliseconds to wait, or -1 if we will wait indefinitely
*/
@ -41,7 +51,8 @@ public interface I2PSocket {
/**
* Define how long we will wait blocked on a read() operation (-1 will make
* the socket wait forever).
* the socket wait forever). This is simply a helper to adjust the
* I2PSocketOptions
*
*/
public void setReadTimeout(long ms);

View File

@ -40,6 +40,7 @@ class I2PSocketImpl implements I2PSocket {
private long _createdOn;
private long _closedOn;
private long _remoteIdSetTime;
private I2PSocketOptions _options;
private Object flagLock = new Object();
/**
@ -81,6 +82,7 @@ class I2PSocketImpl implements I2PSocket {
_createdOn = I2PAppContext.getGlobalContext().clock().now();
_remoteIdSetTime = -1;
_closedOn = -1;
_options = mgr.getDefaultOptions();
}
/**
@ -176,7 +178,21 @@ class I2PSocketImpl implements I2PSocket {
*/
public void queueData(byte[] data) {
_bytesRead += data.length;
in.queueData(data);
try {
in.queueData(data);
} catch (InterruptedIOException iie) {
if (_log.shouldLog(Log.ERROR))
_log.error("Queue overflow, closing the stream", iie);
try {
close();
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error closing the stream due to overflow", ioe);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Connection closed while writing to the socket", ioe);
}
}
/**
@ -245,19 +261,36 @@ class I2PSocketImpl implements I2PSocket {
return (byte)(I2PSocketManager.DATA_OUT + (byte)add);
}
public void setOptions(I2PSocketOptions options) {
_options = options;
in.setReadTimeout(options.getReadTimeout());
}
public I2PSocketOptions getOptions() {
return _options;
}
/**
* What is the longest we'll block on the input stream while waiting
* for more data? If this value is exceeded, the read() throws
* InterruptedIOException
* How long we will wait blocked on a read() operation. This is simply a
* helper to query the I2PSocketOptions
*
* @return milliseconds to wait, or -1 if we will wait indefinitely
*/
public long getReadTimeout() {
return in.getReadTimeout();
return _options.getReadTimeout();
}
/**
* Define how long we will wait blocked on a read() operation (-1 will make
* the socket wait forever). This is simply a helper to adjust the
* I2PSocketOptions
*
*/
public void setReadTimeout(long ms) {
_options.setReadTimeout(ms);
in.setReadTimeout(ms);
}
public void setSocketErrorListener(SocketErrorListener lsnr) {
_socketErrorListener = lsnr;
}
@ -279,6 +312,7 @@ class I2PSocketImpl implements I2PSocket {
private class I2PInputStream extends InputStream {
private ByteCollector bc = new ByteCollector();
private boolean inStreamClosed = false;
private long readTimeout = -1;
@ -306,6 +340,7 @@ class I2PSocketImpl implements I2PSocket {
byte[] read = null;
synchronized (bc) {
read = bc.startToByteArray(len);
bc.notifyAll();
}
boolean timedOut = false;
@ -334,6 +369,7 @@ class I2PSocketImpl implements I2PSocket {
synchronized (bc) {
read = bc.startToByteArray(len);
bc.notifyAll();
}
}
if (read.length > len) throw new RuntimeException("BUG");
@ -357,14 +393,44 @@ class I2PSocketImpl implements I2PSocket {
}
}
public void queueData(byte[] data) {
/**
* Add the data to the queue
*
* @throws InterruptedIOException if the queue's buffer is full, the socket has
* a write timeout, and that timeout is exceeded
* @throws IOException if the connection was closed while queueing up the data
*/
public void queueData(byte[] data) throws InterruptedIOException, IOException {
queueData(data, 0, data.length);
}
public void queueData(byte[] data, int off, int len) {
/**
* Add the data to the queue
*
* @throws InterruptedIOException if the queue's buffer is full, the socket has
* a write timeout, and that timeout is exceeded
* @throws IOException if the connection was closed while queueing up the data
*/
public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
synchronized (bc) {
if (_options.getMaxBufferSize() > 0) {
int waited = 0;
while (bc.getCurrentSize() + len > _options.getMaxBufferSize()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
if ( (_options.getWriteTimeout() > 0) && (waited > _options.getWriteTimeout()) ) {
throw new InterruptedIOException("Waited " + waited + "ms to write " + len + " with a buffer at " + bc.getCurrentSize());
}
if (inStreamClosed)
throw new IOException("Stream closed while writing");
try {
bc.wait(1000);
waited += 1000;
} catch (InterruptedException ie) {}
}
}
bc.append(data, off, len);
}
synchronized (I2PInputStream.this) {
@ -381,6 +447,10 @@ class I2PSocketImpl implements I2PSocket {
public void close() throws IOException {
super.close();
notifyClosed();
synchronized (bc) {
inStreamClosed = true;
bc.notifyAll();
}
}
}

View File

@ -377,7 +377,7 @@ public class I2PSocketManager implements I2PSessionListener {
*
* @throws IllegalStateException if the socket isn't open or isn't known
*/
private void sendIncoming(String id, byte payload[]) {
private void sendIncoming(String id, byte payload[]) throws IllegalStateException {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _inSockets.get(id);
@ -469,7 +469,10 @@ public class I2PSocketManager implements I2PSessionListener {
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new I2PException("Error sending through I2P network");
}
remoteID = s.getRemoteID(true, options.getConnectTimeout());
if (options != null)
remoteID = s.getRemoteID(true, options.getConnectTimeout());
else
remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout());
if (remoteID == null) {
_context.statManager().addRateData("streaming.nackReceived", 1, 1);

View File

@ -81,6 +81,7 @@ public class I2PSocketManagerFactory {
private static I2PSocketManager createManager(I2PSession session) {
I2PSocketManager mgr = new I2PSocketManager();
mgr.setSession(session);
mgr.setDefaultOptions(new I2PSocketOptions());
return mgr;
}
}

View File

@ -6,9 +6,17 @@ package net.i2p.client.streaming;
*/
public class I2PSocketOptions {
private long _connectTimeout;
private long _readTimeout;
private long _writeTimeout;
private int _maxBufferSize;
public static final int DEFAULT_BUFFER_SIZE = 1024*128;
public I2PSocketOptions() {
_connectTimeout = -1;
_readTimeout = -1;
_writeTimeout = -1;
_maxBufferSize = DEFAULT_BUFFER_SIZE;
}
/**
@ -27,4 +35,65 @@ public class I2PSocketOptions {
public void setConnectTimeout(long ms) {
_connectTimeout = ms;
}
/**
* What is the longest we'll block on the input stream while waiting
* for more data? If this value is exceeded, the read() throws
* InterruptedIOException
*/
public long getReadTimeout() {
return _readTimeout;
}
/**
* What is the longest we'll block on the input stream while waiting
* for more data? If this value is exceeded, the read() throws
* InterruptedIOException
*/
public void setReadTimeout(long ms) {
_readTimeout = ms;
}
/**
* How much data will we accept that hasn't been written out yet. After
* this amount has been exceeded, subsequent .write calls will block until
* either some data is removed or the connection is closed. If this is
* less than or equal to zero, there is no limit (warning: can eat ram)
*
* @return buffer size limit, in bytes
*/
public int getMaxBufferSize() {
return _maxBufferSize;
}
/**
* How much data will we accept that hasn't been written out yet. After
* this amount has been exceeded, subsequent .write calls will block until
* either some data is removed or the connection is closed. If this is
* less than or equal to zero, there is no limit (warning: can eat ram)
*
*/
public void setMaxBufferSize(int numBytes) {
_maxBufferSize = numBytes;
}
/**
* What is the longest we'll block on the output stream while waiting
* for the data to flush? If this value is exceeded, the write() throws
* InterruptedIOException. If this is less than or equal to zero, there
* is no timeout.
*/
public long getWriteTimeout() {
return _writeTimeout;
}
/**
* What is the longest we'll block on the output stream while waiting
* for the data to flush? If this value is exceeded, the write() throws
* InterruptedIOException. If this is less than or equal to zero, there
* is no timeout.
*/
public void setWriteTimeout(long ms) {
_writeTimeout = ms;
}
}

View File

@ -0,0 +1,135 @@
package net.i2p.client.streaming;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.Random;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.Destination;
import net.i2p.data.DataFormatException;
import net.i2p.util.Log;
/**
* Simple streaming lib test app that connects to a given destination and sends
* it a particular amount of random data, then disconnects. See the {@link main}
*
*/
public class StreamSinkClient {
private Log _log;
private int _sendSize;
private int _writeDelay;
private String _peerDestFile;
/**
* Build the client but don't fire it up.
* @param sendSize how many KB to send
* @param writeDelayMs how long to wait between each .write (0 for no delay)
* @param serverDestFile file containing the StreamSinkServer's binary Destination
*/
public StreamSinkClient(int sendSize, int writeDelayMs, String serverDestFile) {
_sendSize = sendSize;
_writeDelay = writeDelayMs;
_peerDestFile = serverDestFile;
_log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkClient.class);
}
/**
* Actually connect and run the client - this call blocks until completion.
*
*/
public void runClient() {
I2PSocketManager mgr = I2PSocketManagerFactory.createManager();
Destination peer = null;
FileInputStream fis = null;
try {
fis = new FileInputStream(_peerDestFile);
peer = new Destination();
peer.readBytes(fis);
} catch (IOException ioe) {
_log.error("Error finding the peer destination to contact in " + _peerDestFile, ioe);
return;
} catch (DataFormatException dfe) {
_log.error("Peer destination is not valid in " + _peerDestFile, dfe);
return;
} finally {
if (fis == null) try { fis.close(); } catch (IOException ioe) {}
}
System.out.println("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64());
try {
I2PSocket sock = mgr.connect(peer);
byte buf[] = new byte[32*1024];
Random rand = new Random();
OutputStream out = sock.getOutputStream();
long beforeSending = System.currentTimeMillis();
for (int i = 0; i < _sendSize; i+= 32) {
rand.nextBytes(buf);
out.write(buf);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Wrote " + (i+32) + "/" + _sendSize + "KB");
if (_writeDelay > 0) {
try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {}
}
}
long afterSending = System.currentTimeMillis();
System.out.println("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms");
sock.close();
} catch (InterruptedIOException iie) {
_log.error("Timeout connecting to the peer", iie);
return;
} catch (NoRouteToHostException nrthe) {
_log.error("Unable to connect to the peer", nrthe);
return;
} catch (ConnectException ce) {
_log.error("Connection already dropped", ce);
return;
} catch (I2PException ie) {
_log.error("Error connecting to the peer", ie);
return;
} catch (IOException ioe) {
_log.error("IO error sending", ioe);
return;
}
}
/**
* Fire up the client. <code>Usage: StreamSinkClient sendSizeKB writeDelayMs serverDestFile</code> <br />
* <ul>
* <li><b>sendSizeKB</b>: how many KB to send</li>
* <li><b>writeDelayMs</b>: how long to wait between each .write (0 for no delay)</li>
* <li><b>serverDestFile</b>: file containing the StreamSinkServer's binary Destination</li>
* </ul>
*/
public static void main(String args[]) {
if (args.length != 3) {
System.out.println("Usage: StreamSinkClient sendSizeKB writeDelayMs serverDestFile");
} else {
int sendSizeKB = -1;
int writeDelayMs = -1;
try {
sendSizeKB = Integer.parseInt(args[0]);
} catch (NumberFormatException nfe) {
System.err.println("Send size invalid [" + args[0] + "]");
return;
}
try {
writeDelayMs = Integer.parseInt(args[1]);
} catch (NumberFormatException nfe) {
System.err.println("Write delay ms invalid [" + args[1] + "]");
return;
}
StreamSinkClient client = new StreamSinkClient(sendSizeKB, writeDelayMs, args[2]);
client.runClient();
}
}
}

View File

@ -0,0 +1,135 @@
package net.i2p.client.streaming;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Listen to a destination, receiving any sockets and writing anything they
* send to a new file.
*
*/
public class StreamSinkServer {
private Log _log;
private String _sinkDir;
private String _destFile;
/**
* Create but do not start the streaming server.
*
* @param sinkDir Directory to store received files in
* @param ourDestFile filename to write our binary destination to
*/
public StreamSinkServer(String sinkDir, String ourDestFile) {
_sinkDir = sinkDir;
_destFile = ourDestFile;
_log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
}
/**
* Actually fire up the server - this call blocks forever (or until the server
* socket closes)
*
*/
public void runServer() {
I2PSocketManager mgr = I2PSocketManagerFactory.createManager();
Destination dest = mgr.getSession().getMyDestination();
System.out.println("Listening for connections on: " + dest.calculateHash().toBase64());
FileOutputStream fos = null;
try {
fos = new FileOutputStream(_destFile);
dest.writeBytes(fos);
} catch (IOException ioe) {
_log.error("Error writing out our destination to " + _destFile, ioe);
return;
} catch (DataFormatException dfe) {
_log.error("Error formatting the destination", dfe);
return;
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
}
I2PServerSocket sock = mgr.getServerSocket();
while (true) {
try {
I2PSocket curSock = sock.accept();
handle(curSock);
} catch (I2PException ie) {
_log.error("Error accepting connection", ie);
return;
} catch (ConnectException ce) {
_log.error("Connection already dropped", ce);
return;
}
}
}
private void handle(I2PSocket socket) {
I2PThread t = new I2PThread(new ClientRunner(socket));
t.setName("Handle " + socket.getPeerDestination().calculateHash().toBase64().substring(0,4));
t.start();
}
/**
* Actually deal with a client - pull anything they send us and write it to a file.
*
*/
private class ClientRunner implements Runnable {
private I2PSocket _sock;
private FileOutputStream _fos;
public ClientRunner(I2PSocket socket) {
_sock = socket;
try {
File sink = new File(_sinkDir);
if (!sink.exists())
sink.mkdirs();
File cur = File.createTempFile("clientSink", ".dat", sink);
_fos = new FileOutputStream(cur);
} catch (IOException ioe) {
_log.error("Error creating sink", ioe);
_fos = null;
}
}
public void run() {
if (_fos == null) return;
try {
InputStream in = _sock.getInputStream();
byte buf[] = new byte[4096];
int read = 0;
while ( (read = in.read(buf)) != -1) {
_fos.write(buf, 0, read);
}
} catch (IOException ioe) {
_log.error("Error writing the sink", ioe);
} finally {
if (_fos != null) try { _fos.close(); } catch (IOException ioe) {}
}
}
}
/**
* Fire up the streaming server. <code>Usage: StreamSinkServer sinkDir ourDestFile</code><br />
* <ul>
* <li><b>sinkDir</b>: Directory to store received files in</li>
* <li><b>ourDestFile</b>: filename to write our binary destination to</li>
* </ul>
*/
public static void main(String args[]) {
if (args.length != 2) {
System.out.println("Usage: StreamSinkServer sinkDir ourDestFile");
} else {
StreamSinkServer server = new StreamSinkServer(args[0], args[1]);
server.runServer();
}
}
}