ligo/lib_stdlib_lwt/lwt_pipe.ml

191 lines
4.9 KiB
OCaml
Raw Normal View History

2016-11-29 00:46:26 +04:00
(**************************************************************************)
(* *)
2017-11-14 03:36:14 +04:00
(* Copyright (c) 2014 - 2017. *)
2016-11-29 00:46:26 +04:00
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Lwt.Infix
(** A size-bounded pipe carrying values of type ['a]. *)
type 'a t = {
  queue : (int * 'a) Queue.t ;
  (** Pending elements; each is stored with the size recorded for it
      at push time. *)
  mutable current_size : int ;
  (** Running total of the sizes of the elements held in [queue]. *)
  max_size : int ;
  (** Bound that [current_size] plus a new element's size is compared
      against when pushing. *)
  compute_size : ('a -> int) ;
  (** Function applied to an element to obtain its size. *)
  mutable closed : bool ;
  (** Set to [true] by [close]; never reset. *)
  mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ;
  (** Promise/resolver pair installed by [wait_push] and resolved (then
      cleared) by [notify_push]. *)
  mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ;
  (** Promise/resolver pair installed by [wait_pop] and resolved (then
      cleared) by [notify_pop]. *)
  empty : unit Lwt_condition.t ;
  (** Condition notified when a pop leaves [queue] empty. *)
}
2016-11-29 00:46:26 +04:00
(** Four machine words, expressed in bytes ([Sys.word_size] is in bits).
    NOTE(review): not referenced elsewhere in this file; presumably meant
    for callers to add to the value returned by their [compute_size] —
    confirm against callers. *)
let push_overhead =
  let bytes_per_word = Sys.word_size / 8 in
  4 * bytes_per_word
2017-01-14 16:13:30 +04:00
(** [create ?size ()] builds an empty, open pipe.

    [size] is a pair [(max_size, compute_size)].  When it is omitted the
    bound is [max_int] and every element is given size [0]. *)
let create ?(size = (max_int, (fun _ -> 0))) () =
  let max_size, compute_size = size in
  { queue = Queue.create () ;
    current_size = 0 ;
    max_size ;
    compute_size ;
    closed = false ;
    push_waiter = None ;
    pop_waiter = None ;
    empty = Lwt_condition.create () ;
  }
2016-11-29 00:46:26 +04:00
(** Resolve the promise registered by [wait_push], if there is one, and
    clear the slot so that the next [wait_push] installs a fresh promise.
    Called after a successful push and from [close]. *)
let notify_push pipe =
  match pipe.push_waiter with
  | Some (_promise, resolver) ->
      (* Clear the slot before resolving the promise. *)
      pipe.push_waiter <- None ;
      Lwt.wakeup_later resolver ()
  | None -> ()
(** Resolve the promise registered by [wait_pop], if there is one, and
    clear the slot so that the next [wait_pop] installs a fresh promise.
    Called after an element is removed and from [close]. *)
let notify_pop pipe =
  match pipe.pop_waiter with
  | Some (_promise, resolver) ->
      (* Clear the slot before resolving the promise. *)
      pipe.pop_waiter <- None ;
      Lwt.wakeup_later resolver ()
  | None -> ()
(** Return a promise that is resolved by the next [notify_push].

    All callers waiting at the same time share one underlying promise;
    it is created on demand and stored in [push_waiter].  The result is
    wrapped with [Lwt.protected] so that cancelling one caller's promise
    does not cancel the shared one. *)
let wait_push pipe =
  let shared =
    match pipe.push_waiter with
    | Some (pending, _) -> pending
    | None ->
        let (fresh, _) as slot = Lwt.wait () in
        pipe.push_waiter <- Some slot ;
        fresh
  in
  Lwt.protected shared
2016-11-29 00:46:26 +04:00
(** Return a promise that is resolved by the next [notify_pop].

    All callers waiting at the same time share one underlying promise;
    it is created on demand and stored in [pop_waiter].  The result is
    wrapped with [Lwt.protected] so that cancelling one caller's promise
    does not cancel the shared one. *)
let wait_pop pipe =
  let shared =
    match pipe.pop_waiter with
    | Some (pending, _) -> pending
    | None ->
        let (fresh, _) as slot = Lwt.wait () in
        pipe.pop_waiter <- Some slot ;
        fresh
  in
  Lwt.protected shared
2016-11-29 00:46:26 +04:00
(** Number of elements currently held in the pipe. *)
let length pipe = Queue.length pipe.queue
(** [true] when the pipe currently holds no element. *)
let is_empty pipe = Queue.is_empty pipe.queue
2017-01-14 16:13:30 +04:00
(** [empty q] resolves once [q] holds no element.  It resolves at once
    if the queue is already empty; otherwise it waits on the pipe's
    [empty] condition and re-checks after each wake-up. *)
let rec empty pipe =
  match Queue.is_empty pipe.queue with
  | true -> Lwt.return_unit
  | false ->
      Lwt.bind (Lwt_condition.wait pipe.empty) (fun () -> empty pipe)
(** Signals an operation attempted on a closed pipe. *)
exception Closed

(** [push q elt] appends [elt] to [q], waiting for room if necessary.

    The element is accepted when the current total size plus the
    element's size is strictly below [max_size], or when the queue is
    empty (so a single oversized element can always go through).
    Otherwise the call waits for the next pop notification and retries.

    The returned promise fails with [Closed] if the pipe is closed when
    the attempt (or a retry) is made. *)
let rec push q elt =
  (* Snapshot the fields before sizing the element. *)
  let { closed ; queue ; current_size ; max_size ; compute_size ; _ } = q in
  let elt_size = compute_size elt in
  if closed then
    Lwt.fail Closed
  else begin
    let new_size = current_size + elt_size in
    let accepted = new_size < max_size || Queue.is_empty queue in
    if accepted then begin
      Queue.push (elt_size, elt) queue ;
      q.current_size <- new_size ;
      notify_push q ;
      Lwt.return_unit
    end else
      wait_pop q >>= fun () -> push q elt
  end
2017-11-13 17:29:28 +04:00
(** [push_now q elt] tries to append [elt] to [q] without waiting.

    Returns [true] if the element was queued and [false] if there was no
    room (same acceptance rule as [push]).

    @raise Closed if the pipe is closed. *)
let push_now q elt =
  let { closed ; queue ; compute_size ; current_size ; max_size ; _ } = q in
  if closed then raise Closed ;
  let elt_size = compute_size elt in
  let new_size = current_size + elt_size in
  if new_size < max_size || Queue.is_empty queue then begin
    Queue.push (elt_size, elt) queue ;
    q.current_size <- new_size ;
    notify_push q ;
    true
  end else
    false
2017-01-14 16:13:30 +04:00
(** Signals that an element could not be pushed for lack of room. *)
exception Full

(** [push_now_exn q elt] is [push_now q elt], but reports a refused
    element through an exception instead of a boolean.

    @raise Full if [push_now] returned [false].
    @raise Closed if the pipe is closed. *)
let push_now_exn q elt =
  match push_now q elt with
  | true -> ()
  | false -> raise Full
(** [safe_push_now q elt] makes a best-effort, non-blocking push: the
    element is silently dropped if the pipe is full or closed.

    Only the pipe's own refusals are absorbed.  Any other exception (for
    instance one raised by the user-supplied [compute_size]) is allowed
    to propagate instead of being hidden by a catch-all handler. *)
let safe_push_now q elt =
  (* A full pipe is reported by [push_now] as [false]; a closed pipe as
     the [Closed] exception.  Both mean "drop the element". *)
  try ignore (push_now q elt : bool)
  with Closed -> ()
(** [pop q] removes and returns the oldest element of [q], waiting for
    one to be pushed if the pipe is empty.

    Elements still queued are delivered even after the pipe has been
    closed; the returned promise fails with [Closed] only when the pipe
    is both closed and empty. *)
let rec pop ({ closed ; queue ; empty ; current_size ; _ } as q) =
  if not (Queue.is_empty queue) then begin
    let (elt_size, elt) = Queue.pop queue in
    notify_pop q ;
    q.current_size <- current_size - elt_size ;
    (* Wake every caller blocked in [empty], not just the first one:
       they are all waiting for the same "queue is empty" event. *)
    if Queue.is_empty queue then Lwt_condition.broadcast empty () ;
    Lwt.return elt
  end else if closed then
    Lwt.fail Closed
  else
    wait_push q >>= fun () ->
    pop q
(** [peek q] returns the oldest element of [q] without removing it,
    waiting for one to be pushed if the pipe is empty.

    The returned promise fails with [Closed] when the pipe is both
    closed and empty. *)
let rec peek q =
  match Queue.peek q.queue with
  | (_elt_size, elt) -> Lwt.return elt
  | exception Queue.Empty ->
      if q.closed then
        Lwt.fail Closed
      else
        wait_push q >>= fun () -> peek q
2017-01-14 16:13:30 +04:00
(** Signals that an open pipe holds no element. *)
exception Empty

(** [pop_now_exn q] removes and returns the oldest element of [q]
    without waiting.

    @raise Closed if the pipe is empty and closed.
    @raise Empty if the pipe is empty but still open. *)
let pop_now_exn ({ closed ; queue ; empty ; current_size ; _ } as q) =
  if Queue.is_empty queue then
    (if closed then raise Closed else raise Empty) ;
  let (elt_size, elt) = Queue.pop queue in
  (* Wake every caller blocked in [empty], not just the first one:
     they are all waiting for the same "queue is empty" event. *)
  if Queue.is_empty queue then Lwt_condition.broadcast empty () ;
  q.current_size <- current_size - elt_size ;
  notify_pop q ;
  elt
(** [pop_now q] removes and returns [Some] oldest element of [q], or
    [None] if the pipe is empty but still open.

    Only [Empty] is handled here, so:
    @raise Closed if the pipe is empty and closed. *)
let pop_now q =
  try Some (pop_now_exn q)
  with Empty -> None
(** [values_available q] resolves as soon as [q] holds at least one
    element, waiting for a push if it is currently empty.

    The returned promise fails with [Closed] when the pipe is both empty
    and closed.  The failure is always delivered through the promise
    (never as a synchronous exception), matching [pop] and [peek]. *)
let rec values_available q =
  if not (is_empty q) then
    Lwt.return_unit
  else if q.closed then
    Lwt.fail Closed
  else
    wait_push q >>= fun () ->
    values_available q
2017-01-14 16:13:30 +04:00
2017-11-08 15:24:25 +04:00
(** [pop_all q] waits until at least one element is available, then
    removes and returns every element currently queued, oldest first.

    The returned promise fails with [Closed] only when the pipe is
    closed and empty at the time of the initial [pop]. *)
let pop_all q =
  let rec drain acc =
    match pop_now_exn q with
    | exception Empty -> List.rev acc
    (* On a closed pipe an exhausted queue is reported as [Closed].
       Stop draining instead of failing, otherwise the elements already
       removed from the queue would be lost. *)
    | exception Closed -> List.rev acc
    | e -> drain (e :: acc)
  in
  pop q >>= fun first ->
  Lwt.return (drain [first])
2017-01-14 16:13:30 +04:00
(** [close q] marks [q] as closed and wakes any pending push/pop waiter
    so that it can observe the closed state.  Closing an already closed
    pipe does nothing. *)
let close q =
  if q.closed then ()
  else begin
    q.closed <- true ;
    notify_push q ;
    notify_pop q
  end
(** [iter q ~f] repeatedly pops an element from [q] and applies [f] to
    it, sequentially, until [Closed] is observed; it then resolves with
    [()].

    The handler wraps both the pop and the call to [f], so a [Closed]
    failure coming from [f] also ends the iteration normally.  Any other
    exception is propagated through the returned promise. *)
let rec iter q ~f =
  let step () =
    pop q >>= fun elt ->
    f elt >>= fun () ->
    iter q ~f
  in
  let handle = function
    | Closed -> Lwt.return_unit
    | exn -> Lwt.fail exn
  in
  Lwt.catch step handle