diff --git a/src/lib_shell/mempool_peer_worker.ml b/src/lib_shell/mempool_peer_worker.ml index 6273bdbf8..3aad729fd 100644 --- a/src/lib_shell/mempool_peer_worker.ml +++ b/src/lib_shell/mempool_peer_worker.ml @@ -36,7 +36,7 @@ module type T = sig type t type input = Operation_hash.t list - val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t + val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t val shutdown: t -> input Lwt.t val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t @@ -62,6 +62,29 @@ module Make (Mempool_worker: Mempool_worker.T) type output = result Operation_hash.Map.t let pp_input = Format.pp_print_list Operation_hash.pp + let result_encoding = + let open Data_encoding in + union + [ case (Tag 0) + ~title:"Cannot download" + (obj1 (req "download_errors" (list Error_monad.error_encoding))) + (function Cannot_download errs -> Some errs | _ -> None) + (fun errs -> Cannot_download errs) ; + case (Tag 1) + ~title:"Cannot parse" + (obj1 (req "parse_errors" (list Error_monad.error_encoding))) + (function Cannot_parse errs -> Some errs | _ -> None) + (fun errs -> Cannot_parse errs) ; + case (Tag 2) + ~title:"Cannot validate" + (obj1 (req "validation_errors" (list Error_monad.error_encoding))) + (function Cannot_validate errs -> Some errs | _ -> None) + (fun errs -> Cannot_validate errs) ; + case (Tag 3) + ~title:"Validation result" + (obj1 (req "validation_result" Mempool_worker.result_encoding)) + (function Mempool_result result -> Some result | _ -> None) + (fun result -> Mempool_result result) ] module Log = Tezos_stdlib.Logging.Make(struct let name = "node.mempool.peer_worker" @@ -76,7 +99,6 @@ module Make (Mempool_worker: Mempool_worker.T) type t = { received: Operation_hash.t Queue.t; downloading: (Operation_hash.t * Operation.t tzresult Lwt.t) Queue.t; - parsing: (Operation_hash.t * Mempool_worker.operation tzresult Lwt.t) Queue.t; applying: (Mempool_worker.operation * Mempool_worker.result tzresult Lwt.t) Queue.t; mutable results: result Operation_hash.Map.t } @@ -86,7 +108,6 @@ module Make (Mempool_worker: Mempool_worker.T) let is_empty t = Queue.is_empty t.received && Queue.is_empty t.downloading && - Queue.is_empty t.parsing && Queue.is_empty t.applying let has_resolved t = match Lwt.state t with @@ -108,7 +129,6 @@ module Make (Mempool_worker: Mempool_worker.T) in Lwt.choose ( (first_task_or_never t.downloading) :: - (first_task_or_never t.parsing) :: (first_task_or_never t.applying) :: [] ) @@ -125,7 +145,6 @@ module Make (Mempool_worker: Mempool_worker.T) { received = q_of_list op_hashes; downloading = Queue.create (); - parsing = Queue.create (); applying = Queue.create (); results = Operation_hash.Map.empty; } @@ -133,7 +152,6 @@ module Make (Mempool_worker: Mempool_worker.T) let cancel pipeline = let cancel_snd (_, p) = Lwt.cancel p in Queue.iter cancel_snd pipeline.downloading; - Queue.iter cancel_snd pipeline.parsing; Queue.iter cancel_snd pipeline.applying @@ -156,18 +174,6 @@ module Make (Mempool_worker: Mempool_worker.T) Lwt.return_unit end - else if head_is_resolved pipeline.parsing then begin - let (op_hash, mop) = Queue.pop pipeline.parsing in - mop >>= function - | Error errs -> - record_result pipeline op_hash (Cannot_parse errs); - Lwt.return_unit - | Ok mop -> - let p = Mempool_worker.validate mempool_worker mop in - Queue.push (mop, p) pipeline.applying; - Lwt.return_unit - end - else if head_is_resolved pipeline.downloading then begin let (op_hash, p) = Queue.pop pipeline.downloading in p >>= function @@ -175,9 +181,14 @@ module Make (Mempool_worker: Mempool_worker.T) record_result pipeline op_hash (Cannot_download errs); Lwt.return_unit | Ok op -> - let p = Mempool_worker.parse mempool_worker op in - Queue.push (op_hash, p) pipeline.parsing; - Lwt.return_unit + match Mempool_worker.parse op with + | Error errs -> + record_result pipeline op_hash (Cannot_parse errs); + Lwt.return_unit + | Ok mop -> + let p = Mempool_worker.validate mempool_worker mop in + Queue.push (mop, p) pipeline.applying; + Lwt.return_unit end else if (not (Queue.is_empty pipeline.received)) then begin @@ -234,7 +245,7 @@ module Make (Mempool_worker: Mempool_worker.T) module Event = struct type t = | Start of input - | End_ok of (Request.view * Worker_types.request_status) + | End_ok of (Request.view * Worker_types.request_status * output) | End_error of (Request.view * Worker_types.request_status * error list) let level req = @@ -253,11 +264,12 @@ module Make (Mempool_worker: Mempool_worker.T) (fun input -> Start input) ; case (Tag 1) ~title:"End_ok" - (obj2 + (obj3 (req "request" Request.encoding) - (req "status" Worker_types.request_status_encoding)) - (function End_ok (view, status) -> Some (view, status) | _ -> None) - (fun (view, status) -> End_ok (view, status)) ; + (req "status" Worker_types.request_status_encoding) + (req "output" (Operation_hash.Map.encoding result_encoding))) + (function End_ok (view, status, result) -> Some (view, status, result) | _ -> None) + (fun (view, status, result) -> End_ok (view, status, result)) ; case (Tag 2) ~title:"End_error" (obj3 @@ -273,7 +285,7 @@ module Make (Mempool_worker: Mempool_worker.T) "@[Starting: %a@]" pp_input input - | End_ok (view, _) -> + | End_ok (view, _, _) -> Format.fprintf ppf "@[Finished: %a@]" Request.pp view @@ -323,9 +335,11 @@ module Make (Mempool_worker: Mempool_worker.T) let on_completion : type a. self -> a Request.t -> a -> Worker_types.request_status -> unit Lwt.t - = fun t r _ st -> - Worker.record_event t (Event.End_ok (Request.view r, st)) ; - Lwt.return_unit + = fun t req output st -> + match req with + | Request.Batch _ -> + Worker.record_event t (Event.End_ok (Request.view req, st, output)) ; + Lwt.return_unit end @@ -337,12 +351,8 @@ module Make (Mempool_worker: Mempool_worker.T) Worker.push_request_and_wait t (Request.Batch (mempool_worker, os)) >>=? fun (_: output) -> return_unit - let create limits peer_id mempool_worker input = - Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers) >>= fun w -> - validate mempool_worker w input >>= fun (_: unit tzresult) -> - (* NOTE: We ignore errors here. The validation of the [input] is only for - * recycling purposes and is not essential. *) - Lwt.return w + let create limits peer_id mempool_worker = + Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers) let shutdown w = let recycled = Operation_hash.Set.empty in diff --git a/src/lib_shell/mempool_peer_worker.mli b/src/lib_shell/mempool_peer_worker.mli index c16c21e43..014037220 100644 --- a/src/lib_shell/mempool_peer_worker.mli +++ b/src/lib_shell/mempool_peer_worker.mli @@ -43,12 +43,11 @@ module type T = sig * their validity before gossiping them furhter. *) type input = Operation_hash.t list - (** [create limits peer_id mempool_worker input] creates a peer worker meant + (** [create limits peer_id mempool_worker] creates a peer worker meant * to be used for validating batches of operations sent by the peer [peer_id]. * The [mempool_worker] the underlying worker that individual validations of - * singular operations are delegated to. The [input[] argument is for recycled - * operations that are carried over when the protocol updates. *) - val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t + * singular operations are delegated to. *) + val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t (** [shutdown t] closes the peer worker [t]. It returns a list of operation * hashes that can be recycled when a new worker is created for the same peer. diff --git a/src/lib_shell/mempool_worker.ml b/src/lib_shell/mempool_worker.ml index ca7da1a97..bca7bba98 100644 --- a/src/lib_shell/mempool_worker.ml +++ b/src/lib_shell/mempool_worker.ml @@ -45,6 +45,7 @@ module type T = sig | Refused of error list | Duplicate | Not_in_branch + val result_encoding : result Data_encoding.t (** Creates/tear-down a new mempool validator context. *) val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t diff --git a/src/lib_shell/mempool_worker.mli b/src/lib_shell/mempool_worker.mli index f5c33e8c9..922c9faee 100644 --- a/src/lib_shell/mempool_worker.mli +++ b/src/lib_shell/mempool_worker.mli @@ -46,6 +46,7 @@ module type T = sig | Refused of error list | Duplicate | Not_in_branch + val result_encoding : result Data_encoding.t (** Creates/tear-down a new mempool validator context. *) val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t