diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 09386c6ff..87d9352b3 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -9,6 +9,25 @@ include P2p_types +type 'meta meta_config = 'meta P2p_connection_pool.meta_config = { + encoding : 'meta Data_encoding.t; + initial : 'meta; +} + +type 'msg app_message_encoding = 'msg P2p_connection_pool.encoding = + Encoding : { + tag: int ; + encoding: 'a Data_encoding.t ; + wrap: 'a -> 'msg ; + unwrap: 'msg -> 'a option ; + max_length: int option ; + } -> 'msg app_message_encoding + +type 'msg message_config = 'msg P2p_connection_pool.message_config = { + encoding : 'msg app_message_encoding list ; + versions : Version.t list; +} + type config = { listening_port : port option ; listening_addr : addr option ; @@ -107,224 +126,204 @@ let may_create_welcome_worker config limits pool = ?addr:config.listening_addr port >>= fun w -> Lwt.return (Some w) -module type MESSAGE = sig - type t - val encoding : t P2p_connection_pool.encoding list - val supported_versions : Version.t list -end +type ('msg, 'meta) connection = ('msg, 'meta) P2p_connection_pool.connection -module type METADATA = sig - type t - val initial : t - val encoding : t Data_encoding.t - val score : t -> float -end +module Real = struct -module Make (Message : MESSAGE) (Metadata : METADATA) = struct - - let meta_cfg = { - P2p_connection_pool.encoding = Metadata.encoding ; - initial = Metadata.initial ; - } - and msg_cfg = { - P2p_connection_pool.encoding = Message.encoding ; - versions = Message.supported_versions ; + type ('msg, 'meta) net = { + config: config ; + limits: limits ; + io_sched: P2p_io_scheduler.t ; + pool: ('msg, 'meta) P2p_connection_pool.t ; + discoverer: P2p_discovery.t option ; + maintenance: 'meta P2p_maintenance.t ; + welcome: P2p_welcome.t option ; } - type connection = (Message.t, Metadata.t) P2p_connection_pool.connection - - module Real = struct - - type net = { - config: config ; - limits: limits ; - io_sched: P2p_io_scheduler.t ; - pool: (Message.t, Metadata.t) P2p_connection_pool.t ; - discoverer: P2p_discovery.t option ; - maintenance: Metadata.t P2p_maintenance.t ; - welcome: P2p_welcome.t option ; - } - - let create ~config ~limits = - let io_sched = create_scheduler limits in - create_connection_pool - config limits meta_cfg msg_cfg io_sched >>= fun pool -> - let discoverer = may_create_discovery_worker config pool in - let maintenance = create_maintenance_worker limits pool discoverer in - may_create_welcome_worker config limits pool >>= fun welcome -> - Lwt.return { - config ; - limits ; - io_sched ; - pool ; - discoverer ; - maintenance ; - welcome ; - } - - let gid { config } = config.identity.gid - - let maintain { maintenance } () = - P2p_maintenance.maintain maintenance - - let roll _net () = Lwt.return_unit (* TODO implement *) - - (* returns when all workers have shutted down in the opposite - creation order. *) - let shutdown net () = - Lwt_utils.may ~f:P2p_welcome.shutdown net.welcome >>= fun () -> - P2p_maintenance.shutdown net.maintenance >>= fun () -> - Lwt_utils.may ~f:P2p_discovery.shutdown net.discoverer >>= fun () -> - P2p_connection_pool.destroy net.pool >>= fun () -> - P2p_io_scheduler.shutdown net.io_sched - - let connections { pool } () = - P2p_connection_pool.fold_connections pool - ~init:[] ~f:(fun _gid c acc -> c :: acc) - let find_connection { pool } gid = - P2p_connection_pool.Gids.find_connection pool gid - let connection_info _net conn = - P2p_connection_pool.connection_info conn - let connection_stat _net conn = - P2p_connection_pool.connection_stat conn - let global_stat { pool } () = - P2p_connection_pool.pool_stat pool - let set_metadata { pool } conn meta = - P2p_connection_pool.Gids.set_metadata pool conn meta - let get_metadata { pool } conn = - P2p_connection_pool.Gids.get_metadata pool conn - - let rec recv net () = - let pipes = - P2p_connection_pool.fold_connections - net.pool ~init:[] ~f:begin fun _gid conn acc -> - (P2p_connection_pool.is_readable conn >>= function - | Ok () -> Lwt.return conn - | Error _ -> Lwt_utils.never_ending) :: acc - end in - Lwt.pick pipes >>= fun conn -> - P2p_connection_pool.read conn >>= function - | Ok msg -> - Lwt.return (conn, msg) - | Error _ -> - Lwt_unix.yield () >>= fun () -> - recv net () - - let send _net c m = - P2p_connection_pool.write c m >>= function - | Ok () -> Lwt.return_unit - | Error _ -> Lwt.fail End_of_file (* temporary *) - - let try_send _net c v = - match P2p_connection_pool.write_now c v with - | Ok v -> v - | Error _ -> false - - let broadcast { pool } msg = P2p_connection_pool.write_all pool msg - - end - - module Fake = struct - - let id = Identity.generate Crypto_box.default_target - let empty_stat = { - Stat.total_sent = 0 ; - total_recv = 0 ; - current_inflow = 0 ; - current_outflow = 0 ; - } - let connection_info = { - Connection_info.incoming = false ; - gid = id.gid ; - id_point = (Ipaddr.V6.unspecified, None) ; - remote_socket_port = 0 ; - versions = [] ; - } - - end - - type net = { - gid : Gid.t ; - maintain : unit -> unit Lwt.t ; - roll : unit -> unit Lwt.t ; - shutdown : unit -> unit Lwt.t ; - connections : unit -> connection list ; - find_connection : Gid.t -> connection option ; - connection_info : connection -> Connection_info.t ; - connection_stat : connection -> Stat.t ; - global_stat : unit -> Stat.t ; - get_metadata : Gid.t -> Metadata.t option ; - set_metadata : Gid.t -> Metadata.t -> unit ; - recv : unit -> (connection * Message.t) Lwt.t ; - send : connection -> Message.t -> unit Lwt.t ; - try_send : connection -> Message.t -> bool ; - broadcast : Message.t -> unit ; - } - - let bootstrap ~config ~limits = - Real.create ~config ~limits >>= fun net -> - Real.maintain net () >>= fun () -> + let create ~config ~limits meta_cfg msg_cfg = + let io_sched = create_scheduler limits in + create_connection_pool + config limits meta_cfg msg_cfg io_sched >>= fun pool -> + let discoverer = may_create_discovery_worker config pool in + let maintenance = create_maintenance_worker limits pool discoverer in + may_create_welcome_worker config limits pool >>= fun welcome -> Lwt.return { - gid = Real.gid net ; - maintain = Real.maintain net ; - roll = Real.roll net ; - shutdown = Real.shutdown net ; - connections = Real.connections net ; - find_connection = Real.find_connection net ; - connection_info = Real.connection_info net ; - connection_stat = Real.connection_stat net ; - global_stat = Real.global_stat net ; - get_metadata = Real.get_metadata net ; - set_metadata = Real.set_metadata net ; - recv = Real.recv net ; - send = Real.send net ; - try_send = Real.try_send net ; - broadcast = Real.broadcast net ; + config ; + limits ; + io_sched ; + pool ; + discoverer ; + maintenance ; + welcome ; } - let faked_network = { - gid = Fake.id.gid ; - maintain = Lwt.return ; - roll = Lwt.return ; - shutdown = Lwt.return ; - connections = (fun () -> []) ; - find_connection = (fun _ -> None) ; - connection_info = (fun _ -> Fake.connection_info) ; - connection_stat = (fun _ -> Fake.empty_stat) ; - global_stat = (fun () -> Fake.empty_stat) ; - get_metadata = (fun _ -> None) ; - set_metadata = (fun _ _ -> ()) ; - recv = (fun () -> Lwt_utils.never_ending) ; - send = (fun _ _ -> Lwt_utils.never_ending) ; - try_send = (fun _ _ -> false) ; - broadcast = ignore ; - } + let gid { config } = config.identity.gid - let gid net = net.gid - let maintain net = net.maintain () - let roll net = net.roll () - let shutdown net = net.shutdown () - let connections net = net.connections () - let find_connection net = net.find_connection - let connection_info net = net.connection_info - let connection_stat net = net.connection_stat - let global_stat net = net.global_stat () - let get_metadata net = net.get_metadata - let set_metadata net = net.set_metadata - let recv net = net.recv () - let send net = net.send - let try_send net = net.try_send - let broadcast net = net.broadcast + let maintain { maintenance } () = + P2p_maintenance.maintain maintenance - module Raw = struct - type 'a t = 'a P2p_connection_pool.Message.t = - | Bootstrap - | Advertise of P2p_types.Point.t list - | Message of 'a - | Disconnect - type message = Message.t t - let encoding = P2p_connection_pool.Message.encoding Message.encoding - let supported_versions = Message.supported_versions - end + let roll _net () = Lwt.return_unit (* TODO implement *) + + (* returns when all workers have shutted down in the opposite + creation order. *) + let shutdown net () = + Lwt_utils.may ~f:P2p_welcome.shutdown net.welcome >>= fun () -> + P2p_maintenance.shutdown net.maintenance >>= fun () -> + Lwt_utils.may ~f:P2p_discovery.shutdown net.discoverer >>= fun () -> + P2p_connection_pool.destroy net.pool >>= fun () -> + P2p_io_scheduler.shutdown net.io_sched + + let connections { pool } () = + P2p_connection_pool.fold_connections pool + ~init:[] ~f:(fun _gid c acc -> c :: acc) + let find_connection { pool } gid = + P2p_connection_pool.Gids.find_connection pool gid + let connection_info _net conn = + P2p_connection_pool.connection_info conn + let connection_stat _net conn = + P2p_connection_pool.connection_stat conn + let global_stat { pool } () = + P2p_connection_pool.pool_stat pool + let set_metadata { pool } conn meta = + P2p_connection_pool.Gids.set_metadata pool conn meta + let get_metadata { pool } conn = + P2p_connection_pool.Gids.get_metadata pool conn + + let rec recv _net conn = + P2p_connection_pool.read conn + + let rec recv_any net () = + let pipes = + P2p_connection_pool.fold_connections + net.pool ~init:[] ~f:begin fun _gid conn acc -> + (P2p_connection_pool.is_readable conn >>= function + | Ok () -> Lwt.return conn + | Error _ -> Lwt_utils.never_ending) :: acc + end in + Lwt.pick pipes >>= fun conn -> + P2p_connection_pool.read conn >>= function + | Ok msg -> + Lwt.return (conn, msg) + | Error _ -> + Lwt_unix.yield () >>= fun () -> + recv_any net () + + let send _net c m = + P2p_connection_pool.write c m >>= function + | Ok () -> Lwt.return_unit + | Error _ -> Lwt.fail End_of_file (* temporary *) + + let try_send _net c v = + match P2p_connection_pool.write_now c v with + | Ok v -> v + | Error _ -> false + + let broadcast { pool } msg = P2p_connection_pool.write_all pool msg end + +module Fake = struct + + let id = Identity.generate Crypto_box.default_target + let empty_stat = { + Stat.total_sent = 0 ; + total_recv = 0 ; + current_inflow = 0 ; + current_outflow = 0 ; + } + let connection_info = { + Connection_info.incoming = false ; + gid = id.gid ; + id_point = (Ipaddr.V6.unspecified, None) ; + remote_socket_port = 0 ; + versions = [] ; + } + +end + +type ('msg, 'meta) t = { + gid : Gid.t ; + maintain : unit -> unit Lwt.t ; + roll : unit -> unit Lwt.t ; + shutdown : unit -> unit Lwt.t ; + connections : unit -> ('msg, 'meta) connection list ; + find_connection : Gid.t -> ('msg, 'meta) connection option ; + connection_info : ('msg, 'meta) connection -> Connection_info.t ; + connection_stat : ('msg, 'meta) connection -> Stat.t ; + global_stat : unit -> Stat.t ; + get_metadata : Gid.t -> 'meta option ; + set_metadata : Gid.t -> 'meta -> unit ; + recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ; + recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ; + send : ('msg, 'meta) connection -> 'msg -> unit Lwt.t ; + try_send : ('msg, 'meta) connection -> 'msg -> bool ; + broadcast : 'msg -> unit ; +} +type ('msg, 'meta) net = ('msg, 'meta) t + +let bootstrap ~config ~limits meta_cfg msg_cfg = + Real.create ~config ~limits meta_cfg msg_cfg >>= fun net -> + Real.maintain net () >>= fun () -> + Lwt.return { + gid = Real.gid net ; + maintain = Real.maintain net ; + roll = Real.roll net ; + shutdown = Real.shutdown net ; + connections = Real.connections net ; + find_connection = Real.find_connection net ; + connection_info = Real.connection_info net ; + connection_stat = Real.connection_stat net ; + global_stat = Real.global_stat net ; + get_metadata = Real.get_metadata net ; + set_metadata = Real.set_metadata net ; + recv = Real.recv net ; + recv_any = Real.recv_any net ; + send = Real.send net ; + try_send = Real.try_send net ; + broadcast = Real.broadcast net ; + } + +let faked_network = { + gid = Fake.id.gid ; + maintain = Lwt.return ; + roll = Lwt.return ; + shutdown = Lwt.return ; + connections = (fun () -> []) ; + find_connection = (fun _ -> None) ; + connection_info = (fun _ -> Fake.connection_info) ; + connection_stat = (fun _ -> Fake.empty_stat) ; + global_stat = (fun () -> Fake.empty_stat) ; + get_metadata = (fun _ -> None) ; + set_metadata = (fun _ _ -> ()) ; + recv = (fun _ -> Lwt_utils.never_ending) ; + recv_any = (fun () -> Lwt_utils.never_ending) ; + send = (fun _ _ -> Lwt_utils.never_ending) ; + try_send = (fun _ _ -> false) ; + broadcast = ignore ; +} + +let gid net = net.gid +let maintain net = net.maintain () +let roll net = net.roll () +let shutdown net = net.shutdown () +let connections net = net.connections () +let find_connection net = net.find_connection +let connection_info net = net.connection_info +let connection_stat net = net.connection_stat +let global_stat net = net.global_stat () +let get_metadata net = net.get_metadata +let set_metadata net = net.set_metadata +let recv net = net.recv +let recv_any net = net.recv_any () +let send net = net.send +let try_send net = net.try_send +let broadcast net = net.broadcast + +module Raw = struct + type 'a t = 'a P2p_connection_pool.Message.t = + | Bootstrap + | Advertise of P2p_types.Point.t list + | Message of 'a + | Disconnect + let encoding = P2p_connection_pool.Message.encoding +end diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index bd5d7c5ae..06b3dc93e 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -29,6 +29,24 @@ module Connection_info = P2p_types.Connection_info module Stat = P2p_types.Stat +type 'meta meta_config = { + encoding : 'meta Data_encoding.t; + initial : 'meta; +} + +type 'msg app_message_encoding = Encoding : { + tag: int ; + encoding: 'a Data_encoding.t ; + wrap: 'a -> 'msg ; + unwrap: 'msg -> 'a option ; + max_length: int option ; + } -> 'msg app_message_encoding + +type 'msg message_config = { + encoding : 'msg app_message_encoding list ; + versions : Version.t list; +} + (** Network configuration *) type config = { @@ -98,91 +116,78 @@ type limits = { } +type ('msg, 'meta) t +type ('msg, 'meta) net = ('msg, 'meta) t -(** Type of message used by higher layers *) -module type MESSAGE = sig - type t - val encoding : t P2p_connection_pool.encoding list - (** High level protocol(s) talked by the peer. When two peers - initiate a connection, they exchange their list of supported - versions. The chosen one, if any, is the maximum common one (in - lexicographic order) *) - val supported_versions : Version.t list +(** A faked p2p layer, which do not initiate any connection + nor open any listening socket *) +val faked_network : ('msg, 'meta) net + +(** Main network initialisation function *) +val bootstrap : + config:config -> limits:limits -> + 'meta meta_config -> 'msg message_config -> ('msg, 'meta) net Lwt.t + +(** Return one's gid *) +val gid : ('msg, 'meta) net -> Gid.t + +(** A maintenance operation : try and reach the ideal number of peers *) +val maintain : ('msg, 'meta) net -> unit Lwt.t + +(** Voluntarily drop some peers and replace them by new buddies *) +val roll : ('msg, 'meta) net -> unit Lwt.t + +(** Close all connections properly *) +val shutdown : ('msg, 'meta) net -> unit Lwt.t + +(** A connection to a peer *) +type ('msg, 'meta) connection + +(** Access the domain of active peers *) +val connections : ('msg, 'meta) net -> ('msg, 'meta) connection list + +(** Return the active peer with identity [gid] *) +val find_connection : ('msg, 'meta) net -> Gid.t -> ('msg, 'meta) connection option + +(** Access the info of an active peer, if available *) +val connection_info : + ('msg, 'meta) net -> ('msg, 'meta) connection -> Connection_info.t +val connection_stat : + ('msg, 'meta) net -> ('msg, 'meta) connection -> Stat.t +val global_stat : ('msg, 'meta) net -> Stat.t + +(** Accessors for meta information about a global identifier *) +val get_metadata : ('msg, 'meta) net -> Gid.t -> 'meta option +val set_metadata : ('msg, 'meta) net -> Gid.t -> 'meta -> unit + +(** Wait for a message from a given connection. *) +val recv : + ('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg tzresult Lwt.t + +(** Wait for a message from any active connections. *) +val recv_any : + ('msg, 'meta) net -> (('msg, 'meta) connection * 'msg) Lwt.t + +(** [send net peer msg] is a thread that returns when [msg] has been + successfully enqueued in the send queue. *) +val send : + ('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> unit Lwt.t + +(** [try_send net peer msg] is [true] if [msg] has been added to the + send queue for [peer], [false] otherwise *) +val try_send : + ('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> bool + +(** Send a message to all peers *) +val broadcast : ('msg, 'meta) net -> 'msg -> unit + +(**/**) +module Raw : sig + type 'a t = + | Bootstrap + | Advertise of P2p_types.Point.t list + | Message of 'a + | Disconnect + val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t end -(** Type of metadata associated to an identity *) -module type METADATA = sig - type t - val initial : t - val encoding : t Data_encoding.t - val score : t -> float -end - -module Make (Message : MESSAGE) (Metadata : METADATA) : sig - - type net - - (** A faked p2p layer, which do not initiate any connection - nor open any listening socket *) - val faked_network : net - - (** Main network initialisation function *) - val bootstrap : config:config -> limits:limits -> net Lwt.t - - (** Return one's gid *) - val gid : net -> Gid.t - - (** A maintenance operation : try and reach the ideal number of peers *) - val maintain : net -> unit Lwt.t - - (** Voluntarily drop some peers and replace them by new buddies *) - val roll : net -> unit Lwt.t - - (** Close all connections properly *) - val shutdown : net -> unit Lwt.t - - (** A connection to a peer *) - type connection - - (** Access the domain of active peers *) - val connections : net -> connection list - - (** Return the active peer with identity [gid] *) - val find_connection : net -> Gid.t -> connection option - - (** Access the info of an active peer, if available *) - val connection_info : net -> connection -> Connection_info.t - val connection_stat : net -> connection -> Stat.t - val global_stat : net -> Stat.t - - (** Accessors for meta information about a global identifier *) - val get_metadata : net -> Gid.t -> Metadata.t option - val set_metadata : net -> Gid.t -> Metadata.t -> unit - - (** Wait for a message from any peer in the network *) - val recv : net -> (connection * Message.t) Lwt.t - - (** [send net peer msg] is a thread that returns when [msg] has been - successfully enqueued in the send queue. *) - val send : net -> connection -> Message.t -> unit Lwt.t - - (** [try_send net peer msg] is [true] if [msg] has been added to the - send queue for [peer], [false] otherwise *) - val try_send : net -> connection -> Message.t -> bool - - (** Send a message to all peers *) - val broadcast : net -> Message.t -> unit - - (**/**) - module Raw : sig - type 'a t = - | Bootstrap - | Advertise of P2p_types.Point.t list - | Message of 'a - | Disconnect - type message = Message.t t - val encoding: message Data_encoding.t - val supported_versions: P2p_types.Version.t list - end - -end diff --git a/src/node/shell/tezos_p2p.ml b/src/node/shell/tezos_p2p.ml index ce23e38df..0b4ef48d2 100644 --- a/src/node/shell/tezos_p2p.ml +++ b/src/node/shell/tezos_p2p.ml @@ -1,4 +1,6 @@ +open P2p + type net_id = Store.net_id type msg = @@ -24,7 +26,7 @@ module Message = struct let encoding = let open Data_encoding in let case ?max_length ~tag encoding unwrap wrap = - P2p_connection_pool.Encoding { tag; encoding; wrap; unwrap; max_length } in + P2p.Encoding { tag; encoding; wrap; unwrap; max_length } in [ case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding)) (function @@ -91,6 +93,44 @@ module Metadata = struct let score () = 0. end -include Message -include (Metadata : module type of Metadata with type t := metadata) -include P2p.Make(Message)(Metadata) + +let meta_cfg : _ P2p.meta_config = { + P2p.encoding = Metadata.encoding ; + initial = Metadata.initial ; +} + +and msg_cfg : _ P2p.message_config = { + encoding = Message.encoding ; + versions = Message.supported_versions ; +} + +type net = (Message.t, Metadata.t) P2p.net + +let bootstrap ~config ~limits = + P2p.bootstrap ~config ~limits meta_cfg msg_cfg + +let broadcast = P2p.broadcast +let try_send = P2p.try_send +let recv = P2p.recv_any +let send = P2p.send +let set_metadata = P2p.set_metadata +let get_metadata = P2p.get_metadata +let connection_info = P2p.connection_info +let find_connection = P2p.find_connection +let connections = P2p.connections +type connection = (Message.t, Metadata.t) P2p.connection +let shutdown = P2p.shutdown +let roll = P2p.roll +let maintain = P2p.maintain +let faked_network = P2p.faked_network + +module Raw = struct + type 'a t = 'a P2p.Raw.t = + | Bootstrap + | Advertise of Point.t list + | Message of 'a + | Disconnect + type message = Message.t t + let encoding = P2p.Raw.encoding msg_cfg.encoding + let supported_versions = msg_cfg.versions +end diff --git a/src/node/shell/tezos_p2p.mli b/src/node/shell/tezos_p2p.mli index 1dc142de9..db1344baa 100644 --- a/src/node/shell/tezos_p2p.mli +++ b/src/node/shell/tezos_p2p.mli @@ -75,10 +75,10 @@ val broadcast : net -> msg -> unit module Raw : sig type 'a t = | Bootstrap - | Advertise of P2p_types.Point.t list + | Advertise of Point.t list | Message of 'a | Disconnect type message = msg t val encoding: message Data_encoding.t - val supported_versions: P2p_types.Version.t list + val supported_versions: Version.t list end