From 26c84de550a4a06c3850891f183b92abc1a376bf Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Mon, 28 Nov 2016 21:46:26 +0100 Subject: [PATCH] Shell: introduce `Lwt_pipe` --- src/Makefile | 2 + src/utils/lwt_pipe.ml | 105 +++++++++++++++++++++++++++++++++++++++++ src/utils/lwt_pipe.mli | 54 +++++++++++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 src/utils/lwt_pipe.ml create mode 100644 src/utils/lwt_pipe.mli diff --git a/src/Makefile b/src/Makefile index 316ca629d..f550b9fcf 100644 --- a/src/Makefile +++ b/src/Makefile @@ -111,6 +111,7 @@ UTILS_LIB_INTFS := \ utils/error_monad.mli \ utils/logging.mli \ utils/lwt_utils.mli \ + utils/lwt_pipe.mli \ utils/IO.mli \ UTILS_LIB_IMPLS := \ @@ -128,6 +129,7 @@ UTILS_LIB_IMPLS := \ utils/error_monad.ml \ utils/logging.ml \ utils/lwt_utils.ml \ + utils/lwt_pipe.ml \ utils/IO.ml \ UTILS_PACKAGES := \ diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml new file mode 100644 index 000000000..f6348218a --- /dev/null +++ b/src/utils/lwt_pipe.ml @@ -0,0 +1,105 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Lwt.Infix + +type 'a t = + { queue : 'a Queue.t ; + size : int ; + mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ; + mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option } + +let create ~size = + { queue = Queue.create () ; + size ; + push_waiter = None ; + pop_waiter = None } + +let notify_push q = + match q.push_waiter with + | None -> () + | Some (_, w) -> + q.push_waiter <- None ; + Lwt.wakeup_later w () + +let notify_pop q = + match q.pop_waiter with + | None -> () + | Some (_, w) -> + q.pop_waiter <- None ; + Lwt.wakeup_later w () + +let wait_push q = + match q.push_waiter with + | Some (t, _) -> t + | None -> + let waiter, wakener = Lwt.wait () in + q.push_waiter <- Some (waiter, wakener) ; + waiter + +let wait_pop q = + match q.pop_waiter with + | Some (t, _) -> t + | None -> + let waiter, wakener = Lwt.wait () in + q.pop_waiter <- Some (waiter, wakener) ; + waiter + +let rec push ({ queue ; size } as q) elt = + if Queue.length queue < size then begin + Queue.push elt queue ; + notify_push q ; + Lwt.return_unit + end else + wait_pop q >>= fun () -> + push q elt + +let rec push_now ({ queue; size } as q) elt = + Queue.length queue < size && begin + Queue.push elt queue ; + notify_push q ; + true + end + +let rec pop ({ queue } as q) = + if not (Queue.is_empty queue) then + let elt = Queue.pop queue in + notify_pop q ; + Lwt.return elt + else + wait_push q >>= fun () -> + pop q + +let rec peek ({ queue } as q) = + if not (Queue.is_empty queue) then + let elt = Queue.peek queue in + Lwt.return elt + else + wait_push q >>= fun () -> + peek q + +let pop_now_exn ({ queue } as q) = + let elt = Queue.pop queue in + notify_pop q ; + elt + +let pop_now q = + match pop_now_exn q with + | exception Queue.Empty -> None + | elt -> Some elt + +let length { queue } = Queue.length queue +let is_empty { queue } = Queue.is_empty queue + +let rec values_available q = + if is_empty q then + wait_push q >>= fun () -> + values_available q + else + Lwt.return_unit diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli new file mode 100644 index 000000000..f880522d8 --- /dev/null +++ b/src/utils/lwt_pipe.mli @@ -0,0 +1,54 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Data queues similar to the [Pipe] module in Jane Street's [Async] + library. They are implemented with [Queue]s, limited in size, and + use lwt primitives for concurrent access. *) + +type 'a t +(** Type of queues holding values of type ['a]. *) + +val create : size:int -> 'a t +(** [create ~size] is an empty queue that can hold max [size] + elements. *) + +val push : 'a t -> 'a -> unit Lwt.t +(** [push q v] is a thread that blocks while [q] contains more + than [size] elements, then adds [v] at the end of [q]. *) + +val pop : 'a t -> 'a Lwt.t +(** [pop q] is a thread that blocks while [q] is empty, then + removes and returns the first element in [q]. *) + +val peek : 'a t -> 'a Lwt.t +(** [peek] is like [pop] except it does not removes the first + element. *) + +val values_available : 'a t -> unit Lwt.t +(** [values_available] is like [peek] but it ignores the value + returned. *) + +val push_now : 'a t -> 'a -> bool +(** [push_now q v] adds [v] at the ends of [q] immediately and returns + [false] if [q] is currently full, [true] otherwise. *) + +val pop_now : 'a t -> 'a option +(** [pop_now q] maybe removes and returns the first element in [q] if + [q] contains at least one element. *) + +val pop_now_exn : 'a t -> 'a +(** [pop_now_exn q] removes and returns the first element in [q] if + [q] contains at least one element, or raise [Empty] otherwise. *) + +val length : 'a t -> int +(** [length q] is the number of elements in [q]. *) + +val is_empty : 'a t -> bool +(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *) +