* protocol doc & impl cleanup

* more defensive programming
* more javadoc updates
This commit is contained in:
jrandom
2004-09-28 08:34:48 +00:00
committed by zzz
parent cf780e296e
commit 2741ac195d
8 changed files with 140 additions and 91 deletions

View File

@ -324,11 +324,36 @@ public abstract class TransportImpl implements Transport {
}
/** What addresses are we currently listening to? */
public Set getCurrentAddresses() { return _currentAddresses; }
/** Add an address to our listening set */
protected void addCurrentAddress(RouterAddress address) { _currentAddresses.add(address); }
/** Remove an address from our listening set */
protected void removeCurrentAddress(RouterAddress address) { _currentAddresses.remove(address); }
public Set getCurrentAddresses() {
synchronized (_currentAddresses) {
return new HashSet(_currentAddresses);
}
}
/**
* Replace any existing addresses for the current transport with the given
* one.
*/
protected void replaceAddress(RouterAddress address) {
synchronized (_currentAddresses) {
Set addresses = _currentAddresses;
List toRemove = null;
for (Iterator iter = addresses.iterator(); iter.hasNext(); ) {
RouterAddress cur = (RouterAddress)iter.next();
if (getStyle().equals(cur.getTransportStyle())) {
if (toRemove == null)
toRemove = new ArrayList(1);
toRemove.add(cur);
}
}
if (toRemove != null) {
for (int i = 0; i < toRemove.size(); i++) {
addresses.remove(toRemove.get(i));
}
}
_currentAddresses.add(address);
}
}
/** Who to notify on message availability */
public void setListener(TransportEventListener listener) { _listener = listener; }
/** Make this stuff pretty (only used in the old console) */

View File

