diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 7f45e0013..6c052ce2b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -221,7 +221,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL out.flush(); // make sure the data get though } } - out.flush(); + //out.flush(); // close() flushes } catch (SocketException ex) { // this *will* occur when the other threads closes the socket synchronized (finishLock) { @@ -248,11 +248,16 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL + from + " and " + to); } try { - out.close(); in.close(); } catch (IOException ex) { if (_log.shouldLog(Log.WARN)) - _log.warn(direction + ": Error closing streams", ex); + _log.warn(direction + ": Error closing input stream", ex); + } + try { + out.close(); + } catch (IOException ex) { + if (_log.shouldLog(Log.WARN)) + _log.warn(direction + ": Error closing output stream", ex); } synchronized (finishLock) { finished = true; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index e9f7086ae..ac9c28821 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -350,31 +350,18 @@ public class Connection { disconnect(cleanDisconnect, true); } void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { - if (!_connected) return; - _connected = false; synchronized (_connectLock) { _connectLock.notifyAll(); } if (_log.shouldLog(Log.DEBUG)) _log.debug("Disconnecting " + toString(), new Exception("discon")); - if (cleanDisconnect) { + if (cleanDisconnect && _connected) { // send close packets and schedule stuff... _outputStream.closeInternal(); _inputStream.close(); } else { - doClose(); - boolean tagsCancelled = false; - synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { - PacketLocal pl = (PacketLocal)iter.next(); - if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) - tagsCancelled = true; - pl.cancelled(); - } - _outboundPackets.clear(); - _outboundPackets.notifyAll(); - } - if (tagsCancelled) - _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); + if (_connected) + doClose(); + killOutstandingPackets(); } if (removeFromConMgr) { if (!_disconnectScheduled) { @@ -382,6 +369,7 @@ public class Connection { SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); } } + _connected = false; } void disconnectComplete() { @@ -409,6 +397,10 @@ public class Connection { _connectionManager.removeConnection(this); } + killOutstandingPackets(); + } + + private void killOutstandingPackets() { boolean tagsCancelled = false; synchronized (_outboundPackets) { for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { @@ -422,7 +414,6 @@ public class Connection { } if (tagsCancelled) _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); - } private class DisconnectEvent implements SimpleTimer.TimedEvent { @@ -432,6 +423,7 @@ public class Connection { + Connection.this.toString()); } public void timeReached() { + killOutstandingPackets(); if (_log.shouldLog(Log.INFO)) _log.info("Connection disconnect timer complete, drop the con " + Connection.this.toString()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index d09667773..f3ed77354 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -31,7 +31,12 @@ class ConnectionHandler { _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } - public void setActive(boolean active) { _active = active; } + public void setActive(boolean active) { + synchronized (_synQueue) { + _active = active; + _synQueue.notifyAll(); // so we break from the accept() + } + } public boolean getActive() { return _active; } public void receiveNewSyn(Packet packet) { @@ -66,8 +71,17 @@ class ConnectionHandler { while (true) { if ( (timeoutMs > 0) && (expiration < _context.clock().now()) ) return null; - if (!_active) + if (!_active) { + // fail all the ones we had queued up + synchronized (_synQueue) { + for (int i = 0; i < _synQueue.size(); i++) { + Packet packet = (Packet)_synQueue.get(i); + sendReset(packet); + } + _synQueue.clear(); + } return null; + } Packet syn = null; synchronized (_synQueue) { diff --git a/build.xml b/build.xml index 81cce7c46..8c267ec7f 100644 --- a/build.xml +++ b/build.xml @@ -239,6 +239,8 @@ + + diff --git a/history.txt b/history.txt index ea6e6e6e5..2d801ea03 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,19 @@ -$Id: history.txt,v 1.86 2004/11/27 16:02:06 jrandom Exp $ +$Id: history.txt,v 1.87 2004/11/28 20:58:39 jrandom Exp $ + +2004-11-29 jrandom + * Reduced contention for local client delivery + * Drop the new code that munges the wrapper.config. Instead, updates that + need to change it will include their own wrapper.config in the + i2pupdate.zip, overwriting the existing file. If the file + "wrapper.config.updated" is included, it is deleted at first opportunity + and the router shut down, displaying a notice that the router must be + started again cleanly to allow the changes to the wrapper.config to take + effect. + * Properly stop accept()ing I2PSocket connections if we close down the + session (duh). + * Make sure we cancel any outstanding Packets in flight when a connection + is terminated (thanks susi!) + * Split up the I2PTunnel closing a little further. 2004-11-28 jrandom * Accept IP address detection changes with a 2-out-of-3 minimum. diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 20908408f..aeef673d0 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -36,7 +36,6 @@ import net.i2p.data.i2np.TunnelMessage; import net.i2p.router.message.GarlicMessageHandler; import net.i2p.router.message.TunnelMessageHandler; import net.i2p.router.startup.StartupJob; -import net.i2p.router.startup.VerifyWrapperConfig; import net.i2p.stat.Rate; import net.i2p.stat.RateStat; import net.i2p.util.FileUtil; @@ -821,8 +820,9 @@ public class Router { } private static void verifyWrapperConfig() { - boolean updated = VerifyWrapperConfig.verifyConfig(); - if (updated) { + File cfgUpdated = new File("wrapper.config.updated"); + if (cfgUpdated.exists()) { + cfgUpdated.delete(); System.out.println("INFO: Wrapper config updated, but the service wrapper requires you to manually restart"); System.out.println("INFO: Shutting down the router - please rerun it!"); System.exit(EXIT_HARD); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index ce72f6f5f..721ad96ee 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.91 $ $Date: 2004/11/27 16:02:06 $"; + public final static String ID = "$Revision: 1.92 $ $Date: 2004/11/28 20:58:39 $"; public final static String VERSION = "0.4.2"; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 340f1b7b8..dd30a200f 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -55,7 +55,7 @@ public class ClientConnectionRunner { /** user's config */ private SessionConfig _config; /** static mapping of MessageId to Payload, storing messages for retrieval */ - private static Map _messages; + private Map _messages; /** lease set request state, or null if there is no request pending on at the moment */ private LeaseRequestState _leaseRequest; /** currently allocated leaseSet, or null if none is allocated */ @@ -227,7 +227,7 @@ public class ClientConnectionRunner { } void disconnectClient(String reason) { - _log.error("Disconnecting the client: " + reason, new Exception("Disconnecting!")); + _log.error("Disconnecting the client: " + reason); DisconnectMessage msg = new DisconnectMessage(); msg.setReason(reason); try { diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 54bef14b6..719dccc06 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -145,15 +145,12 @@ public class ClientManager { if (runner != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Message " + msgId + " is targeting a local destination. distribute it as such"); - runner.receiveMessage(toDest, fromDest, payload); - if (fromDest != null) { - ClientConnectionRunner sender = getRunner(fromDest); - if (sender != null) { - sender.updateMessageDeliveryStatus(msgId, true); - } else { - _log.log(Log.CRIT, "Um, wtf, we're sending a local message, but we can't find who sent it?", new Exception("wtf")); - } + ClientConnectionRunner sender = getRunner(fromDest); + if (sender == null) { + // sender went away + return; } + _context.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId)); } else { // remote. w00t if (_log.shouldLog(Log.DEBUG)) @@ -174,6 +171,32 @@ public class ClientManager { } } + private class DistributeLocal extends JobImpl { + private Destination _toDest; + private ClientConnectionRunner _to; + private ClientConnectionRunner _from; + private Destination _fromDest; + private Payload _payload; + private MessageId _msgId; + + public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) { + super(_context); + _toDest = toDest; + _to = to; + _from = from; + _fromDest = fromDest; + _payload = payload; + _msgId = id; + } + public String getName() { return "Distribute local message"; } + public void runJob() { + _to.receiveMessage(_toDest, _fromDest, _payload); + if (_from != null) { + _from.updateMessageDeliveryStatus(_msgId, true); + } + } + } + /** * Request that a particular client authorize the Leases contained in the diff --git a/router/java/src/net/i2p/router/startup/VerifyClasspath.java b/router/java/src/net/i2p/router/startup/VerifyClasspath.java deleted file mode 100644 index eb6a36d84..000000000 --- a/router/java/src/net/i2p/router/startup/VerifyClasspath.java +++ /dev/null @@ -1,87 +0,0 @@ -package net.i2p.router.startup; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.Properties; - -import net.i2p.data.DataHelper; - -/** - * Make sure that if there is a wrapper.config file, it includes - * all of the jar files necessary for the current build. - * HOLY CRAP THIS IS UGLY. - * - */ -public class VerifyClasspath { - private static final String NL = System.getProperty("line.separator"); - private static final Set _jars = new HashSet(); - - static { - _jars.add("lib/ant.jar"); - _jars.add("lib/heartbeat.jar"); - _jars.add("lib/i2p.jar"); - _jars.add("lib/i2ptunnel.jar"); - _jars.add("lib/jasper-compiler.jar"); - _jars.add("lib/jasper-runtime.jar"); - _jars.add("lib/javax.servlet.jar"); - _jars.add("lib/jnet.jar"); - _jars.add("lib/mstreaming.jar"); - _jars.add("lib/netmonitor.jar"); - _jars.add("lib/org.mortbay.jetty.jar"); - _jars.add("lib/router.jar"); - _jars.add("lib/routerconsole.jar"); - _jars.add("lib/sam.jar"); - _jars.add("lib/wrapper.jar"); - _jars.add("lib/xercesImpl.jar"); - _jars.add("lib/xml-apis.jar"); - _jars.add("lib/jbigi.jar"); - _jars.add("lib/systray.jar"); - _jars.add("lib/systray4j.jar"); - _jars.add("lib/streaming.jar"); - } - - /** - * update the wrapper.config - * - * @return true if the classpath was updated and a restart is - * required, false otherwise. - */ - public static boolean updateClasspath() { - Properties p = new Properties(); - File configFile = new File("wrapper.config"); - Set needed = new HashSet(_jars); - try { - DataHelper.loadProps(p, configFile); - Set toAdd = new HashSet(); - int entry = 1; - while (true) { - String value = p.getProperty("wrapper.java.classpath." + entry); - if (value == null) break; - needed.remove(value); - entry++; - } - if (needed.size() <= 0) { - // we have everything we need - return false; - } else { - // add on some new lines - FileWriter out = new FileWriter(configFile, true); - out.write(NL + "# Adding new libs as required by the update" + NL); - for (Iterator iter = needed.iterator(); iter.hasNext(); ) { - String name = (String)iter.next(); - out.write("wrapper.java.classpath." + entry + "=" + name + NL); - } - out.close(); - return true; - } - } catch (IOException ioe) { - ioe.printStackTrace(); - return false; - } - } -} diff --git a/router/java/src/net/i2p/router/startup/VerifyWrapperConfig.java b/router/java/src/net/i2p/router/startup/VerifyWrapperConfig.java deleted file mode 100644 index 5af7c2fc4..000000000 --- a/router/java/src/net/i2p/router/startup/VerifyWrapperConfig.java +++ /dev/null @@ -1,43 +0,0 @@ -package net.i2p.router.startup; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import net.i2p.data.DataHelper; - -/** - * Ugly code to make sure the service wrapper is configured - * properly - */ -public class VerifyWrapperConfig { - private static final String NL = System.getProperty("line.separator"); - - public static boolean verifyConfig() { - boolean cpUpdated = VerifyClasspath.updateClasspath(); - boolean pingUpdated = updatePing(); - return cpUpdated; // dont force the pingUpdated to cause a restart - } - - private static boolean updatePing() { - Properties p = new Properties(); - File configFile = new File("wrapper.config"); - try { - DataHelper.loadProps(p, configFile); - if (p.containsKey("wrapper.ping.interval")) - return false; - - FileWriter out = new FileWriter(configFile, true); - out.write(NL + "# Adding ping timeout as required by the update" + NL); - out.write("wrapper.ping.interval=600" + NL); - out.write("wrapper.ping.timeout=605" + NL); - out.close(); - return true; - } catch (IOException ioe) { - ioe.printStackTrace(); - return false; - } - } -}