From 1163c192135af5be094ca108b6192b38dda4200a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 13 Nov 2017 14:25:02 +0100 Subject: [PATCH] Shell: add configuration variable for various timeouts. --- src/node/main/node_config_file.ml | 39 ++++++++++++++++++++++++--- src/node/main/node_config_file.mli | 1 + src/node/main/node_run_command.ml | 2 +- src/node/shell/block_locator.mli | 12 ++++++++- src/node/shell/block_validator.ml | 18 +++++++++---- src/node/shell/block_validator.mli | 4 ++- src/node/shell/bootstrap_pipeline.ml | 8 ++++-- src/node/shell/bootstrap_pipeline.mli | 2 ++ src/node/shell/net_validator.ml | 25 ++++++++++++++--- src/node/shell/net_validator.mli | 9 +++++++ src/node/shell/node.ml | 13 ++++++--- src/node/shell/node.mli | 10 ++++++- src/node/shell/peer_validator.ml | 27 +++++++++++++++---- src/node/shell/peer_validator.mli | 4 +++ src/node/shell/prevalidator.ml | 18 +++++++------ src/node/shell/prevalidator.mli | 4 ++- src/node/shell/validator.ml | 12 ++++++--- src/node/shell/validator.mli | 2 +- 18 files changed, 169 insertions(+), 41 deletions(-) diff --git a/src/node/main/node_config_file.ml b/src/node/main/node_config_file.ml index 0f5d7b855..df6ea183a 100644 --- a/src/node/main/node_config_file.ml +++ b/src/node/main/node_config_file.ml @@ -54,6 +54,7 @@ and log = { and shell = { bootstrap_threshold : int ; + timeout : Node.timeout ; } let default_net_limits : P2p.limits = { @@ -103,6 +104,13 @@ let default_log = { let default_shell = { bootstrap_threshold = 4 ; + timeout = { + operation = 10. ; + block_header = 60. ; + block_operations = 60. ; + protocol = 120. ; + new_head_request = 90. ; + } } let default_config = { @@ -245,13 +253,35 @@ let log = (opt "rules" string) (dft "template" string default_log.template)) +let timeout_encoding = + let open Data_encoding in + let uint8 = conv int_of_float float_of_int uint8 in + conv + (fun { Node.operation ; block_header ; block_operations ; + protocol ; new_head_request } -> + (operation, block_header, block_operations, + protocol, new_head_request)) + (fun (operation, block_header, block_operations, + protocol, new_head_request) -> + { operation ; block_header ; block_operations ; + protocol ; new_head_request }) + (obj5 + (dft "operation" uint8 default_shell.timeout.operation) + (dft "block_header" uint8 default_shell.timeout.block_header) + (dft "block_operations" uint8 default_shell.timeout.block_operations) + (dft "protocol" uint8 default_shell.timeout.protocol) + (dft "new_head_request" uint8 default_shell.timeout.new_head_request) + ) + let shell = let open Data_encoding in conv - (fun { bootstrap_threshold } -> bootstrap_threshold) - (fun bootstrap_threshold -> { bootstrap_threshold }) - (obj1 - (dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold)) + (fun { bootstrap_threshold ; timeout } -> bootstrap_threshold, timeout) + (fun (bootstrap_threshold, timeout) -> { bootstrap_threshold ; timeout }) + (obj2 + (dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold) + (dft "timeout" timeout_encoding default_shell.timeout) + ) let encoding = let open Data_encoding in @@ -369,6 +399,7 @@ let update Utils.unopt ~default:cfg.shell.bootstrap_threshold bootstrap_threshold ; + timeout = cfg.shell.timeout ; } in return { data_dir ; net ; rpc ; log ; shell } diff --git a/src/node/main/node_config_file.mli b/src/node/main/node_config_file.mli index dc4c985a3..702826447 100644 --- a/src/node/main/node_config_file.mli +++ b/src/node/main/node_config_file.mli @@ -44,6 +44,7 @@ and log = { and shell = { bootstrap_threshold : int ; + timeout : Node.timeout ; } val default_data_dir: string diff --git a/src/node/main/node_run_command.ml b/src/node/main/node_run_command.ml index 644413c11..89b87bd44 100644 --- a/src/node/main/node_run_command.ml +++ b/src/node/main/node_run_command.ml @@ -144,7 +144,7 @@ let init_node ?sandbox (config : Node_config_file.t) = test_network_max_tll = Some (48 * 3600) ; (* 2 days *) bootstrap_threshold = config.shell.bootstrap_threshold ; } in - Node.create node_config + Node.create node_config config.shell.timeout let () = let old_hook = !Lwt.async_exception_hook in diff --git a/src/node/shell/block_locator.mli b/src/node/shell/block_locator.mli index 210a6a1da..69941a300 100644 --- a/src/node/shell/block_locator.mli +++ b/src/node/shell/block_locator.mli @@ -21,7 +21,9 @@ val compute: Block.t -> int -> t Lwt.t the [block]. The locator contains at most [max_length] elements. *) val fold: - f:('a -> block:Block_hash.t -> pred:Block_hash.t -> step:int -> strict_step:bool -> 'a) -> + f:('a -> + block:Block_hash.t -> pred:Block_hash.t -> + step:int -> strict_step:bool -> 'a) -> 'a -> t -> 'a (** [map f l] applies [f] to each block of the locator, the last one excepted. The function also receives the expected predecessor @@ -36,7 +38,15 @@ type step = { step: int ; strict_step: bool ; } +(** A 'step' in a locator is a couple of consecutives hashes in the + locator, and the expected difference of level the two blocks (or + an upper bounds when [strict_step = false]). *) + val to_steps: t -> step list +(** Build all the 'steps' composing the locator, starting with the + oldest one (typically the predecessor of the first step will be + `genesis`). All steps contains [strict_step = true], except the + first one. *) val estimated_length: t -> int (** [estimated_length locator] estimate the length of the chain diff --git a/src/node/shell/block_validator.ml b/src/node/shell/block_validator.ml index b6ca655fa..3b7b67eae 100644 --- a/src/node/shell/block_validator.ml +++ b/src/node/shell/block_validator.ml @@ -25,6 +25,7 @@ type message = Message: 'a request * 'a Lwt.u option -> message type t = { protocol_validator: Protocol_validator.t ; + protocol_timeout: float ; mutable worker: unit Lwt.t ; messages: message Lwt_pipe.t ; canceler: Canceler.t ; @@ -421,7 +422,9 @@ let rec worker_loop bv = lwt_debug "previously validated block %a (after pipe)" Block_hash.pp_short hash >>= fun () -> Protocol_validator.prefetch_and_compile_protocols - bv.protocol_validator ?peer ~timeout:60. block ; + bv.protocol_validator + ?peer ~timeout:bv.protocol_timeout + block ; may_wakeup (Ok block) ; return () | None -> @@ -447,7 +450,9 @@ let rec worker_loop bv = assert false (* should not happen *) | Some block -> Protocol_validator.prefetch_and_compile_protocols - bv.protocol_validator ?peer ~timeout:60. block ; + bv.protocol_validator + ?peer ~timeout:bv.protocol_timeout + block ; may_wakeup (Ok block) ; notify_new_block block ; return () @@ -481,12 +486,13 @@ let rec worker_loop bv = Canceler.cancel bv.canceler >>= fun () -> Lwt.return_unit -let create db = +let create ~protocol_timeout db = let protocol_validator = Protocol_validator.create db in let canceler = Canceler.create () in let messages = Lwt_pipe.create () in let bv = { protocol_validator ; + protocol_timeout ; canceler ; messages ; worker = Lwt.return_unit } in Canceler.on_cancel bv.canceler begin fun () -> @@ -503,7 +509,7 @@ let shutdown { canceler ; worker } = Canceler.cancel canceler >>= fun () -> worker -let validate { messages ; protocol_validator } +let validate { messages ; protocol_validator ; protocol_timeout } ?canceler ?peer ?(notify_new_block = fun _ -> ()) net_db hash (header : Block_header.t) operations = let net_state = Distributed_db.net_state net_db in @@ -512,7 +518,9 @@ let validate { messages ; protocol_validator } lwt_debug "previously validated block %a (before pipe)" Block_hash.pp_short hash >>= fun () -> Protocol_validator.prefetch_and_compile_protocols - protocol_validator ?peer ~timeout:60. block ; + protocol_validator + ?peer ~timeout:protocol_timeout + block ; return block | None -> let res, wakener = Lwt.task () in diff --git a/src/node/shell/block_validator.mli b/src/node/shell/block_validator.mli index 4d3d129ae..1d4e526ed 100644 --- a/src/node/shell/block_validator.mli +++ b/src/node/shell/block_validator.mli @@ -39,7 +39,9 @@ type error += expected: Operation_list_list_hash.t ; found: Operation_list_list_hash.t } -val create: Distributed_db.t -> t +val create: + protocol_timeout:float -> + Distributed_db.t -> t val validate: t -> diff --git a/src/node/shell/bootstrap_pipeline.ml b/src/node/shell/bootstrap_pipeline.ml index 0bcd99a02..ce9fba01e 100644 --- a/src/node/shell/bootstrap_pipeline.ml +++ b/src/node/shell/bootstrap_pipeline.ml @@ -12,6 +12,8 @@ module Canceler = Lwt_utils.Canceler type t = { canceler: Canceler.t ; + block_header_timeout: float ; + block_operations_timeout: float ; mutable headers_fetch_worker: unit Lwt.t ; mutable operations_fetch_worker: unit Lwt.t ; mutable validation_worker: unit Lwt.t ; @@ -56,7 +58,7 @@ let fetch_step pipeline (step : Block_locator.step) = P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> Distributed_db.Block_header.fetch - ~timeout:60. (* TODO allow to adjust the constant ... *) + ~timeout:pipeline.block_header_timeout pipeline.net_db ~peer:pipeline.peer_id hash () end >>=? fun header -> @@ -108,7 +110,7 @@ let rec operations_fetch_worker_loop pipeline = (fun i -> Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> Distributed_db.Operations.fetch - ~timeout:60. (* TODO allow to adjust the constant ... *) + ~timeout:pipeline.block_operations_timeout pipeline.net_db ~peer:pipeline.peer_id (hash, i) header.shell.operations_hash end) @@ -170,6 +172,7 @@ let rec validation_worker_loop pipeline = let create ?(notify_new_block = fun _ -> ()) + ~block_header_timeout ~block_operations_timeout block_validator peer_id net_db locator = let canceler = Canceler.create () in let fetched_headers = @@ -178,6 +181,7 @@ let create Lwt_pipe.create ~size:(50, fun _ -> 1) () in let pipeline = { canceler ; + block_header_timeout ; block_operations_timeout ; headers_fetch_worker = Lwt.return_unit ; operations_fetch_worker = Lwt.return_unit ; validation_worker = Lwt.return_unit ; diff --git a/src/node/shell/bootstrap_pipeline.mli b/src/node/shell/bootstrap_pipeline.mli index dd0290543..e4333186f 100644 --- a/src/node/shell/bootstrap_pipeline.mli +++ b/src/node/shell/bootstrap_pipeline.mli @@ -11,6 +11,8 @@ type t val create: ?notify_new_block: (State.Block.t -> unit) -> + block_header_timeout:float -> + block_operations_timeout: float -> Block_validator.t -> P2p.Peer_id.t -> Distributed_db.net_db -> Block_locator.t -> t diff --git a/src/node/shell/net_validator.ml b/src/node/shell/net_validator.ml index e42073b50..690136b34 100644 --- a/src/node/shell/net_validator.ml +++ b/src/node/shell/net_validator.ml @@ -17,6 +17,7 @@ type t = { net_db: Distributed_db.net_db ; block_validator: Block_validator.t ; + timeout: timeout ; bootstrap_threshold: int ; mutable bootstrapped: bool ; bootstrapped_wakener: unit Lwt.u ; @@ -38,6 +39,15 @@ type t = { } +and timeout = { + operation: float ; + block_header: float ; + block_operations: float ; + protocol: float ; + new_head_request: float ; +} + + let rec shutdown nv = Canceler.cancel nv.canceler >>= fun () -> Distributed_db.deactivate nv.net_db >>= fun () -> @@ -73,6 +83,10 @@ let may_activate_peer_validator nv peer_id = with Not_found -> let pv = Peer_validator.create + ~new_head_request_timeout:nv.timeout.new_head_request + ~block_header_timeout:nv.timeout.block_header + ~block_operations_timeout:nv.timeout.block_operations + ~protocol_timeout:nv.timeout.protocol ~notify_new_block:(notify_new_block nv) ~notify_bootstrapped: begin fun () -> P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ; @@ -108,10 +122,11 @@ let broadcast_head nv ~previous block = let rec create ?max_child_ttl ?parent ?(bootstrap_threshold = 1) - block_validator + timeout block_validator global_valid_block_input db net_state = let net_db = Distributed_db.activate db net_state in - Prevalidator.create net_db >>= fun prevalidator -> + Prevalidator.create + ~operation_timeout:timeout.operation net_db >>= fun prevalidator -> let valid_block_input = Watcher.create_input () in let new_head_input = Watcher.create_input () in let canceler = Canceler.create () in @@ -119,6 +134,7 @@ let rec create let nv = { db ; net_state ; net_db ; block_validator ; prevalidator ; + timeout ; valid_block_input ; global_valid_block_input ; new_head_input ; parent ; max_child_ttl ; child = None ; @@ -229,7 +245,7 @@ and may_switch_test_network nv block = return net_state end >>=? fun net_state -> create - ~parent:nv nv.block_validator + ~parent:nv nv.timeout nv.block_validator nv.global_valid_block_input nv.db net_state >>= fun child -> nv.child <- Some child ; @@ -288,12 +304,13 @@ and may_switch_test_network nv block = let create ?max_child_ttl ?bootstrap_threshold + timeout block_validator global_valid_block_input global_db state = (* hide the optional ?parent *) create ?max_child_ttl ?bootstrap_threshold - block_validator global_valid_block_input global_db state + timeout block_validator global_valid_block_input global_db state let net_id { net_state } = State.Net.id net_state let net_state { net_state } = net_state diff --git a/src/node/shell/net_validator.mli b/src/node/shell/net_validator.mli index 76c46fdaa..ffc9a424e 100644 --- a/src/node/shell/net_validator.mli +++ b/src/node/shell/net_validator.mli @@ -9,9 +9,18 @@ type t +type timeout = { + operation: float ; + block_header: float ; + block_operations: float ; + protocol: float ; + new_head_request: float ; +} + val create: ?max_child_ttl:int -> ?bootstrap_threshold:int -> + timeout -> Block_validator.t -> State.Block.t Watcher.input -> Distributed_db.t -> diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index a064d2d9f..8bb722e26 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -90,22 +90,29 @@ type config = { bootstrap_threshold: int ; } +and timeout = Net_validator.timeout = { + operation: float ; + block_header: float ; + block_operations: float ; + protocol: float ; + new_head_request: float ; +} + let may_create_net state genesis = State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function | Ok net -> Lwt.return net | Error _ -> State.Net.create state genesis - let create { genesis ; store_root ; context_root ; patch_context ; p2p = net_params ; test_network_max_tll = max_child_ttl ; - bootstrap_threshold } = + bootstrap_threshold } timeout = init_p2p net_params >>=? fun p2p -> State.read ~store_root ~context_root ?patch_context () >>=? fun state -> let distributed_db = Distributed_db.create state p2p in - let validator = Validator.create state distributed_db in + let validator = Validator.create state distributed_db timeout in may_create_net state genesis >>= fun mainnet_state -> Validator.activate validator ~bootstrap_threshold diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index 89f58a306..6b44796e5 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -19,7 +19,15 @@ type config = { bootstrap_threshold: int ; } -val create: config -> t tzresult Lwt.t +and timeout = { + operation: float ; + block_header: float ; + block_operations: float ; + protocol: float ; + new_head_request: float ; +} + +val create: config -> timeout -> t tzresult Lwt.t module RPC : sig diff --git a/src/node/shell/peer_validator.ml b/src/node/shell/peer_validator.ml index 8c1a9a45f..2b0316158 100644 --- a/src/node/shell/peer_validator.ml +++ b/src/node/shell/peer_validator.ml @@ -22,6 +22,11 @@ type t = { net_db: Distributed_db.net_db ; block_validator: Block_validator.t ; + new_head_request_timeout: float ; + block_header_timeout: float ; + block_operations_timeout: float ; + protocol_timeout: float ; + (* callback to net_validator *) notify_new_block: State.Block.t -> unit ; notify_bootstrapped: unit -> unit ; @@ -54,6 +59,8 @@ let bootstrap_new_branch pv _ancestor _head unknown_prefix = let pipeline = Bootstrap_pipeline.create ~notify_new_block:pv.notify_new_block + ~block_header_timeout:pv.block_header_timeout + ~block_operations_timeout:pv.block_operations_timeout pv.block_validator pv.peer_id pv.net_db unknown_prefix in Lwt_utils.protect ~canceler:pv.canceler @@ -93,7 +100,7 @@ let validate_new_head pv hash (header : Block_header.t) = (fun i -> Lwt_utils.protect ~canceler:pv.canceler begin fun () -> Distributed_db.Operations.fetch - ~timeout:60. (* TODO allow to adjust the constant ... *) + ~timeout:pv.block_operations_timeout pv.net_db ~peer:pv.peer_id (hash, i) header.shell.operations_hash end) @@ -165,9 +172,9 @@ let may_validate_new_branch pv distant_hash locator = let rec worker_loop pv = begin Lwt_utils.protect ~canceler:pv.canceler begin fun () -> - (* TODO should the timeout be protocol dependent ?? *) - (* TODO or setup by the local admin ?? or a mix ??*) - Lwt_dropbox.take_with_timeout 90. pv.dropbox >>= return + Lwt_dropbox.take_with_timeout + pv.new_head_request_timeout + pv.dropbox >>= return end >>=? function | None -> lwt_log_info "no new head from peer %a for 90 seconds." @@ -199,7 +206,9 @@ let rec worker_loop pv = | Error [Block_validator.Unavailable_protocol { protocol } ] -> begin Block_validator.fetch_and_compile_protocol pv.block_validator - ~peer:pv.peer_id ~timeout:60. protocol >>= function + ~peer:pv.peer_id + ~timeout:pv.protocol_timeout + protocol >>= function | Ok _ -> worker_loop pv | Error _ -> (* TODO penality... *) @@ -227,6 +236,10 @@ let create ?notify_new_block:(external_notify_new_block = fun _ -> ()) ?(notify_bootstrapped = fun () -> ()) ?(notify_termination = fun _ -> ()) + ~new_head_request_timeout + ~block_header_timeout + ~block_operations_timeout + ~protocol_timeout block_validator net_db peer_id = lwt_debug "creating validator for peer %a." P2p.Peer_id.pp_short peer_id >>= fun () -> @@ -241,6 +254,10 @@ let create block_validator ; notify_new_block ; notify_bootstrapped ; + new_head_request_timeout ; + block_header_timeout ; + block_operations_timeout ; + protocol_timeout ; net_db ; peer_id ; bootstrapped = false ; diff --git a/src/node/shell/peer_validator.mli b/src/node/shell/peer_validator.mli index 824c403f4..2426115d5 100644 --- a/src/node/shell/peer_validator.mli +++ b/src/node/shell/peer_validator.mli @@ -17,6 +17,10 @@ val create: ?notify_new_block: (State.Block.t -> unit) -> ?notify_bootstrapped: (unit -> unit) -> ?notify_termination: (t -> unit) -> + new_head_request_timeout:float -> + block_header_timeout:float -> + block_operations_timeout:float -> + protocol_timeout:float -> Block_validator.t -> Distributed_db.net_db -> P2p.Peer_id.t -> t Lwt.t val shutdown: t -> unit Lwt.t diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index 758e51bd1..509f11524 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -70,7 +70,9 @@ let merge _key a b = | Some x, None -> Some x | _, Some y -> Some y -let create net_db = +let create + ~operation_timeout + net_db = let net_state = Distributed_db.net_state net_db in @@ -248,7 +250,7 @@ let create net_db = ops in let fetch h = Distributed_db.Operation.fetch - ~timeout:10. (* TODO allow to adjust the constant ... *) + ~timeout:operation_timeout net_db ~peer:gid h () >>= function | Ok _op -> push_to_worker (`Handle h) ; @@ -266,12 +268,12 @@ let create net_db = List.iter (fun op -> Operation_hash.Table.add pending op (fetch op)) unknown_ops ; - List.iter (fun op -> - Lwt.ignore_result - (Distributed_db.Operation.fetch - (* TODO allow to adjust the constant ... *) - ~timeout:10. - net_db ~peer:gid op ())) + List.iter + (fun op -> + Lwt.ignore_result + (Distributed_db.Operation.fetch + ~timeout:operation_timeout + net_db ~peer:gid op ())) known_ops ; Lwt.return_unit | `Handle op -> diff --git a/src/node/shell/prevalidator.mli b/src/node/shell/prevalidator.mli index fd0af4a05..b8d8f687b 100644 --- a/src/node/shell/prevalidator.mli +++ b/src/node/shell/prevalidator.mli @@ -29,7 +29,9 @@ type t (** Creation and destruction of a "prevalidation" worker. *) -val create: Distributed_db.net_db -> t Lwt.t +val create: + operation_timeout: float -> + Distributed_db.net_db -> t Lwt.t val shutdown: t -> unit Lwt.t val notify_operations: t -> P2p.Peer_id.t -> Operation_hash.t list -> unit diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index a0c21ef6c..43448df15 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -15,16 +15,20 @@ type t = { state: State.t ; db: Distributed_db.t ; block_validator: Block_validator.t ; + timeout: Net_validator.timeout ; valid_block_input: State.Block.t Watcher.input ; active_nets: Net_validator.t Lwt.t Net_id.Table.t ; } -let create state db = - let block_validator = Block_validator.create db in +let create state db timeout = + let block_validator = + Block_validator.create + ~protocol_timeout:timeout.Net_validator.protocol + db in let valid_block_input = Watcher.create_input () in - { state ; db ; block_validator ; + { state ; db ; timeout ; block_validator ; valid_block_input ; active_nets = Net_id.Table.create 7 ; } @@ -38,7 +42,7 @@ let activate v ?bootstrap_threshold ?max_child_ttl net_state = Net_validator.create ?bootstrap_threshold ?max_child_ttl - v.block_validator v.valid_block_input v.db net_state in + v.timeout v.block_validator v.valid_block_input v.db net_state in Net_id.Table.add v.active_nets net_id nv ; nv diff --git a/src/node/shell/validator.mli b/src/node/shell/validator.mli index 5c02d68e8..03b5ff816 100644 --- a/src/node/shell/validator.mli +++ b/src/node/shell/validator.mli @@ -9,7 +9,7 @@ type t -val create: State.t -> Distributed_db.t -> t +val create: State.t -> Distributed_db.t -> Net_validator.timeout -> t val shutdown: t -> unit Lwt.t val activate: