2005-02-27 jrandom

* Don't rerequest leaseSets if there are already pending requests
    * Reverted the insufficiently tested caching in the DSA/SHA1 impl, and
      temporary disabled the streaming lib packet caching.
    * Reduced the resend RTT penalty to 10s
This commit is contained in:
jrandom
2005-02-27 22:09:37 +00:00
committed by zzz
parent 7983bb1490
commit 469a0852d7
13 changed files with 219 additions and 322 deletions

View File

@ -188,7 +188,8 @@ public class Connection {
}
void ackImmediately() {
_receiver.send(null, 0, 0);
PacketLocal packet = _receiver.send(null, 0, 0);
//packet.releasePayload();
}
/**
@ -871,7 +872,7 @@ public class Connection {
+ ") for " + Connection.this.toString());
// setRTT has its own ceiling
getOptions().setRTT(getOptions().getRTT() + 30*1000);
getOptions().setRTT(getOptions().getRTT() + 10*1000);
getOptions().setWindowSize(newWindowSize);
windowAdjusted();
}

View File

@ -20,13 +20,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
private Log _log;
private Connection _connection;
private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus();
private ByteCache _cache;
public ConnectionDataReceiver(I2PAppContext ctx, Connection con) {
_context = ctx;
_log = ctx.logManager().getLog(ConnectionDataReceiver.class);
_connection = con;
_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
}
public boolean writeInProcess() {
@ -135,9 +133,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
}
private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) {
if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")");
boolean ackOnly = isAckOnly(con, size);
PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con);
ByteArray data = (size <= Packet.MAX_PAYLOAD_SIZE ? _cache.acquire() : new ByteArray(new byte[size]));
//ByteArray data = packet.acquirePayload();
ByteArray data = new ByteArray(new byte[size]);
if (size > 0)
System.arraycopy(buf, off, data.getData(), 0, size);
data.setValid(size);

View File

@ -18,11 +18,9 @@ import net.i2p.util.SimpleTimer;
public class ConnectionPacketHandler {
private I2PAppContext _context;
private Log _log;
private ByteCache _cache;
public ConnectionPacketHandler(I2PAppContext context) {
_context = context;
_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
_log = context.logManager().getLog(ConnectionPacketHandler.class);
_context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@ -37,7 +35,7 @@ public class ConnectionPacketHandler {
if (!ok) {
if ( (!packet.isFlagSet(Packet.FLAG_RESET)) && (_log.shouldLog(Log.ERROR)) )
_log.error("Packet does NOT verify: " + packet);
_cache.release(packet.getPayload());
packet.releasePayload();
return;
}
@ -51,7 +49,7 @@ public class ConnectionPacketHandler {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a packet after hard disconnect, ignoring: " + packet + " on " + con);
}
_cache.release(packet.getPayload());
packet.releasePayload();
return;
}
@ -156,9 +154,9 @@ public class ConnectionPacketHandler {
}
}
if (ackOnly) {
if (ackOnly || !isNew) {
// non-ack message payloads are queued in the MessageInputStream
_cache.release(packet.getPayload());
packet.releasePayload();
}
}
@ -220,7 +218,7 @@ public class ConnectionPacketHandler {
+ ") for " + con);
// setRTT has its own ceiling
con.getOptions().setRTT(con.getOptions().getRTT() + 30*1000);
con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
con.getOptions().setWindowSize(oldSize);
congested = true;

View File

@ -219,10 +219,17 @@ public class Packet {
return (_payload == null ? 0 : _payload.getValid());
}
public void releasePayload() {
if (_payload != null)
_cache.release(_payload);
//if (_payload != null)
// _cache.release(_payload);
_payload = null;
}
public ByteArray acquirePayload() {
ByteArray old = _payload;
_payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]); //_cache.acquire();
//if (old != null)
// _cache.release(old);
return _payload;
}
/** is a particular flag set on this packet? */
public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }

View File

