diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 45b2b912f..6d9c64ee3 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -291,7 +291,7 @@ module Reader = struct let run ?size conn encoding canceler = let compute_size = function - | Ok (size, _) -> (Sys.word_size / 8) * 11 + size + | Ok (size, _) -> (Sys.word_size / 8) * 11 + size + Lwt_pipe.push_overhead | Error _ -> 0 (* we push Error only when we close the socket, we don't fear memory leaks in that case... *) in let size = map_option size ~f:(fun max -> (max, compute_size)) in @@ -406,8 +406,10 @@ module Writer = struct sz + MBytes.length buf + 2 * Sys.word_size) 0 in function - | buf_l, None -> Sys.word_size + buf_list_size buf_l - | buf_l, Some _ -> 2 * Sys.word_size + buf_list_size buf_l + | buf_l, None -> + Sys.word_size + buf_list_size buf_l + Lwt_pipe.push_overhead + | buf_l, Some _ -> + 2 * Sys.word_size + buf_list_size buf_l + Lwt_pipe.push_overhead in let size = map_option size ~f:(fun max -> max, compute_size) in let st = diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 48bc4fc40..aa631a80e 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -884,7 +884,8 @@ and create_connection pool p2p_conn id_point point_info peer_info _version = let canceler = Canceler.create () in let size = map_option pool.config.incoming_app_message_queue_size - ~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in + ~f:(fun qs -> qs, fun (size, _) -> + (Sys.word_size / 8) * 11 + size + Lwt_pipe.push_overhead) in let messages = Lwt_pipe.create ?size () in let rec callback = { Answerer.message = diff --git a/src/node/net/p2p_io_scheduler.ml b/src/node/net/p2p_io_scheduler.ml index ee4d89c12..94832c216 100644 --- a/src/node/net/p2p_io_scheduler.ml +++ b/src/node/net/p2p_io_scheduler.ml @@ -339,11 +339,12 @@ let create exception Closed let read_size = function - | Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf + | Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf + Lwt_pipe.push_overhead | Error _ -> 0 (* we push Error only when we close the socket, we don't fear memory leaks in that case... *) -let write_size mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes +let write_size mbytes = + (Sys.word_size / 8) * 6 + MBytes.length mbytes + Lwt_pipe.push_overhead let register = let cpt = ref 0 in diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml index 3838ec423..670b0bad3 100644 --- a/src/utils/lwt_pipe.ml +++ b/src/utils/lwt_pipe.ml @@ -20,12 +20,13 @@ type 'a t = empty: unit Lwt_condition.t ; } +let push_overhead = 4 * (Sys.word_size / 8) + let create ?size () = let max_size, compute_size = match size with | None -> max_int, (fun _ -> 0) - | Some (max_size, compute_size) -> - max_size, (fun e -> 4 * (Sys.word_size / 8) + compute_size e) in + | Some (max_size, compute_size) -> max_size, compute_size in { queue = Queue.create () ; current_size = 0 ; max_size ; diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli index 431d939d2..7bcc7ff61 100644 --- a/src/utils/lwt_pipe.mli +++ b/src/utils/lwt_pipe.mli @@ -17,7 +17,9 @@ type 'a t val create : ?size:(int * ('a -> int)) -> unit -> 'a t (** [create ~size:(max_size, compute_size)] is an empty queue that can hold max [size] bytes of data, using [compute_size] to compute the - size of a datum. *) + size of a datum. If want to count allocated bytes precisely, you + need to add [push_overhead] to the result of[compute_size]. + When no [size] argument is provided, the queue is unbounded. *) val push : 'a t -> 'a -> unit Lwt.t (** [push q v] is a thread that blocks while [q] contains more @@ -88,3 +90,6 @@ val close : 'a t -> unit Thus, after a pipe has been closed, reads never block. Close is idempotent. *) + +val push_overhead: int +(** The allocated size in bytes when pushing in the queue. *)