2005-03-02 jrandom

* Fix one substantial OOM cause (session tag manager was only dropping
      tags once the critical limit was met, rather than honoring their
      expiration) (duh)
    * Lots of small memory fixes
    * Double the allowable concurrent outstanding tunnel build tasks (20)
This commit is contained in:
jrandom
2005-03-03 03:36:52 +00:00
committed by zzz
parent 2d15a42137
commit ef230cfa3d
11 changed files with 76 additions and 53 deletions

View File

@ -106,12 +106,12 @@ public class PacketHandler {
private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
void displayPacket(Packet packet, String prefix, String suffix) {
if (!_log.shouldLog(Log.DEBUG)) return;
String msg = null;
synchronized (_fmt) {
msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : "");
}
if (_log.shouldLog(Log.DEBUG))
System.out.println(msg);
System.out.println(msg);
}
private void receiveKnownCon(Connection con, Packet packet) {

View File

@ -48,7 +48,9 @@ class PacketQueue {
tagsSent = new HashSet(0);
// cache this from before sendMessage
String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : "");
String conStr = null;
if (_log.shouldLog(Log.DEBUG))
conStr = (packet.getConnection() != null ? packet.getConnection().toString() : "");
if (packet.getAckTime() > 0) {
_log.debug("Not resending " + packet);
return;

View File

@ -39,6 +39,7 @@ import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* Implementation of an I2P session running over TCP. This class is NOT thread safe -
@ -348,8 +349,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
int id = msg.getMessageId().getMessageId();
byte data[] = msg.getPayload().getUnencryptedData();
if ((data == null) || (data.length <= 0)) {
if (_log.shouldLog(Log.ERROR))
_log.error(getPrefix() + "addNewMessage of a message with no unencrypted data",
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, getPrefix() + "addNewMessage of a message with no unencrypted data",
new Exception("Empty message"));
} else {
int size = data.length;
@ -357,6 +358,20 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id);
}
SimpleTimer.getInstance().addEvent(new VerifyUsage(id), 30*1000);
}
private class VerifyUsage implements SimpleTimer.TimedEvent {
private int _msgId;
public VerifyUsage(int id) { _msgId = id; }
public void timeReached() {
MessagePayloadMessage removed = null;
synchronized (_availableMessages) {
removed = (MessagePayloadMessage)_availableMessages.remove(new Integer(_msgId));
}
if (removed != null)
_log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed);
}
}
private class AvailabilityNotifier implements Runnable {
@ -407,6 +422,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} catch (Exception e) {
_log.log(Log.CRIT, "Error notifying app of message availability", e);
}
} else {
_log.log(Log.CRIT, "Unable to notify an app that " + msgId + " of size " + size + " is available!");
}
}
}

View File

