From f0acd2e4daa4551b7d4c559af3f34acbaf66efce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 11 Nov 2017 03:34:12 +0100 Subject: [PATCH] Utils: introduce `Lwt_dropbox`. This is a blocking "dropbox" containing a single element. Writing in the dropbox is a non-blocking operation, that might overwrite the current element. Reading in the dropbox is blocking while the 'dropbox' is empty. --- src/utils/lwt_dropbox.ml | 93 +++++++++++++++++++++++++++++++++++++++ src/utils/lwt_dropbox.mli | 46 +++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 src/utils/lwt_dropbox.ml create mode 100644 src/utils/lwt_dropbox.mli diff --git a/src/utils/lwt_dropbox.ml b/src/utils/lwt_dropbox.ml new file mode 100644 index 000000000..2528ec756 --- /dev/null +++ b/src/utils/lwt_dropbox.ml @@ -0,0 +1,93 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Lwt.Infix + +exception Closed + +type 'a t = + { mutable data : 'a option ; + mutable closed : bool ; + mutable put_waiter : unit Lwt.u option ; + } + +let create () = + { data = None ; + closed = false ; + put_waiter = None ; + } + +let notify_put dropbox = + match dropbox.put_waiter with + | None -> () + | Some w -> + dropbox.put_waiter <- None ; + Lwt.wakeup_later w () + +let rec put dropbox elt = + if dropbox.closed then + raise Closed + else begin + dropbox.data <- Some elt ; + notify_put dropbox + end + +let peek dropbox = dropbox.data + +let close dropbox = + if not dropbox.closed then begin + dropbox.closed <- true ; + notify_put dropbox ; + end + +let wait_put ~timeout dropbox = + match dropbox.put_waiter with + | Some w -> + Lwt.choose [ + timeout ; + Lwt.protected (Lwt.waiter_of_wakener w) + ] + | None -> + let waiter, wakener = Lwt.wait () in + dropbox.put_waiter <- Some wakener ; + Lwt.choose [ + timeout ; + Lwt.protected waiter ; + ] + +let rec take dropbox = + match dropbox.data with + | Some elt -> + dropbox.data <- None ; + Lwt.return elt + | None -> + if dropbox.closed then + Lwt.fail Closed + else + wait_put ~timeout:Lwt_utils.never_ending dropbox >>= fun () -> + take dropbox + +let rec take_with_timeout timeout dropbox = + match dropbox.data with + | Some elt -> + Lwt.cancel timeout ; + dropbox.data <- None ; + Lwt.return (Some elt) + | None -> + if Lwt.is_sleeping timeout then + if dropbox.closed then + Lwt.fail Closed + else + wait_put ~timeout dropbox >>= fun () -> + take_with_timeout timeout dropbox + else + Lwt.return_none + +let take_with_timeout timeout dropbox = + take_with_timeout (Lwt_unix.sleep timeout) dropbox diff --git a/src/utils/lwt_dropbox.mli b/src/utils/lwt_dropbox.mli new file mode 100644 index 000000000..db8709c0e --- /dev/null +++ b/src/utils/lwt_dropbox.mli @@ -0,0 +1,46 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** A 'dropbox' with a single element. *) + +type 'a t +(** Type of dropbox holding a value of type ['a] *) + +val create: unit -> 'a t +(** Create an empty dropbox. *) + +val put: 'a t -> 'a -> unit +(** Put an element inside the dropbox. If the dropbox was already + containing an element, the old element is replaced by the new one. + The function might return [Closed] if the dropbox has been closed + with [close]. *) + +val take: 'a t -> 'a Lwt.t +(** Wait until the dropbox contains an element, then returns the elements. + The elements is removed from the dropbox. The function might return + [Closed] if the dropbox is empty and closed. *) + +val take_with_timeout: float -> 'a t -> 'a option Lwt.t +(** Like [take] except that it returns [None] after [timeout seconds] + if the dropbox is still empty. *) + +val peek: 'a t -> 'a option +(** Read the current element of the dropbox without removing it. It + immediatly returns [None] if the dropbox is empty. *) + +exception Closed +(** The exception returned when trying to access a 'closed' dropbox. *) + +val close: 'a t -> unit +(** Close the dropox. It terminates all the waiting reader with the + exception [Closed]. All further read or write will also immediatly + fail with [Closed], except if the dropbox is not empty when + [close] is called. In that can, a single (and last) [take] will + succeed. *) +