Shell: introduce Lwt_pipe
This commit is contained in:
parent
fdff344989
commit
26c84de550
@ -111,6 +111,7 @@ UTILS_LIB_INTFS := \
|
||||
utils/error_monad.mli \
|
||||
utils/logging.mli \
|
||||
utils/lwt_utils.mli \
|
||||
utils/lwt_pipe.mli \
|
||||
utils/IO.mli \
|
||||
|
||||
UTILS_LIB_IMPLS := \
|
||||
@ -128,6 +129,7 @@ UTILS_LIB_IMPLS := \
|
||||
utils/error_monad.ml \
|
||||
utils/logging.ml \
|
||||
utils/lwt_utils.ml \
|
||||
utils/lwt_pipe.ml \
|
||||
utils/IO.ml \
|
||||
|
||||
UTILS_PACKAGES := \
|
||||
|
105
src/utils/lwt_pipe.ml
Normal file
105
src/utils/lwt_pipe.ml
Normal file
@ -0,0 +1,105 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2016. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
open Lwt.Infix
|
||||
|
||||
type 'a t =
|
||||
{ queue : 'a Queue.t ;
|
||||
size : int ;
|
||||
mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ;
|
||||
mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option }
|
||||
|
||||
let create ~size =
|
||||
{ queue = Queue.create () ;
|
||||
size ;
|
||||
push_waiter = None ;
|
||||
pop_waiter = None }
|
||||
|
||||
let notify_push q =
|
||||
match q.push_waiter with
|
||||
| None -> ()
|
||||
| Some (_, w) ->
|
||||
q.push_waiter <- None ;
|
||||
Lwt.wakeup_later w ()
|
||||
|
||||
let notify_pop q =
|
||||
match q.pop_waiter with
|
||||
| None -> ()
|
||||
| Some (_, w) ->
|
||||
q.pop_waiter <- None ;
|
||||
Lwt.wakeup_later w ()
|
||||
|
||||
let wait_push q =
|
||||
match q.push_waiter with
|
||||
| Some (t, _) -> t
|
||||
| None ->
|
||||
let waiter, wakener = Lwt.wait () in
|
||||
q.push_waiter <- Some (waiter, wakener) ;
|
||||
waiter
|
||||
|
||||
let wait_pop q =
|
||||
match q.pop_waiter with
|
||||
| Some (t, _) -> t
|
||||
| None ->
|
||||
let waiter, wakener = Lwt.wait () in
|
||||
q.pop_waiter <- Some (waiter, wakener) ;
|
||||
waiter
|
||||
|
||||
let rec push ({ queue ; size } as q) elt =
|
||||
if Queue.length queue < size then begin
|
||||
Queue.push elt queue ;
|
||||
notify_push q ;
|
||||
Lwt.return_unit
|
||||
end else
|
||||
wait_pop q >>= fun () ->
|
||||
push q elt
|
||||
|
||||
let rec push_now ({ queue; size } as q) elt =
|
||||
Queue.length queue < size && begin
|
||||
Queue.push elt queue ;
|
||||
notify_push q ;
|
||||
true
|
||||
end
|
||||
|
||||
let rec pop ({ queue } as q) =
|
||||
if not (Queue.is_empty queue) then
|
||||
let elt = Queue.pop queue in
|
||||
notify_pop q ;
|
||||
Lwt.return elt
|
||||
else
|
||||
wait_push q >>= fun () ->
|
||||
pop q
|
||||
|
||||
let rec peek ({ queue } as q) =
|
||||
if not (Queue.is_empty queue) then
|
||||
let elt = Queue.peek queue in
|
||||
Lwt.return elt
|
||||
else
|
||||
wait_push q >>= fun () ->
|
||||
peek q
|
||||
|
||||
let pop_now_exn ({ queue } as q) =
|
||||
let elt = Queue.pop queue in
|
||||
notify_pop q ;
|
||||
elt
|
||||
|
||||
let pop_now q =
|
||||
match pop_now_exn q with
|
||||
| exception Queue.Empty -> None
|
||||
| elt -> Some elt
|
||||
|
||||
let length { queue } = Queue.length queue
|
||||
let is_empty { queue } = Queue.is_empty queue
|
||||
|
||||
let rec values_available q =
|
||||
if is_empty q then
|
||||
wait_push q >>= fun () ->
|
||||
values_available q
|
||||
else
|
||||
Lwt.return_unit
|
54
src/utils/lwt_pipe.mli
Normal file
54
src/utils/lwt_pipe.mli
Normal file
@ -0,0 +1,54 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2016. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** Data queues similar to the [Pipe] module in Jane Street's [Async]
|
||||
library. They are implemented with [Queue]s, limited in size, and
|
||||
use lwt primitives for concurrent access. *)
|
||||
|
||||
type 'a t
|
||||
(** Type of queues holding values of type ['a]. *)
|
||||
|
||||
val create : size:int -> 'a t
|
||||
(** [create ~size] is an empty queue that can hold max [size]
|
||||
elements. *)
|
||||
|
||||
val push : 'a t -> 'a -> unit Lwt.t
|
||||
(** [push q v] is a thread that blocks while [q] contains more
|
||||
than [size] elements, then adds [v] at the end of [q]. *)
|
||||
|
||||
val pop : 'a t -> 'a Lwt.t
|
||||
(** [pop q] is a thread that blocks while [q] is empty, then
|
||||
removes and returns the first element in [q]. *)
|
||||
|
||||
val peek : 'a t -> 'a Lwt.t
|
||||
(** [peek] is like [pop] except it does not removes the first
|
||||
element. *)
|
||||
|
||||
val values_available : 'a t -> unit Lwt.t
|
||||
(** [values_available] is like [peek] but it ignores the value
|
||||
returned. *)
|
||||
|
||||
val push_now : 'a t -> 'a -> bool
|
||||
(** [push_now q v] adds [v] at the ends of [q] immediately and returns
|
||||
[false] if [q] is currently full, [true] otherwise. *)
|
||||
|
||||
val pop_now : 'a t -> 'a option
|
||||
(** [pop_now q] maybe removes and returns the first element in [q] if
|
||||
[q] contains at least one element. *)
|
||||
|
||||
val pop_now_exn : 'a t -> 'a
|
||||
(** [pop_now_exn q] removes and returns the first element in [q] if
|
||||
[q] contains at least one element, or raise [Empty] otherwise. *)
|
||||
|
||||
val length : 'a t -> int
|
||||
(** [length q] is the number of elements in [q]. *)
|
||||
|
||||
val is_empty : 'a t -> bool
|
||||
(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
|
||||
|
Loading…
Reference in New Issue
Block a user