diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 2454d5a77..1554d98aa 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -17,17 +17,20 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private I2PAppContext _context; private Log _log; private Connection _connection; - private MessageOutputStream.WriteStatus _dummyStatus; + private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); _connection = con; - _dummyStatus = new DummyStatus(); } public boolean writeInProcess() { - return _connection.getUnackedPacketsSent() > 0; + Connection con = _connection; + if (con != null) + return con.getUnackedPacketsSent() > 0; + else + return false; } /** @@ -42,10 +45,12 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * delivery. */ public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { + Connection con = _connection; + if (con == null) return _dummyStatus; boolean doSend = true; - if ( (size <= 0) && (_connection.getLastSendId() >= 0) ) { - if (_connection.getOutputStream().getClosed()) { - if (_connection.getCloseSentOn() < 0) { + if ( (size <= 0) && (con.getLastSendId() >= 0) ) { + if (con.getOutputStream().getClosed()) { + if (con.getCloseSentOn() < 0) { doSend = true; } else { // closed, no new data, and we've already sent a close packet @@ -57,16 +62,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { } } - if (_connection.getUnackedPacketsReceived() > 0) + if (con.getUnackedPacketsReceived() > 0) doSend = true; if (_log.shouldLog(Log.INFO) && !doSend) _log.info("writeData called: size="+size + " doSend=" + doSend - + " unackedReceived: " + _connection.getUnackedPacketsReceived() - + " con: " + _connection, new Exception("write called by")); + + " unackedReceived: " + con.getUnackedPacketsReceived() + + " con: " + con, new Exception("write called by")); if (doSend) { PacketLocal packet = send(buf, off, size); + if (packet == null) return _dummyStatus; + //dont wait for non-acks if ( (packet.getSequenceNum() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) return packet; @@ -85,7 +92,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from * @param size how many bytes of the buffer to write (may be 0) - * @return the packet sent + * @return the packet sent, or null if the connection died */ public PacketLocal send(byte buf[], int off, int size) { return send(buf, off, size, false); @@ -99,10 +106,12 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return the packet sent */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { + Connection con = _connection; + if (con == null) return null; long before = System.currentTimeMillis(); - PacketLocal packet = buildPacket(buf, off, size, forceIncrement); + PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); long built = System.currentTimeMillis(); - _connection.sendPacket(packet); + con.sendPacket(packet); long sent = System.currentTimeMillis(); if ( (built-before > 1000) && (_log.shouldLog(Log.WARN)) ) @@ -112,18 +121,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { return packet; } - private boolean isAckOnly(int size) { + private boolean isAckOnly(Connection con, int size) { boolean ackOnly = ( (size <= 0) && // no data - (_connection.getLastSendId() >= 0) && // not a SYN - ( (!_connection.getOutputStream().getClosed()) || // not a CLOSE - (_connection.getOutputStream().getClosed() && - _connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE + (con.getLastSendId() >= 0) && // not a SYN + ( (!con.getOutputStream().getClosed()) || // not a CLOSE + (con.getOutputStream().getClosed() && + con.getCloseSentOn() > 0) )); // or it is a dup CLOSE return ackOnly; } - private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { - boolean ackOnly = isAckOnly(size); - PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection); + private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) { + boolean ackOnly = isAckOnly(con, size); + PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con); byte data[] = new byte[size]; if (size > 0) System.arraycopy(buf, off, data, 0, size); @@ -131,36 +140,36 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if (ackOnly && !forceIncrement) packet.setSequenceNum(0); else - packet.setSequenceNum(_connection.getNextOutboundPacketNum()); - packet.setSendStreamId(_connection.getSendStreamId()); - packet.setReceiveStreamId(_connection.getReceiveStreamId()); + packet.setSequenceNum(con.getNextOutboundPacketNum()); + packet.setSendStreamId(con.getSendStreamId()); + packet.setReceiveStreamId(con.getReceiveStreamId()); - _connection.getInputStream().updateAcks(packet); - packet.setOptionalDelay(_connection.getOptions().getChoke()); - packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize()); - packet.setResendDelay(_connection.getOptions().getResendDelay()); + con.getInputStream().updateAcks(packet); + packet.setOptionalDelay(con.getOptions().getChoke()); + packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); + packet.setResendDelay(con.getOptions().getResendDelay()); - if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) + if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true); else packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false); - packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, _connection.getOptions().getRequireFullySigned()); + packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, con.getOptions().getRequireFullySigned()); if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { packet.setFlag(Packet.FLAG_SYNCHRONIZE); - packet.setOptionalFrom(_connection.getSession().getMyDestination()); + packet.setOptionalFrom(con.getSession().getMyDestination()); } // don't set the closed flag if this is a plain ACK and there are outstanding // packets sent, otherwise the other side could receive the CLOSE prematurely, // since this ACK could arrive before the unacked payload message. - if (_connection.getOutputStream().getClosed() && - ( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) ) ) { + if (con.getOutputStream().getClosed() && + ( (size > 0) || (con.getUnackedPacketsSent() <= 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); - _connection.setCloseSentOn(_context.clock().now()); + con.setCloseSentOn(_context.clock().now()); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closed is set for a new packet on " + _connection + ": " + packet); + _log.debug("Closed is set for a new packet on " + con + ": " + packet); } else { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet); diff --git a/core/java/src/net/i2p/client/naming/NamingService.java b/core/java/src/net/i2p/client/naming/NamingService.java index 9420b401b..053ce35eb 100644 --- a/core/java/src/net/i2p/client/naming/NamingService.java +++ b/core/java/src/net/i2p/client/naming/NamingService.java @@ -22,7 +22,8 @@ public abstract class NamingService { private final static Log _log = new Log(NamingService.class); protected I2PAppContext _context; - private static final String PROP_IMPL = "i2p.naming.impl"; + /** what classname should be used as the naming service impl? */ + public static final String PROP_IMPL = "i2p.naming.impl"; private static final String DEFAULT_IMPL = "net.i2p.client.naming.HostsTxtNamingService"; diff --git a/history.txt b/history.txt index 385e95f03..5aa05c97e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,16 @@ -$Id: history.txt,v 1.115 2004/12/19 13:55:09 jrandom Exp $ +$Id: history.txt,v 1.116 2004/12/20 00:14:56 jrandom Exp $ + +2004-12-21 jrandom + * Cleaned up the postinstall/startup scripts a bit more to handle winME, + and added windows info to the headless docs. (thanks ardvark!) + * Fixed a harmless (yet NPE inspiring) race during the final shutdown of + a stream (thanks frosk!) + * Add a pair of new stats for monitoring tunnel participation - + tunnel.participatingBytesProcessed (total # bytes transferred) and + tunnel.participatingBytesProcessedActive (total # bytes transferred for + tunnels whose byte count exceed the 10m average). This should help + further monitor congestion issues. + * Made the NamingService factory property public (thanks susi!) 2004-12-20 jrandom * No longer do a blocking DNS lookup within the jobqueue (thanks mule!) diff --git a/install-headless.txt b/install-headless.txt index c5390baf7..d064047ed 100644 --- a/install-headless.txt +++ b/install-headless.txt @@ -1,4 +1,4 @@ -$Id: install-headless.txt,v 1.2 2004/09/02 01:54:37 jrandom Exp $ +$Id: install-headless.txt,v 1.3 2004/09/29 14:38:15 jrandom Exp $ Headless I2P installation instructions 1) tar xjf i2p.tar.bz2 (you've already done this) @@ -10,7 +10,8 @@ If you're having trouble, swing by http://forum.i2p.net/, check the website at http://www.i2p.net/, or get on irc://irc.freenode.net/#i2p To run I2P explicitly: - sh i2prouter start + (*nix): sh i2prouter start + (win*): I2Psvc.exe -c wrapper.config To stop the router (gracefully): lynx http://localhost:7657/configservice.jsp ("Shutdown gracefully") @@ -26,4 +27,4 @@ Supported JVMs: Linux: Latest available from http://java.sun.com/ (1.3+ supported) FreeBSD: /usr/ports/java/linux-sun-jdk1.4 various: http://www.kaffe.org/ using CVS HEAD as of Sept 1, 2004 - (or any subsequent releases) \ No newline at end of file + (or any subsequent releases) diff --git a/installer/resources/i2prouter.bat b/installer/resources/i2prouter.bat index 34fbe1699..66b5b4730 100644 --- a/installer/resources/i2prouter.bat +++ b/installer/resources/i2prouter.bat @@ -1,39 +1,14 @@ @echo off setlocal +set INSTALL_PATH="%1" rem rem Java Service Wrapper general startup script rem -rem -rem Resolve the real path of the Wrapper.exe -rem For non NT systems, the _REALPATH and _WRAPPER_CONF values -rem can be hard-coded below and the following test removed. -rem -if "%OS%"=="Windows_NT" goto nt -echo This script only works with NT-based versions of Windows. -goto :eof +set _WRAPPER_EXE=%INSTALL_PATH%I2Psvc.exe +set _WRAPPER_CONF="%INSTALL_PATH%wrapper.config" -:nt -rem -rem Find the application home. -rem -rem %~dp0 is location of current script under NT -set _REALPATH=%~dp0 -set _WRAPPER_EXE=%_REALPATH%I2Psvc.exe - -rem -rem Find the wrapper.conf -rem -:conf -set _WRAPPER_CONF="%~f1" -if not %_WRAPPER_CONF%=="" goto startup -set _WRAPPER_CONF="%_REALPATH%wrapper.config" - -rem -rem Start the Wrapper -rem -:startup "%_WRAPPER_EXE%" -c %_WRAPPER_CONF% if not errorlevel 1 goto :eof pause diff --git a/installer/resources/postinstall.bat b/installer/resources/postinstall.bat index 1fedb3c25..0b896025c 100644 --- a/installer/resources/postinstall.bat +++ b/installer/resources/postinstall.bat @@ -33,7 +33,7 @@ del /f /q "%INSTALL_PATH%postinstall.sh" :: del /f /q "%INSTALL_PATH%uninstall_i2p_service_unix" del /f /q "%INSTALL_PATH%icons\*.xpm" rmdir /q /s "%INSTALL_PATH%lib\wrapper" -start /b /i /d"%INSTALL_PATH%" i2prouter.bat +start /b /i /d"%INSTALL_PATH%" i2prouter.bat %INSTALL_PATH% ) else ( @@ -47,6 +47,6 @@ del "%INSTALL_PATH%postinstall.sh" del "%INSTALL_PATH%uninstall_i2p_service_winnt.bat" del "%INSTALL_PATH%icons\*.xpm" deltree /Y "%INSTALL_PATH%lib\wrapper" -start /M "%INSTALL_PATH%i2prouter.bat" +start /M "%INSTALL_PATH%i2prouter.bat" %INSTALL_PATH% ) diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index e52adbbb5..8e90075d6 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -110,22 +110,7 @@ class RouterThrottleImpl implements RouterThrottle { return false; } - // ok, we're not hosed, but can we handle the bandwidth requirements - // of another tunnel? - rs = _context.statManager().getRate("tunnel.participatingMessagesProcessed"); - r = null; - if (rs != null) - r = rs.getRate(10*60*1000); - double msgsPerTunnel = (r != null ? r.getAverageValue() : 0); - r = null; - rs = _context.statManager().getRate("tunnel.relayMessageSize"); - if (rs != null) - r = rs.getRate(10*60*1000); - double bytesPerMsg = (r != null ? r.getAverageValue() : 0); - double bytesPerTunnel = msgsPerTunnel * bytesPerMsg; - int numTunnels = _context.tunnelManager().getParticipatingCount(); - double bytesAllocated = (numTunnels + 1) * bytesPerTunnel; if (_context.getProperty(Router.PROP_SHUTDOWN_IN_PROGRESS) != null) { if (_log.shouldLog(Log.WARN)) @@ -209,6 +194,14 @@ class RouterThrottleImpl implements RouterThrottle { // no default, ignore it } } + + // ok, we're not hosed, but can we handle the bandwidth requirements + // of another tunnel? + rs = _context.statManager().getRate("tunnel.participatingBytesProcessed"); + r = null; + if (rs != null) + r = rs.getRate(10*60*1000); + double bytesAllocated = r.getCurrentTotalValue(); if (!allowTunnel(bytesAllocated, numTunnels)) { _context.statManager().addRateData("router.throttleTunnelBandwidthExceeded", (long)bytesAllocated, 0); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 93e1cc5e8..3da7064c1 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.120 $ $Date: 2004/12/19 13:55:09 $"; + public final static String ID = "$Revision: 1.121 $ $Date: 2004/12/20 00:14:56 $"; public final static String VERSION = "0.4.2.4"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 0e632c145..8eb61a4f3 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -115,6 +115,8 @@ public class StatisticsManager implements Service { //includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); //includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("tunnel.participatingBytesProcessed", stats, new long[] { 10*60*1000 }); + includeRate("tunnel.participatingBytesProcessedActive", stats, new long[] { 10*60*1000 }); includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l }); //includeRate("tunnel.outboundMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 }); //includeRate("tunnel.inboundMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 }); diff --git a/router/java/src/net/i2p/router/TunnelInfo.java b/router/java/src/net/i2p/router/TunnelInfo.java index 9a303f183..a0d6ab3a3 100644 --- a/router/java/src/net/i2p/router/TunnelInfo.java +++ b/router/java/src/net/i2p/router/TunnelInfo.java @@ -56,6 +56,7 @@ public class TunnelInfo extends DataStructureImpl { private boolean _wasEverReady; private int _messagesProcessed; private int _tunnelFailures; + private long _bytesProcessed; public TunnelInfo(I2PAppContext context) { _context = context; @@ -79,6 +80,7 @@ public class TunnelInfo extends DataStructureImpl { _lastTested = -1; _messagesProcessed = 0; _tunnelFailures = 0; + _bytesProcessed = 0; } public TunnelId getTunnelId() { return _id; } @@ -182,7 +184,12 @@ public class TunnelInfo extends DataStructureImpl { return _messagesProcessed; } /** we have just processed a message for this tunnel */ - public void messageProcessed() { _messagesProcessed++; } + public void messageProcessed(int size) { + _messagesProcessed++; + _bytesProcessed += size; + } + /** how many bytes have been pumped through this tunnel in its lifetime? */ + public long getBytesProcessed() { return _bytesProcessed; } /** * the tunnel was (potentially) unable to pass a message through. diff --git a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java index 8129e8931..a58074349 100644 --- a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java @@ -240,7 +240,7 @@ public class HandleTunnelMessageJob extends JobImpl { if (info == null) return; - info.messageProcessed(); + info.messageProcessed(_message.getMessageSize()); //if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) { if (_message.getVerificationStructure() == null) { diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java index 6d434bf37..250bef9cf 100644 --- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java @@ -121,7 +121,7 @@ public class SendTunnelMessageJob extends JobImpl { } } - info.messageProcessed(); + info.messageProcessed(_message.getMessageSize()); if (isEndpoint(info)) { if (_log.shouldLog(Log.INFO)) diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java index 1871e3e96..65b144de3 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java @@ -68,6 +68,8 @@ class TunnelPool { _context.statManager().createRateStat("tunnel.outboundMessagesProcessed", "How many messages does an inbound tunnel process in its lifetime?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("tunnel.participatingMessagesProcessed", "How many messages does an inbound tunnel process in its lifetime?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("tunnel.participatingMessagesProcessedActive", "How many messages beyond the average were processed in a more-than-average tunnel's lifetime?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("tunnel.participatingBytesProcessed", "How many bytes does an inbound tunnel process in its lifetime?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("tunnel.participatingBytesProcessedActive", "How many bytes beyond the average were processed in a more-than-average tunnel's lifetime?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _isLive = true; _persistenceHelper = new TunnelPoolPersistenceHelper(_context); @@ -662,6 +664,11 @@ class TunnelPool { numMsgs-lastAvg, info.getSettings().getExpiration() - info.getSettings().getCreated()); + long numBytes = info.getBytesProcessed(); + lastAvg = (long)_context.statManager().getRate("tunnel.participatingBytesProcessed").getRate(10*60*1000l).getAverageValue(); + _context.statManager().addRateData("tunnel.participatingBytesProcessed", numBytes, numMsgs); + if (numBytes > lastAvg) + _context.statManager().addRateData("tunnel.participatingBytesProcessedActive", numBytes-lastAvg, numMsgs); break; case TunnelId.TYPE_UNSPECIFIED: default: