Prep for multiple SSU sockets:

- Change from single UDPEndpoint to a List of UDPEndpoints
 - Move (single) receive queue from UDPReceiver to PacketHandler
 - Multiple transmit queues (one for each UDPEndpoint/UDPSender),
   select queue in PacketPusher
 - Throw exception on UDPEndpoint.startup() failure
This commit is contained in:
zzz
2013-05-03 15:03:55 +00:00
parent eecab472eb
commit c6121cb31e
6 changed files with 204 additions and 102 deletions

View File

@ -3,9 +3,11 @@ package net.i2p.router.transport.udp;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.data.DataHelper;
import net.i2p.util.I2PThread;
import net.i2p.util.LHMCache;
@ -26,7 +28,6 @@ class PacketHandler {
private final RouterContext _context;
private final Log _log;
private final UDPTransport _transport;
private final UDPEndpoint _endpoint;
private final EstablishmentManager _establisher;
private final InboundMessageFragments _inbound;
private final PeerTestManager _testManager;
@ -34,19 +35,22 @@ class PacketHandler {
private volatile boolean _keepReading;
private final Handler[] _handlers;
private final Map<RemoteHostId, Object> _failCache;
private final BlockingQueue<UDPPacket> _inboundQueue;
private static final Object DUMMY = new Object();
private static final int TYPE_POISON = -99999;
private static final int MIN_QUEUE_SIZE = 16;
private static final int MAX_QUEUE_SIZE = 192;
private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB
private static final int MAX_NUM_HANDLERS = 1;
/** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher,
PacketHandler(RouterContext ctx, UDPTransport transport, EstablishmentManager establisher,
InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) {
_context = ctx;
_log = ctx.logManager().getLog(PacketHandler.class);
_transport = transport;
_endpoint = endpoint;
_establisher = establisher;
_inbound = inbound;
_testManager = testManager;
@ -56,6 +60,8 @@ class PacketHandler {
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
_inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize);
int num_handlers;
if (maxMemory < 32*1024*1024)
num_handlers = 1;
@ -107,6 +113,7 @@ class PacketHandler {
public synchronized void shutdown() {
_keepReading = false;
stopQueue();
}
String getHandlerStatus() {
@ -119,9 +126,55 @@ class PacketHandler {
return rv.toString();
}
/** @since 0.8.8 */
int getHandlerCount() {
return _handlers.length;
/**
* Blocking call to retrieve the next inbound packet, or null if we have
* shut down.
*
* @since IPv6 moved from UDPReceiver
*/
public void queueReceived(UDPPacket packet) throws InterruptedException {
_inboundQueue.put(packet);
}
/**
* Blocking for a while
*
* @since IPv6 moved from UDPReceiver
*/
private void stopQueue() {
_inboundQueue.clear();
for (int i = 0; i < _handlers.length; i++) {
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
}
for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_inboundQueue.clear();
}
/**
* Blocking call to retrieve the next inbound packet, or null if we have
* shut down.
*
* @since IPv6 moved from UDPReceiver
*/
public UDPPacket receiveNext() {
UDPPacket rv = null;
//int remaining = 0;
while (_keepReading && rv == null) {
try {
rv = _inboundQueue.take();
} catch (InterruptedException ie) {}
if (rv != null && rv.getMessageType() == TYPE_POISON)
return null;
}
//_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
return rv;
}
/** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */
@ -144,7 +197,7 @@ class PacketHandler {
_state = 1;
while (_keepReading) {
_state = 2;
UDPPacket packet = _endpoint.receive();
UDPPacket packet = receiveNext();
_state = 3;
if (packet == null) break; // keepReading is probably false, or bind failed...

View File

@ -1,26 +1,29 @@
package net.i2p.router.transport.udp;
import java.util.List;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Blocking thread to grab new packets off the outbound fragment
* pool and toss 'em onto the outbound packet queue
* pool and toss 'em onto the outbound packet queues.
*
* Here we select which UDPEndpoint/UDPSender to send it out.
*/
class PacketPusher implements Runnable {
// private RouterContext _context;
private final Log _log;
private final OutboundMessageFragments _fragments;
private final UDPSender _sender;
private final List<UDPEndpoint> _endpoints;
private volatile boolean _alive;
public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) {
public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, List<UDPEndpoint> endpoints) {
// _context = ctx;
_log = ctx.logManager().getLog(PacketPusher.class);
_fragments = fragments;
_sender = sender;
_endpoints = endpoints;
}
public synchronized void startup() {
@ -38,8 +41,7 @@ class PacketPusher implements Runnable {
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
// BLOCKING if queue is full
_sender.add(packets[i]);
send(packets[i]);
}
}
} catch (Exception e) {
@ -47,4 +49,41 @@ class PacketPusher implements Runnable {
}
}
}
/**
* This sends it directly out, bypassing OutboundMessageFragments
* and the PacketPusher. The only queueing is for the bandwidth limiter.
* BLOCKING if OB queue is full.
*
* @param packet non-null
* @since IPv6
*/
public void send(UDPPacket packet) {
boolean handled = false;
boolean isIPv4 = packet.getPacket().getAddress().getAddress().length == 4;
for (int j = 0; j < _endpoints.size(); j++) {
// Find the best endpoint (socket) to send this out.
// TODO if we have multiple IPv4, or multiple IPv6 endpoints,
// we have to track which one we're using in the PeerState and
// somehow set that in the UDPPacket so we're consistent
UDPEndpoint ep;
try {
ep = _endpoints.get(j);
} catch (IndexOutOfBoundsException ioobe) {
// whups, list changed
break;
}
if ((isIPv4 && ep.isIPv4()) ||
((!isIPv4) && ep.isIPv6())) {
// BLOCKING if queue is full
ep.getSender().add(packet);
handled = true;
break;
}
}
if (!handled) {
_log.error("No endpoint to send " + packet);
packet.release();
}
}
}

View File

@ -2,14 +2,16 @@ package net.i2p.router.transport.udp;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.SocketException;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Coordinate the low level datagram socket, managing the UDPSender and
* UDPReceiver
* Coordinate the low-level datagram socket, creating and managing the UDPSender and
* UDPReceiver.
*/
class UDPEndpoint {
private final RouterContext _context;
@ -20,6 +22,7 @@ class UDPEndpoint {
private UDPReceiver _receiver;
private DatagramSocket _socket;
private final InetAddress _bindAddress;
private final boolean _isIPv4, _isIPv6;
/**
* @param listenPort -1 or the requested port, may not be honored
@ -31,17 +34,19 @@ class UDPEndpoint {
_transport = transport;
_bindAddress = bindAddress;
_listenPort = listenPort;
_isIPv4 = bindAddress == null || bindAddress instanceof Inet4Address;
_isIPv6 = bindAddress == null || bindAddress instanceof Inet6Address;
}
/** caller should call getListenPort() after this to get the actual bound port and determine success */
public synchronized void startup() {
public synchronized void startup() throws SocketException {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up the UDP endpoint");
shutdown();
_socket = getSocket();
if (_socket == null) {
_log.log(Log.CRIT, "UDP Unable to open a port");
return;
throw new SocketException("SSU Unable to bind to a port on " + _bindAddress);
}
_sender = new UDPSender(_context, _socket, "UDPSender");
_receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver");
@ -144,16 +149,6 @@ class UDPEndpoint {
_sender.add(packet);
}
/**
* Blocking call to receive the next inbound UDP packet from any peer.
* @return null if we have shut down
*/
public UDPPacket receive() {
if (_receiver == null)
return null;
return _receiver.receiveNext();
}
/**
* Clear outbound queue, probably in preparation for sending destroy() to everybody.
* @since 0.9.2
@ -162,4 +157,20 @@ class UDPEndpoint {
if (_sender != null)
_sender.clear();
}
/**
* @return true for wildcard too
* @since IPv6
*/
public boolean isIPv4() {
return _isIPv4;
}
/**
* @return true for wildcard too
* @since IPv6
*/
public boolean isIPv6() {
return _isIPv6;
}
}

View File

@ -3,11 +3,9 @@ package net.i2p.router.transport.udp;
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@ -20,37 +18,34 @@ import net.i2p.util.SystemVersion;
* waiting around too long, they are dropped. Packets should be pulled off
* from the queue ASAP by a {@link PacketHandler}
*
* There is a UDPReceiver for each UDPEndpoint.
* It contains a thread but no queue. Received packets are queued
* in the common PacketHandler queue.
*/
class UDPReceiver {
private final RouterContext _context;
private final Log _log;
private final DatagramSocket _socket;
private String _name;
private final BlockingQueue<UDPPacket> _inboundQueue;
private volatile boolean _keepRunning;
private final Runner _runner;
private final UDPTransport _transport;
private final PacketHandler _handler;
private static int __id;
private final int _id;
private static final boolean _isAndroid = SystemVersion.isAndroid();
private static final int TYPE_POISON = -99999;
private static final int MIN_QUEUE_SIZE = 16;
private static final int MAX_QUEUE_SIZE = 192;
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPReceiver.class);
_id = ++__id;
_name = name;
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
_inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize);
_socket = socket;
_transport = transport;
_handler = transport.getPacketHandler();
if (_handler == null)
throw new IllegalStateException();
_runner = new Runner();
//_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", UDPTransport.RATES);
@ -68,18 +63,6 @@ class UDPReceiver {
public synchronized void shutdown() {
_keepRunning = false;
_inboundQueue.clear();
for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
}
for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_inboundQueue.clear();
}
/*********
@ -194,7 +177,7 @@ class UDPReceiver {
if (!rejected) {
****/
try {
_inboundQueue.put(packet);
_handler.queueReceived(packet);
} catch (InterruptedException ie) {
packet.release();
_keepRunning = false;
@ -229,24 +212,6 @@ class UDPReceiver {
}
****/
/**
* Blocking call to retrieve the next inbound packet, or null if we have
* shut down.
*
*/
public UDPPacket receiveNext() {
UDPPacket rv = null;
//int remaining = 0;
while (_keepRunning && rv == null) {
try {
rv = _inboundQueue.take();
} catch (InterruptedException ie) {}
if (rv != null && rv.getMessageType() == TYPE_POISON)
return null;
}
//_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
return rv;
}
private class Runner implements Runnable {
//private volatile boolean _socketChanged;

View File

@ -14,6 +14,9 @@ import net.i2p.util.Log;
/**
* Lowest level packet sender, pushes anything on its queue ASAP.
*
* There is a UDPSender for each UDPEndpoint.
* It contains a thread and a queue. Packet to be sent are queued
* by the PacketPusher.
*/
class UDPSender {
private final RouterContext _context;

View File

@ -3,6 +3,7 @@ package net.i2p.router.transport.udp;
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
@ -18,6 +19,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import net.i2p.data.DatabaseEntry;
import net.i2p.data.DataHelper;
@ -51,7 +53,7 @@ import net.i2p.util.Translate;
*/
public class UDPTransport extends TransportImpl implements TimedWeightedPriorityMessageQueue.FailedListener {
private final Log _log;
private UDPEndpoint _endpoint;
private final List<UDPEndpoint> _endpoints;
private final Object _addDropLock = new Object();
/** Peer (Hash) to PeerState */
private final Map<Hash, PeerState> _peersByIdent;
@ -206,6 +208,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_peersByIdent = new ConcurrentHashMap(128);
_peersByRemoteHost = new ConcurrentHashMap(128);
_dropList = new ConcurrentHashSet(2);
_endpoints = new CopyOnWriteArrayList();
// See comments in DummyThrottle.java
if (USE_PRIORITY) {
@ -263,8 +266,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_pusher.shutdown();
if (_handler != null)
_handler.shutdown();
if (_endpoint != null)
_endpoint.shutdown();
for (UDPEndpoint endpoint : _endpoints) {
endpoint.shutdown();
// should we remove?
_endpoints.remove(endpoint);
}
if (_establisher != null)
_establisher.shutdown();
if (_refiller != null)
@ -327,14 +333,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.warn("Binding only to " + bindToAddr);
if (_log.shouldLog(Log.INFO))
_log.info("Binding to the port: " + port);
if (_endpoint == null) {
_endpoint = new UDPEndpoint(_context, this, port, bindToAddr);
if (_endpoints.isEmpty()) {
// will always be empty since we are removing them above
UDPEndpoint endpoint = new UDPEndpoint(_context, this, port, bindToAddr);
_endpoints.add(endpoint);
// TODO add additional endpoints for additional addresses/ports
} else {
// todo, set bind address too
_endpoint.setListenPort(port);
// unused for now
for (UDPEndpoint endpoint : _endpoints) {
if (endpoint.isIPv4()) {
// hack, first IPv4 endpoint, FIXME
// todo, set bind address too
endpoint.setListenPort(port);
break;
}
}
}
setMTU(bindToAddr);
if (_establisher == null)
_establisher = new EstablishmentManager(_context, this);
@ -342,7 +358,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_testManager = new PeerTestManager(_context, this);
if (_handler == null)
_handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager);
_handler = new PacketHandler(_context, this, _establisher, _inboundFragments, _testManager, _introManager);
// See comments in DummyThrottle.java
if (USE_PRIORITY && _refiller == null)
@ -353,15 +369,26 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// Startup the endpoint with the requested port, check the actual port, and
// take action if it failed or was different than requested or it needs to be saved
_endpoint.startup();
int newPort = _endpoint.getListenPort();
_externalListenPort = newPort;
if (newPort <= 0) {
int newPort = -1;
for (UDPEndpoint endpoint : _endpoints) {
try {
endpoint.startup();
// hack, first IPv4 endpoint, FIXME
if (newPort < 0 && endpoint.isIPv4()) {
newPort = endpoint.getListenPort();
_externalListenPort = newPort;
}
} catch (SocketException se) {
_endpoints.remove(endpoint);
}
}
if (_endpoints.isEmpty()) {
_log.log(Log.CRIT, "Unable to open UDP port");
setReachabilityStatus(CommSystemFacade.STATUS_HOSED);
return;
}
if (newPort != port || newPort != oldIPort || newPort != oldEPort) {
if (newPort > 0 &&
(newPort != port || newPort != oldIPort || newPort != oldEPort)) {
// attempt to use it as our external port - this will be overridden by
// externalAddressReceived(...)
Map<String, String> changes = new HashMap();
@ -374,7 +401,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_handler.startup();
_fragments.startup();
_inboundFragments.startup();
_pusher = new PacketPusher(_context, _fragments, _endpoint.getSender());
_pusher = new PacketPusher(_context, _fragments, _endpoints);
_pusher.startup();
if (USE_PRIORITY)
_refiller.startup();
@ -387,8 +414,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public synchronized void shutdown() {
destroyAll();
if (_endpoint != null)
_endpoint.shutdown();
for (UDPEndpoint endpoint : _endpoints) {
endpoint.shutdown();
// should we remove?
_endpoints.remove(endpoint);
}
//if (_flooder != null)
// _flooder.shutdown();
if (_refiller != null)
@ -416,11 +446,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
SessionKey getIntroKey() { return _introKey; }
/** @deprecated unused */
public int getLocalPort() {
return _endpoint != null ? _endpoint.getListenPort() : -1;
}
public InetAddress getLocalAddress() { return _externalListenHost; }
public int getExternalPort() { return _externalListenPort; }
@ -1205,7 +1230,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
void send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet);
_endpoint.send(packet);
_pusher.send(packet);
}
/**
@ -1231,7 +1256,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* @since 0.8.9
*/
private void destroyAll() {
_endpoint.clearOutbound();
for (UDPEndpoint endpoint : _endpoints) {
endpoint.clearOutbound();
}
int howMany = _peersByIdent.size();
// use no more than 1/4 of configured bandwidth
final int burst = 8;
@ -1662,13 +1689,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return "";
}
/** @since 0.8.8 */
int getPacketHandlerCount() {
PacketHandler handler = _handler;
if (handler != null)
return handler.getHandlerCount();
else
return 0;
/** @since IPv6 */
PacketHandler getPacketHandler() {
return _handler;
}
public void failed(OutboundMessageState msg) { failed(msg, true); }
@ -2521,7 +2544,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long pingCutoff = now - (2 * 60*60*1000);
long pingFirewallCutoff = now - PING_FIREWALL_CUTOFF;
boolean shouldPingFirewall = _reachabilityStatus != CommSystemFacade.STATUS_OK;
boolean pingOneOnly = shouldPingFirewall && _externalListenPort == _endpoint.getListenPort();
int currentListenPort = -1;
for (UDPEndpoint endpoint : _endpoints) {
// hack, first IPv4 endpoint, FIXME
if (endpoint.isIPv4()) {
currentListenPort = endpoint.getListenPort();
break;
}
}
boolean pingOneOnly = shouldPingFirewall && _externalListenPort == currentListenPort;
boolean shortLoop = shouldPingFirewall;
_expireBuffer.clear();
_runCount++;