diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 8c00b04b5..35c521cdf 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -10,8 +10,6 @@ (* TODO encode/encrypt before to push into the writer pipe. *) (* TODO patch Sodium.Box to avoid allocation of the encrypted buffer.*) (* TODO patch Data_encoding for continuation-based binary writer/reader. *) -(* TODO use queue bound by memory size of its elements, not by the - number of elements. *) (* TODO test `close ~wait:true`. *) (* TODO nothing in welcoming message proves that the incoming peer is the owner of the public key... only the first message will @@ -218,7 +216,7 @@ module Reader = struct canceler: Canceler.t ; conn: connection ; encoding: 'msg Data_encoding.t ; - messages: 'msg tzresult Lwt_pipe.t ; + messages: (int * 'msg) tzresult Lwt_pipe.t ; mutable worker: unit Lwt.t ; } @@ -229,13 +227,15 @@ module Reader = struct Lwt_unix.yield () >>= fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () -> Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf -> - read_message st buf + let size = 6 * (Sys.word_size / 8) + MBytes.length buf in + read_message st buf >>|? fun msg -> + size, msg end >>= function - | Ok None -> + | Ok (_, None) -> Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> worker_loop st - | Ok (Some msg) -> - Lwt_pipe.push st.messages (Ok msg) >>= fun () -> + | Ok (size, Some msg) -> + Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> worker_loop st | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> Lwt.return_unit @@ -245,6 +245,11 @@ module Reader = struct Lwt.return_unit let run ?size conn encoding canceler = + let compute_size = function + | Ok (size, _) -> (Sys.word_size / 8) * 11 + size + | Error _ -> 0 (* we push Error only when we close the socket, + we don't fear memory leaks in that case... *) in + let size = map_option size ~f:(fun max -> (max, compute_size)) in let st = { canceler ; conn ; encoding ; messages = Lwt_pipe.create ?size () ; @@ -301,6 +306,13 @@ module Writer = struct Lwt.return_unit let run ?size conn encoding canceler = + let compute_size = function + | msg, None -> + 10 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg + | msg, Some _ -> + 18 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg + in + let size = map_option size ~f:(fun max -> max, compute_size) in let st = { canceler ; conn ; encoding ; messages = Lwt_pipe.create ?size () ; @@ -367,10 +379,6 @@ let catch_closed_pipe f = | exn -> fail (Exn exn) end -let is_writable { writer } = - not (Lwt_pipe.is_full writer.messages) -let wait_writable { writer } = - Lwt_pipe.not_full writer.messages let write { writer } msg = catch_closed_pipe begin fun () -> Lwt_pipe.push writer.messages (msg, None) >>= return diff --git a/src/node/net/p2p_connection.mli b/src/node/net/p2p_connection.mli index 890cc6c34..4a4cc8b58 100644 --- a/src/node/net/p2p_connection.mli +++ b/src/node/net/p2p_connection.mli @@ -71,14 +71,6 @@ val accept: (** {2 Output functions} *) -val is_writable: 'msg t -> bool -(** [is_writable conn] is [true] iff [conn] internal write queue is - not full. *) - -val wait_writable: 'msg t -> unit Lwt.t -(** (Cancelable) [wait_writable conn] returns when [conn]'s internal - write queue becomes writable (i.e. not full). *) - val write: 'msg t -> 'msg -> unit tzresult Lwt.t (** [write conn msg] returns when [msg] has successfully been added to [conn]'s internal write queue or fails with a corresponding @@ -103,12 +95,12 @@ val wait_readable: 'msg t -> unit tzresult Lwt.t (** (Cancelable) [wait_readable conn] returns when [conn]'s internal read queue becomes readable (i.e. not empty). *) -val read: 'msg t -> 'msg tzresult Lwt.t +val read: 'msg t -> (int * 'msg) tzresult Lwt.t (** [read conn msg] returns when [msg] has successfully been popped from [conn]'s internal read queue or fails with a corresponding error. *) -val read_now: 'msg t -> 'msg tzresult option +val read_now: 'msg t -> (int * 'msg) tzresult option (** [read_now conn msg] is [Some msg] if [conn]'s internal read queue is not empty, [None] if it is empty, or fails with a correponding error otherwise. *) diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 53826a029..23d2eb09e 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -65,7 +65,7 @@ module Answerer = struct type 'msg callback = { bootstrap: unit -> Point.t list Lwt.t ; advertise: Point.t list -> unit Lwt.t ; - message: 'msg -> unit Lwt.t ; + message: int -> 'msg -> unit Lwt.t ; } type 'msg t = { @@ -80,7 +80,7 @@ module Answerer = struct Lwt_utils.protect ~canceler:st.canceler begin fun () -> P2p_connection.read st.conn end >>= function - | Ok Bootstrap -> begin + | Ok (_, Bootstrap) -> begin st.callback.bootstrap () >>= function | [] -> worker_loop st @@ -93,13 +93,13 @@ module Answerer = struct Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit end - | Ok (Advertise points) -> + | Ok (_, Advertise points) -> st.callback.advertise points >>= fun () -> worker_loop st - | Ok (Message msg) -> - st.callback.message msg >>= fun () -> + | Ok (size, Message msg) -> + st.callback.message size msg >>= fun () -> worker_loop st - | Ok Disconnect | Error [P2p_io_scheduler.Connection_closed] -> + | Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Error [Lwt_utils.Canceled] -> @@ -181,7 +181,7 @@ and events = { and ('msg, 'meta) connection = { canceler : Canceler.t ; - messages : 'msg Lwt_pipe.t ; + messages : (int * 'msg) Lwt_pipe.t ; conn : 'msg Message.t P2p_connection.t ; gid_info : (('msg, 'meta) connection, 'meta) Gid_info.t ; point_info : ('msg, 'meta) connection Point_info.t option ; @@ -248,10 +248,13 @@ let active_connections pool = Gid.Table.length pool.connected_gids let create_connection pool conn id_point pi gi = let gid = Gid_info.gid gi in let canceler = Canceler.create () in - let messages = - Lwt_pipe.create ?size:pool.config.incoming_app_message_queue_size () in + let size = + map_option pool.config.incoming_app_message_queue_size + ~f:(fun qs -> qs, fun (size, _) -> (Sys.word_size / 8) * 11 + size) in + let messages = Lwt_pipe.create ?size () in let callback = - { Answerer.message = Lwt_pipe.push messages ; + { Answerer.message = + (fun size msg -> Lwt_pipe.push messages (size, msg)) ; advertise = register_new_points pool gid ; bootstrap = list_known_points pool gid ; } in @@ -471,7 +474,7 @@ let accept pool fd point = let read { messages } = Lwt.catch - (fun () -> Lwt_pipe.pop messages >>= return) + (fun () -> Lwt_pipe.pop messages >>= fun ( _, msg) -> return msg) (fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed) let is_readable { messages } = diff --git a/src/node/net/p2p_io_scheduler.ml b/src/node/net/p2p_io_scheduler.ml index 14c00b357..7b3fcd164 100644 --- a/src/node/net/p2p_io_scheduler.ml +++ b/src/node/net/p2p_io_scheduler.ml @@ -315,6 +315,13 @@ let create exception Closed +let read_size = function + | Ok buf -> (Sys.word_size / 8) * 8 + MBytes.length buf + | Error _ -> 0 (* we push Error only when we close the socket, + we don't fear memory leaks in that case... *) + +let write_size mbytes = (Sys.word_size / 8) * 6 + MBytes.length mbytes + let register = let cpt = ref 0 in fun st conn -> @@ -324,8 +331,12 @@ let register = end else begin let id = incr cpt; !cpt in let canceler = Canceler.create () in - let read_queue = Lwt_pipe.create ?size:st.read_queue_size () - and write_queue = Lwt_pipe.create ?size:st.write_queue_size () in + let read_size = + map_option st.read_queue_size ~f:(fun v -> v, read_size) in + let write_size = + map_option st.write_queue_size ~f:(fun v -> v, write_size) in + let read_queue = Lwt_pipe.create ?size:read_size () in + let write_queue = Lwt_pipe.create ?size:write_size () in let read_conn = ReadScheduler.create_connection st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id diff --git a/src/node/net/p2p_io_scheduler.mli b/src/node/net/p2p_io_scheduler.mli index 9e5d20139..f5641ff35 100644 --- a/src/node/net/p2p_io_scheduler.mli +++ b/src/node/net/p2p_io_scheduler.mli @@ -41,7 +41,7 @@ val create: (** [create ~max_upload_speed ~max_download_speed ~read_queue_size ~write_queue_size ()] is an IO scheduler with specified (global) max upload (resp. download) speed, and specified read - (resp. write) queue sizes for connections. *) + (resp. write) queue sizes (in bytes) for connections. *) val register: t -> Lwt_unix.file_descr -> connection (** [register sched fd] is a [connection] managed by [sched]. *) diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml index e86293fb4..0fede0225 100644 --- a/src/utils/lwt_pipe.ml +++ b/src/utils/lwt_pipe.ml @@ -10,25 +10,30 @@ open Lwt.Infix type 'a t = - { queue : 'a Queue.t ; - size : int option ; + { queue : (int * 'a) Queue.t ; + mutable current_size : int ; + max_size : int ; + compute_size : ('a -> int) ; mutable closed : bool ; mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ; mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option ; empty: unit Lwt_condition.t ; - full: unit Lwt_condition.t ; - not_full : unit Lwt_condition.t ; } let create ?size () = + let max_size, compute_size = + match size with + | None -> max_int, (fun _ -> 0) + | Some (max_size, compute_size) -> + max_size, (fun e -> 4 * (Sys.word_size / 8) + compute_size e) in { queue = Queue.create () ; - size ; + current_size = 0 ; + max_size ; + compute_size ; closed = false ; push_waiter = None ; pop_waiter = None ; empty = Lwt_condition.create () ; - full = Lwt_condition.create () ; - not_full = Lwt_condition.create () ; } let notify_push q = @@ -61,49 +66,40 @@ let wait_pop q = q.pop_waiter <- Some (waiter, wakener) ; Lwt.protected waiter -let available_space { size } len = - match size with - | None -> true - | Some size -> len < size - let length { queue } = Queue.length queue let is_empty { queue } = Queue.is_empty queue -let is_full ({ queue } as q) = not (available_space q (Queue.length queue)) let rec empty q = if is_empty q then Lwt.return_unit else (Lwt_condition.wait q.empty >>= fun () -> empty q) -let rec full q = - if is_full q - then Lwt.return_unit - else (Lwt_condition.wait q.full >>= fun () -> full q) -let rec not_full q = - if not (is_empty q) - then Lwt.return_unit - else (Lwt_condition.wait q.not_full >>= fun () -> not_full q) exception Closed -let rec push ({ closed ; queue ; full } as q) elt = - let len = Queue.length queue in - if closed then Lwt.fail Closed - else if available_space q len then begin - Queue.push elt queue ; +let rec push ({ closed ; queue ; current_size ; + max_size ; compute_size} as q) elt = + let elt_size = compute_size elt in + if closed then + Lwt.fail Closed + else if current_size + elt_size < max_size || Queue.is_empty queue then begin + Queue.push (elt_size, elt) queue ; + q.current_size <- current_size + elt_size ; notify_push q ; - (if not (available_space q (len + 1)) then Lwt_condition.signal full ()); Lwt.return_unit end else wait_pop q >>= fun () -> push q elt -let rec push_now ({ closed ; queue ; full } as q) elt = +let rec push_now ({ closed ; queue ; compute_size ; + current_size ; max_size + } as q) elt = if closed then raise Closed ; - let len = Queue.length queue in - available_space q len && begin - Queue.push elt queue ; + let elt_size = compute_size elt in + (current_size + elt_size < max_size || Queue.is_empty queue) + && begin + Queue.push (elt_size, elt) queue ; + q.current_size <- current_size + elt_size ; notify_push q ; - (if not (available_space q (len + 1)) then Lwt_condition.signal full ()) ; true end @@ -112,27 +108,11 @@ exception Full let push_now_exn q elt = if not (push_now q elt) then raise Full -let rec pop_all ({ closed ; queue ; empty ; not_full } as q) = - let was_full = is_full q in +let rec pop ({ closed ; queue ; empty ; current_size } as q) = if not (Queue.is_empty queue) then - let queue_copy = Queue.copy queue in - Queue.clear queue; + let (elt_size, elt) = Queue.pop queue in notify_pop q ; - (if was_full then Lwt_condition.signal not_full ()); - Lwt_condition.signal empty (); - Lwt.return queue_copy - else if closed then - Lwt.fail Closed - else - wait_push q >>= fun () -> - pop_all q - -let rec pop ({ closed ; queue ; empty ; not_full } as q) = - let was_full = is_full q in - if not (Queue.is_empty queue) then - let elt = Queue.pop queue in - notify_pop q ; - (if was_full then Lwt_condition.signal not_full ()); + q.current_size <- current_size - elt_size ; (if Queue.length queue = 0 then Lwt_condition.signal empty ()); Lwt.return elt else if closed then @@ -143,7 +123,7 @@ let rec pop ({ closed ; queue ; empty ; not_full } as q) = let rec peek ({ closed ; queue } as q) = if not (Queue.is_empty queue) then - let elt = Queue.peek queue in + let (_elt_size, elt) = Queue.peek queue in Lwt.return elt else if closed then Lwt.fail Closed @@ -153,28 +133,15 @@ let rec peek ({ closed ; queue } as q) = exception Empty -let pop_now_exn ({ closed ; queue ; empty ; not_full } as q) = - let was_full = is_full q in +let pop_now_exn ({ closed ; queue ; empty ; current_size } as q) = if Queue.is_empty queue then (if closed then raise Closed else raise Empty) ; - let elt = Queue.pop queue in - (if was_full then Lwt_condition.signal not_full ()); + let (elt_size, elt) = Queue.pop queue in (if Queue.length queue = 0 then Lwt_condition.signal empty ()); + q.current_size <- current_size - elt_size ; notify_pop q ; elt -let pop_all_now ({ closed ; queue ; empty ; not_full } as q) = - let was_empty = is_empty q in - let was_full = is_full q in - if Queue.is_empty queue then - (if closed then raise Closed else raise Empty) ; - let queue_copy = Queue.copy queue in - Queue.clear queue ; - (if was_full then Lwt_condition.signal not_full ()); - (if not was_empty then Lwt_condition.signal empty ()); - notify_pop q ; - queue_copy - let pop_now q = match pop_now_exn q with | exception Empty -> None @@ -195,7 +162,6 @@ let close q = q.closed <- true ; notify_push q ; notify_pop q ; - Lwt_condition.broadcast_exn q.full Closed ; end let rec iter q ~f = diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli index 8b282f36a..cbeb995a5 100644 --- a/src/utils/lwt_pipe.mli +++ b/src/utils/lwt_pipe.mli @@ -14,18 +14,15 @@ type 'a t (** Type of queues holding values of type ['a]. *) -val create : ?size:int -> unit -> 'a t -(** [create ~size] is an empty queue that can hold max [size] - elements. *) +val create : ?size:(int * ('a -> int)) -> unit -> 'a t +(** [create ~size:(max_size, compute_size)] is an empty queue that can + hold max [size] bytes of data, using [compute_size] to compute the + size of a datum. *) val push : 'a t -> 'a -> unit Lwt.t (** [push q v] is a thread that blocks while [q] contains more than [size] elements, then adds [v] at the end of [q]. *) -val pop_all : 'a t -> 'a Queue.t Lwt.t -(** [pop' q] is a thread that returns all elements in [q] or waits - till there is at least one element in [q]. *) - val pop : 'a t -> 'a Lwt.t (** [pop q] is a thread that blocks while [q] is empty, then removes and returns the first element in [q]. *) @@ -48,10 +45,6 @@ val push_now_exn : 'a t -> 'a -> unit (** [push_now q v] adds [v] at the ends of [q] immediately or raise [Full] if [q] is currently full. *) -val pop_all_now : 'a t -> 'a Queue.t -(** [pop_all_now q] is a copy of [q]'s internal queue, that may be - empty. *) - val pop_now : 'a t -> 'a option (** [pop_now q] maybe removes and returns the first element in [q] if [q] contains at least one element. *) @@ -68,18 +61,9 @@ val length : 'a t -> int val is_empty : 'a t -> bool (** [is_empty q] is [true] if [q] is empty, [false] otherwise. *) -val is_full : 'a t -> bool -(** [is_full q] is [true] if [q] is full, [false] otherwise. *) - val empty : 'a t -> unit Lwt.t (** [empty q] returns when [q] becomes empty. *) -val full : 'a t -> unit Lwt.t -(** [full q] returns when [q] becomes full. *) - -val not_full : 'a t -> unit Lwt.t -(** [not_full q] returns when [q] stop being full. *) - val iter : 'a t -> f:('a -> unit Lwt.t) -> unit Lwt.t (** [iter q ~f] pops all elements of [q] and applies [f] on them. *) diff --git a/test/test_p2p_connection.ml b/test/test_p2p_connection.ml index c0d6dbf19..bf041db8a 100644 --- a/test/test_p2p_connection.ml +++ b/test/test_p2p_connection.ml @@ -161,14 +161,14 @@ let client addr port = (* let's exchange a simple message. *) connect sched addr port id2 >>=? fun auth_fd -> P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.read conn >>=? fun msg -> + P2p_connection.read conn >>=? fun (msg_size, msg) -> assert (MBytes.compare simple_msg msg = 0) ; P2p_connection.close conn >>= fun _stat -> lwt_log_notice "Simple OK" >>= fun () -> (* let's detect a closed connection on `read`. *) connect sched addr port id2 >>=? fun auth_fd -> P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.read conn >>=? fun msg -> + P2p_connection.read conn >>=? fun (msg_size, msg) -> assert (MBytes.compare simple_msg msg = 0) ; P2p_connection.read conn >>= fun msg -> assert (is_connection_closed msg) ; diff --git a/test/test_p2p_io_scheduler.ml b/test/test_p2p_io_scheduler.ml index 0db147c3d..2319964a7 100644 --- a/test/test_p2p_io_scheduler.ml +++ b/test/test_p2p_io_scheduler.ml @@ -170,8 +170,8 @@ let max_download_speed = ref None let max_upload_speed = ref None let read_buffer_size = ref (1 lsl 14) -let read_queue_size = ref (Some 1) -let write_queue_size = ref (Some 1) +let read_queue_size = ref (Some (1 lsl 14)) +let write_queue_size = ref (Some (1 lsl 14)) let delay = ref 60. let clients = ref 8