diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index cbc0473414..e3db623cb4 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -233,6 +233,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa notifier.setName("Notifier " + _myDestination.calculateHash().toBase64().substring(0,4)); notifier.setDaemon(true); notifier.start(); + + if ( (_options != null) && + (I2PClient.PROP_RELIABILITY_GUARANTEED.equals(_options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT))) ) { + if (_log.shouldLog(Log.ERROR)) + _log.error("I2CP guaranteed delivery mode has been removed, using best effort."); + } long startConnect = _context.clock().now(); try { @@ -322,18 +328,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa public abstract void receiveStatus(int msgId, long nonce, int status); - protected boolean isGuaranteed() { - if (_log.shouldLog(Log.DEBUG)) { - String str = _options.getProperty(I2PClient.PROP_RELIABILITY); - if (str == null) - _log.debug("reliability is not specified, fallback"); - else - _log.debug("reliability is specified: " + str); - } - String reliability = _options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); - return I2PClient.PROP_RELIABILITY_GUARANTEED.equals(reliability); - } - protected static final Set createNewTags(int num) { Set tags = new HashSet(); for (int i = 0; i < num; i++) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 1be9b887ee..2125279443 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -85,16 +85,6 @@ class I2PSessionImpl2 extends I2PSessionImpl { if (isClosed()) throw new I2PSessionException("Already closed"); if (SHOULD_COMPRESS) payload = DataHelper.compress(payload, offset, size); else throw new IllegalStateException("we need to update sendGuaranteed to support partial send"); - if (_log.shouldLog(Log.DEBUG)) _log.debug("message compressed"); - // we always send as guaranteed (so we get the session keys/tags acked), - // but only block until the appropriate event has been reached (guaranteed - // success or accepted). we may want to break this out into a seperate - // attribute, allowing both nonblocking sends and transparently managed keys, - // as well as the nonblocking sends with application managed keys. Later. - if (isGuaranteed() || false) { - //_log.error("sendGuaranteed"); - return sendGuaranteed(dest, payload, keyUsed, tagsSent); - } return sendBestEffort(dest, payload, keyUsed, tagsSent); } @@ -269,142 +259,6 @@ class I2PSessionImpl2 extends I2PSessionImpl { return found; } - private boolean sendGuaranteed(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent) - throws I2PSessionException { - SessionKey key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey()); - if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey()); - SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key); - Set sentTags = null; - if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) { - sentTags = createNewTags(NUM_TAGS); - } else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 2 * 60 * 1000) { - // if we have > 10 tags, but they expire in under 30 seconds, we want more - sentTags = createNewTags(NUM_TAGS); - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding " + NUM_TAGS + " new ones"); - } - SessionKey newKey = null; - if (false) // rekey - newKey = _context.keyGenerator().generateSessionKey(); - - long nonce = _context.random().nextInt(Integer.MAX_VALUE); - MessageState state = new MessageState(_context, nonce, getPrefix()); - state.setKey(key); - state.setTags(sentTags); - state.setNewKey(newKey); - state.setTo(dest); - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key); - - if (keyUsed != null) { - if (newKey != null) - keyUsed.setData(newKey.getData()); - else - keyUsed.setData(key.getData()); - } - if (tagsSent != null) { - if (sentTags != null) { - tagsSent.addAll(sentTags); - } - } - - long beforeSendingSync = _context.clock().now(); - long inSendingSync = 0; - synchronized (_sendingStates) { - inSendingSync = _context.clock().now(); - _sendingStates.add(state); - } - long afterSendingSync = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " - + state.getNonce() + " for guaranteed " - + " sync took " + (inSendingSync-beforeSendingSync) - + " add took " + (afterSendingSync-inSendingSync)); - _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); - long beforeWaitFor = _context.clock().now(); - if (isGuaranteed()) - state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, - _context.clock().now() + SEND_TIMEOUT); - else - state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, - _context.clock().now() + SEND_TIMEOUT); - - long afterWaitFor = _context.clock().now(); - long inRemovingSync = 0; - synchronized (_sendingStates) { - inRemovingSync = _context.clock().now(); - _sendingStates.remove(state); - } - long afterRemovingSync = _context.clock().now(); - boolean guaranteed = isGuaranteed(); - boolean found = false; - boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); - if (guaranteed) - found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS); - else - found = accepted; - - if ((!accepted) || (state.getMessageId() == null)) { - if (_log.shouldLog(Log.CRIT)) - _log.log(Log.CRIT, getPrefix() + "State with nonce " + state.getNonce() - + " was not accepted? (no messageId!! found=" + found - + " msgId=" + state.getMessageId() - + ", sendingSync=" + (afterSendingSync-beforeSendingSync) - + ", sendMessage=" + (beforeWaitFor-afterSendingSync) - + ", waitFor=" + (afterWaitFor-beforeWaitFor) - + ", removingSync=" + (afterRemovingSync-afterWaitFor) - + ")"); - //if (true) - // throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up"); - if (guaranteed) - nackTags(state); - return false; - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId() - + " / " + state.getNonce() + " found = " + found); - - // the 'found' value is only useful for mode=Guaranteed, as mode=BestEffort - // doesn't block - if (guaranteed) { - if (found) { - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); - ackTags(state); - } else { - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); - nackTags(state); - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Message send enqueued after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); - } - return found; - } - - private void ackTags(MessageState state) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "ack tags for msgId " + state.getMessageId() + " / " - + state.getNonce() + " key = " + state.getKey() + ", tags = " - + state.getTags()); - if ((state.getTags() != null) && (state.getTags().size() > 0)) { - if (state.getNewKey() == null) - _context.sessionKeyManager().tagsDelivered(state.getTo().getPublicKey(), state.getKey(), state.getTags()); - else - _context.sessionKeyManager().tagsDelivered(state.getTo().getPublicKey(), state.getNewKey(), state.getTags()); - } - } - - private void nackTags(MessageState state) { - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "nack tags for msgId " + state.getMessageId() + " / " + state.getNonce() - + " key = " + state.getKey()); - _context.sessionKeyManager().failTags(state.getTo().getPublicKey()); - } - public void receiveStatus(int msgId, long nonce, int status) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); MessageState state = null; diff --git a/core/java/src/net/i2p/client/MessageStatusMessageHandler.java b/core/java/src/net/i2p/client/MessageStatusMessageHandler.java index 41d59304b4..a18be3286a 100644 --- a/core/java/src/net/i2p/client/MessageStatusMessageHandler.java +++ b/core/java/src/net/i2p/client/MessageStatusMessageHandler.java @@ -28,9 +28,6 @@ class MessageStatusMessageHandler extends HandlerImpl { public void handleMessage(I2CPMessage message, I2PSessionImpl session) { boolean skipStatus = true; - if (I2PClient.PROP_RELIABILITY_GUARANTEED.equals(session.getOptions().getProperty(I2PClient.PROP_RELIABILITY, - I2PClient.PROP_RELIABILITY_BEST_EFFORT))) - skipStatus = false; if (_log.shouldLog(Log.DEBUG)) _log.debug("Handle message " + message); MessageStatusMessage msg = (MessageStatusMessage) message; diff --git a/history.txt b/history.txt index 533a10bd61..1fe77cd07c 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,16 @@ -$Id: history.txt,v 1.245 2005/09/12 19:12:04 jrandom Exp $ +$Id: history.txt,v 1.246 2005/09/12 20:12:44 jrandom Exp $ + +2005-09-12 jrandom + * Removed guaranteed delivery mode entirely (so existing i2phex clients + using it can get the benefits of mode=best_effort). Guaranteed delivery + is offered at the streaming lib level. + * Improve the peer selection code for peer testing, as everyone now + supports tests. + * Give the watchdog its fangs - if it detects obscene job lag or if + clients have been unable to get a leaseSet for more than 5 minutes, + restart the router. This was disabled a year ago due to spurious + restarts, and can be disabled by "watchdog.haltOnHang=false", but the + cause of the spurious restarts should be gone. 2005-09-12 jrandom * Bugfix for skewed store which could kill a UDP thread (causing complete diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 13d3ae9902..4384bda4db 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.232 $ $Date: 2005/09/12 19:12:00 $"; + public final static String ID = "$Revision: 1.233 $ $Date: 2005/09/12 20:12:43 $"; public final static String VERSION = "0.6.0.5"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/RouterWatchdog.java b/router/java/src/net/i2p/router/RouterWatchdog.java index d1140d246b..eafdd46d11 100644 --- a/router/java/src/net/i2p/router/RouterWatchdog.java +++ b/router/java/src/net/i2p/router/RouterWatchdog.java @@ -47,7 +47,7 @@ class RouterWatchdog implements Runnable { } private boolean shutdownOnHang() { - return Boolean.valueOf(_context.getProperty("watchdog.haltOnHang", "false")).booleanValue(); + return Boolean.valueOf(_context.getProperty("watchdog.haltOnHang", "true")).booleanValue(); } private void dumpStatus() { diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 67bb89221c..3dac182678 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -170,7 +170,7 @@ public class PeerState { * of 608 */ private static final int DEFAULT_MTU = 608;//600; //1500; - private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY; + private static final int MIN_RTO = 800 + ACKSender.ACK_FREQUENCY; private static final int MAX_RTO = 3000; // 5000; public PeerState(I2PAppContext ctx) { diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java index 62ab59f662..eff77968e1 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java @@ -13,6 +13,7 @@ import net.i2p.router.CommSystemFacade; import net.i2p.router.RouterContext; import net.i2p.data.Base64; import net.i2p.data.DataHelper; +import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.util.SimpleTimer; @@ -400,17 +401,9 @@ class PeerTestManager { PeerState charlie = null; RouterInfo charlieInfo = null; if (state == null) { // pick a new charlie - for (int i = 0; i < 5; i++) { - charlie = _transport.getPeerState(UDPAddress.CAPACITY_TESTING); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Picking charlie as " + charlie + " for alice of " + from); - if ( (charlie != null) && (!DataHelper.eq(charlie.getRemoteHostId(), from)) ) { - charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer()); - if (charlieInfo != null) - break; - } - charlie = null; - } + charlie = _transport.pickTestPeer(from); + if (charlie != null) + charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer()); } else { charlie = _transport.getPeerState(new RemoteHostId(state.getCharlieIP().getAddress(), state.getCharliePort())); if (charlie != null) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 0d6f7f5929..2be15f3258 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -40,11 +40,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private Map _peersByIdent; /** RemoteHostId to PeerState */ private Map _peersByRemoteHost; - /** - * Array of list of PeerState instances, where each list contains peers with one - * of the given capacities (from 0-25, referencing 'A'-'Z'). - */ - private List _peersByCapacity[]; private PacketHandler _handler; private EstablishmentManager _establisher; private MessageQueue _outboundMessages; @@ -122,9 +117,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _log = ctx.logManager().getLog(UDPTransport.class); _peersByIdent = new HashMap(128); _peersByRemoteHost = new HashMap(128); - _peersByCapacity = new ArrayList['Z'-'A'+1]; - for (int i = 0; i < _peersByCapacity.length; i++) - _peersByCapacity[i] = new ArrayList(16); _endpoint = null; TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); @@ -146,7 +138,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.droppedPeerInactive", "How long ago did we receive from a dropped peer (duration == session lifetime)", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("udp.peersByCapacity", "How many peers of the given capacity were available to pick between? (duration == (int)capacity)", "udp", new long[] { 1*60*1000, 5*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.statusOK", "How many times the peer test returned OK", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.statusDifferent", "How many times the peer test returned different IP/ports", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.statusReject", "How many times the peer test returned reject unsolicited", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -370,52 +361,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return _introManager.get(relayTag); } - /** - * if we haven't received anything in the last 5 minutes from a peer, don't - * trust its known capacities - */ - private static final int MAX_INACTIVITY_FOR_CAPACITY = 5*60*1000; - /** pick a random peer with the given capacity */ - public PeerState getPeerState(char capacity) { - long now = _context.clock().now(); - int index = _context.random().nextInt(1024); - int cap = capacity - 'A'; - if ( (cap < 0) || (cap >= _peersByCapacity.length) ) return null; - List peers = _peersByCapacity[cap]; - int size = 0; - int off = 0; - PeerState rv = null; - while (rv == null) { - synchronized (peers) { - size = peers.size(); - if (size > 0) { - index = (index + off) % size; - rv = (PeerState)peers.get(index); - } - } - if (rv == null) - break; - if (_context.shitlist().isShitlisted(rv.getRemotePeer())) - rv = null; - else if (now - rv.getLastReceiveTime() > MAX_INACTIVITY_FOR_CAPACITY) - rv = null; - else - break; - off++; - if (off >= size) - break; - } - _context.statManager().addRateData("udp.peersByCapacity", size, capacity); - return rv; - } - - private static final int MAX_PEERS_PER_CAPACITY = 64; - /** * Intercept RouterInfo entries received directly from a peer to inject them into * the PeersByCapacity listing. * */ + /* public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) { if (inMsg instanceof DatabaseStoreMessage) { @@ -461,6 +412,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } super.messageReceived(inMsg, remoteIdent, remoteIdentHash, msToReceive, bytesReceived); } + */ /** * add the peer info, returning true if it went in properly, false if @@ -618,6 +570,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority * */ private void dropPeerCapacities(PeerState peer) { + /* RouterInfo info = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer()); if (info != null) { String capacities = info.getOptions().getProperty(UDPAddress.PROP_CAPACITY); @@ -634,6 +587,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } } + */ } int send(UDPPacket packet) { @@ -1209,6 +1163,21 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _testEvent.runTest(); } + public PeerState pickTestPeer(RemoteHostId dontInclude) { + List peers = null; + synchronized (_peersByIdent) { + peers = new ArrayList(_peersByIdent.values()); + } + Collections.shuffle(peers, _context.random()); + for (int i = 0; i < peers.size(); i++) { + PeerState peer = (PeerState)peers.get(i); + if ( (dontInclude != null) && (dontInclude.equals(peer.getRemoteHostId())) ) + continue; + return peer; + } + return null; + } + private static final String PROP_SHOULD_TEST = "i2np.udp.shouldTest"; private boolean shouldTest() { @@ -1237,16 +1206,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } private void runTest() { - PeerState bob = getPeerState(UDPAddress.CAPACITY_TESTING); + PeerState bob = pickTestPeer(null); if (bob != null) { if (_log.shouldLog(Log.INFO)) _log.info("Running periodic test with bob = " + bob); _testManager.runTest(bob.getRemoteIPAddress(), bob.getRemotePort(), bob.getCurrentCipherKey(), bob.getCurrentMACKey()); - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("Unable to run a periodic test, as there are no peers with the capacity required"); + _lastTested = _context.clock().now(); + _forceRun = false; + return; } - _lastTested = _context.clock().now(); + + if (_log.shouldLog(Log.ERROR)) + _log.error("Unable to run a periodic test, as there are no peers with the capacity required"); _forceRun = false; }