From c66623d0a7571978b290934739d50ae4e8c6bb18 Mon Sep 17 00:00:00 2001 From: Pietro Abate Date: Tue, 27 Nov 2018 16:07:23 +0100 Subject: [PATCH] Worker: register Closed error globally --- src/lib_shell/block_validator.ml | 1 - src/lib_shell/block_validator.mli | 2 -- src/lib_shell/worker.ml | 56 ++++++++++++++++++------------- src/lib_shell/worker.mli | 9 ++--- 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index c3e0a497d..98cb7b06b 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -75,7 +75,6 @@ end module Worker = Worker.Make (Name) (Event) (Request) (Types) type t = Worker.infinite Worker.queue Worker.t -type error += Closed = Worker.Closed let debug w = Format.kasprintf (fun msg -> Worker.record_event w (Debug msg)) diff --git a/src/lib_shell/block_validator.mli b/src/lib_shell/block_validator.mli index 3184a0845..88d231680 100644 --- a/src/lib_shell/block_validator.mli +++ b/src/lib_shell/block_validator.mli @@ -34,8 +34,6 @@ type limits = { type validator_kind = | Internal of Context.index -type error += Closed of unit - val create: limits -> Distributed_db.t -> validator_kind -> t tzresult Lwt.t diff --git a/src/lib_shell/worker.ml b/src/lib_shell/worker.ml index 1c41211ae..344e9563e 100644 --- a/src/lib_shell/worker.ml +++ b/src/lib_shell/worker.ml @@ -58,6 +58,33 @@ module type TYPES = sig val pp : Format.formatter -> view -> unit end +(** An error returned when trying to communicate with a worker that + has been closed.*) +type worker_name = {base: string; name:string} +type Error_monad.error += Closed of worker_name + +let () = + register_error_kind `Permanent + ~id:("worker.closed") + ~title:("Worker closed") + ~description: + ("An operation on a worker could not complete \ + before it was shut down.") + ~pp: (fun ppf w -> + Format.fprintf ppf + "Worker %s[%s] has been shut down." + w.base w.name) + Data_encoding.( + conv + (fun { base ; name } -> (base,name)) + (fun (name,base) -> { base ; name }) + (obj1 + (req "worker" (tup2 string string)) + ) + ) + (function Closed w -> Some w | _ -> None) + (fun w -> Closed w) + module type T = sig module Name: NAME @@ -92,10 +119,6 @@ module type T = sig (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table - (** An error returned when trying to communicate with a worker that - has been closed. *) - type Error_monad.error += Closed of Name.t - (** The callback handlers specific to each worker instance. *) module type HANDLERS = sig @@ -284,24 +307,6 @@ module Make zombies : (int, 'kind t) Hashtbl.t } - type error += Closed of Name.t - - let () = - register_error_kind `Permanent - ~id:("worker." ^ base_name ^ ".closed") - ~title:("Worker " ^ base_name ^ " closed") - ~description: - ("An operation on a " ^ base_name ^ - " worker could not complete \ - before it was shut down.") - ~pp: (fun ppf name -> - Format.fprintf ppf - "Worker %s[%a] has been shut down." - base_name Name.pp name) - Data_encoding.(obj1 (req "worker_id" Name.encoding)) - (function Closed chain_id -> Some chain_id | _ -> None) - (fun chain_id -> Closed chain_id) - let queue_item ?u r = Time.now (), Message (r, u) @@ -348,13 +353,16 @@ module Make Lwt_pipe.push message_queue (queue_item ~u request) >>= fun () -> t) (function - | Lwt_pipe.Closed -> fail (Closed w.name) + | Lwt_pipe.Closed -> + let name = Format.asprintf "%a" Name.pp w.name in + fail (Closed {base=base_name; name}) | exn -> fail (Exn exn)) let close (type a) (w : a t) = let wakeup = function | _, Message (_, Some u) -> - Lwt.wakeup_later u (Error [ Closed w.name ]) + let name = Format.asprintf "%a" Name.pp w.name in + Lwt.wakeup_later u (Error [ Closed {base=base_name; name} ]) | _ -> () in let close_queue message_queue = let messages = Lwt_pipe.pop_all_now message_queue in diff --git a/src/lib_shell/worker.mli b/src/lib_shell/worker.mli index b03606938..54288e623 100644 --- a/src/lib_shell/worker.mli +++ b/src/lib_shell/worker.mli @@ -119,6 +119,11 @@ end (** {2 Worker group maker} *) +(** An error returned when trying to communicate with a worker that + has been closed. *) +type worker_name = {base: string; name:string} +type Error_monad.error += Closed of worker_name + (** Functor to build a group of workers. At that point, all the types are fixed and introspectable, but the actual parameters and event handlers can be tweaked @@ -157,10 +162,6 @@ module type T = sig (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table - (** An error returned when trying to communicate with a worker that - has been closed. *) - type Error_monad.error += Closed of Name.t - (** The callback handlers specific to each worker instance. *) module type HANDLERS = sig