From 3e39f82bee047845e3b24251b89bb0d2a5b4a550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 13 Nov 2017 23:27:19 +0100 Subject: [PATCH] Shell/baker: inline full operation contents in RPC. --- src/client/client_node_rpcs.ml | 10 +- src/client/client_node_rpcs.mli | 14 +-- .../embedded/alpha/client_baking_forge.ml | 57 ++++-------- .../embedded/alpha/client_baking_forge.mli | 6 +- src/node/shell/distributed_db.ml | 14 --- src/node/shell/distributed_db.mli | 7 -- src/node/shell/node.ml | 42 ++++----- src/node/shell/node.mli | 6 +- src/node/shell/node_rpc_services.ml | 73 ++++++--------- src/node/shell/node_rpc_services.mli | 14 +-- src/node/shell/peer_validator.ml | 25 ++--- src/node/shell/peer_validator.mli | 2 +- src/node/shell/prevalidation.ml | 61 +++++++------ src/node/shell/prevalidation.mli | 10 +- src/node/shell/prevalidator.ml | 91 ++++++++++--------- src/node/shell/prevalidator.mli | 4 +- src/node/shell/validator.ml | 3 - src/node/shell/validator.mli | 2 +- src/utils/error_monad.ml | 1 + test/proto_alpha/proto_alpha_helpers.ml | 5 +- test/proto_alpha/proto_alpha_helpers.mli | 2 +- test/proto_alpha/test_endorsement.ml | 4 +- 22 files changed, 192 insertions(+), 261 deletions(-) diff --git a/src/client/client_node_rpcs.ml b/src/client/client_node_rpcs.ml index 155faf6ac..8ba105914 100644 --- a/src/client/client_node_rpcs.ml +++ b/src/client/client_node_rpcs.ml @@ -18,12 +18,6 @@ let errors cctxt = let forge_block_header cctxt header = call_service0 cctxt Services.forge_block_header header -type operation = Node_rpc_services.operation = - | Blob of Operation.t - | Hash of Operation_hash.t - -let operation_encoding = Node_rpc_services.operation_encoding - let inject_block cctxt ?(async = false) ?(force = false) raw operations = call_err_service0 cctxt Services.inject_block { raw ; blocking = not async ; force ; operations } @@ -64,14 +58,14 @@ module Blocks = struct operations_hash: Operation_list_list_hash.t ; fitness: MBytes.t list ; data: MBytes.t ; - operations: Operation_hash.t list list option ; + operations: (Operation_hash.t * Operation.t) list list option ; protocol: Protocol_hash.t ; test_network: Context.test_network; } type preapply_param = Services.Blocks.preapply_param = { timestamp: Time.t ; proto_header: MBytes.t ; - operations: operation list ; + operations: Operation.t list ; sort_operations: bool ; } type preapply_result = Services.Blocks.preapply_result = { diff --git a/src/client/client_node_rpcs.mli b/src/client/client_node_rpcs.mli index 5fdf17909..30aec584f 100644 --- a/src/client/client_node_rpcs.mli +++ b/src/client/client_node_rpcs.mli @@ -17,16 +17,10 @@ val forge_block_header: Block_header.t -> MBytes.t tzresult Lwt.t -type operation = - | Blob of Operation.t - | Hash of Operation_hash.t - -val operation_encoding: operation Data_encoding.t - val inject_block: config -> ?async:bool -> ?force:bool -> - MBytes.t -> operation list list -> + MBytes.t -> Operation.t list list -> Block_hash.t tzresult Lwt.t (** [inject_block cctxt ?async ?force raw_block] tries to inject [raw_block] inside the node. If [?async] is [true], [raw_block] @@ -85,7 +79,7 @@ module Blocks : sig val pending_operations: config -> block -> - (error Prevalidation.preapply_result * Operation_hash.Set.t) tzresult Lwt.t + (error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t) tzresult Lwt.t type block_info = { hash: Block_hash.t ; @@ -98,7 +92,7 @@ module Blocks : sig operations_hash: Operation_list_list_hash.t ; fitness: MBytes.t list ; data: MBytes.t ; - operations: Operation_hash.t list list option ; + operations: (Operation_hash.t * Operation.t) list list option ; protocol: Protocol_hash.t ; test_network: Context.test_network; } @@ -130,7 +124,7 @@ module Blocks : sig ?timestamp:Time.t -> ?sort:bool -> proto_header:MBytes.t -> - operation list -> preapply_result tzresult Lwt.t + Operation.t list -> preapply_result tzresult Lwt.t end diff --git a/src/client/embedded/alpha/client_baking_forge.ml b/src/client/embedded/alpha/client_baking_forge.ml index eed87c274..09ac2d6ca 100644 --- a/src/client/embedded/alpha/client_baking_forge.ml +++ b/src/client/embedded/alpha/client_baking_forge.ml @@ -52,10 +52,7 @@ let assert_valid_operations_hash shell_header operations = Operation_list_list_hash.compute (List.map Operation_list_hash.compute (List.map - (List.map - (function - | Client_node_rpcs.Blob op -> Tezos_data.Operation.hash op - | Hash oph -> oph)) operations)) in + (List.map Tezos_data.Operation.hash) operations)) in fail_unless (Operation_list_list_hash.equal operations_hash shell_header.Tezos_data.Block_header.operations_hash) @@ -74,7 +71,7 @@ let inject_block cctxt return block_hash type error += - | Failed_to_preapply of Client_node_rpcs.operation * error list + | Failed_to_preapply of Tezos_data.Operation.t * error list let () = register_error_kind @@ -83,16 +80,13 @@ let () = ~title: "Fail to preapply an operation" ~description: "" ~pp:(fun ppf (op, err) -> - let h = - match op with - | Client_node_rpcs.Hash h -> h - | Blob op -> Tezos_data.Operation.hash op in + let h = Tezos_data.Operation.hash op in Format.fprintf ppf "@[Failed to preapply %a:@ %a@]" Operation_hash.pp_short h pp_print_error err) Data_encoding. (obj2 - (req "operation" (dynamic_size Client_node_rpcs.operation_encoding)) + (req "operation" (dynamic_size Tezos_data.Operation.encoding)) (req "error" Node_rpc_services.Error.encoding)) (function | Failed_to_preapply (hash, err) -> Some (hash, err) @@ -112,11 +106,13 @@ let forge_block cctxt block Client_node_rpcs.Blocks.pending_operations cctxt block >>=? fun (ops, pendings) -> let ops = - Operation_hash.Set.elements @@ - Operation_hash.Set.union + List.map snd @@ + Operation_hash.Map.bindings @@ + Operation_hash.Map.fold + Operation_hash.Map.add (Prevalidation.preapply_result_operations ops) pendings in - return (List.map (fun x -> Client_node_rpcs.Hash x) ops) + return ops | Some operations -> return operations end >>=? fun operations -> begin @@ -177,20 +173,7 @@ let forge_block cctxt block && Operation_hash.Map.is_empty result.branch_delayed ) then let operations = if not best_effort then operations - else - let map = - List.fold_left - (fun map op -> - match op with - | Client_node_rpcs.Hash _ -> map - | Blob op -> - Operation_hash.Map.add (Tezos_data.Operation.hash op) op map) - Operation_hash.Map.empty operations in - List.map - (fun h -> - try Client_node_rpcs.Blob (Operation_hash.Map.find h map) - with _ -> Client_node_rpcs.Hash h) - result.applied in + else List.map snd result.applied in inject_block cctxt ?force ~shell_header ~priority ~seed_nonce_hash ~src_sk [operations] @@ -198,18 +181,15 @@ let forge_block cctxt block Lwt.return_error @@ Utils.filter_map (fun op -> - let h = - match op with - | Client_node_rpcs.Hash h -> h - | Blob op -> Tezos_data.Operation.hash op in + let h = Tezos_data.Operation.hash op in try Some (Failed_to_preapply - (op, Operation_hash.Map.find h result.refused)) + (op, snd @@ Operation_hash.Map.find h result.refused)) with Not_found -> try Some (Failed_to_preapply - (op, Operation_hash.Map.find h result.branch_refused)) + (op, snd @@ Operation_hash.Map.find h result.branch_refused)) with Not_found -> try Some (Failed_to_preapply - (op, Operation_hash.Map.find h result.branch_delayed)) + (op, snd @@ Operation_hash.Map.find h result.branch_delayed)) with Not_found -> None) operations @@ -481,9 +461,10 @@ let mine cctxt state = Client_node_rpcs.Blocks.pending_operations cctxt.rpc_config block >>=? fun (res, ops) -> let operations = - let open Operation_hash.Set in - List.map (fun x -> Client_node_rpcs.Hash x) @@ - elements (union ops (Prevalidation.preapply_result_operations res)) in + List.map snd @@ + Operation_hash.Map.bindings @@ + Operation_hash.Map.(fold add) + ops (Prevalidation.preapply_result_operations res) in let request = List.length operations in let proto_header = forge_faked_proto_header ~priority ~seed_nonce_hash in @@ -527,7 +508,7 @@ let mine cctxt state = Client_keys.get_key cctxt delegate >>=? fun (_,_,src_sk) -> inject_block cctxt.rpc_config ~force:true ~shell_header ~priority ~seed_nonce_hash ~src_sk - [List.map (fun h -> Client_node_rpcs.Hash h) operations.applied] + [List.map snd operations.applied] |> trace_exn (Failure "Error while injecting block") >>=? fun block_hash -> State.record_block cctxt level block_hash seed_nonce |> trace_exn (Failure "Error while recording block") >>=? fun () -> diff --git a/src/client/embedded/alpha/client_baking_forge.mli b/src/client/embedded/alpha/client_baking_forge.mli index 5ba475986..cc615e0b6 100644 --- a/src/client/embedded/alpha/client_baking_forge.mli +++ b/src/client/embedded/alpha/client_baking_forge.mli @@ -20,7 +20,7 @@ val inject_block: priority:int -> seed_nonce_hash:Nonce_hash.t -> src_sk:secret_key -> - Client_node_rpcs.operation list list -> + Tezos_data.Operation.t list list -> Block_hash.t tzresult Lwt.t (** [inject_block cctxt blk ?force ~priority ~timestamp ~fitness ~seed_nonce ~src_sk ops] tries to inject a block in the node. If @@ -29,13 +29,13 @@ val inject_block: precomputed). [src_sk] is used to sign the block header. *) type error += - | Failed_to_preapply of Client_node_rpcs.operation * error list + | Failed_to_preapply of Tezos_data.Operation.t * error list val forge_block: Client_rpcs.config -> Client_proto_rpcs.block -> ?force:bool -> - ?operations:Client_node_rpcs.operation list -> + ?operations:Tezos_data.Operation.t list -> ?best_effort:bool -> ?sort:bool -> ?timestamp:Time.t -> diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index d86f3e05b..698c1ec0e 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -764,20 +764,6 @@ let commit_protocol db h p = Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ; return (res <> None) -type operation = - | Blob of Operation.t - | Hash of Operation_hash.t - -let resolve_operation net_db = function - | Blob op -> - fail_unless - (Net_id.equal op.shell.net_id (State.Net.id net_db.net_state)) - (failure "Inconsistent net_id in operation.") >>=? fun () -> - return op - | Hash oph -> - Raw_operation.Table.read net_db.operation_db.table oph >>=? fun op -> - return op - let watch_block_header { block_input } = Watcher.create_stream block_input let watch_operation { operation_input } = diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index 1b3f8a024..45eff9d9c 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -38,13 +38,6 @@ val deactivate: net_db -> unit Lwt.t val disconnect: net_db -> P2p.Peer_id.t -> unit Lwt.t -type operation = - | Blob of Operation.t - | Hash of Operation_hash.t - -val resolve_operation: - net_db -> operation -> Operation.t tzresult Lwt.t - val commit_block: net_db -> Block_hash.t -> diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index 8bb722e26..ff2baf695 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -54,7 +54,7 @@ type t = { mainnet_validator: Net_validator.t ; inject_block: ?force:bool -> - MBytes.t -> Distributed_db.operation list list -> + MBytes.t -> Operation.t list list -> (Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t ; inject_operation: ?force:bool -> MBytes.t -> @@ -151,7 +151,7 @@ module RPC = struct operations_hash: Operation_list_list_hash.t ; fitness: MBytes.t list ; data: MBytes.t ; - operations: Operation_hash.t list list option ; + operations: (Operation_hash.t * Operation.t) list list option ; protocol: Protocol_hash.t ; test_network: Context.test_network; } @@ -159,7 +159,9 @@ module RPC = struct let convert (block: State.Block.t) = let hash = State.Block.hash block in let header = State.Block.header block in - State.Block.all_operation_hashes block >>= fun operations -> + State.Block.all_operations block >>= fun operations -> + let operations = + List.map (List.map (fun op -> (Operation.hash op, op))) operations in State.Block.context block >>= fun context -> Context.get_protocol context >>= fun protocol -> Context.get_test_network context >>= fun test_network -> @@ -279,7 +281,9 @@ module RPC = struct validation_passes = List.length operations ; operations_hash = Operation_list_list_hash.compute - (List.map Operation_list_hash.compute operations) ; + (List.map + (fun ops -> Operation_list_hash.compute (List.map fst ops)) + operations) ; operations = Some operations ; data = MBytes.of_string "" ; net_id = head_header.shell.net_id ; @@ -323,7 +327,6 @@ module RPC = struct | ( `Prevalidation | `Test_prevalidation ) as block -> let validator = get_validator node block in let pv = Net_validator.prevalidator validator in - let net_db = Net_validator.net_db validator in let net_state = Net_validator.net_state validator in Chain.head net_state >>= fun head -> let head_header = State.Block.header head in @@ -339,9 +342,10 @@ module RPC = struct head_header.shell.proto_level else ((head_header.shell.proto_level + 1) mod 256) in - let operation_hashes = + let operation_hashes, operations = let pv_result, _ = Prevalidator.operations pv in - [ pv_result.applied ] in + [ List.map fst pv_result.applied ], + [ List.map snd pv_result.applied ] in let operations_hash = Operation_list_list_hash.compute (List.map Operation_list_hash.compute operation_hashes) in @@ -361,12 +365,7 @@ module RPC = struct proto = MBytes.create 0 ; } ; operation_hashes = (fun () -> Lwt.return operation_hashes) ; - operations = begin fun () -> - Lwt_list.map_p - (Lwt_list.map_p - (Distributed_db.Operation.read_exn net_db)) - operation_hashes - end ; + operations = (fun () -> Lwt.return operations) ; context ; }) @@ -384,7 +383,7 @@ module RPC = struct let validator = get_validator node block in let pv = Net_validator.prevalidator validator in let { Prevalidation.applied }, _ = Prevalidator.operations pv in - Lwt.return [applied] + Lwt.return [List.map fst applied] | `Hash hash -> read_valid_block node hash >>= function | None -> Lwt.return_nil @@ -403,12 +402,9 @@ module RPC = struct State.Block.all_operations block | (`Prevalidation | `Test_prevalidation) as block -> let validator = get_validator node block in - let net_db = Net_validator.net_db validator in let pv = Net_validator.prevalidator validator in let { Prevalidation.applied }, _ = Prevalidator.operations pv in - Lwt_list.map_p - (Distributed_db.Operation.read_exn net_db) applied >>= fun applied -> - Lwt.return [applied] + Lwt.return [List.map snd applied] | `Hash hash -> read_valid_block node hash >>= function | None -> Lwt.return_nil @@ -441,7 +437,7 @@ module RPC = struct | `Hash h -> begin get_validator_per_hash node h >>= function | None -> - Lwt.return (Prevalidation.empty_result, Operation_hash.Set.empty) + Lwt.return (Prevalidation.empty_result, Operation_hash.Map.empty) | Some validator -> let net_state = Net_validator.net_state validator in let prevalidator = Net_validator.prevalidator validator in @@ -482,16 +478,14 @@ module RPC = struct | None -> Lwt.return (error_exn Not_found) | Some data -> return data end >>=? fun predecessor -> - let net_db = Net_validator.net_db node.mainnet_validator in - map_p (Distributed_db.resolve_operation net_db) ops >>=? fun rops -> Prevalidation.start_prevalidation ~proto_header ~predecessor ~timestamp () >>=? fun validation_state -> - let rops = List.map (fun x -> Operation.hash x, x) rops in + let ops = List.map (fun x -> Operation.hash x, x) ops in Prevalidation.prevalidate - validation_state ~sort rops >>= fun (validation_state, r) -> + validation_state ~sort ops >>= fun (validation_state, r) -> let operations_hash = Operation_list_list_hash.compute - [Operation_list_hash.compute r.applied] in + [Operation_list_hash.compute (List.map fst r.applied)] in Prevalidation.end_prevalidation validation_state >>=? fun { fitness ; context } -> let pred_shell_header = State.Block.shell_header predecessor in diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index 6b44796e5..757c4cb00 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -36,7 +36,7 @@ module RPC : sig val inject_block: t -> ?force:bool -> - MBytes.t -> Distributed_db.operation list list -> + MBytes.t -> Operation.t list list -> (Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t (** [inject_block node ?force bytes] tries to insert [bytes] (supposedly the serialization of a block header) inside @@ -75,7 +75,7 @@ module RPC : sig t -> (Operation_hash.t * Operation.t) Lwt_stream.t * Watcher.stopper val pending_operations: - t -> block -> (error Prevalidation.preapply_result * Operation_hash.Set.t) Lwt.t + t -> block -> (error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t) Lwt.t val protocols: t -> Protocol_hash.t list Lwt.t @@ -90,7 +90,7 @@ module RPC : sig val preapply: t -> block -> timestamp:Time.t -> proto_header:MBytes.t -> - sort_operations:bool -> Distributed_db.operation list -> + sort_operations:bool -> Operation.t list -> (Block_header.shell_header * error Prevalidation.preapply_result) tzresult Lwt.t val context_dir: diff --git a/src/node/shell/node_rpc_services.ml b/src/node/shell/node_rpc_services.ml index 9ab346827..36cab8992 100644 --- a/src/node/shell/node_rpc_services.ml +++ b/src/node/shell/node_rpc_services.ml @@ -46,21 +46,6 @@ module Error = struct end -type operation = Distributed_db.operation = - | Blob of Operation.t - | Hash of Operation_hash.t - -let operation_encoding = - let open Data_encoding in - union [ - case Operation.encoding - (function Blob op -> Some op | Hash _ -> None) - (fun op -> Blob op) ; - case Operation_hash.encoding - (function Hash oph -> Some oph | Blob _ -> None) - (fun oph -> Hash oph) ; - ] - module Blocks = struct type block = [ @@ -81,12 +66,16 @@ module Blocks = struct operations_hash: Operation_list_list_hash.t ; fitness: MBytes.t list ; data: MBytes.t ; - operations: Operation_hash.t list list option ; + operations: (Operation_hash.t * Operation.t) list list option ; protocol: Protocol_hash.t ; test_network: Context.test_network; } let block_info_encoding = + let operation_encoding = + merge_objs + (obj1 (req "hash" Operation_hash.encoding)) + Operation.encoding in conv (fun { hash ; net_id ; level ; proto_level ; predecessor ; fitness ; timestamp ; protocol ; @@ -110,7 +99,7 @@ module Blocks = struct (merge_objs (obj4 (req "hash" Block_hash.encoding) - (opt "operations" (list (list Operation_hash.encoding))) + (opt "operations" (dynamic_size (list (dynamic_size (list (dynamic_size operation_encoding)))))) (req "protocol" Protocol_hash.encoding) (dft "test_network" Context.test_network_encoding Context.Not_running)) @@ -256,6 +245,10 @@ module Blocks = struct RPC.Path.(block_path / "test_network") let pending_operations = + let operation_encoding = + merge_objs + (obj1 (req "hash" Operation_hash.encoding)) + Operation.encoding in (* TODO: branch_delayed/... *) RPC.service ~description: @@ -263,32 +256,18 @@ module Blocks = struct ~input: empty ~output: (conv - (fun ({ Prevalidation.applied; branch_delayed ; branch_refused }, - unprocessed) -> - (applied, - Operation_hash.Map.bindings branch_delayed, - Operation_hash.Map.bindings branch_refused, - Operation_hash.Set.elements unprocessed)) - (fun (applied, branch_delayed, branch_refused, unprocessed) -> - ({ Prevalidation.applied ; refused = Operation_hash.Map.empty ; - branch_refused = - List.fold_right - (fun (k, o) -> Operation_hash.Map.add k o) - branch_refused Operation_hash.Map.empty ; - branch_delayed = - List.fold_right - (fun (k, o) -> Operation_hash.Map.add k o) - branch_delayed Operation_hash.Map.empty ; - }, - List.fold_right Operation_hash.Set.add - unprocessed Operation_hash.Set.empty)) - (obj4 - (req "applied" (list Operation_hash.encoding)) - (req "branch_delayed" - (list (tup2 Operation_hash.encoding Error.encoding))) - (req "branch_refused" - (list (tup2 Operation_hash.encoding Error.encoding))) - (req "unprocessed" (list Operation_hash.encoding)))) + (fun (preapplied, unprocessed) -> + ({ preapplied with Prevalidation.refused = Operation_hash.Map.empty }, + Operation_hash.Map.bindings unprocessed)) + (fun (preapplied, unprocessed) -> + (preapplied, + List.fold_right + (fun (h, op) m -> Operation_hash.Map.add h op m) + unprocessed Operation_hash.Map.empty)) + (merge_objs + (dynamic_size + (Prevalidation.preapply_result_encoding Error.encoding)) + (obj1 (req "unprocessed" (list (dynamic_size operation_encoding)))))) RPC.Path.(block_path / "pending_operations") let proto_path = @@ -297,7 +276,7 @@ module Blocks = struct type preapply_param = { timestamp: Time.t ; proto_header: MBytes.t ; - operations: operation list ; + operations: Operation.t list ; sort_operations: bool ; } @@ -310,7 +289,7 @@ module Blocks = struct (obj4 (req "timestamp" Time.encoding) (req "proto_header" bytes) - (req "operations" (list (dynamic_size operation_encoding))) + (req "operations" (list (dynamic_size Operation.encoding))) (dft "sort_operations" bool false))) type preapply_result = { @@ -623,7 +602,7 @@ type inject_block_param = { raw: MBytes.t ; blocking: bool ; force: bool ; - operations: operation list list ; + operations: Operation.t list list ; } let inject_block_param = @@ -651,7 +630,7 @@ let inject_block_param = (req "operations" (describe ~description:"..." - (list (list (dynamic_size operation_encoding)))))) + (list (list (dynamic_size Operation.encoding)))))) let inject_block = RPC.service diff --git a/src/node/shell/node_rpc_services.mli b/src/node/shell/node_rpc_services.mli index d1830dbe6..5fb7019e7 100644 --- a/src/node/shell/node_rpc_services.mli +++ b/src/node/shell/node_rpc_services.mli @@ -13,12 +13,6 @@ module Error : sig val wrap: 'a Data_encoding.t -> 'a tzresult Data_encoding.encoding end -type operation = Distributed_db.operation = - | Blob of Operation.t - | Hash of Operation_hash.t - -val operation_encoding: operation Data_encoding.t - module Blocks : sig type block = [ @@ -43,7 +37,7 @@ module Blocks : sig operations_hash: Operation_list_list_hash.t ; fitness: MBytes.t list ; data: MBytes.t ; - operations: Operation_hash.t list list option ; + operations: (Operation_hash.t * Operation.t) list list option ; protocol: Protocol_hash.t ; test_network: Context.test_network; } @@ -79,7 +73,7 @@ module Blocks : sig (unit, unit * block, unit, Context.test_network) RPC.service val pending_operations: (unit, unit * block, unit, - error Prevalidation.preapply_result * Hash.Operation_hash.Set.t) RPC.service + error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t) RPC.service type list_param = { include_ops: bool ; @@ -96,7 +90,7 @@ module Blocks : sig type preapply_param = { timestamp: Time.t ; proto_header: MBytes.t ; - operations: operation list ; + operations: Operation.t list ; sort_operations: bool ; } @@ -183,7 +177,7 @@ type inject_block_param = { raw: MBytes.t ; blocking: bool ; force: bool ; - operations: operation list list ; + operations: Operation.t list list ; } val inject_block: diff --git a/src/node/shell/peer_validator.ml b/src/node/shell/peer_validator.ml index 2b0316158..26326230b 100644 --- a/src/node/shell/peer_validator.ml +++ b/src/node/shell/peer_validator.ml @@ -32,8 +32,8 @@ type t = { notify_bootstrapped: unit -> unit ; mutable bootstrapped: bool ; - mutable last_validated_head: Block_hash.t ; - mutable last_advertised_head: Block_hash.t ; + mutable last_validated_head: Block_header.t ; + mutable last_advertised_head: Block_header.t ; mutable worker: unit Lwt.t ; dropbox: msg Lwt_dropbox.t ; @@ -130,7 +130,7 @@ let may_validate_new_head pv hash header = Block_hash.pp_short hash P2p.Peer_id.pp_short pv.peer_id >>= fun () -> set_bootstrapped pv ; - pv.last_validated_head <- hash ; + pv.last_validated_head <- header ; return () | false -> lwt_log_info @@ -246,9 +246,10 @@ let create let canceler = Canceler.create () in let dropbox = Lwt_dropbox.create () in let net_state = Distributed_db.net_state net_db in - let genesis = (State.Net.genesis net_state).block in + State.Block.read_exn net_state + (State.Net.genesis net_state).block >>= fun genesis -> let rec notify_new_block block = - pv.last_validated_head <- State.Block.hash block ; + pv.last_validated_head <- State.Block.header block ; external_notify_new_block block and pv = { block_validator ; @@ -261,8 +262,8 @@ let create net_db ; peer_id ; bootstrapped = false ; - last_validated_head = genesis ; - last_advertised_head = genesis ; + last_validated_head = State.Block.header genesis ; + last_advertised_head = State.Block.header genesis ; canceler ; dropbox ; worker = Lwt.return_unit ; @@ -282,15 +283,17 @@ let create Lwt.return pv let notify_branch pv locator = - let head, _ = (locator : Block_locator.t :> _ * _) in - let hash = Block_header.hash head in - pv.last_advertised_head <- hash ; + let header, _ = (locator : Block_locator.t :> _ * _) in + let hash = Block_header.hash header in + (* TODO penalize decreasing fitness *) + pv.last_advertised_head <- header ; try Lwt_dropbox.put pv.dropbox (New_branch (hash, locator)) with Lwt_dropbox.Closed -> () let notify_head pv header = let hash = Block_header.hash header in - pv.last_advertised_head <- hash ; + pv.last_advertised_head <- header ; + (* TODO penalize decreasing fitness *) match Lwt_dropbox.peek pv.dropbox with | Some (New_branch _) -> () (* ignore *) | None | Some (New_head _) -> diff --git a/src/node/shell/peer_validator.mli b/src/node/shell/peer_validator.mli index 2426115d5..86338d1a0 100644 --- a/src/node/shell/peer_validator.mli +++ b/src/node/shell/peer_validator.mli @@ -11,7 +11,7 @@ type t val peer_id: t -> P2p.Peer_id.t val bootstrapped: t -> bool -val current_head: t -> Block_hash.t +val current_head: t -> Block_header.t val create: ?notify_new_block: (State.Block.t -> unit) -> diff --git a/src/node/shell/prevalidation.ml b/src/node/shell/prevalidation.ml index 25f94381a..a50393ad6 100644 --- a/src/node/shell/prevalidation.ml +++ b/src/node/shell/prevalidation.ml @@ -9,10 +9,10 @@ type 'error preapply_result = { - applied: Operation_hash.t list; - refused: 'error list Operation_hash.Map.t; - branch_refused: 'error list Operation_hash.Map.t; - branch_delayed: 'error list Operation_hash.Map.t; + applied: (Operation_hash.t * Operation.t) list; + refused: (Operation.t * 'error list) Operation_hash.Map.t; + branch_refused: (Operation.t * 'error list) Operation_hash.Map.t; + branch_delayed: (Operation.t * 'error list) Operation_hash.Map.t; } let empty_result = { @@ -31,7 +31,16 @@ let map_result f r = { let preapply_result_encoding error_encoding = let open Data_encoding in - let refused_encoding = tup2 Operation_hash.encoding error_encoding in + let operation_encoding = + merge_objs + (obj1 (req "hash" Operation_hash.encoding)) + (dynamic_size Operation.encoding) in + let refused_encoding = + merge_objs + (obj1 (req "hash" Operation_hash.encoding)) + (merge_objs + (dynamic_size Operation.encoding) + (obj1 (req "error" error_encoding))) in let build_list map = Operation_hash.Map.bindings map in let build_map list = List.fold_right @@ -47,7 +56,7 @@ let preapply_result_encoding error_encoding = let branch_delayed = build_map branch_delayed in { applied ; refused ; branch_refused ; branch_delayed }) (obj4 - (req "applied" (list Operation_hash.encoding)) + (req "applied" (list operation_encoding)) (req "refused" (list refused_encoding)) (req "branch_refused" (list refused_encoding)) (req "branch_delayed" (list refused_encoding))) @@ -55,15 +64,15 @@ let preapply_result_encoding error_encoding = let preapply_result_operations t = let ops = List.fold_left - (fun acc x -> Operation_hash.Set.add x acc) - Operation_hash.Set.empty t.applied in + (fun acc (h, op) -> Operation_hash.Map.add h op acc) + Operation_hash.Map.empty t.applied in let ops = Operation_hash.Map.fold - (fun x _ acc -> Operation_hash.Set.add x acc) + (fun h (op, _err) acc -> Operation_hash.Map.add h op acc) t.branch_delayed ops in let ops = Operation_hash.Map.fold - (fun x _ acc -> Operation_hash.Set.add x acc) + (fun h (op, _err) acc -> Operation_hash.Map.add h op acc) t.branch_refused ops in ops @@ -75,24 +84,24 @@ let empty_result = let rec apply_operations apply_operation state r ~sort ops = Lwt_list.fold_left_s - (fun (state, r) (hash, op) -> - apply_operation state op >>= function + (fun (state, r) (hash, op, parsed_op) -> + apply_operation state parsed_op >>= function | Ok state -> - let applied = hash :: r.applied in - Lwt.return (state, { r with applied} ) + let applied = (hash, op) :: r.applied in + Lwt.return (state, { r with applied } ) | Error errors -> match classify_errors errors with | `Branch -> let branch_refused = - Operation_hash.Map.add hash errors r.branch_refused in + Operation_hash.Map.add hash (op, errors) r.branch_refused in Lwt.return (state, { r with branch_refused }) | `Permanent -> let refused = - Operation_hash.Map.add hash errors r.refused in + Operation_hash.Map.add hash (op, errors) r.refused in Lwt.return (state, { r with refused }) | `Temporary -> let branch_delayed = - Operation_hash.Map.add hash errors r.branch_delayed in + Operation_hash.Map.add hash (op, errors) r.branch_delayed in Lwt.return (state, { r with branch_delayed })) (state, r) ops >>= fun (state, r) -> @@ -100,7 +109,7 @@ let rec apply_operations apply_operation state r ~sort ops = | _ :: _ when sort -> let rechecked_operations = List.filter - (fun (hash, _) -> Operation_hash.Map.mem hash r.branch_delayed) + (fun (hash, _, _) -> Operation_hash.Map.mem hash r.branch_delayed) ops in let remaining = List.length rechecked_operations in if remaining = 0 || remaining = List.length ops then @@ -155,25 +164,25 @@ type error += Parse_error let prevalidate (State { proto = (module Proto) ; state }) - ~sort ops = + ~sort (ops : (Operation_hash.t * Operation.t) list)= let ops = List.map (fun (h, op) -> - (h, Proto.parse_operation h op |> record_trace Parse_error)) + (h, op, Proto.parse_operation h op |> record_trace Parse_error)) ops in let invalid_ops = Utils.filter_map - (fun (h, op) -> match op with + (fun (h, op, parsed_op) -> match parsed_op with | Ok _ -> None - | Error err -> Some (h, err)) ops + | Error err -> Some (h, op, err)) ops and parsed_ops = Utils.filter_map - (fun (h, op) -> match op with - | Ok op -> Some (h, op) + (fun (h, op, parsed_op) -> match parsed_op with + | Ok parsed_op -> Some (h, op, parsed_op) | Error _ -> None) ops in let sorted_ops = if sort then - let compare (_, op1) (_, op2) = Proto.compare_operations op1 op2 in + let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in List.sort compare parsed_ops else parsed_ops in apply_operations @@ -184,7 +193,7 @@ let prevalidate applied = List.rev r.applied ; branch_refused = List.fold_left - (fun map (h, err) -> Operation_hash.Map.add h err map) + (fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map) r.branch_refused invalid_ops } in Lwt.return (State { proto = (module Proto) ; state }, r) diff --git a/src/node/shell/prevalidation.mli b/src/node/shell/prevalidation.mli index 166060dbb..920721e5e 100644 --- a/src/node/shell/prevalidation.mli +++ b/src/node/shell/prevalidation.mli @@ -8,19 +8,19 @@ (**************************************************************************) type 'error preapply_result = { - applied: Operation_hash.t list; - refused: 'error list Operation_hash.Map.t; + applied: (Operation_hash.t * Operation.t) list; + refused: (Operation.t * 'error list) Operation_hash.Map.t; (* e.g. invalid signature *) - branch_refused: 'error list Operation_hash.Map.t; + branch_refused: (Operation.t * 'error list) Operation_hash.Map.t; (* e.g. insufficent balance *) - branch_delayed: 'error list Operation_hash.Map.t; + branch_delayed: (Operation.t * 'error list) Operation_hash.Map.t; (* e.g. timestamp in the future *) } val empty_result : 'error preapply_result val preapply_result_operations : - 'error preapply_result -> Operation_hash.Set.t + 'error preapply_result -> Operation.t Operation_hash.Map.t val preapply_result_encoding : 'error list Data_encoding.t -> diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index 509f11524..77f1344c5 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -24,7 +24,7 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = Distributed_db.inject_operation net_db h op >>= fun _ -> Lwt.return_unit end >>= fun () -> - Lwt.return (Operation_hash.Set.add h mempool))) + Lwt.return (Operation_hash.Map.add h op mempool))) mempool operations >>= fun mempool -> State.Block.predecessor block >>= function | None -> assert false @@ -35,7 +35,7 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = iter_option maintain_net_db ~f:(fun net_db -> Distributed_db.clear_operations net_db operations) ; List.fold_left - (List.fold_left (fun mempool h -> Operation_hash.Set.remove h mempool)) + (List.fold_left (fun mempool h -> Operation_hash.Map.remove h mempool)) mempool operations in Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, path) -> @@ -57,8 +57,8 @@ type t = { prevalidate_operations: bool -> Operation.t list -> (Operation_hash.t list * error preapply_result) tzresult Lwt.t ; - operations: unit -> error preapply_result * Operation_hash.Set.t ; - pending: ?block:State.Block.t -> unit -> Operation_hash.Set.t Lwt.t ; + operations: unit -> error preapply_result * Operation.t Operation_hash.Map.t ; + pending: ?block:State.Block.t -> unit -> Operation.t Operation_hash.Map.t Lwt.t ; timestamp: unit -> Time.t ; context: unit -> Updater.validation_result tzresult Lwt.t ; shutdown: unit -> unit Lwt.t ; @@ -84,6 +84,7 @@ let create (start_prevalidation ~predecessor:head ~timestamp:!timestamp () >|= ref) >>= fun validation_state -> let pending = Operation_hash.Table.create 53 in let head = ref head in + let mempool = ref [] in let operations = ref empty_result in Chain_traversal.live_blocks !head @@ -92,7 +93,7 @@ let create let live_blocks = ref live_blocks in let live_operations = ref live_operations in let running_validation = ref Lwt.return_unit in - let unprocessed = ref Operation_hash.Set.empty in + let unprocessed = ref Operation_hash.Map.empty in let broadcast_unprocessed = ref false in let set_validation_state state = @@ -108,24 +109,30 @@ let create Distributed_db.Advertise.current_head net_db ~mempool:ops !head in let handle_unprocessed () = - if Operation_hash.Set.is_empty !unprocessed then + if Operation_hash.Map.is_empty !unprocessed then Lwt.return () else let ops = !unprocessed in let broadcast = !broadcast_unprocessed in - unprocessed := Operation_hash.Set.empty ; + unprocessed := Operation_hash.Map.empty ; broadcast_unprocessed := false ; - let ops = Operation_hash.Set.diff ops !live_operations in - live_operations := Operation_hash.Set.(fold add) !live_operations ops ; + let ops = + Operation_hash.Set.fold + (fun k m -> Operation_hash.Map.remove k m) + !live_operations ops in + live_operations := + Operation_hash.Map.fold + (fun k _ m -> Operation_hash.Set.add k m) + ops !live_operations ; running_validation := begin begin Lwt_list.filter_map_p - (fun h -> - Distributed_db.Operation.read_opt net_db h >>= function - | Some po when Block_hash.Set.mem po.shell.branch !live_blocks -> - Lwt.return_some (h, po) - | Some _ | None -> Lwt.return_none) - (Operation_hash.Set.elements ops) >>= fun rops -> + (fun (h, op) -> + if Block_hash.Set.mem op.Operation.shell.branch !live_blocks then + Lwt.return_some (h, op) + else + Lwt.return_none) + (Operation_hash.Map.bindings ops) >>= fun rops -> (Lwt.return !validation_state >>=? fun validation_state -> (prevalidate validation_state ~sort:true rops >>= return)) >>= function | Ok (state, r) -> Lwt.return (Ok state, r) @@ -133,13 +140,15 @@ let create let r = { empty_result with branch_delayed = - Operation_hash.Set.fold - (fun op m -> Operation_hash.Map.add op err m) + Operation_hash.Map.fold + (fun h op m -> Operation_hash.Map.add h (op, err) m) ops Operation_hash.Map.empty ; } in Lwt.return (!validation_state, r) end >>= fun (state, r) -> let filter_out s m = - List.fold_right Operation_hash.Map.remove s m in + List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in + let new_ops = List.map fst r.applied in + mempool := List.rev_append new_ops !mempool ; operations := { applied = List.rev_append r.applied !operations.applied ; refused = Operation_hash.Map.empty ; @@ -153,8 +162,8 @@ let create (filter_out r.applied !operations.branch_delayed) r.branch_delayed ; } ; - Chain.set_reversed_mempool net_state !operations.applied >>= fun () -> - if broadcast then broadcast_operation r.applied ; + Chain.set_reversed_mempool net_state !mempool >>= fun () -> + if broadcast then broadcast_operation new_ops ; Lwt_list.iter_s (fun (_op, _exns) -> (* FIXME *) @@ -194,31 +203,31 @@ let create Lwt.return !validation_state >>=? fun validation_state -> prevalidate validation_state ~sort:true rops >>= fun (state, res) -> - let register h = - let op = Operation_hash.Map.find h ops in - live_operations := Operation_hash.Set.add h !live_operations ; + let register h op = + live_operations := + Operation_hash.Set.add h !live_operations ; Distributed_db.inject_operation net_db h op >>=? fun (_ : bool) -> return () in iter_s - (fun h -> - register h >>=? fun () -> + (fun (h, op) -> + register h op >>=? fun () -> + mempool := h :: !mempool ; operations := { !operations with - applied = h :: !operations.applied }; + applied = (h, op) :: !operations.applied }; return () ) res.applied >>=? fun () -> - Chain.set_reversed_mempool - net_state !operations.applied >>= fun () -> - broadcast_operation res.applied ; + Chain.set_reversed_mempool net_state !mempool >>= fun () -> + broadcast_operation (List.map fst res.applied) ; begin if force then iter_p - (fun (h, _exns) -> register h) + (fun (h, (op, _exns)) -> register h op) (Operation_hash.Map.bindings res.branch_delayed) >>=? fun () -> iter_p - (fun (h, _exns) -> register h) + (fun (h, (op, _exns)) -> register h op) (Operation_hash.Map.bindings res.branch_refused) >>=? fun () -> operations := @@ -252,8 +261,8 @@ let create Distributed_db.Operation.fetch ~timeout:operation_timeout net_db ~peer:gid h () >>= function - | Ok _op -> - push_to_worker (`Handle h) ; + | Ok op -> + push_to_worker (`Handle (h, op)) ; Lwt.return_unit | Error [ Distributed_db.Operation.Canceled _ ] -> lwt_debug @@ -276,12 +285,11 @@ let create net_db ~peer:gid op ())) known_ops ; Lwt.return_unit - | `Handle op -> - lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> - Operation_hash.Table.remove pending op ; + | `Handle (h, op) -> + Operation_hash.Table.remove pending h ; broadcast_unprocessed := true ; - unprocessed := Operation_hash.Set.singleton op ; - lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> + unprocessed := Operation_hash.Map.singleton h op ; + lwt_debug "register %a" Operation_hash.pp_short h >>= fun () -> Lwt.return_unit | `Flush (new_head : State.Block.t) -> list_pendings @@ -294,9 +302,10 @@ let create >>= fun (new_live_blocks, new_live_operations) -> lwt_debug "flush %a (mempool: %d)" Block_hash.pp_short (State.Block.hash new_head) - (Operation_hash.Set.cardinal new_mempool) >>= fun () -> + (Operation_hash.Map.cardinal new_mempool) >>= fun () -> (* Reset the pre-validation context *) head := new_head ; + mempool := [] ; operations := empty_result ; broadcast_unprocessed := false ; unprocessed := new_mempool ; @@ -376,7 +385,7 @@ let inject_operation pv ?(force = false) (op: Operation.t) = let net_id = State.Net.id (Distributed_db.net_state pv.net_db) in let wrap_error h map = begin - try return (Operation_hash.Map.find h map) + try return (snd (Operation_hash.Map.find h map)) with Not_found -> failwith "unexpected protocol result" end >>=? fun errors -> @@ -385,7 +394,7 @@ let inject_operation pv ?(force = false) (op: Operation.t) = (failure "Prevalidator.inject_operation: invalid network") >>=? fun () -> pv.prevalidate_operations force [op] >>=? function - | ([h], { applied = [h'] }) when Operation_hash.equal h h' -> + | ([h], { applied = [h', _] }) when Operation_hash.equal h h' -> return () | ([h], { refused }) when Operation_hash.Map.cardinal refused = 1 -> diff --git a/src/node/shell/prevalidator.mli b/src/node/shell/prevalidator.mli index b8d8f687b..ec01fc995 100644 --- a/src/node/shell/prevalidator.mli +++ b/src/node/shell/prevalidator.mli @@ -44,7 +44,7 @@ val inject_operation: t -> ?force:bool -> Operation.t -> unit tzresult Lwt.t val flush: t -> State.Block.t -> unit val timestamp: t -> Time.t -val operations: t -> error Prevalidation.preapply_result * Operation_hash.Set.t +val operations: t -> error Prevalidation.preapply_result * Operation.t Operation_hash.Map.t val context: t -> Updater.validation_result tzresult Lwt.t -val pending: ?block:State.Block.t -> t -> Operation_hash.Set.t Lwt.t +val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 43448df15..446972faf 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -62,10 +62,7 @@ let inject_block v ?force bytes operations = | None -> failwith "Cannot parse block header." | Some block -> get v block.shell.net_id >>=? fun nv -> - (* TODO... remove `Distributed_db.operation` - and only accept raw operations ??? *) let validation = - map_p (map_p (Distributed_db.resolve_operation (Net_validator.net_db nv))) operations >>=? fun operations -> Net_validator.validate_block nv ?force hash block operations in return (hash, validation) diff --git a/src/node/shell/validator.mli b/src/node/shell/validator.mli index 03b5ff816..2e2599510 100644 --- a/src/node/shell/validator.mli +++ b/src/node/shell/validator.mli @@ -26,7 +26,7 @@ val get_exn: t -> Net_id.t -> Net_validator.t Lwt.t val inject_block: t -> ?force:bool -> - MBytes.t -> Distributed_db.operation list list -> + MBytes.t -> Operation.t list list -> (Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t val watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper diff --git a/src/utils/error_monad.ml b/src/utils/error_monad.ml index dc55766cb..f13df0bd3 100644 --- a/src/utils/error_monad.ml +++ b/src/utils/error_monad.ml @@ -104,6 +104,7 @@ module Make() = struct !error_kinds in let json_encoding = Data_encoding.union cases in let encoding = + Data_encoding.dynamic_size @@ Data_encoding.splitted ~json:json_encoding ~binary: diff --git a/test/proto_alpha/proto_alpha_helpers.ml b/test/proto_alpha/proto_alpha_helpers.ml index 10713c081..ca1ed7fde 100644 --- a/test/proto_alpha/proto_alpha_helpers.ml +++ b/test/proto_alpha/proto_alpha_helpers.ml @@ -325,9 +325,7 @@ module Assert = struct List.exists f errors | _ -> false - let hash = function - | Client_node_rpcs.Hash h -> h - | Blob op -> Tezos_data.Operation.hash op + let hash op = Tezos_data.Operation.hash op let failed_to_preapply ~msg ?op f = Assert.contain_error ~msg ~f:begin function @@ -426,7 +424,6 @@ end module Baking = struct let mine block (contract: Account.t) operations = - let operations = List.map (fun op -> Client_node_rpcs.Blob op) operations in let seed_nonce = match Nonce.of_bytes @@ Sodium.Random.Bigbytes.generate Constants.nonce_length with diff --git a/test/proto_alpha/proto_alpha_helpers.mli b/test/proto_alpha/proto_alpha_helpers.mli index 399b9aa5a..125e2258e 100644 --- a/test/proto_alpha/proto_alpha_helpers.mli +++ b/test/proto_alpha/proto_alpha_helpers.mli @@ -156,7 +156,7 @@ module Assert : sig val failed_to_preapply: msg:string -> - ?op:Client_node_rpcs.operation -> + ?op:Tezos_data.Operation.t -> (Environment.Error_monad.error -> bool) -> 'a tzresult -> unit diff --git a/test/proto_alpha/test_endorsement.ml b/test/proto_alpha/test_endorsement.ml index 2ec559e89..0e4c69a15 100644 --- a/test/proto_alpha/test_endorsement.ml +++ b/test/proto_alpha/test_endorsement.ml @@ -84,13 +84,13 @@ let test_wrong_delegate ~miner contract block = let test_invalid_endorsement_slot contract block = Helpers.Endorse.endorse ~slot:~-1 contract block >>=? fun op -> Helpers.Baking.mine block contract [ op ] >>= fun res -> - Assert.failed_to_preapply ~msg:__LOC__ ~op:(Blob op) begin function + Assert.failed_to_preapply ~msg:__LOC__ ~op begin function | Baking.Invalid_endorsement_slot _ -> true | _ -> false end res ; Helpers.Endorse.endorse ~slot:16 contract block >>=? fun op -> Helpers.Baking.mine block contract [ op ] >>= fun res -> - Assert.failed_to_preapply ~msg:__LOC__ ~op:(Blob op) begin function + Assert.failed_to_preapply ~msg:__LOC__ ~op begin function | Baking.Invalid_endorsement_slot _ -> true | _ -> false end res ;