diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 6978b7117..1e4b1a7c8 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -9,10 +9,15 @@ include Logging.Make(struct let name = "p2p" end) -type 'meta meta_config = 'meta P2p_pool.meta_config = { - encoding : 'meta Data_encoding.t; - initial : 'meta; - score : 'meta -> float +type 'peer_meta peer_meta_config = 'peer_meta P2p_pool.peer_meta_config = { + peer_meta_encoding : 'peer_meta Data_encoding.t ; + peer_meta_initial : 'peer_meta ; + score : 'peer_meta -> float ; +} + +type 'conn_meta conn_meta_config = 'conn_meta P2p_pool.conn_meta_config = { + conn_meta_encoding : 'conn_meta Data_encoding.t ; + conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; } type 'msg app_message_encoding = 'msg P2p_pool.encoding = @@ -146,23 +151,24 @@ let may_create_welcome_worker config limits pool = port >>= fun w -> Lwt.return (Some w) -type ('msg, 'meta) connection = ('msg, 'meta) P2p_pool.connection +type ('msg, 'peer_meta, 'conn_meta) connection = + ('msg, 'peer_meta, 'conn_meta) P2p_pool.connection module Real = struct - type ('msg, 'meta) net = { + type ('msg, 'peer_meta, 'conn_meta) net = { config: config ; limits: limits ; io_sched: P2p_io_scheduler.t ; - pool: ('msg, 'meta) P2p_pool.t ; - maintenance: 'meta P2p_maintenance.t ; + pool: ('msg, 'peer_meta, 'conn_meta) P2p_pool.t ; + maintenance: 'peer_meta P2p_maintenance.t ; welcome: P2p_welcome.t option ; } - let create ~config ~limits meta_cfg msg_cfg = + let create ~config ~limits meta_cfg conn_meta_cfg msg_cfg = let io_sched = create_scheduler limits in create_connection_pool - config limits meta_cfg msg_cfg io_sched >>= fun pool -> + config limits meta_cfg conn_meta_cfg msg_cfg io_sched >>= fun pool -> let maintenance = create_maintenance_worker limits pool in may_create_welcome_worker config limits pool >>= fun welcome -> return { @@ -202,10 +208,10 @@ module Real = struct P2p_pool.Connection.stat conn let global_stat { pool } () = P2p_pool.pool_stat pool - let set_metadata { pool } conn meta = - P2p_pool.Peers.set_metadata pool conn meta - let get_metadata { pool } conn = - P2p_pool.Peers.get_metadata pool conn + let set_peer_metadata { pool } conn meta = + P2p_pool.Peers.set_peer_metadata pool conn meta + let get_peer_metadata { pool } conn = + P2p_pool.Peers.get_peer_metadata pool conn let recv _net conn = P2p_pool.read conn >>=? fun msg -> @@ -307,32 +313,42 @@ module Fake = struct end -type ('msg, 'meta) t = { +type ('msg, 'peer_meta, 'conn_meta) t = { versions : P2p_version.t list ; peer_id : P2p_peer.Id.t ; maintain : unit -> unit Lwt.t ; roll : unit -> unit Lwt.t ; shutdown : unit -> unit Lwt.t ; - connections : unit -> ('msg, 'meta) connection list ; - find_connection : P2p_peer.Id.t -> ('msg, 'meta) connection option ; - disconnect : ?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t ; - connection_info : ('msg, 'meta) connection -> P2p_connection.Info.t ; - connection_stat : ('msg, 'meta) connection -> P2p_stat.t ; + connections : unit -> ('msg, 'peer_meta, 'conn_meta) connection list ; + find_connection : + P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option ; + disconnect : + ?wait:bool -> ('msg, 'peer_meta, 'conn_meta) connection -> unit Lwt.t ; + connection_info : + ('msg, 'peer_meta, 'conn_meta) connection -> P2p_connection.Info.t ; + connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t ; global_stat : unit -> P2p_stat.t ; - get_metadata : P2p_peer.Id.t -> 'meta ; - set_metadata : P2p_peer.Id.t -> 'meta -> unit ; - recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ; - recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ; - send : ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t ; - try_send : ('msg, 'meta) connection -> 'msg -> bool ; + get_peer_metadata : P2p_peer.Id.t -> 'peer_meta ; + set_peer_metadata : P2p_peer.Id.t -> 'peer_meta -> unit ; + recv : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg tzresult Lwt.t ; + recv_any : unit -> (('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t ; + send : + ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit tzresult Lwt.t ; + try_send : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> bool ; broadcast : 'msg -> unit ; - pool : ('msg, 'meta) P2p_pool.t option ; + pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t option ; fold_connections : - 'a. init:'a -> f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> 'a ; - iter_connections : (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit ; - on_new_connection : (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit ; + 'a. init: 'a -> + f:(P2p_peer.Id.t -> + ('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) -> 'a ; + iter_connections : + (P2p_peer.Id.t -> + ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ; + on_new_connection : + (P2p_peer.Id.t -> + ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ; } -type ('msg, 'meta) net = ('msg, 'meta) t +type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t let check_limits = let fail_1 v orig = @@ -372,9 +388,9 @@ let check_limits = end >>=? fun () -> return () -let create ~config ~limits meta_cfg msg_cfg = +let create ~config ~limits peer_cfg conn_cfg msg_cfg = check_limits limits >>=? fun () -> - Real.create ~config ~limits meta_cfg msg_cfg >>=? fun net -> + Real.create ~config ~limits peer_cfg conn_cfg msg_cfg >>=? fun net -> return { versions = msg_cfg.versions ; peer_id = Real.peer_id net ; @@ -387,8 +403,8 @@ let create ~config ~limits meta_cfg msg_cfg = connection_info = Real.connection_info net ; connection_stat = Real.connection_stat net ; global_stat = Real.global_stat net ; - get_metadata = Real.get_metadata net ; - set_metadata = Real.set_metadata net ; + get_peer_metadata = Real.get_peer_metadata net ; + set_peer_metadata = Real.set_peer_metadata net ; recv = Real.recv net ; recv_any = Real.recv_any net ; send = Real.send net ; @@ -400,7 +416,7 @@ let create ~config ~limits meta_cfg msg_cfg = on_new_connection = Real.on_new_connection net ; } -let faked_network meta_config = { +let faked_network peer_cfg = { versions = [] ; peer_id = Fake.id.peer_id ; maintain = Lwt.return ; @@ -412,8 +428,8 @@ let faked_network meta_config = { connection_info = (fun _ -> Fake.connection_info) ; connection_stat = (fun _ -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ; - get_metadata = (fun _ -> meta_config.initial) ; - set_metadata = (fun _ _ -> ()) ; + get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ; + set_peer_metadata = (fun _ _ -> ()) ; recv = (fun _ -> Lwt_utils.never_ending) ; recv_any = (fun () -> Lwt_utils.never_ending) ; send = (fun _ _ -> fail P2p_errors.Connection_closed) ; @@ -435,8 +451,8 @@ let find_connection net = net.find_connection let connection_info net = net.connection_info let connection_stat net = net.connection_stat let global_stat net = net.global_stat () -let get_metadata net = net.get_metadata -let set_metadata net = net.set_metadata +let get_peer_metadata net = net.get_peer_metadata +let set_peer_metadata net = net.set_peer_metadata let recv net = net.recv let recv_any net = net.recv_any () let send net = net.send diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index c60dd7910..427bae07f 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -15,10 +15,15 @@ nodes. *) -type 'meta meta_config = { - encoding : 'meta Data_encoding.t; - initial : 'meta; - score : 'meta -> float +type 'peer_meta peer_meta_config = { + peer_meta_encoding : 'peer_meta Data_encoding.t; + peer_meta_initial : 'peer_meta; + score : 'peer_meta -> float ; +} + +type 'conn_meta conn_meta_config = { + conn_meta_encoding : 'conn_meta Data_encoding.t; + conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; } type 'msg app_message_encoding = Encoding : { @@ -127,94 +132,123 @@ type limits = { (** Type of a P2P layer instance, parametrized by: ['msg]: type of messages exchanged between peers - ['meta]: type of the metadata associated with peers (score, etc.) + ['peer_meta]: type of the metadata associated with peers (score, etc.) + ['conn_meta]: type of the metadata associated with connection (ack_cfg) *) -type ('msg, 'meta) t -type ('msg, 'meta) net = ('msg, 'meta) t +type ('msg, 'peer_meta, 'conn_meta) t +type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t (** A faked p2p layer, which do not initiate any connection nor open any listening socket *) -val faked_network : 'meta meta_config -> ('msg, 'meta) net +val faked_network : + 'peer_meta peer_meta_config -> + ('msg, 'peer_meta, 'conn_meta) net (** Main network initialisation function *) val create : config:config -> limits:limits -> - 'meta meta_config -> 'msg message_config -> ('msg, 'meta) net tzresult Lwt.t + 'peer_meta peer_meta_config -> 'conn_meta conn_meta_config -> + 'msg message_config -> ('msg, 'peer_meta, 'conn_meta) net tzresult Lwt.t (** Return one's peer_id *) -val peer_id : ('msg, 'meta) net -> P2p_peer.Id.t +val peer_id : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t (** A maintenance operation : try and reach the ideal number of peers *) -val maintain : ('msg, 'meta) net -> unit Lwt.t +val maintain : ('msg, 'peer_meta, 'conn_meta) net -> unit Lwt.t (** Voluntarily drop some peers and replace them by new buddies *) -val roll : ('msg, 'meta) net -> unit Lwt.t +val roll : ('msg, 'peer_meta, 'conn_meta) net -> unit Lwt.t (** Close all connections properly *) -val shutdown : ('msg, 'meta) net -> unit Lwt.t +val shutdown : ('msg, 'peer_meta, 'conn_meta) net -> unit Lwt.t (** A connection to a peer *) -type ('msg, 'meta) connection +type ('msg, 'peer_meta, 'conn_meta) connection (** Access the domain of active peers *) -val connections : ('msg, 'meta) net -> ('msg, 'meta) connection list +val connections : + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection list (** Return the active peer with identity [peer_id] *) -val find_connection : ('msg, 'meta) net -> P2p_peer.Id.t -> ('msg, 'meta) connection option +val find_connection : + ('msg, 'peer_meta, 'conn_meta) net -> + P2p_peer.Id.t -> + ('msg, 'peer_meta, 'conn_meta) connection option (** Access the info of an active peer, if available *) val connection_info : - ('msg, 'meta) net -> ('msg, 'meta) connection -> P2p_connection.Info.t + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection -> + P2p_connection.Info.t val connection_stat : - ('msg, 'meta) net -> ('msg, 'meta) connection -> P2p_stat.t + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection -> + P2p_stat.t (** Cleanly closes a connection. *) val disconnect : - ('msg, 'meta) net -> ?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t + ('msg, 'peer_meta, 'conn_meta) net -> + ?wait:bool -> + ('msg, 'peer_meta, 'conn_meta) connection -> + unit Lwt.t -val global_stat : ('msg, 'meta) net -> P2p_stat.t +val global_stat : ('msg, 'peer_meta, 'conn_meta) net -> P2p_stat.t (** Accessors for meta information about a global identifier *) -val get_metadata : ('msg, 'meta) net -> P2p_peer.Id.t -> 'meta -val set_metadata : ('msg, 'meta) net -> P2p_peer.Id.t -> 'meta -> unit +val get_peer_metadata : + ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> 'peer_meta +val set_peer_metadata : + ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> 'peer_meta -> unit (** Wait for a message from a given connection. *) val recv : - ('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg tzresult Lwt.t + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection -> + 'msg tzresult Lwt.t (** Wait for a message from any active connections. *) val recv_any : - ('msg, 'meta) net -> (('msg, 'meta) connection * 'msg) Lwt.t + ('msg, 'peer_meta, 'conn_meta) net -> + (('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t (** [send net peer msg] is a thread that returns when [msg] has been successfully enqueued in the send queue. *) val send : - ('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection -> + 'msg -> + unit tzresult Lwt.t (** [try_send net peer msg] is [true] if [msg] has been added to the send queue for [peer], [false] otherwise *) val try_send : - ('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> bool + ('msg, 'peer_meta, 'conn_meta) net -> + ('msg, 'peer_meta, 'conn_meta) connection -> + 'msg -> + bool (** Send a message to all peers *) -val broadcast : ('msg, 'meta) net -> 'msg -> unit +val broadcast : ('msg, 'peer_meta, 'conn_meta) net -> 'msg -> unit val fold_connections : - ('msg, 'meta) net -> - init:'a -> f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> 'a + ('msg, 'peer_meta, 'conn_meta) net -> + init:'a -> + f:(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) -> + 'a val iter_connections : - ('msg, 'meta) net -> - (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit + ('msg, 'peer_meta, 'conn_meta) net -> + (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit val on_new_connection : - ('msg, 'meta) net -> - (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit + ('msg, 'peer_meta, 'conn_meta) net -> + (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit val build_rpc_directory : _ t -> unit RPC_directory.t -val greylist_addr : ('msg, 'meta) net -> P2p_addr.t -> unit -val greylist_peer : ('msg, 'meta) net -> P2p_peer.Id.t -> unit +val greylist_addr : ('msg, 'peer_meta, 'conn_meta) net -> P2p_addr.t -> unit +val greylist_peer : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> unit (**/**) diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index 5957cbaa5..88f030f41 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -16,7 +16,7 @@ type bounds = { max_threshold: int ; } -type 'meta pool = Pool : ('msg, 'meta) P2p_pool.t -> 'meta pool +type 'meta pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta pool type 'meta t = { canceler: Lwt_canceler.t ; diff --git a/src/lib_p2p/p2p_maintenance.mli b/src/lib_p2p/p2p_maintenance.mli index f9dd93a8a..3e9d4b703 100644 --- a/src/lib_p2p/p2p_maintenance.mli +++ b/src/lib_p2p/p2p_maintenance.mli @@ -36,7 +36,7 @@ type bounds = { type 'meta t (** Type of a maintenance worker. *) -val run: bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t +val run: bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t (** [run ~greylist_timeout bounds pool] is a maintenance worker for [pool] with connection targets specified in [bounds]. *) diff --git a/src/lib_p2p/p2p_peer_state.ml b/src/lib_p2p/p2p_peer_state.ml index 5439eb714..5144a12a7 100644 --- a/src/lib_p2p/p2p_peer_state.ml +++ b/src/lib_p2p/p2p_peer_state.ml @@ -9,13 +9,14 @@ open P2p_peer -type 'data t = +type ('conn, 'conn_meta) t = | Accepted of { current_point: P2p_connection.Id.t ; cancel: Lwt_canceler.t } - | Running of { data: 'data ; + | Running of { data: 'conn ; + conn_metadata: 'conn_meta ; current_point: P2p_connection.Id.t } | Disconnected -type 'data state = 'data t +type ('conn, 'conn_meta) state = ('conn, 'conn_meta) t let pp ppf = function | Accepted { current_point ; _ } -> @@ -27,11 +28,11 @@ let pp ppf = function module Info = struct - type ('conn, 'meta) t = { + type ('conn, 'peer_meta, 'conn_meta) t = { peer_id : Id.t ; created : Time.t ; - mutable state : 'conn state ; - mutable metadata : 'meta ; + mutable state : ('conn, 'conn_meta) state ; + mutable peer_metadata : 'peer_meta ; mutable trusted : bool ; mutable last_failed_connection : (P2p_connection.Id.t * Time.t) option ; mutable last_rejected_connection : (P2p_connection.Id.t * Time.t) option ; @@ -40,17 +41,17 @@ module Info = struct events : Pool_event.t Ring.t ; watchers : Pool_event.t Lwt_watcher.input ; } - type ('conn, 'meta) peer_info = ('conn, 'meta) t + type ('conn, 'peer_meta, 'conn_meta) peer_info = ('conn, 'peer_meta, 'conn_meta) t let compare gi1 gi2 = Id.compare gi1.peer_id gi2.peer_id let log_size = 100 - let create ?(created = Time.now ()) ?(trusted = false) ~metadata peer_id = + let create ?(created = Time.now ()) ?(trusted = false) ~peer_metadata peer_id = { peer_id ; created ; state = Disconnected ; - metadata ; + peer_metadata ; trusted ; last_failed_connection = None ; last_rejected_connection = None ; @@ -60,23 +61,23 @@ module Info = struct watchers = Lwt_watcher.create_input () ; } - let encoding metadata_encoding = + let encoding peer_metadata_encoding = let open Data_encoding in conv - (fun { peer_id ; trusted ; metadata ; events ; created ; + (fun { peer_id ; trusted ; peer_metadata ; events ; created ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; _ } -> - (peer_id, created, trusted, metadata, Ring.elements events, + (peer_id, created, trusted, peer_metadata, Ring.elements events, last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection)) - (fun (peer_id, created, trusted, metadata, event_list, + (fun (peer_id, created, trusted, peer_metadata, event_list, last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection) -> - let info = create ~trusted ~metadata peer_id in + let info = create ~trusted ~peer_metadata peer_id in let events = Ring.create log_size in Ring.add_list info.events event_list ; { state = Disconnected ; - trusted ; peer_id ; metadata ; created ; + trusted ; peer_id ; peer_metadata ; created ; last_failed_connection ; last_rejected_connection ; last_established_connection ; @@ -88,7 +89,7 @@ module Info = struct (req "peer_id" Id.encoding) (req "created" Time.encoding) (dft "trusted" bool false) - (req "metadata" metadata_encoding) + (req "peer_metadata" peer_metadata_encoding) (dft "events" (list Pool_event.encoding) []) (opt "last_failed_connection" (tup2 P2p_connection.Id.encoding Time.encoding)) @@ -101,8 +102,8 @@ module Info = struct let peer_id { peer_id ; _ } = peer_id let created { created ; _ } = created - let metadata { metadata ; _ } = metadata - let set_metadata gi metadata = gi.metadata <- metadata + let peer_metadata { peer_metadata ; _ } = peer_metadata + let set_peer_metadata gi peer_metadata = gi.peer_metadata <- peer_metadata let trusted { trusted ; _ } = trusted let set_trusted gi = gi.trusted <- true let unset_trusted gi = gi.trusted <- false @@ -130,18 +131,19 @@ module Info = struct module File = struct - let load path metadata_encoding = - let enc = Data_encoding.list (encoding metadata_encoding) in + let load path peer_metadata_encoding = + let enc = + Data_encoding.list (encoding peer_metadata_encoding) in if path <> "/dev/null" && Sys.file_exists path then Lwt_utils_unix.Json.read_file path >>=? fun json -> return (Data_encoding.Json.destruct enc json) else return [] - let save path metadata_encoding peers = + let save path peer_metadata_encoding peers = let open Data_encoding in Lwt_utils_unix.Json.write_file path @@ - Json.construct (list (encoding metadata_encoding)) peers + Json.construct (list (encoding peer_metadata_encoding)) peers end @@ -170,7 +172,7 @@ let set_accepted let set_running ?(timestamp = Time.now ()) - peer_info point data = + peer_info point data conn_metadata = assert begin match peer_info.Info.state with | Disconnected -> true (* request to unknown peer_id. *) @@ -178,7 +180,7 @@ let set_running | Accepted { current_point ; _ } -> P2p_connection.Id.equal point current_point end ; - peer_info.state <- Running { data ; current_point = point } ; + peer_info.state <- Running { data ; conn_metadata ; current_point = point } ; peer_info.last_established_connection <- Some (point, timestamp) ; Info.log peer_info ~timestamp point Connection_established diff --git a/src/lib_p2p/p2p_peer_state.mli b/src/lib_p2p/p2p_peer_state.mli index 85c8ecd52..7a81e0912 100644 --- a/src/lib_p2p/p2p_peer_state.mli +++ b/src/lib_p2p/p2p_peer_state.mli @@ -9,56 +9,57 @@ open P2p_peer -type 'conn t = +type ('conn, 'conn_meta) t = | Accepted of { current_point: P2p_connection.Id.t ; cancel: Lwt_canceler.t } (** We accepted a incoming connection, we greeted back and we are waiting for an acknowledgement. *) | Running of { data: 'conn ; + conn_metadata: 'conn_meta ; current_point: P2p_connection.Id.t } (** Successfully authentificated connection, normal business. *) | Disconnected (** No connection established currently. *) -type 'conn state = 'conn t +type ('conn, 'conn_meta) state = ('conn, 'conn_meta) t -val pp : Format.formatter -> 'conn t -> unit +val pp : Format.formatter -> ('conn, 'conn_meta) t -> unit module Info : sig - type ('conn, 'meta) t - type ('conn, 'meta) peer_info = ('conn, 'meta) t + type ('conn, 'peer_meta, 'conn_meta) t + type ('conn, 'peer_meta, 'conn_meta) peer_info = ('conn, 'peer_meta, 'conn_meta) t - val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int + val compare : ('conn, 'peer_meta, 'conn_meta) t -> ('conn, 'peer_meta, 'conn_meta) t -> int val create : ?created:Time.t -> ?trusted:bool -> - metadata:'meta -> - Id.t -> ('conn, 'meta) peer_info + peer_metadata:'peer_meta -> + Id.t -> ('conn, 'peer_meta, 'conn_meta) peer_info (** [create ~trusted ~meta peer_id] is a freshly minted peer_id info for [peer_id]. *) - val peer_id : ('conn, 'meta) peer_info -> Id.t + val peer_id : ('conn, 'peer_meta, 'conn_meta) peer_info -> Id.t - val created : ('conn, 'meta) peer_info -> Time.t - val metadata : ('conn, 'meta) peer_info -> 'meta - val set_metadata : ('conn, 'meta) peer_info -> 'meta -> unit + val created : ('conn, 'peer_meta, 'conn_meta) peer_info -> Time.t + val peer_metadata : ('conn, 'peer_meta, 'conn_meta) peer_info -> 'peer_meta + val set_peer_metadata : ('conn, 'peer_meta, 'conn_meta) peer_info -> 'peer_meta -> unit - val trusted : ('conn, 'meta) peer_info -> bool - val set_trusted : ('conn, 'meta) peer_info -> unit - val unset_trusted : ('conn, 'meta) peer_info -> unit + val trusted : ('conn, 'peer_meta, 'conn_meta) peer_info -> bool + val set_trusted : ('conn, 'peer_meta, 'conn_meta) peer_info -> unit + val unset_trusted : ('conn, 'peer_meta, 'conn_meta) peer_info -> unit val last_failed_connection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + ('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option val last_rejected_connection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + ('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option val last_established_connection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + ('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option val last_disconnection : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + ('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option val last_seen : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + ('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option (** [last_seen gi] is the most recent of: * last established connection @@ -67,7 +68,7 @@ module Info : sig *) val last_miss : - ('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option + ('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option (** [last_miss gi] is the most recent of: * last failed connection @@ -77,39 +78,38 @@ module Info : sig val log_incoming_rejection : ?timestamp:Time.t -> - ('conn, 'meta) peer_info -> P2p_connection.Id.t -> unit + ('conn, 'peer_meta, 'conn_meta) peer_info -> P2p_connection.Id.t -> unit module File : sig val load : - string -> 'meta Data_encoding.t -> - ('conn, 'meta) peer_info list tzresult Lwt.t + string -> 'peer_meta Data_encoding.t -> + ('conn, 'peer_meta, 'conn_meta) peer_info list tzresult Lwt.t val save : - string -> 'meta Data_encoding.t -> - ('conn, 'meta) peer_info list -> unit tzresult Lwt.t + string -> 'peer_meta Data_encoding.t -> + ('conn, 'peer_meta, 'conn_meta) peer_info list -> unit tzresult Lwt.t end val fold : - ('conn, 'meta) t -> init:'a -> f:('a -> Pool_event.t -> 'a) -> 'a + ('conn, 'peer_meta, 'conn_meta) t -> init:'a -> f:('a -> Pool_event.t -> 'a) -> 'a val watch : - ('conn, 'meta) t -> Pool_event.t Lwt_stream.t * Lwt_watcher.stopper + ('conn, 'peer_meta, 'conn_meta) t -> Pool_event.t Lwt_stream.t * Lwt_watcher.stopper end +val get : ('conn, 'peer_meta, 'conn_meta) Info.t -> ('conn, 'conn_meta) state -val get : ('conn, 'meta) Info.t -> 'conn state - -val is_disconnected : ('conn, 'meta) Info.t -> bool +val is_disconnected : ('conn, 'peer_meta, 'conn_meta) Info.t -> bool val set_accepted : ?timestamp:Time.t -> - ('conn, 'meta) Info.t -> P2p_connection.Id.t -> Lwt_canceler.t -> unit + ('conn, 'peer_meta, 'conn_meta) Info.t -> P2p_connection.Id.t -> Lwt_canceler.t -> unit val set_running : ?timestamp:Time.t -> - ('conn, 'meta) Info.t -> P2p_connection.Id.t -> 'conn -> unit + ('conn, 'peer_meta, 'conn_meta) Info.t -> P2p_connection.Id.t -> 'conn -> 'conn_meta -> unit val set_disconnected : ?timestamp:Time.t -> ?requested:bool -> - ('conn, 'meta) Info.t -> unit + ('conn, 'peer_meta, 'conn_meta) Info.t -> unit diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 0f1a66f62..d8718fe69 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -192,10 +192,10 @@ type config = { binary_chunks_size : int option ; } -type 'meta meta_config = { - encoding : 'meta Data_encoding.t; - initial : 'meta; - score : 'meta -> float; +type 'peer_meta peer_meta_config = { + peer_meta_encoding : 'peer_meta Data_encoding.t ; + peer_meta_initial : 'peer_meta ; + score : 'peer_meta -> float ; } type 'msg message_config = { @@ -203,17 +203,29 @@ type 'msg message_config = { versions : P2p_version.t list; } -type ('msg, 'meta) t = { +type 'conn_meta conn_meta_config = { + conn_meta_encoding : 'conn_meta Data_encoding.t ; + conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; +} + +type ('msg, 'peer_meta, 'conn_meta) t = { config : config ; - meta_config : 'meta meta_config ; + peer_meta_config : 'peer_meta peer_meta_config ; + conn_meta_config : 'conn_meta conn_meta_config ; message_config : 'msg message_config ; my_id_points : unit P2p_point.Table.t ; known_peer_ids : - (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t P2p_peer.Table.t ; + (('msg, 'peer_meta, 'conn_meta) connection, + 'peer_meta, + 'conn_meta) P2p_peer_state.Info.t P2p_peer.Table.t ; connected_peer_ids : - (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t P2p_peer.Table.t ; - known_points : ('msg, 'meta) connection P2p_point_state.Info.t P2p_point.Table.t ; - connected_points : ('msg, 'meta) connection P2p_point_state.Info.t P2p_point.Table.t ; + (('msg, 'peer_meta, 'conn_meta) connection, + 'peer_meta, + 'conn_meta) P2p_peer_state.Info.t P2p_peer.Table.t ; + known_points : + ('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t P2p_point.Table.t ; + connected_points : + ('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t P2p_point.Table.t ; incoming : Lwt_canceler.t P2p_point.Table.t ; io_sched : P2p_io_scheduler.t ; encoding : 'msg Message.t Data_encoding.t ; @@ -221,7 +233,7 @@ type ('msg, 'meta) t = { watcher : P2p_connection.Pool_event.t Lwt_watcher.input ; acl : P2p_acl.t ; mutable new_connection_hook : - (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) list ; + (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) list ; mutable latest_accepted_swap : Time.t ; mutable latest_succesfull_swap : Time.t ; } @@ -233,18 +245,20 @@ and events = { new_connection : unit Lwt_condition.t ; } -and ('msg, 'meta) connection = { +and ('msg, 'peer_meta, 'conn_meta) connection = { canceler : Lwt_canceler.t ; messages : (int * 'msg) Lwt_pipe.t ; conn : 'msg Message.t P2p_socket.t ; - peer_info : (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t ; - point_info : ('msg, 'meta) connection P2p_point_state.Info.t option ; + peer_info : + (('msg, 'peer_meta, 'conn_meta) connection, 'peer_meta, 'conn_meta) P2p_peer_state.Info.t ; + point_info : + ('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t option ; answerer : 'msg Answerer.t Lazy.t ; mutable last_sent_swap_request : (Time.t * P2p_peer.Id.t) option ; mutable wait_close : bool ; } -type ('msg, 'meta) pool = ('msg, 'meta) t +type ('msg, 'peer_meta, 'conn_meta) pool = ('msg, 'peer_meta, 'conn_meta) t module Pool_event = struct let wait_too_few_connections pool = @@ -318,7 +332,7 @@ module Gc_peer_set = List.Bounded(struct if score_cmp = 0 then Time.compare t t' else - score_cmp end) -let gc_peer_ids ({ meta_config = { score } ; +let gc_peer_ids ({ peer_meta_config = { score } ; config = { max_known_peer_ids } ; known_peer_ids ; } as pool) = match max_known_peer_ids with @@ -327,7 +341,7 @@ let gc_peer_ids ({ meta_config = { score } ; let table = Gc_peer_set.create target in P2p_peer.Table.iter (fun peer_id peer_info -> let created = P2p_peer_state.Info.created peer_info in - let score = score @@ P2p_peer_state.Info.metadata peer_info in + let score = score @@ P2p_peer_state.Info.peer_metadata peer_info in Gc_peer_set.insert (score, created, peer_id) table ) known_peer_ids ; let to_remove = Gc_peer_set.get table in @@ -340,7 +354,9 @@ let register_peer pool peer_id = match P2p_peer.Table.find pool.known_peer_ids peer_id with | exception Not_found -> Lwt_condition.broadcast pool.events.new_peer () ; - let peer = P2p_peer_state.Info.create peer_id ~metadata:pool.meta_config.initial in + let peer = + P2p_peer_state.Info.create peer_id + ~peer_metadata:pool.peer_meta_config.peer_meta_initial in Option.iter pool.config.max_known_peer_ids ~f:begin fun (max, _) -> if P2p_peer.Table.length pool.known_peer_ids >= max then gc_peer_ids pool end ; @@ -421,7 +437,8 @@ let get_addr pool peer_id = module Points = struct - type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t + type ('msg, 'peer_meta, 'conn_meta) info = + ('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t let info { known_points } point = P2p_point.Table.find_opt known_points point @@ -461,21 +478,22 @@ end module Peers = struct - type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t + type ('msg, 'peer_meta, 'conn_meta) info = + (('msg, 'peer_meta, 'conn_meta) connection, 'peer_meta, 'conn_meta) P2p_peer_state.Info.t let info { known_peer_ids } peer_id = try Some (P2p_peer.Table.find known_peer_ids peer_id) with Not_found -> None - let get_metadata pool peer_id = - try P2p_peer_state.Info.metadata (P2p_peer.Table.find pool.known_peer_ids peer_id) - with Not_found -> pool.meta_config.initial + let get_peer_metadata pool peer_id = + try P2p_peer_state.Info.peer_metadata (P2p_peer.Table.find pool.known_peer_ids peer_id) + with Not_found -> pool.peer_meta_config.peer_meta_initial let get_score pool peer_id = - pool.meta_config.score (get_metadata pool peer_id) + pool.peer_meta_config.score (get_peer_metadata pool peer_id) - let set_metadata pool peer_id data = - P2p_peer_state.Info.set_metadata (register_peer pool peer_id) data + let set_peer_metadata pool peer_id data = + P2p_peer_state.Info.set_peer_metadata (register_peer pool peer_id) data let get_trusted pool peer_id = try P2p_peer_state.Info.trusted (P2p_peer.Table.find pool.known_peer_ids peer_id) @@ -561,7 +579,7 @@ module Connection = struct let stat { conn } = P2p_socket.stat conn - let score { meta_config = { score }} meta = score meta + let score { peer_meta_config = { score }} meta = score meta let info { conn } = P2p_socket.info conn @@ -688,6 +706,7 @@ and authenticate pool ?point_info canceler fd point = ~incoming (P2p_io_scheduler.register pool.io_sched fd) point ?listening_port:pool.config.listening_port pool.config.identity pool.message_config.versions + pool.conn_meta_config.conn_meta_encoding end ~on_error: begin fun err -> begin match err with | [ Canceled ] -> @@ -779,11 +798,13 @@ and authenticate pool ?point_info canceler fd point = ?incoming_message_queue_size:pool.config.incoming_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size ?binary_chunks_size:pool.config.binary_chunks_size - auth_fd pool.encoding >>= fun conn -> + auth_fd + (pool.conn_meta_config.conn_meta_value info.peer_id) + pool.encoding >>=? fun (conn, ack_cfg) -> lwt_debug "authenticate: %a -> Connected %a" P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> - Lwt.return conn + return (conn, ack_cfg) end ~on_error: begin fun err -> if incoming then log pool @@ -795,7 +816,7 @@ and authenticate pool ?point_info canceler fd point = ~f:P2p_point_state.set_disconnected ; P2p_peer_state.set_disconnected peer_info ; Lwt.return (Error err) - end >>=? fun conn -> + end >>=? fun (conn, ack_cfg) -> let id_point = match info.id_point, Option.map ~f:P2p_point_state.Info.point point_info with | (addr, _), Some (_, port) -> addr, Some port @@ -803,7 +824,7 @@ and authenticate pool ?point_info canceler fd point = return (create_connection pool conn - id_point connection_point_info peer_info version) + id_point connection_point_info peer_info version ack_cfg) end | _ -> begin log pool (Rejecting_request (point, info.id_point, info.peer_id)) ; @@ -819,7 +840,7 @@ and authenticate pool ?point_info canceler fd point = fail (P2p_errors.Rejected info.peer_id) end -and create_connection pool p2p_conn id_point point_info peer_info _version = +and create_connection pool p2p_conn id_point point_info peer_info _version ack_cfg = let peer_id = P2p_peer_state.Info.peer_id peer_info in let canceler = Lwt_canceler.create () in let size = @@ -851,7 +872,7 @@ and create_connection pool p2p_conn id_point point_info peer_info _version = P2p_point.Table.add pool.connected_points point point_info ; end ; log pool (Connection_established (id_point, peer_id)) ; - P2p_peer_state.set_running peer_info id_point conn ; + P2p_peer_state.set_running peer_info id_point conn ack_cfg ; P2p_peer.Table.add pool.connected_peer_ids peer_id peer_info ; Lwt_condition.broadcast pool.events.new_connection () ; Lwt_canceler.on_cancel canceler begin fun () -> @@ -1013,7 +1034,7 @@ let send_swap_request pool = (***************************************************************************) -let create config meta_config message_config io_sched = +let create config peer_meta_config conn_meta_config message_config io_sched = let events = { too_few_connections = Lwt_condition.create () ; too_many_connections = Lwt_condition.create () ; @@ -1021,7 +1042,7 @@ let create config meta_config message_config io_sched = new_connection = Lwt_condition.create () ; } in let pool = { - config ; meta_config ; message_config ; + config ; peer_meta_config ; conn_meta_config; message_config ; my_id_points = P2p_point.Table.create 7 ; known_peer_ids = P2p_peer.Table.create 53 ; connected_peer_ids = P2p_peer.Table.create 53 ; @@ -1038,7 +1059,9 @@ let create config meta_config message_config io_sched = latest_succesfull_swap = Time.epoch ; } in List.iter (Points.set_trusted pool) config.trusted_points ; - P2p_peer_state.Info.File.load config.peers_file meta_config.encoding >>= function + P2p_peer_state.Info.File.load + config.peers_file + peer_meta_config.peer_meta_encoding >>= function | Ok peer_ids -> List.iter (fun peer_info -> diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index dcbf96495..ef25fabc9 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -13,14 +13,15 @@ A pool and its connections are parametrized by the type of messages exchanged over the connection and the type of - meta-information associated with a peer. The type [('msg, 'meta) + meta-information associated with a peer. The type + [('msg, 'peer_meta,'conn_meta) connection] is a wrapper on top of [P2p_socket.t] that adds - meta-information, a data-structure describing the detailed state of - the connection, as well as a new message queue (referred to "app - message queue") that will only contain the messages from the - internal [P2p_socket.t] that needs to be examined by the higher - layers. Some messages are directly processed by an internal worker - and thus never propagated above. *) + meta-informations, data-structures describing the detailed state of + the peer and the connection, as well as a new message queue + (referred to "app message queue") that will only contain the + messages from the internal [P2p_socket.t] that needs to be examined + by the higher layers. Some messages are directly processed by an + internal worker and thus never propagated above. *) type 'msg encoding = Encoding : { tag: int ; @@ -32,11 +33,12 @@ type 'msg encoding = Encoding : { (** {1 Pool management} *) -type ('msg, 'meta) t +type ('msg, 'peer_meta,'conn_meta) t -type ('msg, 'meta) pool = ('msg, 'meta) t +type ('msg, 'peer_meta,'conn_meta) pool = ('msg, 'peer_meta,'conn_meta) t (** The type of a pool of connections, parametrized by resp. the type - of messages and the meta-information associated to an identity. *) + of messages and the meta-informations associated to an identity and + a connection. *) type config = { @@ -121,10 +123,15 @@ type config = { peers. Default value is 64 kB. *) } -type 'meta meta_config = { - encoding : 'meta Data_encoding.t; - initial : 'meta; - score : 'meta -> float; +type 'peer_meta peer_meta_config = { + peer_meta_encoding : 'peer_meta Data_encoding.t ; + peer_meta_initial : 'peer_meta ; + score : 'peer_meta -> float ; +} + +type 'conn_meta conn_meta_config = { + conn_meta_encoding : 'conn_meta Data_encoding.t ; + conn_meta_value : P2p_peer.Id.t -> 'conn_meta ; } type 'msg message_config = { @@ -134,22 +141,23 @@ type 'msg message_config = { val create: config -> - 'meta meta_config -> + 'peer_meta peer_meta_config -> + 'conn_meta conn_meta_config -> 'msg message_config -> P2p_io_scheduler.t -> - ('msg, 'meta) pool Lwt.t + ('msg, 'peer_meta,'conn_meta) pool Lwt.t (** [create config meta_cfg msg_cfg io_sched] is a freshly minted pool. *) -val destroy: ('msg, 'meta) pool -> unit Lwt.t +val destroy: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t (** [destroy pool] returns when member connections are either disconnected or canceled. *) -val active_connections: ('msg, 'meta) pool -> int +val active_connections: ('msg, 'peer_meta,'conn_meta) pool -> int (** [active_connections pool] is the number of connections inside [pool]. *) -val pool_stat: ('msg, 'meta) pool -> P2p_stat.t +val pool_stat: ('msg, 'peer_meta,'conn_meta) pool -> P2p_stat.t (** [pool_stat pool] is a snapshot of current bandwidth usage for the entire [pool]. *) @@ -157,25 +165,25 @@ val config : _ pool -> config (** [config pool] is the [config] argument passed to [pool] at creation. *) -val send_swap_request: ('msg, 'meta) pool -> unit +val send_swap_request: ('msg, 'peer_meta,'conn_meta) pool -> unit (** {2 Pool events} *) module Pool_event : sig - val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t + val wait_too_few_connections: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t (** [wait_too_few_connections pool] is determined when the number of connections drops below the desired level. *) - val wait_too_many_connections: ('msg, 'meta) pool -> unit Lwt.t + val wait_too_many_connections: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t (** [wait_too_many_connections pool] is determined when the number of connections exceeds the desired level. *) - val wait_new_peer: ('msg, 'meta) pool -> unit Lwt.t + val wait_new_peer: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t (** [wait_new_peer pool] is determined when a new peer (i.e. authentication successful) gets added to the pool. *) - val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t + val wait_new_connection: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t (** [wait_new_connection pool] is determined when a new connection is succesfully established in the pool. *) @@ -184,142 +192,154 @@ end (** {1 Connections management} *) -type ('msg, 'meta) connection +type ('msg, 'peer_meta,'conn_meta) connection (** Type of a connection to a peer, parametrized by the type of messages exchanged as well as meta-information associated to a - peer. It mostly wraps [P2p_connection.connection], adding - meta-information and data-structures describing a more + peer and a connection. It mostly wraps [P2p_connection.connection], + adding meta-information and data-structures describing a more fine-grained logical state of the connection. *) val connect: ?timeout:float -> - ('msg, 'meta) pool -> P2p_point.Id.t -> - ('msg, 'meta) connection tzresult Lwt.t + ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> + ('msg, 'peer_meta,'conn_meta) connection tzresult Lwt.t (** [connect ?timeout pool point] tries to add a connection to [point] in [pool] in less than [timeout] seconds. *) val accept: - ('msg, 'meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit + ('msg, 'peer_meta,'conn_meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit (** [accept pool fd point] instructs [pool] to start the process of accepting a connection from [fd]. Used by [P2p]. *) val disconnect: - ?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t + ?wait:bool -> ('msg, 'peer_meta,'conn_meta) connection -> unit Lwt.t (** [disconnect conn] cleanly closes [conn] and returns after [conn]'s internal worker has returned. *) module Connection : sig - val info: ('msg, 'meta) connection -> P2p_connection.Info.t + val info: ('msg, 'peer_meta,'conn_meta) connection -> P2p_connection.Info.t - val stat: ('msg, 'meta) connection -> P2p_stat.t + val stat: ('msg, 'peer_meta,'conn_meta) connection -> P2p_stat.t (** [stat conn] is a snapshot of current bandwidth usage for [conn]. *) val fold: - ('msg, 'meta) pool -> + ('msg, 'peer_meta,'conn_meta) pool -> init:'a -> - f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> + f:(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) connection -> 'a -> 'a) -> 'a val list: - ('msg, 'meta) pool -> (P2p_peer.Id.t * ('msg, 'meta) connection) list + ('msg, 'peer_meta,'conn_meta) pool -> + (P2p_peer.Id.t * ('msg, 'peer_meta,'conn_meta) connection) list val find_by_point: - ('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) connection option + ('msg, 'peer_meta,'conn_meta) pool -> + P2p_point.Id.t -> + ('msg, 'peer_meta,'conn_meta) connection option val find_by_peer_id: - ('msg, 'meta) pool -> P2p_peer.Id.t -> ('msg, 'meta) connection option + ('msg, 'peer_meta,'conn_meta) pool -> + P2p_peer.Id.t -> + ('msg, 'peer_meta,'conn_meta) connection option end val on_new_connection: - ('msg, 'meta) pool -> - (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit + ('msg, 'peer_meta,'conn_meta) pool -> + (P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) connection -> unit) -> unit (** {1 I/O on connections} *) -val read: ('msg, 'meta) connection -> 'msg tzresult Lwt.t +val read: ('msg, 'peer_meta,'conn_meta) connection -> 'msg tzresult Lwt.t (** [read conn] returns a message popped from [conn]'s app message queue, or fails with [Connection_closed]. *) -val is_readable: ('msg, 'meta) connection -> unit tzresult Lwt.t +val is_readable: ('msg, 'peer_meta,'conn_meta) connection -> unit tzresult Lwt.t (** [is_readable conn] returns when there is at least one message ready to be read. *) -val write: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t +val write: + ('msg, 'peer_meta,'conn_meta) connection -> 'msg -> unit tzresult Lwt.t (** [write conn msg] is [P2p_connection.write conn' msg] where [conn'] is the internal [P2p_connection.t] inside [conn]. *) -val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t +val write_sync: + ('msg, 'peer_meta,'conn_meta) connection -> 'msg -> unit tzresult Lwt.t (** [write_sync conn msg] is [P2p_connection.write_sync conn' msg] where [conn'] is the internal [P2p_connection.t] inside [conn]. *) (**/**) val raw_write_sync: - ('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t + ('msg, 'peer_meta,'conn_meta) connection -> MBytes.t -> unit tzresult Lwt.t (**/**) -val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult +val write_now: ('msg, 'peer_meta,'conn_meta) connection -> 'msg -> bool tzresult (** [write_now conn msg] is [P2p_connection.write_now conn' msg] where [conn'] is the internal [P2p_connection.t] inside [conn]. *) (** {2 Broadcast functions} *) -val write_all: ('msg, 'meta) pool -> 'msg -> unit +val write_all: ('msg, 'peer_meta,'conn_meta) pool -> 'msg -> unit (** [write_all pool msg] is [write_now conn msg] for all member connections to [pool] in [Running] state. *) -val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit +val broadcast_bootstrap_msg: ('msg, 'peer_meta,'conn_meta) pool -> unit (** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap] for all member connections to [pool] in [Running] state. *) -val greylist_addr : ('msg, 'meta) pool -> P2p_addr.t -> unit +val greylist_addr : ('msg, 'peer_meta,'conn_meta) pool -> P2p_addr.t -> unit (** [greylist_addr pool addr] adds [addr] to [pool]'s IP greylist. *) -val greylist_peer : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit +val greylist_peer : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit (** [greylist_peer pool peer] adds [peer] to [pool]'s peer greylist and [peer]'s address to [pool]'s IP greylist. *) -val gc_greylist: older_than:Time.t -> ('msg, 'meta) pool -> unit +val gc_greylist: older_than:Time.t -> ('msg, 'peer_meta,'conn_meta) pool -> unit (** [gc_greylist ~older_than pool] *) -val acl_clear : ('msg, 'meta) pool -> unit +val acl_clear : ('msg, 'peer_meta,'conn_meta) pool -> unit (** [acl_clear pool] clears ACL tables. *) (** {1 Functions on [Peer_id]} *) module Peers : sig - type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t + type ('msg, 'peer_meta,'conn_meta) info = + (('msg, 'peer_meta,'conn_meta) connection, 'peer_meta,'conn_meta) P2p_peer_state.Info.t val info: - ('msg, 'meta) pool -> P2p_peer.Id.t -> ('msg, 'meta) info option + ('msg, 'peer_meta,'conn_meta) pool -> + P2p_peer.Id.t -> + ('msg, 'peer_meta,'conn_meta) info option - val get_metadata: ('msg, 'meta) pool -> P2p_peer.Id.t -> 'meta - val set_metadata: ('msg, 'meta) pool -> P2p_peer.Id.t -> 'meta -> unit - val get_score: ('msg, 'meta) pool -> P2p_peer.Id.t -> float + val get_peer_metadata: + ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> 'peer_meta + val set_peer_metadata: + ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> 'peer_meta -> unit + val get_score: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> float - val get_trusted: ('msg, 'meta) pool -> P2p_peer.Id.t -> bool - val set_trusted: ('msg, 'meta) pool -> P2p_peer.Id.t -> unit - val unset_trusted: ('msg, 'meta) pool -> P2p_peer.Id.t -> unit + val get_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> bool + val set_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit + val unset_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit val fold_known: - ('msg, 'meta) pool -> + ('msg, 'peer_meta,'conn_meta) pool -> init:'a -> - f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) -> + f:(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) -> 'a val fold_connected: - ('msg, 'meta) pool -> + ('msg, 'peer_meta,'conn_meta) pool -> init:'a -> - f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) -> + f:(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) -> 'a - val forget : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit - val ban : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit - val trust : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit - val banned : ('msg, 'meta) pool -> P2p_peer.Id.t -> bool + val forget : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit + val ban : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit + val trust : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit + val banned : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> bool end @@ -327,35 +347,40 @@ end module Points : sig - type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t + type ('msg, 'peer_meta,'conn_meta) info = + ('msg, 'peer_meta,'conn_meta) connection P2p_point_state.Info.t val info: - ('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) info option + ('msg, 'peer_meta,'conn_meta) pool -> + P2p_point.Id.t -> + ('msg, 'peer_meta,'conn_meta) info option - val get_trusted: ('msg, 'meta) pool -> P2p_point.Id.t -> bool - val set_trusted: ('msg, 'meta) pool -> P2p_point.Id.t -> unit - val unset_trusted: ('msg, 'meta) pool -> P2p_point.Id.t -> unit + val get_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> bool + val set_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit + val unset_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit val fold_known: - ('msg, 'meta) pool -> + ('msg, 'peer_meta,'conn_meta) pool -> init:'a -> - f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) -> + f:(P2p_point.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) -> 'a val fold_connected: - ('msg, 'meta) pool -> + ('msg, 'peer_meta,'conn_meta) pool -> init:'a -> - f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) -> + f:(P2p_point.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) -> 'a - val forget : ('msg, 'meta) pool -> P2p_point.Id.t -> unit - val ban : ('msg, 'meta) pool -> P2p_point.Id.t -> unit - val trust : ('msg, 'meta) pool -> P2p_point.Id.t -> unit - val banned : ('msg, 'meta) pool -> P2p_point.Id.t -> bool + val forget : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit + val ban : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit + val trust : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit + val banned : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> bool end -val watch: ('msg, 'meta) pool -> P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper +val watch: + ('msg, 'peer_meta,'conn_meta) pool -> + P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper (** [watch pool] is a [stream, close] a [stream] of events and a [close] function for this stream. *) diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index 1f75c4334..98a59e67b 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -152,25 +152,67 @@ end module Ack = struct - type t = Ack | Nack - let ack = MBytes.of_string "\255" - let nack = MBytes.of_string "\000" + type 'a t = Ack of 'a | Nack - let write cryptobox_data fd b = - Crypto.write_chunk cryptobox_data fd - (match b with Ack -> ack | Nack -> nack) + let encoding ack_encoding = + let open Data_encoding in + let ack_encoding = obj1 (req "ack" ack_encoding) in + let nack_encoding = obj1 (req "nack" empty) in + let ack_case tag = + case tag ack_encoding + (function + | Ack param -> Some param + | _ -> None) + (fun param -> Ack param) in + let nack_case tag = + case tag nack_encoding + (function + | Nack -> Some () + | _ -> None + ) + (fun _ -> Nack) in + union [ + ack_case (Tag 0) ; + nack_case (Tag 1) ; + ] - let read fd cryptobox_data = + let write ack_encoding cryptobox_data fd message = + let encoding = encoding ack_encoding in + let encoded_message_len = + Data_encoding.Binary.length encoding message in + let buf = MBytes.create encoded_message_len in + match Data_encoding.Binary.write encoding message buf 0 encoded_message_len with + | None -> + fail P2p_errors.Encoding_error + | Some last -> + fail_unless (last = encoded_message_len) + P2p_errors.Encoding_error >>=? fun () -> + Crypto.write_chunk cryptobox_data fd buf + + let read ack_encoding fd cryptobox_data = + let encoding = encoding ack_encoding in Crypto.read_chunk fd cryptobox_data >>=? fun buf -> - return (buf <> nack) + let length = MBytes.length buf in + match Data_encoding.Binary.read encoding buf 0 length with + | None -> + fail P2p_errors.Decoding_error + | Some (read_len, message) -> + if read_len <> length then + fail P2p_errors.Decoding_error + else + return message end -type authenticated_fd = - P2p_io_scheduler.connection * P2p_connection.Info.t * Crypto.data +type 'conn_meta authenticated_fd = { + fd: P2p_io_scheduler.connection ; + info: P2p_connection.Info.t ; + cryptobox_data: Crypto.data ; + ack_encoding: 'conn_meta Data_encoding.t ; +} -let kick (fd, _ , cryptobox_data) = - Ack.write fd cryptobox_data Nack >>= fun _ -> +let kick { fd ; ack_encoding ; cryptobox_data ; _ } = + Ack.write ack_encoding fd cryptobox_data Nack >>= fun _ -> P2p_io_scheduler.close fd >>= fun _ -> Lwt.return_unit @@ -180,7 +222,7 @@ let kick (fd, _ , cryptobox_data) = let authenticate ~proof_of_work_target ~incoming fd (remote_addr, remote_socket_port as point) - ?listening_port identity supported_versions = + ?listening_port identity supported_versions ack_encoding = let local_nonce_seed = Crypto_box.random_nonce () in lwt_debug "Sending authenfication to %a" P2p_point.Id.pp point >>= fun () -> Connection_message.write fd @@ -210,7 +252,7 @@ let authenticate { P2p_connection.Info.peer_id = remote_peer_id ; versions = msg.versions ; incoming ; id_point ; remote_socket_port ;} in - return (info, (fd, info, cryptobox_data)) + return (info, { fd ; info ; cryptobox_data ; ack_encoding }) type connection = { id : int ; @@ -449,33 +491,38 @@ let info { conn } = conn.info let accept ?incoming_message_queue_size ?outgoing_message_queue_size - ?binary_chunks_size (fd, info, cryptobox_data) encoding = + ?binary_chunks_size + { fd ; info ; cryptobox_data ; ack_encoding } + ack_param + encoding = protect begin fun () -> - Ack.write fd cryptobox_data Ack >>=? fun () -> - Ack.read fd cryptobox_data + Ack.write ack_encoding fd cryptobox_data (Ack ack_param) >>=? fun () -> + Ack.read ack_encoding fd cryptobox_data end ~on_error:begin fun err -> P2p_io_scheduler.close fd >>= fun _ -> match err with | [ P2p_errors.Connection_closed ] -> fail P2p_errors.Rejected_socket_connection | [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth | err -> Lwt.return (Error err) - end >>=? fun accepted -> - fail_unless accepted P2p_errors.Rejected_socket_connection >>=? fun () -> - let canceler = Lwt_canceler.create () in - let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in - let reader = - Reader.run ?size:incoming_message_queue_size conn encoding canceler - and writer = - Writer.run - ?size:outgoing_message_queue_size ?binary_chunks_size - conn encoding canceler - in - let conn = { conn ; reader ; writer } in - Lwt_canceler.on_cancel canceler begin fun () -> - P2p_io_scheduler.close fd >>= fun _ -> - Lwt.return_unit - end ; - return conn + end >>=? function + | Ack ack_cfg -> + let canceler = Lwt_canceler.create () in + let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in + let reader = + Reader.run ?size:incoming_message_queue_size conn encoding canceler + and writer = + Writer.run + ?size:outgoing_message_queue_size ?binary_chunks_size + conn encoding canceler + in + let conn = { conn ; reader ; writer } in + Lwt_canceler.on_cancel canceler begin fun () -> + P2p_io_scheduler.close fd >>= fun _ -> + Lwt.return_unit + end ; + return (conn, ack_cfg) + | Nack -> + fail P2p_errors.Rejected_socket_connection let catch_closed_pipe f = Lwt.catch f begin function diff --git a/src/lib_p2p/p2p_socket.mli b/src/lib_p2p/p2p_socket.mli index a3cb36338..e8444770b 100644 --- a/src/lib_p2p/p2p_socket.mli +++ b/src/lib_p2p/p2p_socket.mli @@ -19,9 +19,10 @@ (** {1 Types} *) -type authenticated_fd +type 'conn_meta authenticated_fd (** Type of a connection that successfully passed the authentication - phase, but has not been accepted yet. *) + phase, but has not been accepted yet. Parametrized by the type + of expected parameter in the `ack` message. *) type 'msg t (** Type of an accepted connection, parametrized by the type of @@ -39,14 +40,14 @@ val authenticate: incoming:bool -> P2p_io_scheduler.connection -> P2p_point.Id.t -> ?listening_port: int -> - P2p_identity.t -> P2p_version.t list -> - (P2p_connection.Info.t * authenticated_fd) tzresult Lwt.t + P2p_identity.t -> P2p_version.t list -> 'conn_meta Data_encoding.t -> + (P2p_connection.Info.t * 'conn_meta authenticated_fd) tzresult Lwt.t (** (Low-level) (Cancelable) Authentication function of a remote peer. Used in [P2p_connection_pool], to promote a [P2P_io_scheduler.connection] into an [authenticated_fd] (auth correct, acceptation undecided). *) -val kick: authenticated_fd -> unit Lwt.t +val kick: 'conn_meta authenticated_fd -> unit Lwt.t (** (Low-level) (Cancelable) [kick afd] notifies the remote peer that we refuse this connection and then closes [afd]. Used in [P2p_connection_pool] to reject an [aunthenticated_fd] which we do @@ -56,7 +57,8 @@ val accept: ?incoming_message_queue_size:int -> ?outgoing_message_queue_size:int -> ?binary_chunks_size: int -> - authenticated_fd -> 'msg Data_encoding.t -> 'msg t tzresult Lwt.t + 'conn_meta authenticated_fd -> 'conn_meta -> + 'msg Data_encoding.t -> ('msg t * 'conn_meta) tzresult Lwt.t (** (Low-level) (Cancelable) Accepts a remote peer given an authenticated_fd. Used in [P2p_connection_pool], to promote an [authenticated_fd] to the status of an active peer. *) diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index b17c47685..5f52d9b9e 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -9,7 +9,7 @@ include Logging.Make (struct let name = "p2p.welcome" end) -type pool = Pool : ('msg, 'meta) P2p_pool.t -> pool +type pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> pool type t = { socket: Lwt_unix.file_descr ; diff --git a/src/lib_p2p/p2p_welcome.mli b/src/lib_p2p/p2p_welcome.mli index 3e83d6daf..5efb8ffc2 100644 --- a/src/lib_p2p/p2p_welcome.mli +++ b/src/lib_p2p/p2p_welcome.mli @@ -17,7 +17,7 @@ type t val run: ?addr:P2p_addr.t -> backlog:int -> - ('msg, 'meta) P2p_pool.t -> P2p_addr.port -> t Lwt.t + ('msg, 'meta, 'meta_conn) P2p_pool.t -> P2p_addr.port -> t Lwt.t (** [run ?addr ~backlog pool port] returns a running welcome worker adding connections into [pool] listening on [addr:port]. [backlog] is passed to [Lwt_unix.listen]. *) diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index 3bdfe7d6d..030b7fa31 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -27,12 +27,17 @@ let msg_config : message P2p_pool.message_config = { type metadata = unit -let meta_config : metadata P2p_pool.meta_config = { - encoding = Data_encoding.empty ; - initial = () ; +let peer_meta_config : metadata P2p_pool.peer_meta_config = { + peer_meta_encoding = Data_encoding.empty ; + peer_meta_initial = () ; score = fun () -> 0. ; } +let conn_meta_config : metadata P2p_pool.conn_meta_config = { + conn_meta_encoding = Data_encoding.empty ; + conn_meta_value = (fun _ -> ()) ; +} + let sync ch = Process.Channel.push ch () >>=? fun () -> Process.Channel.pop ch >>=? fun () -> @@ -87,7 +92,7 @@ let detach_node f points n = begin fun channel -> let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in P2p_pool.create - config meta_config msg_config sched >>= fun pool -> + config peer_meta_config conn_meta_config msg_config sched >>= fun pool -> P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> lwt_log_info "Node ready (port: %d)" port >>= fun () -> sync channel >>=? fun () -> diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index dd1490e12..165074c60 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -95,7 +95,7 @@ let accept sched main_socket = raw_accept sched main_socket >>= fun (fd, point) -> P2p_socket.authenticate ~proof_of_work_target - ~incoming:true fd point id1 versions + ~incoming:true fd point id1 versions Data_encoding.unit let raw_connect sched addr port = let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in @@ -109,7 +109,7 @@ let connect sched addr port id = raw_connect sched addr port >>= fun fd -> P2p_socket.authenticate ~proof_of_work_target - ~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) -> + ~incoming:false fd (addr, port) id versions Data_encoding.unit >>=? fun (info, auth_fd) -> _assert (not info.incoming) __LOC__ "" >>=? fun () -> _assert (P2p_peer.Id.compare info.peer_id id1.peer_id = 0) __LOC__ "" >>=? fun () -> @@ -172,7 +172,7 @@ module Kick = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd encoding >>= fun conn -> + P2p_socket.accept auth_fd () encoding >>= fun conn -> _assert (is_rejected conn) __LOC__ "" >>=? fun () -> return () @@ -186,7 +186,7 @@ module Kicked = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd encoding >>= fun conn -> + P2p_socket.accept auth_fd () encoding >>= fun conn -> _assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () -> return () @@ -208,7 +208,7 @@ module Simple_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -218,7 +218,7 @@ module Simple_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -240,7 +240,7 @@ module Chunked_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> P2p_socket.accept - ~binary_chunks_size:21 auth_fd encoding >>=? fun conn -> + ~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -251,7 +251,7 @@ module Chunked_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> P2p_socket.accept - ~binary_chunks_size:21 auth_fd encoding >>=? fun conn -> + ~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -272,7 +272,7 @@ module Oversized_message = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.write_sync conn simple_msg >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> @@ -282,7 +282,7 @@ module Oversized_message = struct let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.write_sync conn simple_msg2 >>=? fun () -> P2p_socket.read conn >>=? fun (_msg_size, msg) -> _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> @@ -302,14 +302,14 @@ module Close_on_read = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> sync ch >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -328,14 +328,14 @@ module Close_on_write = struct let server ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.close conn >>= fun _stat -> sync ch >>=? fun ()-> return () let client ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> sync ch >>=? fun ()-> Lwt_unix.sleep 0.1 >>= fun () -> P2p_socket.write_sync conn simple_msg >>= fun err -> @@ -365,7 +365,7 @@ module Garbled_data = struct let server _ch sched socket = accept sched socket >>=? fun (_info, auth_fd) -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.raw_write_sync conn garbled_msg >>=? fun () -> P2p_socket.read conn >>= fun err -> _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> @@ -374,7 +374,7 @@ module Garbled_data = struct let client _ch sched addr port = connect sched addr port id2 >>=? fun auth_fd -> - P2p_socket.accept auth_fd encoding >>=? fun conn -> + P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) -> P2p_socket.read conn >>= fun err -> _assert (is_decoding_error err) __LOC__ "" >>=? fun () -> P2p_socket.close conn >>= fun _stat -> diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index 4f776be12..65114af56 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -8,10 +8,9 @@ (**************************************************************************) module Message = Distributed_db_message -module Metadata = Distributed_db_metadata -type p2p = (Message.t, Metadata.t) P2p.net -type connection = (Message.t, Metadata.t) P2p.connection +type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net +type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection type 'a request_param = { data: 'a ; diff --git a/src/lib_shell/distributed_db.mli b/src/lib_shell/distributed_db.mli index ff0ec06f6..7af249ae7 100644 --- a/src/lib_shell/distributed_db.mli +++ b/src/lib_shell/distributed_db.mli @@ -15,9 +15,8 @@ type t type db = t module Message = Distributed_db_message -module Metadata = Distributed_db_metadata -type p2p = (Message.t, Metadata.t) P2p.net +type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net val create: State.t -> p2p -> t val state: db -> State.t diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index e397446f9..784fc0b6c 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -65,19 +65,31 @@ type t = { shutdown: unit -> unit Lwt.t ; } +let peer_metadata_cfg : _ P2p.peer_meta_config = { + peer_meta_encoding = Peer_metadata.encoding ; + peer_meta_initial = () ; + score = fun _ -> 0. ; +} + +let connection_metadata_cfg : _ P2p.conn_meta_config = { + conn_meta_encoding = Peer_metadata.encoding ; + conn_meta_value = fun _ -> () ; +} + let init_p2p p2p_params = match p2p_params with | None -> lwt_log_notice "P2P layer is disabled" >>= fun () -> - Error_monad.return (P2p.faked_network Distributed_db_metadata.cfg) + return (P2p.faked_network peer_metadata_cfg) | Some (config, limits) -> lwt_log_notice "bootstraping chain..." >>= fun () -> P2p.create ~config ~limits - Distributed_db_metadata.cfg + peer_metadata_cfg + connection_metadata_cfg Distributed_db_message.cfg >>=? fun p2p -> Lwt.async (fun () -> P2p.maintain p2p) ; - Error_monad.return p2p + return p2p type config = { genesis: State.Chain.genesis ; diff --git a/src/lib_shell/distributed_db_metadata.ml b/src/lib_shell_services/connection_metadata.ml similarity index 87% rename from src/lib_shell/distributed_db_metadata.ml rename to src/lib_shell_services/connection_metadata.ml index 2bf7cf463..1bd9c324d 100644 --- a/src/lib_shell/distributed_db_metadata.ml +++ b/src/lib_shell_services/connection_metadata.ml @@ -8,8 +8,4 @@ (**************************************************************************) type t = unit -let initial = () let encoding = Data_encoding.empty -let score () = 0. - -let cfg : _ P2p.meta_config = { encoding ; initial ; score } diff --git a/src/lib_shell/distributed_db_metadata.mli b/src/lib_shell_services/connection_metadata.mli similarity index 86% rename from src/lib_shell/distributed_db_metadata.mli rename to src/lib_shell_services/connection_metadata.mli index 1518967bd..4a1dd6f08 100644 --- a/src/lib_shell/distributed_db_metadata.mli +++ b/src/lib_shell_services/connection_metadata.mli @@ -7,7 +7,7 @@ (* *) (**************************************************************************) -(** Tezos Shell - All the (persistent) metadata associated to a peer. *) +(** All the metadata associated to a running connection. *) type t = unit (* TODO *) -val cfg : t P2p.meta_config +val encoding: t Data_encoding.t diff --git a/src/lib_shell_services/peer_metadata.ml b/src/lib_shell_services/peer_metadata.ml new file mode 100644 index 000000000..1bd9c324d --- /dev/null +++ b/src/lib_shell_services/peer_metadata.ml @@ -0,0 +1,11 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t = unit +let encoding = Data_encoding.empty diff --git a/src/lib_shell_services/peer_metadata.mli b/src/lib_shell_services/peer_metadata.mli new file mode 100644 index 000000000..9198ca464 --- /dev/null +++ b/src/lib_shell_services/peer_metadata.mli @@ -0,0 +1,13 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2018. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** All the (persistent) metadata associated to a peer. *) + +type t = unit (* TODO *) +val encoding: t Data_encoding.t