diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java
new file mode 100644
index 0000000000..c657ac6723
--- /dev/null
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java
@@ -0,0 +1,112 @@
+package net.i2p.client.streaming;
+
+import java.net.SocketException;
+
+import net.i2p.client.SendMessageStatusListener;
+import net.i2p.data.i2cp.MessageStatusMessage;
+
+/**
+ * An I2P-specific IOException thrown from input and output streams.
+ * with a stored status code to be used for programmatic responses.
+ *
+ * @since 0.9.14
+ */
+public class I2PSocketException extends SocketException {
+
+ private final int _status;
+ private static final int CUSTOM = -1;
+
+ /**
+ * Use canned message for this status code
+ * @param status >= 0 from MessageStatusMessage or SendMessageStatusListener
+ */
+ public I2PSocketException(int status) {
+ super();
+ _status = status;
+ }
+
+ /**
+ * Use message provided
+ */
+ public I2PSocketException(String message) {
+ super(message);
+ _status = CUSTOM;
+ }
+
+ /**
+ * For programmatic action based on specific failure code
+ *
+ * @return value from int constructor or -1 for String constructor
+ */
+ public int getStatus() {
+ return _status;
+ }
+
+ /**
+ * For programmatic action based on specific failure code
+ *
+ * @return canned message based on status in int constructor or message from String constructor
+ */
+ @Override
+ public String getMessage() {
+ switch (_status) {
+ case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
+ case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
+ return "Message timeout";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
+ return "Failed delivery to local destination";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
+ return "Local router failure";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
+ return "Local network failure";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
+ return "Session closed";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
+ return "Invalid message";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
+ return "Invalid message options";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
+ return "Buffer overflow";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
+ return "Message expired";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
+ return "Local lease set invalid";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
+ return "No local tunnels";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
+ return "Unsupported encryption options";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
+ return "Invalid destination";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
+ return "Local router failure";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
+ return "Destination lease set expired";
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
+ return "Destination lease set not found";
+
+ case SendMessageStatusListener.STATUS_CANCELLED:
+ return "Local destination shutdown";
+
+ case CUSTOM:
+ return super.getMessage();
+
+ default:
+ return "Failure code: " + _status;
+ }
+ }
+}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
index 805234f4a9..196038eea6 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
@@ -617,6 +617,7 @@ class ConnectionManager {
disconnectAllHard();
_tcbShare.stop();
_timer.stop();
+ _outboundQueue.close();
}
/**
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
index dd44087cec..18c9746281 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
@@ -1,12 +1,21 @@
package net.i2p.client.streaming.impl;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.SendMessageOptions;
+import net.i2p.client.SendMessageStatusListener;
+import net.i2p.client.streaming.I2PSocketException;
import net.i2p.data.ByteArray;
+import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer2;
/**
* Queue out packets to be sent through the session.
@@ -16,12 +25,14 @@ import net.i2p.util.Log;
*
* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession
*/
-class PacketQueue {
+class PacketQueue implements SendMessageStatusListener {
private final I2PAppContext _context;
private final Log _log;
private final I2PSession _session;
private final ConnectionManager _connectionManager;
private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
+ private final Map _messageStatusMap;
+ private volatile boolean _dead;
private static final int FLAGS_INITIAL_TAGS = Packet.FLAG_SYNCHRONIZE;
private static final int FLAGS_FINAL_TAGS = Packet.FLAG_CLOSE |
@@ -32,14 +43,27 @@ class PacketQueue {
private static final int TAG_WINDOW_FACTOR = 5;
private static final int FINAL_TAGS_TO_SEND = 4;
private static final int FINAL_TAG_THRESHOLD = 2;
+ private static final long REMOVE_EXPIRED_TIME = 67*1000;
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context;
_session = session;
_connectionManager = mgr;
_log = context.logManager().getLog(PacketQueue.class);
+ _messageStatusMap = new ConcurrentHashMap(16);
+ new RemoveExpired();
// all createRateStats in ConnectionManager
}
+
+ /**
+ * Cannot be restarted.
+ *
+ * @since 0.9.14
+ */
+ public void close() {
+ _dead = true;
+ _messageStatusMap.clear();
+ }
/**
* Add a new packet to be sent out ASAP
@@ -48,6 +72,8 @@ class PacketQueue {
* @return true if sent
*/
public boolean enqueue(PacketLocal packet) {
+ if (_dead)
+ return false;
// this updates the ack/nack field
packet.prepare();
@@ -102,10 +128,15 @@ class PacketQueue {
SendMessageOptions options = new SendMessageOptions();
if (expires > 0)
options.setDate(expires);
+ boolean listenForStatus = false;
if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
Connection con = packet.getConnection();
- if (con != null && con.isInbound())
- options.setSendLeaseSet(false);
+ if (con != null) {
+ if (con.isInbound())
+ options.setSendLeaseSet(false);
+ else
+ listenForStatus = true;
+ }
options.setTagsToSend(INITIAL_TAGS_TO_SEND);
options.setTagThreshold(MIN_TAG_THRESHOLD);
} else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
@@ -130,9 +161,17 @@ class PacketQueue {
options.setTagThreshold(thresh);
}
}
- sent = _session.sendMessage(packet.getTo(), buf, 0, size,
+ if (listenForStatus) {
+ long id = _session.sendMessage(packet.getTo(), buf, 0, size,
+ I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
+ options, this);
+ _messageStatusMap.put(Long.valueOf(id), packet.getConnection());
+ sent = true;
+ } else {
+ sent = _session.sendMessage(packet.getTo(), buf, 0, size,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
options);
+ }
end = _context.clock().now();
if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )
@@ -192,5 +231,107 @@ class PacketQueue {
}
return sent;
}
+
+ /**
+ * SendMessageStatusListener interface
+ *
+ * Tell the client of an update in the send status for a message
+ * previously sent with I2PSession.sendMessage().
+ * Multiple calls for a single message ID are possible.
+ *
+ * @param session session notifying
+ * @param msgId message number returned from a previous sendMessage() call
+ * @param status of the message, as defined in MessageStatusMessage and this class.
+ * @since 0.9.14
+ */
+ public void messageStatus(I2PSession session, long msgId, int status) {
+ if (_dead)
+ return;
+ Connection con = _messageStatusMap.get(Long.valueOf(msgId));
+ if (con == null) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Rcvd status " + status + " for msg " + msgId + " on unknown connection");
+ return;
+ }
+
+ switch (status) {
+ case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
+ case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
+ _messageStatusMap.remove(Long.valueOf(msgId));
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con);
+ break;
+
+ case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
+ case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
+ case SendMessageStatusListener.STATUS_CANCELLED:
+ IOException ioe = new I2PSocketException(status);
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Rcvd hard failure status " + status + " for msg " + msgId + " on " + con);
+ _messageStatusMap.remove(Long.valueOf(msgId));
+ con.getOutputStream().streamErrorOccurred(ioe);
+ con.getInputStream().streamErrorOccurred(ioe);
+ con.setConnectionError("barf boof bazzle code " + status);
+ con.disconnect(false);
+ break;
+
+ case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
+ case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
+ case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Rcvd success status " + status + " for msg " + msgId + " on " + con);
+ _messageStatusMap.remove(Long.valueOf(msgId));
+ break;
+
+ case MessageStatusMessage.STATUS_SEND_ACCEPTED:
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Rcvd accept status " + status + " for msg " + msgId + " on " + con);
+ break;
+
+ default:
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Rcvd unknown status " + status + " for msg " + msgId + " on " + con);
+ _messageStatusMap.remove(Long.valueOf(msgId));
+ break;
+ }
+ }
+
+ /**
+ * Check for expired message states, without wastefully setting a timer for each
+ * message.
+ * @since 0.9.14
+ */
+ private class RemoveExpired extends SimpleTimer2.TimedEvent {
+
+ public RemoveExpired() {
+ super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME);
+ }
+
+ public void timeReached() {
+ if (_dead)
+ return;
+ if (!_messageStatusMap.isEmpty()) {
+ for (Iterator iter = _messageStatusMap.values().iterator(); iter.hasNext(); ) {
+ Connection con = iter.next();
+ if (!con.getIsConnected() || con.getLifetime() > 2*60*1000L)
+ iter.remove();
+ }
+ }
+ schedule(REMOVE_EXPIRED_TIME);
+ }
+ }
}
diff --git a/core/java/src/net/i2p/client/MessageState.java b/core/java/src/net/i2p/client/MessageState.java
index ea1b4284dc..e56eb9a8a3 100644
--- a/core/java/src/net/i2p/client/MessageState.java
+++ b/core/java/src/net/i2p/client/MessageState.java
@@ -165,6 +165,7 @@ class MessageState {
case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
// trumps all
_state = State.SUCCESS;
+ break;
default:
break;