(**************************************************************************) (* *) (* Copyright (c) 2014 - 2018. *) (* Dynamic Ledger Solutions, Inc. *) (* *) (* All rights reserved. No warranty, explicit or implicit, provided. *) (* *) (**************************************************************************) open Lwt.Infix type 'a t = { queue : (int * 'a) Queue.t ; mutable current_size : int ; max_size : int ; compute_size : ('a -> int) ; mutable closed : bool ; mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ; mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ; empty: unit Lwt_condition.t ; } let push_overhead = 4 * (Sys.word_size / 8) let create ?size () = let max_size, compute_size = match size with | None -> max_int, (fun _ -> 0) | Some (max_size, compute_size) -> max_size, compute_size in { queue = Queue.create () ; current_size = 0 ; max_size ; compute_size ; closed = false ; push_waiter = None ; pop_waiter = None ; empty = Lwt_condition.create () ; } let notify_push q = match q.push_waiter with | None -> () | Some (_, w) -> q.push_waiter <- None ; Lwt.wakeup_later w () let notify_pop q = match q.pop_waiter with | None -> () | Some (_, w) -> q.pop_waiter <- None ; Lwt.wakeup_later w () let wait_push q = match q.push_waiter with | Some (t, _) -> Lwt.protected t | None -> let waiter, wakener = Lwt.wait () in q.push_waiter <- Some (waiter, wakener) ; Lwt.protected waiter let wait_pop q = match q.pop_waiter with | Some (t, _) -> Lwt.protected t | None -> let waiter, wakener = Lwt.wait () in q.pop_waiter <- Some (waiter, wakener) ; Lwt.protected waiter let length { queue ; _ } = Queue.length queue let is_empty { queue ; _ } = Queue.is_empty queue let rec empty q = if is_empty q then Lwt.return_unit else (Lwt_condition.wait q.empty >>= fun () -> empty q) exception Closed let rec push ({ closed ; queue ; current_size ; max_size ; compute_size ; _ } as q) elt = let elt_size = compute_size elt in if closed then Lwt.fail Closed else if current_size + elt_size < max_size || Queue.is_empty queue then begin Queue.push (elt_size, elt) queue ; q.current_size <- current_size + elt_size ; notify_push q ; Lwt.return_unit end else wait_pop q >>= fun () -> push q elt let push_now ({ closed ; queue ; compute_size ; current_size ; max_size ; _ } as q) elt = if closed then raise Closed ; let elt_size = compute_size elt in (current_size + elt_size < max_size || Queue.is_empty queue) && begin Queue.push (elt_size, elt) queue ; q.current_size <- current_size + elt_size ; notify_push q ; true end exception Full let push_now_exn q elt = if not (push_now q elt) then raise Full let safe_push_now q elt = try push_now_exn q elt with _ -> () let rec pop ({ closed ; queue ; empty ; current_size ; _ } as q) = if not (Queue.is_empty queue) then let (elt_size, elt) = Queue.pop queue in notify_pop q ; q.current_size <- current_size - elt_size ; (if Queue.length queue = 0 then Lwt_condition.signal empty ()); Lwt.return elt else if closed then Lwt.fail Closed else wait_push q >>= fun () -> pop q let rec pop_with_timeout timeout q = if not (Queue.is_empty q.queue) then begin Lwt.cancel timeout ; pop q >>= Lwt.return_some end else if Lwt.is_sleeping timeout then if q.closed then begin Lwt.cancel timeout ; Lwt.fail Closed end else let waiter = wait_push q in Lwt.choose [ timeout ; Lwt.protected waiter ; ] >>= fun () -> pop_with_timeout timeout q else Lwt.return_none let pop_with_timeout timeout q = pop_with_timeout (Lwt_unix.sleep timeout) q let rec peek ({ closed ; queue ; _ } as q) = if not (Queue.is_empty queue) then let (_elt_size, elt) = Queue.peek queue in Lwt.return elt else if closed then Lwt.fail Closed else wait_push q >>= fun () -> peek q let peek_all { queue ; closed } = if closed then [] else List.rev (Queue.fold (fun acc (_, e) -> e :: acc) [] queue) exception Empty let pop_now_exn ({ closed ; queue ; empty ; current_size ; _ } as q) = if Queue.is_empty queue then (if closed then raise Closed else raise Empty) ; let (elt_size, elt) = Queue.pop queue in (if Queue.length queue = 0 then Lwt_condition.signal empty ()); q.current_size <- current_size - elt_size ; notify_pop q ; elt let pop_now q = match pop_now_exn q with | exception Empty -> None | elt -> Some elt let rec values_available q = if is_empty q then if q.closed then raise Closed else wait_push q >>= fun () -> values_available q else Lwt.return_unit let rec pop_all_loop q acc = match pop_now_exn q with | exception Empty -> List.rev acc | e -> pop_all_loop q (e :: acc) let pop_all q = pop q >>= fun e -> Lwt.return (pop_all_loop q [e]) let pop_all_now q = pop_all_loop q [] let close q = if not q.closed then begin q.closed <- true ; notify_push q ; notify_pop q ; end let rec iter q ~f = Lwt.catch begin fun () -> pop q >>= fun elt -> f elt >>= fun () -> iter q ~f end begin function | Closed -> Lwt.return_unit | exn -> Lwt.fail exn end