Shell/RPC: use query parameters in /protocols

This commit is contained in:
Grégoire Henry 2018-04-21 13:09:59 +02:00 committed by Benjamin Canou
parent bbf5c7408b
commit 04c920df29
15 changed files with 60 additions and 93 deletions

View File

@ -25,8 +25,8 @@ let commands () =
no_options no_options
(prefixes [ "list" ; "protocols" ] stop) (prefixes [ "list" ; "protocols" ] stop)
(fun () (cctxt : #Client_context.full) -> (fun () (cctxt : #Client_context.full) ->
Protocol_services.list ~contents:false cctxt >>=? fun protos -> Protocol_services.list cctxt >>=? fun protos ->
Lwt_list.iter_s (fun (ph, _p) -> cctxt#message "%a" Protocol_hash.pp ph) protos >>= fun () -> Lwt_list.iter_s (fun ph -> cctxt#message "%a" Protocol_hash.pp ph) protos >>= fun () ->
return () return ()
); );

View File

@ -571,7 +571,7 @@ module Json: sig
(** Write a JSON document to a string. This goes via an intermediate (** Write a JSON document to a string. This goes via an intermediate
buffer and so may be slow on large documents. *) buffer and so may be slow on large documents. *)
val to_string : ?minify:bool -> json -> string val to_string : ?newline:bool -> ?minify:bool -> json -> string
val pp : Format.formatter -> json -> unit val pp : Format.formatter -> json -> unit

View File

@ -271,8 +271,10 @@ let to_root = function
| `Null -> `O [] | `Null -> `O []
| oth -> `A [ oth ] | oth -> `A [ oth ]
let to_string ?minify j = let to_string ?(newline = false) ?minify j =
Format.asprintf "%a" Json_repr.(pp ?compact:minify (module Ezjsonm)) j Format.asprintf "%a%s"
Json_repr.(pp ?compact:minify (module Ezjsonm)) j
(if newline then "\n" else "")
let pp = Json_repr.(pp (module Ezjsonm)) let pp = Json_repr.(pp (module Ezjsonm))

View File

@ -50,7 +50,7 @@ val wrap_error : ('a -> 'b) -> 'a -> 'b
val from_string : string -> (json, string) result val from_string : string -> (json, string) result
val from_stream : string Lwt_stream.t -> (json, string) result Lwt_stream.t val from_stream : string Lwt_stream.t -> (json, string) result Lwt_stream.t
val to_string : ?minify:bool -> json -> string val to_string : ?newline:bool -> ?minify:bool -> json -> string
val pp : Format.formatter -> json -> unit val pp : Format.formatter -> json -> unit
val bytes_jsont: MBytes.t Json_encoding.encoding val bytes_jsont: MBytes.t Json_encoding.encoding

View File

@ -24,7 +24,7 @@ let json = {
Data_encoding.Json.pp ppf json Data_encoding.Json.pp ppf json
end ; end ;
construct = begin fun enc v -> construct = begin fun enc v ->
Data_encoding.Json.to_string ~minify:true @@ Data_encoding.Json.to_string ~newline:true ~minify:true @@
Data_encoding.Json.construct enc v Data_encoding.Json.construct enc v
end ; end ;
destruct = begin fun enc body -> destruct = begin fun enc body ->

View File

@ -93,4 +93,11 @@ let build_rpc_directory validator =
RPC_answer.return_stream { next ; shutdown } RPC_answer.return_stream { next ; shutdown }
end ; end ;
gen_register0 Monitor_services.S.protocols begin fun () () ->
let stream, stopper = State.Protocol.watcher state in
let shutdown () = Lwt_watcher.shutdown stopper in
let next () = Lwt_stream.get stream in
RPC_answer.return_stream { next ; shutdown }
end ;
!dir !dir

View File

@ -138,7 +138,7 @@ let build_rpc_directory node =
let register0 s f = let register0 s f =
dir := RPC_directory.register !dir s (fun () p q -> f p q) in dir := RPC_directory.register !dir s (fun () p q -> f p q) in
merge (Protocol_directory.build_rpc_directory node.state node.distributed_db) ; merge (Protocol_directory.build_rpc_directory node.state) ;
merge (Monitor_directory.build_rpc_directory node.validator) ; merge (Monitor_directory.build_rpc_directory node.validator) ;
merge (Shell_directory.build_rpc_directory merge (Shell_directory.build_rpc_directory
node.validator node.mainchain_validator) ; node.validator node.mainchain_validator) ;

View File

@ -7,7 +7,7 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
let build_rpc_directory state distributed_db = let build_rpc_directory state =
let dir : unit RPC_directory.t ref = ref RPC_directory.empty in let dir : unit RPC_directory.t ref = ref RPC_directory.empty in
let gen_register0 s f = let gen_register0 s f =
@ -15,40 +15,11 @@ let build_rpc_directory state distributed_db =
let register1 s f = let register1 s f =
dir := RPC_directory.register !dir s (fun ((), a) p q -> f a p q) in dir := RPC_directory.register !dir s (fun ((), a) p q -> f a p q) in
gen_register0 Protocol_services.S.list begin fun () p -> gen_register0 Protocol_services.S.list begin fun () () ->
let { Protocol_services.S.monitor ; contents } = p in
let monitor = match monitor with None -> false | Some x -> x in
let include_contents = match contents with None -> false | Some x -> x in
State.Protocol.list state >>= fun set -> State.Protocol.list state >>= fun set ->
let protocols = Protocol_hash.Set.elements set in let protocols = Protocol_hash.Set.elements set in
Lwt_list.map_p
(fun hash ->
if include_contents then
State.Protocol.read state hash >>= function
| Error _ -> Lwt.return (hash, None)
| Ok bytes -> Lwt.return (hash, Some bytes)
else
Lwt.return (hash, None))
protocols >>= fun protocols ->
if not monitor then
RPC_answer.return protocols RPC_answer.return protocols
else end ;
let stream, stopper =
Distributed_db.Protocol.watch distributed_db in
let shutdown () = Lwt_watcher.shutdown stopper in
let first_request = ref true in
let next () =
if not !first_request then
Lwt_stream.get stream >>= function
| None -> Lwt.return_none
| Some (h, op) when include_contents -> Lwt.return (Some [h, Some op])
| Some (h, _) -> Lwt.return (Some [h, None])
else begin
first_request := false ;
Lwt.return (Some protocols)
end in
RPC_answer.return_stream { next ; shutdown }
end;
register1 Protocol_services.S.contents begin fun hash () () -> register1 Protocol_services.S.contents begin fun hash () () ->
State.Protocol.read state hash State.Protocol.read state hash

View File

@ -8,4 +8,4 @@
(**************************************************************************) (**************************************************************************)
val build_rpc_directory: val build_rpc_directory:
State.t -> Distributed_db.t -> unit RPC_directory.t State.t -> unit RPC_directory.t

View File

@ -24,6 +24,7 @@ type global_state = {
global_data: global_data Shared.t ; global_data: global_data Shared.t ;
protocol_store: Store.Protocol.store Shared.t ; protocol_store: Store.Protocol.store Shared.t ;
main_chain: Chain_id.t ; main_chain: Chain_id.t ;
protocol_watcher: Protocol_hash.t Lwt_watcher.input ;
block_watcher: block Lwt_watcher.input ; block_watcher: block Lwt_watcher.input ;
} }
@ -841,6 +842,7 @@ module Protocol = struct
Lwt.return None Lwt.return None
else else
Store.Protocol.RawContents.store (store, hash) bytes >>= fun () -> Store.Protocol.RawContents.store (store, hash) bytes >>= fun () ->
Lwt_watcher.notify global_state.protocol_watcher hash ;
Lwt.return (Some hash) Lwt.return (Some hash)
end end
@ -861,6 +863,9 @@ module Protocol = struct
~f:(fun x acc -> Lwt.return (Protocol_hash.Set.add x acc)) ~f:(fun x acc -> Lwt.return (Protocol_hash.Set.add x acc))
end end
let watcher (state : global_state) =
Lwt_watcher.create_stream state.protocol_watcher
end end
module Current_mempool = struct module Current_mempool = struct
@ -903,6 +908,7 @@ let read
global_data = Shared.create global_data ; global_data = Shared.create global_data ;
protocol_store = Shared.create @@ Store.Protocol.get global_store ; protocol_store = Shared.create @@ Store.Protocol.get global_store ;
main_chain ; main_chain ;
protocol_watcher = Lwt_watcher.create_input () ;
block_watcher = Lwt_watcher.create_input () ; block_watcher = Lwt_watcher.create_input () ;
} in } in
Chain.read_all state >>=? fun () -> Chain.read_all state >>=? fun () ->

