a chunk of streaming lib updates (cwin calc & timed win, pings influencing rtt, etc)

This commit is contained in:
jrandom
2004-10-30 23:46:01 +00:00
committed by zzz
parent 58fcbad20a
commit b37313d3f9
11 changed files with 129 additions and 38 deletions

View File

@ -3,14 +3,17 @@ package net.i2p.client.streaming;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession; import net.i2p.client.I2PSession;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.SessionTag;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
@ -39,6 +42,8 @@ public class Connection {
private long _closeSentOn; private long _closeSentOn;
private long _closeReceivedOn; private long _closeReceivedOn;
private int _unackedPacketsReceived; private int _unackedPacketsReceived;
private long _congestionWindowEnd;
private long _highestAckedThrough;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */ /** Packet ID (Long) to PacketLocal for sent but unacked packets */
private Map _outboundPackets; private Map _outboundPackets;
private PacketQueue _outboundQueue; private PacketQueue _outboundQueue;
@ -70,6 +75,8 @@ public class Connection {
_closeSentOn = -1; _closeSentOn = -1;
_closeReceivedOn = -1; _closeReceivedOn = -1;
_unackedPacketsReceived = 0; _unackedPacketsReceived = 0;
_congestionWindowEnd = 0;
_highestAckedThrough = -1;
_connectionManager = manager; _connectionManager = manager;
_resetReceived = false; _resetReceived = false;
_connected = true; _connected = true;
@ -183,12 +190,38 @@ public class Connection {
// ACKs don't get ACKed, but pings do. // ACKs don't get ACKed, but pings do.
if (packet.getTagsSent().size() > 0) { if (packet.getTagsSent().size() > 0) {
_log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags"); _log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
_connectionManager.ping(_remotePeer, 30*1000, false, packet.getKeyUsed(), packet.getTagsSent()); _connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent(), new PingNotifier());
} }
} }
} }
private class PingNotifier implements ConnectionManager.PingNotifier {
private long _startedPingOn;
public PingNotifier() {
_startedPingOn = _context.clock().now();
}
public void pingComplete(boolean ok) {
long time = _context.clock().now()-_startedPingOn;
if (ok)
_options.updateRTT((int)time);
else
_options.updateRTT((int)time*2);
}
}
List ackPackets(long ackThrough, long nacks[]) { List ackPackets(long ackThrough, long nacks[]) {
if (nacks == null) {
_highestAckedThrough = ackThrough;
} else {
long lowest = -1;
for (int i = 0; i < nacks.length; i++) {
if ( (lowest < 0) || (nacks[i] < lowest) )
lowest = nacks[i];
}
if (lowest - 1 > _highestAckedThrough)
_highestAckedThrough = lowest - 1;
}
List acked = null; List acked = null;
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
@ -352,6 +385,11 @@ public class Connection {
} }
} }
public long getCongestionWindowEnd() { return _congestionWindowEnd; }
public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; }
public long getHighestAckedThrough() { return _highestAckedThrough; }
public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; }
/** stream that the local peer receives data on */ /** stream that the local peer receives data on */
public MessageInputStream getInputStream() { return _inputStream; } public MessageInputStream getInputStream() { return _inputStream; }
/** stream that the local peer sends data to the remote peer on */ /** stream that the local peer sends data to the remote peer on */
@ -370,6 +408,7 @@ public class Connection {
else else
buf.append("unknown"); buf.append("unknown");
buf.append(" wsize: ").append(_options.getWindowSize()); buf.append(" wsize: ").append(_options.getWindowSize());
buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough);
buf.append(" rtt: ").append(_options.getRTT()); buf.append(" rtt: ").append(_options.getRTT());
buf.append(" unacked outbound: "); buf.append(" unacked outbound: ");
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
@ -400,8 +439,8 @@ public class Connection {
} }
public void timeReached() { public void timeReached() {
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend period reached for " + _packet); // _log.debug("Resend period reached for " + _packet);
boolean resend = false; boolean resend = false;
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum()))) if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum())))
@ -443,9 +482,9 @@ public class Connection {
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout); SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
} }
} else { } else {
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet acked before resend (resend="+ resend + "): " // _log.debug("Packet acked before resend (resend="+ resend + "): "
+ _packet + " on " + Connection.this); // + _packet + " on " + Connection.this);
} }
} }
} }

