propagate from branch 'i2p.i2p' (head 3d405c867f6903bf1d69b04c1daebf3146882525)

to branch 'i2p.i2p.zzz.test4' (head bfd85b10fdd1542526a4b9c53e5d4a733087f317)
This commit is contained in:
zzz
2010-12-15 15:09:48 +00:00
72 changed files with 2147 additions and 736 deletions

View File

@ -161,7 +161,8 @@ public class I2NPMessageReader {
cancelRunner();
}
}
if (!_doRun) {
// ??? unused
if (_stayAlive && !_doRun) {
// pause .5 secs when we're paused
try { Thread.sleep(500); } catch (InterruptedException ie) {}
}

View File

@ -395,10 +395,8 @@ public class JobQueue {
for (int i = _queueRunners.size(); i < numThreads; i++) {
JobQueueRunner runner = new JobQueueRunner(_context, i);
_queueRunners.put(Integer.valueOf(i), runner);
Thread t = new I2PThread(runner);
t.setName("JobQueue"+(_runnerId++));
Thread t = new I2PThread(runner, "JobQueue " + (++_runnerId) + '/' + numThreads, false);
//t.setPriority(I2PThread.MAX_PRIORITY-1);
t.setDaemon(false);
t.start();
}
} else if (_queueRunners.size() == numThreads) {

View File

@ -1281,11 +1281,7 @@ public class Router {
*/
private void beginMarkingLiveliness() {
File f = getPingFile();
// not an I2PThread for context creation issues
Thread t = new Thread(new MarkLiveliness(_context, this, f));
t.setName("Mark router liveliness");
t.setDaemon(true);
t.start();
SimpleScheduler.getInstance().addPeriodicEvent(new MarkLiveliness(this, f), 0, LIVELINESS_DELAY);
}
public static final String PROP_BANDWIDTH_SHARE_PERCENTAGE = "router.sharePercentage";
@ -1523,22 +1519,24 @@ private static class UpdateRoutingKeyModifierJob extends JobImpl {
}
}
private static class MarkLiveliness implements Runnable {
private RouterContext _context;
/**
* Write a timestamp to the ping file where the wrapper can see it
*/
private static class MarkLiveliness implements SimpleTimer.TimedEvent {
private Router _router;
private File _pingFile;
public MarkLiveliness(RouterContext ctx, Router router, File pingFile) {
_context = ctx;
public MarkLiveliness(Router router, File pingFile) {
_router = router;
_pingFile = pingFile;
}
public void run() {
_pingFile.deleteOnExit();
do {
}
public void timeReached() {
if (_router.isAlive())
ping();
try { Thread.sleep(Router.LIVELINESS_DELAY); } catch (InterruptedException ie) {}
} while (_router.isAlive());
_pingFile.delete();
else
_pingFile.delete();
}
private void ping() {

View File

@ -6,6 +6,7 @@ import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.internal.InternalClientManager;
import net.i2p.router.client.ClientManagerFacadeImpl;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.peermanager.Calculator;
@ -34,7 +35,7 @@ import net.i2p.util.KeyRing;
*/
public class RouterContext extends I2PAppContext {
private Router _router;
private ClientManagerFacade _clientManagerFacade;
private ClientManagerFacadeImpl _clientManagerFacade;
private ClientMessagePool _clientMessagePool;
private JobQueue _jobQueue;
private InNetMessagePool _inNetMessagePool;
@ -106,10 +107,12 @@ public class RouterContext extends I2PAppContext {
}
public void initAll() {
if ("false".equals(getProperty("i2p.dummyClientFacade", "false")))
_clientManagerFacade = new ClientManagerFacadeImpl(this);
else
_clientManagerFacade = new DummyClientManagerFacade(this);
if (getBooleanProperty("i2p.dummyClientFacade"))
System.err.println("i2p.dummpClientFacade currently unsupported");
_clientManagerFacade = new ClientManagerFacadeImpl(this);
// removed since it doesn't implement InternalClientManager for now
//else
// _clientManagerFacade = new DummyClientManagerFacade(this);
_clientMessagePool = new ClientMessagePool(this);
_jobQueue = new JobQueue(this);
_inNetMessagePool = new InNetMessagePool(this);
@ -395,4 +398,13 @@ public class RouterContext extends I2PAppContext {
public boolean isRouterContext() {
return true;
}
/**
* Use this to connect to the router in the same JVM.
* @return the client manager
* @since 0.8.3
*/
public InternalClientManager internalClientManager() {
return _clientManagerFacade;
}
}

View File

@ -50,9 +50,9 @@ import net.i2p.util.SimpleTimer;
*
* @author jrandom
*/
public class ClientConnectionRunner {
class ClientConnectionRunner {
private Log _log;
private RouterContext _context;
protected final RouterContext _context;
private ClientManager _manager;
/** socket for this particular peer connection */
private Socket _socket;
@ -71,7 +71,7 @@ public class ClientConnectionRunner {
/** set of messageIds created but not yet ACCEPTED */
private Set<MessageId> _acceptedPending;
/** thingy that does stuff */
private I2CPMessageReader _reader;
protected I2CPMessageReader _reader;
/** just for this destination */
private SessionKeyManager _sessionKeyManager;
/**
@ -109,7 +109,7 @@ public class ClientConnectionRunner {
*/
public void startRunning() {
try {
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this));
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this, true));
_writer = new ClientWriterRunner(_context, this);
I2PThread t = new I2PThread(_writer);
t.setName("I2CP Writer " + ++__id);
@ -469,18 +469,8 @@ public class ClientConnectionRunner {
_log.warn("Error sending I2CP message - client went away", eofe);
stopRunning();
} catch (IOException ioe) {
// only warn if client went away
int level;
String emsg;
if (ioe.getMessage() != null && ioe.getMessage().startsWith("Pipe closed")) {
level = Log.WARN;
emsg = "Error sending I2CP message - client went away";
} else {
level = Log.ERROR;
emsg = "IO Error sending I2CP message to client";
}
if (_log.shouldLog(level))
_log.log(level, emsg, ioe);
if (_log.shouldLog(Log.ERROR))
_log.error("IO Error sending I2CP message to client", ioe);
stopRunning();
} catch (Throwable t) {
_log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t);

View File

@ -24,13 +24,13 @@ import net.i2p.util.Log;
*
* @author jrandom
*/
public class ClientListenerRunner implements Runnable {
protected Log _log;
protected RouterContext _context;
protected ClientManager _manager;
class ClientListenerRunner implements Runnable {
protected final Log _log;
protected final RouterContext _context;
protected final ClientManager _manager;
protected ServerSocket _socket;
protected int _port;
private boolean _bindAllInterfaces;
protected final int _port;
protected final boolean _bindAllInterfaces;
protected boolean _running;
protected boolean _listening;
@ -38,18 +38,33 @@ public class ClientListenerRunner implements Runnable {
public ClientListenerRunner(RouterContext context, ClientManager manager, int port) {
_context = context;
_log = _context.logManager().getLog(ClientListenerRunner.class);
_log = _context.logManager().getLog(getClass());
_manager = manager;
_port = port;
String val = context.getProperty(BIND_ALL_INTERFACES);
_bindAllInterfaces = Boolean.valueOf(val).booleanValue();
_bindAllInterfaces = context.getBooleanProperty(BIND_ALL_INTERFACES);
}
public void setPort(int port) { _port = port; }
public int getPort() { return _port; }
public boolean isListening() { return _running && _listening; }
/**
* Get a ServerSocket.
* Split out so it can be overridden for SSL.
* @since 0.8.3
*/
protected ServerSocket getServerSocket() throws IOException {
if (_bindAllInterfaces) {
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " on all interfaces");
return new ServerSocket(_port);
} else {
String listenInterface = _context.getProperty(ClientManagerFacadeImpl.PROP_CLIENT_HOST,
ClientManagerFacadeImpl.DEFAULT_HOST);
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " of the specific interface: " + listenInterface);
return new ServerSocket(_port, 0, InetAddress.getByName(listenInterface));
}
}
/**
* Start up the socket listener, listens for connections, and
* fires those connections off via {@link #runConnection runConnection}.
@ -62,18 +77,7 @@ public class ClientListenerRunner implements Runnable {
int curDelay = 1000;
while (_running) {
try {
if (_bindAllInterfaces) {
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " on all interfaces");
_socket = new ServerSocket(_port);
} else {
String listenInterface = _context.getProperty(ClientManagerFacadeImpl.PROP_CLIENT_HOST,
ClientManagerFacadeImpl.DEFAULT_HOST);
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " of the specific interface: " + listenInterface);
_socket = new ServerSocket(_port, 0, InetAddress.getByName(listenInterface));
}
_socket = getServerSocket();
if (_log.shouldLog(Log.DEBUG))
_log.debug("ServerSocket created, before accept: " + _socket);
@ -131,7 +135,8 @@ public class ClientListenerRunner implements Runnable {
}
/** give the i2cp client 5 seconds to show that they're really i2cp clients */
private final static int CONNECT_TIMEOUT = 5*1000;
protected final static int CONNECT_TIMEOUT = 5*1000;
private final static int LOOP_DELAY = 250;
/**
* Verify the first byte.
@ -141,16 +146,17 @@ public class ClientListenerRunner implements Runnable {
protected boolean validate(Socket socket) {
try {
InputStream is = socket.getInputStream();
for (int i = 0; i < 20; i++) {
for (int i = 0; i < CONNECT_TIMEOUT / LOOP_DELAY; i++) {
if (is.available() > 0)
return is.read() == I2PClient.PROTOCOL_BYTE;
try { Thread.sleep(250); } catch (InterruptedException ie) {}
try { Thread.sleep(LOOP_DELAY); } catch (InterruptedException ie) {}
}
} catch (IOException ioe) {}
if (_log.shouldLog(Log.WARN))
_log.warn("Peer did not authenticate themselves as I2CP quickly enough, dropping");
return false;
}
/**
* Handle the connection by passing it off to a {@link ClientConnectionRunner ClientConnectionRunner}
*

View File

@ -15,7 +15,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.client.I2PSessionException;
import net.i2p.crypto.SessionKeyManager;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
@ -23,8 +25,10 @@ import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.data.Payload;
import net.i2p.data.TunnelId;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.SessionConfig;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.router.ClientManagerFacade;
import net.i2p.router.ClientMessage;
import net.i2p.router.Job;
@ -39,13 +43,18 @@ import net.i2p.util.Log;
*
* @author jrandom
*/
public class ClientManager {
private Log _log;
class ClientManager {
private final Log _log;
private ClientListenerRunner _listener;
private ClientListenerRunner _internalListener;
private final HashMap<Destination, ClientConnectionRunner> _runners; // Destination --> ClientConnectionRunner
private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet
private RouterContext _ctx;
private final RouterContext _ctx;
private boolean _isStarted;
/** Disable external interface, allow internal clients only @since 0.8.3 */
private static final String PROP_DISABLE_EXTERNAL = "i2cp.disableInterface";
/** SSL interface (only) @since 0.8.3 */
private static final String PROP_ENABLE_SSL = "i2cp.SSL";
/** ms to wait before rechecking for inbound messages to deliver to clients */
private final static int INBOUND_POLL_INTERVAL = 300;
@ -53,10 +62,10 @@ public class ClientManager {
public ClientManager(RouterContext context, int port) {
_ctx = context;
_log = context.logManager().getLog(ClientManager.class);
_ctx.statManager().createRateStat("client.receiveMessageSize",
"How large are messages received by the client?",
"ClientMessages",
new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
//_ctx.statManager().createRateStat("client.receiveMessageSize",
// "How large are messages received by the client?",
// "ClientMessages",
// new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_runners = new HashMap();
_pendingRunners = new HashSet();
startListeners(port);
@ -64,16 +73,16 @@ public class ClientManager {
/** Todo: Start a 3rd listener for IPV6? */
private void startListeners(int port) {
_listener = new ClientListenerRunner(_ctx, this, port);
Thread t = new I2PThread(_listener);
t.setName("ClientListener:" + port);
t.setDaemon(true);
t.start();
_internalListener = new InternalClientListenerRunner(_ctx, this, port);
t = new I2PThread(_internalListener);
t.setName("ClientListener:" + port + "-i");
t.setDaemon(true);
t.start();
if (!_ctx.getBooleanProperty(PROP_DISABLE_EXTERNAL)) {
// there's no option to start both an SSL and non-SSL listener
if (_ctx.getBooleanProperty(PROP_ENABLE_SSL))
_listener = new SSLClientListenerRunner(_ctx, this, port);
else
_listener = new ClientListenerRunner(_ctx, this, port);
Thread t = new I2PThread(_listener, "ClientListener:" + port, true);
t.start();
}
_isStarted = true;
}
public void restart() {
@ -95,9 +104,10 @@ public class ClientManager {
}
public void shutdown() {
_isStarted = false;
_log.info("Shutting down the ClientManager");
_listener.stopListening();
_internalListener.stopListening();
if (_listener != null)
_listener.stopListening();
Set<ClientConnectionRunner> runners = new HashSet();
synchronized (_runners) {
for (Iterator<ClientConnectionRunner> iter = _runners.values().iterator(); iter.hasNext();) {
@ -117,7 +127,28 @@ public class ClientManager {
}
}
public boolean isAlive() { return _listener.isListening(); }
/**
* The InternalClientManager interface.
* Connects to the router, receiving a message queue to talk to the router with.
* @throws I2PSessionException if the router isn't ready
* @since 0.8.3
*/
public I2CPMessageQueue internalConnect() throws I2PSessionException {
if (!_isStarted)
throw new I2PSessionException("Router client manager is shut down");
// for now we make these unlimited size
LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue);
registerConnection(runner);
return hisQueue;
}
public boolean isAlive() {
return _isStarted && (_listener == null || _listener.isListening());
}
public void registerConnection(ClientConnectionRunner runner) {
synchronized (_pendingRunners) {
@ -469,8 +500,8 @@ public class ClientManager {
runner = getRunner(_msg.getDestinationHash());
if (runner != null) {
_ctx.statManager().addRateData("client.receiveMessageSize",
_msg.getPayload().getSize(), 0);
//_ctx.statManager().addRateData("client.receiveMessageSize",
// _msg.getPayload().getSize(), 0);
runner.receiveMessage(_msg.getDestination(), null, _msg.getPayload());
} else {
// no client connection...

View File

@ -14,6 +14,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import net.i2p.client.I2PSessionException;
import net.i2p.crypto.SessionKeyManager;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
@ -21,6 +22,8 @@ import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.SessionConfig;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.InternalClientManager;
import net.i2p.router.ClientManagerFacade;
import net.i2p.router.ClientMessage;
import net.i2p.router.Job;
@ -32,7 +35,7 @@ import net.i2p.util.Log;
*
* @author jrandom
*/
public class ClientManagerFacadeImpl extends ClientManagerFacade {
public class ClientManagerFacadeImpl extends ClientManagerFacade implements InternalClientManager {
private final static Log _log = new Log(ClientManagerFacadeImpl.class);
private ClientManager _manager;
private RouterContext _context;
@ -220,4 +223,16 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade {
else
return Collections.EMPTY_SET;
}
/**
* The InternalClientManager interface.
* Connect to the router, receiving a message queue to talk to the router with.
* @throws I2PSessionException if the router isn't ready
* @since 0.8.3
*/
public I2CPMessageQueue connect() throws I2PSessionException {
if (_manager != null)
return _manager.internalConnect();
throw new I2PSessionException("No manager yet");
}
}

View File

@ -42,14 +42,19 @@ import net.i2p.util.RandomSource;
*
*/
class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventListener {
private Log _log;
private RouterContext _context;
private ClientConnectionRunner _runner;
private final Log _log;
private final RouterContext _context;
private final ClientConnectionRunner _runner;
private final boolean _enforceAuth;
public ClientMessageEventListener(RouterContext context, ClientConnectionRunner runner) {
/**
* @param enforceAuth set false for in-JVM, true for socket access
*/
public ClientMessageEventListener(RouterContext context, ClientConnectionRunner runner, boolean enforceAuth) {
_context = context;
_log = _context.logManager().getLog(ClientMessageEventListener.class);
_runner = runner;
_enforceAuth = enforceAuth;
_context.statManager().createRateStat("client.distributeTime", "How long it took to inject the client message into the router", "ClientMessages", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
@ -153,10 +158,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
}
// Auth, since 0.8.2
// In-JVM accesses have access to the same context properties, so
// they will be set on the client side... therefore we don't need to pass in
// some indication of (socket instanceof InternalSocket)
if (Boolean.valueOf(_context.getProperty("i2cp.auth")).booleanValue()) {
if (_enforceAuth && Boolean.valueOf(_context.getProperty("i2cp.auth")).booleanValue()) {
String configUser = _context.getProperty("i2cp.username");
String configPW = _context.getProperty("i2cp.password");
if (configUser != null && configPW != null) {

View File

@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageImpl;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.internal.PoisonI2CPMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@ -52,7 +53,7 @@ class ClientWriterRunner implements Runnable {
public void stopWriting() {
_messagesToWrite.clear();
try {
_messagesToWrite.put(new PoisonMessage());
_messagesToWrite.put(new PoisonI2CPMessage());
} catch (InterruptedException ie) {}
}
@ -64,23 +65,9 @@ class ClientWriterRunner implements Runnable {
} catch (InterruptedException ie) {
continue;
}
if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
break;
_runner.writeMessage(msg);
}
}
/**
* End-of-stream msg used to stop the concurrent queue
* See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
*
*/
private static class PoisonMessage extends I2CPMessageImpl {
public static final int MESSAGE_TYPE = 999999;
public int getType() {
return MESSAGE_TYPE;
}
public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
}
}

View File

@ -0,0 +1,57 @@
package net.i2p.router.client;
import java.util.concurrent.BlockingQueue;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.internal.I2CPMessageQueue;
/**
* Contains the methods to talk to a router or client via I2CP,
* when both are in the same JVM.
* This interface contains methods to access two queues,
* one for transmission and one for receiving.
* The methods are identical to those in java.util.concurrent.BlockingQueue
*
* @author zzz
* @since 0.8.3
*/
class I2CPMessageQueueImpl extends I2CPMessageQueue {
private final BlockingQueue<I2CPMessage> _in;
private final BlockingQueue<I2CPMessage> _out;
public I2CPMessageQueueImpl(BlockingQueue<I2CPMessage> in, BlockingQueue<I2CPMessage> out) {
_in = in;
_out = out;
}
/**
* Send a message, nonblocking
* @return success (false if no space available)
*/
public boolean offer(I2CPMessage msg) {
return _out.offer(msg);
}
/**
* Receive a message, nonblocking
* @return message or null if none available
*/
public I2CPMessage poll() {
return _in.poll();
}
/**
* Send a message, blocking until space is available
*/
public void put(I2CPMessage msg) throws InterruptedException {
_out.put(msg);
}
/**
* Receive a message, blocking until one is available
* @return message
*/
public I2CPMessage take() throws InterruptedException {
return _in.take();
}
}

View File

@ -1,89 +0,0 @@
package net.i2p.router.client;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import java.net.Socket;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.InternalServerSocket;
/**
* Listen for in-JVM connections on the internal "socket"
*
* @author zzz
* @since 0.7.9
*/
public class InternalClientListenerRunner extends ClientListenerRunner {
public InternalClientListenerRunner(RouterContext context, ClientManager manager, int port) {
super(context, manager, port);
_log = _context.logManager().getLog(InternalClientListenerRunner.class);
}
/**
* Start up the socket listener, listens for connections, and
* fires those connections off via {@link #runConnection runConnection}.
* This only returns if the socket cannot be opened or there is a catastrophic
* failure.
*
*/
public void runServer() {
try {
if (_log.shouldLog(Log.INFO))
_log.info("Listening on internal port " + _port);
_socket = new InternalServerSocket(_port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("InternalServerSocket created, before accept: " + _socket);
_listening = true;
_running = true;
while (_running) {
try {
Socket socket = _socket.accept();
if (validate(socket)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Internal connection received");
runConnection(socket);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Refused connection from " + socket.getInetAddress());
try {
socket.close();
} catch (IOException ioe) {}
}
} catch (IOException ioe) {
if (_context.router().isAlive())
_log.error("Server error accepting", ioe);
} catch (Throwable t) {
if (_context.router().isAlive())
_log.error("Fatal error running client listener - killing the thread!", t);
_listening = false;
return;
}
}
} catch (IOException ioe) {
if (_context.router().isAlive())
_log.error("Error listening on internal port " + _port, ioe);
}
_listening = false;
if (_socket != null) {
try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
}
if (_context.router().isAlive())
_log.error("CANCELING I2CP LISTEN", new Exception("I2CP Listen cancelled!!!"));
_running = false;
}
}

View File

@ -0,0 +1,76 @@
package net.i2p.router.client;
import java.io.IOException;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.QueuedI2CPMessageReader;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Zero-copy in-JVM.
* While super() starts both a reader and a writer thread, we only need a reader thread here.
*
* @author zzz
* @since 0.8.3
*/
class QueuedClientConnectionRunner extends ClientConnectionRunner {
private final I2CPMessageQueue queue;
/**
* Create a new runner with the given queues
*
*/
public QueuedClientConnectionRunner(RouterContext context, ClientManager manager, I2CPMessageQueue queue) {
super(context, manager, null);
this.queue = queue;
}
/**
* Starts the reader thread. Does not call super().
*/
@Override
public void startRunning() {
_reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this, false));
_reader.startReading();
}
/**
* Calls super() to stop the reader, and sends a poison message to the client.
*/
@Override
void stopRunning() {
super.stopRunning();
queue.close();
}
/**
* In super(), doSend queues it to the writer thread and
* the writer thread calls writeMessage() to write to the output stream.
* Since we have no writer thread this shouldn't happen.
*/
@Override
void writeMessage(I2CPMessage msg) {
throw new RuntimeException("huh?");
}
/**
* Actually send the I2CPMessage to the client.
* Nonblocking.
*/
@Override
void doSend(I2CPMessage msg) throws I2CPMessageException {
// This will never fail, for now, as the router uses unbounded queues
// Perhaps in the future we may want to use bounded queues,
// with non-blocking writes for the router
// and blocking writes for the client?
boolean success = queue.offer(msg);
if (!success)
throw new I2CPMessageException("I2CP write to queue failed");
}
}

View File

@ -0,0 +1,282 @@
package net.i2p.router.client;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.net.ServerSocket;
import java.security.KeyStore;
import java.security.GeneralSecurityException;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.Arrays;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLContext;
import net.i2p.client.I2PClient;
import net.i2p.data.Base32;
import net.i2p.data.Base64;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.SecureDirectory;
import net.i2p.util.SecureFileOutputStream;
import net.i2p.util.ShellCommand;
/**
* SSL version of ClientListenerRunner
*
* @since 0.8.3
* @author zzz
*/
class SSLClientListenerRunner extends ClientListenerRunner {
private SSLServerSocketFactory _factory;
private static final String PROP_KEYSTORE_PASSWORD = "i2cp.keystorePassword";
private static final String DEFAULT_KEYSTORE_PASSWORD = "changeit";
private static final String PROP_KEY_PASSWORD = "i2cp.keyPassword";
private static final String KEY_ALIAS = "i2cp";
private static final String ASCII_KEYFILE = "i2cp.local.crt";
public SSLClientListenerRunner(RouterContext context, ClientManager manager, int port) {
super(context, manager, port);
}
/**
* @return success if it exists and we have a password, or it was created successfully.
*/
private boolean verifyKeyStore(File ks) {
if (ks.exists()) {
boolean rv = _context.getProperty(PROP_KEY_PASSWORD) != null;
if (!rv)
_log.error("I2CP SSL error, must set " + PROP_KEY_PASSWORD + " in " +
(new File(_context.getConfigDir(), "router.config")).getAbsolutePath());
return rv;
}
File dir = ks.getParentFile();
if (!dir.exists()) {
File sdir = new SecureDirectory(dir.getAbsolutePath());
if (!sdir.mkdir())
return false;
}
boolean rv = createKeyStore(ks);
// Now read it back out of the new keystore and save it in ascii form
// where the clients can get to it.
// Failure of this part is not fatal.
if (rv)
exportCert(ks);
return rv;
}
/**
* Call out to keytool to create a new keystore with a keypair in it.
* Trying to do this programatically is a nightmare, requiring either BouncyCastle
* libs or using proprietary Sun libs, and it's a huge mess.
* If successful, stores the keystore password and key password in router.config.
*
* @return success
*/
private boolean createKeyStore(File ks) {
// make a random 48 character password (30 * 8 / 5)
byte[] rand = new byte[30];
_context.random().nextBytes(rand);
String keyPassword = Base32.encode(rand);
// and one for the cname
_context.random().nextBytes(rand);
String cname = Base32.encode(rand) + ".i2cp.i2p.net";
String keytool = (new File(System.getProperty("java.home"), "bin/keytool")).getAbsolutePath();
String[] args = new String[] {
keytool,
"-genkey", // -genkeypair preferred in newer keytools, but this works with more
"-storetype", KeyStore.getDefaultType(),
"-keystore", ks.getAbsolutePath(),
"-storepass", DEFAULT_KEYSTORE_PASSWORD,
"-alias", KEY_ALIAS,
"-dname", "CN=" + cname + ",OU=I2CP,O=I2P Anonymous Network,L=XX,ST=XX,C=XX",
"-validity", "3652", // 10 years
"-keyalg", "DSA",
"-keysize", "1024",
"-keypass", keyPassword};
boolean success = (new ShellCommand()).executeSilentAndWaitTimed(args, 30); // 30 secs
if (success) {
success = ks.exists();
if (success) {
SecureFileOutputStream.setPerms(ks);
_context.router().setConfigSetting(PROP_KEYSTORE_PASSWORD, DEFAULT_KEYSTORE_PASSWORD);
_context.router().setConfigSetting(PROP_KEY_PASSWORD, keyPassword);
_context.router().saveConfig();
}
}
if (success) {
_log.logAlways(Log.INFO, "Created self-signed certificate for " + cname + " in keystore: " + ks.getAbsolutePath() + "\n" +
"The certificate name was generated randomly, and is not associated with your " +
"IP address, host name, router identity, or destination keys.");
} else {
_log.error("Failed to create I2CP SSL keystore using command line:");
StringBuilder buf = new StringBuilder(256);
for (int i = 0; i < args.length; i++) {
buf.append('"').append(args[i]).append("\" ");
}
_log.error(buf.toString());
_log.error("This is for the Sun/Oracle keytool, others may be incompatible.\n" +
"If you create the keystore manually, you must add " + PROP_KEYSTORE_PASSWORD + " and " + PROP_KEY_PASSWORD +
" to " + (new File(_context.getConfigDir(), "router.config")).getAbsolutePath());
}
return success;
}
/**
* Pull the cert back OUT of the keystore and save it as ascii
* so the clients can get to it.
*/
private void exportCert(File ks) {
File sdir = new SecureDirectory(_context.getConfigDir(), "certificates");
if (sdir.exists() || sdir.mkdir()) {
try {
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
InputStream fis = new FileInputStream(ks);
String ksPass = _context.getProperty(PROP_KEYSTORE_PASSWORD, DEFAULT_KEYSTORE_PASSWORD);
keyStore.load(fis, ksPass.toCharArray());
fis.close();
Certificate cert = keyStore.getCertificate(KEY_ALIAS);
if (cert != null) {
File certFile = new File(sdir, ASCII_KEYFILE);
saveCert(cert, certFile);
} else {
_log.error("Error getting SSL cert to save as ASCII");
}
} catch (GeneralSecurityException gse) {
_log.error("Error saving ASCII SSL keys", gse);
} catch (IOException ioe) {
_log.error("Error saving ASCII SSL keys", ioe);
}
} else {
_log.error("Error saving ASCII SSL keys");
}
}
private static final int LINE_LENGTH = 64;
/**
* Modified from:
* http://www.exampledepot.com/egs/java.security.cert/ExportCert.html
*
* Write a certificate to a file in base64 format.
*/
private void saveCert(Certificate cert, File file) {
OutputStream os = null;
try {
// Get the encoded form which is suitable for exporting
byte[] buf = cert.getEncoded();
os = new SecureFileOutputStream(file);
PrintWriter wr = new PrintWriter(os);
wr.println("-----BEGIN CERTIFICATE-----");
String b64 = Base64.encode(buf, true); // true = use standard alphabet
for (int i = 0; i < b64.length(); i += LINE_LENGTH) {
wr.println(b64.substring(i, Math.min(i + LINE_LENGTH, b64.length())));
}
wr.println("-----END CERTIFICATE-----");
wr.flush();
} catch (CertificateEncodingException cee) {
_log.error("Error writing X509 Certificate " + file.getAbsolutePath(), cee);
} catch (IOException ioe) {
_log.error("Error writing X509 Certificate " + file.getAbsolutePath(), ioe);
} finally {
try { if (os != null) os.close(); } catch (IOException foo) {}
}
}
/**
* Sets up the SSLContext and sets the socket factory.
* @return success
*/
private boolean initializeFactory(File ks) {
String ksPass = _context.getProperty(PROP_KEYSTORE_PASSWORD, DEFAULT_KEYSTORE_PASSWORD);
String keyPass = _context.getProperty(PROP_KEY_PASSWORD);
if (keyPass == null) {
_log.error("No key password, set " + PROP_KEY_PASSWORD +
" in " + (new File(_context.getConfigDir(), "router.config")).getAbsolutePath());
return false;
}
try {
SSLContext sslc = SSLContext.getInstance("TLS");
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
InputStream fis = new FileInputStream(ks);
keyStore.load(fis, ksPass.toCharArray());
fis.close();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, keyPass.toCharArray());
sslc.init(kmf.getKeyManagers(), null, _context.random());
_factory = sslc.getServerSocketFactory();
return true;
} catch (GeneralSecurityException gse) {
_log.error("Error loading SSL keys", gse);
} catch (IOException ioe) {
_log.error("Error loading SSL keys", ioe);
}
return false;
}
/**
* Get a SSLServerSocket.
*/
@Override
protected ServerSocket getServerSocket() throws IOException {
ServerSocket rv;
if (_bindAllInterfaces) {
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " on all interfaces");
rv = _factory.createServerSocket(_port);
} else {
String listenInterface = _context.getProperty(ClientManagerFacadeImpl.PROP_CLIENT_HOST,
ClientManagerFacadeImpl.DEFAULT_HOST);
if (_log.shouldLog(Log.INFO))
_log.info("Listening on port " + _port + " of the specific interface: " + listenInterface);
rv = _factory.createServerSocket(_port, 0, InetAddress.getByName(listenInterface));
}
return rv;
}
/**
* Create (if necessary) and load the key store, then run.
*/
@Override
public void runServer() {
File keyStore = new File(_context.getConfigDir(), "keystore/i2cp.ks");
if (verifyKeyStore(keyStore) && initializeFactory(keyStore)) {
super.runServer();
} else {
_log.error("SSL I2CP server error - Failed to create or open key store");
}
}
/**
* Overridden because SSL handshake may need more time,
* and available() in super doesn't work.
* The handshake doesn't start until a read().
*/
@Override
protected boolean validate(Socket socket) {
try {
InputStream is = socket.getInputStream();
int oldTimeout = socket.getSoTimeout();
socket.setSoTimeout(4 * CONNECT_TIMEOUT);
boolean rv = is.read() == I2PClient.PROTOCOL_BYTE;
socket.setSoTimeout(oldTimeout);
return rv;
} catch (IOException ioe) {}
if (_log.shouldLog(Log.WARN))
_log.warn("Peer did not authenticate themselves as I2CP quickly enough, dropping");
return false;
}
}

View File

@ -45,6 +45,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
_context = context;
_log = _context.logManager().getLog(CommSystemFacadeImpl.class);
_manager = null;
_context.statManager().createRateStat("transport.getBidsJobTime", "How long does it take?", "Transport", new long[] { 10*60*1000l });
startGeoIP();
}
@ -131,7 +132,9 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public void processMessage(OutNetMessage msg) {
//GetBidsJob j = new GetBidsJob(_context, this, msg);
//j.runJob();
long before = _context.clock().now();
GetBidsJob.getBids(_context, this, msg);
_context.statManager().addRateData("transport.getBidsJobTime", _context.clock().now() - before, 0);
}
@Override

View File

@ -24,22 +24,27 @@ import net.i2p.util.Log;
* @author zzz
*/
public class NTCPSendFinisher {
private static final int THREADS = 4;
private static final int MIN_THREADS = 1;
private static final int MAX_THREADS = 4;
private final I2PAppContext _context;
private final NTCPTransport _transport;
private final Log _log;
private int _count;
private static int _count;
private ThreadPoolExecutor _executor;
private static int _threads;
public NTCPSendFinisher(I2PAppContext context, NTCPTransport transport) {
_context = context;
_log = _context.logManager().getLog(NTCPSendFinisher.class);
_transport = transport;
_context.statManager().createRateStat("ntcp.sendFinishTime", "How long to queue and excecute msg.afterSend()", "ntcp", new long[] {5*1000});
}
public void start() {
_count = 0;
_executor = new CustomThreadPoolExecutor();
long maxMemory = Runtime.getRuntime().maxMemory();
_threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
_executor = new CustomThreadPoolExecutor(_threads);
}
public void stop() {
@ -57,18 +62,18 @@ public class NTCPSendFinisher {
}
// not really needed for now but in case we want to add some hooks like afterExecute()
private class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor() {
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int num) {
// use unbounded queue, so maximumPoolSize and keepAliveTime have no effect
super(THREADS, THREADS, 1000, TimeUnit.MILLISECONDS,
super(num, num, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(), new CustomThreadFactory());
}
}
private class CustomThreadFactory implements ThreadFactory {
private static class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName("NTCPSendFinisher " + (++_count) + '/' + THREADS);
rv.setName("NTCPSendFinisher " + (++_count) + '/' + _threads);
rv.setDaemon(true);
return rv;
}
@ -78,15 +83,18 @@ public class NTCPSendFinisher {
* Call afterSend() for the message
*/
private class RunnableEvent implements Runnable {
private OutNetMessage _msg;
private final OutNetMessage _msg;
private final long _queued;
public RunnableEvent(OutNetMessage msg) {
_msg = msg;
_queued = _context.clock().now();
}
public void run() {
try {
_transport.afterSend(_msg, true, false, _msg.getSendTime());
_context.statManager().addRateData("ntcp.sendFinishTime", _context.clock().now() - _queued, 0);
} catch (Throwable t) {
_log.log(Log.CRIT, " wtf, afterSend borked", t);
}

View File

@ -433,8 +433,10 @@ public class NTCPTransport extends TransportImpl {
return skews;
}
private static final int NUM_CONCURRENT_READERS = 3;
private static final int NUM_CONCURRENT_WRITERS = 3;
private static final int MIN_CONCURRENT_READERS = 2; // unless < 32MB
private static final int MIN_CONCURRENT_WRITERS = 2; // unless < 32MB
private static final int MAX_CONCURRENT_READERS = 4;
private static final int MAX_CONCURRENT_WRITERS = 4;
/**
* Called by TransportManager.
@ -449,12 +451,8 @@ public class NTCPTransport extends TransportImpl {
if (_pumper.isAlive())
return _myAddress != null ? _myAddress.toRouterAddress() : null;
if (_log.shouldLog(Log.WARN)) _log.warn("Starting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
_writer.startWriting(NUM_CONCURRENT_WRITERS);
startIt();
configureLocalAddress();
return bindAddress();
}
@ -471,12 +469,8 @@ public class NTCPTransport extends TransportImpl {
if (_pumper.isAlive())
return _myAddress != null ? _myAddress.toRouterAddress() : null;
if (_log.shouldLog(Log.WARN)) _log.warn("Restarting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
_writer.startWriting(NUM_CONCURRENT_WRITERS);
startIt();
if (addr == null)
_myAddress = null;
else
@ -484,6 +478,28 @@ public class NTCPTransport extends TransportImpl {
return bindAddress();
}
/**
* Start up. Caller must synchronize.
* @since 0.8.3
*/
private void startIt() {
_finisher.start();
_pumper.startPumping();
long maxMemory = Runtime.getRuntime().maxMemory();
int nr, nw;
if (maxMemory < 32*1024*1024) {
nr = nw = 1;
} else if (maxMemory < 64*1024*1024) {
nr = nw = 2;
} else {
nr = Math.max(MIN_CONCURRENT_READERS, Math.min(MAX_CONCURRENT_READERS, _context.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
nw = Math.max(MIN_CONCURRENT_WRITERS, Math.min(MAX_CONCURRENT_WRITERS, _context.bandwidthLimiter().getOutboundKBytesPerSecond() / 20));
}
_reader.startReading(nr);
_writer.startWriting(nw);
}
public boolean isAlive() {
return _pumper.isAlive();
}

View File

@ -15,13 +15,13 @@ import net.i2p.util.Log;
*
*/
class Reader {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
// TODO change to LBQ ??
private final List<NTCPConnection> _pendingConnections;
private List<NTCPConnection> _liveReads;
private List<NTCPConnection> _readAfterLive;
private List<Runner> _runners;
private final List<NTCPConnection> _liveReads;
private final List<NTCPConnection> _readAfterLive;
private final List<Runner> _runners;
public Reader(RouterContext ctx) {
_context = ctx;
@ -33,9 +33,9 @@ class Reader {
}
public void startReading(int numReaders) {
for (int i = 0; i < numReaders; i++) {
for (int i = 1; i <= numReaders; i++) {
Runner r = new Runner();
I2PThread t = new I2PThread(r, "NTCP read " + i, true);
I2PThread t = new I2PThread(r, "NTCP reader " + i + '/' + numReaders, true);
_runners.add(r);
t.start();
}

View File

@ -14,12 +14,12 @@ import net.i2p.util.Log;
*
*/
class Writer {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
private final List<NTCPConnection> _pendingConnections;
private List<NTCPConnection> _liveWrites;
private List<NTCPConnection> _writeAfterLive;
private List<Runner> _runners;
private final List<NTCPConnection> _liveWrites;
private final List<NTCPConnection> _writeAfterLive;
private final List<Runner> _runners;
public Writer(RouterContext ctx) {
_context = ctx;
@ -31,9 +31,9 @@ class Writer {
}
public void startWriting(int numWriters) {
for (int i = 0; i < numWriters; i++) {
for (int i = 1; i <=numWriters; i++) {
Runner r = new Runner();
I2PThread t = new I2PThread(r, "NTCP write " + i, true);
I2PThread t = new I2PThread(r, "NTCP writer " + i + '/' + numWriters, true);
_runners.add(r);
t.start();
}

View File

@ -27,7 +27,9 @@ class MessageReceiver {
private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive;
//private ByteCache _cache;
private static final int THREADS = 5;
private static final int MIN_THREADS = 2; // unless < 32MB
private static final int MAX_THREADS = 5;
private final int _threadCount;
private static final long POISON_IMS = -99999999999l;
public MessageReceiver(RouterContext ctx, UDPTransport transport) {
@ -35,10 +37,19 @@ class MessageReceiver {
_log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport;
_completeMessages = new LinkedBlockingQueue();
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory < 32*1024*1024)
_threadCount = 1;
else if (maxMemory < 64*1024*1024)
_threadCount = 2;
else
_threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
// the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
@ -49,8 +60,8 @@ class MessageReceiver {
public void startup() {
_alive = true;
for (int i = 0; i < THREADS; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i + '/' + THREADS, true);
for (int i = 0; i < _threadCount; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + (i+1) + '/' + _threadCount, true);
t.start();
}
}
@ -64,7 +75,7 @@ class MessageReceiver {
public void shutdown() {
_alive = false;
_completeMessages.clear();
for (int i = 0; i < THREADS; i++) {
for (int i = 0; i < _threadCount; i++) {
InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null);
_completeMessages.offer(ims);
}
@ -119,8 +130,8 @@ class MessageReceiver {
if (message != null) {
long before = System.currentTimeMillis();
if (remaining > 0)
_context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
//if (remaining > 0)
// _context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());

View File

@ -31,11 +31,13 @@ class PacketHandler {
private boolean _keepReading;
private final Handler[] _handlers;
private static final int NUM_HANDLERS = 5;
private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB
private static final int MAX_NUM_HANDLERS = 5;
/** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) {// LINT -- Exporting non-public type through public API
PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher,
InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) {
_context = ctx;
_log = ctx.logManager().getLog(PacketHandler.class);
_transport = transport;
@ -44,10 +46,20 @@ class PacketHandler {
_inbound = inbound;
_testManager = testManager;
_introManager = introManager;
_handlers = new Handler[NUM_HANDLERS];
for (int i = 0; i < NUM_HANDLERS; i++) {
long maxMemory = Runtime.getRuntime().maxMemory();
int num_handlers;
if (maxMemory < 32*1024*1024)
num_handlers = 1;
else if (maxMemory < 64*1024*1024)
num_handlers = 2;
else
num_handlers = Math.max(MIN_NUM_HANDLERS, Math.min(MAX_NUM_HANDLERS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
_handlers = new Handler[num_handlers];
for (int i = 0; i < num_handlers; i++) {
_handlers[i] = new Handler();
}
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", UDPTransport.RATES);
@ -79,8 +91,8 @@ class PacketHandler {
public void startup() {
_keepReading = true;
for (int i = 0; i < NUM_HANDLERS; i++) {
I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + i + '/' + NUM_HANDLERS, true);
for (int i = 0; i < _handlers.length; i++) {
I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + (i+1) + '/' + _handlers.length, true);
t.start();
}
}
@ -91,8 +103,8 @@ class PacketHandler {
String getHandlerStatus() {
StringBuilder rv = new StringBuilder();
rv.append("Handlers: ").append(NUM_HANDLERS);
for (int i = 0; i < NUM_HANDLERS; i++) {
rv.append("Handlers: ").append(_handlers.length);
for (int i = 0; i < _handlers.length; i++) {
Handler handler = _handlers[i];
rv.append(" handler ").append(i).append(" state: ").append(handler._state);
}

View File

@ -16,22 +16,26 @@ public class TunnelGatewayPumper implements Runnable {
private RouterContext _context;
private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
private boolean _stop;
private static final int PUMPERS = 4;
private static final int MIN_PUMPERS = 1;
private static final int MAX_PUMPERS = 4;
private final int _pumpers;
/** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx;
_wantsPumping = new LinkedBlockingQueue();
_stop = false;
for (int i = 0; i < PUMPERS; i++)
new I2PThread(this, "Tunnel GW pumper " + i + '/' + PUMPERS, true).start();
long maxMemory = Runtime.getRuntime().maxMemory();
_pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024))));
for (int i = 0; i < _pumpers; i++)
new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true).start();
}
public void stopPumping() {
_stop=true;
_wantsPumping.clear();
PumpedTunnelGateway poison = new PoisonPTG(_context);
for (int i = 0; i < PUMPERS; i++)
for (int i = 0; i < _pumpers; i++)
_wantsPumping.offer(poison);
for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
try {