diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index 356852f8e..0fd5625de 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -57,6 +57,7 @@ and shell = { block_validator_limits : Node.block_validator_limits ; prevalidator_limits : Node.prevalidator_limits ; timeout : Node.timeout ; + peer_validator_limits : Node.peer_validator_limits ; } let default_net_limits : P2p.limits = { @@ -130,7 +131,19 @@ let default_shell = { block_operations = 60. ; protocol = 120. ; new_head_request = 90. ; - } + } ; + peer_validator_limits = { + block_header_timeout = 60. ; + block_operations_timeout = 60. ; + protocol_timeout = 120. ; + new_head_request_timeout = 90. ; + worker_limits = { + backlog_size = 1000 ; + backlog_level = Logging.Info ; + zombie_lifetime = 600. ; + zombie_memory = 120. ; + } + } ; } let default_config = { @@ -330,6 +343,30 @@ let prevalidator_limits_encoding = default_shell.prevalidator_limits.worker_limits.zombie_lifetime default_shell.prevalidator_limits.worker_limits.zombie_memory)) +let peer_validator_limits_encoding = + let open Data_encoding in + let default_limits = default_shell.peer_validator_limits in + conv + (fun { Node.block_header_timeout ; block_operations_timeout ; + protocol_timeout ; new_head_request_timeout ; worker_limits } -> + ((block_header_timeout, block_operations_timeout, + protocol_timeout, new_head_request_timeout), worker_limits)) + (fun ((block_header_timeout, block_operations_timeout, + protocol_timeout, new_head_request_timeout), worker_limits) -> + { block_header_timeout ; block_operations_timeout ; + protocol_timeout ; new_head_request_timeout ; worker_limits }) + (merge_objs + (obj4 + (dft "block_header_request_timeout" timeout_encoding default_limits.block_header_timeout) + (dft "block_operations_request_timeout" timeout_encoding default_limits.block_operations_timeout) + (dft "protocol_request_timeout" timeout_encoding default_limits.protocol_timeout) + (dft "new_head_request_timeout" timeout_encoding default_limits.new_head_request_timeout)) + (worker_limits_encoding + default_limits.worker_limits.backlog_size + default_limits.worker_limits.backlog_level + default_limits.worker_limits.zombie_lifetime + default_limits.worker_limits.zombie_memory)) + let timeout_encoding = let open Data_encoding in let uint8 = conv int_of_float float_of_int uint8 in @@ -352,18 +389,20 @@ let timeout_encoding = let shell = let open Data_encoding in conv - (fun { bootstrap_threshold ; timeout ; + (fun { bootstrap_threshold ; timeout ; peer_validator_limits ; block_validator_limits ; prevalidator_limits } -> - bootstrap_threshold, timeout, + bootstrap_threshold, timeout, peer_validator_limits, block_validator_limits, prevalidator_limits) - (fun (bootstrap_threshold, timeout, + (fun (bootstrap_threshold, timeout, peer_validator_limits, block_validator_limits, prevalidator_limits) -> { bootstrap_threshold ; timeout ; + peer_validator_limits ; block_validator_limits ; prevalidator_limits }) - (obj4 + (obj5 (dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold) (dft "timeout" timeout_encoding default_shell.timeout) + (dft "peer_validator" peer_validator_limits_encoding default_shell.peer_validator_limits) (dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits) (dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits) ) @@ -484,6 +523,7 @@ let update ~default:cfg.shell.bootstrap_threshold bootstrap_threshold ; timeout = cfg.shell.timeout ; + peer_validator_limits = cfg.shell.peer_validator_limits ; block_validator_limits = cfg.shell.block_validator_limits ; prevalidator_limits = cfg.shell.prevalidator_limits ; } diff --git a/src/bin_node/node_config_file.mli b/src/bin_node/node_config_file.mli index 917551e8a..9c58e1c5a 100644 --- a/src/bin_node/node_config_file.mli +++ b/src/bin_node/node_config_file.mli @@ -47,6 +47,7 @@ and shell = { block_validator_limits : Node.block_validator_limits ; prevalidator_limits : Node.prevalidator_limits ; timeout : Node.timeout ; + peer_validator_limits : Node.peer_validator_limits ; } val default_data_dir: string diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 040905929..2ba43d1f3 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -173,6 +173,7 @@ let init_node ?sandbox (config : Node_config_file.t) = Node.create node_config config.shell.timeout + config.shell.peer_validator_limits config.shell.block_validator_limits config.shell.prevalidator_limits diff --git a/src/lib_node_shell/net_validator.ml b/src/lib_node_shell/net_validator.ml index 1a06da9e5..ef3db3da2 100644 --- a/src/lib_node_shell/net_validator.ml +++ b/src/lib_node_shell/net_validator.ml @@ -18,6 +18,7 @@ type t = { timeout: timeout ; prevalidator_limits: Prevalidator.limits ; + peer_validator_limits: Peer_validator.limits ; bootstrap_threshold: int ; mutable bootstrapped: bool ; bootstrapped_waiter: unit Lwt.t ; @@ -83,10 +84,6 @@ let may_activate_peer_validator nv peer_id = with Not_found -> let pv = Peer_validator.create - ~new_head_request_timeout:nv.timeout.new_head_request - ~block_header_timeout:nv.timeout.block_header - ~block_operations_timeout:nv.timeout.block_operations - ~protocol_timeout:nv.timeout.protocol ~notify_new_block:(notify_new_block nv) ~notify_bootstrapped: begin fun () -> P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ; @@ -96,6 +93,7 @@ let may_activate_peer_validator nv peer_id = P2p.Peer_id.Table.remove nv.active_peers peer_id ; P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ; end + nv.peer_validator_limits nv.block_validator nv.net_db peer_id in P2p.Peer_id.Table.add nv.active_peers peer_id pv ; pv @@ -122,7 +120,7 @@ let broadcast_head nv ~previous block = let rec create ?max_child_ttl ?parent ?(bootstrap_threshold = 1) - timeout prevalidator_limits block_validator + timeout peer_validator_limits prevalidator_limits block_validator global_valid_block_input db net_state = Chain.init_head net_state >>= fun () -> let net_db = Distributed_db.activate db net_state in @@ -135,7 +133,7 @@ let rec create let nv = { db ; net_state ; net_db ; block_validator ; prevalidator ; - timeout ; prevalidator_limits ; + timeout ; prevalidator_limits ; peer_validator_limits ; valid_block_input ; global_valid_block_input ; new_head_input ; parent ; max_child_ttl ; child = None ; @@ -251,7 +249,8 @@ and may_switch_test_network nv block = return net_state end >>=? fun net_state -> create - ~parent:nv nv.timeout nv.prevalidator_limits nv.block_validator + ~parent:nv nv.timeout nv.peer_validator_limits + nv.prevalidator_limits nv.block_validator nv.global_valid_block_input nv.db net_state >>= fun child -> nv.child <- Some child ; diff --git a/src/lib_node_shell/net_validator.mli b/src/lib_node_shell/net_validator.mli index 316f8f7bc..bbaa0ffb3 100644 --- a/src/lib_node_shell/net_validator.mli +++ b/src/lib_node_shell/net_validator.mli @@ -20,6 +20,7 @@ val create: ?max_child_ttl:int -> ?bootstrap_threshold:int -> timeout -> + Peer_validator.limits -> Prevalidator.limits -> Block_validator.t -> State.Block.t Lwt_watcher.input -> diff --git a/src/lib_node_shell/node.ml b/src/lib_node_shell/node.ml index 7a59d398c..6a2b33180 100644 --- a/src/lib_node_shell/node.ml +++ b/src/lib_node_shell/node.ml @@ -95,6 +95,13 @@ and timeout = Net_validator.timeout = { protocol: float ; new_head_request: float ; } +and peer_validator_limits = Peer_validator.limits = { + new_head_request_timeout: float ; + block_header_timeout: float ; + block_operations_timeout: float ; + protocol_timeout: float ; + worker_limits: Worker_types.limits +} and prevalidator_limits = Prevalidator.limits = { max_refused_operations: int ; @@ -118,6 +125,7 @@ let create { genesis ; store_root ; context_root ; test_network_max_tll = max_child_ttl ; bootstrap_threshold } timeout + peer_validator_limits block_validator_limits prevalidator_limits = init_p2p net_params >>=? fun p2p -> @@ -125,6 +133,7 @@ let create { genesis ; store_root ; context_root ; ~store_root ~context_root ?patch_context () >>=? fun state -> let distributed_db = Distributed_db.create state p2p in Validator.create state distributed_db timeout + peer_validator_limits block_validator_limits prevalidator_limits >>= fun validator -> may_create_net state genesis >>= fun mainnet_state -> Validator.activate validator diff --git a/src/lib_node_shell/node.mli b/src/lib_node_shell/node.mli index d9be11501..b4be0d085 100644 --- a/src/lib_node_shell/node.mli +++ b/src/lib_node_shell/node.mli @@ -25,6 +25,13 @@ and timeout = { protocol: float ; new_head_request: float ; } +and peer_validator_limits = { + new_head_request_timeout: float ; + block_header_timeout: float ; + block_operations_timeout: float ; + protocol_timeout: float ; + worker_limits: Worker_types.limits +} and prevalidator_limits = { max_refused_operations: int ; operation_timeout: float ; @@ -38,6 +45,7 @@ and block_validator_limits = { val create: config -> timeout -> + peer_validator_limits -> block_validator_limits -> prevalidator_limits -> t tzresult Lwt.t diff --git a/src/lib_node_shell/peer_validator.ml b/src/lib_node_shell/peer_validator.ml index f4497c8df..0c53861b4 100644 --- a/src/lib_node_shell/peer_validator.ml +++ b/src/lib_node_shell/peer_validator.ml @@ -9,37 +9,78 @@ (* FIXME ignore/postpone fetching/validating of block in the future... *) -include Logging.Make(struct let name = "node.validator.peer" end) +open Peer_validator_worker_state -type msg = - | New_head of Block_hash.t * Block_header.t - | New_branch of Block_hash.t * Block_locator.t +module Name = struct + type t = Net_id.t * P2p.Peer_id.t + let encoding = + Data_encoding.tup2 Net_id.encoding P2p.Peer_id.encoding + let base = [ "peer_validator" ] + let pp ppf (net, peer) = + Format.fprintf ppf "%a:%a" + Net_id.pp_short net P2p.Peer_id.pp_short peer +end -type t = { +module Request = struct + include Request - peer_id: P2p.Peer_id.t ; - net_db: Distributed_db.net_db ; - block_validator: Block_validator.t ; + type _ t = + | New_head : Block_hash.t * Block_header.t -> unit t + | New_branch : Block_hash.t * Block_locator.t -> unit t + let view (type a) (req : a t) : view = match req with + | New_head (hash, _) -> + New_head hash + | New_branch (hash, locator) -> + New_branch (hash, Block_locator_iterator.estimated_length locator) +end + +type limits = { new_head_request_timeout: float ; block_header_timeout: float ; block_operations_timeout: float ; protocol_timeout: float ; - - (* callback to net_validator *) - notify_new_block: State.Block.t -> unit ; - notify_bootstrapped: unit -> unit ; - - mutable bootstrapped: bool ; - mutable last_validated_head: Block_header.t ; - mutable last_advertised_head: Block_header.t ; - - mutable worker: unit Lwt.t ; - dropbox: msg Lwt_dropbox.t ; - canceler: Lwt_canceler.t ; - + worker_limits: Worker_types.limits } +module Types = struct + include Worker_state + + type parameters = { + net_db: Distributed_db.net_db ; + block_validator: Block_validator.t ; + (* callback to net_validator *) + notify_new_block: State.Block.t -> unit ; + notify_bootstrapped: unit -> unit ; + notify_termination: unit -> unit ; + limits: limits; + } + + type state = { + peer_id: P2p.Peer_id.t ; + parameters : parameters ; + mutable bootstrapped: bool ; + mutable last_validated_head: Block_header.t ; + mutable last_advertised_head: Block_header.t ; + } + + let view (state : state) _ : view = + let { bootstrapped ; last_validated_head ; last_advertised_head } = state in + { bootstrapped ; + last_validated_head = Block_header.hash last_validated_head ; + last_advertised_head = Block_header.hash last_advertised_head } + +end + +module Worker = Worker.Make (Name) (Event) (Request) (Types) + +open Types + +type t = Worker.dropbox Worker.t + +let debug w = + Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) + type error += | Unknown_ancestor | Known_invalid @@ -47,22 +88,23 @@ type error += let set_bootstrapped pv = if not pv.bootstrapped then begin pv.bootstrapped <- true ; - pv.notify_bootstrapped () ; + pv.parameters.notify_bootstrapped () ; end -let bootstrap_new_branch pv _ancestor _head unknown_prefix = +let bootstrap_new_branch w _ancestor _head unknown_prefix = + let pv = Worker.state w in let len = Block_locator_iterator.estimated_length unknown_prefix in - lwt_log_info + debug w "validating new branch from peer %a (approx. %d blocks)" - P2p.Peer_id.pp_short pv.peer_id len >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id len ; let pipeline = Bootstrap_pipeline.create - ~notify_new_block:pv.notify_new_block - ~block_header_timeout:pv.block_header_timeout - ~block_operations_timeout:pv.block_operations_timeout - pv.block_validator - pv.peer_id pv.net_db unknown_prefix in - Lwt_utils.protect ~canceler:pv.canceler + ~notify_new_block:pv.parameters.notify_new_block + ~block_header_timeout:pv.parameters.limits.block_header_timeout + ~block_operations_timeout:pv.parameters.limits.block_operations_timeout + pv.parameters.block_validator + pv.peer_id pv.parameters.net_db unknown_prefix in + Worker.protect w ~on_error:begin fun error -> (* if the peer_validator is killed, let's cancel the pipeline *) Bootstrap_pipeline.cancel pipeline >>= fun () -> @@ -72,235 +114,263 @@ let bootstrap_new_branch pv _ancestor _head unknown_prefix = Bootstrap_pipeline.wait pipeline end >>=? fun () -> set_bootstrapped pv ; - lwt_log_info + debug w "done validating new branch from peer %a." - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; return () -let validate_new_head pv hash (header : Block_header.t) = - let net_state = Distributed_db.net_state pv.net_db in +let validate_new_head w hash (header : Block_header.t) = + let pv = Worker.state w in + let net_state = Distributed_db.net_state pv.parameters.net_db in State.Block.known net_state header.shell.predecessor >>= function | false -> - lwt_debug + debug w "missing predecessor for new head %a from peer %a" Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> - Distributed_db.Request.current_branch pv.net_db ~peer:pv.peer_id () ; + P2p.Peer_id.pp_short pv.peer_id ; + Distributed_db.Request.current_branch pv.parameters.net_db ~peer:pv.peer_id () ; return () | true -> - lwt_debug + debug w "fetching operations for new head %a from peer %a" Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; map_p (fun i -> - Lwt_utils.protect ~canceler:pv.canceler begin fun () -> + Worker.protect w begin fun () -> Distributed_db.Operations.fetch - ~timeout:pv.block_operations_timeout - pv.net_db ~peer:pv.peer_id + ~timeout:pv.parameters.limits.block_operations_timeout + pv.parameters.net_db ~peer:pv.peer_id (hash, i) header.shell.operations_hash end) (0 -- (header.shell.validation_passes - 1)) >>=? fun operations -> - lwt_debug + debug w "requesting validation for new head %a from peer %a" Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; Block_validator.validate - ~notify_new_block:pv.notify_new_block - pv.block_validator pv.net_db + ~notify_new_block:pv.parameters.notify_new_block + pv.parameters.block_validator pv.parameters.net_db hash header operations >>=? fun _block -> - lwt_debug "end of validation for new head %a from peer %a" + debug w + "end of validation for new head %a from peer %a" Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; set_bootstrapped pv ; return () -let may_validate_new_head pv hash header = - let net_state = Distributed_db.net_state pv.net_db in +let may_validate_new_head w hash header = + let pv = Worker.state w in + let net_state = Distributed_db.net_state pv.parameters.net_db in State.Block.known net_state hash >>= function | true -> begin State.Block.known_valid net_state hash >>= function | true -> - lwt_debug + debug w "ignoring previously validated block %a from peer %a" Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; set_bootstrapped pv ; pv.last_validated_head <- header ; return () | false -> - lwt_log_info + debug w "ignoring known invalid block %a from peer %a" Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; fail Known_invalid end | false -> - validate_new_head pv hash header + validate_new_head w hash header -let may_validate_new_branch pv distant_hash locator = +let may_validate_new_branch w distant_hash locator = + let pv = Worker.state w in let distant_header, _ = (locator : Block_locator.t :> Block_header.t * _) in - let net_state = Distributed_db.net_state pv.net_db in + let net_state = Distributed_db.net_state pv.parameters.net_db in Chain.head net_state >>= fun local_header -> if Fitness.compare distant_header.Block_header.shell.fitness (State.Block.fitness local_header) < 0 then begin set_bootstrapped pv ; - lwt_debug + debug w "ignoring branch %a with low fitness from peer: %a." Block_hash.pp_short distant_hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; (* Don't bother with downloading a branch with a low fitness. *) return () end else begin - let net_state = Distributed_db.net_state pv.net_db in + let net_state = Distributed_db.net_state pv.parameters.net_db in Block_locator_iterator.known_ancestor net_state locator >>= function | None -> - lwt_log_info + debug w "ignoring branch %a without common ancestor from peer: %a." Block_hash.pp_short distant_hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + P2p.Peer_id.pp_short pv.peer_id ; fail Unknown_ancestor | Some (ancestor, unknown_prefix) -> - bootstrap_new_branch pv ancestor distant_header unknown_prefix + bootstrap_new_branch w ancestor distant_header unknown_prefix end -let rec worker_loop pv = - begin - Lwt_utils.protect ~canceler:pv.canceler begin fun () -> - Lwt_dropbox.take_with_timeout - pv.new_head_request_timeout - pv.dropbox >>= return - end >>=? function - | None -> - lwt_log_info "no new head from peer %a for 90 seconds." - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> - Distributed_db.Request.current_head pv.net_db ~peer:pv.peer_id () ; - return () - | Some (New_head (hash, header)) -> - lwt_log_info "processing new head %a from peer %a." - Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> - may_validate_new_head pv hash header - | Some (New_branch (hash, locator)) -> - (* TODO penalize empty locator... ?? *) - lwt_log_info "processing new branch %a from peer %a." - Block_hash.pp_short hash - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> - may_validate_new_branch pv hash locator - end >>= function - | Ok () -> - worker_loop pv - | Error ((( Unknown_ancestor - | Bootstrap_pipeline.Invalid_locator _ - | Block_validator.Invalid_block _ ) :: _) as errors ) -> +let on_no_request w = + let pv = Worker.state w in + debug w "no new head from peer %a for %g seconds." + P2p.Peer_id.pp_short pv.peer_id + pv.parameters.limits.new_head_request_timeout ; + Distributed_db.Request.current_head pv.parameters.net_db ~peer:pv.peer_id () ; + return () + +let on_request (type a) w (req : a Request.t) : a tzresult Lwt.t = + let pv = Worker.state w in + match req with + | Request.New_head (hash, header) -> + debug w + "processing new head %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id ; + may_validate_new_head w hash header + | Request.New_branch (hash, locator) -> + (* TODO penalize empty locator... ?? *) + debug w "processing new branch %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id ; + may_validate_new_branch w hash locator + +let on_completion w r _ st = + Worker.record_event w (Event.Request (Request.view r, st, None )) ; + Lwt.return () + +let on_error w r st errs = + let pv = Worker.state w in + match errs with + ((( Unknown_ancestor + | Bootstrap_pipeline.Invalid_locator _ + | Block_validator_errors.Invalid_block _ ) :: _) as errors ) -> (* TODO ban the peer_id... *) - lwt_log_info "Terminating the validation worker for peer %a (kickban)." - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> - lwt_debug "%a" Error_monad.pp_print_error errors >>= fun () -> - Lwt_canceler.cancel pv.canceler >>= fun () -> - Lwt.return_unit - | Error [Block_validator_errors.Unavailable_protocol { protocol } ] -> begin + debug w + "Terminating the validation worker for peer %a (kickban)." + P2p.Peer_id.pp_short pv.peer_id ; + debug w "%a" Error_monad.pp_print_error errors ; + Worker.trigger_shutdown w ; + Worker.record_event w (Event.Request (r, st, Some errs)) ; + Lwt.return (Error errs) + | [Block_validator_errors.Unavailable_protocol { protocol } ] -> begin Block_validator.fetch_and_compile_protocol - pv.block_validator + pv.parameters.block_validator ~peer:pv.peer_id - ~timeout:pv.protocol_timeout + ~timeout:pv.parameters.limits.protocol_timeout protocol >>= function - | Ok _ -> worker_loop pv + | Ok _ -> return () | Error _ -> (* TODO penality... *) - lwt_log_info "Terminating the validation worker for peer %a \ - \ (missing protocol %a)." + debug w + "Terminating the validation worker for peer %a \ + (missing protocol %a)." P2p.Peer_id.pp_short pv.peer_id - Protocol_hash.pp_short protocol >>= fun () -> - Lwt_canceler.cancel pv.canceler >>= fun () -> - Lwt.return_unit + Protocol_hash.pp_short protocol ; + Worker.record_event w (Event.Request (r, st, Some errs)) ; + Lwt.return (Error errs) end - | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_dropbox.Closed] -> - lwt_log_info "Terminating the validation worker for peer %a." - P2p.Peer_id.pp_short pv.peer_id >>= fun () -> - Lwt.return_unit - | Error err -> - lwt_log_error - "@[Unexpected error in the validation worker for peer %a:@ \ - \ %a@]" - P2p.Peer_id.pp_short pv.peer_id - pp_print_error err >>= fun () -> - Lwt_canceler.cancel pv.canceler >>= fun () -> - Lwt.return_unit + | _ -> + Worker.record_event w (Event.Request (r, st, Some errs)) ; + Lwt.return (Error errs) -let create - ?notify_new_block:(external_notify_new_block = fun _ -> ()) - ?(notify_bootstrapped = fun () -> ()) - ?(notify_termination = fun _ -> ()) - ~new_head_request_timeout - ~block_header_timeout - ~block_operations_timeout - ~protocol_timeout - block_validator net_db peer_id = - lwt_debug "creating validator for peer %a." - P2p.Peer_id.pp_short peer_id >>= fun () -> - let canceler = Lwt_canceler.create () in - let dropbox = Lwt_dropbox.create () in - let net_state = Distributed_db.net_state net_db in +let on_close w = + let pv = Worker.state w in + pv.parameters.notify_termination () ; + Distributed_db.disconnect pv.parameters.net_db pv.peer_id >>= fun () -> + Lwt.return () + +let on_launch _ name parameters = + let net_state = Distributed_db.net_state parameters.net_db in State.Block.read_exn net_state (State.Net.genesis net_state).block >>= fun genesis -> - let rec notify_new_block block = - pv.last_validated_head <- State.Block.header block ; - external_notify_new_block block - and pv = { - block_validator ; - notify_new_block ; - notify_bootstrapped ; - new_head_request_timeout ; - block_header_timeout ; - block_operations_timeout ; - protocol_timeout ; - net_db ; - peer_id ; + let rec pv = { + peer_id = snd name ; + parameters = { parameters with notify_new_block } ; bootstrapped = false ; last_validated_head = State.Block.header genesis ; last_advertised_head = State.Block.header genesis ; - canceler ; - dropbox ; - worker = Lwt.return_unit ; - } in - Lwt_canceler.on_cancel pv.canceler begin fun () -> - Lwt_dropbox.close pv.dropbox ; - Distributed_db.disconnect pv.net_db pv.peer_id >>= fun () -> - notify_termination pv ; - Lwt.return_unit - end ; - pv.worker <- - Lwt_utils.worker - (Format.asprintf "peer_validator.%a.%a" - Net_id.pp (State.Net.id net_state) P2p.Peer_id.pp_short peer_id) - ~run:(fun () -> worker_loop pv) - ~cancel:(fun () -> Lwt_canceler.cancel pv.canceler) ; + } + and notify_new_block block = + pv.last_validated_head <- State.Block.header block ; + parameters.notify_new_block block in Lwt.return pv -let notify_branch pv locator = +let table = + let merge w (Worker.Any_request neu) old = + let pv = Worker.state w in + match neu with + | Request.New_branch (_, locator) -> + let header, _ = (locator : Block_locator.t :> _ * _) in + pv.last_advertised_head <- header ; + Some (Worker.Any_request neu) + | Request.New_head (_, header) -> + pv.last_advertised_head <- header ; + (* TODO penalize decreasing fitness *) + match old with + | Some (Worker.Any_request (Request.New_branch _) as old) -> + Some old (* ignore *) + | Some (Worker.Any_request (Request.New_head _)) -> + Some (Any_request neu) + | None -> + Some (Any_request neu) in + Worker.create_table (Dropbox { merge }) + +let create + ?(notify_new_block = fun _ -> ()) + ?(notify_bootstrapped = fun () -> ()) + ?(notify_termination = fun _ -> ()) + limits block_validator net_db peer_id = + let name = (State.Net.id (Distributed_db.net_state net_db), peer_id) in + let parameters = { + net_db ; + notify_termination ; + block_validator ; + notify_new_block ; + notify_bootstrapped ; + limits ; + } in + let module Handlers = struct + type self = t + let on_launch = on_launch + let on_request = on_request + let on_close = on_close + let on_error = on_error + let on_completion = on_completion + let on_no_request _ = return () + end in + Worker.launch table ~timeout: limits.new_head_request_timeout limits.worker_limits + name parameters + (module Handlers) + +let notify_branch w locator = let header, _ = (locator : Block_locator.t :> _ * _) in let hash = Block_header.hash header in - (* TODO penalize decreasing fitness *) - pv.last_advertised_head <- header ; - try Lwt_dropbox.put pv.dropbox (New_branch (hash, locator)) - with Lwt_dropbox.Closed -> () + Worker.drop_request w (New_branch (hash, locator)) -let notify_head pv header = +let notify_head w header = let hash = Block_header.hash header in - pv.last_advertised_head <- header ; - (* TODO penalize decreasing fitness *) - match Lwt_dropbox.peek pv.dropbox with - | Some (New_branch _) -> () (* ignore *) - | None | Some (New_head _) -> - try Lwt_dropbox.put pv.dropbox (New_head (hash, header)) - with Lwt_dropbox.Closed -> () + Worker.drop_request w (New_head (hash, header)) -let shutdown pv = - Lwt_canceler.cancel pv.canceler >>= fun () -> - pv.worker +let shutdown w = + Worker.shutdown w -let peer_id pv = pv.peer_id -let bootstrapped pv = pv.bootstrapped -let current_head pv = pv.last_validated_head +let peer_id w = + let pv = Worker.state w in + pv.peer_id + +let bootstrapped w = + let pv = Worker.state w in + pv.bootstrapped + +let current_head w = + let pv = Worker.state w in + pv.last_validated_head + +let status = Worker.status + +let running_workers () = Worker.list table + +let current_request t = Worker.current_request t + +let last_events = Worker.last_events diff --git a/src/lib_node_shell/peer_validator.mli b/src/lib_node_shell/peer_validator.mli index 86338d1a0..3fdfb9cd5 100644 --- a/src/lib_node_shell/peer_validator.mli +++ b/src/lib_node_shell/peer_validator.mli @@ -9,6 +9,14 @@ type t +type limits = { + new_head_request_timeout: float ; + block_header_timeout: float ; + block_operations_timeout: float ; + protocol_timeout: float ; + worker_limits: Worker_types.limits +} + val peer_id: t -> P2p.Peer_id.t val bootstrapped: t -> bool val current_head: t -> Block_header.t @@ -16,14 +24,17 @@ val current_head: t -> Block_header.t val create: ?notify_new_block: (State.Block.t -> unit) -> ?notify_bootstrapped: (unit -> unit) -> - ?notify_termination: (t -> unit) -> - new_head_request_timeout:float -> - block_header_timeout:float -> - block_operations_timeout:float -> - protocol_timeout:float -> + ?notify_termination: (unit -> unit) -> + limits -> Block_validator.t -> Distributed_db.net_db -> P2p.Peer_id.t -> t Lwt.t val shutdown: t -> unit Lwt.t val notify_branch: t -> Block_locator.t -> unit val notify_head: t -> Block_header.t -> unit + +val running_workers: unit -> ((Net_id.t * P2p.Peer_id.t) * t) list +val status: t -> Worker_types.worker_status + +val current_request : t -> (Time.t * Time.t * Peer_validator_worker_state.Request.view) option +val last_events : t -> (Lwt_log_core.level * Peer_validator_worker_state.Event.t list) list diff --git a/src/lib_node_shell/validator.ml b/src/lib_node_shell/validator.ml index d73742a03..0e2f90f83 100644 --- a/src/lib_node_shell/validator.ml +++ b/src/lib_node_shell/validator.ml @@ -15,6 +15,7 @@ type t = { db: Distributed_db.t ; block_validator: Block_validator.t ; timeout: Net_validator.timeout ; + peer_validator_limits: Peer_validator.limits ; block_validator_limits: Block_validator.limits ; prevalidator_limits: Prevalidator.limits ; @@ -24,13 +25,15 @@ type t = { } let create state db timeout + peer_validator_limits block_validator_limits prevalidator_limits = Block_validator.create block_validator_limits db >>= fun block_validator -> let valid_block_input = Lwt_watcher.create_input () in Lwt.return { state ; db ; timeout ; block_validator ; - prevalidator_limits ; block_validator_limits ; + prevalidator_limits ; + peer_validator_limits ; block_validator_limits ; valid_block_input ; active_nets = Net_id.Table.create 7 ; } @@ -44,7 +47,7 @@ let activate v ?bootstrap_threshold ?max_child_ttl net_state = Net_validator.create ?bootstrap_threshold ?max_child_ttl - v.timeout v.prevalidator_limits + v.timeout v.peer_validator_limits v.prevalidator_limits v.block_validator v.valid_block_input v.db net_state in Net_id.Table.add v.active_nets net_id nv ; nv diff --git a/src/lib_node_shell/validator.mli b/src/lib_node_shell/validator.mli index 397fc215c..8d0c45e2f 100644 --- a/src/lib_node_shell/validator.mli +++ b/src/lib_node_shell/validator.mli @@ -15,6 +15,7 @@ val create: State.t -> Distributed_db.t -> Net_validator.timeout -> + Peer_validator.limits -> Block_validator.limits -> Prevalidator.limits -> t Lwt.t diff --git a/src/lib_node_shell_base/peer_validator_worker_state.ml b/src/lib_node_shell_base/peer_validator_worker_state.ml new file mode 100644 index 000000000..6ef40f1b9 --- /dev/null +++ b/src/lib_node_shell_base/peer_validator_worker_state.ml @@ -0,0 +1,115 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Request = struct + type view = + | New_head of Block_hash.t + | New_branch of Block_hash.t * int + + let encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj2 + (req "request" (constant "new_head")) + (req "block" Block_hash.encoding)) + (function New_head h -> Some ((), h) | _ -> None) + (fun ((), h) -> New_head h) ; + case (Tag 1) + (obj3 + (req "request" (constant "new_branch")) + (req "block" Block_hash.encoding) + (req "locator_length" uint16)) + (function New_branch (h, l) -> Some ((), h, l) | _ -> None) + (fun ((), h, l) -> New_branch (h, l)) ] + + let pp ppf = function + | New_head hash -> + Format.fprintf ppf "New head %a" Block_hash.pp hash + | New_branch (hash, len) -> + Format.fprintf ppf "New branch %a, locator length %d" + Block_hash.pp hash len +end + +module Event = struct + type t = + | Request of (Request.view * Worker_types.request_status * error list option) + | Debug of string + + let level req = + match req with + | Debug _ -> Logging.Info + | Request _ -> Logging.Notice + + let encoding error_encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj1 (req "message" string)) + (function Debug msg -> Some msg | _ -> None) + (fun msg -> Debug msg) ; + case (Tag 1) + (obj2 + (req "request" Request.encoding) + (req "status" Worker_types.request_status_encoding)) + (function Request (req, t, None) -> Some (req, t) | _ -> None) + (fun (req, t) -> Request (req, t, None)) ; + case (Tag 2) + (obj3 + (req "error" error_encoding) + (req "failed_request" Request.encoding) + (req "status" Worker_types.request_status_encoding)) + (function Request (req, t, Some errs) -> Some (errs, req, t) | _ -> None) + (fun (errs, req, t) -> Request (req, t, Some errs)) ] + + let pp ppf = function + | Debug msg -> Format.fprintf ppf "%s" msg + | Request (view, { pushed ; treated ; completed }, None) -> + Format.fprintf ppf + "@[%a@,\ + Pushed: %a, Treated: %a, Completed: %a@]" + Request.pp view + Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed + | Request (view, { pushed ; treated ; completed }, Some errors) -> + Format.fprintf ppf + "@[%a@,\ + Pushed: %a, Treated: %a, Failed: %a@,\ + Error: %a@]" + Request.pp view + Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed + Error_monad.pp_print_error errors +end + +module Worker_state = struct + type view = + { bootstrapped : bool ; + mutable last_validated_head: Block_hash.t ; + mutable last_advertised_head: Block_hash.t } + let encoding = + let open Data_encoding in + conv + (function { bootstrapped ; last_validated_head ; last_advertised_head } -> + (bootstrapped, last_validated_head, last_advertised_head)) + (function (bootstrapped, last_validated_head, last_advertised_head) -> + { bootstrapped ; last_validated_head ; last_advertised_head }) + (obj3 + (req "bootstrapped" bool) + (req "last_validated_head" Block_hash.encoding) + (req "last_advertised_head" Block_hash.encoding)) + + let pp ppf state = + Format.fprintf ppf + "@[Bootstrapped: %s@,\ + Last validated head: %a@,\ + Last advertised head: %a@]" + (if state.bootstrapped then "yes" else "no") + Block_hash.pp state.last_validated_head + Block_hash.pp state.last_advertised_head + +end diff --git a/src/lib_node_shell_base/peer_validator_worker_state.mli b/src/lib_node_shell_base/peer_validator_worker_state.mli new file mode 100644 index 000000000..e1790c192 --- /dev/null +++ b/src/lib_node_shell_base/peer_validator_worker_state.mli @@ -0,0 +1,34 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Request : sig + type view = + | New_head of Block_hash.t + | New_branch of Block_hash.t * int + val encoding : view Data_encoding.encoding + val pp : Format.formatter -> view -> unit +end + +module Event : sig + type t = + | Request of (Request.view * Worker_types.request_status * error list option) + | Debug of string + val level : t -> Logging.level + val encoding : error list Data_encoding.encoding -> t Data_encoding.encoding + val pp : Format.formatter -> t -> unit +end + +module Worker_state : sig + type view = + { bootstrapped : bool ; + mutable last_validated_head: Block_hash.t ; + mutable last_advertised_head: Block_hash.t } + val encoding : view Data_encoding.encoding + val pp : Format.formatter -> view -> unit +end