use ByteCache for chunks in/out

This commit is contained in:
zzz
2012-09-17 21:32:05 +00:00
parent 259c28f8c1
commit 2b14d32bea
8 changed files with 88 additions and 16 deletions

View File

@ -1,5 +1,7 @@
package org.klomp.snark; package org.klomp.snark;
import net.i2p.data.ByteArray;
/** /**
* Callback used to fetch data * Callback used to fetch data
* @since 0.8.2 * @since 0.8.2
@ -10,5 +12,5 @@ interface DataLoader
* This is the callback that PeerConnectionOut calls to get the data from disk * This is the callback that PeerConnectionOut calls to get the data from disk
* @return bytes or null for errors * @return bytes or null for errors
*/ */
public byte[] loadData(int piece, int begin, int length); public ByteArray loadData(int piece, int begin, int length);
} }

View File

@ -23,8 +23,13 @@ package org.klomp.snark;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
// Used to queue outgoing connections import net.i2p.data.ByteArray;
// sendMessage() should be used to translate them to wire format. import net.i2p.util.ByteCache;
/**
* Used to queue outgoing connections
* sendMessage() should be used to translate them to wire format.
*/
class Message class Message
{ {
final static byte KEEP_ALIVE = -1; final static byte KEEP_ALIVE = -1;
@ -69,6 +74,9 @@ class Message
// now unused // now unused
//SimpleTimer.TimedEvent expireEvent; //SimpleTimer.TimedEvent expireEvent;
private static final int BUFSIZE = PeerState.PARTSIZE;
private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE);
/** Utility method for sending a message through a DataStream. */ /** Utility method for sending a message through a DataStream. */
void sendMessage(DataOutputStream dos) throws IOException void sendMessage(DataOutputStream dos) throws IOException
{ {
@ -79,11 +87,15 @@ class Message
return; return;
} }
ByteArray ba;
// Get deferred data // Get deferred data
if (data == null && dataLoader != null) { if (data == null && dataLoader != null) {
data = dataLoader.loadData(piece, begin, length); ba = dataLoader.loadData(piece, begin, length);
if (data == null) if (ba == null)
return; // hmm will get retried, but shouldn't happen return; // hmm will get retried, but shouldn't happen
data = ba.getData();
} else {
ba = null;
} }
// Calculate the total length in bytes // Calculate the total length in bytes
@ -139,6 +151,10 @@ class Message
// Send actual data // Send actual data
if (type == BITFIELD || type == PIECE || type == EXTENSION) if (type == BITFIELD || type == PIECE || type == EXTENSION)
dos.write(data, off, len); dos.write(data, off, len);
// Was pulled from cache in Storage.getPiece() via dataLoader
if (ba != null && ba.getData().length == BUFSIZE)
_cache.release(ba, false);
} }
@Override @Override

View File

@ -9,6 +9,8 @@ import java.security.MessageDigest;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.crypto.SHA1; import net.i2p.crypto.SHA1;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SecureFile; import net.i2p.util.SecureFile;
@ -42,6 +44,9 @@ class PartialPiece implements Comparable {
private final int pclen; private final int pclen;
private final File tempDir; private final File tempDir;
private static final int BUFSIZE = PeerState.PARTSIZE;
private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE);
// Any bigger than this, use temp file instead of heap // Any bigger than this, use temp file instead of heap
private static final int MAX_IN_MEM = 128 * 1024; private static final int MAX_IN_MEM = 128 * 1024;
// May be reduced on OOM // May be reduced on OOM
@ -154,7 +159,16 @@ class PartialPiece implements Comparable {
sha1.update(bs); sha1.update(bs);
} else { } else {
int read = 0; int read = 0;
byte[] buf = new byte[Math.min(pclen, 16384)]; int buflen = Math.min(pclen, BUFSIZE);
ByteArray ba;
byte[] buf;
if (buflen == BUFSIZE) {
ba = _cache.acquire();
buf = ba.getData();
} else {
ba = null;
buf = new byte[buflen];
}
synchronized (this) { synchronized (this) {
if (raf == null) if (raf == null)
throw new IOException(); throw new IOException();
@ -167,6 +181,8 @@ class PartialPiece implements Comparable {
sha1.update(buf, 0, rd); sha1.update(buf, 0, rd);
} }
} }
if (ba != null)
_cache.release(ba, false);
if (read < pclen) if (read < pclen)
throw new IOException(); throw new IOException();
} }
@ -182,7 +198,15 @@ class PartialPiece implements Comparable {
din.readFully(bs, off, len); din.readFully(bs, off, len);
} else { } else {
// read in fully before synching on raf // read in fully before synching on raf
byte[] tmp = new byte[len]; ByteArray ba;
byte[] tmp;
if (len == BUFSIZE) {
ba = _cache.acquire();
tmp = ba.getData();
} else {
ba = null;
tmp = new byte[len];
}
din.readFully(tmp); din.readFully(tmp);
synchronized (this) { synchronized (this) {
if (raf == null) if (raf == null)
@ -190,6 +214,8 @@ class PartialPiece implements Comparable {
raf.seek(off); raf.seek(off);
raf.write(tmp); raf.write(tmp);
} }
if (ba != null)
_cache.release(ba, false);
} }
} }
@ -208,7 +234,16 @@ class PartialPiece implements Comparable {
out.write(bs, offset, len); out.write(bs, offset, len);
} else { } else {
int read = 0; int read = 0;
byte[] buf = new byte[Math.min(len, 16384)]; int buflen = Math.min(len, BUFSIZE);
ByteArray ba;
byte[] buf;
if (buflen == BUFSIZE) {
ba = _cache.acquire();
buf = ba.getData();
} else {
ba = null;
buf = new byte[buflen];
}
synchronized (this) { synchronized (this) {
if (raf == null) if (raf == null)
throw new IOException(); throw new IOException();
@ -220,6 +255,8 @@ class PartialPiece implements Comparable {
out.write(buf, 0, rd); out.write(buf, 0, rd);
} }
} }
if (ba != null)
_cache.release(ba, false);
} }
} }

