From 61749aaaa9f3d537f2bafefe9671ee42443ea2ff Mon Sep 17 00:00:00 2001 From: sponge Date: Sat, 27 Sep 2008 22:59:22 +0000 Subject: [PATCH] Added Simple true/false storage class to the utilities Added socketSoTimeout CHANGED RetransmissionTimer is now public FIXED SimpleTimer has a way to be stopped, and reap it's children CLEANUP A few javadoc additions, where I could figgure out bits CLEANUP all code that needed to catch the timeout exception for socketSoTimeout --- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 3 ++ .../i2p/client/streaming/I2PServerSocket.java | 16 +++++++- .../client/streaming/I2PServerSocketImpl.java | 15 ++++++++ .../client/streaming/StreamSinkServer.java | 3 ++ .../client/streaming/ConnectionManager.java | 20 ++++++++++ .../client/streaming/I2PServerSocketFull.java | 33 +++++++++++++++-- .../streaming/I2PSocketManagerFull.java | 34 +++++++++++++++-- .../client/streaming/RetransmissionTimer.java | 2 +- core/java/src/net/i2p/util/Executor.java | 8 +++- core/java/src/net/i2p/util/SimpleTimer.java | 37 +++++++++++++++---- 10 files changed, 154 insertions(+), 17 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 014d91e9b..5d0f2a923 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -12,6 +12,7 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.Properties; @@ -219,6 +220,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ce); // not killing the server.. + } catch(SocketTimeoutException ste) { + // ignored, we never set the timeout } } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 726d462ce..ffb125209 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -2,6 +2,7 @@ package net.i2p.client.streaming; import java.net.ConnectException; +import java.net.SocketTimeoutException; import net.i2p.I2PException; /** @@ -24,8 +25,21 @@ public interface I2PServerSocket { * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed + * @throws SocketTimeoutException */ - public I2PSocket accept() throws I2PException, ConnectException; + public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; + + /** + * Set Sock Option accept timeout + * @param x + */ + public void setSoTimeout(long x); + + /** + * Get Sock Option accept timeout + * @return timeout + */ + public long getSoTimeout(); /** * Access the manager which is coordinating the server socket diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 965ba31bf..d4734b022 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -30,6 +30,21 @@ class I2PServerSocketImpl implements I2PServerSocket { /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ private Object socketAddedLock = new Object(); + /** + * Set Sock Option accept timeout stub, does nothing + * @param x + */ + public void setSoTimeout(long x) { + } + + /** + * Get Sock Option accept timeout stub, does nothing + * @return timeout + */ + public long getSoTimeout() { + return -1; + } + public I2PServerSocketImpl(I2PSocketManager mgr) { this.mgr = mgr; } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index c8b566190..7a5c28f1c 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -5,6 +5,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.util.Properties; import net.i2p.I2PAppContext; @@ -107,6 +108,8 @@ public class StreamSinkServer { } catch (ConnectException ce) { _log.error("Connection already dropped", ce); return; + } catch(SocketTimeoutException ste) { + // ignored } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index dcc93c5ec..762260ea2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -39,6 +39,7 @@ public class ConnectionManager { private ConnectionOptions _defaultOptions; private volatile int _numWaiting; private Object _connectionLock; + private long SoTimeout; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { _context = context; @@ -58,6 +59,9 @@ public class ConnectionManager { _maxConcurrentStreams = maxConcurrent; _defaultOptions = defaultOptions; _numWaiting = 0; + /** Socket timeout for accept() */ + SoTimeout = -1; + _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -90,6 +94,22 @@ public class ConnectionManager { return null; } + /** + * Set the socket accept() timeout. + * @param x + */ + public void MsetSoTimeout(long x) { + SoTimeout = x; + } + + /** + * Get the socket accept() timeout. + * @return + */ + public long MgetSoTimeout() { + return SoTimeout; + } + public void setAllowIncomingConnections(boolean allow) { _connectionHandler.setActive(allow); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index b1a4175f2..6b68054d1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,5 +1,8 @@ package net.i2p.client.streaming; +import java.net.SocketTimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; import net.i2p.I2PException; /** @@ -13,11 +16,35 @@ public class I2PServerSocketFull implements I2PServerSocket { _socketManager = mgr; } - public I2PSocket accept() throws I2PException { + /** + * + * @return + * @throws net.i2p.I2PException + * @throws SocketTimeoutException + */ + public I2PSocket accept() throws I2PException, SocketTimeoutException { return _socketManager.receiveSocket(); } - public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } + public long getSoTimeout() { + return _socketManager.getConnectionManager().MgetSoTimeout(); + } - public I2PSocketManager getManager() { return _socketManager; } + public void setSoTimeout(long x) { + _socketManager.getConnectionManager().MsetSoTimeout(x); + } + /** + * Close the connection. + */ + public void close() { + _socketManager.getConnectionManager().setAllowIncomingConnections(false); + } + + /** + * + * @return _socketManager + */ + public I2PSocketManager getManager() { + return _socketManager; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 7384a4972..a33aa394a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -1,6 +1,7 @@ package net.i2p.client.streaming; import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; import java.util.HashSet; import java.util.Iterator; import java.util.Properties; @@ -44,6 +45,14 @@ public class I2PSocketManagerFull implements I2PSocketManager { _context = null; _session = null; } + + /** + * + * @param context + * @param session + * @param opts + * @param name + */ public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { this(); init(context, session, opts, name); @@ -54,6 +63,11 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * + * + * @param context + * @param session + * @param opts + * @param name */ public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; @@ -96,24 +110,38 @@ public class I2PSocketManagerFull implements I2PSocketManager { return _connectionManager; } - public I2PSocket receiveSocket() throws I2PException { + /** + * + * @return + * @throws net.i2p.I2PException + * @throws java.net.SocketTimeoutException + */ + public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException { verifySession(); - Connection con = _connectionManager.getConnectionHandler().accept(-1); - if (_log.shouldLog(Log.DEBUG)) + Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout()); + if(_log.shouldLog(Log.DEBUG)) { _log.debug("receiveSocket() called: " + con); + } if (con != null) { I2PSocketFull sock = new I2PSocketFull(con); con.setSocket(sock); return sock; } else { + if(_connectionManager.MgetSoTimeout() == -1) { return null; } + throw new SocketTimeoutException("I2PSocket timed out"); + } } /** * Ping the specified peer, returning true if they replied to the ping within * the timeout specified, false otherwise. This call blocks. * + * + * @param peer + * @param timeoutMs + * @return */ public boolean ping(Destination peer, long timeoutMs) { return _connectionManager.ping(peer, timeoutMs); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index 0ea0c83d7..c52c373b1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer; /** * */ -class RetransmissionTimer extends SimpleTimer { +public class RetransmissionTimer extends SimpleTimer { private static final RetransmissionTimer _instance = new RetransmissionTimer(); public static final SimpleTimer getInstance() { return _instance; } protected RetransmissionTimer() { super("StreamingTimer"); } diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index e3c1b6fbf..fb8757f4f 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -8,12 +8,16 @@ class Executor implements Runnable { private I2PAppContext _context; private Log _log; private List _readyEvents; - public Executor(I2PAppContext ctx, Log log, List events) { + private SimpleStore runn; + + public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { _context = ctx; _readyEvents = events; + runn = x; } + public void run() { - while (true) { + while(runn.getAnswer()) { SimpleTimer.TimedEvent evt = null; synchronized (_readyEvents) { if (_readyEvents.size() <= 0) diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 9543f72c5..87aa7b4e5 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -25,9 +25,11 @@ public class SimpleTimer { /** event (TimedEvent) to event time (Long) mapping */ private Map _eventTimes; private List _readyEvents; - + private SimpleStore runn; + protected SimpleTimer() { this("SimpleTimer"); } protected SimpleTimer(String name) { + runn = new SimpleStore(true); _context = I2PAppContext.getGlobalContext(); _log = _context.logManager().getLog(SimpleTimer.class); _events = new TreeMap(); @@ -38,13 +40,28 @@ public class SimpleTimer { runner.setDaemon(true); runner.start(); for (int i = 0; i < 3; i++) { - I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); + I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); executor.setName(name + "Executor " + i); executor.setDaemon(true); executor.start(); } } + /** + * Removes the SimpleTimer. + */ + public void removeSimpleTimer() { + synchronized(_events) { + runn.setAnswer(false); + _events.notifyAll(); + } + } + + /** + * + * @param event + * @param timeoutMs + */ public void reschedule(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, false); } @@ -55,9 +72,13 @@ public class SimpleTimer { * for the earlier of the two timeouts, which may be before this stated * timeout. If this is not the desired behavior, call removeEvent first. * + * @param event + * @param timeoutMs */ public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } /** + * @param event + * @param timeoutMs * @param useEarliestTime if its already scheduled, use the earlier of the * two timeouts, else use the later */ @@ -143,12 +164,12 @@ public class SimpleTimer { private long _occurredTime; private long _occurredEventCount; - private TimedEvent _recentEvents[] = new TimedEvent[5]; - + // not used + // private TimedEvent _recentEvents[] = new TimedEvent[5]; private class SimpleTimerRunner implements Runnable { public void run() { List eventsToFire = new ArrayList(1); - while (true) { + while(runn.getAnswer()) { try { synchronized (_events) { //if (_events.size() <= 0) @@ -158,8 +179,10 @@ public class SimpleTimer { long now = System.currentTimeMillis(); long nextEventDelay = -1; Object nextEvent = null; - while (true) { - if (_events.size() <= 0) break; + while(runn.getAnswer()) { + if(_events.size() <= 0) { + break; + } Long when = (Long)_events.firstKey(); if (when.longValue() <= now) { TimedEvent evt = (TimedEvent)_events.remove(when);