Added Simple true/false storage class to the utilities

Added socketSoTimeout
CHANGED RetransmissionTimer is now public
FIXED SimpleTimer has a way to be stopped, and reap it's children
FIXED Lots of javadoc additions, where I could
CLEANUP all code that needed to catch the timeout exception for socketSoTimeout
This commit is contained in:
sponge
2008-09-25 23:59:01 +00:00
parent ee2fd32a97
commit dd7d993631
13 changed files with 540 additions and 146 deletions

View File

@ -2,6 +2,7 @@ package net.i2p.client.streaming;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import net.i2p.I2PException;
/**
@ -9,6 +10,7 @@ import net.i2p.I2PException;
*
*/
public interface I2PServerSocket {
/**
* Closes the socket.
*/
@ -24,8 +26,21 @@ public interface I2PServerSocket {
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
* @throws SocketTimeoutException
*/
public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/**
* Set Sock Option accept timeout
* @param x
*/
public void setSoTimeout(long x);
/**
* Get Sock Option accept timeout
* @return timeout
*/
public I2PSocket accept() throws I2PException, ConnectException;
public long getSoTimeout();
/**
* Access the manager which is coordinating the server socket

View File

@ -17,19 +17,33 @@ import net.i2p.util.Log;
*
*/
class I2PServerSocketImpl implements I2PServerSocket {
private final static Log _log = new Log(I2PServerSocketImpl.class);
private I2PSocketManager mgr;
/** list of sockets waiting for the client to accept them */
private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
/** have we been closed */
private volatile boolean closing = false;
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
private Object socketAcceptedLock = new Object();
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */
private Object socketAddedLock = new Object();
/**
* Set Sock Option accept timeout stub, does nothing
* @param x
*/
public void setSoTimeout(long x) {
}
/**
* Get Sock Option accept timeout stub, does nothing
* @return timeout
*/
public long getSoTimeout() {
return -1;
}
public I2PServerSocketImpl(I2PSocketManager mgr) {
this.mgr = mgr;
}
@ -47,19 +61,22 @@ class I2PServerSocketImpl implements I2PServerSocket {
* @throws ConnectException if the I2PServerSocket is closed
*/
public I2PSocket accept() throws I2PException, ConnectException {
if (_log.shouldLog(Log.DEBUG))
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("accept() called, pending: " + pendingSockets.size());
}
I2PSocket ret = null;
while ( (ret == null) && (!closing) ){
while (pendingSockets.size() <= 0) {
if (closing) throw new ConnectException("I2PServerSocket closed");
if(closing) {
throw new ConnectException("I2PServerSocket closed");
}
try {
synchronized(socketAddedLock) {
socketAddedLock.wait();
}
} catch (InterruptedException ie) {}
} catch(InterruptedException ie) {
}
}
synchronized (pendingSockets) {
if (pendingSockets.size() > 0) {
@ -73,8 +90,9 @@ class I2PServerSocketImpl implements I2PServerSocket {
}
}
if (_log.shouldLog(Log.DEBUG))
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("TIMING: handed out accept result " + ret.hashCode());
}
return ret;
}
@ -88,12 +106,13 @@ class I2PServerSocketImpl implements I2PServerSocket {
* or the socket was closed
*/
public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
if (_log.shouldLog(Log.DEBUG))
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
}
if (closing) {
if (_log.shouldLog(Log.WARN))
if(_log.shouldLog(Log.WARN)) {
_log.warn("Already closing the socket");
}
return false;
}
@ -110,14 +129,16 @@ class I2PServerSocketImpl implements I2PServerSocket {
while (pendingSockets.contains(s)) {
long now = clock.now();
if (now >= end) {
if (_log.shouldLog(Log.INFO))
if(_log.shouldLog(Log.INFO)) {
_log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
}
pendingSockets.remove(s);
return false;
}
if (closing) {
if (_log.shouldLog(Log.WARN))
if(_log.shouldLog(Log.WARN)) {
_log.warn("Server socket closed while waiting for accept");
}
pendingSockets.remove(s);
return false;
}
@ -126,11 +147,13 @@ class I2PServerSocketImpl implements I2PServerSocket {
synchronized (socketAcceptedLock) {
socketAcceptedLock.wait(remaining);
}
} catch (InterruptedException ie) {}
} catch(InterruptedException ie) {
}
}
long now = clock.now();
if (_log.shouldLog(Log.DEBUG))
if(_log.shouldLog(Log.DEBUG)) {
_log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
}
return true;
}
@ -146,5 +169,7 @@ class I2PServerSocketImpl implements I2PServerSocket {
}
}
public I2PSocketManager getManager() { return mgr; }
public I2PSocketManager getManager() {
return mgr;
}
}

