* Blocklist: cleanup

* PeerProfile:
     - Replace a hot lock with concurrent RW lock
     - Rewrite ugly IP Restriction code
     - Also use transport IP in restriction code
   * Transport: Start the previously unused CleanupUnreachable
This commit is contained in:
zzz
2009-04-23 00:23:29 +00:00
parent 3ddd5f2a51
commit 2f46efe78d
4 changed files with 203 additions and 165 deletions

View File

@ -19,6 +19,7 @@ import net.i2p.data.Hash;
import net.i2p.data.RouterAddress; import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo; import net.i2p.data.RouterInfo;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -55,20 +56,16 @@ public class Blocklist {
private int _blocklistSize; private int _blocklistSize;
private final Object _lock = new Object(); private final Object _lock = new Object();
private Entry _wrapSave; private Entry _wrapSave;
private final Set _inProcess = new HashSet(0); private final Set<Hash> _inProcess = new HashSet(0);
private Map _peerBlocklist = new HashMap(0); private Map<Hash, String> _peerBlocklist = new HashMap(0);
private final Set _singleIPBlocklist = new HashSet(0); private final Set<Integer> _singleIPBlocklist = new ConcurrentHashSet(0);
public Blocklist(RouterContext context) { public Blocklist(RouterContext context) {
_context = context; _context = context;
_log = context.logManager().getLog(Blocklist.class); _log = context.logManager().getLog(Blocklist.class);
_blocklist = null; _blocklist = null;
_blocklistSize = 0; _blocklistSize = 0;
// _lock = new Object();
_wrapSave = null; _wrapSave = null;
// _inProcess = new HashSet(0);
// _peerBlocklist = new HashMap(0);
// _singleIPBlocklist = new HashSet(0);
} }
public Blocklist() { public Blocklist() {
@ -446,15 +443,11 @@ public class Blocklist {
} }
private boolean add(int ip) { private boolean add(int ip) {
synchronized(_singleIPBlocklist) { return _singleIPBlocklist.add(Integer.valueOf(ip));
return _singleIPBlocklist.add(new Integer(ip));
}
} }
private boolean isOnSingleList(int ip) { private boolean isOnSingleList(int ip) {
synchronized(_singleIPBlocklist) { return _singleIPBlocklist.contains(Integer.valueOf(ip));
return _singleIPBlocklist.contains(new Integer(ip));
}
} }
/** /**
@ -586,11 +579,11 @@ public class Blocklist {
// methods to get and store the from/to values in the array // methods to get and store the from/to values in the array
private int getFrom(long entry) { private static int getFrom(long entry) {
return (int) ((entry >> 32) & 0xffffffff); return (int) ((entry >> 32) & 0xffffffff);
} }
private int getTo(long entry) { private static int getTo(long entry) {
return (int) (entry & 0xffffffff); return (int) (entry & 0xffffffff);
} }
@ -602,7 +595,7 @@ public class Blocklist {
* So the size is (cough) almost 2MB for the 240,000 line splist.txt. * So the size is (cough) almost 2MB for the 240,000 line splist.txt.
* *
*/ */
private long toEntry(byte ip1[], byte ip2[]) { private static long toEntry(byte ip1[], byte ip2[]) {
long entry = 0; long entry = 0;
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
entry |= ((long) (ip2[i] & 0xff)) << ((3-i)*8); entry |= ((long) (ip2[i] & 0xff)) << ((3-i)*8);
@ -621,14 +614,18 @@ public class Blocklist {
_blocklist[idx] = entry; _blocklist[idx] = entry;
} }
private int toInt(byte ip[]) { private static int toInt(byte ip[]) {
int rv = 0; int rv = 0;
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
rv |= (ip[i] & 0xff) << ((3-i)*8); rv |= (ip[i] & 0xff) << ((3-i)*8);
return rv; return rv;
} }
private String toStr(long entry) { public static String toStr(byte[] ip) {
return toStr(toInt(ip));
}
private static String toStr(long entry) {
StringBuffer buf = new StringBuffer(32); StringBuffer buf = new StringBuffer(32);
for (int i = 7; i >= 0; i--) { for (int i = 7; i >= 0; i--) {
buf.append((entry >> (8*i)) & 0xff); buf.append((entry >> (8*i)) & 0xff);
@ -640,7 +637,7 @@ public class Blocklist {
return buf.toString(); return buf.toString();
} }
private String toStr(int ip) { private static String toStr(int ip) {
StringBuffer buf = new StringBuffer(16); StringBuffer buf = new StringBuffer(16);
for (int i = 3; i >= 0; i--) { for (int i = 3; i >= 0; i--) {
buf.append((ip >> (8*i)) & 0xff); buf.append((ip >> (8*i)) & 0xff);
@ -756,9 +753,7 @@ public class Blocklist {
public void renderStatusHTML(Writer out) throws IOException { public void renderStatusHTML(Writer out) throws IOException {
out.write("<h2>IP Blocklist</h2>"); out.write("<h2>IP Blocklist</h2>");
Set singles = new TreeSet(); Set singles = new TreeSet();
synchronized(_singleIPBlocklist) { singles.addAll(_singleIPBlocklist);
singles.addAll(_singleIPBlocklist);
}
if (singles.size() > 0) { if (singles.size() > 0) {
out.write("<table><tr><td><b>Transient IPs</b></td></tr>"); out.write("<table><tr><td><b>Transient IPs</b></td></tr>");
for (Iterator iter = singles.iterator(); iter.hasNext(); ) { for (Iterator iter = singles.iterator(); iter.hasNext(); ) {

View File

@ -58,6 +58,7 @@ public abstract class CommSystemFacade implements Service {
public boolean isBacklogged(Hash dest) { return false; } public boolean isBacklogged(Hash dest) { return false; }
public boolean wasUnreachable(Hash dest) { return false; } public boolean wasUnreachable(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; } public boolean isEstablished(Hash dest) { return false; }
public byte[] getIP(Hash dest) { return null; }
/** /**
* Tell other transports our address changed * Tell other transports our address changed

View File

@ -19,6 +19,8 @@ import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.RouterAddress; import net.i2p.data.RouterAddress;
@ -40,17 +42,17 @@ public class ProfileOrganizer {
private Log _log; private Log _log;
private RouterContext _context; private RouterContext _context;
/** H(routerIdentity) to PeerProfile for all peers that are fast and high capacity*/ /** H(routerIdentity) to PeerProfile for all peers that are fast and high capacity*/
private Map _fastPeers; private Map<Hash, PeerProfile> _fastPeers;
/** H(routerIdentity) to PeerProfile for all peers that have high capacities */ /** H(routerIdentity) to PeerProfile for all peers that have high capacities */
private Map _highCapacityPeers; private Map<Hash, PeerProfile> _highCapacityPeers;
/** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */ /** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */
private Map _wellIntegratedPeers; private Map<Hash, PeerProfile> _wellIntegratedPeers;
/** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */ /** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */
private Map _notFailingPeers; private Map<Hash, PeerProfile> _notFailingPeers;
/** H(routerIdnetity), containing elements in _notFailingPeers */ /** H(routerIdnetity), containing elements in _notFailingPeers */
private List _notFailingPeersList; private List<Hash> _notFailingPeersList;
/** H(routerIdentity) to PeerProfile for all peers that ARE failing horribly (but that we haven't dropped reference to yet) */ /** H(routerIdentity) to PeerProfile for all peers that ARE failing horribly (but that we haven't dropped reference to yet) */
private Map _failingPeers; private Map<Hash, PeerProfile> _failingPeers;
/** who are we? */ /** who are we? */
private Hash _us; private Hash _us;
private ProfilePersistenceHelper _persistenceHelper; private ProfilePersistenceHelper _persistenceHelper;
@ -84,7 +86,7 @@ public class ProfileOrganizer {
public static final int DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS = 10; public static final int DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS = 10;
/** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */ /** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
private final Object _reorganizeLock = new Object(); private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true);
/** incredibly weak PRNG, just used for shuffling peers. no need to waste the real PRNG on this */ /** incredibly weak PRNG, just used for shuffling peers. no need to waste the real PRNG on this */
private Random _random = new Random(); private Random _random = new Random();
@ -112,6 +114,29 @@ public class ProfileOrganizer {
_context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
} }
private void getReadLock() {
_reorganizeLock.readLock().lock();
}
private void releaseReadLock() {
_reorganizeLock.readLock().unlock();
}
/** @return true if the lock was acquired */
private boolean getWriteLock() {
try {
boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
if (!rv)
_log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
return rv;
} catch (InterruptedException ie) {}
return false;
}
private void releaseWriteLock() {
_reorganizeLock.writeLock().unlock();
}
public void setUs(Hash us) { _us = us; } public void setUs(Hash us) { _us = us; }
Hash getUs() { return _us; } Hash getUs() { return _us; }
@ -124,42 +149,52 @@ public class ProfileOrganizer {
* *
*/ */
public PeerProfile getProfile(Hash peer) { public PeerProfile getProfile(Hash peer) {
synchronized (_reorganizeLock) { getReadLock();
try {
return locked_getProfile(peer); return locked_getProfile(peer);
} } finally { releaseReadLock(); }
} }
/** /**
* Add the new profile, returning the old value (or null if no profile existed) * Add the new profile, returning the old value (or null if no profile existed)
* *
*/ */
public PeerProfile addProfile(PeerProfile profile) throws IllegalStateException { public PeerProfile addProfile(PeerProfile profile) {
if ( (profile == null) || (profile.getPeer() == null) ) return null; if ( (profile == null) || (profile.getPeer() == null) ) return null;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("New profile created for " + profile.getPeer().toBase64()); _log.debug("New profile created for " + profile.getPeer().toBase64());
synchronized (_reorganizeLock) { PeerProfile old = getProfile(profile.getPeer());
PeerProfile old = locked_getProfile(profile.getPeer()); profile.coalesceStats();
profile.coalesceStats(); if (!getWriteLock())
return old;
try {
locked_placeProfile(profile); locked_placeProfile(profile);
_strictCapacityOrder.add(profile); _strictCapacityOrder.add(profile);
return old; } finally { releaseWriteLock(); }
} return old;
} }
public int countFastPeers() { synchronized (_reorganizeLock) { return _fastPeers.size(); } } private int count(Map m) {
public int countHighCapacityPeers() { synchronized (_reorganizeLock) { return _highCapacityPeers.size(); } } getReadLock();
public int countWellIntegratedPeers() { synchronized (_reorganizeLock) { return _wellIntegratedPeers.size(); } } try {
public int countNotFailingPeers() { synchronized (_reorganizeLock) { return _notFailingPeers.size(); } } return m.size();
public int countFailingPeers() { synchronized (_reorganizeLock) { return _failingPeers.size(); } } } finally { releaseReadLock(); }
}
public int countFastPeers() { return count(_fastPeers); }
public int countHighCapacityPeers() { return count(_highCapacityPeers); }
public int countWellIntegratedPeers() { return count(_wellIntegratedPeers); }
public int countNotFailingPeers() { return count(_notFailingPeers); }
public int countFailingPeers() { return count(_failingPeers); }
public int countActivePeers() { public int countActivePeers() {
synchronized (_reorganizeLock) { int activePeers = 0;
int activePeers = 0; long hideBefore = _context.clock().now() - 6*60*60*1000;
long hideBefore = _context.clock().now() - 6*60*60*1000; getReadLock();
try {
for (Iterator iter = _failingPeers.values().iterator(); iter.hasNext(); ) { for (Iterator iter = _failingPeers.values().iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next(); PeerProfile profile = (PeerProfile)iter.next();
if (profile.getLastSendSuccessful() >= hideBefore) if (profile.getLastSendSuccessful() >= hideBefore)
@ -174,15 +209,21 @@ public class ProfileOrganizer {
else if (profile.getLastHeardFrom() >= hideBefore) else if (profile.getLastHeardFrom() >= hideBefore)
activePeers++; activePeers++;
} }
} finally { releaseReadLock(); }
return activePeers; return activePeers;
}
} }
public boolean isFast(Hash peer) { synchronized (_reorganizeLock) { return _fastPeers.containsKey(peer); } } private boolean isX(Map m, Hash peer) {
public boolean isHighCapacity(Hash peer) { synchronized (_reorganizeLock) { return _highCapacityPeers.containsKey(peer); } } getReadLock();
public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } } try {
public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } } return m.containsKey(peer);
} finally { releaseReadLock(); }
}
public boolean isFast(Hash peer) { return isX(_fastPeers, peer); }
public boolean isHighCapacity(Hash peer) { return isX(_highCapacityPeers, peer); }
public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); }
public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); }
/** /**
* if a peer sends us more than 5 replies in a searchReply that we cannot * if a peer sends us more than 5 replies in a searchReply that we cannot
@ -236,9 +277,10 @@ public class ProfileOrganizer {
selectFastPeers(howMany, exclude, matches, 0); selectFastPeers(howMany, exclude, matches, 0);
} }
public void selectFastPeers(int howMany, Set exclude, Set matches, int mask) { public void selectFastPeers(int howMany, Set exclude, Set matches, int mask) {
synchronized (_reorganizeLock) { getReadLock();
try {
locked_selectPeers(_fastPeers, howMany, exclude, matches, mask); locked_selectPeers(_fastPeers, howMany, exclude, matches, mask);
} } finally { releaseReadLock(); }
if (matches.size() < howMany) { if (matches.size() < howMany) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("selectFastPeers("+howMany+"), not enough fast (" + matches.size() + ") going on to highCap"); _log.info("selectFastPeers("+howMany+"), not enough fast (" + matches.size() + ") going on to highCap");
@ -258,7 +300,8 @@ public class ProfileOrganizer {
selectHighCapacityPeers(howMany, exclude, matches, 0); selectHighCapacityPeers(howMany, exclude, matches, 0);
} }
public void selectHighCapacityPeers(int howMany, Set exclude, Set matches, int mask) { public void selectHighCapacityPeers(int howMany, Set exclude, Set matches, int mask) {
synchronized (_reorganizeLock) { getReadLock();
try {
// we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST // we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST
// or we are falling back due to _fastPeers being too small, so we can always // or we are falling back due to _fastPeers being too small, so we can always
// exclude the fast peers // exclude the fast peers
@ -269,7 +312,7 @@ public class ProfileOrganizer {
exclude.addAll(_fastPeers.keySet()); exclude.addAll(_fastPeers.keySet());
*/ */
locked_selectPeers(_highCapacityPeers, howMany, exclude, matches, mask); locked_selectPeers(_highCapacityPeers, howMany, exclude, matches, mask);
} } finally { releaseReadLock(); }
if (matches.size() < howMany) { if (matches.size() < howMany) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("selectHighCap("+howMany+"), not enough fast (" + matches.size() + ") going on to notFailing"); _log.info("selectHighCap("+howMany+"), not enough fast (" + matches.size() + ") going on to notFailing");
@ -288,9 +331,10 @@ public class ProfileOrganizer {
selectWellIntegratedPeers(howMany, exclude, matches, 0); selectWellIntegratedPeers(howMany, exclude, matches, 0);
} }
public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches, int mask) { public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches, int mask) {
synchronized (_reorganizeLock) { getReadLock();
try {
locked_selectPeers(_wellIntegratedPeers, howMany, exclude, matches, mask); locked_selectPeers(_wellIntegratedPeers, howMany, exclude, matches, mask);
} } finally { releaseReadLock(); }
if (matches.size() < howMany) { if (matches.size() < howMany) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("selectWellIntegrated("+howMany+"), not enough integrated (" + matches.size() + ") going on to notFailing"); _log.info("selectWellIntegrated("+howMany+"), not enough integrated (" + matches.size() + ") going on to notFailing");
@ -375,7 +419,8 @@ public class ProfileOrganizer {
int needed = howMany - orig; int needed = howMany - orig;
int start = 0; int start = 0;
List selected = new ArrayList(needed); List selected = new ArrayList(needed);
synchronized (_reorganizeLock) { getReadLock();
try {
// we randomize the whole list when rebuilding it, but randomizing // we randomize the whole list when rebuilding it, but randomizing
// the entire list on each peer selection is a bit crazy // the entire list on each peer selection is a bit crazy
start = _context.random().nextInt(_notFailingPeersList.size()); start = _context.random().nextInt(_notFailingPeersList.size());
@ -397,7 +442,7 @@ public class ProfileOrganizer {
_log.debug("Not selectable: " + cur.toBase64()); _log.debug("Not selectable: " + cur.toBase64());
} }
} }
} } finally { releaseReadLock(); }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Selecting all not failing (strict? " + onlyNotFailing + " start=" + start _log.info("Selecting all not failing (strict? " + onlyNotFailing + " start=" + start
+ ") found " + selected.size() + " new peers: " + selected + " all=" + _notFailingPeersList.size() + " strict=" + _strictCapacityOrder.size()); + ") found " + selected.size() + " new peers: " + selected + " all=" + _notFailingPeersList.size() + " strict=" + _strictCapacityOrder.size());
@ -418,25 +463,27 @@ public class ProfileOrganizer {
* *
*/ */
public void selectFailingPeers(int howMany, Set exclude, Set matches) { public void selectFailingPeers(int howMany, Set exclude, Set matches) {
synchronized (_reorganizeLock) { getReadLock();
try {
locked_selectPeers(_failingPeers, howMany, exclude, matches); locked_selectPeers(_failingPeers, howMany, exclude, matches);
} } finally { releaseReadLock(); }
return; return;
} }
/** /**
* Get the peers the transport layer thinks are unreachable, and * Get the peers the transport layer thinks are unreachable, and
* add in the peers with the SSU peer testing bug, * add in the peers with the SSU peer testing bug,
* and peers requiring introducers. * and peers requiring introducers.
* *
*/ */
public List selectPeersLocallyUnreachable() { public List selectPeersLocallyUnreachable() {
List n; List n;
int count; int count;
synchronized (_reorganizeLock) { getReadLock();
try {
count = _notFailingPeers.size(); count = _notFailingPeers.size();
n = new ArrayList(_notFailingPeers.keySet()); n = new ArrayList(_notFailingPeers.keySet());
} } finally { releaseReadLock(); }
List l = new ArrayList(count / 4); List l = new ArrayList(count / 4);
for (Iterator iter = n.iterator(); iter.hasNext(); ) { for (Iterator iter = n.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next(); Hash peer = (Hash)iter.next();
@ -483,7 +530,8 @@ public class ProfileOrganizer {
* *
*/ */
public List selectPeersRecentlyRejecting() { public List selectPeersRecentlyRejecting() {
synchronized (_reorganizeLock) { getReadLock();
try {
long cutoff = _context.clock().now() - (20*1000); long cutoff = _context.clock().now() - (20*1000);
int count = _notFailingPeers.size(); int count = _notFailingPeers.size();
List l = new ArrayList(count / 128); List l = new ArrayList(count / 128);
@ -493,7 +541,7 @@ public class ProfileOrganizer {
l.add(prof.getPeer()); l.add(prof.getPeer());
} }
return l; return l;
} } finally { releaseReadLock(); }
} }
/** /**
@ -501,14 +549,15 @@ public class ProfileOrganizer {
* *
*/ */
public Set selectAllPeers() { public Set selectAllPeers() {
synchronized (_reorganizeLock) { getReadLock();
try {
Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.keySet()); allPeers.addAll(_failingPeers.keySet());
allPeers.addAll(_notFailingPeers.keySet()); allPeers.addAll(_notFailingPeers.keySet());
allPeers.addAll(_highCapacityPeers.keySet()); allPeers.addAll(_highCapacityPeers.keySet());
allPeers.addAll(_fastPeers.keySet()); allPeers.addAll(_fastPeers.keySet());
return allPeers; return allPeers;
} } finally { releaseReadLock(); }
} }
/** /**
@ -532,8 +581,10 @@ public class ProfileOrganizer {
expireOlderThan = _context.clock().now() - 6*60*60*1000; expireOlderThan = _context.clock().now() - 6*60*60*1000;
} }
if (!getWriteLock())
return;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
synchronized (_reorganizeLock) { try {
Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
//allPeers.addAll(_failingPeers.values()); //allPeers.addAll(_failingPeers.values());
//allPeers.addAll(_notFailingPeers.values()); //allPeers.addAll(_notFailingPeers.values());
@ -557,35 +608,37 @@ public class ProfileOrganizer {
} }
sortTime = System.currentTimeMillis() - sortStart; sortTime = System.currentTimeMillis() - sortStart;
_strictCapacityOrder = reordered; _strictCapacityOrder = reordered;
long thresholdStart = System.currentTimeMillis(); long thresholdStart = System.currentTimeMillis();
locked_calculateThresholds(allPeers); locked_calculateThresholds(allPeers);
thresholdTime = System.currentTimeMillis()-thresholdStart; thresholdTime = System.currentTimeMillis()-thresholdStart;
_failingPeers.clear(); _failingPeers.clear();
_fastPeers.clear(); _fastPeers.clear();
_highCapacityPeers.clear(); _highCapacityPeers.clear();
_notFailingPeers.clear(); _notFailingPeers.clear();
_notFailingPeersList.clear(); _notFailingPeersList.clear();
_wellIntegratedPeers.clear(); _wellIntegratedPeers.clear();
long placeStart = System.currentTimeMillis(); long placeStart = System.currentTimeMillis();
for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) { for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next(); PeerProfile profile = (PeerProfile)iter.next();
locked_placeProfile(profile); locked_placeProfile(profile);
} }
locked_unfailAsNecessary(); locked_unfailAsNecessary();
locked_promoteFastAsNecessary(); locked_promoteFastAsNecessary();
Collections.shuffle(_notFailingPeersList, _context.random()); Collections.shuffle(_notFailingPeersList, _context.random());
placeTime = System.currentTimeMillis()-placeStart;
if (_log.shouldLog(Log.INFO)) placeTime = System.currentTimeMillis()-placeStart;
_log.info("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue } finally { releaseWriteLock(); }
+ ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
if (_log.shouldLog(Log.INFO))
_log.info("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue
+ ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
/***** /*****
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(512); StringBuffer buf = new StringBuffer(512);
@ -597,7 +650,6 @@ public class ProfileOrganizer {
_log.debug("fast: " + _fastPeers.values()); _log.debug("fast: " + _fastPeers.values());
} }
*****/ *****/
}
long total = System.currentTimeMillis()-start; long total = System.currentTimeMillis()-start;
_context.statManager().addRateData("peer.profileSortTime", sortTime, profileCount); _context.statManager().addRateData("peer.profileSortTime", sortTime, profileCount);
@ -899,11 +951,12 @@ public class ProfileOrganizer {
all.removeAll(matches); all.removeAll(matches);
all.remove(_us); all.remove(_us);
Collections.shuffle(all, _random); Collections.shuffle(all, _random);
Set IPSet = new HashSet(8);
for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) { for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) {
Hash peer = (Hash)all.get(i); Hash peer = (Hash)all.get(i);
boolean ok = isSelectable(peer); boolean ok = isSelectable(peer);
if (ok) { if (ok) {
ok = mask <= 0 || notRestricted(peer, matches, mask); ok = mask <= 0 || notRestricted(peer, IPSet, mask);
if ((!ok) && _log.shouldLog(Log.WARN)) if ((!ok) && _log.shouldLog(Log.WARN))
_log.warn("IP restriction prevents " + peer + " from joining " + matches); _log.warn("IP restriction prevents " + peer + " from joining " + matches);
} }
@ -917,79 +970,69 @@ public class ProfileOrganizer {
/** /**
* Does the peer's IP address NOT match the IP address of any peer already in the set, * Does the peer's IP address NOT match the IP address of any peer already in the set,
* on any transport, within a given mask? * on any transport, within a given mask?
* mask is 1-4 (number of bytes to match) or 0 to disable * @param mask is 1-4 (number of bytes to match)
* Perhaps rewrite this to just make a set of all the IP addresses rather than loop. * @param IPMatches all IPs so far, modified by this routine
*/ */
private boolean notRestricted(Hash peer, Set matches, int mask) { private boolean notRestricted(Hash peer, Set IPSet, int mask) {
if (mask <= 0) return true; Set peerIPs = maskedIPSet(peer, mask);
if (matches.size() <= 0) return true; if (containsAny(IPSet, peerIPs))
RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
if (pinfo == null) return false;
Set paddr = pinfo.getAddresses();
if (paddr == null || paddr.size() == 0)
return false; return false;
List pladdr = new ArrayList(paddr); IPSet.addAll(peerIPs);
List lmatches = new ArrayList(matches);
// for each match
for (int i = 0; i < matches.size(); i++) {
RouterInfo minfo = _context.netDb().lookupRouterInfoLocally((Hash) lmatches.get(i));
if (minfo == null) continue;
Set maddr = minfo.getAddresses();
if (maddr == null || maddr.size() == 0)
continue;
List mladdr = new ArrayList(maddr);
String oldphost = null;
// for each peer address
for (int j = 0; j < paddr.size(); j++) {
RouterAddress pa = (RouterAddress) pladdr.get(j);
if (pa == null) continue;
Properties pprops = pa.getOptions();
if (pprops == null) continue;
String phost = pprops.getProperty("host");
if (phost == null) continue;
if (oldphost != null && oldphost.equals(phost)) continue;
oldphost = phost;
InetAddress pi;
try {
pi = InetAddress.getByName(phost);
} catch (UnknownHostException uhe) {
continue;
}
if (pi == null) continue;
byte[] pib = pi.getAddress();
String oldmhost = null;
// for each match address
for (int k = 0; k < maddr.size(); k++) {
RouterAddress ma = (RouterAddress) mladdr.get(k);
if (ma == null) continue;
Properties mprops = ma.getOptions();
if (mprops == null) continue;
String mhost = mprops.getProperty("host");
if (mhost == null) continue;
if (oldmhost != null && oldmhost.equals(mhost)) continue;
oldmhost = mhost;
InetAddress mi;
try {
mi = InetAddress.getByName(mhost);
} catch (UnknownHostException uhe) {
continue;
}
if (mi == null) continue;
byte[] mib = mi.getAddress();
// assume ipv4, compare 1 to 4 bytes
// log.info("Comparing " + pi + " with " + mi);
for (int m = 0; m < mask; m++) {
if (pib[m] != mib[m])
break;
if (m == mask-1)
return false; // IP match
}
}
}
}
return true; return true;
} }
/**
* The Set of IPs for this peer, with a given mask.
* Includes the comm system's record of the IP, and all netDb addresses.
*
* @return an opaque set of masked IPs for this peer
*/
private Set maskedIPSet(Hash peer, int mask) {
Set rv = new HashSet(2);
byte[] commIP = _context.commSystem().getIP(peer);
if (commIP != null)
rv.add(maskedIP(commIP, mask));
RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
if (pinfo == null)
return rv;
Set<RouterAddress> paddr = pinfo.getAddresses();
if (paddr == null)
return rv;
for (RouterAddress pa : paddr) {
Properties pprops = pa.getOptions();
if (pprops == null) continue;
String phost = pprops.getProperty("host");
if (phost == null) continue;
InetAddress pi;
try {
pi = InetAddress.getByName(phost);
} catch (UnknownHostException uhe) {
continue;
}
if (pi == null) continue;
byte[] pib = pi.getAddress();
rv.add(maskedIP(pib, mask));
}
return rv;
}
/** generate an arbitrary unique value for this ip/mask (mask = 1-4) */
private Integer maskedIP(byte[] ip, int mask) {
int rv = 0;
for (int i = 0; i < mask; i++)
rv = (rv << 8) | (ip[i] & 0xff);
return Integer.valueOf(rv);
}
/** does a contain any of the elements in b? */
private boolean containsAny(Set a, Set b) {
for (Object o : b) {
if (a.contains(o))
return true;
}
return false;
}
public boolean isSelectable(Hash peer) { public boolean isSelectable(Hash peer) {
NetworkDatabaseFacade netDb = _context.netDb(); NetworkDatabaseFacade netDb = _context.netDb();
// the CLI shouldn't depend upon the netDb // the CLI shouldn't depend upon the netDb

View File

@ -36,6 +36,8 @@ import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/** /**
* Defines a way to send a message to another peer and start listening for messages * Defines a way to send a message to another peer and start listening for messages
@ -72,6 +74,7 @@ public abstract class TransportImpl implements Transport {
_unreachableEntries = new HashMap(16); _unreachableEntries = new HashMap(16);
_wasUnreachableEntries = new ConcurrentHashSet(16); _wasUnreachableEntries = new ConcurrentHashSet(16);
_currentAddress = null; _currentAddress = null;
SimpleScheduler.getInstance().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2);
} }
/** /**
@ -462,13 +465,10 @@ public abstract class TransportImpl implements Transport {
if (!isInbound) if (!isInbound)
markWasUnreachable(peer, false); markWasUnreachable(peer, false);
} }
private class CleanupUnreachable extends JobImpl {
public CleanupUnreachable(RouterContext ctx) { private class CleanupUnreachable implements SimpleTimer.TimedEvent {
super(ctx); public void timeReached() {
} long now = _context.clock().now();
public String getName() { return "Cleanup " + getStyle() + " unreachable list"; }
public void runJob() {
long now = getContext().clock().now();
synchronized (_unreachableEntries) { synchronized (_unreachableEntries) {
for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) { for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next(); Hash peer = (Hash)iter.next();
@ -477,7 +477,6 @@ public abstract class TransportImpl implements Transport {
iter.remove(); iter.remove();
} }
} }
requeue(60*1000);
} }
} }