* SSU: Fixes for i2np.udp.allowLocal, log tweaks, sender/receiver thread name tweaks

* Limit tunnel GW pumper threads when testing
This commit is contained in:
zzz
2013-05-18 18:00:17 +00:00
parent 226c7eb8e3
commit 0b49fa98f9
8 changed files with 35 additions and 17 deletions

View File

@ -687,7 +687,8 @@ public abstract class TransportImpl implements Transport {
else else
_wasUnreachableEntries.remove(peer); _wasUnreachableEntries.remove(peer);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer); _log.info(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer,
yes ? new Exception() : null);
} }
/** /**

View File

@ -249,7 +249,7 @@ class EstablishmentManager {
maybeTo = new RemoteHostId(remAddr.getAddress(), port); maybeTo = new RemoteHostId(remAddr.getAddress(), port);
if ((!_transport.isValid(maybeTo.getIP())) || if ((!_transport.isValid(maybeTo.getIP())) ||
Arrays.equals(maybeTo.getIP(), _transport.getExternalIP())) { (Arrays.equals(maybeTo.getIP(), _transport.getExternalIP()) && !_transport.allowLocal())) {
_transport.failed(msg, "Remote peer's IP isn't valid"); _transport.failed(msg, "Remote peer's IP isn't valid");
_transport.markUnreachable(toHash); _transport.markUnreachable(toHash);
//_context.banlist().banlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE); //_context.banlist().banlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);

View File

@ -309,8 +309,8 @@ class OutboundMessageFragments {
// use max of 1 second so finishMessages() and/or PeerState.finishMessages() // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
// gets called regularly // gets called regularly
int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT); int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("wait for " + toWait); // _log.debug("wait for " + toWait);
// wait.. or somethin' // wait.. or somethin'
synchronized (_activePeers) { synchronized (_activePeers) {
try { try {

View File

@ -6,6 +6,7 @@ import java.net.InetAddress;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.SocketException; import java.net.SocketException;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -24,6 +25,7 @@ class UDPEndpoint {
private DatagramSocket _socket; private DatagramSocket _socket;
private final InetAddress _bindAddress; private final InetAddress _bindAddress;
private final boolean _isIPv4, _isIPv6; private final boolean _isIPv4, _isIPv6;
private static final AtomicInteger _counter = new AtomicInteger();
/** /**
* @param transport may be null for unit testing ONLY * @param transport may be null for unit testing ONLY
@ -50,10 +52,11 @@ class UDPEndpoint {
_log.log(Log.CRIT, "UDP Unable to open a port"); _log.log(Log.CRIT, "UDP Unable to open a port");
throw new SocketException("SSU Unable to bind to a port on " + _bindAddress); throw new SocketException("SSU Unable to bind to a port on " + _bindAddress);
} }
_sender = new UDPSender(_context, _socket, "UDPSender"); int count = _counter.incrementAndGet();
_sender = new UDPSender(_context, _socket, "UDPSender " + count);
_sender.startup(); _sender.startup();
if (_transport != null) { if (_transport != null) {
_receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver"); _receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver " + count);
_receiver.startup(); _receiver.startup();
} }
} }

View File

@ -31,15 +31,12 @@ class UDPReceiver {
private final Runner _runner; private final Runner _runner;
private final UDPTransport _transport; private final UDPTransport _transport;
private final PacketHandler _handler; private final PacketHandler _handler;
private static int __id;
private final int _id;
private static final boolean _isAndroid = SystemVersion.isAndroid(); private static final boolean _isAndroid = SystemVersion.isAndroid();
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) { public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(UDPReceiver.class); _log = ctx.logManager().getLog(UDPReceiver.class);
_id = ++__id;
_name = name; _name = name;
_socket = socket; _socket = socket;
_transport = transport; _transport = transport;
@ -57,7 +54,7 @@ class UDPReceiver {
public synchronized void startup() { public synchronized void startup() {
//adjustDropProbability(); //adjustDropProbability();
_keepRunning = true; _keepRunning = true;
I2PThread t = new I2PThread(_runner, _name + '.' + _id, true); I2PThread t = new I2PThread(_runner, _name, true);
t.start(); t.start();
} }
@ -154,7 +151,7 @@ class UDPReceiver {
} }
// drop anything apparently from our IP (any port) // drop anything apparently from our IP (any port)
if (Arrays.equals(from.getIP(), _transport.getExternalIP())) { if (Arrays.equals(from.getIP(), _transport.getExternalIP()) && !_transport.allowLocal()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Dropping (spoofed?) packet from ourselves"); _log.warn("Dropping (spoofed?) packet from ourselves");
packet.release(); packet.release();

View File

@ -888,6 +888,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (isPubliclyRoutable(addr) && if (isPubliclyRoutable(addr) &&
(addr.length != 16 || _haveIPv6Address)) (addr.length != 16 || _haveIPv6Address))
return true; return true;
return allowLocal();
}
/**
* Are we allowed to connect to local addresses?
*
* @since IPv6
*/
boolean allowLocal() {
return _context.getBooleanProperty("i2np.udp.allowLocal"); return _context.getBooleanProperty("i2np.udp.allowLocal");
} }
@ -1522,7 +1531,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
int port = addr.getPort(); int port = addr.getPort();
if (ip == null || port < MIN_PEER_PORT || if (ip == null || port < MIN_PEER_PORT ||
(!isValid(ip)) || (!isValid(ip)) ||
Arrays.equals(ip, getExternalIP())) { (Arrays.equals(ip, getExternalIP()) && !allowLocal())) {
continue; continue;
} }
} }
@ -1960,6 +1969,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// dropPeer(msg.getPeer(), false); // dropPeer(msg.getPeer(), false);
//else if (consecutive > 2 * MAX_CONSECUTIVE_FAILED) // they're sending us data, but we cant reply? //else if (consecutive > 2 * MAX_CONSECUTIVE_FAILED) // they're sending us data, but we cant reply?
// dropPeer(msg.getPeer(), false); // dropPeer(msg.getPeer(), false);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Failed sending " + msg + " to " + msg.getPeer());
} }
noteSend(msg, false); noteSend(msg, false);
if (m != null) if (m != null)