@ -168,6 +168,7 @@ public class ConnectionBuilder {
if (!ok) return;
}
/**
* Send <code>#bytesFollowing + #versions + v1 [+ v2 [etc]] +
* tag? + tagData + properties</code>
@ -181,10 +182,10 @@ public class ConnectionBuilder {
DataHelper.writeLong(baos, 1, TCPTransport.SUPPORTED_PROTOCOLS[i]);
}
if (_connectionTag != null) {
baos.write(0x1);
baos.write(ConnectionHandler.FLAG_TAG_FOLLOWING);
baos.write(_connectionTag.getData());
} else {
baos.write(0x0);
baos.write(ConnectionHandler.FLAG_TAG_NOT_FOLLOWING);
}
DataHelper.writeProperties(baos, null); // no options atm
byte line[] = baos.toByteArray();
@ -219,8 +220,7 @@ public class ConnectionBuilder {
try {
// #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties
int numBytes = (int)DataHelper.readLong(_rawIn, 2);
// 0xFFFF is a reserved value
if ( (numBytes <= 0) || (numBytes >= 0xFFFF) )
if ( (numBytes <= 0) || (numBytes >= ConnectionHandler.FLAG_TEST) )
throw new IOException("Invalid number of bytes in response");
byte line[] = new byte[numBytes];
@ -244,7 +244,7 @@ public class ConnectionBuilder {
break;
}
}
if (_agreedProtocol == -1) {
if (_agreedProtocol == ConnectionHandler.FLAG_PROTOCOL_NONE) {
fail("No valid protocol versions to contact "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
@ -259,7 +259,7 @@ public class ConnectionBuilder {
_transport.ourAddressReceived(_localIP);
int tagOk = (int)DataHelper.readLong(bais, 1);
if ( (tagOk == 0x01) && (_connectionTag != null) ) {
if ( (tagOk == ConnectionHandler.FLAG_TAG_OK) && (_connectionTag != null) ) {
// tag is ok
} else {
_connectionTag = null;
@ -305,6 +305,8 @@ public class ConnectionBuilder {
/** Set the next tag to <code>H(E(nonce + tag, sessionKey))</code> */
private void updateNextTagExisting() {
byte pre[] = new byte[48];
System.arraycopy(_nonce.getData(), 0, pre, 0, 4);
System.arraycopy(_connectionTag.getData(), 0, pre, 4, 32);
byte encr[] = _context.AESEngine().encrypt(pre, _key, _iv);
Hash h = _context.sha().calculateHash(encr);
_nextConnectionTag = new ByteArray(h.getData());
@ -601,23 +603,23 @@ public class ConnectionBuilder {
*/
private boolean validateStatus(int status) {
switch (status) {
case -1:
case -1: // EOF
fail("Error reading the status from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
case 0: // ok
case ConnectionHandler.STATUS_OK:
return true;
case 1: // not reachable
case ConnectionHandler.STATUS_UNREACHABLE:
fail("According to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ", we are not reachable on " + _localIP + ":" + _transport.getPort());
return false;
case 2: // clock skew
case ConnectionHandler.STATUS_SKEWED:
fail("According to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ", our clock is off");
return false;
case 3: // signature failure (only for new sessions)
case ConnectionHandler.STATUS_SIGNATURE_FAILED: // (only for new sessions)
fail("Signature failure talking to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;

View File

@ -81,6 +81,31 @@ public class ConnectionHandler {
/** initialization vector for the encryption */
private byte[] _iv;
/** for reading/comparing, this is the #bytes sent if we are being tested */
public static final int FLAG_TEST = 0xFFFF;
/** protocol version sent if no protocols are ok */
public static final byte FLAG_PROTOCOL_NONE = 0x0;
/** alice is sending a tag to bob */
public static final byte FLAG_TAG_FOLLOWING = 0x1;
/** alice is not sending a tag to bob */
public static final byte FLAG_TAG_NOT_FOLLOWING = 0x0;
/** the connection tag is ok (we have an available key for it) */
public static final byte FLAG_TAG_OK = 0x1;
/** the connection tag is not ok (must go with a full DH) */
public static final byte FLAG_TAG_NOT_OK = 0x0;
/** dunno why the peer is bad */
public static final int STATUS_UNKNOWN = -1;
/** the peer's public addresses could not be verified */
public static final int STATUS_UNREACHABLE = 1;
/** the peer's clock is too far skewed */
public static final int STATUS_SKEWED = 2;
/** the peer's signature failed (either some crazy corruption or MITM) */
public static final int STATUS_SIGNATURE_FAILED = 3;
/** the peer is fine */
public static final int STATUS_OK = 0;
private static final int MAX_VERSIONS = 255;
public ConnectionHandler(RouterContext ctx, TCPTransport transport, Socket socket) {
_context = ctx;
_log = ctx.logManager().getLog(ConnectionHandler.class);
@ -173,8 +198,8 @@ public class ConnectionHandler {
int numBytes = (int)DataHelper.readLong(_rawIn, 2);
if (numBytes <= 0)
throw new IOException("Invalid number of bytes in connection");
// 0xFFFF is a reserved value identifying the connection as a reachability test
if (numBytes == 0xFFFF) {
// reachability test
if (numBytes == FLAG_TEST) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol[Y]: test called, handle it");
handleTest();
@ -194,7 +219,7 @@ public class ConnectionHandler {
ByteArrayInputStream bais = new ByteArrayInputStream(line);
int numVersions = (int)DataHelper.readLong(bais, 1);
if ( (numVersions <= 0) || (numVersions > 0x8) ) {
if ( (numVersions <= 0) || (numVersions > MAX_VERSIONS) ) {
fail("Invalid number of protocol versions from " + _from);
return false;
}
@ -212,7 +237,7 @@ public class ConnectionHandler {
}
int tag = (int)DataHelper.readLong(bais, 1);
if (tag == 0x1) {
if (tag == FLAG_TAG_FOLLOWING) {
byte tagData[] = new byte[32];
read = DataHelper.read(bais, tagData);
if (read != 32)
@ -248,7 +273,7 @@ public class ConnectionHandler {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
if (_agreedProtocol <= 0)
baos.write(0x0);
baos.write(FLAG_PROTOCOL_NONE);
else
baos.write(_agreedProtocol);
@ -257,9 +282,9 @@ public class ConnectionHandler {
baos.write(ip);
if (_key != null)
baos.write(0x1);
baos.write(FLAG_TAG_OK);
else
baos.write(0x0);
baos.write(FLAG_TAG_NOT_OK);
byte nonce[] = new byte[4];
_context.random().nextBytes(nonce);
@ -301,6 +326,8 @@ public class ConnectionHandler {
/** Set the next tag to <code>H(E(nonce + tag, sessionKey))</code> */
private void updateNextTagExisting() {
byte pre[] = new byte[48];
System.arraycopy(_nonce.getData(), 0, pre, 0, 4);
System.arraycopy(_connectionTag.getData(), 0, pre, 4, 32);
byte encr[] = _context.AESEngine().encrypt(pre, _key, _iv);
Hash h = _context.sha().calculateHash(encr);
_nextConnectionTag = new ByteArray(h.getData());
@ -313,7 +340,7 @@ public class ConnectionHandler {
* @return true if the connection went ok, or false if it failed.
*/
private boolean connectExistingSession() {
// iv to the SHA256 of the tag appended by the nonce.
// iv = H(tag+nonce)
byte data[] = new byte[36];
System.arraycopy(_connectionTag.getData(), 0, data, 0, 32);
System.arraycopy(_nonce.getData(), 0, data, 32, 4);
@ -407,16 +434,16 @@ public class ConnectionHandler {
Properties props = new Properties();
int status = -1;
int status = STATUS_UNKNOWN;
if (!reachable) {
status = 1;
status = STATUS_UNREACHABLE;
} else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR)
|| (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) {
status = 2;
status = STATUS_SKEWED;
SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS");
props.setProperty("SKEW", fmt.format(new Date(_context.clock().now())));
} else {
status = 0;
status = STATUS_OK;
}
baos.write(status);
@ -559,18 +586,18 @@ public class ConnectionHandler {
Properties props = new Properties();
int status = -1;
int status = STATUS_UNKNOWN;
if (!reachable) {
status = 1;
status = STATUS_UNREACHABLE;
} else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR)
|| (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) {
status = 2;
status = STATUS_SKEWED;
SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS");
props.setProperty("SKEW", fmt.format(new Date(_context.clock().now())));
} else if (!sigOk) {
status = 3;
status = STATUS_SIGNATURE_FAILED;
} else {
status = 0;
status = STATUS_OK;
}
baos.write(status);
@ -608,17 +635,17 @@ public class ConnectionHandler {
*/
private boolean handleStatus(int status, long clockSkew) {
switch (status) {
case 0: // ok
case STATUS_OK:
return true;
case 1:
case STATUS_UNREACHABLE:
fail("Peer " + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ " at " + _from + " is unreachable");
return false;
case 2:
case STATUS_SKEWED:
fail("Peer " + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ " was skewed by " + DataHelper.formatDuration(clockSkew));
return false;
case 3:
case STATUS_SIGNATURE_FAILED:
fail("Forged signature on " + _from + " pretending to be "
+ _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
@ -655,8 +682,7 @@ public class ConnectionHandler {
_log.debug("Beginning verification of reachability");
// send: 0xFFFF + #versions + v1 [+ v2 [etc]] + properties
out.write(0xFF);
out.write(0xFF);
DataHelper.writeLong(out, 2, FLAG_TEST);
out.write(TCPTransport.SUPPORTED_PROTOCOLS.length);
for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++)
out.write(TCPTransport.SUPPORTED_PROTOCOLS[i]);
@ -667,16 +693,13 @@ public class ConnectionHandler {
_log.debug("Verification of reachability request sent");
// read: 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties
int ok = in.read();
if (ok != 0xFF)
throw new IOException("Unable to verify the peer - invalid response");
ok = in.read();
if (ok != 0xFF)
int flag = (int)DataHelper.readLong(in, 2);
if (flag != FLAG_TEST)
throw new IOException("Unable to verify the peer - invalid response");
int version = in.read();
if (version == -1)
throw new IOException("Unable to verify the peer - invalid version");
if (version == 0)
if (version == FLAG_PROTOCOL_NONE)
throw new IOException("Unable to verify the peer - no matching version");
int numBytes = in.read();
if ( (numBytes == -1) || (numBytes > 32) )
@ -715,7 +738,7 @@ public class ConnectionHandler {
// read: #versions + v1 [+ v2 [etc]] + properties
int numVersions = _rawIn.read();
if (numVersions == -1) throw new IOException("Unable to read versions");
if (numVersions > 256) throw new IOException("Too many versions");
if (numVersions > MAX_VERSIONS) throw new IOException("Too many versions");
int versions[] = new int[numVersions];
for (int i = 0; i < numVersions; i++) {
versions[i] = _rawIn.read();
@ -738,8 +761,7 @@ public class ConnectionHandler {
_log.debug("HandleTest: version=" + version + " opts=" +opts);
// send: 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties
_rawOut.write(0xFF);
_rawOut.write(0xFF);
DataHelper.writeLong(_rawOut, 2, FLAG_TEST);
_rawOut.write(version);
byte ip[] = _from.getBytes();
_rawOut.write(ip.length);

View File

@ -44,7 +44,8 @@ class ConnectionRunner implements Runnable {
public void run() {
while (_keepRunning && !_con.getIsClosed()) {
OutNetMessage msg = _con.getNextMessage();
if ( (msg == null) && (_keepRunning) ) {
if (msg == null) {
if (_keepRunning)
_log.error("next message is null but we should keep running?");
} else {
sendMessage(msg);

View File

@ -19,6 +19,8 @@ import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Simple persistent impl writing the connection tags to connectionTag.keys
* (or another file specified via "i2np.tcp.tagFile")
*
*/
public class PersistentConnectionTagManager extends ConnectionTagManager {

View File

@ -34,7 +34,9 @@ class TCPListener {
private ServerSocket _socket;
private ListenerRunner _listener;
private RouterContext _context;
/** Client Sockets that have been received but not yet handled (oldest first) */
private List _pendingSockets;
/** List of SocketHandler runners if we're listening (else an empty list) */
private List _handlers;
/**
@ -61,6 +63,7 @@ class TCPListener {
_handlers = new ArrayList(CONCURRENT_HANDLERS);
}
/** Make sure we are listening on the transport's getMyAddress() */
public void startListening() {
TCPAddress addr = _transport.getMyAddress();
if ( (addr != null) && (addr.getHost() != null) && (addr.getPort() > 0) ) {
@ -149,8 +152,9 @@ class TCPListener {
curDelay = 0;
loop();
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error listening to tcp connection " + _myAddress.getHost() + ":"
if (_isRunning && _context.router().isAlive())
if (_log.shouldLog(Log.ERROR))
_log.error("Error listening to tcp connection " + _myAddress.getHost() + ":"
+ _myAddress.getPort(), ioe);
}
@ -167,12 +171,13 @@ class TCPListener {
if (_nextFailDelay > MAX_FAIL_DELAY)
_nextFailDelay = MAX_FAIL_DELAY;
}
if (_isRunning && _context.router().isAlive())
if (_log.shouldLog(Log.ERROR))
_log.error("CANCELING TCP LISTEN. delay = " + curDelay);
_isRunning = false;
}
private void loop() {
while (_isRunning) {
while (_isRunning && _context.router().isAlive()) {
try {
if (_log.shouldLog(Log.INFO))
_log.info("Waiting for a connection on " + _myAddress.getHost() + ":" + _myAddress.getPort());

View File

@ -270,7 +270,8 @@ public class TCPTransport extends TransportImpl {
*
* @param address address that the remote host said was ours
*/
synchronized void ourAddressReceived(String address) {
void ourAddressReceived(String address) {
synchronized (_listener) { // no need to lock on the whole TCPTransport
if (allowAddressUpdate()) {
int port = getPort();
TCPAddress addr = new TCPAddress(address, port);
@ -290,6 +291,10 @@ public class TCPTransport extends TransportImpl {
if (_log.shouldLog(Log.ERROR))
_log.error("Address specified is not valid [" + address + ":" + port + "]");
}
} else {
// either we have explicitly specified our IP address, or
// we are already connected to some people.
}
}
}
@ -321,8 +326,8 @@ public class TCPTransport extends TransportImpl {
/**
* Add the given message to the list of most recent connection
* establishment error messages. This should include a timestamp of
* some sort in it.
* establishment error messages. A timestamp is prefixed to it before
* being rendered on the router console.
*
*/
void addConnectionErrorMessage(String msg) {
@ -394,22 +399,7 @@ public class TCPTransport extends TransportImpl {
_myAddress = addr;
_listener.stopListening();
Set addresses = getCurrentAddresses();
List toRemove = null;
for (Iterator iter = addresses.iterator(); iter.hasNext(); ) {
RouterAddress cur = (RouterAddress)iter.next();
if (STYLE.equals(cur.getTransportStyle())) {
if (toRemove == null)
toRemove = new ArrayList(1);
toRemove.add(cur);
}
}
if (toRemove != null) {
for (int i = 0; i < toRemove.size(); i++) {
addresses.remove(toRemove.get(i));
}
}
addresses.add(routerAddr);
replaceAddress(routerAddr);
_context.router().rebuildRouterInfo();
@ -492,7 +482,7 @@ public class TCPTransport extends TransportImpl {
*
*/
RouterInfo getNextPeer() {
while (true) {
while (_context.router().isAlive()) {
synchronized (_connectionLock) {
for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
@ -526,6 +516,7 @@ public class TCPTransport extends TransportImpl {
} catch (InterruptedException ie) {}
}
}
return null;
}
/** Called after an establisher finished (or failed) connecting to the peer */

View File

@ -64,7 +64,8 @@ remainder of the communication is AES256 encrypted per
<p><b>8) </b> <i>Bob to Alice</i>: <br />
<code>routerInfo + status + properties + H(routerInfo + status + properties + nonce + tag)</code></p>
<p><b>9) </b> If the <code>status</code> is ok, both Alice and Bob consume the
<code>tagData</code>, updating the next tag to be <code>H(E(nonce + tag, sessionKey))</code>.
<code>tagData</code>, updating the next tag to be <code>H(E(nonce + tag, sessionKey))</code>
(with nonce+tag padded with 12 bytes of 0x0 at the end).
Otherwise, both sides disconnect and do not consume the tag. In addition, on error the
<code>properties</code> mapping has a more detailed reason under the key "MESSAGE".</p>