View File

@ -5,6 +5,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import net.i2p.I2PAppContext;
@ -20,6 +21,7 @@ import net.i2p.util.Log;
*
*/
public class StreamSinkServer {
private Log _log;
private String _sinkDir;
private String _destFile;
@ -36,6 +38,7 @@ public class StreamSinkServer {
public StreamSinkServer(String sinkDir, String ourDestFile) {
this(sinkDir, ourDestFile, null, -1, 3);
}
public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
_sinkDir = sinkDir;
_destFile = ourDestFile;
@ -52,13 +55,15 @@ public class StreamSinkServer {
*/
public void runServer() {
I2PSocketManager mgr = null;
if (_i2cpHost != null)
if(_i2cpHost != null) {
mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
else
} else {
mgr = I2PSocketManagerFactory.createManager();
}
Destination dest = mgr.getSession().getMyDestination();
if (_log.shouldLog(Log.INFO))
if(_log.shouldLog(Log.INFO)) {
_log.info("Listening for connections on: " + dest.calculateHash().toBase64());
}
FileOutputStream fos = null;
try {
fos = new FileOutputStream(_destFile);
@ -70,7 +75,12 @@ public class StreamSinkServer {
_log.error("Error formatting the destination", dfe);
return;
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
if(fos != null) {
try {
fos.close();
} catch(IOException ioe) {
}
}
}
I2PServerSocket sock = mgr.getServerSocket();
@ -91,22 +101,28 @@ public class StreamSinkServer {
*
*/
private class ClientRunner implements Runnable {
private I2PServerSocket _socket;
public ClientRunner(I2PServerSocket socket) {
_socket = socket;
}
public void run() {
while (true) {
try {
I2PSocket socket = _socket.accept();
if (socket != null)
if(socket != null) {
handle(socket);
}
} catch (I2PException ie) {
_log.error("Error accepting connection", ie);
return;
} catch (ConnectException ce) {
_log.error("Connection already dropped", ce);
return;
} catch(SocketTimeoutException ste) {
// ignored
}
}
}
@ -115,12 +131,14 @@ public class StreamSinkServer {
FileOutputStream fos = null;
try {
File sink = new File(_sinkDir);
if (!sink.exists())
if(!sink.exists()) {
sink.mkdirs();
}
File cur = File.createTempFile("clientSink", ".dat", sink);
fos = new FileOutputStream(cur);
if (_log.shouldLog(Log.DEBUG))
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Writing to " + cur.getAbsolutePath());
}
} catch (IOException ioe) {
_log.error("Error creating sink", ioe);
return;
@ -135,17 +153,28 @@ public class StreamSinkServer {
while ( (read = in.read(buf)) != -1) {
//_fos.write(buf, 0, read);
written += read;
if (_log.shouldLog(Log.DEBUG))
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("read and wrote " + read + " (" + written + ")");
}
}
fos.write(("written: [" + written + "]\n").getBytes());
long lifetime = System.currentTimeMillis() - start;
_log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
} catch (IOException ioe) {
_log.error("Error writing the sink", ioe);
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
if (sock != null) try { sock.close(); } catch (IOException ioe) {}
if(fos != null) {
try {
fos.close();
} catch(IOException ioe) {
}
}
if(sock != null) {
try {
sock.close();
} catch(IOException ioe) {
}
}
_log.debug("Client socket closed");
}
}
@ -174,7 +203,8 @@ public class StreamSinkServer {
if (args.length == 5) {
try {
handlers = Integer.parseInt(args[4]);
} catch (NumberFormatException nfe) {}
} catch(NumberFormatException nfe) {
}
}
try {
int port = Integer.parseInt(args[1]);
@ -186,7 +216,8 @@ public class StreamSinkServer {
default:
System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
}
if (server != null)
if(server != null) {
server.runServer();
}
}
}