InNetMessagePool cleanup

This commit is contained in:
zzz
2009-12-16 17:25:09 +00:00
parent cdb390f7ce
commit 6b83fc6b3b

View File

@ -18,8 +18,6 @@ import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.data.i2np.DatabaseSearchReplyMessage; import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DeliveryStatusMessage; import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
//import net.i2p.data.i2np.TunnelCreateMessage;
//import net.i2p.data.i2np.TunnelCreateStatusMessage;
import net.i2p.data.i2np.TunnelDataMessage; import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.data.i2np.TunnelGatewayMessage; import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
@ -35,6 +33,7 @@ public class InNetMessagePool implements Service {
private Log _log; private Log _log;
private RouterContext _context; private RouterContext _context;
private HandlerJobBuilder _handlerJobBuilders[]; private HandlerJobBuilder _handlerJobBuilders[];
/** following 5 unused unless DISPATCH_DIRECT == false */
private final List _pendingDataMessages; private final List _pendingDataMessages;
private final List _pendingDataMessagesFrom; private final List _pendingDataMessagesFrom;
private final List _pendingGatewayMessages; private final List _pendingGatewayMessages;
@ -63,21 +62,27 @@ public class InNetMessagePool implements Service {
public InNetMessagePool(RouterContext context) { public InNetMessagePool(RouterContext context) {
_context = context; _context = context;
// 32 is greater than the max I2NP message type number (currently 22) + 1
_handlerJobBuilders = new HandlerJobBuilder[32]; _handlerJobBuilders = new HandlerJobBuilder[32];
_pendingDataMessages = new ArrayList(16); if (DISPATCH_DIRECT) {
_pendingDataMessagesFrom = new ArrayList(16); // keep the compiler happy since they are final
_pendingGatewayMessages = new ArrayList(16); _pendingDataMessages = null;
_shortCircuitDataJob = new SharedShortCircuitDataJob(context); _pendingDataMessagesFrom = null;
_shortCircuitGatewayJob = new SharedShortCircuitGatewayJob(context); _pendingGatewayMessages = null;
} else {
_pendingDataMessages = new ArrayList(16);
_pendingDataMessagesFrom = new ArrayList(16);
_pendingGatewayMessages = new ArrayList(16);
_shortCircuitDataJob = new SharedShortCircuitDataJob(context);
_shortCircuitGatewayJob = new SharedShortCircuitGatewayJob(context);
}
_log = _context.logManager().getLog(InNetMessagePool.class); _log = _context.logManager().getLog(InNetMessagePool.class);
_alive = false; _alive = false;
_context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*60*1000l });
//_context.statManager().createRateStat("inNetPool.droppedTunnelCreateStatusMessage", "How often we drop a slow-to-arrive tunnel request response", "InNetPool", new long[] { 60*60*1000l, 24*60*60*1000l }); //_context.statManager().createRateStat("inNetPool.droppedTunnelCreateStatusMessage", "How often we drop a slow-to-arrive tunnel request response", "InNetPool", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDbLookupResponseMessage", "How often we drop a slow-to-arrive db search response", "InNetPool", new long[] { 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("inNetPool.droppedDbLookupResponseMessage", "How often we drop a slow-to-arrive db search response", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("pool.dispatchDataTime", "How long a tunnel dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("pool.dispatchGatewayTime", "How long a tunnel gateway dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
} }
public HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) { public HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) {
@ -309,10 +314,12 @@ public class InNetMessagePool implements Service {
} }
public void shutdown() { public void shutdown() {
_alive = false; _alive = false;
synchronized (_pendingDataMessages) { if (!DISPATCH_DIRECT) {
_pendingDataMessages.clear(); synchronized (_pendingDataMessages) {
_pendingDataMessagesFrom.clear(); _pendingDataMessages.clear();
_pendingDataMessages.notifyAll(); _pendingDataMessagesFrom.clear();
_pendingDataMessages.notifyAll();
}
} }
} }
@ -324,6 +331,8 @@ public class InNetMessagePool implements Service {
_dispatchThreaded = Boolean.valueOf(threadedStr).booleanValue(); _dispatchThreaded = Boolean.valueOf(threadedStr).booleanValue();
} }
if (_dispatchThreaded) { if (_dispatchThreaded) {
_context.statManager().createRateStat("pool.dispatchDataTime", "How long a tunnel dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("pool.dispatchGatewayTime", "How long a tunnel gateway dispatch takes", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
I2PThread data = new I2PThread(new TunnelDataDispatcher(), "Tunnel data dispatcher"); I2PThread data = new I2PThread(new TunnelDataDispatcher(), "Tunnel data dispatcher");
data.setDaemon(true); data.setDaemon(true);
data.start(); data.start();
@ -333,6 +342,7 @@ public class InNetMessagePool implements Service {
} }
} }
/** unused unless DISPATCH_DIRECT == false */
private class SharedShortCircuitDataJob extends JobImpl { private class SharedShortCircuitDataJob extends JobImpl {
public SharedShortCircuitDataJob(RouterContext ctx) { public SharedShortCircuitDataJob(RouterContext ctx) {
super(ctx); super(ctx);
@ -355,6 +365,8 @@ public class InNetMessagePool implements Service {
getContext().jobQueue().addJob(SharedShortCircuitDataJob.this); getContext().jobQueue().addJob(SharedShortCircuitDataJob.this);
} }
} }
/** unused unless DISPATCH_DIRECT == false */
private class SharedShortCircuitGatewayJob extends JobImpl { private class SharedShortCircuitGatewayJob extends JobImpl {
public SharedShortCircuitGatewayJob(RouterContext ctx) { public SharedShortCircuitGatewayJob(RouterContext ctx) {
super(ctx); super(ctx);
@ -375,6 +387,7 @@ public class InNetMessagePool implements Service {
} }
} }
/** unused unless router.dispatchThreaded=true */
private class TunnelGatewayDispatcher implements Runnable { private class TunnelGatewayDispatcher implements Runnable {
public void run() { public void run() {
while (_alive) { while (_alive) {
@ -403,6 +416,8 @@ public class InNetMessagePool implements Service {
} }
} }
} }
/** unused unless router.dispatchThreaded=true */
private class TunnelDataDispatcher implements Runnable { private class TunnelDataDispatcher implements Runnable {
public void run() { public void run() {
while (_alive) { while (_alive) {