P2p: remove functors and rename recv into recv_any.

This allows to export the underlying `P2p_connection_pool.recv`
that will be used in the refactored distributed DB.
This commit is contained in:
Grégoire Henry 2017-01-16 18:38:40 +01:00
parent ef514eb70e
commit 26ce72bc18
4 changed files with 345 additions and 301 deletions

View File

@ -9,6 +9,25 @@
include P2p_types include P2p_types
type 'meta meta_config = 'meta P2p_connection_pool.meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
}
type 'msg app_message_encoding = 'msg P2p_connection_pool.encoding =
Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg app_message_encoding
type 'msg message_config = 'msg P2p_connection_pool.message_config = {
encoding : 'msg app_message_encoding list ;
versions : Version.t list;
}
type config = { type config = {
listening_port : port option ; listening_port : port option ;
listening_addr : addr option ; listening_addr : addr option ;
@ -107,45 +126,21 @@ let may_create_welcome_worker config limits pool =
?addr:config.listening_addr port >>= fun w -> ?addr:config.listening_addr port >>= fun w ->
Lwt.return (Some w) Lwt.return (Some w)
module type MESSAGE = sig type ('msg, 'meta) connection = ('msg, 'meta) P2p_connection_pool.connection
type t
val encoding : t P2p_connection_pool.encoding list
val supported_versions : Version.t list
end
module type METADATA = sig
type t
val initial : t
val encoding : t Data_encoding.t
val score : t -> float
end
module Make (Message : MESSAGE) (Metadata : METADATA) = struct
let meta_cfg = {
P2p_connection_pool.encoding = Metadata.encoding ;
initial = Metadata.initial ;
}
and msg_cfg = {
P2p_connection_pool.encoding = Message.encoding ;
versions = Message.supported_versions ;
}
type connection = (Message.t, Metadata.t) P2p_connection_pool.connection
module Real = struct module Real = struct
type net = { type ('msg, 'meta) net = {
config: config ; config: config ;
limits: limits ; limits: limits ;
io_sched: P2p_io_scheduler.t ; io_sched: P2p_io_scheduler.t ;
pool: (Message.t, Metadata.t) P2p_connection_pool.t ; pool: ('msg, 'meta) P2p_connection_pool.t ;
discoverer: P2p_discovery.t option ; discoverer: P2p_discovery.t option ;
maintenance: Metadata.t P2p_maintenance.t ; maintenance: 'meta P2p_maintenance.t ;
welcome: P2p_welcome.t option ; welcome: P2p_welcome.t option ;
} }
let create ~config ~limits = let create ~config ~limits meta_cfg msg_cfg =
let io_sched = create_scheduler limits in let io_sched = create_scheduler limits in
create_connection_pool create_connection_pool
config limits meta_cfg msg_cfg io_sched >>= fun pool -> config limits meta_cfg msg_cfg io_sched >>= fun pool ->
@ -194,7 +189,10 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
let get_metadata { pool } conn = let get_metadata { pool } conn =
P2p_connection_pool.Gids.get_metadata pool conn P2p_connection_pool.Gids.get_metadata pool conn
let rec recv net () = let rec recv _net conn =
P2p_connection_pool.read conn
let rec recv_any net () =
let pipes = let pipes =
P2p_connection_pool.fold_connections P2p_connection_pool.fold_connections
net.pool ~init:[] ~f:begin fun _gid conn acc -> net.pool ~init:[] ~f:begin fun _gid conn acc ->
@ -208,7 +206,7 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
Lwt.return (conn, msg) Lwt.return (conn, msg)
| Error _ -> | Error _ ->
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
recv net () recv_any net ()
let send _net c m = let send _net c m =
P2p_connection_pool.write c m >>= function P2p_connection_pool.write c m >>= function
@ -243,26 +241,28 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
end end
type net = { type ('msg, 'meta) t = {
gid : Gid.t ; gid : Gid.t ;
maintain : unit -> unit Lwt.t ; maintain : unit -> unit Lwt.t ;
roll : unit -> unit Lwt.t ; roll : unit -> unit Lwt.t ;
shutdown : unit -> unit Lwt.t ; shutdown : unit -> unit Lwt.t ;
connections : unit -> connection list ; connections : unit -> ('msg, 'meta) connection list ;
find_connection : Gid.t -> connection option ; find_connection : Gid.t -> ('msg, 'meta) connection option ;
connection_info : connection -> Connection_info.t ; connection_info : ('msg, 'meta) connection -> Connection_info.t ;
connection_stat : connection -> Stat.t ; connection_stat : ('msg, 'meta) connection -> Stat.t ;
global_stat : unit -> Stat.t ; global_stat : unit -> Stat.t ;
get_metadata : Gid.t -> Metadata.t option ; get_metadata : Gid.t -> 'meta option ;
set_metadata : Gid.t -> Metadata.t -> unit ; set_metadata : Gid.t -> 'meta -> unit ;
recv : unit -> (connection * Message.t) Lwt.t ; recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ;
send : connection -> Message.t -> unit Lwt.t ; recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ;
try_send : connection -> Message.t -> bool ; send : ('msg, 'meta) connection -> 'msg -> unit Lwt.t ;
broadcast : Message.t -> unit ; try_send : ('msg, 'meta) connection -> 'msg -> bool ;
broadcast : 'msg -> unit ;
} }
type ('msg, 'meta) net = ('msg, 'meta) t
let bootstrap ~config ~limits = let bootstrap ~config ~limits meta_cfg msg_cfg =
Real.create ~config ~limits >>= fun net -> Real.create ~config ~limits meta_cfg msg_cfg >>= fun net ->
Real.maintain net () >>= fun () -> Real.maintain net () >>= fun () ->
Lwt.return { Lwt.return {
gid = Real.gid net ; gid = Real.gid net ;
@ -277,6 +277,7 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
get_metadata = Real.get_metadata net ; get_metadata = Real.get_metadata net ;
set_metadata = Real.set_metadata net ; set_metadata = Real.set_metadata net ;
recv = Real.recv net ; recv = Real.recv net ;
recv_any = Real.recv_any net ;
send = Real.send net ; send = Real.send net ;
try_send = Real.try_send net ; try_send = Real.try_send net ;
broadcast = Real.broadcast net ; broadcast = Real.broadcast net ;
@ -294,7 +295,8 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
global_stat = (fun () -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ;
get_metadata = (fun _ -> None) ; get_metadata = (fun _ -> None) ;
set_metadata = (fun _ _ -> ()) ; set_metadata = (fun _ _ -> ()) ;
recv = (fun () -> Lwt_utils.never_ending) ; recv = (fun _ -> Lwt_utils.never_ending) ;
recv_any = (fun () -> Lwt_utils.never_ending) ;
send = (fun _ _ -> Lwt_utils.never_ending) ; send = (fun _ _ -> Lwt_utils.never_ending) ;
try_send = (fun _ _ -> false) ; try_send = (fun _ _ -> false) ;
broadcast = ignore ; broadcast = ignore ;
@ -311,7 +313,8 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
let global_stat net = net.global_stat () let global_stat net = net.global_stat ()
let get_metadata net = net.get_metadata let get_metadata net = net.get_metadata
let set_metadata net = net.set_metadata let set_metadata net = net.set_metadata
let recv net = net.recv () let recv net = net.recv
let recv_any net = net.recv_any ()
let send net = net.send let send net = net.send
let try_send net = net.try_send let try_send net = net.try_send
let broadcast net = net.broadcast let broadcast net = net.broadcast
@ -322,9 +325,5 @@ module Make (Message : MESSAGE) (Metadata : METADATA) = struct
| Advertise of P2p_types.Point.t list | Advertise of P2p_types.Point.t list
| Message of 'a | Message of 'a
| Disconnect | Disconnect
type message = Message.t t let encoding = P2p_connection_pool.Message.encoding
let encoding = P2p_connection_pool.Message.encoding Message.encoding
let supported_versions = Message.supported_versions
end
end end

View File

@ -29,6 +29,24 @@ module Connection_info = P2p_types.Connection_info
module Stat = P2p_types.Stat module Stat = P2p_types.Stat
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
}
type 'msg app_message_encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg app_message_encoding
type 'msg message_config = {
encoding : 'msg app_message_encoding list ;
versions : Version.t list;
}
(** Network configuration *) (** Network configuration *)
type config = { type config = {
@ -98,80 +116,70 @@ type limits = {
} }
type ('msg, 'meta) t
(** Type of message used by higher layers *) type ('msg, 'meta) net = ('msg, 'meta) t
module type MESSAGE = sig
type t
val encoding : t P2p_connection_pool.encoding list
(** High level protocol(s) talked by the peer. When two peers
initiate a connection, they exchange their list of supported
versions. The chosen one, if any, is the maximum common one (in
lexicographic order) *)
val supported_versions : Version.t list
end
(** Type of metadata associated to an identity *)
module type METADATA = sig
type t
val initial : t
val encoding : t Data_encoding.t
val score : t -> float
end
module Make (Message : MESSAGE) (Metadata : METADATA) : sig
type net
(** A faked p2p layer, which do not initiate any connection (** A faked p2p layer, which do not initiate any connection
nor open any listening socket *) nor open any listening socket *)
val faked_network : net val faked_network : ('msg, 'meta) net
(** Main network initialisation function *) (** Main network initialisation function *)
val bootstrap : config:config -> limits:limits -> net Lwt.t val bootstrap :
config:config -> limits:limits ->
'meta meta_config -> 'msg message_config -> ('msg, 'meta) net Lwt.t
(** Return one's gid *) (** Return one's gid *)
val gid : net -> Gid.t val gid : ('msg, 'meta) net -> Gid.t
(** A maintenance operation : try and reach the ideal number of peers *) (** A maintenance operation : try and reach the ideal number of peers *)
val maintain : net -> unit Lwt.t val maintain : ('msg, 'meta) net -> unit Lwt.t
(** Voluntarily drop some peers and replace them by new buddies *) (** Voluntarily drop some peers and replace them by new buddies *)
val roll : net -> unit Lwt.t val roll : ('msg, 'meta) net -> unit Lwt.t
(** Close all connections properly *) (** Close all connections properly *)
val shutdown : net -> unit Lwt.t val shutdown : ('msg, 'meta) net -> unit Lwt.t
(** A connection to a peer *) (** A connection to a peer *)
type connection type ('msg, 'meta) connection
(** Access the domain of active peers *) (** Access the domain of active peers *)
val connections : net -> connection list val connections : ('msg, 'meta) net -> ('msg, 'meta) connection list
(** Return the active peer with identity [gid] *) (** Return the active peer with identity [gid] *)
val find_connection : net -> Gid.t -> connection option val find_connection : ('msg, 'meta) net -> Gid.t -> ('msg, 'meta) connection option
(** Access the info of an active peer, if available *) (** Access the info of an active peer, if available *)
val connection_info : net -> connection -> Connection_info.t val connection_info :
val connection_stat : net -> connection -> Stat.t ('msg, 'meta) net -> ('msg, 'meta) connection -> Connection_info.t
val global_stat : net -> Stat.t val connection_stat :
('msg, 'meta) net -> ('msg, 'meta) connection -> Stat.t
val global_stat : ('msg, 'meta) net -> Stat.t
(** Accessors for meta information about a global identifier *) (** Accessors for meta information about a global identifier *)
val get_metadata : net -> Gid.t -> Metadata.t option val get_metadata : ('msg, 'meta) net -> Gid.t -> 'meta option
val set_metadata : net -> Gid.t -> Metadata.t -> unit val set_metadata : ('msg, 'meta) net -> Gid.t -> 'meta -> unit
(** Wait for a message from any peer in the network *) (** Wait for a message from a given connection. *)
val recv : net -> (connection * Message.t) Lwt.t val recv :
('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg tzresult Lwt.t
(** Wait for a message from any active connections. *)
val recv_any :
('msg, 'meta) net -> (('msg, 'meta) connection * 'msg) Lwt.t
(** [send net peer msg] is a thread that returns when [msg] has been (** [send net peer msg] is a thread that returns when [msg] has been
successfully enqueued in the send queue. *) successfully enqueued in the send queue. *)
val send : net -> connection -> Message.t -> unit Lwt.t val send :
('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> unit Lwt.t
(** [try_send net peer msg] is [true] if [msg] has been added to the (** [try_send net peer msg] is [true] if [msg] has been added to the
send queue for [peer], [false] otherwise *) send queue for [peer], [false] otherwise *)
val try_send : net -> connection -> Message.t -> bool val try_send :
('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> bool
(** Send a message to all peers *) (** Send a message to all peers *)
val broadcast : net -> Message.t -> unit val broadcast : ('msg, 'meta) net -> 'msg -> unit
(**/**) (**/**)
module Raw : sig module Raw : sig
@ -180,9 +188,6 @@ module Make (Message : MESSAGE) (Metadata : METADATA) : sig
| Advertise of P2p_types.Point.t list | Advertise of P2p_types.Point.t list
| Message of 'a | Message of 'a
| Disconnect | Disconnect
type message = Message.t t val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t
val encoding: message Data_encoding.t
val supported_versions: P2p_types.Version.t list
end end
end

View File

@ -1,4 +1,6 @@
open P2p
type net_id = Store.net_id type net_id = Store.net_id
type msg = type msg =
@ -24,7 +26,7 @@ module Message = struct
let encoding = let encoding =
let open Data_encoding in let open Data_encoding in
let case ?max_length ~tag encoding unwrap wrap = let case ?max_length ~tag encoding unwrap wrap =
P2p_connection_pool.Encoding { tag; encoding; wrap; unwrap; max_length } in P2p.Encoding { tag; encoding; wrap; unwrap; max_length } in
[ [
case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding)) case ~tag:0x10 (tup2 Block_hash.encoding (list Block_hash.encoding))
(function (function
@ -91,6 +93,44 @@ module Metadata = struct
let score () = 0. let score () = 0.
end end
include Message
include (Metadata : module type of Metadata with type t := metadata) let meta_cfg : _ P2p.meta_config = {
include P2p.Make(Message)(Metadata) P2p.encoding = Metadata.encoding ;
initial = Metadata.initial ;
}
and msg_cfg : _ P2p.message_config = {
encoding = Message.encoding ;
versions = Message.supported_versions ;
}
type net = (Message.t, Metadata.t) P2p.net
let bootstrap ~config ~limits =
P2p.bootstrap ~config ~limits meta_cfg msg_cfg
let broadcast = P2p.broadcast
let try_send = P2p.try_send
let recv = P2p.recv_any
let send = P2p.send
let set_metadata = P2p.set_metadata
let get_metadata = P2p.get_metadata
let connection_info = P2p.connection_info
let find_connection = P2p.find_connection
let connections = P2p.connections
type connection = (Message.t, Metadata.t) P2p.connection
let shutdown = P2p.shutdown
let roll = P2p.roll
let maintain = P2p.maintain
let faked_network = P2p.faked_network
module Raw = struct
type 'a t = 'a P2p.Raw.t =
| Bootstrap
| Advertise of Point.t list
| Message of 'a
| Disconnect
type message = Message.t t
let encoding = P2p.Raw.encoding msg_cfg.encoding
let supported_versions = msg_cfg.versions
end

View File

@ -75,10 +75,10 @@ val broadcast : net -> msg -> unit
module Raw : sig module Raw : sig
type 'a t = type 'a t =
| Bootstrap | Bootstrap
| Advertise of P2p_types.Point.t list | Advertise of Point.t list
| Message of 'a | Message of 'a
| Disconnect | Disconnect
type message = msg t type message = msg t
val encoding: message Data_encoding.t val encoding: message Data_encoding.t
val supported_versions: P2p_types.Version.t list val supported_versions: Version.t list
end end