* track the message progress through the send process more carefully
* drop the outbound message as soon as it expires rather than transferring an expired message * drop hard any outbound message that takes us over 5 seconds to process (if we have a 5s message processing time, we do no one any good) * don't try to resend (only useful when dealing with multiple transports - aka insufficiently tested code) * don't republish netDb messages as often
This commit is contained in:
@ -85,10 +85,21 @@ public class OutNetMessage {
|
|||||||
"OutNetMessage", new long[] { 5*60*1000, 30*60*1000, 60*60*1000 });
|
"OutNetMessage", new long[] { 5*60*1000, 30*60*1000, 60*60*1000 });
|
||||||
}
|
}
|
||||||
|
|
||||||
public void timestamp(String eventName) {
|
/**
|
||||||
|
* Stamp the message's progress
|
||||||
|
*
|
||||||
|
* @param eventName what occurred
|
||||||
|
* @return how long this message has been 'in flight'
|
||||||
|
*/
|
||||||
|
public long timestamp(String eventName) {
|
||||||
synchronized (_timestamps) {
|
synchronized (_timestamps) {
|
||||||
_timestamps.put(eventName, new Long(_context.clock().now()));
|
long now = _context.clock().now();
|
||||||
|
while (_timestamps.containsKey(eventName)) {
|
||||||
|
eventName = eventName + '.';
|
||||||
|
}
|
||||||
|
_timestamps.put(eventName, new Long(now));
|
||||||
_timestampOrder.add(eventName);
|
_timestampOrder.add(eventName);
|
||||||
|
return now - _created;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
public Map getTimestamps() {
|
public Map getTimestamps() {
|
||||||
|
@ -286,7 +286,7 @@ public class Router {
|
|||||||
if ( (notSent > 0) && (notReceived > 0) ) {
|
if ( (notSent > 0) && (notReceived > 0) ) {
|
||||||
double notSendKBps = notSent / (lifetime*1024.0);
|
double notSendKBps = notSent / (lifetime*1024.0);
|
||||||
double notReceivedKBps = notReceived / (lifetime*1024.0);
|
double notReceivedKBps = notReceived / (lifetime*1024.0);
|
||||||
buf.append("<li>Lifetime rate: ");
|
buf.append("<li>Lifetime unused rate: ");
|
||||||
buf.append(fmt.format(notSendKBps)).append("KBps outbound unused ");
|
buf.append(fmt.format(notSendKBps)).append("KBps outbound unused ");
|
||||||
buf.append(fmt.format(notReceivedKBps)).append("KBps inbound unused");
|
buf.append(fmt.format(notReceivedKBps)).append("KBps inbound unused");
|
||||||
buf.append("</li>");
|
buf.append("</li>");
|
||||||
|
@ -23,7 +23,7 @@ import net.i2p.util.Log;
|
|||||||
class DataPublisherJob extends JobImpl {
|
class DataPublisherJob extends JobImpl {
|
||||||
private Log _log;
|
private Log _log;
|
||||||
private KademliaNetworkDatabaseFacade _facade;
|
private KademliaNetworkDatabaseFacade _facade;
|
||||||
private final static long RERUN_DELAY_MS = 60*1000;
|
private final static long RERUN_DELAY_MS = 120*1000;
|
||||||
private final static int MAX_SEND_PER_RUN = 1; // publish no more than 2 at a time
|
private final static int MAX_SEND_PER_RUN = 1; // publish no more than 2 at a time
|
||||||
private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data
|
private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data
|
||||||
|
|
||||||
|
@ -63,15 +63,28 @@ public abstract class TransportImpl implements Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void afterSend(OutNetMessage msg, boolean sendSuccessful) {
|
public void afterSend(OutNetMessage msg, boolean sendSuccessful) {
|
||||||
afterSend(msg, sendSuccessful, true);
|
afterSend(msg, sendSuccessful, true, 0);
|
||||||
}
|
}
|
||||||
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue) {
|
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue) {
|
||||||
|
afterSend(msg, sendSuccessful, allowRequeue, 0);
|
||||||
|
}
|
||||||
|
public void afterSend(OutNetMessage msg, boolean sendSuccessful, long msToSend) {
|
||||||
|
afterSend(msg, sendSuccessful, true, msToSend);
|
||||||
|
}
|
||||||
|
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
|
||||||
boolean log = false;
|
boolean log = false;
|
||||||
msg.timestamp("afterSend(" + sendSuccessful + ")");
|
msg.timestamp("afterSend(" + sendSuccessful + ")");
|
||||||
|
|
||||||
if (!sendSuccessful)
|
if (!sendSuccessful)
|
||||||
msg.transportFailed(getStyle());
|
msg.transportFailed(getStyle());
|
||||||
|
|
||||||
|
if (msToSend > 1000) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("afterSend: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
|
||||||
|
+ msg.getMessageType() + " " + msg.getMessageId() + " from "
|
||||||
|
+ _context.routerHash().toBase64().substring(0,6) + " took " + msToSend);
|
||||||
|
}
|
||||||
|
|
||||||
long lifetime = msg.getLifetime();
|
long lifetime = msg.getLifetime();
|
||||||
if (lifetime > 5000) {
|
if (lifetime > 5000) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
@ -104,23 +117,31 @@ public abstract class TransportImpl implements Transport {
|
|||||||
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
|
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
|
||||||
|
|
||||||
if (allowRequeue) {
|
if (allowRequeue) {
|
||||||
if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) {
|
if (true) {
|
||||||
// this may not be the last transport available - keep going
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_context.outNetMessagePool().add(msg);
|
_log.error("wtf, requeueing message " + msg.getMessageId() + " of type " + msg.getMessageType(),
|
||||||
// don't discard the data yet!
|
new Exception("requeued by"));
|
||||||
} else {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("No more time left (" + new Date(msg.getExpiration())
|
|
||||||
+ ", expiring without sending successfully the "
|
|
||||||
+ msg.getMessageType());
|
|
||||||
if (msg.getOnFailedSendJob() != null)
|
|
||||||
_context.jobQueue().addJob(msg.getOnFailedSendJob());
|
|
||||||
MessageSelector selector = msg.getReplySelector();
|
|
||||||
if (selector != null) {
|
|
||||||
_context.messageRegistry().unregisterPending(msg);
|
|
||||||
}
|
|
||||||
log = true;
|
log = true;
|
||||||
msg.discardData();
|
msg.discardData();
|
||||||
|
} else {
|
||||||
|
if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) {
|
||||||
|
// this may not be the last transport available - keep going
|
||||||
|
_context.outNetMessagePool().add(msg);
|
||||||
|
// don't discard the data yet!
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("No more time left (" + new Date(msg.getExpiration())
|
||||||
|
+ ", expiring without sending successfully the "
|
||||||
|
+ msg.getMessageType());
|
||||||
|
if (msg.getOnFailedSendJob() != null)
|
||||||
|
_context.jobQueue().addJob(msg.getOnFailedSendJob());
|
||||||
|
MessageSelector selector = msg.getReplySelector();
|
||||||
|
if (selector != null) {
|
||||||
|
_context.messageRegistry().unregisterPending(msg);
|
||||||
|
}
|
||||||
|
log = true;
|
||||||
|
msg.discardData();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
@ -211,9 +232,13 @@ public abstract class TransportImpl implements Transport {
|
|||||||
protected abstract void outboundMessageReady();
|
protected abstract void outboundMessageReady();
|
||||||
|
|
||||||
public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
|
public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
|
||||||
if (_log.shouldLog(Log.INFO)) {
|
int level = Log.INFO;
|
||||||
|
if (msToReceive > 5000)
|
||||||
|
level = Log.ERROR;
|
||||||
|
if (_log.shouldLog(level)) {
|
||||||
StringBuffer buf = new StringBuffer(128);
|
StringBuffer buf = new StringBuffer(128);
|
||||||
buf.append("Message received: ").append(inMsg.getClass().getName());
|
buf.append("Message received: ").append(inMsg.getClass().getName());
|
||||||
|
buf.append(" / ").append(inMsg.getUniqueId());
|
||||||
buf.append(" in ").append(msToReceive).append("ms containing ");
|
buf.append(" in ").append(msToReceive).append("ms containing ");
|
||||||
buf.append(bytesReceived).append(" bytes ");
|
buf.append(bytesReceived).append(" bytes ");
|
||||||
buf.append(" from ");
|
buf.append(" from ");
|
||||||
@ -228,7 +253,7 @@ public abstract class TransportImpl implements Transport {
|
|||||||
if (_listener != null)
|
if (_listener != null)
|
||||||
buf.append(_listener);
|
buf.append(_listener);
|
||||||
|
|
||||||
_log.info(buf.toString());
|
_log.log(level, buf.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (remoteIdent != null)
|
if (remoteIdent != null)
|
||||||
@ -239,8 +264,9 @@ public abstract class TransportImpl implements Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
|
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
|
||||||
if (msToReceive > 1000)
|
if (msToReceive > 1000) {
|
||||||
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive);
|
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive);
|
||||||
|
}
|
||||||
|
|
||||||
//// this functionality is built into the InNetMessagePool
|
//// this functionality is built into the InNetMessagePool
|
||||||
//String type = inMsg.getClass().getName();
|
//String type = inMsg.getClass().getName();
|
||||||
|
@ -309,7 +309,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
for (int i = 0; i < removed.size(); i++) {
|
for (int i = 0; i < removed.size(); i++) {
|
||||||
OutNetMessage cur = (OutNetMessage)removed.get(i);
|
OutNetMessage cur = (OutNetMessage)removed.get(i);
|
||||||
msg.timestamp("TCPConnection.addMessage expired but not our fault");
|
msg.timestamp("TCPConnection.addMessage expired but not our fault");
|
||||||
_transport.afterSend(cur, false);
|
_transport.afterSend(cur, false, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -331,7 +331,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
|
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
|
||||||
|
|
||||||
msg.timestamp("TCPConnection.addMessage saw an expired queued message");
|
msg.timestamp("TCPConnection.addMessage saw an expired queued message");
|
||||||
_transport.afterSend(msg, false);
|
_transport.afterSend(msg, false, false);
|
||||||
// should we really be closing a connection if they're that slow?
|
// should we really be closing a connection if they're that slow?
|
||||||
// yeah, i think we should.
|
// yeah, i think we should.
|
||||||
closeConnection();
|
closeConnection();
|
||||||
@ -490,7 +490,19 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
timedOut.add(cur);
|
timedOut.add(cur);
|
||||||
_toBeSent.remove(i);
|
_toBeSent.remove(i);
|
||||||
i--;
|
i--;
|
||||||
}
|
} else {
|
||||||
|
long lifetime = cur.timestamp("TCPConnection.runner.locked_expireOldMessages still ok with "
|
||||||
|
+ (i) + " ahead and " + (_toBeSent.size()-i-1)
|
||||||
|
+ " behind on the queue");
|
||||||
|
if (lifetime > 5*1000) {
|
||||||
|
cur.timestamp("TCPConnection.runner.locked_expireOldMessages lifetime too long - " + lifetime);
|
||||||
|
if (timedOut == null)
|
||||||
|
timedOut = new ArrayList(2);
|
||||||
|
timedOut.add(cur);
|
||||||
|
_toBeSent.remove(i);
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean reallySlowFound = false;
|
boolean reallySlowFound = false;
|
||||||
@ -503,8 +515,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
+ " timed out while sitting on the TCP Connection's queue! was too slow by: "
|
+ " timed out while sitting on the TCP Connection's queue! was too slow by: "
|
||||||
+ (now-failed.getExpiration()) + "ms to "
|
+ (now-failed.getExpiration()) + "ms to "
|
||||||
+ _remoteIdentity.getHash().toBase64() + ": " + failed);
|
+ _remoteIdentity.getHash().toBase64() + ": " + failed);
|
||||||
failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired");
|
failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired with " + _toBeSent.size() + " left");
|
||||||
_transport.afterSend(failed, false);
|
_transport.afterSend(failed, false, false);
|
||||||
if (failed.getLifetime() >= MIN_MESSAGE_LIFETIME_FOR_PENALTY)
|
if (failed.getLifetime() >= MIN_MESSAGE_LIFETIME_FOR_PENALTY)
|
||||||
reallySlowFound = true;
|
reallySlowFound = true;
|
||||||
}
|
}
|
||||||
@ -521,6 +533,15 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
msg.timestamp("TCPConnection.runner.doSend fetched");
|
msg.timestamp("TCPConnection.runner.doSend fetched");
|
||||||
long afterExpire = _context.clock().now();
|
long afterExpire = _context.clock().now();
|
||||||
|
|
||||||
|
long remaining = msg.getExpiration() - afterExpire;
|
||||||
|
if (remaining < 0) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("Message " + msg.getMessageType() + "/" + msg.getMessageId()
|
||||||
|
+ " expired before doSend (too slow by " + remaining + "ms)");
|
||||||
|
_transport.afterSend(msg, false, false);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
byte data[] = msg.getMessageData();
|
byte data[] = msg.getMessageData();
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
@ -573,7 +594,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
+ "ms) - time left (" + timeLeft + ") to "
|
+ "ms) - time left (" + timeLeft + ") to "
|
||||||
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
|
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
|
||||||
}
|
}
|
||||||
_transport.afterSend(msg, true);
|
_transport.afterSend(msg, true, (end-beforeWrite));
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("doSend - message sent completely: "
|
_log.debug("doSend - message sent completely: "
|
||||||
|
Reference in New Issue
Block a user