RPC: batch operation parsing/retrieval
This commit is contained in:
parent
cb2aa6ea2b
commit
329b72d1aa
@ -212,14 +212,16 @@ module Blocks = struct
|
||||
end
|
||||
|
||||
module Operations = struct
|
||||
let contents cctxt hashes =
|
||||
call_service1 cctxt Services.Operations.contents hashes ()
|
||||
let monitor cctxt ?contents () =
|
||||
call_streamed_service0 cctxt Services.Operations.list
|
||||
{ monitor = Some true ; contents }
|
||||
end
|
||||
|
||||
module Protocols = struct
|
||||
let bytes cctxt hash =
|
||||
call_service1 cctxt Services.Protocols.bytes hash ()
|
||||
let contents cctxt hash =
|
||||
call_service1 cctxt Services.Protocols.contents hash ()
|
||||
let list cctxt ?contents () =
|
||||
call_service0 cctxt Services.Protocols.list { contents; monitor = Some false }
|
||||
end
|
||||
|
@ -144,14 +144,21 @@ module Blocks : sig
|
||||
end
|
||||
|
||||
module Operations : sig
|
||||
|
||||
val contents:
|
||||
Client_commands.context ->
|
||||
Operation_hash.t list -> Store.Operation.t list Lwt.t
|
||||
|
||||
val monitor:
|
||||
Client_commands.context ->
|
||||
?contents:bool -> unit ->
|
||||
(Operation_hash.t * Store.Operation.t option) list list Lwt_stream.t Lwt.t
|
||||
|
||||
end
|
||||
|
||||
module Protocols : sig
|
||||
val bytes:
|
||||
|
||||
val contents:
|
||||
Client_commands.context ->
|
||||
Protocol_hash.t -> Store.Protocol.t Lwt.t
|
||||
|
||||
@ -159,6 +166,7 @@ module Protocols : sig
|
||||
Client_commands.context ->
|
||||
?contents:bool -> unit ->
|
||||
(Protocol_hash.t * Store.Protocol.t option) list Lwt.t
|
||||
|
||||
end
|
||||
|
||||
val bootstrapped:
|
||||
|
@ -50,7 +50,7 @@ let commands () =
|
||||
@@ param ~name:"protocol hash" ~desc:"" check_hash
|
||||
@@ stop)
|
||||
(fun ph cctxt ->
|
||||
Client_node_rpcs.Protocols.bytes cctxt ph >>= fun proto ->
|
||||
Client_node_rpcs.Protocols.contents cctxt ph >>= fun proto ->
|
||||
Updater.extract "" ph proto >>= fun () ->
|
||||
cctxt.message "Extracted protocol %a" Protocol_hash.pp_short ph) ;
|
||||
(* | Error err -> *)
|
||||
|
@ -22,19 +22,24 @@ let monitor cctxt ?contents ?check () =
|
||||
Client_node_rpcs.Operations.monitor cctxt ?contents () >>= fun ops_stream ->
|
||||
let convert ops =
|
||||
Lwt_list.filter_map_p
|
||||
(fun (hash, bytes) ->
|
||||
match bytes with
|
||||
(fun (hash, op) ->
|
||||
match op with
|
||||
| None -> Lwt.return (Some { hash; content = None })
|
||||
| Some ({ Store.Operation.shell ; proto } : Updater.raw_operation) ->
|
||||
Client_proto_rpcs.Helpers.Parse.operation cctxt
|
||||
`Prevalidation ?check shell proto >>= function
|
||||
| Ok proto -> Lwt.return (Some { hash ; content = Some (shell, proto) })
|
||||
| Some op ->
|
||||
Client_proto_rpcs.Helpers.Parse.operations cctxt
|
||||
`Prevalidation ?check [op] >>= function
|
||||
| Ok [proto] ->
|
||||
Lwt.return (Some { hash ; content = Some (op.shell, proto) })
|
||||
| Ok _ ->
|
||||
lwt_log_error
|
||||
"@[<v 2>Error while parsing operations@[" >>= fun () ->
|
||||
Lwt.return None
|
||||
| Error err ->
|
||||
lwt_log_error
|
||||
"@[<v 2>Error while parsing operations@,%a@["
|
||||
pp_print_error err >>= fun () ->
|
||||
Lwt.return None)
|
||||
(List.concat ops)
|
||||
(List.concat ops)
|
||||
in
|
||||
Lwt.return (Lwt_stream.map_s convert ops_stream)
|
||||
|
||||
|
@ -259,10 +259,9 @@ module Helpers = struct
|
||||
end
|
||||
|
||||
module Parse = struct
|
||||
let operation cctxt block ?check shell proto =
|
||||
let operations cctxt block ?check operations =
|
||||
call_error_service1 cctxt
|
||||
Services.Helpers.Parse.operation block
|
||||
(({ shell ; proto } : Updater.raw_operation), check)
|
||||
Services.Helpers.Parse.operations block (operations, check)
|
||||
let block cctxt block shell proto =
|
||||
call_error_service1 cctxt
|
||||
Services.Helpers.Parse.block block
|
||||
|
@ -318,10 +318,10 @@ module Helpers : sig
|
||||
end
|
||||
|
||||
module Parse : sig
|
||||
val operation:
|
||||
val operations:
|
||||
Client_commands.context ->
|
||||
block -> ?check:bool -> Updater.shell_operation -> MBytes.t ->
|
||||
proto_operation tzresult Lwt.t
|
||||
block -> ?check:bool -> Updater.raw_operation list ->
|
||||
proto_operation list tzresult Lwt.t
|
||||
val block:
|
||||
Client_commands.context ->
|
||||
block -> Updater.shell_block -> MBytes.t ->
|
||||
|
@ -329,9 +329,8 @@ module RPC = struct
|
||||
| Some { operations } -> operations
|
||||
|
||||
let operation_content node hash =
|
||||
Distributed_db.read_operation node.distributed_db hash >>= function
|
||||
| None -> Lwt.return_none
|
||||
| Some (_, op) -> Lwt.return (Some op)
|
||||
Distributed_db.read_operation node.distributed_db hash >>= fun op ->
|
||||
Lwt.return (map_option ~f:snd op)
|
||||
|
||||
let pending_operations node (block: block) =
|
||||
match block with
|
||||
|
@ -305,16 +305,17 @@ let list_blocks
|
||||
let list_operations node {Services.Operations.monitor; contents} =
|
||||
let monitor = match monitor with None -> false | Some x -> x in
|
||||
let include_ops = match contents with None -> false | Some x -> x in
|
||||
Node.RPC.operations node `Prevalidation >>= fun operations ->
|
||||
Lwt_list.map_p
|
||||
(Lwt_list.map_p
|
||||
(fun hash ->
|
||||
if include_ops then
|
||||
Node.RPC.operation_content node hash >>= fun op ->
|
||||
Lwt.return (hash, op)
|
||||
else
|
||||
Lwt.return (hash, None)))
|
||||
operations >>= fun operations ->
|
||||
Node.RPC.operations node `Prevalidation >>= fun operationss ->
|
||||
let fetch_operations_content operations =
|
||||
if include_ops then
|
||||
Lwt_list.map_s
|
||||
(fun h ->
|
||||
Node.RPC.operation_content node h >>= fun content ->
|
||||
Lwt.return (h, content))
|
||||
operations
|
||||
else
|
||||
Lwt.return @@ ListLabels.map operations ~f:(fun h -> h, None) in
|
||||
Lwt_list.map_p fetch_operations_content operationss >>= fun operations ->
|
||||
if not monitor then
|
||||
RPC.Answer.return operations
|
||||
else
|
||||
@ -333,10 +334,14 @@ let list_operations node {Services.Operations.monitor; contents} =
|
||||
end in
|
||||
RPC.Answer.return_stream { next ; shutdown }
|
||||
|
||||
let get_operations node hash () =
|
||||
Node.RPC.operation_content node hash >>= function
|
||||
| Some bytes -> RPC.Answer.return bytes
|
||||
| None -> raise Not_found
|
||||
let get_operations node hashes () =
|
||||
Lwt_list.map_p
|
||||
(fun h ->
|
||||
Node.RPC.operation_content node h >>= function
|
||||
| None -> Lwt.fail Not_found
|
||||
| Some h -> Lwt.return h)
|
||||
hashes >>= fun ops ->
|
||||
RPC.Answer.return ops
|
||||
|
||||
let list_protocols node {Services.Protocols.monitor; contents} =
|
||||
let monitor = match monitor with None -> false | Some x -> x in
|
||||
@ -393,11 +398,11 @@ let build_rpc_directory node =
|
||||
let dir =
|
||||
RPC.register0 dir Services.Operations.list (list_operations node) in
|
||||
let dir =
|
||||
RPC.register1 dir Services.Operations.bytes (get_operations node) in
|
||||
RPC.register1 dir Services.Operations.contents (get_operations node) in
|
||||
let dir =
|
||||
RPC.register0 dir Services.Protocols.list (list_protocols node) in
|
||||
let dir =
|
||||
RPC.register1 dir Services.Protocols.bytes (get_protocols node) in
|
||||
RPC.register1 dir Services.Protocols.contents (get_protocols node) in
|
||||
let dir =
|
||||
let implementation (net_id, pred, time, fitness, operations, header) =
|
||||
Node.RPC.block_info node (`Head 0) >>= fun bi ->
|
||||
|
@ -408,16 +408,18 @@ module Operations = struct
|
||||
let name = "operation_id" in
|
||||
let descr =
|
||||
"A operation identifier in hexadecimal." in
|
||||
let construct = Operation_hash.to_b58check in
|
||||
let construct ops =
|
||||
String.concat "," (List.map Operation_hash.to_b58check ops) in
|
||||
let destruct h =
|
||||
try Ok (Operation_hash.of_b58check h)
|
||||
let ops = split ',' h in
|
||||
try Ok (List.map Operation_hash.of_b58check ops)
|
||||
with _ -> Error "Can't parse hash" in
|
||||
RPC.Arg.make ~name ~descr ~construct ~destruct ()
|
||||
|
||||
let bytes =
|
||||
let contents =
|
||||
RPC.service
|
||||
~input: empty
|
||||
~output: Updater.raw_operation_encoding
|
||||
~output: (list (dynamic_size Updater.raw_operation_encoding))
|
||||
RPC.Path.(root / "operations" /: operations_arg)
|
||||
|
||||
type list_param = {
|
||||
@ -435,6 +437,8 @@ module Operations = struct
|
||||
|
||||
let list =
|
||||
RPC.service
|
||||
~description:
|
||||
"List operations in the mempool."
|
||||
~input: list_param_encoding
|
||||
~output:
|
||||
(obj1
|
||||
@ -451,6 +455,7 @@ module Operations = struct
|
||||
end
|
||||
|
||||
module Protocols = struct
|
||||
|
||||
let protocols_arg =
|
||||
let name = "protocol_id" in
|
||||
let descr =
|
||||
@ -461,7 +466,7 @@ module Protocols = struct
|
||||
with _ -> Error "Can't parse hash" in
|
||||
RPC.Arg.make ~name ~descr ~construct ~destruct ()
|
||||
|
||||
let bytes =
|
||||
let contents =
|
||||
RPC.service
|
||||
~input: empty
|
||||
~output:
|
||||
@ -496,6 +501,7 @@ module Protocols = struct
|
||||
(dynamic_size Store.Protocol.encoding)))
|
||||
)))
|
||||
RPC.Path.(root / "protocols")
|
||||
|
||||
end
|
||||
|
||||
module Network = struct
|
||||
|
@ -99,28 +99,39 @@ module Blocks : sig
|
||||
end
|
||||
|
||||
module Operations : sig
|
||||
val bytes:
|
||||
(unit, unit * Operation_hash.t, unit, State.Operation.t) RPC.service
|
||||
|
||||
val contents:
|
||||
(unit, unit * Operation_hash.t list,
|
||||
unit, State.Operation.t list) RPC.service
|
||||
|
||||
|
||||
type list_param = {
|
||||
contents: bool option ;
|
||||
monitor: bool option ;
|
||||
}
|
||||
|
||||
val list:
|
||||
(unit, unit,
|
||||
list_param, (Operation_hash.t * Store.Operation.t option) list list) RPC.service
|
||||
list_param,
|
||||
(Operation_hash.t * Store.Operation.t option) list list) RPC.service
|
||||
|
||||
end
|
||||
|
||||
module Protocols : sig
|
||||
val bytes:
|
||||
|
||||
val contents:
|
||||
(unit, unit * Protocol_hash.t, unit, Tezos_compiler.Protocol.t) RPC.service
|
||||
|
||||
type list_param = {
|
||||
contents: bool option ;
|
||||
monitor: bool option ;
|
||||
}
|
||||
|
||||
val list:
|
||||
(unit, unit,
|
||||
list_param,
|
||||
(Protocol_hash.t * Tezos_compiler.Protocol.t option) list) RPC.service
|
||||
|
||||
end
|
||||
|
||||
module Network : sig
|
||||
|
@ -574,15 +574,16 @@ module Helpers = struct
|
||||
|
||||
module Parse = struct
|
||||
|
||||
let operation custom_root =
|
||||
let operations custom_root =
|
||||
RPC.service
|
||||
~description:"Parse an operation"
|
||||
~description:"Parse operations"
|
||||
~input:
|
||||
(merge_objs
|
||||
Updater.raw_operation_encoding
|
||||
(obj1 (opt "check_signature" bool)))
|
||||
~output: (wrap_tzerror Operation.proto_operation_encoding)
|
||||
RPC.Path.(custom_root / "helpers" / "parse" / "operation" )
|
||||
(obj2
|
||||
(req "operations" (list (dynamic_size Updater.raw_operation_encoding)))
|
||||
(opt "check_signature" bool))
|
||||
~output:
|
||||
(wrap_tzerror (list Operation.proto_operation_encoding))
|
||||
RPC.Path.(custom_root / "helpers" / "parse" / "operations" )
|
||||
|
||||
let block custom_root =
|
||||
RPC.service
|
||||
|
@ -468,17 +468,18 @@ let check_signature ctxt signature shell contents =
|
||||
Operation.check_signature key
|
||||
{ signature ; shell ; contents ; hash = dummy_hash }
|
||||
|
||||
let parse_operation ctxt
|
||||
(({ shell ; proto } : Updater.raw_operation), check) =
|
||||
Operation.parse_proto proto >>=? fun (proto, signature) ->
|
||||
begin
|
||||
match check with
|
||||
| Some true -> check_signature ctxt signature shell proto
|
||||
| Some false | None -> return ()
|
||||
end >>=? fun () ->
|
||||
return proto
|
||||
let parse_operations ctxt (operations, check) =
|
||||
map_s begin fun ({ shell ; proto } : Updater.raw_operation) ->
|
||||
begin
|
||||
Operation.parse_proto proto >>=? fun (proto, signature) ->
|
||||
begin match check with
|
||||
| Some true -> check_signature ctxt signature shell proto
|
||||
| Some false | None -> return ()
|
||||
end >>|? fun () -> proto
|
||||
end
|
||||
end operations
|
||||
|
||||
let () = register1 Services.Helpers.Parse.operation parse_operation
|
||||
let () = register1 Services.Helpers.Parse.operations parse_operations
|
||||
|
||||
let parse_block _ctxt raw_block =
|
||||
Lwt.return (Block.parse_header raw_block) >>=? fun { proto } ->
|
||||
|
Loading…
Reference in New Issue
Block a user