View File

@ -437,6 +437,7 @@ class PeerConnectionOut implements Runnable
*/ */
void sendPiece(int piece, int begin, int length, DataLoader loader) void sendPiece(int piece, int begin, int length, DataLoader loader)
{ {
/****
boolean sendNow = false; boolean sendNow = false;
// are there any cases where we should? // are there any cases where we should?
@ -447,6 +448,7 @@ class PeerConnectionOut implements Runnable
sendPiece(piece, begin, length, bytes); sendPiece(piece, begin, length, bytes);
return; return;
} }
****/
// queue a fake message... set everything up, // queue a fake message... set everything up,
// except save the PeerState instead of the bytes. // except save the PeerState instead of the bytes.

View File

@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
@ -874,7 +875,7 @@ class PeerCoordinator implements PeerListener
* *
* @throws RuntimeException on IOE getting the data * @throws RuntimeException on IOE getting the data
*/ */
public byte[] gotRequest(Peer peer, int piece, int off, int len) public ByteArray gotRequest(Peer peer, int piece, int off, int len)
{ {
if (halted) if (halted)
return null; return null;

View File

@ -22,6 +22,8 @@ package org.klomp.snark;
import java.util.List; import java.util.List;
import net.i2p.data.ByteArray;
/** /**
* Listener for Peer events. * Listener for Peer events.
*/ */
@ -114,7 +116,7 @@ interface PeerListener
* @return a byte array containing the piece or null when the piece * @return a byte array containing the piece or null when the piece
* is not available (which is a protocol error). * is not available (which is a protocol error).
*/ */
byte[] gotRequest(Peer peer, int piece, int off, int len); ByteArray gotRequest(Peer peer, int piece, int off, int len);
/** /**
* Called when a (partial) piece has been downloaded from the peer. * Called when a (partial) piece has been downloaded from the peer.

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.Log; import net.i2p.util.Log;
class PeerState implements DataLoader class PeerState implements DataLoader
@ -245,8 +246,8 @@ class PeerState implements DataLoader
* @return bytes or null for errors * @return bytes or null for errors
* @since 0.8.2 * @since 0.8.2
*/ */
public byte[] loadData(int piece, int begin, int length) { public ByteArray loadData(int piece, int begin, int length) {
byte[] pieceBytes = listener.gotRequest(peer, piece, begin, length); ByteArray pieceBytes = listener.gotRequest(peer, piece, begin, length);
if (pieceBytes == null) if (pieceBytes == null)
{ {
// XXX - Protocol error-> diconnect? // XXX - Protocol error-> diconnect?
@ -256,7 +257,7 @@ class PeerState implements DataLoader
} }
// More sanity checks // More sanity checks
if (length != pieceBytes.length) if (length != pieceBytes.getData().length)
{ {
// XXX - Protocol error-> disconnect? // XXX - Protocol error-> disconnect?
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))

View File

@ -34,6 +34,8 @@ import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import net.i2p.crypto.SHA1; import net.i2p.crypto.SHA1;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SecureFile; import net.i2p.util.SecureFile;
import net.i2p.util.SystemVersion; import net.i2p.util.SystemVersion;
@ -80,6 +82,9 @@ public class Storage
private static final boolean _isWindows = SystemVersion.isWindows(); private static final boolean _isWindows = SystemVersion.isWindows();
private static final int BUFSIZE = PeerState.PARTSIZE;
private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE);
/** /**
* Creates a new storage based on the supplied MetaInfo. This will * Creates a new storage based on the supplied MetaInfo. This will
* try to create and/or check all needed files in the MetaInfo. * try to create and/or check all needed files in the MetaInfo.
@ -899,22 +904,28 @@ public class Storage
* Returns a byte array containing a portion of the requested piece or null if * Returns a byte array containing a portion of the requested piece or null if
* the storage doesn't contain the piece yet. * the storage doesn't contain the piece yet.
*/ */
public byte[] getPiece(int piece, int off, int len) throws IOException public ByteArray getPiece(int piece, int off, int len) throws IOException
{ {
if (!bitfield.get(piece)) if (!bitfield.get(piece))
return null; return null;
//Catch a common place for OOMs esp. on 1MB pieces //Catch a common place for OOMs esp. on 1MB pieces
ByteArray rv;
byte[] bs; byte[] bs;
try { try {
bs = new byte[len]; // Will be restored to cache in Message.sendMessage()
if (len == BUFSIZE)
rv = _cache.acquire();
else
rv = new ByteArray(new byte[len]);
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Out of memory, can't honor request for piece " + piece, oom); _log.warn("Out of memory, can't honor request for piece " + piece, oom);
return null; return null;
} }
bs = rv.getData();
getUncheckedPiece(piece, bs, off, len); getUncheckedPiece(piece, bs, off, len);
return bs; return rv;
} }
/** /**