From 31242ebcb9d7fcb70d498aacf5212c1c98bd5216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Fri, 16 Nov 2018 14:34:31 +0800 Subject: [PATCH] Shell/mempool: throttle peer workers --- src/lib_shell/mempool_peer_worker.ml | 53 +++++++++++++++++---------- src/lib_shell/mempool_peer_worker.mli | 3 +- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/lib_shell/mempool_peer_worker.ml b/src/lib_shell/mempool_peer_worker.ml index 7f79aed1c..82bdc7b1d 100644 --- a/src/lib_shell/mempool_peer_worker.ml +++ b/src/lib_shell/mempool_peer_worker.ml @@ -27,6 +27,7 @@ * compartimentatilsation. *) type limits = { + max_promises_per_request : int ; worker_limits : Worker_types.limits ; } @@ -39,7 +40,7 @@ module type T = sig val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t val shutdown: t -> input Lwt.t - val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t + val validate: t -> input -> unit tzresult Lwt.t end @@ -94,9 +95,10 @@ module Make (Mempool_worker: Mempool_worker.T) (* 1. Core: the carefully scheduled work performed by the worker *) module Work : sig - val work: Mempool_worker.t -> input -> output Lwt.t + val work: Mempool_worker.t -> int -> input -> output Lwt.t end = struct type t = { + pool: unit Lwt_pool.t; received: Operation_hash.t Queue.t; downloading: (Operation_hash.t * Operation.t tzresult Lwt.t) Queue.t; applying: (Mempool_worker.operation * Mempool_worker.result tzresult Lwt.t) Queue.t; @@ -141,8 +143,9 @@ module Make (Mempool_worker: Mempool_worker.T) List.iter (fun x -> Queue.add x q) l; q - let create op_hashes = + let create pool_size op_hashes = { + pool = Lwt_pool.create pool_size Lwt.return; received = q_of_list op_hashes; downloading = Queue.create (); applying = Queue.create (); @@ -186,7 +189,9 @@ module Make (Mempool_worker: Mempool_worker.T) record_result pipeline op_hash (Cannot_parse errs); Lwt.return_unit | Ok mop -> - let p = Mempool_worker.validate mempool_worker mop in + let p = + Lwt_pool.use pipeline.pool (fun () -> + Mempool_worker.validate mempool_worker mop) in Queue.push (mop, p) pipeline.applying; Lwt.return_unit end @@ -195,7 +200,9 @@ module Make (Mempool_worker: Mempool_worker.T) let op_hash = Queue.pop pipeline.received in (* TODO[?] should we specify the current peer for fetching? *) let chain_db = Mempool_worker.chain_db mempool_worker in - let p = Distributed_db.Operation.fetch chain_db op_hash () in + let p = + Lwt_pool.use pipeline.pool (fun () -> + Distributed_db.Operation.fetch chain_db op_hash ()) in Queue.push (op_hash, p) pipeline.downloading; Lwt.return_unit end @@ -205,12 +212,12 @@ module Make (Mempool_worker: Mempool_worker.T) select pipeline >>= fun () -> Lwt.return_unit - let work mempool_worker input = - let pipeline = create input in + let work mempool_worker pool_size input = + let pipeline = create pool_size input in let rec loop () = - if is_empty pipeline then begin + if is_empty pipeline then Lwt.return pipeline.results - end else + else step mempool_worker pipeline >>= fun () -> loop () in @@ -236,11 +243,11 @@ module Make (Mempool_worker: Mempool_worker.T) end module Request = struct - type 'a t = Batch : (Mempool_worker.t * input) -> output t + type 'a t = Batch : input -> output t type view = input let view : type a. a t -> view - = fun (Batch (_, os)) -> os + = fun (Batch os) -> os let encoding = let open Data_encoding in list Operation_hash.encoding @@ -302,8 +309,8 @@ module Make (Mempool_worker: Mempool_worker.T) end module Types = struct - type parameters = Mempool_worker.t - type state = { mempool_worker: Mempool_worker.t } + type parameters = Mempool_worker.t * int + type state = { mempool_worker: Mempool_worker.t ; pool_size: int } type view = unit let view _ _ = () let encoding = Data_encoding.unit @@ -321,13 +328,14 @@ module Make (Mempool_worker: Mempool_worker.T) type self = t - let on_launch _ _ mempool_worker = - Lwt.return Types.{ mempool_worker } + let on_launch _ _ (mempool_worker, pool_size) = + Lwt.return Types.{ mempool_worker; pool_size } let on_request : type a. self -> a Request.t -> a tzresult Lwt.t - = fun t (Request.Batch (mempool_worker, os)) -> + = fun t (Request.Batch os) -> + let st = Worker.state t in Worker.record_event t (Event.Start os) ; - Work.work mempool_worker os >>= fun r -> + Work.work st.mempool_worker st.pool_size os >>= fun r -> return r let on_no_request _ = return_unit @@ -352,12 +360,17 @@ module Make (Mempool_worker: Mempool_worker.T) (* 4. Public interface: exporting a thin wrapper around workers and work. *) (* See interface file for documentation *) - let validate mempool_worker t os = - Worker.push_request_and_wait t (Request.Batch (mempool_worker, os)) + let validate t os = + Worker.push_request_and_wait t (Request.Batch os) >>=? fun (_: output) -> return_unit let create limits peer_id mempool_worker = - Worker.launch table limits.worker_limits peer_id mempool_worker (module Handlers) + Worker.launch + table + limits.worker_limits + peer_id + (mempool_worker, limits.max_promises_per_request) + (module Handlers) let shutdown w = let recycled = Operation_hash.Set.empty in diff --git a/src/lib_shell/mempool_peer_worker.mli b/src/lib_shell/mempool_peer_worker.mli index 014037220..62dfa7a29 100644 --- a/src/lib_shell/mempool_peer_worker.mli +++ b/src/lib_shell/mempool_peer_worker.mli @@ -26,6 +26,7 @@ (** Distributing validation work between different workers, one for each peer. *) type limits = { + max_promises_per_request : int ; worker_limits : Worker_types.limits ; } @@ -57,7 +58,7 @@ module type T = sig (** [validate mempool_worker worker input] validates the batch of operations * [input]. The work is performed by [worker] and the underlying validation of * each operation is performed by [mempool_worker]. *) - val validate: Mempool_worker.t -> t -> input -> unit tzresult Lwt.t + val validate: t -> input -> unit tzresult Lwt.t end