2006-02-20 jrandom

* Throttle the outbound SSU establishment queue, so it doesn't fill up the
      heap when backlogged (and so that the messages queued up on it don't sit
      there forever)
    * Further SSU memory cleanup
This commit is contained in:
jrandom
2006-02-21 02:02:48 +00:00
committed by zzz
parent ac8436a8eb
commit 9990126e3e
10 changed files with 128 additions and 66 deletions

View File

@ -1,4 +1,10 @@
$Id: history.txt,v 1.410 2006/02/20 11:42:36 jrandom Exp $
$Id: history.txt,v 1.411 2006/02/20 13:12:47 jrandom Exp $
2006-02-20 jrandom
* Throttle the outbound SSU establishment queue, so it doesn't fill up the
heap when backlogged (and so that the messages queued up on it don't sit
there forever)
* Further SSU memory cleanup
2006-02-20 jrandom
* Properly enable TCP this time (oops)

View File

@ -33,6 +33,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
private long _expiration;
private long _uniqueId;
private boolean _written;
private boolean _read;
public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default
public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH;
@ -54,6 +55,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
_expiration = _context.clock().now() + DEFAULT_EXPIRATION_MS;
_uniqueId = _context.random().nextLong(MAX_ID_VALUE);
_written = false;
_read = false;
//_context.statManager().createRateStat("i2np.writeTime", "How long it takes to write an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 });
//_context.statManager().createRateStat("i2np.readTime", "How long it takes to read an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 });
}
@ -105,6 +107,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
long time = _context.clock().now() - start;
//if (time > 50)
// _context.statManager().addRateData("i2np.readTime", time, time);
_read = true;
return size + Hash.HASH_LENGTH + 1 + 4 + DataHelper.DATE_LENGTH;
} catch (DataFormatException dfe) {
throw new I2NPMessageException("Error reading the message header", dfe);
@ -149,6 +152,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
long time = _context.clock().now() - start;
//if (time > 50)
// _context.statManager().addRateData("i2np.readTime", time, time);
_read = true;
return cur - offset;
}
@ -296,7 +300,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
public static I2NPMessage fromRawByteArray(I2PAppContext ctx, byte buffer[], int offset, int len, I2NPMessageHandler handler) throws I2NPMessageException {
int type = (int)DataHelper.fromLong(buffer, offset, 1);
offset++;
I2NPMessage msg = createMessage(ctx, type);
I2NPMessageImpl msg = (I2NPMessageImpl)createMessage(ctx, type);
if (msg == null)
throw new I2NPMessageException("Unknown message type: " + type);
if (RAW_FULL_SIZE) {
@ -305,6 +309,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
} catch (IOException ioe) {
throw new I2NPMessageException("Error reading the " + msg, ioe);
}
msg.read();
return msg;
}
@ -314,6 +319,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
try {
msg.readMessage(buffer, offset, dataSize, type, handler);
msg.setMessageExpiration(expiration);
msg.read();
return msg;
} catch (IOException ioe) {
throw new I2NPMessageException("IO error reading raw message", ioe);
@ -322,6 +328,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
protected void verifyUnwritten() { if (_written) throw new RuntimeException("Already written"); }
protected void written() { _written = true; }
protected void read() { _read = true; }
/**
* Yes, this is fairly ugly, but its the only place it ever happens.

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.352 $ $Date: 2006/02/20 11:42:33 $";
public final static String ID = "$Revision: 1.353 $ $Date: 2006/02/20 13:12:48 $";
public final static String VERSION = "0.6.1.10";
public final static long BUILD = 8;
public final static long BUILD = 9;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -47,7 +47,7 @@ public class EstablishmentManager {
private Object _activityLock;
private int _activity;
private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 4;
private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 10;
public static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
public EstablishmentManager(RouterContext ctx, UDPTransport transport) {
@ -67,6 +67,8 @@ public class EstablishmentManager {
_context.statManager().createRateStat("udp.sendIntroRelayRequest", "How often we send a relay request to reach a peer", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendIntroRelayTimeout", "How often a relay request times out before getting a response (due to the target or intro peer being offline)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@ -112,6 +114,9 @@ public class EstablishmentManager {
}
return DEFAULT_MAX_CONCURRENT_ESTABLISH;
}
private static final int MAX_QUEUED_OUTBOUND = 10*1000;
private static final int MAX_QUEUED_PER_PEER = 3;
/**
* Send the message to its specified recipient by establishing a connection
@ -153,16 +158,24 @@ public class EstablishmentManager {
OutboundEstablishState state = null;
int deferred = 0;
boolean rejected = false;
int queueCount = 0;
synchronized (_outboundStates) {
state = (OutboundEstablishState)_outboundStates.get(to);
if (state == null) {
if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
List queued = (List)_queuedOutbound.get(to);
if (queued == null) {
queued = new ArrayList(1);
_queuedOutbound.put(to, queued);
if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
rejected = true;
} else {
queued = new ArrayList(1);
_queuedOutbound.put(to, queued);
}
}
queued.add(msg);
queueCount = queued.size();
if ( (queueCount < MAX_QUEUED_PER_PEER) && (!rejected) )
queued.add(msg);
deferred = _queuedOutbound.size();
} else {
state = new OutboundEstablishState(_context, remAddr, port,
@ -181,6 +194,17 @@ public class EstablishmentManager {
}
}
if (rejected) {
_transport.failed(msg, "Too many pending outbound connections");
_context.statManager().addRateData("udp.establishRejected", deferred, 0);
return;
}
if (queueCount >= MAX_QUEUED_PER_PEER) {
_transport.failed(msg, "Too many pending messages for the given peer");
_context.statManager().addRateData("udp.establishOverflow", queueCount, deferred);
return;
}
if (deferred > 0)
msg.timestamp("too many deferred establishers: " + deferred);
else if (state != null)
@ -199,7 +223,7 @@ public class EstablishmentManager {
Object removed = null;
synchronized (_outboundStates) {
removed = _outboundStates.remove(_to);
if (removed != _state) { // oops, we must have failed, then retried
if ( (removed != null) && (removed != _state) ) { // oops, we must have failed, then retried
_outboundStates.put(_to, removed);
removed = null;
}/* else {
@ -388,6 +412,8 @@ public class EstablishmentManager {
*
*/
private void handleCompletelyEstablished(InboundEstablishState state) {
if (state.complete()) return;
long now = _context.clock().now();
RouterIdentity remote = state.getConfirmedIdentity();
PeerState peer = new PeerState(_context, _transport);
@ -423,6 +449,12 @@ public class EstablishmentManager {
*
*/
private PeerState handleCompletelyEstablished(OutboundEstablishState state) {
if (state.complete()) {
RouterIdentity rem = state.getRemoteIdentity();
if (rem != null)
return _transport.getPeerState(rem.getHash());
}
long now = _context.clock().now();
RouterIdentity remote = state.getRemoteIdentity();
PeerState peer = new PeerState(_context, _transport);

View File

@ -54,6 +54,7 @@ public class InboundEstablishState {
private long _nextSend;
private RemoteHostId _remoteHostId;
private int _currentState;
private boolean _complete;
/** nothin known yet */
public static final int STATE_UNKNOWN = 0;
@ -77,11 +78,17 @@ public class InboundEstablishState {
_bobPort = localPort;
_keyBuilder = null;
_verificationAttempted = false;
_complete = false;
_currentState = STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
}
public synchronized int getState() { return _currentState; }
public synchronized boolean complete() {
boolean already = _complete;
_complete = true;
return already;
}
public synchronized void receiveSessionRequest(UDPPacketReader.SessionRequestReader req) {
if (_receivedX == null)

View File

@ -59,6 +59,7 @@ public class OutboundEstablishState {
private long _introductionNonce;
// intro
private UDPAddress _remoteAddress;
private boolean _complete;
/** nothin sent yet */
public static final int STATE_UNKNOWN = 0;
@ -94,6 +95,7 @@ public class OutboundEstablishState {
_establishBegin = ctx.clock().now();
_remoteAddress = addr;
_introductionNonce = -1;
_complete = false;
prepareSessionRequest();
if ( (addr != null) && (addr.getIntroducerCount() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
@ -103,6 +105,11 @@ public class OutboundEstablishState {
}
public synchronized int getState() { return _currentState; }
public synchronized boolean complete() {
boolean already = _complete;
_complete = true;
return already;
}
public UDPAddress getRemoteAddress() { return _remoteAddress; }
public void setIntroNonce(long nonce) { _introductionNonce = nonce; }

View File

@ -34,7 +34,7 @@ public class OutboundMessageState {
private int _nextSendFragment;
public static final int MAX_FRAGMENTS = 32;
private static final ByteCache _cache = ByteCache.getInstance(128, MAX_FRAGMENTS*1024);
private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024);
public OutboundMessageState(I2PAppContext context) {
_context = context;

View File

@ -413,22 +413,24 @@ public class PacketHandler {
state = _establisher.receiveData(outState);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received new DATA packet from " + state + ": " + packet);
UDPPacketReader.DataReader dr = reader.getDataReader();
if (_log.shouldLog(Log.INFO)) {
StringBuffer msg = new StringBuffer();
msg.append("Receive ").append(System.identityHashCode(packet));
msg.append(" from ").append(state.getRemotePeer().toBase64()).append(" ").append(state.getRemoteHostId());
for (int i = 0; i < dr.readFragmentCount(); i++) {
msg.append(" msg ").append(dr.readMessageId(i));
msg.append(":").append(dr.readMessageFragmentNum(i));
if (dr.readMessageIsLast(i))
msg.append("*");
if (state != null) {
UDPPacketReader.DataReader dr = reader.getDataReader();
if (_log.shouldLog(Log.INFO)) {
StringBuffer msg = new StringBuffer();
msg.append("Receive ").append(System.identityHashCode(packet));
msg.append(" from ").append(state.getRemotePeer().toBase64()).append(" ").append(state.getRemoteHostId());
for (int i = 0; i < dr.readFragmentCount(); i++) {
msg.append(" msg ").append(dr.readMessageId(i));
msg.append(":").append(dr.readMessageFragmentNum(i));
if (dr.readMessageIsLast(i))
msg.append("*");
}
msg.append(": ").append(dr.toString());
_log.info(msg.toString());
}
msg.append(": ").append(dr.toString());
_log.info(msg.toString());
packet.beforeReceiveFragments();
_inbound.receiveData(state, dr);
}
packet.beforeReceiveFragments();
_inbound.receiveData(state, dr);
break;
case UDPPacket.PAYLOAD_TYPE_TEST:
_state = 51;

View File

@ -1427,6 +1427,7 @@ public class PeerState {
tmp.addAll(oldPeer._outboundMessages);
oldPeer._outboundMessages.clear();
retransmitter = oldPeer._retransmitter;
oldPeer._retransmitter = null;
}
synchronized (_outboundMessages) {
_outboundMessages.addAll(tmp);
@ -1434,7 +1435,8 @@ public class PeerState {
}
tmp.clear();
}
/*
public int hashCode() {
if (_remotePeer != null)
return _remotePeer.hashCode();
@ -1454,6 +1456,7 @@ public class PeerState {
return false;
}
}
*/
public String toString() {
StringBuffer buf = new StringBuffer(64);

View File

@ -483,6 +483,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
if (oldPeer != null) {
oldPeer.dropOutbound();
_introManager.remove(oldPeer);
_expireEvent.remove(oldPeer);
}
oldPeer = null;
RemoteHostId remoteId = peer.getRemoteHostId();
@ -497,6 +502,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
if (oldPeer != null) {
oldPeer.dropOutbound();
_introManager.remove(oldPeer);
_expireEvent.remove(oldPeer);
}
if ( (oldPeer != null) && (_log.shouldLog(Log.WARN)) )
_log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup"));
@ -624,6 +635,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.warn(buf.toString(), new Exception("Dropped by"));
}
peer.dropOutbound();
_introManager.remove(peer);
_fragments.dropPeer(peer);
// a bit overzealous - perhaps we should only rebuild the external if the peer being dropped
@ -1384,67 +1396,53 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int EXPIRE_TIMEOUT = 10*60*1000;
private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private List _peers;
// toAdd and toRemove are kept separate from _peers so that add and
// remove calls won't block packet handling while the big iteration is
// in process
private List _toAdd;
private List _toRemove;
private List _expirePeers;
private List _expireBuffer;
private boolean _alive;
public ExpirePeerEvent() {
_peers = new ArrayList(128);
_toAdd = new ArrayList(4);
_toRemove = new ArrayList(4);
_expirePeers = new ArrayList(128);
_expireBuffer = new ArrayList(128);
}
public void timeReached() {
long inactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT;
for (int i = 0; i < _peers.size(); i++) {
PeerState peer = (PeerState)_peers.get(i);
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
dropPeer(peer, false);
_peers.remove(i);
i--;
_expireBuffer.clear();
synchronized (_expirePeers) {
int sz = _expirePeers.size();
for (int i = 0; i < sz; i++) {
PeerState peer = (PeerState)_expirePeers.get(i);
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
_expireBuffer.add(peer);
_expirePeers.remove(i);
i--;
sz--;
}
}
}
synchronized (_toAdd) {
for (int i = 0; i < _toAdd.size(); i++) {
PeerState peer = (PeerState)_toAdd.get(i);
_peers.remove(peer); // in case we are switching peers
_peers.add(peer);
}
_toAdd.clear();
}
synchronized (_toRemove) {
for (int i = 0; i < _toRemove.size(); i++) {
PeerState peer = (PeerState)_toRemove.get(i);
_peers.remove(peer);
}
_toRemove.clear();
}
for (int i = 0; i < _expireBuffer.size(); i++)
dropPeer((PeerState)_expireBuffer.get(i), false);
_expireBuffer.clear();
if (_alive)
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000);
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
}
public void add(PeerState peer) {
synchronized (_toAdd) {
_toAdd.add(peer);
synchronized (_expirePeers) {
_expirePeers.add(peer);
}
}
public void remove(PeerState peer) {
synchronized (_toRemove) {
_toRemove.add(peer);
synchronized (_expirePeers) {
_expirePeers.remove(peer);
}
}
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000);
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
} else {
SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this);
synchronized (_toAdd) {
_toAdd.clear();
}
synchronized (_peers) {
_peers.clear();
synchronized (_expirePeers) {
_expirePeers.clear();
}
}
}