From 755d63c0ef02e6c5d9e26605eb04f1c117dc3fc6 Mon Sep 17 00:00:00 2001 From: Benjamin Canou Date: Wed, 29 Nov 2017 13:51:06 +0100 Subject: [PATCH] Node: limit the refused operations cache in the prevalidator --- src/bin_node/node_config_file.ml | 41 +++++++++++++++++------ src/bin_node/node_config_file.mli | 1 + src/bin_node/node_run_command.ml | 5 ++- src/lib_node_shell/net_validator.ml | 11 +++--- src/lib_node_shell/net_validator.mli | 2 +- src/lib_node_shell/node.ml | 12 +++++-- src/lib_node_shell/node.mli | 7 ++-- src/lib_node_shell/prevalidator.ml | 50 ++++++++++++++++------------ src/lib_node_shell/prevalidator.mli | 10 +++--- src/lib_node_shell/validator.ml | 8 +++-- src/lib_node_shell/validator.mli | 7 +++- 11 files changed, 101 insertions(+), 53 deletions(-) diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index 378caa55f..53c2ee1b8 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -54,6 +54,7 @@ and log = { and shell = { bootstrap_threshold : int ; + prevalidator_limits : Node.prevalidator_limits ; timeout : Node.timeout ; } @@ -104,8 +105,11 @@ let default_log = { let default_shell = { bootstrap_threshold = 4 ; + prevalidator_limits = { + operation_timeout = 10. ; + max_refused_operations = 1000 ; + } ; timeout = { - operation = 10. ; block_header = 60. ; block_operations = 60. ; protocol = 120. ; @@ -253,20 +257,33 @@ let log = (opt "rules" string) (dft "template" string default_log.template)) +let prevalidator_limits_encoding = + let open Data_encoding in + let uint8 = conv int_of_float float_of_int uint8 in + conv + (fun { Node.operation_timeout ; max_refused_operations } -> + (operation_timeout, max_refused_operations)) + (fun (operation_timeout, max_refused_operations) -> + { operation_timeout ; max_refused_operations }) + (obj2 + (dft "operations_timeout" uint8 + default_shell.prevalidator_limits.operation_timeout) + (dft "max_refused_operations" uint16 + default_shell.prevalidator_limits.max_refused_operations)) + let timeout_encoding = let open Data_encoding in let uint8 = conv int_of_float float_of_int uint8 in conv - (fun { Node.operation ; block_header ; block_operations ; + (fun { Node.block_header ; block_operations ; protocol ; new_head_request } -> - (operation, block_header, block_operations, + (block_header, block_operations, protocol, new_head_request)) - (fun (operation, block_header, block_operations, + (fun (block_header, block_operations, protocol, new_head_request) -> - { operation ; block_header ; block_operations ; + { block_header ; block_operations ; protocol ; new_head_request }) - (obj5 - (dft "operation" uint8 default_shell.timeout.operation) + (obj4 (dft "block_header" uint8 default_shell.timeout.block_header) (dft "block_operations" uint8 default_shell.timeout.block_operations) (dft "protocol" uint8 default_shell.timeout.protocol) @@ -276,11 +293,14 @@ let timeout_encoding = let shell = let open Data_encoding in conv - (fun { bootstrap_threshold ; timeout } -> bootstrap_threshold, timeout) - (fun (bootstrap_threshold, timeout) -> { bootstrap_threshold ; timeout }) - (obj2 + (fun { bootstrap_threshold ; timeout ; prevalidator_limits } -> + bootstrap_threshold, timeout, prevalidator_limits) + (fun (bootstrap_threshold, timeout, prevalidator_limits) -> + { bootstrap_threshold ; timeout ; prevalidator_limits }) + (obj3 (dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold) (dft "timeout" timeout_encoding default_shell.timeout) + (dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits) ) let encoding = @@ -399,6 +419,7 @@ let update ~default:cfg.shell.bootstrap_threshold bootstrap_threshold ; timeout = cfg.shell.timeout ; + prevalidator_limits = cfg.shell.prevalidator_limits ; } in return { data_dir ; net ; rpc ; log ; shell } diff --git a/src/bin_node/node_config_file.mli b/src/bin_node/node_config_file.mli index 702826447..25002d6f9 100644 --- a/src/bin_node/node_config_file.mli +++ b/src/bin_node/node_config_file.mli @@ -44,6 +44,7 @@ and log = { and shell = { bootstrap_threshold : int ; + prevalidator_limits : Node.prevalidator_limits ; timeout : Node.timeout ; } diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index e8623b39a..edc7ddd6a 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -170,7 +170,10 @@ let init_node ?sandbox (config : Node_config_file.t) = test_network_max_tll = Some (48 * 3600) ; (* 2 days *) bootstrap_threshold = config.shell.bootstrap_threshold ; } in - Node.create node_config config.shell.timeout + Node.create + node_config + config.shell.timeout + config.shell.prevalidator_limits let () = let old_hook = !Lwt.async_exception_hook in diff --git a/src/lib_node_shell/net_validator.ml b/src/lib_node_shell/net_validator.ml index 471d81677..d9fd9e80b 100644 --- a/src/lib_node_shell/net_validator.ml +++ b/src/lib_node_shell/net_validator.ml @@ -17,6 +17,7 @@ type t = { block_validator: Block_validator.t ; timeout: timeout ; + prevalidator_limits: Prevalidator.limits ; bootstrap_threshold: int ; mutable bootstrapped: bool ; bootstrapped_waiter: unit Lwt.t ; @@ -40,7 +41,6 @@ type t = { } and timeout = { - operation: float ; block_header: float ; block_operations: float ; protocol: float ; @@ -122,13 +122,12 @@ let broadcast_head nv ~previous block = let rec create ?max_child_ttl ?parent ?(bootstrap_threshold = 1) - timeout block_validator + timeout prevalidator_limits block_validator global_valid_block_input db net_state = Chain.init_head net_state >>= fun () -> let net_db = Distributed_db.activate db net_state in Prevalidator.create - ~max_operations:2000 (* FIXME temporary constant *) - ~operation_timeout:timeout.operation net_db >>= fun prevalidator -> + prevalidator_limits net_db >>= fun prevalidator -> let valid_block_input = Lwt_watcher.create_input () in let new_head_input = Lwt_watcher.create_input () in let canceler = Lwt_canceler.create () in @@ -136,7 +135,7 @@ let rec create let nv = { db ; net_state ; net_db ; block_validator ; prevalidator ; - timeout ; + timeout ; prevalidator_limits ; valid_block_input ; global_valid_block_input ; new_head_input ; parent ; max_child_ttl ; child = None ; @@ -252,7 +251,7 @@ and may_switch_test_network nv block = return net_state end >>=? fun net_state -> create - ~parent:nv nv.timeout nv.block_validator + ~parent:nv nv.timeout nv.prevalidator_limits nv.block_validator nv.global_valid_block_input nv.db net_state >>= fun child -> nv.child <- Some child ; diff --git a/src/lib_node_shell/net_validator.mli b/src/lib_node_shell/net_validator.mli index a25c9fa04..316f8f7bc 100644 --- a/src/lib_node_shell/net_validator.mli +++ b/src/lib_node_shell/net_validator.mli @@ -10,7 +10,6 @@ type t type timeout = { - operation: float ; block_header: float ; block_operations: float ; protocol: float ; @@ -21,6 +20,7 @@ val create: ?max_child_ttl:int -> ?bootstrap_threshold:int -> timeout -> + Prevalidator.limits -> Block_validator.t -> State.Block.t Lwt_watcher.input -> Distributed_db.t -> diff --git a/src/lib_node_shell/node.ml b/src/lib_node_shell/node.ml index 3277b009a..bc59433f5 100644 --- a/src/lib_node_shell/node.ml +++ b/src/lib_node_shell/node.ml @@ -90,13 +90,17 @@ type config = { } and timeout = Net_validator.timeout = { - operation: float ; block_header: float ; block_operations: float ; protocol: float ; new_head_request: float ; } +and prevalidator_limits = Prevalidator.limits = { + max_refused_operations: int ; + operation_timeout: float +} + let may_create_net state genesis = State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function | Ok net -> Lwt.return net @@ -106,12 +110,14 @@ let may_create_net state genesis = let create { genesis ; store_root ; context_root ; patch_context ; p2p = net_params ; test_network_max_tll = max_child_ttl ; - bootstrap_threshold } timeout = + bootstrap_threshold } + timeout prevalidator_limits = init_p2p net_params >>=? fun p2p -> State.read ~store_root ~context_root ?patch_context () >>=? fun state -> let distributed_db = Distributed_db.create state p2p in - let validator = Validator.create state distributed_db timeout in + let validator = + Validator.create state distributed_db timeout prevalidator_limits in may_create_net state genesis >>= fun mainnet_state -> Validator.activate validator ~bootstrap_threshold diff --git a/src/lib_node_shell/node.mli b/src/lib_node_shell/node.mli index 61ff401dc..41c9d0a55 100644 --- a/src/lib_node_shell/node.mli +++ b/src/lib_node_shell/node.mli @@ -20,14 +20,17 @@ type config = { } and timeout = { - operation: float ; block_header: float ; block_operations: float ; protocol: float ; new_head_request: float ; } +and prevalidator_limits = { + max_refused_operations: int ; + operation_timeout: float +} -val create: config -> timeout -> t tzresult Lwt.t +val create: config -> timeout -> prevalidator_limits -> t tzresult Lwt.t module RPC : sig diff --git a/src/lib_node_shell/prevalidator.ml b/src/lib_node_shell/prevalidator.ml index c98b4c482..45fd4c2f1 100644 --- a/src/lib_node_shell/prevalidator.ml +++ b/src/lib_node_shell/prevalidator.ml @@ -70,15 +70,19 @@ let wakeup_with_result Lwt.wakeup_later u res ; Lwt.return (res >>? fun _res -> ok ()) +type limits = { + max_refused_operations : int ; + operation_timeout : float +} + (* Invariants: - an operation is in only one of these sets (map domains): - pv.refused pv.pending pv.fetching pv.live_operations pv.in_mempool + pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool - pv.in_mempool is the domain of all fields of pv.prevalidation_result - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused *) type t = { net_db : Distributed_db.net_db ; - operation_timeout : float ; - max_operations : int ; (* TODO: not sure if we should use that ? *) + limits : limits ; canceler : Lwt_canceler.t ; message_queue : message Lwt_pipe.t ; mutable (* just for init *) worker : unit Lwt.t ; @@ -86,7 +90,8 @@ type t = { mutable timestamp : Time.t ; mutable live_blocks : Block_hash.Set.t ; (* just a cache *) mutable live_operations : Operation_hash.Set.t ; (* just a cache *) - mutable refused : (Time.t * error list) Operation_hash.Map.t ; + refused : Operation_hash.t Ring.t ; + mutable refusals : error list Operation_hash.Map.t ; mutable fetching : Operation_hash.Set.t ; mutable pending : Operation.t Operation_hash.Map.t ; mutable mempool : Mempool.t ; @@ -120,7 +125,7 @@ let close_queue pv = Lwt_pipe.close pv.message_queue let already_handled pv oph = - Operation_hash.Map.mem oph pv.refused + Operation_hash.Map.mem oph pv.refusals || Operation_hash.Map.mem oph pv.pending || Operation_hash.Set.mem oph pv.fetching || Operation_hash.Set.mem oph pv.live_operations @@ -191,12 +196,13 @@ let handle_unprocessed pv = (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) pv.validation_result.refused @@ pv.in_mempool) ; - pv.refused <- (* TODO: cleanup *) - (let now = Time.now () in - Operation_hash.Map.fold - (fun h (_, errs) refused -> - Operation_hash.Map.add h (now, errs) refused) - pv.validation_result.refused pv.refused) ; + Operation_hash.Map.iter + (fun h (_, errs) -> + Option.iter (Ring.add_and_return_erased pv.refused h) + ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; + pv.refusals <- + Operation_hash.Map.add h errs pv.refusals) + pv.validation_result.refused ; Operation_hash.Map.iter (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph) pv.validation_result.refused ; @@ -230,7 +236,7 @@ let handle_unprocessed pv = let fetch_operation pv ?peer oph = debug "fetching operation %a" Operation_hash.pp_short oph ; Distributed_db.Operation.fetch - ~timeout:pv.operation_timeout + ~timeout:pv.limits.operation_timeout pv.net_db ?peer oph () >>= function | Ok op -> push_request pv (Arrived (oph, op)) ; @@ -279,14 +285,14 @@ let on_inject pv op = if List.mem_assoc oph result.applied then return () else - let try_in_map map or_else = + let try_in_map map proj or_else = try - Lwt.return (Error (snd (Operation_hash.Map.find oph map))) + Lwt.return (Error (proj (Operation_hash.Map.find oph map))) with Not_found -> or_else () in - try_in_map pv.refused @@ fun () -> - try_in_map result.refused @@ fun () -> - try_in_map result.branch_refused @@ fun () -> - try_in_map result.branch_delayed @@ fun () -> + try_in_map pv.refusals (fun h -> h) @@ fun () -> + try_in_map result.refused snd @@ fun () -> + try_in_map result.branch_refused snd @@ fun () -> + try_in_map result.branch_delayed snd @@ fun () -> if Operation_hash.Set.mem oph pv.live_operations then failwith "Injected operation %a included in a previous block." Operation_hash.pp oph @@ -380,7 +386,7 @@ let rec worker_loop pv = Lwt_canceler.cancel pv.canceler >>= fun () -> Lwt.return_unit -let create ~max_operations ~operation_timeout net_db = +let create limits net_db = let net_state = Distributed_db.net_state net_db in let canceler = Lwt_canceler.create () in let message_queue = Lwt_pipe.create () in @@ -404,12 +410,12 @@ let create ~max_operations ~operation_timeout net_db = (fun s h -> Operation_hash.Set.add h s) Operation_hash.Set.empty mempool.known_valid in let pv = - { operation_timeout ; max_operations ; - net_db ; canceler ; + { limits ; net_db ; canceler ; worker = Lwt.return_unit ; message_queue ; predecessor ; timestamp ; live_blocks ; live_operations ; mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; - refused = Operation_hash.Map.empty ; + refused = Ring.create limits.max_refused_operations ; + refusals = Operation_hash.Map.empty ; fetching ; pending = Operation_hash.Map.empty ; in_mempool = Operation_hash.Set.empty ; diff --git a/src/lib_node_shell/prevalidator.mli b/src/lib_node_shell/prevalidator.mli index 695d516b8..1a41198b0 100644 --- a/src/lib_node_shell/prevalidator.mli +++ b/src/lib_node_shell/prevalidator.mli @@ -30,11 +30,13 @@ type t +type limits = { + max_refused_operations : int ; + operation_timeout : float +} + (** Creation and destruction of a "prevalidation" worker. *) -val create: - max_operations: int -> - operation_timeout: float -> - Distributed_db.net_db -> t Lwt.t +val create: limits -> Distributed_db.net_db -> t Lwt.t val shutdown: t -> unit Lwt.t val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit diff --git a/src/lib_node_shell/validator.ml b/src/lib_node_shell/validator.ml index 588e8cc70..1b23eccf0 100644 --- a/src/lib_node_shell/validator.ml +++ b/src/lib_node_shell/validator.ml @@ -15,19 +15,20 @@ type t = { db: Distributed_db.t ; block_validator: Block_validator.t ; timeout: Net_validator.timeout ; + prevalidator_limits: Prevalidator.limits ; valid_block_input: State.Block.t Lwt_watcher.input ; active_nets: Net_validator.t Lwt.t Net_id.Table.t ; } -let create state db timeout = +let create state db timeout prevalidator_limits = let block_validator = Block_validator.create ~protocol_timeout:timeout.Net_validator.protocol db in let valid_block_input = Lwt_watcher.create_input () in - { state ; db ; timeout ; block_validator ; + { state ; db ; timeout ; prevalidator_limits ; block_validator ; valid_block_input ; active_nets = Net_id.Table.create 7 ; } @@ -41,7 +42,8 @@ let activate v ?bootstrap_threshold ?max_child_ttl net_state = Net_validator.create ?bootstrap_threshold ?max_child_ttl - v.timeout v.block_validator v.valid_block_input v.db net_state in + v.timeout v.prevalidator_limits + v.block_validator v.valid_block_input v.db net_state in Net_id.Table.add v.active_nets net_id nv ; nv diff --git a/src/lib_node_shell/validator.mli b/src/lib_node_shell/validator.mli index 04d71bbfa..80b1d866a 100644 --- a/src/lib_node_shell/validator.mli +++ b/src/lib_node_shell/validator.mli @@ -11,7 +11,12 @@ type t -val create: State.t -> Distributed_db.t -> Net_validator.timeout -> t +val create: + State.t -> + Distributed_db.t -> + Net_validator.timeout -> + Prevalidator.limits -> + t val shutdown: t -> unit Lwt.t (** Start the validation scheduler of a given network. *)