2006-02-21 jrandom

* Throttle the outbound SSU establishment queue, so it doesn't fill up the
      heap when backlogged (and so that the messages queued up on it don't sit
      there forever)
    * Further SSU memory cleanup
    * Clean up the address regeneration code so it knows when to rebuild the
      local info more precisely.
This commit is contained in:
jrandom
2006-02-21 13:31:18 +00:00
committed by zzz
parent 9990126e3e
commit b4c495531a
14 changed files with 242 additions and 121 deletions

View File

@ -372,8 +372,6 @@ public class MessageOutputStream extends OutputStream {
/** /**
* called whenever the engine wants to push more data to the * called whenever the engine wants to push more data to the
* peer * peer
*
* @return true if the data was flushed
*/ */
void flushAvailable(DataReceiver target) throws IOException { void flushAvailable(DataReceiver target) throws IOException {
flushAvailable(target, true); flushAvailable(target, true);

View File

@ -102,7 +102,8 @@ public class PacketHandler {
Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null); Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) { if (con != null) {
receiveKnownCon(con, packet); receiveKnownCon(con, packet);
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO()); if (_log.shouldLog(Log.INFO))
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO());
} else { } else {
receiveUnknownCon(packet, sendId); receiveUnknownCon(packet, sendId);
displayPacket(packet, "UNKN", null); displayPacket(packet, "UNKN", null);

View File

@ -153,7 +153,6 @@ public class AESEngine {
/** decrypt the data with the session key provided /** decrypt the data with the session key provided
* @param payload encrypted data * @param payload encrypted data
* @param sessionKey private session key * @param sessionKey private session key
* @return unencrypted data
*/ */
public void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) { public void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) {
System.arraycopy(payload, inIndex, rv, outIndex, rv.length - outIndex); System.arraycopy(payload, inIndex, rv, outIndex, rv.length - outIndex);

View File

@ -139,7 +139,6 @@ public class CryptixAESEngine extends AESEngine {
/** decrypt the data with the session key provided /** decrypt the data with the session key provided
* @param payload encrypted data * @param payload encrypted data
* @param sessionKey private session key * @param sessionKey private session key
* @return unencrypted data
*/ */
public final void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) { public final void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) {
if ( (payload == null) || (rv == null) ) if ( (payload == null) || (rv == null) )

View File

@ -58,8 +58,8 @@ public class SimpleTimer {
*/ */
public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
/** /**
* @param useEarliestEventTime if its already scheduled, use the earlier of the * @param useEarliestTime if its already scheduled, use the earlier of the
* two timeouts, else use the later * two timeouts, else use the later
*/ */
public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
int totalEvents = 0; int totalEvents = 0;

View File

@ -1,10 +1,12 @@
$Id: history.txt,v 1.411 2006/02/20 13:12:47 jrandom Exp $ $Id: history.txt,v 1.412 2006/02/20 21:02:49 jrandom Exp $
2006-02-20 jrandom 2006-02-21 jrandom
* Throttle the outbound SSU establishment queue, so it doesn't fill up the * Throttle the outbound SSU establishment queue, so it doesn't fill up the
heap when backlogged (and so that the messages queued up on it don't sit heap when backlogged (and so that the messages queued up on it don't sit
there forever) there forever)
* Further SSU memory cleanup * Further SSU memory cleanup
* Clean up the address regeneration code so it knows when to rebuild the
local info more precisely.
2006-02-20 jrandom 2006-02-20 jrandom
* Properly enable TCP this time (oops) * Properly enable TCP this time (oops)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.353 $ $Date: 2006/02/20 13:12:48 $"; public final static String ID = "$Revision: 1.354 $ $Date: 2006/02/20 21:02:49 $";
public final static String VERSION = "0.6.1.10"; public final static String VERSION = "0.6.1.10";
public final static long BUILD = 9; public final static long BUILD = 10;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -211,6 +211,7 @@ public class StatisticsManager implements Service {
Rate curRate = rate.getRate(periods[i]); Rate curRate = rate.getRate(periods[i]);
if (curRate == null) continue; if (curRate == null) continue;
if (curRate.getLifetimeEventCount() <= 0) continue;
stats.setProperty("stat_" + rateName + '.' + getPeriod(curRate), renderRate(curRate, fudgeQuantity)); stats.setProperty("stat_" + rateName + '.' + getPeriod(curRate), renderRate(curRate, fudgeQuantity));
} }
} }

View File

@ -174,6 +174,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
if (!fragmentOK) if (!fragmentOK)
break; break;
} }
from.expireInboundMessages();
return fragments; return fragments;
} }

