* Transports:

- Initial prep for multiple addresses per style
   - Simplify NTCP send pool
This commit is contained in:
zzz
2013-04-29 18:09:21 +00:00
parent 650b920e11
commit 26f0c98ef8
10 changed files with 115 additions and 79 deletions

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 19;
public final static long BUILD = 20;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -21,6 +21,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
@ -49,7 +51,8 @@ public abstract class TransportImpl implements Transport {
private final Log _log;
private TransportEventListener _listener;
private RouterAddress _currentAddress;
private final List _sendPool;
// Only used by NTCP. SSU does not use. See send() below.
private final BlockingQueue<OutNetMessage> _sendPool;
protected final RouterContext _context;
/** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */
private final Map<Hash, Long> _unreachableEntries;
@ -84,7 +87,10 @@ public abstract class TransportImpl implements Transport {
_context.statManager().createRequiredRateStat("transport.sendProcessingTime", "Time to process and send a message (ms)", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
//_context.statManager().createRateStat("transport.sendProcessingTime." + getStyle(), "Time to process and send a message (ms)", "Transport", new long[] { 60*1000l });
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_sendPool = new ArrayList(16);
if (getStyle().equals("NTCP"))
_sendPool = new ArrayBlockingQueue(8);
else
_sendPool = null;
_unreachableEntries = new HashMap(16);
_wasUnreachableEntries = new ConcurrentHashSet(16);
_context.simpleScheduler().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2);
@ -166,15 +172,14 @@ public abstract class TransportImpl implements Transport {
* Nonblocking call to pull the next outbound message
* off the queue.
*
* Only used by NTCP. SSU does not call.
*
* @return the next message or null if none are available
*/
public OutNetMessage getNextMessage() {
OutNetMessage msg = null;
synchronized (_sendPool) {
if (_sendPool.isEmpty()) return null;
msg = (OutNetMessage)_sendPool.remove(0); // use priority queues later
}
msg.beginSend();
protected OutNetMessage getNextMessage() {
OutNetMessage msg = _sendPool.poll();
if (msg != null)
msg.beginSend();
return msg;
}
@ -361,6 +366,12 @@ public abstract class TransportImpl implements Transport {
* with the OutboundMessageRegistry (if it has a reply selector). If the
* send fails, queue up any msg.getOnFailedSendJob
*
* Only used by NTCP. SSU overrides.
*
* Note that this adds to the queue and then takes it back off in the same thread,
* so it actually blocks, and we don't need a big queue.
*
* TODO: Override in NTCP also and get rid of queue?
*/
public void send(OutNetMessage msg) {
if (msg.getTarget() == null) {
@ -368,29 +379,26 @@ public abstract class TransportImpl implements Transport {
_log.error("Error - bad message enqueued [target is null]: " + msg, new Exception("Added by"));
return;
}
boolean duplicate = false;
synchronized (_sendPool) {
if (_sendPool.contains(msg))
duplicate = true;
else
_sendPool.add(msg);
}
if (duplicate) {
try {
_sendPool.put(msg);
} catch (InterruptedException ie) {
if (_log.shouldLog(Log.ERROR))
_log.error("Message already is in the queue? wtf. msg = " + msg,
new Exception("wtf, requeued?"));
_log.error("Interrupted during send " + msg);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message added to send pool");
msg.timestamp("send on " + getStyle());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Message added to send pool");
//msg.timestamp("send on " + getStyle());
outboundMessageReady();
if (_log.shouldLog(Log.INFO))
_log.debug("OutboundMessageReady called");
//if (_log.shouldLog(Log.INFO))
// _log.debug("OutboundMessageReady called");
}
/**
* This message is called whenever a new message is added to the send pool,
* and it should not block
*
* Only used by NTCP. SSU throws UOE.
*/
protected abstract void outboundMessageReady();

View File

@ -12,6 +12,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -98,8 +99,8 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.attemptUnreachablePeer", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.closeOnBacklog", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedIOE", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", RATES);
@ -183,7 +184,8 @@ public class NTCPTransport extends TransportImpl {
protected void outboundMessageReady() {
OutNetMessage msg = getNextMessage();
if (msg != null) {
RouterIdentity ident = msg.getTarget().getIdentity();
RouterInfo target = msg.getTarget();
RouterIdentity ident = target.getIdentity();
Hash ih = ident.calculateHash();
NTCPConnection con = null;
boolean isNew = false;
@ -191,7 +193,7 @@ public class NTCPTransport extends TransportImpl {
con = _conByIdent.get(ih);
if (con == null) {
isNew = true;
RouterAddress addr = msg.getTarget().getTargetAddress(STYLE);
RouterAddress addr = getTargetAddress(target);
if (addr != null) {
NTCPAddress naddr = new NTCPAddress(addr);
con = new NTCPConnection(_context, this, ident, naddr);
@ -199,7 +201,7 @@ public class NTCPTransport extends TransportImpl {
_log.debug("Send on a new con: " + con + " at " + addr + " for " + ih.toBase64());
_conByIdent.put(ih, con);
} else {
_log.error("we bid on a peer who doesn't have an ntcp address? " + msg.getTarget());
_log.error("we bid on a peer who doesn't have an ntcp address? " + target);
return;
}
}
@ -297,34 +299,12 @@ public class NTCPTransport extends TransportImpl {
_log.debug("fast bid when trying to send to " + peer + " as its already established");
return _fastBid;
}
RouterAddress addr = toAddress.getTargetAddress(STYLE);
RouterAddress addr = getTargetAddress(toAddress);
if (addr == null) {
markUnreachable(peer);
//_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);
//_context.banlist().banlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("no bid when trying to send to " + peer + " as they don't have an ntcp address");
return null;
}
byte[] ip = addr.getIP();
if ( (addr.getPort() < MIN_PEER_PORT) || (ip == null) ) {
_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1);
markUnreachable(peer);
//_context.banlist().banlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("no bid when trying to send to " + peer + " as they don't have a valid ntcp address");
return null;
}
if (!isPubliclyRoutable(ip)) {
if (! _context.getProperty("i2np.ntcp.allowLocal", "false").equals("true")) {
_context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1);
markUnreachable(peer);
if (_log.shouldLog(Log.DEBUG))
_log.debug("no bid when trying to send to " + peer + " as they have a private ntcp address");
return null;
}
}
if (!allowConnection()) {
if (_log.shouldLog(Log.WARN))
@ -350,6 +330,36 @@ public class NTCPTransport extends TransportImpl {
}
}
/**
* Get first available address we can use.
* @return address or null
* @since 0.9.6
*/
private RouterAddress getTargetAddress(RouterInfo target) {
List<RouterAddress> addrs = target.getTargetAddresses(STYLE);
for (int i = 0; i < addrs.size(); i++) {
RouterAddress addr = addrs.get(i);
byte[] ip = addr.getIP();
if (addr.getPort() < MIN_PEER_PORT || ip == null) {
//_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1);
//_context.banlist().banlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("no bid when trying to send to " + peer + " as they don't have a valid ntcp address");
continue;
}
if (!isPubliclyRoutable(ip)) {
if (! _context.getBooleanProperty("i2np.ntcp.allowLocal")) {
//_context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("no bid when trying to send to " + peer + " as they have a private ntcp address");
continue;
}
}
return addr;
}
return null;
}
public boolean allowConnection() {
return countActivePeers() < getMaxConnections();
}

View File

@ -225,7 +225,7 @@ class EstablishmentManager {
*/
private void establish(OutNetMessage msg, boolean queueIfMaxExceeded) {
RouterInfo toRouterInfo = msg.getTarget();
RouterAddress ra = toRouterInfo.getTargetAddress(_transport.getStyle());
RouterAddress ra = _transport.getTargetAddress(toRouterInfo);
if (ra == null) {
_transport.failed(msg, "Remote peer has no address, cannot establish");
return;
@ -668,7 +668,7 @@ class EstablishmentManager {
// Perhaps netdb should notify transport when it gets a new RI...
RouterInfo info = _context.netDb().lookupRouterInfoLocally(remote.calculateHash());
if (info != null) {
RouterAddress addr = info.getTargetAddress(UDPTransport.STYLE);
RouterAddress addr = _transport.getTargetAddress(info);
if (addr != null) {
String smtu = addr.getOption(UDPAddress.PROP_MTU);
if (smtu != null) {

View File

@ -136,7 +136,7 @@ class IntroductionManager {
_log.info("Picked peer has no local routerInfo: " + cur);
continue;
}
RouterAddress ra = ri.getTargetAddress(UDPTransport.STYLE);
RouterAddress ra = _transport.getTargetAddress(ri);
if (ra == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Picked peer has no SSU address: " + ri);

View File

@ -727,7 +727,7 @@ class PeerTestManager {
aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
RouterAddress raddr = charlieInfo.getTargetAddress(UDPTransport.STYLE);
RouterAddress raddr = _transport.getTargetAddress(charlieInfo);
if (raddr == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to pick a charlie");

View File

@ -1286,25 +1286,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
// Validate his SSU address
RouterAddress addr = toAddress.getTargetAddress(STYLE);
RouterAddress addr = getTargetAddress(toAddress);
if (addr == null) {
markUnreachable(to);
return null;
}
// don't do this - object churn parsing the whole thing
//UDPAddress ua = new UDPAddress(addr);
//if (ua.getIntroducerCount() <= 0) {
if (addr.getOption("ihost0") == null) {
byte[] ip = addr.getIP();
int port = addr.getPort();
if (ip == null || port < MIN_PEER_PORT ||
(!isValid(ip)) ||
Arrays.equals(ip, getExternalIP())) {
markUnreachable(to);
return null;
}
}
if (!allowConnection())
return _cachedBid[TRANSIENT_FAIL_BID];
@ -1337,6 +1324,29 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
/**
* Get first available address we can use.
* @return address or null
* @since 0.9.6
*/
RouterAddress getTargetAddress(RouterInfo target) {
List<RouterAddress> addrs = target.getTargetAddresses(STYLE);
for (int i = 0; i < addrs.size(); i++) {
RouterAddress addr = addrs.get(i);
if (addr.getOption("ihost0") == null) {
byte[] ip = addr.getIP();
int port = addr.getPort();
if (ip == null || port < MIN_PEER_PORT ||
(!isValid(ip)) ||
Arrays.equals(ip, getExternalIP())) {
continue;
}
}
return addr;
}
return null;
}
private boolean preferUDP() {
String pref = _context.getProperty(PROP_PREFER_UDP, DEFAULT_PREFER_UDP);
return (pref != null) && ! "false".equals(pref);