* new simple 'invalid reply rate' tracking the number of bad replies a peer sends
  (e.g. we can't find the referenced peer, or the data they send back is corrupt/expired).
  This is like the old invalidReplies, except it's a rate that decays.
* if we receive more than 5 invalid replies from a peer in a 1-2 hour period,
  stop verifying any subsequent replies, and also stop asking them for keys.
* cleaned up the store validation even further
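
A minimal sketch of the threshold rule described above, with invented names (InvalidReplyTracker is illustrative only; the router's actual implementation uses RateStat hourly buckets via DBHistory and ProfileOrganizer.peerSendsBadReplies, as the diffs below show):

    // Illustrative sketch of the 'more than 5 invalid replies per 1-2 hours' rule.
    // A decaying hourly rate keeps a current bucket and the last completed bucket;
    // checking both covers a sliding 1-2 hour window without unbounded history.
    class InvalidReplyTracker {
        private static final int MAX_BAD_REPLIES_PER_HOUR = 5;
        private static final long HOUR = 60 * 60 * 1000L;

        private long _bucketStart = System.currentTimeMillis();
        private int _currentCount;  // invalid replies in the bucket now filling
        private int _lastCount;     // invalid replies in the previous full hour

        /** record one invalid reply (unknown referenced peer, corrupt/expired data) */
        synchronized void invalidReply() {
            roll();
            _currentCount++;
        }

        /** true if the peer crossed the threshold in the current or last hour */
        synchronized boolean sendsBadReplies() {
            roll();
            return _currentCount > MAX_BAD_REPLIES_PER_HOUR
                || _lastCount > MAX_BAD_REPLIES_PER_HOUR;
        }

        /** rotate buckets once the current hour has elapsed (the 'decay') */
        private void roll() {
            long now = System.currentTimeMillis();
            if (now - _bucketStart >= HOUR) {
                // if more than two hours passed, the old data has fully decayed
                _lastCount = (now - _bucketStart >= 2 * HOUR) ? 0 : _currentCount;
                _currentCount = 0;
                _bucketStart = now;
            }
        }
    }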
This commit is contained in:
jrandom
2004-08-17 02:03:09 +00:00
committed by zzz
parent 7ed310ffd2
commit 8e9c541eba
10 changed files with 265 additions and 61 deletions

View File

@@ -38,10 +38,18 @@ public abstract class NetworkDatabaseFacade implements Service {
     public abstract LeaseSet lookupLeaseSetLocally(Hash key);
     public abstract void lookupRouterInfo(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs);
     public abstract RouterInfo lookupRouterInfoLocally(Hash key);
-    /** return the leaseSet if another leaseSet already existed at that key */
-    public abstract LeaseSet store(Hash key, LeaseSet leaseSet);
-    /** return the routerInfo if another router already existed at that key */
-    public abstract RouterInfo store(Hash key, RouterInfo routerInfo);
+    /**
+     * return the leaseSet if another leaseSet already existed at that key
+     *
+     * @throws IllegalArgumentException if the data is not valid
+     */
+    public abstract LeaseSet store(Hash key, LeaseSet leaseSet) throws IllegalArgumentException;
+    /**
+     * return the routerInfo if another router already existed at that key
+     *
+     * @throws IllegalArgumentException if the data is not valid
+     */
+    public abstract RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException;
     public abstract void publish(RouterInfo localRouterInfo);
     public abstract void publish(LeaseSet localLeaseSet);
     public abstract void unpublish(LeaseSet localLeaseSet);

View File

@@ -48,17 +48,30 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Handling database store message");
 
+        boolean invalid = false;
         boolean wasNew = false;
         if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
-            Object match = getContext().netDb().store(_message.getKey(), _message.getLeaseSet());
-            wasNew = (null == match);
+            try {
+                Object match = getContext().netDb().store(_message.getKey(), _message.getLeaseSet());
+                wasNew = (null == match);
+            } catch (IllegalArgumentException iae) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Not storing a leaseSet", iae);
+                invalid = true;
+            }
         } else if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
             if (_log.shouldLog(Log.INFO))
                 _log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of "
                           + new Date(_message.getRouterInfo().getPublished()));
-            Object match = getContext().netDb().store(_message.getKey(), _message.getRouterInfo());
-            wasNew = (null == match);
-            getContext().profileManager().heardAbout(_message.getKey());
+            try {
+                Object match = getContext().netDb().store(_message.getKey(), _message.getRouterInfo());
+                wasNew = (null == match);
+                getContext().profileManager().heardAbout(_message.getKey());
+            } catch (IllegalArgumentException iae) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Not storing a routerInfo", iae);
+                invalid = true;
+            }
         } else {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType()
@@ -70,9 +83,12 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
         if (_from != null)
             _fromHash = _from.getHash();
-        if (_fromHash != null)
-            getContext().profileManager().dbStoreReceived(_fromHash, wasNew);
-        getContext().statManager().addRateData("netDb.storeHandled", 1, 0);
+        if (_fromHash != null) {
+            if (!invalid) {
+                getContext().profileManager().dbStoreReceived(_fromHash, wasNew);
+                getContext().statManager().addRateData("netDb.storeHandled", 1, 0);
+            }
+        }
     }
 
     private void sendAck() {
@@ -86,8 +102,8 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
             return;
         } else {
             getContext().jobQueue().addJob(new SendTunnelMessageJob(getContext(), msg, outTunnelId,
                                                                     _message.getReplyGateway(), _message.getReplyTunnel(),
                                                                     null, null, null, null, ACK_TIMEOUT, ACK_PRIORITY));
         }
     }

View File

@@ -433,31 +433,47 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
     /** I don't think it'll ever make sense to have a lease last for a full day */
     private static final long MAX_LEASE_FUTURE = 24*60*60*1000;
 
-    public LeaseSet store(Hash key, LeaseSet leaseSet) {
-        long start = _context.clock().now();
-        if (!_initialized) return null;
+    /**
+     * Determine whether this leaseSet will be accepted as valid and current
+     * given what we know now.
+     *
+     */
+    boolean validate(Hash key, LeaseSet leaseSet) {
         if (!key.equals(leaseSet.getDestination().calculateHash())) {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("Invalid store attempt! key does not match leaseSet.destination! key = "
                            + key + ", leaseSet = " + leaseSet);
-            return null;
+            return false;
         } else if (!leaseSet.verifySignature()) {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("Invalid leaseSet signature! leaseSet = " + leaseSet);
-            return null;
+            return false;
         } else if (leaseSet.getEarliestLeaseDate() <= _context.clock().now()) {
             if (_log.shouldLog(Log.WARN))
                 _log.warn("Old leaseSet! not storing it: "
                           + leaseSet.getDestination().calculateHash().toBase64()
                           + " expires on " + new Date(leaseSet.getEarliestLeaseDate()), new Exception("Rejecting store"));
-            return null;
+            return false;
         } else if (leaseSet.getEarliestLeaseDate() > _context.clock().now() + MAX_LEASE_FUTURE) {
             if (_log.shouldLog(Log.WARN))
                 _log.warn("LeaseSet to expire too far in the future: "
                           + leaseSet.getDestination().calculateHash().toBase64()
                           + " expires on " + new Date(leaseSet.getEarliestLeaseDate()), new Exception("Rejecting store"));
-            return null;
+            return false;
         }
+        return true;
+    }
+
+    /**
+     * Store the leaseSet
+     *
+     * @throws IllegalArgumentException if the leaseSet is not valid
+     */
+    public LeaseSet store(Hash key, LeaseSet leaseSet) throws IllegalArgumentException {
+        if (!_initialized) return null;
+
+        boolean valid = validate(key, leaseSet);
+        if (!valid) throw new IllegalArgumentException("LeaseSet is not valid");
 
         LeaseSet rv = null;
         if (_ds.isKnown(key))
@@ -485,39 +501,54 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
             }
         }
 
-        long end = _context.clock().now();
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Store leaseSet took [" + (end-start) + "ms]");
-
         return rv;
     }
 
-    public RouterInfo store(Hash key, RouterInfo routerInfo) {
-        long start = _context.clock().now();
-        if (!_initialized) return null;
+    /**
+     * Determine whether this routerInfo will be accepted as valid and current
+     * given what we know now.
+     *
+     */
+    boolean validate(Hash key, RouterInfo routerInfo) {
+        long now = _context.clock().now();
         if (!key.equals(routerInfo.getIdentity().getHash())) {
             _log.error("Invalid store attempt! key does not match routerInfo.identity! key = " + key + ", router = " + routerInfo);
-            return null;
+            return false;
         } else if (!routerInfo.isValid()) {
             _log.error("Invalid routerInfo signature! forged router structure! router = " + routerInfo);
-            return null;
+            return false;
         } else if (!routerInfo.isCurrent(ExpireRoutersJob.EXPIRE_DELAY)) {
             int existing = _kb.size();
             if (existing >= MIN_REMAINING_ROUTERS) {
                 if (_log.shouldLog(Log.INFO))
                     _log.info("Not storing expired router for " + key.toBase64(), new Exception("Rejecting store"));
-                return null;
+                return false;
             } else {
                 if (_log.shouldLog(Log.WARN))
                     _log.warn("Even though the peer is old, we have only " + existing
                               + " peers left (curPeer: " + key.toBase64() + " published on "
                               + new Date(routerInfo.getPublished()));
             }
-        } else if (routerInfo.getPublished() > start + Router.CLOCK_FUDGE_FACTOR) {
+        } else if (routerInfo.getPublished() > now + Router.CLOCK_FUDGE_FACTOR) {
             if (_log.shouldLog(Log.WARN))
                 _log.warn("Peer " + key.toBase64() + " published their routerInfo in the future?! ["
                           + new Date(routerInfo.getPublished()) + "]", new Exception("Rejecting store"));
-            return null;
+            return false;
         }
+        return true;
+    }
+
+    /**
+     * store the routerInfo
+     *
+     * @throws IllegalArgumentException if the routerInfo is not valid
+     */
+    public RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException {
+        if (!_initialized) return null;
+
+        boolean valid = validate(key, routerInfo);
+        if (!valid) throw new IllegalArgumentException("RouterInfo is not valid");
 
         RouterInfo rv = null;
         if (_ds.isKnown(key))
@@ -534,9 +565,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
                 _lastSent.put(key, new Long(0));
         }
         _kb.add(key);
-        long end = _context.clock().now();
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Store routerInfo took [" + (end-start) + "ms]");
 
         return rv;
     }
@@ -576,6 +604,11 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
                 _log.info("Dropping a lease: " + dbEntry);
         }
 
+        if (o == null) {
+            // if we dont know the key, lets make sure it isn't a now-dead peer
+            boolean removed = _kb.remove(dbEntry);
+        }
+
         _ds.remove(dbEntry);
         synchronized (_lastSent) {
             _lastSent.remove(dbEntry);

View File

@@ -21,6 +21,9 @@ import java.util.TreeMap;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
 import net.i2p.router.RouterContext;
+import net.i2p.router.peermanager.PeerProfile;
+import net.i2p.stat.Rate;
+import net.i2p.stat.RateStat;
 import net.i2p.util.Log;
 
 class PeerSelector {
@@ -82,16 +85,33 @@ class PeerSelector {
      *
      */
     private void removeFailingPeers(Set peerHashes) {
-        List failing = new ArrayList(16);
+        List failing = null;
         for (Iterator iter = peerHashes.iterator(); iter.hasNext(); ) {
             Hash cur = (Hash)iter.next();
             if (_context.profileOrganizer().isFailing(cur)) {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Peer " + cur.toBase64() + " is failing, don't include them in the peer selection");
+                if (failing == null)
+                    failing = new ArrayList(4);
                 failing.add(cur);
+            } else if (_context.profileOrganizer().peerSendsBadReplies(cur)) {
+                if (failing == null)
+                    failing = new ArrayList(4);
+                failing.add(cur);
+                if (_log.shouldLog(Log.WARN)) {
+                    PeerProfile profile = _context.profileOrganizer().getProfile(cur);
+                    if (profile != null) {
+                        RateStat invalidReplyRateStat = profile.getDBHistory().getInvalidReplyRate();
+                        Rate invalidReplyRate = invalidReplyRateStat.getRate(60*60*1000l);
+                        _log.warn("Peer " + cur.toBase64() + " sends us bad replies: current hour: "
+                                  + invalidReplyRate.getCurrentEventCount() + " and last hour: "
+                                  + invalidReplyRate.getLastEventCount() + ":\n" + invalidReplyRate.toString());
+                    }
+                }
             }
         }
-        peerHashes.removeAll(failing);
+        if (failing != null)
+            peerHashes.removeAll(failing);
     }
 
     protected BigInteger getDistance(Hash targetKey, Hash routerInQuestion) {

View File

@@ -215,9 +215,9 @@ class PersistentDataStore extends TransientDataStore {
                 fis = new FileInputStream(_leaseFile);
                 LeaseSet ls = new LeaseSet();
                 ls.readBytes(fis);
-                _facade.store(ls.getDestination().calculateHash(), ls);
-                Object accepted = _facade.lookupLeaseSetLocally(ls.getDestination().calculateHash());
-                if (accepted == null) {
+                try {
+                    _facade.store(ls.getDestination().calculateHash(), ls);
+                } catch (IllegalArgumentException iae) {
                     _log.info("Refused locally loaded leaseSet - deleting");
                     corrupt = true;
                 }
@@ -271,9 +271,9 @@ class PersistentDataStore extends TransientDataStore {
                 fis = new FileInputStream(_routerFile);
                 RouterInfo ri = new RouterInfo();
                 ri.readBytes(fis);
-                _facade.store(ri.getIdentity().getHash(), ri);
-                Object accepted = _facade.lookupRouterInfoLocally(ri.getIdentity().getHash());
-                if (accepted == null) {
+                try {
+                    _facade.store(ri.getIdentity().getHash(), ri);
+                } catch (IllegalArgumentException iae) {
                     _log.info("Refused locally loaded routerInfo - deleting");
                     corrupt = true;
                 }

View File

@@ -26,6 +26,7 @@ import net.i2p.router.TunnelInfo;
 import net.i2p.router.TunnelSelectionCriteria;
 import net.i2p.router.message.SendMessageDirectJob;
 import net.i2p.router.message.SendTunnelMessageJob;
+import net.i2p.router.peermanager.PeerProfile;
 import net.i2p.util.Log;
 
 /**
@@ -87,6 +88,9 @@ class SearchJob extends JobImpl {
         getContext().statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
         getContext().statManager().createRateStat("netDb.searchCount", "Overall number of searches sent", "Network Database", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
         getContext().statManager().createRateStat("netDb.searchMessageCount", "Overall number of mesages for all searches sent", "Network Database", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
+        getContext().statManager().createRateStat("netDb.searchReplyValidated", "How many search replies we get that we are able to validate (fetch)", "Network Database", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
+        getContext().statManager().createRateStat("netDb.searchReplyNotValidated", "How many search replies we get that we are NOT able to validate (fetch)", "Network Database", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
+        getContext().statManager().createRateStat("netDb.searchReplyValidationSkipped", "How many search replies we get from unreliable peers that we skip?", "Network Database", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
     }
@@ -396,12 +400,22 @@ class SearchJob extends JobImpl {
     private final class SearchReplyJob extends JobImpl {
         private DatabaseSearchReplyMessage _msg;
+        /**
+         * Peer who we think sent us the reply. Note: could be spoofed! If the
+         * attacker knew we were searching for a particular key from a
+         * particular peer, they could send us some searchReply messages with
+         * bogus values, trying to get us to consider that peer unreliable.
+         * Potential fixes include either an authenticated 'from' address or a
+         * nonce in the search + searchReply (and a check for it in the selector).
+         *
+         */
         private Hash _peer;
         private int _curIndex;
         private int _invalidPeers;
         private int _seenPeers;
         private int _newPeers;
         private int _duplicatePeers;
+        private int _repliesPendingVerification;
         private long _duration;
         public SearchReplyJob(DatabaseSearchReplyMessage message, Hash peer, long duration) {
             super(SearchJob.this.getContext());
@@ -412,24 +426,40 @@ class SearchJob extends JobImpl {
             _seenPeers = 0;
             _newPeers = 0;
             _duplicatePeers = 0;
+            _repliesPendingVerification = 0;
         }
         public String getName() { return "Process Reply for Kademlia Search"; }
         public void runJob() {
             if (_curIndex >= _msg.getNumReplies()) {
-                getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
-                                                            _invalidPeers, _duplicatePeers, _duration);
-                if (_newPeers > 0)
-                    newPeersFound(_newPeers);
+                if (_repliesPendingVerification > 0) {
+                    // we received new references from the peer, but still
+                    // haven't verified all of them, so lets give it more time
+                    SearchReplyJob.this.requeue(_timeoutMs);
+                } else {
+                    // either they didn't tell us anything new or we have verified
+                    // (or failed to verify) all of them. we're done
+                    getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
+                                                                _invalidPeers, _duplicatePeers, _duration);
+                    if (_newPeers > 0)
+                        newPeersFound(_newPeers);
+                }
             } else {
                 Hash peer = _msg.getReply(_curIndex);
                 RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer);
                 if (info == null) {
-                    // hmm, perhaps don't always send a lookup for this...
-                    // but for now, wtf, why not. we may even want to adjust it so that
-                    // we penalize or benefit peers who send us that which we can or
-                    // cannot lookup
-                    getContext().netDb().lookupRouterInfo(peer, null, null, _timeoutMs);
+                    // if the peer is giving us lots of bad peer references,
+                    // dont try to fetch them.
+                    boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer);
+                    if (!sendsBadInfo) {
+                        getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(peer), new ReplyNotVerifiedJob(peer), _timeoutMs);
+                        _repliesPendingVerification++;
+                    } else {
+                        if (_log.shouldLog(Log.WARN))
+                            _log.warn("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());
+                        getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
+                    }
                 }
 
                 if (_state.wasAttempted(peer)) {
@@ -447,6 +477,38 @@ class SearchJob extends JobImpl {
                 requeue(0);
             }
         }
+
+        /** the peer gave us a reference to a new router, and we were able to fetch it */
+        private final class ReplyVerifiedJob extends JobImpl {
+            private Hash _key;
+            public ReplyVerifiedJob(Hash key) {
+                super(SearchReplyJob.this.getContext());
+                _key = key;
+            }
+            public String getName() { return "Search reply value verified"; }
+            public void runJob() {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Peer reply from " + _peer.toBase64() + " verified: " + _key.toBase64());
+                _repliesPendingVerification--;
+                getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0);
+            }
+        }
+
+        /** the peer gave us a reference to a new router, and we were NOT able to fetch it */
+        private final class ReplyNotVerifiedJob extends JobImpl {
+            private Hash _key;
+            public ReplyNotVerifiedJob(Hash key) {
+                super(SearchReplyJob.this.getContext());
+                _key = key;
+            }
+            public String getName() { return "Search reply value NOT verified"; }
+            public void runJob() {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Peer reply from " + _peer.toBase64() + " failed verification: " + _key.toBase64());
+                _repliesPendingVerification--;
+                _invalidPeers++;
+                getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0);
+            }
+        }
     }
 
     /**
@@ -534,6 +596,7 @@ class SearchJob extends JobImpl {
         if (_keepStats) {
             long time = getContext().clock().now() - _state.getWhenStarted();
             getContext().statManager().addRateData("netDb.failedTime", time, 0);
+            _facade.fail(_state.getTarget());
         }
         if (_onFailure != null)
             getContext().jobQueue().addJob(_onFailure);
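
The spoofing caveat in the SearchReplyJob comment above names two possible fixes; as a rough, hypothetical sketch of the nonce approach only (no such selector exists in this commit, and the names below are invented):

    // Hypothetical illustration of the nonce idea from the comment above:
    // the search carries a random nonce, and the reply selector only accepts
    // a searchReply that echoes it, so a third party can't forge replies
    // "from" an innocent peer without having seen our original search.
    class SearchNonceSelector {
        private final long _expectedNonce;  // nonce we put in the outbound search

        SearchNonceSelector(long expectedNonce) {
            _expectedNonce = expectedNonce;
        }

        /** accept the reply only if it echoes our nonce */
        boolean isMatch(long replyNonce) {
            return replyNonce == _expectedNonce;
        }
    }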

View File

@@ -46,19 +46,27 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
             DatabaseStoreMessage msg = (DatabaseStoreMessage)_message;
             if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
-                _facade.store(msg.getKey(), msg.getLeaseSet());
+                try {
+                    _facade.store(msg.getKey(), msg.getLeaseSet());
+                    getContext().profileManager().dbLookupSuccessful(_peer, timeToReply);
+                } catch (IllegalArgumentException iae) {
+                    getContext().profileManager().dbLookupReply(_peer, 0, 0, 1, 0, timeToReply);
+                }
             } else if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
                 if (_log.shouldLog(Log.INFO))
                     _log.info(getJobId() + ": dbStore received on search containing router "
                               + msg.getKey() + " with publishDate of "
                               + new Date(msg.getRouterInfo().getPublished()));
-                _facade.store(msg.getKey(), msg.getRouterInfo());
+                try {
+                    _facade.store(msg.getKey(), msg.getRouterInfo());
+                    getContext().profileManager().dbLookupSuccessful(_peer, timeToReply);
+                } catch (IllegalArgumentException iae) {
+                    getContext().profileManager().dbLookupReply(_peer, 0, 0, 1, 0, timeToReply);
+                }
             } else {
                 if (_log.shouldLog(Log.ERROR))
                     _log.error(getJobId() + ": Unknown db store type?!@ " + msg.getValueType());
             }
-            getContext().profileManager().dbLookupSuccessful(_peer, timeToReply);
         } else if (_message instanceof DatabaseSearchReplyMessage) {
             _job.replyFound((DatabaseSearchReplyMessage)_message, _peer);
         } else {

View File

@@ -18,6 +18,7 @@ public class DBHistory {
     private long _successfulLookups;
     private long _failedLookups;
     private RateStat _failedLookupRate;
+    private RateStat _invalidReplyRate;
     private long _lookupReplyNew;
     private long _lookupReplyOld;
     private long _lookupReplyDuplicate;
@@ -34,6 +35,7 @@ public class DBHistory {
         _successfulLookups = 0;
         _failedLookups = 0;
         _failedLookupRate = null;
+        _invalidReplyRate = null;
         _lookupReplyNew = 0;
         _lookupReplyOld = 0;
         _lookupReplyDuplicate = 0;
@@ -74,6 +76,8 @@ public class DBHistory {
      */
     public RateStat getFailedLookupRate() { return _failedLookupRate; }
 
+    public RateStat getInvalidReplyRate() { return _invalidReplyRate; }
+
     /**
      * Note that the peer was not only able to respond to the lookup, but sent us
      * the data we wanted!
@@ -103,6 +107,10 @@ public class DBHistory {
         _lookupReplyOld += oldPeers;
         _lookupReplyInvalid += invalid;
         _lookupReplyDuplicate += duplicate;
+
+        if (invalid > 0) {
+            _invalidReplyRate.addData(invalid, 0);
+        }
     }
 
     /**
      * Note that the peer sent us a lookup
@@ -148,6 +156,7 @@ public class DBHistory {
     public void coallesceStats() {
         _log.debug("Coallescing stats");
         _failedLookupRate.coallesceStats();
+        _invalidReplyRate.coallesceStats();
     }
 
     private final static String NL = System.getProperty("line.separator");
@@ -171,7 +180,7 @@ public class DBHistory {
         add(buf, "avgDelayBetweenLookupsReceived", _avgDelayBetweenLookupsReceived, "How long is it typically between each db lookup they send us? (in milliseconds)");
         out.write(buf.toString().getBytes());
         _failedLookupRate.store(out, "dbHistory.failedLookupRate");
-        _log.debug("Writing out dbHistory.failedLookupRate");
+        _invalidReplyRate.store(out, "dbHistory.invalidReplyRate");
     }
 
     private void add(StringBuffer buf, String name, long val, String description) {
@@ -197,12 +206,21 @@ public class DBHistory {
             _log.debug("Loading dbHistory.failedLookupRate");
         } catch (IllegalArgumentException iae) {
             _log.warn("DB History failed lookup rate is corrupt, resetting", iae);
+        }
+        try {
+            _invalidReplyRate.load(props, "dbHistory.invalidReplyRate", true);
+        } catch (IllegalArgumentException iae) {
+            _log.warn("DB History invalid reply rate is corrupt, resetting", iae);
             createRates();
         }
     }
 
     private void createRates() {
-        _failedLookupRate = new RateStat("dbHistory.failedLookupRate", "How often does this peer to respond to a lookup?", "dbHistory", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
+        if (_failedLookupRate == null)
+            _failedLookupRate = new RateStat("dbHistory.failedLookupRate", "How often does this peer to respond to a lookup?", "dbHistory", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
+        if (_invalidReplyRate == null)
+            _invalidReplyRate = new RateStat("dbHistory.invalidReplyRate", "How often does this peer give us a bad (nonexistant, forged, etc) peer?", "dbHistory", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
     }
 
     private final static long getLong(Properties props, String key) {

View File

@@ -21,6 +21,8 @@ import java.util.TreeSet;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
 import net.i2p.router.RouterContext;
+import net.i2p.stat.Rate;
+import net.i2p.stat.RateStat;
 import net.i2p.util.Log;
 
 /**
@@ -197,6 +199,35 @@ public class ProfileOrganizer {
     public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } }
     public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } }
 
+    /**
+     * if a peer sends us more than 5 replies in a searchReply that we cannot
+     * fetch, stop listening to them.
+     *
+     */
+    private final static int MAX_BAD_REPLIES_PER_HOUR = 5;
+
+    /**
+     * Does the given peer send us bad replies - either invalid store messages
+     * (expired, corrupt, etc) or unreachable replies (pointing towards routers
+     * that don't exist).
+     *
+     */
+    public boolean peerSendsBadReplies(Hash peer) {
+        PeerProfile profile = getProfile(peer);
+        if (profile != null) {
+            RateStat invalidReplyRateStat = profile.getDBHistory().getInvalidReplyRate();
+            Rate invalidReplyRate = invalidReplyRateStat.getRate(60*60*1000l);
+            if ( (invalidReplyRate.getCurrentTotalValue() > MAX_BAD_REPLIES_PER_HOUR) ||
+                 (invalidReplyRate.getLastTotalValue() > MAX_BAD_REPLIES_PER_HOUR) ) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Return a set of Hashes for peers that are both fast and reliable.  If an insufficient
      * number of peers are both fast and reliable, fall back onto high capacity peers, and if that

View File

@@ -191,8 +191,15 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
         byte signedData[] = new byte[decr.length - rsig.getData().length];
         System.arraycopy(decr, 0, signedData, 0, signedData.length);
         boolean valid = _context.dsa().verifySignature(rsig, signedData, _remoteIdentity.getSigningPublicKey());
-        if (valid)
-            _context.netDb().store(_remoteIdentity.getHash(), peer);
+        if (valid) {
+            try {
+                _context.netDb().store(_remoteIdentity.getHash(), peer);
+            } catch (IllegalArgumentException iae) {
+                if (_log.shouldLog(Log.ERROR))
+                    _log.error("Peer gave us invalid router info", iae);
+                valid = false;
+            }
+        }
         return valid;
     }