From f615459200593b5028f788d4af7287167622c914 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 11 Jun 2018 19:02:26 +0200 Subject: [PATCH] Shell/RPC: `/chains//mempool` new returned parsed operations --- src/lib_shell/chain_directory.ml | 24 +++--- src/lib_shell/prevalidator.ml | 76 ++++++++++++++++- src/lib_shell/prevalidator.mli | 2 + src/lib_shell_services/block_services.ml | 83 +++++++++++++++++++ src/lib_shell_services/block_services.mli | 28 +++++++ src/lib_shell_services/chain_services.ml | 41 +-------- src/lib_shell_services/chain_services.mli | 19 +---- .../lib_baking/client_baking_forge.ml | 32 +++---- 8 files changed, 214 insertions(+), 91 deletions(-) diff --git a/src/lib_shell/chain_directory.ml b/src/lib_shell/chain_directory.ml index 64a67145e..034ba8c51 100644 --- a/src/lib_shell/chain_directory.ml +++ b/src/lib_shell/chain_directory.ml @@ -142,21 +142,17 @@ let build_rpc_directory validator = (* Mempool *) - let register0 s f = - dir := - RPC_directory.register !dir (RPC_service.subst0 s) - (fun chain p q -> f chain p q) in - - register0 S.Mempool.pending_operations begin fun chain () () -> - Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator -> - let pv_opt = Chain_validator.prevalidator chain_validator in - match pv_opt with - | Some pv -> - return (Prevalidator.operations pv) - | None -> - return (Preapply_result.empty, Operation_hash.Map.empty) - end ; + let merge d = dir := RPC_directory.merge !dir d in + merge + (RPC_directory.map + (fun chain -> + Validator.get_exn validator + (State.Chain.id chain) >>= fun chain_validator -> + Lwt.return (Chain_validator.prevalidator chain_validator)) + Prevalidator.rpc_directory) ; RPC_directory.prefix Chain_services.path @@ RPC_directory.map (fun ((), chain) -> get_chain state chain) !dir + + diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index a01302f2d..fe0c301df 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -45,6 +45,7 @@ module Types = struct mutable validation_result : error Preapply_result.t ; mutable validation_state : Prevalidation.prevalidation_state tzresult ; mutable advertisement : [ `Pending of Mempool.t | `None ] ; + mutable rpc_directory : state RPC_directory.t tzresult Lwt.t ; } type parameters = limits * Distributed_db.chain_db @@ -80,6 +81,59 @@ type error += Closed = Worker.Closed let debug w = Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) +let empty_rpc_directory : unit RPC_directory.t = + RPC_directory.register + RPC_directory.empty + (Block_services.Empty.S.Mempool.pending_operations RPC_path.open_root) + (fun _pv () () -> + return { + Block_services.Empty.Mempool.applied = [] ; + refused = Operation_hash.Map.empty ; + branch_refused = Operation_hash.Map.empty ; + branch_delayed = Operation_hash.Map.empty ; + unprocessed = Operation_hash.Map.empty ; + }) + +let rpc_directory block : Types.state RPC_directory.t tzresult Lwt.t = + State.Block.protocol_hash block >>= fun protocol -> + begin + match Registered_protocol.get protocol with + | None -> + (* FIXME. *) + (* This should not happen: it should be handled in the validator. *) + failwith "Prevalidation: missing protocol '%a' for the current block." + Protocol_hash.pp_short protocol + | Some protocol -> + return protocol + end >>=? fun (module Proto) -> + let module Proto_services = Block_services.Make(Proto)(Proto) in + return @@ + RPC_directory.register + RPC_directory.empty + (Proto_services.S.Mempool.pending_operations RPC_path.open_root) + (fun pv () () -> + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + { Proto.shell = op.shell ; protocol_data } in + let map_op_error (op, error) = (map_op op, error) in + return { + Proto_services.Mempool.applied = + List.map + (fun (hash, op) -> (hash, map_op op)) + (List.rev pv.validation_result.applied) ; + refused = + Operation_hash.Map.map map_op_error pv.validation_result.refused ; + branch_refused = + Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ; + branch_delayed = + Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; + unprocessed = + Operation_hash.Map.map map_op pv.pending ; + }) + let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool = let rec pop_blocks ancestor block mempool = let hash = State.Block.hash block in @@ -356,6 +410,7 @@ let on_flush w pv predecessor = pv.in_mempool <- Operation_hash.Set.empty ; pv.validation_result <- validation_result ; pv.validation_state <- validation_state ; + pv.rpc_directory <- rpc_directory predecessor ; return () let on_advertise pv = @@ -429,7 +484,9 @@ let on_launch w _ (limits, chain_db) = pending = Operation_hash.Map.empty ; in_mempool = Operation_hash.Set.empty ; validation_result ; validation_state ; - advertisement = `None } in + advertisement = `None ; + rpc_directory = rpc_directory predecessor ; + } in List.iter (fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) mempool.known_valid ; @@ -508,3 +565,20 @@ let pending_requests t = Worker.pending_requests t let current_request t = Worker.current_request t let last_events = Worker.last_events + +let rpc_directory : t option RPC_directory.t = + RPC_directory.register_dynamic_directory + RPC_directory.empty + (Block_services.mempool_path RPC_path.open_root) + (function + | None -> + Lwt.return + (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory) + | Some w -> + let pv = Worker.state w in + pv.rpc_directory >>= function + | Error _ -> + Lwt.return RPC_directory.empty + | Ok rpc_directory -> + Lwt.return + (RPC_directory.map (fun _ -> Lwt.return pv) rpc_directory)) diff --git a/src/lib_shell/prevalidator.mli b/src/lib_shell/prevalidator.mli index 5c659bcc3..7e1afd93b 100644 --- a/src/lib_shell/prevalidator.mli +++ b/src/lib_shell/prevalidator.mli @@ -54,3 +54,5 @@ val status: t -> Worker_types.worker_status val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list + +val rpc_directory : t option RPC_directory.t diff --git a/src/lib_shell_services/block_services.ml b/src/lib_shell_services/block_services.ml index 8c5c9e5a0..1ac124559 100644 --- a/src/lib_shell_services/block_services.ml +++ b/src/lib_shell_services/block_services.ml @@ -75,6 +75,7 @@ let blocks_arg = type chain_prefix = unit * chain type prefix = chain_prefix * block let chain_path = RPC_path.(root / "chains" /: chain_arg) +let mempool_path p = RPC_path.(p / "mempool") let dir_path : (chain_prefix, chain_prefix) RPC_path.t = RPC_path.(open_root / "blocks") let path = RPC_path.(dir_path /: blocks_arg) @@ -597,6 +598,72 @@ module Make(Proto : PROTO)(Next_proto : PROTO) = struct ~output: block_info_encoding path + module Mempool = struct + + type t = { + applied: (Operation_hash.t * Next_proto.operation) list ; + refused: (Next_proto.operation * error list) Operation_hash.Map.t ; + branch_refused: (Next_proto.operation * error list) Operation_hash.Map.t ; + branch_delayed: (Next_proto.operation * error list) Operation_hash.Map.t ; + unprocessed: Next_proto.operation Operation_hash.Map.t ; + } + + let encoding = + conv + (fun + { applied ; + refused ; branch_refused ; branch_delayed ; + unprocessed } -> + (applied, refused, branch_refused, branch_delayed, unprocessed)) + (fun + (applied, refused, branch_refused, branch_delayed, unprocessed) -> + { applied ; + refused ; branch_refused ; branch_delayed ; + unprocessed }) + (obj5 + (req "applied" + (list + (conv + (fun (hash, (op : Next_proto.operation)) -> + ((hash, op.shell), (op.protocol_data))) + (fun ((hash, shell), (protocol_data)) -> + (hash, { shell ; protocol_data })) + (merge_objs + (merge_objs + (obj1 (req "hash" Operation_hash.encoding)) + (dynamic_size Operation.shell_header_encoding)) + (dynamic_size Next_proto.operation_data_encoding) + )))) + (req "refused" + (Operation_hash.Map.encoding + (merge_objs + (dynamic_size next_operation_encoding) + (obj1 (req "error" RPC_error.encoding))))) + (req "branch_refused" + (Operation_hash.Map.encoding + (merge_objs + (dynamic_size next_operation_encoding) + (obj1 (req "error" RPC_error.encoding))))) + (req "branch_delayed" + (Operation_hash.Map.encoding + (merge_objs + (dynamic_size next_operation_encoding) + (obj1 (req "error" RPC_error.encoding))))) + (req "unprocessed" + (Operation_hash.Map.encoding + (dynamic_size next_operation_encoding)))) + + let pending_operations path = + (* TODO: branch_delayed/... *) + RPC_service.get_service + ~description: + "List the not-yet-prevalidated operations." + ~query: RPC_query.empty + ~output: encoding + path + + end + end let path = RPC_path.prefix chain_path path @@ -762,6 +829,22 @@ module Make(Proto : PROTO)(Next_proto : PROTO) = struct fun ?(chain = `Main) ?(block = `Head 0) () -> f chain block () () + module Mempool = struct + + type t = S.Mempool.t = { + applied: (Operation_hash.t * Next_proto.operation) list ; + refused: (Next_proto.operation * error list) Operation_hash.Map.t ; + branch_refused: (Next_proto.operation * error list) Operation_hash.Map.t ; + branch_delayed: (Next_proto.operation * error list) Operation_hash.Map.t ; + unprocessed: Next_proto.operation Operation_hash.Map.t ; + } + + let pending_operations ctxt ?(chain = `Main) () = + let s = S.Mempool.pending_operations (mempool_path chain_path) in + RPC_context.make_call1 s ctxt chain () () + + end + end module Fake_protocol = struct diff --git a/src/lib_shell_services/block_services.mli b/src/lib_shell_services/block_services.mli index 695eea39b..551484c27 100644 --- a/src/lib_shell_services/block_services.mli +++ b/src/lib_shell_services/block_services.mli @@ -32,6 +32,7 @@ val to_string: block -> string type prefix = (unit * chain) * block val dir_path: (chain_prefix, chain_prefix) RPC_path.t val path: (chain_prefix, chain_prefix * block) RPC_path.t +val mempool_path : ('a, 'b) RPC_path.t -> ('a, 'b) RPC_path.t type operation_list_quota = { max_size: int ; @@ -218,6 +219,23 @@ module Make(Proto : PROTO)(Next_proto : PROTO) : sig end + module Mempool : sig + + type t = { + applied: (Operation_hash.t * Next_proto.operation) list ; + refused: (Next_proto.operation * error list) Operation_hash.Map.t ; + branch_refused: (Next_proto.operation * error list) Operation_hash.Map.t ; + branch_delayed: (Next_proto.operation * error list) Operation_hash.Map.t ; + unprocessed: Next_proto.operation Operation_hash.Map.t ; + } + + val pending_operations: + #simple -> + ?chain:chain -> + unit -> t tzresult Lwt.t + + end + module S : sig val hash: @@ -348,6 +366,16 @@ module Make(Proto : PROTO)(Next_proto : PROTO) : sig end + module Mempool : sig + + val pending_operations: + ('a, 'b) RPC_path.t -> + ([ `GET ], 'a, + 'b , unit, unit, + Mempool.t) RPC_service.t + + end + end end diff --git a/src/lib_shell_services/chain_services.ml b/src/lib_shell_services/chain_services.ml index 9a3c3f32d..e7ab5a02e 100644 --- a/src/lib_shell_services/chain_services.ml +++ b/src/lib_shell_services/chain_services.ml @@ -48,38 +48,6 @@ module S = struct ~output: Chain_id.encoding RPC_path.(path / "chain_id") - module Mempool = struct - - let operation_encoding = - merge_objs - (obj1 (req "hash" Operation_hash.encoding)) - Operation.encoding - - let pending_operations = - (* TODO: branch_delayed/... *) - RPC_service.get_service - ~description: - "List the not-yet-prevalidated operations." - ~query: RPC_query.empty - ~output: - (conv - (fun (preapplied, unprocessed) -> - ({ preapplied with - Preapply_result.refused = Operation_hash.Map.empty }, - Operation_hash.Map.bindings unprocessed)) - (fun (preapplied, unprocessed) -> - (preapplied, - List.fold_right - (fun (h, op) m -> Operation_hash.Map.add h op m) - unprocessed Operation_hash.Map.empty)) - (merge_objs - (dynamic_size - (Preapply_result.encoding RPC_error.encoding)) - (obj1 (req "unprocessed" (list (dynamic_size operation_encoding)))))) - RPC_path.(path / "mempool") - - end - module Blocks = struct let list_query = @@ -168,13 +136,6 @@ let chain_id ctxt = | `Hash h -> return h | _ -> f chain () () -module Mempool = struct - - let pending_operations ctxt ?(chain = `Main) () = - make_call0 S.Mempool.pending_operations ctxt chain () () - -end - module Blocks = struct let list ctxt = @@ -199,6 +160,8 @@ module Blocks = struct end +module Mempool = Block_services.Empty.Mempool + module Invalid_blocks = struct let list ctxt = diff --git a/src/lib_shell_services/chain_services.mli b/src/lib_shell_services/chain_services.mli index dc6b7bdd9..01593f227 100644 --- a/src/lib_shell_services/chain_services.mli +++ b/src/lib_shell_services/chain_services.mli @@ -34,15 +34,7 @@ val chain_id: ?chain:chain -> unit -> Chain_id.t tzresult Lwt.t -module Mempool : sig - - val pending_operations: - #simple -> - ?chain:chain -> - unit -> - (error Preapply_result.t * Operation.t Operation_hash.Map.t) tzresult Lwt.t - -end +module Mempool = Block_services.Empty.Mempool module Blocks : sig @@ -93,15 +85,6 @@ module S : sig prefix, unit, unit, Chain_id.t) RPC_service.t - module Mempool : sig - - val pending_operations: - ([ `GET ], prefix, - prefix , unit, unit, - error Preapply_result.t * Operation.t Operation_hash.Map.t) RPC_service.t - - end - module Blocks : sig val path: (prefix, prefix) RPC_path.t diff --git a/src/proto_alpha/lib_baking/client_baking_forge.ml b/src/proto_alpha/lib_baking/client_baking_forge.ml index bb1f08b5c..94d022000 100644 --- a/src/proto_alpha/lib_baking/client_baking_forge.ml +++ b/src/proto_alpha/lib_baking/client_baking_forge.ml @@ -128,6 +128,13 @@ let forge (op : Operation.packed) : Operation.raw = { op.protocol_data } +let all_operations (ops : Alpha_block_services.Mempool.t) = + List.map (fun (_, op) -> op) ops.applied @ + Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.refused [] @ + Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_refused [] @ + Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_delayed [] @ + Operation_hash.Map.fold (fun _ op acc -> op :: acc) ops.unprocessed [] + let forge_block cctxt ?(chain = `Main) block ?force ?operations ?(best_effort = operations = None) ?(sort = best_effort) @@ -137,17 +144,9 @@ let forge_block cctxt ?(chain = `Main) block begin match operations with | None -> - Shell_services.Mempool.pending_operations - cctxt ~chain () >>=? fun (ops, pendings) -> - let ops = - List.map parse @@ - List.map snd @@ - Operation_hash.Map.bindings @@ - Operation_hash.Map.fold - Operation_hash.Map.add - (Preapply_result.operations ops) - pendings in - return ops + Alpha_block_services.Mempool.pending_operations + cctxt ~chain () >>=? fun ops -> + return (all_operations ops) | Some operations -> return operations end >>=? fun operations -> @@ -499,14 +498,9 @@ let bake (cctxt : #Proto_alpha.full) state = lwt_debug "Try baking after %a (slot %d) for %s (%a)" Block_hash.pp_short bi.hash priority name Time.pp_hum timestamp >>= fun () -> - Shell_services.Mempool.pending_operations - cctxt ~chain () >>=? fun (res, ops) -> - let operations = - List.map parse @@ - List.map snd @@ - Operation_hash.Map.bindings @@ - Operation_hash.Map.(fold add) - ops (Preapply_result.operations res) in + Alpha_block_services.Mempool.pending_operations + cctxt ~chain () >>=? fun ops -> + let operations = all_operations ops in let request = List.length operations in let seed_nonce_hash = if next_level.expected_commitment then