From 61f217c6107c76f1f122e47564f93e79cbfabfc8 Mon Sep 17 00:00:00 2001 From: jrandom Date: Mon, 17 Jan 2005 08:15:00 +0000 Subject: [PATCH] 2005-01-17 jrandom * Added meaningful support for adjusting the preferred message size in the streaming lib by setting the i2p.streaming.maxMessageSize=32768 (or whatever). The other side will mimic a reduction (but never an increase). * Always make sure to use distinct ConnectionOption objects for each connection (duh) * Reduced the default ACK delay to 500ms on in the streaming lib * Only shrink the streaming window once per window * Don't bundle a new jetty.xml with updates * Catch another local routerInfo corruption issue on startup. --- .../streaming/I2PSocketOptionsImpl.java | 12 ++++++ .../net/i2p/client/streaming/Connection.java | 32 +++++++++++---- .../streaming/ConnectionDataReceiver.java | 2 +- .../client/streaming/ConnectionManager.java | 6 ++- .../client/streaming/ConnectionOptions.java | 39 +++++++++++++++++-- .../streaming/ConnectionPacketHandler.java | 20 +++++++--- .../streaming/I2PSocketManagerFull.java | 12 ++++-- .../client/streaming/MessageOutputStream.java | 30 ++++++++++++++ .../src/net/i2p/client/streaming/Packet.java | 2 +- .../i2p/client/streaming/PacketHandler.java | 10 ++--- .../net/i2p/client/streaming/PacketQueue.java | 4 +- .../net/i2p/client/streaming/PingTest.java | 2 +- build.xml | 1 - history.txt | 13 ++++++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../KademliaNetworkDatabaseFacade.java | 7 +++- 16 files changed, 161 insertions(+), 35 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index 6eb405f550..2bc223682a 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -35,6 +35,18 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { init(opts); } + public void setProperties(Properties opts) { + if (opts == null) return; + if (opts.containsKey(PROP_BUFFER_SIZE)) + _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); + if (opts.containsKey(PROP_CONNECT_TIMEOUT)) + _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); + if (opts.containsKey(PROP_READ_TIMEOUT)) + _readTimeout = getInt(opts, PROP_READ_TIMEOUT, -1); + if (opts.containsKey(PROP_WRITE_TIMEOUT)) + _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT); + } + protected void init(Properties opts) { _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 7beb188d59..3213fae39e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -61,6 +61,8 @@ public class Connection { private ActivityTimer _activityTimer; /** window size when we last saw congestion */ private int _lastCongestionSeenAt; + private long _lastCongestionTime; + private long _lastCongestionHighestUnacked; private boolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ private Object _connectLock; @@ -89,7 +91,7 @@ public class Connection { _log = ctx.logManager().getLog(Connection.class); _receiver = new ConnectionDataReceiver(ctx, this); _inputStream = new MessageInputStream(ctx); - _outputStream = new MessageOutputStream(ctx, _receiver); + _outputStream = new MessageOutputStream(ctx, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _chooser = chooser; _outboundPackets = new TreeMap(); _outboundQueue = queue; @@ -105,6 +107,8 @@ public class Connection { _congestionWindowEnd = 0; _highestAckedThrough = -1; _lastCongestionSeenAt = MAX_WINDOW_SIZE; + _lastCongestionTime = -1; + _lastCongestionHighestUnacked = -1; _connectionManager = manager; _resetReceived = false; _connected = true; @@ -599,6 +603,8 @@ public class Connection { // dont set the size to (winSize >> 4). only set the if (_ackSinceCongestion) { _lastCongestionSeenAt = _options.getWindowSize(); + _lastCongestionTime = _context.clock().now(); + _lastCongestionHighestUnacked = _lastSendId; _ackSinceCongestion = false; } } @@ -813,14 +819,24 @@ public class Connection { _packet.setReceiveStreamId(_receiveStreamId); _packet.setSendStreamId(_sendStreamId); - // shrink the window int newWindowSize = getOptions().getWindowSize(); - congestionOccurred(); - _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime()); - newWindowSize /= 2; - if (newWindowSize <= 0) - newWindowSize = 1; - getOptions().setWindowSize(newWindowSize); + + if (_ackSinceCongestion) { + // only shrink the window once per window + if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) { + congestionOccurred(); + _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime()); + newWindowSize /= 2; + if (newWindowSize <= 0) + newWindowSize = 1; + + if (_log.shouldLog(Log.WARN)) + _log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize + + ") for " + Connection.this.toString()); + + getOptions().setWindowSize(newWindowSize); + } + } int numSends = _packet.getNumSends() + 1; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 4e95325cc4..404808aa8d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -146,7 +146,6 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { con.getInputStream().updateAcks(packet); packet.setOptionalDelay(con.getOptions().getChoke()); - packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); packet.setResendDelay(con.getOptions().getResendDelay()); if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) @@ -159,6 +158,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setOptionalFrom(con.getSession().getMyDestination()); + packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); } // don't set the closed flag if this is a plain ACK and there are outstanding diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index d97c1a40c8..e1d59abce0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -37,10 +37,11 @@ public class ConnectionManager { private Map _pendingPings; private boolean _allowIncoming; private int _maxConcurrentStreams; + private ConnectionOptions _defaultOptions; private volatile int _numWaiting; private Object _connectionLock; - public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent) { + public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { _context = context; _log = context.logManager().getLog(ConnectionManager.class); _connectionByInboundId = new HashMap(32); @@ -56,6 +57,7 @@ public class ConnectionManager { _outboundQueue = new PacketQueue(context, session, this); _allowIncoming = false; _maxConcurrentStreams = maxConcurrent; + _defaultOptions = defaultOptions; _numWaiting = 0; _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -103,7 +105,7 @@ public class ConnectionManager { * it, or null if the syn's streamId was already taken */ public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler); + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); byte receiveId[] = new byte[4]; _context.random().nextBytes(receiveId); boolean reject = false; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index df28a660bd..364260a350 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -9,7 +9,7 @@ import java.util.Properties; public class ConnectionOptions extends I2PSocketOptionsImpl { private int _connectDelay; private boolean _fullySigned; - private int _windowSize; + private volatile int _windowSize; private int _receiveWindow; private int _profile; private int _rtt; @@ -81,8 +81,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE)); setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000)); setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); - setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 5*1000)); - setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2*1000)); + setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500)); + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5)); setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); @@ -93,6 +93,39 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); } + public void setProperties(Properties opts) { + super.setProperties(opts); + if (opts == null) return; + if (opts.containsKey(PROP_CONNECT_DELAY)) + setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); + if (opts.containsKey(PROP_PROFILE)) + setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); + if (opts.containsKey(PROP_MAX_MESSAGE_SIZE)) + setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE)); + if (opts.containsKey(PROP_INITIAL_RTT)) + setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000)); + if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW)) + setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); + if (opts.containsKey(PROP_INITIAL_RESEND_DELAY)) + setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500)); + if (opts.containsKey(PROP_INITIAL_ACK_DELAY)) + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500)); + if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE)) + setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); + if (opts.containsKey(PROP_MAX_RESENDS)) + setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5)); + if (opts.containsKey(PROP_WRITE_TIMEOUT)) + setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); + if (opts.containsKey(PROP_INACTIVITY_ACTION)) + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); + setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE); + + if (opts.containsKey(PROP_CONNECT_TIMEOUT)) + setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); + } + /** * how long will we wait after instantiating a new con * before actually attempting to connect. If this is diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 99d724d6d2..1f63786c30 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -49,7 +49,16 @@ public class ConnectionPacketHandler { } return; } - + + if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) { + if (packet.getOptionalMaxSize() < con.getOptions().getMaxMessageSize()) { + if (_log.shouldLog(Log.INFO)) + _log.info("Reducing our max message size to " + packet.getOptionalMaxSize() + + " from " + con.getOptions().getMaxMessageSize()); + con.getOptions().setMaxMessageSize(packet.getOptionalMaxSize()); + con.getOutputStream().setBufferSize(packet.getOptionalMaxSize()); + } + } con.packetReceived(); @@ -185,20 +194,21 @@ public class ConnectionPacketHandler { oldSize >>>= 1; if (oldSize <= 0) oldSize = 1; - con.getOptions().setWindowSize(oldSize); - if (_log.shouldLog(Log.DEBUG)) _log.debug("Congestion occurred - new windowSize " + oldSize + " congestionSeenAt: " + con.getLastCongestionSeenAt() + " (#resends: " + numResends + ") for " + con); + con.getOptions().setWindowSize(oldSize); + congested = true; } long lowest = con.getHighestAckedThrough(); if (lowest >= con.getCongestionWindowEnd()) { // new packet that ack'ed uncongested data, or an empty ack - int newWindowSize = con.getOptions().getWindowSize(); + int oldWindow = con.getOptions().getWindowSize(); + int newWindowSize = oldWindow; if ( (!congested) && (acked > 0) && (numResends <= 0) ) { if (newWindowSize > con.getLastCongestionSeenAt() / 2) { @@ -216,7 +226,7 @@ public class ConnectionPacketHandler { } if (_log.shouldLog(Log.DEBUG)) - _log.debug("New window size " + newWindowSize + " congestionSeenAt: " + _log.debug("New window size " + newWindowSize + "/" + oldWindow + " congestionSeenAt: " + con.getLastCongestionSeenAt() + " (#resends: " + numResends + ") for " + con); con.getOptions().setWindowSize(newWindowSize); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index daa8eb7f06..6d2adb4467 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -77,10 +77,10 @@ public class I2PSocketManagerFull implements I2PSocketManager { _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); _maxStreams = -1; } - _connectionManager = new ConnectionManager(_context, _session, _maxStreams); _name = name + " " + (++__managerId); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _defaultOptions = new ConnectionOptions(opts); + _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); _serverSocket = new I2PServerSocketFull(this); if (_log.shouldLog(Log.INFO)) { @@ -91,7 +91,9 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocketOptions buildOptions() { return buildOptions(null); } public I2PSocketOptions buildOptions(Properties opts) { - return new ConnectionOptions(opts); + ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); + curOpts.setProperties(opts); + return curOpts; } public I2PSession getSession() { @@ -164,9 +166,13 @@ public class I2PSocketManagerFull implements I2PSocketManager { options = _defaultOptions; ConnectionOptions opts = null; if (options instanceof ConnectionOptions) - opts = (ConnectionOptions)options; + opts = new ConnectionOptions((ConnectionOptions)options); else opts = new ConnectionOptions(options); + + if (_log.shouldLog(Log.INFO)) + _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) + + " with options: " + opts); Connection con = _connectionManager.connect(peer, opts); if (con == null) throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 2e8a8778d3..aba5399a38 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -32,6 +32,12 @@ public class MessageOutputStream extends OutputStream { private long _lastBuffered; /** if we enqueue data but don't flush it in this period, flush it passively */ private int _passiveFlushDelay; + /** + * if we are changing the buffer size during operation, set this to the new + * buffer size, and next time we are flushing, update the _buf array to the new + * size + */ + private volatile int _nextBufferSize; public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); @@ -48,6 +54,7 @@ public class MessageOutputStream extends OutputStream { _closed = false; _writeTimeout = -1; _passiveFlushDelay = 500; + _nextBufferSize = -1; _flusher = new Flusher(); if (_log.shouldLog(Log.DEBUG)) _log.debug("MessageOutputStream created"); @@ -55,6 +62,7 @@ public class MessageOutputStream extends OutputStream { public void setWriteTimeout(int ms) { _writeTimeout = ms; } public int getWriteTimeout() { return _writeTimeout; } + public void setBufferSize(int size) { _nextBufferSize = size; } public void write(byte b[]) throws IOException { write(b, 0, b.length); @@ -103,6 +111,8 @@ public class MessageOutputStream extends OutputStream { _valid = 0; throwAnyError(); _lastFlushed = _context.clock().now(); + + locked_updateBufferSize(); } } if (ws != null) { @@ -134,6 +144,22 @@ public class MessageOutputStream extends OutputStream { throwAnyError(); } + /** + * If the other side requested we shrink our buffer, do so. + * + */ + private final void locked_updateBufferSize() { + int size = _nextBufferSize; + if (size > 0) { + // update the buffer size to the requested amount + _dataCache.release(new ByteArray(_buf)); + _dataCache = ByteCache.getInstance(128, size); + ByteArray ba = _dataCache.acquire(); + _buf = ba.getData(); + _nextBufferSize = -1; + } + } + /** * Flush data that has been enqued but not flushed after a certain * period of inactivity @@ -180,6 +206,7 @@ public class MessageOutputStream extends OutputStream { _written += _valid; _valid = 0; _lastFlushed = _context.clock().now(); + locked_updateBufferSize(); _dataLock.notifyAll(); sent = true; } @@ -213,6 +240,7 @@ public class MessageOutputStream extends OutputStream { ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; + locked_updateBufferSize(); _lastFlushed = _context.clock().now(); _dataLock.notifyAll(); } @@ -251,6 +279,7 @@ public class MessageOutputStream extends OutputStream { ba = new ByteArray(_buf); _buf = null; _valid = 0; + locked_updateBufferSize(); } } if (ba != null) { @@ -314,6 +343,7 @@ public class MessageOutputStream extends OutputStream { ws = target.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; + locked_updateBufferSize(); _dataLock.notifyAll(); _lastFlushed = _context.clock().now(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index a70b2a9719..6416dfcbcc 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -563,7 +563,7 @@ public class Packet { if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY ").append(_optionDelay); if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO"); if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM"); - if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS"); + if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS ").append(_optionMaxSize); if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE"); if (isFlagSet(FLAG_RESET)) buf.append(" RESET"); if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 913bd967b9..14fa66cd9c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -35,7 +35,7 @@ public class PacketHandler { // artificial choke: 2% random drop and a 0-30s // random tiered delay from 0-30s if (_context.random().nextInt(100) >= 95) { - displayPacket(packet, "DROP"); + displayPacket(packet, "DROP", null); return false; } else { // if (true) return true; // no lag, just drop @@ -97,18 +97,18 @@ public class PacketHandler { Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { receiveKnownCon(con, packet); - displayPacket(packet, "RECV"); + displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize()); } else { receiveUnknownCon(packet, sendId); - displayPacket(packet, "UNKN"); + displayPacket(packet, "UNKN", null); } } private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS"); - void displayPacket(Packet packet, String prefix) { + void displayPacket(Packet packet, String prefix, String suffix) { String msg = null; synchronized (_fmt) { - msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString(); + msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : ""); } if (_log.shouldLog(Log.DEBUG)) System.out.println(msg); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 8002fe4f8d..51a5d6915d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -121,7 +121,9 @@ class PacketQueue { + " con: " + conStr; _log.debug(msg); } - _connectionManager.getPacketHandler().displayPacket(packet, "SEND"); + Connection c = packet.getConnection(); + String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() : null); + _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); } } diff --git a/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java b/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java index 1e0215a1c6..7ad5ba4420 100644 --- a/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java +++ b/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java @@ -19,7 +19,7 @@ public class PingTest { try { I2PAppContext context = I2PAppContext.getGlobalContext(); I2PSession session = createSession(); - ConnectionManager mgr = new ConnectionManager(context, session, -1); + ConnectionManager mgr = new ConnectionManager(context, session, -1, null); Log log = context.logManager().getLog(PingTest.class); for (int i = 0; i < 10; i++) { log.debug("ping " + i); diff --git a/build.xml b/build.xml index 91a5739cfd..9f23ad7da5 100644 --- a/build.xml +++ b/build.xml @@ -248,7 +248,6 @@ - diff --git a/history.txt b/history.txt index f0e8f01439..328a7d0225 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,15 @@ -$Id: history.txt,v 1.127 2005/01/15 16:03:15 jrandom Exp $ +$Id: history.txt,v 1.128 2005/01/15 18:16:13 jrandom Exp $ + +2005-01-17 jrandom + * Added meaningful support for adjusting the preferred message size in the + streaming lib by setting the i2p.streaming.maxMessageSize=32768 (or + whatever). The other side will mimic a reduction (but never an increase). + * Always make sure to use distinct ConnectionOption objects for each + connection (duh) + * Reduced the default ACK delay to 500ms on in the streaming lib + * Only shrink the streaming window once per window + * Don't bundle a new jetty.xml with updates + * Catch another local routerInfo corruption issue on startup. 2005-01-15 cervantes * Added support to the eepproxy for URLs such as diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index b64344d787..af0a0de620 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.132 $ $Date: 2005/01/15 16:03:14 $"; + public final static String ID = "$Revision: 1.133 $ $Date: 2005/01/15 18:16:12 $"; public final static String VERSION = "0.4.2.6"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index 3e9b89b278..1ac0bbe766 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -343,7 +343,12 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { // periodically update and resign the router's 'published date', which basically // serves as a version _context.jobQueue().addJob(new PublishLocalRouterInfoJob(_context)); - publish(ri); + try { + publish(ri); + } catch (IllegalArgumentException iae) { + _log.log(Log.CRIT, "Our local router info is b0rked, clearing from scratch", iae); + _context.router().rebuildNewIdentity(); + } } /**