forked from I2P_Developers/i2p.i2p
Transports: Make unreachable maps concurrent
Adapted from i2speed / jogger ref: http://zzz.i2p/topics/2894 item 5
This commit is contained in:
@ -26,6 +26,7 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import net.i2p.data.DataHelper;
|
import net.i2p.data.DataHelper;
|
||||||
import net.i2p.data.Hash;
|
import net.i2p.data.Hash;
|
||||||
@ -101,8 +102,8 @@ public abstract class TransportImpl implements Transport {
|
|||||||
_sendPool = new ArrayBlockingQueue<OutNetMessage>(8);
|
_sendPool = new ArrayBlockingQueue<OutNetMessage>(8);
|
||||||
else
|
else
|
||||||
_sendPool = null;
|
_sendPool = null;
|
||||||
_unreachableEntries = new HashMap<Hash, Long>(32);
|
_unreachableEntries = new ConcurrentHashMap<Hash, Long>(32);
|
||||||
_wasUnreachableEntries = new HashMap<Hash, Long>(32);
|
_wasUnreachableEntries = new ConcurrentHashMap<Hash, Long>(32);
|
||||||
_localAddresses = new ConcurrentHashSet<InetAddress>(4);
|
_localAddresses = new ConcurrentHashSet<InetAddress>(4);
|
||||||
_context.simpleTimer2().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2);
|
_context.simpleTimer2().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2);
|
||||||
}
|
}
|
||||||
@ -850,17 +851,16 @@ public abstract class TransportImpl implements Transport {
|
|||||||
public void mayDisconnect(Hash peer) {}
|
public void mayDisconnect(Hash peer) {}
|
||||||
|
|
||||||
public boolean isUnreachable(Hash peer) {
|
public boolean isUnreachable(Hash peer) {
|
||||||
synchronized (_unreachableEntries) {
|
boolean rv;
|
||||||
Long when = _unreachableEntries.get(peer);
|
Long when = _unreachableEntries.get(peer);
|
||||||
if (when == null) return false;
|
if ((rv = when != null)) {
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
if (when.longValue() + UNREACHABLE_PERIOD < now) {
|
rv = when.longValue() + UNREACHABLE_PERIOD >= now;
|
||||||
|
if (!rv) {
|
||||||
_unreachableEntries.remove(peer);
|
_unreachableEntries.remove(peer);
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** called when we can't reach a peer */
|
/** called when we can't reach a peer */
|
||||||
@ -870,10 +870,8 @@ public abstract class TransportImpl implements Transport {
|
|||||||
status == Status.HOSED)
|
status == Status.HOSED)
|
||||||
return;
|
return;
|
||||||
Long now = Long.valueOf(_context.clock().now());
|
Long now = Long.valueOf(_context.clock().now());
|
||||||
synchronized (_unreachableEntries) {
|
// This isn't very useful since it is cleared when they contact us
|
||||||
// This isn't very useful since it is cleared when they contact us
|
_unreachableEntries.put(peer, now);
|
||||||
_unreachableEntries.put(peer, now);
|
|
||||||
}
|
|
||||||
// This is not cleared when they contact us
|
// This is not cleared when they contact us
|
||||||
markWasUnreachable(peer, true);
|
markWasUnreachable(peer, true);
|
||||||
}
|
}
|
||||||
@ -882,9 +880,7 @@ public abstract class TransportImpl implements Transport {
|
|||||||
public void markReachable(Hash peer, boolean isInbound) {
|
public void markReachable(Hash peer, boolean isInbound) {
|
||||||
// if *some* transport can reach them, then we shouldn't banlist 'em
|
// if *some* transport can reach them, then we shouldn't banlist 'em
|
||||||
_context.banlist().unbanlistRouter(peer);
|
_context.banlist().unbanlistRouter(peer);
|
||||||
synchronized (_unreachableEntries) {
|
_unreachableEntries.remove(peer);
|
||||||
_unreachableEntries.remove(peer);
|
|
||||||
}
|
|
||||||
if (!isInbound)
|
if (!isInbound)
|
||||||
markWasUnreachable(peer, false);
|
markWasUnreachable(peer, false);
|
||||||
}
|
}
|
||||||
@ -892,19 +888,15 @@ public abstract class TransportImpl implements Transport {
|
|||||||
private class CleanupUnreachable implements SimpleTimer.TimedEvent {
|
private class CleanupUnreachable implements SimpleTimer.TimedEvent {
|
||||||
public void timeReached() {
|
public void timeReached() {
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
synchronized (_unreachableEntries) {
|
for (Iterator<Long> iter = _unreachableEntries.values().iterator(); iter.hasNext(); ) {
|
||||||
for (Iterator<Long> iter = _unreachableEntries.values().iterator(); iter.hasNext(); ) {
|
Long when = iter.next();
|
||||||
Long when = iter.next();
|
if (when.longValue() + UNREACHABLE_PERIOD < now)
|
||||||
if (when.longValue() + UNREACHABLE_PERIOD < now)
|
iter.remove();
|
||||||
iter.remove();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
synchronized (_wasUnreachableEntries) {
|
for (Iterator<Long> iter = _wasUnreachableEntries.values().iterator(); iter.hasNext(); ) {
|
||||||
for (Iterator<Long> iter = _wasUnreachableEntries.values().iterator(); iter.hasNext(); ) {
|
Long when = iter.next();
|
||||||
Long when = iter.next();
|
if (when.longValue() + WAS_UNREACHABLE_PERIOD < now)
|
||||||
if (when.longValue() + WAS_UNREACHABLE_PERIOD < now)
|
iter.remove();
|
||||||
iter.remove();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -914,16 +906,14 @@ public abstract class TransportImpl implements Transport {
|
|||||||
* This is NOT reset if the peer contacts us.
|
* This is NOT reset if the peer contacts us.
|
||||||
*/
|
*/
|
||||||
public boolean wasUnreachable(Hash peer) {
|
public boolean wasUnreachable(Hash peer) {
|
||||||
synchronized (_wasUnreachableEntries) {
|
Long when = _wasUnreachableEntries.get(peer);
|
||||||
Long when = _wasUnreachableEntries.get(peer);
|
if (when != null) {
|
||||||
if (when != null) {
|
long now = _context.clock().now();
|
||||||
long now = _context.clock().now();
|
if (when.longValue() + WAS_UNREACHABLE_PERIOD < now) {
|
||||||
if (when.longValue() + WAS_UNREACHABLE_PERIOD < now) {
|
_unreachableEntries.remove(peer);
|
||||||
_unreachableEntries.remove(peer);
|
return false;
|
||||||
return false;
|
} else {
|
||||||
} else {
|
return true;
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
|
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
|
||||||
@ -938,13 +928,9 @@ public abstract class TransportImpl implements Transport {
|
|||||||
private void markWasUnreachable(Hash peer, boolean yes) {
|
private void markWasUnreachable(Hash peer, boolean yes) {
|
||||||
if (yes) {
|
if (yes) {
|
||||||
Long now = Long.valueOf(_context.clock().now());
|
Long now = Long.valueOf(_context.clock().now());
|
||||||
synchronized (_wasUnreachableEntries) {
|
_wasUnreachableEntries.put(peer, now);
|
||||||
_wasUnreachableEntries.put(peer, now);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
synchronized (_wasUnreachableEntries) {
|
_wasUnreachableEntries.remove(peer);
|
||||||
_wasUnreachableEntries.remove(peer);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (_log.shouldDebug())
|
if (_log.shouldDebug())
|
||||||
_log.debug(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer,
|
_log.debug(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer,
|
||||||
|
Reference in New Issue
Block a user