* I2CP: Client-side prep for asynch status for sent messages (ticket #788)

- Clean up and reuse MessageState for asynch notification
   - New I2PSession sendMessage() method and listener
   - Move VerifyUsage from SimpleScheduler to SimpleTimer2 for efficiency
   - Fix up javadocs
This commit is contained in:
zzz
2014-05-15 20:11:21 +00:00
parent a93666cd36
commit 8371b8f806
7 changed files with 397 additions and 422 deletions

View File

@ -301,7 +301,7 @@ class I2CPMessageProducer {
* @param key unused - no end-to-end crypto
* @param newKey unused - no end-to-end crypto
*/
private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set tags,
private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set<SessionTag> tags,
SessionKey newKey) throws I2PSessionException {
if (dest == null) throw new I2PSessionException("No destination specified");
if (payload == null) throw new I2PSessionException("No payload specified");
@ -346,8 +346,8 @@ class I2CPMessageProducer {
* to the router
*
*/
public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv, PrivateKey priv)
throws I2PSessionException {
public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv,
PrivateKey priv) throws I2PSessionException {
CreateLeaseSetMessage msg = new CreateLeaseSetMessage();
msg.setLeaseSet(leaseSet);
msg.setPrivateKey(priv);

View File

@ -47,10 +47,21 @@ public interface I2PSession {
*/
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException;
/** Send a new message to the given destination, containing the specified
* payload, returning true if the router feels confident that the message
* was delivered.
*
* WARNING: It is recommended that you use a method that specifies the protocol and ports.
*
* @param dest location to send the message
* @param payload body of the message to be sent (unencrypted)
* @return success
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException;
/**
* See I2PSessionMuxedImpl for proto/port details.
* @return success
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException;
@ -83,6 +94,7 @@ public interface I2PSession {
* @param tagsSent UNUSED, IGNORED. Set of tags delivered to the peer and associated with the keyUsed. This is also an output parameter -
* the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag
* objects that were sent along side the given keyUsed.
* @return success
*/
public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
@ -90,6 +102,7 @@ public interface I2PSession {
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @return success
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
@ -98,6 +111,7 @@ public interface I2PSession {
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param expire absolute expiration timestamp, NOT interval from now
* @return success
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException;
@ -107,6 +121,14 @@ public interface I2PSession {
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @return success
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
@ -118,6 +140,14 @@ public interface I2PSession {
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param expire absolute expiration timestamp, NOT interval from now
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @return success
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
@ -129,6 +159,14 @@ public interface I2PSession {
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param expire absolute expiration timestamp, NOT interval from now
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @return success
* @since 0.8.4
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
@ -137,11 +175,45 @@ public interface I2PSession {
/**
* See I2PSessionMuxedImpl for proto/port details.
* See SendMessageOptions for option details.
*
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @param options to be passed to the router
* @return success
* @since 0.9.2
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException;
/**
* Send a message and request an asynchronous notification of delivery status.
* Notifications will be delivered at least up to the expiration specified in the options,
* or 60 seconds if not specified.
*
* See I2PSessionMuxedImpl for proto/port details.
* See SendMessageOptions for option details.
*
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @param options to be passed to the router
* @return the message ID to be used for later notification to the listener
* @throws I2PSessionException on all errors
* @since 0.9.14
*/
public long sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromport, int toport,
SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException;
/** Receive a message that the router has notified the client about, returning
* the payload.
* This may only be called once for a given msgId (until the counter wraps)

View File

@ -51,7 +51,7 @@ import net.i2p.util.I2PSSLSocketFactory;
import net.i2p.util.LHMCache;
import net.i2p.util.Log;
import net.i2p.util.OrderedProperties;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
import net.i2p.util.VersionComparator;
/**
@ -618,20 +618,24 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
/**
* Fire up a periodic task to check for unclamed messages
* Fire up a periodic task to check for unclaimed messages
* @since 0.9.1
*/
private void startVerifyUsage() {
_context.simpleScheduler().addEvent(new VerifyUsage(), VERIFY_USAGE_TIME);
protected void startVerifyUsage() {
new VerifyUsage();
}
/**
* Check for unclaimed messages, without wastefully setting a timer for each
* message. Just copy all unclaimed ones and check 30 seconds later.
* message. Just copy all unclaimed ones and check some time later.
*/
private class VerifyUsage implements SimpleTimer.TimedEvent {
private class VerifyUsage extends SimpleTimer2.TimedEvent {
private final List<Long> toCheck = new ArrayList<Long>();
public VerifyUsage() {
super(_context.simpleTimer2(), VERIFY_USAGE_TIME);
}
public void timeReached() {
if (isClosed())
return;
@ -641,12 +645,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
for (Long msgId : toCheck) {
MessagePayloadMessage removed = _availableMessages.remove(msgId);
if (removed != null)
_log.error("Message NOT removed! id=" + msgId + ": " + removed);
_log.error(getPrefix() + " Client not responding? Message not processed! id=" + msgId + ": " + removed);
}
toCheck.clear();
}
toCheck.addAll(_availableMessages.keySet());
_context.simpleScheduler().addEvent(this, VERIFY_USAGE_TIME);
schedule(VERIFY_USAGE_TIME);
}
}

View File

@ -11,11 +11,13 @@ package net.i2p.client;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
@ -24,6 +26,7 @@ import net.i2p.data.SessionKey;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Thread safe implementation of an I2P session running over TCP.
@ -35,7 +38,8 @@ import net.i2p.util.Log;
class I2PSessionImpl2 extends I2PSessionImpl {
/** set of MessageState objects, representing all of the messages in the process of being sent */
private /* FIXME final FIXME */ Set<MessageState> _sendingStates;
protected final Map<Long, MessageState> _sendingStates;
protected final AtomicLong _sendMessageNonce;
/** max # seconds to wait for confirmation of the message send */
private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send
/** should we gzip each payload prior to sending it? */
@ -44,12 +48,16 @@ class I2PSessionImpl2 extends I2PSessionImpl {
/** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */
protected boolean _noEffort;
private static final long REMOVE_EXPIRED_TIME = 63*1000;
/**
* for extension by SimpleSession (no dest)
*/
protected I2PSessionImpl2(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap) {
super(context, options, handlerMap);
_sendingStates = null;
_sendMessageNonce = null;
}
/**
@ -63,11 +71,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
*/
public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
super(ctx, destKeyStream, options);
_sendingStates = new HashSet<MessageState>(32);
_sendingStates = new ConcurrentHashMap<Long, MessageState>(32);
_sendMessageNonce = new AtomicLong();
// default is BestEffort
_noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage1", "second part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
@ -80,11 +89,48 @@ class I2PSessionImpl2 extends I2PSessionImpl {
//_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
//_context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
}
/**
* Fire up a periodic task to check for unclaimed messages
* @since 0.9.14
*/
@Override
protected void startVerifyUsage() {
super.startVerifyUsage();
new RemoveExpired();
}
/**
* Check for expired message states, without wastefully setting a timer for each
* message.
* @since 0.9.14
*/
private class RemoveExpired extends SimpleTimer2.TimedEvent {
public RemoveExpired() {
super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME);
}
public void timeReached() {
if (isClosed())
return;
if (!_sendingStates.isEmpty()) {
long now = _context.clock().now();
for (Iterator<MessageState> iter = _sendingStates.values().iterator(); iter.hasNext(); ) {
MessageState state = iter.next();
if (state.getExpires() < now)
iter.remove();
}
}
schedule(REMOVE_EXPIRED_TIME);
}
}
protected long getTimeout() {
return SEND_TIMEOUT;
}
@ -109,6 +155,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
* Todo: don't compress if destination is local?
*/
private static final int DONT_COMPRESS_SIZE = 66;
protected boolean shouldCompress(int size) {
if (size <= DONT_COMPRESS_SIZE)
return false;
@ -118,33 +165,47 @@ class I2PSessionImpl2 extends I2PSessionImpl {
return SHOULD_COMPRESS;
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public void addSessionListener(I2PSessionListener lsnr, int proto, int port) {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public void removeListener(int proto, int port) {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
int proto, int fromport, int toport) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
int proto, int fromport, int toport) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
int proto, int fromport, int toport, int flags) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** @throws UnsupportedOperationException always, use MuxedImpl */
public long sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromport, int toport,
SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException {
throw new UnsupportedOperationException("Use MuxedImpl");
}
/** unused, see MuxedImpl override */
@ -210,8 +271,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
String d = dest.calculateHash().toBase64().substring(0,4);
_log.info("sending message to: " + d + " compress? " + sc + " sizeIn=" + size + " sizeOut=" + compressed);
}
_context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
_context.statManager().addRateData("i2cp.tx.msgCompressed", compressed);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size);
if (_noEffort)
return sendNoEffort(dest, payload, expires, 0);
else
@ -257,142 +318,29 @@ class I2PSessionImpl2 extends I2PSessionImpl {
*/
protected boolean sendBestEffort(Destination dest, byte payload[], long expires, int flags)
throws I2PSessionException {
//SessionKey key = null;
//SessionKey newKey = null;
//SessionTag tag = null;
//Set sentTags = null;
//int oldTags = 0;
long begin = _context.clock().now();
/***********
if (I2CPMessageProducer.END_TO_END_CRYPTO) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("begin sendBestEffort");
key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey());
if (_log.shouldLog(Log.DEBUG)) _log.debug("key fetched");
if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
if (_log.shouldLog(Log.DEBUG)) _log.debug("tag consumed");
sentTags = null;
oldTags = _context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key);
long availTimeLeft = _context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key);
if ( (tagsSent == null) || (tagsSent.isEmpty()) ) {
if (oldTags < NUM_TAGS) {
sentTags = createNewTags(NUM_TAGS);
if (_log.shouldLog(Log.DEBUG))
_log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding " + NUM_TAGS + ": " + sentTags);
} else if (availTimeLeft < 2 * 60 * 1000) {
// if we have > 50 tags, but they expire in under 2 minutes, we want more
sentTags = createNewTags(NUM_TAGS);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding " + NUM_TAGS + " new ones: " + sentTags);
//_log.error("** sendBestEffort available time left " + availTimeLeft);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft);
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("sendBestEffort is sending " + tagsSent.size() + " with " + availTimeLeft
+ "ms left, " + oldTags + " tags known and "
+ (tag == null ? "no tag" : " a valid tag"));
}
if (false) // rekey
newKey = _context.keyGenerator().generateSessionKey();
if ( (tagsSent != null) && (!tagsSent.isEmpty()) ) {
if (sentTags == null)
sentTags = new HashSet();
sentTags.addAll(tagsSent);
}
} else {
// not using end to end crypto, so don't ever bundle any tags
}
**********/
//if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
long nonce = _context.random().nextInt(Integer.MAX_VALUE - 1) + 1;
//if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
long nonce = _sendMessageNonce.incrementAndGet();
MessageState state = new MessageState(_context, nonce, getPrefix());
//state.setKey(key);
//state.setTags(sentTags);
//state.setNewKey(newKey);
state.setTo(dest);
//if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key);
//if (keyUsed != null) {
//if (I2CPMessageProducer.END_TO_END_CRYPTO) {
// if (newKey != null)
// keyUsed.setData(newKey.getData());
// else
// keyUsed.setData(key.getData());
//} else {
// keyUsed.setData(SessionKey.INVALID_KEY.getData());
//}
//}
//if (tagsSent != null) {
// if (sentTags != null) {
// tagsSent.addAll(sentTags);
// }
//}
//if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
long beforeSendingSync = _context.clock().now();
long inSendingSync = 0;
synchronized (_sendingStates) {
inSendingSync = _context.clock().now();
_sendingStates.add(state);
}
long afterSendingSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
+ state.getNonce() + " for best effort "
+ " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync));
//_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires);
_producer.sendMessage(this, dest, nonce, payload, expires, flags);
// since this is 'best effort', all we're waiting for is a status update
// saying that the router received it - in theory, that should come back
// immediately, but in practice can take up to a second (though usually
// much quicker). setting this to false will short-circuit that delay
boolean actuallyWait = false; // true;
long beforeWaitFor = _context.clock().now();
if (actuallyWait)
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
_context.clock().now() + getTimeout());
//long afterWaitFor = _context.clock().now();
//long inRemovingSync = 0;
synchronized (_sendingStates) {
//inRemovingSync = _context.clock().now();
_sendingStates.remove(state);
}
long afterRemovingSync = _context.clock().now();
boolean found = !actuallyWait || state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
+ " / " + state.getNonce() + " found = " + found);
_sendingStates.put(Long.valueOf(nonce), state);
_producer.sendMessage(this, dest, nonce, payload, expires, flags);
long timeToSend = afterRemovingSync - beforeSendingSync;
if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
_log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
if (actuallyWait) {
try {
state.waitForAccept(_context.clock().now() + getTimeout());
} catch (InterruptedException ie) {
throw new I2PSessionException("interrupted");
} finally {
_sendingStates.remove(Long.valueOf(nonce));
}
}
if ( (afterRemovingSync - begin > 500) && (_log.shouldLog(Log.WARN) ) ) {
_log.warn("Took " + (afterRemovingSync-begin) + "ms to sendBestEffort, "
+ (afterSendingSync-begin) + "ms to prepare, "
+ (beforeWaitFor-afterSendingSync) + "ms to send, "
+ (afterRemovingSync-beforeWaitFor) + "ms waiting for reply");
}
_context.statManager().addRateData("i2cp.sendBestEffortTotalTime", afterRemovingSync - begin, 0);
//_context.statManager().addRateData("i2cp.sendBestEffortStage0", beforeSendingSync- begin, 0);
//_context.statManager().addRateData("i2cp.sendBestEffortStage1", afterSendingSync- beforeSendingSync, 0);
//_context.statManager().addRateData("i2cp.sendBestEffortStage2", beforeWaitFor- afterSendingSync, 0);
//_context.statManager().addRateData("i2cp.sendBestEffortStage3", afterWaitFor- beforeWaitFor, 0);
//_context.statManager().addRateData("i2cp.sendBestEffortStage4", afterRemovingSync- afterWaitFor, 0);
boolean found = !actuallyWait || state.wasAccepted();
if (found) {
if (_log.shouldLog(Log.INFO))
@ -402,9 +350,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with "
+ payload.length + " bytes");
if (_log.shouldLog(Log.ERROR))
_log.error(getPrefix() + "Never received *accepted* from the router! dropping and reconnecting");
disconnect();
//if (_log.shouldLog(Log.ERROR))
// _log.error(getPrefix() + "Never received *accepted* from the router! dropping and reconnecting");
//disconnect();
return false;
}
return found;
@ -432,8 +380,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
* Even when using sendBestEffort(), this is a waste, because the
* MessageState is removed from _sendingStates immediately and
* so the lookup here fails.
* And iterating through the HashSet instead of having a map
* is bad too.
*
* This is now pretty much avoided since streaming now sets
* i2cp.messageReliability = none, which forces sendNoEffort() instead of sendBestEffort(),
@ -443,32 +389,24 @@ class I2PSessionImpl2 extends I2PSessionImpl {
*/
@Override
public void receiveStatus(int msgId, long nonce, int status) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
MessageState state = null;
long beforeSync = _context.clock().now();
long inSync = 0;
synchronized (_sendingStates) {
inSync = _context.clock().now();
for (Iterator<MessageState> iter = _sendingStates.iterator(); iter.hasNext();) {
state = iter.next();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce());
if (state.getNonce() == nonce) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state");
break;
} else if ((state.getMessageId() != null) && (state.getMessageId().getMessageId() == msgId)) {
if ((state = _sendingStates.get(Long.valueOf(nonce))) != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Found a matching state");
} else if (!_sendingStates.isEmpty()) {
// O(n**2)
// shouldn't happen, router sends good nonce for all statuses as of 0.9.14
for (MessageState s : _sendingStates.values()) {
if (s.getMessageId() != null && s.getMessageId().getMessageId() == msgId) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state by msgId");
state = s;
break;
} else {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State does not match");
state = null;
}
}
}
long afterSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("receiveStatus(" + msgId + ", " + nonce + ", " + status+ "): sync: "
+ (inSync-beforeSync) + "ms, check: " + (afterSync-inSync));
if (state != null) {
if (state.getMessageId() == null) {
@ -477,11 +415,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
state.setMessageId(id);
}
state.receive(status);
if (state.wasSuccessful())
_sendingStates.remove(Long.valueOf(nonce));
long lifetime = state.getElapsed();
switch (status) {
case 1:
_context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0);
_context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime);
break;
// best effort codes unused
//case 2:
@ -491,10 +431,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
// _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0);
// break;
case 4:
_context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0);
_context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime);
break;
case 5:
_context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime, 0);
_context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime);
break;
}
@ -503,7 +443,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_log.info(getPrefix() + "No matching state for messageId " + msgId + " / " + nonce
+ " w/ status = " + status);
}
_context.statManager().addRateData("i2cp.receiveStatusTime", _context.clock().now() - beforeSync, 0);
}
/**
@ -522,11 +461,11 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private void clearStates() {
if (_sendingStates == null) // only null if overridden by I2PSimpleSession
return;
synchronized (_sendingStates) {
for (MessageState state : _sendingStates)
state.cancel();
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states");
_sendingStates.clear();
for (MessageState state : _sendingStates.values()) {
state.cancel();
}
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states");
_sendingStates.clear();
}
}

View File

@ -193,21 +193,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
SessionKey keyUsed, Set tagsSent, long expires,
int proto, int fromPort, int toPort, int flags)
throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed");
updateActivity();
boolean sc = shouldCompress(size);
if (sc)
payload = DataHelper.compress(payload, offset, size);
else
payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
setProto(payload, proto);
setFromPort(payload, fromPort);
setToPort(payload, toPort);
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
if (_noEffort)
return sendNoEffort(dest, payload, expires, flags);
else
@ -232,11 +218,48 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
@Override
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromPort, int toPort, SendMessageOptions options) throws I2PSessionException {
payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
//if (_noEffort) {
sendNoEffort(dest, payload, options);
return true;
//} else {
// unimplemented
//return sendBestEffort(dest, payload, options);
//}
}
/**
* Send a message and request an asynchronous notification of delivery status.
*
* See I2PSessionMuxedImpl for proto/port details.
* See SendMessageOptions for option details.
*
* @return the message ID to be used for later notification to the listener
* @throws I2PSessionException on all errors
* @since 0.9.14
*/
@Override
public long sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromPort, int toPort,
SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException {
payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
long nonce = _sendMessageNonce.incrementAndGet();
long expires = Math.max(_context.clock().now() + 60*1000L, options.getTime());
MessageState state = new MessageState(_context, nonce, this, expires, listener);
_sendingStates.put(Long.valueOf(nonce), state);
_producer.sendMessage(this, dest, nonce, payload, options);
return nonce;
}
/**
* @return gzip compressed payload, ready to send
* @since 0.9.14
*/
private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed");
updateActivity();
boolean sc = shouldCompress(size);
if (sc)
if (shouldCompress(size))
payload = DataHelper.compress(payload, offset, size);
else
payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
@ -245,15 +268,9 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
setFromPort(payload, fromPort);
setToPort(payload, toPort);
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
//if (_noEffort) {
sendNoEffort(dest, payload, options);
return true;
//} else {
// unimplemented
//return sendBestEffort(dest, payload, options);
//}
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size);
return payload;
}
/**

View File

@ -1,12 +1,8 @@
package net.i2p.client;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.util.Log;
@ -14,8 +10,10 @@ import net.i2p.util.Log;
/**
* Contains the state of a payload message being sent to a peer.
*
* This is mostly unused. See sendNoEffort vs. sendBestEffort in I2PSessionImpl2.
* TODO delete altogether? This is really bad.
* Originally was a general-purpose waiter.
* Then we got rid of guaranteed delivery.
* Then we stopped waiting for accept in best-effort delivery.
* Brought back to life for asynchronous status delivery to the client.
*/
class MessageState {
private final I2PAppContext _context;
@ -23,32 +21,59 @@ class MessageState {
private final long _nonce;
private final String _prefix;
private MessageId _id;
private final Set<Integer> _receivedStatus;
private SessionKey _key;
private SessionKey _newKey;
private Set _tags;
private Destination _to;
private boolean _cancelled;
private final long _created;
private final long _expires;
private final SendMessageStatusListener _listener;
private final I2PSession _session;
private static final AtomicLong __stateId = new AtomicLong();
private final long _stateId;
private enum State { INIT, ACCEPTED, PROBABLE_FAIL, FAIL, SUCCESS };
private State _state = State.INIT;
/**
* For synchronous waiting for accept with waitForAccept().
* UNUSED.
*/
public MessageState(I2PAppContext ctx, long nonce, String prefix) {
_stateId = __stateId.incrementAndGet();
_context = ctx;
_log = ctx.logManager().getLog(MessageState.class);
_nonce = nonce;
_prefix = prefix + "[" + _stateId + "]: ";
_receivedStatus = new HashSet<Integer>();
_prefix = prefix + '[' + _nonce + "]: ";
_created = ctx.clock().now();
//ctx.statManager().createRateStat("i2cp.checkStatusTime", "how long it takes to go through the states", "i2cp", new long[] { 60*1000 });
_expires = _created + 60*1000L;
_listener = null;
_session = null;
}
/**
* For asynchronous notification
* @param expires absolute time (not interval)
* @since 0.9.14
*/
public MessageState(I2PAppContext ctx, long nonce, I2PSession session,
long expires, SendMessageStatusListener listener) {
_context = ctx;
_log = ctx.logManager().getLog(MessageState.class);
_nonce = nonce;
_prefix = session.toString() + " [" + _nonce + "]: ";
_created = ctx.clock().now();
_expires = expires;
_listener = listener;
_session = session;
}
public void receive(int status) {
synchronized (_receivedStatus) {
_receivedStatus.add(Integer.valueOf(status));
_receivedStatus.notifyAll();
State oldState;
State newState;
synchronized (this) {
oldState = _state;
locked_update(status);
newState = _state;
this.notifyAll();
}
if (_listener != null) {
// only notify on changing state, and only if we haven't expired
if (oldState != newState && _expires > _context.clock().now())
_listener.messageStatus(_session, _nonce, status);
}
}
@ -60,221 +85,114 @@ class MessageState {
return _id;
}
public long getNonce() {
return _nonce;
}
/** @deprecated unused */
public void setKey(SessionKey key) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Setting key [" + _key + "] to [" + key + "]");
_key = key;
}
/** @deprecated unused */
public SessionKey getKey() {
return _key;
}
/** @deprecated unused */
public void setNewKey(SessionKey key) {
_newKey = key;
}
/** @deprecated unused */
public SessionKey getNewKey() {
return _newKey;
}
/** @deprecated unused */
public void setTags(Set tags) {
_tags = tags;
}
/** @deprecated unused */
public Set getTags() {
return _tags;
}
public void setTo(Destination dest) {
_to = dest;
}
/** @deprecated unused */
public Destination getTo() {
return _to;
}
public long getElapsed() {
return _context.clock().now() - _created;
}
public void waitFor(int status, long expiration) {
//long checkTime = -1;
boolean found = false;
while (!found) {
if (_cancelled) return;
/**
* @since 0.9.14
*/
public long getExpires() {
return _expires;
}
/**
* For guaranteed/best effort only. Not really used.
*/
public void waitForAccept(long expiration) throws InterruptedException {
while (true) {
long timeToWait = expiration - _context.clock().now();
if (timeToWait <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn(_prefix + "Expired waiting for the status [" + status + "]");
_log.warn(_prefix + "Expired waiting for the status");
return;
}
found = false;
synchronized (_receivedStatus) {
//long beforeCheck = _context.clock().now();
if (locked_isSuccess(status) || locked_isFailure(status)) {
synchronized (this) {
if (_state != State.INIT) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received a confirm (one way or the other)");
found = true;
}
//checkTime = _context.clock().now() - beforeCheck;
if (!found) {
if (timeToWait > 5000) {
timeToWait = 5000;
}
try {
_receivedStatus.wait(timeToWait);
} catch (InterruptedException ie) { // nop
}
return;
}
if (timeToWait > 5000)
timeToWait = 5000;
this.wait(timeToWait);
}
//if (found)
// _context.statManager().addRateData("i2cp.checkStatusTime", checkTime, 0);
}
}
private boolean locked_isSuccess(int wantedStatus) {
boolean rv = false;
/**
* Update our flags
* @since 0.9.14
*/
private void locked_update(int status) {
switch (status) {
case MessageStatusMessage.STATUS_SEND_ACCEPTED:
// only trumps init
if (_state == State.INIT)
_state = State.ACCEPTED;
break;
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + _receivedStatus);
for (Integer val : _receivedStatus) {
int recv = val.intValue();
switch (recv) {
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
if (_log.shouldLog(Log.WARN))
_log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
+ toString());
rv = false;
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
if (_log.shouldLog(Log.WARN))
_log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
+ toString());
rv = false;
break;
case MessageStatusMessage.STATUS_SEND_ACCEPTED:
if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it)
}
// ignore accepted, as we want something better
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString());
continue;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received best effort success after " + getElapsed()
+ " from " + toString());
if (wantedStatus == recv) {
rv = true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Not guaranteed success, but best effort after "
+ getElapsed() + " will do... from " + toString());
rv = true;
}
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
+ toString());
// even if we're waiting for best effort success, guaranteed is good enough
rv = true;
break;
case -1:
continue;
default:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received something else [" + recv + "]...");
}
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
// does not trump failure or success
if (_state != State.FAIL && _state != State.SUCCESS)
_state = State.PROBABLE_FAIL;
break;
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
case SendMessageStatusListener.STATUS_CANCELLED:
// does not trump success
if (_state != State.SUCCESS)
_state = State.FAIL;
break;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
// trumps all
_state = State.SUCCESS;
default:
break;
}
return rv;
}
private boolean locked_isFailure(int wantedStatus) {
boolean rv = false;
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "isFailure(" + wantedStatus + "): " + _receivedStatus);
for (Integer val : _receivedStatus) {
int recv = val.intValue();
switch (recv) {
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
if (_log.shouldLog(Log.DEBUG))
_log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
+ toString());
rv = true;
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
if (_log.shouldLog(Log.DEBUG))
_log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
+ toString());
rv = true;
break;
case MessageStatusMessage.STATUS_SEND_ACCEPTED:
if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
rv = false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Got accepted, but we're waiting for more from "
+ toString());
continue;
// ignore accepted, as we want something better
}
break;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received best effort success after " + getElapsed()
+ " from " + toString());
if (wantedStatus == recv) {
rv = false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Not guaranteed success, but best effort after "
+ getElapsed() + " will do... from " + toString());
rv = false;
}
break;
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
+ toString());
// even if we're waiting for best effort success, guaranteed is good enough
rv = false;
break;
case -1:
continue;
default:
if (_log.shouldLog(Log.DEBUG))
_log.debug(_prefix + "Received something else [" + recv + "]...");
}
/**
* @return true if accepted (fixme and not failed)
* @since 0.9.14
*/
public boolean wasAccepted() {
synchronized (this) {
return _state != State.INIT && _state != State.FAIL;
}
return rv;
}
/** #return true if the given status (or an equivalent) was received */
public boolean received(int status) {
synchronized (_receivedStatus) {
return locked_isSuccess(status);
/**
* @return true if successful
* @since 0.9.14
*/
public boolean wasSuccessful() {
synchronized (this) {
return _state == State.SUCCESS;
}
}
public void cancel() {
_cancelled = true;
synchronized (_receivedStatus) {
_receivedStatus.notifyAll();
}
// Inject a fake status
receive(SendMessageStatusListener.STATUS_CANCELLED);
}
}

View File

@ -0,0 +1,25 @@
package net.i2p.client;
/**
* Asynchronously notify the client of the status
* of a sent message.
*
* @since 0.9.14
*/
public interface SendMessageStatusListener {
/** I2CP status codes are 0 - 255. Start our fake ones at 256. */
public static final int STATUS_CANCELLED = 256;
/**
* Tell the client of an update in the send status for a message
* previously sent with I2PSession.sendMessage().
* Multiple calls for a single message ID are possible.
*
* @param session session notifying
* @param msgId message number returned from a previous sendMessage() call
* @param status of the message, as defined in MessageStatusMessage and this class.
*/
void messageStatus(I2PSession session, long msgId, int status);
}