View File

@ -2,7 +2,10 @@ package net.i2p.router.transport.udp;
import java.util.*; import java.util.*;
import net.i2p.data.Base64;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.data.RouterInfo;
import net.i2p.data.RouterAddress;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -68,18 +71,47 @@ public class IntroductionManager {
return (PeerState)_outbound.get(new Long(id)); return (PeerState)_outbound.get(new Long(id));
} }
public void pickInbound(List rv, int howMany) { /**
* Grab a bunch of peers who are willing to be introducers for us that
* are locally known (duh) and have published their own SSU address (duh^2).
* The picked peers have their info tacked on to the ssuOptions parameter for
* use in the SSU RouterAddress.
*
*/
public int pickInbound(Properties ssuOptions, int howMany) {
List peers = null;
int start = _context.random().nextInt(Integer.MAX_VALUE); int start = _context.random().nextInt(Integer.MAX_VALUE);
synchronized (_inbound) { synchronized (_inbound) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Picking inbound out of " + _inbound); _log.debug("Picking inbound out of " + _inbound);
if (_inbound.size() <= 0) return; if (_inbound.size() <= 0) return 0;
start = start % _inbound.size(); peers = new ArrayList(_inbound);
for (int i = 0; i < _inbound.size() && rv.size() < howMany; i++) {
PeerState cur = (PeerState)_inbound.get((start + i) % _inbound.size());
rv.add(cur);
}
} }
int sz = peers.size();
start = start % sz;
int found = 0;
for (int i = 0; i < sz && found < howMany; i++) {
PeerState cur = (PeerState)peers.get((start + i) % sz);
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(cur.getRemotePeer());
if (ri == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Picked peer has no local routerInfo: " + cur);
continue;
}
RouterAddress ra = ri.getTargetAddress(UDPTransport.STYLE);
if (ra == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Picked peer has no SSU address: " + ri);
continue;
}
UDPAddress ura = new UDPAddress(ra);
ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString());
ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort()));
ssuOptions.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + found, Base64.encode(ura.getIntroKey()));
ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs()));
found++;
}
return found;
} }
public void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) { public void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) {

View File

@ -142,6 +142,11 @@ public class OutboundMessageFragments {
boolean ok = state.initialize(msg, msgBody); boolean ok = state.initialize(msg, msgBody);
if (ok) { if (ok) {
PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash()); PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash());
if (peer == null) {
_transport.failed(msg, "Peer disconnected quickly");
state.releaseResources();
return;
}
int active = peer.add(state); int active = peer.add(state);
synchronized (_activePeers) { synchronized (_activePeers) {
if (!_activePeers.contains(peer)) { if (!_activePeers.contains(peer)) {

View File

@ -61,7 +61,8 @@ public class PacketHandler {
_context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (when its slow)", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
} }
public void startup() { public void startup() {
@ -140,8 +141,11 @@ public class PacketHandler {
timeToVerify = beforeRecv - packet.getTimeSinceReceived(); timeToVerify = beforeRecv - packet.getTimeSinceReceived();
if (timeToDequeue > 50) if (timeToDequeue > 50)
_context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue); _context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue);
if (timeToVerify > 50) if (timeToVerify > 0) {
_context.statManager().addRateData("udp.packetVerifyTime", timeToVerify, timeToVerify); _context.statManager().addRateData("udp.packetVerifyTime", timeToVerify, timeToDequeue);
if (timeToVerify > 100)
_context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToVerify, timeToDequeue);
}
// back to the cache with thee! // back to the cache with thee!
packet.release(); packet.release();

View File

@ -606,7 +606,7 @@ public class PeerState {
int remaining = _inboundMessages.size(); int remaining = _inboundMessages.size();
for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) { for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) {
InboundMessageState state = (InboundMessageState)iter.next(); InboundMessageState state = (InboundMessageState)iter.next();
if (state.isExpired()) { if (state.isExpired() || _dead) {
iter.remove(); iter.remove();
} else { } else {
if (state.isComplete()) { if (state.isComplete()) {
@ -977,6 +977,10 @@ public class PeerState {
public int add(OutboundMessageState state) { public int add(OutboundMessageState state) {
if (_dead) {
_transport.failed(state, false);
return 0;
}
state.setPeer(this); state.setPeer(this);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@ -989,20 +993,23 @@ public class PeerState {
} }
/** drop all outbound messages */ /** drop all outbound messages */
public void dropOutbound() { public void dropOutbound() {
if (_dead) return; //if (_dead) return;
_dead = true; _dead = true;
List msgs = _outboundMessages; List msgs = _outboundMessages;
//_outboundMessages = null; //_outboundMessages = null;
_retransmitter = null; _retransmitter = null;
if (msgs != null) { if (msgs != null) {
int sz = 0;
List tempList = null; List tempList = null;
synchronized (msgs) { synchronized (msgs) {
tempList = new ArrayList(msgs); sz = msgs.size();
msgs.clear(); if (sz > 0) {
tempList = new ArrayList(msgs);
msgs.clear();
}
} }
int sz = tempList.size();
for (int i = 0; i < sz; i++) for (int i = 0; i < sz; i++)
_transport.failed((OutboundMessageState)tempList.get(i)); _transport.failed((OutboundMessageState)tempList.get(i), false);
} }
} }
@ -1025,7 +1032,10 @@ public class PeerState {
public int finishMessages() { public int finishMessages() {
int rv = 0; int rv = 0;
List msgs = _outboundMessages; List msgs = _outboundMessages;
if (_dead) return 0; if (_dead) {
dropOutbound();
return 0;
}
List succeeded = null; List succeeded = null;
List failed = null; List failed = null;
synchronized (msgs) { synchronized (msgs) {
@ -1404,14 +1414,18 @@ public class PeerState {
tmp.addAll(oldPeer._currentACKs); tmp.addAll(oldPeer._currentACKs);
oldPeer._currentACKs.clear(); oldPeer._currentACKs.clear();
} }
synchronized (_currentACKs) { _currentACKs.addAll(tmp); } if (!_dead) {
synchronized (_currentACKs) { _currentACKs.addAll(tmp); }
}
tmp.clear(); tmp.clear();
synchronized (oldPeer._currentACKsResend) { synchronized (oldPeer._currentACKsResend) {
tmp.addAll(oldPeer._currentACKsResend); tmp.addAll(oldPeer._currentACKsResend);
oldPeer._currentACKsResend.clear(); oldPeer._currentACKsResend.clear();
} }
synchronized (_currentACKsResend) { _currentACKsResend.addAll(tmp); } if (!_dead) {
synchronized (_currentACKsResend) { _currentACKsResend.addAll(tmp); }
}
tmp.clear(); tmp.clear();
Map msgs = new HashMap(); Map msgs = new HashMap();
@ -1419,7 +1433,9 @@ public class PeerState {
msgs.putAll(oldPeer._inboundMessages); msgs.putAll(oldPeer._inboundMessages);
oldPeer._inboundMessages.clear(); oldPeer._inboundMessages.clear();
} }
synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); } if (!_dead) {
synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); }
}
msgs.clear(); msgs.clear();
OutboundMessageState retransmitter = null; OutboundMessageState retransmitter = null;
@ -1429,9 +1445,11 @@ public class PeerState {
retransmitter = oldPeer._retransmitter; retransmitter = oldPeer._retransmitter;
oldPeer._retransmitter = null; oldPeer._retransmitter = null;
} }
synchronized (_outboundMessages) { if (!_dead) {
_outboundMessages.addAll(tmp); synchronized (_outboundMessages) {
_retransmitter = retransmitter; _outboundMessages.addAll(tmp);
_retransmitter = retransmitter;
}
} }
tmp.clear(); tmp.clear();
} }

View File

@ -59,6 +59,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private long _introducersSelectedOn; private long _introducersSelectedOn;
private long _lastInboundReceivedOn; private long _lastInboundReceivedOn;
/** do we need to rebuild our external router address asap? */
private boolean _needsRebuild;
/** summary info to distribute */ /** summary info to distribute */
private RouterAddress _externalAddress; private RouterAddress _externalAddress;
/** port number on which we can be reached, or -1 */ /** port number on which we can be reached, or -1 */
@ -140,6 +143,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_introManager = new IntroductionManager(_context, this); _introManager = new IntroductionManager(_context, this);
_introducersSelectedOn = -1; _introducersSelectedOn = -1;
_lastInboundReceivedOn = -1; _lastInboundReceivedOn = -1;
_needsRebuild = true;
_context.statManager().createRateStat("udp.alreadyConnected", "What is the lifetime of a reestablished session", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.alreadyConnected", "What is the lifetime of a reestablished session", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
@ -302,7 +306,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("External address received: " + RemoteHostId.toString(ourIP) + ":" + ourPort + " from " _log.info("External address received: " + RemoteHostId.toString(ourIP) + ":" + ourPort + " from "
+ from.toBase64() + ", isValid? " + isValid + ", explicitSpecified? " + explicitSpecified + from.toBase64() + ", isValid? " + isValid + ", explicitSpecified? " + explicitSpecified
+ ", receivedInboundRecent? " + inboundRecent); + ", receivedInboundRecent? " + inboundRecent + " status " + _reachabilityStatus);
if (explicitSpecified) if (explicitSpecified)
return; return;
@ -319,8 +323,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return; return;
} else if (inboundRecent) { } else if (inboundRecent) {
// use OS clock since its an ordering thing, not a time thing // use OS clock since its an ordering thing, not a time thing
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Ignoring IP address suggestion, since we have received an inbound con recently"); _log.info("Ignoring IP address suggestion, since we have received an inbound con recently");
} else { } else {
synchronized (this) { synchronized (this) {
if ( (_externalListenHost == null) || if ( (_externalListenHost == null) ||
@ -328,6 +332,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if ( (_reachabilityStatus == CommSystemFacade.STATUS_UNKNOWN) || if ( (_reachabilityStatus == CommSystemFacade.STATUS_UNKNOWN) ||
(_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) { (_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) {
// they told us something different and our tests are either old or failing // they told us something different and our tests are either old or failing
if (_log.shouldLog(Log.INFO))
_log.info("Trying to change our external address...");
try { try {
_externalListenHost = InetAddress.getByAddress(ourIP); _externalListenHost = InetAddress.getByAddress(ourIP);
if (!fixedPort) if (!fixedPort)
@ -337,14 +343,20 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
updated = true; updated = true;
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
_externalListenHost = null; _externalListenHost = null;
if (_log.shouldLog(Log.INFO))
_log.info("Error trying to change our external address", uhe);
} }
} else { } else {
// they told us something different, but our tests are recent and positive, // they told us something different, but our tests are recent and positive,
// so lets test again // so lets test again
fireTest = true; fireTest = true;
if (_log.shouldLog(Log.INFO))
_log.info("Different address, but we're fine..");
} }
} else { } else {
// matched what we expect // matched what we expect
if (_log.shouldLog(Log.INFO))
_log.info("Same address as the current one");
} }
} }
} }
@ -524,8 +536,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (oldEstablishedOn > 0) if (oldEstablishedOn > 0)
_context.statManager().addRateData("udp.alreadyConnected", oldEstablishedOn, 0); _context.statManager().addRateData("udp.alreadyConnected", oldEstablishedOn, 0);
// if we need introducers, try to shift 'em around every 10 minutes if (needsRebuild())
if (introducersRequired() && (_introducersSelectedOn < _context.clock().now() - 10*60*1000))
rebuildExternalAddress(); rebuildExternalAddress();
if (getReachabilityStatus() != CommSystemFacade.STATUS_OK) { if (getReachabilityStatus() != CommSystemFacade.STATUS_OK) {
@ -536,8 +547,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
public RouterAddress getCurrentAddress() { public RouterAddress getCurrentAddress() {
// if we need introducers, try to shift 'em around every 10 minutes if (needsRebuild())
if (introducersRequired() && (_introducersSelectedOn < _context.clock().now() - 10*60*1000))
rebuildExternalAddress(false); rebuildExternalAddress(false);
return super.getCurrentAddress(); return super.getCurrentAddress();
} }
@ -578,8 +588,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
} }
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Received another message: " + inMsg.getClass().getName()); _log.debug("Received another message: " + inMsg.getClass().getName());
} }
PeerState peer = getPeerState(remoteIdentHash); PeerState peer = getPeerState(remoteIdentHash);
super.messageReceived(inMsg, remoteIdent, remoteIdentHash, msToReceive, bytesReceived); super.messageReceived(inMsg, remoteIdent, remoteIdentHash, msToReceive, bytesReceived);
@ -636,12 +646,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
peer.dropOutbound(); peer.dropOutbound();
peer.expireInboundMessages();
_introManager.remove(peer); _introManager.remove(peer);
_fragments.dropPeer(peer); _fragments.dropPeer(peer);
// a bit overzealous - perhaps we should only rebuild the external if the peer being dropped
// is one of our introducers? dropping it only if we are considered 'not reachable' is a start PeerState altByIdent = null;
if (introducersRequired()) PeerState altByHost = null;
rebuildExternalAddress();
if (peer.getRemotePeer() != null) { if (peer.getRemotePeer() != null) {
dropPeerCapacities(peer); dropPeerCapacities(peer);
@ -655,14 +665,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.statManager().addRateData("udp.droppedPeerInactive", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); _context.statManager().addRateData("udp.droppedPeerInactive", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
} }
synchronized (_peersByIdent) { synchronized (_peersByIdent) {
_peersByIdent.remove(peer.getRemotePeer()); altByIdent = (PeerState)_peersByIdent.remove(peer.getRemotePeer());
} }
} }
RemoteHostId remoteId = peer.getRemoteHostId(); RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId != null) { if (remoteId != null) {
synchronized (_peersByRemoteHost) { synchronized (_peersByRemoteHost) {
_peersByRemoteHost.remove(remoteId); altByHost = (PeerState)_peersByRemoteHost.remove(remoteId);
} }
} }
@ -672,6 +682,64 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (SHOULD_FLOOD_PEERS) if (SHOULD_FLOOD_PEERS)
_flooder.removePeer(peer); _flooder.removePeer(peer);
_expireEvent.remove(peer); _expireEvent.remove(peer);
if (needsRebuild())
rebuildExternalAddress();
// deal with races to make sure we drop the peers fully
if ( (altByIdent != null) && (peer != altByIdent) ) dropPeer(altByIdent, shouldShitlist);
if ( (altByHost != null) && (peer != altByHost) ) dropPeer(altByHost, shouldShitlist);
}
private boolean needsRebuild() {
if (_needsRebuild) return true; // simple enough
if (_context.router().isHidden()) return false;
if (introducersRequired()) {
RouterAddress addr = _externalAddress;
UDPAddress ua = new UDPAddress(addr);
int valid = 0;
Hash peerHash = new Hash();
for (int i = 0; i < ua.getIntroducerCount(); i++) {
// warning: this is only valid as long as we use the ident hash as their key.
peerHash.setData(ua.getIntroducerKey(i));
PeerState peer = getPeerState(peerHash);
if (peer != null)
valid++;
}
if (valid >= PUBLIC_RELAY_COUNT) {
// try to shift 'em around every 10 minutes or so
if (_introducersSelectedOn < _context.clock().now() - 10*60*1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Our introducers are valid, but thy havent changed in a while, so lets rechoose");
return true;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Our introducers are valid and haven't changed in a while");
return false;
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Our introducers are not valid (" +valid + ")");
return true;
}
} else {
boolean rv = (_externalListenHost == null) || (_externalListenPort <= 0);
if (_log.shouldLog(Log.INFO)) {
if (rv) {
_log.info("Need to initialize our direct SSU info");
} else {
RouterAddress addr = _externalAddress;
UDPAddress ua = new UDPAddress(addr);
if ( (ua.getPort() <= 0) || (ua.getHost() == null) ) {
_log.info("Our direct SSU info is initialized, but not used in our address yet");
rv = true;
} else {
_log.info("Our direct SSU info is initialized");
}
}
}
return rv;
}
} }
/** /**
@ -786,20 +854,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments.add(state); _fragments.add(state);
} }
//public OutNetMessage getNextMessage() { return getNextMessage(-1); }
/**
* Get the next message, blocking until one is found or the expiration
* reached.
*
* @param blockUntil expiration, or -1 if indefinite
*/
/*
public OutNetMessage getNextMessage(long blockUntil) {
return _outboundMessages.getNext(blockUntil);
}
*/
// we don't need the following, since we have our own queueing // we don't need the following, since we have our own queueing
protected void outboundMessageReady() { throw new UnsupportedOperationException("Not used for UDP"); } protected void outboundMessageReady() { throw new UnsupportedOperationException("Not used for UDP"); }
@ -848,68 +902,58 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
Properties options = new Properties(); Properties options = new Properties();
boolean directIncluded = false;
if ( allowDirectUDP() && (_externalListenPort > 0) && (_externalListenHost != null) && (isValid(_externalListenHost.getAddress())) ) {
options.setProperty(UDPAddress.PROP_PORT, String.valueOf(_externalListenPort));
options.setProperty(UDPAddress.PROP_HOST, _externalListenHost.getHostAddress());
directIncluded = true;
}
boolean introducersRequired = introducersRequired(); boolean introducersRequired = introducersRequired();
if (introducersRequired) { boolean introducersIncluded = false;
List peers = new ArrayList(PUBLIC_RELAY_COUNT); if (introducersRequired || !directIncluded) {
int found = 0; int found = _introManager.pickInbound(options, PUBLIC_RELAY_COUNT);
_introManager.pickInbound(peers, PUBLIC_RELAY_COUNT);
if (_log.shouldLog(Log.INFO))
_log.info("Introducers required, picked peers: " + peers);
for (int i = 0; i < peers.size(); i++) {
PeerState peer = (PeerState)peers.get(i);
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
if (ri == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Picked peer has no local routerInfo: " + peer);
continue;
}
RouterAddress ra = ri.getTargetAddress(STYLE);
if (ra == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Picked peer has no SSU address: " + ri);
continue;
}
UDPAddress ura = new UDPAddress(ra);
options.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + i, peer.getRemoteHostId().toHostString());
options.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + i, String.valueOf(peer.getRemotePort()));
options.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + i, Base64.encode(ura.getIntroKey()));
options.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + i, String.valueOf(peer.getTheyRelayToUsAs()));
found++;
}
if (found > 0) { if (found > 0) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Picked peers: " + found); _log.info("Picked peers: " + found);
_introducersSelectedOn = _context.clock().now(); _introducersSelectedOn = _context.clock().now();
introducersIncluded = true;
} }
} }
if ( allowDirectUDP() && (_externalListenPort > 0) && (_externalListenHost != null) && (isValid(_externalListenHost.getAddress())) ) {
options.setProperty(UDPAddress.PROP_PORT, String.valueOf(_externalListenPort)); // if we have explicit external addresses, they had better be reachable
options.setProperty(UDPAddress.PROP_HOST, _externalListenHost.getHostAddress()); if (introducersRequired)
// if we have explicit external addresses, they had better be reachable options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING);
if (introducersRequired) else
options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING); options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING + UDPAddress.CAPACITY_INTRODUCER);
else
options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING + UDPAddress.CAPACITY_INTRODUCER); if (directIncluded || introducersIncluded) {
options.setProperty(UDPAddress.PROP_INTRO_KEY, _introKey.toBase64());
RouterAddress addr = new RouterAddress();
addr.setCost(5);
addr.setExpiration(null);
addr.setTransportStyle(STYLE);
addr.setOptions(options);
boolean wantsRebuild = false;
if ( (_externalAddress == null) || !(_externalAddress.equals(addr)) )
wantsRebuild = true;
RouterAddress oldAddress = _externalAddress;
_externalAddress = addr;
if (_log.shouldLog(Log.INFO))
_log.info("Address rebuilt: " + addr);
replaceAddress(addr, oldAddress);
if (allowRebuildRouterInfo && wantsRebuild)
_context.router().rebuildRouterInfo();
_needsRebuild = false;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Wanted to rebuild my SSU address, but couldn't specify either the direct or indirect info (needs introducers? "
+ introducersRequired + ")", new Exception("source"));
_needsRebuild = true;
} }
options.setProperty(UDPAddress.PROP_INTRO_KEY, _introKey.toBase64());
RouterAddress addr = new RouterAddress();
addr.setCost(5);
addr.setExpiration(null);
addr.setTransportStyle(STYLE);
addr.setOptions(options);
boolean wantsRebuild = false;
if ( (_externalAddress == null) || !(_externalAddress.equals(addr)) )
wantsRebuild = true;
RouterAddress oldAddress = _externalAddress;
_externalAddress = addr;
if (_log.shouldLog(Log.INFO))
_log.info("Address rebuilt: " + addr);
replaceAddress(addr, oldAddress);
if (allowRebuildRouterInfo)
_context.router().rebuildRouterInfo();
} }
protected void replaceAddress(RouterAddress address, RouterAddress oldAddress) { protected void replaceAddress(RouterAddress address, RouterAddress oldAddress) {
@ -940,13 +984,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public boolean introducersRequired() { public boolean introducersRequired() {
String forceIntroducers = _context.getProperty(PROP_FORCE_INTRODUCERS); String forceIntroducers = _context.getProperty(PROP_FORCE_INTRODUCERS);
if ( (forceIntroducers != null) && (Boolean.valueOf(forceIntroducers).booleanValue()) ) if ( (forceIntroducers != null) && (Boolean.valueOf(forceIntroducers).booleanValue()) ) {
if (_log.shouldLog(Log.INFO))
_log.info("Force introducers specified");
return true; return true;
switch (getReachabilityStatus()) { }
short status = getReachabilityStatus();
switch (status) {
case CommSystemFacade.STATUS_REJECT_UNSOLICITED: case CommSystemFacade.STATUS_REJECT_UNSOLICITED:
case CommSystemFacade.STATUS_DIFFERENT: case CommSystemFacade.STATUS_DIFFERENT:
if (_log.shouldLog(Log.INFO))
_log.info("Require introducers, because our status is " + status);
return true; return true;
default: default:
if (!allowDirectUDP()) {
if (_log.shouldLog(Log.INFO))
_log.info("Require introducers, because we do not allow direct UDP connections");
return true;
}
return false; return false;
} }
} }
@ -966,11 +1021,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int DROP_INACTIVITY_TIME = 60*1000; private static final int DROP_INACTIVITY_TIME = 60*1000;
public void failed(OutboundMessageState msg) { public void failed(OutboundMessageState msg) { failed(msg, true); }
void failed(OutboundMessageState msg, boolean allowPeerFailure) {
if (msg == null) return; if (msg == null) return;
int consecutive = 0; int consecutive = 0;
OutNetMessage m = msg.getMessage(); OutNetMessage m = msg.getMessage();
if ( (msg.getPeer() != null) && if ( allowPeerFailure && (msg.getPeer() != null) &&
( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
(msg.isExpired())) ) { (msg.isExpired())) ) {
//long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime(); //long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime();
@ -1456,6 +1512,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final long STATUS_GRACE_PERIOD = 5*60*1000; private static final long STATUS_GRACE_PERIOD = 5*60*1000;
void setReachabilityStatus(short status) { void setReachabilityStatus(short status) {
short old = _reachabilityStatus;
long now = _context.clock().now(); long now = _context.clock().now();
switch (status) { switch (status) {
case CommSystemFacade.STATUS_OK: case CommSystemFacade.STATUS_OK:
@ -1485,6 +1542,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//} //}
break; break;
} }
if ( (status != old) && (status != CommSystemFacade.STATUS_UNKNOWN) ) {
if (needsRebuild())
rebuildExternalAddress();
}
} }
private static final String PROP_REACHABILITY_STATUS_OVERRIDE = "i2np.udp.status"; private static final String PROP_REACHABILITY_STATUS_OVERRIDE = "i2np.udp.status";
public short getReachabilityStatus() { public short getReachabilityStatus() {