* I2CP Client: Add support for muxing
This commit is contained in:
@ -89,9 +89,17 @@ class PacketQueue {
|
||||
// so if we retransmit it will use a new tunnel/lease combo
|
||||
expires = rpe.getNextSendTime() - 500;
|
||||
if (expires > 0)
|
||||
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires);
|
||||
// I2PSessionImpl2
|
||||
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires);
|
||||
// I2PSessionMuxedImpl
|
||||
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires,
|
||||
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
|
||||
else
|
||||
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent);
|
||||
// I2PSessionImpl2
|
||||
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0);
|
||||
// I2PSessionMuxedImpl
|
||||
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent,
|
||||
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
|
||||
end = _context.clock().now();
|
||||
|
||||
if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )
|
||||
|
@ -77,6 +77,6 @@ class I2PClientImpl implements I2PClient {
|
||||
*
|
||||
*/
|
||||
public I2PSession createSession(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException {
|
||||
return new I2PSessionImpl2(context, destKeyStream, options); // thread safe
|
||||
return new I2PSessionMuxedImpl(context, destKeyStream, options); // thread safe and muxed
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,8 @@ public interface I2PSession {
|
||||
*/
|
||||
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException;
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException;
|
||||
/** See I2PSessionMuxedImpl for details */
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException;
|
||||
|
||||
/**
|
||||
* Like sendMessage above, except the key used and the tags sent are exposed to the
|
||||
@ -71,6 +73,12 @@ public interface I2PSession {
|
||||
public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException;
|
||||
/** See I2PSessionMuxedImpl for details */
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
|
||||
int proto, int fromport, int toport) throws I2PSessionException;
|
||||
/** See I2PSessionMuxedImpl for details */
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
|
||||
int proto, int fromport, int toport) throws I2PSessionException;
|
||||
|
||||
/** Receive a message that the router has notified the client about, returning
|
||||
* the payload.
|
||||
@ -134,4 +142,18 @@ public interface I2PSession {
|
||||
*
|
||||
*/
|
||||
public Destination lookupDest(Hash h) throws I2PSessionException;
|
||||
|
||||
/** See I2PSessionMuxedImpl for details */
|
||||
public void addSessionListener(I2PSessionListener lsnr, int proto, int port);
|
||||
/** See I2PSessionMuxedImpl for details */
|
||||
public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port);
|
||||
/** See I2PSessionMuxedImpl for details */
|
||||
public void removeListener(int proto, int port);
|
||||
|
||||
public static final int PORT_ANY = 0;
|
||||
public static final int PORT_UNSPECIFIED = 0;
|
||||
public static final int PROTO_ANY = 0;
|
||||
public static final int PROTO_UNSPECIFIED = 0;
|
||||
public static final int PROTO_STREAMING = 6;
|
||||
public static final int PROTO_DATAGRAM = 17;
|
||||
}
|
||||
|
135
core/java/src/net/i2p/client/I2PSessionDemultiplexer.java
Normal file
135
core/java/src/net/i2p/client/I2PSessionDemultiplexer.java
Normal file
@ -0,0 +1,135 @@
|
||||
package net.i2p.client;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/*
|
||||
* public domain
|
||||
*/
|
||||
|
||||
/**
|
||||
* Implement multiplexing with a 1-byte 'protocol' and a two-byte 'port'.
|
||||
* Listeners register with either addListener() or addMuxedListener(),
|
||||
* depending on whether they want to hear about the
|
||||
* protocol, from port, and to port for every received message.
|
||||
*
|
||||
* This only calls one listener, not all that apply.
|
||||
*
|
||||
* @author zzz
|
||||
*/
|
||||
public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
|
||||
private Log _log;
|
||||
private Map<Integer, I2PSessionMuxedListener> _listeners;
|
||||
|
||||
public I2PSessionDemultiplexer(I2PAppContext ctx) {
|
||||
_log = ctx.logManager().getLog(I2PSessionDemultiplexer.class);
|
||||
_listeners = new ConcurrentHashMap();
|
||||
}
|
||||
|
||||
/** unused */
|
||||
public void messageAvailable(I2PSession session, int msgId, long size) {}
|
||||
|
||||
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport ) {
|
||||
I2PSessionMuxedListener l = findListener(proto, toport);
|
||||
if (l != null)
|
||||
l.messageAvailable(session, msgId, size, proto, fromport, toport);
|
||||
else {
|
||||
// no listener, throw it out
|
||||
_log.error("No listener found for proto: " + proto + " port: " + toport + "msg id: " + msgId +
|
||||
" from pool of " + _listeners.size() + " listeners");
|
||||
try {
|
||||
session.receiveMessage(msgId);
|
||||
} catch (I2PSessionException ise) {}
|
||||
}
|
||||
}
|
||||
|
||||
public void reportAbuse(I2PSession session, int severity) {
|
||||
for (I2PSessionMuxedListener l : _listeners.values())
|
||||
l.reportAbuse(session, severity);
|
||||
}
|
||||
|
||||
public void disconnected(I2PSession session) {
|
||||
for (I2PSessionMuxedListener l : _listeners.values())
|
||||
l.disconnected(session);
|
||||
}
|
||||
|
||||
public void errorOccurred(I2PSession session, String message, Throwable error) {
|
||||
for (I2PSessionMuxedListener l : _listeners.values())
|
||||
l.errorOccurred(session, message, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* For those that don't need to hear about the protocol and ports
|
||||
* in messageAvailable()
|
||||
* (Streaming lib)
|
||||
*/
|
||||
public void addListener(I2PSessionListener l, int proto, int port) {
|
||||
_listeners.put(key(proto, port), new NoPortsListener(l));
|
||||
}
|
||||
|
||||
/**
|
||||
* For those that do care
|
||||
* UDP perhaps
|
||||
*/
|
||||
public void addMuxedListener(I2PSessionMuxedListener l, int proto, int port) {
|
||||
_listeners.put(key(proto, port), l);
|
||||
}
|
||||
|
||||
public void removeListener(int proto, int port) {
|
||||
_listeners.remove(key(proto, port));
|
||||
}
|
||||
|
||||
/** find the one listener that most specifically matches the request */
|
||||
private I2PSessionMuxedListener findListener(int proto, int port) {
|
||||
I2PSessionMuxedListener rv = getListener(proto, port);
|
||||
if (rv != null) return rv;
|
||||
if (port != I2PSession.PORT_ANY) { // try any port
|
||||
rv = getListener(proto, I2PSession.PORT_ANY);
|
||||
if (rv != null) return rv;
|
||||
}
|
||||
if (proto != I2PSession.PROTO_ANY) { // try any protocol
|
||||
rv = getListener(I2PSession.PROTO_ANY, port);
|
||||
if (rv != null) return rv;
|
||||
}
|
||||
if (proto != I2PSession.PROTO_ANY && port != I2PSession.PORT_ANY) { // try default
|
||||
rv = getListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
private I2PSessionMuxedListener getListener(int proto, int port) {
|
||||
return _listeners.get(key(proto, port));
|
||||
}
|
||||
|
||||
private Integer key(int proto, int port) {
|
||||
return Integer.valueOf(((port << 8) & 0xffff00) | proto);
|
||||
}
|
||||
|
||||
/** for those that don't care about proto and ports */
|
||||
private static class NoPortsListener implements I2PSessionMuxedListener {
|
||||
private I2PSessionListener _l;
|
||||
|
||||
public NoPortsListener(I2PSessionListener l) {
|
||||
_l = l;
|
||||
}
|
||||
|
||||
public void messageAvailable(I2PSession session, int msgId, long size) {
|
||||
throw new IllegalArgumentException("no");
|
||||
}
|
||||
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) {
|
||||
_l.messageAvailable(session, msgId, size);
|
||||
}
|
||||
public void reportAbuse(I2PSession session, int severity) {
|
||||
_l.reportAbuse(session, severity);
|
||||
}
|
||||
public void disconnected(I2PSession session) {
|
||||
_l.disconnected(session);
|
||||
}
|
||||
public void errorOccurred(I2PSession session, String message, Throwable error) {
|
||||
_l.errorOccurred(session, message, error);
|
||||
}
|
||||
}
|
||||
}
|
@ -77,12 +77,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
|
||||
protected OutputStream _out;
|
||||
|
||||
/** who we send events to */
|
||||
private I2PSessionListener _sessionListener;
|
||||
protected I2PSessionListener _sessionListener;
|
||||
|
||||
/** class that generates new messages */
|
||||
protected I2CPMessageProducer _producer;
|
||||
/** map of Long --> MessagePayloadMessage */
|
||||
private Map<Long, MessagePayloadMessage> _availableMessages;
|
||||
protected Map<Long, MessagePayloadMessage> _availableMessages;
|
||||
|
||||
protected I2PClientMessageHandlerMap _handlerMap;
|
||||
|
||||
@ -366,14 +366,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
|
||||
}
|
||||
SimpleScheduler.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
|
||||
}
|
||||
private class VerifyUsage implements SimpleTimer.TimedEvent {
|
||||
protected class VerifyUsage implements SimpleTimer.TimedEvent {
|
||||
private Long _msgId;
|
||||
public VerifyUsage(Long id) { _msgId = id; }
|
||||
|
||||
public void timeReached() {
|
||||
MessagePayloadMessage removed = _availableMessages.remove(_msgId);
|
||||
if (removed != null && !isClosed())
|
||||
_log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed);
|
||||
_log.error("Message NOT removed! id=" + _msgId + ": " + removed);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
* set to false.
|
||||
*/
|
||||
private static final int DONT_COMPRESS_SIZE = 66;
|
||||
private boolean shouldCompress(int size) {
|
||||
protected boolean shouldCompress(int size) {
|
||||
if (size <= DONT_COMPRESS_SIZE)
|
||||
return false;
|
||||
String p = getOptions().getProperty("i2cp.gzip");
|
||||
@ -102,12 +102,35 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
return SHOULD_COMPRESS;
|
||||
}
|
||||
|
||||
public void addSessionListener(I2PSessionListener lsnr, int proto, int port) {
|
||||
throw new IllegalArgumentException("Use MuxedImpl");
|
||||
}
|
||||
public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) {
|
||||
throw new IllegalArgumentException("Use MuxedImpl");
|
||||
}
|
||||
public void removeListener(int proto, int port) {
|
||||
throw new IllegalArgumentException("Use MuxedImpl");
|
||||
}
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException {
|
||||
throw new IllegalArgumentException("Use MuxedImpl");
|
||||
}
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
|
||||
int proto, int fromport, int toport) throws I2PSessionException {
|
||||
throw new IllegalArgumentException("Use MuxedImpl");
|
||||
}
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
|
||||
int proto, int fromport, int toport) throws I2PSessionException {
|
||||
throw new IllegalArgumentException("Use MuxedImpl");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException {
|
||||
return sendMessage(dest, payload, 0, payload.length);
|
||||
}
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException {
|
||||
return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64), 0);
|
||||
// we don't do end-to-end crypto any more
|
||||
//return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64), 0);
|
||||
return sendMessage(dest, payload, offset, size, null, null, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -173,7 +196,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
|
||||
private static final int NUM_TAGS = 50;
|
||||
|
||||
private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires)
|
||||
protected boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires)
|
||||
throws I2PSessionException {
|
||||
SessionKey key = null;
|
||||
SessionKey newKey = null;
|
||||
|
@ -20,7 +20,7 @@ public interface I2PSessionListener {
|
||||
* size # of bytes.
|
||||
* @param session session to notify
|
||||
* @param msgId message number available
|
||||
* @param size size of the message
|
||||
* @param size size of the message - why it's a long and not an int is a mystery
|
||||
*/
|
||||
void messageAvailable(I2PSession session, int msgId, long size);
|
||||
|
||||
|
319
core/java/src/net/i2p/client/I2PSessionMuxedImpl.java
Normal file
319
core/java/src/net/i2p/client/I2PSessionMuxedImpl.java
Normal file
@ -0,0 +1,319 @@
|
||||
package net.i2p.client;
|
||||
|
||||
/*
|
||||
* public domain
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.HashSet;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.DataHelper;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.data.SessionKey;
|
||||
import net.i2p.data.SessionTag;
|
||||
import net.i2p.data.i2cp.MessagePayloadMessage;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleScheduler;
|
||||
|
||||
/**
|
||||
* I2PSession with protocol and ports
|
||||
*
|
||||
* Streaming lib has been modified to send I2PSession.PROTO_STREAMING but
|
||||
* still receives all. It sends with fromPort and toPort = 0, and receives on all ports.
|
||||
*
|
||||
* No datagram apps have been modified yet.
|
||||
|
||||
* Therefore the compatibility situation is as follows:
|
||||
*
|
||||
* Compatibility:
|
||||
* old streaming -> new streaming: sends proto anything, rcvs proto anything
|
||||
* new streaming -> old streaming: sends PROTO_STREAMING, ignores rcvd proto
|
||||
* old datagram -> new datagram: sends proto anything, rcvs proto anything
|
||||
* new datagram -> old datagram: sends PROTO_DATAGRAM, ignores rcvd proto
|
||||
* In all the above cases, streaming and datagram receive traffic for the other
|
||||
* protocol, same as before.
|
||||
*
|
||||
* old datagram -> new muxed: doesn't work because the old sends proto 0 but the udp side
|
||||
* of the mux registers with PROTO_DATAGRAM, so the datagrams
|
||||
* go to the streaming side, same as before.
|
||||
* old streaming -> new muxed: works
|
||||
*
|
||||
* Typical Usage:
|
||||
* Streaming + datagrams:
|
||||
* I2PSocketManager sockMgr = getSocketManager();
|
||||
* I2PSession session = sockMgr.getSession();
|
||||
* session.addMuxedSessionListener(myI2PSessionMuxedListener, I2PSession.PROTO_DATAGRAM, I2PSession.PORT_ANY);
|
||||
* * or *
|
||||
* session.addSessionListener(myI2PSessionListener, I2PSession.PROTO_DATAGRAM, I2PSession.PORT_ANY);
|
||||
* session.sendMessage(dest, payload, I2PSession.PROTO_DATAGRAM, fromPort, toPort);
|
||||
*
|
||||
* Datagrams only, with multiple ports:
|
||||
* I2PClient client = I2PClientFactory.createClient();
|
||||
* ...
|
||||
* I2PSession session = client.createSession(...);
|
||||
* session.addMuxedSessionListener(myI2PSessionMuxedListener, I2PSession.PROTO_DATAGRAM, I2PSession.PORT_ANY);
|
||||
* * or *
|
||||
* session.addSessionListener(myI2PSessionListener, I2PSession.PROTO_DATAGRAM, I2PSession.PORT_ANY);
|
||||
* session.sendMessage(dest, payload, I2PSession.PROTO_DATAGRAM, fromPort, toPort);
|
||||
*
|
||||
* Multiple streaming ports:
|
||||
* Needs some streaming lib hacking
|
||||
*
|
||||
* @author zzz
|
||||
*/
|
||||
class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
|
||||
private I2PSessionDemultiplexer _demultiplexer;
|
||||
|
||||
public I2PSessionMuxedImpl(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
|
||||
super(ctx, destKeyStream, options);
|
||||
// also stored in _sessionListener but we keep it in _demultipexer
|
||||
// as well so we don't have to keep casting
|
||||
_demultiplexer = new I2PSessionDemultiplexer(ctx);
|
||||
super.setSessionListener(_demultiplexer);
|
||||
// discards the one in super(), sorry about that... (no it wasn't started yet)
|
||||
_availabilityNotifier = new MuxedAvailabilityNotifier();
|
||||
}
|
||||
|
||||
/** listen on all protocols and ports */
|
||||
@Override
|
||||
public void setSessionListener(I2PSessionListener lsnr) {
|
||||
_demultiplexer.addListener(lsnr, PROTO_ANY, PORT_ANY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Listen on specified protocol and port.
|
||||
*
|
||||
* An existing listener with the same proto and port is replaced.
|
||||
* Only the listener with the best match is called back for each message.
|
||||
*
|
||||
* @param proto 1-254 or PROTO_ANY for all; recommended:
|
||||
* I2PSession.PROTO_STREAMING
|
||||
* I2PSession.PROTO_DATAGRAM
|
||||
* 255 disallowed
|
||||
* @param port 1-65535 or PORT_ANY for all
|
||||
*/
|
||||
public void addSessionListener(I2PSessionListener lsnr, int proto, int port) {
|
||||
_demultiplexer.addListener(lsnr, proto, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Listen on specified protocol and port, and receive notification
|
||||
* of proto, fromPort, and toPort for every message.
|
||||
* @param proto 1-254 or 0 for all; 255 disallowed
|
||||
* @param port 1-65535 or 0 for all
|
||||
*/
|
||||
public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) {
|
||||
_demultiplexer.addMuxedListener(l, proto, port);
|
||||
}
|
||||
|
||||
/** removes the specified listener (only) */
|
||||
public void removeListener(int proto, int port) {
|
||||
_demultiplexer.removeListener(proto, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException {
|
||||
return sendMessage(dest, payload, 0, 0, null, null, 0, PROTO_UNSPECIFIED, PORT_UNSPECIFIED, PORT_UNSPECIFIED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException {
|
||||
return sendMessage(dest, payload, 0, 0, null, null, 0, proto, fromport, toport);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
|
||||
SessionKey keyUsed, Set tagsSent, long expires)
|
||||
throws I2PSessionException {
|
||||
return sendMessage(dest, payload, offset, size, keyUsed, tagsSent, 0, PROTO_UNSPECIFIED, PORT_UNSPECIFIED, PORT_UNSPECIFIED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
|
||||
int proto, int fromport, int toport) throws I2PSessionException {
|
||||
return sendMessage(dest, payload, offset, size, keyUsed, tagsSent, 0, proto, fromport, toport);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param proto 1-254 or 0 for unset; recommended:
|
||||
* I2PSession.PROTO_UNSPECIFIED
|
||||
* I2PSession.PROTO_STREAMING
|
||||
* I2PSession.PROTO_DATAGRAM
|
||||
* 255 disallowed
|
||||
* @param fromport 1-65535 or 0 for unset
|
||||
* @param toport 1-65535 or 0 for unset
|
||||
*/
|
||||
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
|
||||
SessionKey keyUsed, Set tagsSent, long expires,
|
||||
int proto, int fromPort, int toPort)
|
||||
throws I2PSessionException {
|
||||
if (isClosed()) throw new I2PSessionException("Already closed");
|
||||
updateActivity();
|
||||
|
||||
boolean sc = shouldCompress(size);
|
||||
if (sc)
|
||||
payload = DataHelper.compress(payload, offset, size);
|
||||
else
|
||||
payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
|
||||
|
||||
setProto(payload, proto);
|
||||
setFromPort(payload, fromPort);
|
||||
setToPort(payload, toPort);
|
||||
|
||||
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
|
||||
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
|
||||
return sendBestEffort(dest, payload, keyUsed, tagsSent, expires);
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive a payload message and let the app know its available
|
||||
*/
|
||||
@Override
|
||||
public void addNewMessage(MessagePayloadMessage msg) {
|
||||
Long mid = new Long(msg.getMessageId());
|
||||
_availableMessages.put(mid, msg);
|
||||
long id = msg.getMessageId();
|
||||
byte data[] = msg.getPayload().getUnencryptedData();
|
||||
if ((data == null) || (data.length <= 0)) {
|
||||
if (_log.shouldLog(Log.CRIT))
|
||||
_log.log(Log.CRIT, getPrefix() + "addNewMessage of a message with no unencrypted data",
|
||||
new Exception("Empty message"));
|
||||
return;
|
||||
}
|
||||
int size = data.length;
|
||||
if (size < 10) {
|
||||
_log.error(getPrefix() + "length too short for gzip header: " + size);
|
||||
return;
|
||||
}
|
||||
((MuxedAvailabilityNotifier)_availabilityNotifier).available(id, size, getProto(msg),
|
||||
getFromPort(msg), getToPort(msg));
|
||||
SimpleScheduler.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
|
||||
}
|
||||
|
||||
protected class MuxedAvailabilityNotifier extends AvailabilityNotifier {
|
||||
private LinkedBlockingQueue<MsgData> _msgs;
|
||||
private boolean _alive;
|
||||
private static final int POISON_SIZE = -99999;
|
||||
|
||||
public MuxedAvailabilityNotifier() {
|
||||
_msgs = new LinkedBlockingQueue();
|
||||
}
|
||||
|
||||
public void stopNotifying() {
|
||||
_msgs.clear();
|
||||
if (_alive) {
|
||||
_alive = false;
|
||||
try {
|
||||
_msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0));
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
|
||||
/** unused */
|
||||
public void available(long msgId, int size) { throw new IllegalArgumentException("no"); }
|
||||
|
||||
public void available(long msgId, int size, int proto, int fromPort, int toPort) {
|
||||
try {
|
||||
_msgs.put(new MsgData((int)(msgId & 0xffffffff), size, proto, fromPort, toPort));
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
|
||||
public void run() {
|
||||
_alive = true;
|
||||
while (true) {
|
||||
MsgData msg;
|
||||
try {
|
||||
msg = _msgs.take();
|
||||
} catch (InterruptedException ie) {
|
||||
continue;
|
||||
}
|
||||
if (msg.size == POISON_SIZE)
|
||||
break;
|
||||
try {
|
||||
_demultiplexer.messageAvailable(I2PSessionMuxedImpl.this, msg.id,
|
||||
msg.size, msg.proto, msg.fromPort, msg.toPort);
|
||||
} catch (Exception e) {
|
||||
_log.error("Error notifying app of message availability");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** let's keep this simple */
|
||||
private static class MsgData {
|
||||
public int id, size, proto, fromPort, toPort;
|
||||
public MsgData(int i, int s, int p, int f, int t) {
|
||||
id = i;
|
||||
size = s;
|
||||
proto = p;
|
||||
fromPort = f;
|
||||
toPort = t;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* No, we couldn't put any protocol byte in front of everything and
|
||||
* keep backward compatibility. But there are several bytes that
|
||||
* are unused AND unchecked in the gzip header in releases <= 0.7.
|
||||
* So let's use 5 of them for a protocol and two 2-byte ports.
|
||||
*
|
||||
* Following are all the methods to hide the
|
||||
* protocol, fromPort, and toPort in the gzip header
|
||||
*
|
||||
* The fields used are all ignored on receive in ResettableGzipInputStream
|
||||
*
|
||||
* See also ResettableGzipOutputStream.
|
||||
* Ref: RFC 1952
|
||||
*
|
||||
*/
|
||||
|
||||
/** OS byte in gzip header */
|
||||
private static final int PROTO_BYTE = 9;
|
||||
|
||||
/** Upper two bytes of MTIME in gzip header */
|
||||
private static final int FROMPORT_BYTES = 4;
|
||||
|
||||
/** Lower two bytes of MTIME in gzip header */
|
||||
private static final int TOPORT_BYTES = 6;
|
||||
|
||||
/** Non-muxed sets the OS byte to 0xff */
|
||||
private static int getProto(MessagePayloadMessage msg) {
|
||||
int rv = getByte(msg, PROTO_BYTE) & 0xff;
|
||||
return rv == 0xff ? PROTO_UNSPECIFIED : rv;
|
||||
}
|
||||
|
||||
/** Non-muxed sets the MTIME bytes to 0 */
|
||||
private static int getFromPort(MessagePayloadMessage msg) {
|
||||
return (((getByte(msg, FROMPORT_BYTES) & 0xff) << 8) |
|
||||
(getByte(msg, FROMPORT_BYTES + 1) & 0xff));
|
||||
}
|
||||
|
||||
/** Non-muxed sets the MTIME bytes to 0 */
|
||||
private static int getToPort(MessagePayloadMessage msg) {
|
||||
return (((getByte(msg, TOPORT_BYTES) & 0xff) << 8) |
|
||||
(getByte(msg, TOPORT_BYTES + 1) & 0xff));
|
||||
}
|
||||
|
||||
private static int getByte(MessagePayloadMessage msg, int i) {
|
||||
return msg.getPayload().getUnencryptedData()[i] & 0xff;
|
||||
}
|
||||
|
||||
private static void setProto(byte[] payload, int p) {
|
||||
payload[PROTO_BYTE] = (byte) (p & 0xff);
|
||||
}
|
||||
|
||||
private static void setFromPort(byte[] payload, int p) {
|
||||
payload[FROMPORT_BYTES] = (byte) ((p >> 8) & 0xff);
|
||||
payload[FROMPORT_BYTES + 1] = (byte) (p & 0xff);
|
||||
}
|
||||
|
||||
private static void setToPort(byte[] payload, int p) {
|
||||
payload[TOPORT_BYTES] = (byte) ((p >> 8) & 0xff);
|
||||
payload[TOPORT_BYTES + 1] = (byte) (p & 0xff);
|
||||
}
|
||||
}
|
62
core/java/src/net/i2p/client/I2PSessionMuxedListener.java
Normal file
62
core/java/src/net/i2p/client/I2PSessionMuxedListener.java
Normal file
@ -0,0 +1,62 @@
|
||||
package net.i2p.client;
|
||||
|
||||
/*
|
||||
* public domain
|
||||
*/
|
||||
|
||||
/**
|
||||
* Define a means for the router to asynchronously notify the client that a
|
||||
* new message is available or the router is under attack.
|
||||
*
|
||||
* @author zzz extends I2PSessionListener
|
||||
*/
|
||||
public interface I2PSessionMuxedListener extends I2PSessionListener {
|
||||
|
||||
/**
|
||||
* Will be called only if you register via
|
||||
* setSessionListener() or addSessionListener().
|
||||
* And if you are doing that, just use I2PSessionListener.
|
||||
*
|
||||
* If you register via addSessionListener(),
|
||||
* this will be called only for the proto(s) and toport(s) you register for.
|
||||
*
|
||||
* @param session session to notify
|
||||
* @param msgId message number available
|
||||
* @param size size of the message - why it's a long and not an int is a mystery
|
||||
*/
|
||||
void messageAvailable(I2PSession session, int msgId, long size);
|
||||
|
||||
/**
|
||||
* Instruct the client that the given session has received a message
|
||||
*
|
||||
* Will be called only if you register via addMuxedSessionListener().
|
||||
* Will be called only for the proto(s) and toport(s) you register for.
|
||||
*
|
||||
* @param session session to notify
|
||||
* @param msgId message number available
|
||||
* @param size size of the message - why it's a long and not an int is a mystery
|
||||
* @param proto 1-254 or 0 for unspecified
|
||||
* @param fromport 1-65535 or 0 for unspecified
|
||||
* @param toport 1-65535 or 0 for unspecified
|
||||
*/
|
||||
void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport);
|
||||
|
||||
/** Instruct the client that the session specified seems to be under attack
|
||||
* and that the client may wish to move its destination to another router.
|
||||
* @param session session to report abuse to
|
||||
* @param severity how bad the abuse is
|
||||
*/
|
||||
void reportAbuse(I2PSession session, int severity);
|
||||
|
||||
/**
|
||||
* Notify the client that the session has been terminated
|
||||
*
|
||||
*/
|
||||
void disconnected(I2PSession session);
|
||||
|
||||
/**
|
||||
* Notify the client that some error occurred
|
||||
*
|
||||
*/
|
||||
void errorOccurred(I2PSession session, String message, Throwable error);
|
||||
}
|
Reference in New Issue
Block a user