From fbacfff9e604e13526fdb0368506f847f2cc566e Mon Sep 17 00:00:00 2001 From: OCamlPro-Iguernlala Date: Tue, 18 Apr 2017 18:32:31 +0200 Subject: [PATCH] P2p: use incremental encoding/decoding --- src/node/main/node_config_file.ml | 21 +++-- src/node/main/node_config_file.mli | 1 + src/node/main/node_shared_arg.ml | 19 +++- src/node/main/node_shared_arg.mli | 1 + src/node/net/p2p.ml | 10 +- src/node/net/p2p.mli | 4 + src/node/net/p2p_connection.ml | 132 ++++++++++++++++++++------- src/node/net/p2p_connection.mli | 5 + src/node/net/p2p_connection_pool.ml | 4 + src/node/net/p2p_connection_pool.mli | 6 +- test/p2p/test_p2p_connection.ml | 34 +++++++ test/p2p/test_p2p_connection_pool.ml | 1 + 12 files changed, 192 insertions(+), 46 deletions(-) diff --git a/src/node/main/node_config_file.ml b/src/node/main/node_config_file.ml index 1efe6637d..4c7f3fe84 100644 --- a/src/node/main/node_config_file.ml +++ b/src/node/main/node_config_file.ml @@ -73,6 +73,7 @@ let default_net_limits : P2p.limits = { max_known_points = Some (400, 300) ; max_known_peer_ids = Some (400, 300) ; swap_linger = 30. ; + binary_chunks_size = None ; } let default_net = { @@ -116,11 +117,12 @@ let limit : P2p.limits Data_encoding.t = incoming_message_queue_size ; outgoing_message_queue_size ; known_points_history_size ; known_peer_ids_history_size ; max_known_points ; max_known_peer_ids ; - swap_linger ; + swap_linger ; binary_chunks_size } -> ( ( authentification_timeout, min_connections, expected_connections, max_connections, backlog, max_incoming_connections, - max_download_speed, max_upload_speed, swap_linger) , + max_download_speed, max_upload_speed, swap_linger, + binary_chunks_size) , ( read_buffer_size, read_queue_size, write_queue_size, incoming_app_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size, @@ -129,7 +131,8 @@ let limit : P2p.limits Data_encoding.t = ))) (fun ( ( authentification_timeout, min_connections, expected_connections, max_connections, backlog, max_incoming_connections, - max_download_speed, max_upload_speed, swap_linger) , + max_download_speed, max_upload_speed, swap_linger, + binary_chunks_size) , ( read_buffer_size, read_queue_size, write_queue_size, incoming_app_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size, @@ -143,9 +146,11 @@ let limit : P2p.limits Data_encoding.t = incoming_app_message_queue_size ; incoming_message_queue_size ; outgoing_message_queue_size ; known_points_history_size ; known_peer_ids_history_size ; - max_known_points ; max_known_peer_ids ; swap_linger }) + max_known_points ; max_known_peer_ids ; swap_linger ; + binary_chunks_size + }) (merge_objs - (obj9 + (obj10 (dft "authentification-timeout" float default_net_limits.authentification_timeout) (dft "min-connections" uint16 @@ -160,7 +165,8 @@ let limit : P2p.limits Data_encoding.t = default_net_limits.max_incoming_connections) (opt "max-download-speed" int31) (opt "max-upload-speed" int31) - (dft "swap-linger" float default_net_limits.swap_linger)) + (dft "swap-linger" float default_net_limits.swap_linger) + (opt "binary-chunks-size" uint8)) (obj10 (dft "read-buffer-size" int31 default_net_limits.read_buffer_size) @@ -266,6 +272,7 @@ let update ?max_connections ?max_download_speed ?max_upload_speed + ?binary_chunks_size ?peer_table_size ?expected_pow ?bootstrap_peers @@ -308,6 +315,8 @@ let update max_known_peer_ids = Utils.first_some peer_table_size cfg.net.limits.max_known_peer_ids ; + binary_chunks_size = + Utils.map_option (fun x -> x lsl 10) binary_chunks_size ; } in let net : net = { expected_pow = diff --git a/src/node/main/node_config_file.mli b/src/node/main/node_config_file.mli index 487e3ffcc..0e391d51a 100644 --- a/src/node/main/node_config_file.mli +++ b/src/node/main/node_config_file.mli @@ -56,6 +56,7 @@ val update: ?max_connections:int -> ?max_download_speed:int -> ?max_upload_speed:int -> + ?binary_chunks_size:int-> ?peer_table_size:int -> ?expected_pow:float -> ?bootstrap_peers:string list -> diff --git a/src/node/main/node_shared_arg.ml b/src/node/main/node_shared_arg.ml index 99e5d1fb3..c1880bbec 100644 --- a/src/node/main/node_shared_arg.ml +++ b/src/node/main/node_shared_arg.ml @@ -20,6 +20,7 @@ type t = { max_connections: int option ; max_download_speed: int option ; max_upload_speed: int option ; + binary_chunks_size: int option ; peer_table_size: int option ; expected_pow: float option ; peers: string list ; @@ -35,7 +36,7 @@ type t = { let wrap data_dir config_file - connections max_download_speed max_upload_speed + connections max_download_speed max_upload_speed binary_chunks_size peer_table_size listen_addr peers no_bootstrap_peers closed expected_pow rpc_listen_addr rpc_tls @@ -66,6 +67,7 @@ let wrap max_connections ; max_download_speed ; max_upload_speed ; + binary_chunks_size ; expected_pow ; peers ; no_bootstrap_peers ; @@ -150,6 +152,14 @@ module Term = struct Arg.(value & opt (some int) None & info ~docs ~doc ~docv:"NUM" ["max-upload-speed"]) + let binary_chunks_size = + let doc = + Format.sprintf + "Size limit (in kB) of binary blocks that are sent to other peers." + in + Arg.(value & opt (some int) None & + info ~docs ~doc ~docv:"NUM" ["binary-chunks-size"]) + let peer_table_size = let doc = "Maximum size of internal peer tables, \ used to store metadata/logs about a peer or about a \ @@ -223,7 +233,7 @@ module Term = struct let open Term in const wrap $ data_dir $ config_file $ connections - $ max_download_speed $ max_upload_speed + $ max_download_speed $ max_upload_speed $ binary_chunks_size $ peer_table_size $ listen_addr $ peers $ no_bootstrap_peers $ closed $ expected_pow $ rpc_listen_addr $ rpc_tls @@ -241,7 +251,7 @@ let read_and_patch_config_file args = end >>=? fun cfg -> let { data_dir ; min_connections ; expected_connections ; max_connections ; - max_download_speed ; max_upload_speed ; + max_download_speed ; max_upload_speed ; binary_chunks_size ; peer_table_size ; expected_pow ; peers ; no_bootstrap_peers ; @@ -257,6 +267,7 @@ let read_and_patch_config_file args = return @@ Node_config_file.update ?data_dir ?min_connections ?expected_connections ?max_connections - ?max_download_speed ?max_upload_speed ?peer_table_size ?expected_pow + ?max_download_speed ?max_upload_speed ?binary_chunks_size + ?peer_table_size ?expected_pow ~bootstrap_peers ?listen_addr ?rpc_listen_addr ~closed ~cors_origins ~cors_headers ?rpc_tls ?log_output cfg diff --git a/src/node/main/node_shared_arg.mli b/src/node/main/node_shared_arg.mli index 574612dc6..c31c249fd 100644 --- a/src/node/main/node_shared_arg.mli +++ b/src/node/main/node_shared_arg.mli @@ -17,6 +17,7 @@ type t = { max_connections: int option ; max_download_speed: int option ; max_upload_speed: int option ; + binary_chunks_size: int option ; peer_table_size: int option ; expected_pow: float option ; peers: string list ; diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index e5a8033d0..c4c496ba9 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -69,6 +69,7 @@ type limits = { swap_linger : float ; + binary_chunks_size : int option ; } let create_scheduler limits = @@ -104,6 +105,7 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched = max_known_points = limits.max_known_points ; max_known_peer_ids = limits.max_known_peer_ids ; swap_linger = limits.swap_linger ; + binary_chunks_size = limits.binary_chunks_size ; } in let pool = @@ -365,7 +367,13 @@ let check_limits = fail_2 c.known_points_history_size "known-points-history-size" >>=? fun () -> fail_1 c.swap_linger - "swap-linger" + "swap-linger" >>=? fun () -> + begin + match c.binary_chunks_size with + | None -> return () + | Some size -> P2p_connection.check_binary_chunks_size size + end >>=? fun () -> + return () let create ~config ~limits meta_cfg msg_cfg = check_limits limits >>=? fun () -> diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index 5d78a85a7..265ca978a 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -127,6 +127,10 @@ type limits = { (** Peer swapping does not occur more than once during a timespan of [swap_linger] seconds. *) + binary_chunks_size : int option ; + (** Size (in bytes) of binary blocks that are sent to other + peers. Default value is 64 kB. Max value is 64kB. *) + } type ('msg, 'meta) t diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 68a059932..42ea116b4 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -31,13 +31,14 @@ type error += Decoding_error type error += Myself of Id_point.t type error += Not_enough_proof_of_work of Peer_id.t type error += Invalid_auth +type error += Invalid_chunks_size of { value: int ; min: int ; max: int } module Crypto = struct let header_length = 2 let crypto_overhead = 18 (* FIXME import from Sodium.Box. *) let max_content_length = - 1 lsl (header_length * 8) - crypto_overhead + 1 lsl (header_length * 8) - crypto_overhead - header_length type data = { channel_key : Crypto_box.channel_key ; @@ -78,6 +79,17 @@ module Crypto = struct end +let check_binary_chunks_size size = + let value = size - Crypto.crypto_overhead - Crypto.header_length in + fail_unless + (value > 0 && + value <= Crypto.max_content_length) + (Invalid_chunks_size + { value = size ; + min = Crypto.(header_length + crypto_overhead + 1) ; + max = Crypto.(max_content_length + crypto_overhead + header_length) + }) + module Connection_message = struct type t = { @@ -226,33 +238,48 @@ module Reader = struct mutable worker: unit Lwt.t ; } - let rec read_message st buf = - return (Data_encoding.Binary.of_bytes st.encoding buf) + let rec read_message st init_mbytes = + let rec loop status = + Lwt_unix.yield () >>= fun () -> + let open Data_encoding.Binary in + match status with + | Success { res ; res_len ; remaining } -> + return (Some (res, res_len, remaining)) + | Error -> + lwt_debug "[read_message] incremental decoding error" >>= fun () -> + return None + | Await decode_next_buf -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Crypto.read_chunk st.conn.fd st.conn.cryptobox_data + end >>=? fun buf -> + lwt_debug + "reading %d bytes from %a" + (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> + loop (decode_next_buf buf) in + loop + (Data_encoding.Binary.read_stream_of_bytes ~init:init_mbytes st.encoding) - let rec worker_loop st = + + let rec worker_loop st init_mbytes = Lwt_unix.yield () >>= fun () -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> - Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf -> - let size = 6 * (Sys.word_size / 8) + MBytes.length buf in - lwt_debug "reading %d bytes from %a" - size Connection_info.pp st.conn.info >>= fun () -> - read_message st buf >>=? fun msg -> + begin + read_message st init_mbytes >>=? fun msg -> match msg with | None -> Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> - return false - | Some msg -> + return None + | Some (msg, size, rem_mbytes) -> Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> - return true + return (Some rem_mbytes) end >>= function - | Ok true -> - worker_loop st - | Ok false -> + | Ok Some rem_mbytes -> + worker_loop st rem_mbytes + | Ok None -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> - lwt_debug "connection closed to %a" - Connection_info.pp st.conn.info >>= fun () -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> Lwt.return_unit | Error _ as err -> Lwt_pipe.safe_push_now st.messages err ; @@ -276,7 +303,7 @@ module Reader = struct end ; st.worker <- Lwt_utils.worker "reader" - (fun () -> worker_loop st) + (fun () -> worker_loop st []) (fun () -> Canceler.cancel st.canceler) ; st @@ -292,12 +319,25 @@ module Writer = struct canceler: Canceler.t ; conn: connection ; encoding: 'msg Data_encoding.t ; - messages: (MBytes.t * unit tzresult Lwt.u option) Lwt_pipe.t ; + messages: (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ; mutable worker: unit Lwt.t ; + binary_chunks_size: int ; (* in bytes *) } + let rec send_message st buf = + let rec loop = function + | [] -> return () + | buf :: l -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf + end >>=? fun () -> + lwt_debug "writing %d bytes to %a" + (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> + loop l in + loop buf + let encode_message st msg = - try ok (Data_encoding.Binary.to_bytes st.encoding msg) + try ok (Data_encoding.Binary.to_bytes_list st.binary_chunks_size st.encoding msg) with _ -> error Encoding_error let rec worker_loop st = @@ -316,11 +356,7 @@ module Writer = struct Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Ok (buf, wakener) -> - lwt_debug "writing %d bytes to %a" - (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> - Lwt_utils.protect ~canceler:st.canceler begin fun () -> - Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf - end >>= fun res -> + send_message st buf >>= fun res -> match res with | Ok () -> iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; @@ -348,16 +384,34 @@ module Writer = struct Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit - let run ?size conn encoding canceler = - let compute_size = function - | buf, None -> Sys.word_size + MBytes.length buf - | buf, Some _ -> 2 * Sys.word_size + MBytes.length buf + let run + ?size ?binary_chunks_size + conn encoding canceler = + let binary_chunks_size = + match binary_chunks_size with + | None -> Crypto.max_content_length + | Some size -> + let size = size - Crypto.crypto_overhead - Crypto.header_length in + assert (size > 0) ; + assert (size <= Crypto.max_content_length) ; + size + in + let compute_size = + let buf_list_size = + List.fold_left + (fun sz buf -> + sz + MBytes.length buf + 2 * Sys.word_size) 0 + in + function + | buf_l, None -> Sys.word_size + buf_list_size buf_l + | buf_l, Some _ -> 2 * Sys.word_size + buf_list_size buf_l in let size = map_option size ~f:(fun max -> max, compute_size) in let st = { canceler ; conn ; encoding ; messages = Lwt_pipe.create ?size () ; worker = Lwt.return_unit ; + binary_chunks_size = binary_chunks_size ; } in Canceler.on_cancel st.canceler begin fun () -> Lwt_pipe.close st.messages ; @@ -388,13 +442,12 @@ type 'msg t = { let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 - let pp ppf { conn } = Connection_info.pp ppf conn.info let info { conn } = conn.info let accept ?incoming_message_queue_size ?outgoing_message_queue_size - (fd, info, cryptobox_data) encoding = + ?binary_chunks_size (fd, info, cryptobox_data) encoding = Lwt_utils.protect begin fun () -> Ack.write fd cryptobox_data Ack >>=? fun () -> Ack.read fd cryptobox_data @@ -407,11 +460,14 @@ let accept end >>=? fun accepted -> fail_unless accepted Rejected >>=? fun () -> let canceler = Canceler.create () in - let conn = { id = next_conn_id (); fd ; info ; cryptobox_data } in + let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in let reader = Reader.run ?size:incoming_message_queue_size conn encoding canceler and writer = - Writer.run ?size:outgoing_message_queue_size conn encoding canceler in + Writer.run + ?size:outgoing_message_queue_size ?binary_chunks_size + conn encoding canceler + in let conn = { conn ; reader ; writer } in Canceler.on_cancel canceler begin fun () -> P2p_io_scheduler.close fd >>= fun _ -> @@ -445,7 +501,15 @@ let write_now { writer } msg = try Ok (Lwt_pipe.push_now writer.messages (buf, None)) with Lwt_pipe.Closed -> Error [P2p_io_scheduler.Connection_closed] +let rec split_bytes size bytes = + if MBytes.length bytes <= size then + [bytes] + else + MBytes.sub bytes 0 size :: + split_bytes size (MBytes.sub bytes size (MBytes.length bytes - size)) + let raw_write_sync { writer } bytes = + let bytes = split_bytes writer.binary_chunks_size bytes in catch_closed_pipe begin fun () -> let waiter, wakener = Lwt.wait () in Lwt_pipe.push writer.messages (bytes, Some wakener) >>= fun () -> diff --git a/src/node/net/p2p_connection.mli b/src/node/net/p2p_connection.mli index 4ec769413..5d9a5cf25 100644 --- a/src/node/net/p2p_connection.mli +++ b/src/node/net/p2p_connection.mli @@ -27,6 +27,7 @@ type error += Rejected type error += Myself of Id_point.t type error += Not_enough_proof_of_work of Peer_id.t type error += Invalid_auth +type error += Invalid_chunks_size of { value: int ; min: int ; max: int } type authenticated_fd (** Type of a connection that successfully passed the authentication @@ -64,11 +65,15 @@ val kick: authenticated_fd -> unit Lwt.t val accept: ?incoming_message_queue_size:int -> ?outgoing_message_queue_size:int -> + ?binary_chunks_size: int -> authenticated_fd -> 'msg Data_encoding.t -> 'msg t tzresult Lwt.t (** (Low-level) (Cancelable) Accepts a remote peer given an authenticated_fd. Used in [P2p_connection_pool], to promote an [authenticated_fd] to the status of an active peer. *) +val check_binary_chunks_size: int -> unit tzresult Lwt.t +(** Precheck for the [?binary_chunks_size] parameter of [accept]. *) + (** {1 IO functions on connections} *) (** {2 Output functions} *) diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 22e4564a9..bdb9c330e 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -40,6 +40,7 @@ module Message = struct let encoding msg_encoding = let open Data_encoding in + dynamic_size @@ union ~tag_size:`Uint16 ([ case ~tag:0x01 null (function Disconnect -> Some () | _ -> None) @@ -329,6 +330,8 @@ type config = { max_known_peer_ids : (int * int) option ; (* max, gc target *) swap_linger : float ; + + binary_chunks_size : int option ; } type 'meta meta_config = { @@ -835,6 +838,7 @@ and authenticate pool ?point_info canceler fd point = P2p_connection.accept ?incoming_message_queue_size:pool.config.incoming_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size + ?binary_chunks_size:pool.config.binary_chunks_size auth_fd pool.encoding >>= fun conn -> lwt_debug "authenticate: %a -> Connected %a" Point.pp point diff --git a/src/node/net/p2p_connection_pool.mli b/src/node/net/p2p_connection_pool.mli index ce645156d..180dcd6af 100644 --- a/src/node/net/p2p_connection_pool.mli +++ b/src/node/net/p2p_connection_pool.mli @@ -113,6 +113,9 @@ type config = { (** Peer swapping does not occur more than once during a timespan of [spap_linger] seconds. *) + binary_chunks_size : int option ; + (** Size (in bytes) of binary blocks that are sent to other + peers. Default value is 64 kB. *) } type 'meta meta_config = { @@ -254,7 +257,8 @@ val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t where [conn'] is the internal [P2p_connection.t] inside [conn]. *) (**/**) -val raw_write_sync: ('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t +val raw_write_sync: + ('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t (**/**) val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult diff --git a/test/p2p/test_p2p_connection.ml b/test/p2p/test_p2p_connection.ml index 3985ca683..7e541022a 100644 --- a/test/p2p/test_p2p_connection.ml +++ b/test/p2p/test_p2p_connection.ml @@ -235,6 +235,39 @@ module Simple_message = struct end +module Chunked_message = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 8) + let simple_msg2 = MBytes.create (1 lsl 8) + + let server ch sched socket = + accept sched socket >>=? fun (_info, auth_fd) -> + P2p_connection.accept + ~binary_chunks_size:21 auth_fd encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg >>=? fun () -> + P2p_connection.read conn >>=? fun (_msg_size, msg) -> + _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> + sync ch >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let client ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept + ~binary_chunks_size:21 auth_fd encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg2 >>=? fun () -> + P2p_connection.read conn >>=? fun (_msg_size, msg) -> + _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> + sync ch >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + module Close_on_read = struct let encoding = Data_encoding.bytes @@ -346,6 +379,7 @@ let main () = "kick", Kick.run ; "kicked", Kicked.run ; "simple-message", Simple_message.run ; + "chunked-message", Chunked_message.run ; "close-on-read", Close_on_read.run ; "close-on-write", Close_on_write.run ; "garbled-data", Garbled_data.run ; diff --git a/test/p2p/test_p2p_connection_pool.ml b/test/p2p/test_p2p_connection_pool.ml index e1a693617..dfb297b0a 100644 --- a/test/p2p/test_p2p_connection_pool.ml +++ b/test/p2p/test_p2p_connection_pool.ml @@ -81,6 +81,7 @@ let detach_node f points n = max_known_points = None ; max_known_peer_ids = None ; swap_linger = 0. ; + binary_chunks_size = None } in Process.detach ~prefix:(Format.asprintf "%a: " Peer_id.pp_short identity.peer_id)