* Peer Manager:

- Replace some locks with concurrent
      - Switch back to fast version of getPeersByCapability()
      - Java 5 cleanup
This commit is contained in:
zzz
2010-03-08 22:02:42 +00:00
parent 9b05d8e774
commit 91e854e99c
5 changed files with 97 additions and 78 deletions

View File

@ -24,8 +24,8 @@ class DummyPeerManagerFacade implements PeerManagerFacade {
public void startup() {}
public void restart() {}
public void renderStatusHTML(Writer out) { }
public List selectPeers(PeerSelectionCriteria criteria) { return null; }
public List getPeersByCapability(char capability) { return null; }
public List<Hash> selectPeers(PeerSelectionCriteria criteria) { return null; }
public List<Hash> getPeersByCapability(char capability) { return null; }
public void setCapabilities(Hash peer, String caps) {}
public void removeCapabilities(Hash peer) {}
public Hash selectRandomByCapability(char capability) { return null; }

View File

@ -25,8 +25,8 @@ public interface PeerManagerFacade extends Service {
*
* @return List of Hash objects of the RouterIdentity for matching peers
*/
public List selectPeers(PeerSelectionCriteria criteria);
public List getPeersByCapability(char capability);
public List<Hash> selectPeers(PeerSelectionCriteria criteria);
public List<Hash> getPeersByCapability(char capability);
public void setCapabilities(Hash peer, String caps);
public void removeCapabilities(Hash peer);
public Hash selectRandomByCapability(char capability);

View File

@ -11,12 +11,12 @@ package net.i2p.router.peermanager;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
@ -26,6 +26,7 @@ import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.ConcurrentHashSet;
/**
* Manage the current state of the statistics
@ -46,8 +47,8 @@ class PeerManager {
private RouterContext _context;
private ProfileOrganizer _organizer;
private ProfilePersistenceHelper _persistenceHelper;
private List _peersByCapability[];
private final Map _capabilitiesByPeer;
private Set<Hash> _peersByCapability[];
private final Map<Hash, String> _capabilitiesByPeer;
public PeerManager(RouterContext context) {
_context = context;
@ -55,10 +56,10 @@ class PeerManager {
_persistenceHelper = new ProfilePersistenceHelper(context);
_organizer = context.profileOrganizer();
_organizer.setUs(context.routerHash());
_capabilitiesByPeer = new HashMap(128);
_peersByCapability = new List[26];
_capabilitiesByPeer = new ConcurrentHashMap(128);
_peersByCapability = new Set[26];
for (int i = 0; i < _peersByCapability.length; i++)
_peersByCapability[i] = new ArrayList(64);
_peersByCapability[i] = new ConcurrentHashSet();
loadProfiles();
////_context.jobQueue().addJob(new EvaluateProfilesJob(_context));
SimpleScheduler.getInstance().addPeriodicEvent(new Reorg(), 0, 45*1000);
@ -77,14 +78,16 @@ class PeerManager {
void storeProfiles() {
Set peers = selectPeers();
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
for (Iterator<Hash> iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = iter.next();
storeProfile(peer);
}
}
Set selectPeers() {
return _organizer.selectAllPeers();
}
void storeProfile(Hash peer) {
if (peer == null) return;
PeerProfile prof = _organizer.getProfile(peer);
@ -92,10 +95,11 @@ class PeerManager {
if (true)
_persistenceHelper.writeProfile(prof);
}
void loadProfiles() {
Set profiles = _persistenceHelper.readProfiles();
for (Iterator iter = profiles.iterator(); iter.hasNext();) {
PeerProfile prof = (PeerProfile)iter.next();
Set<PeerProfile> profiles = _persistenceHelper.readProfiles();
for (Iterator<PeerProfile> iter = profiles.iterator(); iter.hasNext();) {
PeerProfile prof = iter.next();
if (prof != null) {
_organizer.addProfile(prof);
if (_log.shouldLog(Log.DEBUG))
@ -107,10 +111,11 @@ class PeerManager {
/**
* Find some peers that meet the criteria and we have the netDb info for locally
*
* Only used by PeerTestJob (PURPOSE_TEST)
*/
List selectPeers(PeerSelectionCriteria criteria) {
Set peers = new HashSet(criteria.getMinimumRequired());
Set exclude = new HashSet(1);
List<Hash> selectPeers(PeerSelectionCriteria criteria) {
Set<Hash> peers = new HashSet(criteria.getMinimumRequired());
Set<Hash> exclude = new HashSet(1);
exclude.add(_context.routerHash());
switch (criteria.getPurpose()) {
case PeerSelectionCriteria.PURPOSE_TEST:
@ -143,10 +148,10 @@ class PeerManager {
default:
break;
}
if (peers.size() <= 0) {
if (peers.isEmpty()) {
if (_log.shouldLog(Log.WARN))
_log.warn("We ran out of peers when looking for reachable ones after finding "
+ peers.size() + " with "
+ "0 with "
+ _organizer.countWellIntegratedPeers() + "/"
+ _organizer.countHighCapacityPeers() + "/"
+ _organizer.countFastPeers() + " integrated/high capacity/fast peers");
@ -160,18 +165,18 @@ class PeerManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting capabilities for " + peer.toBase64() + " to " + caps);
if (caps != null) caps = caps.toLowerCase();
synchronized (_capabilitiesByPeer) {
String oldCaps = null;
if (caps != null)
oldCaps = (String)_capabilitiesByPeer.put(peer, caps);
oldCaps = _capabilitiesByPeer.put(peer, caps);
else
oldCaps = (String)_capabilitiesByPeer.remove(peer);
oldCaps = _capabilitiesByPeer.remove(peer);
if (oldCaps != null) {
for (int i = 0; i < oldCaps.length(); i++) {
char c = oldCaps.charAt(i);
if ( (caps == null) || (caps.indexOf(c) < 0) ) {
List peers = locked_getPeers(c);
Set<Hash> peers = locked_getPeers(c);
if (peers != null)
peers.remove(peer);
}
@ -182,15 +187,15 @@ class PeerManager {
char c = caps.charAt(i);
if ( (oldCaps != null) && (oldCaps.indexOf(c) >= 0) )
continue;
List peers = locked_getPeers(c);
if ( (peers != null) && (!peers.contains(peer)) )
Set<Hash> peers = locked_getPeers(c);
if (peers != null)
peers.add(peer);
}
}
}
}
private List locked_getPeers(char c) {
/** locking no longer req'd */
private Set<Hash> locked_getPeers(char c) {
c = Character.toLowerCase(c);
int i = c - 'a';
if ( (i < 0) || (i >= _peersByCapability.length) ) {
@ -204,18 +209,19 @@ class PeerManager {
public void removeCapabilities(Hash peer) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing capabilities from " + peer.toBase64());
synchronized (_capabilitiesByPeer) {
String oldCaps = (String)_capabilitiesByPeer.remove(peer);
if (oldCaps != null) {
for (int i = 0; i < oldCaps.length(); i++) {
char c = oldCaps.charAt(i);
List peers = locked_getPeers(c);
Set<Hash> peers = locked_getPeers(c);
if (peers != null)
peers.remove(peer);
}
}
}
}
/*******
public Hash selectRandomByCapability(char capability) {
int index = _context.random().nextInt(Integer.MAX_VALUE);
synchronized (_capabilitiesByPeer) {
@ -227,20 +233,29 @@ class PeerManager {
}
return null;
}
public List getPeersByCapability(char capability) {
if (false) {
synchronized (_capabilitiesByPeer) {
List peers = locked_getPeers(capability);
********/
/**
* The only user of this is TunnelPeerSelector for unreachables?
*/
public List<Hash> getPeersByCapability(char capability) {
if (true) {
Set<Hash> peers = locked_getPeers(capability);
if (peers != null)
return new ArrayList(peers);
}
return null;
} else {
// Wow this looks really slow...
// What is the point of keeping all the data structures above
// if we are going to go through the whole netdb anyway?
// Not sure why jrandom switched to do it this way,
// the checkin comments aren't clear...
// Since the locking is gone, switch back to the above.
FloodfillNetworkDatabaseFacade f = (FloodfillNetworkDatabaseFacade)_context.netDb();
List routerInfos = f.getKnownRouterData();
List rv = new ArrayList();
for (Iterator iter = routerInfos.iterator(); iter.hasNext(); ) {
RouterInfo ri = (RouterInfo)iter.next();
List<RouterInfo> routerInfos = f.getKnownRouterData();
List<Hash> rv = new ArrayList();
for (Iterator<RouterInfo> iter = routerInfos.iterator(); iter.hasNext(); ) {
RouterInfo ri = iter.next();
String caps = ri.getCapabilities();
if (caps.indexOf(capability) >= 0)
rv.add(ri.getIdentity().calculateHash());

View File

@ -57,7 +57,7 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade {
_manager.loadProfiles();
}
public List selectPeers(PeerSelectionCriteria criteria) {
public List<Hash> selectPeers(PeerSelectionCriteria criteria) {
return _manager.selectPeers(criteria);
}
@ -69,11 +69,15 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade {
if (_manager == null) return;
_manager.removeCapabilities(peer);
}
/** @deprecated unused */
public Hash selectRandomByCapability(char capability) {
if (_manager == null) return null;
return _manager.selectRandomByCapability(capability);
//if (_manager == null) return null;
//return _manager.selectRandomByCapability(capability);
return null;
}
public List getPeersByCapability(char capability) {
public List<Hash> getPeersByCapability(char capability) {
if (_manager == null) return new ArrayList(0);
return _manager.getPeersByCapability(capability);
}

View File

@ -217,7 +217,7 @@ public class ProfileOrganizer {
return activePeers;
}
private boolean isX(Map m, Hash peer) {
private boolean isX(Map<Hash, PeerProfile> m, Hash peer) {
getReadLock();
try {
return m.containsKey(peer);
@ -272,10 +272,10 @@ public class ProfileOrganizer {
* @param matches set to store the return value in
*
*/
public void selectFastPeers(int howMany, Set exclude, Set matches) {
public void selectFastPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
selectFastPeers(howMany, exclude, matches, 0);
}
public void selectFastPeers(int howMany, Set exclude, Set matches, int mask) {
public void selectFastPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
getReadLock();
try {
locked_selectPeers(_fastPeers, howMany, exclude, matches, mask);
@ -295,10 +295,10 @@ public class ProfileOrganizer {
* Return a set of Hashes for peers that have a high capacity
*
*/
public void selectHighCapacityPeers(int howMany, Set exclude, Set matches) {
public void selectHighCapacityPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
selectHighCapacityPeers(howMany, exclude, matches, 0);
}
public void selectHighCapacityPeers(int howMany, Set exclude, Set matches, int mask) {
public void selectHighCapacityPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
getReadLock();
try {
// we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST
@ -326,10 +326,10 @@ public class ProfileOrganizer {
* Return a set of Hashes for peers that are well integrated into the network.
*
*/
public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches) {
public void selectWellIntegratedPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
selectWellIntegratedPeers(howMany, exclude, matches, 0);
}
public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches, int mask) {
public void selectWellIntegratedPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
getReadLock();
try {
locked_selectPeers(_wellIntegratedPeers, howMany, exclude, matches, mask);
@ -350,13 +350,13 @@ public class ProfileOrganizer {
* we are already talking with
*
*/
public void selectNotFailingPeers(int howMany, Set exclude, Set matches) {
public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
selectNotFailingPeers(howMany, exclude, matches, false, 0);
}
public void selectNotFailingPeers(int howMany, Set exclude, Set matches, int mask) {
public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
selectNotFailingPeers(howMany, exclude, matches, false, mask);
}
public void selectNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing) {
public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing) {
selectNotFailingPeers(howMany, exclude, matches, onlyNotFailing, 0);
}
/**
@ -368,7 +368,7 @@ public class ProfileOrganizer {
* @param matches set to store the matches in
* @param onlyNotFailing if true, don't include any high capacity peers
*/
public void selectNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing, int mask) {
public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing, int mask) {
if (matches.size() < howMany)
selectAllNotFailingPeers(howMany, exclude, matches, onlyNotFailing, mask);
return;
@ -388,7 +388,7 @@ public class ProfileOrganizer {
* @param exclude non-null
* No mask parameter, to be fixed
*/
public void selectActiveNotFailingPeers(int howMany, Set exclude, Set matches) {
public void selectActiveNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
if (matches.size() < howMany) {
getReadLock();
try {
@ -412,7 +412,7 @@ public class ProfileOrganizer {
*
* This DOES cascade further to non-connected peers.
*/
private void selectActiveNotFailingPeers2(int howMany, Set exclude, Set matches, int mask) {
private void selectActiveNotFailingPeers2(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
if (matches.size() < howMany) {
Map<Hash, PeerProfile> activePeers = new HashMap();
getReadLock();
@ -439,14 +439,14 @@ public class ProfileOrganizer {
* Return a set of Hashes for peers that are not failing.
*
*/
public void selectAllNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing) {
public void selectAllNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing) {
selectAllNotFailingPeers(howMany, exclude, matches, onlyNotFailing, 0);
}
/**
* @param mask ignored, should call locked_selectPeers, to be fixed
*
*/
private void selectAllNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing, int mask) {
private void selectAllNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing, int mask) {
if (matches.size() < howMany) {
int orig = matches.size();
int needed = howMany - orig;
@ -495,7 +495,7 @@ public class ProfileOrganizer {
* I'm not quite sure why you'd want this... (other than for failover from the better results)
*
*/
public void selectFailingPeers(int howMany, Set exclude, Set matches) {
public void selectFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
getReadLock();
try {
locked_selectPeers(_failingPeers, howMany, exclude, matches);
@ -564,12 +564,12 @@ public class ProfileOrganizer {
* recent == last 20s
*
*/
public List selectPeersRecentlyRejecting() {
public List<Hash> selectPeersRecentlyRejecting() {
getReadLock();
try {
long cutoff = _context.clock().now() - (20*1000);
int count = _notFailingPeers.size();
List l = new ArrayList(count / 128);
List<Hash> l = new ArrayList(count / 128);
for (Iterator<PeerProfile> iter = _notFailingPeers.values().iterator(); iter.hasNext(); ) {
PeerProfile prof = iter.next();
if (prof.getTunnelHistory().getLastRejectedBandwidth() > cutoff)
@ -583,10 +583,10 @@ public class ProfileOrganizer {
* Find the hashes for all peers we are actively profiling
*
*/
public Set selectAllPeers() {
public Set<Hash> selectAllPeers() {
getReadLock();
try {
Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
Set<Hash> allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.keySet());
allPeers.addAll(_notFailingPeers.keySet());
allPeers.addAll(_highCapacityPeers.keySet());
@ -853,10 +853,10 @@ public class ProfileOrganizer {
* high capacity group to define the integration threshold.
*
*/
private void locked_calculateThresholds(Set allPeers) {
private void locked_calculateThresholds(Set<PeerProfile> allPeers) {
double totalCapacity = 0;
double totalIntegration = 0;
Set reordered = new TreeSet(_comp);
Set<PeerProfile> reordered = new TreeSet(_comp);
for (Iterator<PeerProfile> iter = allPeers.iterator(); iter.hasNext(); ) {
PeerProfile profile = iter.next();
@ -895,7 +895,7 @@ public class ProfileOrganizer {
* (highest first) for active nonfailing peers whose
* capacity is greater than the growth factor
*/
private void locked_calculateCapacityThreshold(double totalCapacity, Set reordered) {
private void locked_calculateCapacityThreshold(double totalCapacity, Set<PeerProfile> reordered) {
int numNotFailing = reordered.size();
double meanCapacity = avg(totalCapacity, numNotFailing);
@ -964,7 +964,7 @@ public class ProfileOrganizer {
* @param reordered ordered set of PeerProfile objects, ordered by capacity
* (highest first) for active nonfailing peers
*/
private void locked_calculateSpeedThreshold(Set reordered) {
private void locked_calculateSpeedThreshold(Set<PeerProfile> reordered) {
if (true) {
locked_calculateSpeedThresholdMean(reordered);
return;
@ -996,7 +996,7 @@ public class ProfileOrganizer {
*****/
}
private void locked_calculateSpeedThresholdMean(Set reordered) {
private void locked_calculateSpeedThresholdMean(Set<PeerProfile> reordered) {
double total = 0;
int count = 0;
for (Iterator<PeerProfile> iter = reordered.iterator(); iter.hasNext(); ) {
@ -1040,10 +1040,10 @@ public class ProfileOrganizer {
* matches set until it has howMany elements in it.
*
*/
private void locked_selectPeers(Map peers, int howMany, Set toExclude, Set matches) {
private void locked_selectPeers(Map<Hash, PeerProfile> peers, int howMany, Set<Hash> toExclude, Set<Hash> matches) {
locked_selectPeers(peers, howMany, toExclude, matches, 0);
}
private void locked_selectPeers(Map peers, int howMany, Set toExclude, Set matches, int mask) {
private void locked_selectPeers(Map<Hash, PeerProfile> peers, int howMany, Set<Hash> toExclude, Set<Hash> matches, int mask) {
List all = new ArrayList(peers.keySet());
if (toExclude != null)
all.removeAll(toExclude);
@ -1051,7 +1051,7 @@ public class ProfileOrganizer {
all.removeAll(matches);
all.remove(_us);
Collections.shuffle(all, _random);
Set IPSet = new HashSet(8);
Set<Integer> IPSet = new HashSet(8);
for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) {
Hash peer = (Hash)all.get(i);
boolean ok = isSelectable(peer);
@ -1073,8 +1073,8 @@ public class ProfileOrganizer {
* @param mask is 1-4 (number of bytes to match)
* @param IPMatches all IPs so far, modified by this routine
*/
private boolean notRestricted(Hash peer, Set IPSet, int mask) {
Set peerIPs = maskedIPSet(peer, mask);
private boolean notRestricted(Hash peer, Set<Integer> IPSet, int mask) {
Set<Integer> peerIPs = maskedIPSet(peer, mask);
if (containsAny(IPSet, peerIPs))
return false;
IPSet.addAll(peerIPs);
@ -1087,8 +1087,8 @@ public class ProfileOrganizer {
*
* @return an opaque set of masked IPs for this peer
*/
private Set maskedIPSet(Hash peer, int mask) {
Set rv = new HashSet(2);
private Set<Integer> maskedIPSet(Hash peer, int mask) {
Set<Integer> rv = new HashSet(2);
byte[] commIP = _context.commSystem().getIP(peer);
if (commIP != null)
rv.add(maskedIP(commIP, mask));