added beginnings of splitfiles handling code

might not work yet for files > 28k
Author: aum
Date: 2004-08-16 16:18:41 +00:00
Committed by: zzz
Parent: 3d7029493a
Commit: 2e99e3d9c5
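In outline, the new code stores an oversized value as a "splitfile": the value is cut into chunks of at most maxValueSize bytes, each chunk is stored under the hex SHA digest of its contents, and a manifest listing those digests is stored under the original key; values that fit in one chunk are simply prefixed with a "chunks:0" header line. Below is a minimal standalone sketch of that manifest format, written in the Python 2 style of the codebase; build_manifest, parse_manifest and the 28 KB chunk size are illustrative assumptions, not code from this repository.

    import sha    # Python 2 module; stands in for the codebase's shahash() helper

    maxValueSize = 28 * 1024    # assumed chunk ceiling, per the commit message's "28k" figure

    def build_manifest(value):
        """Split value into chunks; return (manifest, {hexdigest: chunk})."""
        hashes = []
        chunks = {}
        for i in range(0, len(value), maxValueSize):
            chunk = value[i:i+maxValueSize]
            digest = sha.new(chunk).hexdigest()
            hashes.append(digest)
            chunks[digest] = chunk
        manifest = "chunks:%s\n%s\n" % (len(hashes), "\n".join(hashes))
        return manifest, chunks

    def parse_manifest(stored):
        """Return (0, value) for an inline value, or (nchunks, hashes)."""
        firstline, rest = stored.split("\n", 1)
        kwd, count = firstline.strip().split(":")
        if kwd != "chunks":
            raise ValueError("not a chunk manifest")
        nchunks = int(count)
        if nchunks == 0:
            return 0, rest
        return nchunks, rest.strip().split("\n")

On retrieval, a "chunks:0" header means the rest of the stored string is the value itself; otherwise each listed digest is fetched with a nested findData RPC and the chunks are concatenated in manifest order.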


@@ -163,10 +163,10 @@ runCore = True
# timeouts - calibrate as needed
timeout = {
-    'ping' : 60,
-    'findNode' : 60,
-    'findData' : 60,
-    'store' : 60,
+    'ping' : 120,
+    'findNode' : 120,
+    'findData' : 120,
+    'store' : 120,
    }
logToSocket = None
@@ -1186,7 +1186,7 @@ class KRpc(KBase):
    #@-node:terminate
    #@+node:returnValue
-    def returnValue(self, result=None, **kw):
+    def returnValue(self, res=None, **kw):
        """
        Passes a return value back to the original caller, be it
        the local application, or an upstream peer
@@ -1207,11 +1207,11 @@ class KRpc(KBase):
            self.terminate()
        if self.callback:
            if hasattr(self, 'cbArgs'):
-                self.callback(result, self.cbArgs)
+                self.callback(res, self.cbArgs)
            else:
-                self.callback(result)
+                self.callback(res)
        elif self.isLocal:
-            self.queue.put(result)
+            self.queue.put(res)
        else:
            self.upstreamPeer.send_reply(msgId=self.upstreamMsgId,
                                         **kw)
@@ -1986,7 +1986,7 @@ class KRpcFindNode(KRpc):
        peerRec.state = 'replied'
        # wrap the returned peers as KPeer objects
-        peersReturned = details.get('nodes', [])
+        peersReturned = details.get('result', [])
        peersReturned = [self.localNode._normalisePeer(p) for p in peersReturned]
        self.numPeersRecommended += len(peersReturned)
@@ -2001,6 +2001,7 @@ class KRpcFindNode(KRpc):
        # and check for and action possible end of query round
        self.checkEndOfRound()
    #@-node:on_reply
    #@+node:on_tick
    def on_tick(self):
@@ -2157,7 +2158,8 @@ class KRpcFindNode(KRpc):
        self.reportStats()
-        KRpc.returnValue(self, items, nodes=items)
+        KRpc.returnValue(self, items, result=items)
    #@-node:returnValue
    #@+node:reportStats
@@ -2201,7 +2203,7 @@ class KRpcFindData(KRpcFindNode):
        if value != None:
            self.log(4, "Found required value in local storage")
            self.log(4, "VALUE='%s'" % value)
-            self.returnValue(value)
+            self.on_gotValue(value, self.hashWanted.asHex())
            return
        # no such luck - pass on to parent
@@ -2213,13 +2215,117 @@ class KRpcFindData(KRpcFindNode):
        """
        Callback for FIND_NODE reply
        """
-        res = details.get('nodes', None)
+        res = details.get('result', None)
        if isinstance(res, str):
-            self.returnValue(res)
+            self.on_gotValue(res, self.hashWanted.asHex())
        else:
            KRpcFindNode.on_reply(self, peer, msgId, **details)
    #@-node:on_reply
#@+node:on_gotValue
def on_gotValue(self, value, hash=None):
"""
Callback which fires when we get the value stored under a key
Value is either the real value, or a splitfile manifest
If a real value, just return it.
If a splitfile manifest, launch nested findValue RPCs to get each chunk
"""
        nchunks = 0
        try:
            firstline, rest = value.split("\n", 1)
            firstline = firstline.strip()
            kwd, str_nchunks = firstline.split(":")
            if kwd != 'chunks':
                raise hell
            nchunks = int(str_nchunks)
            value = rest
        except:
            pass # in this case, hell hath no fury at all
        if nchunks == 0:
            self.returnValue(value)
            return
# now we get to the hard bit - we have to set up nested findData RPCs to
# get all the chunks and reassemble them
hashes = rest.strip().split("\n")
        # do sanity checks
        hashesAllValid = [len(h) == 40 for h in hashes]
        if len(hashes) != nchunks:
            self.log(
                2,
                "Splitfile retrieval failure\nmanifest contains %s hashes, should have been %s" % (
                    len(hashes), nchunks))
            self.returnValue(None)
            return
        if False in hashesAllValid:
            self.log(2, "Splitfile retrieval failure - one or more invalid hashes")
            self.returnValue(None)
            return
# now this is a bit weird - we need to bind each chunk to its hash, so we create a
# class which produces callables which fire our on_gotChunk callback
class ChunkNotifier:
def __init__(me, h, cb):
me.h = h
me.cb = cb
def __call__(me, val):
me.cb(me.h, val)
# now launch the chunk retrieval RPCs
# result is that for each retrieved chunk, our on_gotChunk callback will
# be invoked with the arguments (hash, value), so we can tick them off
self.numChunks = nchunks
self.numChunksReceived = 0
self.chunkHashes = hashes
self.chunks = dict.fromkeys(hashes)
for h in hashes:
KRpcFindData(self.localNode, h, ChunkNotifier(h, self.on_gotChunk))
# now, we can sit back and receive the chunks
#@-node:on_gotValue
#@+node:on_gotChunk
def on_gotChunk(self, hexhash, value):
"""
Callback which fires when a nested chunk findNode returns
"""
if value == None:
self.log(2, "Chunk retrieval failed, fatal to this findData")
self.returnValue(None)
return
# got a value - vet it against hash
if shahash(value) != hexhash:
self.log(2, "Got a chunk, but it doesn't hash right - fatal to this findData")
self.returnValue(None)
return
# it's valid - stash it
self.chunks[hexhash] = value
self.numChunksReceived += 1
        # have we finished yet?
        if self.numChunksReceived < self.numChunks:
            # no
            self.log(4, "Received chunk %s of %s" % (self.numChunksReceived, self.numChunks))
            return
# maybe we have
self.log(4, "We appear to have all chunks, checking further")
# sanity check
if None in self.chunks.values():
self.log(2, "Fatal - reached chunk count, but chunks still missing")
self.returnValue(None)
return
# finally done - got all chunks, hashes are valid, reassemble in order
allChunks = [self.chunks[h] for h in self.chunkHashes]
reassembled = "".join(allChunks)
self.log(4, "Reassembled all %s chunks, SUCCESS" % self.numChunks)
self.returnValue(reassembled)
#@-node:on_gotChunk
    #@+node:returnValue
    def returnValue(self, items):
        """
@@ -2229,9 +2335,10 @@ class KRpcFindData(KRpcFindNode):
        # so we can introspect it
        self.localNode.lastrpc = self
+        # another debugging hack
        self.reportStats()
-        KRpc.returnValue(self, items, nodes=items)
+        KRpc.returnValue(self, items, result=items)
    #@-node:returnValue
    #@-others
@@ -2272,6 +2379,9 @@ class KRpcStore(KRpc):
        self.value = kw['value']
        self.isLocalOnly = kw.get('local', True)
+        # set 'splitting' flag to indicate if we need to insert as splitfiles
+        self.splitting = len(self.value) > maxValueSize
        self.log(4, "isLocalOnly=%s" % self.isLocalOnly)
        if kw.has_key('cbArgs'):
@@ -2285,6 +2395,14 @@ class KRpcStore(KRpc):
        """
        Kicks off this RPC
        """
+        # if too big, then break up into <30k chunks
+        if self.splitting:
+            self.storeSplit()
+            return
+        # not too big - prefix a 0 chunk count, and go ahead as a single entity
+        self.value = "chunks:0\n" + self.value
        # if local only, or no peers, just save locally
        if self.isLocalOnly or len(self.localNode.peers) == 0:
            result = self.localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True)
@@ -2303,7 +2421,85 @@ class KRpcStore(KRpc):
                         hash=self.keyHashed, raw=True, local=False)
        return
    #@-node:start
#@+node:storeSplit
def storeSplit(self):
"""
Gets called if we're splitting a big file into smaller chunks
Here, we:
- break the file up into chunks
- build a manifest
- launch store RPCs to store each chunk, where the key is SHA(chunk)
- launch a store RPC to store the 'manifest' (noting that if the manifest
is too big, it'll get recursively inserted as a splitfile as well
"""
# break up into chunks
chunks = []
hashes = []
size = len(self.value)
i = 0
self.nchunks = 0
while i < size:
chunks.append(self.value[i:i+maxValueSize])
hashes.append(shahash(chunks[-1]))
i += maxValueSize
self.nchunks += 1
# build the manifest
manifest = "chunks:%s\n%s\n" % (self.nchunks, "\n".join(hashes))
# set progress attributes
self.chunkManifestInserted = False
self.chunksInserted = 0
# launch nested Store RPCs for manifest, and each chunk
KRpcStore(self.localNode, self.on_doneChunkManifest,
local=self.isLocalOnly,
key=self.key,
value=manifest)
i = 0
while i < self.nchunks:
KRpcStore(self.localNode, self.on_doneChunk,
local=self.isLocalOnly,
key=hashes[i],
value=chunks[i])
i += 1
# now sit back and wait for the callbacks
#@-node:storeSplit
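    # A rough capacity note (assumption: maxValueSize is about 28 KB, per the
    # commit message's "28k" figure - the constant's actual value is defined
    # elsewhere in the file): each manifest line is a 40-character hex digest
    # plus a newline, so one manifest level indexes roughly 28672 / 41 = ~700
    # chunks, or about 700 * 28 KB = ~19 MB of payload, before the manifest
    # itself exceeds maxValueSize and is recursively split in turn.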
#@+node:on_doneChunkManifest
def on_doneChunkManifest(self, result):
"""
Callback which fires when a manifest insert succeeds/fails
"""
# the chunk callback handles all
self.on_doneChunk(result, isManifest=True)
#@-node:on_doneChunkManifest
#@+node:on_doneChunk
def on_doneChunk(self, result, isManifest=False):
"""
Callback which fires when a single chunk insert succeeds/fails
"""
# a failure either way means the whole RPC has failed
if not result:
# one huge fuck-up
self.returnValue(False)
return
# update our tally
if isManifest:
self.chunkManifestInserted = True
else:
self.chunksInserted += 1
# finished?
if self.chunkManifestInserted and (self.chunksInserted == self.nchunks):
# yep = success
self.returnValue(True)
#@-node:on_doneChunk
    #@+node:returnValue
    def returnValue(self, result):
        """