diff --git a/src/lib_p2p/p2p_point_state.ml b/src/lib_p2p/p2p_point_state.ml index e71fe8235..df9aed853 100644 --- a/src/lib_p2p/p2p_point_state.ml +++ b/src/lib_p2p/p2p_point_state.ml @@ -43,6 +43,7 @@ module Info = struct mutable last_failed_connection : Time.t option ; mutable last_rejected_connection : (P2p_peer.Id.t * Time.t) option ; mutable last_established_connection : (P2p_peer.Id.t * Time.t) option ; + mutable known_public : bool ; mutable last_disconnection : (P2p_peer.Id.t * Time.t) option ; greylisting : greylisting_config ; mutable greylisting_delay : float ; @@ -72,6 +73,7 @@ module Info = struct last_rejected_connection = None ; last_established_connection = None ; last_disconnection = None ; + known_public = false ; events = Ring.create log_size ; greylisting = greylisting_config ; greylisting_delay = 1. ; @@ -87,6 +89,7 @@ module Info = struct let last_disconnection s = s.last_disconnection let last_failed_connection s = s.last_failed_connection let last_rejected_connection s = s.last_rejected_connection + let known_public s = s.known_public let greylisted ?(now = Time.now ()) s = Time.compare now s.greylisting_end <= 0 let greylisted_until s = s.greylisting_end @@ -151,7 +154,7 @@ let set_accepted let set_running ?(timestamp = Time.now ()) - point_info peer_id data = + ~known_private point_info peer_id data = assert begin match point_info.Info.state with | Disconnected -> true (* request to unknown peer_id. *) @@ -160,6 +163,7 @@ let set_running | Requested _ -> true end ; point_info.state <- Running { data ; current_peer_id = peer_id } ; + point_info.known_public <- not known_private ; point_info.last_established_connection <- Some (peer_id, timestamp) ; Info.log point_info ~timestamp (Connection_established peer_id) diff --git a/src/lib_p2p/p2p_point_state.mli b/src/lib_p2p/p2p_point_state.mli index f79ffa0f8..3781af901 100644 --- a/src/lib_p2p/p2p_point_state.mli +++ b/src/lib_p2p/p2p_point_state.mli @@ -50,6 +50,8 @@ module Info : sig (** [trusted pi] is [true] iff [pi] has is trusted, i.e. "whitelisted". *) + val known_public : 'conn point_info -> bool + val set_trusted : 'conn point_info -> unit val unset_trusted : 'conn point_info -> unit @@ -110,7 +112,9 @@ val set_accepted : 'conn Info.t -> P2p_peer.Id.t -> Lwt_canceler.t -> unit val set_running : - ?timestamp:Time.t -> 'conn Info.t -> P2p_peer.Id.t -> 'conn -> unit + ?timestamp:Time.t -> + known_private: bool -> + 'conn Info.t -> P2p_peer.Id.t -> 'conn -> unit val set_disconnected : ?timestamp:Time.t -> ?requested:bool -> 'conn Info.t -> unit diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 66d96a630..ec43451a8 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -815,6 +815,7 @@ and authenticate pool ?point_info canceler fd point = ?incoming_message_queue_size:pool.config.incoming_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size ?binary_chunks_size:pool.config.binary_chunks_size + ~private_node:pool.conn_meta_config.private_node auth_fd (pool.conn_meta_config.conn_meta_value info.peer_id) pool.encoding >>=? fun conn -> @@ -927,7 +928,10 @@ and create_connection pool p2p_conn id_point point_info peer_info _version = ignore (Lazy.force answerer) ; Option.iter point_info ~f:begin fun point_info -> let point = P2p_point_state.Info.point point_info in - P2p_point_state.set_running point_info peer_id conn ; + let conn_meta = P2p_socket.meta p2p_conn in + P2p_point_state.set_running + ~known_private:(pool.conn_meta_config.private_node conn_meta) + point_info peer_id conn; P2p_point.Table.add pool.connected_points point point_info ; end ; log pool (Connection_established (id_point, peer_id)) ; @@ -1102,7 +1106,7 @@ let create config peer_meta_config conn_meta_config message_config io_sched = new_connection = Lwt_condition.create () ; } in let pool = { - config ; peer_meta_config ; conn_meta_config; message_config ; + config ; peer_meta_config ; conn_meta_config ; message_config ; my_id_points = P2p_point.Table.create 7 ; known_peer_ids = P2p_peer.Table.create 53 ; connected_peer_ids = P2p_peer.Table.create 53 ; diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index dc149f2d2..7981de35c 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -33,9 +33,9 @@ type 'msg encoding = Encoding : { (** {1 Pool management} *) -type ('msg, 'peer_meta,'conn_meta) t +type ('msg, 'peer_meta, 'conn_meta) t -type ('msg, 'peer_meta,'conn_meta) pool = ('msg, 'peer_meta,'conn_meta) t +type ('msg, 'peer_meta, 'conn_meta) pool = ('msg, 'peer_meta, 'conn_meta) t (** The type of a pool of connections, parametrized by resp. the type of messages and the meta-informations associated to an identity and a connection. *) diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index 7576b8d26..691ff79d9 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -251,7 +251,8 @@ let authenticate let info = { P2p_connection.Info.peer_id = remote_peer_id ; versions = msg.versions ; incoming ; - id_point ; remote_socket_port ;} in + id_point ; remote_socket_port ; + } in return (info, { fd ; info ; cryptobox_data ; ack_encoding }) type connection = { @@ -483,6 +484,7 @@ type ('msg, 'meta) t = { meta : 'meta ; reader : 'msg Reader.t ; writer : 'msg Writer.t ; + private_node : bool ; } let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 @@ -490,10 +492,12 @@ let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2 let pp ppf { conn } = P2p_connection.Info.pp ppf conn.info let info { conn } = conn.info let meta { meta } = meta +let private_node { private_node } = private_node let accept ?incoming_message_queue_size ?outgoing_message_queue_size ?binary_chunks_size + ~private_node { fd ; info ; cryptobox_data ; ack_encoding } ack_param encoding = @@ -517,7 +521,8 @@ let accept ?size:outgoing_message_queue_size ?binary_chunks_size conn encoding canceler in - let conn = { conn ; reader ; writer ; meta } in + let conn = { conn ; reader ; writer ; meta ; + private_node = private_node meta } in Lwt_canceler.on_cancel canceler begin fun () -> P2p_io_scheduler.close fd >>= fun _ -> Lwt.return_unit diff --git a/src/lib_p2p/p2p_socket.mli b/src/lib_p2p/p2p_socket.mli index 1d58ccdff..a04c6f529 100644 --- a/src/lib_p2p/p2p_socket.mli +++ b/src/lib_p2p/p2p_socket.mli @@ -33,6 +33,7 @@ val equal: ('mst, 'meta) t -> ('msg, 'meta) t -> bool val pp: Format.formatter -> ('msg, 'meta) t -> unit val info: ('msg, 'meta) t -> P2p_connection.Info.t val meta: ('msg, 'meta) t -> 'meta +val private_node: ('msg, 'meta) t -> bool (** {1 Low-level functions (do not use directly)} *) @@ -58,6 +59,7 @@ val accept: ?incoming_message_queue_size:int -> ?outgoing_message_queue_size:int -> ?binary_chunks_size: int -> + private_node:('conn_meta -> bool) -> 'conn_meta authenticated_fd -> 'conn_meta -> 'msg Data_encoding.t -> ('msg, 'conn_meta) t tzresult Lwt.t (** (Low-level) (Cancelable) Accepts a remote peer given an diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index 42fc91931..ca65a9f70 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -172,7 +172,9 @@ module Kick = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>= fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>= fun conn -> _assert (is_rejected conn) __LOC__ "" >>=? fun () -> return () @@ -186,7 +188,9 @@ module Kicked = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>= fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>= fun conn -> _assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () -> return () @@ -208,7 +212,9 @@ module Simple_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -218,7 +224,9 @@ module Simple_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -240,6 +248,7 @@ module Chunked_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> P2p_socket.accept + ~private_node:(fun _ -> false) ~binary_chunks_size:21 auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> @@ -251,6 +260,7 @@ module Chunked_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> P2p_socket.accept + ~private_node:(fun _ -> false) ~binary_chunks_size:21 auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> @@ -272,7 +282,9 @@ module Oversized_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -282,7 +294,9 @@ module Oversized_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -302,14 +316,18 @@ module Close_on_read = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> sync ch >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -328,14 +346,18 @@ module Close_on_write = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.close conn >>= fun _stat -> sync ch >>=? fun ()-> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> sync ch >>=? fun ()-> Lwt_unix.sleep 0.1 >>= fun () -> P2p_socket.write_sync conn simple_msg >>= fun err -> @@ -365,7 +387,9 @@ module Garbled_data = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.raw_write_sync conn garbled_msg >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -374,7 +398,9 @@ module Garbled_data = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd () encoding >>=? fun conn -> + P2p_socket.accept + ~private_node:(fun _ -> false) + auth_fd () encoding >>=? fun conn -> P2p_socket.read conn >>= fun err -> _assert (is_decoding_error err) __LOC__ "" >>=? fun () -> P2p_socket.close conn >>= fun _stat ->