@ -41,7 +41,8 @@ class RequestLeaseSetMessageHandler extends HandlerImpl {
}
public void handleMessage(I2CPMessage message, I2PSessionImpl session) {
_log.debug("Handle message " + message);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle message " + message);
RequestLeaseSetMessage msg = (RequestLeaseSetMessage) message;
LeaseSet leaseSet = new LeaseSet();
for (int i = 0; i < msg.getEndpoints(); i++) {

View File

@ -24,6 +24,7 @@ import net.i2p.data.PublicKey;
import net.i2p.data.SessionKey;
import net.i2p.data.SessionTag;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* Implement the session key management, but keep everything in memory (don't write
@ -67,8 +68,21 @@ class TransientSessionKeyManager extends SessionKeyManager {
_context = context;
_outboundSessions = new HashMap(1024);
_inboundTagSets = new HashMap(64*1024);
context.statManager().createRateStat("crypto.sessionTagsExpired", "How many tags/sessions are expired?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
context.statManager().createRateStat("crypto.sessionTagsRemaining", "How many tags/sessions are remaining after a cleanup?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
SimpleTimer.getInstance().addEvent(new CleanupEvent(), 60*1000);
}
private TransientSessionKeyManager() { this(null); }
private class CleanupEvent implements SimpleTimer.TimedEvent {
public void timeReached() {
long beforeExpire = _context.clock().now();
int expired = aggressiveExpire();
long expireTime = _context.clock().now() - beforeExpire;
_context.statManager().addRateData("crypto.sessionTagsExpired", expired, expireTime);
SimpleTimer.getInstance().addEvent(CleanupEvent.this, 60*1000);
}
}
/** TagSet */
protected Set getInboundTagSets() {
@ -247,6 +261,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
overage = _inboundTagSets.size() - MAX_INBOUND_SESSION_TAGS;
}
}
if (overage > 0)
clearExcess(overage);
@ -361,45 +376,32 @@ class TransientSessionKeyManager extends SessionKeyManager {
*/
public int aggressiveExpire() {
int removed = 0;
int remaining = 0;
long now = _context.clock().now();
Set tagsToDrop = null; // new HashSet(64);
synchronized (_inboundTagSets) {
for (Iterator iter = _inboundTagSets.keySet().iterator(); iter.hasNext();) {
SessionTag tag = (SessionTag) iter.next();
TagSet ts = (TagSet) _inboundTagSets.get(tag);
if (ts.getDate() < now - SESSION_LIFETIME_MAX_MS) {
if (tagsToDrop == null)
tagsToDrop = new HashSet(4);
tagsToDrop.add(tag);
iter.remove();
removed++;
}
}
if (tagsToDrop != null) {
removed += tagsToDrop.size();
for (Iterator iter = tagsToDrop.iterator(); iter.hasNext();)
_inboundTagSets.remove(iter.next());
}
remaining = _inboundTagSets.size();
}
_context.statManager().addRateData("crypto.sessionTagsRemaining", remaining, 0);
//_log.warn("Expiring tags: [" + tagsToDrop + "]");
synchronized (_outboundSessions) {
Set sessionsToDrop = null;
for (Iterator iter = _outboundSessions.keySet().iterator(); iter.hasNext();) {
PublicKey key = (PublicKey) iter.next();
OutboundSession sess = (OutboundSession) _outboundSessions.get(key);
removed += sess.expireTags();
if (sess.getTagSets().size() <= 0) {
if (sessionsToDrop == null)
sessionsToDrop = new HashSet(4);
sessionsToDrop.add(key);
}
}
if (sessionsToDrop != null) {
for (Iterator iter = sessionsToDrop.iterator(); iter.hasNext();) {
OutboundSession cur = (OutboundSession)_outboundSessions.remove(iter.next());
if ( (cur != null) && (_log.shouldLog(Log.WARN)) )
_log.warn("Removing session tags with " + cur.availableTags() + " available for "
+ (cur.getLastExpirationDate()-_context.clock().now())
+ "ms more", new Exception("Removed by"));
if (sess.availableTags() <= 0) {
iter.remove();
removed++;
}
}
}

View File

@ -73,14 +73,16 @@ public class Payload extends DataStructureImpl {
_encryptedData = new byte[size];
int read = read(in, _encryptedData);
if (read != size) throw new DataFormatException("Incorrect number of bytes read in the payload structure");
_log.debug("read payload: " + read + " bytes");
if (_log.shouldLog(Log.DEBUG))
_log.debug("read payload: " + read + " bytes");
}
public void writeBytes(OutputStream out) throws DataFormatException, IOException {
if (_encryptedData == null) throw new DataFormatException("Not yet encrypted. Please set the encrypted data");
DataHelper.writeLong(out, 4, _encryptedData.length);
out.write(_encryptedData);
_log.debug("wrote payload: " + _encryptedData.length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("wrote payload: " + _encryptedData.length);
}
public int writeBytes(byte target[], int offset) {
if (_encryptedData == null) throw new IllegalStateException("Not yet encrypted. Please set the encrypted data");

View File

@ -64,14 +64,10 @@ public class ReceiveMessageBeginMessage extends I2CPMessageImpl {
protected byte[] doWriteMessage() throws I2CPMessageException, IOException {
if ((_sessionId == null) || (_messageId == null))
throw new I2CPMessageException("Unable to write out the message as there is not enough data");
ByteArrayOutputStream os = new ByteArrayOutputStream(64);
try {
_sessionId.writeBytes(os);
_messageId.writeBytes(os);
} catch (DataFormatException dfe) {
throw new I2CPMessageException("Error writing out the message data", dfe);
}
return os.toByteArray();
byte rv[] = new byte[2+4];
DataHelper.toLong(rv, 0, 2, _sessionId.getSessionId());
DataHelper.toLong(rv, 2, 4, _messageId.getMessageId());
return rv;
}
public int getType() {

View File

@ -64,14 +64,10 @@ public class ReceiveMessageEndMessage extends I2CPMessageImpl {
protected byte[] doWriteMessage() throws I2CPMessageException, IOException {
if ((_sessionId == null) || (_messageId == null))
throw new I2CPMessageException("Unable to write out the message as there is not enough data");
ByteArrayOutputStream os = new ByteArrayOutputStream(64);
try {
_sessionId.writeBytes(os);
_messageId.writeBytes(os);
} catch (DataFormatException dfe) {
throw new I2CPMessageException("Error writing out the message data", dfe);
}
return os.toByteArray();
byte rv[] = new byte[2+4];
DataHelper.toLong(rv, 0, 2, _sessionId.getSessionId());
DataHelper.toLong(rv, 2, 4, _messageId.getMessageId());
return rv;
}
public int getType() {

View File

@ -1,4 +1,11 @@
$Id: history.txt,v 1.160 2005/02/27 17:09:37 jrandom Exp $
$Id: history.txt,v 1.161 2005/03/01 12:50:54 jrandom Exp $
2005-03-02 jrandom
* Fix one substantial OOM cause (session tag manager was only dropping
tags once the critical limit was met, rather than honoring their
expiration) (duh)
* Lots of small memory fixes
* Double the allowable concurrent outstanding tunnel build tasks (20)
2005-03-01 jrandom
* Really disable the streaming lib packet caching

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.155 $ $Date: 2005/02/27 17:09:37 $";
public final static String ID = "$Revision: 1.156 $ $Date: 2005/03/01 12:50:54 $";
public final static String VERSION = "0.5.0.1";
public final static long BUILD = 7;
public final static long BUILD = 8;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -54,13 +54,13 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_clientInboundPools = new HashMap(4);
_clientOutboundPools = new HashMap(4);
_outstandingBuilds = 0;
_maxOutstandingBuilds = 10;
String max = ctx.getProperty("router.tunnel.maxConcurrentBuilds", "10");
_maxOutstandingBuilds = 20;
String max = ctx.getProperty("router.tunnel.maxConcurrentBuilds", "20");
if (max != null) {
try {
_maxOutstandingBuilds = Integer.parseInt(max);
} catch (NumberFormatException nfe) {
_maxOutstandingBuilds = 10;
_maxOutstandingBuilds = 20;
}
}