diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java
index 6eb405f550..2bc223682a 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java
@@ -35,6 +35,18 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
init(opts);
}
+ public void setProperties(Properties opts) {
+ if (opts == null) return;
+ if (opts.containsKey(PROP_BUFFER_SIZE))
+ _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
+ if (opts.containsKey(PROP_CONNECT_TIMEOUT))
+ _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
+ if (opts.containsKey(PROP_READ_TIMEOUT))
+ _readTimeout = getInt(opts, PROP_READ_TIMEOUT, -1);
+ if (opts.containsKey(PROP_WRITE_TIMEOUT))
+ _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT);
+ }
+
protected void init(Properties opts) {
_maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
_connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index 7beb188d59..3213fae39e 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -61,6 +61,8 @@ public class Connection {
private ActivityTimer _activityTimer;
/** window size when we last saw congestion */
private int _lastCongestionSeenAt;
+ private long _lastCongestionTime;
+ private long _lastCongestionHighestUnacked;
private boolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private Object _connectLock;
@@ -89,7 +91,7 @@ public class Connection {
_log = ctx.logManager().getLog(Connection.class);
_receiver = new ConnectionDataReceiver(ctx, this);
_inputStream = new MessageInputStream(ctx);
- _outputStream = new MessageOutputStream(ctx, _receiver);
+ _outputStream = new MessageOutputStream(ctx, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_chooser = chooser;
_outboundPackets = new TreeMap();
_outboundQueue = queue;
@@ -105,6 +107,8 @@ public class Connection {
_congestionWindowEnd = 0;
_highestAckedThrough = -1;
_lastCongestionSeenAt = MAX_WINDOW_SIZE;
+ _lastCongestionTime = -1;
+ _lastCongestionHighestUnacked = -1;
_connectionManager = manager;
_resetReceived = false;
_connected = true;
@@ -599,6 +603,8 @@ public class Connection {
// dont set the size to (winSize >> 4). only set the
if (_ackSinceCongestion) {
_lastCongestionSeenAt = _options.getWindowSize();
+ _lastCongestionTime = _context.clock().now();
+ _lastCongestionHighestUnacked = _lastSendId;
_ackSinceCongestion = false;
}
}
@@ -813,14 +819,24 @@ public class Connection {
_packet.setReceiveStreamId(_receiveStreamId);
_packet.setSendStreamId(_sendStreamId);
- // shrink the window
int newWindowSize = getOptions().getWindowSize();
- congestionOccurred();
- _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
- newWindowSize /= 2;
- if (newWindowSize <= 0)
- newWindowSize = 1;
- getOptions().setWindowSize(newWindowSize);
+
+ if (_ackSinceCongestion) {
+ // only shrink the window once per window
+ if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
+ congestionOccurred();
+ _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
+ newWindowSize /= 2;
+ if (newWindowSize <= 0)
+ newWindowSize = 1;
+
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize
+ + ") for " + Connection.this.toString());
+
+ getOptions().setWindowSize(newWindowSize);
+ }
+ }
int numSends = _packet.getNumSends() + 1;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
index 4e95325cc4..404808aa8d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
@@ -146,7 +146,6 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
con.getInputStream().updateAcks(packet);
packet.setOptionalDelay(con.getOptions().getChoke());
- packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
packet.setResendDelay(con.getOptions().getResendDelay());
if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
@@ -159,6 +158,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) {
packet.setFlag(Packet.FLAG_SYNCHRONIZE);
packet.setOptionalFrom(con.getSession().getMyDestination());
+ packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
}
// don't set the closed flag if this is a plain ACK and there are outstanding
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index d97c1a40c8..e1d59abce0 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -37,10 +37,11 @@ public class ConnectionManager {
private Map _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
+ private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private Object _connectionLock;
- public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent) {
+ public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context;
_log = context.logManager().getLog(ConnectionManager.class);
_connectionByInboundId = new HashMap(32);
@@ -56,6 +57,7 @@ public class ConnectionManager {
_outboundQueue = new PacketQueue(context, session, this);
_allowIncoming = false;
_maxConcurrentStreams = maxConcurrent;
+ _defaultOptions = defaultOptions;
_numWaiting = 0;
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
@@ -103,7 +105,7 @@ public class ConnectionManager {
* it, or null if the syn's streamId was already taken
*/
public Connection receiveConnection(Packet synPacket) {
- Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler);
+ Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId);
boolean reject = false;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index df28a660bd..364260a350 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -9,7 +9,7 @@ import java.util.Properties;
public class ConnectionOptions extends I2PSocketOptionsImpl {
private int _connectDelay;
private boolean _fullySigned;
- private int _windowSize;
+ private volatile int _windowSize;
private int _receiveWindow;
private int _profile;
private int _rtt;
@@ -81,8 +81,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
- setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 5*1000));
- setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2*1000));
+ setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500));
+ setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
@@ -93,6 +93,39 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
}
+ public void setProperties(Properties opts) {
+ super.setProperties(opts);
+ if (opts == null) return;
+ if (opts.containsKey(PROP_CONNECT_DELAY))
+ setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
+ if (opts.containsKey(PROP_PROFILE))
+ setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
+ if (opts.containsKey(PROP_MAX_MESSAGE_SIZE))
+ setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
+ if (opts.containsKey(PROP_INITIAL_RTT))
+ setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000));
+ if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW))
+ setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
+ if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
+ setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500));
+ if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
+ setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
+ if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
+ setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
+ if (opts.containsKey(PROP_MAX_RESENDS))
+ setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
+ if (opts.containsKey(PROP_WRITE_TIMEOUT))
+ setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
+ if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
+ setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
+ if (opts.containsKey(PROP_INACTIVITY_ACTION))
+ setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
+ setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE);
+
+ if (opts.containsKey(PROP_CONNECT_TIMEOUT))
+ setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
+ }
+
/**
* how long will we wait after instantiating a new con
* before actually attempting to connect. If this is
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index 99d724d6d2..1f63786c30 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -49,7 +49,16 @@ public class ConnectionPacketHandler {
}
return;
}
-
+
+ if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
+ if (packet.getOptionalMaxSize() < con.getOptions().getMaxMessageSize()) {
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Reducing our max message size to " + packet.getOptionalMaxSize()
+ + " from " + con.getOptions().getMaxMessageSize());
+ con.getOptions().setMaxMessageSize(packet.getOptionalMaxSize());
+ con.getOutputStream().setBufferSize(packet.getOptionalMaxSize());
+ }
+ }
con.packetReceived();
@@ -185,20 +194,21 @@ public class ConnectionPacketHandler {
oldSize >>>= 1;
if (oldSize <= 0)
oldSize = 1;
- con.getOptions().setWindowSize(oldSize);
-
if (_log.shouldLog(Log.DEBUG))
_log.debug("Congestion occurred - new windowSize " + oldSize + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con);
+ con.getOptions().setWindowSize(oldSize);
+
congested = true;
}
long lowest = con.getHighestAckedThrough();
if (lowest >= con.getCongestionWindowEnd()) {
// new packet that ack'ed uncongested data, or an empty ack
- int newWindowSize = con.getOptions().getWindowSize();
+ int oldWindow = con.getOptions().getWindowSize();
+ int newWindowSize = oldWindow;
if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
@@ -216,7 +226,7 @@ public class ConnectionPacketHandler {
}
if (_log.shouldLog(Log.DEBUG))
- _log.debug("New window size " + newWindowSize + " congestionSeenAt: "
+ _log.debug("New window size " + newWindowSize + "/" + oldWindow + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con);
con.getOptions().setWindowSize(newWindowSize);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
index daa8eb7f06..6d2adb4467 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
@@ -77,10 +77,10 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
_maxStreams = -1;
}
- _connectionManager = new ConnectionManager(_context, _session, _maxStreams);
_name = name + " " + (++__managerId);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_defaultOptions = new ConnectionOptions(opts);
+ _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
_serverSocket = new I2PServerSocketFull(this);
if (_log.shouldLog(Log.INFO)) {
@@ -91,7 +91,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocketOptions buildOptions() { return buildOptions(null); }
public I2PSocketOptions buildOptions(Properties opts) {
- return new ConnectionOptions(opts);
+ ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
+ curOpts.setProperties(opts);
+ return curOpts;
}
public I2PSession getSession() {
@@ -164,9 +166,13 @@ public class I2PSocketManagerFull implements I2PSocketManager {
options = _defaultOptions;
ConnectionOptions opts = null;
if (options instanceof ConnectionOptions)
- opts = (ConnectionOptions)options;
+ opts = new ConnectionOptions((ConnectionOptions)options);
else
opts = new ConnectionOptions(options);
+
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
+ + " with options: " + opts);
Connection con = _connectionManager.connect(peer, opts);
if (con == null)
throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
index 2e8a8778d3..aba5399a38 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
@@ -32,6 +32,12 @@ public class MessageOutputStream extends OutputStream {
private long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */
private int _passiveFlushDelay;
+ /**
+ * if we are changing the buffer size during operation, set this to the new
+ * buffer size, and next time we are flushing, update the _buf array to the new
+ * size
+ */
+ private volatile int _nextBufferSize;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@@ -48,6 +54,7 @@ public class MessageOutputStream extends OutputStream {
_closed = false;
_writeTimeout = -1;
_passiveFlushDelay = 500;
+ _nextBufferSize = -1;
_flusher = new Flusher();
if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created");
@@ -55,6 +62,7 @@ public class MessageOutputStream extends OutputStream {
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
public int getWriteTimeout() { return _writeTimeout; }
+ public void setBufferSize(int size) { _nextBufferSize = size; }
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
@@ -103,6 +111,8 @@ public class MessageOutputStream extends OutputStream {
_valid = 0;
throwAnyError();
_lastFlushed = _context.clock().now();
+
+ locked_updateBufferSize();
}
}
if (ws != null) {
@@ -134,6 +144,22 @@ public class MessageOutputStream extends OutputStream {
throwAnyError();
}
+ /**
+ * If the other side requested we shrink our buffer, do so.
+ *
+ */
+ private final void locked_updateBufferSize() {
+ int size = _nextBufferSize;
+ if (size > 0) {
+ // update the buffer size to the requested amount
+ _dataCache.release(new ByteArray(_buf));
+ _dataCache = ByteCache.getInstance(128, size);
+ ByteArray ba = _dataCache.acquire();
+ _buf = ba.getData();
+ _nextBufferSize = -1;
+ }
+ }
+
/**
* Flush data that has been enqued but not flushed after a certain
* period of inactivity
@@ -180,6 +206,7 @@ public class MessageOutputStream extends OutputStream {
_written += _valid;
_valid = 0;
_lastFlushed = _context.clock().now();
+ locked_updateBufferSize();
_dataLock.notifyAll();
sent = true;
}
@@ -213,6 +240,7 @@ public class MessageOutputStream extends OutputStream {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
+ locked_updateBufferSize();
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
}
@@ -251,6 +279,7 @@ public class MessageOutputStream extends OutputStream {
ba = new ByteArray(_buf);
_buf = null;
_valid = 0;
+ locked_updateBufferSize();
}
}
if (ba != null) {
@@ -314,6 +343,7 @@ public class MessageOutputStream extends OutputStream {
ws = target.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
+ locked_updateBufferSize();
_dataLock.notifyAll();
_lastFlushed = _context.clock().now();
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
index a70b2a9719..6416dfcbcc 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
@@ -563,7 +563,7 @@ public class Packet {
if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY ").append(_optionDelay);
if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO");
if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM");
- if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS");
+ if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS ").append(_optionMaxSize);
if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE");
if (isFlagSet(FLAG_RESET)) buf.append(" RESET");
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG");
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
index 913bd967b9..14fa66cd9c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
@@ -35,7 +35,7 @@ public class PacketHandler {
// artificial choke: 2% random drop and a 0-30s
// random tiered delay from 0-30s
if (_context.random().nextInt(100) >= 95) {
- displayPacket(packet, "DROP");
+ displayPacket(packet, "DROP", null);
return false;
} else {
// if (true) return true; // no lag, just drop
@@ -97,18 +97,18 @@ public class PacketHandler {
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
receiveKnownCon(con, packet);
- displayPacket(packet, "RECV");
+ displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize());
} else {
receiveUnknownCon(packet, sendId);
- displayPacket(packet, "UNKN");
+ displayPacket(packet, "UNKN", null);
}
}
private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
- void displayPacket(Packet packet, String prefix) {
+ void displayPacket(Packet packet, String prefix, String suffix) {
String msg = null;
synchronized (_fmt) {
- msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString();
+ msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : "");
}
if (_log.shouldLog(Log.DEBUG))
System.out.println(msg);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
index 8002fe4f8d..51a5d6915d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
@@ -121,7 +121,9 @@ class PacketQueue {
+ " con: " + conStr;
_log.debug(msg);
}
- _connectionManager.getPacketHandler().displayPacket(packet, "SEND");
+ Connection c = packet.getConnection();
+ String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() : null);
+ _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix);
}
}
diff --git a/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java b/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java
index 1e0215a1c6..7ad5ba4420 100644
--- a/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java
+++ b/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java
@@ -19,7 +19,7 @@ public class PingTest {
try {
I2PAppContext context = I2PAppContext.getGlobalContext();
I2PSession session = createSession();
- ConnectionManager mgr = new ConnectionManager(context, session, -1);
+ ConnectionManager mgr = new ConnectionManager(context, session, -1, null);
Log log = context.logManager().getLog(PingTest.class);
for (int i = 0; i < 10; i++) {
log.debug("ping " + i);
diff --git a/build.xml b/build.xml
index 91a5739cfd..9f23ad7da5 100644
--- a/build.xml
+++ b/build.xml
@@ -248,7 +248,6 @@
-
diff --git a/history.txt b/history.txt
index f0e8f01439..328a7d0225 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,15 @@
-$Id: history.txt,v 1.127 2005/01/15 16:03:15 jrandom Exp $
+$Id: history.txt,v 1.128 2005/01/15 18:16:13 jrandom Exp $
+
+2005-01-17 jrandom
+ * Added meaningful support for adjusting the preferred message size in the
+ streaming lib by setting the i2p.streaming.maxMessageSize=32768 (or
+ whatever). The other side will mimic a reduction (but never an increase).
+ * Always make sure to use distinct ConnectionOption objects for each
+ connection (duh)
+ * Reduced the default ACK delay to 500ms on in the streaming lib
+ * Only shrink the streaming window once per window
+ * Don't bundle a new jetty.xml with updates
+ * Catch another local routerInfo corruption issue on startup.
2005-01-15 cervantes
* Added support to the eepproxy for URLs such as
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index b64344d787..af0a0de620 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.132 $ $Date: 2005/01/15 16:03:14 $";
+ public final static String ID = "$Revision: 1.133 $ $Date: 2005/01/15 18:16:12 $";
public final static String VERSION = "0.4.2.6";
- public final static long BUILD = 2;
+ public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index 3e9b89b278..1ac0bbe766 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -343,7 +343,12 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
// periodically update and resign the router's 'published date', which basically
// serves as a version
_context.jobQueue().addJob(new PublishLocalRouterInfoJob(_context));
- publish(ri);
+ try {
+ publish(ri);
+ } catch (IllegalArgumentException iae) {
+ _log.log(Log.CRIT, "Our local router info is b0rked, clearing from scratch", iae);
+ _context.router().rebuildNewIdentity();
+ }
}
/**