SSU minor cleanups

This commit is contained in:
zzz
2019-11-15 11:06:14 +00:00
parent 8218d55874
commit e22810fd93
2 changed files with 54 additions and 51 deletions

View File

@ -395,7 +395,7 @@ public class PeerState {
* Caller should sync; UDPTransport must remove and add to peersByRemoteHost map * Caller should sync; UDPTransport must remove and add to peersByRemoteHost map
* @since 0.9.3 * @since 0.9.3
*/ */
public void changePort(int newPort) { void changePort(int newPort) {
if (newPort != _remotePort) { if (newPort != _remotePort) {
_remoteHostId = new RemoteHostId(_remoteIP, newPort); _remoteHostId = new RemoteHostId(_remoteIP, newPort);
_remotePort = newPort; _remotePort = newPort;
@ -413,12 +413,12 @@ public class PeerState {
* The AES key used to verify packets, set only after the connection is * The AES key used to verify packets, set only after the connection is
* established. * established.
*/ */
public SessionKey getCurrentMACKey() { return _currentMACKey; } SessionKey getCurrentMACKey() { return _currentMACKey; }
/** /**
* The AES key used to encrypt/decrypt packets, set only after the * The AES key used to encrypt/decrypt packets, set only after the
* connection is established. * connection is established.
*/ */
public SessionKey getCurrentCipherKey() { return _currentCipherKey; } SessionKey getCurrentCipherKey() { return _currentCipherKey; }
/** /**
* The pending AES key for verifying packets if we are rekeying the * The pending AES key for verifying packets if we are rekeying the
@ -426,7 +426,7 @@ public class PeerState {
* *
* @return null always, rekeying unimplemented * @return null always, rekeying unimplemented
*/ */
public SessionKey getNextMACKey() { return _nextMACKey; } SessionKey getNextMACKey() { return _nextMACKey; }
/** /**
* The pending AES key for encrypting/decrypting packets if we are * The pending AES key for encrypting/decrypting packets if we are
@ -435,7 +435,7 @@ public class PeerState {
* *
* @return null always, rekeying unimplemented * @return null always, rekeying unimplemented
*/ */
public SessionKey getNextCipherKey() { return _nextCipherKey; } SessionKey getNextCipherKey() { return _nextCipherKey; }
/** /**
* The keying material used for the rekeying, or null if we are not in * The keying material used for the rekeying, or null if we are not in
@ -557,13 +557,13 @@ public class PeerState {
* The AES key used to verify packets, set only after the connection is * The AES key used to verify packets, set only after the connection is
* established. * established.
*/ */
public void setCurrentMACKey(SessionKey key) { _currentMACKey = key; } void setCurrentMACKey(SessionKey key) { _currentMACKey = key; }
/** /**
* The AES key used to encrypt/decrypt packets, set only after the * The AES key used to encrypt/decrypt packets, set only after the
* connection is established. * connection is established.
*/ */
public void setCurrentCipherKey(SessionKey key) { _currentCipherKey = key; } void setCurrentCipherKey(SessionKey key) { _currentCipherKey = key; }
/** /**
* The pending AES key for verifying packets if we are rekeying the * The pending AES key for verifying packets if we are rekeying the
@ -571,7 +571,7 @@ public class PeerState {
* @deprecated unused * @deprecated unused
*/ */
@Deprecated @Deprecated
public void setNextMACKey(SessionKey key) { _nextMACKey = key; } void setNextMACKey(SessionKey key) { _nextMACKey = key; }
/** /**
* The pending AES key for encrypting/decrypting packets if we are * The pending AES key for encrypting/decrypting packets if we are
@ -580,7 +580,7 @@ public class PeerState {
* @deprecated unused * @deprecated unused
*/ */
@Deprecated @Deprecated
public void setNextCipherKey(SessionKey key) { _nextCipherKey = key; } void setNextCipherKey(SessionKey key) { _nextCipherKey = key; }
/** /**
* The keying material used for the rekeying, or null if we are not in * The keying material used for the rekeying, or null if we are not in
@ -600,7 +600,7 @@ public class PeerState {
* @deprecated unused * @deprecated unused
*/ */
@Deprecated @Deprecated
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; } void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/** /**
* Update the moving-average clock skew based on the current difference. * Update the moving-average clock skew based on the current difference.
@ -608,7 +608,7 @@ public class PeerState {
* A positive number means our clock is ahead of theirs. * A positive number means our clock is ahead of theirs.
* @param skew milliseconds, NOT adjusted for RTT. * @param skew milliseconds, NOT adjusted for RTT.
*/ */
public void adjustClockSkew(long skew) { void adjustClockSkew(long skew) {
// the real one-way delay is much less than RTT / 2, due to ack delays, // the real one-way delay is much less than RTT / 2, due to ack delays,
// so add a fudge factor // so add a fudge factor
long actualSkew = skew + CLOCK_SKEW_FUDGE - (_rtt / 2); long actualSkew = skew + CLOCK_SKEW_FUDGE - (_rtt / 2);
@ -630,21 +630,21 @@ public class PeerState {
} }
/** when did we last send them a packet? */ /** when did we last send them a packet? */
public void setLastSendTime(long when) { _lastSendTime = when; } void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last receive a packet from them? */ /** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; } void setLastReceiveTime(long when) { _lastReceiveTime = when; }
/** /**
* Note ping sent. Does not update last send time. * Note ping sent. Does not update last send time.
* @since 0.9.3 * @since 0.9.3
*/ */
public void setLastPingTime(long when) { _lastPingTime = when; } void setLastPingTime(long when) { _lastPingTime = when; }
/** /**
* Latest of last sent, last ACK, last ping * Latest of last sent, last ACK, last ping
* @since 0.9.3 * @since 0.9.3
*/ */
public long getLastSendOrPingTime() { long getLastSendOrPingTime() {
return Math.max(Math.max(_lastSendTime, _lastACKSend), _lastPingTime); return Math.max(Math.max(_lastSendTime, _lastACKSend), _lastPingTime);
} }
@ -660,7 +660,7 @@ public class PeerState {
*/ */
public synchronized int getReceiveBps() { return _receiveBps; } public synchronized int getReceiveBps() { return _receiveBps; }
public int incrementConsecutiveFailedSends() { int incrementConsecutiveFailedSends() {
synchronized(_outboundMessages) { synchronized(_outboundMessages) {
_concurrentMessagesActive--; _concurrentMessagesActive--;
if (_concurrentMessagesActive < 0) if (_concurrentMessagesActive < 0)
@ -767,14 +767,14 @@ public class PeerState {
* a relay introduction to the current peer * a relay introduction to the current peer
* @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled * @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled
*/ */
public void setWeRelayToThemAs(long tag) { _weRelayToThemAs = tag; } void setWeRelayToThemAs(long tag) { _weRelayToThemAs = tag; }
/** /**
* If they have offered to serve as an introducer to us, this is the tag * If they have offered to serve as an introducer to us, this is the tag
* we can use to publish that fact. * we can use to publish that fact.
* @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled * @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled
*/ */
public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; } void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; }
/** what is the largest packet we can send to the peer? */ /** what is the largest packet we can send to the peer? */
/**** /****
@ -828,16 +828,16 @@ public class PeerState {
} }
/** the last time we used them as an introducer, or 0 */ /** the last time we used them as an introducer, or 0 */
public long getIntroducerTime() { return _lastIntroducerTime; } long getIntroducerTime() { return _lastIntroducerTime; }
/** set the last time we used them as an introducer to now */ /** set the last time we used them as an introducer to now */
public void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); } void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); }
/** /**
* We received the message specified completely. * We received the message specified completely.
* @param bytes if less than or equal to zero, message is a duplicate. * @param bytes if less than or equal to zero, message is a duplicate.
*/ */
public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); } void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); }
/** /**
* We received the message specified completely. * We received the message specified completely.
@ -876,7 +876,7 @@ public class PeerState {
_currentACKs.add(messageId); _currentACKs.add(messageId);
} }
public void messagePartiallyReceived() { void messagePartiallyReceived() {
if (_wantACKSendSince <= 0) if (_wantACKSendSince <= 0)
_wantACKSendSince = _context.clock().now(); _wantACKSendSince = _context.clock().now();
} }
@ -885,7 +885,7 @@ public class PeerState {
* Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages. * Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages.
* Access to this map must be synchronized explicitly! * Access to this map must be synchronized explicitly!
*/ */
public Map<Long, InboundMessageState> getInboundMessages() { return _inboundMessages; } Map<Long, InboundMessageState> getInboundMessages() { return _inboundMessages; }
/** /**
* Expire partially received inbound messages, returning how many are still pending. * Expire partially received inbound messages, returning how many are still pending.
@ -893,7 +893,7 @@ public class PeerState {
* try to send them any messages (and don't receive any messages from them either) * try to send them any messages (and don't receive any messages from them either)
* *
*/ */
public int expireInboundMessages() { int expireInboundMessages() {
int rv = 0; int rv = 0;
synchronized (_inboundMessages) { synchronized (_inboundMessages) {
@ -953,7 +953,7 @@ public class PeerState {
* *
* @return a new list, do as you like with it * @return a new list, do as you like with it
*/ */
public List<Long> getCurrentFullACKs() { List<Long> getCurrentFullACKs() {
// no such element exception seen here // no such element exception seen here
List<Long> rv = new ArrayList<Long>(_currentACKs); List<Long> rv = new ArrayList<Long>(_currentACKs);
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
@ -975,7 +975,7 @@ public class PeerState {
* @return a new list, do as you like with it * @return a new list, do as you like with it
* @since 0.8.12 was included in getCurrentFullACKs() * @since 0.8.12 was included in getCurrentFullACKs()
*/ */
public List<Long> getCurrentResendACKs() { List<Long> getCurrentResendACKs() {
int sz = _currentACKsResend.size(); int sz = _currentACKsResend.size();
List<Long> randomResends = new ArrayList<Long>(sz); List<Long> randomResends = new ArrayList<Long>(sz);
if (sz > 0) { if (sz > 0) {
@ -1002,7 +1002,7 @@ public class PeerState {
* The ack was sent. * The ack was sent.
* Side effect - sets _lastACKSend * Side effect - sets _lastACKSend
*/ */
public void removeACKMessage(Long messageId) { void removeACKMessage(Long messageId) {
boolean removed = _currentACKs.remove(messageId); boolean removed = _currentACKs.remove(messageId);
if (removed) { if (removed) {
// only add if removed from current, as this may be called for // only add if removed from current, as this may be called for
@ -1029,7 +1029,7 @@ public class PeerState {
* @deprecated unused * @deprecated unused
*/ */
@Deprecated @Deprecated
public List<ACKBitfield> retrieveACKBitfields() { return retrieveACKBitfields(true); } List<ACKBitfield> retrieveACKBitfields() { return retrieveACKBitfields(true); }
/** /**
* See above. Only called by ACKSender with alwaysIncludeRetransmissions = false. * See above. Only called by ACKSender with alwaysIncludeRetransmissions = false.
@ -1039,7 +1039,7 @@ public class PeerState {
* *
* @return non-null, possibly empty * @return non-null, possibly empty
*/ */
public List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
int bytesRemaining = countMaxACKData(); int bytesRemaining = countMaxACKData();
// Limit the overhead of all the resent acks when using small MTU // Limit the overhead of all the resent acks when using small MTU
@ -1310,7 +1310,7 @@ public class PeerState {
/** /**
* @since 0.9.2 * @since 0.9.2
*/ */
public synchronized void setHisMTU(int mtu) { synchronized void setHisMTU(int mtu) {
if (mtu <= MIN_MTU || mtu >= _largeMTU || if (mtu <= MIN_MTU || mtu >= _largeMTU ||
(_remoteIP.length == 16 && mtu <= MIN_IPV6_MTU)) (_remoteIP.length == 16 && mtu <= MIN_IPV6_MTU))
return; return;
@ -1320,7 +1320,7 @@ public class PeerState {
} }
/** we are resending a packet, so lets jack up the rto */ /** we are resending a packet, so lets jack up the rto */
public synchronized void messageRetransmitted(int packets) { synchronized void messageRetransmitted(int packets) {
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes); _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes);
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation); _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
_packetsRetransmitted += packets; _packetsRetransmitted += packets;
@ -1328,7 +1328,7 @@ public class PeerState {
adjustMTU(); adjustMTU();
} }
public synchronized void packetsTransmitted(int packets) { synchronized void packetsTransmitted(int packets) {
_packetsTransmitted += packets; _packetsTransmitted += packets;
} }
@ -1377,7 +1377,7 @@ public class PeerState {
/** /**
* @param size not including IP header, UDP header, MAC or IV * @param size not including IP header, UDP header, MAC or IV
*/ */
public synchronized void packetReceived(int size) { synchronized void packetReceived(int size) {
_packetsReceived++; _packetsReceived++;
int minMTU; int minMTU;
if (_remoteIP.length == 4) { if (_remoteIP.length == 4) {
@ -1402,7 +1402,7 @@ public class PeerState {
* We received a backoff request, so cut our send window. * We received a backoff request, so cut our send window.
* NOTE: ECN sending is unimplemented, this is never called. * NOTE: ECN sending is unimplemented, this is never called.
*/ */
public void ECNReceived() { void ECNReceived() {
synchronized(this) { synchronized(this) {
congestionOccurred(); congestionOccurred();
} }
@ -1411,7 +1411,7 @@ public class PeerState {
_lastReceiveTime = _context.clock().now(); _lastReceiveTime = _context.clock().now();
} }
public void dataReceived() { void dataReceived() {
_lastReceiveTime = _context.clock().now(); _lastReceiveTime = _context.clock().now();
} }
@ -1432,7 +1432,7 @@ public class PeerState {
* packet is lost the acks have a decent chance of getting retransmitted. * packet is lost the acks have a decent chance of getting retransmitted.
* Used only by ACKSender. * Used only by ACKSender.
*/ */
public boolean unsentACKThresholdReached() { boolean unsentACKThresholdReached() {
//int threshold = countMaxACKData() / 4; //int threshold = countMaxACKData() / 4;
//return _currentACKs.size() >= threshold; //return _currentACKs.size() >= threshold;
return _currentACKs.size() >= MAX_RESEND_ACKS / 2; return _currentACKs.size() >= MAX_RESEND_ACKS / 2;
@ -1475,7 +1475,7 @@ public class PeerState {
* TODO backlog / pushback / block instead of dropping? Can't really block here. * TODO backlog / pushback / block instead of dropping? Can't really block here.
* TODO SSU does not support isBacklogged() now * TODO SSU does not support isBacklogged() now
*/ */
public void add(OutboundMessageState state) { void add(OutboundMessageState state) {
if (_dead) { if (_dead) {
_transport.failed(state, false); _transport.failed(state, false);
return; return;
@ -1554,7 +1554,7 @@ public class PeerState {
} }
/** drop all outbound messages */ /** drop all outbound messages */
public void dropOutbound() { void dropOutbound() {
//if (_dead) return; //if (_dead) return;
_dead = true; _dead = true;
//_outboundMessages = null; //_outboundMessages = null;
@ -1605,7 +1605,7 @@ public class PeerState {
* *
* @return number of active outbound messages remaining * @return number of active outbound messages remaining
*/ */
public int finishMessages(long now) { int finishMessages(long now) {
// short circuit, unsynchronized // short circuit, unsynchronized
if (_outboundMessages.isEmpty()) if (_outboundMessages.isEmpty())
return _outboundQueue.size(); return _outboundQueue.size();
@ -1679,7 +1679,7 @@ public class PeerState {
* *
* @return allocated messages to send (never empty), or null if no messages or no resources * @return allocated messages to send (never empty), or null if no messages or no resources
*/ */
public List<OutboundMessageState> allocateSend() { List<OutboundMessageState> allocateSend() {
if (_dead) return null; if (_dead) return null;
List<OutboundMessageState> rv = null; List<OutboundMessageState> rv = null;
synchronized (_outboundMessages) { synchronized (_outboundMessages) {
@ -1764,7 +1764,7 @@ public class PeerState {
* @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send. * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value. * If ready now, will return 0 or a negative value.
*/ */
public int getNextDelay() { int getNextDelay() {
int rv = Integer.MAX_VALUE; int rv = Integer.MAX_VALUE;
if (_dead) return rv; if (_dead) return rv;
long now = _context.clock().now(); long now = _context.clock().now();
@ -1822,7 +1822,7 @@ public class PeerState {
* how much payload data can we shove in there? * how much payload data can we shove in there?
* @return MTU - 87, i.e. 533 or 1397 (IPv4), MTU - 107 (IPv6) * @return MTU - 87, i.e. 533 or 1397 (IPv4), MTU - 107 (IPv6)
*/ */
public int fragmentSize() { int fragmentSize() {
// 46 + 20 + 8 + 13 = 74 + 13 = 87 (IPv4) // 46 + 20 + 8 + 13 = 74 + 13 = 87 (IPv4)
// 46 + 40 + 8 + 13 = 94 + 13 = 107 (IPv6) // 46 + 40 + 8 + 13 = 94 + 13 = 107 (IPv6)
return _mtu - return _mtu -
@ -1919,7 +1919,7 @@ public class PeerState {
* *
* @return true if the message was acked for the first time * @return true if the message was acked for the first time
*/ */
public boolean acked(long messageId) { boolean acked(long messageId) {
if (_dead) return false; if (_dead) return false;
OutboundMessageState state = null; OutboundMessageState state = null;
synchronized (_outboundMessages) { synchronized (_outboundMessages) {
@ -1977,7 +1977,7 @@ public class PeerState {
* *
* @return true if the message was completely acked for the first time * @return true if the message was completely acked for the first time
*/ */
public boolean acked(ACKBitfield bitfield) { boolean acked(ACKBitfield bitfield) {
if (_dead) if (_dead)
return false; return false;
@ -2054,7 +2054,7 @@ public class PeerState {
* *
* @param oldPeer non-null * @param oldPeer non-null
*/ */
public void loadFrom(PeerState oldPeer) { void loadFrom(PeerState oldPeer) {
_rto = oldPeer._rto; _rto = oldPeer._rto;
_rtt = oldPeer._rtt; _rtt = oldPeer._rtt;
_rttDeviation = oldPeer._rttDeviation; _rttDeviation = oldPeer._rttDeviation;
@ -2112,7 +2112,7 @@ public class PeerState {
* Convenience for OutboundMessageState so it can fail itself * Convenience for OutboundMessageState so it can fail itself
* @since 0.9.3 * @since 0.9.3
*/ */
public UDPTransport getTransport() { UDPTransport getTransport() {
return _transport; return _transport;
} }

View File

@ -1964,10 +1964,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return (pref != null) && "always".equals(pref); return (pref != null) && "always".equals(pref);
} }
// We used to have MAX_IDLE_TIME = 5m, but this causes us to drop peers /**
// and lose the old introducer tags, causing introduction fails, * We used to have MAX_IDLE_TIME = 5m, but this causes us to drop peers
// so we keep the max time long to give the introducer keepalive code * and lose the old introducer tags, causing introduction fails,
// in the IntroductionManager a chance to work. * so we keep the max time long to give the introducer keepalive code
* in the IntroductionManager a chance to work.
*/
public static final int EXPIRE_TIMEOUT = 20*60*1000; public static final int EXPIRE_TIMEOUT = 20*60*1000;
private static final int MAX_IDLE_TIME = EXPIRE_TIMEOUT; private static final int MAX_IDLE_TIME = EXPIRE_TIMEOUT;
public static final int MIN_EXPIRE_TIMEOUT = 165*1000; public static final int MIN_EXPIRE_TIMEOUT = 165*1000;
@ -2648,7 +2650,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.info("Consecutive failure #" + consecutive _log.info("Consecutive failure #" + consecutive
+ " on " + msg.toString() + " on " + msg.toString()
+ " to " + msg.getPeer()); + " to " + msg.getPeer());
if ( (_context.clock().now() - msg.getPeer().getLastSendFullyTime() <= 60*1000) || (consecutive < MAX_CONSECUTIVE_FAILED) ) { if (consecutive < MAX_CONSECUTIVE_FAILED ||
_context.clock().now() - msg.getPeer().getLastSendFullyTime() <= 60*1000) {
// ok, a few conseutive failures, but we /are/ getting through to them // ok, a few conseutive failures, but we /are/ getting through to them
} else { } else {
_context.statManager().addRateData("udp.dropPeerConsecutiveFailures", consecutive, msg.getPeer().getInactivityTime()); _context.statManager().addRateData("udp.dropPeerConsecutiveFailures", consecutive, msg.getPeer().getInactivityTime());