diff --git a/src/lib_stdlib_lwt/lwt_pipe.ml b/src/lib_stdlib_lwt/lwt_pipe.ml index e0a663666..de79ea6c3 100644 --- a/src/lib_stdlib_lwt/lwt_pipe.ml +++ b/src/lib_stdlib_lwt/lwt_pipe.ml @@ -126,6 +126,27 @@ let rec pop ({ closed ; queue ; empty ; current_size ; _ } as q) = wait_push q >>= fun () -> pop q +let rec pop_with_timeout timeout q = + if not (Queue.is_empty q.queue) then begin + Lwt.cancel timeout ; + pop q >>= Lwt.return_some + end else if Lwt.is_sleeping timeout then + if q.closed then begin + Lwt.cancel timeout ; + Lwt.fail Closed + end else + let waiter = wait_push q in + Lwt.choose [ + timeout ; + Lwt.protected waiter ; + ] >>= fun () -> + pop_with_timeout timeout q + else + Lwt.return_none + +let pop_with_timeout timeout q = + pop_with_timeout (Lwt_unix.sleep timeout) q + let rec peek ({ closed ; queue ; _ } as q) = if not (Queue.is_empty queue) then let (_elt_size, elt) = Queue.peek queue in diff --git a/src/lib_stdlib_lwt/lwt_pipe.mli b/src/lib_stdlib_lwt/lwt_pipe.mli index c81def955..b5ae11994 100644 --- a/src/lib_stdlib_lwt/lwt_pipe.mli +++ b/src/lib_stdlib_lwt/lwt_pipe.mli @@ -29,6 +29,14 @@ val pop : 'a t -> 'a Lwt.t (** [pop q] is a thread that blocks while [q] is empty, then removes and returns the first element in [q]. *) +val pop_with_timeout : float -> 'a t -> 'a option Lwt.t +(** [pop t q] is a thread that blocks while [q] is empty, then + removes and returns the first element [v] in [q] and + to return [Some v], unless no message could be popped + in [t] seconds, in which case it returns [None]. + As concurrent readers are allowed, [None] does not + necessarily mean that no value has been pushed. *) + val pop_all : 'a t -> 'a list Lwt.t (** [pop_all q] is a thread that blocks while [q] is empty, then removes and returns all the element in [q] (in the order they