From c4e65879fcb2791c307b9155bc9e02c9cbe58f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Wed, 10 Oct 2018 14:32:42 +0800 Subject: [PATCH] Prevalidator/Prevalidation: move existential MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Note that now, the chain_validator is responsible for swapping the prevalidator when a new protocol arrives. Co-authored-by: Raphaël Proust Co-authored-by: Pietro Abate Co-authored-by: Grégoire Henry --- src/lib_shell/block_validator.ml | 1 + src/lib_shell/chain_validator.ml | 64 +- src/lib_shell/prevalidation.ml | 452 ++++---- src/lib_shell/prevalidation.mli | 71 +- src/lib_shell/prevalidator.ml | 1224 ++++++++++++--------- src/lib_shell/prevalidator.mli | 56 +- src/lib_shell/worker_directory.ml | 27 +- src/lib_shell_services/worker_services.ml | 2 +- 8 files changed, 1080 insertions(+), 817 deletions(-) diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index eba341ee3..1c1521c92 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -333,6 +333,7 @@ let on_request (* TODO catch other temporary error (e.g. system errors) and do not 'commit' them on disk... *) | Error [Canceled | Unavailable_protocol _] as err -> + (* FIXME: Canceled can escape. Canceled is not registered. BOOM! *) return err | Error errors -> Worker.protect w begin fun () -> diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index d24730537..3a3361af3 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -25,6 +25,8 @@ open Chain_validator_worker_state +module Log = Tezos_stdlib.Logging.Make(struct let name = "node.chain_validator" end) + module Name = struct type t = Chain_id.t let encoding = Chain_id.encoding @@ -72,7 +74,7 @@ module Types = struct mutable child: (state * (unit -> unit Lwt.t (* shutdown *))) option ; - prevalidator: Prevalidator.t option ; + mutable prevalidator: Prevalidator.t option ; active_peers: Peer_validator.t Lwt.t P2p_peer.Table.t ; bootstrapped_peers: unit P2p_peer.Table.t ; } @@ -249,6 +251,16 @@ let broadcast_head w ~previous block = end end +let safe_get_protocol hash = + match Registered_protocol.get hash with + | None -> + (* FIXME. *) + (* This should not happen: it should be handled in the validator. *) + failwith "chain_validator: missing protocol '%a' for the current block." + Protocol_hash.pp_short hash + | Some protocol -> + return protocol + let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t = let Request.Validated block = req in let nv = Worker.state w in @@ -266,8 +278,28 @@ let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t = may_update_checkpoint nv.parameters.chain_state block >>= fun () -> broadcast_head w ~previous block >>= fun () -> begin match nv.prevalidator with - | Some prevalidator -> - Prevalidator.flush prevalidator block_hash + | Some old_prevalidator -> + State.Block.protocol_hash block >>= fun new_protocol -> + let old_protocol = Prevalidator.protocol_hash old_prevalidator in + begin + if not (Protocol_hash.equal old_protocol new_protocol) then begin + safe_get_protocol new_protocol >>=? fun (module Proto) -> + let (limits, chain_db) = Prevalidator.parameters old_prevalidator in + (* TODO inject in the new prevalidator the operation + from the previous one. *) + Prevalidator.create + limits + (module Proto) + chain_db >>= fun prevalidator -> + nv.prevalidator <- Some prevalidator ; + Prevalidator.shutdown old_prevalidator >>= fun () -> + return_unit + end else begin + Prevalidator.flush old_prevalidator block_hash >>=? fun () -> + return_unit + end + end >>=? fun () -> + return_unit | None -> return_unit end >>=? fun () -> may_switch_test_chain w spawn_child block >>= fun () -> @@ -302,9 +334,20 @@ let on_close w = let on_launch start_prevalidator w _ parameters = Chain.init_head parameters.chain_state >>= fun () -> (if start_prevalidator then - Prevalidator.create - parameters.prevalidator_limits parameters.chain_db >>= fun prevalidator -> - Lwt.return_some prevalidator + State.read_chain_data parameters.chain_state + (fun _ {State.current_head} -> Lwt.return current_head) >>= fun head -> + State.Block.protocol_hash head >>= fun head_hash -> + safe_get_protocol head_hash >>= function + | Ok (module Proto) -> + Prevalidator.create + parameters.prevalidator_limits + (module Proto) + parameters.chain_db >>= fun prevalor -> + Lwt.return_some prevalor + | Error err -> + Log.lwt_log_error "@[Failed to instantiate prevalidator:@ %a@]" + pp_print_error err >>= fun () -> + Lwt.return_none else Lwt.return_none) >>= fun prevalidator -> let valid_block_input = Lwt_watcher.create_input () in let new_head_input = Lwt_watcher.create_input () in @@ -336,12 +379,9 @@ let on_launch start_prevalidator w _ parameters = may_activate_peer_validator w peer_id >>= fun pv -> Peer_validator.notify_head pv block ; (* TODO notify prevalidator only if head is known ??? *) - begin match nv.prevalidator with - | Some prevalidator -> - Prevalidator.notify_operations prevalidator peer_id ops - | None -> () - end ; - Lwt.return_unit + match nv.prevalidator with + | Some prevalidator -> Prevalidator.notify_operations prevalidator peer_id ops + | None -> Lwt.return_unit end; end ; disconnection = begin fun peer_id -> diff --git a/src/lib_shell/prevalidation.ml b/src/lib_shell/prevalidation.ml index 8457845d5..ed365c951 100644 --- a/src/lib_shell/prevalidation.ml +++ b/src/lib_shell/prevalidation.ml @@ -23,10 +23,10 @@ (* *) (*****************************************************************************) -open Preapply_result open Validation_errors let rec apply_operations apply_operation state r max_ops ~sort ops = + let open Preapply_result in Lwt_list.fold_left_s (fun (state, max_ops, r) (hash, op, parsed_op) -> apply_operation state max_ops op parsed_op >>= function @@ -63,30 +63,245 @@ let rec apply_operations apply_operation state r max_ops ~sort ops = | _ -> Lwt.return (state, max_ops, r) -type prevalidation_state = - State : { proto : ('state, 'operation_data) proto ; state : 'state ; - max_number_of_operations : int ; - new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * - Operation.shell_header * 'operation_data) Lwt_watcher.input ; - } - -> prevalidation_state -and ('state, 'operation_data) proto = - (module Registered_protocol.T - with type P.validation_state = 'state - and type P.operation_data = 'operation_data ) +module type T = sig -let start_prevalidation - ?protocol_data - ~predecessor ~timestamp () = - let { Block_header.shell = - { fitness = predecessor_fitness ; - timestamp = predecessor_timestamp ; - level = predecessor_level } } = - State.Block.header predecessor in + type state + + (** Creates a new prevalidation context w.r.t. the protocol associate to the + predecessor block . When ?protocol_data is passed to this function, it will + be used to create the new block *) + val start_prevalidation : + ?protocol_data: MBytes.t -> + predecessor: State.Block.t -> + timestamp: Time.t -> + unit -> state tzresult Lwt.t + + (** Given a prevalidation context applies a list of operations, + returns a new prevalidation context plus the preapply result containing the + list of operations that cannot be applied to this context *) + val prevalidate : + state -> sort:bool -> + (Operation_hash.t * Operation.t) list -> + (state * error Preapply_result.t) Lwt.t + + val end_prevalidation : + state -> + Tezos_protocol_environment_shell.validation_result tzresult Lwt.t + + val notify_operation : + state -> + error Preapply_result.t -> + unit + + val shutdown_operation_input : + state -> + unit + + val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t + +end + +module Make(Proto : Registered_protocol.T) : T = struct + + type state = + { state : Proto.validation_state ; + max_number_of_operations : int ; + new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * Proto.operation_data) Lwt_watcher.input ; + } + + let start_prevalidation + ?protocol_data + ~predecessor ~timestamp () = + let { Block_header.shell = + { fitness = predecessor_fitness ; + timestamp = predecessor_timestamp ; + level = predecessor_level } } = + State.Block.header predecessor in + State.Block.context predecessor >>= fun predecessor_context -> + let predecessor_hash = State.Block.hash predecessor in + Context.reset_test_chain + predecessor_context predecessor_hash + timestamp >>= fun predecessor_context -> + begin + match protocol_data with + | None -> return_none + | Some protocol_data -> + match + Data_encoding.Binary.of_bytes + Proto.block_header_data_encoding + protocol_data + with + | None -> failwith "Invalid block header" + | Some protocol_data -> return_some protocol_data + end >>=? fun protocol_data -> + Proto.begin_construction + ~chain_id: (State.Block.chain_id predecessor) + ~predecessor_context + ~predecessor_timestamp + ~predecessor_fitness + ~predecessor_level + ~predecessor: predecessor_hash + ~timestamp + ?protocol_data + () + >>=? fun state -> + (* FIXME arbitrary value, to be customisable *) + let max_number_of_operations = 1000 in + let new_operation_input = Lwt_watcher.create_input () in + return { state ; max_number_of_operations ; new_operation_input ; } + + let prevalidate + { state ; max_number_of_operations ; new_operation_input ; } + ~sort (ops : (Operation_hash.t * Operation.t) list) = + let ops = + List.map + (fun (h, op) -> + let parsed_op = + match Data_encoding.Binary.of_bytes + Proto.operation_data_encoding + op.Operation.proto with + | None -> error Parse_error + | Some protocol_data -> + Ok ({ shell = op.shell ; protocol_data } : Proto.operation) in + (h, op, parsed_op)) + ops in + let invalid_ops = + List.filter_map + (fun (h, op, parsed_op) -> match parsed_op with + | Ok _ -> None + | Error err -> Some (h, op, err)) ops + and parsed_ops = + List.filter_map + (fun (h, op, parsed_op) -> match parsed_op with + | Ok parsed_op -> Some (h, op, parsed_op) + | Error _ -> None) ops in + ignore invalid_ops; (* FIXME *) + let sorted_ops = + if sort then + let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in + List.sort compare parsed_ops + else parsed_ops in + let apply_operation state max_ops op (parse_op) = + let size = Data_encoding.Binary.length Operation.encoding op in + if max_ops <= 0 then + fail Too_many_operations + else if size > Proto.max_operation_data_length then + fail (Oversized_operation { size ; max = Proto.max_operation_data_length }) + else + Proto.apply_operation state parse_op >>=? fun (state, receipt) -> + return (state, receipt) in + apply_operations + apply_operation + state Preapply_result.empty max_number_of_operations + ~sort sorted_ops >>= fun (state, max_number_of_operations, r) -> + let r = + { r with + applied = List.rev r.applied ; + branch_refused = + List.fold_left + (fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map) + r.branch_refused invalid_ops } in + Lwt.return ({ state ; max_number_of_operations ; new_operation_input ; }, r) + + let end_prevalidation { state } = + Proto.finalize_block state >>=? fun (result, _metadata) -> + return result + + let notify_operation { new_operation_input } result = + let open Preapply_result in + let { applied ; refused ; branch_refused ; branch_delayed } = result in + (* Notify new opperations *) + let map_op kind { Operation.shell ; proto } = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + proto in + kind, shell, protocol_data in + let fold_op kind _k (op, _error) acc = map_op kind op :: acc in + let applied = List.map (map_op `Applied) (List.map snd applied) in + let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in + let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in + let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in + let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + List.iter (Lwt_watcher.notify new_operation_input) ops + + let shutdown_operation_input { new_operation_input } = + Lwt_watcher.shutdown_input new_operation_input + + let rpc_directory = + let module Proto_services = Block_services.Make(Proto)(Proto) in + + let dir : (state * Error_monad.error Preapply_result.t) RPC_directory.t ref = + ref RPC_directory.empty in + + let gen_register s f = + dir := RPC_directory.gen_register !dir s f in + + gen_register + (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) + begin fun ({ new_operation_input }, current_mempool) params () -> + let open Preapply_result in + let operation_stream, stopper = + Lwt_watcher.create_stream new_operation_input in + (* Convert ops *) + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + Proto.{ shell = op.shell ; protocol_data } in + let fold_op _k (op, _error) acc = map_op op :: acc in + (* First call : retrieve the current set of op from the mempool *) + let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in + let applied = if params#applied then List.map map_op (List.map snd applied) else [] in + let refused = if params#refused then + Operation_hash.Map.fold fold_op refused [] else [] in + let branch_refused = if params#branch_refused then + Operation_hash.Map.fold fold_op branch_refused [] else [] in + let branch_delayed = if params#branch_delayed then + Operation_hash.Map.fold fold_op branch_delayed [] else [] in + let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Applied -> params#applied + | `Refused -> params#branch_refused + | `Branch_refused -> params#refused + | `Branch_delayed -> params#branch_delayed + in + let next () = + match !current_mempool with + | Some mempool -> begin + current_mempool := None ; + Lwt.return_some mempool + end + | None -> begin + Lwt_stream.get operation_stream >>= function + | Some (kind, shell, protocol_data) when filter_result kind -> + (* NOTE: Should the protocol change, a new Prevalidation + * context would be created. Thus, we use the same Proto. *) + let bytes = Data_encoding.Binary.to_bytes_exn + Proto.operation_data_encoding + protocol_data in + let protocol_data = Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + bytes in + Lwt.return_some [ { Proto.shell ; protocol_data } ] + | _ -> Lwt.return_none + end + in + let shutdown () = Lwt_watcher.shutdown stopper in + RPC_answer.return_stream { next ; shutdown } + end ; + + return !dir + +end + +let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = State.Block.context predecessor >>= fun predecessor_context -> Context.get_protocol predecessor_context >>= fun protocol -> - let predecessor_hash = State.Block.hash predecessor in begin match Registered_protocol.get protocol with | None -> @@ -97,106 +312,13 @@ let start_prevalidation | Some protocol -> return protocol end >>=? fun (module Proto) -> - Context.reset_test_chain - predecessor_context predecessor_hash - timestamp >>= fun predecessor_context -> - begin - match protocol_data with - | None -> return_none - | Some protocol_data -> - match - Data_encoding.Binary.of_bytes - Proto.block_header_data_encoding - protocol_data - with - | None -> failwith "Invalid block header" - | Some protocol_data -> return_some protocol_data - end >>=? fun protocol_data -> - Proto.begin_construction - ~chain_id: (State.Block.chain_id predecessor) - ~predecessor_context - ~predecessor_timestamp - ~predecessor_fitness - ~predecessor_level - ~predecessor: predecessor_hash - ~timestamp - ?protocol_data - () - >>=? fun state -> - (* FIXME arbitrary value, to be customisable *) - let max_number_of_operations = 1000 in - let new_operation_input = Lwt_watcher.create_input () in - return (State { proto = (module Proto) ; state ; - max_number_of_operations ; - new_operation_input ; - }) - -let prevalidate - (State { proto = (module Proto) ; state ; - max_number_of_operations ; new_operation_input }) - ~sort (ops : (Operation_hash.t * Operation.t) list) = - let ops = - List.map - (fun (h, op) -> - let parsed_op = - match Data_encoding.Binary.of_bytes - Proto.operation_data_encoding - op.Operation.proto with - | None -> error Parse_error - | Some protocol_data -> - Ok ({ shell = op.shell ; protocol_data } : Proto.operation) in - (h, op, parsed_op)) - ops in - let invalid_ops = - List.filter_map - (fun (h, op, parsed_op) -> match parsed_op with - | Ok _ -> None - | Error err -> Some (h, op, err)) ops - and parsed_ops = - List.filter_map - (fun (h, op, parsed_op) -> match parsed_op with - | Ok parsed_op -> Some (h, op, parsed_op) - | Error _ -> None) ops in - let sorted_ops = - if sort then - let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in - List.sort compare parsed_ops - else parsed_ops in - let apply_operation state max_ops op (parse_op) = - let size = Data_encoding.Binary.length Operation.encoding op in - if max_ops <= 0 then - fail Too_many_operations - else if size > Proto.max_operation_data_length then - fail (Oversized_operation { size ; max = Proto.max_operation_data_length }) - else - Proto.apply_operation state parse_op >>=? fun (state, receipt) -> - return (state, receipt) in - apply_operations - apply_operation - state Preapply_result.empty max_number_of_operations - ~sort sorted_ops >>= fun (state, max_number_of_operations, r) -> - let r = - { r with - applied = List.rev r.applied ; - branch_refused = - List.fold_left - (fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map) - r.branch_refused invalid_ops } in - Lwt.return (State { proto = (module Proto) ; state ; - max_number_of_operations ; new_operation_input }, - r) - -let end_prevalidation (State { proto = (module Proto) ; state }) = - Proto.finalize_block state >>=? fun (result, _metadata) -> - return result - -let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = - start_prevalidation + let module Prevalidation = Make(Proto) in + Prevalidation.start_prevalidation ~protocol_data ~predecessor ~timestamp () >>=? fun validation_state -> let ops = List.map (List.map (fun x -> Operation.hash x, x)) ops in Lwt_list.fold_left_s (fun (validation_state, rs) ops -> - prevalidate + Prevalidation.prevalidate validation_state ~sort ops >>= fun (validation_state, r) -> Lwt.return (validation_state, rs @ [r])) (validation_state, []) ops >>= fun (validation_state, rs) -> @@ -207,7 +329,7 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = Operation_list_hash.compute (List.map fst r.Preapply_result.applied)) rs) in - end_prevalidation validation_state >>=? fun validation_result -> + Prevalidation.end_prevalidation validation_state >>=? fun validation_result -> let pred_shell_header = State.Block.shell_header predecessor in let level = Int32.succ pred_shell_header.level in Block_validator.may_patch_protocol @@ -243,97 +365,3 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = end >>=? fun (context, message) -> Context.hash ?message ~time:timestamp context >>= fun context -> return ({ shell_header with context }, rs) - -let notify_operation (State { proto = (module Proto) ; new_operation_input ; }) result = - let { applied ; refused ; branch_refused ; branch_delayed } = result in - (* Notify new opperations *) - let map_op kind { Operation.shell ; proto } = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - proto in - kind, shell, protocol_data in - let fold_op kind _k (op, _error) acc = map_op kind op :: acc in - let applied = List.map (map_op `Applied) (List.map snd applied) in - let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in - let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in - let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in - let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in - List.iter (Lwt_watcher.notify new_operation_input) ops - -let shutdown_operation_input (State { new_operation_input }) = - Lwt_watcher.shutdown_input new_operation_input - -let build_rpc_directory protocol = - begin - match Registered_protocol.get protocol with - | None -> - (* FIXME. *) - (* This should not happen: it should be handled in the validator. *) - failwith "Prevalidation: missing protocol '%a' for the current block." - Protocol_hash.pp_short protocol - | Some protocol -> - return protocol - end >>=? fun (module Proto) -> - let module Proto_services = Block_services.Make(Proto)(Proto) in - - let dir : (prevalidation_state * Error_monad.error Preapply_result.t) RPC_directory.t ref = - ref RPC_directory.empty in - - let gen_register s f = - dir := RPC_directory.gen_register !dir s f in - - gen_register - (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) - begin fun ((State { new_operation_input ; proto = (module Next_proto) }), current_mempool) params () -> - let operation_stream, stopper = - Lwt_watcher.create_stream new_operation_input in - (* Convert ops *) - let map_op op = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - op.Operation.proto in - Proto.{ shell = op.shell ; protocol_data } in - let fold_op _k (op, _error) acc = map_op op :: acc in - (* First call : retrieve the current set of op from the mempool *) - let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in - let applied = if params#applied then List.map map_op (List.map snd applied) else [] in - let refused = if params#refused then - Operation_hash.Map.fold fold_op refused [] else [] in - let branch_refused = if params#branch_refused then - Operation_hash.Map.fold fold_op branch_refused [] else [] in - let branch_delayed = if params#branch_delayed then - Operation_hash.Map.fold fold_op branch_delayed [] else [] in - let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in - let current_mempool = ref (Some current_mempool) in - let filter_result = function - | `Applied -> params#applied - | `Refused -> params#branch_refused - | `Branch_refused -> params#refused - | `Branch_delayed -> params#branch_delayed - in - let next () = - match !current_mempool with - | Some mempool -> begin - current_mempool := None ; - Lwt.return_some mempool - end - | None -> begin - Lwt_stream.get operation_stream >>= function - | Some (kind, shell, protocol_data) when filter_result kind -> - let bytes = Data_encoding.Binary.to_bytes_exn - Next_proto.operation_data_encoding - protocol_data in - let protocol_data = Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - bytes in - Lwt.return_some [ { Proto.shell ; protocol_data } ] - | _ -> Lwt.return_none - end - in - let shutdown () = Lwt_watcher.shutdown stopper in - RPC_answer.return_stream { next ; shutdown } - end ; - - return !dir diff --git a/src/lib_shell/prevalidation.mli b/src/lib_shell/prevalidation.mli index b49c9e44a..a534c393e 100644 --- a/src/lib_shell/prevalidation.mli +++ b/src/lib_shell/prevalidation.mli @@ -28,31 +28,47 @@ consistency. This module is stateless and creates and manupulates the prevalidation_state. *) -type prevalidation_state +module type T = sig -(** Creates a new prevalidation context w.r.t. the protocol associate to the - predecessor block . When ?protocol_data is passed to this function, it will - be used to create the new block *) -val start_prevalidation : - ?protocol_data: MBytes.t -> - predecessor: State.Block.t -> - timestamp: Time.t -> - unit -> prevalidation_state tzresult Lwt.t + type state -(** Given a prevalidation context applies a list of operations, - returns a new prevalidation context plus the preapply result containing the - list of operations that cannot be applied to this context *) -val prevalidate : - prevalidation_state -> sort:bool -> - (Operation_hash.t * Operation.t) list -> - (prevalidation_state * error Preapply_result.t) Lwt.t + (** Creates a new prevalidation context w.r.t. the protocol associate to the + predecessor block . When ?protocol_data is passed to this function, it will + be used to create the new block *) + val start_prevalidation : + ?protocol_data: MBytes.t -> + predecessor: State.Block.t -> + timestamp: Time.t -> + unit -> state tzresult Lwt.t -val end_prevalidation : - prevalidation_state -> - Tezos_protocol_environment_shell.validation_result tzresult Lwt.t + (** Given a prevalidation context applies a list of operations, + returns a new prevalidation context plus the preapply result containing the + list of operations that cannot be applied to this context *) + val prevalidate : + state -> sort:bool -> + (Operation_hash.t * Operation.t) list -> + (state * error Preapply_result.t) Lwt.t -(** Pre-apply creates a new block ( running start_prevalidation, prevalidate and - end_prevalidation), and returns a new block. *) + val end_prevalidation : + state -> + Tezos_protocol_environment_shell.validation_result tzresult Lwt.t + + val notify_operation : + state -> + error Preapply_result.t -> + unit + + val shutdown_operation_input : + state -> + unit + + val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t + +end + +module Make(Proto : Registered_protocol.T) : T + +(** Pre-apply creates a new block and returns it. *) val preapply : predecessor:State.Block.t -> timestamp:Time.t -> @@ -60,16 +76,3 @@ val preapply : sort_operations:bool -> Operation.t list list -> (Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t - -val notify_operation : - prevalidation_state -> - error Preapply_result.t -> - unit - -val shutdown_operation_input : - prevalidation_state -> - unit - -val build_rpc_directory : - Protocol_hash.t -> - (prevalidation_state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 9157c32e6..42b4af54e 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -31,21 +31,65 @@ type limits = { worker_limits : Worker_types.limits ; } -module Name = struct - type t = Chain_id.t - let encoding = Chain_id.encoding - let base = [ "prevalidator" ] - let pp = Chain_id.pp_short +type name_t = (Chain_id.t * Protocol_hash.t) + +module type T = sig + + module Proto: Registered_protocol.T + val name: name_t + val parameters: limits * Distributed_db.chain_db + module Prevalidation: Prevalidation.T + type types_state = { + chain_db : Distributed_db.chain_db ; + limits : limits ; + mutable predecessor : State.Block.t ; + mutable timestamp : Time.t ; + mutable live_blocks : Block_hash.Set.t ; + mutable live_operations : Operation_hash.Set.t ; + refused : Operation_hash.t Ring.t ; + mutable refusals : error list Operation_hash.Map.t ; + mutable fetching : Operation_hash.Set.t ; + mutable pending : Operation.t Operation_hash.Map.t ; + mutable mempool : Mempool.t ; + mutable in_mempool : Operation_hash.Set.t ; + mutable validation_result : error Preapply_result.t ; + mutable validation_state : Prevalidation.state tzresult ; + mutable advertisement : [ `Pending of Mempool.t | `None ] ; + mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ; + } + module Name: Worker.NAME with type t = name_t + module Types: Worker.TYPES with type state = types_state + module Worker: Worker.T + with type Event.t = Event.t + and type 'a Request.t = 'a Request.t + and type Request.view = Request.view + and type Types.state = types_state + type worker = Worker.infinite Worker.queue Worker.t + val list_pendings: + ?maintain_chain_db:Distributed_db.chain_db -> + from_block:State.Block.t -> + to_block:State.Block.t -> + Operation.t Operation_hash.Map.t -> + (Operation.t Operation_hash.Map.t * Block_hash.Set.t * Operation_hash.Set.t) Lwt.t + + val worker: worker Lwt.t + end -module Types = struct - (* Invariants: - - an operation is in only one of these sets (map domains): - pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool - - pv.in_mempool is the domain of all fields of pv.prevalidation_result - - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused - - the 'applied' operations in pv.validation_result are in reverse order. *) - type state = { +module type ARG = sig + val limits: limits + val chain_db: Distributed_db.chain_db + val chain_id: Chain_id.t +end + +type t = (module T) + +module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct + module Proto = Proto + let name = (Arg.chain_id, Proto.hash) + let parameters = (Arg.limits, Arg.chain_db) + module Prevalidation = Prevalidation.Make(Proto) + type types_state = { chain_db : Distributed_db.chain_db ; limits : limits ; mutable predecessor : State.Block.t ; @@ -59,43 +103,653 @@ module Types = struct mutable mempool : Mempool.t ; mutable in_mempool : Operation_hash.Set.t ; mutable validation_result : error Preapply_result.t ; - mutable validation_state : Prevalidation.prevalidation_state tzresult ; + mutable validation_state : Prevalidation.state tzresult ; mutable advertisement : [ `Pending of Mempool.t | `None ] ; - mutable rpc_directory : state RPC_directory.t tzresult Lwt.t lazy_t ; + mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ; } - type parameters = limits * Distributed_db.chain_db - include Worker_state + module Name = struct + type t = name_t + let encoding = + Data_encoding.tup2 + Chain_id.encoding + Protocol_hash.encoding + let chain_id_string = + let _: string = Format.flush_str_formatter () in + Chain_id.pp_short Format.str_formatter Arg.chain_id; + Format.flush_str_formatter () + let proto_hash_string = + let _: string = Format.flush_str_formatter () in + Protocol_hash.pp_short Format.str_formatter Proto.hash; + Format.flush_str_formatter () + let base = [ "prevalidator" ; chain_id_string ; proto_hash_string ] + let pp fmt (chain_id, proto_hash) = + Chain_id.pp_short fmt chain_id; + Format.pp_print_string fmt "."; + Protocol_hash.pp_short fmt proto_hash + end - let view (state : state) _ : view = - let domain map = - Operation_hash.Map.fold - (fun elt _ acc -> Operation_hash.Set.add elt acc) - map Operation_hash.Set.empty in - { head = State.Block.hash state.predecessor ; - timestamp = state.timestamp ; - fetching = state.fetching ; - pending = domain state.pending ; - applied = - List.rev - (List.map (fun (h, _) -> h) - state.validation_result.applied) ; - delayed = + module Types = struct + (* Invariants: + - an operation is in only one of these sets (map domains): + pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool + - pv.in_mempool is the domain of all fields of pv.prevalidation_result + - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused + - the 'applied' operations in pv.validation_result are in reverse order. *) + type state = types_state + type parameters = limits * Distributed_db.chain_db + + include Worker_state + + let view (state : state) _ : view = + let domain map = + Operation_hash.Map.fold + (fun elt _ acc -> Operation_hash.Set.add elt acc) + map Operation_hash.Set.empty in + { head = State.Block.hash state.predecessor ; + timestamp = state.timestamp ; + fetching = state.fetching ; + pending = domain state.pending ; + applied = + List.rev + (List.map (fun (h, _) -> h) + state.validation_result.applied) ; + delayed = + Operation_hash.Set.union + (domain state.validation_result.branch_delayed) + (domain state.validation_result.branch_refused) } + + end + + module Worker: Worker.T + with type Name.t = Name.t + and type Event.t = Event.t + and type 'a Request.t = 'a Request.t + and type Request.view = Request.view + and type Types.state = Types.state + and type Types.parameters = Types.parameters + = Worker.Make (Name) (Prevalidator_worker_state.Event) + (Prevalidator_worker_state.Request) (Types) + + open Types + + type worker = Worker.infinite Worker.queue Worker.t + + let debug w = + Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) + + let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool = + let rec pop_blocks ancestor block mempool = + let hash = State.Block.hash block in + if Block_hash.equal hash ancestor then + Lwt.return mempool + else + State.Block.all_operations block >>= fun operations -> + Lwt_list.fold_left_s + (Lwt_list.fold_left_s (fun mempool op -> + let h = Operation.hash op in + Lwt_utils.may maintain_chain_db + ~f:begin fun chain_db -> + Distributed_db.inject_operation chain_db h op >>= fun _ -> + Lwt.return_unit + end >>= fun () -> + Lwt.return (Operation_hash.Map.add h op mempool))) + mempool operations >>= fun mempool -> + State.Block.predecessor block >>= function + | None -> assert false + | Some predecessor -> pop_blocks ancestor predecessor mempool + in + let push_block mempool block = + State.Block.all_operation_hashes block >|= fun operations -> + Option.iter maintain_chain_db + ~f:(fun chain_db -> + List.iter + (List.iter (Distributed_db.Operation.clear_or_cancel chain_db)) + operations) ; + List.fold_left + (List.fold_left (fun mempool h -> Operation_hash.Map.remove h mempool)) + mempool operations + in + Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, path) -> + pop_blocks + (State.Block.hash ancestor) + from_block old_mempool >>= fun mempool -> + Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool -> + Chain_traversal.live_blocks + to_block + (State.Block.max_operations_ttl to_block) + >>= fun (live_blocks, live_operations) -> + let new_mempool, outdated = + Operation_hash.Map.partition + (fun _oph op -> + Block_hash.Set.mem op.Operation.shell.branch live_blocks) + new_mempool in + Option.iter maintain_chain_db + ~f:(fun chain_db -> + Operation_hash.Map.iter + (fun oph _op -> Distributed_db.Operation.clear_or_cancel chain_db oph) + outdated) ; + Lwt.return (new_mempool, live_blocks, live_operations) + + let already_handled pv oph = + Operation_hash.Map.mem oph pv.refusals + || Operation_hash.Map.mem oph pv.pending + || Operation_hash.Set.mem oph pv.fetching + || Operation_hash.Set.mem oph pv.live_operations + || Operation_hash.Set.mem oph pv.in_mempool + + let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = + { Mempool.known_valid = List.map fst r.applied ; + pending = + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_delayed @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_refused @@ + Operation_hash.Set.empty } + + let merge_validation_results ~old ~neu = + let open Preapply_result in + let merge _key a b = + match a, b with + | None, None -> None + | Some x, None -> Some x + | _, Some y -> Some y in + let filter_out s m = + List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in + { applied = List.rev_append neu.applied old.applied ; + refused = Operation_hash.Map.empty ; + branch_refused = + Operation_hash.Map.merge merge + (* filtering should not be required if the protocol is sound *) + (filter_out neu.applied old.branch_refused) + neu.branch_refused ; + branch_delayed = + Operation_hash.Map.merge merge + (filter_out neu.applied old.branch_delayed) + neu.branch_delayed } + + let advertise (w : worker) pv mempool = + match pv.advertisement with + | `Pending { Mempool.known_valid ; pending } -> + pv.advertisement <- + `Pending + { known_valid = known_valid @ mempool.Mempool.known_valid ; + pending = Operation_hash.Set.union pending mempool.pending } + | `None -> + pv.advertisement <- `Pending mempool ; + Lwt.async (fun () -> + Lwt_unix.sleep 0.01 >>= fun () -> + Worker.push_request_now w Advertise ; + Lwt.return_unit) + + let handle_unprocessed w pv = + begin match pv.validation_state with + | Error err -> + pv.validation_result <- + { Preapply_result.empty with + branch_delayed = + Operation_hash.Map.fold + (fun h op m -> Operation_hash.Map.add h (op, err) m) + pv.pending Operation_hash.Map.empty } ; + pv.pending <- + Operation_hash.Map.empty ; + Lwt.return_unit + | Ok validation_state -> + match Operation_hash.Map.cardinal pv.pending with + | 0 -> Lwt.return_unit + | n -> debug w "processing %d operations" n ; + Prevalidation.prevalidate validation_state ~sort:true + (Operation_hash.Map.bindings pv.pending) + >>= fun (validation_state, validation_result) -> + pv.validation_state <- + Ok validation_state ; + pv.in_mempool <- + (Operation_hash.Map.fold + (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) + pv.pending @@ + Operation_hash.Map.fold + (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) + pv.validation_result.refused @@ + pv.in_mempool) ; + Operation_hash.Map.iter + (fun h (_, errs) -> + Option.iter (Ring.add_and_return_erased pv.refused h) + ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; + pv.refusals <- + Operation_hash.Map.add h errs pv.refusals) + pv.validation_result.refused ; + Operation_hash.Map.iter + (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph) + pv.validation_result.refused ; + pv.validation_result <- + merge_validation_results + ~old:pv.validation_result + ~neu:validation_result ; + pv.pending <- + Operation_hash.Map.empty ; + advertise w pv + (mempool_of_prevalidation_result validation_result) ; + Prevalidation.notify_operation validation_state validation_result ; + Lwt.return_unit + end >>= fun () -> + pv.mempool <- + { Mempool.known_valid = + List.rev_map fst pv.validation_result.applied ; + pending = + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + pv.validation_result.branch_delayed @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + pv.validation_result.branch_refused @@ + Operation_hash.Set.empty } ; + State.Current_mempool.set (Distributed_db.chain_state pv.chain_db) + ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> + Lwt.return_unit + + let fetch_operation w pv ?peer oph = + debug w + "fetching operation %a" + Operation_hash.pp_short oph ; + Distributed_db.Operation.fetch + ~timeout:pv.limits.operation_timeout + pv.chain_db ?peer oph () >>= function + | Ok op -> + Worker.push_request_now w (Arrived (oph, op)) ; + Lwt.return_unit + | Error [ Distributed_db.Operation.Canceled _ ] -> + debug w + "operation %a included before being prevalidated" + Operation_hash.pp_short oph ; + Lwt.return_unit + | Error _ -> (* should not happen *) + Lwt.return_unit + + let rpc_directory_of_protocol protocol = + begin + match Registered_protocol.get protocol with + | None -> + (* FIXME. *) + (* This should not happen: it should be handled in the validator. *) + failwith "Prevalidation: missing protocol '%a' for the current block." + Protocol_hash.pp_short protocol + | Some protocol -> + return protocol + end >>=? fun (module Proto) -> + let module Proto_services = Block_services.Make(Proto)(Proto) in + + let dir : state RPC_directory.t ref = ref RPC_directory.empty in + let register s f = + dir := RPC_directory.register !dir s f in + + register + (Proto_services.S.Mempool.pending_operations RPC_path.open_root) + (fun pv () () -> + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + { Proto.shell = op.shell ; protocol_data } in + let map_op_error (op, error) = (map_op op, error) in + return { + Proto_services.Mempool.applied = + List.map + (fun (hash, op) -> (hash, map_op op)) + (List.rev pv.validation_result.applied) ; + refused = + Operation_hash.Map.map map_op_error pv.validation_result.refused ; + branch_refused = + Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ; + branch_delayed = + Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; + unprocessed = + Operation_hash.Map.map map_op pv.pending ; + }) ; + + Prevalidation.rpc_directory >>=? fun prevalidation_dir -> + let prevalidation_dir = + RPC_directory.map (fun state -> + match state.validation_state with + | Error _ -> assert false + | Ok pv -> Lwt.return (pv, state.validation_result) + ) prevalidation_dir in + + return (RPC_directory.merge !dir prevalidation_dir) + + module Handlers = struct + + type self = worker + + let on_operation_arrived (pv : state) oph op = + pv.fetching <- Operation_hash.Set.remove oph pv.fetching ; + if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin + Distributed_db.Operation.clear_or_cancel pv.chain_db oph + (* TODO: put in a specific delayed map ? *) + end else if not (already_handled pv oph) (* prevent double inclusion on flush *) then begin + pv.pending <- Operation_hash.Map.add oph op pv.pending + end + + let on_inject pv op = + let oph = Operation.hash op in + begin + if already_handled pv oph then + return pv.validation_result + else + Lwt.return pv.validation_state >>=? fun validation_state -> + Prevalidation.prevalidate + validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> + match result.applied with + | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> + Distributed_db.inject_operation pv.chain_db oph op >>= fun (_ : bool) -> + pv.pending <- Operation_hash.Map.add oph op pv.pending ; + return result + | _ -> + return result + end >>=? fun result -> + if List.mem_assoc oph result.applied then + return_unit + else + let try_in_map map proj or_else = + try + Lwt.return (Error (proj (Operation_hash.Map.find oph map))) + with Not_found -> or_else () in + try_in_map pv.refusals (fun h -> h) @@ fun () -> + try_in_map result.refused snd @@ fun () -> + try_in_map result.branch_refused snd @@ fun () -> + try_in_map result.branch_delayed snd @@ fun () -> + if Operation_hash.Set.mem oph pv.live_operations then + failwith "Injected operation %a included in a previous block." + Operation_hash.pp oph + else + failwith "Injected operation %a is not in prevalidation result." + Operation_hash.pp oph + + let on_notify w pv peer mempool = + let all_ophs = + List.fold_left + (fun s oph -> Operation_hash.Set.add oph s) + mempool.Mempool.pending mempool.known_valid in + let to_fetch = + Operation_hash.Set.filter + (fun oph -> not (already_handled pv oph)) + all_ophs in + pv.fetching <- Operation_hash.Set.union - (domain state.validation_result.branch_delayed) - (domain state.validation_result.branch_refused) } + to_fetch + pv.fetching ; + Operation_hash.Set.iter + (fun oph -> Lwt.ignore_result (fetch_operation w pv ~peer oph)) + to_fetch + + let on_flush w pv predecessor = + list_pendings + ~maintain_chain_db:pv.chain_db + ~from_block:pv.predecessor ~to_block:predecessor + (Preapply_result.operations pv.validation_result) + >>= fun (pending, new_live_blocks, new_live_operations) -> + let timestamp = Time.now () in + Prevalidation.start_prevalidation + ~predecessor ~timestamp () >>= fun validation_state -> + begin match validation_state with + | Error _ -> Lwt.return (validation_state, Preapply_result.empty) + | Ok validation_state -> + Prevalidation.prevalidate + validation_state ~sort:false [] >>= fun (state, result) -> + Lwt.return (Ok state, result) + end >>= fun (validation_state, validation_result) -> + debug w "%d operations were not washed by the flush" + (Operation_hash.Map.cardinal pending) ; + State.Block.protocol_hash pv.predecessor >>= fun old_protocol -> + State.Block.protocol_hash predecessor >>= fun new_protocol -> + pv.predecessor <- predecessor ; + pv.live_blocks <- new_live_blocks ; + pv.live_operations <- new_live_operations ; + pv.timestamp <- timestamp ; + pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty }; + pv.pending <- pending ; + pv.in_mempool <- Operation_hash.Set.empty ; + pv.validation_result <- validation_result ; + pv.validation_state <- validation_state ; + if not (Protocol_hash.equal old_protocol new_protocol) then + pv.rpc_directory <- lazy (rpc_directory_of_protocol new_protocol) ; + return_unit + + let on_advertise pv = + match pv.advertisement with + | `None -> () (* should not happen *) + | `Pending mempool -> + pv.advertisement <- `None ; + Distributed_db.Advertise.current_head pv.chain_db ~mempool pv.predecessor + + let on_request + : type r. worker -> r Request.t -> r tzresult Lwt.t + = fun w request -> + let pv = Worker.state w in + begin match request with + | Request.Flush hash -> + on_advertise pv ; + (* TODO: rebase the advertisement instead *) + let chain_state = Distributed_db.chain_state pv.chain_db in + State.Block.read chain_state hash >>=? fun block -> + begin match pv.validation_state with + | Ok pv -> Prevalidation.shutdown_operation_input pv + | Error _ -> () end ; + on_flush w pv block >>=? fun () -> + return (() : r) + | Request.Notify (peer, mempool) -> + on_notify w pv peer mempool ; + return_unit + | Request.Inject op -> + on_inject pv op + | Request.Arrived (oph, op) -> + on_operation_arrived pv oph op ; + return_unit + | Request.Advertise -> + on_advertise pv ; + return_unit + end >>=? fun r -> + handle_unprocessed w pv >>= fun () -> + return r + + let on_close w = + let pv = Worker.state w in + Operation_hash.Set.iter + (Distributed_db.Operation.clear_or_cancel pv.chain_db) + pv.fetching ; + Lwt.return_unit + + let on_launch w _ (limits, chain_db) = + let chain_state = Distributed_db.chain_state chain_db in + Chain.data chain_state >>= fun + { current_head = predecessor ; current_mempool = mempool ; + live_blocks ; live_operations } -> + State.Block.protocol_hash predecessor >>= fun protocol -> + let timestamp = Time.now () in + Prevalidation.start_prevalidation + ~predecessor ~timestamp () >>= fun validation_state -> + begin match validation_state with + | Error _ -> Lwt.return (validation_state, Preapply_result.empty) + | Ok validation_state -> + Prevalidation.prevalidate validation_state ~sort:false [] + >>= fun (validation_state, validation_result) -> + + Lwt.return (Ok validation_state, validation_result) + end >>= fun (validation_state, validation_result) -> + let fetching = + List.fold_left + (fun s h -> Operation_hash.Set.add h s) + Operation_hash.Set.empty mempool.known_valid in + let pv = + { limits ; chain_db ; + predecessor ; timestamp ; live_blocks ; live_operations ; + mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; + refused = Ring.create limits.max_refused_operations ; + refusals = Operation_hash.Map.empty ; + fetching ; + pending = Operation_hash.Map.empty ; + in_mempool = Operation_hash.Set.empty ; + validation_result ; validation_state ; + advertisement = `None ; + rpc_directory = lazy (rpc_directory_of_protocol protocol) ; + } in + List.iter + (fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) + mempool.known_valid ; + Lwt.return pv + + let on_error w r st errs = + Worker.record_event w (Event.Request (r, st, Some errs)) ; + match r with + | Request.(View (Inject _)) -> return_unit + | _ -> Lwt.return (Error errs) + + let on_completion w r _ st = + Worker.record_event w (Event.Request (Request.view r, st, None)) ; + Lwt.return_unit + + let on_no_request _ = return_unit + + end + + let table = Worker.create_table Queue + + (* NOTE: we register a single worker for each instantiation of this Make + * functor (and thus a single worker for the single instantiaion of Worker). + * Whislt this is somewhat abusing the intended purpose of worker, it is part + * of a transition plan to a one-worker-per-peer architecture. *) + let worker = + Worker.launch table Arg.limits.worker_limits + name + (Arg.limits, Arg.chain_db) + (module Handlers) end -module Worker = Worker.Make (Name) (Event) (Request) (Types) +module ChainProto_registry = + Registry.Make(struct + type v = t + type t = (Chain_id.t * Protocol_hash.t) + let compare (c1, p1) (c2, p2) = + let pc = Protocol_hash.compare p1 p2 in + if pc = 0 then + Chain_id.compare c1 c2 + else + pc + end) -open Types -type t = Worker.infinite Worker.queue Worker.t -type error += Closed = Worker.Closed +let create limits (module Proto: Registered_protocol.T) chain_db = + let chain_state = Distributed_db.chain_state chain_db in + let chain_id = State.Chain.id chain_state in + match ChainProto_registry.query (chain_id, Proto.hash) with + | None -> + let module Prevalidator = + Make(Proto)(struct + let limits = limits + let chain_db = chain_db + let chain_id = chain_id + end) in + Prevalidator.worker >>= fun _ -> + ChainProto_registry.register Prevalidator.name (module Prevalidator: T); + Lwt.return (module Prevalidator: T) + | Some p -> + Lwt.return p -let debug w = - Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) +let shutdown (t:t) = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + ChainProto_registry.remove Prevalidator.name; + Prevalidator.Worker.shutdown w + +let flush (t:t) head = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + Prevalidator.Worker.push_request_and_wait w (Request.Flush head) + +let notify_operations (t:t) peer mempool = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + Prevalidator.Worker.push_request w (Request.Notify (peer, mempool)) + +let operations (t:t) = + let module Prevalidator: T = (val t) in + match Lwt.state Prevalidator.worker with + | Lwt.Fail _ | Lwt.Sleep -> + (* FIXME: this shouldn't happen at all, here we return a safe value *) + (Preapply_result.empty, Operation_hash.Map.empty) + | Lwt.Return w -> + let pv = Prevalidator.Worker.state w in + ({ pv.Prevalidator.validation_result with + applied = List.rev pv.validation_result.applied }, + pv.pending) + +let pending ?block (t:t) = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + let pv = Prevalidator.Worker.state w in + let ops = Preapply_result.operations pv.validation_result in + match block with + | Some to_block -> + Prevalidator.list_pendings + ~from_block:pv.predecessor ~to_block ops >>= fun (pending, _, _) -> + Lwt.return pending + | None -> Lwt.return ops + +let timestamp (t:t) = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + let pv = Prevalidator.Worker.state w in + Lwt.return pv.timestamp + +let inject_operation (t:t) op = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + Prevalidator.Worker.push_request_and_wait w (Inject op) + +let status (t:t) = + let module Prevalidator: T = (val t) in + Prevalidator.worker >>= fun w -> + Lwt.return (Prevalidator.Worker.status w) + +let running_workers () = + ChainProto_registry.fold + (fun (id, proto) t acc -> (id, proto, t) :: acc) + [] + +let pending_requests (t:t) = + let module Prevalidator: T = (val t) in + match Lwt.state Prevalidator.worker with + | Lwt.Fail _ | Lwt.Sleep -> + (* FIXME: this shouldn't happen at all, here we return a safe value *) + [] + | Lwt.Return w -> Prevalidator.Worker.pending_requests w + +let current_request (t:t) = + let module Prevalidator: T = (val t) in + match Lwt.state Prevalidator.worker with + | Lwt.Fail _ | Lwt.Sleep -> + (* FIXME: this shouldn't happen at all, here we return a safe value *) + None + | Lwt.Return w -> Prevalidator.Worker.current_request w + +let last_events (t:t) = + let module Prevalidator: T = (val t) in + match Lwt.state Prevalidator.worker with + | Lwt.Fail _ | Lwt.Sleep -> + (* FIXME: this shouldn't happen at all, here we return a safe value *) + [] + | Lwt.Return w -> Prevalidator.Worker.last_events w + +let protocol_hash (t:t) = + let module Prevalidator: T = (val t) in + Prevalidator.Proto.hash + +let parameters (t:t) = + let module Prevalidator: T = (val t) in + Prevalidator.parameters let empty_rpc_directory : unit RPC_directory.t = RPC_directory.register @@ -110,492 +764,6 @@ let empty_rpc_directory : unit RPC_directory.t = unprocessed = Operation_hash.Map.empty ; }) -let rpc_directory protocol = - begin - match Registered_protocol.get protocol with - | None -> - (* FIXME. *) - (* This should not happen: it should be handled in the validator. *) - failwith "Prevalidation: missing protocol '%a' for the current block." - Protocol_hash.pp_short protocol - | Some protocol -> - return protocol - end >>=? fun (module Proto) -> - let module Proto_services = Block_services.Make(Proto)(Proto) in - - let dir : state RPC_directory.t ref = ref RPC_directory.empty in - let register s f = - dir := RPC_directory.register !dir s f in - - register - (Proto_services.S.Mempool.pending_operations RPC_path.open_root) - (fun pv () () -> - let map_op op = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - op.Operation.proto in - { Proto.shell = op.shell ; protocol_data } in - let map_op_error (op, error) = (map_op op, error) in - return { - Proto_services.Mempool.applied = - List.map - (fun (hash, op) -> (hash, map_op op)) - (List.rev pv.validation_result.applied) ; - refused = - Operation_hash.Map.map map_op_error pv.validation_result.refused ; - branch_refused = - Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ; - branch_delayed = - Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; - unprocessed = - Operation_hash.Map.map map_op pv.pending ; - }) ; - - Prevalidation.build_rpc_directory protocol >>=? fun prevalidation_dir -> - let prevalidation_dir = - RPC_directory.map (fun state -> - match state.validation_state with - | Error _ -> assert false - | Ok pv -> Lwt.return (pv, state.validation_result) - ) prevalidation_dir in - - return (RPC_directory.merge !dir prevalidation_dir) - -let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool = - let rec pop_blocks ancestor block mempool = - let hash = State.Block.hash block in - if Block_hash.equal hash ancestor then - Lwt.return mempool - else - State.Block.all_operations block >>= fun operations -> - Lwt_list.fold_left_s - (Lwt_list.fold_left_s (fun mempool op -> - let h = Operation.hash op in - Lwt_utils.may maintain_chain_db - ~f:begin fun chain_db -> - Distributed_db.inject_operation chain_db h op >>= fun _ -> - Lwt.return_unit - end >>= fun () -> - Lwt.return (Operation_hash.Map.add h op mempool))) - mempool operations >>= fun mempool -> - State.Block.predecessor block >>= function - | None -> assert false - | Some predecessor -> pop_blocks ancestor predecessor mempool - in - let push_block mempool block = - State.Block.all_operation_hashes block >|= fun operations -> - Option.iter maintain_chain_db - ~f:(fun chain_db -> - List.iter - (List.iter (Distributed_db.Operation.clear_or_cancel chain_db)) - operations) ; - List.fold_left - (List.fold_left (fun mempool h -> Operation_hash.Map.remove h mempool)) - mempool operations - in - Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, path) -> - pop_blocks - (State.Block.hash ancestor) - from_block old_mempool >>= fun mempool -> - Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool -> - Chain_traversal.live_blocks - to_block - (State.Block.max_operations_ttl to_block) - >>= fun (live_blocks, live_operations) -> - let new_mempool, outdated = - Operation_hash.Map.partition - (fun _oph op -> - Block_hash.Set.mem op.Operation.shell.branch live_blocks) - new_mempool in - Option.iter maintain_chain_db - ~f:(fun chain_db -> - Operation_hash.Map.iter - (fun oph _op -> Distributed_db.Operation.clear_or_cancel chain_db oph) - outdated) ; - Lwt.return (new_mempool, live_blocks, live_operations) - -let already_handled pv oph = - Operation_hash.Map.mem oph pv.refusals - || Operation_hash.Map.mem oph pv.pending - || Operation_hash.Set.mem oph pv.fetching - || Operation_hash.Set.mem oph pv.live_operations - || Operation_hash.Set.mem oph pv.in_mempool - -let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = - { Mempool.known_valid = List.map fst r.applied ; - pending = - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - r.branch_delayed @@ - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - r.branch_refused @@ - Operation_hash.Set.empty } - -let merge_validation_results ~old ~neu = - let open Preapply_result in - let merge _key a b = - match a, b with - | None, None -> None - | Some x, None -> Some x - | _, Some y -> Some y in - let filter_out s m = - List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in - { applied = List.rev_append neu.applied old.applied ; - refused = Operation_hash.Map.empty ; - branch_refused = - Operation_hash.Map.merge merge - (* filtering should not be required if the protocol is sound *) - (filter_out neu.applied old.branch_refused) - neu.branch_refused ; - branch_delayed = - Operation_hash.Map.merge merge - (filter_out neu.applied old.branch_delayed) - neu.branch_delayed } - -let advertise (w : t) pv mempool = - match pv.advertisement with - | `Pending { Mempool.known_valid ; pending } -> - pv.advertisement <- - `Pending - { known_valid = known_valid @ mempool.Mempool.known_valid ; - pending = Operation_hash.Set.union pending mempool.pending } - | `None -> - pv.advertisement <- `Pending mempool ; - Lwt.async (fun () -> - Lwt_unix.sleep 0.01 >>= fun () -> - Worker.push_request_now w Advertise ; - Lwt.return_unit) - -let handle_unprocessed w pv = - begin match pv.validation_state with - | Error err -> - pv.validation_result <- - { Preapply_result.empty with - branch_delayed = - Operation_hash.Map.fold - (fun h op m -> Operation_hash.Map.add h (op, err) m) - pv.pending Operation_hash.Map.empty } ; - pv.pending <- - Operation_hash.Map.empty ; - Lwt.return_unit - | Ok validation_state -> - match Operation_hash.Map.cardinal pv.pending with - | 0 -> Lwt.return_unit - | n -> debug w "processing %d operations" n ; - Prevalidation.prevalidate validation_state ~sort:true - (Operation_hash.Map.bindings pv.pending) - >>= fun (validation_state, validation_result) -> - pv.validation_state <- - Ok validation_state ; - pv.in_mempool <- - (Operation_hash.Map.fold - (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) - pv.pending @@ - Operation_hash.Map.fold - (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) - pv.validation_result.refused @@ - pv.in_mempool) ; - Operation_hash.Map.iter - (fun h (_, errs) -> - Option.iter (Ring.add_and_return_erased pv.refused h) - ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; - pv.refusals <- - Operation_hash.Map.add h errs pv.refusals) - pv.validation_result.refused ; - Operation_hash.Map.iter - (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph) - pv.validation_result.refused ; - pv.validation_result <- - merge_validation_results - ~old:pv.validation_result - ~neu:validation_result ; - pv.pending <- - Operation_hash.Map.empty ; - advertise w pv - (mempool_of_prevalidation_result validation_result) ; - Prevalidation.notify_operation validation_state validation_result ; - Lwt.return_unit - end >>= fun () -> - pv.mempool <- - { Mempool.known_valid = - List.rev_map fst pv.validation_result.applied ; - pending = - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - pv.validation_result.branch_delayed @@ - Operation_hash.Map.fold - (fun k _ s -> Operation_hash.Set.add k s) - pv.validation_result.branch_refused @@ - Operation_hash.Set.empty } ; - State.Current_mempool.set (Distributed_db.chain_state pv.chain_db) - ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> - Lwt.return_unit - -let fetch_operation w pv ?peer oph = - debug w - "fetching operation %a" - Operation_hash.pp_short oph ; - Distributed_db.Operation.fetch - ~timeout:pv.limits.operation_timeout - pv.chain_db ?peer oph () >>= function - | Ok op -> - Worker.push_request_now w (Arrived (oph, op)) ; - Lwt.return_unit - | Error [ Distributed_db.Operation.Canceled _ ] -> - debug w - "operation %a included before being prevalidated" - Operation_hash.pp_short oph ; - Lwt.return_unit - | Error _ -> (* should not happen *) - Lwt.return_unit - -let on_operation_arrived (pv : state) oph op = - pv.fetching <- Operation_hash.Set.remove oph pv.fetching ; - if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin - Distributed_db.Operation.clear_or_cancel pv.chain_db oph - (* TODO: put in a specific delayed map ? *) - end else if not (already_handled pv oph) (* prevent double inclusion on flush *) then begin - pv.pending <- Operation_hash.Map.add oph op pv.pending - end - -let on_inject pv op = - let oph = Operation.hash op in - begin - if already_handled pv oph then - return pv.validation_result - else - Lwt.return pv.validation_state >>=? fun validation_state -> - Prevalidation.prevalidate - validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> - match result.applied with - | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> - Distributed_db.inject_operation pv.chain_db oph op >>= fun (_ : bool) -> - pv.pending <- Operation_hash.Map.add oph op pv.pending ; - return result - | _ -> - return result - end >>=? fun result -> - if List.mem_assoc oph result.applied then - return_unit - else - let try_in_map map proj or_else = - try - Lwt.return (Error (proj (Operation_hash.Map.find oph map))) - with Not_found -> or_else () in - try_in_map pv.refusals (fun h -> h) @@ fun () -> - try_in_map result.refused snd @@ fun () -> - try_in_map result.branch_refused snd @@ fun () -> - try_in_map result.branch_delayed snd @@ fun () -> - if Operation_hash.Set.mem oph pv.live_operations then - failwith "Injected operation %a included in a previous block." - Operation_hash.pp oph - else - failwith "Injected operation %a is not in prevalidation result." - Operation_hash.pp oph - -let on_notify w pv peer mempool = - let all_ophs = - List.fold_left - (fun s oph -> Operation_hash.Set.add oph s) - mempool.Mempool.pending mempool.known_valid in - let to_fetch = - Operation_hash.Set.filter - (fun oph -> not (already_handled pv oph)) - all_ophs in - pv.fetching <- - Operation_hash.Set.union - to_fetch - pv.fetching ; - Operation_hash.Set.iter - (fun oph -> Lwt.ignore_result (fetch_operation w pv ~peer oph)) - to_fetch - -let on_flush w pv predecessor = - list_pendings - ~maintain_chain_db:pv.chain_db - ~from_block:pv.predecessor ~to_block:predecessor - (Preapply_result.operations pv.validation_result) - >>= fun (pending, new_live_blocks, new_live_operations) -> - let timestamp = Time.now () in - Prevalidation.start_prevalidation - ~predecessor ~timestamp () >>= fun validation_state -> - begin match validation_state with - | Error _ -> Lwt.return (validation_state, Preapply_result.empty) - | Ok validation_state -> - Prevalidation.prevalidate - validation_state ~sort:false [] >>= fun (state, result) -> - Lwt.return (Ok state, result) - end >>= fun (validation_state, validation_result) -> - debug w "%d operations were not washed by the flush" - (Operation_hash.Map.cardinal pending) ; - State.Block.protocol_hash pv.predecessor >>= fun old_protocol -> - State.Block.protocol_hash predecessor >>= fun new_protocol -> - pv.predecessor <- predecessor ; - pv.live_blocks <- new_live_blocks ; - pv.live_operations <- new_live_operations ; - pv.timestamp <- timestamp ; - pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty }; - pv.pending <- pending ; - pv.in_mempool <- Operation_hash.Set.empty ; - pv.validation_result <- validation_result ; - pv.validation_state <- validation_state ; - if not (Protocol_hash.equal old_protocol new_protocol) then - pv.rpc_directory <- lazy (rpc_directory new_protocol) ; - return_unit - -let on_advertise pv = - match pv.advertisement with - | `None -> () (* should not happen *) - | `Pending mempool -> - pv.advertisement <- `None ; - Distributed_db.Advertise.current_head pv.chain_db ~mempool pv.predecessor - -let on_request - : type r. t -> r Request.t -> r tzresult Lwt.t - = fun w request -> - let pv = Worker.state w in - begin match request with - | Request.Flush hash -> - on_advertise pv ; - (* TODO: rebase the advertisement instead *) - let chain_state = Distributed_db.chain_state pv.chain_db in - State.Block.read chain_state hash >>=? fun block -> - begin match pv.validation_state with - | Ok pv -> Prevalidation.shutdown_operation_input pv - | Error _ -> () end ; - on_flush w pv block >>=? fun () -> - return (() : r) - | Request.Notify (peer, mempool) -> - on_notify w pv peer mempool ; - return_unit - | Request.Inject op -> - on_inject pv op - | Request.Arrived (oph, op) -> - on_operation_arrived pv oph op ; - return_unit - | Request.Advertise -> - on_advertise pv ; - return_unit - end >>=? fun r -> - handle_unprocessed w pv >>= fun () -> - return r - -let on_close w = - let pv = Worker.state w in - Operation_hash.Set.iter - (Distributed_db.Operation.clear_or_cancel pv.chain_db) - pv.fetching ; - Lwt.return_unit - -let on_launch w _ (limits, chain_db) = - let chain_state = Distributed_db.chain_state chain_db in - Chain.data chain_state >>= fun - { current_head = predecessor ; current_mempool = mempool ; - live_blocks ; live_operations } -> - State.Block.protocol_hash predecessor >>= fun protocol -> - let timestamp = Time.now () in - Prevalidation.start_prevalidation - ~predecessor ~timestamp () >>= fun validation_state -> - begin match validation_state with - | Error _ -> Lwt.return (validation_state, Preapply_result.empty) - | Ok validation_state -> - Prevalidation.prevalidate validation_state ~sort:false [] - >>= fun (validation_state, validation_result) -> - - Lwt.return (Ok validation_state, validation_result) - end >>= fun (validation_state, validation_result) -> - let fetching = - List.fold_left - (fun s h -> Operation_hash.Set.add h s) - Operation_hash.Set.empty mempool.known_valid in - let pv = - { limits ; chain_db ; - predecessor ; timestamp ; live_blocks ; live_operations ; - mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; - refused = Ring.create limits.max_refused_operations ; - refusals = Operation_hash.Map.empty ; - fetching ; - pending = Operation_hash.Map.empty ; - in_mempool = Operation_hash.Set.empty ; - validation_result ; validation_state ; - advertisement = `None ; - rpc_directory = lazy (rpc_directory protocol) ; - } in - List.iter - (fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) - mempool.known_valid ; - Lwt.return pv - -let on_error w r st errs = - Worker.record_event w (Event.Request (r, st, Some errs)) ; - match r with - | Request.(View (Inject _)) -> return_unit - | _ -> Lwt.return (Error errs) - -let on_completion w r _ st = - Worker.record_event w (Event.Request (Request.view r, st, None)) ; - Lwt.return_unit - -let table = Worker.create_table Queue - -let create limits chain_db = - let chain_state = Distributed_db.chain_state chain_db in - let module Handlers = struct - type self = t - let on_launch = on_launch - let on_request = on_request - let on_close = on_close - let on_error = on_error - let on_completion = on_completion - let on_no_request _ = return_unit - end in - Worker.launch table limits.worker_limits - (State.Chain.id chain_state) - (limits, chain_db) - (module Handlers) - -let shutdown = Worker.shutdown - -let flush w head = - Worker.push_request_and_wait w (Flush head) - -let notify_operations w peer mempool = - Worker.push_request_now w (Notify (peer, mempool)) - -let operations w = - let pv = Worker.state w in - { pv.validation_result with - applied = List.rev pv.validation_result.applied }, - pv.pending - -let pending ?block w = - let pv = Worker.state w in - let ops = Preapply_result.operations pv.validation_result in - match block with - | Some to_block -> - list_pendings - ~from_block:pv.predecessor ~to_block ops >>= fun (pending, _, _) -> - Lwt.return pending - | None -> Lwt.return ops - -let timestamp w = - let pv = Worker.state w in - pv.timestamp - -let inject_operation w op = - Worker.push_request_and_wait w (Inject op) - -let status = Worker.status - -let running_workers () = Worker.list table - -let pending_requests t = Worker.pending_requests t - -let current_request t = Worker.current_request t - -let last_events = Worker.last_events let rpc_directory : t option RPC_directory.t = RPC_directory.register_dynamic_directory @@ -605,8 +773,10 @@ let rpc_directory : t option RPC_directory.t = | None -> Lwt.return (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory) - | Some w -> - let pv = Worker.state w in + | Some t -> + let module Prevalidator: T = (val t: T) in + Prevalidator.worker >>= fun w -> + let pv = Prevalidator.Worker.state w in Lazy.force pv.rpc_directory >>= function | Error _ -> Lwt.return RPC_directory.empty diff --git a/src/lib_shell/prevalidator.mli b/src/lib_shell/prevalidator.mli index cf1d8e544..98cabf08e 100644 --- a/src/lib_shell/prevalidator.mli +++ b/src/lib_shell/prevalidator.mli @@ -25,25 +25,29 @@ (** Tezos Shell - Prevalidation of pending operations (a.k.a Mempool) *) -(** The prevalidation worker is in charge of the "mempool" (a.k.a. the +(** The prevalidator is in charge of the "mempool" (a.k.a. the set of known not-invalid-for-sure operations that are not yet included in the blockchain). - The worker also maintains a sorted subset of the mempool that + The prevalidator also maintains a sorted subset of the mempool that might correspond to a valid block on top of the current head. The "in-progress" context produced by the application of those operations is called the (pre)validation context. - Before to include an operation into the mempool, the prevalidation + Before including an operation into the mempool, the prevalidation worker tries to append the operation the prevalidation context. If the operation is (strongly) refused, it will not be added into the mempool and then it will be ignored by the node and never - broadcasted. If the operation is only "branch_refused" or + broadcast. If the operation is only "branch_refused" or "branch_delayed", the operation won't be appended in the - prevalidation context, but still broadcasted. + prevalidation context, but still broadcast. *) + + +(** An (abstract) prevalidator context. Separate prevalidator contexts should be + * used for separate chains (e.g., mainchain vs testchain). *) type t type limits = { @@ -52,29 +56,27 @@ type limits = { worker_limits : Worker_types.limits ; } -type error += Closed of Chain_id.t - -(** Creates a new worker. Each chain is associated with a prevalidator. Typically, - this is the case for the main chain and a test chain *) -val create: limits -> Distributed_db.chain_db -> t Lwt.t - +(** Creates/tear-down a new prevalidator context. *) +val create: + limits -> + (module Registered_protocol.T) -> + Distributed_db.chain_db -> + t Lwt.t val shutdown: t -> unit Lwt.t -(** Notify the prevalidator worker of a set of operations (in the form of a mempool) - received from a peer. *) -val notify_operations: t -> P2p_peer.Id.t -> Mempool.t -> unit +(** Notify the prevalidator that the identified peer has sent a bunch of + * operations relevant to the specified context. *) +val notify_operations: t -> P2p_peer.Id.t -> Mempool.t -> unit Lwt.t -(** Notify the prevalidator worker of a new injected operation. This will be added - to the mempool of the worker *) +(** Notify the prevalidator worker of a new injected operation. *) val inject_operation: t -> Operation.t -> unit tzresult Lwt.t -(** Notify the prevalidator worker that a new head was received. The new head will - cause the reset of the prevalidation context *) +(** Notify the prevalidator that a new head has been selected. *) val flush: t -> Block_hash.t -> unit tzresult Lwt.t (** Returns the timestamp of the prevalidator worker, that is the timestamp of the last reset of the prevalidation context *) -val timestamp: t -> Time.t +val timestamp: t -> Time.t Lwt.t (** Returns the list of valid operations known to this prevalidation worker *) val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t @@ -82,12 +84,22 @@ val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t (** Returns the list of pending operations known to this prevalidation worker *) val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t -(** Returns the list of prevalidation workers running and their associated chain *) -val running_workers: unit -> (Chain_id.t * t) list +(** Returns the list of prevalidation contexts running and their associated chain *) +val running_workers: unit -> (Chain_id.t * Protocol_hash.t * t) list + +(** Two functions that are useful for managing the prevalidator's transition + * from one protocol to the next. *) + +(** Returns the hash of the protocol the prevalidator was instantiated with *) +val protocol_hash: t -> Protocol_hash.t + +(** Returns the parameters the prevalidator was created with. *) +val parameters: t -> limits * Distributed_db.chain_db (** Worker status and events *) -val status: t -> Worker_types.worker_status +(* None indicates the there are no workers for the current protocol. *) +val status: t -> Worker_types.worker_status Lwt.t val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list diff --git a/src/lib_shell/worker_directory.ml b/src/lib_shell/worker_directory.ml index 7b514689a..5f963c6cc 100644 --- a/src/lib_shell/worker_directory.ml +++ b/src/lib_shell/worker_directory.ml @@ -36,20 +36,29 @@ let build_rpc_directory state = (* Workers : Prevalidators *) register0 Worker_services.Prevalidators.S.list begin fun () () -> - return - (List.map - (fun (id, w) -> (id, Prevalidator.status w)) - (Prevalidator.running_workers ())) + let workers = Prevalidator.running_workers () in + Lwt_list.map_p + (fun (chain_id, _, t) -> + Prevalidator.status t >>= fun status -> + Lwt.return (chain_id, status)) + workers >>= fun info -> + return info end ; register1 Worker_services.Prevalidators.S.state begin fun chain () () -> Chain_directory.get_chain_id state chain >>= fun chain_id -> - let w = List.assoc chain_id (Prevalidator.running_workers ()) in + let workers = Prevalidator.running_workers () in + let (_, _, t) = + (* NOTE: it is technically possible to use the Prevalidator interface to + * register multiple Prevalidator for a single chain (using distinct + * protocols). However, this is never done. *) + List.find (fun (c, _, _) -> Chain_id.equal c chain_id) workers in + Prevalidator.status t >>= fun status -> return - { Worker_types.status = Prevalidator.status w ; - pending_requests = Prevalidator.pending_requests w ; - backlog = Prevalidator.last_events w ; - current_request = Prevalidator.current_request w } + { Worker_types.status = status ; + pending_requests = Prevalidator.pending_requests t ; + backlog = Prevalidator.last_events t ; + current_request = Prevalidator.current_request t } end ; (* Workers : Block_validator *) diff --git a/src/lib_shell_services/worker_services.ml b/src/lib_shell_services/worker_services.ml index 2876aaa26..747a5ce63 100644 --- a/src/lib_shell_services/worker_services.ml +++ b/src/lib_shell_services/worker_services.ml @@ -42,7 +42,7 @@ module Prevalidators = struct let state = RPC_service.get_service - ~description:"Introspect the state of a prevalidator worker." + ~description:"Introspect the state of prevalidator workers." ~query: RPC_query.empty ~output: (Worker_types.full_status_encoding