From 6b7031ad3c325c156e34d19cc234bd58c32d5a7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Thu, 8 Nov 2018 19:45:10 +0800 Subject: [PATCH] Mempool_peer_worker: simpler interface Removes unecessary primitives, types, etc. --- src/lib_shell/mempool_peer_worker.ml | 74 ++++++++++++++++----------- src/lib_shell/mempool_peer_worker.mli | 49 +++++------------- 2 files changed, 58 insertions(+), 65 deletions(-) diff --git a/src/lib_shell/mempool_peer_worker.ml b/src/lib_shell/mempool_peer_worker.ml index 8487fb736..6273bdbf8 100644 --- a/src/lib_shell/mempool_peer_worker.ml +++ b/src/lib_shell/mempool_peer_worker.ml @@ -31,31 +31,21 @@ type limits = { } module type T = sig - module Proto: Registered_protocol.T - module Mempool_worker: Mempool_worker.T with module Proto = Proto + module Mempool_worker: Mempool_worker.T type t type input = Operation_hash.t list - type result = - | Cannot_download of error list - | Cannot_parse of error list - | Cannot_validate of error list - | Mempool_result of Mempool_worker.result - type output = result Operation_hash.Map.t - val create: limits -> P2p_peer.Id.t -> t Lwt.t - val shutdown: t -> unit Lwt.t + val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t + val shutdown: t -> input Lwt.t - val validate: Mempool_worker.t -> t -> input -> output tzresult Lwt.t - - (* For special use when a bunch of operations is not attributed to a specific - * peer worker. E.g., for injecting operations, for recylcing known operations - * from a previous protocol, etc. *) - val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t + val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t end -module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_worker.Proto = struct +module Make (Mempool_worker: Mempool_worker.T) + : T with module Mempool_worker = Mempool_worker += struct (* 0. Prelude: set up base modules and types *) (* See interface file for info if needed. *) @@ -295,10 +285,10 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w end module Types = struct - type parameters = unit - type state = unit + type parameters = Mempool_worker.t + type state = { mempool_worker: Mempool_worker.t } type view = unit - let view () () = () + let view _ _ = () let encoding = Data_encoding.unit let pp _ _ = () end @@ -314,7 +304,8 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w type self = t - let on_launch _ _ () = Lwt.return_unit + let on_launch _ _ mempool_worker = + Lwt.return Types.{ mempool_worker } let on_request : type a. self -> a Request.t -> a tzresult Lwt.t = fun t (Request.Batch (mempool_worker, os)) -> @@ -342,17 +333,40 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w (* 4. Public interface: exporting a thin wrapper around workers and work. *) (* See interface file for documentation *) - let create limits peer_id = - Worker.launch table limits.worker_limits peer_id () (module Handlers) - let shutdown w = Worker.shutdown w - let validate mempool_worker t os = Worker.push_request_and_wait t (Request.Batch (mempool_worker, os)) + >>=? fun (_: output) -> return_unit - let bypass_peer_workers mempool_worker input = - Log.lwt_log_info "Bypassing workers to work on: %a" pp_input input >>= fun () -> - Work.work mempool_worker input >>= fun output -> - Log.lwt_log_info "Finished work" >>= fun () -> - Lwt.return output + let create limits peer_id mempool_worker input = + Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers) >>= fun w -> + validate mempool_worker w input >>= fun (_: unit tzresult) -> + (* NOTE: We ignore errors here. The validation of the [input] is only for + * recycling purposes and is not essential. *) + Lwt.return w + + let shutdown w = + let recycled = Operation_hash.Set.empty in + let recycled = + List.fold_left + (fun recycled (_, input) -> + List.fold_left + (fun recycled op_h -> Operation_hash.Set.add op_h recycled) + recycled + input) + recycled + (Worker.pending_requests w) + in + let recycled = + match Worker.current_request w with + | Some (_, _, input) -> + List.fold_left + (fun recycled op_h -> Operation_hash.Set.add op_h recycled) + recycled + input + | None -> recycled + in + let input = Operation_hash.Set.elements recycled in + Worker.shutdown w >>= fun () -> + Lwt.return input end diff --git a/src/lib_shell/mempool_peer_worker.mli b/src/lib_shell/mempool_peer_worker.mli index 016237516..c16c21e43 100644 --- a/src/lib_shell/mempool_peer_worker.mli +++ b/src/lib_shell/mempool_peer_worker.mli @@ -30,8 +30,7 @@ type limits = { } module type T = sig - module Proto: Registered_protocol.T - module Mempool_worker: Mempool_worker.T with module Proto = Proto + module Mempool_worker: Mempool_worker.T (** The type of a peer worker. Each peer worker should be used for treating * all the operations from a given peer. *) @@ -44,45 +43,25 @@ module type T = sig * their validity before gossiping them furhter. *) type input = Operation_hash.t list - (** [result] are the different possible outcome of the validation of a single - * operation. It signals either errors in the validation process, or results - * from further down the validation system. *) - type result = - | Cannot_download of error list - | Cannot_parse of error list - | Cannot_validate of error list - | Mempool_result of Mempool_worker.result + (** [create limits peer_id mempool_worker input] creates a peer worker meant + * to be used for validating batches of operations sent by the peer [peer_id]. + * The [mempool_worker] the underlying worker that individual validations of + * singular operations are delegated to. The [input[] argument is for recycled + * operations that are carried over when the protocol updates. *) + val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> input -> t Lwt.t - (** [output] are the outcome of the validation of a batch of operations. *) - type output = result Operation_hash.Map.t - - (** [create limits peer_id] creates a peer worker meant to be used for - * validating operations sent by the peer [peer_id]. *) - val create: limits -> P2p_peer.Id.t -> t Lwt.t - - (** [shutdown t] closes the peer worker [t]. *) - val shutdown: t -> unit Lwt.t + (** [shutdown t] closes the peer worker [t]. It returns a list of operation + * hashes that can be recycled when a new worker is created for the same peer. + * *) + val shutdown: t -> input Lwt.t (** [validate mempool_worker worker input] validates the batch of operations * [input]. The work is performed by [worker] and the underlying validation of * each operation is performed by [mempool_worker]. *) - val validate: Mempool_worker.t -> t -> input -> output tzresult Lwt.t - - (** [bypass_peer_workers mempool_worker input] validates the batch of - * operations [input]. Unlike [validate] above, the work is not performed by a - * specific worker. - * - * This is intended to be used for cases where the [input] cannot be - * attributed to a specific peer. Typically, this happens when injecting an - * operation from the local client, or when recycling pending operations after - * a protocol change. - * - * Note that, unlike [validate], this bypasses the worker mechanics entirely. - * As a result, there is no possible introspection and the work from spearate - * calls to [bypass_peer_workers] is not sequentialised. *) - val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t + val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t end -module Make (Mempool_worker : Mempool_worker.T) : T with module Proto = Mempool_worker.Proto +module Make (Mempool_worker : Mempool_worker.T) + : T with module Mempool_worker = Mempool_worker