* I2CP: Per-message status codes back through streaming (ticket #788)

- New I2PSessionException
   - Streaming PacketQueue requests status for SYNs on outbound conns
   - PacketQueue throws I2PSessionException in streams
This commit is contained in:
zzz
2014-05-18 00:05:13 +00:00
parent 1acd5caaa8
commit d32b4e9f24
4 changed files with 259 additions and 4 deletions

View File

@ -0,0 +1,112 @@
package net.i2p.client.streaming;
import java.net.SocketException;
import net.i2p.client.SendMessageStatusListener;
import net.i2p.data.i2cp.MessageStatusMessage;
/**
* An I2P-specific IOException thrown from input and output streams.
* with a stored status code to be used for programmatic responses.
*
* @since 0.9.14
*/
public class I2PSocketException extends SocketException {
private final int _status;
private static final int CUSTOM = -1;
/**
* Use canned message for this status code
* @param status >= 0 from MessageStatusMessage or SendMessageStatusListener
*/
public I2PSocketException(int status) {
super();
_status = status;
}
/**
* Use message provided
*/
public I2PSocketException(String message) {
super(message);
_status = CUSTOM;
}
/**
* For programmatic action based on specific failure code
*
* @return value from int constructor or -1 for String constructor
*/
public int getStatus() {
return _status;
}
/**
* For programmatic action based on specific failure code
*
* @return canned message based on status in int constructor or message from String constructor
*/
@Override
public String getMessage() {
switch (_status) {
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
return "Message timeout";
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
return "Failed delivery to local destination";
case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
return "Local router failure";
case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
return "Local network failure";
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
return "Session closed";
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
return "Invalid message";
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
return "Invalid message options";
case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
return "Buffer overflow";
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
return "Message expired";
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
return "Local lease set invalid";
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
return "No local tunnels";
case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
return "Unsupported encryption options";
case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
return "Invalid destination";
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
return "Local router failure";
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
return "Destination lease set expired";
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
return "Destination lease set not found";
case SendMessageStatusListener.STATUS_CANCELLED:
return "Local destination shutdown";
case CUSTOM:
return super.getMessage();
default:
return "Failure code: " + _status;
}
}
}

View File

@ -617,6 +617,7 @@ class ConnectionManager {
disconnectAllHard();
_tcbShare.stop();
_timer.stop();
_outboundQueue.close();
}
/**

View File

@ -1,12 +1,21 @@
package net.i2p.client.streaming.impl;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.SendMessageOptions;
import net.i2p.client.SendMessageStatusListener;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.data.ByteArray;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Queue out packets to be sent through the session.
@ -16,12 +25,14 @@ import net.i2p.util.Log;
*<p>
* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession
*/
class PacketQueue {
class PacketQueue implements SendMessageStatusListener {
private final I2PAppContext _context;
private final Log _log;
private final I2PSession _session;
private final ConnectionManager _connectionManager;
private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
private final Map<Long, Connection> _messageStatusMap;
private volatile boolean _dead;
private static final int FLAGS_INITIAL_TAGS = Packet.FLAG_SYNCHRONIZE;
private static final int FLAGS_FINAL_TAGS = Packet.FLAG_CLOSE |
@ -32,14 +43,27 @@ class PacketQueue {
private static final int TAG_WINDOW_FACTOR = 5;
private static final int FINAL_TAGS_TO_SEND = 4;
private static final int FINAL_TAG_THRESHOLD = 2;
private static final long REMOVE_EXPIRED_TIME = 67*1000;
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context;
_session = session;
_connectionManager = mgr;
_log = context.logManager().getLog(PacketQueue.class);
_messageStatusMap = new ConcurrentHashMap<Long, Connection>(16);
new RemoveExpired();
// all createRateStats in ConnectionManager
}
/**
* Cannot be restarted.
*
* @since 0.9.14
*/
public void close() {
_dead = true;
_messageStatusMap.clear();
}
/**
* Add a new packet to be sent out ASAP
@ -48,6 +72,8 @@ class PacketQueue {
* @return true if sent
*/
public boolean enqueue(PacketLocal packet) {
if (_dead)
return false;
// this updates the ack/nack field
packet.prepare();
@ -102,10 +128,15 @@ class PacketQueue {
SendMessageOptions options = new SendMessageOptions();
if (expires > 0)
options.setDate(expires);
boolean listenForStatus = false;
if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
Connection con = packet.getConnection();
if (con != null && con.isInbound())
options.setSendLeaseSet(false);
if (con != null) {
if (con.isInbound())
options.setSendLeaseSet(false);
else
listenForStatus = true;
}
options.setTagsToSend(INITIAL_TAGS_TO_SEND);
options.setTagThreshold(MIN_TAG_THRESHOLD);
} else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
@ -130,9 +161,17 @@ class PacketQueue {
options.setTagThreshold(thresh);
}
}
sent = _session.sendMessage(packet.getTo(), buf, 0, size,
if (listenForStatus) {
long id = _session.sendMessage(packet.getTo(), buf, 0, size,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
options, this);
_messageStatusMap.put(Long.valueOf(id), packet.getConnection());
sent = true;
} else {
sent = _session.sendMessage(packet.getTo(), buf, 0, size,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
options);
}
end = _context.clock().now();
if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )
@ -192,5 +231,107 @@ class PacketQueue {
}
return sent;
}
/**
* SendMessageStatusListener interface
*
* Tell the client of an update in the send status for a message
* previously sent with I2PSession.sendMessage().
* Multiple calls for a single message ID are possible.
*
* @param session session notifying
* @param msgId message number returned from a previous sendMessage() call
* @param status of the message, as defined in MessageStatusMessage and this class.
* @since 0.9.14
*/
public void messageStatus(I2PSession session, long msgId, int status) {
if (_dead)
return;
Connection con = _messageStatusMap.get(Long.valueOf(msgId));
if (con == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd status " + status + " for msg " + msgId + " on unknown connection");
return;
}
switch (status) {
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
_messageStatusMap.remove(Long.valueOf(msgId));
if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con);
break;
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
case SendMessageStatusListener.STATUS_CANCELLED:
IOException ioe = new I2PSocketException(status);
if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd hard failure status " + status + " for msg " + msgId + " on " + con);
_messageStatusMap.remove(Long.valueOf(msgId));
con.getOutputStream().streamErrorOccurred(ioe);
con.getInputStream().streamErrorOccurred(ioe);
con.setConnectionError("barf boof bazzle code " + status);
con.disconnect(false);
break;
case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd success status " + status + " for msg " + msgId + " on " + con);
_messageStatusMap.remove(Long.valueOf(msgId));
break;
case MessageStatusMessage.STATUS_SEND_ACCEPTED:
if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd accept status " + status + " for msg " + msgId + " on " + con);
break;
default:
if (_log.shouldLog(Log.WARN))
_log.warn("Rcvd unknown status " + status + " for msg " + msgId + " on " + con);
_messageStatusMap.remove(Long.valueOf(msgId));
break;
}
}
/**
* Check for expired message states, without wastefully setting a timer for each
* message.
* @since 0.9.14
*/
private class RemoveExpired extends SimpleTimer2.TimedEvent {
public RemoveExpired() {
super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME);
}
public void timeReached() {
if (_dead)
return;
if (!_messageStatusMap.isEmpty()) {
for (Iterator<Connection> iter = _messageStatusMap.values().iterator(); iter.hasNext(); ) {
Connection con = iter.next();
if (!con.getIsConnected() || con.getLifetime() > 2*60*1000L)
iter.remove();
}
}
schedule(REMOVE_EXPIRED_TIME);
}
}
}

View File

@ -165,6 +165,7 @@ class MessageState {
case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
// trumps all
_state = State.SUCCESS;
break;
default:
break;