diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index 0fd5625de..40278de4c 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -53,11 +53,10 @@ and log = { } and shell = { - bootstrap_threshold : int ; block_validator_limits : Node.block_validator_limits ; prevalidator_limits : Node.prevalidator_limits ; - timeout : Node.timeout ; peer_validator_limits : Node.peer_validator_limits ; + net_validator_limits : Node.net_validator_limits ; } let default_net_limits : P2p.limits = { @@ -106,7 +105,6 @@ let default_log = { } let default_shell = { - bootstrap_threshold = 4 ; block_validator_limits = { protocol_timeout = 120. ; worker_limits = { @@ -126,12 +124,6 @@ let default_shell = { zombie_memory = 120. ; } } ; - timeout = { - block_header = 60. ; - block_operations = 60. ; - protocol = 120. ; - new_head_request = 90. ; - } ; peer_validator_limits = { block_header_timeout = 60. ; block_operations_timeout = 60. ; @@ -144,6 +136,15 @@ let default_shell = { zombie_memory = 120. ; } } ; + net_validator_limits = { + bootstrap_threshold = 4 ; + worker_limits = { + backlog_size = 1000 ; + backlog_level = Logging.Info ; + zombie_lifetime = 600. ; + zombie_memory = 120. 
; + } + } } let default_config = { @@ -367,44 +368,39 @@ let peer_validator_limits_encoding = default_limits.worker_limits.zombie_lifetime default_limits.worker_limits.zombie_memory)) -let timeout_encoding = +let net_validator_limits_encoding = let open Data_encoding in - let uint8 = conv int_of_float float_of_int uint8 in conv - (fun { Node.block_header ; block_operations ; - protocol ; new_head_request } -> - (block_header, block_operations, - protocol, new_head_request)) - (fun (block_header, block_operations, - protocol, new_head_request) -> - { block_header ; block_operations ; - protocol ; new_head_request }) - (obj4 - (dft "block_header" uint8 default_shell.timeout.block_header) - (dft "block_operations" uint8 default_shell.timeout.block_operations) - (dft "protocol" uint8 default_shell.timeout.protocol) - (dft "new_head_request" uint8 default_shell.timeout.new_head_request) - ) + (fun { Node.bootstrap_threshold ; worker_limits } -> + (bootstrap_threshold, worker_limits)) + (fun (bootstrap_threshold, worker_limits) -> + { bootstrap_threshold ; worker_limits}) + (merge_objs + (obj1 + (dft "bootstrap_threshold" uint8 + default_shell.net_validator_limits.bootstrap_threshold)) + (worker_limits_encoding + default_shell.net_validator_limits.worker_limits.backlog_size + default_shell.net_validator_limits.worker_limits.backlog_level + default_shell.net_validator_limits.worker_limits.zombie_lifetime + default_shell.net_validator_limits.worker_limits.zombie_memory)) let shell = let open Data_encoding in conv - (fun { bootstrap_threshold ; timeout ; peer_validator_limits ; - block_validator_limits ; prevalidator_limits } -> - bootstrap_threshold, timeout, peer_validator_limits, - block_validator_limits, prevalidator_limits) - (fun (bootstrap_threshold, timeout, peer_validator_limits, - block_validator_limits, prevalidator_limits) -> - { bootstrap_threshold ; timeout ; - peer_validator_limits ; - block_validator_limits ; - prevalidator_limits }) - (obj5 - (dft 
"bootstrap_threshold" uint8 default_shell.bootstrap_threshold) - (dft "timeout" timeout_encoding default_shell.timeout) + (fun { peer_validator_limits ; block_validator_limits ; + prevalidator_limits ; net_validator_limits } -> + (peer_validator_limits, block_validator_limits, + prevalidator_limits, net_validator_limits)) + (fun (peer_validator_limits, block_validator_limits, + prevalidator_limits, net_validator_limits) -> + { peer_validator_limits ; block_validator_limits ; + prevalidator_limits ; net_validator_limits }) + (obj4 (dft "peer_validator" peer_validator_limits_encoding default_shell.peer_validator_limits) (dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits) (dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits) + (dft "net_validator" net_validator_limits_encoding default_shell.net_validator_limits) ) let encoding = @@ -518,14 +514,16 @@ let update output = Option.unopt ~default:cfg.log.output log_output ; } and shell : shell = { - bootstrap_threshold = - Option.unopt - ~default:cfg.shell.bootstrap_threshold - bootstrap_threshold ; - timeout = cfg.shell.timeout ; peer_validator_limits = cfg.shell.peer_validator_limits ; block_validator_limits = cfg.shell.block_validator_limits ; prevalidator_limits = cfg.shell.prevalidator_limits ; + net_validator_limits = + Option.unopt_map + ~default:cfg.shell.net_validator_limits + ~f:(fun bootstrap_threshold -> + { cfg.shell.net_validator_limits + with bootstrap_threshold }) + bootstrap_threshold } in return { data_dir ; net ; rpc ; log ; shell } diff --git a/src/bin_node/node_config_file.mli b/src/bin_node/node_config_file.mli index 9c58e1c5a..287656c0e 100644 --- a/src/bin_node/node_config_file.mli +++ b/src/bin_node/node_config_file.mli @@ -43,11 +43,10 @@ and log = { } and shell = { - bootstrap_threshold : int ; block_validator_limits : Node.block_validator_limits ; prevalidator_limits : Node.prevalidator_limits ; - timeout : Node.timeout ; 
peer_validator_limits : Node.peer_validator_limits ; + net_validator_limits : Node.net_validator_limits ; } val default_data_dir: string diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 2ba43d1f3..4bd6d2c4d 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -168,14 +168,13 @@ let init_node ?sandbox (config : Node_config_file.t) = context_root = context_dir config.data_dir ; p2p = p2p_config ; test_network_max_tll = Some (48 * 3600) ; (* 2 days *) - bootstrap_threshold = config.shell.bootstrap_threshold ; } in Node.create node_config - config.shell.timeout config.shell.peer_validator_limits config.shell.block_validator_limits config.shell.prevalidator_limits + config.shell.net_validator_limits let () = let old_hook = !Lwt.async_exception_hook in diff --git a/src/lib_node_shell/net_validator.ml b/src/lib_node_shell/net_validator.ml index ef3db3da2..61e300ba8 100644 --- a/src/lib_node_shell/net_validator.ml +++ b/src/lib_node_shell/net_validator.ml @@ -7,252 +7,152 @@ (* *) (**************************************************************************) -include Logging.Make(struct let name = "node.validator.net" end) +open Net_validator_worker_state -type t = { +module Name = struct + type t = Net_id.t + let encoding = Net_id.encoding + let base = [ "net_validator" ] + let pp = Net_id.pp_short +end - db: Distributed_db.t ; - net_state: State.Net.t ; - net_db: Distributed_db.net_db ; - block_validator: Block_validator.t ; +module Request = struct + include Request + type _ t = Validated : State.Block.t -> Event.update t + let view (type a) (Validated block : a t) : view = + State.Block.hash block +end - timeout: timeout ; - prevalidator_limits: Prevalidator.limits ; - peer_validator_limits: Peer_validator.limits ; +type limits = { bootstrap_threshold: int ; - mutable bootstrapped: bool ; - bootstrapped_waiter: unit Lwt.t ; - bootstrapped_wakener: unit Lwt.u ; - valid_block_input: State.Block.t 
Lwt_watcher.input ; - global_valid_block_input: State.Block.t Lwt_watcher.input ; - new_head_input: State.Block.t Lwt_watcher.input ; - - parent: t option ; - max_child_ttl: int option ; - - mutable child: t option ; - prevalidator: Prevalidator.t ; - active_peers: Peer_validator.t Lwt.t P2p.Peer_id.Table.t ; - bootstrapped_peers: unit P2p.Peer_id.Table.t ; - - mutable worker: unit Lwt.t ; - queue: State.Block.t Lwt_pipe.t ; - canceler: Lwt_canceler.t ; - + worker_limits: Worker_types.limits } -and timeout = { - block_header: float ; - block_operations: float ; - protocol: float ; - new_head_request: float ; -} +module Types = struct + include Worker_state + type parameters = { + parent: Name.t option ; + db: Distributed_db.t ; + net_state: State.Net.t ; + net_db: Distributed_db.net_db ; + block_validator: Block_validator.t ; + global_valid_block_input: State.Block.t Lwt_watcher.input ; -let rec shutdown nv = - Lwt_canceler.cancel nv.canceler >>= fun () -> - Distributed_db.deactivate nv.net_db >>= fun () -> - Lwt.join - ( nv.worker :: - Prevalidator.shutdown nv.prevalidator :: - Lwt_utils.may ~f:shutdown nv.child :: - P2p.Peer_id.Table.fold - (fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc) - nv.active_peers [] ) >>= fun () -> - Lwt.return_unit + prevalidator_limits: Prevalidator.limits ; + peer_validator_limits: Peer_validator.limits ; + max_child_ttl: int option ; + limits: limits; + } + + type state = { + parameters: parameters ; + + mutable bootstrapped: bool ; + bootstrapped_waiter: unit Lwt.t ; + bootstrapped_wakener: unit Lwt.u ; + valid_block_input: State.Block.t Lwt_watcher.input ; + new_head_input: State.Block.t Lwt_watcher.input ; + + mutable child: + (state * (unit -> unit Lwt.t (* shutdown *))) option ; + prevalidator: Prevalidator.t ; + active_peers: Peer_validator.t Lwt.t P2p.Peer_id.Table.t ; + bootstrapped_peers: unit P2p.Peer_id.Table.t ; + } + + let view (state : state) _ : view = + let { bootstrapped ; active_peers ; bootstrapped_peers 
} = state in + { bootstrapped ; + active_peers = + P2p.Peer_id.Table.fold (fun id _ l -> id :: l) active_peers [] ; + bootstrapped_peers = + P2p.Peer_id.Table.fold (fun id _ l -> id :: l) bootstrapped_peers [] } +end + +module Worker = Worker.Make (Name) (Event) (Request) (Types) + +open Types + +type t = Worker.infinite Worker.queue Worker.t + +let table = Worker.create_table Queue + +let shutdown w = + Worker.shutdown w let shutdown_child nv = - Lwt_utils.may ~f:shutdown nv.child + Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child -let notify_new_block nv block = - Option.iter nv.parent - ~f:(fun nv -> Lwt_watcher.notify nv.valid_block_input block) ; +let notify_new_block w block = + let nv = Worker.state w in + Option.iter nv.parameters.parent + ~f:(fun id -> try + let w = List.assoc id (Worker.list table) in + let nv = Worker.state w in + Lwt_watcher.notify nv.valid_block_input block + with Not_found -> ()) ; Lwt_watcher.notify nv.valid_block_input block ; - Lwt_watcher.notify nv.global_valid_block_input block ; - assert (Lwt_pipe.push_now nv.queue block) + Lwt_watcher.notify nv.parameters.global_valid_block_input block ; + Worker.push_request_now w (Validated block) -let may_toggle_bootstrapped_network nv = +let may_toggle_bootstrapped_network w = + let nv = Worker.state w in if not nv.bootstrapped && - P2p.Peer_id.Table.length nv.bootstrapped_peers >= nv.bootstrap_threshold + P2p.Peer_id.Table.length nv.bootstrapped_peers >= nv.parameters.limits.bootstrap_threshold then begin nv.bootstrapped <- true ; Lwt.wakeup_later nv.bootstrapped_wakener () ; end -let may_activate_peer_validator nv peer_id = +let may_activate_peer_validator w peer_id = + let nv = Worker.state w in try P2p.Peer_id.Table.find nv.active_peers peer_id with Not_found -> let pv = Peer_validator.create - ~notify_new_block:(notify_new_block nv) + ~notify_new_block:(notify_new_block w) ~notify_bootstrapped: begin fun () -> P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ; - 
may_toggle_bootstrapped_network nv + may_toggle_bootstrapped_network w end ~notify_termination: begin fun _pv -> P2p.Peer_id.Table.remove nv.active_peers peer_id ; P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ; end - nv.peer_validator_limits - nv.block_validator nv.net_db peer_id in + nv.parameters.peer_validator_limits + nv.parameters.block_validator + nv.parameters.net_db + peer_id in P2p.Peer_id.Table.add nv.active_peers peer_id pv ; pv -let broadcast_head nv ~previous block = - if not nv.bootstrapped then - Lwt.return_unit - else begin - begin - State.Block.predecessor block >>= function - | None -> Lwt.return_true - | Some predecessor -> - Lwt.return (State.Block.equal predecessor previous) - end >>= fun successor -> - if successor then begin - Distributed_db.Advertise.current_head nv.net_db block ; - Lwt.return_unit - end else begin - Chain.locator (Distributed_db.net_state nv.net_db) >>= fun locator -> - Distributed_db.Advertise.current_branch nv.net_db locator - end - end - -let rec create - ?max_child_ttl ?parent - ?(bootstrap_threshold = 1) - timeout peer_validator_limits prevalidator_limits block_validator - global_valid_block_input db net_state = - Chain.init_head net_state >>= fun () -> - let net_db = Distributed_db.activate db net_state in - Prevalidator.create - prevalidator_limits net_db >>= fun prevalidator -> - let valid_block_input = Lwt_watcher.create_input () in - let new_head_input = Lwt_watcher.create_input () in - let canceler = Lwt_canceler.create () in - let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in - let nv = { - db ; net_state ; net_db ; block_validator ; - prevalidator ; - timeout ; prevalidator_limits ; peer_validator_limits ; - valid_block_input ; global_valid_block_input ; - new_head_input ; - parent ; max_child_ttl ; child = None ; - bootstrapped = (bootstrap_threshold <= 0) ; - bootstrapped_waiter ; - bootstrapped_wakener ; - bootstrap_threshold ; - active_peers = - P2p.Peer_id.Table.create 50 ; (* TODO 
use `2 * max_connection` *) - bootstrapped_peers = - P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *) - worker = Lwt.return_unit ; - queue = Lwt_pipe.create () ; - canceler ; - } in - if nv.bootstrapped then Lwt.wakeup_later bootstrapped_wakener () ; - Distributed_db.set_callback net_db { - notify_branch = begin fun peer_id locator -> - Lwt.async begin fun () -> - may_activate_peer_validator nv peer_id >>= fun pv -> - Peer_validator.notify_branch pv locator ; - Lwt.return_unit - end - end ; - notify_head = begin fun peer_id block ops -> - Lwt.async begin fun () -> - may_activate_peer_validator nv peer_id >>= fun pv -> - Peer_validator.notify_head pv block ; - (* TODO notify prevalidator only if head is known ??? *) - Prevalidator.notify_operations nv.prevalidator peer_id ops ; - Lwt.return_unit - end; - end ; - disconnection = begin fun peer_id -> - Lwt.async begin fun () -> - may_activate_peer_validator nv peer_id >>= fun pv -> - Peer_validator.shutdown pv >>= fun () -> - Lwt.return_unit - end - end ; - } ; - nv.worker <- - Lwt_utils.worker - (Format.asprintf "net_validator.%a" Net_id.pp (State.Net.id net_state)) - ~run:(fun () -> worker_loop nv) - ~cancel:(fun () -> Lwt_canceler.cancel nv.canceler) ; - Lwt.return nv - -(** Current block computation *) - -and worker_loop nv = - begin - Lwt_utils.protect ~canceler:nv.canceler begin fun () -> - Lwt_pipe.pop nv.queue >>= return - end >>=? 
fun block -> - Chain.head nv.net_state >>= fun head -> - let head_header = State.Block.header head - and head_hash = State.Block.hash head - and block_header = State.Block.header block - and block_hash = State.Block.hash block in - if - Fitness.(block_header.shell.fitness <= head_header.shell.fitness) - then - lwt_log_info "current head is better than %a %a %a, we do not switch" - Block_hash.pp_short block_hash - Fitness.pp block_header.shell.fitness - Time.pp_hum block_header.shell.timestamp >>= fun () -> - return () - else begin - Chain.set_head nv.net_state block >>= fun previous -> - broadcast_head nv ~previous block >>= fun () -> - Prevalidator.flush nv.prevalidator block_hash >>=? fun () -> - may_switch_test_network nv block >>= fun () -> - Lwt_watcher.notify nv.new_head_input block ; - lwt_log_notice "update current head %a %a %a(%t)" - Block_hash.pp_short block_hash - Fitness.pp block_header.shell.fitness - Time.pp_hum block_header.shell.timestamp - (fun ppf -> - if Block_hash.equal head_hash block_header.shell.predecessor then - Format.fprintf ppf "same branch" - else - Format.fprintf ppf "changing branch") >>= fun () -> - return () - end - end >>= function - | Ok () -> - worker_loop nv - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> - Lwt.return_unit - | Error err -> - lwt_log_error "@[Unexpected error:@ %a@]" - pp_print_error err >>= fun () -> - Lwt_canceler.cancel nv.canceler >>= fun () -> - Lwt.return_unit - -and may_switch_test_network nv block = - +let may_switch_test_network w spawn_child block = + let nv = Worker.state w in let create_child genesis protocol expiration = - if State.Net.allow_forked_network nv.net_state then begin + if State.Net.allow_forked_network nv.parameters.net_state then begin shutdown_child nv >>= fun () -> begin let net_id = Net_id.of_block_hash (State.Block.hash genesis) in State.Net.get - (State.Net.global_state nv.net_state) net_id >>= function + (State.Net.global_state nv.parameters.net_state) net_id >>= function 
| Ok net_state -> return net_state | Error _ -> State.fork_testnet genesis protocol expiration >>=? fun net_state -> Chain.head net_state >>= fun new_genesis_block -> - Lwt_watcher.notify nv.global_valid_block_input new_genesis_block ; + Lwt_watcher.notify nv.parameters.global_valid_block_input new_genesis_block ; Lwt_watcher.notify nv.valid_block_input new_genesis_block ; return net_state end >>=? fun net_state -> - create - ~parent:nv nv.timeout nv.peer_validator_limits - nv.prevalidator_limits nv.block_validator - nv.global_valid_block_input - nv.db net_state >>= fun child -> + spawn_child + ~parent:(State.Net.id nv.parameters.net_state) (* parent is this validator's network; the [net_state] bound just above is the forked child's *) + nv.parameters.peer_validator_limits + nv.parameters.prevalidator_limits + nv.parameters.block_validator + nv.parameters.global_valid_block_input + nv.parameters.db net_state + nv.parameters.limits (* TODO: different limits main/test ? *) >>= fun child -> nv.child <- Some child ; return () end else begin @@ -264,13 +164,13 @@ and may_switch_test_network nv block = let activated = match nv.child with | None -> false - | Some child -> + | Some (child , _) -> Block_hash.equal - (State.Net.genesis child.net_state).block + (State.Net.genesis child.parameters.net_state).block genesis in - State.Block.read nv.net_state genesis >>=? 
fun genesis -> begin - match nv.max_child_ttl with + match nv.parameters.max_child_ttl with | None -> Lwt.return expiration | Some ttl -> Lwt.return @@ -297,51 +197,225 @@ and may_switch_test_network nv block = end >>= function | Ok () -> Lwt.return_unit | Error err -> - lwt_log_error "@[Error while switch test network:@ %a@]" - Error_monad.pp_print_error err >>= fun () -> + Worker.record_event w (Could_not_switch_testnet err) ; Lwt.return_unit +let broadcast_head w ~previous block = + let nv = Worker.state w in + if not nv.bootstrapped then + Lwt.return_unit + else begin + begin + State.Block.predecessor block >>= function + | None -> Lwt.return_true + | Some predecessor -> + Lwt.return (State.Block.equal predecessor previous) + end >>= fun successor -> + if successor then begin + Distributed_db.Advertise.current_head + nv.parameters.net_db block ; + Lwt.return_unit + end else begin + let net_state = Distributed_db.net_state nv.parameters.net_db in + Chain.locator net_state >>= fun locator -> + Distributed_db.Advertise.current_branch + nv.parameters.net_db locator + end + end -(* TODO check the initial sequence of message when connecting to a new - peer, and the one when activating a network. *) +let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t = + let Request.Validated block = req in + let nv = Worker.state w in + Chain.head nv.parameters.net_state >>= fun head -> + let head_header = State.Block.header head + and head_hash = State.Block.hash head + and block_header = State.Block.header block + and block_hash = State.Block.hash block in + if + Fitness.(block_header.shell.fitness <= head_header.shell.fitness) + then + return Event.Ignored_head + else begin + Chain.set_head nv.parameters.net_state block >>= fun previous -> + broadcast_head w ~previous block >>= fun () -> + Prevalidator.flush nv.prevalidator block_hash >>=? 
fun () -> + may_switch_test_network w spawn_child block >>= fun () -> + Lwt_watcher.notify nv.new_head_input block ; + if Block_hash.equal head_hash block_header.shell.predecessor then + return Event.Head_incrememt + else + return Event.Branch_switch + end +let on_completion (type a) w (req : a Request.t) (update : a) request_status = + let Request.Validated block = req in + let fitness = State.Block.fitness block in + let request = State.Block.hash block in + Worker.record_event w (Processed_block { request ; request_status ; update ; fitness }) ; + Lwt.return () + +let on_close w = + let nv = Worker.state w in + Distributed_db.deactivate nv.parameters.net_db >>= fun () -> + Lwt.join + (Prevalidator.shutdown nv.prevalidator :: + Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child :: + P2p.Peer_id.Table.fold + (fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc) + nv.active_peers []) >>= fun () -> + Lwt.return_unit + +let on_launch w _ parameters = + Chain.init_head parameters.net_state >>= fun () -> + Prevalidator.create + parameters.prevalidator_limits parameters.net_db >>= fun prevalidator -> + let valid_block_input = Lwt_watcher.create_input () in + let new_head_input = Lwt_watcher.create_input () in + let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in + let nv = + { parameters ; + valid_block_input ; + new_head_input ; + bootstrapped_wakener ; + bootstrapped_waiter ; + bootstrapped = (parameters.limits.bootstrap_threshold <= 0) ; + active_peers = + P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *) + bootstrapped_peers = + P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *) + child = None ; + prevalidator } in + if nv.bootstrapped then Lwt.wakeup_later bootstrapped_wakener () ; + Distributed_db.set_callback parameters.net_db { + notify_branch = begin fun peer_id locator -> + Lwt.async begin fun () -> + may_activate_peer_validator w peer_id >>= fun pv -> + Peer_validator.notify_branch pv locator ; + 
Lwt.return_unit + end + end ; + notify_head = begin fun peer_id block ops -> + Lwt.async begin fun () -> + may_activate_peer_validator w peer_id >>= fun pv -> + Peer_validator.notify_head pv block ; + (* TODO notify prevalidator only if head is known ??? *) + Prevalidator.notify_operations nv.prevalidator peer_id ops ; + Lwt.return_unit + end; + end ; + disconnection = begin fun peer_id -> + Lwt.async begin fun () -> + may_activate_peer_validator w peer_id >>= fun pv -> + Peer_validator.shutdown pv >>= fun () -> + Lwt.return_unit + end + end ; + } ; + Lwt.return nv + +let rec create + ?max_child_ttl ?parent + peer_validator_limits prevalidator_limits block_validator + global_valid_block_input db net_state limits = + let spawn_child ~parent pvl pl bl gvbi db n l = + create ~parent pvl pl bl gvbi db n l >>= fun w -> + Lwt.return (Worker.state w, (fun () -> Worker.shutdown w)) in + let module Handlers = struct + type self = t + let on_launch = on_launch + let on_request w = on_request w spawn_child + let on_close = on_close + let on_error _ _ _ errs = Lwt.return (Error errs) + let on_completion = on_completion + let on_no_request _ = return () + end in + let parameters = + { max_child_ttl ; + parent ; + peer_validator_limits ; + prevalidator_limits ; + block_validator ; + global_valid_block_input ; + db ; + net_db = Distributed_db.activate db net_state ; + net_state ; + limits } in + Worker.launch table + limits.worker_limits (* the net validator's own worker limits, not the prevalidator's *) + (State.Net.id net_state) + parameters + (module Handlers) + +(** Current block computation *) let create ?max_child_ttl - ?bootstrap_threshold - timeout - block_validator global_valid_block_input global_db state = + peer_validator_limits prevalidator_limits + block_validator global_valid_block_input global_db state limits = (* hide the optional ?parent *) create ?max_child_ttl - ?bootstrap_threshold - timeout block_validator global_valid_block_input global_db state + peer_validator_limits prevalidator_limits + block_validator 
global_valid_block_input global_db state limits -let net_id { net_state } = State.Net.id net_state -let net_state { net_state } = net_state -let prevalidator { prevalidator } = prevalidator -let net_db { net_db } = net_db -let child { child } = child +let net_id w = + let { parameters = { net_state } } = Worker.state w in + State.Net.id net_state -let validate_block nv ?(force = false) hash block operations = +let net_state w = + let { parameters = { net_state } } = Worker.state w in + net_state + +let prevalidator w = + let { prevalidator } = Worker.state w in + prevalidator + +let net_db w = + let { parameters = { net_db } } = Worker.state w in + net_db + +let child w = + match (Worker.state w).child with + | None -> None + | Some ({ parameters = { net_state } }, _) -> + try Some (List.assoc (State.Net.id net_state) (Worker.list table)) + with Not_found -> None + +let validate_block w ?(force = false) hash block operations = + let nv = Worker.state w in assert (Block_hash.equal hash (Block_header.hash block)) ; - Chain.head nv.net_state >>= fun head -> + Chain.head nv.parameters.net_state >>= fun head -> let head = State.Block.header head in if force || Fitness.(head.shell.fitness <= block.shell.fitness) then Block_validator.validate - ~canceler:nv.canceler - ~notify_new_block:(notify_new_block nv) - nv.block_validator nv.net_db hash block operations + ~canceler:(Worker.canceler w) + ~notify_new_block:(notify_new_block w) + nv.parameters.block_validator + nv.parameters.net_db + hash block operations else failwith "Fitness too low" -let bootstrapped { bootstrapped_waiter } = +let bootstrapped w = + let { bootstrapped_waiter } = Worker.state w in Lwt.protected bootstrapped_waiter -let valid_block_watcher { valid_block_input } = +let valid_block_watcher w = + let{ valid_block_input } = Worker.state w in Lwt_watcher.create_stream valid_block_input -let new_head_watcher { new_head_input } = +let new_head_watcher w = + let { new_head_input } = Worker.state w in 
Lwt_watcher.create_stream new_head_input + +let status = Worker.status + +let running_workers () = Worker.list table + +let pending_requests t = Worker.pending_requests t + +let current_request t = Worker.current_request t + +let last_events = Worker.last_events diff --git a/src/lib_node_shell/net_validator.mli b/src/lib_node_shell/net_validator.mli index bbaa0ffb3..a046c94ad 100644 --- a/src/lib_node_shell/net_validator.mli +++ b/src/lib_node_shell/net_validator.mli @@ -9,23 +9,20 @@ type t -type timeout = { - block_header: float ; - block_operations: float ; - protocol: float ; - new_head_request: float ; +type limits = { + bootstrap_threshold: int ; + worker_limits: Worker_types.limits } val create: ?max_child_ttl:int -> - ?bootstrap_threshold:int -> - timeout -> Peer_validator.limits -> Prevalidator.limits -> Block_validator.t -> State.Block.t Lwt_watcher.input -> Distributed_db.t -> State.Net.t -> + limits -> t Lwt.t val bootstrapped: t -> unit Lwt.t @@ -47,3 +44,9 @@ val shutdown: t -> unit Lwt.t val valid_block_watcher: t -> State.Block.t Lwt_stream.t * Lwt_watcher.stopper val new_head_watcher: t -> State.Block.t Lwt_stream.t * Lwt_watcher.stopper +val running_workers: unit -> (Net_id.t * t) list +val status: t -> Worker_types.worker_status + +val pending_requests : t -> (Time.t * Net_validator_worker_state.Request.view) list +val current_request : t -> (Time.t * Time.t * Net_validator_worker_state.Request.view) option +val last_events : t -> (Lwt_log_core.level * Net_validator_worker_state.Event.t list) list diff --git a/src/lib_node_shell/node.ml b/src/lib_node_shell/node.ml index 6a2b33180..49274819d 100644 --- a/src/lib_node_shell/node.ml +++ b/src/lib_node_shell/node.ml @@ -86,15 +86,8 @@ type config = { patch_context: (Context.t -> Context.t Lwt.t) option ; p2p: (P2p.config * P2p.limits) option ; test_network_max_tll: int option ; - bootstrap_threshold: int ; } -and timeout = Net_validator.timeout = { - block_header: float ; - block_operations: float ; 
- protocol: float ; - new_head_request: float ; -} and peer_validator_limits = Peer_validator.limits = { new_head_request_timeout: float ; block_header_timeout: float ; @@ -114,6 +107,11 @@ and block_validator_limits = Block_validator.limits = { worker_limits : Worker_types.limits ; } +and net_validator_limits = Net_validator.limits = { + bootstrap_threshold: int ; + worker_limits : Worker_types.limits ; +} + let may_create_net state genesis = State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function | Ok net -> Lwt.return net @@ -122,22 +120,22 @@ let may_create_net state genesis = let create { genesis ; store_root ; context_root ; patch_context ; p2p = net_params ; - test_network_max_tll = max_child_ttl ; - bootstrap_threshold } - timeout + test_network_max_tll = max_child_ttl } peer_validator_limits block_validator_limits - prevalidator_limits = + prevalidator_limits + net_validator_limits = init_p2p net_params >>=? fun p2p -> State.read ~store_root ~context_root ?patch_context () >>=? 
fun state -> let distributed_db = Distributed_db.create state p2p in - Validator.create state distributed_db timeout + Validator.create state distributed_db peer_validator_limits - block_validator_limits prevalidator_limits >>= fun validator -> + block_validator_limits + prevalidator_limits + net_validator_limits >>= fun validator -> may_create_net state genesis >>= fun mainnet_state -> Validator.activate validator - ~bootstrap_threshold ?max_child_ttl mainnet_state >>= fun mainnet_validator -> let shutdown () = P2p.shutdown p2p >>= fun () -> diff --git a/src/lib_node_shell/node.mli b/src/lib_node_shell/node.mli index b4be0d085..b571d76b8 100644 --- a/src/lib_node_shell/node.mli +++ b/src/lib_node_shell/node.mli @@ -16,15 +16,8 @@ type config = { patch_context: (Context.t -> Context.t Lwt.t) option ; p2p: (P2p.config * P2p.limits) option ; test_network_max_tll: int option ; - bootstrap_threshold: int ; } -and timeout = { - block_header: float ; - block_operations: float ; - protocol: float ; - new_head_request: float ; -} and peer_validator_limits = { new_head_request_timeout: float ; block_header_timeout: float ; @@ -41,13 +34,17 @@ and block_validator_limits = { protocol_timeout: float ; worker_limits : Worker_types.limits ; } +and net_validator_limits = { + bootstrap_threshold: int ; + worker_limits : Worker_types.limits ; +} val create: config -> - timeout -> peer_validator_limits -> block_validator_limits -> prevalidator_limits -> + net_validator_limits -> t tzresult Lwt.t module RPC : sig diff --git a/src/lib_node_shell/validator.ml b/src/lib_node_shell/validator.ml index 0e2f90f83..3f7a20953 100644 --- a/src/lib_node_shell/validator.ml +++ b/src/lib_node_shell/validator.ml @@ -14,7 +14,7 @@ type t = { state: State.t ; db: Distributed_db.t ; block_validator: Block_validator.t ; - timeout: Net_validator.timeout ; + net_validator_limits: Net_validator.limits ; peer_validator_limits: Peer_validator.limits ; block_validator_limits: Block_validator.limits ; 
prevalidator_limits: Prevalidator.limits ; @@ -24,31 +24,31 @@ type t = { } -let create state db timeout +let create state db peer_validator_limits block_validator_limits - prevalidator_limits = + prevalidator_limits + net_validator_limits = Block_validator.create block_validator_limits db >>= fun block_validator -> let valid_block_input = Lwt_watcher.create_input () in Lwt.return - { state ; db ; timeout ; block_validator ; - prevalidator_limits ; - peer_validator_limits ; block_validator_limits ; + { state ; db ; block_validator ; + block_validator_limits ; prevalidator_limits ; + peer_validator_limits ; net_validator_limits ; valid_block_input ; - active_nets = Net_id.Table.create 7 ; - } + active_nets = Net_id.Table.create 7 } -let activate v ?bootstrap_threshold ?max_child_ttl net_state = +let activate v ?max_child_ttl net_state = let net_id = State.Net.id net_state in lwt_log_notice "activate network %a" Net_id.pp net_id >>= fun () -> try Net_id.Table.find v.active_nets net_id with Not_found -> let nv = Net_validator.create - ?bootstrap_threshold ?max_child_ttl - v.timeout v.peer_validator_limits v.prevalidator_limits - v.block_validator v.valid_block_input v.db net_state in + v.peer_validator_limits v.prevalidator_limits + v.block_validator v.valid_block_input v.db net_state + v.net_validator_limits in Net_id.Table.add v.active_nets net_id nv ; nv diff --git a/src/lib_node_shell/validator.mli b/src/lib_node_shell/validator.mli index 8d0c45e2f..2ce44aa8c 100644 --- a/src/lib_node_shell/validator.mli +++ b/src/lib_node_shell/validator.mli @@ -14,17 +14,16 @@ type t val create: State.t -> Distributed_db.t -> - Net_validator.timeout -> Peer_validator.limits -> Block_validator.limits -> Prevalidator.limits -> + Net_validator.limits -> t Lwt.t val shutdown: t -> unit Lwt.t (** Start the validation scheduler of a given network. 
*) val activate: t -> - ?bootstrap_threshold:int -> ?max_child_ttl:int -> State.Net.t -> Net_validator.t Lwt.t diff --git a/src/lib_node_shell/worker.mli b/src/lib_node_shell/worker.mli index 5ddba78fb..580dd5d35 100644 --- a/src/lib_node_shell/worker.mli +++ b/src/lib_node_shell/worker.mli @@ -161,7 +161,7 @@ module Make self -> 'a Request.t -> 'a tzresult Lwt.t (** Called when no request has been made before the timeout, if - the parameter has been passed to {!launch}. *) + the parameter has been passed to {!launch}. *) val on_no_request : self -> unit tzresult Lwt.t @@ -183,12 +183,13 @@ module Make unit tzresult Lwt.t (** A function called at the end of the worker loop in case of a - successful treatment of the current request. *) + successful treatment of the current request. *) val on_completion : self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t + end (** Creates a new worker instance. diff --git a/src/lib_node_shell_base/net_validator_worker_state.ml b/src/lib_node_shell_base/net_validator_worker_state.ml new file mode 100644 index 000000000..de9eca1dd --- /dev/null +++ b/src/lib_node_shell_base/net_validator_worker_state.ml @@ -0,0 +1,117 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. 
*) +(* *) +(**************************************************************************) + +module Request = struct + type view = Block_hash.t + + let encoding = Block_hash.encoding + let pp = Block_hash.pp +end + +module Event = struct + type update = + | Ignored_head + | Branch_switch + | Head_incrememt + type t = + | Processed_block of + { request : Request.view ; + request_status : Worker_types.request_status ; + update : update ; + fitness : Fitness.t } + | Could_not_switch_testnet of error list + + let level = function + | Processed_block req -> + begin match req.update with + | Ignored_head -> Logging.Info + | Branch_switch | Head_incrememt -> Logging.Notice + end + | Could_not_switch_testnet _ -> Logging.Error + + let encoding error_encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj4 + (req "request" Request.encoding) + (req "status" Worker_types.request_status_encoding) + (req "outcome" + (string_enum [ "ignored", Ignored_head ; + "branch", Branch_switch ; + "increment", Head_incrememt ])) + (req "fitness" Fitness.encoding)) + (function + | Processed_block { request ; request_status ; update ; fitness } -> + Some (request, request_status, update, fitness) + | _ -> None) + (fun (request, request_status, update, fitness) -> + Processed_block { request ; request_status ; update ; fitness }) ; + case (Tag 1) + error_encoding + (function + | Could_not_switch_testnet err -> Some err + | _ -> None) + (fun err -> Could_not_switch_testnet err) ] + + let pp ppf = function + | Processed_block req -> + Format.fprintf ppf "@[" ; + begin match req.update with + | Ignored_head -> + Format.fprintf ppf + "Current head is better than %a (fitness %a), we do not switch@," + | Branch_switch -> + Format.fprintf ppf + "Update current head to %a (fitness %a), changing branch@," + | Head_incrememt -> + Format.fprintf ppf + "Update current head to %a (fitness %a), same branch@," + end + Request.pp req.request + Fitness.pp req.fitness ; + Format.fprintf ppf + 
"Pushed: %a, Treated: %a, Completed: %a@]" + Time.pp_hum req.request_status.pushed + Time.pp_hum req.request_status.treated + Time.pp_hum req.request_status.completed + | Could_not_switch_testnet err -> + Format.fprintf ppf "@[Error while switch test network:@ %a@]" + Error_monad.pp_print_error err + +end + +module Worker_state = struct + type view = + { active_peers : P2p_types.Peer_id.t list ; + bootstrapped_peers : P2p_types.Peer_id.t list ; + bootstrapped : bool } + let encoding = + let open Data_encoding in + conv + (fun { bootstrapped ; bootstrapped_peers ; active_peers } -> + (bootstrapped, bootstrapped_peers, active_peers)) + (fun (bootstrapped, bootstrapped_peers, active_peers) -> + { bootstrapped ; bootstrapped_peers ; active_peers }) + (obj3 + (req "bootstrapped" bool) + (req "bootstrapped_peers" (list P2p_types.Peer_id.encoding)) + (req "active_peers" (list P2p_types.Peer_id.encoding))) + + let pp ppf { bootstrapped ; bootstrapped_peers ; active_peers } = + Format.fprintf ppf + "@[Network is%s bootstrapped.@,\ + @[Active peers:%a@]@,\ + @[Bootstrapped peers:%a@]@]" + (if bootstrapped then "" else " not yet") + (fun ppf -> List.iter (Format.fprintf ppf "@,- %a" P2p_types.Peer_id.pp)) + active_peers + (fun ppf -> List.iter (Format.fprintf ppf "@,- %a" P2p_types.Peer_id.pp)) + bootstrapped_peers +end diff --git a/src/lib_node_shell_base/net_validator_worker_state.mli b/src/lib_node_shell_base/net_validator_worker_state.mli new file mode 100644 index 000000000..f3c58b9f7 --- /dev/null +++ b/src/lib_node_shell_base/net_validator_worker_state.mli @@ -0,0 +1,40 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. 
*) +(* *) +(**************************************************************************) + +module Request : sig + type view = Block_hash.t + val encoding : view Data_encoding.encoding + val pp : Format.formatter -> view -> unit +end + +module Event : sig + type update = + | Ignored_head + | Branch_switch + | Head_incrememt + type t = + | Processed_block of + { request : Request.view ; + request_status : Worker_types.request_status ; + update : update ; + fitness : Fitness.t } + | Could_not_switch_testnet of error list + val level : t -> Logging.level + val encoding : error list Data_encoding.encoding -> t Data_encoding.encoding + val pp : Format.formatter -> t -> unit +end + +module Worker_state : sig + type view = + { active_peers : P2p_types.Peer_id.t list ; + bootstrapped_peers : P2p_types.Peer_id.t list ; + bootstrapped : bool } + val encoding : view Data_encoding.encoding + val pp : Format.formatter -> view -> unit +end