From 69f051da41a37d5e28736ed96f1e0da3b371699b Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 4 Feb 2009 14:17:10 +0000 Subject: [PATCH] concurrentify TunnelDispatcher --- .../i2p/router/tunnel/TunnelDispatcher.java | 142 ++++++------------ 1 file changed, 44 insertions(+), 98 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 8bb4781fe..de29a9540 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -2,8 +2,8 @@ package net.i2p.router.tunnel; import java.io.IOException; import java.io.Writer; +import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,12 +30,11 @@ import net.i2p.util.Log; public class TunnelDispatcher implements Service { private RouterContext _context; private Log _log; - private Map _outboundGateways; - private Map _outboundEndpoints; - private Map _participants; - private Map _inboundGateways; - /** id to HopConfig */ - private Map _participatingConfig; + private Map _outboundGateways; + private Map _outboundEndpoints; + private Map _participants; + private Map _inboundGateways; + private Map _participatingConfig; /** what is the date/time on which the last non-locally-created tunnel expires? */ private long _lastParticipatingExpiration; private BloomFilterIVValidator _validator; @@ -48,11 +47,11 @@ public class TunnelDispatcher implements Service { public TunnelDispatcher(RouterContext ctx) { _context = ctx; _log = ctx.logManager().getLog(TunnelDispatcher.class); - _outboundGateways = new HashMap(); - _outboundEndpoints = new HashMap(); - _participants = new HashMap(); - _inboundGateways = new HashMap(); - _participatingConfig = new HashMap(); + _outboundGateways = new ConcurrentHashMap(); + _outboundEndpoints = new ConcurrentHashMap(); + _participants = new ConcurrentHashMap(); + _inboundGateways = new ConcurrentHashMap(); + _participatingConfig = new ConcurrentHashMap(); _lastParticipatingExpiration = 0; _lastDropTime = 0; _validator = null; @@ -158,17 +157,13 @@ public class TunnelDispatcher implements Service { //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); TunnelId outId = cfg.getConfig(0).getSendTunnel(); - synchronized (_outboundGateways) { - _outboundGateways.put(outId, gw); - } + _outboundGateways.put(outId, gw); _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0); _context.messageHistory().tunnelJoined("outbound", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId outId = cfg.getConfig(0).getSendTunnel(); - synchronized (_outboundGateways) { - _outboundGateways.put(outId, gw); - } + _outboundGateways.put(outId, gw); _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0); _context.messageHistory().tunnelJoined("outboundZeroHop", cfg); } @@ -183,17 +178,13 @@ public class TunnelDispatcher implements Service { if (cfg.getLength() > 1) { TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator)); TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel(); - synchronized (_participants) { - _participants.put(recvId, participant); - } + _participants.put(recvId, participant); _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0); _context.messageHistory().tunnelJoined("inboundEndpoint", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId recvId = cfg.getConfig(0).getReceiveTunnel(); - synchronized (_inboundGateways) { - _inboundGateways.put(recvId, gw); - } + _inboundGateways.put(recvId, gw); _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0); _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg); } @@ -208,12 +199,8 @@ public class TunnelDispatcher implements Service { _log.info("Joining as participant: " + cfg); TunnelId recvId = cfg.getReceiveTunnel(); TunnelParticipant participant = new TunnelParticipant(_context, cfg, new HopProcessor(_context, cfg, _validator)); - synchronized (_participants) { - _participants.put(recvId, participant); - } - synchronized (_participatingConfig) { - _participatingConfig.put(recvId, cfg); - } + _participants.put(recvId, participant); + _participatingConfig.put(recvId, cfg); _context.messageHistory().tunnelJoined("participant", cfg); _context.statManager().addRateData("tunnel.joinParticipant", 1, 0); if (cfg.getExpiration() > _lastParticipatingExpiration) @@ -229,12 +216,8 @@ public class TunnelDispatcher implements Service { _log.info("Joining as outbound endpoint: " + cfg); TunnelId recvId = cfg.getReceiveTunnel(); OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator)); - synchronized (_outboundEndpoints) { - _outboundEndpoints.put(recvId, endpoint); - } - synchronized (_participatingConfig) { - _participatingConfig.put(recvId, cfg); - } + _outboundEndpoints.put(recvId, endpoint); + _participatingConfig.put(recvId, cfg); _context.messageHistory().tunnelJoined("outboundEndpoint", cfg); _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0); @@ -256,12 +239,8 @@ public class TunnelDispatcher implements Service { //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); TunnelId recvId = cfg.getReceiveTunnel(); - synchronized (_inboundGateways) { - _inboundGateways.put(recvId, gw); - } - synchronized (_participatingConfig) { - _participatingConfig.put(recvId, cfg); - } + _inboundGateways.put(recvId, gw); + _participatingConfig.put(recvId, cfg); _context.messageHistory().tunnelJoined("inboundGateway", cfg); _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0); @@ -271,9 +250,7 @@ public class TunnelDispatcher implements Service { } public int getParticipatingCount() { - synchronized (_participatingConfig) { - return _participatingConfig.size(); - } + return _participatingConfig.size(); } /** what is the date/time on which the last non-locally-created tunnel expires? */ @@ -287,14 +264,9 @@ public class TunnelDispatcher implements Service { TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel(); if (_log.shouldLog(Log.DEBUG)) _log.debug("removing our own inbound " + cfg); - TunnelParticipant participant = null; - synchronized (_participants) { - participant = (TunnelParticipant)_participants.remove(recvId); - } + TunnelParticipant participant = _participants.remove(recvId); if (participant == null) { - synchronized (_inboundGateways) { - _inboundGateways.remove(recvId); - } + _inboundGateways.remove(recvId); } else { // update stats based off getCompleteCount() + getFailedCount() for (int i = 0; i < cfg.getLength(); i++) { @@ -311,10 +283,7 @@ public class TunnelDispatcher implements Service { if (_log.shouldLog(Log.DEBUG)) _log.debug("removing our own outbound " + cfg); TunnelId outId = cfg.getConfig(0).getSendTunnel(); - TunnelGateway gw = null; - synchronized (_outboundGateways) { - gw = (TunnelGateway)_outboundGateways.remove(outId); - } + TunnelGateway gw = _outboundGateways.remove(outId); if (gw != null) { // update stats based on gw.getMessagesSent() } @@ -339,26 +308,17 @@ public class TunnelDispatcher implements Service { if (_log.shouldLog(Log.DEBUG)) _log.debug("removing " + cfg); - boolean removed = false; - synchronized (_participatingConfig) { - removed = (null != _participatingConfig.remove(recvId)); - } + boolean removed = (null != _participatingConfig.remove(recvId)); if (!removed) { if (_log.shouldLog(Log.WARN)) _log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg); } - synchronized (_participants) { - removed = (null != _participants.remove(recvId)); - } + removed = (null != _participants.remove(recvId)); if (removed) return; - synchronized (_inboundGateways) { - removed = (null != _inboundGateways.remove(recvId)); - } + removed = (null != _inboundGateways.remove(recvId)); if (removed) return; - synchronized (_outboundEndpoints) { - removed = (null != _outboundEndpoints.remove(recvId)); - } + _outboundEndpoints.remove(recvId); } /** @@ -372,10 +332,7 @@ public class TunnelDispatcher implements Service { */ public void dispatch(TunnelDataMessage msg, Hash recvFrom) { long before = System.currentTimeMillis(); - TunnelParticipant participant = null; - synchronized (_participants) { - participant = (TunnelParticipant)_participants.get(msg.getTunnelIdObj()); - } + TunnelParticipant participant = _participants.get(msg.getTunnelIdObj()); if (participant != null) { // we are either just a random participant or the inbound endpoint if (_log.shouldLog(Log.DEBUG)) @@ -385,10 +342,7 @@ public class TunnelDispatcher implements Service { participant.dispatch(msg, recvFrom); _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0); } else { - OutboundTunnelEndpoint endpoint = null; - synchronized (_outboundEndpoints) { - endpoint = (OutboundTunnelEndpoint)_outboundEndpoints.get(msg.getTunnelIdObj()); - } + OutboundTunnelEndpoint endpoint = _outboundEndpoints.get(msg.getTunnelIdObj()); if (endpoint != null) { // we are the outobund endpoint if (_log.shouldLog(Log.DEBUG)) @@ -421,10 +375,7 @@ public class TunnelDispatcher implements Service { */ public void dispatch(TunnelGatewayMessage msg) { long before = System.currentTimeMillis(); - TunnelGateway gw = null; - synchronized (_inboundGateways) { - gw = (TunnelGateway)_inboundGateways.get(msg.getTunnelId()); - } + TunnelGateway gw = _inboundGateways.get(msg.getTunnelId()); if (gw != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("dispatch where we are the inbound gateway: " + gw + ": " + msg); @@ -489,10 +440,7 @@ public class TunnelDispatcher implements Service { public void dispatchOutbound(I2NPMessage msg, TunnelId outboundTunnel, TunnelId targetTunnel, Hash targetPeer) { if (outboundTunnel == null) throw new IllegalArgumentException("wtf, null outbound tunnel?"); long before = _context.clock().now(); - TunnelGateway gw = null; - synchronized (_outboundGateways) { - gw = (TunnelGateway)_outboundGateways.get(outboundTunnel); - } + TunnelGateway gw = _outboundGateways.get(outboundTunnel); if (gw != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("dispatch outbound through " + outboundTunnel.getTunnelId() @@ -538,10 +486,8 @@ public class TunnelDispatcher implements Service { _context.statManager().addRateData("tunnel.dispatchOutboundTime", dispatchTime, dispatchTime); } - public List listParticipatingTunnels() { - synchronized (_participatingConfig) { - return new ArrayList(_participatingConfig.values()); - } + public List listParticipatingTunnels() { + return new ArrayList(_participatingConfig.values()); } /** @@ -554,7 +500,7 @@ public class TunnelDispatcher implements Service { * and computing the average from that. */ public void updateParticipatingStats() { - List participating = listParticipatingTunnels(); + List participating = listParticipatingTunnels(); int size = participating.size(); long count = 0; long bw = 0; @@ -563,7 +509,7 @@ public class TunnelDispatcher implements Service { long tooYoung = _context.clock().now() - 60*1000; long tooOld = tooYoung - 9*60*1000; for (int i = 0; i < size; i++) { - HopConfig cfg = (HopConfig)participating.get(i); + HopConfig cfg = participating.get(i); long c = cfg.getRecentMessagesCount(); bw += c; bwOut += cfg.getRecentSentMessagesCount(); @@ -645,7 +591,7 @@ public class TunnelDispatcher implements Service { public void dropBiggestParticipating() { - List partTunnels = listParticipatingTunnels(); + List partTunnels = listParticipatingTunnels(); if ((partTunnels == null) || (partTunnels.size() == 0)) { if (_log.shouldLog(Log.ERROR)) _log.error("Not dropping tunnel, since partTunnels was null or had 0 items!"); @@ -668,7 +614,7 @@ public class TunnelDispatcher implements Service { for (int i=0; i _configs; + private List _times; public LeaveTunnel(RouterContext ctx) { super(ctx); @@ -765,12 +711,12 @@ public class TunnelDispatcher implements Service { synchronized (LeaveTunnel.this) { if (_configs.size() <= 0) return; - nextTime = (Long)_times.get(0); + nextTime = _times.get(0); if (nextTime.longValue() <= now) { - cur = (HopConfig)_configs.remove(0); + cur = _configs.remove(0); _times.remove(0); if (_times.size() > 0) - nextTime = (Long)_times.get(0); + nextTime = _times.get(0); else nextTime = null; } else {