View File

@ -215,6 +215,8 @@ module Protocol : sig
val list: global_state -> Protocol_hash.Set.t Lwt.t val list: global_state -> Protocol_hash.Set.t Lwt.t
val watcher: global_state -> Protocol_hash.t Lwt_stream.t * Lwt_watcher.stopper
end end
module Current_mempool : sig module Current_mempool : sig

View File

@ -52,6 +52,13 @@ module S = struct
~output: Block_hash.encoding ~output: Block_hash.encoding
RPC_path.(path / "heads" /: Chain_services.chain_arg) RPC_path.(path / "heads" /: Chain_services.chain_arg)
let protocols =
RPC_service.get_service
~description:"...FIXME..."
~query: RPC_query.empty
~output: Protocol_hash.encoding
RPC_path.(path / "protocols")
end end
open RPC_context open RPC_context
@ -68,3 +75,6 @@ let heads ctxt ?(next_protocols = []) chain =
make_streamed_call S.heads ctxt ((), chain) (object make_streamed_call S.heads ctxt ((), chain) (object
method next_protocols = next_protocols method next_protocols = next_protocols
end) () end) ()
let protocols ctxt =
make_streamed_call S.protocols ctxt () () ()

View File

@ -22,6 +22,10 @@ val heads:
Chain_services.chain -> Chain_services.chain ->
(Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t (Block_hash.t Lwt_stream.t * stopper) tzresult Lwt.t
val protocols:
#streamed ->
(Protocol_hash.t Lwt_stream.t * stopper) tzresult Lwt.t
module S : sig module S : sig
val valid_blocks: val valid_blocks:
@ -37,5 +41,10 @@ module S : sig
< next_protocols : Protocol_hash.t list >, unit, < next_protocols : Protocol_hash.t list >, unit,
Block_hash.t) RPC_service.t Block_hash.t) RPC_service.t
val protocols:
([ `GET ], unit,
unit, unit, unit,
Protocol_hash.t) RPC_service.t
end end

View File

@ -9,45 +9,20 @@
open Data_encoding open Data_encoding
module S = struct module S = struct
let protocols_arg = Protocol_hash.rpc_arg let protocols_arg = Protocol_hash.rpc_arg
let contents = let contents =
RPC_service.post_service RPC_service.get_service
~query: RPC_query.empty ~query: RPC_query.empty
~input: empty ~output: Protocol.encoding
~output:
(obj1 (req "data" (Protocol.encoding)))
RPC_path.(root / "protocols" /: protocols_arg) RPC_path.(root / "protocols" /: protocols_arg)
type list_param = {
contents: bool option ;
monitor: bool option ;
}
let list_param_encoding =
conv
(fun {contents; monitor} -> (contents, monitor))
(fun (contents, monitor) -> {contents; monitor})
(obj2
(opt "contents" bool)
(opt "monitor" bool))
let list = let list =
RPC_service.post_service RPC_service.get_service
~query: RPC_query.empty ~query: RPC_query.empty
~input: list_param_encoding ~output: (list Protocol_hash.encoding)
~output:
(obj1
(req "protocols"
(list
(obj2
(req "hash" Protocol_hash.encoding)
(opt "contents"
(dynamic_size Protocol.encoding)))
)))
RPC_path.(root / "protocols") RPC_path.(root / "protocols")
end end
@ -55,10 +30,6 @@ end
open RPC_context open RPC_context
let contents ctxt h = let contents ctxt h =
make_call1 S.contents ctxt h () () make_call1 S.contents ctxt h () ()
let monitor ?(contents = false) ctxt = let list ctxt =
make_streamed_call S.list ctxt () () make_call S.list ctxt () () ()
{ contents = Some contents ; monitor = Some true }
let list ?(contents = false) ctxt =
make_call S.list ctxt () ()
{ contents = Some contents ; monitor = Some false }

View File

@ -13,30 +13,19 @@ val contents:
#simple -> Protocol_hash.t -> Protocol.t tzresult Lwt.t #simple -> Protocol_hash.t -> Protocol.t tzresult Lwt.t
val list: val list:
?contents:bool ->
#simple -> #simple ->
(Protocol_hash.t * Protocol.t option) list tzresult Lwt.t Protocol_hash.t list tzresult Lwt.t
val monitor:
?contents:bool ->
#streamed ->
((Protocol_hash.t * Protocol.t option) list Lwt_stream.t * stopper) tzresult Lwt.t
module S : sig module S : sig
val contents: val contents:
([ `POST ], unit, ([ `GET ], unit,
unit * Protocol_hash.t, unit, unit, unit * Protocol_hash.t, unit, unit,
Protocol.t) RPC_service.t Protocol.t) RPC_service.t
type list_param = {
contents: bool option ;
monitor: bool option ;
}
val list: val list:
([ `POST ], unit, ([ `GET ], unit,
unit, unit, list_param, unit, unit, unit,
(Protocol_hash.t * Protocol.t option) list) RPC_service.t Protocol_hash.t list) RPC_service.t
end end