From 98ec3393b66e94e9a2cd7147b8017fee29a901fe Mon Sep 17 00:00:00 2001 From: Benjamin Canou Date: Thu, 30 Nov 2017 18:34:22 +0100 Subject: [PATCH] Node: switch the prevalidator to `Tezos_worker` --- src/bin_node/jbuild | 2 + src/bin_node/node_config_file.ml | 52 +- src/lib_client_base/jbuild | 1 + src/lib_node_services/jbuild | 6 +- src/lib_node_shell/chain.ml | 6 +- src/lib_node_shell/chain.mli | 2 +- src/lib_node_shell/distributed_db.ml | 2 +- src/lib_node_shell/jbuild | 2 + src/lib_node_shell/net_validator.ml | 2 +- src/lib_node_shell/node.ml | 3 +- src/lib_node_shell/node.mli | 3 +- src/lib_node_shell/prevalidator.ml | 502 +++++++++--------- src/lib_node_shell/prevalidator.mli | 21 +- src/lib_node_shell/state.ml | 41 +- src/lib_node_shell/state.mli | 21 +- src/lib_node_shell/worker.ml | 16 +- src/lib_node_shell/worker.mli | 12 +- src/lib_node_shell_base/jbuild | 14 + .../mempool.ml | 32 +- .../mempool.mli | 10 +- .../prevalidator_worker_state.ml | 182 +++++++ .../prevalidator_worker_state.mli | 42 ++ .../tezos-node-shell-base.opam | 24 + .../worker_types.ml} | 0 .../worker_types.mli} | 0 25 files changed, 638 insertions(+), 360 deletions(-) create mode 100644 src/lib_node_shell_base/jbuild rename src/{lib_node_shell => lib_node_shell_base}/mempool.ml (56%) rename src/{lib_node_shell => lib_node_shell_base}/mempool.mli (78%) create mode 100644 src/lib_node_shell_base/prevalidator_worker_state.ml create mode 100644 src/lib_node_shell_base/prevalidator_worker_state.mli create mode 100644 src/lib_node_shell_base/tezos-node-shell-base.opam rename src/{lib_node_shell/worker_state.ml => lib_node_shell_base/worker_types.ml} (100%) rename src/{lib_node_shell/worker_state.mli => lib_node_shell_base/worker_types.mli} (100%) diff --git a/src/bin_node/jbuild b/src/bin_node/jbuild index fd47d07fe..6ff24d1d9 100644 --- a/src/bin_node/jbuild +++ b/src/bin_node/jbuild @@ -8,6 +8,7 @@ tezos-node-updater tezos-node-p2p-base tezos-node-p2p + tezos-node-shell-base tezos-node-shell tezos-embedded-protocol-genesis tezos-embedded-protocol-demo @@ -21,6 +22,7 @@ -open Tezos_node_updater -open Tezos_node_p2p_base -open Tezos_node_p2p + -open Tezos_node_shell_base -open Tezos_node_shell -linkall)))) diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index 53c2ee1b8..0433adee7 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -108,6 +108,12 @@ let default_shell = { prevalidator_limits = { operation_timeout = 10. ; max_refused_operations = 1000 ; + worker_limits = { + backlog_size = 1000 ; + backlog_level = Logging.Info ; + zombie_lifetime = 600. ; + zombie_memory = 120. ; + } } ; timeout = { block_header = 60. ; @@ -257,19 +263,45 @@ let log = (opt "rules" string) (dft "template" string default_log.template)) + +let worker_limits_encoding + default_size + default_level + default_zombie_lifetime + default_zombie_memory = + let open Data_encoding in + conv + (fun { Worker_types.backlog_size ; backlog_level ; zombie_lifetime ; zombie_memory } -> + (backlog_size, backlog_level, zombie_lifetime, zombie_memory)) + (fun (backlog_size, backlog_level, zombie_lifetime, zombie_memory) -> + { backlog_size ; backlog_level ; zombie_lifetime ; zombie_memory }) + (obj4 + (dft "worker_backlog_size" uint16 default_size) + (dft "worker_backlog_level" Logging.level_encoding default_level) + (dft "worker_zombie_lifetime" float default_zombie_lifetime) + (dft "worker_zombie_memory" float default_zombie_memory)) + +let timeout_encoding = + Data_encoding.ranged_float 0. 500. + let prevalidator_limits_encoding = let open Data_encoding in - let uint8 = conv int_of_float float_of_int uint8 in conv - (fun { Node.operation_timeout ; max_refused_operations } -> - (operation_timeout, max_refused_operations)) - (fun (operation_timeout, max_refused_operations) -> - { operation_timeout ; max_refused_operations }) - (obj2 - (dft "operations_timeout" uint8 - default_shell.prevalidator_limits.operation_timeout) - (dft "max_refused_operations" uint16 - default_shell.prevalidator_limits.max_refused_operations)) + (fun { Node.operation_timeout ; max_refused_operations ; worker_limits } -> + ((operation_timeout, max_refused_operations), worker_limits)) + (fun ((operation_timeout, max_refused_operations), worker_limits) -> + { operation_timeout ; max_refused_operations ; worker_limits}) + (merge_objs + (obj2 + (dft "operations_request_timeout" timeout_encoding + default_shell.prevalidator_limits.operation_timeout) + (dft "max_refused_operations" uint16 + default_shell.prevalidator_limits.max_refused_operations)) + (worker_limits_encoding + default_shell.prevalidator_limits.worker_limits.backlog_size + default_shell.prevalidator_limits.worker_limits.backlog_level + default_shell.prevalidator_limits.worker_limits.zombie_lifetime + default_shell.prevalidator_limits.worker_limits.zombie_memory)) let timeout_encoding = let open Data_encoding in diff --git a/src/lib_client_base/jbuild b/src/lib_client_base/jbuild index b8daae869..f98e81a83 100644 --- a/src/lib_client_base/jbuild +++ b/src/lib_client_base/jbuild @@ -7,6 +7,7 @@ tezos-storage tezos-rpc-http tezos-node-p2p-base + tezos-node-shell-base tezos-node-services tezos-node-updater tezos-protocol-compiler)) diff --git a/src/lib_node_services/jbuild b/src/lib_node_services/jbuild index 26f9448a1..14900bbfd 100644 --- a/src/lib_node_services/jbuild +++ b/src/lib_node_services/jbuild @@ -4,11 +4,13 @@ ((name tezos_node_services) (public_name tezos-node-services) (libraries (tezos-base - tezos-node-p2p-base)) + tezos-node-p2p-base + tezos-node-shell-base)) (flags (:standard -w -9+27-30-32-40@8 -safe-string -open Tezos_base__TzPervasives - -open Tezos_node_p2p_base)))) + -open Tezos_node_p2p_base + -open Tezos_node_shell_base)))) (alias ((name runtest_indent) diff --git a/src/lib_node_shell/chain.ml b/src/lib_node_shell/chain.ml index eb568377c..963e5fb96 100644 --- a/src/lib_node_shell/chain.ml +++ b/src/lib_node_shell/chain.ml @@ -10,7 +10,7 @@ open Logging.Node.State open State -let mempool_encoding = State.mempool_encoding +let mempool_encoding = Mempool.encoding let genesis net_state = let genesis = Net.genesis net_state in @@ -37,7 +37,7 @@ let mem net_state hash = type data = State.chain_data = { current_head: Block.t ; - current_mempool: mempool ; + current_mempool: Mempool.t ; live_blocks: Block_hash.Set.t ; live_operations: Operation_hash.Set.t ; locator: Block_locator.t Lwt.t lazy_t ; @@ -86,7 +86,7 @@ let locked_set_head net_state chain_store data block = block (State.Block.max_operations_ttl block) >>= fun (live_blocks, live_operations) -> Lwt.return { current_head = block ; - current_mempool = State.empty_mempool ; + current_mempool = Mempool.empty ; live_blocks ; live_operations ; locator = lazy (State.compute_locator net_state block) ; diff --git a/src/lib_node_shell/chain.mli b/src/lib_node_shell/chain.mli index 86fcc0c12..696149e9d 100644 --- a/src/lib_node_shell/chain.mli +++ b/src/lib_node_shell/chain.mli @@ -22,7 +22,7 @@ val locator: Net.t -> Block_locator.t Lwt.t (** All the available chain data. *) type data = { current_head: Block.t ; - current_mempool: mempool ; + current_mempool: Mempool.t ; live_blocks: Block_hash.Set.t ; live_operations: Operation_hash.Set.t ; locator: Block_locator.t Lwt.t lazy_t ; diff --git a/src/lib_node_shell/distributed_db.ml b/src/lib_node_shell/distributed_db.ml index 658ebeac5..cb8c1db76 100644 --- a/src/lib_node_shell/distributed_db.ml +++ b/src/lib_node_shell/distributed_db.ml @@ -491,7 +491,7 @@ module P2p_reader = struct | Get_current_head net_id -> may_handle state net_id @@ fun net_db -> - Mempool.get net_db.net_state >>= fun (head, mempool) -> + State.Current_mempool.get net_db.net_state >>= fun (head, mempool) -> (* TODO bound the sent mempool size *) ignore @@ P2p.try_send global_db.p2p state.conn diff --git a/src/lib_node_shell/jbuild b/src/lib_node_shell/jbuild index 277b430d7..ca6b7db91 100644 --- a/src/lib_node_shell/jbuild +++ b/src/lib_node_shell/jbuild @@ -8,6 +8,7 @@ tezos-rpc-http tezos-node-services tezos-node-p2p-base + tezos-node-shell-base tezos-node-p2p tezos-node-updater)) (flags (:standard -w -9+27-30-32-40@8 @@ -17,6 +18,7 @@ -open Tezos_rpc_http -open Tezos_node_services -open Tezos_node_p2p_base + -open Tezos_node_shell_base -open Tezos_node_p2p -open Tezos_node_updater)))) diff --git a/src/lib_node_shell/net_validator.ml b/src/lib_node_shell/net_validator.ml index d9fd9e80b..1a06da9e5 100644 --- a/src/lib_node_shell/net_validator.ml +++ b/src/lib_node_shell/net_validator.ml @@ -207,7 +207,7 @@ and worker_loop nv = else begin Chain.set_head nv.net_state block >>= fun previous -> broadcast_head nv ~previous block >>= fun () -> - Prevalidator.flush nv.prevalidator block ; (* FIXME *) + Prevalidator.flush nv.prevalidator block_hash >>=? fun () -> may_switch_test_network nv block >>= fun () -> Lwt_watcher.notify nv.new_head_input block ; lwt_log_notice "update current head %a %a %a(%t)" diff --git a/src/lib_node_shell/node.ml b/src/lib_node_shell/node.ml index bc59433f5..3671658a5 100644 --- a/src/lib_node_shell/node.ml +++ b/src/lib_node_shell/node.ml @@ -98,7 +98,8 @@ and timeout = Net_validator.timeout = { and prevalidator_limits = Prevalidator.limits = { max_refused_operations: int ; - operation_timeout: float + operation_timeout: float ; + worker_limits : Worker_types.limits ; } let may_create_net state genesis = diff --git a/src/lib_node_shell/node.mli b/src/lib_node_shell/node.mli index 41c9d0a55..1b0cfd125 100644 --- a/src/lib_node_shell/node.mli +++ b/src/lib_node_shell/node.mli @@ -27,7 +27,8 @@ and timeout = { } and prevalidator_limits = { max_refused_operations: int ; - operation_timeout: float + operation_timeout: float ; + worker_limits : Worker_types.limits ; } val create: config -> timeout -> prevalidator_limits -> t tzresult Lwt.t diff --git a/src/lib_node_shell/prevalidator.ml b/src/lib_node_shell/prevalidator.ml index ebf0366e2..164231046 100644 --- a/src/lib_node_shell/prevalidator.ml +++ b/src/lib_node_shell/prevalidator.ml @@ -7,7 +7,78 @@ (* *) (**************************************************************************) -open Logging.Node.Prevalidator +open Prevalidator_worker_state + +type limits = { + max_refused_operations : int ; + operation_timeout : float ; + worker_limits : Worker_types.limits ; +} + +module Name = struct + type t = Net_id.t + let encoding = Net_id.encoding + let base = [ "prevalidator" ] + let pp = Net_id.pp_short +end + +module Types = struct + (* Invariants: + - an operation is in only one of these sets (map domains): + pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool + - pv.in_mempool is the domain of all fields of pv.prevalidation_result + - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused + - the 'applied' operations in pv.validation_result are in reverse order. *) + type state = { + net_db : Distributed_db.net_db ; + limits : limits ; + mutable predecessor : State.Block.t ; + mutable timestamp : Time.t ; + mutable live_blocks : Block_hash.Set.t ; (* just a cache *) + mutable live_operations : Operation_hash.Set.t ; (* just a cache *) + refused : Operation_hash.t Ring.t ; + mutable refusals : error list Operation_hash.Map.t ; + mutable fetching : Operation_hash.Set.t ; + mutable pending : Operation.t Operation_hash.Map.t ; + mutable mempool : Mempool.t ; + mutable in_mempool : Operation_hash.Set.t ; + mutable validation_result : error Preapply_result.t ; + mutable validation_state : Prevalidation.prevalidation_state tzresult ; + mutable advertisement : [ `Pending of Mempool.t | `None ] ; + } + type parameters = limits * Distributed_db.net_db + + include Worker_state + + let view (state : state) _ : view = + let domain map = + Operation_hash.Map.fold + (fun elt _ acc -> Operation_hash.Set.add elt acc) + map Operation_hash.Set.empty in + { head = State.Block.hash state.predecessor ; + timestamp = state.timestamp ; + fetching = state.fetching ; + pending = domain state.pending ; + applied = + List.rev + (List.map (fun (h, _) -> h) + state.validation_result.applied) ; + delayed = + Operation_hash.Set.union + (domain state.validation_result.branch_delayed) + (domain state.validation_result.branch_refused) } + +end + +module Worker = Worker.Make (Name) (Event) (Request) (Types) + +open Types + +type t = Worker.infinite Worker.queue Worker.t +type error += Closed = Worker.Closed + +let debug w = + Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = let rec pop_blocks ancestor block mempool = @@ -48,103 +119,6 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = Lwt_list.fold_left_s push_block mempool path >>= fun new_mempool -> Lwt.return new_mempool -type 'a request = - | Flush : State.Block.t -> unit request - | Notify : P2p.Peer_id.t * Mempool.t -> unit request - | Inject : Operation.t -> unit tzresult request - | Arrived : Operation_hash.t * Operation.t -> unit request - | Advertise : unit request - -type message = Message: 'a request * 'a tzresult Lwt.u option -> message - -let wakeup_with_result - : type t. - t request -> - t tzresult Lwt.u option -> - (t request -> t tzresult Lwt.t) -> - unit tzresult Lwt.t - = fun req u cb -> match u with - | None -> - cb req >>=? fun _res -> return () - | Some u -> - cb req >>= fun res -> - Lwt.wakeup_later u res ; - Lwt.return (res >>? fun _res -> ok ()) - -type limits = { - max_refused_operations : int ; - operation_timeout : float -} - -(* Invariants: - - an operation is in only one of these sets (map domains): - pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool - - pv.in_mempool is the domain of all fields of pv.prevalidation_result - - pv.prevalidation_result.refused = Ø, refused ops are in pv.refused *) -type t = { - net_db : Distributed_db.net_db ; - limits : limits ; - canceler : Lwt_canceler.t ; - message_queue : message Lwt_pipe.t ; - mutable (* just for init *) worker : unit Lwt.t ; - mutable predecessor : State.Block.t ; - mutable timestamp : Time.t ; - mutable live_blocks : Block_hash.Set.t ; (* just a cache *) - mutable live_operations : Operation_hash.Set.t ; (* just a cache *) - refused : Operation_hash.t Ring.t ; - mutable refusals : error list Operation_hash.Map.t ; - mutable fetching : Operation_hash.Set.t ; - mutable pending : Operation.t Operation_hash.Map.t ; - mutable mempool : Mempool.t ; - mutable in_mempool : Operation_hash.Set.t ; - mutable validation_result : error Preapply_result.t ; - mutable validation_state : Prevalidation.prevalidation_state tzresult ; - mutable advertisement : [ `Pending of Mempool.t | `None ] ; -} - -type error += Closed of Net_id.t - -let () = - register_error_kind `Permanent - ~id:"prevalidator.closed" - ~title:"Prevalidator closed" - ~description: - "An operation on the prevalidator could not complete \ - before the prevalidator was shut down." - ~pp: (fun ppf net_id -> - Format.fprintf ppf - "Prevalidator for network %a has been shut down." - Net_id.pp_short net_id) - Data_encoding.(obj1 (req "net_id" Net_id.encoding)) - (function Closed net_id -> Some net_id | _ -> None) - (fun net_id -> Closed net_id) - -let push_request pv request = - Lwt_pipe.safe_push_now pv.message_queue (Message (request, None)) - -let push_request_and_wait pv request = - let t, u = Lwt.wait () in - Lwt.catch - (fun () -> - Lwt_pipe.push_now_exn pv.message_queue (Message (request, Some u)) ; - t) - (function - | Lwt_pipe.Closed -> - let net_id = (State.Net.id (Distributed_db.net_state pv.net_db)) in - fail (Closed net_id) - | exn -> fail (Exn exn)) - -let close_queue pv = - let messages = Lwt_pipe.pop_all_now pv.message_queue in - List.iter - (function - | Message (_, Some u) -> - let net_id = (State.Net.id (Distributed_db.net_state pv.net_db)) in - Lwt.wakeup_later u (Error [ Closed net_id ]) - | _ -> ()) - messages ; - Lwt_pipe.close pv.message_queue - let already_handled pv oph = Operation_hash.Map.mem oph pv.refusals || Operation_hash.Map.mem oph pv.pending @@ -153,7 +127,7 @@ let already_handled pv oph = || Operation_hash.Set.mem oph pv.in_mempool let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = - { Mempool.known_valid = fst (List.split r.applied) ; + { Mempool.known_valid = List.map fst r.applied ; pending = Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) @@ -184,7 +158,7 @@ let merge_validation_results ~old ~neu = (filter_out neu.applied old.branch_delayed) neu.branch_delayed } -let advertise pv mempool = +let advertise (w : t) pv mempool = match pv.advertisement with | `Pending { Mempool.known_valid ; pending } -> pv.advertisement <- @@ -195,10 +169,10 @@ let advertise pv mempool = pv.advertisement <- `Pending mempool ; Lwt.async (fun () -> Lwt_unix.sleep 0.01 >>= fun () -> - push_request pv Advertise ; + Worker.push_request_now w Advertise ; Lwt.return_unit) -let handle_unprocessed pv = +let handle_unprocessed w pv = begin match pv.validation_state with | Error err -> pv.validation_result <- @@ -211,49 +185,45 @@ let handle_unprocessed pv = Operation_hash.Map.empty ; Lwt.return () | Ok validation_state -> - if Operation_hash.Map.is_empty pv.pending then - Lwt.return () - else - begin match Operation_hash.Map.cardinal pv.pending with - | 0 -> Lwt.return () - | n -> lwt_debug "processing %d operations" n - end >>= fun () -> - Prevalidation.prevalidate validation_state ~sort:true - (Operation_hash.Map.bindings pv.pending) - >>= fun (validation_state, validation_result) -> - pv.validation_state <- - Ok validation_state ; - pv.in_mempool <- - (Operation_hash.Map.fold - (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) - pv.pending @@ - Operation_hash.Map.fold - (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) - pv.validation_result.refused @@ - pv.in_mempool) ; - Operation_hash.Map.iter - (fun h (_, errs) -> - Option.iter (Ring.add_and_return_erased pv.refused h) - ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; - pv.refusals <- - Operation_hash.Map.add h errs pv.refusals) - pv.validation_result.refused ; - Operation_hash.Map.iter - (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph) - pv.validation_result.refused ; - pv.validation_result <- - merge_validation_results - ~old:pv.validation_result - ~neu:validation_result ; - pv.pending <- - Operation_hash.Map.empty ; - advertise pv - (mempool_of_prevalidation_result validation_result) ; - Lwt.return () + match Operation_hash.Map.cardinal pv.pending with + | 0 -> Lwt.return () + | n -> debug w "processing %d operations" n ; + Prevalidation.prevalidate validation_state ~sort:true + (Operation_hash.Map.bindings pv.pending) + >>= fun (validation_state, validation_result) -> + pv.validation_state <- + Ok validation_state ; + pv.in_mempool <- + (Operation_hash.Map.fold + (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) + pv.pending @@ + Operation_hash.Map.fold + (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) + pv.validation_result.refused @@ + pv.in_mempool) ; + Operation_hash.Map.iter + (fun h (_, errs) -> + Option.iter (Ring.add_and_return_erased pv.refused h) + ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; + pv.refusals <- + Operation_hash.Map.add h errs pv.refusals) + pv.validation_result.refused ; + Operation_hash.Map.iter + (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.net_db oph) + pv.validation_result.refused ; + pv.validation_result <- + merge_validation_results + ~old:pv.validation_result + ~neu:validation_result ; + pv.pending <- + Operation_hash.Map.empty ; + advertise w pv + (mempool_of_prevalidation_result validation_result) ; + Lwt.return () end >>= fun () -> pv.mempool <- { Mempool.known_valid = - fst (List.split pv.validation_result.applied) ; + List.rev_map fst pv.validation_result.applied ; pending = Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) @@ -262,33 +232,29 @@ let handle_unprocessed pv = (fun k _ s -> Operation_hash.Set.add k s) pv.validation_result.branch_refused @@ Operation_hash.Set.empty } ; - Mempool.set (Distributed_db.net_state pv.net_db) + State.Current_mempool.set (Distributed_db.net_state pv.net_db) ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> Lwt.return () -let fetch_operation pv ?peer oph = - debug "fetching operation %a" Operation_hash.pp_short oph ; +let fetch_operation w pv ?peer oph = + debug w + "fetching operation %a" + Operation_hash.pp_short oph ; Distributed_db.Operation.fetch ~timeout:pv.limits.operation_timeout pv.net_db ?peer oph () >>= function | Ok op -> - push_request pv (Arrived (oph, op)) ; + Worker.push_request_now w (Arrived (oph, op)) ; Lwt.return_unit | Error [ Distributed_db.Operation.Canceled _ ] -> - lwt_debug + debug w "operation %a included before being prevalidated" - Operation_hash.pp_short oph >>= fun () -> + Operation_hash.pp_short oph ; Lwt.return_unit | Error _ -> (* should not happen *) Lwt.return_unit -let clear_fetching pv = - Operation_hash.Set.iter - (Distributed_db.Operation.clear_or_cancel pv.net_db) - pv.fetching - -let on_operation_arrived pv oph op = - debug "operation %a retrieved" Operation_hash.pp_short oph ; +let on_operation_arrived (pv : state) oph op = pv.fetching <- Operation_hash.Set.remove oph pv.fetching ; if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin Distributed_db.Operation.clear_or_cancel pv.net_db oph @@ -299,43 +265,40 @@ let on_operation_arrived pv oph op = let on_inject pv op = let oph = Operation.hash op in - log_notice "injection of operation %a" Operation_hash.pp_short oph ; begin - begin if already_handled pv oph then - return pv.validation_result - else - Lwt.return pv.validation_state >>=? fun validation_state -> - Prevalidation.prevalidate - validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> - match result.applied with - | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> - Distributed_db.inject_operation pv.net_db oph op >>= fun (_ : bool) -> - pv.pending <- Operation_hash.Map.add oph op pv.pending ; - return result - | _ -> - return result - end >>=? fun result -> - if List.mem_assoc oph result.applied then - return () + if already_handled pv oph then + return pv.validation_result else - let try_in_map map proj or_else = - try - Lwt.return (Error (proj (Operation_hash.Map.find oph map))) - with Not_found -> or_else () in - try_in_map pv.refusals (fun h -> h) @@ fun () -> - try_in_map result.refused snd @@ fun () -> - try_in_map result.branch_refused snd @@ fun () -> - try_in_map result.branch_delayed snd @@ fun () -> - if Operation_hash.Set.mem oph pv.live_operations then - failwith "Injected operation %a included in a previous block." - Operation_hash.pp oph - else - failwith "Injected operation %a is not in prevalidation result." - Operation_hash.pp oph - end >>= fun tzresult -> - return tzresult + Lwt.return pv.validation_state >>=? fun validation_state -> + Prevalidation.prevalidate + validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> + match result.applied with + | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> + Distributed_db.inject_operation pv.net_db oph op >>= fun (_ : bool) -> + pv.pending <- Operation_hash.Map.add oph op pv.pending ; + return result + | _ -> + return result + end >>=? fun result -> + if List.mem_assoc oph result.applied then + return () + else + let try_in_map map proj or_else = + try + Lwt.return (Error (proj (Operation_hash.Map.find oph map))) + with Not_found -> or_else () in + try_in_map pv.refusals (fun h -> h) @@ fun () -> + try_in_map result.refused snd @@ fun () -> + try_in_map result.branch_refused snd @@ fun () -> + try_in_map result.branch_delayed snd @@ fun () -> + if Operation_hash.Set.mem oph pv.live_operations then + failwith "Injected operation %a included in a previous block." + Operation_hash.pp oph + else + failwith "Injected operation %a is not in prevalidation result." + Operation_hash.pp oph -let on_notify pv peer mempool = +let on_notify w pv peer mempool = let all_ophs = List.fold_left (fun s oph -> Operation_hash.Set.add oph s) @@ -344,16 +307,15 @@ let on_notify pv peer mempool = Operation_hash.Set.filter (fun oph -> not (already_handled pv oph)) all_ophs in - debug "notification of %d new operations" (Operation_hash.Set.cardinal to_fetch) ; pv.fetching <- Operation_hash.Set.union to_fetch pv.fetching ; Operation_hash.Set.iter - (fun oph -> Lwt.ignore_result (fetch_operation ~peer pv oph)) + (fun oph -> Lwt.ignore_result (fetch_operation w pv ~peer oph)) to_fetch -let on_flush pv predecessor = +let on_flush w pv predecessor = list_pendings ~maintain_net_db:pv.net_db ~from_block:pv.predecessor ~to_block:predecessor @@ -372,9 +334,8 @@ let on_flush pv predecessor = validation_state ~sort:false [] >>= fun (state, result) -> Lwt.return (Ok state, result) end >>= fun (validation_state, validation_result) -> - lwt_log_notice "flushing the mempool for new head %a (%d operations)" - Block_hash.pp_short (State.Block.hash predecessor) - (Operation_hash.Map.cardinal pending) >>= fun () -> + debug w "%d operations were not washed by the flush" + (Operation_hash.Map.cardinal pending) ; pv.predecessor <- predecessor ; pv.live_blocks <- new_live_blocks ; pv.live_operations <- new_live_operations ; @@ -393,48 +354,43 @@ let on_advertise pv = pv.advertisement <- `None ; Distributed_db.Advertise.current_head pv.net_db ~mempool pv.predecessor -let rec worker_loop pv = - begin - handle_unprocessed pv >>= fun () -> - Lwt_utils.protect ~canceler:pv.canceler begin fun () -> - Lwt_pipe.pop pv.message_queue >>= return - end >>=? fun (Message (message, u)) -> - wakeup_with_result message u @@ function - | Flush block -> - on_advertise pv ; - (* TODO: rebase the advertisement instead *) - on_flush pv block >>=? fun () -> - return () - | Notify (peer, mempool) -> - on_notify pv peer mempool ; - return () - | Inject op -> - on_inject pv op - | Arrived (oph, op) -> - on_operation_arrived pv oph op ; - return () - | Advertise -> - on_advertise pv ; - return () - end >>= function - | Ok () -> - worker_loop pv - | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> - clear_fetching pv ; - close_queue pv ; - Lwt.return_unit - | Error err -> - lwt_log_error "@[Unexpected error:@ %a@]" - pp_print_error err >>= fun () -> - close_queue pv ; - clear_fetching pv ; - Lwt_canceler.cancel pv.canceler >>= fun () -> - Lwt.return_unit +let on_request + : type r. t -> r Request.t -> r tzresult Lwt.t + = fun w request -> + let pv = Worker.state w in + begin match request with + | Request.Flush hash -> + on_advertise pv ; + (* TODO: rebase the advertisement instead *) + let net_state = Distributed_db.net_state pv.net_db in + State.Block.read net_state hash >>=? fun block -> + on_flush w pv block >>=? fun () -> + return (() : r) + | Request.Notify (peer, mempool) -> + on_notify w pv peer mempool ; + return () + | Request.Inject op -> + on_inject pv op >>= fun tzresult -> + return tzresult + | Request.Arrived (oph, op) -> + on_operation_arrived pv oph op ; + return () + | Request.Advertise -> + on_advertise pv ; + return () + end >>=? fun r -> + handle_unprocessed w pv >>= fun () -> + return r -let create limits net_db = +let on_close w = + let pv = Worker.state w in + Operation_hash.Set.iter + (Distributed_db.Operation.clear_or_cancel pv.net_db) + pv.fetching ; + Lwt.return_unit + +let on_launch w _ (limits, net_db) = let net_state = Distributed_db.net_state net_db in - let canceler = Lwt_canceler.create () in - let message_queue = Lwt_pipe.create () in State.read_chain_store net_state (fun _ { current_head ; current_mempool ; live_blocks ; live_operations } -> Lwt.return (current_head, current_mempool, live_blocks, live_operations)) @@ -455,8 +411,7 @@ let create limits net_db = (fun s h -> Operation_hash.Set.add h s) Operation_hash.Set.empty mempool.known_valid in let pv = - { limits ; net_db ; canceler ; - worker = Lwt.return_unit ; message_queue ; + { limits ; net_db ; predecessor ; timestamp ; live_blocks ; live_operations ; mempool = { known_valid = [] ; pending = Operation_hash.Set.empty }; refused = Ring.create limits.max_refused_operations ; @@ -467,32 +422,52 @@ let create limits net_db = validation_result ; validation_state ; advertisement = `None } in List.iter - (fun oph -> Lwt.ignore_result (fetch_operation pv oph)) + (fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) mempool.known_valid ; - pv.worker <- - Lwt_utils.worker - (Format.asprintf "net_prevalidator.%a" Net_id.pp (State.Net.id net_state)) - ~run:(fun () -> worker_loop pv) - ~cancel:(fun () -> Lwt_canceler.cancel pv.canceler) ; Lwt.return pv -let shutdown pv = - lwt_debug "shutdown" >>= fun () -> - Lwt_canceler.cancel pv.canceler >>= fun () -> - pv.worker +let on_error w r st errs = + Worker.record_event w (Event.Request (r, st, Some errs)) ; + Lwt.return (Error errs) -let flush pv head = - push_request pv (Flush head) +let on_completion w r _ st = + Worker.record_event w (Event.Request (Request.view r, st, None )) ; + Lwt.return () -let notify_operations pv peer mempool = - push_request pv (Notify (peer, mempool)) +let table = Worker.create_table Queue -let operations pv = +let create limits net_db = + let net_state = Distributed_db.net_state net_db in + let module Handlers = struct + type self = t + let on_launch = on_launch + let on_request = on_request + let on_close = on_close + let on_error = on_error + let on_completion = on_completion + let on_no_request _ = return () + end in + Worker.launch table limits.worker_limits + (State.Net.id net_state) + (limits, net_db) + (module Handlers) + +let shutdown = Worker.shutdown + +let flush w head = + Worker.push_request_and_wait w (Flush head) + +let notify_operations w peer mempool = + Worker.push_request_now w (Notify (peer, mempool)) + +let operations w = + let pv = Worker.state w in { pv.validation_result with applied = List.rev pv.validation_result.applied }, pv.pending -let pending ?block pv = +let pending ?block w = + let pv = Worker.state w in let ops = Preapply_result.operations pv.validation_result in match block with | Some to_block -> @@ -501,12 +476,25 @@ let pending ?block pv = ~from_block:pv.predecessor ~to_block ops | None -> Lwt.return ops -let timestamp pv = pv.timestamp +let timestamp w = + let pv = Worker.state w in + pv.timestamp -let context pv = +let context w = + let pv = Worker.state w in Lwt.return pv.validation_state >>=? fun validation_state -> Prevalidation.end_prevalidation validation_state -let inject_operation pv op = - push_request_and_wait pv (Inject op) >>=? fun result -> +let inject_operation w op = + Worker.push_request_and_wait w (Inject op) >>=? fun result -> Lwt.return result + +let status = Worker.status + +let running_workers () = Worker.list table + +let pending_requests t = Worker.pending_requests t + +let current_request t = Worker.current_request t + +let last_events = Worker.last_events diff --git a/src/lib_node_shell/prevalidator.mli b/src/lib_node_shell/prevalidator.mli index 1ea978365..a9dd5a465 100644 --- a/src/lib_node_shell/prevalidator.mli +++ b/src/lib_node_shell/prevalidator.mli @@ -32,26 +32,25 @@ type t type limits = { max_refused_operations : int ; - operation_timeout : float + operation_timeout : float ; + worker_limits : Worker_types.limits ; } type error += Closed of Net_id.t -(** Creation and destruction of a "prevalidation" worker. *) val create: limits -> Distributed_db.net_db -> t Lwt.t val shutdown: t -> unit Lwt.t - val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit - -(** Conditionnaly inject a new operation in the node: the operation will - be ignored when it is (strongly) refused This is the - entry-point used by the P2P layer. The operation content has been - previously stored on disk. *) val inject_operation: t -> Operation.t -> unit tzresult Lwt.t - -val flush: t -> State.Block.t -> unit +val flush: t -> Block_hash.t -> unit tzresult Lwt.t val timestamp: t -> Time.t val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t val context: t -> Updater.validation_result tzresult Lwt.t - val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t + +val running_workers: unit -> (Net_id.t * t) list +val status: t -> Worker_types.worker_status + +val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list +val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option +val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list diff --git a/src/lib_node_shell/state.ml b/src/lib_node_shell/state.ml index 2e65f446b..062d5333f 100644 --- a/src/lib_node_shell/state.ml +++ b/src/lib_node_shell/state.ml @@ -101,37 +101,18 @@ and chain_state = { and chain_data = { current_head: block ; - current_mempool: mempool ; + current_mempool: Mempool.t ; live_blocks: Block_hash.Set.t ; live_operations: Operation_hash.Set.t ; locator: Block_locator.t Lwt.t lazy_t ; } -and mempool = { - known_valid: Operation_hash.t list ; - pending: Operation_hash.Set.t ; -} - and block = { net_state: net_state ; hash: Block_hash.t ; contents: Store.Block.contents ; } -let mempool_encoding = - let open Data_encoding in - conv - (fun { known_valid ; pending } -> (known_valid, pending)) - (fun (known_valid, pending) -> { known_valid ; pending }) - (obj2 - (req "known_valid" (dynamic_size (list Operation_hash.encoding))) - (req "pending" (dynamic_size Operation_hash.Set.encoding))) - -let empty_mempool = { - known_valid = [] ; - pending = Operation_hash.Set.empty ; -} - let read_chain_store { chain_state } f = Shared.use chain_state begin fun state -> f state.chain_store state.data @@ -219,7 +200,7 @@ module Net = struct hash = current_head ; contents = current_block ; } ; - current_mempool = empty_mempool ; + current_mempool = Mempool.empty ; live_blocks = Block_hash.Set.singleton genesis.block ; live_operations = Operation_hash.Set.empty ; locator = lazy (compute_locator_from_hash net_state current_head) ; @@ -776,6 +757,24 @@ module Register_embedded_protocol end +module Current_mempool = struct + + let set net_state ~head mempool = + update_chain_store net_state begin fun _chain_store data -> + if Block_hash.equal head (Block.hash data.current_head) then + Lwt.return (Some { data with current_mempool = mempool }, + ()) + else + Lwt.return (None, ()) + end + + let get net_state = + read_chain_store net_state begin fun _chain_store data -> + Lwt.return (Block.header data.current_head, data.current_mempool) + end + +end + let read ?patch_context ~store_root diff --git a/src/lib_node_shell/state.mli b/src/lib_node_shell/state.mli index 816528029..d08cadfb8 100644 --- a/src/lib_node_shell/state.mli +++ b/src/lib_node_shell/state.mli @@ -168,17 +168,9 @@ val compute_locator: Net.t -> ?size:int -> Block.t -> Block_locator.t Lwt.t val fork_testnet: Block.t -> Protocol_hash.t -> Time.t -> Net.t tzresult Lwt.t -type mempool = { - known_valid: Operation_hash.t list ; - pending: Operation_hash.Set.t ; -} - -val empty_mempool: mempool -val mempool_encoding: mempool Data_encoding.t - type chain_data = { current_head: Block.t ; - current_mempool: mempool ; + current_mempool: Mempool.t ; live_blocks: Block_hash.Set.t ; live_operations: Operation_hash.Set.t ; locator: Block_locator.t Lwt.t lazy_t ; @@ -239,6 +231,17 @@ module Registred_protocol : sig end +module Current_mempool : sig + + val get: Net.t -> (Block_header.t * Mempool.t) Lwt.t + (** The current mempool. *) + + val set: Net.t -> head:Block_hash.t -> Mempool.t -> unit Lwt.t + (** Set the current mempool. It is ignored if the current head is + not the provided one. *) + +end + module Register_embedded_protocol (Env : Updater.Node_protocol_environment_sigs.V1) (Proto : Env.Updater.PROTOCOL) diff --git a/src/lib_node_shell/worker.ml b/src/lib_node_shell/worker.ml index d4d78e285..d333c4459 100644 --- a/src/lib_node_shell/worker.ml +++ b/src/lib_node_shell/worker.ml @@ -73,7 +73,7 @@ module Make | Dropbox_buffer : (Time.t * message) Lwt_dropbox.t -> dropbox buffer and 'kind t = { - limits : Worker_state.limits ; + limits : Worker_types.limits ; timeout : float option ; parameters : Types.parameters ; mutable (* only for init *) worker : unit Lwt.t ; @@ -83,7 +83,7 @@ module Make canceler : Lwt_canceler.t ; name : Name.t ; id : int ; - mutable status : Worker_state.worker_status ; + mutable status : Worker_types.worker_status ; mutable current_request : (Time.t * Time.t * Request.view) option ; table : 'kind table ; } @@ -236,9 +236,9 @@ module Make val on_close : self -> unit Lwt.t val on_error : - self -> Request.view -> Worker_state.request_status -> error list -> unit tzresult Lwt.t + self -> Request.view -> Worker_types.request_status -> error list -> unit tzresult Lwt.t val on_completion : - self -> 'a Request.t -> 'a -> Worker_state.request_status -> unit Lwt.t + self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t end let create_table buffer_kind = @@ -287,7 +287,7 @@ module Make let completed = Time.now () in w.current_request <- None ; Handlers.on_completion w - request res Worker_state.{ pushed ; treated ; completed } >>= fun () -> + request res Worker_types.{ pushed ; treated ; completed } >>= fun () -> return () | Some u -> Handlers.on_request w request >>= fun res -> @@ -296,7 +296,7 @@ module Make let completed = Time.now () in w.current_request <- None ; Handlers.on_completion w - request res Worker_state.{ pushed ; treated ; completed } >>= fun () -> + request res Worker_types.{ pushed ; treated ; completed } >>= fun () -> return () end >>= function | Ok () -> @@ -312,7 +312,7 @@ module Make let completed = Time.now () in w.current_request <- None ; Handlers.on_error w - request Worker_state.{ pushed ; treated ; completed } errs + request Worker_types.{ pushed ; treated ; completed } errs | None -> assert false end >>= function | Ok () -> @@ -328,7 +328,7 @@ module Make let launch : type kind. kind table -> ?timeout:float -> - Worker_state.limits -> Name.t -> Types.parameters -> + Worker_types.limits -> Name.t -> Types.parameters -> (module HANDLERS with type self = kind t) -> kind t Lwt.t = fun table ?timeout limits name parameters (module Handlers) -> diff --git a/src/lib_node_shell/worker.mli b/src/lib_node_shell/worker.mli index 129cbb87c..5ddba78fb 100644 --- a/src/lib_node_shell/worker.mli +++ b/src/lib_node_shell/worker.mli @@ -40,7 +40,7 @@ module type EVENT = sig (** Assigns a logging level to each event. Events can be ignored for logging w.r.t. the global node configuration. Events can be ignored for introspection w.r.t. to the worker's - {!Worker_state.limits}. *) + {!Worker_types.limits}. *) val level : t -> Logging.level (** Serializer for the introspection RPCs *) @@ -178,7 +178,7 @@ module Make val on_error : self -> Request.view -> - Worker_state.request_status -> + Worker_types.request_status -> error list -> unit tzresult Lwt.t @@ -187,7 +187,7 @@ module Make val on_completion : self -> 'a Request.t -> 'a -> - Worker_state.request_status -> + Worker_types.request_status -> unit Lwt.t end @@ -195,7 +195,7 @@ module Make Parameter [queue_size] not passed means unlimited queue. *) val launch : 'kind table -> ?timeout:float -> - Worker_state.limits -> Name.t -> Types.parameters -> + Worker_types.limits -> Name.t -> Types.parameters -> (module HANDLERS with type self = 'kind t) -> 'kind t Lwt.t @@ -257,7 +257,7 @@ module Make val pending_requests : _ queue t -> (Time.t * Request.view) list (** Get the running status of a worker. *) - val status : _ t -> Worker_state.worker_status + val status : _ t -> Worker_types.worker_status (** Get the request being treated by a worker. Gives the time the request was pushed, and the time its @@ -269,7 +269,7 @@ module Make (** Lists the running workers in this group. After they are killed, workers are kept in the table - for a number of seconds given in the {!Worker_state.limits}. *) + for a number of seconds given in the {!Worker_types.limits}. *) val list : 'a table -> (Name.t * 'a t) list end diff --git a/src/lib_node_shell_base/jbuild b/src/lib_node_shell_base/jbuild new file mode 100644 index 000000000..059b9a5c6 --- /dev/null +++ b/src/lib_node_shell_base/jbuild @@ -0,0 +1,14 @@ +(jbuild_version 1) + +(library + ((name tezos_node_shell_base) + (public_name tezos-node-shell-base) + (libraries (tezos-base + tezos-node-p2p-base)) + (flags (:standard -open Tezos_base__TzPervasives + -open Tezos_node_p2p_base)))) + +(alias + ((name runtest_indent) + (deps ((glob_files *.ml) (glob_files *.mli))) + (action (run bash ${libexec:tezos-stdlib:test-ocp-indent.sh} ${^})))) diff --git a/src/lib_node_shell/mempool.ml b/src/lib_node_shell_base/mempool.ml similarity index 56% rename from src/lib_node_shell/mempool.ml rename to src/lib_node_shell_base/mempool.ml index b6eda1408..4189c98c0 100644 --- a/src/lib_node_shell/mempool.ml +++ b/src/lib_node_shell_base/mempool.ml @@ -7,28 +7,22 @@ (* *) (**************************************************************************) -open State - -type t = State.mempool = { +type t = { known_valid: Operation_hash.t list ; pending: Operation_hash.Set.t ; } type mempool = t -let encoding = State.mempool_encoding -let empty = State.empty_mempool - -let set net_state ~head mempool = - update_chain_store net_state begin fun _chain_store data -> - if Block_hash.equal head (Block.hash data.current_head) then - Lwt.return (Some { data with current_mempool = mempool }, - ()) - else - Lwt.return (None, ()) - end - -let get net_state = - read_chain_store net_state begin fun _chain_store data -> - Lwt.return (Block.header data.current_head, data.current_mempool) - end +let encoding = + let open Data_encoding in + conv + (fun { known_valid ; pending } -> (known_valid, pending)) + (fun (known_valid, pending) -> { known_valid ; pending }) + (obj2 + (req "known_valid" (dynamic_size (list Operation_hash.encoding))) + (req "pending" (dynamic_size Operation_hash.Set.encoding))) +let empty = { + known_valid = [] ; + pending = Operation_hash.Set.empty ; +} diff --git a/src/lib_node_shell/mempool.mli b/src/lib_node_shell_base/mempool.mli similarity index 78% rename from src/lib_node_shell/mempool.mli rename to src/lib_node_shell_base/mempool.mli index e82771917..33272c5b8 100644 --- a/src/lib_node_shell/mempool.mli +++ b/src/lib_node_shell_base/mempool.mli @@ -10,7 +10,7 @@ (** Tezos Shell Module - Mempool, a.k.a. the operations safe to be broadcasted. *) -type t = State.mempool = { +type t = { known_valid: Operation_hash.t list ; (** A valid sequence of operations on top of the current head. *) pending: Operation_hash.Set.t ; @@ -22,11 +22,3 @@ val encoding: mempool Data_encoding.t val empty: mempool (** Empty mempool. *) - -val get: State.Net.t -> (Block_header.t * mempool) Lwt.t -(** The current mempool, *) - -val set: State.Net.t -> head:Block_hash.t -> mempool -> unit Lwt.t -(** Set the current mempool. It is ignored if the current head is - not the provided one. *) - diff --git a/src/lib_node_shell_base/prevalidator_worker_state.ml b/src/lib_node_shell_base/prevalidator_worker_state.ml new file mode 100644 index 000000000..acaacf1a5 --- /dev/null +++ b/src/lib_node_shell_base/prevalidator_worker_state.ml @@ -0,0 +1,182 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Request = struct + type 'a t = + | Flush : Block_hash.t -> unit t + | Notify : P2p_types.Peer_id.t * Mempool.t -> unit t + | Inject : Operation.t -> unit tzresult t + | Arrived : Operation_hash.t * Operation.t -> unit t + | Advertise : unit t + type view = View : _ t -> view + + let view req = View req + + let encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj2 + (req "request" (constant "flush")) + (req "block" Block_hash.encoding)) + (function View (Flush hash) -> Some ((), hash) | _ -> None) + (fun ((), hash) -> View (Flush hash)) ; + case (Tag 1) + (obj3 + (req "request" (constant "notify")) + (req "peer" P2p_types.Peer_id.encoding) + (req "mempool" Mempool.encoding)) + (function View (Notify (peer, mempool)) -> Some ((), peer, mempool) | _ -> None) + (fun ((), peer, mempool) -> View (Notify (peer, mempool))) ; + case (Tag 2) + (obj2 + (req "request" (constant "inject")) + (req "operation" Operation.encoding)) + (function View (Inject op) -> Some ((), op) | _ -> None) + (fun ((), op) -> View (Inject op)) ; + case (Tag 3) + (obj3 + (req "request" (constant "arrived")) + (req "operation_hash" Operation_hash.encoding) + (req "operation" Operation.encoding)) + (function View (Arrived (oph, op)) -> Some ((), oph, op) | _ -> None) + (fun ((), oph, op) -> View (Arrived (oph, op))) ; + case (Tag 4) + (obj1 (req "request" (constant "advertise"))) + (function View Advertise -> Some () | _ -> None) + (fun () -> View Advertise) ] + + let pp ppf (View r) = match r with + | Flush hash -> + Format.fprintf ppf "switching to new head %a" + Block_hash.pp hash + | Notify (id, { Mempool.known_valid ; pending }) -> + Format.fprintf ppf "@[notified by %a of operations" + P2p_types.Peer_id.pp id ; + List.iter + (fun oph -> + Format.fprintf ppf "@,%a (applied)" + Operation_hash.pp oph) + known_valid ; + List.iter + (fun oph -> + Format.fprintf ppf "@,%a (pending)" + Operation_hash.pp oph) + (Operation_hash.Set.elements pending) ; + Format.fprintf ppf "@]" + | Inject op -> + Format.fprintf ppf "injecting operation %a" + Operation_hash.pp (Operation.hash op) + | Arrived (oph, _) -> + Format.fprintf ppf "operation %a arrived" + Operation_hash.pp oph + | Advertise -> + Format.fprintf ppf "advertising pending operations" +end + +module Event = struct + type t = + | Request of (Request.view * Worker_types.request_status * error list option) + | Debug of string + + let level req = + let open Request in + match req with + | Debug _ -> Logging.Debug + | Request (View (Flush _), _, _) -> Logging.Notice + | Request (View (Notify _), _, _) -> Logging.Debug + | Request (View (Inject _), _, _) -> Logging.Notice + | Request (View (Arrived _), _, _) -> Logging.Debug + | Request (View Advertise, _, _) -> Logging.Debug + + let encoding error_encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj1 (req "message" string)) + (function Debug msg -> Some msg | _ -> None) + (fun msg -> Debug msg) ; + case (Tag 1) + (obj2 + (req "request" Request.encoding) + (req "status" Worker_types.request_status_encoding)) + (function Request (req, t, None) -> Some (req, t) | _ -> None) + (fun (req, t) -> Request (req, t, None)) ; + case (Tag 2) + (obj3 + (req "error" error_encoding) + (req "failed_request" Request.encoding) + (req "status" Worker_types.request_status_encoding)) + (function Request (req, t, Some errs) -> Some (errs, req, t) | _ -> None) + (fun (errs, req, t) -> Request (req, t, Some errs)) ] + + let pp ppf = function + | Debug msg -> Format.fprintf ppf "%s" msg + | Request (view, { pushed ; treated ; completed }, None) -> + Format.fprintf ppf + "@[%a@,\ + Pushed: %a, Treated: %a, Completed: %a@]" + Request.pp view + Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed + | Request (view, { pushed ; treated ; completed }, Some errors) -> + Format.fprintf ppf + "@[%a@,\ + Pushed: %a, Treated: %a, Failed: %a@,\ + Error: %a@]" + Request.pp view + Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed + Error_monad.pp_print_error errors +end + +module Worker_state = struct + type view = + { head : Block_hash.t ; + timestamp : Time.t ; + fetching : Operation_hash.Set.t ; + pending : Operation_hash.Set.t ; + applied : Operation_hash.t list ; + delayed : Operation_hash.Set.t } + + let encoding = + let open Data_encoding in + conv + (fun { head ; timestamp ; fetching ; pending ; applied ; delayed } -> + (head, timestamp, fetching, pending, applied, delayed)) + (fun (head, timestamp, fetching, pending, applied, delayed) -> + { head ; timestamp ; fetching ; pending ; applied ; delayed }) + (obj6 + (req "head" Block_hash.encoding) + (req "timestamp" Time.encoding) + (req "fetching" Operation_hash.Set.encoding) + (req "pending" Operation_hash.Set.encoding) + (req "applied" (list Operation_hash.encoding)) + (req "delayed" Operation_hash.Set.encoding)) + + let pp ppf view = + Format.fprintf ppf + "@[\ + Head: %a@,\ + Timestamp: %a@, + @[Fetching: %a@]@, + @[Pending: %a@]@, + @[Applied: %a@]@, + @[Delayed: %a@]@]" + Block_hash.pp + view.head + Time.pp_hum + view.timestamp + (Format.pp_print_list Operation_hash.pp) + (Operation_hash.Set.elements view.fetching) + (Format.pp_print_list Operation_hash.pp) + (Operation_hash.Set.elements view.pending) + (Format.pp_print_list Operation_hash.pp) + view.applied + (Format.pp_print_list Operation_hash.pp) + (Operation_hash.Set.elements view.delayed) +end diff --git a/src/lib_node_shell_base/prevalidator_worker_state.mli b/src/lib_node_shell_base/prevalidator_worker_state.mli new file mode 100644 index 000000000..1365baea8 --- /dev/null +++ b/src/lib_node_shell_base/prevalidator_worker_state.mli @@ -0,0 +1,42 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Request : sig + type 'a t = + | Flush : Block_hash.t -> unit t + | Notify : P2p_types.Peer_id.t * Mempool.t -> unit t + | Inject : Operation.t -> unit tzresult t + | Arrived : Operation_hash.t * Operation.t -> unit t + | Advertise : unit t + type view = View : _ t -> view + val view : 'a t -> view + val encoding : view Data_encoding.t + val pp : Format.formatter -> view -> unit +end + +module Event : sig + type t = + | Request of (Request.view * Worker_types.request_status * error list option) + | Debug of string + val level : t -> Logging.level + val encoding : error list Data_encoding.t -> t Data_encoding.t + val pp : Format.formatter -> t -> unit +end + +module Worker_state : sig + type view = + { head : Block_hash.t ; + timestamp : Time.t ; + fetching : Operation_hash.Set.t ; + pending : Operation_hash.Set.t ; + applied : Operation_hash.t list ; + delayed : Operation_hash.Set.t } + val encoding : view Data_encoding.t + val pp : Format.formatter -> view -> unit +end diff --git a/src/lib_node_shell_base/tezos-node-shell-base.opam b/src/lib_node_shell_base/tezos-node-shell-base.opam new file mode 100644 index 000000000..f4cb6e1fe --- /dev/null +++ b/src/lib_node_shell_base/tezos-node-shell-base.opam @@ -0,0 +1,24 @@ +opam-version: "1.2" +version: "dev" +maintainer: "contact@tezos.com" +authors: [ "Tezos devteam" ] +homepage: "https://www.tezos.com/" +bug-reports: "https://gitlab.com/tezos/tezos/issues" +dev-repo: "https://gitlab.com/tezos/tezos.git" +license: "unreleased" +depends: [ + "ocamlfind" { build } + "jbuilder" { build & >= "1.0+beta15" } + "base-bigarray" + "mtime.clock.os" + "ocplib-resto-cohttp" + "tezos-base" + "tezos-worker" + "tezos-node-p2p-base" +] +build: [ + [ "jbuilder" "build" "-p" name "-j" jobs ] +] +build-test: [ + [ "jbuilder" "runtest" "-p" name "-j" jobs ] +] diff --git a/src/lib_node_shell/worker_state.ml b/src/lib_node_shell_base/worker_types.ml similarity index 100% rename from src/lib_node_shell/worker_state.ml rename to src/lib_node_shell_base/worker_types.ml diff --git a/src/lib_node_shell/worker_state.mli b/src/lib_node_shell_base/worker_types.mli similarity index 100% rename from src/lib_node_shell/worker_state.mli rename to src/lib_node_shell_base/worker_types.mli