diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index d99d1b70d0..59f5e8f47d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -31,7 +31,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL * Sun's impl of BufferedOutputStream), but that is the streaming * api's job... */ - static int MAX_PACKET_SIZE = 1024 * 32; + static int MAX_PACKET_SIZE = 1024 * 16; static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 404808aa8d..6e23dfced0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -3,6 +3,8 @@ package net.i2p.client.streaming; import java.io.InterruptedIOException; import java.io.IOException; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; import net.i2p.util.Log; /** @@ -18,11 +20,13 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private Log _log; private Connection _connection; private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); + private ByteCache _cache; public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); _connection = con; + _cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE); } public boolean writeInProcess() { @@ -133,9 +137,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) { boolean ackOnly = isAckOnly(con, size); PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con); - byte data[] = new byte[size]; + ByteArray data = (size <= Packet.MAX_PAYLOAD_SIZE ? _cache.acquire() : new ByteArray(new byte[size])); if (size > 0) - System.arraycopy(buf, off, data, 0, size); + System.arraycopy(buf, off, data.getData(), 0, size); + data.setValid(size); + data.setOffset(0); packet.setPayload(data); if (ackOnly && !forceIncrement) packet.setSequenceNum(0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 2e167bab21..4cdf9ff334 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -6,6 +6,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; +import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -17,9 +18,11 @@ import net.i2p.util.SimpleTimer; public class ConnectionPacketHandler { private I2PAppContext _context; private Log _log; + private ByteCache _cache; public ConnectionPacketHandler(I2PAppContext context) { _context = context; + _cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE); _log = context.logManager().getLog(ConnectionPacketHandler.class); _context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -34,6 +37,7 @@ public class ConnectionPacketHandler { if (!ok) { if ( (!packet.isFlagSet(Packet.FLAG_RESET)) && (_log.shouldLog(Log.ERROR)) ) _log.error("Packet does NOT verify: " + packet); + _cache.release(packet.getPayload()); return; } @@ -47,6 +51,7 @@ public class ConnectionPacketHandler { if (_log.shouldLog(Log.WARN)) _log.warn("Received a packet after hard disconnect, ignoring: " + packet + " on " + con); } + _cache.release(packet.getPayload()); return; } @@ -72,6 +77,7 @@ public class ConnectionPacketHandler { + ": dropping " + packet); ack(con, packet.getAckThrough(), packet.getNacks(), null, false); con.getOptions().setChoke(5*1000); + _cache.release(packet.getPayload()); return; } con.getOptions().setChoke(0); @@ -91,6 +97,7 @@ public class ConnectionPacketHandler { con.closeReceived(); boolean fastAck = false; + boolean ackOnly = false; if (isNew) { con.incrementUnackedPacketsReceived(); @@ -127,11 +134,19 @@ public class ConnectionPacketHandler { } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("ACK only packet received: " + packet); + ackOnly = true; } } } - - fastAck = fastAck || ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew); + + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && + ((packet.getSendStreamId() == null) || + DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN) ) ) { + // don't honor the ACK 0 in SYN packets received when the other side + // has obviously not seen our messages + } else { + fastAck = fastAck || ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew); + } con.eventOccurred(); if (fastAck) { if (con.getLastSendTime() + 2000 < _context.clock().now()) { @@ -140,6 +155,11 @@ public class ConnectionPacketHandler { con.ackImmediately(); } } + + if (ackOnly) { + // non-ack message payloads are queued in the MessageInputStream + _cache.release(packet.getPayload()); + } } private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 618916208c..1b49245c29 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -13,6 +13,7 @@ import java.util.Map; import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; import net.i2p.util.Log; /** @@ -52,6 +53,7 @@ public class MessageInputStream extends InputStream { private int _readTimeout; private IOException _streamError; private long _readTotal; + private ByteCache _cache; private byte[] _oneByte = new byte[1]; @@ -70,6 +72,7 @@ public class MessageInputStream extends InputStream { _dataLock = new Object(); _closeReceived = false; _locallyClosed = false; + _cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE); } /** What is the highest block ID we've completely received through? */ @@ -166,7 +169,7 @@ public class MessageInputStream extends InputStream { buf.append("Close received, ready bytes: "); long available = 0; for (int i = 0; i < _readyDataBlocks.size(); i++) - available += ((ByteArray)_readyDataBlocks.get(i)).getData().length; + available += ((ByteArray)_readyDataBlocks.get(i)).getValid(); available -= _readyDataBlockIndex; buf.append(available); buf.append(" blocks: ").append(_readyDataBlocks.size()); @@ -178,8 +181,8 @@ public class MessageInputStream extends InputStream { ByteArray ba = (ByteArray)_notYetReadyBlocks.get(id); buf.append(id).append(" "); - if (ba.getData() != null) - notAvailable += ba.getData().length; + if (ba != null) + notAvailable += ba.getValid(); } buf.append("not ready bytes: ").append(notAvailable); @@ -198,10 +201,10 @@ public class MessageInputStream extends InputStream { * * @return true if this is a new packet, false if it is a dup */ - public boolean messageReceived(long messageId, byte payload[]) { + public boolean messageReceived(long messageId, ByteArray payload) { synchronized (_dataLock) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("received " + messageId + " with " + payload.length); + _log.debug("received " + messageId + " with " + payload.getValid()); if (messageId <= _highestReadyBlockId) { if (_log.shouldLog(Log.DEBUG)) _log.debug("ignoring dup message " + messageId); @@ -212,17 +215,17 @@ public class MessageInputStream extends InputStream { _highestBlockId = messageId; if (_highestReadyBlockId + 1 == messageId) { - if (!_locallyClosed && payload.length > 0) { + if (!_locallyClosed && payload.getValid() > 0) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("accepting bytes as ready: " + payload.length); - _readyDataBlocks.add(new ByteArray(payload)); + _log.debug("accepting bytes as ready: " + payload.getValid()); + _readyDataBlocks.add(payload); } _highestReadyBlockId = messageId; long cur = _highestReadyBlockId + 1; // now pull in any previously pending blocks while (_notYetReadyBlocks.containsKey(new Long(cur))) { ByteArray ba = (ByteArray)_notYetReadyBlocks.remove(new Long(cur)); - if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) { + if ( (ba != null) && (ba.getData() != null) && (ba.getValid() > 0) ) { _readyDataBlocks.add(ba); } @@ -238,7 +241,7 @@ public class MessageInputStream extends InputStream { if (_locallyClosed) // dont need the payload, just the msgId in order _notYetReadyBlocks.put(new Long(messageId), new ByteArray(null)); else - _notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload)); + _notYetReadyBlocks.put(new Long(messageId), payload); _dataLock.notifyAll(); } } @@ -324,21 +327,25 @@ public class MessageInputStream extends InputStream { } else { // either was already ready, or we wait()ed and it arrived ByteArray cur = (ByteArray)_readyDataBlocks.get(0); - byte rv = cur.getData()[_readyDataBlockIndex]; + byte rv = cur.getData()[cur.getOffset()+_readyDataBlockIndex]; _readyDataBlockIndex++; - if (cur.getData().length <= _readyDataBlockIndex) { + boolean removed = false; + if (cur.getValid() <= _readyDataBlockIndex) { _readyDataBlockIndex = 0; _readyDataBlocks.remove(0); + removed = true; } _readTotal++; target[offset + i] = rv; // rv < 0 ? rv + 256 : rv - if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getData().length - 5) ) { + if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getValid() - 5) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i + "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex + " readyBlocks=" + _readyDataBlocks.size() + " readTotal=" + _readTotal); } + if (removed) + _cache.release(cur); } } // for (int i = 0; i < length; i++) { } // synchronized (_dataLock) @@ -357,9 +364,9 @@ public class MessageInputStream extends InputStream { for (int i = 0; i < _readyDataBlocks.size(); i++) { ByteArray cur = (ByteArray)_readyDataBlocks.get(i); if (i == 0) - numBytes += cur.getData().length - _readyDataBlockIndex; + numBytes += cur.getValid() - _readyDataBlockIndex; else - numBytes += cur.getData().length; + numBytes += cur.getValid(); } } if (_log.shouldLog(Log.DEBUG)) @@ -380,13 +387,13 @@ public class MessageInputStream extends InputStream { for (int i = 0; i < _readyDataBlocks.size(); i++) { ByteArray cur = (ByteArray)_readyDataBlocks.get(i); if (i == 0) - numBytes += cur.getData().length - _readyDataBlockIndex; + numBytes += cur.getValid() - _readyDataBlockIndex; else - numBytes += cur.getData().length; + numBytes += cur.getValid(); } for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) { ByteArray cur = (ByteArray)iter.next(); - numBytes += cur.getData().length; + numBytes += cur.getValid(); } return numBytes; } @@ -399,9 +406,9 @@ public class MessageInputStream extends InputStream { for (int i = 0; i < _readyDataBlocks.size(); i++) { ByteArray cur = (ByteArray)_readyDataBlocks.get(i); if (i == 0) - numBytes += cur.getData().length - _readyDataBlockIndex; + numBytes += cur.getValid() - _readyDataBlockIndex; else - numBytes += cur.getData().length; + numBytes += cur.getValid(); } return numBytes; } @@ -409,13 +416,15 @@ public class MessageInputStream extends InputStream { public void close() { synchronized (_dataLock) { - _readyDataBlocks.clear(); + while (_readyDataBlocks.size() > 0) + _cache.release((ByteArray)_readyDataBlocks.remove(0)); // we don't need the data, but we do need to keep track of the messageIds // received, so we can ACK accordingly for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) { ByteArray ba = (ByteArray)iter.next(); - ba.setData(null); + //ba.setData(null); + _cache.release(ba); } _locallyClosed = true; _dataLock.notifyAll(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 6416dfcbcc..f346a0bffe 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -3,11 +3,13 @@ package net.i2p.client.streaming; import java.util.Arrays; import net.i2p.I2PAppContext; import net.i2p.data.Base64; +import net.i2p.data.ByteArray; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.data.Signature; import net.i2p.data.SigningPrivateKey; +import net.i2p.util.ByteCache; /** * Contain a single packet transferred as part of a streaming connection. @@ -56,12 +58,13 @@ public class Packet { private long _nacks[]; private int _resendDelay; private int _flags; - private byte _payload[]; + private ByteArray _payload; // the next four are set only if the flags say so private Signature _optionSignature; private Destination _optionFrom; private int _optionDelay; private int _optionMaxSize; + private ByteCache _cache; /** * The receiveStreamId will be set to this when the packet doesn't know @@ -135,6 +138,10 @@ public class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; private static final int MAX_DELAY_REQUEST = 65535; + + public Packet() { + _cache = ByteCache.getInstance(128, MAX_PAYLOAD_SIZE); + } /** what stream is this packet a part of? */ public byte[] getSendStreamId() { @@ -200,14 +207,14 @@ public class Packet { public static final int MAX_PAYLOAD_SIZE = 32*1024; /** get the actual payload of the message. may be null */ - public byte[] getPayload() { return _payload; } - public void setPayload(byte payload[]) { + public ByteArray getPayload() { return _payload; } + public void setPayload(ByteArray payload) { _payload = payload; - if ( (payload != null) && (payload.length > MAX_PAYLOAD_SIZE) ) - throw new IllegalArgumentException("Too large payload: " + payload.length); + if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) + throw new IllegalArgumentException("Too large payload: " + payload.getValid()); } public int getPayloadSize() { - return (_payload == null ? 0 : _payload.length); + return (_payload == null ? 0 : _payload.getValid()); } /** is a particular flag set on this packet? */ @@ -340,12 +347,12 @@ public class Packet { if (_payload != null) { try { - System.arraycopy(_payload, 0, buffer, cur, _payload.length); + System.arraycopy(_payload.getData(), _payload.getOffset(), buffer, cur, _payload.getValid()); } catch (ArrayIndexOutOfBoundsException aioobe) { - System.err.println("payload.length: " + _payload.length + " buffer.length: " + buffer.length + " cur: " + cur); + System.err.println("payload.length: " + _payload.getValid() + " buffer.length: " + buffer.length + " cur: " + cur); throw aioobe; } - cur += _payload.length; + cur += _payload.getValid(); } return cur - offset; @@ -382,7 +389,7 @@ public class Packet { size += 2; // option size if (_payload != null) { - size += _payload.length; + size += _payload.getValid(); } return size; @@ -445,8 +452,10 @@ public class Packet { throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); // skip ahead to the payload - _payload = new byte[payloadSize]; - System.arraycopy(buffer, payloadBegin, _payload, 0, payloadSize); + _payload = _cache.acquire(); //new ByteArray(new byte[payloadSize]); + System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize); + _payload.setValid(payloadSize); + _payload.setOffset(0); // ok now lets go back and deal with the options if (isFlagSet(FLAG_DELAY_REQUESTED)) { @@ -545,8 +554,8 @@ public class Packet { buf.append(" ").append(_nacks[i]); } } - if ( (_payload != null) && (_payload.length > 0) ) - buf.append(" data: ").append(_payload.length); + if ( (_payload != null) && (_payload.getValid() > 0) ) + buf.append(" data: ").append(_payload.getValid()); return buf.toString(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 3dd11ae22e..ee661b8895 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -3,8 +3,10 @@ package net.i2p.client.streaming; import java.util.Set; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; import net.i2p.data.Destination; import net.i2p.data.SessionKey; +import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -26,6 +28,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat private long _ackOn; private long _cancelledOn; private SimpleTimer.TimedEvent _resendEvent; + private ByteCache _cache = ByteCache.getInstance(128, MAX_PAYLOAD_SIZE); public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); @@ -80,21 +83,31 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat _lastSend = _context.clock().now(); } public void ackReceived() { + ByteArray ba = null; synchronized (this) { if (_ackOn <= 0) _ackOn = _context.clock().now(); + ba = getPayload(); + setPayload(null); notifyAll(); } SimpleTimer.getInstance().removeEvent(_resendEvent); + if (ba != null) + _cache.release(ba); } public void cancelled() { + ByteArray ba = null; synchronized (this) { _cancelledOn = _context.clock().now(); + ba = getPayload(); + setPayload(null); notifyAll(); } SimpleTimer.getInstance().removeEvent(_resendEvent); if (_log.shouldLog(Log.DEBUG)) _log.debug("Cancelled! " + toString(), new Exception("cancelled")); + if (ba != null) + _cache.release(ba); } /** how long after packet creation was it acked? */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 51a5d6915d..6478ae09a3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -25,6 +25,7 @@ class PacketQueue { private I2PSession _session; private ConnectionManager _connectionManager; private ByteCache _cache = ByteCache.getInstance(64, 36*1024); + private ByteCache _packetCache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE); public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; @@ -125,6 +126,11 @@ class PacketQueue { String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() : null); _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); } + + if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { + // ack only, so release it asap + _packetCache.release(packet.getPayload()); + } } } diff --git a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java index 24b1167bfe..dcfa0f611f 100644 --- a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java +++ b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Collections; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.util.Log; @@ -29,7 +30,7 @@ public class MessageInputStreamTest { for (int i = 0; i < orig.length / 1024; i++) { byte msg[] = new byte[1024]; System.arraycopy(orig, i*1024, msg, 0, 1024); - in.messageReceived(i, msg); + in.messageReceived(i, new ByteArray(msg)); } byte read[] = new byte[orig.length]; @@ -59,7 +60,7 @@ public class MessageInputStreamTest { byte msg[] = new byte[1024]; Integer cur = (Integer)order.get(i); System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024); - in.messageReceived(cur.intValue(), msg); + in.messageReceived(cur.intValue(), new ByteArray(msg)); _log.debug("Injecting " + cur); } @@ -91,7 +92,7 @@ public class MessageInputStreamTest { byte msg[] = new byte[1024]; Integer cur = (Integer)order.get(i); System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024); - in.messageReceived(cur.intValue(), msg); + in.messageReceived(cur.intValue(), new ByteArray(msg)); _log.debug("Injecting " + cur); } } @@ -126,7 +127,7 @@ public class MessageInputStreamTest { byte msg[] = new byte[1024]; Integer cur = (Integer)order.get(i); System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024); - in.messageReceived(cur.intValue(), msg); + in.messageReceived(cur.intValue(), new ByteArray(msg)); _log.debug("Injecting " + cur); try { diff --git a/core/java/src/net/i2p/crypto/DSAEngine.java b/core/java/src/net/i2p/crypto/DSAEngine.java index 56ea80c45a..8b79da2ac5 100644 --- a/core/java/src/net/i2p/crypto/DSAEngine.java +++ b/core/java/src/net/i2p/crypto/DSAEngine.java @@ -33,20 +33,28 @@ import java.math.BigInteger; import java.util.Arrays; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; import net.i2p.data.Hash; import net.i2p.data.Signature; import net.i2p.data.SigningPrivateKey; import net.i2p.data.SigningPublicKey; +import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.NativeBigInteger; public class DSAEngine { private Log _log; private I2PAppContext _context; + private SHA1EntryCache _cache; + private ByteCache _rbyteCache; + private ByteCache _sbyteCache; public DSAEngine(I2PAppContext context) { _log = context.logManager().getLog(DSAEngine.class); _context = context; + _cache = new SHA1EntryCache(); + _rbyteCache = ByteCache.getInstance(16, 20); + _sbyteCache = ByteCache.getInstance(16, 20); } public static DSAEngine getInstance() { return I2PAppContext.getGlobalContext().dsa(); @@ -59,8 +67,10 @@ public class DSAEngine { try { byte[] sigbytes = signature.getData(); - byte rbytes[] = new byte[20]; - byte sbytes[] = new byte[20]; + ByteArray rbyteBA = _rbyteCache.acquire(); + ByteArray sbyteBA = _sbyteCache.acquire(); + byte rbytes[] = rbyteBA.getData(); //new byte[20]; + byte sbytes[] = sbyteBA.getData(); //new byte[20]; for (int x = 0; x < 40; x++) { if (x < 20) { rbytes[x] = sigbytes[x]; @@ -70,10 +80,18 @@ public class DSAEngine { } BigInteger s = new NativeBigInteger(1, sbytes); BigInteger r = new NativeBigInteger(1, rbytes); + + _rbyteCache.release(rbyteBA); + _sbyteCache.release(sbyteBA); + BigInteger y = new NativeBigInteger(1, verifyingKey.getData()); BigInteger w = s.modInverse(CryptoConstants.dsaq); - byte data[] = calculateHash(signedData, offset, size).getData(); + + SHAEntryCache.CacheEntry entry = _cache.acquire(size); + byte data[] = calculateHash(signedData, offset, size, entry).getData(); NativeBigInteger bi = new NativeBigInteger(1, data); + _cache.release(entry); + BigInteger u1 = bi.multiply(w).mod(CryptoConstants.dsaq); BigInteger u2 = r.multiply(w).mod(CryptoConstants.dsaq); BigInteger modval = CryptoConstants.dsag.modPow(u1, CryptoConstants.dsap); @@ -110,11 +128,18 @@ public class DSAEngine { BigInteger r = CryptoConstants.dsag.modPow(k, CryptoConstants.dsap).mod(CryptoConstants.dsaq); BigInteger kinv = k.modInverse(CryptoConstants.dsaq); - Hash h = calculateHash(data, offset, length); + SHAEntryCache.CacheEntry entry = _cache.acquire(length); + Hash h = calculateHash(data, offset, length, entry); - if (h == null) return null; + if (h == null) { + _cache.release(entry); + return null; + } BigInteger M = new NativeBigInteger(1, h.getData()); + + _cache.release(entry); + BigInteger x = new NativeBigInteger(1, signingKey.getData()); BigInteger s = (kinv.multiply(M.add(x.multiply(r)))).mod(CryptoConstants.dsaq); @@ -160,7 +185,17 @@ public class DSAEngine { private int[] H0 = { 0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0}; - private Hash calculateHash(byte[] source, int offset, int len) { + static final int getWordlength(int sourceLength) { + long length = sourceLength * 8; + int k = 448 - (int) ((length + 1) % 512); + if (k < 0) { + k += 512; + } + int padbytes = k / 8; + return sourceLength / 4 + padbytes / 4 + 3; + } + + private Hash calculateHash(byte[] source, int offset, int len, SHA256EntryCache.CacheEntry entry) { long length = len * 8; int k = 448 - (int) ((length + 1) % 512); if (k < 0) { @@ -168,7 +203,7 @@ public class DSAEngine { } int padbytes = k / 8; int wordlength = len / 4 + padbytes / 4 + 3; - int[] M0 = new int[wordlength]; + int[] M0 = (entry != null ? entry.M0 : new int[wordlength]); int wordcount = 0; int x = 0; for (x = 0; x < (len / 4) * 4; x += 4) { @@ -201,13 +236,13 @@ public class DSAEngine { } M0[wordlength - 2] = (int) (length >>> 32); M0[wordlength - 1] = (int) (length); - int[] H = new int[5]; + int[] H = (entry != null ? entry.H : new int[5]); for (x = 0; x < 5; x++) { H[x] = H0[x]; } int blocks = M0.length / 16; - int[] W = new int[80]; + int[] W = (entry != null ? entry.W : new int[80]); for (int bl = 0; bl < blocks; bl++) { int a = H[0]; int b = H[1]; @@ -241,13 +276,15 @@ public class DSAEngine { H[4] = add(e, H[4]); } - byte[] hashbytes = new byte[20]; + byte[] hashbytes = (entry != null ? entry.hashbytes : new byte[20]); for (x = 0; x < 5; x++) { hashbytes[x * 4] = (byte) (H[x] << 0 >>> 24); hashbytes[x * 4 + 1] = (byte) (H[x] << 8 >>> 24); hashbytes[x * 4 + 2] = (byte) (H[x] << 16 >>> 24); hashbytes[x * 4 + 3] = (byte) (H[x] << 24 >>> 24); } + if (entry != null) + return entry.hash; Hash hash = new Hash(); hash.setData(hashbytes); return hash; diff --git a/core/java/src/net/i2p/crypto/SHA1EntryCache.java b/core/java/src/net/i2p/crypto/SHA1EntryCache.java new file mode 100644 index 0000000000..4a3c53e397 --- /dev/null +++ b/core/java/src/net/i2p/crypto/SHA1EntryCache.java @@ -0,0 +1,37 @@ +package net.i2p.crypto; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.data.Hash; + +/** + * Cache the objects used in DSA's SHA1 calculateHash method to reduce + * memory churn. The CacheEntry should be held onto as long as the + * data referenced in it is needed (which often is only one or two lines + * of code) + * + */ +public class SHA1EntryCache extends SHA256EntryCache { + protected CacheEntry createNew(int payload) { + return new SHA1CacheEntry(payload); + } + + /** + * all the data alloc'ed in a calculateHash call + */ + public static class SHA1CacheEntry extends SHAEntryCache.CacheEntry { + public SHA1CacheEntry(int payload) { + wordlength = DSAEngine.getWordlength(payload); + bucket = payload; + hashbytes = new byte[20]; + M0 = new int[wordlength]; + W = new int[80]; + H = new int[5]; + hash = new Hash(); + hash.setData(hashbytes); + } + } +} diff --git a/core/java/src/net/i2p/crypto/SHA256EntryCache.java b/core/java/src/net/i2p/crypto/SHA256EntryCache.java index 4b6abba371..83dac586e9 100644 --- a/core/java/src/net/i2p/crypto/SHA256EntryCache.java +++ b/core/java/src/net/i2p/crypto/SHA256EntryCache.java @@ -14,163 +14,20 @@ import net.i2p.data.Hash; * of code) * */ -public final class SHA256EntryCache { - private static final int ONE_KB = 0; - private static final int FOUR_KB = 1; - private static final int EIGHT_KB = 2; - private static final int SIXTEEN_KB = 3; - private static final int THIRTYTWO_KB = 4; - private static final int FOURTYEIGHT_KB = 5; - private static final int LARGER = 6; - /** - * Array of Lists of free CacheEntry objects, indexed - * by the payload size they are capable of handling - */ - private List _available[] = new List[6]; - /** count up how often we use the cache for each size */ - private long _used[] = new long[7]; - private int _sizes[] = new int[] { 1024,4*1024,8*1024,16*1024,32*1024,48*1024 }; - - /** no more than 32 at each size level */ - private static final int MAX_CACHED = 64; - +public class SHA256EntryCache extends SHAEntryCache { public SHA256EntryCache() { - for (int i = 0; i < _available.length; i++) { - _available[i] = new ArrayList(MAX_CACHED); - //for (int j = 0; j < MAX_CACHED; j++) - // _available[i].add(new CacheEntry(_sizes[i])); - } + super(); } - /** - * Get the next available structure, either from the cache or a brand new one - * - */ - public final CacheEntry acquire(int payload) { - int entrySize = getBucket(payload); - switch (entrySize) { - case 1024: - _used[ONE_KB]++; - synchronized (_available[ONE_KB]) { - if (_available[ONE_KB].size() > 0) { - return (CacheEntry)_available[ONE_KB].remove(0); - } - } - break; - case 4*1024: - _used[FOUR_KB]++; - synchronized (_available[FOUR_KB]) { - if (_available[FOUR_KB].size() > 0) { - return (CacheEntry)_available[FOUR_KB].remove(0); - } - } - break; - case 8*1024: - _used[EIGHT_KB]++; - synchronized (_available[EIGHT_KB]) { - if (_available[EIGHT_KB].size() > 0) { - return (CacheEntry)_available[EIGHT_KB].remove(0); - } - } - break; - case 16*1024: - _used[SIXTEEN_KB]++; - synchronized (_available[SIXTEEN_KB]) { - if (_available[SIXTEEN_KB].size() > 0) { - return (CacheEntry)_available[SIXTEEN_KB].remove(0); - } - } - break; - case 32*1024: - _used[THIRTYTWO_KB]++; - synchronized (_available[THIRTYTWO_KB]) { - if (_available[THIRTYTWO_KB].size() > 0) { - return (CacheEntry)_available[THIRTYTWO_KB].remove(0); - } - } - break; - case 48*1024: - _used[FOURTYEIGHT_KB]++; - synchronized (_available[FOURTYEIGHT_KB]) { - if (_available[FOURTYEIGHT_KB].size() > 0) { - return (CacheEntry)_available[FOURTYEIGHT_KB].remove(0); - } - } - break; - default: - _used[LARGER]++; - // not for the bucket, so make it exact - return new CacheEntry(payload); - } - return new CacheEntry(entrySize); - } - - /** - * Put this structure back onto the available cache for reuse - * - */ - public final void release(CacheEntry entry) { - entry.reset(); - if (false) return; - switch (entry.bucket) { - case 1024: - synchronized (_available[ONE_KB]) { - if (_available[ONE_KB].size() < MAX_CACHED) { - _available[ONE_KB].add(entry); - } - } - return; - case 4*1024: - synchronized (_available[FOUR_KB]) { - if (_available[FOUR_KB].size() < MAX_CACHED) { - _available[FOUR_KB].add(entry); - } - } - return; - case 8*1024: - synchronized (_available[EIGHT_KB]) { - if (_available[EIGHT_KB].size() < MAX_CACHED) { - _available[EIGHT_KB].add(entry); - } - } - return; - case 16*1024: - synchronized (_available[SIXTEEN_KB]) { - if (_available[SIXTEEN_KB].size() < MAX_CACHED) { - _available[SIXTEEN_KB].add(entry); - } - } - return; - case 32*1024: - synchronized (_available[THIRTYTWO_KB]) { - if (_available[THIRTYTWO_KB].size() < MAX_CACHED) { - _available[THIRTYTWO_KB].add(entry); - } - } - return; - case 48*1024: - synchronized (_available[FOURTYEIGHT_KB]) { - if (_available[FOURTYEIGHT_KB].size() < MAX_CACHED) { - _available[FOURTYEIGHT_KB].add(entry); - } - } - return; - } + protected CacheEntry createNew(int payload) { + return new SHA256CacheEntry(payload); } /** * all the data alloc'ed in a calculateHash call */ - public static final class CacheEntry { - byte hashbytes[]; - int W[]; - int M0[]; - int H[]; - Hash hash; - int wordlength; - int bucket; - - public CacheEntry(int payload) { + public static class SHA256CacheEntry extends SHAEntryCache.CacheEntry { + public SHA256CacheEntry(int payload) { wordlength = SHA256Generator.getWordlength(payload); bucket = payload; hashbytes = new byte[32]; @@ -180,30 +37,6 @@ public final class SHA256EntryCache { hash = new Hash(); hash.setData(hashbytes); } - - public final void reset() { - Arrays.fill(hashbytes, (byte)0x0); - Arrays.fill(M0, (byte)0x0); - Arrays.fill(W, (byte)0x0); - Arrays.fill(H, (byte)0x0); - } - } - - private static final int getBucket(int payload) { - if (payload <= 1024) - return 1024; - else if (payload <= 4*1024) - return 4*1024; - else if (payload <= 8*1024) - return 8*1024; - else if (payload <= 16*1024) - return 16*1024; - else if (payload <= 32*1024) - return 32*1024; - else if (payload <= 48*1024) - return 48*1024; - else - return payload; } public static void main(String args[]) { diff --git a/core/java/src/net/i2p/crypto/SHA256Generator.java b/core/java/src/net/i2p/crypto/SHA256Generator.java index 38a84aaa8f..444f5361d2 100644 --- a/core/java/src/net/i2p/crypto/SHA256Generator.java +++ b/core/java/src/net/i2p/crypto/SHA256Generator.java @@ -72,8 +72,8 @@ public final class SHA256Generator { return rv; } - private final SHA256EntryCache.CacheEntry getNewEntry(int payloadSize) { - return new SHA256EntryCache.CacheEntry(payloadSize); + private final SHA256EntryCache.SHA256CacheEntry getNewEntry(int payloadSize) { + return new SHA256EntryCache.SHA256CacheEntry(payloadSize); } /** Calculate the SHA-256 has of the source diff --git a/core/java/src/net/i2p/crypto/SHAEntryCache.java b/core/java/src/net/i2p/crypto/SHAEntryCache.java new file mode 100644 index 0000000000..7c7a6fbf79 --- /dev/null +++ b/core/java/src/net/i2p/crypto/SHAEntryCache.java @@ -0,0 +1,206 @@ +package net.i2p.crypto; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.data.Hash; + +/** + * Cache the objects used in SHA256Generator's calculate method to reduce + * memory churn. The CacheEntry should be held onto as long as the + * data referenced in it is needed (which often is only one or two lines + * of code) + * + */ +public abstract class SHAEntryCache { + private static final int ONE_KB = 0; + private static final int FOUR_KB = 1; + private static final int EIGHT_KB = 2; + private static final int SIXTEEN_KB = 3; + private static final int THIRTYTWO_KB = 4; + private static final int FOURTYEIGHT_KB = 5; + private static final int LARGER = 6; + /** + * Array of Lists of free CacheEntry objects, indexed + * by the payload size they are capable of handling + */ + private List _available[] = new List[6]; + /** count up how often we use the cache for each size */ + private long _used[] = new long[7]; + private int _sizes[] = new int[] { 1024,4*1024,8*1024,16*1024,32*1024,48*1024 }; + + /** no more than 32 at each size level */ + private static final int MAX_CACHED = 64; + + public SHAEntryCache() { + for (int i = 0; i < _available.length; i++) { + _available[i] = new ArrayList(MAX_CACHED); + //for (int j = 0; j < MAX_CACHED; j++) + // _available[i].add(new CacheEntry(_sizes[i])); + } + } + + /** + * Overridden by the impl to provide a brand new cache entry, capable + * of sustaining the data necessary to digest the specified payload + * + */ + protected abstract CacheEntry createNew(int payload); + + /** + * Get the next available structure, either from the cache or a brand new one + * + */ + public final CacheEntry acquire(int payload) { + int entrySize = getBucket(payload); + switch (entrySize) { + case 1024: + _used[ONE_KB]++; + synchronized (_available[ONE_KB]) { + if (_available[ONE_KB].size() > 0) { + return (CacheEntry)_available[ONE_KB].remove(0); + } + } + break; + case 4*1024: + _used[FOUR_KB]++; + synchronized (_available[FOUR_KB]) { + if (_available[FOUR_KB].size() > 0) { + return (CacheEntry)_available[FOUR_KB].remove(0); + } + } + break; + case 8*1024: + _used[EIGHT_KB]++; + synchronized (_available[EIGHT_KB]) { + if (_available[EIGHT_KB].size() > 0) { + return (CacheEntry)_available[EIGHT_KB].remove(0); + } + } + break; + case 16*1024: + _used[SIXTEEN_KB]++; + synchronized (_available[SIXTEEN_KB]) { + if (_available[SIXTEEN_KB].size() > 0) { + return (CacheEntry)_available[SIXTEEN_KB].remove(0); + } + } + break; + case 32*1024: + _used[THIRTYTWO_KB]++; + synchronized (_available[THIRTYTWO_KB]) { + if (_available[THIRTYTWO_KB].size() > 0) { + return (CacheEntry)_available[THIRTYTWO_KB].remove(0); + } + } + break; + case 48*1024: + _used[FOURTYEIGHT_KB]++; + synchronized (_available[FOURTYEIGHT_KB]) { + if (_available[FOURTYEIGHT_KB].size() > 0) { + return (CacheEntry)_available[FOURTYEIGHT_KB].remove(0); + } + } + break; + default: + _used[LARGER]++; + // not for the bucket, so make it exact + return createNew(payload); + } + return createNew(payload); + } + + /** + * Put this structure back onto the available cache for reuse + * + */ + public final void release(CacheEntry entry) { + entry.reset(); + if (false) return; + switch (entry.bucket) { + case 1024: + synchronized (_available[ONE_KB]) { + if (_available[ONE_KB].size() < MAX_CACHED) { + _available[ONE_KB].add(entry); + } + } + return; + case 4*1024: + synchronized (_available[FOUR_KB]) { + if (_available[FOUR_KB].size() < MAX_CACHED) { + _available[FOUR_KB].add(entry); + } + } + return; + case 8*1024: + synchronized (_available[EIGHT_KB]) { + if (_available[EIGHT_KB].size() < MAX_CACHED) { + _available[EIGHT_KB].add(entry); + } + } + return; + case 16*1024: + synchronized (_available[SIXTEEN_KB]) { + if (_available[SIXTEEN_KB].size() < MAX_CACHED) { + _available[SIXTEEN_KB].add(entry); + } + } + return; + case 32*1024: + synchronized (_available[THIRTYTWO_KB]) { + if (_available[THIRTYTWO_KB].size() < MAX_CACHED) { + _available[THIRTYTWO_KB].add(entry); + } + } + return; + case 48*1024: + synchronized (_available[FOURTYEIGHT_KB]) { + if (_available[FOURTYEIGHT_KB].size() < MAX_CACHED) { + _available[FOURTYEIGHT_KB].add(entry); + } + } + return; + } + } + + /** + * all the data alloc'ed in a calculateHash call + */ + public static abstract class CacheEntry { + byte hashbytes[]; + int W[]; + int M0[]; + int H[]; + Hash hash; + int wordlength; + int bucket; + + protected CacheEntry() {} + + public final void reset() { + Arrays.fill(hashbytes, (byte)0x0); + Arrays.fill(M0, (byte)0x0); + Arrays.fill(W, (byte)0x0); + Arrays.fill(H, (byte)0x0); + } + } + + private static final int getBucket(int payload) { + if (payload <= 1024) + return 1024; + else if (payload <= 4*1024) + return 4*1024; + else if (payload <= 8*1024) + return 8*1024; + else if (payload <= 16*1024) + return 16*1024; + else if (payload <= 32*1024) + return 32*1024; + else if (payload <= 48*1024) + return 48*1024; + else + return payload; + } +} diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index 3e9df9a6ef..39b01c9198 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -66,8 +66,8 @@ class TransientSessionKeyManager extends SessionKeyManager { super(context); _log = context.logManager().getLog(TransientSessionKeyManager.class); _context = context; - _outboundSessions = new HashMap(64); - _inboundTagSets = new HashMap(1024); + _outboundSessions = new HashMap(1024); + _inboundTagSets = new HashMap(64*1024); } private TransientSessionKeyManager() { this(null); } diff --git a/core/java/src/net/i2p/util/ByteCache.java b/core/java/src/net/i2p/util/ByteCache.java index e961d2ecd3..8344e0989a 100644 --- a/core/java/src/net/i2p/util/ByteCache.java +++ b/core/java/src/net/i2p/util/ByteCache.java @@ -25,11 +25,14 @@ public final class ByteCache { */ public static ByteCache getInstance(int cacheSize, int size) { Integer sz = new Integer(size); + ByteCache cache = null; synchronized (_caches) { if (!_caches.containsKey(sz)) _caches.put(sz, new ByteCache(cacheSize, size)); - return (ByteCache)_caches.get(sz); + cache = (ByteCache)_caches.get(sz); } + cache.resize(cacheSize); + return cache; } private Log _log; /** list of available and available entries */ @@ -56,6 +59,11 @@ public final class ByteCache { _log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class); } + private void resize(int maxCachedEntries) { + if (_maxCached >= maxCachedEntries) return; + _maxCached = maxCachedEntries; + } + /** * Get the next available structure, either from the cache or a brand new one * diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 270e78100c..ac9b2b44dc 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -30,7 +30,7 @@ public class SimpleTimer { _context = I2PAppContext.getGlobalContext(); _log = _context.logManager().getLog(SimpleTimer.class); _events = new TreeMap(); - _eventTimes = new HashMap(); + _eventTimes = new HashMap(1024); _readyEvents = new ArrayList(4); I2PThread runner = new I2PThread(new SimpleTimerRunner()); runner.setName("SimpleTimer"); diff --git a/history.txt b/history.txt index d3745d5cbf..e154998dea 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.154 2005/02/23 00:00:53 jrandom Exp $ +$Id: history.txt,v 1.155 2005/02/23 16:44:32 jrandom Exp $ + +2005-02-24 jrandom + * Cache temporary memory allocation in the DSA's SHA1 impl, and the packet + data in the streaming lib. + * Fixed a streaming lib bug where the connection initiator would fail the + stream if the ACK to their SYN was lost. 2005-02-23 jrandom * Now that we don't get stale SAM sessions, it'd be nice if we didn't diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 539ad0b826..897681ad52 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.149 $ $Date: 2005/02/23 00:00:53 $"; + public final static String ID = "$Revision: 1.150 $ $Date: 2005/02/23 16:44:32 $"; public final static String VERSION = "0.5.0.1"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);