* Streaming: TCB control block sharing

also tweak ResendPacketEvent to prepare for PacketQueue sending timeout to I2CP
This commit is contained in:
zzz
2009-01-20 17:20:37 +00:00
parent 0e2a4227ef
commit ab92206b77
4 changed files with 181 additions and 23 deletions

View File

@ -45,6 +45,7 @@ public class Connection {
private long _congestionWindowEnd;
private long _highestAckedThrough;
private boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
private Map _outboundPackets;
private PacketQueue _outboundQueue;
@ -120,6 +121,7 @@ public class Connection {
_activeResends = 0;
_resetSentOn = -1;
_isInbound = false;
_updatedShareOpts = false;
_connectionEvent = new ConEvent();
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -586,6 +588,8 @@ public class Connection {
if (_remotePeerSet) throw new RuntimeException("Remote peer already set [" + _remotePeer + ", " + peer + "]");
_remotePeerSet = true;
_remotePeer = peer;
// now that we know who the other end is, get the rtt etc. from the cache
_connectionManager.updateOptsFromShare(this);
}
private boolean _sendStreamIdSet = false;
@ -709,7 +713,13 @@ public class Connection {
}
public long getCloseReceivedOn() { return _closeReceivedOn; }
public void setCloseReceivedOn(long when) { _closeReceivedOn = when; }
public void updateShareOpts() {
if (_closeSentOn > 0 && !_updatedShareOpts) {
_connectionManager.updateShareOpts(this);
_updatedShareOpts = true;
}
}
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
/** how many packets have we sent but not yet received an ACK for?
@ -998,7 +1008,7 @@ public class Connection {
/**
* Coordinate the resends of a given packet
*/
private class ResendPacketEvent implements SimpleTimer.TimedEvent {
public class ResendPacketEvent implements SimpleTimer.TimedEvent {
private PacketLocal _packet;
private long _nextSendTime;
public ResendPacketEvent(PacketLocal packet, long sendTime) {
@ -1104,26 +1114,6 @@ public class Connection {
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
}
if (numSends - 1 <= _options.getMaxResends()) {
if (_log.shouldLog(Log.INFO))
_log.info("Resend packet " + _packet + " time " + numSends +
" activeResends: " + _activeResends +
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);
_lastSendTime = _context.clock().now();
}
// acked during resending (... or somethin')
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--;
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
return true;
}
if (numSends - 1 > _options.getMaxResends()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Too many resends");
@ -1137,11 +1127,32 @@ public class Connection {
long timeout = rto << (numSends-1);
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY;
// set this before enqueue() as it passes it on to the router
_nextSendTime = timeout + _context.clock().now();
if (_log.shouldLog(Log.INFO))
_log.info("Resend packet " + _packet + " time " + numSends +
" activeResends: " + _activeResends +
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);
_lastSendTime = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
_nextSendTime = timeout + _context.clock().now();
}
// acked during resending (... or somethin')
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--;
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
return true;
}
return true;
} else {
//if (_log.shouldLog(Log.DEBUG))

View File

@ -30,6 +30,7 @@ public class ConnectionManager {
private PacketQueue _outboundQueue;
private SchedulerChooser _schedulerChooser;
private ConnectionPacketHandler _conPacketHandler;
private TCBShare _tcbShare;
/** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
/** Ping ID (Long) to PingRequest */
@ -52,6 +53,7 @@ public class ConnectionManager {
_connectionHandler = new ConnectionHandler(context, this);
_schedulerChooser = new SchedulerChooser(context);
_conPacketHandler = new ConnectionPacketHandler(context);
_tcbShare = new TCBShare(context);
_session = session;
session.setSessionListener(_messageHandler);
_outboundQueue = new PacketQueue(context, session, this);
@ -127,6 +129,7 @@ public class ConnectionManager {
*/
public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
_tcbShare.updateOptsFromShare(con);
con.setInbound();
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false;
@ -277,6 +280,8 @@ public class ConnectionManager {
public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
public I2PSession getSession() { return _session; }
public PacketQueue getPacketQueue() { return _outboundQueue; }
public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); }
public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); }
/**
* Something b0rked hard, so kill all of our connections without mercy.
@ -292,6 +297,7 @@ public class ConnectionManager {
_connectionByInboundId.clear();
_connectionLock.notifyAll();
}
_tcbShare.stop();
}
/**

View File

@ -213,6 +213,10 @@ public class ConnectionPacketHandler {
packet.releasePayload();
}
// update the TCB Cache now that we've processed the acks and updated our rtt etc.
if (isNew && packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
con.updateShareOpts();
//if (choke)
// con.fastRetransmit();
}

View File

@ -0,0 +1,137 @@
package net.i2p.client.streaming;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* Share important TCP Control Block parameters across Connections
* to the same remote peer.
* This is intended for "temporal" sharing at connection open/close time,
* not "ensemble" sharing during a connection. Ref. RFC 2140.
*
* There is a TCB share per ConnectionManager (i.e. per local Destination)
* so that there is no information leakage to other Destinations on the
* same router.
*
*/
public class TCBShare {
private I2PAppContext _context;
private Log _log;
private Map<Destination, Entry> _cache;
private CleanEvent _cleaner;
private static final long EXPIRE_TIME = 30*60*1000;
private static final long CLEAN_TIME = 10*60*1000;
private static final double RTT_DAMPENING = 0.75;
private static final double WDW_DAMPENING = 0.75;
private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4;
public TCBShare(I2PAppContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(TCBShare.class);
_cache = new ConcurrentHashMap(4);
_cleaner = new CleanEvent();
SimpleTimer.getInstance().addEvent(_cleaner, CLEAN_TIME);
}
public void stop() {
SimpleTimer.getInstance().removeEvent(_cleaner);
}
public void updateOptsFromShare(Connection con) {
Destination dest = con.getRemotePeer();
if (dest == null)
return;
ConnectionOptions opts = con.getOptions();
if (opts == null)
return;
Entry e = _cache.get(dest);
if (e == null || e.isExpired())
return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("From cache: " +
con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
'-' +
dest.calculateHash().toBase64().substring(0, 4) +
" RTT: " + e.getRTT() + " wdw: " + e.getWindowSize());
opts.setRTT(e.getRTT());
opts.setWindowSize(e.getWindowSize());
}
public void updateShareOpts(Connection con) {
Destination dest = con.getRemotePeer();
if (dest == null)
return;
if (con.getAckedPackets() <= 0)
return;
ConnectionOptions opts = con.getOptions();
if (opts == null)
return;
int old = -1;
int oldw = -1;
Entry e = _cache.get(dest);
if (e == null || e.isExpired()) {
e = new Entry(opts.getRTT(), opts.getWindowSize());
_cache.put(dest, e);
} else {
old = e.getRTT();
oldw = e.getWindowSize();
e.setRTT(opts.getRTT());
e.setWindowSize(opts.getWindowSize());
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("To cache: " +
con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
'-' +
dest.calculateHash().toBase64().substring(0, 4) +
" old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() +
" oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize());
}
private class Entry {
int _rtt;
int _wdw;
long _updated;
public Entry(int ms, int wdw) {
_rtt = ms;
_wdw = wdw;
_updated = _context.clock().now();
}
public int getRTT() { return _rtt; }
public void setRTT(int ms) {
_rtt = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*ms);
if (_rtt > MAX_RTT)
_rtt = MAX_RTT;
_updated = _context.clock().now();
}
public int getWindowSize() { return _wdw; }
public void setWindowSize(int wdw) {
_wdw = (int)(0.5 + WDW_DAMPENING*_wdw + (1-WDW_DAMPENING)*wdw);
if (_wdw > MAX_WINDOW_SIZE)
_wdw = MAX_WINDOW_SIZE;
_updated = _context.clock().now();
}
public boolean isExpired() {
return _updated < _context.clock().now() - EXPIRE_TIME;
}
}
private class CleanEvent implements SimpleTimer.TimedEvent {
public CleanEvent() {}
public void timeReached() {
for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) {
if (_cache.get(iter.next()).isExpired())
iter.remove();
}
SimpleTimer.getInstance().addEvent(CleanEvent.this, CLEAN_TIME);
}
}
}