From 71790470ad7f22bac6395377df4c48e2ab867d74 Mon Sep 17 00:00:00 2001 From: Pietro Abate Date: Tue, 13 Nov 2018 15:45:55 +0100 Subject: [PATCH] Mempool: make Mempool_worker.parse non-blocking. --- src/lib_shell/mempool_worker.ml | 29 +++++++++++------------------ src/lib_shell/mempool_worker.mli | 4 ++-- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/src/lib_shell/mempool_worker.ml b/src/lib_shell/mempool_worker.ml index 209cd7e30..f26bb792e 100644 --- a/src/lib_shell/mempool_worker.ml +++ b/src/lib_shell/mempool_worker.ml @@ -51,7 +51,7 @@ module type T = sig val shutdown : t -> unit Lwt.t (** parse a new operation and add it to the mempool context *) - val parse : Operation.t -> operation tzresult + val parse : t -> Operation.t -> operation tzresult (** validate a new operation and add it to the mempool context *) val validate : t -> operation -> result tzresult Lwt.t @@ -155,7 +155,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) module Request = struct - type 'a t = Validate : operation -> result t [@@ocaml.unboxed] + type 'a t = Validate : operation -> result t type view = View : _ t -> view @@ -169,7 +169,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) operation_encoding let pp ppf (View (Validate { hash })) = - Format.fprintf ppf "Validating new operation %a" Operation_hash.pp hash + Format.fprintf ppf "New parsed operation hash %a" Operation_hash.pp hash end module Event = struct @@ -500,25 +500,17 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) Lwt.return result | Some result -> Lwt.return result - let on_parse w raw_op = - let state = Worker.state w in - match Cache.find_parsed_opt state.cache raw_op with - | None -> - parse_helper w raw_op >>= fun parsed_op -> - Cache.add_parsed state.cache raw_op parsed_op; - Lwt.return parsed_op - | Some parsed_op -> Lwt.return parsed_op - (* worker's handlers *) let on_request : type r. t -> r Request.t -> r tzresult Lwt.t = fun w request -> match request with - | Request.Parse raw_op -> on_parse w raw_op | Request.Validate parsed_op -> on_validate w parsed_op >>= return let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) = let chain_state = Distributed_db.chain_state chain_db in - Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } -> + Chain.data chain_state >>= fun { + current_mempool = _mempool ; + live_blocks ; live_operations } -> Lwt.return { validation_state ; cache = ValidatedCache.create () ; @@ -576,11 +568,12 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) Worker.push_request_and_wait t (Request.Validate parsed_op) (* atomic parse + memoization *) - let parse raw_op = - begin match ParsedCache.find_opt parsed_cache raw_op with + let parse t raw_op = + let state = Worker.state t in + begin match Cache.find_parsed_opt state.cache raw_op with | None -> - let parsed_op = parse_helper raw_op in - ParsedCache.add parsed_cache raw_op parsed_op; + let parsed_op = parse_helper t raw_op in + Cache.add_parsed state.cache raw_op parsed_op; parsed_op | Some parsed_op -> parsed_op end diff --git a/src/lib_shell/mempool_worker.mli b/src/lib_shell/mempool_worker.mli index f5c33e8c9..a485641b7 100644 --- a/src/lib_shell/mempool_worker.mli +++ b/src/lib_shell/mempool_worker.mli @@ -51,8 +51,8 @@ module type T = sig val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t val shutdown : t -> unit Lwt.t - (** parse a new operation *) - val parse : Operation.t -> operation tzresult + (** parse a new operation and add it to the mempool context *) + val parse : t -> Operation.t -> operation tzresult (** validate a new operation and add it to the mempool context *) val validate : t -> operation -> result tzresult Lwt.t