2004-10-08 jrandom

    * Don't kill the establisher threads during a soft restart.
    * Attempt to validate the peer's routerInfo earlier during handshaking.
    * Revamp the AESOutputStream so it doesn't allocate any temporary objects
      during its operation.
This commit is contained in:
jrandom
2004-10-08 18:38:48 +00:00
committed by zzz
parent ff8674bca9
commit 730da3aa27
7 changed files with 161 additions and 58 deletions

View File

@ -226,8 +226,8 @@ public class AESInputStream extends FilterInputStream {
_log.warn("Decrypt got odd segment - " + trailing
+ " bytes pushed back for later decryption - corrupted or slow data stream perhaps?");
} else {
if (_log.shouldLog(Log.INFO))
_log.info(encrypted.length + " bytes makes up " + numBlocks + " blocks to decrypt normally");
if (_log.shouldLog(Log.DEBUG))
_log.debug(encrypted.length + " bytes makes up " + numBlocks + " blocks to decrypt normally");
}
for (int i = 0; i < numBlocks; i++) {
@ -313,9 +313,22 @@ public class AESInputStream extends FilterInputStream {
/**
* Test AESOutputStream/AESInputStream
*/
public static void main(String args[]) {
public static void main(String args[]) {
I2PAppContext ctx = new I2PAppContext();
try {
System.out.println("pwd=" + new java.io.File(".").getAbsolutePath());
System.out.println("Beginning");
runTest(ctx);
} catch (Throwable e) {
ctx.logManager().getLog(AESInputStream.class).error("Fail", e);
}
try { Thread.sleep(30*1000); } catch (InterruptedException ie) {}
System.out.println("Done");
}
private static void runTest(I2PAppContext ctx) {
Log log = ctx.logManager().getLog(AESInputStream.class);
log.setMinimumPriority(Log.DEBUG);
byte orig[] = new byte[1024 * 32];
RandomSource.getInstance().nextBytes(orig);
//byte orig[] = "you are my sunshine, my only sunshine".getBytes();
@ -351,10 +364,51 @@ public class AESInputStream extends FilterInputStream {
}
log.info("Done testing 0 byte data");
for (int i = 0; i <= 32768; i++) {
orig = new byte[i];
ctx.random().nextBytes(orig);
try {
log.info("Testing " + orig.length);
runTest(ctx, orig, key, iv);
} catch (RuntimeException re) {
log.error("Error testing " + orig.length);
throw re;
}
}
/*
orig = new byte[615280];
RandomSource.getInstance().nextBytes(orig);
for (int i = 0; i < 20; i++) {
runTest(ctx, orig, key, iv);
}
log.info("Done testing 615280 byte data");
*/
/*
for (int i = 0; i < 100; i++) {
orig = new byte[ctx.random().nextInt(1024*1024)];
ctx.random().nextBytes(orig);
try {
runTest(ctx, orig, key, iv);
} catch (RuntimeException re) {
log.error("Error testing " + orig.length);
throw re;
}
}
log.info("Done testing 100 random lengths");
*/
orig = new byte[32];
RandomSource.getInstance().nextBytes(orig);
runOffsetTest(ctx, orig, key, iv);
try {
runOffsetTest(ctx, orig, key, iv);
} catch (Exception e) {
log.info("Error running offset test", e);
}
log.info("Done testing offset test (it should have come back with a statement NOT EQUAL!)");
@ -389,18 +443,19 @@ public class AESInputStream extends FilterInputStream {
Hash newHash = SHA256Generator.getInstance().calculateHash(fin);
boolean eq = origHash.equals(newHash);
if (eq)
log.info("Equal hashes. hash: " + origHash);
else
log.error("NOT EQUAL! \norig: \t" + Base64.encode(orig) + "\nnew : \t" + Base64.encode(fin));
if (eq) {
//log.info("Equal hashes. hash: " + origHash);
} else {
throw new RuntimeException("NOT EQUAL! len=" + orig.length + "\norig: \t" + Base64.encode(orig) + "\nnew : \t" + Base64.encode(fin));
}
boolean ok = DataHelper.eq(orig, fin);
log.debug("EQ data? " + ok + " origLen: " + orig.length + " fin.length: " + fin.length);
log.debug("Time to D(E(" + orig.length + ")): " + (end - start) + "ms");
log.debug("Time to E(" + orig.length + "): " + (endE - start) + "ms");
log.debug("Time to D(" + orig.length + "): " + (end - endE) + "ms");
} catch (Throwable t) {
log.error("ERROR transferring", t);
} catch (IOException ioe) {
log.error("ERROR transferring", ioe);
}
//try { Thread.sleep(5000); } catch (Throwable t) {}
}
@ -441,15 +496,16 @@ public class AESInputStream extends FilterInputStream {
if (eq)
log.info("Equal hashes. hash: " + origHash);
else
log.error("NOT EQUAL! \norig: \t" + Base64.encode(orig) + "\nnew : \t" + Base64.encode(fin));
throw new RuntimeException("NOT EQUAL! len=" + orig.length + "\norig: \t" + Base64.encode(orig) + "\nnew : \t" + Base64.encode(fin));
boolean ok = DataHelper.eq(orig, fin);
log.debug("EQ data? " + ok + " origLen: " + orig.length + " fin.length: " + fin.length);
log.debug("Time to D(E(" + orig.length + ")): " + (end - start) + "ms");
log.debug("Time to E(" + orig.length + "): " + (endE - start) + "ms");
log.debug("Time to D(" + orig.length + "): " + (end - endE) + "ms");
} catch (Throwable t) {
log.error("ERROR transferring", t);
} catch (RuntimeException re) {
throw re;
} catch (IOException ioe) {
log.error("ERROR transferring", ioe);
}
//try { Thread.sleep(5000); } catch (Throwable t) {}
}

View File

@ -34,7 +34,15 @@ public class AESOutputStream extends FilterOutputStream {
private I2PAppContext _context;
private SessionKey _key;
private byte[] _lastBlock;
private ByteArrayOutputStream _inBuf;
/**
* buffer containing the unwritten bytes. The first unwritten
* byte is _lastCommitted+1, and the last unwritten byte is _nextWrite-1
* (aka the next byte to be written on the array is _nextWrite)
*/
private byte[] _unencryptedBuf;
private byte _writeBlock[];
/** how many bytes have we been given since we flushed it to the stream? */
private int _writesSinceCommit;
private long _cumulativeProvided; // how many bytes provided to this stream
private long _cumulativeWritten; // how many bytes written to the underlying stream
private long _cumulativePadding; // how many bytes of padding written
@ -51,31 +59,32 @@ public class AESOutputStream extends FilterOutputStream {
_key = key;
_lastBlock = new byte[BLOCK_SIZE];
System.arraycopy(iv, 0, _lastBlock, 0, BLOCK_SIZE);
_inBuf = new ByteArrayOutputStream(MAX_BUF);
_unencryptedBuf = new byte[MAX_BUF];
_writeBlock = new byte[BLOCK_SIZE];
_writesSinceCommit = 0;
}
public void write(int val) throws IOException {
_cumulativeProvided++;
_inBuf.write(val);
if (_inBuf.size() > MAX_BUF) doFlush();
_unencryptedBuf[_writesSinceCommit++] = (byte)(val & 0xFF);
if (_writesSinceCommit == _unencryptedBuf.length)
doFlush();
}
public void write(byte src[]) throws IOException {
_cumulativeProvided += src.length;
_inBuf.write(src);
if (_inBuf.size() > MAX_BUF) doFlush();
write(src, 0, src.length);
}
public void write(byte src[], int off, int len) throws IOException {
_cumulativeProvided += len;
_inBuf.write(src, off, len);
if (_inBuf.size() > MAX_BUF) doFlush();
// i'm too lazy to unroll this into the partial writes (dealing with
// wrapping around the buffer size)
for (int i = 0; i < len; i++)
write(src[i+off]);
}
public void close() throws IOException {
flush();
out.close();
_inBuf.reset();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cumulative bytes provided to this stream / written out / padded: "
+ _cumulativeProvided + "/" + _cumulativeWritten + "/" + _cumulativePadding);
@ -87,8 +96,10 @@ public class AESOutputStream extends FilterOutputStream {
}
private void doFlush() throws IOException {
writeEncrypted(_inBuf.toByteArray());
_inBuf.reset();
if (_log.shouldLog(Log.INFO))
_log.info("doFlush(): writesSinceCommit=" + _writesSinceCommit);
writeEncrypted();
_writesSinceCommit = 0;
}
/**
@ -101,39 +112,37 @@ public class AESOutputStream extends FilterOutputStream {
* times).
*
*/
private void writeEncrypted(byte src[]) throws IOException {
if ((src == null) || (src.length == 0)) return;
int numBlocks = src.length / (BLOCK_SIZE - 1);
private void writeEncrypted() throws IOException {
int numBlocks = _writesSinceCommit / (BLOCK_SIZE - 1);
byte block[] = new byte[BLOCK_SIZE];
if (_log.shouldLog(Log.INFO))
_log.info("writeE(): #=" + _writesSinceCommit + " blocks=" + numBlocks);
for (int i = 0; i < numBlocks; i++) {
DataHelper.xor(src, i * 15, _lastBlock, 0, block, 0, 15);
DataHelper.xor(_unencryptedBuf, i * 15, _lastBlock, 0, _writeBlock, 0, 15);
// the padding byte for "full" blocks
block[BLOCK_SIZE - 1] = (byte)(_lastBlock[BLOCK_SIZE - 1] ^ 0x01);
_context.aes().encrypt(block, 0, block, 0, _key, _lastBlock, BLOCK_SIZE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Padding block " + i + " of " + numBlocks + " with 1 byte");
out.write(block);
System.arraycopy(block, 0, _lastBlock, 0, BLOCK_SIZE);
_writeBlock[BLOCK_SIZE - 1] = (byte)(_lastBlock[BLOCK_SIZE - 1] ^ 0x01);
_context.aes().encrypt(_writeBlock, 0, _writeBlock, 0, _key, _lastBlock, BLOCK_SIZE);
out.write(_writeBlock);
System.arraycopy(_writeBlock, 0, _lastBlock, 0, BLOCK_SIZE);
_cumulativeWritten += BLOCK_SIZE;
_cumulativePadding++;
}
if (src.length % 15 != 0) {
if (_writesSinceCommit % 15 != 0) {
// we need to do non trivial padding
int remainingBytes = src.length - numBlocks * 15;
int remainingBytes = _writesSinceCommit - numBlocks * 15;
int paddingBytes = BLOCK_SIZE - remainingBytes;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Padding " + src.length + " with " + paddingBytes + " bytes in " + numBlocks + " blocks");
System.arraycopy(src, numBlocks * 15, block, 0, remainingBytes);
Arrays.fill(block, remainingBytes, BLOCK_SIZE, (byte) paddingBytes);
DataHelper.xor(block, 0, _lastBlock, 0, block, 0, BLOCK_SIZE);
_context.aes().encrypt(block, 0, block, 0, _key, _lastBlock, BLOCK_SIZE);
out.write(block);
System.arraycopy(block, 0, _lastBlock, 0, BLOCK_SIZE);
_log.debug("Padding " + _writesSinceCommit + " with " + paddingBytes + " bytes in " + (numBlocks+1) + " blocks");
System.arraycopy(_unencryptedBuf, numBlocks * 15, _writeBlock, 0, remainingBytes);
Arrays.fill(_writeBlock, remainingBytes, BLOCK_SIZE, (byte) paddingBytes);
DataHelper.xor(_writeBlock, 0, _lastBlock, 0, _writeBlock, 0, BLOCK_SIZE);
_context.aes().encrypt(_writeBlock, 0, _writeBlock, 0, _key, _lastBlock, BLOCK_SIZE);
out.write(_writeBlock);
System.arraycopy(_writeBlock, 0, _lastBlock, 0, BLOCK_SIZE);
_cumulativePadding += paddingBytes;
_cumulativeWritten += BLOCK_SIZE;
}
}
}

View File

@ -1,4 +1,10 @@
$Id: history.txt,v 1.38 2004/10/07 14:19:52 jrandom Exp $
$Id: history.txt,v 1.39 2004/10/07 21:08:11 jrandom Exp $
2004-10-08 jrandom
* Don't kill the establisher threads during a soft restart.
* Attempt to validate the peer's routerInfo earlier during handshaking.
* Revamp the AESOutputStream so it doesn't allocate any temporary objects
during its operation.
2004-10-07 jrandom
* Reimplement the I2NP reading with less temporary memory allocation.

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.46 $ $Date: 2004/10/07 14:19:52 $";
public final static String ID = "$Revision: 1.47 $ $Date: 2004/10/07 21:08:11 $";
public final static String VERSION = "0.4.1.1";
public final static long BUILD = 12;
public final static long BUILD = 13;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -437,7 +437,15 @@ public class ConnectionBuilder {
}
_actualPeer = peer;
return true;
try {
_context.netDb().store(peer.getIdentity().getHash(), peer);
return true;
} catch (IllegalArgumentException iae) {
fail("Peer sent us bad info - " + _target.getIdentity().getHash().toBase64().substring(0,6)
+ ": " + iae.getMessage());
return false;
}
} catch (IOException ioe) {
fail("Error reading the verified info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
@ -584,7 +592,15 @@ public class ConnectionBuilder {
}
_actualPeer = peer;
return true;
try {
_context.netDb().store(peer.getIdentity().getHash(), peer);
return true;
} catch (IllegalArgumentException iae) {
fail("Peer sent us bad info - " + _target.getIdentity().getHash().toBase64().substring(0,6)
+ ": " + iae.getMessage());
return false;
}
} catch (IOException ioe) {
fail("Error reading the verified info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
@ -645,7 +661,6 @@ public class ConnectionBuilder {
//_connectionOut = _rawOut;
Hash peer = _actualPeer.getIdentity().getHash();
_context.netDb().store(peer, _actualPeer);
_transport.getTagManager().replaceTag(peer, _nextConnectionTag, _key);
}

View File

@ -443,7 +443,14 @@ public class ConnectionHandler {
SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS");
props.setProperty("SKEW", fmt.format(new Date(_context.clock().now())));
} else {
status = STATUS_OK;
try {
_context.netDb().store(_actualPeer.getIdentity().getHash(), _actualPeer);
status = STATUS_OK;
} catch (IllegalArgumentException iae) {
// bad peer info
status = STATUS_UNKNOWN;
props.setProperty("REASON", "RouterInfoFailed");
}
}
baos.write(status);
@ -460,7 +467,7 @@ public class ConnectionHandler {
verification.writeBytes(_rawOut);
_rawOut.flush();
return handleStatus(status, clockSkew);
return handleStatus(status, clockSkew);
} catch (IOException ioe) {
fail("Error writing the peer info to " + _from
+ ": " + ioe.getMessage(), ioe);
@ -601,7 +608,14 @@ public class ConnectionHandler {
} else if (!sigOk) {
status = STATUS_SIGNATURE_FAILED;
} else {
status = STATUS_OK;
try {
_context.netDb().store(_actualPeer.getIdentity().getHash(), _actualPeer);
status = STATUS_OK;
} catch (IllegalArgumentException iae) {
// bad peer info
status = STATUS_UNKNOWN;
props.setProperty("REASON", "RouterInfoFailed");
}
}
if (_actualPeer.getIdentity().getHash().equals(_context.routerHash())) {
@ -827,7 +841,6 @@ public class ConnectionHandler {
//_connectionOut = _rawOut;
Hash peer = _actualPeer.getIdentity().getHash();
_context.netDb().store(peer, _actualPeer);
_transport.getTagManager().replaceTag(peer, _nextConnectionTag, _key);
}

View File

@ -24,6 +24,10 @@ public class TCPConnectionEstablisher implements Runnable {
public void run() {
while (true) {
RouterInfo info = _transport.getNextPeer();
if (info == null) {
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
continue;
}
ConnectionBuilder cb = new ConnectionBuilder(_context, _transport, info);
TCPConnection con = null;