2005-01-05 jrandom

* Handle unexpected network read errors more carefully (thanks parg!)
    * Added more methods to partially compare (DataHelper) and display
      arrays (Base64.encode).
    * Exposed the AES encryptBlock/decryptBlock on the context.aes()
    * Be more generous on the throttle when just starting up the router
    * Fix a missing scheduled event in the streaming lib (caused after reset)
    * Add a new DisconnectListener on the I2PSocketManager to allow
      notification of session destruction.
    * Make sure our own router identity is valid, and if it isn't, build a new
      one and restart the router.  Alternately, you can run the Router with
      the single command line argument "rebuild" and it will do the same.
This commit is contained in:
jrandom
2005-01-06 00:17:53 +00:00
committed by zzz
parent 3dd2f67ff3
commit 4838564460
21 changed files with 235 additions and 27 deletions

View File

@ -105,4 +105,11 @@ public interface I2PSocketManager {
public void setName(String name);
public void init(I2PAppContext context, I2PSession session, Properties opts, String name);
public void addDisconnectListener(DisconnectListener lsnr);
public void removeDisconnectListener(DisconnectListener lsnr);
public static interface DisconnectListener {
public void sessionDisconnected();
}
}

View File

@ -10,9 +10,11 @@ import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@ -46,6 +48,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
private I2PSocketOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
private List _listeners;
private static int __managerId = 0;
public static final short ACK = 0x51;
@ -76,6 +79,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
_inSockets = new HashMap(16);
_outSockets = new HashMap(16);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_listeners = new ArrayList(1);
setSession(session);
setDefaultOptions(buildOptions(opts));
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
@ -109,6 +113,15 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
public void disconnected(I2PSession session) {
_log.info(getName() + ": Disconnected from the session");
destroySocketManager();
List listeners = null;
synchronized (_listeners) {
listeners = new ArrayList(_listeners);
_listeners.clear();
}
for (int i = 0; i < listeners.size(); i++) {
DisconnectListener lsnr = (DisconnectListener)listeners.get(i);
lsnr.sessionDisconnected();
}
}
public void errorOccurred(I2PSession session, String message, Throwable error) {
@ -707,6 +720,17 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
public String getName() { return _name; }
public void setName(String name) { _name = name; }
public void addDisconnectListener(DisconnectListener lsnr) {
synchronized (_listeners) {
_listeners.add(lsnr);
}
}
public void removeDisconnectListener(DisconnectListener lsnr) {
synchronized (_listeners) {
_listeners.remove(lsnr);
}
}
public static String getReadableForm(String id) {
if (id == null) return "(null)";
if (id.length() != 3) return "Bogus";

View File

@ -33,6 +33,7 @@ public class Connection {
private long _lastSendId;
private boolean _resetReceived;
private boolean _resetSent;
private long _resetSentOn;
private boolean _connected;
private boolean _hardDisconnected;
private MessageInputStream _inputStream;
@ -113,6 +114,7 @@ public class Connection {
_ackSinceCongestion = true;
_connectLock = new Object();
_activeResends = 0;
_resetSentOn = -1;
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -185,6 +187,8 @@ public class Connection {
*/
void sendReset() {
_resetSent = true;
if (_resetSentOn <= 0)
_resetSentOn = _context.clock().now();
if ( (_remotePeer == null) || (_sendStreamId == null) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET);
@ -388,6 +392,7 @@ public class Connection {
public boolean getIsConnected() { return _connected; }
public boolean getHardDisconnected() { return _hardDisconnected; }
public boolean getResetSent() { return _resetSent; }
public long getResetSentOn() { return _resetSentOn; }
void disconnect(boolean cleanDisconnect) {
disconnect(cleanDisconnect, true);

View File

@ -5,9 +5,11 @@ import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@ -225,4 +227,12 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public String getName() { return _name; }
public void setName(String name) { _name = name; }
public void addDisconnectListener(DisconnectListener lsnr) {
_connectionManager.getMessageHandler().addDisconnectListener(lsnr);
}
public void removeDisconnectListener(DisconnectListener lsnr) {
_connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
}
}

View File

@ -1,5 +1,8 @@
package net.i2p.client.streaming;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionListener;
@ -15,10 +18,12 @@ public class MessageHandler implements I2PSessionListener {
private ConnectionManager _manager;
private I2PAppContext _context;
private Log _log;
private List _listeners;
public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) {
_manager = mgr;
_context = ctx;
_listeners = new ArrayList(1);
_log = ctx.logManager().getLog(MessageHandler.class);
_context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
}
@ -69,6 +74,16 @@ public class MessageHandler implements I2PSessionListener {
if (_log.shouldLog(Log.ERROR))
_log.error("I2PSession disconnected");
_manager.disconnectAllHard();
List listeners = null;
synchronized (_listeners) {
listeners = new ArrayList(_listeners);
_listeners.clear();
}
for (int i = 0; i < listeners.size(); i++) {
I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i);
lsnr.sessionDisconnected();
}
}
/**
@ -80,4 +95,15 @@ public class MessageHandler implements I2PSessionListener {
_log.error("error occurred: " + message, error);
//_manager.disconnectAllHard();
}
public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
synchronized (_listeners) {
_listeners.add(lsnr);
}
}
public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
synchronized (_listeners) {
_listeners.remove(lsnr);
}
}
}

View File

@ -33,10 +33,13 @@ class SchedulerDead extends SchedulerImpl {
public boolean accept(Connection con) {
if (con == null) return false;
long timeSinceClose = _context.clock().now() - con.getCloseSentOn();
if (con.getResetSent())
timeSinceClose = _context.clock().now() - con.getResetSentOn();
boolean nothingLeftToDo = (con.getCloseSentOn() > 0) &&
(con.getCloseReceivedOn() > 0) &&
(con.getUnackedPacketsReceived() <= 0) &&
(con.getUnackedPacketsSent() <= 0) &&
(con.getResetSent()) &&
(timeSinceClose >= Connection.DISCONNECT_TIMEOUT);
boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
con.getSendStreamId() == null &&

View File

@ -34,6 +34,8 @@ class SchedulerHardDisconnected extends SchedulerImpl {
public boolean accept(Connection con) {
if (con == null) return false;
long timeSinceClose = _context.clock().now() - con.getCloseSentOn();
if (con.getResetSent())
timeSinceClose = _context.clock().now() - con.getResetSentOn();
boolean ok = (con.getHardDisconnected() || con.getResetSent()) &&
(timeSinceClose < Connection.DISCONNECT_TIMEOUT);
return ok;

View File

@ -127,6 +127,21 @@ public class AESEngine {
_log.warn("Warning: AES is disabled");
}
public void encryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte out[], int outIndex) {
System.arraycopy(payload, inIndex, out, outIndex, out.length - outIndex);
}
/** decrypt the data with the session key provided
* @param payload encrypted data
* @param sessionKey private session key
* @return unencrypted data
*/
public void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) {
System.arraycopy(payload, inIndex, rv, outIndex, rv.length - outIndex);
}
public static void main(String args[]) {
I2PAppContext ctx = new I2PAppContext();
SessionKey key = ctx.keyGenerator().generateSessionKey();

View File

@ -161,7 +161,7 @@ public class AESInputStream extends FilterInputStream {
*
*/
private void refill() throws IOException {
if (!_eofFound) {
if ( (!_eofFound) && (_writesSinceDecrypt < BLOCK_SIZE) ) {
int read = in.read(_encryptedBuf, _writesSinceDecrypt, _encryptedBuf.length - _writesSinceDecrypt);
if (read == -1) {
_eofFound = true;

View File

@ -85,14 +85,14 @@ public class CryptixAESEngine extends AESEngine {
if (length % 16 != 0) numblock++;
decryptBlock(payload, payloadIndex, sessionKey, out, outIndex);
DataHelper.xor(out, outIndex, iv, 0, out, outIndex, 16);
DataHelper.xor(out, outIndex, iv, 0, out, outIndex, 16);
for (int x = 1; x < numblock; x++) {
decryptBlock(payload, payloadIndex + (x * 16), sessionKey, out, outIndex + (x * 16));
DataHelper.xor(out, outIndex + x * 16, payload, payloadIndex + (x - 1) * 16, out, outIndex + x * 16, 16);
}
}
final void encryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte out[], int outIndex) {
public final void encryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte out[], int outIndex) {
CryptixAESKeyCache.KeyCacheEntry keyData = _cache.acquireKey();
try {
Object key = CryptixRijndael_Algorithm.makeKey(sessionKey.getData(), 16, keyData);
@ -109,7 +109,7 @@ public class CryptixAESEngine extends AESEngine {
* @param sessionKey private session key
* @return unencrypted data
*/
final void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) {
public final void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) {
if ( (payload == null) || (rv == null) )
throw new IllegalArgumentException("null block args [payload=" + payload + " rv="+rv);
if (payload.length - inIndex > rv.length - outIndex)
@ -131,9 +131,11 @@ public class CryptixAESEngine extends AESEngine {
I2PAppContext ctx = new I2PAppContext();
try {
testEDBlock(ctx);
testEDBlock2(ctx);
testED(ctx);
testFake(ctx);
testNull(ctx);
testED2(ctx);
//testFake(ctx);
//testNull(ctx);
} catch (Exception e) {
e.printStackTrace();
}
@ -155,6 +157,21 @@ public class CryptixAESEngine extends AESEngine {
else
System.out.println("full D(E(orig)) == orig");
}
private static void testED2(I2PAppContext ctx) {
SessionKey key = ctx.keyGenerator().generateSessionKey();
byte iv[] = new byte[16];
byte orig[] = new byte[128];
byte data[] = new byte[128];
ctx.random().nextBytes(iv);
ctx.random().nextBytes(orig);
CryptixAESEngine aes = new CryptixAESEngine(ctx);
aes.encrypt(orig, 0, data, 0, key, iv, data.length);
aes.decrypt(data, 0, data, 0, key, iv, data.length);
if (!DataHelper.eq(data,orig))
throw new RuntimeException("full D(E(orig)) != orig");
else
System.out.println("full D(E(orig)) == orig");
}
private static void testFake(I2PAppContext ctx) {
SessionKey key = ctx.keyGenerator().generateSessionKey();
SessionKey wrongKey = ctx.keyGenerator().generateSessionKey();
@ -207,4 +224,19 @@ public class CryptixAESEngine extends AESEngine {
else
System.out.println("block D(E(orig)) == orig");
}
private static void testEDBlock2(I2PAppContext ctx) {
SessionKey key = ctx.keyGenerator().generateSessionKey();
byte iv[] = new byte[16];
byte orig[] = new byte[16];
byte data[] = new byte[16];
ctx.random().nextBytes(iv);
ctx.random().nextBytes(orig);
CryptixAESEngine aes = new CryptixAESEngine(ctx);
aes.encryptBlock(orig, 0, key, data, 0);
aes.decryptBlock(data, 0, key, data, 0);
if (!DataHelper.eq(data,orig))
throw new RuntimeException("block D(E(orig)) != orig");
else
System.out.println("block D(E(orig)) == orig");
}
}

View File

@ -43,10 +43,16 @@ public class Base64 {
private final static Log _log = new Log(Base64.class);
public static String encode(byte[] source) {
return encode(source, false);
return encode(source, 0, (source != null ? source.length : 0));
}
public static String encode(byte[] source, int off, int len) {
return encode(source, off, len, false);
}
public static String encode(byte[] source, boolean useStandardAlphabet) {
return safeEncode(source, useStandardAlphabet);
return encode(source, 0, (source != null ? source.length : 0), useStandardAlphabet);
}
public static String encode(byte[] source, int off, int len, boolean useStandardAlphabet) {
return safeEncode(source, off, len, useStandardAlphabet);
}
public static byte[] decode(String s) {
@ -318,8 +324,8 @@ public class Base64 {
* Same as encodeBytes, except uses a filesystem / URL friendly set of characters,
* replacing / with ~, and + with -
*/
private static String safeEncode(byte[] source, boolean useStandardAlphabet) {
String encoded = encodeBytes(source);
private static String safeEncode(byte[] source, int off, int len, boolean useStandardAlphabet) {
String encoded = encodeBytes(source, off, len, false);
if (useStandardAlphabet) {
// noop
} else {

View File

@ -559,6 +559,16 @@ public class DataHelper {
return lhs == rhs;
}
public final static boolean eq(byte lhs[], int offsetLeft, byte rhs[], int offsetRight, int length) {
if ( (lhs == null) || (rhs == null) ) return false;
if (length <= 0) return true;
for (int i = 0; i < length; i++) {
if (lhs[offsetLeft + i] != rhs[offsetRight + i])
return false;
}
return true;
}
public final static int compareTo(byte lhs[], byte rhs[]) {
if ((rhs == null) && (lhs == null)) return 0;
if (lhs == null) return -1;

View File

@ -69,6 +69,7 @@ public class Payload extends DataStructureImpl {
public void readBytes(InputStream in) throws DataFormatException, IOException {
int size = (int) DataHelper.readLong(in, 4);
if (size < 0) throw new DataFormatException("payload size out of range (" + size + ")");
_encryptedData = new byte[size];
int read = read(in, _encryptedData);
if (read != size) throw new DataFormatException("Incorrect number of bytes read in the payload structure");

View File

@ -1,4 +1,17 @@
$Id: history.txt,v 1.123 2004/12/31 12:18:05 jrandom Exp $
$Id: history.txt,v 1.124 2004/12/31 19:57:01 jrandom Exp $
2005-01-05 jrandom
* Handle unexpected network read errors more carefully (thanks parg!)
* Added more methods to partially compare (DataHelper) and display
arrays (Base64.encode).
* Exposed the AES encryptBlock/decryptBlock on the context.aes()
* Be more generous on the throttle when just starting up the router
* Fix a missing scheduled event in the streaming lib (caused after reset)
* Add a new DisconnectListener on the I2PSocketManager to allow
notification of session destruction.
* Make sure our own router identity is valid, and if it isn't, build a new
one and restart the router. Alternately, you can run the Router with
the single command line argument "rebuild" and it will do the same.
2004-12-31 ragnarok
* Integrated latest addressbook changes (2.0.3) which include support for

View File

@ -138,6 +138,11 @@ public class I2NPMessageReader {
_log.warn("IO Error handling message", ioe);
_listener.disconnected(I2NPMessageReader.this);
cancelRunner();
} catch (Exception e) {
_log.log(Log.CRIT, "wtf, error reading", e);
_listener.readError(I2NPMessageReader.this, e);
_listener.disconnected(I2NPMessageReader.this);
cancelRunner();
}
}
if (!_doRun) {

View File

@ -50,7 +50,10 @@ public abstract class NetworkDatabaseFacade implements Service {
* @throws IllegalArgumentException if the data is not valid
*/
public abstract RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException;
public abstract void publish(RouterInfo localRouterInfo);
/**
* @throws IllegalArgumentException if the local router is not valid
*/
public abstract void publish(RouterInfo localRouterInfo) throws IllegalArgumentException;
public abstract void publish(LeaseSet localLeaseSet);
public abstract void unpublish(LeaseSet localLeaseSet);
public abstract void fail(Hash dbEntry);

View File

@ -261,11 +261,49 @@ public class Router {
}
ri.sign(key);
setRouterInfo(ri);
_context.netDb().publish(ri);
try {
_context.netDb().publish(ri);
} catch (IllegalArgumentException iae) {
_log.log(Log.CRIT, "Local router info is invalid? rebuilding a new identity", iae);
rebuildNewIdentity();
}
} catch (DataFormatException dfe) {
_log.log(Log.CRIT, "Internal error - unable to sign our own address?!", dfe);
}
}
/**
* Ugly list of files that we need to kill if we are building a new identity
*
*/
private static final String _rebuildFiles[] = new String[] { "router.info",
"router.keys",
"connectionTag.keys",
"keyBackup/privateEncryption.key",
"keyBackup/privateSigning.key",
"keyBackup/publicEncryption.key",
"keyBackup/publicSigning.key",
"sessionKeys.dat" };
/**
* Rebuild a new identity the hard way - delete all of our old identity
* files, then reboot the router.
*
*/
public void rebuildNewIdentity() {
for (int i = 0; i < _rebuildFiles.length; i++) {
File f = new File(_rebuildFiles[i]);
if (f.exists()) {
boolean removed = f.delete();
if (removed)
System.out.println("INFO: Removing old identity file: " + _rebuildFiles[i]);
else
System.out.println("ERROR: Could not remove old identity file: " + _rebuildFiles[i]);
}
}
System.out.println("INFO: Restarting the router after removing any old identity files");
// hard and ugly
System.exit(EXIT_GRACEFUL_RESTART);
}
/**
* coalesce the stats framework every minute
@ -813,7 +851,11 @@ public class Router {
installUpdates();
verifyWrapperConfig();
Router r = new Router();
r.runRouter();
if ( (args != null) && (args.length == 1) && ("rebuild".equals(args[0])) ) {
r.rebuildNewIdentity();
} else {
r.runRouter();
}
}
private static final String UPDATE_FILE = "i2pupdate.zip";

View File

@ -50,7 +50,7 @@ class RouterThrottleImpl implements RouterThrottle {
public boolean acceptNetworkMessage() {
long lag = _context.jobQueue().getMaxLag();
if (lag > JOB_LAG_LIMIT) {
if ( (lag > JOB_LAG_LIMIT) && (_context.router().getUptime() > 60*1000) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Throttling network reader, as the job lag is " + lag);
_context.statManager().addRateData("router.throttleNetworkCause", lag, lag);

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.128 $ $Date: 2004/12/31 12:18:05 $";
public final static String ID = "$Revision: 1.129 $ $Date: 2004/12/31 19:57:01 $";
public final static String VERSION = "0.4.2.5";
public final static long BUILD = 6;
public final static long BUILD = 7;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -54,7 +54,12 @@ public class PublishLocalRouterInfoJob extends JobImpl {
_log.info("Newly updated routerInfo is published with " + stats.size()
+ "/" + ri.getOptions().size() + " options on "
+ new Date(ri.getPublished()));
getContext().netDb().publish(ri);
try {
getContext().netDb().publish(ri);
} catch (IllegalArgumentException iae) {
_log.log(Log.CRIT, "Error publishing our identity - corrupt?", iae);
getContext().router().rebuildNewIdentity();
}
} catch (DataFormatException dfe) {
_log.error("Error signing the updated local router info!", dfe);
}

View File

@ -487,18 +487,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
}
}
public void publish(RouterInfo localRouterInfo) {
/**
* @throws IllegalArgumentException if the local router info is invalid
*/
public void publish(RouterInfo localRouterInfo) throws IllegalArgumentException {
if (!_initialized) return;
Hash h = localRouterInfo.getIdentity().getHash();
try {
store(h, localRouterInfo);
synchronized (_explicitSendKeys) {
_explicitSendKeys.add(h);
}
writeMyInfo(localRouterInfo);
} catch (IllegalArgumentException iae) {
_log.error("Local routerInfo was invalid? "+ iae.getMessage(), iae);
store(h, localRouterInfo);
synchronized (_explicitSendKeys) {
_explicitSendKeys.add(h);
}
writeMyInfo(localRouterInfo);
}
/**