Shell: introduce Mempool_peer_worker

A module for the ongoing mempool overhaul.

Co-authored-by: Pietro Abate <pietro.abate@tezcore.com>
Co-authored-by: Raphaël Proust <code@bnwr.net>
Co-authored-by: MBourgoin <mathias.bourgoin@tezcore.com>
This commit is contained in:
Raphaël Proust 2018-11-08 10:07:42 +08:00 committed by MBourgoin
parent ad9d087031
commit 6a14afc1b0
No known key found for this signature in database
GPG Key ID: 4B3F7008ABB5B2D0
4 changed files with 51 additions and 40 deletions

View File

@ -36,7 +36,7 @@ module type T = sig
type t
type input = Operation_hash.t list
val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t
val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t
val shutdown: t -> input Lwt.t
val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t
@ -62,6 +62,29 @@ module Make (Mempool_worker: Mempool_worker.T)
type output = result Operation_hash.Map.t
let pp_input = Format.pp_print_list Operation_hash.pp
let result_encoding =
let open Data_encoding in
union
[ case (Tag 0)
~title:"Cannot download"
(obj1 (req "download_errors" (list Error_monad.error_encoding)))
(function Cannot_download errs -> Some errs | _ -> None)
(fun errs -> Cannot_download errs) ;
case (Tag 1)
~title:"Cannot parse"
(obj1 (req "parse_errors" (list Error_monad.error_encoding)))
(function Cannot_parse errs -> Some errs | _ -> None)
(fun errs -> Cannot_parse errs) ;
case (Tag 2)
~title:"Cannot validate"
(obj1 (req "validation_errors" (list Error_monad.error_encoding)))
(function Cannot_validate errs -> Some errs | _ -> None)
(fun errs -> Cannot_validate errs) ;
case (Tag 3)
~title:"Validation result"
(obj1 (req "validation_result" Mempool_worker.result_encoding))
(function Mempool_result result -> Some result | _ -> None)
(fun result -> Mempool_result result) ]
module Log = Tezos_stdlib.Logging.Make(struct
let name = "node.mempool.peer_worker"
@ -76,7 +99,6 @@ module Make (Mempool_worker: Mempool_worker.T)
type t = {
received: Operation_hash.t Queue.t;
downloading: (Operation_hash.t * Operation.t tzresult Lwt.t) Queue.t;
parsing: (Operation_hash.t * Mempool_worker.operation tzresult Lwt.t) Queue.t;
applying: (Mempool_worker.operation * Mempool_worker.result tzresult Lwt.t) Queue.t;
mutable results: result Operation_hash.Map.t
}
@ -86,7 +108,6 @@ module Make (Mempool_worker: Mempool_worker.T)
let is_empty t =
Queue.is_empty t.received &&
Queue.is_empty t.downloading &&
Queue.is_empty t.parsing &&
Queue.is_empty t.applying
let has_resolved t = match Lwt.state t with
@ -108,7 +129,6 @@ module Make (Mempool_worker: Mempool_worker.T)
in
Lwt.choose (
(first_task_or_never t.downloading) ::
(first_task_or_never t.parsing) ::
(first_task_or_never t.applying) ::
[]
)
@ -125,7 +145,6 @@ module Make (Mempool_worker: Mempool_worker.T)
{
received = q_of_list op_hashes;
downloading = Queue.create ();
parsing = Queue.create ();
applying = Queue.create ();
results = Operation_hash.Map.empty;
}
@ -133,7 +152,6 @@ module Make (Mempool_worker: Mempool_worker.T)
let cancel pipeline =
let cancel_snd (_, p) = Lwt.cancel p in
Queue.iter cancel_snd pipeline.downloading;
Queue.iter cancel_snd pipeline.parsing;
Queue.iter cancel_snd pipeline.applying
@ -156,18 +174,6 @@ module Make (Mempool_worker: Mempool_worker.T)
Lwt.return_unit
end
else if head_is_resolved pipeline.parsing then begin
let (op_hash, mop) = Queue.pop pipeline.parsing in
mop >>= function
| Error errs ->
record_result pipeline op_hash (Cannot_parse errs);
Lwt.return_unit
| Ok mop ->
let p = Mempool_worker.validate mempool_worker mop in
Queue.push (mop, p) pipeline.applying;
Lwt.return_unit
end
else if head_is_resolved pipeline.downloading then begin
let (op_hash, p) = Queue.pop pipeline.downloading in
p >>= function
@ -175,9 +181,14 @@ module Make (Mempool_worker: Mempool_worker.T)
record_result pipeline op_hash (Cannot_download errs);
Lwt.return_unit
| Ok op ->
let p = Mempool_worker.parse mempool_worker op in
Queue.push (op_hash, p) pipeline.parsing;
Lwt.return_unit
match Mempool_worker.parse op with
| Error errs ->
record_result pipeline op_hash (Cannot_parse errs);
Lwt.return_unit
| Ok mop ->
let p = Mempool_worker.validate mempool_worker mop in
Queue.push (mop, p) pipeline.applying;
Lwt.return_unit
end
else if (not (Queue.is_empty pipeline.received)) then begin
@ -234,7 +245,7 @@ module Make (Mempool_worker: Mempool_worker.T)
module Event = struct
type t =
| Start of input
| End_ok of (Request.view * Worker_types.request_status)
| End_ok of (Request.view * Worker_types.request_status * output)
| End_error of (Request.view * Worker_types.request_status * error list)
let level req =
@ -253,11 +264,12 @@ module Make (Mempool_worker: Mempool_worker.T)
(fun input -> Start input) ;
case (Tag 1)
~title:"End_ok"
(obj2
(obj3
(req "request" Request.encoding)
(req "status" Worker_types.request_status_encoding))
(function End_ok (view, status) -> Some (view, status) | _ -> None)
(fun (view, status) -> End_ok (view, status)) ;
(req "status" Worker_types.request_status_encoding)
(req "output" (Operation_hash.Map.encoding result_encoding)))
(function End_ok (view, status, result) -> Some (view, status, result) | _ -> None)
(fun (view, status, result) -> End_ok (view, status, result)) ;
case (Tag 2)
~title:"End_error"
(obj3
@ -273,7 +285,7 @@ module Make (Mempool_worker: Mempool_worker.T)
"@[<v 0>Starting: %a@]"
pp_input
input
| End_ok (view, _) ->
| End_ok (view, _, _) ->
Format.fprintf ppf
"@[<v 0>Finished: %a@]"
Request.pp view
@ -323,9 +335,11 @@ module Make (Mempool_worker: Mempool_worker.T)
let on_completion
: type a. self -> a Request.t -> a -> Worker_types.request_status -> unit Lwt.t
= fun t r _ st ->
Worker.record_event t (Event.End_ok (Request.view r, st)) ;
Lwt.return_unit
= fun t req output st ->
match req with
| Request.Batch _ ->
Worker.record_event t (Event.End_ok (Request.view req, st, output)) ;
Lwt.return_unit
end
@ -337,12 +351,8 @@ module Make (Mempool_worker: Mempool_worker.T)
Worker.push_request_and_wait t (Request.Batch (mempool_worker, os))
>>=? fun (_: output) -> return_unit
let create limits peer_id mempool_worker input =
Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers) >>= fun w ->
validate mempool_worker w input >>= fun (_: unit tzresult) ->
(* NOTE: We ignore errors here. The validation of the [input] is only for
* recycling purposes and is not essential. *)
Lwt.return w
let create limits peer_id mempool_worker =
Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers)
let shutdown w =
let recycled = Operation_hash.Set.empty in

View File

@ -43,12 +43,11 @@ module type T = sig
* their validity before gossiping them furhter. *)
type input = Operation_hash.t list
(** [create limits peer_id mempool_worker input] creates a peer worker meant
(** [create limits peer_id mempool_worker] creates a peer worker meant
* to be used for validating batches of operations sent by the peer [peer_id].
* The [mempool_worker] the underlying worker that individual validations of
* singular operations are delegated to. The [input[] argument is for recycled
* operations that are carried over when the protocol updates. *)
val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t
* singular operations are delegated to. *)
val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t
(** [shutdown t] closes the peer worker [t]. It returns a list of operation
* hashes that can be recycled when a new worker is created for the same peer.

View File

@ -45,6 +45,7 @@ module type T = sig
| Refused of error list
| Duplicate
| Not_in_branch
val result_encoding : result Data_encoding.t
(** Creates/tear-down a new mempool validator context. *)
val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t

View File

@ -46,6 +46,7 @@ module type T = sig
| Refused of error list
| Duplicate
| Not_in_branch
val result_encoding : result Data_encoding.t
(** Creates/tear-down a new mempool validator context. *)
val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t