Mempool: make Mempool_worker.parse non-blocking.

This commit is contained in:
Pietro Abate 2018-11-13 15:45:55 +01:00 committed by Benjamin Canou
parent b843dbcb7b
commit 56ee8ba849
2 changed files with 26 additions and 47 deletions

View File

@ -51,7 +51,7 @@ module type T = sig
val shutdown : t -> unit Lwt.t val shutdown : t -> unit Lwt.t
(** parse a new operation and add it to the mempool context *) (** parse a new operation and add it to the mempool context *)
val parse : t -> Operation.t -> operation tzresult Lwt.t val parse : t -> Operation.t -> operation tzresult
(** validate a new operation and add it to the mempool context *) (** validate a new operation and add it to the mempool context *)
val validate : t -> operation -> result tzresult Lwt.t val validate : t -> operation -> result tzresult Lwt.t
@ -149,9 +149,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
module Request = struct module Request = struct
type 'a t = type 'a t = Validate : operation -> result t
| Parse : Operation.t -> operation t
| Validate : operation -> result t
type view = View : _ t -> view type view = View : _ t -> view
@ -159,32 +157,13 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let encoding = let encoding =
let open Data_encoding in let open Data_encoding in
union conv
[ case (Tag 0) (fun (View (Validate op)) -> op)
~title:"Parsed_operation" (fun op -> View (Validate op))
(* XXX: can't I use operation_encoding defined above ? *) operation_encoding
(obj3
(req "hash" Operation_hash.encoding)
(req "raw" Operation.encoding)
(req "protocol_data" Proto.operation_data_encoding))
(function
| View (Validate { hash ; raw ; protocol_data }) ->
Some ( hash, raw, protocol_data )
| _ -> None)
(fun ( hash, raw, protocol_data ) ->
View (Validate { hash ; raw ; protocol_data })) ;
case (Tag 1)
~title:"Raw operation"
Operation.encoding
(function | View (Parse op) -> Some op | _ -> None)
(fun op -> View (Parse op))
]
let pp ppf = function let pp ppf (View (Validate { hash })) =
| View (Parse op) -> Format.fprintf ppf "New parsed operation hash %a" Operation_hash.pp hash
Format.fprintf ppf "New raw operation %a" Operation.pp op
| View (Validate { hash }) ->
Format.fprintf ppf "New parsed operation hash %a" Operation_hash.pp hash
end end
module Event = struct module Event = struct
@ -468,15 +447,15 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let hash = Operation.hash raw_op in let hash = Operation.hash raw_op in
let size = Data_encoding.Binary.length Operation.encoding raw_op in let size = Data_encoding.Binary.length Operation.encoding raw_op in
if size > Proto.max_operation_data_length then if size > Proto.max_operation_data_length then
fail (Oversized_operation error (Oversized_operation
{ size ; max = Proto.max_operation_data_length }) { size ; max = Proto.max_operation_data_length })
else else
match Data_encoding.Binary.of_bytes match Data_encoding.Binary.of_bytes
Proto.operation_data_encoding Proto.operation_data_encoding
raw_op.Operation.proto with raw_op.Operation.proto with
| None -> fail Parse_error | None -> error Parse_error
| Some protocol_data -> | Some protocol_data ->
return { hash ; raw = raw_op ; protocol_data } ok { hash ; raw = raw_op ; protocol_data }
(* this function update the internal state of the worker *) (* this function update the internal state of the worker *)
let validate_helper w parsed_op = let validate_helper w parsed_op =
@ -510,25 +489,17 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
Lwt.return result Lwt.return result
| Some result -> Lwt.return result | Some result -> Lwt.return result
let on_parse w raw_op =
let state = Worker.state w in
match Cache.find_parsed_opt state.cache raw_op with
| None ->
parse_helper w raw_op >>= fun parsed_op ->
Cache.add_parsed state.cache raw_op parsed_op;
Lwt.return parsed_op
| Some parsed_op -> Lwt.return parsed_op
(* worker's handlers *) (* worker's handlers *)
let on_request : let on_request :
type r. t -> r Request.t -> r tzresult Lwt.t = fun w request -> type r. t -> r Request.t -> r tzresult Lwt.t = fun w request ->
match request with match request with
| Request.Parse raw_op -> on_parse w raw_op
| Request.Validate parsed_op -> on_validate w parsed_op >>= return | Request.Validate parsed_op -> on_validate w parsed_op >>= return
let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) = let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) =
let chain_state = Distributed_db.chain_state chain_db in let chain_state = Distributed_db.chain_state chain_db in
Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } -> Chain.data chain_state >>= fun {
current_mempool = _mempool ;
live_blocks ; live_operations } ->
Lwt.return { Lwt.return {
validation_state ; validation_state ;
cache = Cache.create () ; cache = Cache.create () ;
@ -585,8 +556,16 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let validate t parsed_op = let validate t parsed_op =
Worker.push_request_and_wait t (Request.Validate parsed_op) Worker.push_request_and_wait t (Request.Validate parsed_op)
let parse t op = (* atomic parse + memoization *)
Worker.push_request_and_wait t (Request.Parse op) let parse t raw_op =
let state = Worker.state t in
begin match Cache.find_parsed_opt state.cache raw_op with
| None ->
let parsed_op = parse_helper t raw_op in
Cache.add_parsed state.cache raw_op parsed_op;
parsed_op
| Some parsed_op -> parsed_op
end
let chain_db t = let chain_db t =
let state = Worker.state t in let state = Worker.state t in

View File

@ -52,7 +52,7 @@ module type T = sig
val shutdown : t -> unit Lwt.t val shutdown : t -> unit Lwt.t
(** parse a new operation and add it to the mempool context *) (** parse a new operation and add it to the mempool context *)
val parse : t -> Operation.t -> operation tzresult Lwt.t val parse : t -> Operation.t -> operation tzresult
(** validate a new operation and add it to the mempool context *) (** validate a new operation and add it to the mempool context *)
val validate : t -> operation -> result tzresult Lwt.t val validate : t -> operation -> result tzresult Lwt.t