Utils: add Lwt_pipe.pop_with_timeout

This commit is contained in:
Benjamin Canou 2017-12-19 19:06:22 +01:00 committed by Grégoire Henry
parent 9b09e70f5e
commit dee86fb462
2 changed files with 29 additions and 0 deletions

View File

@ -126,6 +126,27 @@ let rec pop ({ closed ; queue ; empty ; current_size ; _ } as q) =
wait_push q >>= fun () ->
pop q
let rec pop_with_timeout timeout q =
if not (Queue.is_empty q.queue) then begin
Lwt.cancel timeout ;
pop q >>= Lwt.return_some
end else if Lwt.is_sleeping timeout then
if q.closed then begin
Lwt.cancel timeout ;
Lwt.fail Closed
end else
let waiter = wait_push q in
Lwt.choose [
timeout ;
Lwt.protected waiter ;
] >>= fun () ->
pop_with_timeout timeout q
else
Lwt.return_none
let pop_with_timeout timeout q =
pop_with_timeout (Lwt_unix.sleep timeout) q
let rec peek ({ closed ; queue ; _ } as q) =
if not (Queue.is_empty queue) then
let (_elt_size, elt) = Queue.peek queue in

View File

@ -29,6 +29,14 @@ val pop : 'a t -> 'a Lwt.t
(** [pop q] is a thread that blocks while [q] is empty, then
removes and returns the first element in [q]. *)
val pop_with_timeout : float -> 'a t -> 'a option Lwt.t
(** [pop t q] is a thread that blocks while [q] is empty, then
removes and returns the first element [v] in [q] and
to return [Some v], unless no message could be popped
in [t] seconds, in which case it returns [None].
As concurrent readers are allowed, [None] does not
necessarily mean that no value has been pushed. *)
val pop_all : 'a t -> 'a list Lwt.t
(** [pop_all q] is a thread that blocks while [q] is empty, then
removes and returns all the element in [q] (in the order they