diff --git a/src/lib_error_monad/error_monad.mli b/src/lib_error_monad/error_monad.mli index b51894018..1a73f5f46 100644 --- a/src/lib_error_monad/error_monad.mli +++ b/src/lib_error_monad/error_monad.mli @@ -71,6 +71,18 @@ type error += Exn of exn type error += Canceled +(** [protect] is a wrapper around [Lwt.catch] where the error handler operates + over `error list` instead of `exn`. Besides, [protect ~on_error ~canceler ~f] + may *cancel* [f] via a [Lwt_canceler.t]. + + More precisely, [protect ~on_error ~canceler f] runs [f ()]. An Lwt failure + triggered by [f ()] is wrapped into an [Exn]. If a [canceler] is given and + [Lwt_canceler.cancelation canceler] is determined before [f ()], + a [Canceled] error is returned. + + Errors are caught by [~on_error] (if given), otherwise the previous value + is returned. An Lwt failure triggered by [~on_error] is wrapped into an + [Exn] *) val protect : ?on_error:(error list -> 'a tzresult Lwt.t) -> ?canceler:Lwt_canceler.t -> diff --git a/src/lib_stdlib/lwt_canceler.mli b/src/lib_stdlib/lwt_canceler.mli index a1e361bd6..8c23d03ac 100644 --- a/src/lib_stdlib/lwt_canceler.mli +++ b/src/lib_stdlib/lwt_canceler.mli @@ -23,9 +23,32 @@ (* *) (*****************************************************************************) +(** A [Canceler.t] is a three-states synchronization object with transitions + "waiting -> canceling -> canceled", starting in waiting state. A chain + of hooks can be attached to the canceler. Hooks are triggered when + switching to the canceling state. The canceler switches to canceled state + when the hooks have completed. *) + type t + +(** [create t] returns a canceler in waiting state. *) val create : unit -> t + +(** If [t] is in wait state, [cancel t] triggers the cancelation process: + 1. it switches to canceling state, + 2. executes the hooks sequentially in separate Lwt threads, + 3. waits for hooks execution to complete, + 4. switches to cancel state. + If [t] is in canceled state, [cancel t] is determined immediately. + If [t] is in canceling state, [cancel t] is determined at the end of the + cancelation process. *) val cancel : t -> unit Lwt.t + +(** [cancelation t] is determined when [t] is in canceling or canceled state. *) val cancelation : t -> unit Lwt.t + +(** [on_cancel t hook] adds [hook] to the end of the current chain. *) val on_cancel : t -> (unit -> unit Lwt.t) -> unit + +(** [canceled t] is [true] iff [t] is canceled or canceling. *) val canceled : t -> bool diff --git a/src/lib_stdlib/lwt_utils.mli b/src/lib_stdlib/lwt_utils.mli index ff124b6bb..437c7da2e 100644 --- a/src/lib_stdlib/lwt_utils.mli +++ b/src/lib_stdlib/lwt_utils.mli @@ -27,6 +27,8 @@ val may: f:('a -> unit Lwt.t) -> 'a option -> unit Lwt.t val never_ending: unit -> 'a Lwt.t +(** [worker name ~run ~cancel] runs worker [run], and logs worker + creation, ending or failure. [cancel] is called if worker fails. *) val worker: string -> run:(unit -> unit Lwt.t) -> @@ -34,8 +36,8 @@ val worker: unit Lwt.t val trigger: unit -> (unit -> unit) * (unit -> unit Lwt.t) -val sort: ('a -> 'a -> int Lwt.t) -> 'a list -> 'a list Lwt.t +val sort: ('a -> 'a -> int Lwt.t) -> 'a list -> 'a list Lwt.t val unless: bool -> (unit -> unit Lwt.t) -> unit Lwt.t diff --git a/src/lib_stdlib/lwt_watcher.mli b/src/lib_stdlib/lwt_watcher.mli index b384b2a20..73c94e0df 100644 --- a/src/lib_stdlib/lwt_watcher.mli +++ b/src/lib_stdlib/lwt_watcher.mli @@ -23,14 +23,32 @@ (* *) (*****************************************************************************) -(** {1 Notification callbacks} *) +(** This module implements a one-to-many publish/suscribe pattern. + + Clients can register/unregister to an [input]. Events notified to the input + (through [notify]) are dispatched asynchronously to all registered clients + through an [Lwt_stream]. A client receives only events sent after + registration and before unregistration. *) type 'a input -type stopper val create_input : unit -> 'a input -val shutdown_input : 'a input -> unit + +(** [notify t v] publishes value v to the input t *) val notify : 'a input -> 'a -> unit + +type stopper + +(** [create_stream t] registers a new client which can read published + values via a stream. A [stopper] is used to shutdown the client. *) val create_stream : 'a input -> 'a Lwt_stream.t * stopper + +(** A fake stream never receives any value. *) val create_fake_stream : unit -> 'a Lwt_stream.t * stopper + +(** [shutdown s] unregisters the client associated to [s]. [None] is pushed + to the stream. *) val shutdown : stopper -> unit + +(** Shutdowns all the clients of this input *) +val shutdown_input : 'a input -> unit