Fixed n bugs, particularly relating to store
This commit is contained in:
@ -165,7 +165,7 @@ timeout = {
|
|||||||
'ping' : 60,
|
'ping' : 60,
|
||||||
'findNode' : 60,
|
'findNode' : 60,
|
||||||
'findData' : 60,
|
'findData' : 60,
|
||||||
'store' : 120,
|
'store' : 60,
|
||||||
}
|
}
|
||||||
|
|
||||||
logToSocket = None
|
logToSocket = None
|
||||||
@ -1863,15 +1863,18 @@ class KRpcFindNode(KRpc):
|
|||||||
# start our ticker
|
# start our ticker
|
||||||
self.nextTickTime = time.time() + timeout['findNode']
|
self.nextTickTime = time.time() + timeout['findNode']
|
||||||
|
|
||||||
|
numQueriesSent = 0
|
||||||
|
|
||||||
# and send them findNode queries
|
# and send them findNode queries
|
||||||
if len(somePeerRecs) > 0:
|
if len(somePeerRecs) > 0:
|
||||||
for peerRec in somePeerRecs:
|
for peerRec in somePeerRecs:
|
||||||
self.log(3, "querying %s" % peerRec)
|
self.log(3, "querying %s" % peerRec)
|
||||||
if self.numQueriesPending < maxConcurrentQueries:
|
if self.numQueriesPending < maxConcurrentQueries:
|
||||||
self.sendOneQuery(peerRec)
|
self.sendOneQuery(peerRec)
|
||||||
|
numQueriesSent += 1
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
self.log(3, "queries sent, awaiting reply")
|
self.log(3, "%s queries sent, awaiting reply" % numQueriesSent)
|
||||||
else:
|
else:
|
||||||
self.log(3, "no peer recs???")
|
self.log(3, "no peer recs???")
|
||||||
for peerRec in self.peerTab:
|
for peerRec in self.peerTab:
|
||||||
@ -2317,13 +2320,12 @@ class KRpcStore(KRpc):
|
|||||||
|
|
||||||
self.log(2, "STORE RPC findNode - got peers %s" % repr(peers))
|
self.log(2, "STORE RPC findNode - got peers %s" % repr(peers))
|
||||||
|
|
||||||
self.numPeersToStore = min(len(peers), numStorePeers)
|
i = 0
|
||||||
|
|
||||||
self.numPeersSucceeded = 0
|
self.numPeersSucceeded = 0
|
||||||
self.numPeersFailed = 0
|
self.numPeersFailed = 0
|
||||||
self.numPeersFinished = 0
|
self.numPeersFinished = 0
|
||||||
|
|
||||||
i = 0
|
|
||||||
|
|
||||||
# and fire off store messages for each peer
|
# and fire off store messages for each peer
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
|
|
||||||
@ -2341,8 +2343,13 @@ class KRpcStore(KRpc):
|
|||||||
if i >= numStorePeers:
|
if i >= numStorePeers:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
self.nextTickTime = time.time() + timeout['store']
|
||||||
|
|
||||||
self.log(2, "Sent store cmd to %s peers, awaiting responses" % i)
|
self.log(2, "Sent store cmd to %s peers, awaiting responses" % i)
|
||||||
|
|
||||||
|
self.numPeersToStore = i
|
||||||
|
|
||||||
|
|
||||||
#@-node:on_doneFindNode
|
#@-node:on_doneFindNode
|
||||||
#@+node:on_reply
|
#@+node:on_reply
|
||||||
def on_reply(self, peer, msgId, **details):
|
def on_reply(self, peer, msgId, **details):
|
||||||
@ -2360,9 +2367,15 @@ class KRpcStore(KRpc):
|
|||||||
#@-node:on_reply
|
#@-node:on_reply
|
||||||
#@+node:on_tick
|
#@+node:on_tick
|
||||||
def on_tick(self):
|
def on_tick(self):
|
||||||
self.log(3, "got a timeout tick, what should we do??")
|
|
||||||
|
|
||||||
self.nextTickTime = time.time() + 3
|
self.log(3, "Timeout awaiting store reply from %d out of %d peers" % (
|
||||||
|
self.numPeersToStore - self.numPeersSucceeded, self.numPeersToStore))
|
||||||
|
|
||||||
|
if self.numPeersSucceeded == 0:
|
||||||
|
self.log(3, "Store timeout - no peers replied, storing locally")
|
||||||
|
self.localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True)
|
||||||
|
|
||||||
|
self.returnValue(True)
|
||||||
|
|
||||||
#@-node:on_tick
|
#@-node:on_tick
|
||||||
#@-others
|
#@-others
|
||||||
@ -3262,7 +3275,7 @@ class KNode(KBase):
|
|||||||
|
|
||||||
# check for timed-out RPCs
|
# check for timed-out RPCs
|
||||||
for rpc in self.rpcPending[:]:
|
for rpc in self.rpcPending[:]:
|
||||||
if now >= rpc.nextTickTime:
|
if rpc.nextTickTime != None and now >= rpc.nextTickTime:
|
||||||
try:
|
try:
|
||||||
rpc.on_tick()
|
rpc.on_tick()
|
||||||
except:
|
except:
|
||||||
|
Reference in New Issue
Block a user