- Better fix for logging dropped messages (ticket #758)
   - Implement fast receive to reduce per-message handshakes
   - Make messageReliability=none the default
This commit is contained in:
zzz
2012-11-02 16:37:23 +00:00
parent d30aeb3902
commit d48fab9d98
9 changed files with 131 additions and 35 deletions

View File

@ -40,6 +40,24 @@ public interface I2PClient {
/** @since 0.8.1 */
public final static String PROP_RELIABILITY_NONE = "none";
/**
* For router->client payloads.
*
* If false, the router will send the MessageStatus,
* the client must respond with a ReceiveMessageBegin,
* the router will send the MessagePayload,
* and the client respond with a ReceiveMessageEnd.
*
* If true, the router will send the MessagePayload immediately,
* and will not send a MessageStatus.
* The client will not send ReceiveMessageBegin or ReceiveMessageEnd.
*
* Default false, but the implementation in this package sets to true.
*
* @since 0.9.4
*/
public final static String PROP_FAST_RECEIVE = "i2cp.fastReceive";
/** protocol flag that must be sent when opening the i2cp connection to the router */
public final static int PROTOCOL_BYTE = 0x2A;

View File

@ -136,6 +136,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
private long _lastActivity;
private boolean _isReduced;
private final boolean _fastReceive;
/**
* @since 0.8.9
@ -168,6 +169,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (options == null)
options = (Properties) System.getProperties().clone();
loadConfig(options);
_fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE));
}
/**
@ -228,6 +230,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_options.setProperty("i2cp.password", configPW);
}
}
if (_options.getProperty(I2PClient.PROP_FAST_RECEIVE) == null)
_options.setProperty(I2PClient.PROP_FAST_RECEIVE, "true");
if (_options.getProperty(I2PClient.PROP_RELIABILITY) == null)
_options.setProperty(I2PClient.PROP_RELIABILITY, "none");
}
/** save some memory, don't pass along the pointless properties */
@ -280,6 +286,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} catch (I2PSessionException ise) {}
}
/**
* @since 0.9.4
*/
public boolean getFastReceive() {
return _fastReceive;
}
void setLeaseSet(LeaseSet ls) {
_leaseSet = ls;
if (ls != null) {

View File

@ -40,10 +40,19 @@ class MessagePayloadMessageHandler extends HandlerImpl {
decryptPayload(msg, session);
session.addNewMessage(msg);
ReceiveMessageEndMessage m = new ReceiveMessageEndMessage();
m.setMessageId(id);
m.setSessionId(msg.getSessionId());
session.sendMessage(m);
// Small chance of this, but
// if we are a new I2P lib talking to an old router
// and we don't send this, the router will OOM as it has
// no cleaner for old messages.
// TODO after 0.9.4 is out, check router version from handshake
// and send it all the time if 0.9.3 or less
// (needs router version saving support in SetDateMessageHandler)
//if (!session.getFastReceive()) {
ReceiveMessageEndMessage m = new ReceiveMessageEndMessage();
m.setMessageId(id);
m.setSessionId(msg.getSessionId());
session.sendMessage(m);
//}
} catch (DataFormatException dfe) {
session.propogateError("Error handling a new payload message", dfe);
} catch (I2PSessionException ise) {

View File

@ -1,3 +1,20 @@
2012-11-02 zzz
* configstats: Fix group sorting, translate groups
* I2CP:
- Better fix for logging dropped messages (ticket #758)
- Implement fast receive to reduce per-message handshakes
- Make messageReliability=none the default
* i2psnark:
- Split buckets correctly
- More exploration fixes
* i2ptunnel:
- Better privkey backup file name
- Revert increment of privkey tunnel name
- Move deleted privkeys to backup dir
- Fix jsp build dependencies
- Fix layout issue on Chrome (ticket #757)
* KeyManager: Eliminate races, buffer I/O, eliminate periodic syncing
2012-10-31 zzz
* FIFOBandwidthRefiller: Reduce refill interval to smooth output
* I2CP: Reduce log level when outbound queue is full (ticket #758)

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 3;
public final static long BUILD = 4;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -90,6 +90,8 @@ class ClientConnectionRunner {
private volatile boolean _dead;
/** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */
private boolean _dontSendMSM;
/** For inbound traffic. true if i2cp.fastReceive = "true"; @since 0.9.4 */
private boolean _dontSendMSMOnReceive;
private final AtomicInteger _messageId; // messageId counter
// Was 32767 since the beginning (04-2004).
@ -113,6 +115,7 @@ class ClientConnectionRunner {
_log = _context.logManager().getLog(ClientConnectionRunner.class);
_manager = manager;
_socket = socket;
// unused for fastReceive
_messages = new ConcurrentHashMap();
_alreadyProcessed = new ArrayList();
_acceptedPending = new ConcurrentHashSet();
@ -136,7 +139,6 @@ class ClientConnectionRunner {
I2PThread t = new I2PThread(_writer);
t.setName("I2CP Writer " + __id.incrementAndGet());
t.setDaemon(true);
t.setPriority(I2PThread.MAX_PRIORITY);
t.start();
_out = new BufferedOutputStream(_socket.getOutputStream());
_reader.startReading();
@ -200,15 +202,24 @@ class ClientConnectionRunner {
/** already closed? */
boolean isDead() { return _dead; }
/** message body */
/**
* Only call if _dontSendMSMOnReceive is false, otherwise will always be null
*/
Payload getPayload(MessageId id) {
return _messages.get(id);
}
/**
* Only call if _dontSendMSMOnReceive is false
*/
void setPayload(MessageId id, Payload payload) {
_messages.put(id, payload);
if (!_dontSendMSMOnReceive)
_messages.put(id, payload);
}
/**
* Only call if _dontSendMSMOnReceive is false
*/
void removePayload(MessageId id) {
_messages.remove(id);
}
@ -221,8 +232,10 @@ class ClientConnectionRunner {
// We process a few options here, but most are handled by the tunnel manager.
// The ones here can't be changed later.
Properties opts = config.getOptions();
if (opts != null)
_dontSendMSM = "none".equals(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
if (opts != null) {
_dontSendMSM = "none".equals(opts.getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
_dontSendMSMOnReceive = Boolean.parseBoolean(opts.getProperty(I2PClient.PROP_FAST_RECEIVE));
}
// per-destination session key manager to prevent rather easy correlation
if (_sessionKeyManager == null) {
int tags = TransientSessionKeyManager.DEFAULT_TAGS;
@ -306,7 +319,7 @@ class ClientConnectionRunner {
doSend(msg);
} catch (I2CPMessageException ime) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the disconnect message: " + ime);
_log.warn("Error writing out the disconnect message", ime);
}
// give it a little time to get sent out...
// even better would be to have stopRunning() flush it?
@ -378,7 +391,8 @@ class ClientConnectionRunner {
doSend(status);
_acceptedPending.remove(id);
} catch (I2CPMessageException ime) {
_log.error("Error writing out the message status message: " + ime);
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the message status message", ime);
}
}
@ -388,7 +402,7 @@ class ClientConnectionRunner {
*/
void receiveMessage(Destination toDest, Destination fromDest, Payload payload) {
if (_dead) return;
MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload);
MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload, _dontSendMSMOnReceive);
// This is fast and non-blocking, run in-line
//_context.jobQueue().addJob(j);
j.runJob();
@ -680,7 +694,8 @@ class ClientConnectionRunner {
try {
doSend(msg);
} catch (I2CPMessageException ime) {
_log.warn("Error updating the status for message ID " + _messageId, ime);
if (_log.shouldLog(Log.WARN))
_log.warn("Error updating the status for message ID " + _messageId, ime);
}
}
}

View File

@ -69,8 +69,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
*/
public void messageReceived(I2CPMessageReader reader, I2CPMessage message) {
if (_runner.isDead()) return;
if (_log.shouldLog(Log.INFO))
_log.info("Message recieved: \n" + message);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message received: \n" + message);
switch (message.getType()) {
case GetDateMessage.MESSAGE_TYPE:
handleGetDate(reader, (GetDateMessage)message);
@ -225,7 +225,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
MessageId id = _runner.distributeMessage(message);
long timeToDistribute = _context.clock().now() - beforeDistribute;
_runner.ackSendMessage(id, message.getNonce());
_context.statManager().addRateData("client.distributeTime", timeToDistribute, timeToDistribute);
_context.statManager().addRateData("client.distributeTime", timeToDistribute);
if ( (timeToDistribute > 50) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took too long to distribute the message (which holds up the ack): " + timeToDistribute);
}
@ -253,7 +253,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
_log.error("Error delivering the payload", ime);
if (_log.shouldLog(Log.WARN))
_log.warn("Error delivering the payload", ime);
_runner.removePayload(new MessageId(message.getMessageId()));
}
}
@ -330,7 +332,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
_log.error("Error writing out the session status message", ime);
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the session status message", ime);
}
}
@ -348,7 +351,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
_log.error("Error writing out the session status message", ime);
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the session status message", ime);
}
}

View File

@ -12,40 +12,51 @@ import net.i2p.data.Destination;
import net.i2p.data.Payload;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Async job to notify the client that a new message is available for them
* Async job to notify the client that a new message is available for them,
* or just send it directly if specified.
*
*/
class MessageReceivedJob extends JobImpl {
private final Log _log;
private final ClientConnectionRunner _runner;
private final Payload _payload;
private final boolean _sendDirect;
public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, Destination fromDest, Payload payload) {
public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest,
Destination fromDest, Payload payload, boolean sendDirect) {
super(ctx);
_log = ctx.logManager().getLog(MessageReceivedJob.class);
_runner = runner;
_payload = payload;
_sendDirect = sendDirect;
}
public String getName() { return "Deliver New Message"; }
public void runJob() {
if (_runner.isDead()) return;
MessageId id = new MessageId();
id.setMessageId(_runner.getNextMessageId());
_runner.setPayload(id, _payload);
MessageId id = null;
long nextID = _runner.getNextMessageId();
try {
messageAvailable(id, _payload.getSize());
if (_sendDirect) {
sendMessage(nextID);
} else {
id = new MessageId(nextID);
_runner.setPayload(id, _payload);
messageAvailable(id, _payload.getSize());
}
} catch (I2CPMessageException ime) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the message status message", ime);
_runner.removePayload(id);
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the message", ime);
if (!_sendDirect)
_runner.removePayload(id);
}
}
@ -65,4 +76,16 @@ class MessageReceivedJob extends JobImpl {
msg.setStatus(MessageStatusMessage.STATUS_AVAILABLE);
_runner.doSend(msg);
}
/**
* Deliver the message directly, skip notification
* @since 0.9.4
*/
private void sendMessage(long id) throws I2CPMessageException {
MessagePayloadMessage msg = new MessagePayloadMessage();
msg.setMessageId(id);
msg.setSessionId(_runner.getSessionId().getSessionId());
msg.setPayload(_payload);
_runner.doSend(msg);
}
}

View File

@ -60,16 +60,13 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner {
/**
* Actually send the I2CPMessage to the client.
* Nonblocking.
* @throws I2CPMessageException if queue full or on other errors
*/
@Override
void doSend(I2CPMessage msg) throws I2CPMessageException {
// This will never fail, for now, as the router uses unbounded queues
// Perhaps in the future we may want to use bounded queues,
// with non-blocking writes for the router
// and blocking writes for the client?
boolean success = queue.offer(msg);
if (!success && _log.shouldLog(Log.WARN))
_log.warn("I2CP write to queue failed: " + msg);
if (!success)
throw new I2CPMessageException("I2CP write to queue failed");
}
}