From bbf5c7408bdc9b34d99b0892b07ea14f083e73ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 21 Apr 2018 12:57:30 +0200 Subject: [PATCH] Shell/RPC: split out `Monitor_services` --- src/lib_client_base/client_confirmations.ml | 2 +- src/lib_shell/chain_directory.ml | 5 +- src/lib_shell/chain_directory.mli | 2 +- src/lib_shell/monitor_directory.ml | 96 +++++++++++++++++++ src/lib_shell/monitor_directory.mli | 10 ++ src/lib_shell/node.ml | 5 +- src/lib_shell/shell_directory.ml | 82 +--------------- src/lib_shell/shell_directory.mli | 2 +- src/lib_shell/validator.ml | 2 + src/lib_shell/validator.mli | 2 + src/lib_shell_services/monitor_services.ml | 70 ++++++++++++++ src/lib_shell_services/monitor_services.mli | 41 ++++++++ src/lib_shell_services/shell_services.ml | 65 ------------- src/lib_shell_services/shell_services.mli | 35 ------- .../lib_baking/client_baking_blocks.ml | 4 +- 15 files changed, 237 insertions(+), 186 deletions(-) create mode 100644 src/lib_shell/monitor_directory.ml create mode 100644 src/lib_shell/monitor_directory.mli create mode 100644 src/lib_shell_services/monitor_services.ml create mode 100644 src/lib_shell_services/monitor_services.mli diff --git a/src/lib_client_base/client_confirmations.ml b/src/lib_client_base/client_confirmations.ml index 2042f1729..f87941c9a 100644 --- a/src/lib_client_base/client_confirmations.ml +++ b/src/lib_client_base/client_confirmations.ml @@ -81,7 +81,7 @@ let wait_for_operation_inclusion end end in - Shell_services.Monitor.heads ctxt chain >>=? fun (stream, stop) -> + Monitor_services.heads ctxt chain >>=? fun (stream, stop) -> Lwt_stream.get stream >>= function | None -> assert false | Some head -> diff --git a/src/lib_shell/chain_directory.ml b/src/lib_shell/chain_directory.ml index 4930ecc62..dae314910 100644 --- a/src/lib_shell/chain_directory.ml +++ b/src/lib_shell/chain_directory.ml @@ -133,7 +133,10 @@ let rpc_directory = !dir -let build_rpc_directory state validator = +let build_rpc_directory validator = + + let distributed_db = Validator.distributed_db validator in + let state = Distributed_db.state distributed_db in let dir = ref rpc_directory in diff --git a/src/lib_shell/chain_directory.mli b/src/lib_shell/chain_directory.mli index 7baee7024..f231e5865 100644 --- a/src/lib_shell/chain_directory.mli +++ b/src/lib_shell/chain_directory.mli @@ -12,4 +12,4 @@ val get_chain: State.t -> Chain_services.chain -> State.Chain.t Lwt.t val rpc_directory: State.Chain.t Lwt.t RPC_directory.t -val build_rpc_directory: State.t -> Validator.t -> unit RPC_directory.t +val build_rpc_directory: Validator.t -> unit RPC_directory.t diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml new file mode 100644 index 000000000..8d7ff6e6e --- /dev/null +++ b/src/lib_shell/monitor_directory.ml @@ -0,0 +1,96 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +let build_rpc_directory validator = + + let distributed_db = Validator.distributed_db validator in + let state = Distributed_db.state distributed_db in + + let dir : unit RPC_directory.t ref = ref RPC_directory.empty in + let gen_register0 s f = + dir := RPC_directory.gen_register !dir s (fun () p q -> f p q) in + let gen_register1 s f = + dir := RPC_directory.gen_register !dir s (fun ((), a) p q -> f a p q) in + + gen_register0 Monitor_services.S.valid_blocks begin fun q () -> + let block_stream, stopper = State.watcher state in + let shutdown () = Lwt_watcher.shutdown stopper in + let in_chains block = + Lwt_list.map_p (Chain_directory.get_chain_id state) q#chains >>= function + | [] -> Lwt.return_true + | chains -> + let chain_id = State.Block.chain_id block in + Lwt.return (List.exists (Chain_id.equal chain_id) chains) in + let in_protocols block = + match q#protocols with + | [] -> Lwt.return_true + | protocols -> + State.Block.predecessor block >>= function + | None -> Lwt.return_false (* won't happen *) + | Some pred -> + State.Block.context pred >>= fun context -> + Context.get_protocol context >>= fun protocol -> + Lwt.return (List.exists (Protocol_hash.equal protocol) protocols) in + let in_next_protocols block = + match q#next_protocols with + | [] -> Lwt.return_true + | protocols -> + State.Block.context block >>= fun context -> + Context.get_protocol context >>= fun next_protocol -> + Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in + let stream = + Lwt_stream.filter_map_s + (fun block -> + in_chains block >>= fun in_chains -> + in_next_protocols block >>= fun in_next_protocols -> + in_protocols block >>= fun in_protocols -> + if in_chains && in_protocols && in_next_protocols then + Lwt.return_some + (State.Block.chain_id block, State.Block.hash block) + else + Lwt.return_none) + block_stream in + let next () = Lwt_stream.get stream in + RPC_answer.return_stream { next ; shutdown } + end ; + + gen_register1 Monitor_services.S.heads begin fun chain q () -> + (* TODO: when `chain = `Test`, should we reset then stream when + the `testnet` change, or dias we currently do ?? *) + Chain_directory.get_chain state chain >>= fun chain -> + Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator -> + let block_stream, stopper = Chain_validator.new_head_watcher chain_validator in + Chain.head chain >>= fun head -> + let shutdown () = Lwt_watcher.shutdown stopper in + let in_next_protocols block = + match q#next_protocols with + | [] -> Lwt.return_true + | protocols -> + State.Block.context block >>= fun context -> + Context.get_protocol context >>= fun next_protocol -> + Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in + let stream = + Lwt_stream.filter_map_s + (fun block -> + in_next_protocols block >>= fun in_next_protocols -> + if in_next_protocols then + Lwt.return_some (State.Block.hash block) + else + Lwt.return_none) + block_stream in + let first_call = ref true in + let next () = + if !first_call then begin + first_call := false ; Lwt.return_some (State.Block.hash head) + end else + Lwt_stream.get stream in + RPC_answer.return_stream { next ; shutdown } + end ; + + !dir diff --git a/src/lib_shell/monitor_directory.mli b/src/lib_shell/monitor_directory.mli new file mode 100644 index 000000000..73a48c55b --- /dev/null +++ b/src/lib_shell/monitor_directory.mli @@ -0,0 +1,10 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +val build_rpc_directory: Validator.t -> unit RPC_directory.t diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index a0876035d..ac390e620 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -139,9 +139,10 @@ let build_rpc_directory node = dir := RPC_directory.register !dir s (fun () p q -> f p q) in merge (Protocol_directory.build_rpc_directory node.state node.distributed_db) ; + merge (Monitor_directory.build_rpc_directory node.validator) ; merge (Shell_directory.build_rpc_directory - node.state node.validator node.mainchain_validator) ; - merge (Chain_directory.build_rpc_directory node.state node.validator) ; + node.validator node.mainchain_validator) ; + merge (Chain_directory.build_rpc_directory node.validator) ; merge (P2p.build_rpc_directory node.p2p) ; merge Worker_directory.rpc_directory ; diff --git a/src/lib_shell/shell_directory.ml b/src/lib_shell/shell_directory.ml index 298cafcbb..39661fed5 100644 --- a/src/lib_shell/shell_directory.ml +++ b/src/lib_shell/shell_directory.ml @@ -42,15 +42,16 @@ let inject_protocol state ?force:_ proto = in Lwt.return (hash, validation) -let build_rpc_directory state validator mainchain_validator = +let build_rpc_directory validator mainchain_validator = + + let distributed_db = Validator.distributed_db validator in + let state = Distributed_db.state distributed_db in let dir : unit RPC_directory.t ref = ref RPC_directory.empty in let gen_register0 s f = dir := RPC_directory.gen_register !dir s (fun () p q -> f p q) in let register0 s f = dir := RPC_directory.register !dir s (fun () p q -> f p q) in - let gen_register1 s f = - dir := RPC_directory.gen_register !dir s (fun ((), a) p q -> f a p q) in register0 Shell_services.S.forge_block_header begin fun () header -> return (Data_encoding.Binary.to_bytes_exn Block_header.encoding header) @@ -101,79 +102,4 @@ let build_rpc_directory state validator mainchain_validator = RPC_answer.return_stream { next ; shutdown } end ; - gen_register0 Shell_services.S.Monitor.valid_blocks begin fun q () -> - let block_stream, stopper = State.watcher state in - let shutdown () = Lwt_watcher.shutdown stopper in - let in_chains block = - Lwt_list.map_p (Chain_directory.get_chain_id state) q#chains >>= function - | [] -> Lwt.return_true - | chains -> - let chain_id = State.Block.chain_id block in - Lwt.return (List.exists (Chain_id.equal chain_id) chains) in - let in_protocols block = - match q#protocols with - | [] -> Lwt.return_true - | protocols -> - State.Block.predecessor block >>= function - | None -> Lwt.return_false (* won't happen *) - | Some pred -> - State.Block.context pred >>= fun context -> - Context.get_protocol context >>= fun protocol -> - Lwt.return (List.exists (Protocol_hash.equal protocol) protocols) in - let in_next_protocols block = - match q#next_protocols with - | [] -> Lwt.return_true - | protocols -> - State.Block.context block >>= fun context -> - Context.get_protocol context >>= fun next_protocol -> - Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in - let stream = - Lwt_stream.filter_map_s - (fun block -> - in_chains block >>= fun in_chains -> - in_next_protocols block >>= fun in_next_protocols -> - in_protocols block >>= fun in_protocols -> - if in_chains && in_protocols && in_next_protocols then - Lwt.return_some - (State.Block.chain_id block, State.Block.hash block) - else - Lwt.return_none) - block_stream in - let next () = Lwt_stream.get stream in - RPC_answer.return_stream { next ; shutdown } - end ; - - gen_register1 Shell_services.S.Monitor.heads begin fun chain q () -> - (* TODO: when `chain = `Test`, should we reset then stream when - the `testnet` change, or dias we currently do ?? *) - Chain_directory.get_chain state chain >>= fun chain -> - Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator -> - let block_stream, stopper = Chain_validator.new_head_watcher chain_validator in - Chain.head chain >>= fun head -> - let shutdown () = Lwt_watcher.shutdown stopper in - let in_next_protocols block = - match q#next_protocols with - | [] -> Lwt.return_true - | protocols -> - State.Block.context block >>= fun context -> - Context.get_protocol context >>= fun next_protocol -> - Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in - let stream = - Lwt_stream.filter_map_s - (fun block -> - in_next_protocols block >>= fun in_next_protocols -> - if in_next_protocols then - Lwt.return_some (State.Block.hash block) - else - Lwt.return_none) - block_stream in - let first_call = ref true in - let next () = - if !first_call then begin - first_call := false ; Lwt.return_some (State.Block.hash head) - end else - Lwt_stream.get stream in - RPC_answer.return_stream { next ; shutdown } - end ; - !dir diff --git a/src/lib_shell/shell_directory.mli b/src/lib_shell/shell_directory.mli index 446422f0b..2b441a6f0 100644 --- a/src/lib_shell/shell_directory.mli +++ b/src/lib_shell/shell_directory.mli @@ -8,4 +8,4 @@ (**************************************************************************) val build_rpc_directory: - State.t -> Validator.t -> Chain_validator.t -> unit RPC_directory.t + Validator.t -> Chain_validator.t -> unit RPC_directory.t diff --git a/src/lib_shell/validator.ml b/src/lib_shell/validator.ml index 731d05868..eb43bf2b1 100644 --- a/src/lib_shell/validator.ml +++ b/src/lib_shell/validator.ml @@ -131,3 +131,5 @@ let inject_operation v ?chain_id op = match pv_opt with | Some pv -> Prevalidator.inject_operation pv op | None -> failwith "Prevalidator is not running, cannot inject the operation." + +let distributed_db { db } = db diff --git a/src/lib_shell/validator.mli b/src/lib_shell/validator.mli index cc4a49df6..8a88de8f0 100644 --- a/src/lib_shell/validator.mli +++ b/src/lib_shell/validator.mli @@ -46,3 +46,5 @@ val inject_operation: t -> ?chain_id:Chain_id.t -> Operation.t -> unit tzresult Lwt.t + +val distributed_db: t -> Distributed_db.t diff --git a/src/lib_shell_services/monitor_services.ml b/src/lib_shell_services/monitor_services.ml new file mode 100644 index 000000000..ad1d1943f --- /dev/null +++ b/src/lib_shell_services/monitor_services.ml @@ -0,0 +1,70 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module S = struct + + open Data_encoding + let path = RPC_path.(root / "monitor") + + let valid_blocks_query = + let open RPC_query in + query (fun protocols next_protocols chains -> object + method protocols = protocols + method next_protocols = next_protocols + method chains = chains + end) + |+ multi_field "protocol" + Protocol_hash.rpc_arg (fun t -> t#protocols) + |+ multi_field "next_protocol" + Protocol_hash.rpc_arg (fun t -> t#next_protocols) + |+ multi_field "chain" + Chain_services.chain_arg (fun t -> t#chains) + |> seal + + let valid_blocks = + RPC_service.get_service + ~description:"" + ~query: valid_blocks_query + ~output: (obj2 + (req "chain_id" Chain_id.encoding) + (req "hash" Block_hash.encoding)) + RPC_path.(path / "valid_blocks") + + let heads_query = + let open RPC_query in + query (fun next_protocols -> object + method next_protocols = next_protocols + end) + |+ multi_field "next_protocol" + Protocol_hash.rpc_arg (fun t -> t#next_protocols) + |> seal + + let heads = + RPC_service.get_service + ~description:"" + ~query: heads_query + ~output: Block_hash.encoding + RPC_path.(path / "heads" /: Chain_services.chain_arg) + +end + +open RPC_context + +let valid_blocks + ctxt ?(chains = [`Main]) ?(protocols = []) ?(next_protocols = []) () = + make_streamed_call S.valid_blocks ctxt () (object + method chains = chains + method protocols = protocols + method next_protocols = next_protocols + end) () + +let heads ctxt ?(next_protocols = []) chain = + make_streamed_call S.heads ctxt ((), chain) (object + method next_protocols = next_protocols + end) () diff --git a/src/lib_shell_services/monitor_services.mli b/src/lib_shell_services/monitor_services.mli new file mode 100644 index 000000000..7933ed0a3 --- /dev/null +++ b/src/lib_shell_services/monitor_services.mli @@ -0,0 +1,41 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open RPC_context + +val valid_blocks: + #streamed -> + ?chains:Chain_services.chain list -> + ?protocols:Protocol_hash.t list -> + ?next_protocols:Protocol_hash.t list -> + unit -> ((Chain_id.t * Block_hash.t) Lwt_stream.t * stopper) tzresult Lwt.t + +val heads: + #streamed -> + ?next_protocols:Protocol_hash.t list -> + Chain_services.chain -> + (Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t + +module S : sig + + val valid_blocks: + ([ `GET ], unit, + unit, < chains : Chain_services.chain list; + next_protocols : Protocol_hash.t list; + protocols : Protocol_hash.t list >, unit, + Chain_id.t * Block_hash.t) RPC_service.t + + val heads: + ([ `GET ], unit, + unit * Chain_services.chain, + < next_protocols : Protocol_hash.t list >, unit, + Block_hash.t) RPC_service.t + +end + diff --git a/src/lib_shell_services/shell_services.ml b/src/lib_shell_services/shell_services.ml index ef7de0817..ed151fe21 100644 --- a/src/lib_shell_services/shell_services.ml +++ b/src/lib_shell_services/shell_services.ml @@ -124,52 +124,6 @@ module S = struct (req "timestamp" Time.encoding)) RPC_path.(root / "bootstrapped") - module Monitor = struct - - let path = RPC_path.(root / "monitor") - - let valid_blocks_query = - let open RPC_query in - query (fun protocols next_protocols chains -> object - method protocols = protocols - method next_protocols = next_protocols - method chains = chains - end) - |+ multi_field "protocol" - Protocol_hash.rpc_arg (fun t -> t#protocols) - |+ multi_field "next_protocol" - Protocol_hash.rpc_arg (fun t -> t#next_protocols) - |+ multi_field "chain" - Chain_services.chain_arg (fun t -> t#chains) - |> seal - - let valid_blocks = - RPC_service.get_service - ~description:"" - ~query: valid_blocks_query - ~output: (obj2 - (req "chain_id" Chain_id.encoding) - (req "hash" Block_hash.encoding)) - RPC_path.(path / "valid_blocks") - - let heads_query = - let open RPC_query in - query (fun next_protocols -> object - method next_protocols = next_protocols - end) - |+ multi_field "next_protocol" - Protocol_hash.rpc_arg (fun t -> t#next_protocols) - |> seal - - let heads = - RPC_service.get_service - ~description:"" - ~query: heads_query - ~output: Block_hash.encoding - RPC_path.(path / "heads" /: Chain_services.chain_arg) - - end - end open RPC_context @@ -193,22 +147,3 @@ let inject_protocol ctxt ?(async = false) ?force protocol = let bootstrapped ctxt = make_streamed_call S.bootstrapped ctxt () () () - -module Monitor = struct - - module S = S.Monitor - - let valid_blocks - ctxt ?(chains = [`Main]) ?(protocols = []) ?(next_protocols = []) () = - make_streamed_call S.valid_blocks ctxt () (object - method chains = chains - method protocols = protocols - method next_protocols = next_protocols - end) () - - let heads ctxt ?(next_protocols = []) chain = - make_streamed_call S.heads ctxt ((), chain) (object - method next_protocols = next_protocols - end) () - -end diff --git a/src/lib_shell_services/shell_services.mli b/src/lib_shell_services/shell_services.mli index 97ef52c2b..e0d9ef87a 100644 --- a/src/lib_shell_services/shell_services.mli +++ b/src/lib_shell_services/shell_services.mli @@ -7,7 +7,6 @@ (* *) (**************************************************************************) - open RPC_context val forge_block_header: @@ -41,23 +40,6 @@ val inject_protocol: val bootstrapped: #streamed -> ((Block_hash.t * Time.t) Lwt_stream.t * stopper) tzresult Lwt.t -module Monitor : sig - - val valid_blocks: - #streamed -> - ?chains:Chain_services.chain list -> - ?protocols:Protocol_hash.t list -> - ?next_protocols:Protocol_hash.t list -> - unit -> ((Chain_id.t * Block_hash.t) Lwt_stream.t * stopper) tzresult Lwt.t - - val heads: - #streamed -> - ?next_protocols:Protocol_hash.t list -> - Chain_services.chain -> - (Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t - -end - module S : sig val forge_block_header: @@ -93,21 +75,4 @@ module S : sig unit, unit, unit, Block_hash.t * Time.t) RPC_service.t - module Monitor : sig - - val valid_blocks: - ([ `GET ], unit, - unit, < chains : Chain_services.chain list; - next_protocols : Protocol_hash.t list; - protocols : Protocol_hash.t list >, unit, - Chain_id.t * Block_hash.t) RPC_service.t - - val heads: - ([ `GET ], unit, - unit * Chain_services.chain, - < next_protocols : Protocol_hash.t list >, unit, - Block_hash.t) RPC_service.t - - end - end diff --git a/src/proto_alpha/lib_baking/client_baking_blocks.ml b/src/proto_alpha/lib_baking/client_baking_blocks.ml index 0e0e042e1..01d332f4c 100644 --- a/src/proto_alpha/lib_baking/client_baking_blocks.ml +++ b/src/proto_alpha/lib_baking/client_baking_blocks.ml @@ -35,14 +35,14 @@ let info cctxt ?(chain = `Main) block = timestamp ; protocol ; next_protocol ; level } let monitor_valid_blocks cctxt ?chains ?protocols ?next_protocols () = - Shell_services.Monitor.valid_blocks cctxt + Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () >>=? fun (block_stream, _stop) -> return (Lwt_stream.map_s (fun (chain, block) -> info cctxt ~chain:(`Hash chain) (`Hash (block, 0))) block_stream) let monitor_heads cctxt ?next_protocols chain = - Shell_services.Monitor.heads + Monitor_services.heads cctxt ?next_protocols chain >>=? fun (block_stream, _stop) -> return (Lwt_stream.map_s (fun block -> info cctxt ~chain (`Hash (block, 0)))