2005-08-24 jrandom

    * Catch errors with corrupt tunnel messages more gracefully (no need to
      kill the thread and cause an OOM...)
    * Don't skip shitlisted peers for netDb store messages, as they aren't
      necessarily shitlisted by other people (though they probably are).
    * Adjust the netDb store per-peer timeout based on each particular peer's
      profile (timeout = 4x their average netDb store response time)
    * Don't republish leaseSets to *failed* peers - send them to peers who
      replied but just didn't know the value.
    * Set a 5 second timeout on the I2PTunnelHTTPServer reading the client's
      HTTP headers, rather than blocking indefinitely.  HTTP headers should be
      sent entirely within the first streaming packet anyway, so this won't be
      a problem.
    * Don't use the I2PTunnel*Server handler thread pool by default, as it may
      prevent any clients from accessing the server if the handlers get
      blocked by the streaming lib or other issues.
    * Don't overwrite a known status (OK/ERR-Reject/ERR-SymmetricNAT) with
      Unknown.
This commit is contained in:
jrandom
2005-08-24 22:55:25 +00:00
committed by zzz
parent 5ec6dca64d
commit 346faa3de2
14 changed files with 204 additions and 80 deletions

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.217 $ $Date: 2005/08/21 13:39:05 $";
public final static String ID = "$Revision: 1.218 $ $Date: 2005/08/23 16:25:49 $";
public final static String VERSION = "0.6.0.3";
public final static long BUILD = 1;
public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -38,6 +38,7 @@ import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.DatabaseLookupMessageHandler;
import net.i2p.router.networkdb.DatabaseStoreMessageHandler;
import net.i2p.router.networkdb.PublishLocalRouterInfoJob;
import net.i2p.router.peermanager.PeerProfile;
import net.i2p.util.Log;
/**
@ -804,6 +805,20 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
}
return routers;
}
/** smallest allowed period */
private static final int MIN_PER_PEER_TIMEOUT = 1*1000;
private static final int MAX_PER_PEER_TIMEOUT = 5*1000;
/**
 * How long to wait for the given peer to respond to a netDb store/lookup
 * before giving up on them.
 *
 * The peer's lifetime average db response time is clamped to
 * [MIN_PER_PEER_TIMEOUT, MAX_PER_PEER_TIMEOUT] and then multiplied by 4,
 * so the result is always between 4 and 20 seconds.
 *
 * @param peer peer being queried (must not be null)
 * @return timeout in milliseconds (4*MIN_PER_PEER_TIMEOUT .. 4*MAX_PER_PEER_TIMEOUT)
 */
