diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index 0433adee7..356852f8e 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -54,6 +54,7 @@ and log = { and shell = { bootstrap_threshold : int ; + block_validator_limits : Node.block_validator_limits ; prevalidator_limits : Node.prevalidator_limits ; timeout : Node.timeout ; } @@ -105,6 +106,15 @@ let default_log = { let default_shell = { bootstrap_threshold = 4 ; + block_validator_limits = { + protocol_timeout = 120. ; + worker_limits = { + backlog_size = 1000 ; + backlog_level = Logging.Debug ; + zombie_lifetime = 3600. ; + zombie_memory = 1800. ; + } + } ; prevalidator_limits = { operation_timeout = 10. ; max_refused_operations = 1000 ; @@ -284,6 +294,23 @@ let worker_limits_encoding let timeout_encoding = Data_encoding.ranged_float 0. 500. +let block_validator_limits_encoding = + let open Data_encoding in + conv + (fun { Node.protocol_timeout ; worker_limits } -> + (protocol_timeout, worker_limits)) + (fun (protocol_timeout, worker_limits) -> + { protocol_timeout ; worker_limits}) + (merge_objs + (obj1 + (dft "protocol_request_timeout" timeout_encoding + default_shell.block_validator_limits.protocol_timeout)) + (worker_limits_encoding + default_shell.block_validator_limits.worker_limits.backlog_size + default_shell.block_validator_limits.worker_limits.backlog_level + default_shell.block_validator_limits.worker_limits.zombie_lifetime + default_shell.block_validator_limits.worker_limits.zombie_memory)) + let prevalidator_limits_encoding = let open Data_encoding in conv @@ -325,13 +352,19 @@ let timeout_encoding = let shell = let open Data_encoding in conv - (fun { bootstrap_threshold ; timeout ; prevalidator_limits } -> - bootstrap_threshold, timeout, prevalidator_limits) - (fun (bootstrap_threshold, timeout, prevalidator_limits) -> - { bootstrap_threshold ; timeout ; prevalidator_limits }) - (obj3 + (fun { bootstrap_threshold ; timeout ; + block_validator_limits ; prevalidator_limits } -> + bootstrap_threshold, timeout, + block_validator_limits, prevalidator_limits) + (fun (bootstrap_threshold, timeout, + block_validator_limits, prevalidator_limits) -> + { bootstrap_threshold ; timeout ; + block_validator_limits ; + prevalidator_limits }) + (obj4 (dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold) (dft "timeout" timeout_encoding default_shell.timeout) + (dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits) (dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits) ) @@ -451,6 +484,7 @@ let update ~default:cfg.shell.bootstrap_threshold bootstrap_threshold ; timeout = cfg.shell.timeout ; + block_validator_limits = cfg.shell.block_validator_limits ; prevalidator_limits = cfg.shell.prevalidator_limits ; } in diff --git a/src/bin_node/node_config_file.mli b/src/bin_node/node_config_file.mli index 25002d6f9..917551e8a 100644 --- a/src/bin_node/node_config_file.mli +++ b/src/bin_node/node_config_file.mli @@ -44,6 +44,7 @@ and log = { and shell = { bootstrap_threshold : int ; + block_validator_limits : Node.block_validator_limits ; prevalidator_limits : Node.prevalidator_limits ; timeout : Node.timeout ; } diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index edc7ddd6a..040905929 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -173,6 +173,7 @@ let init_node ?sandbox (config : Node_config_file.t) = Node.create node_config config.shell.timeout + config.shell.block_validator_limits config.shell.prevalidator_limits let () = diff --git a/src/lib_node_shell/block_validator.ml b/src/lib_node_shell/block_validator.ml index 6b21a97fe..c448b608d 100644 --- a/src/lib_node_shell/block_validator.ml +++ b/src/lib_node_shell/block_validator.ml @@ -7,294 +7,58 @@ (* *) (**************************************************************************) -include Logging.Make(struct let name = "node.validator.block" end) - -type 'a request = - | Request_validation: { - net_db: Distributed_db.net_db ; - notify_new_block: State.Block.t -> unit ; - canceler: Lwt_canceler.t option ; - peer: P2p.Peer_id.t option ; - hash: Block_hash.t ; - header: Block_header.t ; - operations: Operation.t list list ; - } -> State.Block.t tzresult request - -type message = Message: 'a request * 'a Lwt.u option -> message - -type t = { - protocol_validator: Protocol_validator.t ; - protocol_timeout: float ; - mutable worker: unit Lwt.t ; - messages: message Lwt_pipe.t ; - canceler: Lwt_canceler.t ; -} - -(** Block validation *) - -type block_error = - | Cannot_parse_operation of Operation_hash.t - | Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } - | Non_increasing_timestamp - | Non_increasing_fitness - | Invalid_level of { expected: Int32.t ; found: Int32.t } - | Invalid_proto_level of { expected: int ; found: int } - | Replayed_operation of Operation_hash.t - | Outdated_operation of - { operation: Operation_hash.t; - originating_block: Block_hash.t } - | Expired_network of - { net_id: Net_id.t ; - expiration: Time.t ; - timestamp: Time.t ; - } - | Unexpected_number_of_validation_passes of int (* uint8 *) - | Too_many_operations of { pass: int; found: int; max: int } - | Oversized_operation of { operation: Operation_hash.t; - size: int; max: int } - -let block_error_encoding = - let open Data_encoding in - union - [ - case (Tag 0) - (obj2 - (req "error" (constant "cannot_parse_operation")) - (req "operation" Operation_hash.encoding)) - (function Cannot_parse_operation operation -> Some ((), operation) - | _ -> None) - (fun ((), operation) -> Cannot_parse_operation operation) ; - case (Tag 1) - (obj3 - (req "error" (constant "invalid_fitness")) - (req "expected" Fitness.encoding) - (req "found" Fitness.encoding)) - (function - | Invalid_fitness { expected ; found } -> - Some ((), expected, found) - | _ -> None) - (fun ((), expected, found) -> Invalid_fitness { expected ; found }) ; - case (Tag 2) - (obj1 - (req "error" (constant "non_increasing_timestamp"))) - (function Non_increasing_timestamp -> Some () - | _ -> None) - (fun () -> Non_increasing_timestamp) ; - case (Tag 3) - (obj1 - (req "error" (constant "non_increasing_fitness"))) - (function Non_increasing_fitness -> Some () - | _ -> None) - (fun () -> Non_increasing_fitness) ; - case (Tag 4) - (obj3 - (req "error" (constant "invalid_level")) - (req "expected" int32) - (req "found" int32)) - (function - | Invalid_level { expected ; found } -> - Some ((), expected, found) - | _ -> None) - (fun ((), expected, found) -> Invalid_level { expected ; found }) ; - case (Tag 5) - (obj3 - (req "error" (constant "invalid_proto_level")) - (req "expected" uint8) - (req "found" uint8)) - (function - | Invalid_proto_level { expected ; found } -> - Some ((), expected, found) - | _ -> None) - (fun ((), expected, found) -> - Invalid_proto_level { expected ; found }) ; - case (Tag 6) - (obj2 - (req "error" (constant "replayed_operation")) - (req "operation" Operation_hash.encoding)) - (function Replayed_operation operation -> Some ((), operation) - | _ -> None) - (fun ((), operation) -> Replayed_operation operation) ; - case (Tag 7) - (obj3 - (req "error" (constant "outdated_operation")) - (req "operation" Operation_hash.encoding) - (req "originating_block" Block_hash.encoding)) - (function - | Outdated_operation { operation ; originating_block } -> - Some ((), operation, originating_block) - | _ -> None) - (fun ((), operation, originating_block) -> - Outdated_operation { operation ; originating_block }) ; - case (Tag 8) - (obj2 - (req "error" (constant "unexpected_number_of_passes")) - (req "found" uint8)) - (function - | Unexpected_number_of_validation_passes n -> Some ((), n) - | _ -> None) - (fun ((), n) -> Unexpected_number_of_validation_passes n) ; - case (Tag 9) - (obj4 - (req "error" (constant "too_many_operations")) - (req "validation_pass" uint8) - (req "found" uint16) - (req "max" uint16)) - (function - | Too_many_operations { pass ; found ; max } -> - Some ((), pass, found, max) - | _ -> None) - (fun ((), pass, found, max) -> - Too_many_operations { pass ; found ; max }) ; - case (Tag 10) - (obj4 - (req "error" (constant "oversized_operation")) - (req "operation" Operation_hash.encoding) - (req "found" int31) - (req "max" int31)) - (function - | Oversized_operation { operation ; size ; max } -> - Some ((), operation, size, max) - | _ -> None) - (fun ((), operation, size, max) -> - Oversized_operation { operation ; size ; max }) ; - ] - -let pp_block_error ppf = function - | Cannot_parse_operation oph -> - Format.fprintf ppf - "Failed to parse the operation %a." - Operation_hash.pp_short oph - | Invalid_fitness { expected ; found } -> - Format.fprintf ppf - "@[Invalid fitness:@ \ - \ expected %a@ \ - \ found %a@]" - Fitness.pp expected - Fitness.pp found - | Non_increasing_timestamp -> - Format.fprintf ppf "Non increasing timestamp" - | Non_increasing_fitness -> - Format.fprintf ppf "Non increasing fitness" - | Invalid_level { expected ; found } -> - Format.fprintf ppf - "Invalid level:@ \ - \ expected %ld@ \ - \ found %ld" - expected - found - | Invalid_proto_level { expected ; found } -> - Format.fprintf ppf - "Invalid protocol level:@ \ - \ expected %d@ \ - \ found %d" - expected - found - | Replayed_operation oph -> - Format.fprintf ppf - "The operation %a was previously included in the chain." - Operation_hash.pp_short oph - | Outdated_operation { operation ; originating_block } -> - Format.fprintf ppf - "The operation %a is outdated (originated in block: %a)" - Operation_hash.pp_short operation - Block_hash.pp_short originating_block - | Expired_network { net_id ; expiration ; timestamp } -> - Format.fprintf ppf - "The block timestamp (%a) is later than \ - its network expiration date: %a (net: %a)." - Time.pp_hum timestamp - Time.pp_hum expiration - Net_id.pp_short net_id - | Unexpected_number_of_validation_passes n -> - Format.fprintf ppf - "Invalid number of validation passes (found: %d)" - n - | Too_many_operations { pass ; found ; max } -> - Format.fprintf ppf - "Too many operations in validation pass %d (found: %d, max: %d)" - pass found max - | Oversized_operation { operation ; size ; max } -> - Format.fprintf ppf - "Oversized operation %a (size: %d, max: %d)" - Operation_hash.pp_short operation size max - -type error += - | Invalid_block of - { block: Block_hash.t ; error: block_error } - | Unavailable_protocol of - { block: Block_hash.t ; protocol: Protocol_hash.t } - | Inconsistent_operations_hash of - { block: Block_hash.t ; - expected: Operation_list_list_hash.t ; - found: Operation_list_list_hash.t } - +open Block_validator_worker_state +open Block_validator_errors let invalid_block block error = Invalid_block { block ; error } -let () = - Error_monad.register_error_kind - `Permanent - ~id:"validator.invalid_block" - ~title:"Invalid block" - ~description:"Invalid block." - ~pp:begin fun ppf (block, error) -> - Format.fprintf ppf - "@[Invalid block %a@ %a@]" - Block_hash.pp_short block pp_block_error error - end - Data_encoding.(merge_objs - (obj1 (req "invalid_block" Block_hash.encoding)) - block_error_encoding) - (function Invalid_block { block ; error } -> - Some (block, error) | _ -> None) - (fun (block, error) -> - Invalid_block { block ; error }) ; - Error_monad.register_error_kind - `Temporary - ~id:"validator.unavailable_protocol" - ~title:"Missing protocol" - ~description:"The protocol required for validating a block is missing." - ~pp:begin fun ppf (block, protocol) -> - Format.fprintf ppf - "Missing protocol (%a) when validating the block %a." - Protocol_hash.pp_short protocol - Block_hash.pp_short block - end - Data_encoding.( - obj2 - (req "block" Block_hash.encoding) - (req "missing_protocol" Protocol_hash.encoding)) - (function - | Unavailable_protocol { block ; protocol } -> - Some (block, protocol) - | _ -> None) - (fun (block, protocol) -> Unavailable_protocol { block ; protocol }) ; - Error_monad.register_error_kind - `Temporary - ~id:"validator.inconsistent_operations_hash" - ~title:"Invalid merkle tree" - ~description:"The provided list of operations is inconsistent with \ - the block header." - ~pp:begin fun ppf (block, expected, found) -> - Format.fprintf ppf - "@[The provided list of operations for block %a \ - \ is inconsistent with the block header@ \ - \ expected: %a@ \ - \ found: %a@]" - Block_hash.pp_short block - Operation_list_list_hash.pp_short expected - Operation_list_list_hash.pp_short found - end - Data_encoding.( - obj3 - (req "block" Block_hash.encoding) - (req "expected" Operation_list_list_hash.encoding) - (req "found" Operation_list_list_hash.encoding)) - (function - | Inconsistent_operations_hash { block ; expected ; found } -> - Some (block, expected, found) - | _ -> None) - (fun (block, expected, found) -> - Inconsistent_operations_hash { block ; expected ; found }) +type limits = { + protocol_timeout: float ; + worker_limits : Worker_types.limits ; +} + +module Name = struct + type t = unit + let encoding = Data_encoding.empty + let base = [ "validator.block" ] + let pp _ () = () +end + +module Types = struct + include Worker_state + type state = { + protocol_validator: Protocol_validator.t ; + limits : limits ; + } + type parameters = limits * Distributed_db.t + let view _state _parameters = () +end + +module Request = struct + include Request + type 'a t = + | Request_validation : { + net_db: Distributed_db.net_db ; + notify_new_block: State.Block.t -> unit ; + canceler: Lwt_canceler.t option ; + peer: P2p.Peer_id.t option ; + hash: Block_hash.t ; + header: Block_header.t ; + operations: Operation.t list list ; + } -> State.Block.t tzresult t + let view + : type a. a t -> view + = fun (Request_validation { net_db ; peer ; hash }) -> + let net_id = net_db |> Distributed_db.net_state |> State.Net.id in + { net_id ; block = hash ; peer = peer } +end + +module Worker = Worker.Make (Name) (Event) (Request) (Types) + +type t = Worker.infinite Worker.queue Worker.t +type error += Closed = Worker.Closed + +let debug w = + Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) let check_header (pred: State.Block.t) hash (header: Block_header.t) = @@ -442,137 +206,124 @@ let get_proto pred hash = protocol = pred_protocol_hash }) | Some p -> return p -let rec worker_loop bv = - begin - Lwt_utils.protect ~canceler:bv.canceler begin fun () -> - Lwt_pipe.pop bv.messages >>= return - end >>=? fun (Message (request, wakener)) -> - let may_wakeup = - match wakener with - | None -> (fun _ -> ()) - | Some wakener -> (fun v -> Lwt.wakeup_later wakener v) - in - match request with - | Request_validation { net_db ; notify_new_block ; canceler ; - peer ; hash ; header ; operations } -> - let net_state = Distributed_db.net_state net_db in - State.Block.read_opt net_state hash >>= function - | Some block -> - lwt_debug "previously validated block %a (after pipe)" - Block_hash.pp_short hash >>= fun () -> - Protocol_validator.prefetch_and_compile_protocols - bv.protocol_validator - ?peer ~timeout:bv.protocol_timeout - block ; - may_wakeup (Ok block) ; - return () +let on_request + : type r. t -> r Request.t -> r tzresult Lwt.t + = fun w + (Request.Request_validation + { net_db ; notify_new_block ; canceler ; + peer ; hash ; header ; operations }) -> + let bv = Worker.state w in + let net_state = Distributed_db.net_state net_db in + State.Block.read_opt net_state hash >>= function + | Some block -> + debug w "previously validated block %a (after pipe)" + Block_hash.pp_short hash ; + Protocol_validator.prefetch_and_compile_protocols + bv.protocol_validator + ?peer ~timeout:bv.limits.protocol_timeout + block ; + return (Ok block) + | None -> + State.Block.read_invalid net_state hash >>= function + | Some { errors } -> + return (Error errors) | None -> - State.Block.read_invalid net_state hash >>= function - | Some { errors } -> - may_wakeup (Error errors) ; - return () - | None -> - begin - lwt_debug "validating block %a" - Block_hash.pp_short hash >>= fun () -> - State.Block.read - net_state header.shell.predecessor >>=? fun pred -> - get_proto pred hash >>=? fun proto -> - (* TODO also protect with [bv.canceler]. *) - Lwt_utils.protect ?canceler begin fun () -> - apply_block - (Distributed_db.net_state net_db) - pred proto hash header operations - end - end >>= function - | Ok result -> begin - Lwt_utils.protect ~canceler:bv.canceler begin fun () -> - Distributed_db.commit_block - net_db hash header operations result - end >>=? function - | None -> - assert false (* should not happen *) - | Some block -> - lwt_log_info "validated block %a" - Block_hash.pp_short hash >>= fun () -> - Protocol_validator.prefetch_and_compile_protocols - bv.protocol_validator - ?peer ~timeout:bv.protocol_timeout - block ; - may_wakeup (Ok block) ; - notify_new_block block ; - return () - end - (* TODO catch other temporary error (e.g. system errors) - and do not 'commit' them on disk... *) - | Error [Lwt_utils.Canceled | Unavailable_protocol _] as err -> - may_wakeup err ; - return () - | Error errors as err -> - lwt_log_error "@[Received invalid block %a:@ %a@]" - Block_hash.pp_short hash - Error_monad.pp_print_error errors >>= fun () -> - Lwt_utils.protect ~canceler:bv.canceler begin fun () -> - Distributed_db.commit_invalid_block - net_db hash header errors - end >>=? fun commited -> - assert commited ; - may_wakeup err ; - return () - end >>= function - | Ok () -> - worker_loop bv - | Error [Exn (Unix.Unix_error _) as err] -> - lwt_log_error "validation failed with %a" - pp_print_error [err] >>= fun () -> - worker_loop bv - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> - lwt_log_notice "terminating" >>= fun () -> - Lwt.return_unit - | Error err -> - lwt_log_error "@[Unexpected error:@ %a@]" - pp_print_error err >>= fun () -> - Lwt_canceler.cancel bv.canceler >>= fun () -> - Lwt.return_unit + begin + debug w "validating block %a" Block_hash.pp_short hash ; + State.Block.read + net_state header.shell.predecessor >>=? fun pred -> + get_proto pred hash >>=? fun proto -> + (* TODO also protect with [Worker.canceler w]. *) + Lwt_utils.protect ?canceler begin fun () -> + apply_block + (Distributed_db.net_state net_db) + pred proto hash header operations + end + end >>= function + | Ok result -> begin + Worker.protect w begin fun () -> + Distributed_db.commit_block + net_db hash header operations result + end >>=? function + | None -> + assert false (* should not happen *) + | Some block -> + Protocol_validator.prefetch_and_compile_protocols + bv.protocol_validator + ?peer ~timeout:bv.limits.protocol_timeout + block ; + notify_new_block block ; + return (Ok block) + end + (* TODO catch other temporary error (e.g. system errors) + and do not 'commit' them on disk... *) + | Error [Lwt_utils.Canceled | Unavailable_protocol _] as err -> + return err + | Error errors -> + Worker.protect w begin fun () -> + Distributed_db.commit_invalid_block + net_db hash header errors + end >>=? fun commited -> + assert commited ; + return (Error errors) -let create ~protocol_timeout db = +let on_launch _ _ (limits, db) = let protocol_validator = Protocol_validator.create db in - let canceler = Lwt_canceler.create () in - let messages = Lwt_pipe.create () in - let bv = { - protocol_validator ; - protocol_timeout ; - canceler ; messages ; - worker = Lwt.return_unit } in - Lwt_canceler.on_cancel bv.canceler begin fun () -> - Lwt_pipe.close bv.messages ; - Lwt.return_unit - end ; - bv.worker <- - Lwt_utils.worker "block_validator" - ~run:(fun () -> worker_loop bv) - ~cancel:(fun () -> Lwt_canceler.cancel bv.canceler) ; - bv + Lwt.return { Types.protocol_validator ; limits } -let shutdown { canceler ; worker } = - Lwt_canceler.cancel canceler >>= fun () -> - worker +let on_error w r st errs = + Worker.record_event w (Validation_failure (r, st, errs)) ; + Lwt.return (Error errs) -let validate { messages ; protocol_validator ; protocol_timeout } +let on_completion + : type a. t -> a Request.t -> a -> Worker_types.request_status -> unit Lwt.t + = fun w (Request.Request_validation _ as r) v st -> + match v with + | Ok _ -> + Worker.record_event w + (Event.Validation_success (Request.view r, st)) ; + Lwt.return () + | Error errs -> + Worker.record_event w + (Event.Validation_failure (Request.view r, st, errs)) ; + Lwt.return () + +let table = Worker.create_table Queue + +let create limits db = + let module Handlers = struct + type self = t + let on_launch = on_launch + let on_request = on_request + let on_close _ = Lwt.return () + let on_error = on_error + let on_completion = on_completion + let on_no_request _ = return () + end in + Worker.launch + table + limits.worker_limits + () + (limits, db) + (module Handlers) + +let shutdown = Worker.shutdown + +let validate w ?canceler ?peer ?(notify_new_block = fun _ -> ()) net_db hash (header : Block_header.t) operations = + let bv = Worker.state w in let net_state = Distributed_db.net_state net_db in State.Block.read_opt net_state hash >>= function | Some block -> - lwt_debug "previously validated block %a (before pipe)" - Block_hash.pp_short hash >>= fun () -> + debug w "previously validated block %a (before pipe)" + Block_hash.pp_short hash ; Protocol_validator.prefetch_and_compile_protocols - protocol_validator - ?peer ~timeout:protocol_timeout + bv.protocol_validator + ?peer ~timeout:bv.limits.protocol_timeout block ; return block | None -> - let res, wakener = Lwt.task () in map_p (map_p (fun op -> let op_hash = Operation.hash op in return op_hash)) @@ -589,14 +340,25 @@ let validate { messages ; protocol_validator ; protocol_timeout } found = computed_hash ; }) >>=? fun () -> check_net_liveness net_db hash header >>=? fun () -> - lwt_debug "pushing validation request for block %a" - Block_hash.pp_short hash >>= fun () -> - Lwt_pipe.push messages - (Message (Request_validation - { net_db ; notify_new_block ; canceler ; - peer ; hash ; header ; operations }, - Some wakener)) >>= fun () -> - res + Worker.push_request_and_wait w + (Request_validation + { net_db ; notify_new_block ; canceler ; + peer ; hash ; header ; operations }) >>=? fun result -> + Lwt.return result -let fetch_and_compile_protocol bv = +let fetch_and_compile_protocol w = + let bv = Worker.state w in Protocol_validator.fetch_and_compile_protocol bv.protocol_validator + +let status = Worker.status + +let running_worker () = + match Worker.list table with + | (_, single) :: _ -> single + | [] -> raise Not_found + +let pending_requests t = Worker.pending_requests t + +let current_request t = Worker.current_request t + +let last_events = Worker.last_events diff --git a/src/lib_node_shell/block_validator.mli b/src/lib_node_shell/block_validator.mli index 7436d14b4..484039f87 100644 --- a/src/lib_node_shell/block_validator.mli +++ b/src/lib_node_shell/block_validator.mli @@ -9,40 +9,15 @@ type t -type block_error = - | Cannot_parse_operation of Operation_hash.t - | Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } - | Non_increasing_timestamp - | Non_increasing_fitness - | Invalid_level of { expected: Int32.t ; found: Int32.t } - | Invalid_proto_level of { expected: int ; found: int } - | Replayed_operation of Operation_hash.t - | Outdated_operation of - { operation: Operation_hash.t; - originating_block: Block_hash.t } - | Expired_network of - { net_id: Net_id.t ; - expiration: Time.t ; - timestamp: Time.t ; - } - | Unexpected_number_of_validation_passes of int (* uint8 *) - | Too_many_operations of { pass: int; found: int; max: int } - | Oversized_operation of { operation: Operation_hash.t; - size: int; max: int } +type limits = { + protocol_timeout: float ; + worker_limits : Worker_types.limits ; +} -type error += - | Invalid_block of - { block: Block_hash.t ; error: block_error } - | Unavailable_protocol of - { block: Block_hash.t ; protocol: Protocol_hash.t } - | Inconsistent_operations_hash of - { block: Block_hash.t ; - expected: Operation_list_list_hash.t ; - found: Operation_list_list_hash.t } +type error += Closed of unit val create: - protocol_timeout:float -> - Distributed_db.t -> t + limits -> Distributed_db.t -> t Lwt.t val validate: t -> @@ -60,3 +35,10 @@ val fetch_and_compile_protocol: Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t val shutdown: t -> unit Lwt.t + +val running_worker: unit -> t +val status: t -> Worker_types.worker_status + +val pending_requests : t -> (Time.t * Block_validator_worker_state.Request.view) list +val current_request : t -> (Time.t * Time.t * Block_validator_worker_state.Request.view) option +val last_events : t -> (Lwt_log_core.level * Block_validator_worker_state.Event.t list) list diff --git a/src/lib_node_shell/bootstrap_pipeline.ml b/src/lib_node_shell/bootstrap_pipeline.ml index d844847f3..80effe5c9 100644 --- a/src/lib_node_shell/bootstrap_pipeline.ml +++ b/src/lib_node_shell/bootstrap_pipeline.ml @@ -170,8 +170,8 @@ let rec validation_worker_loop pipeline = | Ok () -> validation_worker_loop pipeline | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> Lwt.return_unit - | Error ([ Block_validator.Invalid_block _ - | Block_validator.Unavailable_protocol _ ] as err ) -> + | Error ([ Block_validator_errors.Invalid_block _ + | Block_validator_errors.Unavailable_protocol _ ] as err ) -> (* Propagate the error to the peer validator. *) pipeline.errors <- pipeline.errors @ err ; Lwt_canceler.cancel pipeline.canceler >>= fun () -> diff --git a/src/lib_node_shell/node.ml b/src/lib_node_shell/node.ml index 3671658a5..7a59d398c 100644 --- a/src/lib_node_shell/node.ml +++ b/src/lib_node_shell/node.ml @@ -102,6 +102,11 @@ and prevalidator_limits = Prevalidator.limits = { worker_limits : Worker_types.limits ; } +and block_validator_limits = Block_validator.limits = { + protocol_timeout: float ; + worker_limits : Worker_types.limits ; +} + let may_create_net state genesis = State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function | Ok net -> Lwt.return net @@ -112,13 +117,15 @@ let create { genesis ; store_root ; context_root ; patch_context ; p2p = net_params ; test_network_max_tll = max_child_ttl ; bootstrap_threshold } - timeout prevalidator_limits = + timeout + block_validator_limits + prevalidator_limits = init_p2p net_params >>=? fun p2p -> State.read ~store_root ~context_root ?patch_context () >>=? fun state -> let distributed_db = Distributed_db.create state p2p in - let validator = - Validator.create state distributed_db timeout prevalidator_limits in + Validator.create state distributed_db timeout + block_validator_limits prevalidator_limits >>= fun validator -> may_create_net state genesis >>= fun mainnet_state -> Validator.activate validator ~bootstrap_threshold diff --git a/src/lib_node_shell/node.mli b/src/lib_node_shell/node.mli index 1b0cfd125..d9be11501 100644 --- a/src/lib_node_shell/node.mli +++ b/src/lib_node_shell/node.mli @@ -30,8 +30,17 @@ and prevalidator_limits = { operation_timeout: float ; worker_limits : Worker_types.limits ; } +and block_validator_limits = { + protocol_timeout: float ; + worker_limits : Worker_types.limits ; +} -val create: config -> timeout -> prevalidator_limits -> t tzresult Lwt.t +val create: + config -> + timeout -> + block_validator_limits -> + prevalidator_limits -> + t tzresult Lwt.t module RPC : sig diff --git a/src/lib_node_shell/peer_validator.ml b/src/lib_node_shell/peer_validator.ml index c782eb920..f4497c8df 100644 --- a/src/lib_node_shell/peer_validator.ml +++ b/src/lib_node_shell/peer_validator.ml @@ -200,7 +200,7 @@ let rec worker_loop pv = lwt_debug "%a" Error_monad.pp_print_error errors >>= fun () -> Lwt_canceler.cancel pv.canceler >>= fun () -> Lwt.return_unit - | Error [Block_validator.Unavailable_protocol { protocol } ] -> begin + | Error [Block_validator_errors.Unavailable_protocol { protocol } ] -> begin Block_validator.fetch_and_compile_protocol pv.block_validator ~peer:pv.peer_id diff --git a/src/lib_node_shell/validator.ml b/src/lib_node_shell/validator.ml index 1b23eccf0..d73742a03 100644 --- a/src/lib_node_shell/validator.ml +++ b/src/lib_node_shell/validator.ml @@ -15,6 +15,7 @@ type t = { db: Distributed_db.t ; block_validator: Block_validator.t ; timeout: Net_validator.timeout ; + block_validator_limits: Block_validator.limits ; prevalidator_limits: Prevalidator.limits ; valid_block_input: State.Block.t Lwt_watcher.input ; @@ -22,16 +23,17 @@ type t = { } -let create state db timeout prevalidator_limits = - let block_validator = - Block_validator.create - ~protocol_timeout:timeout.Net_validator.protocol - db in +let create state db timeout + block_validator_limits + prevalidator_limits = + Block_validator.create block_validator_limits db >>= fun block_validator -> let valid_block_input = Lwt_watcher.create_input () in - { state ; db ; timeout ; prevalidator_limits ; block_validator ; - valid_block_input ; - active_nets = Net_id.Table.create 7 ; - } + Lwt.return + { state ; db ; timeout ; block_validator ; + prevalidator_limits ; block_validator_limits ; + valid_block_input ; + active_nets = Net_id.Table.create 7 ; + } let activate v ?bootstrap_threshold ?max_child_ttl net_state = let net_id = State.Net.id net_state in diff --git a/src/lib_node_shell/validator.mli b/src/lib_node_shell/validator.mli index 80b1d866a..397fc215c 100644 --- a/src/lib_node_shell/validator.mli +++ b/src/lib_node_shell/validator.mli @@ -15,8 +15,9 @@ val create: State.t -> Distributed_db.t -> Net_validator.timeout -> + Block_validator.limits -> Prevalidator.limits -> - t + t Lwt.t val shutdown: t -> unit Lwt.t (** Start the validation scheduler of a given network. *) diff --git a/src/lib_node_shell_base/block_validator_errors.ml b/src/lib_node_shell_base/block_validator_errors.ml new file mode 100644 index 000000000..6e43f2e60 --- /dev/null +++ b/src/lib_node_shell_base/block_validator_errors.ml @@ -0,0 +1,271 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type block_error = + | Cannot_parse_operation of Operation_hash.t + | Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } + | Non_increasing_timestamp + | Non_increasing_fitness + | Invalid_level of { expected: Int32.t ; found: Int32.t } + | Invalid_proto_level of { expected: int ; found: int } + | Replayed_operation of Operation_hash.t + | Outdated_operation of + { operation: Operation_hash.t; + originating_block: Block_hash.t } + | Expired_network of + { net_id: Net_id.t ; + expiration: Time.t ; + timestamp: Time.t ; + } + | Unexpected_number_of_validation_passes of int (* uint8 *) + | Too_many_operations of { pass: int; found: int; max: int } + | Oversized_operation of { operation: Operation_hash.t; + size: int; max: int } + +let block_error_encoding = + let open Data_encoding in + union + [ + case (Tag 0) + (obj2 + (req "error" (constant "cannot_parse_operation")) + (req "operation" Operation_hash.encoding)) + (function Cannot_parse_operation operation -> Some ((), operation) + | _ -> None) + (fun ((), operation) -> Cannot_parse_operation operation) ; + case (Tag 1) + (obj3 + (req "error" (constant "invalid_fitness")) + (req "expected" Fitness.encoding) + (req "found" Fitness.encoding)) + (function + | Invalid_fitness { expected ; found } -> + Some ((), expected, found) + | _ -> None) + (fun ((), expected, found) -> Invalid_fitness { expected ; found }) ; + case (Tag 2) + (obj1 + (req "error" (constant "non_increasing_timestamp"))) + (function Non_increasing_timestamp -> Some () + | _ -> None) + (fun () -> Non_increasing_timestamp) ; + case (Tag 3) + (obj1 + (req "error" (constant "non_increasing_fitness"))) + (function Non_increasing_fitness -> Some () + | _ -> None) + (fun () -> Non_increasing_fitness) ; + case (Tag 4) + (obj3 + (req "error" (constant "invalid_level")) + (req "expected" int32) + (req "found" int32)) + (function + | Invalid_level { expected ; found } -> + Some ((), expected, found) + | _ -> None) + (fun ((), expected, found) -> Invalid_level { expected ; found }) ; + case (Tag 5) + (obj3 + (req "error" (constant "invalid_proto_level")) + (req "expected" uint8) + (req "found" uint8)) + (function + | Invalid_proto_level { expected ; found } -> + Some ((), expected, found) + | _ -> None) + (fun ((), expected, found) -> + Invalid_proto_level { expected ; found }) ; + case (Tag 6) + (obj2 + (req "error" (constant "replayed_operation")) + (req "operation" Operation_hash.encoding)) + (function Replayed_operation operation -> Some ((), operation) + | _ -> None) + (fun ((), operation) -> Replayed_operation operation) ; + case (Tag 7) + (obj3 + (req "error" (constant "outdated_operation")) + (req "operation" Operation_hash.encoding) + (req "originating_block" Block_hash.encoding)) + (function + | Outdated_operation { operation ; originating_block } -> + Some ((), operation, originating_block) + | _ -> None) + (fun ((), operation, originating_block) -> + Outdated_operation { operation ; originating_block }) ; + case (Tag 8) + (obj2 + (req "error" (constant "unexpected_number_of_passes")) + (req "found" uint8)) + (function + | Unexpected_number_of_validation_passes n -> Some ((), n) + | _ -> None) + (fun ((), n) -> Unexpected_number_of_validation_passes n) ; + case (Tag 9) + (obj4 + (req "error" (constant "too_many_operations")) + (req "validation_pass" uint8) + (req "found" uint16) + (req "max" uint16)) + (function + | Too_many_operations { pass ; found ; max } -> + Some ((), pass, found, max) + | _ -> None) + (fun ((), pass, found, max) -> + Too_many_operations { pass ; found ; max }) ; + case (Tag 10) + (obj4 + (req "error" (constant "oversized_operation")) + (req "operation" Operation_hash.encoding) + (req "found" int31) + (req "max" int31)) + (function + | Oversized_operation { operation ; size ; max } -> + Some ((), operation, size, max) + | _ -> None) + (fun ((), operation, size, max) -> + Oversized_operation { operation ; size ; max }) ; + ] + +let pp_block_error ppf = function + | Cannot_parse_operation oph -> + Format.fprintf ppf + "Failed to parse the operation %a." + Operation_hash.pp_short oph + | Invalid_fitness { expected ; found } -> + Format.fprintf ppf + "@[Invalid fitness:@ \ + \ expected %a@ \ + \ found %a@]" + Fitness.pp expected + Fitness.pp found + | Non_increasing_timestamp -> + Format.fprintf ppf "Non increasing timestamp" + | Non_increasing_fitness -> + Format.fprintf ppf "Non increasing fitness" + | Invalid_level { expected ; found } -> + Format.fprintf ppf + "Invalid level:@ \ + \ expected %ld@ \ + \ found %ld" + expected + found + | Invalid_proto_level { expected ; found } -> + Format.fprintf ppf + "Invalid protocol level:@ \ + \ expected %d@ \ + \ found %d" + expected + found + | Replayed_operation oph -> + Format.fprintf ppf + "The operation %a was previously included in the chain." + Operation_hash.pp_short oph + | Outdated_operation { operation ; originating_block } -> + Format.fprintf ppf + "The operation %a is outdated (originated in block: %a)" + Operation_hash.pp_short operation + Block_hash.pp_short originating_block + | Expired_network { net_id ; expiration ; timestamp } -> + Format.fprintf ppf + "The block timestamp (%a) is later than \ + its network expiration date: %a (net: %a)." + Time.pp_hum timestamp + Time.pp_hum expiration + Net_id.pp_short net_id + | Unexpected_number_of_validation_passes n -> + Format.fprintf ppf + "Invalid number of validation passes (found: %d)" + n + | Too_many_operations { pass ; found ; max } -> + Format.fprintf ppf + "Too many operations in validation pass %d (found: %d, max: %d)" + pass found max + | Oversized_operation { operation ; size ; max } -> + Format.fprintf ppf + "Oversized operation %a (size: %d, max: %d)" + Operation_hash.pp_short operation size max + +type error += + | Invalid_block of + { block: Block_hash.t ; error: block_error } + | Unavailable_protocol of + { block: Block_hash.t ; protocol: Protocol_hash.t } + | Inconsistent_operations_hash of + { block: Block_hash.t ; + expected: Operation_list_list_hash.t ; + found: Operation_list_list_hash.t } + +let () = + Error_monad.register_error_kind + `Permanent + ~id:"validator.invalid_block" + ~title:"Invalid block" + ~description:"Invalid block." + ~pp:begin fun ppf (block, error) -> + Format.fprintf ppf + "@[Invalid block %a@ %a@]" + Block_hash.pp_short block pp_block_error error + end + Data_encoding.(merge_objs + (obj1 (req "invalid_block" Block_hash.encoding)) + block_error_encoding) + (function Invalid_block { block ; error } -> + Some (block, error) | _ -> None) + (fun (block, error) -> + Invalid_block { block ; error }) ; + Error_monad.register_error_kind + `Temporary + ~id:"validator.unavailable_protocol" + ~title:"Missing protocol" + ~description:"The protocol required for validating a block is missing." + ~pp:begin fun ppf (block, protocol) -> + Format.fprintf ppf + "Missing protocol (%a) when validating the block %a." + Protocol_hash.pp_short protocol + Block_hash.pp_short block + end + Data_encoding.( + obj2 + (req "block" Block_hash.encoding) + (req "missing_protocol" Protocol_hash.encoding)) + (function + | Unavailable_protocol { block ; protocol } -> + Some (block, protocol) + | _ -> None) + (fun (block, protocol) -> Unavailable_protocol { block ; protocol }) ; + Error_monad.register_error_kind + `Temporary + ~id:"validator.inconsistent_operations_hash" + ~title:"Invalid merkle tree" + ~description:"The provided list of operations is inconsistent with \ + the block header." + ~pp:begin fun ppf (block, expected, found) -> + Format.fprintf ppf + "@[The provided list of operations for block %a \ + \ is inconsistent with the block header@ \ + \ expected: %a@ \ + \ found: %a@]" + Block_hash.pp_short block + Operation_list_list_hash.pp_short expected + Operation_list_list_hash.pp_short found + end + Data_encoding.( + obj3 + (req "block" Block_hash.encoding) + (req "expected" Operation_list_list_hash.encoding) + (req "found" Operation_list_list_hash.encoding)) + (function + | Inconsistent_operations_hash { block ; expected ; found } -> + Some (block, expected, found) + | _ -> None) + (fun (block, expected, found) -> + Inconsistent_operations_hash { block ; expected ; found }) + diff --git a/src/lib_node_shell_base/block_validator_errors.mli b/src/lib_node_shell_base/block_validator_errors.mli new file mode 100644 index 000000000..3f946313a --- /dev/null +++ b/src/lib_node_shell_base/block_validator_errors.mli @@ -0,0 +1,39 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type block_error = + | Cannot_parse_operation of Operation_hash.t + | Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } + | Non_increasing_timestamp + | Non_increasing_fitness + | Invalid_level of { expected: Int32.t ; found: Int32.t } + | Invalid_proto_level of { expected: int ; found: int } + | Replayed_operation of Operation_hash.t + | Outdated_operation of + { operation: Operation_hash.t; + originating_block: Block_hash.t } + | Expired_network of + { net_id: Net_id.t ; + expiration: Time.t ; + timestamp: Time.t ; + } + | Unexpected_number_of_validation_passes of int (* uint8 *) + | Too_many_operations of { pass: int; found: int; max: int } + | Oversized_operation of { operation: Operation_hash.t; + size: int; max: int } + +type error += + | Invalid_block of + { block: Block_hash.t ; error: block_error } + | Unavailable_protocol of + { block: Block_hash.t ; protocol: Protocol_hash.t } + | Inconsistent_operations_hash of + { block: Block_hash.t ; + expected: Operation_list_list_hash.t ; + found: Operation_list_list_hash.t } diff --git a/src/lib_node_shell_base/block_validator_worker_state.ml b/src/lib_node_shell_base/block_validator_worker_state.ml new file mode 100644 index 000000000..6bcb79e1a --- /dev/null +++ b/src/lib_node_shell_base/block_validator_worker_state.ml @@ -0,0 +1,92 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Request = struct + type view = { + net_id : Net_id.t ; + block : Block_hash.t ; + peer : P2p_types.Peer_id.t option ; + } + let encoding = + let open Data_encoding in + conv + (fun { net_id ; block ; peer } -> (block, net_id, peer)) + (fun (block, net_id, peer) -> { net_id ; block ; peer }) + (obj3 + (req "block" Block_hash.encoding) + (req "net_id" Net_id.encoding) + (opt "peer" P2p_types.Peer_id.encoding)) + + let pp ppf { net_id ; block ; peer } = + Format.fprintf ppf "Validation of %a (net: %a)" + Block_hash.pp block + Net_id.pp_short net_id ; + match peer with + | None -> () + | Some peer -> + Format.fprintf ppf "from peer %a" + P2p_types.Peer_id.pp_short peer +end + +module Event = struct + type t = + | Validation_success of Request.view * Worker_types.request_status + | Validation_failure of Request.view * Worker_types.request_status * error list + | Debug of string + + let level req = + match req with + | Debug _ -> Logging.Debug + | Validation_success _ + | Validation_failure _ -> Logging.Notice + + let encoding error_encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj1 (req "message" string)) + (function Debug msg -> Some msg | _ -> None) + (fun msg -> Debug msg) ; + case (Tag 1) + (obj2 + (req "successful_validation" Request.encoding) + (req "status" Worker_types.request_status_encoding)) + (function Validation_success (r, s) -> Some (r, s) | _ -> None) + (fun (r, s) -> Validation_success (r, s)) ; + case (Tag 2) + (obj3 + (req "failed_validation" Request.encoding) + (req "status" Worker_types.request_status_encoding) + (dft "errors" error_encoding [])) + (function Validation_failure (r, s, err) -> Some (r, s, err) | _ -> None) + (fun (r, s, err) -> Validation_failure (r, s, err)) ] + + let pp ppf = function + | Debug msg -> Format.fprintf ppf "%s" msg + | Validation_success (req, { pushed ; treated ; completed }) -> + Format.fprintf ppf + "@[Block %a succesfully validated@,\ + Pushed: %a, Treated: %a, Completed: %a@]" + Block_hash.pp req.block + Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed + | Validation_failure (req, { pushed ; treated ; completed }, errs)-> + Format.fprintf ppf + "@[Validation of block %a failed@,\ + Pushed: %a, Treated: %a, Completed: %a@,\ + Error: %a@]" + Block_hash.pp req.block + Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed + Error_monad.pp_print_error errs +end + +module Worker_state = struct + type view = unit + let encoding = Data_encoding.empty + let pp _ppf _view = () +end diff --git a/src/lib_node_shell_base/block_validator_worker_state.mli b/src/lib_node_shell_base/block_validator_worker_state.mli new file mode 100644 index 000000000..e297cce04 --- /dev/null +++ b/src/lib_node_shell_base/block_validator_worker_state.mli @@ -0,0 +1,34 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Request : sig + type view = { + net_id : Net_id.t ; + block : Block_hash.t ; + peer: P2p_types.Peer_id.t option ; + } + val encoding : view Data_encoding.encoding + val pp : Format.formatter -> view -> unit +end + +module Event : sig + type t = + | Validation_success of Request.view * Worker_types.request_status + | Validation_failure of Request.view * Worker_types.request_status * error list + | Debug of string + val level : t -> Logging.level + val encoding : error list Data_encoding.encoding -> t Data_encoding.encoding + val pp : Format.formatter -> t -> unit +end + +module Worker_state : sig + type view = unit + val encoding : view Data_encoding.encoding + val pp : Format.formatter -> view -> unit +end