View File

@ -42,10 +42,14 @@ class TunnelGatewayPumper implements Runnable {
_context = ctx; _context = ctx;
_wantsPumping = new LinkedHashSet(16); _wantsPumping = new LinkedHashSet(16);
_backlogged = new HashSet(16); _backlogged = new HashSet(16);
long maxMemory = Runtime.getRuntime().maxMemory(); if (ctx.getBooleanProperty("i2p.dummyTunnelManager")) {
if (maxMemory == Long.MAX_VALUE) _pumpers = 1;
maxMemory = 96*1024*1024l; } else {
_pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024)))); long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
_pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024))));
}
for (int i = 0; i < _pumpers; i++) for (int i = 0; i < _pumpers; i++)
new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true).start(); new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true).start();
} }

View File

@ -234,7 +234,7 @@ public class SSUDemo {
// we know its a FooMessage, since thats the type of message that the handler // we know its a FooMessage, since thats the type of message that the handler
// is registered as // is registered as
FooMessage m = (FooMessage)_msg; FooMessage m = (FooMessage)_msg;
System.out.println("RECV: " + Base64.encode(m.getData()) + " from " + _from); System.out.println("RECV FooMessage: " + Base64.encode(m.getData()) + " from " + _from);
} }
public String getName() { return "Handle Foo message"; } public String getName() { return "Handle Foo message"; }
} }
@ -301,6 +301,7 @@ public class SSUDemo {
// we know its a DatabaseStoreMessage, since thats the type of message that the handler // we know its a DatabaseStoreMessage, since thats the type of message that the handler
// is registered as // is registered as
DatabaseStoreMessage m = (DatabaseStoreMessage)_msg; DatabaseStoreMessage m = (DatabaseStoreMessage)_msg;
System.out.println("RECV: " + m);
try { try {
_us.netDb().store(m.getKey(), (RouterInfo) m.getEntry()); _us.netDb().store(m.getKey(), (RouterInfo) m.getEntry());
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {