2005-08-17 jrandom

* Revise the SSU peer testing protocol so that Bob verifies Charlie's
      viability before agreeing to Alice's request.  This doesn't work with
      older SSU peer test builds, but is backwards compatible (older nodes
      won't ask newer nodes to participate in tests, and newer nodes won't
      ask older nodes to either).
This commit is contained in:
jrandom
2005-08-17 20:05:01 +00:00
committed by zzz
parent cade27dceb
commit 11204b8a2b
11 changed files with 444 additions and 212 deletions

View File

@ -44,19 +44,19 @@ public class Base64 {
/** added by aum */
public static String encode(String source) {
return encode(source.getBytes());
return (source != null ? encode(source.getBytes()) : "");
}
public static String encode(byte[] source) {
return encode(source, 0, (source != null ? source.length : 0));
return (source != null ? encode(source, 0, (source != null ? source.length : 0)) : "");
}
public static String encode(byte[] source, int off, int len) {
return encode(source, off, len, false);
return (source != null ? encode(source, off, len, false) : "");
}
public static String encode(byte[] source, boolean useStandardAlphabet) {
return encode(source, 0, (source != null ? source.length : 0), useStandardAlphabet);
return (source != null ? encode(source, 0, (source != null ? source.length : 0), useStandardAlphabet) : "");
}
public static String encode(byte[] source, int off, int len, boolean useStandardAlphabet) {
return safeEncode(source, off, len, useStandardAlphabet);
return (source != null ? safeEncode(source, off, len, useStandardAlphabet) : "");
}
public static byte[] decode(String s) {

View File

@ -642,7 +642,7 @@ public class LogManager {
public void shutdown() {
_log.log(Log.WARN, "Shutting down logger");
_writer.flushRecords();
_writer.flushRecords(false);
}
private static int __id = 0;

View File

@ -59,7 +59,8 @@ class LogWriter implements Runnable {
}
}
public void flushRecords() {
public void flushRecords() { flushRecords(true); }
public void flushRecords(boolean shouldWait) {
try {
List records = _manager._removeAll();
if (records == null) return;
@ -77,11 +78,13 @@ class LogWriter implements Runnable {
} catch (Throwable t) {
t.printStackTrace();
} finally {
try {
synchronized (this) {
this.wait(10*1000);
if (shouldWait) {
try {
synchronized (this) {
this.wait(10*1000);
}
} catch (InterruptedException ie) { // nop
}
} catch (InterruptedException ie) { // nop
}
}
}

View File

@ -1,4 +1,11 @@
$Id: history.txt,v 1.225 2005/08/10 18:55:41 jrandom Exp $
$Id: history.txt,v 1.226 2005/08/12 18:54:47 jrandom Exp $
2005-08-17 jrandom
* Revise the SSU peer testing protocol so that Bob verifies Charlie's
viability before agreeing to Alice's request. This doesn't work with
older SSU peer test builds, but is backwards compatible (older nodes
won't ask newer nodes to participate in tests, and newer nodes won't
ask older nodes to either).
2005-08-12 jrandom
* Keep detailed stats on the peer testing, publishing the results in the

View File

@ -1,4 +1,4 @@
<code>$Id: udp.html,v 1.14 2005/07/27 14:04:07 jrandom Exp $</code>
<code>$Id: udp.html,v 1.15 2005/08/03 13:58:13 jrandom Exp $</code>
<h1>Secure Semireliable UDP (SSU)</h1>
<b>DRAFT</b>
@ -573,8 +573,10 @@ quite simple:</p>
<pre>
Alice Bob Charlie
PeerTest ------------------&gt;
&lt;-------------PeerTest PeerTest-------------&gt;
PeerTest -------------------&gt;
PeerTest--------------------&gt;
&lt;-------------------PeerTest
&lt;-------------------PeerTest
&lt;------------------------------------------PeerTest
PeerTest------------------------------------------&gt;
&lt;------------------------------------------PeerTest
@ -592,7 +594,8 @@ that may be reached are as follows:</p>
up to a certain number of times, but if no response ever arrives,
she will know that her firewall or NAT is somehow misconfigured,
rejecting all inbound UDP packets even in direct response to an
outbound packet. Alternately, Bob may be down.</li>
outbound packet. Alternately, Bob may be down or unable to get
Charlie to reply.</li>
<li>If Alice doesn't receive a PeerTest message with the
expected nonce from a third party (Charlie), she will retransmit
@ -713,4 +716,4 @@ with either Bob or Charlie, but it is not required.</p>
<dd>If the peer address contains the 'B' capability, that means
they are willing and able to serve as an introducer - serving
as a Bob for an otherwise unreachable Alice.</dd>
</dl>
</dl>

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.214 $ $Date: 2005/08/10 18:55:41 $";
public final static String ID = "$Revision: 1.215 $ $Date: 2005/08/12 18:54:47 $";
public final static String VERSION = "0.6.0.2";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -102,7 +102,7 @@ public class StatisticsManager implements Service {
stats.putAll(_context.profileManager().summarizePeers(_publishedStats));
includeThroughput(stats);
//includeRate("router.invalidMessageTime", stats, new long[] { 10*60*1000 });
includeRate("router.invalidMessageTime", stats, new long[] { 10*60*1000 });
includeRate("router.duplicateMessageId", stats, new long[] { 24*60*60*1000 });
//includeRate("tunnel.duplicateIV", stats, new long[] { 24*60*60*1000 });
includeRate("tunnel.fragmentedDropped", stats, new long[] { 10*60*1000, 3*60*60*1000 });

View File

@ -4,8 +4,14 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.RouterContext;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
@ -20,30 +26,11 @@ class PeerTestManager {
private Log _log;
private UDPTransport _transport;
private PacketBuilder _packetBuilder;
/**
* circular list of nonces which we have received as if we were 'Charlie'
* (meaning if we see it again, we aren't Bob and shouldn't find our own Charlie).
* Synchronize against this when updating it
*/
private long _receiveAsCharlie[];
/** index into _receiveAsCharlie which we should next write to */
private int _receiveAsCharlieIndex;
/** nonce we are currently running our own test as, or -1 */
private long _currentTestNonce;
private InetAddress _bobIP;
private int _bobPort;
private SessionKey _bobIntroKey;
private SessionKey _bobCipherKey;
private SessionKey _bobMACKey;
private long _testBeginTime;
private long _lastSendTime;
private long _receiveBobReplyTime;
private long _receiveCharlieReplyTime;
private InetAddress _charlieIP;
private int _charliePort;
private SessionKey _charlieIntroKey;
private int _receiveBobReplyPort;
private int _receiveCharlieReplyPort;
/** map of Long(nonce) to PeerTestState for tests currently in progress */
private Map _activeTests;
/** current test we are running, or null */
private PeerTestState _currentTest;
private List _recentTests;
/** longest we will keep track of a Charlie nonce for */
private static final int MAX_CHARLIE_LIFETIME = 10*1000;
@ -52,9 +39,10 @@ class PeerTestManager {
_context = context;
_transport = transport;
_log = context.logManager().getLog(PeerTestManager.class);
_receiveAsCharlie = new long[64];
_activeTests = new HashMap(64);
_recentTests = Collections.synchronizedList(new ArrayList(16));
_packetBuilder = new PacketBuilder(context);
_currentTestNonce = -1;
_currentTest = null;
}
private static final int RESEND_TIMEOUT = 5*1000;
@ -62,24 +50,22 @@ class PeerTestManager {
private static final long MAX_NONCE = (1l << 32) - 1l;
//public void runTest(InetAddress bobIP, int bobPort, SessionKey bobIntroKey) {
public void runTest(InetAddress bobIP, int bobPort, SessionKey bobCipherKey, SessionKey bobMACKey) {
_currentTestNonce = _context.random().nextLong(MAX_NONCE);
_bobIP = bobIP;
_bobPort = bobPort;
//_bobIntroKey = bobIntroKey;
_bobCipherKey = bobCipherKey;
_bobMACKey = bobMACKey;
_charlieIP = null;
_charliePort = -1;
_charlieIntroKey = null;
_testBeginTime = _context.clock().now();
_lastSendTime = _testBeginTime;
_receiveBobReplyTime = -1;
_receiveCharlieReplyTime = -1;
_receiveBobReplyPort = -1;
_receiveCharlieReplyPort = -1;
PeerTestState test = new PeerTestState();
test.setNonce(_context.random().nextLong(MAX_NONCE));
test.setBobIP(bobIP);
test.setBobPort(bobPort);
test.setBobCipherKey(bobCipherKey);
test.setBobMACKey(bobMACKey);
test.setBeginTime(_context.clock().now());
test.setLastSendTime(test.getBeginTime());
test.setOurRole(PeerTestState.ALICE);
_currentTest = test;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running test with bob = " + bobIP + ":" + bobPort);
_log.debug("Running test with bob = " + bobIP + ":" + bobPort + " " + test.getNonce());
while (_recentTests.size() > 16)
_recentTests.remove(0);
_recentTests.add(new Long(test.getNonce()));
sendTestToBob();
@ -88,16 +74,17 @@ class PeerTestManager {
private class ContinueTest implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_currentTestNonce < 0) {
PeerTestState state = _currentTest;
if (state == null) {
// already completed
return;
} else if (expired()) {
testComplete();
} else {
if (_receiveBobReplyTime < 0) {
} else if (_context.clock().now() - state.getLastSendTime() >= RESEND_TIMEOUT) {
if (state.getReceiveBobTime() <= 0) {
// no message from Bob yet, send it again
sendTestToBob();
} else if (_receiveCharlieReplyTime < 0) {
} else if (state.getReceiveCharlieTime() <= 0) {
// received from Bob, but no reply from Charlie. send it to
// Bob again so he pokes Charlie
sendTestToBob();
@ -109,20 +96,32 @@ class PeerTestManager {
SimpleTimer.getInstance().addEvent(ContinueTest.this, RESEND_TIMEOUT);
}
}
private boolean expired() { return _testBeginTime + MAX_TEST_TIME < _context.clock().now(); }
private boolean expired() {
PeerTestState state = _currentTest;
if (state != null)
return _currentTest.getBeginTime() + MAX_TEST_TIME < _context.clock().now();
else
return true;
}
}
private void sendTestToBob() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending test to bob: " + _bobIP + ":" + _bobPort);
_transport.send(_packetBuilder.buildPeerTestFromAlice(_bobIP, _bobPort, _bobCipherKey, _bobMACKey, //_bobIntroKey,
_currentTestNonce, _transport.getIntroKey()));
PeerTestState test = _currentTest;
if (test != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending test to bob: " + test.getBobIP() + ":" + test.getBobPort());
_transport.send(_packetBuilder.buildPeerTestFromAlice(test.getBobIP(), test.getBobPort(), test.getBobCipherKey(), test.getBobMACKey(), //_bobIntroKey,
test.getNonce(), _transport.getIntroKey()));
}
}
private void sendTestToCharlie() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending test to charlie: " + _charlieIP + ":" + _charliePort);
_transport.send(_packetBuilder.buildPeerTestFromAlice(_charlieIP, _charliePort, _charlieIntroKey,
_currentTestNonce, _transport.getIntroKey()));
PeerTestState test = _currentTest;
if (test != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending test to charlie: " + test.getCharlieIP() + ":" + test.getCharliePort());
_transport.send(_packetBuilder.buildPeerTestFromAlice(test.getCharlieIP(), test.getCharliePort(), test.getCharlieIntroKey(),
test.getNonce(), _transport.getIntroKey()));
}
}
@ -131,28 +130,53 @@ class PeerTestManager {
* test
*/
private void receiveTestReply(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo) {
if ( (DataHelper.eq(from.getIP(), _bobIP.getAddress())) && (from.getPort() == _bobPort) ) {
_receiveBobReplyTime = _context.clock().now();
_receiveBobReplyPort = testInfo.readPort();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test reply from bob @ " + _bobIP + " on " + _receiveBobReplyPort);
} else {
if (_receiveCharlieReplyTime > 0) {
// this is our second charlie, yay!
_receiveCharlieReplyPort = testInfo.readPort();
PeerTestState test = _currentTest;
if ( (DataHelper.eq(from.getIP(), test.getBobIP().getAddress())) && (from.getPort() == test.getBobPort()) ) {
byte ip[] = new byte[testInfo.readIPSize()];
testInfo.readIP(ip, 0);
try {
InetAddress addr = InetAddress.getByAddress(ip);
test.setAliceIP(addr);
test.setReceiveBobTime(_context.clock().now());
test.setAlicePort(testInfo.readPort());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test reply from charlie @ " + _charlieIP + " on " + _receiveCharlieReplyPort);
testComplete();
_log.debug("Receive test reply from bob @ " + from.getIP() + " via our " + test.getAlicePort() + "/" + test.getAlicePortFromCharlie());
if (test.getAlicePortFromCharlie() > 0)
testComplete();
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to get our IP from bob's reply: " + from + ", " + testInfo, uhe);
}
} else {
if (test.getReceiveCharlieTime() > 0) {
// this is our second charlie, yay!
test.setAlicePortFromCharlie(testInfo.readPort());
byte ip[] = new byte[testInfo.readIPSize()];
testInfo.readIP(ip, 0);
try {
InetAddress addr = InetAddress.getByAddress(ip);
test.setAliceIPFromCharlie(addr);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test reply from charlie @ " + test.getCharlieIP() + " via our "
+ test.getAlicePort() + "/" + test.getAlicePortFromCharlie());
if (test.getReceiveBobTime() > 0)
testComplete();
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Charlie @ " + from + " said we were an invalid IP address: " + uhe.getMessage(), uhe);
}
} else {
// ok, first charlie. send 'em a packet
_receiveCharlieReplyTime = _context.clock().now();
_charlieIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(_charlieIntroKey.getData(), 0);
_charliePort = from.getPort();
test.setReceiveCharlieTime(_context.clock().now());
SessionKey charlieIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(charlieIntroKey.getData(), 0);
test.setCharlieIntroKey(charlieIntroKey);
try {
_charlieIP = InetAddress.getByAddress(from.getIP());
test.setCharlieIP(InetAddress.getByAddress(from.getIP()));
test.setCharliePort(from.getPort());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test from charlie @ " + _charlieIP + " on " + _charliePort);
_log.debug("Receive test from charlie @ " + from);
sendTestToCharlie();
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
@ -169,42 +193,34 @@ class PeerTestManager {
*/
private void testComplete() {
short status = -1;
if (_receiveCharlieReplyPort > 0) {
PeerTestState test = _currentTest;
if (test == null) return;
if (test.getAlicePortFromCharlie() > 0) {
// we received a second message from charlie
if (_receiveBobReplyPort == _receiveCharlieReplyPort) {
if ( (test.getAlicePort() == test.getAlicePortFromCharlie()) &&
(test.getAliceIP() != null) && (test.getAliceIPFromCharlie() != null) &&
(test.getAliceIP().equals(test.getAliceIPFromCharlie())) ) {
status = CommSystemFacade.STATUS_OK;
} else {
status = CommSystemFacade.STATUS_DIFFERENT;
}
} else if (_receiveCharlieReplyTime > 0) {
} else if (test.getReceiveCharlieTime() > 0) {
// we received only one message from charlie
status = CommSystemFacade.STATUS_UNKNOWN;
} else if (_receiveBobReplyTime > 0) {
} else if (test.getReceiveBobTime() > 0) {
// we received a message from bob but no messages from charlie
status = CommSystemFacade.STATUS_REJECT_UNSOLICITED;
} else {
// we never received anything from bob - he is either down or ignoring us
// we never received anything from bob - he is either down,
// ignoring us, or unable to get a Charlie to respond
status = CommSystemFacade.STATUS_UNKNOWN;
}
honorStatus(status);
if (_log.shouldLog(Log.INFO))
_log.info("Test complete: " + test);
// now zero everything out
_currentTestNonce = -1;
_bobIP = null;
_bobPort = -1;
_bobIntroKey = null;
_bobCipherKey = null;
_bobMACKey = null;
_charlieIP = null;
_charliePort = -1;
_charlieIntroKey = null;
_testBeginTime = -1;
_lastSendTime = -1;
_receiveBobReplyTime = -1;
_receiveCharlieReplyTime = -1;
_receiveBobReplyPort = -1;
_receiveCharlieReplyPort = -1;
honorStatus(status);
_currentTest = null;
}
/**
@ -226,39 +242,62 @@ class PeerTestManager {
*/
public void receiveTest(RemoteHostId from, UDPPacketReader reader) {
UDPPacketReader.PeerTestReader testInfo = reader.getPeerTestReader();
byte fromIP[] = null;
int fromPort = testInfo.readPort();
byte testIP[] = null;
int testPort = testInfo.readPort();
long nonce = testInfo.readNonce();
if (nonce == _currentTestNonce) {
PeerTestState test = _currentTest;
if ( (test != null) && (test.getNonce() == nonce) ) {
receiveTestReply(from, testInfo);
return;
}
if ( (testInfo.readIPSize() > 0) && (fromPort > 0) ) {
fromIP = new byte[testInfo.readIPSize()];
testInfo.readIP(fromIP, 0);
if ( (testInfo.readIPSize() > 0) && (testPort > 0) ) {
testIP = new byte[testInfo.readIPSize()];
testInfo.readIP(testIP, 0);
}
if ( ( (fromIP == null) && (fromPort <= 0) ) || // info is unknown or...
(DataHelper.eq(fromIP, from.getIP()) && (fromPort == from.getPort())) ) { // info matches sender
int knownIndex = -1;
boolean weAreCharlie = false;
synchronized (_receiveAsCharlie) {
for (int i = 0; (i < _receiveAsCharlie.length) && (knownIndex == -1); i++)
if (_receiveAsCharlie[i] == nonce)
knownIndex = i;
}
weAreCharlie = (knownIndex != -1);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test with nonce " + nonce + ", known as charlie @ " + knownIndex);
if (weAreCharlie) {
receiveFromAliceAsCharlie(from, testInfo, nonce);
PeerTestState state = null;
synchronized (_activeTests) {
state = (PeerTestState)_activeTests.get(new Long(nonce));
}
if (state == null) {
if ( (testIP == null) || (testPort <= 0) ) {
// we are bob, since we haven't seen this nonce before AND its coming from alice
if (_log.shouldLog(Log.DEBUG))
_log.debug("test IP/port are blank coming from " + from + ", assuming we are Bob and they are alice");
receiveFromAliceAsBob(from, testInfo, nonce, null);
} else {
receiveFromAliceAsBob(from, testInfo, nonce);
if (_recentTests.contains(new Long(nonce))) {
// ignore the packet, as its a holdover from a recently completed locally
// initiated test
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("We are charlie, as te testIP/port is " + testIP + ":" + testPort + " and the state is unknown for " + nonce);
// we are charlie, since alice never sends us her IP and port, only bob does (and,
// erm, we're not alice, since it isn't our nonce)
receiveFromBobAsCharlie(from, testInfo, nonce, null);
}
}
} else {
receiveFromBobAsCharlie(from, fromIP, fromPort, nonce, testInfo);
if (state.getOurRole() == PeerTestState.BOB) {
if (DataHelper.eq(from.getIP(), state.getAliceIP().getAddress()) &&
(from.getPort() == state.getAlicePort()) ) {
receiveFromAliceAsBob(from, testInfo, nonce, state);
} else if (DataHelper.eq(from.getIP(), state.getCharlieIP().getAddress()) &&
(from.getPort() == state.getCharliePort()) ) {
receiveFromCharlieAsBob(from, state);
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("Received from a fourth party as bob! alice: " + state.getAliceIP() + ", charlie: " + state.getCharlieIP() + ", dave: " + from);
}
} else if (state.getOurRole() == PeerTestState.CHARLIE) {
if ( (testIP == null) || (testPort <= 0) ) {
receiveFromAliceAsCharlie(from, testInfo, nonce);
} else {
receiveFromBobAsCharlie(from, testInfo, nonce, state);
}
}
}
}
@ -267,54 +306,65 @@ class PeerTestManager {
* so we must be Charlie receiving a PeerTest from Bob.
*
*/
private void receiveFromBobAsCharlie(RemoteHostId from, byte fromIP[], int fromPort, long nonce, UDPPacketReader.PeerTestReader testInfo) {
if (fromIP == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("From address received from Bob (we are Charlie) is invalid: " + from + ": " + testInfo);
return;
}
if (fromPort <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("From port received from Bob (we are Charlie) is invalid: " + fromPort + ": " + testInfo);
return;
}
boolean isNew = true;
int index = -1;
synchronized (_receiveAsCharlie) {
for (int i = 0; i < _receiveAsCharlie.length; i++) {
if (_receiveAsCharlie[i] == nonce) {
index = i;
isNew = false;
break;
}
}
if (index == -1) {
// ok, new nonce, store 'er
index = (_receiveAsCharlieIndex + 1) % _receiveAsCharlie.length;
_receiveAsCharlie[index] = nonce;
_receiveAsCharlieIndex = index;
}
private void receiveFromBobAsCharlie(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce, PeerTestState state) {
boolean isNew = false;
if (state == null) {
isNew = true;
state = new PeerTestState();
state.setOurRole(PeerTestState.CHARLIE);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test as charlie nonce " + nonce + ", stored at index " + index);
_log.debug("Receive test as charlie nonce " + nonce);
if (isNew)
SimpleTimer.getInstance().addEvent(new RemoveCharlie(nonce, index), MAX_CHARLIE_LIFETIME);
int sz = testInfo.readIPSize();
byte aliceIPData[] = new byte[sz];
try {
InetAddress aliceIP = InetAddress.getByAddress(fromIP);
testInfo.readIP(aliceIPData, 0);
int alicePort = testInfo.readPort();
InetAddress aliceIP = InetAddress.getByAddress(aliceIPData);
InetAddress bobIP = InetAddress.getByAddress(from.getIP());
SessionKey aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, fromPort, aliceIntroKey, _transport.getIntroKey(), nonce);
state.setAliceIP(aliceIP);
state.setAlicePort(alicePort);
state.setAliceIntroKey(aliceIntroKey);
state.setNonce(nonce);
state.setBobIP(bobIP);
state.setBobPort(from.getPort());
state.setLastSendTime(_context.clock().now());
state.setOurRole(PeerTestState.CHARLIE);
state.setReceiveBobTime(_context.clock().now());
PeerState bob = _transport.getPeerState(from);
if (bob == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Received from bob (" + from + ") who hasn't established a session with us, refusing to help him test " + aliceIP +":" + alicePort);
return;
} else {
state.setBobCipherKey(bob.getCurrentCipherKey());
state.setBobMACKey(bob.getCurrentMACKey());
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from bob as charlie and send to alice @ " + aliceIP + " on " + fromPort);
_log.debug("Receive from bob (" + from + ") as charlie, sending back to bob and sending to alice @ " + aliceIP + ":" + alicePort);
if (isNew) {
synchronized (_activeTests) {
_activeTests.put(new Long(nonce), state);
}
SimpleTimer.getInstance().addEvent(new RemoveTest(nonce), MAX_CHARLIE_LIFETIME);
}
UDPPacket packet = _packetBuilder.buildPeerTestToBob(bobIP, from.getPort(), aliceIP, alicePort, aliceIntroKey, nonce, state.getBobCipherKey(), state.getBobMACKey());
_transport.send(packet);
packet = _packetBuilder.buildPeerTestToAlice(aliceIP, alicePort, aliceIntroKey, _transport.getIntroKey(), nonce);
_transport.send(packet);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to build the aliceIP from " + from, uhe);
_log.warn("Unable to build the aliceIP from " + from + ", ip size: " + sz + " ip val: " + Base64.encode(aliceIPData), uhe);
}
}
@ -323,24 +373,34 @@ class PeerTestManager {
* any info in the message), plus we are not acting as Charlie (so we've got to be Bob).
*
*/
private void receiveFromAliceAsBob(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce) {
// we are Bob, so send Alice her PeerTest, pick a Charlie, and
// send Charlie Alice's info
private void receiveFromAliceAsBob(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce, PeerTestState state) {
// we are Bob, so pick a (potentially) Charlie and send Charlie Alice's info
PeerState charlie = null;
for (int i = 0; i < 5; i++) {
charlie = _transport.getPeerState(UDPAddress.CAPACITY_TESTING);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Picking charlie as " + charlie + " for alice of " + from);
if ( (charlie != null) && (!charlie.getRemoteHostId().equals(from)) ) {
break;
RouterInfo charlieInfo = null;
if (state == null) { // pick a new charlie
for (int i = 0; i < 5; i++) {
charlie = _transport.getPeerState(UDPAddress.CAPACITY_TESTING);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Picking charlie as " + charlie + " for alice of " + from);
if ( (charlie != null) && (!DataHelper.eq(charlie.getRemoteHostId(), from)) ) {
charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
if (charlieInfo != null)
break;
}
charlie = null;
}
charlie = null;
} else {
charlie = _transport.getPeerState(new RemoteHostId(state.getCharlieIP().getAddress(), state.getCharliePort()));
if (charlie != null)
charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
}
if (charlie == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to pick a charlie");
return;
}
InetAddress aliceIP = null;
SessionKey aliceIntroKey = null;
try {
@ -348,27 +408,44 @@ class PeerTestManager {
aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
RouterInfo info = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
if (info == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No info for charlie: " + charlie);
return;
}
UDPAddress addr = new UDPAddress(info.getTargetAddress(UDPTransport.STYLE));
UDPAddress addr = new UDPAddress(charlieInfo.getTargetAddress(UDPTransport.STYLE));
SessionKey charlieIntroKey = new SessionKey(addr.getIntroKey());
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, charlieIntroKey, nonce);
_transport.send(packet);
//UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, charlieIntroKey, nonce);
//_transport.send(packet);
packet = _packetBuilder.buildPeerTestToCharlie(aliceIP, from.getPort(), aliceIntroKey, nonce,
charlie.getRemoteIPAddress(),
charlie.getRemotePort(),
charlie.getCurrentCipherKey(),
charlie.getCurrentMACKey());
boolean isNew = false;
if (state == null) {
isNew = true;
state = new PeerTestState();
state.setBeginTime(_context.clock().now());
}
state.setAliceIP(aliceIP);
state.setAlicePort(from.getPort());
state.setAliceIntroKey(aliceIntroKey);
state.setNonce(nonce);
state.setCharlieIP(charlie.getRemoteIPAddress());
state.setCharliePort(charlie.getRemotePort());
state.setCharlieIntroKey(charlieIntroKey);
state.setLastSendTime(_context.clock().now());
state.setOurRole(PeerTestState.BOB);
state.setReceiveAliceTime(_context.clock().now());
if (isNew) {
synchronized (_activeTests) {
_activeTests.put(new Long(nonce), state);
}
SimpleTimer.getInstance().addEvent(new RemoveTest(nonce), MAX_CHARLIE_LIFETIME);
}
UDPPacket packet = _packetBuilder.buildPeerTestToCharlie(aliceIP, from.getPort(), aliceIntroKey, nonce,
charlie.getRemoteIPAddress(),
charlie.getRemotePort(),
charlie.getCurrentCipherKey(),
charlie.getCurrentMACKey());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from alice as bob, picking charlie @ " + charlie.getRemoteIPAddress() + ":"
_log.debug("Receive from alice as bob for " + nonce + ", picking charlie @ " + charlie.getRemoteIPAddress() + ":"
+ charlie.getRemotePort() + " for alice @ " + aliceIP + ":" + from.getPort());
_transport.send(packet);
@ -378,6 +455,23 @@ class PeerTestManager {
}
}
/**
* The PeerTest message came from one of the Charlies picked for an existing test, so send Alice the
* packet verifying participation.
*
*/
private void receiveFromCharlieAsBob(RemoteHostId from, PeerTestState state) {
state.setReceiveCharlieTime(_context.clock().now());
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(state.getAliceIP(), state.getAlicePort(),
state.getAliceIntroKey(), state.getCharlieIntroKey(),
state.getNonce());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from charlie @ " + from + " as bob, sending alice back the ok @ " + state.getAliceIP() + ":" + state.getAlicePort());
_transport.send(packet);
}
/**
* We are charlie, so send Alice her PeerTest message
*
@ -402,20 +496,15 @@ class PeerTestManager {
/**
* forget about charlie's nonce after 60s.
*/
private class RemoveCharlie implements SimpleTimer.TimedEvent {
private class RemoveTest implements SimpleTimer.TimedEvent {
private long _nonce;
private int _index;
public RemoveCharlie(long nonce, int index) {
public RemoveTest(long nonce) {
_nonce = nonce;
_index = index;
}
public void timeReached() {
/** only forget about an entry if we haven't already moved on */
synchronized (_receiveAsCharlie) {
if (_receiveAsCharlie[_index] == _nonce)
_receiveAsCharlie[_index] = -1;
synchronized (_activeTests) {
_activeTests.remove(new Long(_nonce));
}
}
}
}

View File

@ -0,0 +1,118 @@
package net.i2p.router.transport.udp;
import java.net.InetAddress;
import net.i2p.data.SessionKey;
/**
*
*/
class PeerTestState {
private long _testNonce;
private short _ourRole;
private InetAddress _aliceIP;
private int _alicePort;
private InetAddress _bobIP;
private int _bobPort;
private InetAddress _charlieIP;
private int _charliePort;
private InetAddress _aliceIPFromCharlie;
private int _alicePortFromCharlie;
private SessionKey _aliceIntroKey;
private SessionKey _charlieIntroKey;
private SessionKey _bobCipherKey;
private SessionKey _bobMACKey;
private long _beginTime;
private long _lastSendTime;
private long _receiveAliceTime;
private long _receiveBobTime;
private long _receiveCharlieTime;
public static final short ALICE = 1;
public static final short BOB = 2;
public static final short CHARLIE = 3;
public synchronized long getNonce() { return _testNonce; }
public synchronized void setNonce(long nonce) { _testNonce = nonce; }
/** who are we? Alice, bob, or charlie? */
public synchronized short getOurRole() { return _ourRole; }
public synchronized void setOurRole(short role) { _ourRole = role; }
/**
* If we are Alice, this will contain the IP that Bob says we
* can be reached at - the IP Charlie says we can be reached
* at is _aliceIPFromCharlie
*
*/
public synchronized InetAddress getAliceIP() { return _aliceIP; }
public synchronized void setAliceIP(InetAddress ip) { _aliceIP = ip; }
public synchronized InetAddress getBobIP() { return _bobIP; }
public synchronized void setBobIP(InetAddress ip) { _bobIP = ip; }
public synchronized InetAddress getCharlieIP() { return _charlieIP; }
public synchronized void setCharlieIP(InetAddress ip) { _charlieIP = ip; }
public synchronized InetAddress getAliceIPFromCharlie() { return _aliceIPFromCharlie; }
public synchronized void setAliceIPFromCharlie(InetAddress ip) { _aliceIPFromCharlie = ip; }
/**
* If we are Alice, this will contain the port that Bob says we
* can be reached at - the port Charlie says we can be reached
* at is _alicePortFromCharlie
*
*/
public synchronized int getAlicePort() { return _alicePort; }
public synchronized void setAlicePort(int alicePort) { _alicePort = alicePort; }
public synchronized int getBobPort() { return _bobPort; }
public synchronized void setBobPort(int bobPort) { _bobPort = bobPort; }
public synchronized int getCharliePort() { return _charliePort; }
public synchronized void setCharliePort(int charliePort) { _charliePort = charliePort; }
public synchronized int getAlicePortFromCharlie() { return _alicePortFromCharlie; }
public synchronized void setAlicePortFromCharlie(int alicePortFromCharlie) { _alicePortFromCharlie = alicePortFromCharlie; }
public synchronized SessionKey getAliceIntroKey() { return _aliceIntroKey; }
public synchronized void setAliceIntroKey(SessionKey key) { _aliceIntroKey = key; }
public synchronized SessionKey getCharlieIntroKey() { return _charlieIntroKey; }
public synchronized void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; }
public synchronized SessionKey getBobCipherKey() { return _bobCipherKey; }
public synchronized void setBobCipherKey(SessionKey key) { _bobCipherKey = key; }
public synchronized SessionKey getBobMACKey() { return _bobMACKey; }
public synchronized void setBobMACKey(SessionKey key) { _bobMACKey = key; }
/** when did this test begin? */
public synchronized long getBeginTime() { return _beginTime; }
public synchronized void setBeginTime(long when) { _beginTime = when; }
/** when did we last send out a packet? */
public synchronized long getLastSendTime() { return _lastSendTime; }
public synchronized void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last hear from alice? */
public synchronized long getReceiveAliceTime() { return _receiveAliceTime; }
public synchronized void setReceiveAliceTime(long when) { _receiveAliceTime = when; }
/** when did we last hear from bob? */
public synchronized long getReceiveBobTime() { return _receiveBobTime; }
public synchronized void setReceiveBobTime(long when) { _receiveBobTime = when; }
/** when did we last hear from charlie? */
public synchronized long getReceiveCharlieTime() { return _receiveCharlieTime; }
public synchronized void setReceiveCharlieTime(long when) { _receiveCharlieTime = when; }
public synchronized String toString() {
StringBuffer buf = new StringBuffer(512);
buf.append("Role: ");
if (_ourRole == ALICE) buf.append("Alice");
else if (_ourRole == BOB) buf.append("Bob");
else if (_ourRole == CHARLIE) buf.append("Charlie");
else buf.append("unkown!");
if (_aliceIP != null)
buf.append(" alice: ").append(_aliceIP).append(':').append(_alicePort);
if (_aliceIPFromCharlie != null)
buf.append(" (fromCharlie ").append(_aliceIPFromCharlie).append(':').append(_alicePortFromCharlie).append(')');
if (_bobIP != null)
buf.append(" bob: ").append(_bobIP).append(':').append(_bobPort);
if (_charlieIP != null)
buf.append(" charlie: ").append(_charlieIP).append(':').append(_charliePort);
buf.append(" last send after ").append(_lastSendTime - _beginTime).append("ms");
if (_receiveAliceTime > 0)
buf.append(" receive from alice after ").append(_receiveAliceTime - _beginTime).append("ms");
if (_receiveBobTime > 0)
buf.append(" receive from bob after ").append(_receiveBobTime - _beginTime).append("ms");
if (_receiveCharlieTime > 0)
buf.append(" receive from charlie after ").append(_receiveCharlieTime - _beginTime).append("ms");
return buf.toString();
}
}

View File

@ -21,8 +21,8 @@ public class UDPAddress {
public static final String PROP_INTRO_KEY = "key";
public static final String PROP_CAPACITY = "caps";
public static final char CAPACITY_TESTING = 'A';
public static final char CAPACITY_INTRODUCER = 'B';
public static final char CAPACITY_TESTING = 'B';
public static final char CAPACITY_INTRODUCER = 'C';
public UDPAddress(RouterAddress addr) {
parse(addr);

View File

@ -377,17 +377,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
/**
* if we haven't received anything in the last 5 minutes from a peer, don't
* trust its known capacities
*/
private static final int MAX_INACTIVITY_FOR_CAPACITY = 5*60*1000;
/** pick a random peer with the given capacity */
public PeerState getPeerState(char capacity) {
long now = _context.clock().now();
int index = _context.random().nextInt(1024);
List peers = _peersByCapacity[capacity-'A'];
int size = 0;
int off = 0;
PeerState rv = null;
for (int i = 0; i < 5; i++) {
while (rv == null) {
synchronized (peers) {
size = peers.size();
if (size > 0) {
index = (index + i) % size;
index = (index + off) % size;
rv = (PeerState)peers.get(index);
}
}
@ -395,14 +402,19 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
break;
if (_context.shitlist().isShitlisted(rv.getRemotePeer()))
rv = null;
else if (now - rv.getLastReceiveTime() > MAX_INACTIVITY_FOR_CAPACITY)
rv = null;
else
break;
off++;
if (off >= size)
break;
}
_context.statManager().addRateData("udp.peersByCapacity", size, capacity);
return rv;
}
private static final int MAX_PEERS_PER_CAPACITY = 16;
private static final int MAX_PEERS_PER_CAPACITY = 64;
/**
* Intercept RouterInfo entries received directly from a peer to inject them into