From 04c920df297a61451a83a82fc99c03183b75d4a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 21 Apr 2018 13:09:59 +0200 Subject: [PATCH] Shell/RPC: use query parameters in `/protocols` --- src/bin_client/client_protocols_commands.ml | 4 +- src/lib_data_encoding/data_encoding.mli | 2 +- src/lib_data_encoding/json.ml | 6 ++- src/lib_data_encoding/json.mli | 2 +- src/lib_rpc_http/media_type.ml | 2 +- src/lib_shell/monitor_directory.ml | 7 ++++ src/lib_shell/node.ml | 2 +- src/lib_shell/protocol_directory.ml | 37 ++---------------- src/lib_shell/protocol_directory.mli | 2 +- src/lib_shell/state.ml | 6 +++ src/lib_shell/state.mli | 2 + src/lib_shell_services/monitor_services.ml | 10 +++++ src/lib_shell_services/monitor_services.mli | 9 +++++ src/lib_shell_services/protocol_services.ml | 41 +++----------------- src/lib_shell_services/protocol_services.mli | 21 +++------- 15 files changed, 60 insertions(+), 93 deletions(-) diff --git a/src/bin_client/client_protocols_commands.ml b/src/bin_client/client_protocols_commands.ml index 2ca54fe78..817494b39 100644 --- a/src/bin_client/client_protocols_commands.ml +++ b/src/bin_client/client_protocols_commands.ml @@ -25,8 +25,8 @@ let commands () = no_options (prefixes [ "list" ; "protocols" ] stop) (fun () (cctxt : #Client_context.full) -> - Protocol_services.list ~contents:false cctxt >>=? fun protos -> - Lwt_list.iter_s (fun (ph, _p) -> cctxt#message "%a" Protocol_hash.pp ph) protos >>= fun () -> + Protocol_services.list cctxt >>=? fun protos -> + Lwt_list.iter_s (fun ph -> cctxt#message "%a" Protocol_hash.pp ph) protos >>= fun () -> return () ); diff --git a/src/lib_data_encoding/data_encoding.mli b/src/lib_data_encoding/data_encoding.mli index bdb997942..4a07877d6 100644 --- a/src/lib_data_encoding/data_encoding.mli +++ b/src/lib_data_encoding/data_encoding.mli @@ -571,7 +571,7 @@ module Json: sig (** Write a JSON document to a string. This goes via an intermediate buffer and so may be slow on large documents. *) - val to_string : ?minify:bool -> json -> string + val to_string : ?newline:bool -> ?minify:bool -> json -> string val pp : Format.formatter -> json -> unit diff --git a/src/lib_data_encoding/json.ml b/src/lib_data_encoding/json.ml index d418ef505..a39b5425c 100644 --- a/src/lib_data_encoding/json.ml +++ b/src/lib_data_encoding/json.ml @@ -271,8 +271,10 @@ let to_root = function | `Null -> `O [] | oth -> `A [ oth ] -let to_string ?minify j = - Format.asprintf "%a" Json_repr.(pp ?compact:minify (module Ezjsonm)) j +let to_string ?(newline = false) ?minify j = + Format.asprintf "%a%s" + Json_repr.(pp ?compact:minify (module Ezjsonm)) j + (if newline then "\n" else "") let pp = Json_repr.(pp (module Ezjsonm)) diff --git a/src/lib_data_encoding/json.mli b/src/lib_data_encoding/json.mli index b28e85902..13f2989af 100644 --- a/src/lib_data_encoding/json.mli +++ b/src/lib_data_encoding/json.mli @@ -50,7 +50,7 @@ val wrap_error : ('a -> 'b) -> 'a -> 'b val from_string : string -> (json, string) result val from_stream : string Lwt_stream.t -> (json, string) result Lwt_stream.t -val to_string : ?minify:bool -> json -> string +val to_string : ?newline:bool -> ?minify:bool -> json -> string val pp : Format.formatter -> json -> unit val bytes_jsont: MBytes.t Json_encoding.encoding diff --git a/src/lib_rpc_http/media_type.ml b/src/lib_rpc_http/media_type.ml index f850e0aa2..030dc933d 100644 --- a/src/lib_rpc_http/media_type.ml +++ b/src/lib_rpc_http/media_type.ml @@ -24,7 +24,7 @@ let json = { Data_encoding.Json.pp ppf json end ; construct = begin fun enc v -> - Data_encoding.Json.to_string ~minify:true @@ + Data_encoding.Json.to_string ~newline:true ~minify:true @@ Data_encoding.Json.construct enc v end ; destruct = begin fun enc body -> diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml index 8d7ff6e6e..03d9701e9 100644 --- a/src/lib_shell/monitor_directory.ml +++ b/src/lib_shell/monitor_directory.ml @@ -93,4 +93,11 @@ let build_rpc_directory validator = RPC_answer.return_stream { next ; shutdown } end ; + gen_register0 Monitor_services.S.protocols begin fun () () -> + let stream, stopper = State.Protocol.watcher state in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + RPC_answer.return_stream { next ; shutdown } + end ; + !dir diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index ac390e620..cdd05ba6d 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -138,7 +138,7 @@ let build_rpc_directory node = let register0 s f = dir := RPC_directory.register !dir s (fun () p q -> f p q) in - merge (Protocol_directory.build_rpc_directory node.state node.distributed_db) ; + merge (Protocol_directory.build_rpc_directory node.state) ; merge (Monitor_directory.build_rpc_directory node.validator) ; merge (Shell_directory.build_rpc_directory node.validator node.mainchain_validator) ; diff --git a/src/lib_shell/protocol_directory.ml b/src/lib_shell/protocol_directory.ml index a01bc613d..0aff34ced 100644 --- a/src/lib_shell/protocol_directory.ml +++ b/src/lib_shell/protocol_directory.ml @@ -7,7 +7,7 @@ (* *) (**************************************************************************) -let build_rpc_directory state distributed_db = +let build_rpc_directory state = let dir : unit RPC_directory.t ref = ref RPC_directory.empty in let gen_register0 s f = @@ -15,40 +15,11 @@ let build_rpc_directory state distributed_db = let register1 s f = dir := RPC_directory.register !dir s (fun ((), a) p q -> f a p q) in - gen_register0 Protocol_services.S.list begin fun () p -> - let { Protocol_services.S.monitor ; contents } = p in - let monitor = match monitor with None -> false | Some x -> x in - let include_contents = match contents with None -> false | Some x -> x in + gen_register0 Protocol_services.S.list begin fun () () -> State.Protocol.list state >>= fun set -> let protocols = Protocol_hash.Set.elements set in - Lwt_list.map_p - (fun hash -> - if include_contents then - State.Protocol.read state hash >>= function - | Error _ -> Lwt.return (hash, None) - | Ok bytes -> Lwt.return (hash, Some bytes) - else - Lwt.return (hash, None)) - protocols >>= fun protocols -> - if not monitor then - RPC_answer.return protocols - else - let stream, stopper = - Distributed_db.Protocol.watch distributed_db in - let shutdown () = Lwt_watcher.shutdown stopper in - let first_request = ref true in - let next () = - if not !first_request then - Lwt_stream.get stream >>= function - | None -> Lwt.return_none - | Some (h, op) when include_contents -> Lwt.return (Some [h, Some op]) - | Some (h, _) -> Lwt.return (Some [h, None]) - else begin - first_request := false ; - Lwt.return (Some protocols) - end in - RPC_answer.return_stream { next ; shutdown } - end; + RPC_answer.return protocols + end ; register1 Protocol_services.S.contents begin fun hash () () -> State.Protocol.read state hash diff --git a/src/lib_shell/protocol_directory.mli b/src/lib_shell/protocol_directory.mli index b882ea375..059c61640 100644 --- a/src/lib_shell/protocol_directory.mli +++ b/src/lib_shell/protocol_directory.mli @@ -8,4 +8,4 @@ (**************************************************************************) val build_rpc_directory: - State.t -> Distributed_db.t -> unit RPC_directory.t + State.t -> unit RPC_directory.t diff --git a/src/lib_shell/state.ml b/src/lib_shell/state.ml index b0bd4cd58..0a6675cf1 100644 --- a/src/lib_shell/state.ml +++ b/src/lib_shell/state.ml @@ -24,6 +24,7 @@ type global_state = { global_data: global_data Shared.t ; protocol_store: Store.Protocol.store Shared.t ; main_chain: Chain_id.t ; + protocol_watcher: Protocol_hash.t Lwt_watcher.input ; block_watcher: block Lwt_watcher.input ; } @@ -841,6 +842,7 @@ module Protocol = struct Lwt.return None else Store.Protocol.RawContents.store (store, hash) bytes >>= fun () -> + Lwt_watcher.notify global_state.protocol_watcher hash ; Lwt.return (Some hash) end @@ -861,6 +863,9 @@ module Protocol = struct ~f:(fun x acc -> Lwt.return (Protocol_hash.Set.add x acc)) end + let watcher (state : global_state) = + Lwt_watcher.create_stream state.protocol_watcher + end module Current_mempool = struct @@ -903,6 +908,7 @@ let read global_data = Shared.create global_data ; protocol_store = Shared.create @@ Store.Protocol.get global_store ; main_chain ; + protocol_watcher = Lwt_watcher.create_input () ; block_watcher = Lwt_watcher.create_input () ; } in Chain.read_all state >>=? fun () -> diff --git a/src/lib_shell/state.mli b/src/lib_shell/state.mli index fe70970fd..33711fbf6 100644 --- a/src/lib_shell/state.mli +++ b/src/lib_shell/state.mli @@ -215,6 +215,8 @@ module Protocol : sig val list: global_state -> Protocol_hash.Set.t Lwt.t + val watcher: global_state -> Protocol_hash.t Lwt_stream.t * Lwt_watcher.stopper + end module Current_mempool : sig diff --git a/src/lib_shell_services/monitor_services.ml b/src/lib_shell_services/monitor_services.ml index ad1d1943f..9d22a0484 100644 --- a/src/lib_shell_services/monitor_services.ml +++ b/src/lib_shell_services/monitor_services.ml @@ -52,6 +52,13 @@ module S = struct ~output: Block_hash.encoding RPC_path.(path / "heads" /: Chain_services.chain_arg) + let protocols = + RPC_service.get_service + ~description:"...FIXME..." + ~query: RPC_query.empty + ~output: Protocol_hash.encoding + RPC_path.(path / "protocols") + end open RPC_context @@ -68,3 +75,6 @@ let heads ctxt ?(next_protocols = []) chain = make_streamed_call S.heads ctxt ((), chain) (object method next_protocols = next_protocols end) () + +let protocols ctxt = + make_streamed_call S.protocols ctxt () () () diff --git a/src/lib_shell_services/monitor_services.mli b/src/lib_shell_services/monitor_services.mli index 7933ed0a3..d47ae8b15 100644 --- a/src/lib_shell_services/monitor_services.mli +++ b/src/lib_shell_services/monitor_services.mli @@ -22,6 +22,10 @@ val heads: Chain_services.chain -> (Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t +val protocols: + #streamed -> + (Protocol_hash.t Lwt_stream.t * stopper) tzresult Lwt.t + module S : sig val valid_blocks: @@ -37,5 +41,10 @@ module S : sig < next_protocols : Protocol_hash.t list >, unit, Block_hash.t) RPC_service.t + val protocols: + ([ `GET ], unit, + unit, unit, unit, + Protocol_hash.t) RPC_service.t + end diff --git a/src/lib_shell_services/protocol_services.ml b/src/lib_shell_services/protocol_services.ml index 0abd45c20..a88b4c796 100644 --- a/src/lib_shell_services/protocol_services.ml +++ b/src/lib_shell_services/protocol_services.ml @@ -9,45 +9,20 @@ open Data_encoding - module S = struct let protocols_arg = Protocol_hash.rpc_arg let contents = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: empty - ~output: - (obj1 (req "data" (Protocol.encoding))) + ~output: Protocol.encoding RPC_path.(root / "protocols" /: protocols_arg) - type list_param = { - contents: bool option ; - monitor: bool option ; - } - - let list_param_encoding = - conv - (fun {contents; monitor} -> (contents, monitor)) - (fun (contents, monitor) -> {contents; monitor}) - (obj2 - (opt "contents" bool) - (opt "monitor" bool)) - let list = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: list_param_encoding - ~output: - (obj1 - (req "protocols" - (list - (obj2 - (req "hash" Protocol_hash.encoding) - (opt "contents" - (dynamic_size Protocol.encoding))) - ))) + ~output: (list Protocol_hash.encoding) RPC_path.(root / "protocols") end @@ -55,10 +30,6 @@ end open RPC_context let contents ctxt h = make_call1 S.contents ctxt h () () -let monitor ?(contents = false) ctxt = - make_streamed_call S.list ctxt () () - { contents = Some contents ; monitor = Some true } -let list ?(contents = false) ctxt = - make_call S.list ctxt () () - { contents = Some contents ; monitor = Some false } +let list ctxt = + make_call S.list ctxt () () () diff --git a/src/lib_shell_services/protocol_services.mli b/src/lib_shell_services/protocol_services.mli index 016d80d17..5df2fa6f1 100644 --- a/src/lib_shell_services/protocol_services.mli +++ b/src/lib_shell_services/protocol_services.mli @@ -13,30 +13,19 @@ val contents: #simple -> Protocol_hash.t -> Protocol.t tzresult Lwt.t val list: - ?contents:bool -> #simple -> - (Protocol_hash.t * Protocol.t option) list tzresult Lwt.t - -val monitor: - ?contents:bool -> - #streamed -> - ((Protocol_hash.t * Protocol.t option) list Lwt_stream.t * stopper) tzresult Lwt.t + Protocol_hash.t list tzresult Lwt.t module S : sig val contents: - ([ `POST ], unit, + ([ `GET ], unit, unit * Protocol_hash.t, unit, unit, Protocol.t) RPC_service.t - type list_param = { - contents: bool option ; - monitor: bool option ; - } - val list: - ([ `POST ], unit, - unit, unit, list_param, - (Protocol_hash.t * Protocol.t option) list) RPC_service.t + ([ `GET ], unit, + unit, unit, unit, + Protocol_hash.t list) RPC_service.t end