View File

@ -160,9 +160,9 @@ public class ConnectionManager {
return ping(peer, timeoutMs, true); return ping(peer, timeoutMs, true);
} }
public boolean ping(Destination peer, long timeoutMs, boolean blocking) { public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
return ping(peer, timeoutMs, blocking, null, null); return ping(peer, timeoutMs, blocking, null, null, null);
} }
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend) { public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
byte id[] = new byte[4]; byte id[] = new byte[4];
_context.random().nextBytes(id); _context.random().nextBytes(id);
ByteArray ba = new ByteArray(id); ByteArray ba = new ByteArray(id);
@ -176,7 +176,7 @@ public class ConnectionManager {
packet.setTagsSent(tagsToSend); packet.setTagsSent(tagsToSend);
} }
PingRequest req = new PingRequest(peer, packet); PingRequest req = new PingRequest(peer, packet, notifier);
synchronized (_pendingPings) { synchronized (_pendingPings) {
_pendingPings.put(ba, req); _pendingPings.put(ba, req);
@ -194,16 +194,25 @@ public class ConnectionManager {
_pendingPings.remove(ba); _pendingPings.remove(ba);
} }
} else { } else {
SimpleTimer.getInstance().addEvent(new PingFailed(ba), timeoutMs); SimpleTimer.getInstance().addEvent(new PingFailed(ba, notifier), timeoutMs);
} }
boolean ok = req.pongReceived(); boolean ok = req.pongReceived();
return ok; return ok;
} }
interface PingNotifier {
public void pingComplete(boolean ok);
}
private class PingFailed implements SimpleTimer.TimedEvent { private class PingFailed implements SimpleTimer.TimedEvent {
private ByteArray _ba; private ByteArray _ba;
public PingFailed(ByteArray ba) { _ba = ba; } private PingNotifier _notifier;
public PingFailed(ByteArray ba, PingNotifier notifier) {
_ba = ba;
_notifier = notifier;
}
public void timeReached() { public void timeReached() {
boolean removed = false; boolean removed = false;
synchronized (_pendingPings) { synchronized (_pendingPings) {
@ -211,19 +220,24 @@ public class ConnectionManager {
if (o != null) if (o != null)
removed = true; removed = true;
} }
if (removed) if (removed) {
if (_notifier != null)
_notifier.pingComplete(false);
_log.error("Ping failed"); _log.error("Ping failed");
} }
} }
}
private class PingRequest { private class PingRequest {
private boolean _ponged; private boolean _ponged;
private Destination _peer; private Destination _peer;
private PacketLocal _packet; private PacketLocal _packet;
public PingRequest(Destination peer, PacketLocal packet) { private PingNotifier _notifier;
public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
_ponged = false; _ponged = false;
_peer = peer; _peer = peer;
_packet = packet; _packet = packet;
_notifier = notifier;
} }
public void pong() { public void pong() {
_log.debug("Ping successful"); _log.debug("Ping successful");
@ -232,6 +246,8 @@ public class ConnectionManager {
_ponged = true; _ponged = true;
ConnectionManager.PingRequest.this.notifyAll(); ConnectionManager.PingRequest.this.notifyAll();
} }
if (_notifier != null)
_notifier.pingComplete(true);
} }
public boolean pongReceived() { return _ponged; } public boolean pongReceived() { return _ponged; }
} }

View File

