From 1853889637a3121dd97daf412b6a94d24df53179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 19 Feb 2019 16:28:12 +0100 Subject: [PATCH] P2p: improve `P2p_socket` There is no notion of unauthenticated connection, since the function `authenticate` is immediately called on a `P2p_io_scheduler.connection` and returns an authenticated connection, or nothing. So, we only deal with authenticated connections. The identifier of a connection is the same one as the one of the `P2p_io_scheduler.connection` underneath. --- src/lib_p2p/p2p_socket.ml | 91 +++++++++++++++++++++----------------- src/lib_p2p/p2p_socket.mli | 16 +++---- 2 files changed, 59 insertions(+), 48 deletions(-) diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index e838a54ea..fe4f4d5e3 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -29,9 +29,20 @@ include Logging.Make(struct let name = "p2p.connection" end) module Crypto = struct + (* maximal size of the buffer *) let bufsize = 1 lsl 16 - 1 let header_length = 2 - let max_content_length = bufsize - header_length - Crypto_box.boxzerobytes + let max_content_length = bufsize - Crypto_box.zerobytes + + (* The header length is only stored in the encrypted message, but + within the space allowed by boxzerobytes, so it does not cost in + space in the buffer. *) + let max_encrypted_length = bufsize - Crypto_box.boxzerobytes + + (* The size of extra data added by encryption. *) + let boxextrabytes = Crypto_box.zerobytes - Crypto_box.boxzerobytes + (* The number of bytes added by encryption + header *) + let extrabytes = header_length + boxextrabytes type data = { channel_key : Crypto_box.channel_key ; @@ -39,28 +50,35 @@ module Crypto = struct mutable remote_nonce : Crypto_box.nonce ; } + (* We do the following assumptions on the NaCl library. Note that + we also make the assumption, here, that the NaCl library allows + in-place boxing and unboxing, since we use the same buffer for + input and output. *) + let () = assert (Crypto_box.boxzerobytes >= header_length) + let write_chunk fd cryptobox_data msg = let msglen = MBytes.length msg in fail_unless (msglen <= max_content_length) P2p_errors.Invalid_message_size >>=? fun () -> - let buf = MBytes.make (msglen + Crypto_box.zerobytes) '\x00' in + let buf_length = msglen + Crypto_box.zerobytes in + let buf = MBytes.make buf_length '\x00' in MBytes.blit msg 0 buf Crypto_box.zerobytes msglen ; let local_nonce = cryptobox_data.local_nonce in cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ; Crypto_box.fast_box_noalloc cryptobox_data.channel_key local_nonce buf ; - let encrypted_length = msglen + Crypto_box.boxzerobytes in - MBytes.set_int16 buf - (Crypto_box.boxzerobytes - header_length) encrypted_length ; - let payload = MBytes.sub buf (Crypto_box.boxzerobytes - header_length) - (header_length + encrypted_length) in + let encrypted_length = buf_length - Crypto_box.boxzerobytes in + let header_pos = Crypto_box.boxzerobytes - header_length in + MBytes.set_int16 buf header_pos encrypted_length ; + let payload = MBytes.sub buf header_pos (buf_length - header_pos) in P2p_io_scheduler.write fd payload let read_chunk fd cryptobox_data = let header_buf = MBytes.create header_length in P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () -> let encrypted_length = MBytes.get_uint16 header_buf 0 in - let buf = MBytes.make (encrypted_length + Crypto_box.boxzerobytes) '\x00' in + let buf_length = encrypted_length + Crypto_box.boxzerobytes in + let buf = MBytes.make buf_length '\x00' in P2p_io_scheduler.read_full ~pos:Crypto_box.boxzerobytes ~len:encrypted_length fd buf >>=? fun () -> let remote_nonce = cryptobox_data.remote_nonce in @@ -73,18 +91,22 @@ module Crypto = struct fail P2p_errors.Decipher_error | true -> return (MBytes.sub buf Crypto_box.zerobytes - (encrypted_length - Crypto_box.boxzerobytes)) + (buf_length - Crypto_box.zerobytes)) end +(* Note: there is an inconsistency here, since we display an error in + bytes, whereas the option is set in kbytes. Also, since the default + size is 64kB-1, it is actually impossible to set the default + size using the option (the max is 63 kB). *) let check_binary_chunks_size size = - let value = size - Crypto_box.boxzerobytes - Crypto.header_length in + let value = size - Crypto.extrabytes in fail_unless (value > 0 && value <= Crypto.max_content_length) (P2p_errors.Invalid_chunks_size { value = size ; - min = Crypto.(header_length + Crypto_box.boxzerobytes + 1) ; + min = Crypto.extrabytes + 1 ; max = Crypto.bufsize ; }) @@ -184,9 +206,9 @@ module Metadata = struct let read metadata_config fd cryptobox_data = Crypto.read_chunk fd cryptobox_data >>=? fun buf -> let length = MBytes.length buf in + let encoding = metadata_config.conn_meta_encoding in match - Data_encoding.Binary.read - metadata_config.conn_meta_encoding buf 0 length + Data_encoding.Binary.read encoding buf 0 length with | None -> fail P2p_errors.Decoding_error @@ -226,7 +248,7 @@ module Ack = struct nack_case (Tag 255) ; ] - let write cryptobox_data fd message = + let write fd cryptobox_data message = let encoded_message_len = Data_encoding.Binary.length encoding message in let buf = MBytes.create encoded_message_len in @@ -236,7 +258,7 @@ module Ack = struct | Some last -> fail_unless (last = encoded_message_len) P2p_errors.Encoding_error >>=? fun () -> - Crypto.write_chunk cryptobox_data fd buf + Crypto.write_chunk fd cryptobox_data buf let read fd cryptobox_data = Crypto.read_chunk fd cryptobox_data >>=? fun buf -> @@ -252,9 +274,9 @@ module Ack = struct end -type 'conn_meta authenticated_fd = { +type 'meta authenticated_connection = { fd: P2p_io_scheduler.connection ; - info: 'conn_meta P2p_connection.Info.t ; + info: 'meta P2p_connection.Info.t ; cryptobox_data: Crypto.data ; } @@ -308,22 +330,11 @@ let authenticate } in return (info, { fd ; info ; cryptobox_data }) -type 'meta connection = { - id : int ; - info : 'meta P2p_connection.Info.t ; - fd : P2p_io_scheduler.connection ; - cryptobox_data : Crypto.data ; -} - -let next_conn_id = - let cpt = ref 0 in - fun () -> incr cpt ;!cpt - module Reader = struct type ('msg, 'meta) t = { canceler: Lwt_canceler.t ; - conn: 'meta connection ; + conn: 'meta authenticated_connection ; encoding: 'msg Data_encoding.t ; messages: (int * 'msg) tzresult Lwt_pipe.t ; mutable worker: unit Lwt.t ; @@ -410,7 +421,7 @@ module Writer = struct type ('msg, 'meta) t = { canceler: Lwt_canceler.t ; - conn: 'meta connection ; + conn: 'meta authenticated_connection ; encoding: 'msg Data_encoding.t ; messages: (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ; mutable worker: unit Lwt.t ; @@ -487,7 +498,7 @@ module Writer = struct match binary_chunks_size with | None -> Crypto.max_content_length | Some size -> - let size = size - Crypto_box.boxzerobytes - Crypto.header_length in + let size = size - Crypto.extrabytes in assert (size > 0) ; assert (size <= Crypto.max_content_length) ; size @@ -533,12 +544,13 @@ module Writer = struct end type ('msg, 'meta) t = { - conn : 'meta connection ; + conn : 'meta authenticated_connection ; reader : ('msg, 'meta) Reader.t ; writer : ('msg, 'meta) Writer.t ; } -let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 +let equal { conn = { fd = fd2 } } { conn = { fd = fd1 } } = + P2p_io_scheduler.id fd1 = P2p_io_scheduler.id fd2 let pp ppf { conn } = P2p_connection.Info.pp (fun _ _ -> ()) ppf conn.info let info { conn } = conn.info @@ -549,13 +561,13 @@ let private_node { conn } = conn.info.private_node let accept ?incoming_message_queue_size ?outgoing_message_queue_size ?binary_chunks_size - ({ fd ; info ; cryptobox_data } : 'meta authenticated_fd) + conn encoding = protect begin fun () -> - Ack.write fd cryptobox_data Ack >>=? fun () -> - Ack.read fd cryptobox_data + Ack.write conn.fd conn.cryptobox_data Ack >>=? fun () -> + Ack.read conn.fd conn.cryptobox_data end ~on_error:begin fun err -> - P2p_io_scheduler.close fd >>= fun _ -> + P2p_io_scheduler.close conn.fd >>= fun _ -> match err with | [ P2p_errors.Connection_closed ] -> fail P2p_errors.Rejected_socket_connection | [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth @@ -563,7 +575,6 @@ let accept end >>=? function | Ack -> let canceler = Lwt_canceler.create () in - let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in let reader = Reader.run ?size:incoming_message_queue_size conn encoding canceler and writer = @@ -573,7 +584,7 @@ let accept in let conn = { conn ; reader ; writer } in Lwt_canceler.on_cancel canceler begin fun () -> - P2p_io_scheduler.close fd >>= fun _ -> + P2p_io_scheduler.close conn.conn.fd >>= fun _ -> Lwt.return_unit end ; return conn @@ -605,7 +616,7 @@ let write_sync { writer ; conn } msg = catch_closed_pipe begin fun () -> let waiter, wakener = Lwt.wait () in debug "Sending message to %a: %a" - P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ; + P2p_peer.Id.pp_short conn.info.peer_id ( pp_json writer.encoding ) msg ; Lwt.return (Writer.encode_message writer msg) >>=? fun buf -> Lwt_pipe.push writer.messages (buf, Some wakener) >>= fun () -> waiter diff --git a/src/lib_p2p/p2p_socket.mli b/src/lib_p2p/p2p_socket.mli index ff1fe8ad0..755b32fe6 100644 --- a/src/lib_p2p/p2p_socket.mli +++ b/src/lib_p2p/p2p_socket.mli @@ -42,7 +42,7 @@ type 'meta metadata_config = { } (** Type for the parameter negotiation mechanism. *) -type 'meta authenticated_fd +type 'meta authenticated_connection (** Type of a connection that successfully passed the authentication phase, but has not been accepted yet. Parametrized by the type of expected parameter in the `ack` message. *) @@ -68,27 +68,27 @@ val authenticate: ?listening_port: int -> P2p_identity.t -> P2p_version.t list -> 'meta metadata_config -> - ('meta P2p_connection.Info.t * 'meta authenticated_fd) tzresult Lwt.t + ('meta P2p_connection.Info.t * 'meta authenticated_connection) tzresult Lwt.t (** (Low-level) (Cancelable) Authentication function of a remote peer. Used in [P2p_connection_pool], to promote a - [P2P_io_scheduler.connection] into an [authenticated_fd] (auth + [P2P_io_scheduler.connection] into an [authenticated_connection] (auth correct, acceptation undecided). *) -val kick: 'meta authenticated_fd -> unit Lwt.t +val kick: 'meta authenticated_connection -> unit Lwt.t (** (Low-level) (Cancelable) [kick afd] notifies the remote peer that we refuse this connection and then closes [afd]. Used in - [P2p_connection_pool] to reject an [aunthenticated_fd] which we do + [P2p_connection_pool] to reject an [authenticated_connection] which we do not want to connect to for some reason. *) val accept: ?incoming_message_queue_size:int -> ?outgoing_message_queue_size:int -> ?binary_chunks_size: int -> - 'meta authenticated_fd -> + 'meta authenticated_connection -> 'msg Data_encoding.t -> ('msg, 'meta) t tzresult Lwt.t (** (Low-level) (Cancelable) Accepts a remote peer given an - authenticated_fd. Used in [P2p_connection_pool], to promote an - [authenticated_fd] to the status of an active peer. *) + authenticated_connection. Used in [P2p_connection_pool], to promote an + [authenticated_connection] to the status of an active peer. *) val check_binary_chunks_size: int -> unit tzresult Lwt.t (** Precheck for the [?binary_chunks_size] parameter of [accept]. *)