@ -33,28 +33,20 @@ import java.math.BigInteger;
import java.util.Arrays;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.Hash;
import net.i2p.data.Signature;
import net.i2p.data.SigningPrivateKey;
import net.i2p.data.SigningPublicKey;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.NativeBigInteger;
public class DSAEngine {
private Log _log;
private I2PAppContext _context;
private SHA1EntryCache _cache;
private ByteCache _rbyteCache;
private ByteCache _sbyteCache;
public DSAEngine(I2PAppContext context) {
_log = context.logManager().getLog(DSAEngine.class);
_context = context;
_cache = new SHA1EntryCache();
_rbyteCache = ByteCache.getInstance(16, 20);
_sbyteCache = ByteCache.getInstance(16, 20);
}
public static DSAEngine getInstance() {
return I2PAppContext.getGlobalContext().dsa();
@ -67,10 +59,8 @@ public class DSAEngine {
try {
byte[] sigbytes = signature.getData();
ByteArray rbyteBA = _rbyteCache.acquire();
ByteArray sbyteBA = _sbyteCache.acquire();
byte rbytes[] = rbyteBA.getData(); //new byte[20];
byte sbytes[] = sbyteBA.getData(); //new byte[20];
byte rbytes[] = new byte[20];
byte sbytes[] = new byte[20];
for (int x = 0; x < 40; x++) {
if (x < 20) {
rbytes[x] = sigbytes[x];
@ -80,18 +70,10 @@ public class DSAEngine {
}
BigInteger s = new NativeBigInteger(1, sbytes);
BigInteger r = new NativeBigInteger(1, rbytes);
_rbyteCache.release(rbyteBA);
_sbyteCache.release(sbyteBA);
BigInteger y = new NativeBigInteger(1, verifyingKey.getData());
BigInteger w = s.modInverse(CryptoConstants.dsaq);
SHAEntryCache.CacheEntry entry = _cache.acquire(size);
byte data[] = calculateHash(signedData, offset, size, entry).getData();
byte data[] = calculateHash(signedData, offset, size).getData();
NativeBigInteger bi = new NativeBigInteger(1, data);
_cache.release(entry);
BigInteger u1 = bi.multiply(w).mod(CryptoConstants.dsaq);
BigInteger u2 = r.multiply(w).mod(CryptoConstants.dsaq);
BigInteger modval = CryptoConstants.dsag.modPow(u1, CryptoConstants.dsap);
@ -128,18 +110,11 @@ public class DSAEngine {
BigInteger r = CryptoConstants.dsag.modPow(k, CryptoConstants.dsap).mod(CryptoConstants.dsaq);
BigInteger kinv = k.modInverse(CryptoConstants.dsaq);
SHAEntryCache.CacheEntry entry = _cache.acquire(length);
Hash h = calculateHash(data, offset, length, entry);
Hash h = calculateHash(data, offset, length);
if (h == null) {
_cache.release(entry);
return null;
}
if (h == null) return null;
BigInteger M = new NativeBigInteger(1, h.getData());
_cache.release(entry);
BigInteger x = new NativeBigInteger(1, signingKey.getData());
BigInteger s = (kinv.multiply(M.add(x.multiply(r)))).mod(CryptoConstants.dsaq);
@ -185,17 +160,7 @@ public class DSAEngine {
private int[] H0 = { 0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0};
static final int getWordlength(int sourceLength) {
long length = sourceLength * 8;
int k = 448 - (int) ((length + 1) % 512);
if (k < 0) {
k += 512;
}
int padbytes = k / 8;
return sourceLength / 4 + padbytes / 4 + 3;
}
private Hash calculateHash(byte[] source, int offset, int len, SHA256EntryCache.CacheEntry entry) {
private Hash calculateHash(byte[] source, int offset, int len) {
long length = len * 8;
int k = 448 - (int) ((length + 1) % 512);
if (k < 0) {
@ -203,7 +168,7 @@ public class DSAEngine {
}
int padbytes = k / 8;
int wordlength = len / 4 + padbytes / 4 + 3;
int[] M0 = (entry != null ? entry.M0 : new int[wordlength]);
int[] M0 = new int[wordlength];
int wordcount = 0;
int x = 0;
for (x = 0; x < (len / 4) * 4; x += 4) {
@ -236,13 +201,13 @@ public class DSAEngine {
}
M0[wordlength - 2] = (int) (length >>> 32);
M0[wordlength - 1] = (int) (length);
int[] H = (entry != null ? entry.H : new int[5]);
int[] H = new int[5];
for (x = 0; x < 5; x++) {
H[x] = H0[x];
}
int blocks = M0.length / 16;
int[] W = (entry != null ? entry.W : new int[80]);
int[] W = new int[80];
for (int bl = 0; bl < blocks; bl++) {
int a = H[0];
int b = H[1];
@ -276,15 +241,13 @@ public class DSAEngine {
H[4] = add(e, H[4]);
}
byte[] hashbytes = (entry != null ? entry.hashbytes : new byte[20]);
byte[] hashbytes = new byte[20];
for (x = 0; x < 5; x++) {
hashbytes[x * 4] = (byte) (H[x] << 0 >>> 24);
hashbytes[x * 4 + 1] = (byte) (H[x] << 8 >>> 24);
hashbytes[x * 4 + 2] = (byte) (H[x] << 16 >>> 24);
hashbytes[x * 4 + 3] = (byte) (H[x] << 24 >>> 24);
}
if (entry != null)
return entry.hash;
Hash hash = new Hash();
hash.setData(hashbytes);
return hash;

View File

@ -1,37 +0,0 @@
package net.i2p.crypto;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
/**
* Cache the objects used in DSA's SHA1 calculateHash method to reduce
* memory churn. The CacheEntry should be held onto as long as the
* data referenced in it is needed (which often is only one or two lines
* of code)
*
*/
public class SHA1EntryCache extends SHA256EntryCache {
protected CacheEntry createNew(int payload) {
return new SHA1CacheEntry(payload);
}
/**
* all the data alloc'ed in a calculateHash call
*/
public static class SHA1CacheEntry extends SHAEntryCache.CacheEntry {
public SHA1CacheEntry(int payload) {
wordlength = DSAEngine.getWordlength(payload);
bucket = payload;
hashbytes = new byte[20];
M0 = new int[wordlength];
W = new int[80];
H = new int[5];
hash = new Hash();
hash.setData(hashbytes);
}
}
}

View File

@ -14,20 +14,163 @@ import net.i2p.data.Hash;
* of code)
*
*/
public class SHA256EntryCache extends SHAEntryCache {
public final class SHA256EntryCache {
private static final int ONE_KB = 0;
private static final int FOUR_KB = 1;
private static final int EIGHT_KB = 2;
private static final int SIXTEEN_KB = 3;
private static final int THIRTYTWO_KB = 4;
private static final int FOURTYEIGHT_KB = 5;
private static final int LARGER = 6;
/**
* Array of Lists of free CacheEntry objects, indexed
* by the payload size they are capable of handling
*/
private List _available[] = new List[6];
/** count up how often we use the cache for each size */
private long _used[] = new long[7];
private int _sizes[] = new int[] { 1024,4*1024,8*1024,16*1024,32*1024,48*1024 };
/** no more than 32 at each size level */
private static final int MAX_CACHED = 64;
public SHA256EntryCache() {
super();
for (int i = 0; i < _available.length; i++) {
_available[i] = new ArrayList(MAX_CACHED);
//for (int j = 0; j < MAX_CACHED; j++)
// _available[i].add(new CacheEntry(_sizes[i]));
}
}
protected CacheEntry createNew(int payload) {
return new SHA256CacheEntry(payload);
/**
* Get the next available structure, either from the cache or a brand new one
*
*/
public final CacheEntry acquire(int payload) {
int entrySize = getBucket(payload);
switch (entrySize) {
case 1024:
_used[ONE_KB]++;
synchronized (_available[ONE_KB]) {
if (_available[ONE_KB].size() > 0) {
return (CacheEntry)_available[ONE_KB].remove(0);
}
}
break;
case 4*1024:
_used[FOUR_KB]++;
synchronized (_available[FOUR_KB]) {
if (_available[FOUR_KB].size() > 0) {
return (CacheEntry)_available[FOUR_KB].remove(0);
}
}
break;
case 8*1024:
_used[EIGHT_KB]++;
synchronized (_available[EIGHT_KB]) {
if (_available[EIGHT_KB].size() > 0) {
return (CacheEntry)_available[EIGHT_KB].remove(0);
}
}
break;
case 16*1024:
_used[SIXTEEN_KB]++;
synchronized (_available[SIXTEEN_KB]) {
if (_available[SIXTEEN_KB].size() > 0) {
return (CacheEntry)_available[SIXTEEN_KB].remove(0);
}
}
break;
case 32*1024:
_used[THIRTYTWO_KB]++;
synchronized (_available[THIRTYTWO_KB]) {
if (_available[THIRTYTWO_KB].size() > 0) {
return (CacheEntry)_available[THIRTYTWO_KB].remove(0);
}
}
break;
case 48*1024:
_used[FOURTYEIGHT_KB]++;
synchronized (_available[FOURTYEIGHT_KB]) {
if (_available[FOURTYEIGHT_KB].size() > 0) {
return (CacheEntry)_available[FOURTYEIGHT_KB].remove(0);
}
}
break;
default:
_used[LARGER]++;
// not for the bucket, so make it exact
return new CacheEntry(payload);
}
return new CacheEntry(entrySize);
}
/**
* Put this structure back onto the available cache for reuse
*
*/
public final void release(CacheEntry entry) {
entry.reset();
if (false) return;
switch (entry.bucket) {
case 1024:
synchronized (_available[ONE_KB]) {
if (_available[ONE_KB].size() < MAX_CACHED) {
_available[ONE_KB].add(entry);
}
}
return;
case 4*1024:
synchronized (_available[FOUR_KB]) {
if (_available[FOUR_KB].size() < MAX_CACHED) {
_available[FOUR_KB].add(entry);
}
}
return;
case 8*1024:
synchronized (_available[EIGHT_KB]) {
if (_available[EIGHT_KB].size() < MAX_CACHED) {
_available[EIGHT_KB].add(entry);
}
}
return;
case 16*1024:
synchronized (_available[SIXTEEN_KB]) {
if (_available[SIXTEEN_KB].size() < MAX_CACHED) {
_available[SIXTEEN_KB].add(entry);
}
}
return;
case 32*1024:
synchronized (_available[THIRTYTWO_KB]) {
if (_available[THIRTYTWO_KB].size() < MAX_CACHED) {
_available[THIRTYTWO_KB].add(entry);
}
}
return;
case 48*1024:
synchronized (_available[FOURTYEIGHT_KB]) {
if (_available[FOURTYEIGHT_KB].size() < MAX_CACHED) {
_available[FOURTYEIGHT_KB].add(entry);
}
}
return;
}
}
/**
* all the data alloc'ed in a calculateHash call
*/
public static class SHA256CacheEntry extends SHAEntryCache.CacheEntry {
public SHA256CacheEntry(int payload) {
public static final class CacheEntry {
byte hashbytes[];
int W[];
int M0[];
int H[];
Hash hash;
int wordlength;
int bucket;
public CacheEntry(int payload) {
wordlength = SHA256Generator.getWordlength(payload);
bucket = payload;
hashbytes = new byte[32];
@ -37,6 +180,30 @@ public class SHA256EntryCache extends SHAEntryCache {
hash = new Hash();
hash.setData(hashbytes);
}
public final void reset() {
Arrays.fill(hashbytes, (byte)0x0);
Arrays.fill(M0, (byte)0x0);
Arrays.fill(W, (byte)0x0);
Arrays.fill(H, (byte)0x0);
}
}
private static final int getBucket(int payload) {
if (payload <= 1024)
return 1024;
else if (payload <= 4*1024)
return 4*1024;
else if (payload <= 8*1024)
return 8*1024;
else if (payload <= 16*1024)
return 16*1024;
else if (payload <= 32*1024)
return 32*1024;
else if (payload <= 48*1024)
return 48*1024;
else
return payload;
}
public static void main(String args[]) {

View File

@ -72,8 +72,8 @@ public final class SHA256Generator {
return rv;
}
private final SHA256EntryCache.SHA256CacheEntry getNewEntry(int payloadSize) {
return new SHA256EntryCache.SHA256CacheEntry(payloadSize);
private final SHA256EntryCache.CacheEntry getNewEntry(int payloadSize) {
return new SHA256EntryCache.CacheEntry(payloadSize);
}
/** Calculate the SHA-256 has of the source

View File

@ -1,206 +0,0 @@
package net.i2p.crypto;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
/**
* Cache the objects used in SHA256Generator's calculate method to reduce
* memory churn. The CacheEntry should be held onto as long as the
* data referenced in it is needed (which often is only one or two lines
* of code)
*
*/
public abstract class SHAEntryCache {
private static final int ONE_KB = 0;
private static final int FOUR_KB = 1;
private static final int EIGHT_KB = 2;
private static final int SIXTEEN_KB = 3;
private static final int THIRTYTWO_KB = 4;
private static final int FOURTYEIGHT_KB = 5;
private static final int LARGER = 6;
/**
* Array of Lists of free CacheEntry objects, indexed
* by the payload size they are capable of handling
*/
private List _available[] = new List[6];
/** count up how often we use the cache for each size */
private long _used[] = new long[7];
private int _sizes[] = new int[] { 1024,4*1024,8*1024,16*1024,32*1024,48*1024 };
/** no more than 32 at each size level */
private static final int MAX_CACHED = 64;
public SHAEntryCache() {
for (int i = 0; i < _available.length; i++) {
_available[i] = new ArrayList(MAX_CACHED);
//for (int j = 0; j < MAX_CACHED; j++)
// _available[i].add(new CacheEntry(_sizes[i]));
}
}
/**
* Overridden by the impl to provide a brand new cache entry, capable
* of sustaining the data necessary to digest the specified payload
*
*/
protected abstract CacheEntry createNew(int payload);
/**
* Get the next available structure, either from the cache or a brand new one
*
*/
public final CacheEntry acquire(int payload) {
int entrySize = getBucket(payload);
switch (entrySize) {
case 1024:
_used[ONE_KB]++;
synchronized (_available[ONE_KB]) {
if (_available[ONE_KB].size() > 0) {
return (CacheEntry)_available[ONE_KB].remove(0);
}
}
break;
case 4*1024:
_used[FOUR_KB]++;
synchronized (_available[FOUR_KB]) {
if (_available[FOUR_KB].size() > 0) {
return (CacheEntry)_available[FOUR_KB].remove(0);
}
}
break;
case 8*1024:
_used[EIGHT_KB]++;
synchronized (_available[EIGHT_KB]) {
if (_available[EIGHT_KB].size() > 0) {
return (CacheEntry)_available[EIGHT_KB].remove(0);
}
}
break;
case 16*1024:
_used[SIXTEEN_KB]++;
synchronized (_available[SIXTEEN_KB]) {
if (_available[SIXTEEN_KB].size() > 0) {
return (CacheEntry)_available[SIXTEEN_KB].remove(0);
}
}
break;
case 32*1024:
_used[THIRTYTWO_KB]++;
synchronized (_available[THIRTYTWO_KB]) {
if (_available[THIRTYTWO_KB].size() > 0) {
return (CacheEntry)_available[THIRTYTWO_KB].remove(0);
}
}
break;
case 48*1024:
_used[FOURTYEIGHT_KB]++;
synchronized (_available[FOURTYEIGHT_KB]) {
if (_available[FOURTYEIGHT_KB].size() > 0) {
return (CacheEntry)_available[FOURTYEIGHT_KB].remove(0);
}
}
break;
default:
_used[LARGER]++;
// not for the bucket, so make it exact
return createNew(payload);
}
return createNew(payload);
}
/**
* Put this structure back onto the available cache for reuse
*
*/
public final void release(CacheEntry entry) {
entry.reset();
if (false) return;
switch (entry.bucket) {
case 1024:
synchronized (_available[ONE_KB]) {
if (_available[ONE_KB].size() < MAX_CACHED) {
_available[ONE_KB].add(entry);
}
}
return;
case 4*1024:
synchronized (_available[FOUR_KB]) {
if (_available[FOUR_KB].size() < MAX_CACHED) {
_available[FOUR_KB].add(entry);
}
}
return;
case 8*1024:
synchronized (_available[EIGHT_KB]) {
if (_available[EIGHT_KB].size() < MAX_CACHED) {
_available[EIGHT_KB].add(entry);
}
}
return;
case 16*1024:
synchronized (_available[SIXTEEN_KB]) {
if (_available[SIXTEEN_KB].size() < MAX_CACHED) {
_available[SIXTEEN_KB].add(entry);
}
}
return;
case 32*1024:
synchronized (_available[THIRTYTWO_KB]) {
if (_available[THIRTYTWO_KB].size() < MAX_CACHED) {
_available[THIRTYTWO_KB].add(entry);
}
}
return;
case 48*1024:
synchronized (_available[FOURTYEIGHT_KB]) {
if (_available[FOURTYEIGHT_KB].size() < MAX_CACHED) {
_available[FOURTYEIGHT_KB].add(entry);
}
}
return;
}
}
/**
* all the data alloc'ed in a calculateHash call
*/
public static abstract class CacheEntry {
byte hashbytes[];
int W[];
int M0[];
int H[];
Hash hash;
int wordlength;
int bucket;
protected CacheEntry() {}
public final void reset() {
Arrays.fill(hashbytes, (byte)0x0);
Arrays.fill(M0, (byte)0x0);
Arrays.fill(W, (byte)0x0);
Arrays.fill(H, (byte)0x0);
}
}
private static final int getBucket(int payload) {
if (payload <= 1024)
return 1024;
else if (payload <= 4*1024)
return 4*1024;
else if (payload <= 8*1024)
return 8*1024;
else if (payload <= 16*1024)
return 16*1024;
else if (payload <= 32*1024)
return 32*1024;
else if (payload <= 48*1024)
return 48*1024;
else
return payload;
}
}

View File

@ -1,4 +1,10 @@
$Id: history.txt,v 1.158 2005/02/26 14:16:46 jrandom Exp $
$Id: history.txt,v 1.159 2005/02/26 19:03:42 jrandom Exp $
2005-02-27 jrandom
* Don't rerequest leaseSets if there are already pending requests
* Reverted the insufficiently tested caching in the DSA/SHA1 impl, and
temporary disabled the streaming lib packet caching.
* Reduced the resend RTT penalty to 10s
2005-02-26 jrandom
* Force 1.3-isms on the precompiled jsps too (thanks laberhost)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.153 $ $Date: 2005/02/26 14:16:47 $";
public final static String ID = "$Revision: 1.154 $ $Date: 2005/02/26 19:03:42 $";
public final static String VERSION = "0.5.0.1";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -348,6 +348,8 @@ public class ClientConnectionRunner {
if (_dead) return;
if ( (_currentLeaseSet != null) && (_currentLeaseSet.equals(set)) )
return; // no change
if (_leaseRequest != null)
return; // already requesting
_context.jobQueue().addJob(new RequestLeaseSetJob(_context, this, set, _context.clock().now() + expirationTime, onCreateJob, onFailedJob));
}

View File

@ -53,11 +53,11 @@ class RequestLeaseSetJob extends JobImpl {
if (oldReq != null) {
if (oldReq.getExpiration() > getContext().clock().now()) {
_log.info("request of a leaseSet is still active, wait a little bit before asking again");
requeue(5*1000);
return;
} else {
_log.error("Old *expired* leaseRequest exists! Why did the old request not get killed? (expiration = " + new Date(oldReq.getExpiration()) + ")", getAddedBy());
if (_log.shouldLog(Log.WARN))
_log.warn("Old *expired* leaseRequest exists! Why did the old request not get killed? (expiration = " + new Date(oldReq.getExpiration()) + ")", getAddedBy());
}
return;
}
LeaseRequestState state = new LeaseRequestState(_onCreate, _onFail, _expiration, _ls);
@ -121,10 +121,6 @@ class RequestLeaseSetJob extends JobImpl {
_runner.disconnectClient("Took too long to request leaseSet");
if (_req.getOnFailed() != null)
RequestLeaseSetJob.this.getContext().jobQueue().addJob(_req.getOnFailed());
// only zero out the request if its the one we know about
if (_req == _runner.getLeaseRequest())
_runner.setLeaseRequest(null);
}
}
public String getName() { return "Check LeaseRequest Status"; }