Lwt_pipe: do not force the size function to count bytes.

For now, we were always bounding a queue size by the total amount of
allocated bytes. We might want to use ather kind of bounds (e.g. the
total number of elements).
This commit is contained in:
Grégoire Henry 2017-11-11 03:34:12 +01:00 committed by Benjamin Canou
parent 2d08ba778f
commit e98e175c21
5 changed files with 19 additions and 9 deletions

View File

@ -291,7 +291,7 @@ module Reader = struct
let run ?size conn encoding canceler =
let compute_size = function
| Ok (size, _) -> (Sys.word_size / 8) * 11 + size
| Ok (size, _) -> (Sys.word_size / 8) * 11 + size + Lwt_pipe.push_overhead
| Error _ -> 0 (* we push Error only when we close the socket,
we don't fear memory leaks in that case... *) in
let size = map_option size ~f:(fun max -> (max, compute_size)) in
@ -406,8 +406,10 @@ module Writer = struct
sz + MBytes.length buf + 2 * Sys.word_size) 0
in
function
| buf_l, None -> Sys.word_size + buf_list_size buf_l
| buf_l, Some _ -> 2 * Sys.word_size + buf_list_size buf_l
| buf_l, None ->
Sys.word_size + buf_list_size buf_l + Lwt_pipe.push_overhead
| buf_l, Some _ ->
2 * Sys.word_size + buf_list_size buf_l + Lwt_pipe.push_overhead
in
let size = map_option size ~f:(fun max -> max, compute_size) in
let st =

View File

@ -884,7 +884,8 @@ and create_connection pool p2p_conn id_point point_info peer_info _version =
let canceler = Canceler.create () in
let size =
map_option pool.config.incoming_app_message_queue_size
~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in
~f:(fun qs -> qs, fun (size, _) ->
(Sys.word_size / 8) * 11 + size + Lwt_pipe.push_overhead) in
let messages = Lwt_pipe.create ?size () in
let rec callback =
{ Answerer.message =

View File

@ -339,11 +339,12 @@ let create
exception Closed
let read_size = function
| Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf
| Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf + Lwt_pipe.push_overhead
| Error _ -> 0 (* we push Error only when we close the socket,
we don't fear memory leaks in that case... *)
let write_size mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes
let write_size mbytes =
(Sys.word_size / 8) * 6 + MBytes.length mbytes + Lwt_pipe.push_overhead
let register =
let cpt = ref 0 in

View File

@ -20,12 +20,13 @@ type 'a t =
empty: unit Lwt_condition.t ;
}
let push_overhead = 4 * (Sys.word_size / 8)
let create ?size () =
let max_size, compute_size =
match size with
| None -> max_int, (fun _ -> 0)
| Some (max_size, compute_size) ->
max_size, (fun e -> 4 * (Sys.word_size / 8) + compute_size e) in
| Some (max_size, compute_size) -> max_size, compute_size in
{ queue = Queue.create () ;
current_size = 0 ;
max_size ;

View File

@ -17,7 +17,9 @@ type 'a t
val create : ?size:(int * ('a -> int)) -> unit -> 'a t
(** [create ~size:(max_size, compute_size)] is an empty queue that can
hold max [size] bytes of data, using [compute_size] to compute the
size of a datum. *)
size of a datum. If want to count allocated bytes precisely, you
need to add [push_overhead] to the result of[compute_size].
When no [size] argument is provided, the queue is unbounded. *)
val push : 'a t -> 'a -> unit Lwt.t
(** [push q v] is a thread that blocks while [q] contains more
@ -88,3 +90,6 @@ val close : 'a t -> unit
Thus, after a pipe has been closed, reads never block.
Close is idempotent.
*)
val push_overhead: int
(** The allocated size in bytes when pushing in the queue. *)