diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml index f50caf7a0..3838ec423 100644 --- a/src/utils/lwt_pipe.ml +++ b/src/utils/lwt_pipe.ml @@ -161,6 +161,15 @@ let rec values_available q = else Lwt.return_unit +let pop_all q = + let rec loop acc = + match pop_now_exn q with + | exception Empty -> List.rev acc + | e -> loop (e :: acc) + in + pop q >>= fun e -> + Lwt.return (loop [e]) + let close q = if not q.closed then begin q.closed <- true ; diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli index a5d56a151..431d939d2 100644 --- a/src/utils/lwt_pipe.mli +++ b/src/utils/lwt_pipe.mli @@ -27,6 +27,11 @@ val pop : 'a t -> 'a Lwt.t (** [pop q] is a thread that blocks while [q] is empty, then removes and returns the first element in [q]. *) +val pop_all : 'a t -> 'a list Lwt.t +(** [pop_all q] is a thread that blocks while [q] is empty, then + removes and returns all the element in [q] (in the order they + were inserted). *) + val peek : 'a t -> 'a Lwt.t (** [peek] is like [pop] except it does not removes the first element. *)