public int getPeerTimeout(Hash peer) {
    PeerProfile prof = _context.profileOrganizer().getProfile(peer);
    // NOTE(review): getProfile can return null for a peer we have never
    // tracked - assume the worst allowed value rather than NPE
    double responseTime = MAX_PER_PEER_TIMEOUT;
    if (prof != null)
        responseTime = prof.getDbResponseTime().getLifetimeAverageValue();
    if (responseTime < MIN_PER_PEER_TIMEOUT)
        responseTime = MIN_PER_PEER_TIMEOUT;
    else if (responseTime > MAX_PER_PEER_TIMEOUT)
        responseTime = MAX_PER_PEER_TIMEOUT;
    return 4 * (int)responseTime; // give it up to 4x the average response time
}
public void renderStatusHTML(Writer out) throws IOException {
StringBuffer buf = new StringBuffer(10*1024);

View File

@ -135,10 +135,8 @@ class SearchJob extends JobImpl {
protected int getPerPeerTimeoutMs() {
int rv = -1;
RateStat rs = getContext().statManager().getRate("netDb.successTime");
if (rs != null) {
Rate r = rs.getRate(rs.getPeriods()[0]);
rv = (int)r.getLifetimeAverageValue();
}
if (rs != null)
rv = (int)rs.getLifetimeAverageValue();
rv <<= 1; // double it to give some leeway. (bah, too lazy to record stdev)
if ( (rv <= 0) || (rv > PER_PEER_TIMEOUT) )
@ -344,7 +342,8 @@ class SearchJob extends JobImpl {
// return;
//}
long expiration = getContext().clock().now() + getPerPeerTimeoutMs();
int timeout = _facade.getPeerTimeout(router.getIdentity().getHash());
long expiration = getContext().clock().now() + timeout;
DatabaseLookupMessage msg = buildMessage(inTunnelId, inTunnel.getPeer(0), expiration);
@ -366,13 +365,14 @@ class SearchJob extends JobImpl {
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), getPerPeerTimeoutMs());
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash());
}
/** we're searching for a router, so we can just send direct */
protected void sendRouterSearch(RouterInfo router) {
long expiration = getContext().clock().now() + getPerPeerTimeoutMs();
int timeout = _facade.getPeerTimeout(router.getIdentity().getHash());
long expiration = getContext().clock().now() + timeout;
DatabaseLookupMessage msg = buildMessage(expiration);
@ -383,7 +383,7 @@ class SearchJob extends JobImpl {
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(),
reply, new FailedJob(getContext(), router), sel, getPerPeerTimeoutMs(), SEARCH_PRIORITY);
reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY);
getContext().jobQueue().addJob(j);
}
@ -662,7 +662,7 @@ class SearchJob extends JobImpl {
_state.getSuccessful()));
}
} else {
Set sendTo = _state.getFailed();
Set sendTo = _state.getRepliedPeers(); // _state.getFailed();
sendTo.addAll(_state.getPending());
int numSent = 0;
for (Iterator iter = sendTo.iterator(); iter.hasNext(); ) {

View File

@ -22,6 +22,7 @@ class SearchState {
private HashSet _attemptedPeers;
private HashSet _failedPeers;
private HashSet _successfulPeers;
private HashSet _repliedPeers;
private Hash _searchKey;
private volatile long _completed;
private volatile long _started;
@ -34,6 +35,7 @@ class SearchState {
_failedPeers = new HashSet(16);
_successfulPeers = new HashSet(16);
_pendingPeerTimes = new HashMap(16);
_repliedPeers = new HashSet(16);
_completed = -1;
_started = _context.clock().now();
}
@ -120,6 +122,9 @@ class SearchState {
/** how long did it take to get the reply, or -1 if we dont know */
public long replyFound(Hash peer) {
synchronized (_repliedPeers) {
_repliedPeers.add(peer);
}
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
Long when = (Long)_pendingPeerTimes.remove(peer);
@ -130,6 +135,8 @@ class SearchState {
}
}
public Set getRepliedPeers() { synchronized (_repliedPeers) { return (Set)_repliedPeers.clone(); } }
public void replyTimeout(Hash peer) {
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);

View File

@ -24,6 +24,7 @@ import net.i2p.router.JobImpl;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.peermanager.PeerProfile;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
@ -38,7 +39,7 @@ class StoreJob extends JobImpl {
private long _expiration;
private PeerSelector _peerSelector;
private final static int PARALLELIZATION = 6; // how many sent at a time
private final static int PARALLELIZATION = 3; // how many sent at a time
private final static int REDUNDANCY = 6; // we want the data sent to 6 peers
/**
* additionally send to 1 outlier(s), in case all of the routers chosen in our
@ -52,11 +53,6 @@ class StoreJob extends JobImpl {
private final static int EXPLORATORY_REDUNDANCY = 1;
private final static int STORE_PRIORITY = 100;
/** default period we allow for an ACK to take after a store */
private final static int PER_PEER_TIMEOUT = 5*1000;
/** smallest allowed period */
private static final int MIN_PER_PEER_TIMEOUT = 1*1000;
/**
* Create a new search for the routingKey specified
*
@ -157,12 +153,22 @@ class StoreJob extends JobImpl {
_log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
_state.addSkipped(peer);
} else {
if (getContext().shitlist().isShitlisted(((RouterInfo)ds).getIdentity().calculateHash())) {
_state.addSkipped(peer);
} else {
int peerTimeout = _facade.getPeerTimeout(peer);
//RateStat failing = prof.getDBHistory().getFailedLookupRate();
//Rate failed = failing.getRate(60*60*1000);
//if (failed.getCurrentEventCount() + failed.getLastEventCount() > avg) {
// _state.addSkipped(peer);
//}
// we don't want to filter out peers based on our local shitlist, as that opens an avenue for
// manipulation (since a peer can get us to shitlist them by, well, being shitty, and that
// in turn would let them assume that a netDb store received didn't come from us)
//if (getContext().shitlist().isShitlisted(((RouterInfo)ds).getIdentity().calculateHash())) {
// _state.addSkipped(peer);
//} else {
_state.addPending(peer);
sendStore((RouterInfo)ds);
}
sendStore((RouterInfo)ds, peerTimeout);
//}
}
}
}
@ -189,7 +195,7 @@ class StoreJob extends JobImpl {
* DeliveryStatusMessage so we know it got there
*
*/
private void sendStore(RouterInfo router) {
private void sendStore(RouterInfo router, int responseTime) {
DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
msg.setKey(_state.getTarget());
if (_state.getData() instanceof RouterInfo)
@ -210,7 +216,7 @@ class StoreJob extends JobImpl {
// _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
}
sendStore(msg, router, getContext().clock().now() + getPerPeerTimeoutMs());
sendStore(msg, router, getContext().clock().now() + responseTime);
}
private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
@ -257,7 +263,7 @@ class StoreJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
getContext().messageRegistry().registerPending(selector, onReply, onFail, getPerPeerTimeoutMs());
getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now()));
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, peer.getIdentity().getHash());
} else {
if (_log.shouldLog(Log.ERROR))
@ -360,27 +366,4 @@ class StoreJob extends JobImpl {
_state.complete(true);
getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
}
/**
* Let each peer take up to the average successful search RTT
*
*/
private int getPerPeerTimeoutMs() {
int rv = -1;
RateStat rs = getContext().statManager().getRate("netDb.ackTime");
if (rs != null) {
Rate r = rs.getRate(rs.getPeriods()[0]);
rv = (int)r.getLifetimeAverageValue();
}
rv <<= 1; // double it to give some leeway. (bah, too lazy to record stdev)
if (rv <= 0)
return PER_PEER_TIMEOUT;
else if (rv < MIN_PER_PEER_TIMEOUT)
return MIN_PER_PEER_TIMEOUT;
else if (rv > PER_PEER_TIMEOUT)
return PER_PEER_TIMEOUT;
else
return rv;
}
}

View File

@ -352,8 +352,8 @@ class PeerTestManager {
PeerState bob = _transport.getPeerState(from);
if (bob == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Received from bob (" + from + ") who hasn't established a session with us, refusing to help him test " + aliceIP +":" + alicePort);
if (_log.shouldLog(Log.WARN))
_log.warn("Received from bob (" + from + ") who hasn't established a session with us, refusing to help him test " + aliceIP +":" + alicePort);
return;
} else {
state.setBobCipherKey(bob.getCurrentCipherKey());

View File

@ -1080,13 +1080,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
case CommSystemFacade.STATUS_UNKNOWN:
default:
_context.statManager().addRateData("udp.statusUnknown", 1, 0);
if (now - _reachabilityStatusLastUpdated < STATUS_GRACE_PERIOD) {
_testEvent.forceRun();
SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
} else {
_reachabilityStatus = status;
_reachabilityStatusLastUpdated = now;
}
//if (now - _reachabilityStatusLastUpdated < STATUS_GRACE_PERIOD) {
// _testEvent.forceRun();
// SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
//} else {
// _reachabilityStatus = status;
// _reachabilityStatusLastUpdated = now;
//}
break;
}
}

View File

@ -82,8 +82,14 @@ public class FragmentHandler {
//_log.debug("fragments: " + Base64.encode(preprocessed, offset, preprocessed.length-offset));
}
try {
while (offset < length)
offset = receiveFragment(preprocessed, offset, length);
while (offset < length) {
int off = receiveFragment(preprocessed, offset, length);
if (off < 0) {
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return;
}
offset = off;
}
} catch (RuntimeException e) {
if (_log.shouldLog(Log.ERROR))
_log.error("Corrupt fragment received: offset = " + offset, e);
@ -253,7 +259,8 @@ public class FragmentHandler {
msg = new FragmentedMessage(_context);
}
msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId);
boolean ok = msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId);
if (!ok) return -1;
if (msg.isComplete()) {
if (fragmented) {
synchronized (_fragmentedMessages) {
@ -315,7 +322,8 @@ public class FragmentHandler {
}
}
msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
if (!ok) return -1;
if (msg.isComplete()) {
synchronized (_fragmentedMessages) {

View File

@ -65,11 +65,27 @@ public class FragmentedMessage {
* @param length how much past the offset should we snag?
* @param isLast is this the last fragment in the message?
*/
public void receive(long messageId, int fragmentNum, byte payload[], int offset, int length, boolean isLast) {
if (fragmentNum < 0) throw new RuntimeException("Fragment # == " + fragmentNum + " for messageId " + messageId);
if (payload == null) throw new RuntimeException("Payload is null for messageId " + messageId);
if (length <= 0) throw new RuntimeException("Length is impossible (" + length + ") for messageId " + messageId);
if (offset + length > payload.length) throw new RuntimeException("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId);
public boolean receive(long messageId, int fragmentNum, byte payload[], int offset, int length, boolean isLast) {
if (fragmentNum < 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("Fragment # == " + fragmentNum + " for messageId " + messageId);
return false;
}
if (payload == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Payload is null for messageId " + messageId);
return false;
}
if (length <= 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("Length is impossible (" + length + ") for messageId " + messageId);
return false;
}
if (offset + length > payload.length) {
if (_log.shouldLog(Log.ERROR))
_log.error("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId);
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive message " + messageId + " fragment " + fragmentNum + " with " + length + " bytes (last? " + isLast + ") offset = " + offset);
_messageId = messageId;
@ -87,8 +103,12 @@ public class FragmentedMessage {
_lastReceived = _lastReceived || isLast;
if (fragmentNum > _highFragmentNum)
_highFragmentNum = fragmentNum;
if (isLast && fragmentNum <= 0)
throw new RuntimeException("hmm, isLast and fragmentNum=" + fragmentNum + " for message " + messageId);
if (isLast && fragmentNum <= 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("hmm, isLast and fragmentNum=" + fragmentNum + " for message " + messageId);
return false;
}
return true;
}
/**
@ -103,10 +123,22 @@ public class FragmentedMessage {
* @param toRouter what router is this destined for (may be null)
* @param toTunnel what tunnel is this destined for (may be null)
*/
public void receive(long messageId, byte payload[], int offset, int length, boolean isLast, Hash toRouter, TunnelId toTunnel) {
if (payload == null) throw new RuntimeException("Payload is null for messageId " + messageId);
if (length <= 0) throw new RuntimeException("Length is impossible (" + length + ") for messageId " + messageId);
if (offset + length > payload.length) throw new RuntimeException("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId);
public boolean receive(long messageId, byte payload[], int offset, int length, boolean isLast, Hash toRouter, TunnelId toTunnel) {
if (payload == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Payload is null for messageId " + messageId);
return false;
}
if (length <= 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("Length is impossible (" + length + ") for messageId " + messageId);
return false;
}
if (offset + length > payload.length) {
if (_log.shouldLog(Log.ERROR))
_log.error("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId);
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive message " + messageId + " with " + length + " bytes (last? " + isLast + ") targetting " + toRouter + " / " + toTunnel + " offset=" + offset);
_messageId = messageId;
@ -124,6 +156,7 @@ public class FragmentedMessage {
_toTunnel = toTunnel;
if (_highFragmentNum < 0)
_highFragmentNum = 0;
return true;
}
public long getMessageId() { return _messageId; }