Shell: lazy access to all the operations of a block
This prepares the node to the new on-disk storage.
This commit is contained in:
parent
729ca9887d
commit
fb04fc1c17
@ -150,20 +150,22 @@ module RPC = struct
|
||||
test_network: Context.test_network;
|
||||
}
|
||||
|
||||
let convert (block: State.Valid_block.t) = {
|
||||
hash = block.hash ;
|
||||
net_id = block.net_id ;
|
||||
level = block.level ;
|
||||
proto_level = block.proto_level ;
|
||||
predecessor = block.predecessor ;
|
||||
timestamp = block.timestamp ;
|
||||
operations_hash = block.operations_hash ;
|
||||
fitness = block.fitness ;
|
||||
data = block.proto_header ;
|
||||
operations = Some block.operations ;
|
||||
protocol = block.protocol_hash ;
|
||||
test_network = block.test_network ;
|
||||
}
|
||||
let convert (block: State.Valid_block.t) =
|
||||
Lazy.force block.operation_hashes >>= fun operations ->
|
||||
Lwt.return {
|
||||
hash = block.hash ;
|
||||
net_id = block.net_id ;
|
||||
level = block.level ;
|
||||
proto_level = block.proto_level ;
|
||||
predecessor = block.predecessor ;
|
||||
timestamp = block.timestamp ;
|
||||
operations_hash = block.operations_hash ;
|
||||
fitness = block.fitness ;
|
||||
data = block.proto_header ;
|
||||
operations = Some operations ;
|
||||
protocol = block.protocol_hash ;
|
||||
test_network = block.test_network ;
|
||||
}
|
||||
|
||||
let inject_block node = node.inject_block
|
||||
let inject_operation node = node.inject_operation
|
||||
@ -174,7 +176,7 @@ module RPC = struct
|
||||
| Some (net_db, _block) ->
|
||||
let net = Distributed_db.state net_db in
|
||||
State.Valid_block.read_exn net hash >>= fun block ->
|
||||
Lwt.return (convert block)
|
||||
convert block
|
||||
| None ->
|
||||
Lwt.fail Not_found
|
||||
|
||||
@ -249,15 +251,15 @@ module RPC = struct
|
||||
let block_info node (block: block) =
|
||||
match block with
|
||||
| `Genesis ->
|
||||
State.Valid_block.Current.genesis node.mainnet_net >|= convert
|
||||
State.Valid_block.Current.genesis node.mainnet_net >>= convert
|
||||
| ( `Head n | `Test_head n ) as block ->
|
||||
let validator = get_validator node block in
|
||||
let net_db = Validator.net_db validator in
|
||||
let net_state = Validator.net_state validator in
|
||||
State.Valid_block.Current.head net_state >>= fun head ->
|
||||
get_pred net_db n head >|= convert
|
||||
get_pred net_db n head >>= convert
|
||||
| `Hash h ->
|
||||
read_valid_block_exn node h >|= convert
|
||||
read_valid_block_exn node h >>= convert
|
||||
| ( `Prevalidation | `Test_prevalidation ) as block ->
|
||||
let validator = get_validator node block in
|
||||
let pv = Validator.prevalidator validator in
|
||||
@ -331,24 +333,25 @@ module RPC = struct
|
||||
let operations node block =
|
||||
match block with
|
||||
| `Genesis ->
|
||||
State.Valid_block.Current.genesis node.mainnet_net >>= fun { operations } ->
|
||||
Lwt.return operations
|
||||
State.Valid_block.Current.genesis node.mainnet_net >>= fun { operation_hashes } ->
|
||||
Lazy.force operation_hashes
|
||||
| ( `Head n | `Test_head n ) as block ->
|
||||
let validator = get_validator node block in
|
||||
let net_state = Validator.net_state validator in
|
||||
let net_db = Validator.net_db validator in
|
||||
State.Valid_block.Current.head net_state >>= fun head ->
|
||||
get_pred net_db n head >>= fun { operations } ->
|
||||
Lwt.return operations
|
||||
get_pred net_db n head >>= fun { operation_hashes } ->
|
||||
Lazy.force operation_hashes
|
||||
| (`Prevalidation | `Test_prevalidation) as block ->
|
||||
let validator, _net = get_net node block in
|
||||
let pv = Validator.prevalidator validator in
|
||||
let { Prevalidation.applied }, _ = Prevalidator.operations pv in
|
||||
Lwt.return [applied]
|
||||
| `Hash hash ->
|
||||
read_valid_block node hash >|= function
|
||||
| None -> []
|
||||
| Some { operations } -> operations
|
||||
read_valid_block node hash >>= function
|
||||
| None -> Lwt.return_nil
|
||||
| Some { operation_hashes } ->
|
||||
Lazy.force operation_hashes
|
||||
|
||||
let operation_content node hash =
|
||||
Distributed_db.read_operation node.distributed_db hash >>= fun op ->
|
||||
@ -464,13 +467,12 @@ module RPC = struct
|
||||
| Some (_, net_db) ->
|
||||
State.Valid_block.known_heads (Distributed_db.state net_db)
|
||||
end >>= fun test_heads ->
|
||||
let map =
|
||||
List.fold_left
|
||||
(fun map block ->
|
||||
Block_hash.Map.add
|
||||
block.State.Valid_block.hash (convert block) map)
|
||||
Block_hash.Map.empty (test_heads @ heads) in
|
||||
Lwt.return map
|
||||
Lwt_list.fold_left_s
|
||||
(fun map block ->
|
||||
convert block >|= fun bi ->
|
||||
Block_hash.Map.add
|
||||
block.State.Valid_block.hash bi map)
|
||||
Block_hash.Map.empty (test_heads @ heads)
|
||||
|
||||
let predecessors node len head =
|
||||
let rec loop net_db acc len hash (block: State.Block_header.t) =
|
||||
@ -494,13 +496,13 @@ module RPC = struct
|
||||
try
|
||||
let rec loop acc len hash =
|
||||
State.Valid_block.read_exn state hash >>= fun block ->
|
||||
let bi = convert block in
|
||||
convert block >>= fun bi ->
|
||||
if Block_hash.equal bi.predecessor hash then
|
||||
Lwt.return (List.rev (bi :: acc))
|
||||
else begin
|
||||
if len = 0
|
||||
|| Block_hash.Set.mem hash ignored then
|
||||
Lwt.return (List.rev acc)
|
||||
Lwt.return (List.rev acc)
|
||||
else
|
||||
loop (bi :: acc) (len-1) bi.predecessor
|
||||
end in
|
||||
@ -513,12 +515,12 @@ module RPC = struct
|
||||
Distributed_db.read_block_exn
|
||||
node.distributed_db head >>= fun (net_db, _block) ->
|
||||
let net_state = Distributed_db.state net_db in
|
||||
predecessors_bi net_state ignored len head >|= fun predecessors ->
|
||||
predecessors_bi net_state ignored len head >>= fun predecessors ->
|
||||
let ignored =
|
||||
List.fold_right
|
||||
(fun x s -> Block_hash.Set.add x.hash s)
|
||||
predecessors ignored in
|
||||
ignored, predecessors :: acc
|
||||
Lwt.return (ignored, predecessors :: acc)
|
||||
)
|
||||
(Block_hash.Set.empty, [])
|
||||
heads >>= fun (_, blocks) ->
|
||||
@ -528,7 +530,7 @@ module RPC = struct
|
||||
|
||||
let valid_block_watcher node =
|
||||
let stream, shutdown = Validator.global_watcher node.validator in
|
||||
Lwt_stream.map (fun block -> convert block) stream,
|
||||
Lwt_stream.map_s (fun block -> convert block) stream,
|
||||
shutdown
|
||||
|
||||
let operation_watcher node =
|
||||
|
@ -116,7 +116,8 @@ and valid_block = {
|
||||
timestamp: Time.t ;
|
||||
fitness: Protocol.fitness ;
|
||||
operations_hash: Operation_list_list_hash.t ;
|
||||
operations: Operation_hash.t list list ;
|
||||
operation_hashes: Operation_hash.t list list Lwt.t Lazy.t ;
|
||||
operations: Store.Operation.t list list Lwt.t Lazy.t ;
|
||||
discovery_time: Time.t ;
|
||||
protocol_hash: Protocol_hash.t ;
|
||||
protocol: (module Updater.REGISTRED_PROTOCOL) option ;
|
||||
@ -128,7 +129,7 @@ and valid_block = {
|
||||
}
|
||||
|
||||
let build_valid_block
|
||||
hash header operations
|
||||
hash header operation_hashes operations
|
||||
context discovery_time successors invalid_successors =
|
||||
Context.get_protocol context >>= fun protocol_hash ->
|
||||
Context.get_test_network context >>= fun test_network ->
|
||||
@ -142,6 +143,7 @@ let build_valid_block
|
||||
timestamp = header.shell.timestamp ;
|
||||
discovery_time ;
|
||||
operations_hash = header.shell.operations_hash ;
|
||||
operation_hashes ;
|
||||
operations ;
|
||||
fitness = header.shell.fitness ;
|
||||
protocol_hash ;
|
||||
@ -724,6 +726,9 @@ module Block_header = struct
|
||||
let read_operations s k =
|
||||
Raw_operation_list.read_all s.block_header_store k
|
||||
|
||||
let read_operations_exn s k =
|
||||
Raw_operation_list.read_all_exn s.block_header_store k
|
||||
|
||||
let mark_invalid net hash errors =
|
||||
mark_invalid net hash errors >>= fun marked ->
|
||||
if not marked then
|
||||
@ -909,7 +914,8 @@ module Raw_net = struct
|
||||
Lwt.return context
|
||||
end >>= fun context ->
|
||||
build_valid_block
|
||||
genesis.block header [] context genesis.time
|
||||
genesis.block header (lazy Lwt.return_nil) (lazy Lwt.return_nil)
|
||||
context genesis.time
|
||||
Block_hash.Set.empty Block_hash.Set.empty >>= fun genesis_block ->
|
||||
Lwt.return @@
|
||||
build
|
||||
@ -936,7 +942,8 @@ module Valid_block = struct
|
||||
timestamp: Time.t ;
|
||||
fitness: Fitness.fitness ;
|
||||
operations_hash: Operation_list_list_hash.t ;
|
||||
operations: Operation_hash.t list list ;
|
||||
operation_hashes: Operation_hash.t list list Lwt.t Lazy.t ;
|
||||
operations: Store.Operation.t list list Lwt.t Lazy.t ;
|
||||
discovery_time: Time.t ;
|
||||
protocol_hash: Protocol_hash.t ;
|
||||
protocol: (module Updater.REGISTRED_PROTOCOL) option ;
|
||||
@ -953,7 +960,9 @@ module Valid_block = struct
|
||||
let known { context_index } hash =
|
||||
Context.exists context_index hash
|
||||
|
||||
let raw_read block operations time chain_store context_index hash =
|
||||
let raw_read
|
||||
block operations operation_hashes
|
||||
time chain_store context_index hash =
|
||||
Context.checkout context_index hash >>= function
|
||||
| None ->
|
||||
fail (Unknown_context hash)
|
||||
@ -962,12 +971,15 @@ module Valid_block = struct
|
||||
>>= fun successors ->
|
||||
Store.Chain.Invalid_successors.read_all (chain_store, hash)
|
||||
>>= fun invalid_successors ->
|
||||
build_valid_block hash block operations
|
||||
build_valid_block hash block operation_hashes operations
|
||||
context time successors invalid_successors >>= fun block ->
|
||||
return block
|
||||
|
||||
let raw_read_exn block operations time chain_store context_index hash =
|
||||
raw_read block operations time chain_store context_index hash >>= function
|
||||
let raw_read_exn
|
||||
block operations operation_hashes
|
||||
time chain_store context_index hash =
|
||||
raw_read block operations operation_hashes
|
||||
time chain_store context_index hash >>= function
|
||||
| Error _ -> Lwt.fail Not_found
|
||||
| Ok data -> Lwt.return data
|
||||
|
||||
@ -976,8 +988,17 @@ module Valid_block = struct
|
||||
| None | Some { Time.data = Error _ } ->
|
||||
fail (Unknown_block hash)
|
||||
| Some { Time.data = Ok block ; time } ->
|
||||
Block_header.read_operations net hash >>=? fun operations ->
|
||||
raw_read block operations
|
||||
let operation_hashes =
|
||||
lazy (Block_header.read_operations_exn net hash) in
|
||||
let operations =
|
||||
lazy (
|
||||
Lazy.force operation_hashes >>= fun operations ->
|
||||
Lwt_list.map_p
|
||||
(Lwt_list.map_p
|
||||
(Raw_operation.read_exn net.operation_store ))
|
||||
operations)
|
||||
in
|
||||
raw_read block operations operation_hashes
|
||||
time net_state.chain_store net_state.context_index hash
|
||||
|
||||
let read_opt net net_state hash =
|
||||
@ -991,6 +1012,7 @@ module Valid_block = struct
|
||||
| Ok data -> Lwt.return data
|
||||
|
||||
let store
|
||||
operation_store
|
||||
block_header_store
|
||||
(net_state: net_state)
|
||||
valid_block_watcher
|
||||
@ -1011,8 +1033,6 @@ module Valid_block = struct
|
||||
Raw_block_header.Locked.mark_valid
|
||||
block_header_store hash >>= fun _marked ->
|
||||
(* TODO fail if the block was previsouly stored ... ??? *)
|
||||
Operation_list.Locked.read_all
|
||||
block_header_store hash >>=? fun operations ->
|
||||
(* Let's commit the context. *)
|
||||
let message =
|
||||
match message with
|
||||
@ -1031,8 +1051,17 @@ module Valid_block = struct
|
||||
Store.Chain.Valid_successors.store
|
||||
(store, predecessor) hash >>= fun () ->
|
||||
(* Build the `valid_block` value. *)
|
||||
let operation_hashes =
|
||||
lazy (Operation_list.Locked.read_all_exn block_header_store hash) in
|
||||
let operations =
|
||||
lazy (
|
||||
Lazy.force operation_hashes >>= fun operations ->
|
||||
Lwt_list.map_p
|
||||
(Lwt_list.map_p
|
||||
(Raw_operation.read_exn operation_store ))
|
||||
operations) in
|
||||
raw_read_exn
|
||||
block operations discovery_time
|
||||
block operations operation_hashes discovery_time
|
||||
net_state.chain_store net_state.context_index hash >>= fun valid_block ->
|
||||
Watcher.notify valid_block_watcher valid_block ;
|
||||
Lwt.return (Ok valid_block)
|
||||
@ -1067,7 +1096,7 @@ module Valid_block = struct
|
||||
block_header_store hash >>= function
|
||||
| Some _ -> return None (* Previously invalidated block. *)
|
||||
| None ->
|
||||
Locked.store
|
||||
Locked.store net.operation_store
|
||||
block_header_store net_state net.valid_block_watcher
|
||||
hash vcontext >>=? fun valid_block ->
|
||||
return (Some valid_block)
|
||||
@ -1328,7 +1357,8 @@ module Net = struct
|
||||
Block_header.Locked.read_discovery_time block_header_store
|
||||
genesis_hash >>=? fun genesis_discovery_time ->
|
||||
Valid_block.Locked.raw_read
|
||||
genesis_shell_header [] genesis_discovery_time
|
||||
genesis_shell_header (lazy Lwt.return_nil) (lazy Lwt.return_nil)
|
||||
genesis_discovery_time
|
||||
chain_store context_index genesis_hash >>=? fun genesis_block ->
|
||||
return @@
|
||||
Raw_net.build
|
||||
|
@ -258,8 +258,9 @@ module Valid_block : sig
|
||||
fitness: Protocol.fitness ;
|
||||
(** The (validated) score of the block. *)
|
||||
operations_hash: Operation_list_list_hash.t ;
|
||||
operations: Operation_hash.t list list ;
|
||||
(** The sequence of operations ans its (Merkle-)hash. *)
|
||||
operation_hashes: Operation_hash.t list list Lwt.t Lazy.t ;
|
||||
operations: Store.Operation.t list list Lwt.t Lazy.t ;
|
||||
(** The sequence of operations and its (Merkle-)hash. *)
|
||||
discovery_time: Time.t ;
|
||||
(** The data at which the block was discorevered on the P2P network. *)
|
||||
protocol_hash: Protocol_hash.t ;
|
||||
|
@ -350,16 +350,18 @@ module Context_db = struct
|
||||
State.Valid_block.store net_state hash data >>=? function
|
||||
| None ->
|
||||
State.Valid_block.read net_state hash >>=? fun block ->
|
||||
Lazy.force block.operation_hashes >>= fun ophs ->
|
||||
Lwt_list.iter_p
|
||||
(Lwt_list.iter_p (fun hash ->
|
||||
Distributed_db.Operation.commit net_db hash))
|
||||
block.operations >>= fun () ->
|
||||
ophs >>= fun () ->
|
||||
return (Ok block, false)
|
||||
| Some block ->
|
||||
Lazy.force block.operation_hashes >>= fun ophs ->
|
||||
Lwt_list.iter_p
|
||||
(Lwt_list.iter_p (fun hash ->
|
||||
Distributed_db.Operation.commit net_db hash))
|
||||
block.operations >>= fun () ->
|
||||
ophs >>= fun () ->
|
||||
return (Ok block, true)
|
||||
end
|
||||
| Error err ->
|
||||
|
Loading…
Reference in New Issue
Block a user