From 56ee8ba8492bdfb89ac71c40b0dfe38cf6d0138c Mon Sep 17 00:00:00 2001 From: Pietro Abate Date: Tue, 13 Nov 2018 15:45:55 +0100 Subject: [PATCH] Mempool: make Mempool_worker.parse non-blocking. --- src/lib_shell/mempool_worker.ml | 71 +++++++++++--------------------- src/lib_shell/mempool_worker.mli | 2 +- 2 files changed, 26 insertions(+), 47 deletions(-) diff --git a/src/lib_shell/mempool_worker.ml b/src/lib_shell/mempool_worker.ml index 267846214..8711a8844 100644 --- a/src/lib_shell/mempool_worker.ml +++ b/src/lib_shell/mempool_worker.ml @@ -51,7 +51,7 @@ module type T = sig val shutdown : t -> unit Lwt.t (** parse a new operation and add it to the mempool context *) - val parse : t -> Operation.t -> operation tzresult Lwt.t + val parse : t -> Operation.t -> operation tzresult (** validate a new operation and add it to the mempool context *) val validate : t -> operation -> result tzresult Lwt.t @@ -149,9 +149,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct module Request = struct - type 'a t = - | Parse : Operation.t -> operation t - | Validate : operation -> result t + type 'a t = Validate : operation -> result t type view = View : _ t -> view @@ -159,32 +157,13 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct let encoding = let open Data_encoding in - union - [ case (Tag 0) - ~title:"Parsed_operation" - (* XXX: can't I use operation_encoding defined above ? *) - (obj3 - (req "hash" Operation_hash.encoding) - (req "raw" Operation.encoding) - (req "protocol_data" Proto.operation_data_encoding)) - (function - | View (Validate { hash ; raw ; protocol_data }) -> - Some ( hash, raw, protocol_data ) - | _ -> None) - (fun ( hash, raw, protocol_data ) -> - View (Validate { hash ; raw ; protocol_data })) ; - case (Tag 1) - ~title:"Raw operation" - Operation.encoding - (function | View (Parse op) -> Some op | _ -> None) - (fun op -> View (Parse op)) - ] + conv + (fun (View (Validate op)) -> op) + (fun op -> View (Validate op)) + operation_encoding - let pp ppf = function - | View (Parse op) -> - Format.fprintf ppf "New raw operation %a" Operation.pp op - | View (Validate { hash }) -> - Format.fprintf ppf "New parsed operation hash %a" Operation_hash.pp hash + let pp ppf (View (Validate { hash })) = + Format.fprintf ppf "New parsed operation hash %a" Operation_hash.pp hash end module Event = struct @@ -468,15 +447,15 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct let hash = Operation.hash raw_op in let size = Data_encoding.Binary.length Operation.encoding raw_op in if size > Proto.max_operation_data_length then - fail (Oversized_operation - { size ; max = Proto.max_operation_data_length }) + error (Oversized_operation + { size ; max = Proto.max_operation_data_length }) else match Data_encoding.Binary.of_bytes Proto.operation_data_encoding raw_op.Operation.proto with - | None -> fail Parse_error + | None -> error Parse_error | Some protocol_data -> - return { hash ; raw = raw_op ; protocol_data } + ok { hash ; raw = raw_op ; protocol_data } (* this function update the internal state of the worker *) let validate_helper w parsed_op = @@ -510,25 +489,17 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct Lwt.return result | Some result -> Lwt.return result - let on_parse w raw_op = - let state = Worker.state w in - match Cache.find_parsed_opt state.cache raw_op with - | None -> - parse_helper w raw_op >>= fun parsed_op -> - Cache.add_parsed state.cache raw_op parsed_op; - Lwt.return parsed_op - | Some parsed_op -> Lwt.return parsed_op - (* worker's handlers *) let on_request : type r. t -> r Request.t -> r tzresult Lwt.t = fun w request -> match request with - | Request.Parse raw_op -> on_parse w raw_op | Request.Validate parsed_op -> on_validate w parsed_op >>= return let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) = let chain_state = Distributed_db.chain_state chain_db in - Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } -> + Chain.data chain_state >>= fun { + current_mempool = _mempool ; + live_blocks ; live_operations } -> Lwt.return { validation_state ; cache = Cache.create () ; @@ -585,8 +556,16 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct let validate t parsed_op = Worker.push_request_and_wait t (Request.Validate parsed_op) - let parse t op = - Worker.push_request_and_wait t (Request.Parse op) + (* atomic parse + memoization *) + let parse t raw_op = + let state = Worker.state t in + begin match Cache.find_parsed_opt state.cache raw_op with + | None -> + let parsed_op = parse_helper t raw_op in + Cache.add_parsed state.cache raw_op parsed_op; + parsed_op + | Some parsed_op -> parsed_op + end let chain_db t = let state = Worker.state t in diff --git a/src/lib_shell/mempool_worker.mli b/src/lib_shell/mempool_worker.mli index d113f4497..e3def9ea2 100644 --- a/src/lib_shell/mempool_worker.mli +++ b/src/lib_shell/mempool_worker.mli @@ -52,7 +52,7 @@ module type T = sig val shutdown : t -> unit Lwt.t (** parse a new operation and add it to the mempool context *) - val parse : t -> Operation.t -> operation tzresult Lwt.t + val parse : t -> Operation.t -> operation tzresult (** validate a new operation and add it to the mempool context *) val validate : t -> operation -> result tzresult Lwt.t