concurrentify TunnelDispatcher

This commit is contained in:
zzz
2009-02-04 14:17:10 +00:00
parent 5946c35a88
commit 69f051da41

View File

@ -2,8 +2,8 @@ package net.i2p.router.tunnel;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -30,12 +30,11 @@ import net.i2p.util.Log;
public class TunnelDispatcher implements Service { public class TunnelDispatcher implements Service {
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
private Map _outboundGateways; private Map<TunnelId, TunnelGateway> _outboundGateways;
private Map _outboundEndpoints; private Map<TunnelId, OutboundTunnelEndpoint> _outboundEndpoints;
private Map _participants; private Map<TunnelId, TunnelParticipant> _participants;
private Map _inboundGateways; private Map<TunnelId, TunnelGateway> _inboundGateways;
/** id to HopConfig */ private Map<TunnelId, HopConfig> _participatingConfig;
private Map _participatingConfig;
/** what is the date/time on which the last non-locally-created tunnel expires? */ /** what is the date/time on which the last non-locally-created tunnel expires? */
private long _lastParticipatingExpiration; private long _lastParticipatingExpiration;
private BloomFilterIVValidator _validator; private BloomFilterIVValidator _validator;
@ -48,11 +47,11 @@ public class TunnelDispatcher implements Service {
public TunnelDispatcher(RouterContext ctx) { public TunnelDispatcher(RouterContext ctx) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(TunnelDispatcher.class); _log = ctx.logManager().getLog(TunnelDispatcher.class);
_outboundGateways = new HashMap(); _outboundGateways = new ConcurrentHashMap();
_outboundEndpoints = new HashMap(); _outboundEndpoints = new ConcurrentHashMap();
_participants = new HashMap(); _participants = new ConcurrentHashMap();
_inboundGateways = new HashMap(); _inboundGateways = new ConcurrentHashMap();
_participatingConfig = new HashMap(); _participatingConfig = new ConcurrentHashMap();
_lastParticipatingExpiration = 0; _lastParticipatingExpiration = 0;
_lastDropTime = 0; _lastDropTime = 0;
_validator = null; _validator = null;
@ -158,17 +157,13 @@ public class TunnelDispatcher implements Service {
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
TunnelId outId = cfg.getConfig(0).getSendTunnel(); TunnelId outId = cfg.getConfig(0).getSendTunnel();
synchronized (_outboundGateways) { _outboundGateways.put(outId, gw);
_outboundGateways.put(outId, gw);
}
_context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0); _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0);
_context.messageHistory().tunnelJoined("outbound", cfg); _context.messageHistory().tunnelJoined("outbound", cfg);
} else { } else {
TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
TunnelId outId = cfg.getConfig(0).getSendTunnel(); TunnelId outId = cfg.getConfig(0).getSendTunnel();
synchronized (_outboundGateways) { _outboundGateways.put(outId, gw);
_outboundGateways.put(outId, gw);
}
_context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0); _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0);
_context.messageHistory().tunnelJoined("outboundZeroHop", cfg); _context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
} }
@ -183,17 +178,13 @@ public class TunnelDispatcher implements Service {
if (cfg.getLength() > 1) { if (cfg.getLength() > 1) {
TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator)); TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator));
TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel(); TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
synchronized (_participants) { _participants.put(recvId, participant);
_participants.put(recvId, participant);
}
_context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0); _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0);
_context.messageHistory().tunnelJoined("inboundEndpoint", cfg); _context.messageHistory().tunnelJoined("inboundEndpoint", cfg);
} else { } else {
TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
TunnelId recvId = cfg.getConfig(0).getReceiveTunnel(); TunnelId recvId = cfg.getConfig(0).getReceiveTunnel();
synchronized (_inboundGateways) { _inboundGateways.put(recvId, gw);
_inboundGateways.put(recvId, gw);
}
_context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0); _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0);
_context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg); _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg);
} }
@ -208,12 +199,8 @@ public class TunnelDispatcher implements Service {
_log.info("Joining as participant: " + cfg); _log.info("Joining as participant: " + cfg);
TunnelId recvId = cfg.getReceiveTunnel(); TunnelId recvId = cfg.getReceiveTunnel();
TunnelParticipant participant = new TunnelParticipant(_context, cfg, new HopProcessor(_context, cfg, _validator)); TunnelParticipant participant = new TunnelParticipant(_context, cfg, new HopProcessor(_context, cfg, _validator));
synchronized (_participants) { _participants.put(recvId, participant);
_participants.put(recvId, participant); _participatingConfig.put(recvId, cfg);
}
synchronized (_participatingConfig) {
_participatingConfig.put(recvId, cfg);
}
_context.messageHistory().tunnelJoined("participant", cfg); _context.messageHistory().tunnelJoined("participant", cfg);
_context.statManager().addRateData("tunnel.joinParticipant", 1, 0); _context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
if (cfg.getExpiration() > _lastParticipatingExpiration) if (cfg.getExpiration() > _lastParticipatingExpiration)
@ -229,12 +216,8 @@ public class TunnelDispatcher implements Service {
_log.info("Joining as outbound endpoint: " + cfg); _log.info("Joining as outbound endpoint: " + cfg);
TunnelId recvId = cfg.getReceiveTunnel(); TunnelId recvId = cfg.getReceiveTunnel();
OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator)); OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator));
synchronized (_outboundEndpoints) { _outboundEndpoints.put(recvId, endpoint);
_outboundEndpoints.put(recvId, endpoint); _participatingConfig.put(recvId, cfg);
}
synchronized (_participatingConfig) {
_participatingConfig.put(recvId, cfg);
}
_context.messageHistory().tunnelJoined("outboundEndpoint", cfg); _context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
_context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0); _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
@ -256,12 +239,8 @@ public class TunnelDispatcher implements Service {
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
TunnelId recvId = cfg.getReceiveTunnel(); TunnelId recvId = cfg.getReceiveTunnel();
synchronized (_inboundGateways) { _inboundGateways.put(recvId, gw);
_inboundGateways.put(recvId, gw); _participatingConfig.put(recvId, cfg);
}
synchronized (_participatingConfig) {
_participatingConfig.put(recvId, cfg);
}
_context.messageHistory().tunnelJoined("inboundGateway", cfg); _context.messageHistory().tunnelJoined("inboundGateway", cfg);
_context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0); _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
@ -271,9 +250,7 @@ public class TunnelDispatcher implements Service {
} }
public int getParticipatingCount() { public int getParticipatingCount() {
synchronized (_participatingConfig) { return _participatingConfig.size();
return _participatingConfig.size();
}
} }
/** what is the date/time on which the last non-locally-created tunnel expires? */ /** what is the date/time on which the last non-locally-created tunnel expires? */
@ -287,14 +264,9 @@ public class TunnelDispatcher implements Service {
TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel(); TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("removing our own inbound " + cfg); _log.debug("removing our own inbound " + cfg);
TunnelParticipant participant = null; TunnelParticipant participant = _participants.remove(recvId);
synchronized (_participants) {
participant = (TunnelParticipant)_participants.remove(recvId);
}
if (participant == null) { if (participant == null) {
synchronized (_inboundGateways) { _inboundGateways.remove(recvId);
_inboundGateways.remove(recvId);
}
} else { } else {
// update stats based off getCompleteCount() + getFailedCount() // update stats based off getCompleteCount() + getFailedCount()
for (int i = 0; i < cfg.getLength(); i++) { for (int i = 0; i < cfg.getLength(); i++) {
@ -311,10 +283,7 @@ public class TunnelDispatcher implements Service {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("removing our own outbound " + cfg); _log.debug("removing our own outbound " + cfg);
TunnelId outId = cfg.getConfig(0).getSendTunnel(); TunnelId outId = cfg.getConfig(0).getSendTunnel();
TunnelGateway gw = null; TunnelGateway gw = _outboundGateways.remove(outId);
synchronized (_outboundGateways) {
gw = (TunnelGateway)_outboundGateways.remove(outId);
}
if (gw != null) { if (gw != null) {
// update stats based on gw.getMessagesSent() // update stats based on gw.getMessagesSent()
} }
@ -339,26 +308,17 @@ public class TunnelDispatcher implements Service {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("removing " + cfg); _log.debug("removing " + cfg);
boolean removed = false; boolean removed = (null != _participatingConfig.remove(recvId));
synchronized (_participatingConfig) {
removed = (null != _participatingConfig.remove(recvId));
}
if (!removed) { if (!removed) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg); _log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg);
} }
synchronized (_participants) { removed = (null != _participants.remove(recvId));
removed = (null != _participants.remove(recvId));
}
if (removed) return; if (removed) return;
synchronized (_inboundGateways) { removed = (null != _inboundGateways.remove(recvId));
removed = (null != _inboundGateways.remove(recvId));
}
if (removed) return; if (removed) return;
synchronized (_outboundEndpoints) { _outboundEndpoints.remove(recvId);
removed = (null != _outboundEndpoints.remove(recvId));
}
} }
/** /**
@ -372,10 +332,7 @@ public class TunnelDispatcher implements Service {
*/ */
public void dispatch(TunnelDataMessage msg, Hash recvFrom) { public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
TunnelParticipant participant = null; TunnelParticipant participant = _participants.get(msg.getTunnelIdObj());
synchronized (_participants) {
participant = (TunnelParticipant)_participants.get(msg.getTunnelIdObj());
}
if (participant != null) { if (participant != null) {
// we are either just a random participant or the inbound endpoint // we are either just a random participant or the inbound endpoint
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -385,10 +342,7 @@ public class TunnelDispatcher implements Service {
participant.dispatch(msg, recvFrom); participant.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0); _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
} else { } else {
OutboundTunnelEndpoint endpoint = null; OutboundTunnelEndpoint endpoint = _outboundEndpoints.get(msg.getTunnelIdObj());
synchronized (_outboundEndpoints) {
endpoint = (OutboundTunnelEndpoint)_outboundEndpoints.get(msg.getTunnelIdObj());
}
if (endpoint != null) { if (endpoint != null) {
// we are the outobund endpoint // we are the outobund endpoint
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -421,10 +375,7 @@ public class TunnelDispatcher implements Service {
*/ */
public void dispatch(TunnelGatewayMessage msg) { public void dispatch(TunnelGatewayMessage msg) {
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
TunnelGateway gw = null; TunnelGateway gw = _inboundGateways.get(msg.getTunnelId());
synchronized (_inboundGateways) {
gw = (TunnelGateway)_inboundGateways.get(msg.getTunnelId());
}
if (gw != null) { if (gw != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("dispatch where we are the inbound gateway: " + gw + ": " + msg); _log.debug("dispatch where we are the inbound gateway: " + gw + ": " + msg);
@ -489,10 +440,7 @@ public class TunnelDispatcher implements Service {
public void dispatchOutbound(I2NPMessage msg, TunnelId outboundTunnel, TunnelId targetTunnel, Hash targetPeer) { public void dispatchOutbound(I2NPMessage msg, TunnelId outboundTunnel, TunnelId targetTunnel, Hash targetPeer) {
if (outboundTunnel == null) throw new IllegalArgumentException("wtf, null outbound tunnel?"); if (outboundTunnel == null) throw new IllegalArgumentException("wtf, null outbound tunnel?");
long before = _context.clock().now(); long before = _context.clock().now();
TunnelGateway gw = null; TunnelGateway gw = _outboundGateways.get(outboundTunnel);
synchronized (_outboundGateways) {
gw = (TunnelGateway)_outboundGateways.get(outboundTunnel);
}
if (gw != null) { if (gw != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("dispatch outbound through " + outboundTunnel.getTunnelId() _log.debug("dispatch outbound through " + outboundTunnel.getTunnelId()
@ -538,10 +486,8 @@ public class TunnelDispatcher implements Service {
_context.statManager().addRateData("tunnel.dispatchOutboundTime", dispatchTime, dispatchTime); _context.statManager().addRateData("tunnel.dispatchOutboundTime", dispatchTime, dispatchTime);
} }
public List listParticipatingTunnels() { public List<HopConfig> listParticipatingTunnels() {
synchronized (_participatingConfig) { return new ArrayList(_participatingConfig.values());
return new ArrayList(_participatingConfig.values());
}
} }
/** /**
@ -554,7 +500,7 @@ public class TunnelDispatcher implements Service {
* and computing the average from that. * and computing the average from that.
*/ */
public void updateParticipatingStats() { public void updateParticipatingStats() {
List participating = listParticipatingTunnels(); List<HopConfig> participating = listParticipatingTunnels();
int size = participating.size(); int size = participating.size();
long count = 0; long count = 0;
long bw = 0; long bw = 0;
@ -563,7 +509,7 @@ public class TunnelDispatcher implements Service {
long tooYoung = _context.clock().now() - 60*1000; long tooYoung = _context.clock().now() - 60*1000;
long tooOld = tooYoung - 9*60*1000; long tooOld = tooYoung - 9*60*1000;
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
HopConfig cfg = (HopConfig)participating.get(i); HopConfig cfg = participating.get(i);
long c = cfg.getRecentMessagesCount(); long c = cfg.getRecentMessagesCount();
bw += c; bw += c;
bwOut += cfg.getRecentSentMessagesCount(); bwOut += cfg.getRecentSentMessagesCount();
@ -645,7 +591,7 @@ public class TunnelDispatcher implements Service {
public void dropBiggestParticipating() { public void dropBiggestParticipating() {
List partTunnels = listParticipatingTunnels(); List<HopConfig> partTunnels = listParticipatingTunnels();
if ((partTunnels == null) || (partTunnels.size() == 0)) { if ((partTunnels == null) || (partTunnels.size() == 0)) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Not dropping tunnel, since partTunnels was null or had 0 items!"); _log.error("Not dropping tunnel, since partTunnels was null or had 0 items!");
@ -668,7 +614,7 @@ public class TunnelDispatcher implements Service {
for (int i=0; i<partTunnels.size(); i++) { for (int i=0; i<partTunnels.size(); i++) {
current = (HopConfig)partTunnels.get(i); current = partTunnels.get(i);
long currentMessages = current.getProcessedMessagesCount(); long currentMessages = current.getProcessedMessagesCount();
long currentAge = (_context.clock().now() - current.getCreation()); long currentAge = (_context.clock().now() - current.getCreation());
@ -716,8 +662,8 @@ public class TunnelDispatcher implements Service {
public void renderStatusHTML(Writer out) throws IOException {} public void renderStatusHTML(Writer out) throws IOException {}
private class LeaveTunnel extends JobImpl { private class LeaveTunnel extends JobImpl {
private List _configs; private List<HopConfig> _configs;
private List _times; private List<Long> _times;
public LeaveTunnel(RouterContext ctx) { public LeaveTunnel(RouterContext ctx) {
super(ctx); super(ctx);
@ -765,12 +711,12 @@ public class TunnelDispatcher implements Service {
synchronized (LeaveTunnel.this) { synchronized (LeaveTunnel.this) {
if (_configs.size() <= 0) if (_configs.size() <= 0)
return; return;
nextTime = (Long)_times.get(0); nextTime = _times.get(0);
if (nextTime.longValue() <= now) { if (nextTime.longValue() <= now) {
cur = (HopConfig)_configs.remove(0); cur = _configs.remove(0);
_times.remove(0); _times.remove(0);
if (_times.size() > 0) if (_times.size() > 0)
nextTime = (Long)_times.get(0); nextTime = _times.get(0);
else else
nextTime = null; nextTime = null;
} else { } else {