Utils: introduce Lwt_dropbox.

This is a blocking "dropbox" containing a single element. Writing in
the dropbox is a non-blocking operation, that might overwrite the
current element. Reading in the dropbox is blocking while the
'dropbox' is empty.
This commit is contained in:
Grégoire Henry 2017-11-11 03:34:12 +01:00 committed by Benjamin Canou
parent f63c5acbf5
commit f0acd2e4da
2 changed files with 139 additions and 0 deletions

93
src/utils/lwt_dropbox.ml Normal file
View File

@ -0,0 +1,93 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Lwt.Infix
exception Closed
type 'a t =
{ mutable data : 'a option ;
mutable closed : bool ;
mutable put_waiter : unit Lwt.u option ;
}
let create () =
{ data = None ;
closed = false ;
put_waiter = None ;
}
let notify_put dropbox =
match dropbox.put_waiter with
| None -> ()
| Some w ->
dropbox.put_waiter <- None ;
Lwt.wakeup_later w ()
let rec put dropbox elt =
if dropbox.closed then
raise Closed
else begin
dropbox.data <- Some elt ;
notify_put dropbox
end
let peek dropbox = dropbox.data
let close dropbox =
if not dropbox.closed then begin
dropbox.closed <- true ;
notify_put dropbox ;
end
let wait_put ~timeout dropbox =
match dropbox.put_waiter with
| Some w ->
Lwt.choose [
timeout ;
Lwt.protected (Lwt.waiter_of_wakener w)
]
| None ->
let waiter, wakener = Lwt.wait () in
dropbox.put_waiter <- Some wakener ;
Lwt.choose [
timeout ;
Lwt.protected waiter ;
]
let rec take dropbox =
match dropbox.data with
| Some elt ->
dropbox.data <- None ;
Lwt.return elt
| None ->
if dropbox.closed then
Lwt.fail Closed
else
wait_put ~timeout:Lwt_utils.never_ending dropbox >>= fun () ->
take dropbox
let rec take_with_timeout timeout dropbox =
match dropbox.data with
| Some elt ->
Lwt.cancel timeout ;
dropbox.data <- None ;
Lwt.return (Some elt)
| None ->
if Lwt.is_sleeping timeout then
if dropbox.closed then
Lwt.fail Closed
else
wait_put ~timeout dropbox >>= fun () ->
take_with_timeout timeout dropbox
else
Lwt.return_none
let take_with_timeout timeout dropbox =
take_with_timeout (Lwt_unix.sleep timeout) dropbox

46
src/utils/lwt_dropbox.mli Normal file
View File

@ -0,0 +1,46 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** A 'dropbox' with a single element. *)
type 'a t
(** Type of dropbox holding a value of type ['a] *)
val create: unit -> 'a t
(** Create an empty dropbox. *)
val put: 'a t -> 'a -> unit
(** Put an element inside the dropbox. If the dropbox was already
containing an element, the old element is replaced by the new one.
The function might return [Closed] if the dropbox has been closed
with [close]. *)
val take: 'a t -> 'a Lwt.t
(** Wait until the dropbox contains an element, then returns the elements.
The elements is removed from the dropbox. The function might return
[Closed] if the dropbox is empty and closed. *)
val take_with_timeout: float -> 'a t -> 'a option Lwt.t
(** Like [take] except that it returns [None] after [timeout seconds]
if the dropbox is still empty. *)
val peek: 'a t -> 'a option
(** Read the current element of the dropbox without removing it. It
immediatly returns [None] if the dropbox is empty. *)
exception Closed
(** The exception returned when trying to access a 'closed' dropbox. *)
val close: 'a t -> unit
(** Close the dropox. It terminates all the waiting reader with the
exception [Closed]. All further read or write will also immediatly
fail with [Closed], except if the dropbox is not empty when
[close] is called. In that can, a single (and last) [take] will
succeed. *)