Worker: register Closed error globally

This commit is contained in:
Pietro Abate 2018-11-27 16:07:23 +01:00 committed by Raphaël Proust
parent e46b0746d5
commit c66623d0a7
4 changed files with 37 additions and 31 deletions

View File

@ -75,7 +75,6 @@ end
module Worker = Worker.Make (Name) (Event) (Request) (Types)
type t = Worker.infinite Worker.queue Worker.t
type error += Closed = Worker.Closed
let debug w =
Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))

View File

@ -34,8 +34,6 @@ type limits = {
type validator_kind =
| Internal of Context.index
type error += Closed of unit
val create:
limits -> Distributed_db.t -> validator_kind ->
t tzresult Lwt.t

View File

@ -58,6 +58,33 @@ module type TYPES = sig
val pp : Format.formatter -> view -> unit
end
(** An error returned when trying to communicate with a worker that
has been closed.*)
type worker_name = {base: string; name:string}
type Error_monad.error += Closed of worker_name
let () =
register_error_kind `Permanent
~id:("worker.closed")
~title:("Worker closed")
~description:
("An operation on a worker could not complete \
before it was shut down.")
~pp: (fun ppf w ->
Format.fprintf ppf
"Worker %s[%s] has been shut down."
w.base w.name)
Data_encoding.(
conv
(fun { base ; name } -> (base,name))
(fun (name,base) -> { base ; name })
(obj1
(req "worker" (tup2 string string))
)
)
(function Closed w -> Some w | _ -> None)
(fun w -> Closed w)
module type T = sig
module Name: NAME
@ -92,10 +119,6 @@ module type T = sig
(** Create a table of workers. *)
val create_table : 'kind buffer_kind -> 'kind table
(** An error returned when trying to communicate with a worker that
has been closed. *)
type Error_monad.error += Closed of Name.t
(** The callback handlers specific to each worker instance. *)
module type HANDLERS = sig
@ -284,24 +307,6 @@ module Make
zombies : (int, 'kind t) Hashtbl.t
}
type error += Closed of Name.t
let () =
register_error_kind `Permanent
~id:("worker." ^ base_name ^ ".closed")
~title:("Worker " ^ base_name ^ " closed")
~description:
("An operation on a " ^ base_name ^
" worker could not complete \
before it was shut down.")
~pp: (fun ppf name ->
Format.fprintf ppf
"Worker %s[%a] has been shut down."
base_name Name.pp name)
Data_encoding.(obj1 (req "worker_id" Name.encoding))
(function Closed chain_id -> Some chain_id | _ -> None)
(fun chain_id -> Closed chain_id)
let queue_item ?u r =
Time.now (),
Message (r, u)
@ -348,13 +353,16 @@ module Make
Lwt_pipe.push message_queue (queue_item ~u request) >>= fun () ->
t)
(function
| Lwt_pipe.Closed -> fail (Closed w.name)
| Lwt_pipe.Closed ->
let name = Format.asprintf "%a" Name.pp w.name in
fail (Closed {base=base_name; name})
| exn -> fail (Exn exn))
let close (type a) (w : a t) =
let wakeup = function
| _, Message (_, Some u) ->
Lwt.wakeup_later u (Error [ Closed w.name ])
let name = Format.asprintf "%a" Name.pp w.name in
Lwt.wakeup_later u (Error [ Closed {base=base_name; name} ])
| _ -> () in
let close_queue message_queue =
let messages = Lwt_pipe.pop_all_now message_queue in

View File

@ -119,6 +119,11 @@ end
(** {2 Worker group maker} *)
(** An error returned when trying to communicate with a worker that
has been closed. *)
type worker_name = {base: string; name:string}
type Error_monad.error += Closed of worker_name
(** Functor to build a group of workers.
At that point, all the types are fixed and introspectable,
but the actual parameters and event handlers can be tweaked
@ -157,10 +162,6 @@ module type T = sig
(** Create a table of workers. *)
val create_table : 'kind buffer_kind -> 'kind table
(** An error returned when trying to communicate with a worker that
has been closed. *)
type Error_monad.error += Closed of Name.t
(** The callback handlers specific to each worker instance. *)
module type HANDLERS = sig