Shell/worker: export functor result signature

This commit is contained in:
Raphaël Proust 2018-10-10 14:05:37 +08:00 committed by Grégoire Henry
parent 8b8b355bd3
commit 129caccf4e
No known key found for this signature in database
GPG Key ID: 50D984F20BD445D2
2 changed files with 187 additions and 6 deletions

View File

@ -57,12 +57,186 @@ module type TYPES = sig
val pp : Format.formatter -> view -> unit val pp : Format.formatter -> view -> unit
end end
module type T = sig
module Name: NAME
module Event: EVENT
module Request: REQUEST
module Types: TYPES
(** A handle to a specific worker, parameterized by the type of
internal message buffer. *)
type 'kind t
(** A handle to a table of workers. *)
type 'kind table
(** Internal buffer kinds used as parameters to {!t}. *)
type 'a queue and bounded and infinite
type dropbox
(** Supported kinds of internal buffers. *)
type _ buffer_kind =
| Queue : infinite queue buffer_kind
| Bounded : { size : int } -> bounded queue buffer_kind
| Dropbox :
{ merge : (dropbox t ->
any_request ->
any_request option ->
any_request option) }
-> dropbox buffer_kind
and any_request = Any_request : _ Request.t -> any_request
(** Create a table of workers. *)
val create_table : 'kind buffer_kind -> 'kind table
(** An error returned when trying to communicate with a worker that
has been closed. *)
type Error_monad.error += Closed of Name.t
(** The callback handlers specific to each worker instance. *)
module type HANDLERS = sig
(** Placeholder replaced with {!t} with the right parameters
provided by the type of buffer chosen at {!launch}.*)
type self
(** Builds the initial internal state of a worker at launch.
It is possible to initialize the message queue.
Of course calling {!state} will fail at that point. *)
val on_launch :
self -> Name.t -> Types.parameters -> Types.state Lwt.t
(** The main request processor, i.e. the body of the event loop. *)
val on_request :
self -> 'a Request.t -> 'a tzresult Lwt.t
(** Called when no request has been made before the timeout, if
the parameter has been passed to {!launch}. *)
val on_no_request :
self -> unit tzresult Lwt.t
(** A function called when terminating a worker. *)
val on_close :
self -> unit Lwt.t
(** A function called at the end of the worker loop in case of an
abnormal error. This function can handle the error by
returning [Ok ()], or leave the default unexpected error
behaviour by returning its parameter. A possibility is to
handle the error for ad-hoc logging, and still use
{!trigger_shutdown} to kill the worker. *)
val on_error :
self ->
Request.view ->
Worker_types.request_status ->
error list ->
unit tzresult Lwt.t
(** A function called at the end of the worker loop in case of a
successful treatment of the current request. *)
val on_completion :
self ->
'a Request.t -> 'a ->
Worker_types.request_status ->
unit Lwt.t
end
(** Creates a new worker instance.
Parameter [queue_size] not passed means unlimited queue. *)
val launch :
'kind table -> ?timeout:float ->
Worker_types.limits -> Name.t -> Types.parameters ->
(module HANDLERS with type self = 'kind t) ->
'kind t Lwt.t
(** Triggers a worker termination and waits for its completion.
Cannot be called from within the handlers. *)
val shutdown :
_ t -> unit Lwt.t
(** Adds a message to the queue and waits for its result.
Cannot be called from within the handlers. *)
val push_request_and_wait :
_ queue t -> 'a Request.t -> 'a tzresult Lwt.t
(** Adds a message to the queue. *)
val push_request :
_ queue t -> 'a Request.t -> unit Lwt.t
(** Adds a message to the queue immediately.
Returns [false] if the queue is full. *)
val try_push_request_now :
bounded queue t -> 'a Request.t -> bool
(** Adds a message to the queue immediately. *)
val push_request_now :
infinite queue t -> 'a Request.t -> unit
(** Sets the current request. *)
val drop_request :
dropbox t -> 'a Request.t -> unit
(** Detects cancelation from within the request handler to stop
asynchronous operations. *)
val protect :
_ t ->
?on_error: (error list -> 'b tzresult Lwt.t) ->
(unit -> 'b tzresult Lwt.t) ->
'b tzresult Lwt.t
(** Exports the canceler to allow cancelation of other tasks when this
worker is shutdowned or when it dies. *)
val canceler : _ t -> Lwt_canceler.t
(** Triggers a worker termination. *)
val trigger_shutdown : _ t -> unit
(** Recod an event in the backlog. *)
val record_event : _ t -> Event.t -> unit
(** Record an event and make sure it is logged. *)
val log_event : _ t -> Event.t -> unit Lwt.t
(** Access the internal state, once initialized. *)
val state : _ t -> Types.state
(** Access the event backlog. *)
val last_events : _ t -> (Logging.level * Event.t list) list
(** Introspect the message queue, gives the times requests were pushed. *)
val pending_requests : _ queue t -> (Time.t * Request.view) list
(** Get the running status of a worker. *)
val status : _ t -> Worker_types.worker_status
(** Get the request being treated by a worker.
Gives the time the request was pushed, and the time its
treatment started. *)
val current_request : _ t -> (Time.t * Time.t * Request.view) option
(** Introspect the state of a worker. *)
val view : _ t -> Types.view
(** Lists the running workers in this group.
After they are killed, workers are kept in the table
for a number of seconds given in the {!Worker_types.limits}. *)
val list : 'a table -> (Name.t * 'a t) list
end
module Make module Make
(Name : NAME) (Name : NAME)
(Event : EVENT) (Event : EVENT)
(Request : REQUEST) (Request : REQUEST)
(Types : TYPES) = struct (Types : TYPES) = struct
module Name = Name
module Event = Event
module Request = Request
module Types = Types
let base_name = String.concat "." Name.base let base_name = String.concat "." Name.base
type message = Message: 'a Request.t * 'a tzresult Lwt.u option -> message type message = Message: 'a Request.t * 'a tzresult Lwt.u option -> message

View File

@ -122,11 +122,12 @@ end
At that point, all the types are fixed and introspectable, At that point, all the types are fixed and introspectable,
but the actual parameters and event handlers can be tweaked but the actual parameters and event handlers can be tweaked
for each individual worker. *) for each individual worker. *)
module Make module type T = sig
(Name : NAME)
(Event : EVENT) module Name: NAME
(Request : REQUEST) module Event: EVENT
(Types : TYPES) : sig module Request: REQUEST
module Types: TYPES
(** A handle to a specific worker, parameterized by the type of (** A handle to a specific worker, parameterized by the type of
internal message buffer. *) internal message buffer. *)
@ -288,5 +289,11 @@ module Make
After they are killed, workers are kept in the table After they are killed, workers are kept in the table
for a number of seconds given in the {!Worker_types.limits}. *) for a number of seconds given in the {!Worker_types.limits}. *)
val list : 'a table -> (Name.t * 'a t) list val list : 'a table -> (Name.t * 'a t) list
end end
module Make (Name : NAME) (Event : EVENT) (Request : REQUEST) (Types : TYPES)
: T with module Name = Name
and module Event = Event
and module Request = Request
and module Types = Types