2005-08-08 jrandom

* Add a configurable throttle to the number of concurrent outbound SSU
      connection negotiations (via i2np.udp.maxConcurrentEstablish=4).  This
      may help those with slow connections to get integrated at the start.
    * Further fixlets to the streaming lib
This commit is contained in:
jrandom
2005-08-08 20:35:50 +00:00
committed by zzz
parent ba30b56c5f
commit 6a19501214
11 changed files with 176 additions and 21 deletions

View File

@ -72,7 +72,7 @@ public class Connection {
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 10*1000;
public static final long MAX_RESEND_DELAY = 5*1000;
public static final long MIN_RESEND_DELAY = 3*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */

View File

@ -98,7 +98,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
@ -130,7 +130,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
if (opts.containsKey(PROP_MAX_RESENDS))
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10));
if (opts.containsKey(PROP_WRITE_TIMEOUT))
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))

View File

@ -62,7 +62,7 @@ public class ConnectionPacketHandler {
con.getOutputStream().setBufferSize(packet.getOptionalMaxSize());
}
}
con.packetReceived();
boolean choke = false;
@ -92,7 +92,20 @@ public class ConnectionPacketHandler {
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0);
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
boolean isNew = false;
boolean allowAck = true;
if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) &&
( (packet.getSendStreamId() == null) ||
(packet.getReceiveStreamId() == null) ||
(DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) ||
(DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) ) )
allowAck = false;
if (allowAck)
isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
else
isNew = con.getInputStream().messageReceived(con.getInputStream().getHighestReadyBockId(), null);
if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
@ -190,6 +203,8 @@ public class ConnectionPacketHandler {
(!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) )
acked = con.ackPackets(ackThrough, nacks);
else
return false;
if ( (acked != null) && (acked.size() > 0) ) {
if (_log.shouldLog(Log.DEBUG))

View File

@ -202,7 +202,7 @@ public class MessageInputStream extends InputStream {
public boolean messageReceived(long messageId, ByteArray payload) {
synchronized (_dataLock) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("received " + messageId + " with " + payload.getValid());
_log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload"));
if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ignoring dup message " + messageId);

View File

@ -14,8 +14,8 @@ package net.i2p;
*
*/
public class CoreVersion {
public final static String ID = "$Revision: 1.36 $ $Date: 2005/07/27 14:04:49 $";
public final static String VERSION = "0.6.0.1";
public final static String ID = "$Revision: 1.37 $ $Date: 2005/08/03 13:58:12 $";
public final static String VERSION = "0.6.0.2";
public static void main(String args[]) {
System.out.println("I2P Core version: " + VERSION);

View File

@ -1,4 +1,12 @@
$Id: history.txt,v 1.222 2005/08/03 13:58:13 jrandom Exp $
$Id: history.txt,v 1.223 2005/08/07 14:31:58 jrandom Exp $
* 2005-08-08 0.6.0.2 released
2005-08-08 jrandom
* Add a configurable throttle to the number of concurrent outbound SSU
connection negotiations (via i2np.udp.maxConcurrentEstablish=4). This
may help those with slow connections to get integrated at the start.
* Further fixlets to the streaming lib
2005-08-07 Complication
* Display the average clock skew for both SSU and TCP connections

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.211 $ $Date: 2005/08/03 13:58:13 $";
public final static String VERSION = "0.6.0.1";
public final static long BUILD = 1;
public final static String ID = "$Revision: 1.212 $ $Date: 2005/08/07 14:31:58 $";
public final static String VERSION = "0.6.0.2";
public final static long BUILD = 0;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -110,7 +110,7 @@ public class GarlicMessageBuilder {
msg.setMessageExpiration(config.getExpiration());
long timeFromNow = config.getExpiration() - ctx.clock().now();
if (timeFromNow < 10*1000)
if (timeFromNow < 15*1000)
log.error("Building a message expiring in " + timeFromNow + "ms: " + config, new Exception("created by"));
if (log.shouldLog(Log.WARN))

View File

@ -1,7 +1,9 @@
package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
@ -31,10 +33,15 @@ public class EstablishmentManager {
private Map _inboundStates;
/** map of RemoteHostId to OutboundEstablishState */
private Map _outboundStates;
/** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */
private Map _queuedOutbound;
private boolean _alive;
private Object _activityLock;
private int _activity;
private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 16;
public static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
public EstablishmentManager(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(EstablishmentManager.class);
@ -42,6 +49,7 @@ public class EstablishmentManager {
_builder = new PacketBuilder(ctx);
_inboundStates = new HashMap(32);
_outboundStates = new HashMap(32);
_queuedOutbound = new HashMap(32);
_activityLock = new Object();
_context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
@ -80,6 +88,18 @@ public class EstablishmentManager {
return state;
}
}
private int getMaxConcurrentEstablish() {
String val = _context.getProperty(PROP_MAX_CONCURRENT_ESTABLISH);
if (val != null) {
try {
return Integer.parseInt(val);
} catch (NumberFormatException nfe) {
return DEFAULT_MAX_CONCURRENT_ESTABLISH;
}
}
return DEFAULT_MAX_CONCURRENT_ESTABLISH;
}
/**
* Send the message to its specified recipient by establishing a connection
@ -104,12 +124,27 @@ public class EstablishmentManager {
synchronized (_outboundStates) {
OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(to);
if (state == null) {
state = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()));
_outboundStates.put(to, state);
if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
List queued = (List)_queuedOutbound.get(to);
if (queued == null) {
queued = new ArrayList(1);
_queuedOutbound.put(to, queued);
}
queued.add(msg);
} else {
state = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()));
_outboundStates.put(to, state);
}
}
if (state != null) {
state.addMessage(msg);
List queued = (List)_queuedOutbound.remove(to);
if (queued != null)
for (int i = 0; i < queued.size(); i++)
state.addMessage((OutNetMessage)queued.get(i));
}
state.addMessage(msg);
}
notifyActivity();
@ -177,9 +212,27 @@ public class EstablishmentManager {
*/
PeerState receiveData(OutboundEstablishState state) {
state.dataReceived();
int active = 0;
int admitted = 0;
int remaining = 0;
synchronized (_outboundStates) {
active = _outboundStates.size();
_outboundStates.remove(state.getRemoteHostId());
if (_queuedOutbound.size() > 0) {
// there shouldn't have been queued messages for this active state, but just in case...
List queued = (List)_queuedOutbound.remove(state.getRemoteHostId());
if (queued != null) {
for (int i = 0; i < queued.size(); i++)
state.addMessage((OutNetMessage)queued.get(i));
}
admitted = locked_admitQueued();
}
remaining = _queuedOutbound.size();
}
//if (admitted > 0)
// _log.log(Log.CRIT, "Admitted " + admitted + " with " + remaining + " remaining queued and " + active + " active");
if (_log.shouldLog(Log.INFO))
_log.info("Outbound established completely! yay");
PeerState peer = handleCompletelyEstablished(state);
@ -187,6 +240,40 @@ public class EstablishmentManager {
return peer;
}
private int locked_admitQueued() {
int admitted = 0;
while ( (_queuedOutbound.size() > 0) && (_outboundStates.size() < getMaxConcurrentEstablish()) ) {
// ok, active shrunk, lets let some queued in. duplicate the synchronized
// section from the add(
RemoteHostId to = (RemoteHostId)_queuedOutbound.keySet().iterator().next();
List queued = (List)_queuedOutbound.remove(to);
if (queued.size() <= 0)
continue;
OutNetMessage msg = (OutNetMessage)queued.get(0);
RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
if (ra == null) {
for (int i = 0; i < queued.size(); i++)
_transport.failed((OutNetMessage)queued.get(i));
continue;
}
UDPAddress addr = new UDPAddress(ra);
InetAddress remAddr = addr.getHostAddress();
int port = addr.getPort();
OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()));
_outboundStates.put(to, qstate);
for (int i = 0; i < queued.size(); i++)
qstate.addMessage((OutNetMessage)queued.get(i));
admitted++;
}
return admitted;
}
private void notifyActivity() {
synchronized (_activityLock) {
@ -429,7 +516,11 @@ public class EstablishmentManager {
long now = _context.clock().now();
long nextSendTime = -1;
OutboundEstablishState outboundState = null;
int admitted = 0;
int remaining = 0;
int active = 0;
synchronized (_outboundStates) {
active = _outboundStates.size();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("# outbound states: " + _outboundStates.size());
for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
@ -473,8 +564,14 @@ public class EstablishmentManager {
}
}
}
admitted = locked_admitQueued();
remaining = _queuedOutbound.size();
}
//if (admitted > 0)
// _log.log(Log.CRIT, "Admitted " + admitted + " in push with " + remaining + " remaining queued and " + active + " active");
if (outboundState != null) {
if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) {
if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
@ -484,7 +581,23 @@ public class EstablishmentManager {
break;
_transport.failed(msg);
}
_context.shitlist().shitlistRouter(outboundState.getRemoteIdentity().calculateHash(), "Unable to establish with SSU");
String err = null;
switch (outboundState.getState()) {
case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY:
err = "Took too long to establish remote connection (confirmed partially)";
break;
case OutboundEstablishState.STATE_CREATED_RECEIVED:
err = "Took too long to establish remote connection (created received)";
break;
case OutboundEstablishState.STATE_REQUEST_SENT:
err = "Took too long to establish remote connection (request sent)";
break;
case OutboundEstablishState.STATE_UNKNOWN: // fallthrough
default:
err = "Took too long to establish remote connection (unknown state)";
}
_context.shitlist().shitlistRouter(outboundState.getRemoteIdentity().calculateHash(), err);
} else {
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();

View File

@ -319,4 +319,23 @@ public class InboundEstablishState {
_lastReceive = _context.clock().now();
_nextSend = _lastReceive;
}
public String toString() {
StringBuffer buf = new StringBuffer(128);
buf.append(super.toString());
if (_receivedX != null)
buf.append(" ReceivedX: ").append(Base64.encode(_receivedX, 0, 4));
if (_sentY != null)
buf.append(" SentY: ").append(Base64.encode(_sentY, 0, 4));
if (_aliceIP != null)
buf.append(" AliceIP: ").append(Base64.encode(_aliceIP));
buf.append(" AlicePort: ").append(_alicePort);
if (_bobIP != null)
buf.append(" BobIP: ").append(Base64.encode(_bobIP));
buf.append(" BobPort: ").append(_bobPort);
buf.append(" RelayTag: ").append(_sentRelayTag);
buf.append(" SignedOn: ").append(_sentSignedOnTime);
buf.append(" state: ").append(_currentState);
return buf.toString();
}
}

View File

@ -96,8 +96,8 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
public void runJob() {
HandleTunnelCreateMessageJob.this.runJob();
}
private static final String NAME_OK = "Deferred netDb accept";
private static final String NAME_REJECT = "Deferred netDb reject";
private static final String NAME_OK = "Deferred tunnel accept";
private static final String NAME_REJECT = "Deferred tunnel reject";
public String getName() { return _shouldAccept ? NAME_OK : NAME_REJECT; }
}