diff --git a/core/java/src/net/i2p/data/RouterAddress.java b/core/java/src/net/i2p/data/RouterAddress.java index 696afe2de8..c21f0f5bf1 100644 --- a/core/java/src/net/i2p/data/RouterAddress.java +++ b/core/java/src/net/i2p/data/RouterAddress.java @@ -252,12 +252,14 @@ public class RouterAddress extends DataStructureImpl { } /** - * Just use style and hashCode for speed (expiration is always null). - * If we add multiple addresses of the same style, this may need to be changed. + * Just use a few items for speed (expiration is always null). */ @Override public int hashCode() { - return DataHelper.hashCode(_transportStyle) ^ _cost; + return DataHelper.hashCode(_transportStyle) ^ + DataHelper.hashCode(getIP()) ^ + getPort() ^ + _cost; } /** diff --git a/core/java/src/net/i2p/data/RouterInfo.java b/core/java/src/net/i2p/data/RouterInfo.java index 662f6b7408..e8155b7824 100644 --- a/core/java/src/net/i2p/data/RouterInfo.java +++ b/core/java/src/net/i2p/data/RouterInfo.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Vector; import net.i2p.I2PAppContext; import net.i2p.crypto.DSAEngine; @@ -450,8 +449,9 @@ public class RouterInfo extends DatabaseEntry { /** - * Pull the first workable target address for the given transport - * + * Pull the first workable target address for the given transport. + * Use to check for any address. For all addresses, use getTargetAddresses(), + * which you probably want if you care about IPv6. */ public RouterAddress getTargetAddress(String transportStyle) { for (RouterAddress addr : _addresses) { @@ -462,11 +462,12 @@ public class RouterInfo extends DatabaseEntry { } /** - * For future multiple addresses per-transport (IPV6), currently unused + * For multiple addresses per-transport (IPv4 or IPv6) + * @return non-null * @since 0.7.11 */ public List getTargetAddresses(String transportStyle) { - List ret = new Vector(); + List ret = new ArrayList(_addresses.size()); for (RouterAddress addr : _addresses) { if(addr.getTransportStyle().equals(transportStyle)) ret.add(addr); diff --git a/history.txt b/history.txt index a7f75dbec7..ce3bc8489f 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2013-04-29 zzz + * Transports: + - Initial prep for multiple addresses per style + - Simplify NTCP send pool + 2013-04-28 zzz * i2psnark: - Improve page nav diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 5b1ba79e39..f5c9264549 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 19; + public final static long BUILD = 20; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 8464b1d249..a8d79389bf 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.Set; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import net.i2p.data.DataHelper; import net.i2p.data.Hash; @@ -49,7 +51,8 @@ public abstract class TransportImpl implements Transport { private final Log _log; private TransportEventListener _listener; private RouterAddress _currentAddress; - private final List _sendPool; + // Only used by NTCP. SSU does not use. See send() below. + private final BlockingQueue _sendPool; protected final RouterContext _context; /** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */ private final Map _unreachableEntries; @@ -84,7 +87,10 @@ public abstract class TransportImpl implements Transport { _context.statManager().createRequiredRateStat("transport.sendProcessingTime", "Time to process and send a message (ms)", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); //_context.statManager().createRateStat("transport.sendProcessingTime." + getStyle(), "Time to process and send a message (ms)", "Transport", new long[] { 60*1000l }); _context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); - _sendPool = new ArrayList(16); + if (getStyle().equals("NTCP")) + _sendPool = new ArrayBlockingQueue(8); + else + _sendPool = null; _unreachableEntries = new HashMap(16); _wasUnreachableEntries = new ConcurrentHashSet(16); _context.simpleScheduler().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2); @@ -166,15 +172,14 @@ public abstract class TransportImpl implements Transport { * Nonblocking call to pull the next outbound message * off the queue. * + * Only used by NTCP. SSU does not call. + * * @return the next message or null if none are available */ - public OutNetMessage getNextMessage() { - OutNetMessage msg = null; - synchronized (_sendPool) { - if (_sendPool.isEmpty()) return null; - msg = (OutNetMessage)_sendPool.remove(0); // use priority queues later - } - msg.beginSend(); + protected OutNetMessage getNextMessage() { + OutNetMessage msg = _sendPool.poll(); + if (msg != null) + msg.beginSend(); return msg; } @@ -361,6 +366,12 @@ public abstract class TransportImpl implements Transport { * with the OutboundMessageRegistry (if it has a reply selector). If the * send fails, queue up any msg.getOnFailedSendJob * + * Only used by NTCP. SSU overrides. + * + * Note that this adds to the queue and then takes it back off in the same thread, + * so it actually blocks, and we don't need a big queue. + * + * TODO: Override in NTCP also and get rid of queue? */ public void send(OutNetMessage msg) { if (msg.getTarget() == null) { @@ -368,29 +379,26 @@ public abstract class TransportImpl implements Transport { _log.error("Error - bad message enqueued [target is null]: " + msg, new Exception("Added by")); return; } - boolean duplicate = false; - synchronized (_sendPool) { - if (_sendPool.contains(msg)) - duplicate = true; - else - _sendPool.add(msg); - } - if (duplicate) { + try { + _sendPool.put(msg); + } catch (InterruptedException ie) { if (_log.shouldLog(Log.ERROR)) - _log.error("Message already is in the queue? wtf. msg = " + msg, - new Exception("wtf, requeued?")); + _log.error("Interrupted during send " + msg); + return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message added to send pool"); - msg.timestamp("send on " + getStyle()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Message added to send pool"); + //msg.timestamp("send on " + getStyle()); outboundMessageReady(); - if (_log.shouldLog(Log.INFO)) - _log.debug("OutboundMessageReady called"); + //if (_log.shouldLog(Log.INFO)) + // _log.debug("OutboundMessageReady called"); } /** * This message is called whenever a new message is added to the send pool, * and it should not block + * + * Only used by NTCP. SSU throws UOE. */ protected abstract void outboundMessageReady(); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 9877b698ee..66cd37e819 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -12,6 +12,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -98,8 +99,8 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.attemptUnreachablePeer", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.closeOnBacklog", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.connectFailedIOE", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES); //_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", RATES); @@ -183,7 +184,8 @@ public class NTCPTransport extends TransportImpl { protected void outboundMessageReady() { OutNetMessage msg = getNextMessage(); if (msg != null) { - RouterIdentity ident = msg.getTarget().getIdentity(); + RouterInfo target = msg.getTarget(); + RouterIdentity ident = target.getIdentity(); Hash ih = ident.calculateHash(); NTCPConnection con = null; boolean isNew = false; @@ -191,7 +193,7 @@ public class NTCPTransport extends TransportImpl { con = _conByIdent.get(ih); if (con == null) { isNew = true; - RouterAddress addr = msg.getTarget().getTargetAddress(STYLE); + RouterAddress addr = getTargetAddress(target); if (addr != null) { NTCPAddress naddr = new NTCPAddress(addr); con = new NTCPConnection(_context, this, ident, naddr); @@ -199,7 +201,7 @@ public class NTCPTransport extends TransportImpl { _log.debug("Send on a new con: " + con + " at " + addr + " for " + ih.toBase64()); _conByIdent.put(ih, con); } else { - _log.error("we bid on a peer who doesn't have an ntcp address? " + msg.getTarget()); + _log.error("we bid on a peer who doesn't have an ntcp address? " + target); return; } } @@ -297,34 +299,12 @@ public class NTCPTransport extends TransportImpl { _log.debug("fast bid when trying to send to " + peer + " as its already established"); return _fastBid; } - RouterAddress addr = toAddress.getTargetAddress(STYLE); + RouterAddress addr = getTargetAddress(toAddress); if (addr == null) { markUnreachable(peer); - //_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1); - //_context.banlist().banlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("no bid when trying to send to " + peer + " as they don't have an ntcp address"); return null; } - byte[] ip = addr.getIP(); - if ( (addr.getPort() < MIN_PEER_PORT) || (ip == null) ) { - _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1); - markUnreachable(peer); - //_context.banlist().banlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("no bid when trying to send to " + peer + " as they don't have a valid ntcp address"); - return null; - } - if (!isPubliclyRoutable(ip)) { - if (! _context.getProperty("i2np.ntcp.allowLocal", "false").equals("true")) { - _context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1); - markUnreachable(peer); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("no bid when trying to send to " + peer + " as they have a private ntcp address"); - return null; - } - } if (!allowConnection()) { if (_log.shouldLog(Log.WARN)) @@ -350,6 +330,36 @@ public class NTCPTransport extends TransportImpl { } } + /** + * Get first available address we can use. + * @return address or null + * @since 0.9.6 + */ + private RouterAddress getTargetAddress(RouterInfo target) { + List addrs = target.getTargetAddresses(STYLE); + for (int i = 0; i < addrs.size(); i++) { + RouterAddress addr = addrs.get(i); + byte[] ip = addr.getIP(); + if (addr.getPort() < MIN_PEER_PORT || ip == null) { + //_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1); + //_context.banlist().banlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("no bid when trying to send to " + peer + " as they don't have a valid ntcp address"); + continue; + } + if (!isPubliclyRoutable(ip)) { + if (! _context.getBooleanProperty("i2np.ntcp.allowLocal")) { + //_context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("no bid when trying to send to " + peer + " as they have a private ntcp address"); + continue; + } + } + return addr; + } + return null; + } + public boolean allowConnection() { return countActivePeers() < getMaxConnections(); } diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 5395f7f6e3..4987314a36 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -225,7 +225,7 @@ class EstablishmentManager { */ private void establish(OutNetMessage msg, boolean queueIfMaxExceeded) { RouterInfo toRouterInfo = msg.getTarget(); - RouterAddress ra = toRouterInfo.getTargetAddress(_transport.getStyle()); + RouterAddress ra = _transport.getTargetAddress(toRouterInfo); if (ra == null) { _transport.failed(msg, "Remote peer has no address, cannot establish"); return; @@ -668,7 +668,7 @@ class EstablishmentManager { // Perhaps netdb should notify transport when it gets a new RI... RouterInfo info = _context.netDb().lookupRouterInfoLocally(remote.calculateHash()); if (info != null) { - RouterAddress addr = info.getTargetAddress(UDPTransport.STYLE); + RouterAddress addr = _transport.getTargetAddress(info); if (addr != null) { String smtu = addr.getOption(UDPAddress.PROP_MTU); if (smtu != null) { diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index 5ebb64078c..0d49d2b5b1 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -136,7 +136,7 @@ class IntroductionManager { _log.info("Picked peer has no local routerInfo: " + cur); continue; } - RouterAddress ra = ri.getTargetAddress(UDPTransport.STYLE); + RouterAddress ra = _transport.getTargetAddress(ri); if (ra == null) { if (_log.shouldLog(Log.INFO)) _log.info("Picked peer has no SSU address: " + ri); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java index e32decb03e..2adecf5217 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java @@ -727,7 +727,7 @@ class PeerTestManager { aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]); testInfo.readIntroKey(aliceIntroKey.getData(), 0); - RouterAddress raddr = charlieInfo.getTargetAddress(UDPTransport.STYLE); + RouterAddress raddr = _transport.getTargetAddress(charlieInfo); if (raddr == null) { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to pick a charlie"); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index d387a28f66..567ce43ece 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1286,25 +1286,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } // Validate his SSU address - RouterAddress addr = toAddress.getTargetAddress(STYLE); + RouterAddress addr = getTargetAddress(toAddress); if (addr == null) { markUnreachable(to); return null; } - // don't do this - object churn parsing the whole thing - //UDPAddress ua = new UDPAddress(addr); - //if (ua.getIntroducerCount() <= 0) { - if (addr.getOption("ihost0") == null) { - byte[] ip = addr.getIP(); - int port = addr.getPort(); - if (ip == null || port < MIN_PEER_PORT || - (!isValid(ip)) || - Arrays.equals(ip, getExternalIP())) { - markUnreachable(to); - return null; - } - } if (!allowConnection()) return _cachedBid[TRANSIENT_FAIL_BID]; @@ -1337,6 +1324,29 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + /** + * Get first available address we can use. + * @return address or null + * @since 0.9.6 + */ + RouterAddress getTargetAddress(RouterInfo target) { + List addrs = target.getTargetAddresses(STYLE); + for (int i = 0; i < addrs.size(); i++) { + RouterAddress addr = addrs.get(i); + if (addr.getOption("ihost0") == null) { + byte[] ip = addr.getIP(); + int port = addr.getPort(); + if (ip == null || port < MIN_PEER_PORT || + (!isValid(ip)) || + Arrays.equals(ip, getExternalIP())) { + continue; + } + } + return addr; + } + return null; + } + private boolean preferUDP() { String pref = _context.getProperty(PROP_PREFER_UDP, DEFAULT_PREFER_UDP); return (pref != null) && ! "false".equals(pref);