From 6faaeaf5e8c1c20c660fbead68e5d13af67b9b18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 28 May 2018 20:38:08 +0200 Subject: [PATCH] P2p: properly export connection metadata --- src/lib_p2p/p2p.ml | 7 +++++++ src/lib_p2p/p2p.mli | 4 ++++ src/lib_p2p/p2p_pool.ml | 23 +++++++++++---------- src/lib_p2p/p2p_pool.mli | 1 + src/lib_p2p/p2p_socket.ml | 10 ++++++---- src/lib_p2p/p2p_socket.mli | 31 +++++++++++++++-------------- src/lib_p2p/test/test_p2p_socket.ml | 24 +++++++++++----------- 7 files changed, 59 insertions(+), 41 deletions(-) diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 1e4b1a7c8..6c2ba29d4 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -204,6 +204,8 @@ module Real = struct P2p_pool.disconnect ?wait conn let connection_info _net conn = P2p_pool.Connection.info conn + let connection_metadata _net conn = + P2p_pool.Connection.meta conn let connection_stat _net conn = P2p_pool.Connection.stat conn let global_stat { pool } () = @@ -326,6 +328,8 @@ type ('msg, 'peer_meta, 'conn_meta) t = { ?wait:bool -> ('msg, 'peer_meta, 'conn_meta) connection -> unit Lwt.t ; connection_info : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_connection.Info.t ; + connection_metadata : + ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta ; connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t ; global_stat : unit -> P2p_stat.t ; get_peer_metadata : P2p_peer.Id.t -> 'peer_meta ; @@ -401,6 +405,7 @@ let create ~config ~limits peer_cfg conn_cfg msg_cfg = find_connection = Real.find_connection net ; disconnect = Real.disconnect ; connection_info = Real.connection_info net ; + connection_metadata = Real.connection_metadata net ; connection_stat = Real.connection_stat net ; global_stat = Real.global_stat net ; get_peer_metadata = Real.get_peer_metadata net ; @@ -426,6 +431,7 @@ let faked_network peer_cfg = { find_connection = (fun _ -> None) ; disconnect = (fun ?wait:_ _ -> Lwt.return_unit) ; connection_info = (fun _ -> Fake.connection_info) ; + connection_metadata = (fun _ -> assert false) ; connection_stat = (fun _ -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ; get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ; @@ -449,6 +455,7 @@ let connections net = net.connections () let disconnect net = net.disconnect let find_connection net = net.find_connection let connection_info net = net.connection_info +let connection_metadata net = net.connection_metadata let connection_stat net = net.connection_stat let global_stat net = net.global_stat () let get_peer_metadata net = net.get_peer_metadata diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index 427bae07f..342c8b7aa 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -181,6 +181,10 @@ val connection_info : ('msg, 'peer_meta, 'conn_meta) net -> ('msg, 'peer_meta, 'conn_meta) connection -> P2p_connection.Info.t +val connection_metadata : + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection -> + 'conn_meta val connection_stat : ('msg, 'peer_meta, 'conn_meta) net -> ('msg, 'peer_meta, 'conn_meta) connection -> diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index d8718fe69..747adda0a 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -92,9 +92,9 @@ module Answerer = struct swap_ack: P2p_point.Id.t -> P2p_peer.Id.t -> unit Lwt.t ; } - type 'msg t = { + type ('msg, 'meta) t = { canceler: Lwt_canceler.t ; - conn: 'msg Message.t P2p_socket.t ; + conn: ('msg Message.t, 'meta) P2p_socket.t ; callback: 'msg callback ; mutable worker: unit Lwt.t ; } @@ -248,12 +248,12 @@ and events = { and ('msg, 'peer_meta, 'conn_meta) connection = { canceler : Lwt_canceler.t ; messages : (int * 'msg) Lwt_pipe.t ; - conn : 'msg Message.t P2p_socket.t ; + conn : ('msg Message.t, 'conn_meta) P2p_socket.t ; peer_info : (('msg, 'peer_meta, 'conn_meta) connection, 'peer_meta, 'conn_meta) P2p_peer_state.Info.t ; point_info : ('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t option ; - answerer : 'msg Answerer.t Lazy.t ; + answerer : ('msg, 'conn_meta) Answerer.t Lazy.t ; mutable last_sent_swap_request : (Time.t * P2p_peer.Id.t) option ; mutable wait_close : bool ; } @@ -584,6 +584,9 @@ module Connection = struct let info { conn } = P2p_socket.info conn + let meta { conn } = + P2p_socket.meta conn + let find_by_peer_id pool peer_id = Option.apply (Peers.info pool peer_id) @@ -800,11 +803,11 @@ and authenticate pool ?point_info canceler fd point = ?binary_chunks_size:pool.config.binary_chunks_size auth_fd (pool.conn_meta_config.conn_meta_value info.peer_id) - pool.encoding >>=? fun (conn, ack_cfg) -> + pool.encoding >>=? fun conn -> lwt_debug "authenticate: %a -> Connected %a" P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> - return (conn, ack_cfg) + return conn end ~on_error: begin fun err -> if incoming then log pool @@ -816,7 +819,7 @@ and authenticate pool ?point_info canceler fd point = ~f:P2p_point_state.set_disconnected ; P2p_peer_state.set_disconnected peer_info ; Lwt.return (Error err) - end >>=? fun (conn, ack_cfg) -> + end >>=? fun conn -> let id_point = match info.id_point, Option.map ~f:P2p_point_state.Info.point point_info with | (addr, _), Some (_, port) -> addr, Some port @@ -824,7 +827,7 @@ and authenticate pool ?point_info canceler fd point = return (create_connection pool conn - id_point connection_point_info peer_info version ack_cfg) + id_point connection_point_info peer_info version) end | _ -> begin log pool (Rejecting_request (point, info.id_point, info.peer_id)) ; @@ -840,7 +843,7 @@ and authenticate pool ?point_info canceler fd point = fail (P2p_errors.Rejected info.peer_id) end -and create_connection pool p2p_conn id_point point_info peer_info _version ack_cfg = +and create_connection pool p2p_conn id_point point_info peer_info _version = let peer_id = P2p_peer_state.Info.peer_id peer_info in let canceler = Lwt_canceler.create () in let size = @@ -872,7 +875,7 @@ and create_connection pool p2p_conn id_point point_info peer_info _version ack_c P2p_point.Table.add pool.connected_points point point_info ; end ; log pool (Connection_established (id_point, peer_id)) ; - P2p_peer_state.set_running peer_info id_point conn ack_cfg ; + P2p_peer_state.set_running peer_info id_point conn (P2p_socket.meta conn.conn) ; P2p_peer.Table.add pool.connected_peer_ids peer_id peer_info ; Lwt_condition.broadcast pool.events.new_connection () ; Lwt_canceler.on_cancel canceler begin fun () -> diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index ef25fabc9..dc47952a8 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -219,6 +219,7 @@ val disconnect: module Connection : sig val info: ('msg, 'peer_meta,'conn_meta) connection -> P2p_connection.Info.t + val meta: ('msg, 'peer_meta,'conn_meta) connection -> 'conn_meta val stat: ('msg, 'peer_meta,'conn_meta) connection -> P2p_stat.t (** [stat conn] is a snapshot of current bandwidth usage for diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index 98a59e67b..7576b8d26 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -478,8 +478,9 @@ module Writer = struct end -type 'msg t = { +type ('msg, 'meta) t = { conn : connection ; + meta : 'meta ; reader : 'msg Reader.t ; writer : 'msg Writer.t ; } @@ -488,6 +489,7 @@ let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 let pp ppf { conn } = P2p_connection.Info.pp ppf conn.info let info { conn } = conn.info +let meta { meta } = meta let accept ?incoming_message_queue_size ?outgoing_message_queue_size @@ -505,7 +507,7 @@ let accept | [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth | err -> Lwt.return (Error err) end >>=? function - | Ack ack_cfg -> + | Ack meta -> let canceler = Lwt_canceler.create () in let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in let reader = @@ -515,12 +517,12 @@ let accept ?size:outgoing_message_queue_size ?binary_chunks_size conn encoding canceler in - let conn = { conn ; reader ; writer } in + let conn = { conn ; reader ; writer ; meta } in Lwt_canceler.on_cancel canceler begin fun () -> P2p_io_scheduler.close fd >>= fun _ -> Lwt.return_unit end ; - return (conn, ack_cfg) + return conn | Nack -> fail P2p_errors.Rejected_socket_connection diff --git a/src/lib_p2p/p2p_socket.mli b/src/lib_p2p/p2p_socket.mli index e8444770b..1d58ccdff 100644 --- a/src/lib_p2p/p2p_socket.mli +++ b/src/lib_p2p/p2p_socket.mli @@ -24,14 +24,15 @@ type 'conn_meta authenticated_fd phase, but has not been accepted yet. Parametrized by the type of expected parameter in the `ack` message. *) -type 'msg t +type ('msg, 'meta) t (** Type of an accepted connection, parametrized by the type of messages exchanged between peers. *) -val equal: 'mst t -> 'msg t -> bool +val equal: ('mst, 'meta) t -> ('msg, 'meta) t -> bool -val pp: Format.formatter -> 'msg t -> unit -val info: 'msg t -> P2p_connection.Info.t +val pp: Format.formatter -> ('msg, 'meta) t -> unit +val info: ('msg, 'meta) t -> P2p_connection.Info.t +val meta: ('msg, 'meta) t -> 'meta (** {1 Low-level functions (do not use directly)} *) @@ -58,7 +59,7 @@ val accept: ?outgoing_message_queue_size:int -> ?binary_chunks_size: int -> 'conn_meta authenticated_fd -> 'conn_meta -> - 'msg Data_encoding.t -> ('msg t * 'conn_meta) tzresult Lwt.t + 'msg Data_encoding.t -> ('msg, 'conn_meta) t tzresult Lwt.t (** (Low-level) (Cancelable) Accepts a remote peer given an authenticated_fd. Used in [P2p_connection_pool], to promote an [authenticated_fd] to the status of an active peer. *) @@ -70,47 +71,47 @@ val check_binary_chunks_size: int -> unit tzresult Lwt.t (** {2 Output functions} *) -val write: 'msg t -> 'msg -> unit tzresult Lwt.t +val write: ('msg, 'meta) t -> 'msg -> unit tzresult Lwt.t (** [write conn msg] returns when [msg] has successfully been added to [conn]'s internal write queue or fails with a corresponding error. *) -val write_now: 'msg t -> 'msg -> bool tzresult +val write_now: ('msg, 'meta) t -> 'msg -> bool tzresult (** [write_now conn msg] is [Ok true] if [msg] has been added to [conn]'s internal write queue, [Ok false] if [msg] has been dropped, or fails with a correponding error otherwise. *) -val write_sync: 'msg t -> 'msg -> unit tzresult Lwt.t +val write_sync: ('msg, 'meta) t -> 'msg -> unit tzresult Lwt.t (** [write_sync conn msg] returns when [msg] has been successfully sent to the remote end of [conn], or fails accordingly. *) (** {2 Input functions} *) -val is_readable: 'msg t -> bool +val is_readable: ('msg, 'meta) t -> bool (** [is_readable conn] is [true] iff [conn] internal read queue is not empty. *) -val wait_readable: 'msg t -> unit tzresult Lwt.t +val wait_readable: ('msg, 'meta) t -> unit tzresult Lwt.t (** (Cancelable) [wait_readable conn] returns when [conn]'s internal read queue becomes readable (i.e. not empty). *) -val read: 'msg t -> (int * 'msg) tzresult Lwt.t +val read: ('msg, 'meta) t -> (int * 'msg) tzresult Lwt.t (** [read conn msg] returns when [msg] has successfully been popped from [conn]'s internal read queue or fails with a corresponding error. *) -val read_now: 'msg t -> (int * 'msg) tzresult option +val read_now: ('msg, 'meta) t -> (int * 'msg) tzresult option (** [read_now conn msg] is [Some msg] if [conn]'s internal read queue is not empty, [None] if it is empty, or fails with a correponding error otherwise. *) -val stat: 'msg t -> P2p_stat.t +val stat: ('msg, 'meta) t -> P2p_stat.t (** [stat conn] is a snapshot of current bandwidth usage for [conn]. *) -val close: ?wait:bool -> 'msg t -> unit Lwt.t +val close: ?wait:bool -> ('msg, 'meta) t -> unit Lwt.t (**/**) (** for testing only *) -val raw_write_sync: 'msg t -> MBytes.t -> unit tzresult Lwt.t +val raw_write_sync: ('msg, 'meta) t -> MBytes.t -> unit tzresult Lwt.t diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index 165074c60..42fc91931 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -208,7 +208,7 @@ module Simple_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -218,7 +218,7 @@ module Simple_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -240,7 +240,7 @@ module Chunked_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> P2p_socket.accept - ~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + ~binary_chunks_size:21 auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -251,7 +251,7 @@ module Chunked_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> P2p_socket.accept - ~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + ~binary_chunks_size:21 auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -272,7 +272,7 @@ module Oversized_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -282,7 +282,7 @@ module Oversized_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -302,14 +302,14 @@ module Close_on_read = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> sync ch >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -328,14 +328,14 @@ module Close_on_write = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.close conn >>= fun _stat -> sync ch >>=? fun ()-> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> sync ch >>=? fun ()-> Lwt_unix.sleep 0.1 >>= fun () -> P2p_socket.write_sync conn simple_msg >>= fun err -> @@ -365,7 +365,7 @@ module Garbled_data = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.raw_write_sync conn garbled_msg >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -374,7 +374,7 @@ module Garbled_data = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> + P2p_socket.accept auth_fd () encoding >>=? fun conn -> P2p_socket.read conn >>= fun err -> _assert (is_decoding_error err) __LOC__ "" >>=? fun () -> P2p_socket.close conn >>= fun _stat ->