diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 93da2e14e..35c521cdf 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -10,8 +10,6 @@ (* TODO encode/encrypt before to push into the writer pipe. *) (* TODO patch Sodium.Box to avoid allocation of the encrypted buffer.*) (* TODO patch Data_encoding for continuation-based binary writer/reader. *) -(* TODO use queue bound by memory size of its elements, not by the - number of elements. *) (* TODO test `close ~wait:true`. *) (* TODO nothing in welcoming message proves that the incoming peer is the owner of the public key... only the first message will @@ -247,6 +245,11 @@ module Reader = struct Lwt.return_unit let run ?size conn encoding canceler = + let compute_size = function + | Ok (size, _) -> (Sys.word_size / 8) * 11 + size + | Error _ -> 0 (* we push Error only when we close the socket, + we don't fear memory leaks in that case... *) in + let size = map_option size ~f:(fun max -> (max, compute_size)) in let st = { canceler ; conn ; encoding ; messages = Lwt_pipe.create ?size () ; @@ -303,6 +306,13 @@ module Writer = struct Lwt.return_unit let run ?size conn encoding canceler = + let compute_size = function + | msg, None -> + 10 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg + | msg, Some _ -> + 18 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg + in + let size = map_option size ~f:(fun max -> max, compute_size) in let st = { canceler ; conn ; encoding ; messages = Lwt_pipe.create ?size () ; @@ -350,24 +360,9 @@ let accept let canceler = Canceler.create () in let conn = { fd ; info ; cryptobox_data } in let reader = - let compute_size = function - | Ok (size, _) -> (Sys.word_size / 8) * 11 + size - | Error err -> (Sys.word_size / 8) * (3 + 3 * (List.length err)) - in - let size = map_option incoming_message_queue_size - ~f:(fun qs -> (qs, compute_size)) in - Reader.run ?size conn encoding canceler + Reader.run ?size:incoming_message_queue_size conn encoding canceler and writer = - let compute_size = function - | msg, None -> - 10 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg - | msg, Some _ -> - 18 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg - in - let size = map_option outgoing_message_queue_size - ~f:(fun qs -> qs, compute_size) - in - Writer.run ?size conn encoding canceler in + Writer.run ?size:outgoing_message_queue_size conn encoding canceler in let conn = { conn ; reader ; writer } in Canceler.on_cancel canceler begin fun () -> P2p_io_scheduler.close fd >>= fun _ -> diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 55e4b8205..4d9a34161 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -248,12 +248,13 @@ let active_connections pool = Gid.Table.length pool.connected_gids let create_connection pool conn id_point pi gi = let gid = Gid_info.gid gi in let canceler = Canceler.create () in - let size = map_option pool.config.incoming_app_message_queue_size - ~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * (11 + size)) - in + let size = + map_option pool.config.incoming_app_message_queue_size + ~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in let messages = Lwt_pipe.create ?size () in let callback = - { Answerer.message = (fun size msg -> Lwt_pipe.push messages (size, msg)) ; + { Answerer.message = + (fun size msg -> Lwt_pipe.push messages (size, msg)) ; advertise = register_new_points pool gid ; bootstrap = list_known_points pool gid ; } in diff --git a/src/node/net/p2p_io_scheduler.ml b/src/node/net/p2p_io_scheduler.ml index 8ffe2e3cf..af74ac9b9 100644 --- a/src/node/net/p2p_io_scheduler.ml +++ b/src/node/net/p2p_io_scheduler.ml @@ -315,11 +315,12 @@ let create exception Closed -let read_size_fun = function +let read_size = function | Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf - | Error exns -> (Sys.word_size / 8) * (1 + 3 * (List.length exns)) + | Error _ -> 0 (* we push Error only when we close the socket, + we don't fear memory leaks in that case... *) -let write_size_fun mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes +let write_size mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes let register = let cpt = ref 0 in @@ -330,10 +331,12 @@ let register = end else begin let id = incr cpt; !cpt in let canceler = Canceler.create () in - let read_size_arg = map_option st.read_queue_size ~f:(fun v -> v, read_size_fun) in - let write_size_arg = map_option st.write_queue_size ~f:(fun v -> v, write_size_fun) in - let read_queue = Lwt_pipe.create ?size:read_size_arg () in - let write_queue = Lwt_pipe.create ?size:write_size_arg () in + let read_size = + map_option st.read_queue_size ~f:(fun v -> v, read_size) in + let write_size = + map_option st.write_queue_size ~f:(fun v -> v, write_size) in + let read_queue = Lwt_pipe.create ?size:read_size () in + let write_queue = Lwt_pipe.create ?size:write_size () in let read_conn = ReadScheduler.create_connection st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml index 4051bcce8..0fede0225 100644 --- a/src/utils/lwt_pipe.ml +++ b/src/utils/lwt_pipe.ml @@ -21,7 +21,8 @@ type 'a t = } let create ?size () = - let max_size, compute_size = match size with + let max_size, compute_size = + match size with | None -> max_int, (fun _ -> 0) | Some (max_size, compute_size) -> max_size, (fun e -> 4 * (Sys.word_size / 8) + compute_size e) in @@ -77,16 +78,15 @@ exception Closed let rec push ({ closed ; queue ; current_size ; max_size ; compute_size} as q) elt = - if closed then Lwt.fail Closed - else - let elt_size = compute_size elt in - if current_size + elt_size < max_size then begin + let elt_size = compute_size elt in + if closed then + Lwt.fail Closed + else if current_size + elt_size < max_size || Queue.is_empty queue then begin Queue.push (elt_size, elt) queue ; q.current_size <- current_size + elt_size ; notify_push q ; Lwt.return_unit - end - else + end else wait_pop q >>= fun () -> push q elt @@ -95,7 +95,7 @@ let rec push_now ({ closed ; queue ; compute_size ; } as q) elt = if closed then raise Closed ; let elt_size = compute_size elt in - (current_size + elt_size < max_size) + (current_size + elt_size < max_size || Queue.is_empty queue) && begin Queue.push (elt_size, elt) queue ; q.current_size <- current_size + elt_size ; diff --git a/test/test_p2p_io_scheduler.ml b/test/test_p2p_io_scheduler.ml index 81efc7822..5a5b7937f 100644 --- a/test/test_p2p_io_scheduler.ml +++ b/test/test_p2p_io_scheduler.ml @@ -170,8 +170,8 @@ let max_download_speed = ref None let max_upload_speed = ref None let read_buffer_size = ref (1 lsl 14) -let read_queue_size = ref (Some (4096 * 16)) -let write_queue_size = ref (Some (4096 * 16)) +let read_queue_size = ref (Some (1 lsl 14)) +let write_queue_size = ref (Some (1 lsl 14)) let delay = ref 60. let clients = ref 8