diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 6c7aafda0..954c28bd3 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -514,10 +514,10 @@ module Make (P: PARAMS) = struct let crypt buf = let nonce = get_nonce remote_nonce in Crypto_box.box my_secret_key public_key buf nonce in - let writer = Lwt_pipe.create 2 in + let writer = Lwt_pipe.create ~size:2 () in let send p = Lwt_pipe.push writer p in let try_send p = Lwt_pipe.push_now writer p in - let reader = Lwt_pipe.create 2 in + let reader = Lwt_pipe.create ~size:2 () in let total_sent () = !sent in let total_recv () = !received in let current_inflow () = received_ema#get in @@ -763,9 +763,9 @@ module Make (P: PARAMS) = struct (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* create the internal event pipe *) - let events = Lwt_pipe.create 100 in + let events = Lwt_pipe.create ~size:100 () in (* create the external message pipe *) - let messages = Lwt_pipe.create 100 in + let messages = Lwt_pipe.create ~size:100 () in (* fill the known peers pools from last time *) Data_encoding_ezjsonm.read_file config.peers_file >>= fun res -> let known_peers, black_list, my_gid, diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml index f6348218a..e86293fb4 100644 --- a/src/utils/lwt_pipe.ml +++ b/src/utils/lwt_pipe.ml @@ -11,15 +11,25 @@ open Lwt.Infix type 'a t = { queue : 'a Queue.t ; - size : int ; + size : int option ; + mutable closed : bool ; mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ; - mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option } + mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ; + empty: unit Lwt_condition.t ; + full: unit Lwt_condition.t ; + not_full : unit Lwt_condition.t ; + } -let create ~size = +let create ?size () = { queue = Queue.create () ; size ; + closed = false ; push_waiter = None ; - pop_waiter = None } + pop_waiter = None ; + empty = Lwt_condition.create () ; + full = Lwt_condition.create () ; + not_full = Lwt_condition.create () ; + } let notify_push q = match q.push_waiter with @@ -37,69 +47,164 @@ let notify_pop q = let wait_push q = match q.push_waiter with - | Some (t, _) -> t + | Some (t, _) -> Lwt.protected t | None -> let waiter, wakener = Lwt.wait () in q.push_waiter <- Some (waiter, wakener) ; - waiter + Lwt.protected waiter let wait_pop q = match q.pop_waiter with - | Some (t, _) -> t + | Some (t, _) -> Lwt.protected t | None -> let waiter, wakener = Lwt.wait () in q.pop_waiter <- Some (waiter, wakener) ; - waiter + Lwt.protected waiter -let rec push ({ queue ; size } as q) elt = - if Queue.length queue < size then begin +let available_space { size } len = + match size with + | None -> true + | Some size -> len < size + +let length { queue } = Queue.length queue +let is_empty { queue } = Queue.is_empty queue +let is_full ({ queue } as q) = not (available_space q (Queue.length queue)) + +let rec empty q = + if is_empty q + then Lwt.return_unit + else (Lwt_condition.wait q.empty >>= fun () -> empty q) +let rec full q = + if is_full q + then Lwt.return_unit + else (Lwt_condition.wait q.full >>= fun () -> full q) +let rec not_full q = + if not (is_empty q) + then Lwt.return_unit + else (Lwt_condition.wait q.not_full >>= fun () -> not_full q) + +exception Closed + +let rec push ({ closed ; queue ; full } as q) elt = + let len = Queue.length queue in + if closed then Lwt.fail Closed + else if available_space q len then begin Queue.push elt queue ; notify_push q ; + (if not (available_space q (len + 1)) then Lwt_condition.signal full ()); Lwt.return_unit end else wait_pop q >>= fun () -> push q elt -let rec push_now ({ queue; size } as q) elt = - Queue.length queue < size && begin +let rec push_now ({ closed ; queue ; full } as q) elt = + if closed then raise Closed ; + let len = Queue.length queue in + available_space q len && begin Queue.push elt queue ; notify_push q ; + (if not (available_space q (len + 1)) then Lwt_condition.signal full ()) ; true end -let rec pop ({ queue } as q) = +exception Full + +let push_now_exn q elt = + if not (push_now q elt) then raise Full + +let rec pop_all ({ closed ; queue ; empty ; not_full } as q) = + let was_full = is_full q in + if not (Queue.is_empty queue) then + let queue_copy = Queue.copy queue in + Queue.clear queue; + notify_pop q ; + (if was_full then Lwt_condition.signal not_full ()); + Lwt_condition.signal empty (); + Lwt.return queue_copy + else if closed then + Lwt.fail Closed + else + wait_push q >>= fun () -> + pop_all q + +let rec pop ({ closed ; queue ; empty ; not_full } as q) = + let was_full = is_full q in if not (Queue.is_empty queue) then let elt = Queue.pop queue in notify_pop q ; + (if was_full then Lwt_condition.signal not_full ()); + (if Queue.length queue = 0 then Lwt_condition.signal empty ()); Lwt.return elt + else if closed then + Lwt.fail Closed else wait_push q >>= fun () -> pop q -let rec peek ({ queue } as q) = +let rec peek ({ closed ; queue } as q) = if not (Queue.is_empty queue) then let elt = Queue.peek queue in Lwt.return elt + else if closed then + Lwt.fail Closed else wait_push q >>= fun () -> peek q -let pop_now_exn ({ queue } as q) = +exception Empty + +let pop_now_exn ({ closed ; queue ; empty ; not_full } as q) = + let was_full = is_full q in + if Queue.is_empty queue then + (if closed then raise Closed else raise Empty) ; let elt = Queue.pop queue in + (if was_full then Lwt_condition.signal not_full ()); + (if Queue.length queue = 0 then Lwt_condition.signal empty ()); notify_pop q ; elt +let pop_all_now ({ closed ; queue ; empty ; not_full } as q) = + let was_empty = is_empty q in + let was_full = is_full q in + if Queue.is_empty queue then + (if closed then raise Closed else raise Empty) ; + let queue_copy = Queue.copy queue in + Queue.clear queue ; + (if was_full then Lwt_condition.signal not_full ()); + (if not was_empty then Lwt_condition.signal empty ()); + notify_pop q ; + queue_copy + let pop_now q = match pop_now_exn q with - | exception Queue.Empty -> None + | exception Empty -> None | elt -> Some elt -let length { queue } = Queue.length queue -let is_empty { queue } = Queue.is_empty queue - let rec values_available q = if is_empty q then - wait_push q >>= fun () -> - values_available q + if q.closed then + raise Closed + else + wait_push q >>= fun () -> + values_available q else Lwt.return_unit + +let close q = + if not q.closed then begin + q.closed <- true ; + notify_push q ; + notify_pop q ; + Lwt_condition.broadcast_exn q.full Closed ; + end + +let rec iter q ~f = + Lwt.catch begin fun () -> + pop q >>= fun elt -> + f elt >>= fun () -> + iter q ~f + end begin function + | Closed -> Lwt.return_unit + | exn -> Lwt.fail exn + end + diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli index f880522d8..8b282f36a 100644 --- a/src/utils/lwt_pipe.mli +++ b/src/utils/lwt_pipe.mli @@ -14,7 +14,7 @@ type 'a t (** Type of queues holding values of type ['a]. *) -val create : size:int -> 'a t +val create : ?size:int -> unit -> 'a t (** [create ~size] is an empty queue that can hold max [size] elements. *) @@ -22,6 +22,10 @@ val push : 'a t -> 'a -> unit Lwt.t (** [push q v] is a thread that blocks while [q] contains more than [size] elements, then adds [v] at the end of [q]. *) +val pop_all : 'a t -> 'a Queue.t Lwt.t +(** [pop' q] is a thread that returns all elements in [q] or waits + till there is at least one element in [q]. *) + val pop : 'a t -> 'a Lwt.t (** [pop q] is a thread that blocks while [q] is empty, then removes and returns the first element in [q]. *) @@ -38,10 +42,22 @@ val push_now : 'a t -> 'a -> bool (** [push_now q v] adds [v] at the ends of [q] immediately and returns [false] if [q] is currently full, [true] otherwise. *) +exception Full + +val push_now_exn : 'a t -> 'a -> unit +(** [push_now q v] adds [v] at the ends of [q] immediately or + raise [Full] if [q] is currently full. *) + +val pop_all_now : 'a t -> 'a Queue.t +(** [pop_all_now q] is a copy of [q]'s internal queue, that may be + empty. *) + val pop_now : 'a t -> 'a option (** [pop_now q] maybe removes and returns the first element in [q] if [q] contains at least one element. *) +exception Empty + val pop_now_exn : 'a t -> 'a (** [pop_now_exn q] removes and returns the first element in [q] if [q] contains at least one element, or raise [Empty] otherwise. *) @@ -52,3 +68,30 @@ val length : 'a t -> int val is_empty : 'a t -> bool (** [is_empty q] is [true] if [q] is empty, [false] otherwise. *) +val is_full : 'a t -> bool +(** [is_full q] is [true] if [q] is full, [false] otherwise. *) + +val empty : 'a t -> unit Lwt.t +(** [empty q] returns when [q] becomes empty. *) + +val full : 'a t -> unit Lwt.t +(** [full q] returns when [q] becomes full. *) + +val not_full : 'a t -> unit Lwt.t +(** [not_full q] returns when [q] stop being full. *) + +val iter : 'a t -> f:('a -> unit Lwt.t) -> unit Lwt.t +(** [iter q ~f] pops all elements of [q] and applies [f] on them. *) + +exception Closed + +val close : 'a t -> unit +(** [close q] the write end of [q]: + + * Future write attempts will fail with [Closed]. + * If there are reads blocked, they will unblock and fail with [Closed]. + * Future read attempts will drain the data until there is no data left. + + Thus, after a pipe has been closed, reads never block. + Close is idempotent. +*) diff --git a/test/test_lwt_pipe.ml b/test/test_lwt_pipe.ml new file mode 100644 index 000000000..3756a1483 --- /dev/null +++ b/test/test_lwt_pipe.ml @@ -0,0 +1,50 @@ +open Lwt.Infix +include Logging.Make (struct let name = "test-pipe" end) + +let rec producer queue = function + | 0 -> + lwt_log_notice "Done producing." + | n -> + Lwt_pipe.push queue () >>= fun () -> + producer queue (pred n) + +let rec consumer queue = function + | 0 -> + lwt_log_notice "Done consuming." + | n -> + Lwt_pipe.pop queue >>= fun _ -> + consumer queue (pred n) + +let rec gen acc f = function + | 0 -> acc + | n -> gen (f () :: acc) f (pred n) + +let run qsize nbp nbc p c = + let q = Lwt_pipe.create qsize in + let producers = gen [] (fun () -> producer q p) nbp in + let consumers = gen [] (fun () -> consumer q c) nbc in + Lwt.join producers <&> Lwt.join consumers + +let main () = + let qsize = ref 10 in + let nb_producers = ref 10 in + let nb_consumers = ref 10 in + let produced_per_producer = ref 10 in + let consumed_per_consumer = ref 10 in + let spec = Arg.[ + "-qsize", Set_int qsize, " Size of the pipe"; + "-nc", Set_int nb_consumers, " Number of consumers"; + "-np", Set_int nb_producers, " Number of producers"; + "-n", Set_int consumed_per_consumer, " Number of consumed items per consumers"; + "-p", Set_int produced_per_producer, " Number of produced items per producers"; + "-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs"; + "-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs"; + ] + in + let anon_fun _ = () in + let usage_msg = "Usage: %s .\nArguments are:" in + Arg.parse spec anon_fun usage_msg; + run !qsize !nb_producers + !nb_consumers !produced_per_producer !consumed_per_consumer + +let () = Lwt_main.run @@ main ()