P2p: improve P2p_socket
There is no notion of unauthenticated connection, since the function `authenticate` is immediately called on a `P2p_io_scheduler.connection` and returns an authenticated connection, or nothing. So, we only deal with authenticated connections. The identifier of a connection is the same one as the one of the `P2p_io_scheduler.connection` underneath.
@ -29,9 +29,20 @@ include Logging.Make(struct let name = "p2p.connection" end)
module Crypto = struct
(* maximal size of the buffer *)
let bufsize = 1 lsl 16 - 1
let header_length = 2
let max_content_length = bufsize - header_length - Crypto_box.boxzerobytes
let max_content_length = bufsize - Crypto_box.zerobytes
(* The header length is only stored in the encrypted message, but
within the space allowed by boxzerobytes, so it does not cost in
space in the buffer. *)
let max_encrypted_length = bufsize - Crypto_box.boxzerobytes
(* The size of extra data added by encryption. *)
let boxextrabytes = Crypto_box.zerobytes - Crypto_box.boxzerobytes
(* The number of bytes added by encryption + header *)
let extrabytes = header_length + boxextrabytes
type data = {
channel_key : Crypto_box.channel_key ;
@ -39,28 +50,35 @@ module Crypto = struct
mutable remote_nonce : Crypto_box.nonce ;
(* We do the following assumptions on the NaCl library. Note that
we also make the assumption, here, that the NaCl library allows
in-place boxing and unboxing, since we use the same buffer for
input and output. *)
let () = assert (Crypto_box.boxzerobytes >= header_length)
let write_chunk fd cryptobox_data msg =
let msglen = MBytes.length msg in
(msglen <= max_content_length) P2p_errors.Invalid_message_size >>=? fun () ->
let buf = MBytes.make (msglen + Crypto_box.zerobytes) '\x00' in
let buf_length = msglen + Crypto_box.zerobytes in
let buf = MBytes.make buf_length '\x00' in
MBytes.blit msg 0 buf Crypto_box.zerobytes msglen ;
let local_nonce = cryptobox_data.local_nonce in
cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ;
cryptobox_data.channel_key local_nonce buf ;
let encrypted_length = msglen + Crypto_box.boxzerobytes in
MBytes.set_int16 buf
(Crypto_box.boxzerobytes - header_length) encrypted_length ;
let payload = MBytes.sub buf (Crypto_box.boxzerobytes - header_length)
(header_length + encrypted_length) in
let encrypted_length = buf_length - Crypto_box.boxzerobytes in
let header_pos = Crypto_box.boxzerobytes - header_length in
MBytes.set_int16 buf header_pos encrypted_length ;
let payload = MBytes.sub buf header_pos (buf_length - header_pos) in
P2p_io_scheduler.write fd payload
let read_chunk fd cryptobox_data =
let header_buf = MBytes.create header_length in
P2p_io_scheduler.read_full ~len:header_length fd header_buf >>=? fun () ->
let encrypted_length = MBytes.get_uint16 header_buf 0 in
let buf = MBytes.make (encrypted_length + Crypto_box.boxzerobytes) '\x00' in
let buf_length = encrypted_length + Crypto_box.boxzerobytes in
let buf = MBytes.make buf_length '\x00' in
~pos:Crypto_box.boxzerobytes ~len:encrypted_length fd buf >>=? fun () ->
let remote_nonce = cryptobox_data.remote_nonce in
@ -73,18 +91,22 @@ module Crypto = struct
fail P2p_errors.Decipher_error
| true ->
return (MBytes.sub buf Crypto_box.zerobytes
(encrypted_length - Crypto_box.boxzerobytes))
(buf_length - Crypto_box.zerobytes))
(* Note: there is an inconsistency here, since we display an error in
bytes, whereas the option is set in kbytes. Also, since the default
size is 64kB-1, it is actually impossible to set the default
size using the option (the max is 63 kB). *)
let check_binary_chunks_size size =
let value = size - Crypto_box.boxzerobytes - Crypto.header_length in
let value = size - Crypto.extrabytes in
(value > 0 &&
value <= Crypto.max_content_length)
{ value = size ;
min = Crypto.(header_length + Crypto_box.boxzerobytes + 1) ;
min = Crypto.extrabytes + 1 ;
max = Crypto.bufsize ;
@ -184,9 +206,9 @@ module Metadata = struct
let read metadata_config fd cryptobox_data =
Crypto.read_chunk fd cryptobox_data >>=? fun buf ->
let length = MBytes.length buf in
let encoding = metadata_config.conn_meta_encoding in
metadata_config.conn_meta_encoding buf 0 length
Data_encoding.Binary.read encoding buf 0 length
| None ->
fail P2p_errors.Decoding_error
@ -226,7 +248,7 @@ module Ack = struct
nack_case (Tag 255) ;
let write cryptobox_data fd message =
let write fd cryptobox_data message =
let encoded_message_len =
Data_encoding.Binary.length encoding message in
let buf = MBytes.create encoded_message_len in
@ -236,7 +258,7 @@ module Ack = struct
| Some last ->
fail_unless (last = encoded_message_len)
P2p_errors.Encoding_error >>=? fun () ->
Crypto.write_chunk cryptobox_data fd buf
Crypto.write_chunk fd cryptobox_data buf
let read fd cryptobox_data =
Crypto.read_chunk fd cryptobox_data >>=? fun buf ->
@ -252,9 +274,9 @@ module Ack = struct
type 'conn_meta authenticated_fd = {
type 'meta authenticated_connection = {
fd: P2p_io_scheduler.connection ;
info: 'conn_meta P2p_connection.Info.t ;
info: 'meta P2p_connection.Info.t ;
cryptobox_data: Crypto.data ;
@ -308,22 +330,11 @@ let authenticate
} in
return (info, { fd ; info ; cryptobox_data })
type 'meta connection = {
id : int ;
info : 'meta P2p_connection.Info.t ;
fd : P2p_io_scheduler.connection ;
cryptobox_data : Crypto.data ;
let next_conn_id =
let cpt = ref 0 in
fun () -> incr cpt ;!cpt
module Reader = struct
type ('msg, 'meta) t = {
canceler: Lwt_canceler.t ;
conn: 'meta connection ;
conn: 'meta authenticated_connection ;
encoding: 'msg Data_encoding.t ;
messages: (int * 'msg) tzresult Lwt_pipe.t ;
mutable worker: unit Lwt.t ;
@ -410,7 +421,7 @@ module Writer = struct
type ('msg, 'meta) t = {
canceler: Lwt_canceler.t ;
conn: 'meta connection ;
conn: 'meta authenticated_connection ;
encoding: 'msg Data_encoding.t ;
messages: (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ;
mutable worker: unit Lwt.t ;
@ -487,7 +498,7 @@ module Writer = struct
match binary_chunks_size with
| None -> Crypto.max_content_length
| Some size ->
let size = size - Crypto_box.boxzerobytes - Crypto.header_length in
let size = size - Crypto.extrabytes in
assert (size > 0) ;
assert (size <= Crypto.max_content_length) ;
@ -533,12 +544,13 @@ module Writer = struct
type ('msg, 'meta) t = {
conn : 'meta connection ;
conn : 'meta authenticated_connection ;
reader : ('msg, 'meta) Reader.t ;
writer : ('msg, 'meta) Writer.t ;
let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2
let equal { conn = { fd = fd2 } } { conn = { fd = fd1 } } =
P2p_io_scheduler.id fd1 = P2p_io_scheduler.id fd2
let pp ppf { conn } = P2p_connection.Info.pp (fun _ _ -> ()) ppf conn.info
let info { conn } = conn.info
@ -549,13 +561,13 @@ let private_node { conn } = conn.info.private_node
let accept
?incoming_message_queue_size ?outgoing_message_queue_size
({ fd ; info ; cryptobox_data } : 'meta authenticated_fd)
encoding =
protect begin fun () ->
Ack.write fd cryptobox_data Ack >>=? fun () ->
Ack.read fd cryptobox_data
Ack.write conn.fd conn.cryptobox_data Ack >>=? fun () ->
Ack.read conn.fd conn.cryptobox_data
end ~on_error:begin fun err ->
P2p_io_scheduler.close fd >>= fun _ ->
P2p_io_scheduler.close conn.fd >>= fun _ ->
match err with
| [ P2p_errors.Connection_closed ] -> fail P2p_errors.Rejected_socket_connection
| [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth
@ -563,7 +575,6 @@ let accept
end >>=? function
| Ack ->
let canceler = Lwt_canceler.create () in
let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
let reader =
Reader.run ?size:incoming_message_queue_size conn encoding canceler
and writer =
@ -573,7 +584,7 @@ let accept
let conn = { conn ; reader ; writer } in
Lwt_canceler.on_cancel canceler begin fun () ->
P2p_io_scheduler.close fd >>= fun _ ->
P2p_io_scheduler.close conn.conn.fd >>= fun _ ->
end ;
return conn
@ -605,7 +616,7 @@ let write_sync { writer ; conn } msg =
catch_closed_pipe begin fun () ->
let waiter, wakener = Lwt.wait () in
debug "Sending message to %a: %a"
P2p_peer.Id.pp_short conn.info.peer_id (pp_json writer.encoding) msg ;
P2p_peer.Id.pp_short conn.info.peer_id ( pp_json writer.encoding ) msg ;
Lwt.return (Writer.encode_message writer msg) >>=? fun buf ->
Lwt_pipe.push writer.messages (buf, Some wakener) >>= fun () ->
@ -42,7 +42,7 @@ type 'meta metadata_config = {
(** Type for the parameter negotiation mechanism. *)
type 'meta authenticated_fd
type 'meta authenticated_connection
(** Type of a connection that successfully passed the authentication
phase, but has not been accepted yet. Parametrized by the type
of expected parameter in the `ack` message. *)
@ -68,27 +68,27 @@ val authenticate:
?listening_port: int ->
P2p_identity.t -> P2p_version.t list ->
'meta metadata_config ->
('meta P2p_connection.Info.t * 'meta authenticated_fd) tzresult Lwt.t
('meta P2p_connection.Info.t * 'meta authenticated_connection) tzresult Lwt.t
(** (Low-level) (Cancelable) Authentication function of a remote
peer. Used in [P2p_connection_pool], to promote a
[P2P_io_scheduler.connection] into an [authenticated_fd] (auth
[P2P_io_scheduler.connection] into an [authenticated_connection] (auth
correct, acceptation undecided). *)
val kick: 'meta authenticated_fd -> unit Lwt.t
val kick: 'meta authenticated_connection -> unit Lwt.t
(** (Low-level) (Cancelable) [kick afd] notifies the remote peer that
we refuse this connection and then closes [afd]. Used in
[P2p_connection_pool] to reject an [aunthenticated_fd] which we do
[P2p_connection_pool] to reject an [authenticated_connection] which we do
not want to connect to for some reason. *)
val accept:
?incoming_message_queue_size:int ->
?outgoing_message_queue_size:int ->
?binary_chunks_size: int ->
'meta authenticated_fd ->
'meta authenticated_connection ->
'msg Data_encoding.t -> ('msg, 'meta) t tzresult Lwt.t
(** (Low-level) (Cancelable) Accepts a remote peer given an
authenticated_fd. Used in [P2p_connection_pool], to promote an
[authenticated_fd] to the status of an active peer. *)
authenticated_connection. Used in [P2p_connection_pool], to promote an
[authenticated_connection] to the status of an active peer. *)
val check_binary_chunks_size: int -> unit tzresult Lwt.t
(** Precheck for the [?binary_chunks_size] parameter of [accept]. *)
