Shell/RPC: split out Monitor_services

This commit is contained in:
Grégoire Henry 2018-04-21 12:57:30 +02:00 committed by Benjamin Canou
parent c175cd1c65
commit bbf5c7408b
15 changed files with 237 additions and 186 deletions

View File

@ -81,7 +81,7 @@ let wait_for_operation_inclusion
end
end in
Shell_services.Monitor.heads ctxt chain >>=? fun (stream, stop) ->
Monitor_services.heads ctxt chain >>=? fun (stream, stop) ->
Lwt_stream.get stream >>= function
| None -> assert false
| Some head ->

View File

@ -133,7 +133,10 @@ let rpc_directory =
!dir
let build_rpc_directory state validator =
let build_rpc_directory validator =
let distributed_db = Validator.distributed_db validator in
let state = Distributed_db.state distributed_db in
let dir = ref rpc_directory in

View File

@ -12,4 +12,4 @@ val get_chain: State.t -> Chain_services.chain -> State.Chain.t Lwt.t
val rpc_directory: State.Chain.t Lwt.t RPC_directory.t
val build_rpc_directory: State.t -> Validator.t -> unit RPC_directory.t
val build_rpc_directory: Validator.t -> unit RPC_directory.t

View File

@ -0,0 +1,96 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
let build_rpc_directory validator =
let distributed_db = Validator.distributed_db validator in
let state = Distributed_db.state distributed_db in
let dir : unit RPC_directory.t ref = ref RPC_directory.empty in
let gen_register0 s f =
dir := RPC_directory.gen_register !dir s (fun () p q -> f p q) in
let gen_register1 s f =
dir := RPC_directory.gen_register !dir s (fun ((), a) p q -> f a p q) in
gen_register0 Monitor_services.S.valid_blocks begin fun q () ->
let block_stream, stopper = State.watcher state in
let shutdown () = Lwt_watcher.shutdown stopper in
let in_chains block =
Lwt_list.map_p (Chain_directory.get_chain_id state) q#chains >>= function
| [] -> Lwt.return_true
| chains ->
let chain_id = State.Block.chain_id block in
Lwt.return (List.exists (Chain_id.equal chain_id) chains) in
let in_protocols block =
match q#protocols with
| [] -> Lwt.return_true
| protocols ->
State.Block.predecessor block >>= function
| None -> Lwt.return_false (* won't happen *)
| Some pred ->
State.Block.context pred >>= fun context ->
Context.get_protocol context >>= fun protocol ->
Lwt.return (List.exists (Protocol_hash.equal protocol) protocols) in
let in_next_protocols block =
match q#next_protocols with
| [] -> Lwt.return_true
| protocols ->
State.Block.context block >>= fun context ->
Context.get_protocol context >>= fun next_protocol ->
Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in
let stream =
Lwt_stream.filter_map_s
(fun block ->
in_chains block >>= fun in_chains ->
in_next_protocols block >>= fun in_next_protocols ->
in_protocols block >>= fun in_protocols ->
if in_chains && in_protocols && in_next_protocols then
Lwt.return_some
(State.Block.chain_id block, State.Block.hash block)
else
Lwt.return_none)
block_stream in
let next () = Lwt_stream.get stream in
RPC_answer.return_stream { next ; shutdown }
end ;
gen_register1 Monitor_services.S.heads begin fun chain q () ->
(* TODO: when `chain = `Test`, should we reset then stream when
the `testnet` change, or dias we currently do ?? *)
Chain_directory.get_chain state chain >>= fun chain ->
Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator ->
let block_stream, stopper = Chain_validator.new_head_watcher chain_validator in
Chain.head chain >>= fun head ->
let shutdown () = Lwt_watcher.shutdown stopper in
let in_next_protocols block =
match q#next_protocols with
| [] -> Lwt.return_true
| protocols ->
State.Block.context block >>= fun context ->
Context.get_protocol context >>= fun next_protocol ->
Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in
let stream =
Lwt_stream.filter_map_s
(fun block ->
in_next_protocols block >>= fun in_next_protocols ->
if in_next_protocols then
Lwt.return_some (State.Block.hash block)
else
Lwt.return_none)
block_stream in
let first_call = ref true in
let next () =
if !first_call then begin
first_call := false ; Lwt.return_some (State.Block.hash head)
end else
Lwt_stream.get stream in
RPC_answer.return_stream { next ; shutdown }
end ;
!dir

View File

@ -0,0 +1,10 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
val build_rpc_directory: Validator.t -> unit RPC_directory.t

View File

@ -139,9 +139,10 @@ let build_rpc_directory node =
dir := RPC_directory.register !dir s (fun () p q -> f p q) in
merge (Protocol_directory.build_rpc_directory node.state node.distributed_db) ;
merge (Monitor_directory.build_rpc_directory node.validator) ;
merge (Shell_directory.build_rpc_directory
node.state node.validator node.mainchain_validator) ;
merge (Chain_directory.build_rpc_directory node.state node.validator) ;
node.validator node.mainchain_validator) ;
merge (Chain_directory.build_rpc_directory node.validator) ;
merge (P2p.build_rpc_directory node.p2p) ;
merge Worker_directory.rpc_directory ;

View File

@ -42,15 +42,16 @@ let inject_protocol state ?force:_ proto =
in
Lwt.return (hash, validation)
let build_rpc_directory state validator mainchain_validator =
let build_rpc_directory validator mainchain_validator =
let distributed_db = Validator.distributed_db validator in
let state = Distributed_db.state distributed_db in
let dir : unit RPC_directory.t ref = ref RPC_directory.empty in
let gen_register0 s f =
dir := RPC_directory.gen_register !dir s (fun () p q -> f p q) in
let register0 s f =
dir := RPC_directory.register !dir s (fun () p q -> f p q) in
let gen_register1 s f =
dir := RPC_directory.gen_register !dir s (fun ((), a) p q -> f a p q) in
register0 Shell_services.S.forge_block_header begin fun () header ->
return (Data_encoding.Binary.to_bytes_exn Block_header.encoding header)
@ -101,79 +102,4 @@ let build_rpc_directory state validator mainchain_validator =
RPC_answer.return_stream { next ; shutdown }
end ;
gen_register0 Shell_services.S.Monitor.valid_blocks begin fun q () ->
let block_stream, stopper = State.watcher state in
let shutdown () = Lwt_watcher.shutdown stopper in
let in_chains block =
Lwt_list.map_p (Chain_directory.get_chain_id state) q#chains >>= function
| [] -> Lwt.return_true
| chains ->
let chain_id = State.Block.chain_id block in
Lwt.return (List.exists (Chain_id.equal chain_id) chains) in
let in_protocols block =
match q#protocols with
| [] -> Lwt.return_true
| protocols ->
State.Block.predecessor block >>= function
| None -> Lwt.return_false (* won't happen *)
| Some pred ->
State.Block.context pred >>= fun context ->
Context.get_protocol context >>= fun protocol ->
Lwt.return (List.exists (Protocol_hash.equal protocol) protocols) in
let in_next_protocols block =
match q#next_protocols with
| [] -> Lwt.return_true
| protocols ->
State.Block.context block >>= fun context ->
Context.get_protocol context >>= fun next_protocol ->
Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in
let stream =
Lwt_stream.filter_map_s
(fun block ->
in_chains block >>= fun in_chains ->
in_next_protocols block >>= fun in_next_protocols ->
in_protocols block >>= fun in_protocols ->
if in_chains && in_protocols && in_next_protocols then
Lwt.return_some
(State.Block.chain_id block, State.Block.hash block)
else
Lwt.return_none)
block_stream in
let next () = Lwt_stream.get stream in
RPC_answer.return_stream { next ; shutdown }
end ;
gen_register1 Shell_services.S.Monitor.heads begin fun chain q () ->
(* TODO: when `chain = `Test`, should we reset then stream when
the `testnet` change, or dias we currently do ?? *)
Chain_directory.get_chain state chain >>= fun chain ->
Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator ->
let block_stream, stopper = Chain_validator.new_head_watcher chain_validator in
Chain.head chain >>= fun head ->
let shutdown () = Lwt_watcher.shutdown stopper in
let in_next_protocols block =
match q#next_protocols with
| [] -> Lwt.return_true
| protocols ->
State.Block.context block >>= fun context ->
Context.get_protocol context >>= fun next_protocol ->
Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in
let stream =
Lwt_stream.filter_map_s
(fun block ->
in_next_protocols block >>= fun in_next_protocols ->
if in_next_protocols then
Lwt.return_some (State.Block.hash block)
else
Lwt.return_none)
block_stream in
let first_call = ref true in
let next () =
if !first_call then begin
first_call := false ; Lwt.return_some (State.Block.hash head)
end else
Lwt_stream.get stream in
RPC_answer.return_stream { next ; shutdown }
end ;
!dir

View File

@ -8,4 +8,4 @@
(**************************************************************************)
val build_rpc_directory:
State.t -> Validator.t -> Chain_validator.t -> unit RPC_directory.t
Validator.t -> Chain_validator.t -> unit RPC_directory.t

View File

@ -131,3 +131,5 @@ let inject_operation v ?chain_id op =
match pv_opt with
| Some pv -> Prevalidator.inject_operation pv op
| None -> failwith "Prevalidator is not running, cannot inject the operation."
let distributed_db { db } = db

View File

@ -46,3 +46,5 @@ val inject_operation:
t ->
?chain_id:Chain_id.t ->
Operation.t -> unit tzresult Lwt.t
val distributed_db: t -> Distributed_db.t

View File

@ -0,0 +1,70 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
module S = struct
open Data_encoding
let path = RPC_path.(root / "monitor")
let valid_blocks_query =
let open RPC_query in
query (fun protocols next_protocols chains -> object
method protocols = protocols
method next_protocols = next_protocols
method chains = chains
end)
|+ multi_field "protocol"
Protocol_hash.rpc_arg (fun t -> t#protocols)
|+ multi_field "next_protocol"
Protocol_hash.rpc_arg (fun t -> t#next_protocols)
|+ multi_field "chain"
Chain_services.chain_arg (fun t -> t#chains)
|> seal
let valid_blocks =
RPC_service.get_service
~description:""
~query: valid_blocks_query
~output: (obj2
(req "chain_id" Chain_id.encoding)
(req "hash" Block_hash.encoding))
RPC_path.(path / "valid_blocks")
let heads_query =
let open RPC_query in
query (fun next_protocols -> object
method next_protocols = next_protocols
end)
|+ multi_field "next_protocol"
Protocol_hash.rpc_arg (fun t -> t#next_protocols)
|> seal
let heads =
RPC_service.get_service
~description:""
~query: heads_query
~output: Block_hash.encoding
RPC_path.(path / "heads" /: Chain_services.chain_arg)
end
open RPC_context
let valid_blocks
ctxt ?(chains = [`Main]) ?(protocols = []) ?(next_protocols = []) () =
make_streamed_call S.valid_blocks ctxt () (object
method chains = chains
method protocols = protocols
method next_protocols = next_protocols
end) ()
let heads ctxt ?(next_protocols = []) chain =
make_streamed_call S.heads ctxt ((), chain) (object
method next_protocols = next_protocols
end) ()

View File

@ -0,0 +1,41 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open RPC_context
val valid_blocks:
#streamed ->
?chains:Chain_services.chain list ->
?protocols:Protocol_hash.t list ->
?next_protocols:Protocol_hash.t list ->
unit -> ((Chain_id.t * Block_hash.t) Lwt_stream.t * stopper) tzresult Lwt.t
val heads:
#streamed ->
?next_protocols:Protocol_hash.t list ->
Chain_services.chain ->
(Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t
module S : sig
val valid_blocks:
([ `GET ], unit,
unit, < chains : Chain_services.chain list;
next_protocols : Protocol_hash.t list;
protocols : Protocol_hash.t list >, unit,
Chain_id.t * Block_hash.t) RPC_service.t
val heads:
([ `GET ], unit,
unit * Chain_services.chain,
< next_protocols : Protocol_hash.t list >, unit,
Block_hash.t) RPC_service.t
end

View File

@ -124,52 +124,6 @@ module S = struct
(req "timestamp" Time.encoding))
RPC_path.(root / "bootstrapped")
module Monitor = struct
let path = RPC_path.(root / "monitor")
let valid_blocks_query =
let open RPC_query in
query (fun protocols next_protocols chains -> object
method protocols = protocols
method next_protocols = next_protocols
method chains = chains
end)
|+ multi_field "protocol"
Protocol_hash.rpc_arg (fun t -> t#protocols)
|+ multi_field "next_protocol"
Protocol_hash.rpc_arg (fun t -> t#next_protocols)
|+ multi_field "chain"
Chain_services.chain_arg (fun t -> t#chains)
|> seal
let valid_blocks =
RPC_service.get_service
~description:""
~query: valid_blocks_query
~output: (obj2
(req "chain_id" Chain_id.encoding)
(req "hash" Block_hash.encoding))
RPC_path.(path / "valid_blocks")
let heads_query =
let open RPC_query in
query (fun next_protocols -> object
method next_protocols = next_protocols
end)
|+ multi_field "next_protocol"
Protocol_hash.rpc_arg (fun t -> t#next_protocols)
|> seal
let heads =
RPC_service.get_service
~description:""
~query: heads_query
~output: Block_hash.encoding
RPC_path.(path / "heads" /: Chain_services.chain_arg)
end
end
open RPC_context
@ -193,22 +147,3 @@ let inject_protocol ctxt ?(async = false) ?force protocol =
let bootstrapped ctxt =
make_streamed_call S.bootstrapped ctxt () () ()
module Monitor = struct
module S = S.Monitor
let valid_blocks
ctxt ?(chains = [`Main]) ?(protocols = []) ?(next_protocols = []) () =
make_streamed_call S.valid_blocks ctxt () (object
method chains = chains
method protocols = protocols
method next_protocols = next_protocols
end) ()
let heads ctxt ?(next_protocols = []) chain =
make_streamed_call S.heads ctxt ((), chain) (object
method next_protocols = next_protocols
end) ()
end

View File

@ -7,7 +7,6 @@
(* *)
(**************************************************************************)
open RPC_context
val forge_block_header:
@ -41,23 +40,6 @@ val inject_protocol:
val bootstrapped:
#streamed -> ((Block_hash.t * Time.t) Lwt_stream.t * stopper) tzresult Lwt.t
module Monitor : sig
val valid_blocks:
#streamed ->
?chains:Chain_services.chain list ->
?protocols:Protocol_hash.t list ->
?next_protocols:Protocol_hash.t list ->
unit -> ((Chain_id.t * Block_hash.t) Lwt_stream.t * stopper) tzresult Lwt.t
val heads:
#streamed ->
?next_protocols:Protocol_hash.t list ->
Chain_services.chain ->
(Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t
end
module S : sig
val forge_block_header:
@ -93,21 +75,4 @@ module S : sig
unit, unit, unit,
Block_hash.t * Time.t) RPC_service.t
module Monitor : sig
val valid_blocks:
([ `GET ], unit,
unit, < chains : Chain_services.chain list;
next_protocols : Protocol_hash.t list;
protocols : Protocol_hash.t list >, unit,
Chain_id.t * Block_hash.t) RPC_service.t
val heads:
([ `GET ], unit,
unit * Chain_services.chain,
< next_protocols : Protocol_hash.t list >, unit,
Block_hash.t) RPC_service.t
end
end

View File

@ -35,14 +35,14 @@ let info cctxt ?(chain = `Main) block =
timestamp ; protocol ; next_protocol ; level }
let monitor_valid_blocks cctxt ?chains ?protocols ?next_protocols () =
Shell_services.Monitor.valid_blocks cctxt
Monitor_services.valid_blocks cctxt
?chains ?protocols ?next_protocols () >>=? fun (block_stream, _stop) ->
return (Lwt_stream.map_s
(fun (chain, block) ->
info cctxt ~chain:(`Hash chain) (`Hash (block, 0))) block_stream)
let monitor_heads cctxt ?next_protocols chain =
Shell_services.Monitor.heads
Monitor_services.heads
cctxt ?next_protocols chain >>=? fun (block_stream, _stop) ->
return (Lwt_stream.map_s
(fun block -> info cctxt ~chain (`Hash (block, 0)))