diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index a8c92c017..84c71d639 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -7,6 +7,12 @@ (* *) (**************************************************************************) +module LU = Lwt_unix +module LC = Lwt_condition +open Lwt +open Lwt_utils +open Logging.Net + (* public types *) type addr = Ipaddr.t type port = int @@ -47,139 +53,73 @@ type gid = string let gid_length = 16 -type 'msg msg_encoding = Encoding : { +let pp_gid ppf gid = + Format.pp_print_string ppf (Hex_encode.hex_encode gid) + +(* the common version for a pair of peers, if any, is the maximum one, + in lexicographic order *) +let common_version la lb = + let la = List.sort (fun l r -> compare r l) la in + let lb = List.sort (fun l r -> compare r l) lb in + let rec find = function + | [], _ | _, [] -> None + | ((a :: ta) as la), ((b :: tb) as lb) -> + if a = b then Some a + else if a < b then find (ta, lb) + else find (la, tb) + in find (la, lb) + +(* A net point (address x port). *) +type point = addr * port + +let point_encoding = + let open Data_encoding in + let open Ipaddr in + conv + (fun (addr, port) -> + (match addr with + | V4 v4 -> V4.to_bytes v4 + | V6 v6 -> V6.to_bytes v6), port) + (fun (addr, port) -> + (match String.length addr with + | 4 -> V4 (V4.of_bytes_exn addr) + | 16 -> V6 (V6.of_bytes_exn addr) + | _ -> Pervasives.failwith "point_encoding"), port) + (obj2 + (req "addr" string) + (req "port" int16)) + +type 'msg encoding = Encoding : { tag: int ; encoding: 'a Data_encoding.t ; wrap: 'a -> 'msg ; unwrap: 'msg -> 'a option ; max_length: int option ; - } -> 'msg msg_encoding + } -> 'msg encoding -module type NET_PARAMS = sig - type meta (** Type of metadata associated to an identity *) - type msg (** Type of message used by higher layers *) +module type PARAMS = sig - val msg_encodings : msg msg_encoding list + (** Type of message used by higher layers *) + type msg - val init_meta : meta - val score_enc : meta Data_encoding.t - val score: meta -> float + val encodings : msg encoding list + + (** Type of metadata associated to an identity *) + type metadata + + val initial_metadata : metadata + val metadata_encoding : metadata Data_encoding.t + val score : metadata -> float (** High level protocol(s) talked by the peer. When two peers initiate a connection, they exchange their list of supported versions. The chosen one, if any, is the maximum common one (in lexicographic order) *) val supported_versions : version list + end -module type S = sig - include NET_PARAMS - - type net - - (** A faked p2p layer, which do not initiate any connection - nor open any listening socket. *) - val faked_network : net - - (** Main network initialisation function *) - val bootstrap : config:config -> limits:limits -> net Lwt.t - - (** A maintenance operation : try and reach the ideal number of peers *) - val maintain : net -> unit Lwt.t - - (** Voluntarily drop some peers and replace them by new buddies *) - val roll : net -> unit Lwt.t - - (** Close all connections properly *) - val shutdown : net -> unit Lwt.t - - (** A connection to a peer *) - type peer - - (** Access the domain of active peers *) - val peers : net -> peer list - - (** Return the active peer with identity [gid] *) - val find_peer : net -> gid -> peer option - - type peer_info = { - gid : gid; - addr : addr; - port : port; - version : version; - } - - (** Access the info of an active peer, if available *) - val peer_info : net -> peer -> peer_info - - (** Accessors for meta information about a peer *) - val get_meta : net -> gid -> meta option - val set_meta : net -> gid -> meta -> unit - - (** Wait for a payload from any peer in the network *) - val recv : net -> (peer * msg) Lwt.t - - (** Send a payload to a peer and wait for it to be in the tube *) - val send : net -> peer -> msg -> unit Lwt.t - - (** Send a payload to a peer without waiting for the result. Return - [true] if the message can be enqueued in the peer's output queue - or [false] otherwise. *) - val try_send : net -> peer -> msg -> bool - - (** Send a payload to all peers *) - val broadcast : net -> msg -> unit - - (** Shutdown the connection to all peers at this address and stop the - communications with this machine for [duration] seconds *) - val blacklist : net -> gid -> unit - - (** Keep a connection to this pair as often as possible *) - val whitelist : net -> gid -> unit -end - -module Make (P: NET_PARAMS) = struct - module LU = Lwt_unix - module LC = Lwt_condition - open Lwt - open Lwt_utils - open Logging.Net - - let pp_gid ppf gid = - Format.pp_print_string ppf (Hex_encode.hex_encode gid) - - (* the common version for a pair of peers, if any, is the maximum one, - in lexicographic order *) - let common_version la lb = - let la = List.sort (fun l r -> compare r l) la in - let lb = List.sort (fun l r -> compare r l) lb in - let rec find = function - | [], _ | _, [] -> None - | ((a :: ta) as la), ((b :: tb) as lb) -> - if a = b then Some a - else if a < b then find (ta, lb) - else find (la, tb) - in find (la, lb) - - (* A net point (address x port). *) - type point = addr * port - - let point_encoding = - let open Data_encoding in - let open Ipaddr in - conv - (fun (addr, port) -> - (match addr with - | V4 v4 -> V4.to_bytes v4 - | V6 v6 -> V6.to_bytes v6), port) - (fun (addr, port) -> - (match String.length addr with - | 4 -> V4 (V4.of_bytes_exn addr) - | 16 -> V6 (V6.of_bytes_exn addr) - | _ -> Pervasives.failwith "point_encoding"), port) - (obj2 - (req "addr" string) - (req "port" int16)) +module Make (P: PARAMS) = struct (* Low-level network protocol packets (internal). The protocol is completely symmetrical and asynchronous. First both peers must @@ -236,7 +176,7 @@ module Make (P: NET_PARAMS) = struct (function Bootstrap -> Some () | _ -> None) (fun () -> Bootstrap); ] @ - ListLabels.map P.msg_encodings ~f:begin function Encoding { tag; encoding; wrap; unwrap } -> + ListLabels.map P.encodings ~f:begin function Encoding { tag; encoding; wrap; unwrap } -> case ~tag encoding (function Message msg -> unwrap msg | _ -> None) (fun msg -> Message (wrap msg)) @@ -250,7 +190,7 @@ module Make (P: NET_PARAMS) = struct | 3 -> Some 0 | 4 -> Some (1 + 1000 * 17) (* tag + 1000 * max (point size) *) | 5 -> Some 0 - | n -> ListLabels.fold_left P.msg_encodings ~init:None ~f:begin fun a -> function + | n -> ListLabels.fold_left P.encodings ~init:None ~f:begin fun a -> function Encoding { tag; max_length } -> if tag = n then max_length else a end @@ -359,8 +299,8 @@ module Make (P: NET_PARAMS) = struct peers : unit -> peer list ; find_peer : gid -> peer option ; peer_info : peer -> peer_info ; - set_meta : gid -> P.meta -> unit ; - get_meta : gid -> P.meta option ; + set_metadata : gid -> P.metadata -> unit ; + get_metadata : gid -> P.metadata option ; } (* The (internal) type of network events, those dispatched from peer @@ -639,7 +579,7 @@ module Make (P: NET_PARAMS) = struct unreachable_since : float option; connections : (int * float) option ; white_listed : bool ; - meta : P.meta ; + meta : P.metadata ; } (* Ad hoc comparison on sources such as good source < bad source *) @@ -789,7 +729,7 @@ module Make (P: NET_PARAMS) = struct let source = { unreachable_since = None ; connections = None ; white_listed = true ; - meta = P.init_meta ; + meta = P.initial_metadata ; } in List.fold_left @@ -821,14 +761,14 @@ module Make (P: NET_PARAMS) = struct { unreachable_since = None ; connections = None ; white_listed = true ; - meta = P.init_meta ; } in + meta = P.initial_metadata ; } in PeerMap.update (addr, port) source r | Some (c, t, gid) -> let source = { unreachable_since = None ; connections = Some (c, t) ; white_listed = PointSet.mem (addr, port) white_list ; - meta = P.init_meta ; } in + meta = P.initial_metadata ; } in PeerMap.update (addr, port) ~gid source r) PeerMap.empty k in let black_list = @@ -1066,17 +1006,17 @@ module Make (P: NET_PARAMS) = struct { connections = Some (1, Unix.gettimeofday ()) ; unreachable_since = None ; white_listed ; - meta = P.init_meta } + meta = P.initial_metadata } | { connections = Some (n, _) ; white_listed } -> { connections = Some (n + 1, Unix.gettimeofday ()) ; unreachable_since = None ; white_listed ; - meta = P.init_meta } + meta = P.initial_metadata } with Not_found -> { connections = Some (1, Unix.gettimeofday ()) ; unreachable_since = None ; white_listed = white_listed point ; - meta = P.init_meta } + meta = P.initial_metadata } in (* if it's me, it's probably not me *) if my_gid = peer.gid then begin @@ -1136,7 +1076,7 @@ module Make (P: NET_PARAMS) = struct { unreachable_since = None ; connections = None ; white_listed = false ; - meta = P.init_meta } in + meta = P.initial_metadata } in known_peers := PeerMap.update point source !known_peers ; LC.broadcast new_contact point) peers ; @@ -1276,7 +1216,7 @@ module Make (P: NET_PARAMS) = struct { unreachable_since = None ; connections = None ; white_listed = true ; - meta = P.init_meta }, + meta = P.initial_metadata }, None in known_peers := PeerMap.update point ?gid source !known_peers and whitelist peer = @@ -1292,11 +1232,11 @@ module Make (P: NET_PARAMS) = struct LC.broadcast please_maintain () ; waiter and roll () = Pervasives.failwith "roll" - and get_meta _gid = None (* TODO: implement *) - and set_meta _gid _meta = () (* TODO: implement *) + and get_metadata _gid = None (* TODO: implement *) + and set_metadata _gid _meta = () (* TODO: implement *) in let net = { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta } in + blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } in (* main thread, returns after first successful maintenance *) maintain () >>= fun () -> debug "(%a) network succesfully bootstrapped" pp_gid my_gid ; @@ -1318,10 +1258,10 @@ module Make (P: NET_PARAMS) = struct let maintain () = Lwt.return_unit in let roll () = Lwt.return_unit in let peer_info _ = assert false in - let get_meta _ = None in - let set_meta _ _ = () in + let get_metadata _ = None in + let set_metadata _ _ = () in { shutdown ; peers ; find_peer ; recv_from ; send_to ; try_send ; broadcast ; - blacklist ; whitelist ; maintain ; roll ; peer_info ; get_meta ; set_meta } + blacklist ; whitelist ; maintain ; roll ; peer_info ; get_metadata ; set_metadata } (* Plug toplevel functions to callback calls. *) @@ -1337,7 +1277,7 @@ module Make (P: NET_PARAMS) = struct let roll net = net.roll () let blacklist _net _gid = () let whitelist _net _gid = () - let get_meta net gid = net.get_meta gid - let set_meta net gid meta = net.set_meta gid meta + let get_metadata net gid = net.get_metadata gid + let set_metadata net gid meta = net.set_metadata gid meta end diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index 550f4aeb2..c96059b8f 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -56,36 +56,42 @@ type limits = { (** A global identifier for a peer, a.k.a. an identity *) type gid -type 'msg msg_encoding = Encoding : { +type 'msg encoding = Encoding : { tag: int ; encoding: 'a Data_encoding.t ; wrap: 'a -> 'msg ; unwrap: 'msg -> 'a option ; max_length: int option ; - } -> 'msg msg_encoding + } -> 'msg encoding -module type NET_PARAMS = sig - type meta (** Type of metadata associated to an identity *) - type msg (** Type of message used by higher layers *) +module type PARAMS = sig - val msg_encodings : msg msg_encoding list + (** Type of message used by higher layers *) + type msg - val init_meta : meta - val score_enc : meta Data_encoding.t - val score: meta -> float + val encodings : msg encoding list + + (** Type of metadata associated to an identity *) + type metadata + + val initial_metadata : metadata + val metadata_encoding : metadata Data_encoding.t + val score : metadata -> float (** High level protocol(s) talked by the peer. When two peers initiate a connection, they exchange their list of supported versions. The chosen one, if any, is the maximum common one (in lexicographic order) *) val supported_versions : version list + end -module Make (P : NET_PARAMS) : sig +module Make (P : PARAMS) : sig + type net (** A faked p2p layer, which do not initiate any connection - nor open any listening socket. *) + nor open any listening socket *) val faked_network : net (** Main network initialisation function *) @@ -110,18 +116,18 @@ module Make (P : NET_PARAMS) : sig val find_peer : net -> gid -> peer option type peer_info = { - gid : gid; - addr : addr; - port : port; - version : version; + gid : gid ; + addr : addr ; + port : port ; + version : version ; } (** Access the info of an active peer, if available *) val peer_info : net -> peer -> peer_info - (** Accessors for meta information about a peer *) - val get_meta : net -> gid -> P.meta option - val set_meta : net -> gid -> P.meta -> unit + (** Accessors for meta information about a global identifier *) + val get_metadata : net -> gid -> P.metadata option + val set_metadata : net -> gid -> P.metadata -> unit (** Wait for a payload from any peer in the network *) val recv : net -> (peer * P.msg) Lwt.t @@ -143,4 +149,5 @@ module Make (P : NET_PARAMS) : sig (** Keep a connection to this pair as often as possible *) val whitelist : net -> gid -> unit + end diff --git a/src/node/shell/tezos_p2p.ml b/src/node/shell/tezos_p2p.ml index 4b89512e2..a013c3281 100644 --- a/src/node/shell/tezos_p2p.ml +++ b/src/node/shell/tezos_p2p.ml @@ -20,7 +20,7 @@ module Param = struct | Get_protocols of Protocol_hash.t list | Protocol of MBytes.t - let msg_encodings = + let encodings = let open Data_encoding in let case ?max_length ~tag encoding unwrap wrap = P2p.Encoding { tag; encoding; wrap; unwrap; max_length } in @@ -71,9 +71,9 @@ module Param = struct (fun proto -> Protocol proto); ] - type meta = unit - let init_meta = () - let score_enc = Data_encoding.empty + type metadata = unit + let initial_metadata = () + let metadata_encoding = Data_encoding.empty let score () = 0. let supported_versions = diff --git a/src/node/shell/tezos_p2p.mli b/src/node/shell/tezos_p2p.mli index d584dd288..94262be34 100644 --- a/src/node/shell/tezos_p2p.mli +++ b/src/node/shell/tezos_p2p.mli @@ -40,10 +40,10 @@ val peer_info : net -> peer -> peer_info (** Accessors for meta information about a global identifier *) -type meta = unit +type metadata = unit -val get_meta : net -> gid -> meta option -val set_meta : net -> gid -> meta -> unit +val get_metadata : net -> gid -> metadata option +val set_metadata : net -> gid -> metadata -> unit type net_id = Store.net_id