Mempool: make Mempool_worker.parse non-blocking.

This commit is contained in:
Pietro Abate 2018-11-13 15:45:55 +01:00 committed by MBourgoin
parent 6b7031ad3c
commit 71790470ad
No known key found for this signature in database
GPG Key ID: 4B3F7008ABB5B2D0
2 changed files with 13 additions and 20 deletions

View File

@ -51,7 +51,7 @@ module type T = sig
val shutdown : t -> unit Lwt.t
(** parse a new operation and add it to the mempool context *)
val parse : Operation.t -> operation tzresult
val parse : t -> Operation.t -> operation tzresult
(** validate a new operation and add it to the mempool context *)
val validate : t -> operation -> result tzresult Lwt.t
@ -155,7 +155,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
module Request = struct
type 'a t = Validate : operation -> result t [@@ocaml.unboxed]
type 'a t = Validate : operation -> result t
type view = View : _ t -> view
@ -169,7 +169,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
operation_encoding
let pp ppf (View (Validate { hash })) =
Format.fprintf ppf "Validating new operation %a" Operation_hash.pp hash
Format.fprintf ppf "New parsed operation hash %a" Operation_hash.pp hash
end
module Event = struct
@ -500,25 +500,17 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
Lwt.return result
| Some result -> Lwt.return result
let on_parse w raw_op =
let state = Worker.state w in
match Cache.find_parsed_opt state.cache raw_op with
| None ->
parse_helper w raw_op >>= fun parsed_op ->
Cache.add_parsed state.cache raw_op parsed_op;
Lwt.return parsed_op
| Some parsed_op -> Lwt.return parsed_op
(* worker's handlers *)
let on_request :
type r. t -> r Request.t -> r tzresult Lwt.t = fun w request ->
match request with
| Request.Parse raw_op -> on_parse w raw_op
| Request.Validate parsed_op -> on_validate w parsed_op >>= return
let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) =
let chain_state = Distributed_db.chain_state chain_db in
Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } ->
Chain.data chain_state >>= fun {
current_mempool = _mempool ;
live_blocks ; live_operations } ->
Lwt.return {
validation_state ;
cache = ValidatedCache.create () ;
@ -576,11 +568,12 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
Worker.push_request_and_wait t (Request.Validate parsed_op)
(* atomic parse + memoization *)
let parse raw_op =
begin match ParsedCache.find_opt parsed_cache raw_op with
let parse t raw_op =
let state = Worker.state t in
begin match Cache.find_parsed_opt state.cache raw_op with
| None ->
let parsed_op = parse_helper raw_op in
ParsedCache.add parsed_cache raw_op parsed_op;
let parsed_op = parse_helper t raw_op in
Cache.add_parsed state.cache raw_op parsed_op;
parsed_op
| Some parsed_op -> parsed_op
end

View File

@ -51,8 +51,8 @@ module type T = sig
val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t
val shutdown : t -> unit Lwt.t
(** parse a new operation *)
val parse : Operation.t -> operation tzresult
(** parse a new operation and add it to the mempool context *)
val parse : t -> Operation.t -> operation tzresult
(** validate a new operation and add it to the mempool context *)
val validate : t -> operation -> result tzresult Lwt.t