diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index e4d42ef16..acfdfc4ad 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -96,6 +96,22 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable "HTTP outproxy configured. Please configure an outproxy in I2PTunnel") .getBytes(); + private final static byte[] ERR_AHELPER_CONFLICT = + ("HTTP/1.1 409 Conflict\r\n"+ + "Content-Type: text/html; charset=iso-8859-1\r\n"+ + "Cache-control: no-cache\r\n"+ + "\r\n"+ + "

I2P ERROR: Destination key conflict

"+ + "The addresshelper link you followed specifies a different destination key "+ + "than a host entry in your host database. "+ + "Someone could be trying to impersonate another eepsite, "+ + "or people have given two eepsites identical names.

"+ + "You can resolve the conflict by considering which key you trust, "+ + "and either discarding the addresshelper link, "+ + "discarding the host entry from your host database, "+ + "or naming one of them differently.

") + .getBytes(); + /** used to assign unique IDs to the threads / clients. no logic or functionality */ private static volatile long __clientId = 0; @@ -243,50 +259,102 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable // Quick hack for foo.bar.i2p if (host.toLowerCase().endsWith(".i2p")) { + // Destination gets the host name destination = host; + // Host becomes the destination key host = getHostName(destination); - if ( (host != null) && ("i2p".equals(host)) ) { - int pos2; - if ((pos2 = request.indexOf("?")) != -1) { - // Try to find an address helper in the fragments - // and split the request into it's component parts for rebuilding later - String fragments = request.substring(pos2 + 1); - String uriPath = request.substring(0, pos2); - pos2 = fragments.indexOf(" "); - String protocolVersion = fragments.substring(pos2 + 1); - String urlEncoding = ""; - fragments = fragments.substring(0, pos2); - fragments = fragments + "&"; - String fragment; - while(fragments.length() > 0) { - pos2 = fragments.indexOf("&"); - fragment = fragments.substring(0, pos2); - fragments = fragments.substring(pos2 + 1); - if (fragment.startsWith("i2paddresshelper")) { - pos2 = fragment.indexOf("="); - if (pos2 >= 0) { - addressHelpers.put(destination,fragment.substring(pos2 + 1)); - } - } else { - // append each fragment unless it's the address helper - if ("".equals(urlEncoding)) { - urlEncoding = "?" + fragment; - } else { - urlEncoding = urlEncoding + "&" + fragment; - } - } - } - // reconstruct the request minus the i2paddresshelper GET var - request = uriPath + urlEncoding + " " + protocolVersion; - } - String addressHelper = (String) addressHelpers.get(destination); - if (addressHelper != null) { - destination = addressHelper; - host = getHostName(destination); - ahelper = 1; + int pos2; + if ((pos2 = request.indexOf("?")) != -1) { + // Try to find an address helper in the fragments + // and split the request into it's component parts for rebuilding later + String ahelperKey = null; + boolean ahelperConflict = false; + + String fragments = request.substring(pos2 + 1); + String uriPath = request.substring(0, pos2); + pos2 = fragments.indexOf(" "); + String protocolVersion = fragments.substring(pos2 + 1); + String urlEncoding = ""; + fragments = fragments.substring(0, pos2); + String initialFragments = fragments; + fragments = fragments + "&"; + String fragment; + while(fragments.length() > 0) { + pos2 = fragments.indexOf("&"); + fragment = fragments.substring(0, pos2); + fragments = fragments.substring(pos2 + 1); + + // Fragment looks like addresshelper key + if (fragment.startsWith("i2paddresshelper=")) { + pos2 = fragment.indexOf("="); + ahelperKey = fragment.substring(pos2 + 1); + + // Key contains data, lets not ignore it + if (ahelperKey != null) { + + // Host resolvable only with addresshelper + if ( (host == null) || ("i2p".equals(host)) ) + { + // Cannot check, use addresshelper key + addressHelpers.put(destination,ahelperKey); + } else { + // Host resolvable from database, verify addresshelper key + // Silently bypass correct keys, otherwise alert + if (!host.equals(ahelperKey)) + { + // Conflict: handle when URL reconstruction done + ahelperConflict = true; + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix(requestId) + "Addresshelper key conflict for site [" + destination + "], trusted key [" + host + "], specified key [" + ahelperKey + "]."); + } + } + } + } else { + // Other fragments, just pass along + // Append each fragment to urlEncoding + if ("".equals(urlEncoding)) { + urlEncoding = "?" + fragment; + } else { + urlEncoding = urlEncoding + "&" + fragment; + } + } + } + // Reconstruct the request minus the i2paddresshelper GET var + request = uriPath + urlEncoding + " " + protocolVersion; + + // Did addresshelper key conflict? + if (ahelperConflict) + { + String str; + byte[] header; + str = FileUtil.readTextFile("docs/ahelper-conflict-header.ht", 100, true); + if (str != null) header = str.getBytes(); + else header = ERR_AHELPER_CONFLICT; + + if (out != null) { + long alias = I2PAppContext.getGlobalContext().random().nextLong(); + String trustedURL = protocol + uriPath + urlEncoding; + String conflictURL = protocol + alias + ".i2p/?" + initialFragments; + out.write(header); + out.write(("To visit the destination in your host database, click here. To visit the conflicting addresshelper link by temporarily giving it a random alias, click here.

").getBytes()); + out.write("

I2P HTTP Proxy Server
Generated on: ".getBytes()); + out.write(new Date().toString().getBytes()); + out.write("
\n".getBytes()); + out.flush(); + } + s.close(); + return; } } + + String addressHelper = (String) addressHelpers.get(destination); + if (addressHelper != null) { + destination = addressHelper; + host = getHostName(destination); + ahelper = 1; + } + line = method + " " + request.substring(pos); } else if (host.indexOf(".") != -1) { // The request must be forwarded to a WWW proxy diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index aaf7d0a54..9d6f53c13 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -72,7 +72,7 @@ public class Connection { private long _lifetimeDupMessageSent; private long _lifetimeDupMessageReceived; - public static final long MAX_RESEND_DELAY = 5*1000; + public static final long MAX_RESEND_DELAY = 8*1000; public static final long MIN_RESEND_DELAY = 3*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ @@ -257,13 +257,13 @@ public class Connection { if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) { packet.setOptionalDelay(0); } else { - int delay = _options.getRTT() / 2; + int delay = _options.getRTO() / 2; packet.setOptionalDelay(delay); _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); } packet.setFlag(Packet.FLAG_DELAY_REQUESTED); - long timeout = _options.getRTT() + MIN_RESEND_DELAY; + long timeout = _options.getRTO(); if (timeout > MAX_RESEND_DELAY) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) @@ -308,7 +308,7 @@ public class Connection { List ackPackets(long ackThrough, long nacks[]) { if (nacks == null) { - _highestAckedThrough = ackThrough; + _highestAckedThrough = ackThrough; } else { long lowest = -1; for (int i = 0; i < nacks.length; i++) { @@ -463,7 +463,9 @@ public class Connection { _receiver.destroy(); if (_activityTimer != null) SimpleTimer.getInstance().removeEvent(_activityTimer); - _activityTimer = null; + //_activityTimer = null; + if (_inputStream != null) + _inputStream.streamErrorOccurred(new IOException("disconnected!")); if (_disconnectScheduledOn < 0) { _disconnectScheduledOn = _context.clock().now(); @@ -695,11 +697,19 @@ public class Connection { } private void resetActivityTimer() { - if (_options.getInactivityTimeout() <= 0) return; - if (_activityTimer == null) return; + if (_options.getInactivityTimeout() <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); + return; + } + if (_activityTimer == null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); + return; + } long howLong = _activityTimer.getTimeLeft(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resetting the inactivity timer to " + howLong); + _log.debug("Resetting the inactivity timer to " + howLong, new Exception("Reset by")); // this will get rescheduled, and rescheduled, and rescheduled... SimpleTimer.getInstance().addEvent(_activityTimer, howLong); } @@ -707,15 +717,34 @@ public class Connection { private class ActivityTimer implements SimpleTimer.TimedEvent { public void timeReached() { // uh, nothing more to do... - if (!_connected) return; + if (!_connected) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are already closed"); + return; + } // we got rescheduled already - if (getTimeLeft() > 0) return; + long left = getTimeLeft(); + if (left > 0) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is time left (" + left + ")"); + SimpleTimer.getInstance().addEvent(ActivityTimer.this, left); + return; + } // these are either going to time out or cause further rescheduling - if (getUnackedPacketsSent() > 0) return; + if (getUnackedPacketsSent() > 0) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there are unacked packets"); + return; + } // wtf, this shouldn't have been scheduled - if (_options.getInactivityTimeout() <= 0) return; + if (_options.getInactivityTimeout() <= 0) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is no timer..."); + return; + } // if one of us can't talk... - if ( (_closeSentOn > 0) || (_closeReceivedOn > 0) ) return; + if ( (_closeSentOn > 0) || (_closeReceivedOn > 0) ) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are closing"); + return; + } + + if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, with action=" + _options.getInactivityAction()); // bugger it, might as well do the hard work now switch (_options.getInactivityAction()) { @@ -741,7 +770,9 @@ public class Connection { _log.debug(buf.toString()); } - disconnect(true); + _inputStream.streamErrorOccurred(new IOException("Inactivity timeout")); + _outputStream.streamErrorOccurred(new IOException("Inactivity timeout")); + disconnect(false); break; } } @@ -774,6 +805,7 @@ public class Connection { buf.append(" wsize: ").append(_options.getWindowSize()); buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough); buf.append(" rtt: ").append(_options.getRTT()); + buf.append(" rto: ").append(_options.getRTO()); // not synchronized to avoid some kooky races buf.append(" unacked outbound: ").append(_outboundPackets.size()).append(" "); /* @@ -950,10 +982,10 @@ public class Connection { disconnect(false); } else { //long timeout = _options.getResendDelay() << numSends; - long rtt = _options.getRTT(); - if (rtt < MIN_RESEND_DELAY) - rtt = MIN_RESEND_DELAY; - long timeout = rtt << (numSends-1); + long rto = _options.getRTO(); + if (rto < MIN_RESEND_DELAY) + rto = MIN_RESEND_DELAY; + long timeout = rto << (numSends-1); if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 5bfa4b1df..a1bae2253 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -2,6 +2,7 @@ package net.i2p.client.streaming; import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; +import net.i2p.data.DataHelper; import net.i2p.util.Log; /** @@ -166,6 +167,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); } + if (DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) { + packet.setFlag(Packet.FLAG_NO_ACK); + } // don't set the closed flag if this is a plain ACK and there are outstanding // packets sent, otherwise the other side could receive the CLOSE prematurely, diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 30175938d..0bddc6084 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -13,6 +13,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { private int _receiveWindow; private int _profile; private int _rtt; + private int _rttDev; + private int _rto; private int _trend[]; private int _resendDelay; private int _sendAckDelay; @@ -52,6 +54,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor"; private static final int TREND_COUNT = 3; + static final int INITIAL_WINDOW_SIZE = 4; public ConnectionOptions() { super(); @@ -68,6 +71,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public ConnectionOptions(ConnectionOptions opts) { super(opts); if (opts != null) { + setMaxWindowSize(opts.getMaxWindowSize()); setConnectDelay(opts.getConnectDelay()); setProfile(opts.getProfile()); setRTT(opts.getRTT()); @@ -80,7 +84,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setInactivityTimeout(opts.getInactivityTimeout()); setInactivityAction(opts.getInactivityAction()); setInboundBufferSize(opts.getInboundBufferSize()); - setMaxWindowSize(opts.getMaxWindowSize()); setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); } @@ -90,6 +93,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { super.init(opts); _trend = new int[TREND_COUNT]; + setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 4*1024)); @@ -97,22 +101,23 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500)); - setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); + setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10)); setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); - setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 2*60*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); - setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); } public void setProperties(Properties opts) { super.setProperties(opts); if (opts == null) return; + if (opts.containsKey(PROP_MAX_WINDOW_SIZE)) + setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); if (opts.containsKey(PROP_CONNECT_DELAY)) setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); if (opts.containsKey(PROP_PROFILE)) @@ -124,17 +129,17 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW)) setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); if (opts.containsKey(PROP_INITIAL_RESEND_DELAY)) - setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500)); + setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); if (opts.containsKey(PROP_INITIAL_ACK_DELAY)) setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500)); if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE)) - setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); + setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); if (opts.containsKey(PROP_MAX_RESENDS)) setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10)); if (opts.containsKey(PROP_WRITE_TIMEOUT)) setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) - setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 2*60*1000)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); @@ -145,8 +150,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { if (opts.containsKey(PROP_CONNECT_TIMEOUT)) setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); - if (opts.containsKey(PROP_MAX_WINDOW_SIZE)) - setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); } /** @@ -191,6 +194,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { */ public int getRTT() { return _rtt; } public void setRTT(int ms) { + if (_rto == 0) { + _rttDev = ms; + _rto = (int)Connection.MAX_RESEND_DELAY; + } synchronized (_trend) { _trend[0] = _trend[1]; _trend[1] = _trend[2]; @@ -201,10 +208,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { else _trend[2] = 0; } + _rtt = ms; if (_rtt > 60*1000) _rtt = 60*1000; } + public int getRTO() { return _rto; } /** * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have @@ -225,7 +234,15 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { private static final double RTT_DAMPENING = 0.9; public void updateRTT(int measuredValue) { - setRTT((int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue)); + _rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev)); + int smoothed = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue); + _rto = smoothed + (_rttDev<<2); + if (_rto < Connection.MIN_RESEND_DELAY) + _rto = (int)Connection.MIN_RESEND_DELAY; + else if (_rto > Connection.MAX_RESEND_DELAY) + _rto = (int)Connection.MAX_RESEND_DELAY; + + setRTT(smoothed); } /** How long after sending a packet will we wait before resending? */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index b92fbaee8..83fc55266 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -187,6 +187,7 @@ public class ConnectionPacketHandler { } private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { + if (ackThrough < 0) return false; //if ( (nacks != null) && (nacks.length > 0) ) // con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); @@ -315,6 +316,12 @@ public class ConnectionPacketHandler { return congested; } + /** + * If we don't know the send stream id yet (we're just creating a connection), allow + * the first three packets to come in. The first of those should be the SYN, of course... + */ + private static final int MAX_INITIAL_PACKETS = ConnectionOptions.INITIAL_WINDOW_SIZE; + /** * Make sure this packet is ok and that we can continue processing its data. * @@ -335,7 +342,7 @@ public class ConnectionPacketHandler { return true; } else { // neither RST nor SYN and we dont have the stream id yet? - if (packet.getSequenceNum() <= 2) { + if (packet.getSequenceNum() < MAX_INITIAL_PACKETS) { return true; } else { if (_log.shouldLog(Log.ERROR)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index da55bc3d3..74daeb9ab 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -44,6 +44,7 @@ public class MessageHandler implements I2PSessionListener { _log.warn("Error receiving the message", ise); return; } + if (data == null) return; Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 09f02c6b5..1b608005d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -435,7 +435,9 @@ public class MessageInputStream extends InputStream { * */ void streamErrorOccurred(IOException ioe) { - _streamError = ioe; + if (_streamError == null) + _streamError = ioe; + _locallyClosed = true; synchronized (_dataLock) { _dataLock.notifyAll(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 671330026..9e8a47b73 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -312,11 +312,16 @@ public class MessageOutputStream extends OutputStream { /** nonblocking close */ public void closeInternal() { _closed = true; - _streamError = new IOException("Closed internally"); + if (_streamError == null) + _streamError = new IOException("Closed internally"); + clearData(true); + } + + private void clearData(boolean shouldFlush) { ByteArray ba = null; synchronized (_dataLock) { // flush any data, but don't wait for it - if (_dataReceiver != null) + if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush) _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; @@ -345,7 +350,9 @@ public class MessageOutputStream extends OutputStream { } void streamErrorOccurred(IOException ioe) { - _streamError = ioe; + if (_streamError == null) + _streamError = ioe; + clearData(false); } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index b967a8aff..ea8f37370 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -135,6 +135,11 @@ public class Packet { * ping reply (if receiveStreamId is set). */ public static final int FLAG_ECHO = (1 << 9); + + /** + * If set, this packet doesn't really want to ack anything + */ + public static final int FLAG_NO_ACK = (1 << 10); public static final int DEFAULT_MAX_SIZE = 32*1024; private static final int MAX_DELAY_REQUEST = 65535; @@ -181,11 +186,21 @@ public class Packet { /** * The highest packet sequence number that received * on the receiveStreamId. This field is ignored on the initial - * connection packet (where receiveStreamId is the unknown id). + * connection packet (where receiveStreamId is the unknown id) or + * if FLAG_NO_ACK is set. * */ - public long getAckThrough() { return _ackThrough; } - public void setAckThrough(long id) { _ackThrough = id; } + public long getAckThrough() { + if (isFlagSet(FLAG_NO_ACK)) + return -1; + else + return _ackThrough; + } + public void setAckThrough(long id) { + if (id < 0) + setFlag(FLAG_NO_ACK); + _ackThrough = id; + } /** * List of packet sequence numbers below the getAckThrough() value @@ -566,7 +581,7 @@ public class Packet { else buf.append('\t'); buf.append(toFlagString()); - buf.append(" ACK ").append(_ackThrough); + buf.append(" ACK ").append(getAckThrough()); if (_nacks != null) { buf.append(" NACK"); for (int i = 0; i < _nacks.length; i++) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 877dd0c47..72799f2fd 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -22,19 +22,26 @@ public class PacketHandler { private I2PAppContext _context; private Log _log; private int _lastDelay; + private int _dropped; public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) { _manager = mgr; _context = ctx; + _dropped = 0; _log = ctx.logManager().getLog(PacketHandler.class); _lastDelay = _context.random().nextInt(30*1000); } - private boolean choke(Packet packet) { - if (false) { - // artificial choke: 2% random drop and a 0-30s + private boolean choke(Packet packet) { + if (true) return true; + //if ( (_dropped == 0) && true ) { //&& (_manager.getSent() <= 0) ) { + // _dropped++; + // return false; + //} + if (true) { + // artificial choke: 2% random drop and a 0-5s // random tiered delay from 0-30s - if (_context.random().nextInt(100) >= 95) { + if (_context.random().nextInt(100) >= 98) { displayPacket(packet, "DROP", null); return false; } else { @@ -42,7 +49,7 @@ public class PacketHandler { /* int delay = _context.random().nextInt(5*1000); */ - int delay = _context.random().nextInt(6*1000); + int delay = _context.random().nextInt(1*1000); int delayFactor = _context.random().nextInt(100); if (delayFactor > 80) { if (delayFactor > 98) @@ -97,7 +104,7 @@ public class PacketHandler { Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { receiveKnownCon(con, packet); - displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize()); + displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO()); } else { receiveUnknownCon(packet, sendId); displayPacket(packet, "UNKN", null); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 6bbb3d94d..9ce96071d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -125,7 +125,7 @@ class PacketQueue { _log.debug(msg); } Connection c = packet.getConnection(); - String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() : null); + String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null); _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); } diff --git a/build.xml b/build.xml index 1f1489c39..91b76269c 100644 --- a/build.xml +++ b/build.xml @@ -273,6 +273,7 @@ + @@ -339,6 +340,7 @@ + diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index e3db623cb..b9bbcd46b 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -414,7 +414,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if ( (msgId != null) && (size != null) ) { if (_sessionListener != null) { try { + long before = System.currentTimeMillis(); _sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue()); + long duration = System.currentTimeMillis() - before; + if ((duration > 100) && _log.shouldLog(Log.ERROR)) + _log.error("Message availability notification for " + msgId.intValue() + " took " + + duration + " to " + _sessionListener); } catch (Exception e) { _log.log(Log.CRIT, "Error notifying app of message availability", e); } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 212527944..a87a714bc 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -93,8 +93,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { */ public byte[] receiveMessage(int msgId) throws I2PSessionException { byte compressed[] = super.receiveMessage(msgId); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("receiving message " + msgId + " with " + compressed.length + " compressed bytes"); + if (compressed == null) { + _log.error("Error: message " + msgId + " already received!"); + return null; + } if (SHOULD_COMPRESS) { try { return DataHelper.decompress(compressed); diff --git a/core/java/src/net/i2p/data/ByteArray.java b/core/java/src/net/i2p/data/ByteArray.java index f0339a13c..5dbed1298 100644 --- a/core/java/src/net/i2p/data/ByteArray.java +++ b/core/java/src/net/i2p/data/ByteArray.java @@ -37,7 +37,7 @@ public class ByteArray implements Serializable, Comparable { _valid = length; } - public final byte[] getData() { + public byte[] getData() { return _data; } @@ -50,10 +50,10 @@ public class ByteArray implements Serializable, Comparable { * this property does not necessarily have meaning for all byte * arrays. */ - public final int getValid() { return _valid; } - public final void setValid(int valid) { _valid = valid; } - public final int getOffset() { return _offset; } - public final void setOffset(int offset) { _offset = offset; } + public int getValid() { return _valid; } + public void setValid(int valid) { _valid = valid; } + public int getOffset() { return _offset; } + public void setOffset(int offset) { _offset = offset; } public final boolean equals(Object o) { if (o == null) return false; @@ -83,7 +83,7 @@ public class ByteArray implements Serializable, Comparable { return DataHelper.hashCode(getData()); } - public final String toString() { + public String toString() { return super.toString() + "/" + DataHelper.toString(getData(), 32) + "." + _valid; } diff --git a/history.txt b/history.txt index afaeaf0c2..4d19759f8 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,21 @@ -$Id: history.txt,v 1.262 2005/09/21 01:43:04 jrandom Exp $ +$Id: history.txt,v 1.263 2005/09/21 18:01:37 jrandom Exp $ + +2005-09-25 Complication + * Better i2paddresshelper handling in the I2PTunnel httpclient, plus a new + conflict resolution page if the i2paddresshelper parameter differs from + an existing name to destination mapping. + +2005-09-25 jrandom + * Fix a long standing streaming lib bug (in the inactivity detection code) + * Improved handling of initial streaming lib packet retransmissions to + kill the "lost first packet" bug (where a page shows up with the first + few KB missing) + * Add support for initial window sizes greater than 1 - useful for + eepsites to transmit e.g. 4 packets full of data along with the initial + ACK, thereby cutting down on the rtt latency. The congestion window + size can and does still shrink down to 1 packet though. + * Adjusted the streaming lib retransmission calculation algorithm to be + more TCP-like. 2005-09-21 redzara * Use ISO-8859-1 for the susidns xml diff --git a/installer/resources/ahelper-conflict-header.ht b/installer/resources/ahelper-conflict-header.ht new file mode 100644 index 000000000..229bbf218 --- /dev/null +++ b/installer/resources/ahelper-conflict-header.ht @@ -0,0 +1,45 @@ +HTTP/1.1 409 Conflict +Content-Type: text/html; charset=iso-8859-1 +Cache-control: no-cache + + +Destination key conflict + + + +

+
+The addresshelper link you followed specifies a different destination key +than a host entry in your host database. +Someone could be trying to impersonate another eepsite, +or people have given two eepsites identical names. +

+You can resolve the conflict by considering which key you trust, +and either discarding the addresshelper link, +discarding the host entry from your host database, +or naming one of them differently. +

diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 6596422ff..f72cdd240 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.242 $ $Date: 2005/09/18 18:08:18 $"; + public final static String ID = "$Revision: 1.243 $ $Date: 2005/09/21 01:43:04 $"; public final static String VERSION = "0.6.0.6"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);