UDP cleanups

This commit is contained in:
zzz
2011-09-01 13:24:31 +00:00
parent c9687649a4
commit a69267dc87
5 changed files with 94 additions and 75 deletions

View File

@ -62,7 +62,7 @@ class ACKSender implements Runnable {
public void shutdown() {
_alive = false;
PeerState poison = new PeerState(_context, _transport);
PeerState poison = new PeerState(_context, _transport, null, 0, null, false);
poison.setTheyRelayToUsAs(POISON_PS);
_peersToACK.offer(poison);
for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) {

View File

@ -285,7 +285,8 @@ class EstablishmentManager {
// count as connections, we have to keep the connection to this peer up longer if
// we are offering introductions.
if ((!_context.router().isHidden()) && (!_transport.introducersRequired()) && _transport.haveCapacity()) {
long tag = _context.random().nextLong(MAX_TAG_VALUE);
// ensure > 0
long tag = 1 + _context.random().nextLong(MAX_TAG_VALUE);
state.setSentRelayTag(tag);
if (_log.shouldLog(Log.INFO))
_log.info("Received session request from " + from + ", sending relay tag " + tag);
@ -460,18 +461,13 @@ class EstablishmentManager {
long now = _context.clock().now();
RouterIdentity remote = state.getConfirmedIdentity();
PeerState peer = new PeerState(_context, _transport);
PeerState peer = new PeerState(_context, _transport,
state.getSentIP(), state.getSentPort(), remote.calculateHash(), true);
peer.setCurrentCipherKey(state.getCipherKey());
peer.setCurrentMACKey(state.getMACKey());
peer.setCurrentReceiveSecond(now - (now % 1000));
peer.setKeyEstablishedTime(now);
peer.setLastReceiveTime(now);
peer.setLastSendTime(now);
peer.setRemoteAddress(state.getSentIP(), state.getSentPort());
peer.setRemotePeer(remote.calculateHash());
peer.setWeRelayToThemAs(state.getSentRelayTag());
peer.setTheyRelayToUsAs(0);
peer.setInbound();
// 0 is the default
//peer.setTheyRelayToUsAs(0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString()
@ -547,17 +543,13 @@ class EstablishmentManager {
long now = _context.clock().now();
RouterIdentity remote = state.getRemoteIdentity();
PeerState peer = new PeerState(_context, _transport);
PeerState peer = new PeerState(_context, _transport,
state.getSentIP(), state.getSentPort(), remote.calculateHash(), false);
peer.setCurrentCipherKey(state.getCipherKey());
peer.setCurrentMACKey(state.getMACKey());
peer.setCurrentReceiveSecond(now - (now % 1000));
peer.setKeyEstablishedTime(now);
peer.setLastReceiveTime(now);
peer.setLastSendTime(now);
peer.setRemoteAddress(state.getSentIP(), state.getSentPort());
peer.setRemotePeer(remote.calculateHash());
peer.setTheyRelayToUsAs(state.getReceivedRelayTag());
peer.setWeRelayToThemAs(0);
// 0 is the default
//peer.setWeRelayToThemAs(0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString()
@ -598,6 +590,7 @@ class EstablishmentManager {
_transport.send(m, peer);
}
/** the relay tag is a 4-byte field in the protocol */
public static final long MAX_TAG_VALUE = 0xFFFFFFFFl;
private void sendCreated(InboundEstablishState state) {
@ -610,8 +603,9 @@ class EstablishmentManager {
// offer to relay
// (perhaps we should check our bw usage and/or how many peers we are
// already offering introducing?)
if (state.getSentRelayTag() < 0) {
state.setSentRelayTag(_context.random().nextLong(MAX_TAG_VALUE));
if (state.getSentRelayTag() == 0) {
// ensure > 0
state.setSentRelayTag(1 + _context.random().nextLong(MAX_TAG_VALUE));
} else {
// don't change it, since we've already prepared our sig
}

View File

@ -634,8 +634,7 @@ class PacketBuilder {
// BUG: NPE here if null signature
System.arraycopy(state.getSentSignature().getData(), 0, data, off, Signature.SIGNATURE_BYTES);
packet.getPacket().setLength(off + Signature.SIGNATURE_BYTES);
authenticate(packet, state.getCipherKey(), state.getMACKey());
off += Signature.SIGNATURE_BYTES;
} else {
// nothing more to add beyond the identity fragment, though we can
// pad here if we want. maybe randomized?
@ -644,9 +643,9 @@ class PacketBuilder {
// TODO: why not random data?
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
authenticate(packet, state.getCipherKey(), state.getMACKey());
}
packet.getPacket().setLength(off);
authenticate(packet, state.getCipherKey(), state.getMACKey());
setTo(packet, to, state.getSentPort());
packet.setMessageType(TYPE_CONF);

View File

@ -21,7 +21,7 @@ import net.i2p.util.ConcurrentHashSet;
/**
* Contain all of the state about a UDP connection to a peer.
*
* This is instantiated only after a connection is fully established.
*/
class PeerState {
private final RouterContext _context;
@ -32,7 +32,7 @@ class PeerState {
* receiving the connection this will be set only after the connection
* is established.
*/
private Hash _remotePeer;
private final Hash _remotePeer;
/**
* The AES key used to verify packets, set only after the connection is
* established.
@ -118,10 +118,10 @@ class PeerState {
private int _sendBytes;
private int _receiveBps;
private int _receiveBytes;
private int _sendACKBps;
private int _sendACKBytes;
private int _receiveACKBps;
private int _receiveACKBytes;
//private int _sendACKBps;
//private int _sendZACKBytes;
//private int _receiveACKBps;
//private int _receiveACKBytes;
private long _receivePeriodBegin;
private volatile long _lastCongestionOccurred;
/**
@ -131,13 +131,14 @@ class PeerState {
*/
private volatile int _slowStartThreshold;
/** what IP is the peer sending and receiving packets on? */
private byte[] _remoteIP;
private final byte[] _remoteIP;
/** cached IP address */
private transient InetAddress _remoteIPAddress;
/** what port is the peer sending and receiving packets on? */
private int _remotePort;
private final int _remotePort;
/** cached RemoteHostId, used to find the peerState by remote info */
private RemoteHostId _remoteHostId;
private final RemoteHostId _remoteHostId;
/** if we need to contact them, do we need to talk to an introducer? */
private boolean _remoteRequiresIntroduction;
/**
@ -208,7 +209,7 @@ class PeerState {
/** how many concurrency rejections have we had in a row */
private volatile int _consecutiveRejections = 0;
/** is it inbound? **/
private boolean _isInbound;
private final boolean _isInbound;
/** Last time it was made an introducer **/
private long _lastIntroducerTime;
@ -248,23 +249,25 @@ class PeerState {
/** override the default MTU */
private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu";
public PeerState(RouterContext ctx, UDPTransport transport) {
public PeerState(RouterContext ctx, UDPTransport transport,
byte[] remoteIP, int remotePort, Hash remotePeer, boolean isInbound) {
_context = ctx;
_log = ctx.logManager().getLog(PeerState.class);
_transport = transport;
_keyEstablishedTime = -1;
_currentReceiveSecond = -1;
_lastSendTime = -1;
_lastReceiveTime = -1;
long now = ctx.clock().now();
_keyEstablishedTime = now;
_currentReceiveSecond = now - (now % 1000);
_lastSendTime = now;
_lastReceiveTime = now;
_currentACKs = new ConcurrentHashSet();
_currentACKsResend = new LinkedBlockingQueue();
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_lastSendRefill = _context.clock().now();
_receivePeriodBegin = _lastSendRefill;
_lastSendRefill = now;
_receivePeriodBegin = now;
_lastCongestionOccurred = -1;
_remotePort = -1;
_remotePort = remotePort;
_mtu = getDefaultMTU();
_mtuReceive = _mtu;
//_mtuLastChecked = -1;
@ -275,6 +278,10 @@ class PeerState {
_inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32);
// all createRateStat() moved to EstablishmentManager
_remoteIP = remoteIP;
_remotePeer = remotePeer;
_isInbound = isInbound;
_remoteHostId = new RemoteHostId(remoteIP, remotePort);
}
private int getDefaultMTU() {
@ -303,17 +310,21 @@ class PeerState {
* connection, or null if we are not in the process of rekeying.
*/
public SessionKey getNextMACKey() { return _nextMACKey; }
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
* of rekeying.
*/
public SessionKey getNextCipherKey() { return _nextCipherKey; }
/**
* The keying material used for the rekeying, or null if we are not in
* the process of rekeying.
* @deprecated unused
*/
public byte[] getNextKeyingMaterial() { return _nextKeyingMaterial; }
/** true if we began the current rekeying, false otherwise */
public boolean getRekeyBeganLocally() { return _rekeyBeganLocally; }
/** when were the current cipher and MAC keys established/rekeyed? */
@ -348,6 +359,10 @@ class PeerState {
public int getSendWindowBytesRemaining() { return _sendWindowBytesRemaining; }
/** what IP is the peer sending and receiving packets on? */
public byte[] getRemoteIP() { return _remoteIP; }
/**
* @return may be null if IP is invalid
*/
public InetAddress getRemoteIPAddress() {
if (_remoteIPAddress == null) {
try {
@ -359,21 +374,28 @@ class PeerState {
}
return _remoteIPAddress;
}
/** what port is the peer sending and receiving packets on? */
public int getRemotePort() { return _remotePort; }
/** if we need to contact them, do we need to talk to an introducer? */
public boolean getRemoteRequiresIntroduction() { return _remoteRequiresIntroduction; }
/**
* if we are serving as an introducer to them, this is the the tag that
* they can publish that, when presented to us, will cause us to send
* a relay introduction to the current peer
* @return 0 (no relay) if unset previously
*/
public long getWeRelayToThemAs() { return _weRelayToThemAs; }
/**
* If they have offered to serve as an introducer to us, this is the tag
* we can use to publish that fact.
* @return 0 (no relay) if unset previously
*/
public long getTheyRelayToUsAs() { return _theyRelayToUsAs; }
/** what is the largest packet we can send to the peer? */
public int getMTU() { return _mtu; }
@ -391,42 +413,50 @@ class PeerState {
public long getMTUDecreases() { return _mtuDecreases; }
****/
/**
* The peer are we talking to. This should be set as soon as this
* state is created if we are initiating a connection, but if we are
* receiving the connection this will be set only after the connection
* is established.
*/
public void setRemotePeer(Hash peer) { _remotePeer = peer; }
/**
* The AES key used to verify packets, set only after the connection is
* established.
*/
public void setCurrentMACKey(SessionKey key) { _currentMACKey = key; }
/**
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*/
public void setCurrentCipherKey(SessionKey key) { _currentCipherKey = key; }
/**
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
* @deprecated unused
*/
public void setNextMACKey(SessionKey key) { _nextMACKey = key; }
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
* of rekeying.
* @deprecated unused
*/
public void setNextCipherKey(SessionKey key) { _nextCipherKey = key; }
/**
* The keying material used for the rekeying, or null if we are not in
* the process of rekeying.
* @deprecated unused
*/
public void setNextKeyingMaterial(byte data[]) { _nextKeyingMaterial = data; }
/** true if we began the current rekeying, false otherwise */
/**
* @param local true if we began the current rekeying, false otherwise
* @deprecated unused
*/
public void setRekeyBeganLocally(boolean local) { _rekeyBeganLocally = local; }
/** when were the current cipher and MAC keys established/rekeyed? */
/**
* when were the current cipher and MAC keys established/rekeyed?
* @deprecated unused
*/
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/**
@ -469,8 +499,8 @@ class PeerState {
}
/** how fast we are sending *ack* packets */
public int getSendACKBps() { return _sendACKBps; }
public int getReceiveACKBps() { return _receiveACKBps; }
//public int getSendACKBps() { return _sendACKBps; }
//public int getReceiveACKBps() { return _receiveACKBps; }
/**
* have all of the packets received in the current second requested that
@ -498,12 +528,12 @@ class PeerState {
_sendWindowBytesRemaining = _sendWindowBytes;
_sendBytes += size;
_sendBps = (int)(0.9f*(float)_sendBps + 0.1f*((float)_sendBytes * (1000f/(float)duration)));
if (isForACK) {
_sendACKBytes += size;
_sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
}
//if (isForACK) {
// _sendACKBytes += size;
// _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
//}
_sendBytes = 0;
_sendACKBytes = 0;
//_sendACKBytes = 0;
_lastSendRefill = now;
}
//if (true) return true;
@ -522,32 +552,29 @@ class PeerState {
_sendWindowBytesRemaining -= size;
_sendBytes += size;
_lastSendTime = now;
if (isForACK)
_sendACKBytes += size;
//if (isForACK)
// _sendACKBytes += size;
return true;
} else {
return false;
}
}
/** what IP+port is the peer sending and receiving packets on? */
public void setRemoteAddress(byte ip[], int port) {
_remoteIP = ip;
_remoteIPAddress = null;
_remotePort = port;
_remoteHostId = new RemoteHostId(ip, port);
}
/** if we need to contact them, do we need to talk to an introducer? */
public void setRemoteRequiresIntroduction(boolean required) { _remoteRequiresIntroduction = required; }
/**
* if we are serving as an introducer to them, this is the the tag that
* they can publish that, when presented to us, will cause us to send
* a relay introduction to the current peer
* @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled
*/
public void setWeRelayToThemAs(long tag) { _weRelayToThemAs = tag; }
/**
* If they have offered to serve as an introducer to us, this is the tag
* we can use to publish that fact.
* @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled
*/
public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; }
@ -564,7 +591,6 @@ class PeerState {
public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
public int getConsecutiveSendRejections() { return _consecutiveRejections; }
public boolean isInbound() { return _isInbound; }
public void setInbound() { _isInbound = true; }
public long getIntroducerTime() { return _lastIntroducerTime; }
public void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); }
@ -573,8 +599,8 @@ class PeerState {
public void messageFullyReceived(Long messageId, int bytes, boolean isForACK) {
if (bytes > 0) {
_receiveBytes += bytes;
if (isForACK)
_receiveACKBytes += bytes;
//if (isForACK)
// _receiveACKBytes += bytes;
} else {
if (true || _retransmissionPeriodStart + 1000 < _context.clock().now()) {
_packetsReceivedDuplicate++;
@ -588,9 +614,9 @@ class PeerState {
long duration = now - _receivePeriodBegin;
if (duration >= 1000) {
_receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration)));
if (isForACK)
_receiveACKBps = (int)(0.9f*(float)_receiveACKBps + 0.1f*((float)_receiveACKBytes * (1000f/(float)duration)));
_receiveACKBytes = 0;
//if (isForACK)
// _receiveACKBps = (int)(0.9f*(float)_receiveACKBps + 0.1f*((float)_receiveACKBytes * (1000f/(float)duration)));
//_receiveACKBytes = 0;
_receiveBytes = 0;
_receivePeriodBegin = now;
_context.statManager().addRateData("udp.receiveBps", _receiveBps, 0);

View File

@ -189,7 +189,7 @@ class UDPPacketReader {
return (int)DataHelper.fromLong(_message, offset, 2);
}
/** write out the 4 byte relayAs tag */
/** read in the 4 byte relayAs tag */
public long readRelayTag() {
int offset = readBodyOffset() + Y_LENGTH + 1 + readIPSize() + 2;
return DataHelper.fromLong(_message, offset, 4);