From fb04fc1c174d69fd8b048b1528032c77ba5c468e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Fri, 14 Apr 2017 00:05:36 +0200 Subject: [PATCH] Shell: lazy access to all the operations of a block This prepares the node to the new on-disk storage. --- src/node/shell/node.ml | 76 +++++++++++++++++++------------------ src/node/shell/state.ml | 60 +++++++++++++++++++++-------- src/node/shell/state.mli | 5 ++- src/node/shell/validator.ml | 6 ++- 4 files changed, 91 insertions(+), 56 deletions(-) diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index ffcf794d5..0cde44a97 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -150,20 +150,22 @@ module RPC = struct test_network: Context.test_network; } - let convert (block: State.Valid_block.t) = { - hash = block.hash ; - net_id = block.net_id ; - level = block.level ; - proto_level = block.proto_level ; - predecessor = block.predecessor ; - timestamp = block.timestamp ; - operations_hash = block.operations_hash ; - fitness = block.fitness ; - data = block.proto_header ; - operations = Some block.operations ; - protocol = block.protocol_hash ; - test_network = block.test_network ; - } + let convert (block: State.Valid_block.t) = + Lazy.force block.operation_hashes >>= fun operations -> + Lwt.return { + hash = block.hash ; + net_id = block.net_id ; + level = block.level ; + proto_level = block.proto_level ; + predecessor = block.predecessor ; + timestamp = block.timestamp ; + operations_hash = block.operations_hash ; + fitness = block.fitness ; + data = block.proto_header ; + operations = Some operations ; + protocol = block.protocol_hash ; + test_network = block.test_network ; + } let inject_block node = node.inject_block let inject_operation node = node.inject_operation @@ -174,7 +176,7 @@ module RPC = struct | Some (net_db, _block) -> let net = Distributed_db.state net_db in State.Valid_block.read_exn net hash >>= fun block -> - Lwt.return (convert block) + convert block | None -> Lwt.fail Not_found @@ -249,15 +251,15 @@ module RPC = struct let block_info node (block: block) = match block with | `Genesis -> - State.Valid_block.Current.genesis node.mainnet_net >|= convert + State.Valid_block.Current.genesis node.mainnet_net >>= convert | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in let net_db = Validator.net_db validator in let net_state = Validator.net_state validator in State.Valid_block.Current.head net_state >>= fun head -> - get_pred net_db n head >|= convert + get_pred net_db n head >>= convert | `Hash h -> - read_valid_block_exn node h >|= convert + read_valid_block_exn node h >>= convert | ( `Prevalidation | `Test_prevalidation ) as block -> let validator = get_validator node block in let pv = Validator.prevalidator validator in @@ -331,24 +333,25 @@ module RPC = struct let operations node block = match block with | `Genesis -> - State.Valid_block.Current.genesis node.mainnet_net >>= fun { operations } -> - Lwt.return operations + State.Valid_block.Current.genesis node.mainnet_net >>= fun { operation_hashes } -> + Lazy.force operation_hashes | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in let net_state = Validator.net_state validator in let net_db = Validator.net_db validator in State.Valid_block.Current.head net_state >>= fun head -> - get_pred net_db n head >>= fun { operations } -> - Lwt.return operations + get_pred net_db n head >>= fun { operation_hashes } -> + Lazy.force operation_hashes | (`Prevalidation | `Test_prevalidation) as block -> let validator, _net = get_net node block in let pv = Validator.prevalidator validator in let { Prevalidation.applied }, _ = Prevalidator.operations pv in Lwt.return [applied] | `Hash hash -> - read_valid_block node hash >|= function - | None -> [] - | Some { operations } -> operations + read_valid_block node hash >>= function + | None -> Lwt.return_nil + | Some { operation_hashes } -> + Lazy.force operation_hashes let operation_content node hash = Distributed_db.read_operation node.distributed_db hash >>= fun op -> @@ -464,13 +467,12 @@ module RPC = struct | Some (_, net_db) -> State.Valid_block.known_heads (Distributed_db.state net_db) end >>= fun test_heads -> - let map = - List.fold_left - (fun map block -> - Block_hash.Map.add - block.State.Valid_block.hash (convert block) map) - Block_hash.Map.empty (test_heads @ heads) in - Lwt.return map + Lwt_list.fold_left_s + (fun map block -> + convert block >|= fun bi -> + Block_hash.Map.add + block.State.Valid_block.hash bi map) + Block_hash.Map.empty (test_heads @ heads) let predecessors node len head = let rec loop net_db acc len hash (block: State.Block_header.t) = @@ -494,13 +496,13 @@ module RPC = struct try let rec loop acc len hash = State.Valid_block.read_exn state hash >>= fun block -> - let bi = convert block in + convert block >>= fun bi -> if Block_hash.equal bi.predecessor hash then Lwt.return (List.rev (bi :: acc)) else begin if len = 0 || Block_hash.Set.mem hash ignored then - Lwt.return (List.rev acc) + Lwt.return (List.rev acc) else loop (bi :: acc) (len-1) bi.predecessor end in @@ -513,12 +515,12 @@ module RPC = struct Distributed_db.read_block_exn node.distributed_db head >>= fun (net_db, _block) -> let net_state = Distributed_db.state net_db in - predecessors_bi net_state ignored len head >|= fun predecessors -> + predecessors_bi net_state ignored len head >>= fun predecessors -> let ignored = List.fold_right (fun x s -> Block_hash.Set.add x.hash s) predecessors ignored in - ignored, predecessors :: acc + Lwt.return (ignored, predecessors :: acc) ) (Block_hash.Set.empty, []) heads >>= fun (_, blocks) -> @@ -528,7 +530,7 @@ module RPC = struct let valid_block_watcher node = let stream, shutdown = Validator.global_watcher node.validator in - Lwt_stream.map (fun block -> convert block) stream, + Lwt_stream.map_s (fun block -> convert block) stream, shutdown let operation_watcher node = diff --git a/src/node/shell/state.ml b/src/node/shell/state.ml index a21ca4188..ce4846faf 100644 --- a/src/node/shell/state.ml +++ b/src/node/shell/state.ml @@ -116,7 +116,8 @@ and valid_block = { timestamp: Time.t ; fitness: Protocol.fitness ; operations_hash: Operation_list_list_hash.t ; - operations: Operation_hash.t list list ; + operation_hashes: Operation_hash.t list list Lwt.t Lazy.t ; + operations: Store.Operation.t list list Lwt.t Lazy.t ; discovery_time: Time.t ; protocol_hash: Protocol_hash.t ; protocol: (module Updater.REGISTRED_PROTOCOL) option ; @@ -128,7 +129,7 @@ and valid_block = { } let build_valid_block - hash header operations + hash header operation_hashes operations context discovery_time successors invalid_successors = Context.get_protocol context >>= fun protocol_hash -> Context.get_test_network context >>= fun test_network -> @@ -142,6 +143,7 @@ let build_valid_block timestamp = header.shell.timestamp ; discovery_time ; operations_hash = header.shell.operations_hash ; + operation_hashes ; operations ; fitness = header.shell.fitness ; protocol_hash ; @@ -724,6 +726,9 @@ module Block_header = struct let read_operations s k = Raw_operation_list.read_all s.block_header_store k + let read_operations_exn s k = + Raw_operation_list.read_all_exn s.block_header_store k + let mark_invalid net hash errors = mark_invalid net hash errors >>= fun marked -> if not marked then @@ -909,7 +914,8 @@ module Raw_net = struct Lwt.return context end >>= fun context -> build_valid_block - genesis.block header [] context genesis.time + genesis.block header (lazy Lwt.return_nil) (lazy Lwt.return_nil) + context genesis.time Block_hash.Set.empty Block_hash.Set.empty >>= fun genesis_block -> Lwt.return @@ build @@ -936,7 +942,8 @@ module Valid_block = struct timestamp: Time.t ; fitness: Fitness.fitness ; operations_hash: Operation_list_list_hash.t ; - operations: Operation_hash.t list list ; + operation_hashes: Operation_hash.t list list Lwt.t Lazy.t ; + operations: Store.Operation.t list list Lwt.t Lazy.t ; discovery_time: Time.t ; protocol_hash: Protocol_hash.t ; protocol: (module Updater.REGISTRED_PROTOCOL) option ; @@ -953,7 +960,9 @@ module Valid_block = struct let known { context_index } hash = Context.exists context_index hash - let raw_read block operations time chain_store context_index hash = + let raw_read + block operations operation_hashes + time chain_store context_index hash = Context.checkout context_index hash >>= function | None -> fail (Unknown_context hash) @@ -962,12 +971,15 @@ module Valid_block = struct >>= fun successors -> Store.Chain.Invalid_successors.read_all (chain_store, hash) >>= fun invalid_successors -> - build_valid_block hash block operations + build_valid_block hash block operation_hashes operations context time successors invalid_successors >>= fun block -> return block - let raw_read_exn block operations time chain_store context_index hash = - raw_read block operations time chain_store context_index hash >>= function + let raw_read_exn + block operations operation_hashes + time chain_store context_index hash = + raw_read block operations operation_hashes + time chain_store context_index hash >>= function | Error _ -> Lwt.fail Not_found | Ok data -> Lwt.return data @@ -976,8 +988,17 @@ module Valid_block = struct | None | Some { Time.data = Error _ } -> fail (Unknown_block hash) | Some { Time.data = Ok block ; time } -> - Block_header.read_operations net hash >>=? fun operations -> - raw_read block operations + let operation_hashes = + lazy (Block_header.read_operations_exn net hash) in + let operations = + lazy ( + Lazy.force operation_hashes >>= fun operations -> + Lwt_list.map_p + (Lwt_list.map_p + (Raw_operation.read_exn net.operation_store )) + operations) + in + raw_read block operations operation_hashes time net_state.chain_store net_state.context_index hash let read_opt net net_state hash = @@ -991,6 +1012,7 @@ module Valid_block = struct | Ok data -> Lwt.return data let store + operation_store block_header_store (net_state: net_state) valid_block_watcher @@ -1011,8 +1033,6 @@ module Valid_block = struct Raw_block_header.Locked.mark_valid block_header_store hash >>= fun _marked -> (* TODO fail if the block was previsouly stored ... ??? *) - Operation_list.Locked.read_all - block_header_store hash >>=? fun operations -> (* Let's commit the context. *) let message = match message with @@ -1031,8 +1051,17 @@ module Valid_block = struct Store.Chain.Valid_successors.store (store, predecessor) hash >>= fun () -> (* Build the `valid_block` value. *) + let operation_hashes = + lazy (Operation_list.Locked.read_all_exn block_header_store hash) in + let operations = + lazy ( + Lazy.force operation_hashes >>= fun operations -> + Lwt_list.map_p + (Lwt_list.map_p + (Raw_operation.read_exn operation_store )) + operations) in raw_read_exn - block operations discovery_time + block operations operation_hashes discovery_time net_state.chain_store net_state.context_index hash >>= fun valid_block -> Watcher.notify valid_block_watcher valid_block ; Lwt.return (Ok valid_block) @@ -1067,7 +1096,7 @@ module Valid_block = struct block_header_store hash >>= function | Some _ -> return None (* Previously invalidated block. *) | None -> - Locked.store + Locked.store net.operation_store block_header_store net_state net.valid_block_watcher hash vcontext >>=? fun valid_block -> return (Some valid_block) @@ -1328,7 +1357,8 @@ module Net = struct Block_header.Locked.read_discovery_time block_header_store genesis_hash >>=? fun genesis_discovery_time -> Valid_block.Locked.raw_read - genesis_shell_header [] genesis_discovery_time + genesis_shell_header (lazy Lwt.return_nil) (lazy Lwt.return_nil) + genesis_discovery_time chain_store context_index genesis_hash >>=? fun genesis_block -> return @@ Raw_net.build diff --git a/src/node/shell/state.mli b/src/node/shell/state.mli index a840c7f43..8d7a2b3a6 100644 --- a/src/node/shell/state.mli +++ b/src/node/shell/state.mli @@ -258,8 +258,9 @@ module Valid_block : sig fitness: Protocol.fitness ; (** The (validated) score of the block. *) operations_hash: Operation_list_list_hash.t ; - operations: Operation_hash.t list list ; - (** The sequence of operations ans its (Merkle-)hash. *) + operation_hashes: Operation_hash.t list list Lwt.t Lazy.t ; + operations: Store.Operation.t list list Lwt.t Lazy.t ; + (** The sequence of operations and its (Merkle-)hash. *) discovery_time: Time.t ; (** The data at which the block was discorevered on the P2P network. *) protocol_hash: Protocol_hash.t ; diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 29df775f7..1daa9b96b 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -350,16 +350,18 @@ module Context_db = struct State.Valid_block.store net_state hash data >>=? function | None -> State.Valid_block.read net_state hash >>=? fun block -> + Lazy.force block.operation_hashes >>= fun ophs -> Lwt_list.iter_p (Lwt_list.iter_p (fun hash -> Distributed_db.Operation.commit net_db hash)) - block.operations >>= fun () -> + ophs >>= fun () -> return (Ok block, false) | Some block -> + Lazy.force block.operation_hashes >>= fun ophs -> Lwt_list.iter_p (Lwt_list.iter_p (fun hash -> Distributed_db.Operation.commit net_db hash)) - block.operations >>= fun () -> + ophs >>= fun () -> return (Ok block, true) end | Error err ->