@ -87,12 +87,17 @@ public class ConnectionOptions extends I2PSocketOptions {
public boolean getRequireFullySigned() { return _fullySigned; } public boolean getRequireFullySigned() { return _fullySigned; }
public void setRequireFullySigned(boolean sign) { _fullySigned = sign; } public void setRequireFullySigned(boolean sign) { _fullySigned = sign; }
private static final int MAX_WINDOW_SIZE = 32;
/** /**
* How many messages will we send before waiting for an ACK? * How many messages will we send before waiting for an ACK?
* *
*/ */
public int getWindowSize() { return _windowSize; } public int getWindowSize() { return _windowSize; }
public void setWindowSize(int numMsgs) { _windowSize = numMsgs; } public void setWindowSize(int numMsgs) {
if (numMsgs > MAX_WINDOW_SIZE)
numMsgs = MAX_WINDOW_SIZE;
_windowSize = numMsgs;
}
/** after how many consecutive messages should we ack? */ /** after how many consecutive messages should we ack? */
public int getReceiveWindow() { return _receiveWindow; } public int getReceiveWindow() { return _receiveWindow; }
@ -108,6 +113,13 @@ public class ConnectionOptions extends I2PSocketOptions {
_rtt = 60*1000; _rtt = 60*1000;
} }
/** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */
private static final double RTT_DAMPENING = 0.9;
public void updateRTT(int measuredValue) {
setRTT((int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue));
}
/** How long after sending a packet will we wait before resending? */ /** How long after sending a packet will we wait before resending? */
public int getResendDelay() { return _resendDelay; } public int getResendDelay() { return _resendDelay; }
public void setResendDelay(int ms) { _resendDelay = ms; } public void setResendDelay(int ms) { _resendDelay = ms; }

View File

@ -17,9 +17,6 @@ public class ConnectionPacketHandler {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
/** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */
private static final double RTT_DAMPENING = 0.9;
public ConnectionPacketHandler(I2PAppContext context) { public ConnectionPacketHandler(I2PAppContext context) {
_context = context; _context = context;
_log = context.logManager().getLog(ConnectionPacketHandler.class); _log = context.logManager().getLog(ConnectionPacketHandler.class);
@ -98,9 +95,7 @@ public class ConnectionPacketHandler {
_log.debug("Packet acked after " + p.getAckTime() + "ms: " + p); _log.debug("Packet acked after " + p.getAckTime() + "ms: " + p);
} }
if (highestRTT > 0) { if (highestRTT > 0) {
int oldRTT = con.getOptions().getRTT(); con.getOptions().updateRTT(highestRTT);
int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*highestRTT);
con.getOptions().setRTT(newRTT);
} }
} }
@ -128,6 +123,8 @@ public class ConnectionPacketHandler {
// window sizes are shrunk on resend, not on ack // window sizes are shrunk on resend, not on ack
} else { } else {
if (acked > 0) { if (acked > 0) {
long lowest = con.getHighestAckedThrough();
if (lowest >= con.getCongestionWindowEnd()) {
// new packet that ack'ed uncongested data, or an empty ack // new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize(); int newWindowSize = con.getOptions().getWindowSize();
newWindowSize += 1; // acked; // 1 newWindowSize += 1; // acked; // 1
@ -135,6 +132,8 @@ public class ConnectionPacketHandler {
_log.debug("New window size " + newWindowSize + " (#resends: " + numResends _log.debug("New window size " + newWindowSize + " (#resends: " + numResends
+ ") for " + con); + ") for " + con);
con.getOptions().setWindowSize(newWindowSize); con.getOptions().setWindowSize(newWindowSize);
con.setCongestionWindowEnd(newWindowSize + lowest);
}
} }
} }
return false; return false;

View File

