diff --git a/src/lib_shell/worker.ml b/src/lib_shell/worker.ml index 497f61b76..8ec17ea09 100644 --- a/src/lib_shell/worker.ml +++ b/src/lib_shell/worker.ml @@ -57,12 +57,186 @@ module type TYPES = sig val pp : Format.formatter -> view -> unit end +module type T = sig + + module Name: NAME + module Event: EVENT + module Request: REQUEST + module Types: TYPES + + (** A handle to a specific worker, parameterized by the type of + internal message buffer. *) + type 'kind t + + (** A handle to a table of workers. *) + type 'kind table + + (** Internal buffer kinds used as parameters to {!t}. *) + type 'a queue and bounded and infinite + type dropbox + + + (** Supported kinds of internal buffers. *) + type _ buffer_kind = + | Queue : infinite queue buffer_kind + | Bounded : { size : int } -> bounded queue buffer_kind + | Dropbox : + { merge : (dropbox t -> + any_request -> + any_request option -> + any_request option) } + -> dropbox buffer_kind + and any_request = Any_request : _ Request.t -> any_request + + (** Create a table of workers. *) + val create_table : 'kind buffer_kind -> 'kind table + + (** An error returned when trying to communicate with a worker that + has been closed. *) + type Error_monad.error += Closed of Name.t + + (** The callback handlers specific to each worker instance. *) + module type HANDLERS = sig + + (** Placeholder replaced with {!t} with the right parameters + provided by the type of buffer chosen at {!launch}.*) + type self + + (** Builds the initial internal state of a worker at launch. + It is possible to initialize the message queue. + Of course calling {!state} will fail at that point. *) + val on_launch : + self -> Name.t -> Types.parameters -> Types.state Lwt.t + + (** The main request processor, i.e. the body of the event loop. *) + val on_request : + self -> 'a Request.t -> 'a tzresult Lwt.t + + (** Called when no request has been made before the timeout, if + the parameter has been passed to {!launch}. *) + val on_no_request : + self -> unit tzresult Lwt.t + + (** A function called when terminating a worker. *) + val on_close : + self -> unit Lwt.t + + (** A function called at the end of the worker loop in case of an + abnormal error. This function can handle the error by + returning [Ok ()], or leave the default unexpected error + behaviour by returning its parameter. A possibility is to + handle the error for ad-hoc logging, and still use + {!trigger_shutdown} to kill the worker. *) + val on_error : + self -> + Request.view -> + Worker_types.request_status -> + error list -> + unit tzresult Lwt.t + + (** A function called at the end of the worker loop in case of a + successful treatment of the current request. *) + val on_completion : + self -> + 'a Request.t -> 'a -> + Worker_types.request_status -> + unit Lwt.t + + end + + (** Creates a new worker instance. + Parameter [queue_size] not passed means unlimited queue. *) + val launch : + 'kind table -> ?timeout:float -> + Worker_types.limits -> Name.t -> Types.parameters -> + (module HANDLERS with type self = 'kind t) -> + 'kind t Lwt.t + + (** Triggers a worker termination and waits for its completion. + Cannot be called from within the handlers. *) + val shutdown : + _ t -> unit Lwt.t + + (** Adds a message to the queue and waits for its result. + Cannot be called from within the handlers. *) + val push_request_and_wait : + _ queue t -> 'a Request.t -> 'a tzresult Lwt.t + + (** Adds a message to the queue. *) + val push_request : + _ queue t -> 'a Request.t -> unit Lwt.t + + (** Adds a message to the queue immediately. + Returns [false] if the queue is full. *) + val try_push_request_now : + bounded queue t -> 'a Request.t -> bool + + (** Adds a message to the queue immediately. *) + val push_request_now : + infinite queue t -> 'a Request.t -> unit + + (** Sets the current request. *) + val drop_request : + dropbox t -> 'a Request.t -> unit + + (** Detects cancelation from within the request handler to stop + asynchronous operations. *) + val protect : + _ t -> + ?on_error: (error list -> 'b tzresult Lwt.t) -> + (unit -> 'b tzresult Lwt.t) -> + 'b tzresult Lwt.t + + (** Exports the canceler to allow cancelation of other tasks when this + worker is shutdowned or when it dies. *) + val canceler : _ t -> Lwt_canceler.t + + (** Triggers a worker termination. *) + val trigger_shutdown : _ t -> unit + + (** Recod an event in the backlog. *) + val record_event : _ t -> Event.t -> unit + + (** Record an event and make sure it is logged. *) + val log_event : _ t -> Event.t -> unit Lwt.t + + (** Access the internal state, once initialized. *) + val state : _ t -> Types.state + + (** Access the event backlog. *) + val last_events : _ t -> (Logging.level * Event.t list) list + + (** Introspect the message queue, gives the times requests were pushed. *) + val pending_requests : _ queue t -> (Time.t * Request.view) list + + (** Get the running status of a worker. *) + val status : _ t -> Worker_types.worker_status + + (** Get the request being treated by a worker. + Gives the time the request was pushed, and the time its + treatment started. *) + val current_request : _ t -> (Time.t * Time.t * Request.view) option + + (** Introspect the state of a worker. *) + val view : _ t -> Types.view + + (** Lists the running workers in this group. + After they are killed, workers are kept in the table + for a number of seconds given in the {!Worker_types.limits}. *) + val list : 'a table -> (Name.t * 'a t) list +end + module Make (Name : NAME) (Event : EVENT) (Request : REQUEST) (Types : TYPES) = struct + module Name = Name + module Event = Event + module Request = Request + module Types = Types + let base_name = String.concat "." Name.base type message = Message: 'a Request.t * 'a tzresult Lwt.u option -> message diff --git a/src/lib_shell/worker.mli b/src/lib_shell/worker.mli index 76888f1cd..b35d7700b 100644 --- a/src/lib_shell/worker.mli +++ b/src/lib_shell/worker.mli @@ -122,11 +122,12 @@ end At that point, all the types are fixed and introspectable, but the actual parameters and event handlers can be tweaked for each individual worker. *) -module Make - (Name : NAME) - (Event : EVENT) - (Request : REQUEST) - (Types : TYPES) : sig +module type T = sig + + module Name: NAME + module Event: EVENT + module Request: REQUEST + module Types: TYPES (** A handle to a specific worker, parameterized by the type of internal message buffer. *) @@ -288,5 +289,11 @@ module Make After they are killed, workers are kept in the table for a number of seconds given in the {!Worker_types.limits}. *) val list : 'a table -> (Name.t * 'a t) list - end + + +module Make (Name : NAME) (Event : EVENT) (Request : REQUEST) (Types : TYPES) + : T with module Name = Name + and module Event = Event + and module Request = Request + and module Types = Types