forked from I2P_Developers/i2p.i2p
I2CP: Ensure that callbacks are called on abnormal close
throw IAE on invalid listener protocol/port log tweaks
This commit is contained in:
@ -23,6 +23,7 @@ import net.i2p.util.Log;
|
||||
* messageAvailable() only calls one listener, not all that apply.
|
||||
* The others call all listeners.
|
||||
*
|
||||
* @since 0.7.1
|
||||
* @author zzz
|
||||
*/
|
||||
public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
@ -31,7 +32,7 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
|
||||
public I2PSessionDemultiplexer(I2PAppContext ctx) {
|
||||
_log = ctx.logManager().getLog(I2PSessionDemultiplexer.class);
|
||||
_listeners = new ConcurrentHashMap<Integer, I2PSessionMuxedListener>();
|
||||
_listeners = new ConcurrentHashMap<Integer, I2PSessionMuxedListener>(4);
|
||||
}
|
||||
|
||||
/** unused */
|
||||
@ -39,9 +40,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
|
||||
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport ) {
|
||||
I2PSessionMuxedListener l = findListener(proto, toport);
|
||||
if (l != null)
|
||||
if (l != null) {
|
||||
l.messageAvailable(session, msgId, size, proto, fromport, toport);
|
||||
else {
|
||||
} else {
|
||||
// no listener, throw it out
|
||||
if (_listeners.isEmpty()) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
@ -58,18 +59,25 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
}
|
||||
|
||||
public void reportAbuse(I2PSession session, int severity) {
|
||||
for (I2PSessionMuxedListener l : _listeners.values())
|
||||
for (I2PSessionMuxedListener l : _listeners.values()) {
|
||||
l.reportAbuse(session, severity);
|
||||
}
|
||||
}
|
||||
|
||||
public void disconnected(I2PSession session) {
|
||||
for (I2PSessionMuxedListener l : _listeners.values())
|
||||
for (I2PSessionMuxedListener l : _listeners.values()) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Sending disconnected() to " + l);
|
||||
l.disconnected(session);
|
||||
}
|
||||
}
|
||||
|
||||
public void errorOccurred(I2PSession session, String message, Throwable error) {
|
||||
for (I2PSessionMuxedListener l : _listeners.values())
|
||||
for (I2PSessionMuxedListener l : _listeners.values()) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Sending errorOccurred() \"" + message + "\" to " + l);
|
||||
l.errorOccurred(session, message, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -78,6 +86,10 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
* (Streaming lib)
|
||||
*/
|
||||
public void addListener(I2PSessionListener l, int proto, int port) {
|
||||
if (proto < 0 || proto > 254 || port < 0 || port > 65535)
|
||||
throw new IllegalArgumentException();
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Old addListener() " + l + ' ' + proto + ' ' + port);
|
||||
I2PSessionListener old = _listeners.put(key(proto, port), new NoPortsListener(l));
|
||||
if (old != null && _log.shouldLog(Log.WARN))
|
||||
_log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port);
|
||||
@ -88,12 +100,20 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
* UDP perhaps
|
||||
*/
|
||||
public void addMuxedListener(I2PSessionMuxedListener l, int proto, int port) {
|
||||
if (proto < 0 || proto > 254 || port < 0 || port > 65535)
|
||||
throw new IllegalArgumentException();
|
||||
if (_log.shouldInfo())
|
||||
_log.info("addMuxedListener() " + l + ' ' + proto + ' ' + port);
|
||||
I2PSessionListener old = _listeners.put(key(proto, port), l);
|
||||
if (old != null && _log.shouldLog(Log.WARN))
|
||||
_log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port);
|
||||
}
|
||||
|
||||
public void removeListener(int proto, int port) {
|
||||
if (proto < 0 || proto > 254 || port < 0 || port > 65535)
|
||||
throw new IllegalArgumentException();
|
||||
if (_log.shouldInfo())
|
||||
_log.info("removeListener() " + proto + ' ' + port);
|
||||
_listeners.remove(key(proto, port));
|
||||
}
|
||||
|
||||
|
@ -1115,12 +1115,14 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
/**
|
||||
* The I2CPMessageEventListener callback.
|
||||
* Recieve notifiation of an error reading the I2CP stream.
|
||||
* As of 0.9.41, does NOT call sessionlistener.disconnected(),
|
||||
* the I2CPMessageReader will call disconnected() also.
|
||||
*
|
||||
* @param reader unused
|
||||
* @param error non-null
|
||||
*/
|
||||
public void readError(I2CPMessageReader reader, Exception error) {
|
||||
propogateError("There was an error reading data", error);
|
||||
disconnect();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1160,7 +1162,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
* Retrieve the configuration options, filtered.
|
||||
* All defaults passed in via constructor have been promoted to the primary map.
|
||||
*
|
||||
* @return non-null, if insantiated with null options, this will be the System properties.
|
||||
* @return non-null, if instantiated with null options, this will be the System properties.
|
||||
*/
|
||||
Properties getOptions() { return _options; }
|
||||
|
||||
@ -1257,6 +1259,8 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
/**
|
||||
* Pass off the error to the listener
|
||||
* Misspelled, oh well.
|
||||
* Calls sessionlistener.errorOccurred()
|
||||
*
|
||||
* @param error non-null
|
||||
*/
|
||||
void propogateError(String msg, Throwable error) {
|
||||
@ -1292,6 +1296,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
* Tear down the session, and do NOT reconnect.
|
||||
*
|
||||
* Will interrupt an open in progress.
|
||||
* Calls sessionlistener.disconnected()
|
||||
*/
|
||||
public void destroySession(boolean sendDisconnect) {
|
||||
synchronized(_stateLock) {
|
||||
@ -1379,15 +1384,16 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
/**
|
||||
* The I2CPMessageEventListener callback.
|
||||
* Recieve notification that the I2CP connection was disconnected.
|
||||
* Calls sessionlistener.disconnected()
|
||||
* @param reader unused
|
||||
*/
|
||||
public void disconnected(I2CPMessageReader reader) {
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Disconnected", new Exception("Disconnected"));
|
||||
disconnect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Will interrupt a connect in progress.
|
||||
* Calls sessionlistener.disconnected()
|
||||
*/
|
||||
protected void disconnect() {
|
||||
State oldState;
|
||||
@ -1397,7 +1403,9 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
oldState = _state;
|
||||
changeState(State.CLOSING);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Disconnect() called", new Exception("Disconnect"));
|
||||
if (_log.shouldWarn())
|
||||
_log.warn(getPrefix() + "Disconnected", new Exception("Disconnected"));
|
||||
if (_sessionListener != null) _sessionListener.disconnected(this);
|
||||
// don't try to reconnect if it failed before GETTDATE
|
||||
if (oldState != State.OPENING && shouldReconnect()) {
|
||||
if (reconnect()) {
|
||||
@ -1409,7 +1417,6 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error(getPrefix() + "Disconned from the router, and not trying to reconnect");
|
||||
if (_sessionListener != null) _sessionListener.disconnected(this);
|
||||
|
||||
closeSocket();
|
||||
changeState(State.CLOSED);
|
||||
@ -1459,7 +1466,6 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
protected String getPrefix() {
|
||||
StringBuilder buf = new StringBuilder();
|
||||
buf.append('[');
|
||||
buf.append(_state.toString()).append(' ');
|
||||
String s = _options.getProperty("inbound.nickname");
|
||||
if (s != null)
|
||||
buf.append(s);
|
||||
@ -1468,6 +1474,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
|
||||
SessionId id = _sessionId;
|
||||
if (id != null)
|
||||
buf.append(" #").append(id.getSessionId());
|
||||
buf.append('(').append(_state.toString()).append(')');
|
||||
buf.append("]: ");
|
||||
return buf.toString();
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ public class I2CPMessageReader {
|
||||
|
||||
/**
|
||||
* Notify the listener that an exception was thrown while reading from the given
|
||||
* reader
|
||||
* reader. For most errors, disconnected() will also be called, as of 0.9.41.
|
||||
*
|
||||
* @param reader I2CPMessageReader to notify
|
||||
* @param error Exception that was thrown, non-null
|
||||
@ -117,8 +117,8 @@ public class I2CPMessageReader {
|
||||
public void readError(I2CPMessageReader reader, Exception error);
|
||||
|
||||
/**
|
||||
* Notify the listener that the stream the given reader was running off
|
||||
* closed
|
||||
* Notify the listener that the stream this reader was reading was
|
||||
* closed. For most errors, readError() will be called first, as of 0.9.41
|
||||
*
|
||||
* @param reader I2CPMessageReader to notify
|
||||
*/
|
||||
@ -165,6 +165,7 @@ public class I2CPMessageReader {
|
||||
} catch (RuntimeException e) {
|
||||
_log.log(Log.CRIT, "Uncaught I2CP error", e);
|
||||
_listener.readError(I2CPMessageReader.this, e);
|
||||
_listener.disconnected(I2CPMessageReader.this);
|
||||
cancelRunner();
|
||||
}
|
||||
}
|
||||
@ -190,6 +191,7 @@ public class I2CPMessageReader {
|
||||
cancelRunner();
|
||||
} catch (IOException ioe) {
|
||||
_log.warn("IO Error handling message", ioe);
|
||||
_listener.readError(I2CPMessageReader.this, ioe);
|
||||
_listener.disconnected(I2CPMessageReader.this);
|
||||
cancelRunner();
|
||||
} catch (OutOfMemoryError oom) {
|
||||
@ -197,6 +199,7 @@ public class I2CPMessageReader {
|
||||
throw oom;
|
||||
} catch (RuntimeException e) {
|
||||
_log.log(Log.CRIT, "Unhandled error reading I2CP stream", e);
|
||||
_listener.readError(I2CPMessageReader.this, e);
|
||||
_listener.disconnected(I2CPMessageReader.this);
|
||||
cancelRunner();
|
||||
}
|
||||
|
Reference in New Issue
Block a user