@ -20,6 +20,7 @@ public class I2PSocketFull implements I2PSocket {
public void close() throws IOException { public void close() throws IOException {
if (_connection.getIsConnected()) { if (_connection.getIsConnected()) {
_connection.getOutputStream().close();
_connection.disconnect(true); _connection.disconnect(true);
} else { } else {
throw new IOException("Not connected"); throw new IOException("Not connected");

View File

@ -73,6 +73,6 @@ public class MessageHandler implements I2PSessionListener {
public void errorOccurred(I2PSession session, String message, Throwable error) { public void errorOccurred(I2PSession session, String message, Throwable error) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("error occurred: " + message, error); _log.error("error occurred: " + message, error);
_manager.disconnectAllHard(); //_manager.disconnectAllHard();
} }
} }

View File

@ -34,10 +34,11 @@ public class PacketHandler {
if (false) { if (false) {
// artificial choke: 2% random drop and a 0-30s // artificial choke: 2% random drop and a 0-30s
// random tiered delay from 0-30s // random tiered delay from 0-30s
if (_context.random().nextInt(100) >= 98) { if (_context.random().nextInt(100) >= 95) {
displayPacket(packet, "DROP"); displayPacket(packet, "DROP");
return false; return false;
} else { } else {
// if (true) return true; // no lag, just drop
/* /*
int delay = _context.random().nextInt(5*1000); int delay = _context.random().nextInt(5*1000);
*/ */

View File

@ -44,9 +44,10 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public Set getTagsSent() { return _tagsSent; } public Set getTagsSent() { return _tagsSent; }
public void setTagsSent(Set tags) { public void setTagsSent(Set tags) {
if ( (_tagsSent != null) && (_tagsSent.size() > 0) && (tags.size() > 0) ) { if ( (_tagsSent != null) && (_tagsSent.size() > 0) && (tags.size() > 0) ) {
int old = _tagsSent.size(); //int old = _tagsSent.size();
_tagsSent.addAll(tags); //_tagsSent.addAll(tags);
//System.out.println("Dup tags set on " +toString() + " old=" + old + " new=" + tags.size()); if (!_tagsSent.equals(tags))
System.out.println("ERROR: dup tags: old=" + _tagsSent.size() + " new=" + tags.size() + " packet: " + toString());
} else { } else {
_tagsSent = tags; _tagsSent = tags;
} }

View File

@ -44,7 +44,11 @@ class PacketQueue {
tagsSent = new HashSet(); tagsSent = new HashSet();
try { try {
// cache this from before sendMessage // cache this from before sendMessage
String conStr = packet.getConnection() + ""; String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : "");
if (packet.getAckTime() > 0) {
_log.debug("Not resending " + packet);
return;
}
// this should not block! // this should not block!
long begin = _context.clock().now(); long begin = _context.clock().now();
boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);

View File

@ -25,7 +25,7 @@ abstract class SchedulerImpl implements TaskScheduler {
private Exception _addedBy; private Exception _addedBy;
public ConEvent(Connection con) { public ConEvent(Connection con) {
_connection = con; _connection = con;
_addedBy = new Exception("added by"); //_addedBy = new Exception("added by");
} }
public void timeReached() { public void timeReached() {
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))

View File

@ -4,21 +4,39 @@ package net.i2p.client.streaming;
* *
*/ */
public class StreamSinkTest { public class StreamSinkTest {
/* private static String HOST1 = "dev.i2p.net";
private static String HOST2 = "dev.i2p.net";
private static String PORT1 = "4101";
private static String PORT2 = "4501";
*/ /* */
private static String HOST1 = "localhost";
private static String HOST2 = "localhost";
private static String PORT1 = "7654";
private static String PORT2 = "7654";
/* */ /*
private static String HOST1 = "localhost";
private static String HOST2 = "localhost";
private static String PORT1 = "10001";
private static String PORT2 = "11001";
*/
public static void main(String args[]) { public static void main(String args[]) {
System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName()); System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName());
//System.setProperty("tunnels.depthInbound", "0");
new Thread(new Runnable() { new Thread(new Runnable() {
public void run() { public void run() {
StreamSinkServer.main(new String[] { "streamSinkTestDir", "streamSinkTestServer.key" }); StreamSinkServer.main(new String[] { HOST1, PORT1, "streamSinkTestDir", "streamSinkTestServer.key" });
} }
}, "server").start(); }, "server").start();
try { Thread.sleep(30*1000); } catch (Exception e) {} try { Thread.sleep(60*1000); } catch (Exception e) {}
//run(256, 10000); //run(256, 10000);
//run(256, 1000); //run(256, 1000);
//run(1024, 10); //run(1024, 10);
run(32*1024, 1); run(32*1024, 1);
//run(1*1024, 1);
//run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1); //run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1);
//run(512*1024, 1); //run(512*1024, 1);
try { Thread.sleep(10*1000); } catch (InterruptedException e) {} try { Thread.sleep(10*1000); } catch (InterruptedException e) {}
@ -29,7 +47,7 @@ public class StreamSinkTest {
private static void run(final int kb, final int msBetweenWrites) { private static void run(final int kb, final int msBetweenWrites) {
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
public void run() { public void run() {
StreamSinkClient.main(new String[] { kb+"", msBetweenWrites+"", "streamSinkTestServer.key" }); StreamSinkClient.main(new String[] { HOST2, PORT2, kb+"", msBetweenWrites+"", "streamSinkTestServer.key" });
} }
}); });
t.start(); t.start();