diff --git a/src/lib_base/p2p_connection.ml b/src/lib_base/p2p_connection.ml index 17edc0fb2..070e1c460 100644 --- a/src/lib_base/p2p_connection.ml +++ b/src/lib_base/p2p_connection.ml @@ -53,40 +53,50 @@ module Table = Hashtbl.Make (Id) module Info = struct - type t = { + type 'meta t = { incoming : bool ; peer_id : P2p_peer_id.t ; id_point : Id.t ; remote_socket_port : P2p_addr.port ; versions : P2p_version.t list ; + private_node : bool ; + remote_metadata : 'meta ; } - let encoding = + let encoding metadata_encoding = let open Data_encoding in conv - (fun { incoming ; peer_id ; id_point ; remote_socket_port ; versions } -> - (incoming, peer_id, id_point, remote_socket_port, versions)) - (fun (incoming, peer_id, id_point, remote_socket_port, versions) -> - { incoming ; peer_id ; id_point ; remote_socket_port ; versions }) - (obj5 + (fun { incoming ; peer_id ; id_point ; remote_socket_port ; + versions ; private_node ; remote_metadata } -> + (incoming, peer_id, id_point, remote_socket_port, + versions, private_node, remote_metadata)) + (fun (incoming, peer_id, id_point, remote_socket_port, + versions, private_node, remote_metadata) -> + { incoming ; peer_id ; id_point ; remote_socket_port ; + versions ; private_node ; remote_metadata }) + (obj7 (req "incoming" bool) (req "peer_id" P2p_peer_id.encoding) (req "id_point" Id.encoding) (req "remote_socket_port" uint16) - (req "versions" (list P2p_version.encoding))) + (req "versions" (list P2p_version.encoding)) + (req "private" bool) + (req "remote_metadata" metadata_encoding)) - let pp ppf + let pp pp_meta ppf { incoming ; id_point = (remote_addr, remote_port) ; - remote_socket_port ; peer_id ; versions } = + remote_socket_port ; peer_id ; versions ; private_node ; remote_metadata } = let version = List.hd versions in let point = match remote_port with | None -> remote_addr, remote_socket_port | Some port -> remote_addr, port in - Format.fprintf ppf "%s %a %a (%a)" + Format.fprintf ppf "%s %a %a (%a)%s%a" (if incoming then "↘" else "↗") P2p_peer_id.pp peer_id P2p_point.Id.pp point P2p_version.pp version + (if private_node then " private" else "") + pp_meta remote_metadata end diff --git a/src/lib_base/p2p_connection.mli b/src/lib_base/p2p_connection.mli index 1e7f84115..756d82763 100644 --- a/src/lib_base/p2p_connection.mli +++ b/src/lib_base/p2p_connection.mli @@ -31,16 +31,20 @@ module Table : Hashtbl.S with type key = Id.t (** Information about a connection *) module Info : sig - type t = { + type 'meta t = { incoming : bool; peer_id : P2p_peer_id.t; id_point : Id.t; remote_socket_port : P2p_addr.port; versions : P2p_version.t list ; + private_node : bool ; + remote_metadata : 'meta ; } - val pp : Format.formatter -> t -> unit - val encoding : t Data_encoding.t + val pp : + (Format.formatter -> 'meta -> unit) -> + Format.formatter -> 'meta t -> unit + val encoding : 'meta Data_encoding.t -> 'meta t Data_encoding.t end diff --git a/src/lib_client_commands/client_p2p_commands.ml b/src/lib_client_commands/client_p2p_commands.ml index 1f1f59e33..2369c63e7 100644 --- a/src/lib_client_commands/client_p2p_commands.ml +++ b/src/lib_client_commands/client_p2p_commands.ml @@ -24,6 +24,8 @@ let port_arg () = with Failure _ -> failwith "Invalid peer-to-peer port")) +let pp_connection_info ppf conn = P2p_connection.Info.pp (fun _ _ -> ()) ppf conn + let commands () = let open Clic in let addr_parameter = @@ -44,10 +46,10 @@ let commands () = let incoming, outgoing = List.partition (fun c -> c.P2p_connection.Info.incoming) conns in Lwt_list.iter_s begin fun conn -> - cctxt#message " %a" P2p_connection.Info.pp conn + cctxt#message " %a" pp_connection_info conn end incoming >>= fun () -> Lwt_list.iter_s begin fun conn -> - cctxt#message " %a" P2p_connection.Info.pp conn + cctxt#message " %a" pp_connection_info conn end outgoing >>= fun () -> cctxt#message "KNOWN PEERS" >>= fun () -> Lwt_list.iter_s begin fun (p, pi) -> diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 6475ec3f3..bf985b86e 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -15,7 +15,7 @@ type 'peer_meta peer_meta_config = 'peer_meta P2p_pool.peer_meta_config = { score : 'peer_meta -> float ; } -type 'conn_meta conn_meta_config = 'conn_meta P2p_pool.conn_meta_config = { +type 'conn_meta conn_meta_config = 'conn_meta P2p_socket.metadata_config = { conn_meta_encoding : 'conn_meta Data_encoding.t ; conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; private_node : 'conn_meta -> bool ; @@ -206,8 +206,8 @@ module Real = struct P2p_pool.disconnect ?wait conn let connection_info _net conn = P2p_pool.Connection.info conn - let connection_metadata _net conn = - P2p_pool.Connection.meta conn + let connection_remote_metadata _net conn = + P2p_pool.Connection.remote_metadata conn let connection_stat _net conn = P2p_pool.Connection.stat conn let global_stat { pool } () = @@ -307,12 +307,14 @@ module Fake = struct current_inflow = 0 ; current_outflow = 0 ; } - let connection_info = { + let connection_info faked_metadata = { P2p_connection.Info.incoming = false ; peer_id = id.peer_id ; id_point = (Ipaddr.V6.unspecified, None) ; remote_socket_port = 0 ; versions = [] ; + remote_metadata = faked_metadata ; + private_node = false ; } end @@ -329,8 +331,8 @@ type ('msg, 'peer_meta, 'conn_meta) t = { disconnect : ?wait:bool -> ('msg, 'peer_meta, 'conn_meta) connection -> unit Lwt.t ; connection_info : - ('msg, 'peer_meta, 'conn_meta) connection -> P2p_connection.Info.t ; - connection_metadata : + ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta P2p_connection.Info.t ; + connection_remote_metadata : ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta ; connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t ; global_stat : unit -> P2p_stat.t ; @@ -407,7 +409,7 @@ let create ~config ~limits peer_cfg conn_cfg msg_cfg = find_connection = Real.find_connection net ; disconnect = Real.disconnect ; connection_info = Real.connection_info net ; - connection_metadata = Real.connection_metadata net ; + connection_remote_metadata = Real.connection_remote_metadata net ; connection_stat = Real.connection_stat net ; global_stat = Real.global_stat net ; get_peer_metadata = Real.get_peer_metadata net ; @@ -423,7 +425,7 @@ let create ~config ~limits peer_cfg conn_cfg msg_cfg = on_new_connection = Real.on_new_connection net ; } -let faked_network peer_cfg = { +let faked_network peer_cfg faked_metadata = { versions = [] ; peer_id = Fake.id.peer_id ; maintain = Lwt.return ; @@ -432,8 +434,8 @@ let faked_network peer_cfg = { connections = (fun () -> []) ; find_connection = (fun _ -> None) ; disconnect = (fun ?wait:_ _ -> Lwt.return_unit) ; - connection_info = (fun _ -> Fake.connection_info) ; - connection_metadata = (fun _ -> assert false) ; + connection_info = (fun _ -> Fake.connection_info faked_metadata) ; + connection_remote_metadata = (fun _ -> faked_metadata) ; connection_stat = (fun _ -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ; get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ; @@ -457,7 +459,7 @@ let connections net = net.connections () let disconnect net = net.disconnect let find_connection net = net.find_connection let connection_info net = net.connection_info -let connection_metadata net = net.connection_metadata +let connection_remote_metadata net = net.connection_remote_metadata let connection_stat net = net.connection_stat let global_stat net = net.global_stat () let get_peer_metadata net = net.get_peer_metadata @@ -524,7 +526,7 @@ let info_of_peer_info pool i = let meta_opt = match conn_opt with | None -> None - | Some conn -> Some (P2p_pool.Connection.meta conn) in + | Some conn -> Some (P2p_pool.Connection.remote_metadata conn) in P2p_peer_state.Info.{ score ; trusted = trusted i ; diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index 306b5b06c..9c21ccfb0 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -147,6 +147,7 @@ type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t nor open any listening socket *) val faked_network : 'peer_meta peer_meta_config -> + 'conn_meta -> ('msg, 'peer_meta, 'conn_meta) net (** Main network initialisation function *) @@ -185,8 +186,8 @@ val find_connection : val connection_info : ('msg, 'peer_meta, 'conn_meta) net -> ('msg, 'peer_meta, 'conn_meta) connection -> - P2p_connection.Info.t -val connection_metadata : + 'conn_meta P2p_connection.Info.t +val connection_remote_metadata : ('msg, 'peer_meta, 'conn_meta) net -> ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 04aae29c3..6520e5601 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -207,16 +207,10 @@ type 'msg message_config = { versions : P2p_version.t list; } -type 'conn_meta conn_meta_config = { - conn_meta_encoding : 'conn_meta Data_encoding.t ; - conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; - private_node : 'conn_meta -> bool ; -} - type ('msg, 'peer_meta, 'conn_meta) t = { config : config ; peer_meta_config : 'peer_meta peer_meta_config ; - conn_meta_config : 'conn_meta conn_meta_config ; + conn_meta_config : 'conn_meta P2p_socket.metadata_config ; message_config : 'msg message_config ; my_id_points : unit P2p_point.Table.t ; known_peer_ids : @@ -602,8 +596,8 @@ module Connection = struct let info { conn } = P2p_socket.info conn - let meta { conn } = - P2p_socket.meta conn + let remote_metadata { conn } = + P2p_socket.remote_metadata conn let find_by_peer_id pool peer_id = Option.apply @@ -727,7 +721,7 @@ and authenticate pool ?point_info canceler fd point = ~incoming (P2p_io_scheduler.register pool.io_sched fd) point ?listening_port:pool.config.listening_port pool.config.identity pool.message_config.versions - pool.conn_meta_config.conn_meta_encoding + pool.conn_meta_config end ~on_error: begin fun err -> begin match err with | [ Canceled ] -> @@ -825,10 +819,7 @@ and authenticate pool ?point_info canceler fd point = ?incoming_message_queue_size:pool.config.incoming_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size ?binary_chunks_size:pool.config.binary_chunks_size - ~private_node:pool.conn_meta_config.private_node - auth_fd - (pool.conn_meta_config.conn_meta_value info.peer_id) - pool.encoding >>=? fun conn -> + auth_fd pool.encoding >>=? fun conn -> lwt_debug "authenticate: %a -> Connected %a" P2p_point.Id.pp point P2p_peer.Id.pp info.peer_id >>= fun () -> @@ -936,16 +927,16 @@ and create_connection pool p2p_conn id_point point_info peer_info _version = messages ; canceler ; answerer ; wait_close = false ; last_sent_swap_request = None } in ignore (Lazy.force answerer) ; + let conn_meta = P2p_socket.remote_metadata p2p_conn in Option.iter point_info ~f:begin fun point_info -> let point = P2p_point_state.Info.point point_info in - let conn_meta = P2p_socket.meta p2p_conn in P2p_point_state.set_running ~known_private:(pool.conn_meta_config.private_node conn_meta) point_info peer_id conn; P2p_point.Table.add pool.connected_points point point_info ; end ; log pool (Connection_established (id_point, peer_id)) ; - P2p_peer_state.set_running peer_info id_point conn (P2p_socket.meta conn.conn) ; + P2p_peer_state.set_running peer_info id_point conn conn_meta ; P2p_peer.Table.add pool.connected_peer_ids peer_id peer_info ; Lwt_condition.broadcast pool.events.new_connection () ; Lwt_canceler.on_cancel canceler begin fun () -> diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index 7981de35c..a4673e3c0 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -131,12 +131,6 @@ type 'peer_meta peer_meta_config = { score : 'peer_meta -> float ; } -type 'conn_meta conn_meta_config = { - conn_meta_encoding : 'conn_meta Data_encoding.t ; - conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; - private_node : 'conn_meta -> bool ; -} - type 'msg message_config = { encoding : 'msg encoding list ; versions : P2p_version.t list; @@ -145,7 +139,7 @@ type 'msg message_config = { val create: config -> 'peer_meta peer_meta_config -> - 'conn_meta conn_meta_config -> + 'conn_meta P2p_socket.metadata_config -> 'msg message_config -> P2p_io_scheduler.t -> ('msg, 'peer_meta,'conn_meta) pool Lwt.t @@ -224,8 +218,8 @@ val disconnect: module Connection : sig - val info: ('msg, 'peer_meta,'conn_meta) connection -> P2p_connection.Info.t - val meta: ('msg, 'peer_meta,'conn_meta) connection -> 'conn_meta + val info: ('msg, 'peer_meta,'conn_meta) connection -> 'conn_meta P2p_connection.Info.t + val remote_metadata: ('msg, 'peer_meta,'conn_meta) connection -> 'conn_meta val stat: ('msg, 'peer_meta,'conn_meta) connection -> P2p_stat.t (** [stat conn] is a snapshot of current bandwidth usage for diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index f4f3fe3a8..bcea2af46 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -7,15 +7,7 @@ (* *) (**************************************************************************) -(* TODO patch Data_encoding for continuation-based binary writer/reader. *) (* TODO test `close ~wait:true`. *) -(* TODO nothing in welcoming message proves that the incoming peer is - the owner of the public key... only the first message will - really proves it. Should this to be changed? Not really - important, but... an attacker might forge a random public key - with enough proof of work (hard task), open a connection, wait - infinitly. This would avoid the real peer to talk with us. And - this might also have an influence on its "score". *) include Logging.Make(struct let name = "p2p.connection" end) @@ -150,20 +142,60 @@ module Connection_message = struct end +type 'meta metadata_config = { + conn_meta_encoding : 'meta Data_encoding.t ; + conn_meta_value : P2p_peer.Id.t -> 'meta ; + private_node : 'meta -> bool ; +} + +module Metadata = struct + + let write metadata_config cryptobox_data fd message = + let encoded_message_len = + Data_encoding.Binary.length metadata_config.conn_meta_encoding message in + let buf = MBytes.create encoded_message_len in + match + Data_encoding.Binary.write + metadata_config.conn_meta_encoding message buf 0 encoded_message_len + with + | None -> + fail P2p_errors.Encoding_error + | Some last -> + fail_unless (last = encoded_message_len) + P2p_errors.Encoding_error >>=? fun () -> + Crypto.write_chunk cryptobox_data fd buf + + let read metadata_config fd cryptobox_data = + Crypto.read_chunk fd cryptobox_data >>=? fun buf -> + let length = MBytes.length buf in + match + Data_encoding.Binary.read + metadata_config.conn_meta_encoding buf 0 length + with + | None -> + fail P2p_errors.Decoding_error + | Some (read_len, message) -> + if read_len <> length then + fail P2p_errors.Decoding_error + else + return message + +end + module Ack = struct - type 'a t = Ack of 'a | Nack + type t = Ack | Nack - let encoding ack_encoding = + let encoding = let open Data_encoding in - let ack_encoding = obj1 (req "ack" ack_encoding) in + let ack_encoding = obj1 (req "ack" empty) in let nack_encoding = obj1 (req "nack" empty) in let ack_case tag = case tag ack_encoding (function - | Ack param -> Some param + | Ack -> Some () | _ -> None) - (fun param -> Ack param) in + (fun () -> Ack) in let nack_case tag = case tag nack_encoding (function @@ -173,11 +205,10 @@ module Ack = struct (fun _ -> Nack) in union [ ack_case (Tag 0) ; - nack_case (Tag 1) ; + nack_case (Tag 255) ; ] - let write ack_encoding cryptobox_data fd message = - let encoding = encoding ack_encoding in + let write cryptobox_data fd message = let encoded_message_len = Data_encoding.Binary.length encoding message in let buf = MBytes.create encoded_message_len in @@ -189,8 +220,7 @@ module Ack = struct P2p_errors.Encoding_error >>=? fun () -> Crypto.write_chunk cryptobox_data fd buf - let read ack_encoding fd cryptobox_data = - let encoding = encoding ack_encoding in + let read fd cryptobox_data = Crypto.read_chunk fd cryptobox_data >>=? fun buf -> let length = MBytes.length buf in match Data_encoding.Binary.read encoding buf 0 length with @@ -206,13 +236,12 @@ end type 'conn_meta authenticated_fd = { fd: P2p_io_scheduler.connection ; - info: P2p_connection.Info.t ; + info: 'conn_meta P2p_connection.Info.t ; cryptobox_data: Crypto.data ; - ack_encoding: 'conn_meta Data_encoding.t ; } -let kick { fd ; ack_encoding ; cryptobox_data ; _ } = - Ack.write ack_encoding fd cryptobox_data Nack >>= fun _ -> +let kick { fd ; cryptobox_data ; _ } = + Ack.write fd cryptobox_data Nack >>= fun _ -> P2p_io_scheduler.close fd >>= fun _ -> Lwt.return_unit @@ -222,7 +251,7 @@ let kick { fd ; ack_encoding ; cryptobox_data ; _ } = let authenticate ~proof_of_work_target ~incoming fd (remote_addr, remote_socket_port as point) - ?listening_port identity supported_versions ack_encoding = + ?listening_port identity supported_versions metadata_config = let local_nonce_seed = Crypto_box.random_nonce () in lwt_debug "Sending authenfication to %a" P2p_point.Id.pp point >>= fun () -> Connection_message.write fd @@ -248,16 +277,21 @@ let authenticate let (local_nonce, remote_nonce) = Crypto_box.generate_nonces ~incoming ~sent_msg ~recv_msg in let cryptobox_data = { Crypto.channel_key ; local_nonce ; remote_nonce } in + let local_metadata = metadata_config.conn_meta_value remote_peer_id in + Metadata.write metadata_config fd cryptobox_data local_metadata >>=? fun () -> + Metadata.read metadata_config fd cryptobox_data >>=? fun remote_metadata -> let info = { P2p_connection.Info.peer_id = remote_peer_id ; versions = msg.versions ; incoming ; id_point ; remote_socket_port ; + private_node = metadata_config.private_node remote_metadata ; + remote_metadata ; } in - return (info, { fd ; info ; cryptobox_data ; ack_encoding }) + return (info, { fd ; info ; cryptobox_data }) -type connection = { +type 'meta connection = { id : int ; - info : P2p_connection.Info.t ; + info : 'meta P2p_connection.Info.t ; fd : P2p_io_scheduler.connection ; cryptobox_data : Crypto.data ; } @@ -268,9 +302,9 @@ let next_conn_id = module Reader = struct - type 'msg t = { + type ('msg, 'meta) t = { canceler: Lwt_canceler.t ; - conn: connection ; + conn: 'meta connection ; encoding: 'msg Data_encoding.t ; messages: (int * 'msg) tzresult Lwt_pipe.t ; mutable worker: unit Lwt.t ; @@ -355,9 +389,9 @@ end module Writer = struct - type 'msg t = { + type ('msg, 'meta) t = { canceler: Lwt_canceler.t ; - conn: connection ; + conn: 'meta connection ; encoding: 'msg Data_encoding.t ; messages: (MBytes.t list * unit tzresult Lwt.u option) Lwt_pipe.t ; mutable worker: unit Lwt.t ; @@ -480,30 +514,26 @@ module Writer = struct end type ('msg, 'meta) t = { - conn : connection ; - meta : 'meta ; - reader : 'msg Reader.t ; - writer : 'msg Writer.t ; - private_node : bool ; + conn : 'meta connection ; + reader : ('msg, 'meta) Reader.t ; + writer : ('msg, 'meta) Writer.t ; } let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 -let pp ppf { conn } = P2p_connection.Info.pp ppf conn.info +let pp ppf { conn } = P2p_connection.Info.pp (fun _ _ -> ()) ppf conn.info let info { conn } = conn.info -let meta { meta } = meta -let private_node { private_node } = private_node +let remote_metadata { conn } = conn.info.remote_metadata +let private_node { conn } = conn.info.private_node let accept ?incoming_message_queue_size ?outgoing_message_queue_size ?binary_chunks_size - ~private_node - { fd ; info ; cryptobox_data ; ack_encoding } - ack_param + ({ fd ; info ; cryptobox_data } : 'meta authenticated_fd) encoding = protect begin fun () -> - Ack.write ack_encoding fd cryptobox_data (Ack ack_param) >>=? fun () -> - Ack.read ack_encoding fd cryptobox_data + Ack.write fd cryptobox_data Ack >>=? fun () -> + Ack.read fd cryptobox_data end ~on_error:begin fun err -> P2p_io_scheduler.close fd >>= fun _ -> match err with @@ -511,7 +541,7 @@ let accept | [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth | err -> Lwt.return (Error err) end >>=? function - | Ack meta -> + | Ack -> let canceler = Lwt_canceler.create () in let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in let reader = @@ -521,8 +551,7 @@ let accept ?size:outgoing_message_queue_size ?binary_chunks_size conn encoding canceler in - let conn = { conn ; reader ; writer ; meta ; - private_node = private_node meta } in + let conn = { conn ; reader ; writer } in Lwt_canceler.on_cancel canceler begin fun () -> P2p_io_scheduler.close fd >>= fun _ -> Lwt.return_unit diff --git a/src/lib_p2p/p2p_socket.mli b/src/lib_p2p/p2p_socket.mli index a04c6f529..56457254e 100644 --- a/src/lib_p2p/p2p_socket.mli +++ b/src/lib_p2p/p2p_socket.mli @@ -19,7 +19,14 @@ (** {1 Types} *) -type 'conn_meta authenticated_fd +type 'meta metadata_config = { + conn_meta_encoding : 'meta Data_encoding.t ; + conn_meta_value : P2p_peer.Id.t -> 'meta ; + private_node : 'meta -> bool ; +} +(** Type for the parameter negotiation mechanism. *) + +type 'meta authenticated_fd (** Type of a connection that successfully passed the authentication phase, but has not been accepted yet. Parametrized by the type of expected parameter in the `ack` message. *) @@ -31,8 +38,8 @@ type ('msg, 'meta) t val equal: ('mst, 'meta) t -> ('msg, 'meta) t -> bool val pp: Format.formatter -> ('msg, 'meta) t -> unit -val info: ('msg, 'meta) t -> P2p_connection.Info.t -val meta: ('msg, 'meta) t -> 'meta +val info: ('msg, 'meta) t -> 'meta P2p_connection.Info.t +val remote_metadata: ('msg, 'meta) t -> 'meta val private_node: ('msg, 'meta) t -> bool (** {1 Low-level functions (do not use directly)} *) @@ -42,14 +49,15 @@ val authenticate: incoming:bool -> P2p_io_scheduler.connection -> P2p_point.Id.t -> ?listening_port: int -> - P2p_identity.t -> P2p_version.t list -> 'conn_meta Data_encoding.t -> - (P2p_connection.Info.t * 'conn_meta authenticated_fd) tzresult Lwt.t + P2p_identity.t -> P2p_version.t list -> + 'meta metadata_config -> + ('meta P2p_connection.Info.t * 'meta authenticated_fd) tzresult Lwt.t (** (Low-level) (Cancelable) Authentication function of a remote peer. Used in [P2p_connection_pool], to promote a [P2P_io_scheduler.connection] into an [authenticated_fd] (auth correct, acceptation undecided). *) -val kick: 'conn_meta authenticated_fd -> unit Lwt.t +val kick: 'meta authenticated_fd -> unit Lwt.t (** (Low-level) (Cancelable) [kick afd] notifies the remote peer that we refuse this connection and then closes [afd]. Used in [P2p_connection_pool] to reject an [aunthenticated_fd] which we do @@ -59,9 +67,8 @@ val accept: ?incoming_message_queue_size:int -> ?outgoing_message_queue_size:int -> ?binary_chunks_size: int -> - private_node:('conn_meta -> bool) -> - 'conn_meta authenticated_fd -> 'conn_meta -> - 'msg Data_encoding.t -> ('msg, 'conn_meta) t tzresult Lwt.t + 'meta authenticated_fd -> + 'msg Data_encoding.t -> ('msg, 'meta) t tzresult Lwt.t (** (Low-level) (Cancelable) Accepts a remote peer given an authenticated_fd. Used in [P2p_connection_pool], to promote an [authenticated_fd] to the status of an active peer. *) diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index 780896d3d..7c55690f0 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -33,7 +33,7 @@ let peer_meta_config : metadata P2p_pool.peer_meta_config = { score = fun () -> 0. ; } -let conn_meta_config : metadata P2p_pool.conn_meta_config = { +let conn_meta_config : metadata P2p_socket.metadata_config = { conn_meta_encoding = Data_encoding.empty ; conn_meta_value = (fun _ -> ()) ; private_node = (fun _ -> false) ; diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index ca65a9f70..902103185 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -21,6 +21,13 @@ let id0 = let versions = P2p_version.[{ name = "TEST" ; minor = 0 ; major = 0 }] +type metadata = unit +let conn_meta_config : metadata P2p_socket.metadata_config = { + conn_meta_encoding = Data_encoding.empty ; + conn_meta_value = (fun _ -> ()) ; + private_node = (fun _ -> false) ; +} + let rec listen ?port addr = let tentative_port = match port with @@ -95,7 +102,8 @@ let accept sched main_socket = raw_accept sched main_socket >>= fun (fd, point) -> P2p_socket.authenticate ~proof_of_work_target - ~incoming:true fd point id1 versions Data_encoding.unit + ~incoming:true fd point id1 versions + conn_meta_config let raw_connect sched addr port = let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in @@ -109,7 +117,8 @@ let connect sched addr port id = raw_connect sched addr port >>= fun fd -> P2p_socket.authenticate ~proof_of_work_target - ~incoming:false fd (addr, port) id versions Data_encoding.unit >>=? fun (info, auth_fd) -> + ~incoming:false fd + (addr, port) id versions conn_meta_config >>=? fun (info, auth_fd) -> _assert (not info.incoming) __LOC__ "" >>=? fun () -> _assert (P2p_peer.Id.compare info.peer_id id1.peer_id = 0) __LOC__ "" >>=? fun () -> @@ -172,9 +181,7 @@ module Kick = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>= fun conn -> + P2p_socket.accept auth_fd encoding >>= fun conn -> _assert (is_rejected conn) __LOC__ "" >>=? fun () -> return () @@ -188,9 +195,7 @@ module Kicked = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>= fun conn -> + P2p_socket.accept auth_fd encoding >>= fun conn -> _assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () -> return () @@ -212,9 +217,7 @@ module Simple_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -224,9 +227,7 @@ module Simple_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -248,8 +249,7 @@ module Chunked_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> P2p_socket.accept - ~private_node:(fun _ -> false) - ~binary_chunks_size:21 auth_fd () encoding >>=? fun conn -> + ~binary_chunks_size:21 auth_fd encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -260,8 +260,7 @@ module Chunked_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> P2p_socket.accept - ~private_node:(fun _ -> false) - ~binary_chunks_size:21 auth_fd () encoding >>=? fun conn -> + ~binary_chunks_size:21 auth_fd encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -282,9 +281,7 @@ module Oversized_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -294,9 +291,7 @@ module Oversized_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -316,18 +311,14 @@ module Close_on_read = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> sync ch >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -346,18 +337,14 @@ module Close_on_write = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.close conn >>= fun _stat -> sync ch >>=? fun ()-> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> sync ch >>=? fun ()-> Lwt_unix.sleep 0.1 >>= fun () -> P2p_socket.write_sync conn simple_msg >>= fun err -> @@ -387,9 +374,7 @@ module Garbled_data = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.raw_write_sync conn garbled_msg >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -398,9 +383,7 @@ module Garbled_data = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept - ~private_node:(fun _ -> false) - auth_fd () encoding >>=? fun conn -> + P2p_socket.accept auth_fd encoding >>=? fun conn -> P2p_socket.read conn >>= fun err -> _assert (is_decoding_error err) __LOC__ "" >>=? fun () -> P2p_socket.close conn >>= fun _stat -> diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index b68781523..d2ea4f8c5 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -499,7 +499,7 @@ module P2p_reader = struct | Get_current_head chain_id -> may_handle state chain_id @@ fun chain_db -> let { Connection_metadata.disable_mempool } = - P2p.connection_metadata chain_db.global_db.p2p state.conn in + P2p.connection_remote_metadata chain_db.global_db.p2p state.conn in begin if disable_mempool then Chain.head chain_db.chain_state >>= fun head -> @@ -963,7 +963,7 @@ module Advertise = struct Message.Current_head (chain_id, State.Block.header head, Mempool.empty) in let send_mempool state = let { Connection_metadata.disable_mempool } = - P2p.connection_metadata chain_db.global_db.p2p state.conn in + P2p.connection_remote_metadata chain_db.global_db.p2p state.conn in let msg = if disable_mempool then msg_disable_mempool else msg_mempool in ignore @@ P2p.try_send chain_db.global_db.p2p state.conn msg in diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index 08cc43b0a..57dfb4886 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -96,7 +96,7 @@ let init_p2p p2p_params = let conn_metadata_cfg = connection_metadata_cfg c_meta in lwt_log_notice "P2P layer is disabled" >>= fun () -> return - (P2p.faked_network peer_metadata_cfg, conn_metadata_cfg) + (P2p.faked_network peer_metadata_cfg c_meta, conn_metadata_cfg) | Some (config, limits) -> let c_meta = init_connection_metadata (Some config) in let conn_metadata_cfg = connection_metadata_cfg c_meta in diff --git a/src/lib_shell_services/p2p_services.ml b/src/lib_shell_services/p2p_services.ml index 00a8fdab5..c29b2260a 100644 --- a/src/lib_shell_services/p2p_services.ml +++ b/src/lib_shell_services/p2p_services.ml @@ -54,6 +54,11 @@ let monitor_encoding = Data_encoding.(obj1 (dft "monitor" bool false)) module Connections = struct + type connection_info = Connection_metadata.t P2p_connection.Info.t + + let connection_info_encoding = + P2p_connection.Info.encoding Connection_metadata.encoding + module S = struct let list = @@ -61,14 +66,14 @@ module Connections = struct ~description:"List the running P2P connection." ~query: RPC_query.empty ~input: Data_encoding.empty - ~output: (Data_encoding.list P2p_connection.Info.encoding) + ~output: (Data_encoding.list connection_info_encoding) RPC_path.(root / "network" / "connections") let info = RPC_service.post_service ~query: RPC_query.empty ~input: Data_encoding.empty - ~output: P2p_connection.Info.encoding + ~output: connection_info_encoding ~description:"Details about the current P2P connection to the given peer." RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg) diff --git a/src/lib_shell_services/p2p_services.mli b/src/lib_shell_services/p2p_services.mli index df444438c..a3de8364d 100644 --- a/src/lib_shell_services/p2p_services.mli +++ b/src/lib_shell_services/p2p_services.mli @@ -46,9 +46,11 @@ module Connections : sig open RPC_context - val list: #simple -> P2p_connection.Info.t list tzresult Lwt.t + type connection_info = Connection_metadata.t P2p_connection.Info.t - val info: #simple -> P2p_peer.Id.t -> P2p_connection.Info.t tzresult Lwt.t + val list: #simple -> connection_info list tzresult Lwt.t + + val info: #simple -> P2p_peer.Id.t -> connection_info tzresult Lwt.t val kick: #simple -> ?wait:bool -> P2p_peer.Id.t -> unit tzresult Lwt.t @@ -57,12 +59,12 @@ module Connections : sig val list : ([ `POST ], unit, unit, unit, unit, - P2p_connection.Info.t list) RPC_service.t + connection_info list) RPC_service.t val info : ([ `POST ], unit, unit * P2p_peer.Id.t, unit, unit, - P2p_connection.Info.t) RPC_service.t + connection_info) RPC_service.t val kick : ([ `POST ], unit,