diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index cbea38847..789258f92 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -3,14 +3,17 @@ package net.i2p.client.streaming; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.data.Base64; import net.i2p.data.Destination; +import net.i2p.data.SessionTag; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -39,6 +42,8 @@ public class Connection { private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; + private long _congestionWindowEnd; + private long _highestAckedThrough; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private Map _outboundPackets; private PacketQueue _outboundQueue; @@ -70,6 +75,8 @@ public class Connection { _closeSentOn = -1; _closeReceivedOn = -1; _unackedPacketsReceived = 0; + _congestionWindowEnd = 0; + _highestAckedThrough = -1; _connectionManager = manager; _resetReceived = false; _connected = true; @@ -183,12 +190,38 @@ public class Connection { // ACKs don't get ACKed, but pings do. if (packet.getTagsSent().size() > 0) { _log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags"); - _connectionManager.ping(_remotePeer, 30*1000, false, packet.getKeyUsed(), packet.getTagsSent()); + _connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent(), new PingNotifier()); } } } + private class PingNotifier implements ConnectionManager.PingNotifier { + private long _startedPingOn; + public PingNotifier() { + _startedPingOn = _context.clock().now(); + } + public void pingComplete(boolean ok) { + long time = _context.clock().now()-_startedPingOn; + if (ok) + _options.updateRTT((int)time); + else + _options.updateRTT((int)time*2); + } + } + List ackPackets(long ackThrough, long nacks[]) { + if (nacks == null) { + _highestAckedThrough = ackThrough; + } else { + long lowest = -1; + for (int i = 0; i < nacks.length; i++) { + if ( (lowest < 0) || (nacks[i] < lowest) ) + lowest = nacks[i]; + } + if (lowest - 1 > _highestAckedThrough) + _highestAckedThrough = lowest - 1; + } + List acked = null; synchronized (_outboundPackets) { for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { @@ -352,6 +385,11 @@ public class Connection { } } + public long getCongestionWindowEnd() { return _congestionWindowEnd; } + public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; } + public long getHighestAckedThrough() { return _highestAckedThrough; } + public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } + /** stream that the local peer receives data on */ public MessageInputStream getInputStream() { return _inputStream; } /** stream that the local peer sends data to the remote peer on */ @@ -370,6 +408,7 @@ public class Connection { else buf.append("unknown"); buf.append(" wsize: ").append(_options.getWindowSize()); + buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough); buf.append(" rtt: ").append(_options.getRTT()); buf.append(" unacked outbound: "); synchronized (_outboundPackets) { @@ -400,8 +439,8 @@ public class Connection { } public void timeReached() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resend period reached for " + _packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Resend period reached for " + _packet); boolean resend = false; synchronized (_outboundPackets) { if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum()))) @@ -415,7 +454,7 @@ public class Connection { _packet.setResendDelay(getOptions().getResendDelay()); _packet.setReceiveStreamId(_receiveStreamId); _packet.setSendStreamId(_sendStreamId); - + // shrink the window int newWindowSize = getOptions().getWindowSize(); newWindowSize /= 2; @@ -443,9 +482,9 @@ public class Connection { SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout); } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet acked before resend (resend="+ resend + "): " - + _packet + " on " + Connection.this); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Packet acked before resend (resend="+ resend + "): " + // + _packet + " on " + Connection.this); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index f888e06c3..3241adef7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -160,9 +160,9 @@ public class ConnectionManager { return ping(peer, timeoutMs, true); } public boolean ping(Destination peer, long timeoutMs, boolean blocking) { - return ping(peer, timeoutMs, blocking, null, null); + return ping(peer, timeoutMs, blocking, null, null, null); } - public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend) { + public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { byte id[] = new byte[4]; _context.random().nextBytes(id); ByteArray ba = new ByteArray(id); @@ -176,7 +176,7 @@ public class ConnectionManager { packet.setTagsSent(tagsToSend); } - PingRequest req = new PingRequest(peer, packet); + PingRequest req = new PingRequest(peer, packet, notifier); synchronized (_pendingPings) { _pendingPings.put(ba, req); @@ -194,16 +194,25 @@ public class ConnectionManager { _pendingPings.remove(ba); } } else { - SimpleTimer.getInstance().addEvent(new PingFailed(ba), timeoutMs); + SimpleTimer.getInstance().addEvent(new PingFailed(ba, notifier), timeoutMs); } boolean ok = req.pongReceived(); return ok; } + + interface PingNotifier { + public void pingComplete(boolean ok); + } private class PingFailed implements SimpleTimer.TimedEvent { private ByteArray _ba; - public PingFailed(ByteArray ba) { _ba = ba; } + private PingNotifier _notifier; + public PingFailed(ByteArray ba, PingNotifier notifier) { + _ba = ba; + _notifier = notifier; + } + public void timeReached() { boolean removed = false; synchronized (_pendingPings) { @@ -211,8 +220,11 @@ public class ConnectionManager { if (o != null) removed = true; } - if (removed) + if (removed) { + if (_notifier != null) + _notifier.pingComplete(false); _log.error("Ping failed"); + } } } @@ -220,10 +232,12 @@ public class ConnectionManager { private boolean _ponged; private Destination _peer; private PacketLocal _packet; - public PingRequest(Destination peer, PacketLocal packet) { + private PingNotifier _notifier; + public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { _ponged = false; _peer = peer; _packet = packet; + _notifier = notifier; } public void pong() { _log.debug("Ping successful"); @@ -232,6 +246,8 @@ public class ConnectionManager { _ponged = true; ConnectionManager.PingRequest.this.notifyAll(); } + if (_notifier != null) + _notifier.pingComplete(true); } public boolean pongReceived() { return _ponged; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index cac9fd199..06d67c0d1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -87,12 +87,17 @@ public class ConnectionOptions extends I2PSocketOptions { public boolean getRequireFullySigned() { return _fullySigned; } public void setRequireFullySigned(boolean sign) { _fullySigned = sign; } + private static final int MAX_WINDOW_SIZE = 32; /** * How many messages will we send before waiting for an ACK? * */ public int getWindowSize() { return _windowSize; } - public void setWindowSize(int numMsgs) { _windowSize = numMsgs; } + public void setWindowSize(int numMsgs) { + if (numMsgs > MAX_WINDOW_SIZE) + numMsgs = MAX_WINDOW_SIZE; + _windowSize = numMsgs; + } /** after how many consecutive messages should we ack? */ public int getReceiveWindow() { return _receiveWindow; } @@ -108,6 +113,13 @@ public class ConnectionOptions extends I2PSocketOptions { _rtt = 60*1000; } + /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ + private static final double RTT_DAMPENING = 0.9; + + public void updateRTT(int measuredValue) { + setRTT((int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue)); + } + /** How long after sending a packet will we wait before resending? */ public int getResendDelay() { return _resendDelay; } public void setResendDelay(int ms) { _resendDelay = ms; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 82193aa1b..5d8332e9f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -17,9 +17,6 @@ public class ConnectionPacketHandler { private I2PAppContext _context; private Log _log; - /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ - private static final double RTT_DAMPENING = 0.9; - public ConnectionPacketHandler(I2PAppContext context) { _context = context; _log = context.logManager().getLog(ConnectionPacketHandler.class); @@ -98,9 +95,7 @@ public class ConnectionPacketHandler { _log.debug("Packet acked after " + p.getAckTime() + "ms: " + p); } if (highestRTT > 0) { - int oldRTT = con.getOptions().getRTT(); - int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*highestRTT); - con.getOptions().setRTT(newRTT); + con.getOptions().updateRTT(highestRTT); } } @@ -128,13 +123,17 @@ public class ConnectionPacketHandler { // window sizes are shrunk on resend, not on ack } else { if (acked > 0) { - // new packet that ack'ed uncongested data, or an empty ack - int newWindowSize = con.getOptions().getWindowSize(); - newWindowSize += 1; // acked; // 1 - if (_log.shouldLog(Log.DEBUG)) - _log.debug("New window size " + newWindowSize + " (#resends: " + numResends - + ") for " + con); - con.getOptions().setWindowSize(newWindowSize); + long lowest = con.getHighestAckedThrough(); + if (lowest >= con.getCongestionWindowEnd()) { + // new packet that ack'ed uncongested data, or an empty ack + int newWindowSize = con.getOptions().getWindowSize(); + newWindowSize += 1; // acked; // 1 + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New window size " + newWindowSize + " (#resends: " + numResends + + ") for " + con); + con.getOptions().setWindowSize(newWindowSize); + con.setCongestionWindowEnd(newWindowSize + lowest); + } } } return false; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 413d8a9b5..a7877d713 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -20,6 +20,7 @@ public class I2PSocketFull implements I2PSocket { public void close() throws IOException { if (_connection.getIsConnected()) { + _connection.getOutputStream().close(); _connection.disconnect(true); } else { throw new IOException("Not connected"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index ca1a416c9..9d558f6c4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -73,6 +73,6 @@ public class MessageHandler implements I2PSessionListener { public void errorOccurred(I2PSession session, String message, Throwable error) { if (_log.shouldLog(Log.ERROR)) _log.error("error occurred: " + message, error); - _manager.disconnectAllHard(); + //_manager.disconnectAllHard(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 5f3246ff5..abf3ca30f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -34,10 +34,11 @@ public class PacketHandler { if (false) { // artificial choke: 2% random drop and a 0-30s // random tiered delay from 0-30s - if (_context.random().nextInt(100) >= 98) { + if (_context.random().nextInt(100) >= 95) { displayPacket(packet, "DROP"); return false; } else { + // if (true) return true; // no lag, just drop /* int delay = _context.random().nextInt(5*1000); */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index d53f8223f..cf861441f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -44,9 +44,10 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public Set getTagsSent() { return _tagsSent; } public void setTagsSent(Set tags) { if ( (_tagsSent != null) && (_tagsSent.size() > 0) && (tags.size() > 0) ) { - int old = _tagsSent.size(); - _tagsSent.addAll(tags); - //System.out.println("Dup tags set on " +toString() + " old=" + old + " new=" + tags.size()); + //int old = _tagsSent.size(); + //_tagsSent.addAll(tags); + if (!_tagsSent.equals(tags)) + System.out.println("ERROR: dup tags: old=" + _tagsSent.size() + " new=" + tags.size() + " packet: " + toString()); } else { _tagsSent = tags; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index b62117f61..53f33c169 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -44,7 +44,11 @@ class PacketQueue { tagsSent = new HashSet(); try { // cache this from before sendMessage - String conStr = packet.getConnection() + ""; + String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : ""); + if (packet.getAckTime() > 0) { + _log.debug("Not resending " + packet); + return; + } // this should not block! long begin = _context.clock().now(); boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java index 5b6740849..617ca9ab5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -25,7 +25,7 @@ abstract class SchedulerImpl implements TaskScheduler { private Exception _addedBy; public ConEvent(Connection con) { _connection = con; - _addedBy = new Exception("added by"); + //_addedBy = new Exception("added by"); } public void timeReached() { //if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java b/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java index a367f3995..058c5b8b3 100644 --- a/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java +++ b/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java @@ -4,21 +4,39 @@ package net.i2p.client.streaming; * */ public class StreamSinkTest { +/* private static String HOST1 = "dev.i2p.net"; + private static String HOST2 = "dev.i2p.net"; + private static String PORT1 = "4101"; + private static String PORT2 = "4501"; +*/ /* */ + private static String HOST1 = "localhost"; + private static String HOST2 = "localhost"; + private static String PORT1 = "7654"; + private static String PORT2 = "7654"; + /* */ /* + private static String HOST1 = "localhost"; + private static String HOST2 = "localhost"; + private static String PORT1 = "10001"; + private static String PORT2 = "11001"; + */ + public static void main(String args[]) { System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName()); + //System.setProperty("tunnels.depthInbound", "0"); new Thread(new Runnable() { public void run() { - StreamSinkServer.main(new String[] { "streamSinkTestDir", "streamSinkTestServer.key" }); + StreamSinkServer.main(new String[] { HOST1, PORT1, "streamSinkTestDir", "streamSinkTestServer.key" }); } }, "server").start(); - try { Thread.sleep(30*1000); } catch (Exception e) {} + try { Thread.sleep(60*1000); } catch (Exception e) {} //run(256, 10000); //run(256, 1000); //run(1024, 10); run(32*1024, 1); + //run(1*1024, 1); //run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1); //run(512*1024, 1); try { Thread.sleep(10*1000); } catch (InterruptedException e) {} @@ -29,7 +47,7 @@ public class StreamSinkTest { private static void run(final int kb, final int msBetweenWrites) { Thread t = new Thread(new Runnable() { public void run() { - StreamSinkClient.main(new String[] { kb+"", msBetweenWrites+"", "streamSinkTestServer.key" }); + StreamSinkClient.main(new String[] { HOST2, PORT2, kb+"", msBetweenWrites+"", "streamSinkTestServer.key" }); } }); t.start();