Shell: improve Lwt_pipe
@ -514,10 +514,10 @@ module Make (P: PARAMS) = struct
let crypt buf =
let nonce = get_nonce remote_nonce in
|||| my_secret_key public_key buf nonce in
let writer = Lwt_pipe.create 2 in
let writer = Lwt_pipe.create ~size:2 () in
let send p = Lwt_pipe.push writer p in
let try_send p = Lwt_pipe.push_now writer p in
let reader = Lwt_pipe.create 2 in
let reader = Lwt_pipe.create ~size:2 () in
let total_sent () = !sent in
let total_recv () = !received in
let current_inflow () = received_ema#get in
@ -763,9 +763,9 @@ module Make (P: PARAMS) = struct
(* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
(* create the internal event pipe *)
let events = Lwt_pipe.create 100 in
let events = Lwt_pipe.create ~size:100 () in
(* create the external message pipe *)
let messages = Lwt_pipe.create 100 in
let messages = Lwt_pipe.create ~size:100 () in
(* fill the known peers pools from last time *)
Data_encoding_ezjsonm.read_file config.peers_file >>= fun res ->
let known_peers, black_list, my_gid,
@ -11,15 +11,25 @@ open Lwt.Infix
type 'a t =
{ queue : 'a Queue.t ;
size : int ;
size : int option ;
mutable closed : bool ;
mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ;
mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option }
mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ;
empty: unit Lwt_condition.t ;
full: unit Lwt_condition.t ;
not_full : unit Lwt_condition.t ;
let create ~size =
let create ?size () =
{ queue = Queue.create () ;
size ;
closed = false ;
push_waiter = None ;
pop_waiter = None }
pop_waiter = None ;
empty = Lwt_condition.create () ;
full = Lwt_condition.create () ;
not_full = Lwt_condition.create () ;
let notify_push q =
match q.push_waiter with
@ -37,69 +47,164 @@ let notify_pop q =
let wait_push q =
match q.push_waiter with
| Some (t, _) -> t
| Some (t, _) -> Lwt.protected t
| None ->
let waiter, wakener = Lwt.wait () in
q.push_waiter <- Some (waiter, wakener) ;
Lwt.protected waiter
let wait_pop q =
match q.pop_waiter with
| Some (t, _) -> t
| Some (t, _) -> Lwt.protected t
| None ->
let waiter, wakener = Lwt.wait () in
q.pop_waiter <- Some (waiter, wakener) ;
Lwt.protected waiter
let rec push ({ queue ; size } as q) elt =
if Queue.length queue < size then begin
let available_space { size } len =
match size with
| None -> true
| Some size -> len < size
let length { queue } = Queue.length queue
let is_empty { queue } = Queue.is_empty queue
let is_full ({ queue } as q) = not (available_space q (Queue.length queue))
let rec empty q =
if is_empty q
then Lwt.return_unit
else (Lwt_condition.wait q.empty >>= fun () -> empty q)
let rec full q =
if is_full q
then Lwt.return_unit
else (Lwt_condition.wait q.full >>= fun () -> full q)
let rec not_full q =
if not (is_empty q)
then Lwt.return_unit
else (Lwt_condition.wait q.not_full >>= fun () -> not_full q)
exception Closed
let rec push ({ closed ; queue ; full } as q) elt =
let len = Queue.length queue in
if closed then Closed
else if available_space q len then begin
Queue.push elt queue ;
notify_push q ;
(if not (available_space q (len + 1)) then Lwt_condition.signal full ());
end else
wait_pop q >>= fun () ->
push q elt
let rec push_now ({ queue; size } as q) elt =
Queue.length queue < size && begin
let rec push_now ({ closed ; queue ; full } as q) elt =
if closed then raise Closed ;
let len = Queue.length queue in
available_space q len && begin
Queue.push elt queue ;
notify_push q ;
(if not (available_space q (len + 1)) then Lwt_condition.signal full ()) ;
let rec pop ({ queue } as q) =
exception Full
let push_now_exn q elt =
if not (push_now q elt) then raise Full
let rec pop_all ({ closed ; queue ; empty ; not_full } as q) =
let was_full = is_full q in
if not (Queue.is_empty queue) then
let queue_copy = Queue.copy queue in
Queue.clear queue;
notify_pop q ;
(if was_full then Lwt_condition.signal not_full ());
Lwt_condition.signal empty ();
Lwt.return queue_copy
else if closed then
|||| Closed
wait_push q >>= fun () ->
pop_all q
let rec pop ({ closed ; queue ; empty ; not_full } as q) =
let was_full = is_full q in
if not (Queue.is_empty queue) then
let elt = Queue.pop queue in
notify_pop q ;
(if was_full then Lwt_condition.signal not_full ());
(if Queue.length queue = 0 then Lwt_condition.signal empty ());
Lwt.return elt
else if closed then
|||| Closed
wait_push q >>= fun () ->
pop q
let rec peek ({ queue } as q) =
let rec peek ({ closed ; queue } as q) =
if not (Queue.is_empty queue) then
let elt = Queue.peek queue in
Lwt.return elt
else if closed then
|||| Closed
wait_push q >>= fun () ->
peek q
let pop_now_exn ({ queue } as q) =
exception Empty
let pop_now_exn ({ closed ; queue ; empty ; not_full } as q) =
let was_full = is_full q in
if Queue.is_empty queue then
(if closed then raise Closed else raise Empty) ;
let elt = Queue.pop queue in
(if was_full then Lwt_condition.signal not_full ());
(if Queue.length queue = 0 then Lwt_condition.signal empty ());
notify_pop q ;
let pop_all_now ({ closed ; queue ; empty ; not_full } as q) =
let was_empty = is_empty q in
let was_full = is_full q in
if Queue.is_empty queue then
(if closed then raise Closed else raise Empty) ;
let queue_copy = Queue.copy queue in
Queue.clear queue ;
(if was_full then Lwt_condition.signal not_full ());
(if not was_empty then Lwt_condition.signal empty ());
notify_pop q ;
let pop_now q =
match pop_now_exn q with
| exception Queue.Empty -> None
| exception Empty -> None
| elt -> Some elt
let length { queue } = Queue.length queue
let is_empty { queue } = Queue.is_empty queue
let rec values_available q =
if is_empty q then
wait_push q >>= fun () ->
values_available q
if q.closed then
raise Closed
wait_push q >>= fun () ->
values_available q
let close q =
if not q.closed then begin
q.closed <- true ;
notify_push q ;
notify_pop q ;
Lwt_condition.broadcast_exn q.full Closed ;
let rec iter q ~f =
Lwt.catch begin fun () ->
pop q >>= fun elt ->
f elt >>= fun () ->
iter q ~f
end begin function
| Closed -> Lwt.return_unit
| exn -> exn
@ -14,7 +14,7 @@
type 'a t
(** Type of queues holding values of type ['a]. *)
val create : size:int -> 'a t
val create : ?size:int -> unit -> 'a t
(** [create ~size] is an empty queue that can hold max [size]
elements. *)
@ -22,6 +22,10 @@ val push : 'a t -> 'a -> unit Lwt.t
(** [push q v] is a thread that blocks while [q] contains more
than [size] elements, then adds [v] at the end of [q]. *)
val pop_all : 'a t -> 'a Queue.t Lwt.t
(** [pop' q] is a thread that returns all elements in [q] or waits
till there is at least one element in [q]. *)
val pop : 'a t -> 'a Lwt.t
(** [pop q] is a thread that blocks while [q] is empty, then
removes and returns the first element in [q]. *)
@ -38,10 +42,22 @@ val push_now : 'a t -> 'a -> bool
(** [push_now q v] adds [v] at the ends of [q] immediately and returns
[false] if [q] is currently full, [true] otherwise. *)
exception Full
val push_now_exn : 'a t -> 'a -> unit
(** [push_now q v] adds [v] at the ends of [q] immediately or
raise [Full] if [q] is currently full. *)
val pop_all_now : 'a t -> 'a Queue.t
(** [pop_all_now q] is a copy of [q]'s internal queue, that may be
empty. *)
val pop_now : 'a t -> 'a option
(** [pop_now q] maybe removes and returns the first element in [q] if
[q] contains at least one element. *)
exception Empty
val pop_now_exn : 'a t -> 'a
(** [pop_now_exn q] removes and returns the first element in [q] if
[q] contains at least one element, or raise [Empty] otherwise. *)
@ -52,3 +68,30 @@ val length : 'a t -> int
val is_empty : 'a t -> bool
(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
val is_full : 'a t -> bool
(** [is_full q] is [true] if [q] is full, [false] otherwise. *)
val empty : 'a t -> unit Lwt.t
(** [empty q] returns when [q] becomes empty. *)
val full : 'a t -> unit Lwt.t
(** [full q] returns when [q] becomes full. *)
val not_full : 'a t -> unit Lwt.t
(** [not_full q] returns when [q] stop being full. *)
val iter : 'a t -> f:('a -> unit Lwt.t) -> unit Lwt.t
(** [iter q ~f] pops all elements of [q] and applies [f] on them. *)
exception Closed
val close : 'a t -> unit
(** [close q] the write end of [q]:
* Future write attempts will fail with [Closed].
* If there are reads blocked, they will unblock and fail with [Closed].
* Future read attempts will drain the data until there is no data left.
Thus, after a pipe has been closed, reads never block.
Close is idempotent.
@ -0,0 +1,50 @@
open Lwt.Infix
include Logging.Make (struct let name = "test-pipe" end)
let rec producer queue = function
| 0 ->
lwt_log_notice "Done producing."
| n ->
Lwt_pipe.push queue () >>= fun () ->
producer queue (pred n)
let rec consumer queue = function
| 0 ->
lwt_log_notice "Done consuming."
| n ->
Lwt_pipe.pop queue >>= fun _ ->
consumer queue (pred n)
let rec gen acc f = function
| 0 -> acc
| n -> gen (f () :: acc) f (pred n)
let run qsize nbp nbc p c =
let q = Lwt_pipe.create qsize in
let producers = gen [] (fun () -> producer q p) nbp in
let consumers = gen [] (fun () -> consumer q c) nbc in
Lwt.join producers <&> Lwt.join consumers
let main () =
let qsize = ref 10 in
let nb_producers = ref 10 in
let nb_consumers = ref 10 in
let produced_per_producer = ref 10 in
let consumed_per_consumer = ref 10 in
let spec = Arg.[
"-qsize", Set_int qsize, "<int> Size of the pipe";
"-nc", Set_int nb_consumers, "<int> Number of consumers";
"-np", Set_int nb_producers, "<int> Number of producers";
"-n", Set_int consumed_per_consumer, "<int> Number of consumed items per consumers";
"-p", Set_int produced_per_producer, "<int> Number of produced items per producers";
"-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs";
"-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs";
let anon_fun _ = () in
let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg;
run !qsize !nb_producers
!nb_consumers !produced_per_producer !consumed_per_consumer
let () = @@ main ()
