P2P/Shell: Split metadata into peer_metadata and conn_metadata

Peer_metadata is meant to keep track of peer's score.

Conn_metadata is meant to keep track of connection configuration given
during Ack exchange.
This commit is contained in:
michael 2018-05-15 13:16:08 +02:00 committed by Grégoire Henry
parent c13b7dd39a
commit 3f1363b9ba
21 changed files with 513 additions and 329 deletions

View File

@ -9,10 +9,15 @@
include Logging.Make(struct let name = "p2p" end)
type 'meta meta_config = 'meta P2p_pool.meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float
type 'peer_meta peer_meta_config = 'peer_meta P2p_pool.peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t ;
peer_meta_initial : 'peer_meta ;
score : 'peer_meta -> float ;
}
type 'conn_meta conn_meta_config = 'conn_meta P2p_pool.conn_meta_config = {
conn_meta_encoding : 'conn_meta Data_encoding.t ;
conn_meta_value : P2p_peer.Id.t -> 'conn_meta ;
}
type 'msg app_message_encoding = 'msg P2p_pool.encoding =
@ -146,23 +151,24 @@ let may_create_welcome_worker config limits pool =
port >>= fun w ->
Lwt.return (Some w)
type ('msg, 'meta) connection = ('msg, 'meta) P2p_pool.connection
type ('msg, 'peer_meta, 'conn_meta) connection =
('msg, 'peer_meta, 'conn_meta) P2p_pool.connection
module Real = struct
type ('msg, 'meta) net = {
type ('msg, 'peer_meta, 'conn_meta) net = {
config: config ;
limits: limits ;
io_sched: P2p_io_scheduler.t ;
pool: ('msg, 'meta) P2p_pool.t ;
maintenance: 'meta P2p_maintenance.t ;
pool: ('msg, 'peer_meta, 'conn_meta) P2p_pool.t ;
maintenance: 'peer_meta P2p_maintenance.t ;
welcome: P2p_welcome.t option ;
}
let create ~config ~limits meta_cfg msg_cfg =
let create ~config ~limits meta_cfg conn_meta_cfg msg_cfg =
let io_sched = create_scheduler limits in
create_connection_pool
config limits meta_cfg msg_cfg io_sched >>= fun pool ->
config limits meta_cfg conn_meta_cfg msg_cfg io_sched >>= fun pool ->
let maintenance = create_maintenance_worker limits pool in
may_create_welcome_worker config limits pool >>= fun welcome ->
return {
@ -202,10 +208,10 @@ module Real = struct
P2p_pool.Connection.stat conn
let global_stat { pool } () =
P2p_pool.pool_stat pool
let set_metadata { pool } conn meta =
P2p_pool.Peers.set_metadata pool conn meta
let get_metadata { pool } conn =
P2p_pool.Peers.get_metadata pool conn
let set_peer_metadata { pool } conn meta =
P2p_pool.Peers.set_peer_metadata pool conn meta
let get_peer_metadata { pool } conn =
P2p_pool.Peers.get_peer_metadata pool conn
let recv _net conn =
P2p_pool.read conn >>=? fun msg ->
@ -307,32 +313,42 @@ module Fake = struct
end
type ('msg, 'meta) t = {
type ('msg, 'peer_meta, 'conn_meta) t = {
versions : P2p_version.t list ;
peer_id : P2p_peer.Id.t ;
maintain : unit -> unit Lwt.t ;
roll : unit -> unit Lwt.t ;
shutdown : unit -> unit Lwt.t ;
connections : unit -> ('msg, 'meta) connection list ;
find_connection : P2p_peer.Id.t -> ('msg, 'meta) connection option ;
disconnect : ?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t ;
connection_info : ('msg, 'meta) connection -> P2p_connection.Info.t ;
connection_stat : ('msg, 'meta) connection -> P2p_stat.t ;
connections : unit -> ('msg, 'peer_meta, 'conn_meta) connection list ;
find_connection :
P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option ;
disconnect :
?wait:bool -> ('msg, 'peer_meta, 'conn_meta) connection -> unit Lwt.t ;
connection_info :
('msg, 'peer_meta, 'conn_meta) connection -> P2p_connection.Info.t ;
connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t ;
global_stat : unit -> P2p_stat.t ;
get_metadata : P2p_peer.Id.t -> 'meta ;
set_metadata : P2p_peer.Id.t -> 'meta -> unit ;
recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ;
recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ;
send : ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t ;
try_send : ('msg, 'meta) connection -> 'msg -> bool ;
get_peer_metadata : P2p_peer.Id.t -> 'peer_meta ;
set_peer_metadata : P2p_peer.Id.t -> 'peer_meta -> unit ;
recv : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg tzresult Lwt.t ;
recv_any : unit -> (('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t ;
send :
('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit tzresult Lwt.t ;
try_send : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> bool ;
broadcast : 'msg -> unit ;
pool : ('msg, 'meta) P2p_pool.t option ;
pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t option ;
fold_connections :
'a. init:'a -> f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> 'a ;
iter_connections : (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit ;
on_new_connection : (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit ;
'a. init: 'a ->
f:(P2p_peer.Id.t ->
('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) -> 'a ;
iter_connections :
(P2p_peer.Id.t ->
('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ;
on_new_connection :
(P2p_peer.Id.t ->
('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ;
}
type ('msg, 'meta) net = ('msg, 'meta) t
type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t
let check_limits =
let fail_1 v orig =
@ -372,9 +388,9 @@ let check_limits =
end >>=? fun () ->
return ()
let create ~config ~limits meta_cfg msg_cfg =
let create ~config ~limits peer_cfg conn_cfg msg_cfg =
check_limits limits >>=? fun () ->
Real.create ~config ~limits meta_cfg msg_cfg >>=? fun net ->
Real.create ~config ~limits peer_cfg conn_cfg msg_cfg >>=? fun net ->
return {
versions = msg_cfg.versions ;
peer_id = Real.peer_id net ;
@ -387,8 +403,8 @@ let create ~config ~limits meta_cfg msg_cfg =
connection_info = Real.connection_info net ;
connection_stat = Real.connection_stat net ;
global_stat = Real.global_stat net ;
get_metadata = Real.get_metadata net ;
set_metadata = Real.set_metadata net ;
get_peer_metadata = Real.get_peer_metadata net ;
set_peer_metadata = Real.set_peer_metadata net ;
recv = Real.recv net ;
recv_any = Real.recv_any net ;
send = Real.send net ;
@ -400,7 +416,7 @@ let create ~config ~limits meta_cfg msg_cfg =
on_new_connection = Real.on_new_connection net ;
}
let faked_network meta_config = {
let faked_network peer_cfg = {
versions = [] ;
peer_id = Fake.id.peer_id ;
maintain = Lwt.return ;
@ -412,8 +428,8 @@ let faked_network meta_config = {
connection_info = (fun _ -> Fake.connection_info) ;
connection_stat = (fun _ -> Fake.empty_stat) ;
global_stat = (fun () -> Fake.empty_stat) ;
get_metadata = (fun _ -> meta_config.initial) ;
set_metadata = (fun _ _ -> ()) ;
get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ;
set_peer_metadata = (fun _ _ -> ()) ;
recv = (fun _ -> Lwt_utils.never_ending) ;
recv_any = (fun () -> Lwt_utils.never_ending) ;
send = (fun _ _ -> fail P2p_errors.Connection_closed) ;
@ -435,8 +451,8 @@ let find_connection net = net.find_connection
let connection_info net = net.connection_info
let connection_stat net = net.connection_stat
let global_stat net = net.global_stat ()
let get_metadata net = net.get_metadata
let set_metadata net = net.set_metadata
let get_peer_metadata net = net.get_peer_metadata
let set_peer_metadata net = net.set_peer_metadata
let recv net = net.recv
let recv_any net = net.recv_any ()
let send net = net.send

View File

@ -15,10 +15,15 @@
nodes.
*)
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float
type 'peer_meta peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t;
peer_meta_initial : 'peer_meta;
score : 'peer_meta -> float ;
}
type 'conn_meta conn_meta_config = {
conn_meta_encoding : 'conn_meta Data_encoding.t;
conn_meta_value : P2p_peer.Id.t -> 'conn_meta ;
}
type 'msg app_message_encoding = Encoding : {
@ -127,94 +132,123 @@ type limits = {
(** Type of a P2P layer instance, parametrized by:
['msg]: type of messages exchanged between peers
['meta]: type of the metadata associated with peers (score, etc.)
['peer_meta]: type of the metadata associated with peers (score, etc.)
['conn_meta]: type of the metadata associated with connection (ack_cfg)
*)
type ('msg, 'meta) t
type ('msg, 'meta) net = ('msg, 'meta) t
type ('msg, 'peer_meta, 'conn_meta) t
type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t
(** A faked p2p layer, which do not initiate any connection
nor open any listening socket *)
val faked_network : 'meta meta_config -> ('msg, 'meta) net
val faked_network :
'peer_meta peer_meta_config ->
('msg, 'peer_meta, 'conn_meta) net
(** Main network initialisation function *)
val create :
config:config -> limits:limits ->
'meta meta_config -> 'msg message_config -> ('msg, 'meta) net tzresult Lwt.t
'peer_meta peer_meta_config -> 'conn_meta conn_meta_config ->
'msg message_config -> ('msg, 'peer_meta, 'conn_meta) net tzresult Lwt.t
(** Return one's peer_id *)
val peer_id : ('msg, 'meta) net -> P2p_peer.Id.t
val peer_id : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t
(** A maintenance operation : try and reach the ideal number of peers *)
val maintain : ('msg, 'meta) net -> unit Lwt.t
val maintain : ('msg, 'peer_meta, 'conn_meta) net -> unit Lwt.t
(** Voluntarily drop some peers and replace them by new buddies *)
val roll : ('msg, 'meta) net -> unit Lwt.t
val roll : ('msg, 'peer_meta, 'conn_meta) net -> unit Lwt.t
(** Close all connections properly *)
val shutdown : ('msg, 'meta) net -> unit Lwt.t
val shutdown : ('msg, 'peer_meta, 'conn_meta) net -> unit Lwt.t
(** A connection to a peer *)
type ('msg, 'meta) connection
type ('msg, 'peer_meta, 'conn_meta) connection
(** Access the domain of active peers *)
val connections : ('msg, 'meta) net -> ('msg, 'meta) connection list
val connections :
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection list
(** Return the active peer with identity [peer_id] *)
val find_connection : ('msg, 'meta) net -> P2p_peer.Id.t -> ('msg, 'meta) connection option
val find_connection :
('msg, 'peer_meta, 'conn_meta) net ->
P2p_peer.Id.t ->
('msg, 'peer_meta, 'conn_meta) connection option
(** Access the info of an active peer, if available *)
val connection_info :
('msg, 'meta) net -> ('msg, 'meta) connection -> P2p_connection.Info.t
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
P2p_connection.Info.t
val connection_stat :
('msg, 'meta) net -> ('msg, 'meta) connection -> P2p_stat.t
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
P2p_stat.t
(** Cleanly closes a connection. *)
val disconnect :
('msg, 'meta) net -> ?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t
('msg, 'peer_meta, 'conn_meta) net ->
?wait:bool ->
('msg, 'peer_meta, 'conn_meta) connection ->
unit Lwt.t
val global_stat : ('msg, 'meta) net -> P2p_stat.t
val global_stat : ('msg, 'peer_meta, 'conn_meta) net -> P2p_stat.t
(** Accessors for meta information about a global identifier *)
val get_metadata : ('msg, 'meta) net -> P2p_peer.Id.t -> 'meta
val set_metadata : ('msg, 'meta) net -> P2p_peer.Id.t -> 'meta -> unit
val get_peer_metadata :
('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> 'peer_meta
val set_peer_metadata :
('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> 'peer_meta -> unit
(** Wait for a message from a given connection. *)
val recv :
('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg tzresult Lwt.t
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
'msg tzresult Lwt.t
(** Wait for a message from any active connections. *)
val recv_any :
('msg, 'meta) net -> (('msg, 'meta) connection * 'msg) Lwt.t
('msg, 'peer_meta, 'conn_meta) net ->
(('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t
(** [send net peer msg] is a thread that returns when [msg] has been
successfully enqueued in the send queue. *)
val send :
('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
'msg ->
unit tzresult Lwt.t
(** [try_send net peer msg] is [true] if [msg] has been added to the
send queue for [peer], [false] otherwise *)
val try_send :
('msg, 'meta) net -> ('msg, 'meta) connection -> 'msg -> bool
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
'msg ->
bool
(** Send a message to all peers *)
val broadcast : ('msg, 'meta) net -> 'msg -> unit
val broadcast : ('msg, 'peer_meta, 'conn_meta) net -> 'msg -> unit
val fold_connections :
('msg, 'meta) net ->
init:'a -> f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> 'a
('msg, 'peer_meta, 'conn_meta) net ->
init:'a ->
f:(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) ->
'a
val iter_connections :
('msg, 'meta) net ->
(P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit
('msg, 'peer_meta, 'conn_meta) net ->
(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit
val on_new_connection :
('msg, 'meta) net ->
(P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit
('msg, 'peer_meta, 'conn_meta) net ->
(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit
val build_rpc_directory : _ t -> unit RPC_directory.t
val greylist_addr : ('msg, 'meta) net -> P2p_addr.t -> unit
val greylist_peer : ('msg, 'meta) net -> P2p_peer.Id.t -> unit
val greylist_addr : ('msg, 'peer_meta, 'conn_meta) net -> P2p_addr.t -> unit
val greylist_peer : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> unit
(**/**)

View File

@ -16,7 +16,7 @@ type bounds = {
max_threshold: int ;
}
type 'meta pool = Pool : ('msg, 'meta) P2p_pool.t -> 'meta pool
type 'meta pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta pool
type 'meta t = {
canceler: Lwt_canceler.t ;

View File

@ -36,7 +36,7 @@ type bounds = {
type 'meta t
(** Type of a maintenance worker. *)
val run: bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t
val run: bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t
(** [run ~greylist_timeout bounds pool] is a maintenance worker for
[pool] with connection targets specified in [bounds]. *)

View File

@ -9,13 +9,14 @@
open P2p_peer
type 'data t =
type ('conn, 'conn_meta) t =
| Accepted of { current_point: P2p_connection.Id.t ;
cancel: Lwt_canceler.t }
| Running of { data: 'data ;
| Running of { data: 'conn ;
conn_metadata: 'conn_meta ;
current_point: P2p_connection.Id.t }
| Disconnected
type 'data state = 'data t
type ('conn, 'conn_meta) state = ('conn, 'conn_meta) t
let pp ppf = function
| Accepted { current_point ; _ } ->
@ -27,11 +28,11 @@ let pp ppf = function
module Info = struct
type ('conn, 'meta) t = {
type ('conn, 'peer_meta, 'conn_meta) t = {
peer_id : Id.t ;
created : Time.t ;
mutable state : 'conn state ;
mutable metadata : 'meta ;
mutable state : ('conn, 'conn_meta) state ;
mutable peer_metadata : 'peer_meta ;
mutable trusted : bool ;
mutable last_failed_connection : (P2p_connection.Id.t * Time.t) option ;
mutable last_rejected_connection : (P2p_connection.Id.t * Time.t) option ;
@ -40,17 +41,17 @@ module Info = struct
events : Pool_event.t Ring.t ;
watchers : Pool_event.t Lwt_watcher.input ;
}
type ('conn, 'meta) peer_info = ('conn, 'meta) t
type ('conn, 'peer_meta, 'conn_meta) peer_info = ('conn, 'peer_meta, 'conn_meta) t
let compare gi1 gi2 = Id.compare gi1.peer_id gi2.peer_id
let log_size = 100
let create ?(created = Time.now ()) ?(trusted = false) ~metadata peer_id =
let create ?(created = Time.now ()) ?(trusted = false) ~peer_metadata peer_id =
{ peer_id ;
created ;
state = Disconnected ;
metadata ;
peer_metadata ;
trusted ;
last_failed_connection = None ;
last_rejected_connection = None ;
@ -60,23 +61,23 @@ module Info = struct
watchers = Lwt_watcher.create_input () ;
}
let encoding metadata_encoding =
let encoding peer_metadata_encoding =
let open Data_encoding in
conv
(fun { peer_id ; trusted ; metadata ; events ; created ;
(fun { peer_id ; trusted ; peer_metadata ; events ; created ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection ; _ } ->
(peer_id, created, trusted, metadata, Ring.elements events,
(peer_id, created, trusted, peer_metadata, Ring.elements events,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection))
(fun (peer_id, created, trusted, metadata, event_list,
(fun (peer_id, created, trusted, peer_metadata, event_list,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection) ->
let info = create ~trusted ~metadata peer_id in
let info = create ~trusted ~peer_metadata peer_id in
let events = Ring.create log_size in
Ring.add_list info.events event_list ;
{ state = Disconnected ;
trusted ; peer_id ; metadata ; created ;
trusted ; peer_id ; peer_metadata ; created ;
last_failed_connection ;
last_rejected_connection ;
last_established_connection ;
@ -88,7 +89,7 @@ module Info = struct
(req "peer_id" Id.encoding)
(req "created" Time.encoding)
(dft "trusted" bool false)
(req "metadata" metadata_encoding)
(req "peer_metadata" peer_metadata_encoding)
(dft "events" (list Pool_event.encoding) [])
(opt "last_failed_connection"
(tup2 P2p_connection.Id.encoding Time.encoding))
@ -101,8 +102,8 @@ module Info = struct
let peer_id { peer_id ; _ } = peer_id
let created { created ; _ } = created
let metadata { metadata ; _ } = metadata
let set_metadata gi metadata = gi.metadata <- metadata
let peer_metadata { peer_metadata ; _ } = peer_metadata
let set_peer_metadata gi peer_metadata = gi.peer_metadata <- peer_metadata
let trusted { trusted ; _ } = trusted
let set_trusted gi = gi.trusted <- true
let unset_trusted gi = gi.trusted <- false
@ -130,18 +131,19 @@ module Info = struct
module File = struct
let load path metadata_encoding =
let enc = Data_encoding.list (encoding metadata_encoding) in
let load path peer_metadata_encoding =
let enc =
Data_encoding.list (encoding peer_metadata_encoding) in
if path <> "/dev/null" && Sys.file_exists path then
Lwt_utils_unix.Json.read_file path >>=? fun json ->
return (Data_encoding.Json.destruct enc json)
else
return []
let save path metadata_encoding peers =
let save path peer_metadata_encoding peers =
let open Data_encoding in
Lwt_utils_unix.Json.write_file path @@
Json.construct (list (encoding metadata_encoding)) peers
Json.construct (list (encoding peer_metadata_encoding)) peers
end
@ -170,7 +172,7 @@ let set_accepted
let set_running
?(timestamp = Time.now ())
peer_info point data =
peer_info point data conn_metadata =
assert begin
match peer_info.Info.state with
| Disconnected -> true (* request to unknown peer_id. *)
@ -178,7 +180,7 @@ let set_running
| Accepted { current_point ; _ } ->
P2p_connection.Id.equal point current_point
end ;
peer_info.state <- Running { data ; current_point = point } ;
peer_info.state <- Running { data ; conn_metadata ; current_point = point } ;
peer_info.last_established_connection <- Some (point, timestamp) ;
Info.log peer_info ~timestamp point Connection_established

View File

@ -9,56 +9,57 @@
open P2p_peer
type 'conn t =
type ('conn, 'conn_meta) t =
| Accepted of { current_point: P2p_connection.Id.t ;
cancel: Lwt_canceler.t }
(** We accepted a incoming connection, we greeted back and
we are waiting for an acknowledgement. *)
| Running of { data: 'conn ;
conn_metadata: 'conn_meta ;
current_point: P2p_connection.Id.t }
(** Successfully authentificated connection, normal business. *)
| Disconnected
(** No connection established currently. *)
type 'conn state = 'conn t
type ('conn, 'conn_meta) state = ('conn, 'conn_meta) t
val pp : Format.formatter -> 'conn t -> unit
val pp : Format.formatter -> ('conn, 'conn_meta) t -> unit
module Info : sig
type ('conn, 'meta) t
type ('conn, 'meta) peer_info = ('conn, 'meta) t
type ('conn, 'peer_meta, 'conn_meta) t
type ('conn, 'peer_meta, 'conn_meta) peer_info = ('conn, 'peer_meta, 'conn_meta) t
val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int
val compare : ('conn, 'peer_meta, 'conn_meta) t -> ('conn, 'peer_meta, 'conn_meta) t -> int
val create :
?created:Time.t ->
?trusted:bool ->
metadata:'meta ->
Id.t -> ('conn, 'meta) peer_info
peer_metadata:'peer_meta ->
Id.t -> ('conn, 'peer_meta, 'conn_meta) peer_info
(** [create ~trusted ~meta peer_id] is a freshly minted peer_id info for
[peer_id]. *)
val peer_id : ('conn, 'meta) peer_info -> Id.t
val peer_id : ('conn, 'peer_meta, 'conn_meta) peer_info -> Id.t
val created : ('conn, 'meta) peer_info -> Time.t
val metadata : ('conn, 'meta) peer_info -> 'meta
val set_metadata : ('conn, 'meta) peer_info -> 'meta -> unit
val created : ('conn, 'peer_meta, 'conn_meta) peer_info -> Time.t
val peer_metadata : ('conn, 'peer_meta, 'conn_meta) peer_info -> 'peer_meta
val set_peer_metadata : ('conn, 'peer_meta, 'conn_meta) peer_info -> 'peer_meta -> unit
val trusted : ('conn, 'meta) peer_info -> bool
val set_trusted : ('conn, 'meta) peer_info -> unit
val unset_trusted : ('conn, 'meta) peer_info -> unit
val trusted : ('conn, 'peer_meta, 'conn_meta) peer_info -> bool
val set_trusted : ('conn, 'peer_meta, 'conn_meta) peer_info -> unit
val unset_trusted : ('conn, 'peer_meta, 'conn_meta) peer_info -> unit
val last_failed_connection :
('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option
('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option
val last_rejected_connection :
('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option
('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option
val last_established_connection :
('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option
('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option
val last_disconnection :
('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option
('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option
val last_seen :
('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option
('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option
(** [last_seen gi] is the most recent of:
* last established connection
@ -67,7 +68,7 @@ module Info : sig
*)
val last_miss :
('conn, 'meta) peer_info -> (P2p_connection.Id.t * Time.t) option
('conn, 'peer_meta, 'conn_meta) peer_info -> (P2p_connection.Id.t * Time.t) option
(** [last_miss gi] is the most recent of:
* last failed connection
@ -77,39 +78,38 @@ module Info : sig
val log_incoming_rejection :
?timestamp:Time.t ->
('conn, 'meta) peer_info -> P2p_connection.Id.t -> unit
('conn, 'peer_meta, 'conn_meta) peer_info -> P2p_connection.Id.t -> unit
module File : sig
val load :
string -> 'meta Data_encoding.t ->
('conn, 'meta) peer_info list tzresult Lwt.t
string -> 'peer_meta Data_encoding.t ->
('conn, 'peer_meta, 'conn_meta) peer_info list tzresult Lwt.t
val save :
string -> 'meta Data_encoding.t ->
('conn, 'meta) peer_info list -> unit tzresult Lwt.t
string -> 'peer_meta Data_encoding.t ->
('conn, 'peer_meta, 'conn_meta) peer_info list -> unit tzresult Lwt.t
end
val fold :
('conn, 'meta) t -> init:'a -> f:('a -> Pool_event.t -> 'a) -> 'a
('conn, 'peer_meta, 'conn_meta) t -> init:'a -> f:('a -> Pool_event.t -> 'a) -> 'a
val watch :
('conn, 'meta) t -> Pool_event.t Lwt_stream.t * Lwt_watcher.stopper
('conn, 'peer_meta, 'conn_meta) t -> Pool_event.t Lwt_stream.t * Lwt_watcher.stopper
end
val get : ('conn, 'peer_meta, 'conn_meta) Info.t -> ('conn, 'conn_meta) state
val get : ('conn, 'meta) Info.t -> 'conn state
val is_disconnected : ('conn, 'meta) Info.t -> bool
val is_disconnected : ('conn, 'peer_meta, 'conn_meta) Info.t -> bool
val set_accepted :
?timestamp:Time.t ->
('conn, 'meta) Info.t -> P2p_connection.Id.t -> Lwt_canceler.t -> unit
('conn, 'peer_meta, 'conn_meta) Info.t -> P2p_connection.Id.t -> Lwt_canceler.t -> unit
val set_running :
?timestamp:Time.t ->
('conn, 'meta) Info.t -> P2p_connection.Id.t -> 'conn -> unit
('conn, 'peer_meta, 'conn_meta) Info.t -> P2p_connection.Id.t -> 'conn -> 'conn_meta -> unit
val set_disconnected :
?timestamp:Time.t ->
?requested:bool ->
('conn, 'meta) Info.t -> unit
('conn, 'peer_meta, 'conn_meta) Info.t -> unit

View File

@ -192,10 +192,10 @@ type config = {
binary_chunks_size : int option ;
}
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float;
type 'peer_meta peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t ;
peer_meta_initial : 'peer_meta ;
score : 'peer_meta -> float ;
}
type 'msg message_config = {
@ -203,17 +203,29 @@ type 'msg message_config = {
versions : P2p_version.t list;
}
type ('msg, 'meta) t = {
type 'conn_meta conn_meta_config = {
conn_meta_encoding : 'conn_meta Data_encoding.t ;
conn_meta_value : P2p_peer.Id.t -> 'conn_meta ;
}
type ('msg, 'peer_meta, 'conn_meta) t = {
config : config ;
meta_config : 'meta meta_config ;
peer_meta_config : 'peer_meta peer_meta_config ;
conn_meta_config : 'conn_meta conn_meta_config ;
message_config : 'msg message_config ;
my_id_points : unit P2p_point.Table.t ;
known_peer_ids :
(('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t P2p_peer.Table.t ;
(('msg, 'peer_meta, 'conn_meta) connection,
'peer_meta,
'conn_meta) P2p_peer_state.Info.t P2p_peer.Table.t ;
connected_peer_ids :
(('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t P2p_peer.Table.t ;
known_points : ('msg, 'meta) connection P2p_point_state.Info.t P2p_point.Table.t ;
connected_points : ('msg, 'meta) connection P2p_point_state.Info.t P2p_point.Table.t ;
(('msg, 'peer_meta, 'conn_meta) connection,
'peer_meta,
'conn_meta) P2p_peer_state.Info.t P2p_peer.Table.t ;
known_points :
('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t P2p_point.Table.t ;
connected_points :
('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t P2p_point.Table.t ;
incoming : Lwt_canceler.t P2p_point.Table.t ;
io_sched : P2p_io_scheduler.t ;
encoding : 'msg Message.t Data_encoding.t ;
@ -221,7 +233,7 @@ type ('msg, 'meta) t = {
watcher : P2p_connection.Pool_event.t Lwt_watcher.input ;
acl : P2p_acl.t ;
mutable new_connection_hook :
(P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) list ;
(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) list ;
mutable latest_accepted_swap : Time.t ;
mutable latest_succesfull_swap : Time.t ;
}
@ -233,18 +245,20 @@ and events = {
new_connection : unit Lwt_condition.t ;
}
and ('msg, 'meta) connection = {
and ('msg, 'peer_meta, 'conn_meta) connection = {
canceler : Lwt_canceler.t ;
messages : (int * 'msg) Lwt_pipe.t ;
conn : 'msg Message.t P2p_socket.t ;
peer_info : (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t ;
point_info : ('msg, 'meta) connection P2p_point_state.Info.t option ;
peer_info :
(('msg, 'peer_meta, 'conn_meta) connection, 'peer_meta, 'conn_meta) P2p_peer_state.Info.t ;
point_info :
('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t option ;
answerer : 'msg Answerer.t Lazy.t ;
mutable last_sent_swap_request : (Time.t * P2p_peer.Id.t) option ;
mutable wait_close : bool ;
}
type ('msg, 'meta) pool = ('msg, 'meta) t
type ('msg, 'peer_meta, 'conn_meta) pool = ('msg, 'peer_meta, 'conn_meta) t
module Pool_event = struct
let wait_too_few_connections pool =
@ -318,7 +332,7 @@ module Gc_peer_set = List.Bounded(struct
if score_cmp = 0 then Time.compare t t' else - score_cmp
end)
let gc_peer_ids ({ meta_config = { score } ;
let gc_peer_ids ({ peer_meta_config = { score } ;
config = { max_known_peer_ids } ;
known_peer_ids ; } as pool) =
match max_known_peer_ids with
@ -327,7 +341,7 @@ let gc_peer_ids ({ meta_config = { score } ;
let table = Gc_peer_set.create target in
P2p_peer.Table.iter (fun peer_id peer_info ->
let created = P2p_peer_state.Info.created peer_info in
let score = score @@ P2p_peer_state.Info.metadata peer_info in
let score = score @@ P2p_peer_state.Info.peer_metadata peer_info in
Gc_peer_set.insert (score, created, peer_id) table
) known_peer_ids ;
let to_remove = Gc_peer_set.get table in
@ -340,7 +354,9 @@ let register_peer pool peer_id =
match P2p_peer.Table.find pool.known_peer_ids peer_id with
| exception Not_found ->
Lwt_condition.broadcast pool.events.new_peer () ;
let peer = P2p_peer_state.Info.create peer_id ~metadata:pool.meta_config.initial in
let peer =
P2p_peer_state.Info.create peer_id
~peer_metadata:pool.peer_meta_config.peer_meta_initial in
Option.iter pool.config.max_known_peer_ids ~f:begin fun (max, _) ->
if P2p_peer.Table.length pool.known_peer_ids >= max then gc_peer_ids pool
end ;
@ -421,7 +437,8 @@ let get_addr pool peer_id =
module Points = struct
type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t
type ('msg, 'peer_meta, 'conn_meta) info =
('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t
let info { known_points } point =
P2p_point.Table.find_opt known_points point
@ -461,21 +478,22 @@ end
module Peers = struct
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t
type ('msg, 'peer_meta, 'conn_meta) info =
(('msg, 'peer_meta, 'conn_meta) connection, 'peer_meta, 'conn_meta) P2p_peer_state.Info.t
let info { known_peer_ids } peer_id =
try Some (P2p_peer.Table.find known_peer_ids peer_id)
with Not_found -> None
let get_metadata pool peer_id =
try P2p_peer_state.Info.metadata (P2p_peer.Table.find pool.known_peer_ids peer_id)
with Not_found -> pool.meta_config.initial
let get_peer_metadata pool peer_id =
try P2p_peer_state.Info.peer_metadata (P2p_peer.Table.find pool.known_peer_ids peer_id)
with Not_found -> pool.peer_meta_config.peer_meta_initial
let get_score pool peer_id =
pool.meta_config.score (get_metadata pool peer_id)
pool.peer_meta_config.score (get_peer_metadata pool peer_id)
let set_metadata pool peer_id data =
P2p_peer_state.Info.set_metadata (register_peer pool peer_id) data
let set_peer_metadata pool peer_id data =
P2p_peer_state.Info.set_peer_metadata (register_peer pool peer_id) data
let get_trusted pool peer_id =
try P2p_peer_state.Info.trusted (P2p_peer.Table.find pool.known_peer_ids peer_id)
@ -561,7 +579,7 @@ module Connection = struct
let stat { conn } =
P2p_socket.stat conn
let score { meta_config = { score }} meta = score meta
let score { peer_meta_config = { score }} meta = score meta
let info { conn } =
P2p_socket.info conn
@ -688,6 +706,7 @@ and authenticate pool ?point_info canceler fd point =
~incoming (P2p_io_scheduler.register pool.io_sched fd) point
?listening_port:pool.config.listening_port
pool.config.identity pool.message_config.versions
pool.conn_meta_config.conn_meta_encoding
end ~on_error: begin fun err ->
begin match err with
| [ Canceled ] ->
@ -779,11 +798,13 @@ and authenticate pool ?point_info canceler fd point =
?incoming_message_queue_size:pool.config.incoming_message_queue_size
?outgoing_message_queue_size:pool.config.outgoing_message_queue_size
?binary_chunks_size:pool.config.binary_chunks_size
auth_fd pool.encoding >>= fun conn ->
auth_fd
(pool.conn_meta_config.conn_meta_value info.peer_id)
pool.encoding >>=? fun (conn, ack_cfg) ->
lwt_debug "authenticate: %a -> Connected %a"
P2p_point.Id.pp point
P2p_connection.Info.pp info >>= fun () ->
Lwt.return conn
return (conn, ack_cfg)
end ~on_error: begin fun err ->
if incoming then
log pool
@ -795,7 +816,7 @@ and authenticate pool ?point_info canceler fd point =
~f:P2p_point_state.set_disconnected ;
P2p_peer_state.set_disconnected peer_info ;
Lwt.return (Error err)
end >>=? fun conn ->
end >>=? fun (conn, ack_cfg) ->
let id_point =
match info.id_point, Option.map ~f:P2p_point_state.Info.point point_info with
| (addr, _), Some (_, port) -> addr, Some port
@ -803,7 +824,7 @@ and authenticate pool ?point_info canceler fd point =
return
(create_connection
pool conn
id_point connection_point_info peer_info version)
id_point connection_point_info peer_info version ack_cfg)
end
| _ -> begin
log pool (Rejecting_request (point, info.id_point, info.peer_id)) ;
@ -819,7 +840,7 @@ and authenticate pool ?point_info canceler fd point =
fail (P2p_errors.Rejected info.peer_id)
end
and create_connection pool p2p_conn id_point point_info peer_info _version =
and create_connection pool p2p_conn id_point point_info peer_info _version ack_cfg =
let peer_id = P2p_peer_state.Info.peer_id peer_info in
let canceler = Lwt_canceler.create () in
let size =
@ -851,7 +872,7 @@ and create_connection pool p2p_conn id_point point_info peer_info _version =
P2p_point.Table.add pool.connected_points point point_info ;
end ;
log pool (Connection_established (id_point, peer_id)) ;
P2p_peer_state.set_running peer_info id_point conn ;
P2p_peer_state.set_running peer_info id_point conn ack_cfg ;
P2p_peer.Table.add pool.connected_peer_ids peer_id peer_info ;
Lwt_condition.broadcast pool.events.new_connection () ;
Lwt_canceler.on_cancel canceler begin fun () ->
@ -1013,7 +1034,7 @@ let send_swap_request pool =
(***************************************************************************)
let create config meta_config message_config io_sched =
let create config peer_meta_config conn_meta_config message_config io_sched =
let events = {
too_few_connections = Lwt_condition.create () ;
too_many_connections = Lwt_condition.create () ;
@ -1021,7 +1042,7 @@ let create config meta_config message_config io_sched =
new_connection = Lwt_condition.create () ;
} in
let pool = {
config ; meta_config ; message_config ;
config ; peer_meta_config ; conn_meta_config; message_config ;
my_id_points = P2p_point.Table.create 7 ;
known_peer_ids = P2p_peer.Table.create 53 ;
connected_peer_ids = P2p_peer.Table.create 53 ;
@ -1038,7 +1059,9 @@ let create config meta_config message_config io_sched =
latest_succesfull_swap = Time.epoch ;
} in
List.iter (Points.set_trusted pool) config.trusted_points ;
P2p_peer_state.Info.File.load config.peers_file meta_config.encoding >>= function
P2p_peer_state.Info.File.load
config.peers_file
peer_meta_config.peer_meta_encoding >>= function
| Ok peer_ids ->
List.iter
(fun peer_info ->

View File

@ -13,14 +13,15 @@
A pool and its connections are parametrized by the type of
messages exchanged over the connection and the type of
meta-information associated with a peer. The type [('msg, 'meta)
meta-information associated with a peer. The type
[('msg, 'peer_meta,'conn_meta)
connection] is a wrapper on top of [P2p_socket.t] that adds
meta-information, a data-structure describing the detailed state of
the connection, as well as a new message queue (referred to "app
message queue") that will only contain the messages from the
internal [P2p_socket.t] that needs to be examined by the higher
layers. Some messages are directly processed by an internal worker
and thus never propagated above. *)
meta-informations, data-structures describing the detailed state of
the peer and the connection, as well as a new message queue
(referred to "app message queue") that will only contain the
messages from the internal [P2p_socket.t] that needs to be examined
by the higher layers. Some messages are directly processed by an
internal worker and thus never propagated above. *)
type 'msg encoding = Encoding : {
tag: int ;
@ -32,11 +33,12 @@ type 'msg encoding = Encoding : {
(** {1 Pool management} *)
type ('msg, 'meta) t
type ('msg, 'peer_meta,'conn_meta) t
type ('msg, 'meta) pool = ('msg, 'meta) t
type ('msg, 'peer_meta,'conn_meta) pool = ('msg, 'peer_meta,'conn_meta) t
(** The type of a pool of connections, parametrized by resp. the type
of messages and the meta-information associated to an identity. *)
of messages and the meta-informations associated to an identity and
a connection. *)
type config = {
@ -121,10 +123,15 @@ type config = {
peers. Default value is 64 kB. *)
}
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
score : 'meta -> float;
type 'peer_meta peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t ;
peer_meta_initial : 'peer_meta ;
score : 'peer_meta -> float ;
}
type 'conn_meta conn_meta_config = {
conn_meta_encoding : 'conn_meta Data_encoding.t ;
conn_meta_value : P2p_peer.Id.t -> 'conn_meta ;
}
type 'msg message_config = {
@ -134,22 +141,23 @@ type 'msg message_config = {
val create:
config ->
'meta meta_config ->
'peer_meta peer_meta_config ->
'conn_meta conn_meta_config ->
'msg message_config ->
P2p_io_scheduler.t ->
('msg, 'meta) pool Lwt.t
('msg, 'peer_meta,'conn_meta) pool Lwt.t
(** [create config meta_cfg msg_cfg io_sched] is a freshly minted
pool. *)
val destroy: ('msg, 'meta) pool -> unit Lwt.t
val destroy: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [destroy pool] returns when member connections are either
disconnected or canceled. *)
val active_connections: ('msg, 'meta) pool -> int
val active_connections: ('msg, 'peer_meta,'conn_meta) pool -> int
(** [active_connections pool] is the number of connections inside
[pool]. *)
val pool_stat: ('msg, 'meta) pool -> P2p_stat.t
val pool_stat: ('msg, 'peer_meta,'conn_meta) pool -> P2p_stat.t
(** [pool_stat pool] is a snapshot of current bandwidth usage for the
entire [pool]. *)
@ -157,25 +165,25 @@ val config : _ pool -> config
(** [config pool] is the [config] argument passed to [pool] at
creation. *)
val send_swap_request: ('msg, 'meta) pool -> unit
val send_swap_request: ('msg, 'peer_meta,'conn_meta) pool -> unit
(** {2 Pool events} *)
module Pool_event : sig
val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t
val wait_too_few_connections: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [wait_too_few_connections pool] is determined when the number of
connections drops below the desired level. *)
val wait_too_many_connections: ('msg, 'meta) pool -> unit Lwt.t
val wait_too_many_connections: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [wait_too_many_connections pool] is determined when the number of
connections exceeds the desired level. *)
val wait_new_peer: ('msg, 'meta) pool -> unit Lwt.t
val wait_new_peer: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [wait_new_peer pool] is determined when a new peer
(i.e. authentication successful) gets added to the pool. *)
val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t
val wait_new_connection: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [wait_new_connection pool] is determined when a new connection is
succesfully established in the pool. *)
@ -184,142 +192,154 @@ end
(** {1 Connections management} *)
type ('msg, 'meta) connection
type ('msg, 'peer_meta,'conn_meta) connection
(** Type of a connection to a peer, parametrized by the type of
messages exchanged as well as meta-information associated to a
peer. It mostly wraps [P2p_connection.connection], adding
meta-information and data-structures describing a more
peer and a connection. It mostly wraps [P2p_connection.connection],
adding meta-information and data-structures describing a more
fine-grained logical state of the connection. *)
val connect:
?timeout:float ->
('msg, 'meta) pool -> P2p_point.Id.t ->
('msg, 'meta) connection tzresult Lwt.t
('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t ->
('msg, 'peer_meta,'conn_meta) connection tzresult Lwt.t
(** [connect ?timeout pool point] tries to add a connection to [point]
in [pool] in less than [timeout] seconds. *)
val accept:
('msg, 'meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit
('msg, 'peer_meta,'conn_meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit
(** [accept pool fd point] instructs [pool] to start the process of
accepting a connection from [fd]. Used by [P2p]. *)
val disconnect:
?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t
?wait:bool -> ('msg, 'peer_meta,'conn_meta) connection -> unit Lwt.t
(** [disconnect conn] cleanly closes [conn] and returns after [conn]'s
internal worker has returned. *)
module Connection : sig
val info: ('msg, 'meta) connection -> P2p_connection.Info.t
val info: ('msg, 'peer_meta,'conn_meta) connection -> P2p_connection.Info.t
val stat: ('msg, 'meta) connection -> P2p_stat.t
val stat: ('msg, 'peer_meta,'conn_meta) connection -> P2p_stat.t
(** [stat conn] is a snapshot of current bandwidth usage for
[conn]. *)
val fold:
('msg, 'meta) pool ->
('msg, 'peer_meta,'conn_meta) pool ->
init:'a ->
f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) ->
f:(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) connection -> 'a -> 'a) ->
'a
val list:
('msg, 'meta) pool -> (P2p_peer.Id.t * ('msg, 'meta) connection) list
('msg, 'peer_meta,'conn_meta) pool ->
(P2p_peer.Id.t * ('msg, 'peer_meta,'conn_meta) connection) list
val find_by_point:
('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) connection option
('msg, 'peer_meta,'conn_meta) pool ->
P2p_point.Id.t ->
('msg, 'peer_meta,'conn_meta) connection option
val find_by_peer_id:
('msg, 'meta) pool -> P2p_peer.Id.t -> ('msg, 'meta) connection option
('msg, 'peer_meta,'conn_meta) pool ->
P2p_peer.Id.t ->
('msg, 'peer_meta,'conn_meta) connection option
end
val on_new_connection:
('msg, 'meta) pool ->
(P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit
('msg, 'peer_meta,'conn_meta) pool ->
(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) connection -> unit) -> unit
(** {1 I/O on connections} *)
val read: ('msg, 'meta) connection -> 'msg tzresult Lwt.t
val read: ('msg, 'peer_meta,'conn_meta) connection -> 'msg tzresult Lwt.t
(** [read conn] returns a message popped from [conn]'s app message
queue, or fails with [Connection_closed]. *)
val is_readable: ('msg, 'meta) connection -> unit tzresult Lwt.t
val is_readable: ('msg, 'peer_meta,'conn_meta) connection -> unit tzresult Lwt.t
(** [is_readable conn] returns when there is at least one message
ready to be read. *)
val write: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
val write:
('msg, 'peer_meta,'conn_meta) connection -> 'msg -> unit tzresult Lwt.t
(** [write conn msg] is [P2p_connection.write conn' msg] where [conn']
is the internal [P2p_connection.t] inside [conn]. *)
val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
val write_sync:
('msg, 'peer_meta,'conn_meta) connection -> 'msg -> unit tzresult Lwt.t
(** [write_sync conn msg] is [P2p_connection.write_sync conn' msg]
where [conn'] is the internal [P2p_connection.t] inside [conn]. *)
(**/**)
val raw_write_sync:
('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t
('msg, 'peer_meta,'conn_meta) connection -> MBytes.t -> unit tzresult Lwt.t
(**/**)
val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult
val write_now: ('msg, 'peer_meta,'conn_meta) connection -> 'msg -> bool tzresult
(** [write_now conn msg] is [P2p_connection.write_now conn' msg] where
[conn'] is the internal [P2p_connection.t] inside [conn]. *)
(** {2 Broadcast functions} *)
val write_all: ('msg, 'meta) pool -> 'msg -> unit
val write_all: ('msg, 'peer_meta,'conn_meta) pool -> 'msg -> unit
(** [write_all pool msg] is [write_now conn msg] for all member
connections to [pool] in [Running] state. *)
val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit
val broadcast_bootstrap_msg: ('msg, 'peer_meta,'conn_meta) pool -> unit
(** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap]
for all member connections to [pool] in [Running] state. *)
val greylist_addr : ('msg, 'meta) pool -> P2p_addr.t -> unit
val greylist_addr : ('msg, 'peer_meta,'conn_meta) pool -> P2p_addr.t -> unit
(** [greylist_addr pool addr] adds [addr] to [pool]'s IP greylist. *)
val greylist_peer : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val greylist_peer : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit
(** [greylist_peer pool peer] adds [peer] to [pool]'s peer greylist
and [peer]'s address to [pool]'s IP greylist. *)
val gc_greylist: older_than:Time.t -> ('msg, 'meta) pool -> unit
val gc_greylist: older_than:Time.t -> ('msg, 'peer_meta,'conn_meta) pool -> unit
(** [gc_greylist ~older_than pool] *)
val acl_clear : ('msg, 'meta) pool -> unit
val acl_clear : ('msg, 'peer_meta,'conn_meta) pool -> unit
(** [acl_clear pool] clears ACL tables. *)
(** {1 Functions on [Peer_id]} *)
module Peers : sig
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) P2p_peer_state.Info.t
type ('msg, 'peer_meta,'conn_meta) info =
(('msg, 'peer_meta,'conn_meta) connection, 'peer_meta,'conn_meta) P2p_peer_state.Info.t
val info:
('msg, 'meta) pool -> P2p_peer.Id.t -> ('msg, 'meta) info option
('msg, 'peer_meta,'conn_meta) pool ->
P2p_peer.Id.t ->
('msg, 'peer_meta,'conn_meta) info option
val get_metadata: ('msg, 'meta) pool -> P2p_peer.Id.t -> 'meta
val set_metadata: ('msg, 'meta) pool -> P2p_peer.Id.t -> 'meta -> unit
val get_score: ('msg, 'meta) pool -> P2p_peer.Id.t -> float
val get_peer_metadata:
('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> 'peer_meta
val set_peer_metadata:
('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> 'peer_meta -> unit
val get_score: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> float
val get_trusted: ('msg, 'meta) pool -> P2p_peer.Id.t -> bool
val set_trusted: ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val unset_trusted: ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val get_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> bool
val set_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit
val unset_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit
val fold_known:
('msg, 'meta) pool ->
('msg, 'peer_meta,'conn_meta) pool ->
init:'a ->
f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) ->
'a
val fold_connected:
('msg, 'meta) pool ->
('msg, 'peer_meta,'conn_meta) pool ->
init:'a ->
f:(P2p_peer.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(P2p_peer.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) ->
'a
val forget : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val ban : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val trust : ('msg, 'meta) pool -> P2p_peer.Id.t -> unit
val banned : ('msg, 'meta) pool -> P2p_peer.Id.t -> bool
val forget : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit
val ban : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit
val trust : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> unit
val banned : ('msg, 'peer_meta,'conn_meta) pool -> P2p_peer.Id.t -> bool
end
@ -327,35 +347,40 @@ end
module Points : sig
type ('msg, 'meta) info = ('msg, 'meta) connection P2p_point_state.Info.t
type ('msg, 'peer_meta,'conn_meta) info =
('msg, 'peer_meta,'conn_meta) connection P2p_point_state.Info.t
val info:
('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) info option
('msg, 'peer_meta,'conn_meta) pool ->
P2p_point.Id.t ->
('msg, 'peer_meta,'conn_meta) info option
val get_trusted: ('msg, 'meta) pool -> P2p_point.Id.t -> bool
val set_trusted: ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val unset_trusted: ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val get_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> bool
val set_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit
val unset_trusted: ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit
val fold_known:
('msg, 'meta) pool ->
('msg, 'peer_meta,'conn_meta) pool ->
init:'a ->
f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(P2p_point.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) ->
'a
val fold_connected:
('msg, 'meta) pool ->
('msg, 'peer_meta,'conn_meta) pool ->
init:'a ->
f:(P2p_point.Id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
f:(P2p_point.Id.t -> ('msg, 'peer_meta,'conn_meta) info -> 'a -> 'a) ->
'a
val forget : ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val ban : ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val trust : ('msg, 'meta) pool -> P2p_point.Id.t -> unit
val banned : ('msg, 'meta) pool -> P2p_point.Id.t -> bool
val forget : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit
val ban : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit
val trust : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> unit
val banned : ('msg, 'peer_meta,'conn_meta) pool -> P2p_point.Id.t -> bool
end
val watch: ('msg, 'meta) pool -> P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper
val watch:
('msg, 'peer_meta,'conn_meta) pool ->
P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper
(** [watch pool] is a [stream, close] a [stream] of events and a
[close] function for this stream. *)

View File

@ -152,25 +152,67 @@ end
module Ack = struct
type t = Ack | Nack
let ack = MBytes.of_string "\255"
let nack = MBytes.of_string "\000"
type 'a t = Ack of 'a | Nack
let write cryptobox_data fd b =
Crypto.write_chunk cryptobox_data fd
(match b with Ack -> ack | Nack -> nack)
let encoding ack_encoding =
let open Data_encoding in
let ack_encoding = obj1 (req "ack" ack_encoding) in
let nack_encoding = obj1 (req "nack" empty) in
let ack_case tag =
case tag ack_encoding
(function
| Ack param -> Some param
| _ -> None)
(fun param -> Ack param) in
let nack_case tag =
case tag nack_encoding
(function
| Nack -> Some ()
| _ -> None
)
(fun _ -> Nack) in
union [
ack_case (Tag 0) ;
nack_case (Tag 1) ;
]
let read fd cryptobox_data =
let write ack_encoding cryptobox_data fd message =
let encoding = encoding ack_encoding in
let encoded_message_len =
Data_encoding.Binary.length encoding message in
let buf = MBytes.create encoded_message_len in
match Data_encoding.Binary.write encoding message buf 0 encoded_message_len with
| None ->
fail P2p_errors.Encoding_error
| Some last ->
fail_unless (last = encoded_message_len)
P2p_errors.Encoding_error >>=? fun () ->
Crypto.write_chunk cryptobox_data fd buf
let read ack_encoding fd cryptobox_data =
let encoding = encoding ack_encoding in
Crypto.read_chunk fd cryptobox_data >>=? fun buf ->
return (buf <> nack)
let length = MBytes.length buf in
match Data_encoding.Binary.read encoding buf 0 length with
| None ->
fail P2p_errors.Decoding_error
| Some (read_len, message) ->
if read_len <> length then
fail P2p_errors.Decoding_error
else
return message
end
type authenticated_fd =
P2p_io_scheduler.connection * P2p_connection.Info.t * Crypto.data
type 'conn_meta authenticated_fd = {
fd: P2p_io_scheduler.connection ;
info: P2p_connection.Info.t ;
cryptobox_data: Crypto.data ;
ack_encoding: 'conn_meta Data_encoding.t ;
}
let kick (fd, _ , cryptobox_data) =
Ack.write fd cryptobox_data Nack >>= fun _ ->
let kick { fd ; ack_encoding ; cryptobox_data ; _ } =
Ack.write ack_encoding fd cryptobox_data Nack >>= fun _ ->
P2p_io_scheduler.close fd >>= fun _ ->
Lwt.return_unit
@ -180,7 +222,7 @@ let kick (fd, _ , cryptobox_data) =
let authenticate
~proof_of_work_target
~incoming fd (remote_addr, remote_socket_port as point)
?listening_port identity supported_versions =
?listening_port identity supported_versions ack_encoding =
let local_nonce_seed = Crypto_box.random_nonce () in
lwt_debug "Sending authenfication to %a" P2p_point.Id.pp point >>= fun () ->
Connection_message.write fd
@ -210,7 +252,7 @@ let authenticate
{ P2p_connection.Info.peer_id = remote_peer_id ;
versions = msg.versions ; incoming ;
id_point ; remote_socket_port ;} in
return (info, (fd, info, cryptobox_data))
return (info, { fd ; info ; cryptobox_data ; ack_encoding })
type connection = {
id : int ;
@ -449,18 +491,21 @@ let info { conn } = conn.info
let accept
?incoming_message_queue_size ?outgoing_message_queue_size
?binary_chunks_size (fd, info, cryptobox_data) encoding =
?binary_chunks_size
{ fd ; info ; cryptobox_data ; ack_encoding }
ack_param
encoding =
protect begin fun () ->
Ack.write fd cryptobox_data Ack >>=? fun () ->
Ack.read fd cryptobox_data
Ack.write ack_encoding fd cryptobox_data (Ack ack_param) >>=? fun () ->
Ack.read ack_encoding fd cryptobox_data
end ~on_error:begin fun err ->
P2p_io_scheduler.close fd >>= fun _ ->
match err with
| [ P2p_errors.Connection_closed ] -> fail P2p_errors.Rejected_socket_connection
| [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth
| err -> Lwt.return (Error err)
end >>=? fun accepted ->
fail_unless accepted P2p_errors.Rejected_socket_connection >>=? fun () ->
end >>=? function
| Ack ack_cfg ->
let canceler = Lwt_canceler.create () in
let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
let reader =
@ -475,7 +520,9 @@ let accept
P2p_io_scheduler.close fd >>= fun _ ->
Lwt.return_unit
end ;
return conn
return (conn, ack_cfg)
| Nack ->
fail P2p_errors.Rejected_socket_connection
let catch_closed_pipe f =
Lwt.catch f begin function

View File

@ -19,9 +19,10 @@
(** {1 Types} *)
type authenticated_fd
type 'conn_meta authenticated_fd
(** Type of a connection that successfully passed the authentication
phase, but has not been accepted yet. *)
phase, but has not been accepted yet. Parametrized by the type
of expected parameter in the `ack` message. *)
type 'msg t
(** Type of an accepted connection, parametrized by the type of
@ -39,14 +40,14 @@ val authenticate:
incoming:bool ->
P2p_io_scheduler.connection -> P2p_point.Id.t ->
?listening_port: int ->
P2p_identity.t -> P2p_version.t list ->
(P2p_connection.Info.t * authenticated_fd) tzresult Lwt.t
P2p_identity.t -> P2p_version.t list -> 'conn_meta Data_encoding.t ->
(P2p_connection.Info.t * 'conn_meta authenticated_fd) tzresult Lwt.t
(** (Low-level) (Cancelable) Authentication function of a remote
peer. Used in [P2p_connection_pool], to promote a
[P2P_io_scheduler.connection] into an [authenticated_fd] (auth
correct, acceptation undecided). *)
val kick: authenticated_fd -> unit Lwt.t
val kick: 'conn_meta authenticated_fd -> unit Lwt.t
(** (Low-level) (Cancelable) [kick afd] notifies the remote peer that
we refuse this connection and then closes [afd]. Used in
[P2p_connection_pool] to reject an [aunthenticated_fd] which we do
@ -56,7 +57,8 @@ val accept:
?incoming_message_queue_size:int ->
?outgoing_message_queue_size:int ->
?binary_chunks_size: int ->
authenticated_fd -> 'msg Data_encoding.t -> 'msg t tzresult Lwt.t
'conn_meta authenticated_fd -> 'conn_meta ->
'msg Data_encoding.t -> ('msg t * 'conn_meta) tzresult Lwt.t
(** (Low-level) (Cancelable) Accepts a remote peer given an
authenticated_fd. Used in [P2p_connection_pool], to promote an
[authenticated_fd] to the status of an active peer. *)

View File

@ -9,7 +9,7 @@
include Logging.Make (struct let name = "p2p.welcome" end)
type pool = Pool : ('msg, 'meta) P2p_pool.t -> pool
type pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> pool
type t = {
socket: Lwt_unix.file_descr ;

View File

@ -17,7 +17,7 @@ type t
val run:
?addr:P2p_addr.t -> backlog:int ->
('msg, 'meta) P2p_pool.t -> P2p_addr.port -> t Lwt.t
('msg, 'meta, 'meta_conn) P2p_pool.t -> P2p_addr.port -> t Lwt.t
(** [run ?addr ~backlog pool port] returns a running welcome worker
adding connections into [pool] listening on [addr:port]. [backlog]
is passed to [Lwt_unix.listen]. *)

View File

@ -27,12 +27,17 @@ let msg_config : message P2p_pool.message_config = {
type metadata = unit
let meta_config : metadata P2p_pool.meta_config = {
encoding = Data_encoding.empty ;
initial = () ;
let peer_meta_config : metadata P2p_pool.peer_meta_config = {
peer_meta_encoding = Data_encoding.empty ;
peer_meta_initial = () ;
score = fun () -> 0. ;
}
let conn_meta_config : metadata P2p_pool.conn_meta_config = {
conn_meta_encoding = Data_encoding.empty ;
conn_meta_value = (fun _ -> ()) ;
}
let sync ch =
Process.Channel.push ch () >>=? fun () ->
Process.Channel.pop ch >>=? fun () ->
@ -87,7 +92,7 @@ let detach_node f points n =
begin fun channel ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
P2p_pool.create
config meta_config msg_config sched >>= fun pool ->
config peer_meta_config conn_meta_config msg_config sched >>= fun pool ->
P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome ->
lwt_log_info "Node ready (port: %d)" port >>= fun () ->
sync channel >>=? fun () ->

View File

@ -95,7 +95,7 @@ let accept sched main_socket =
raw_accept sched main_socket >>= fun (fd, point) ->
P2p_socket.authenticate
~proof_of_work_target
~incoming:true fd point id1 versions
~incoming:true fd point id1 versions Data_encoding.unit
let raw_connect sched addr port =
let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in
@ -109,7 +109,7 @@ let connect sched addr port id =
raw_connect sched addr port >>= fun fd ->
P2p_socket.authenticate
~proof_of_work_target
~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) ->
~incoming:false fd (addr, port) id versions Data_encoding.unit >>=? fun (info, auth_fd) ->
_assert (not info.incoming) __LOC__ "" >>=? fun () ->
_assert (P2p_peer.Id.compare info.peer_id id1.peer_id = 0)
__LOC__ "" >>=? fun () ->
@ -172,7 +172,7 @@ module Kick = struct
let client _ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd encoding >>= fun conn ->
P2p_socket.accept auth_fd () encoding >>= fun conn ->
_assert (is_rejected conn) __LOC__ "" >>=? fun () ->
return ()
@ -186,7 +186,7 @@ module Kicked = struct
let server _ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd encoding >>= fun conn ->
P2p_socket.accept auth_fd () encoding >>= fun conn ->
_assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () ->
return ()
@ -208,7 +208,7 @@ module Simple_message = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.write_sync conn simple_msg >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
@ -218,7 +218,7 @@ module Simple_message = struct
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
@ -240,7 +240,7 @@ module Chunked_message = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept
~binary_chunks_size:21 auth_fd encoding >>=? fun conn ->
~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.write_sync conn simple_msg >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
@ -251,7 +251,7 @@ module Chunked_message = struct
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept
~binary_chunks_size:21 auth_fd encoding >>=? fun conn ->
~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
@ -272,7 +272,7 @@ module Oversized_message = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.write_sync conn simple_msg >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
@ -282,7 +282,7 @@ module Oversized_message = struct
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
@ -302,14 +302,14 @@ module Close_on_read = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
sync ch >>=? fun () ->
P2p_socket.close conn >>= fun _stat ->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
sync ch >>=? fun () ->
P2p_socket.read conn >>= fun err ->
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
@ -328,14 +328,14 @@ module Close_on_write = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.close conn >>= fun _stat ->
sync ch >>=? fun ()->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
sync ch >>=? fun ()->
Lwt_unix.sleep 0.1 >>= fun () ->
P2p_socket.write_sync conn simple_msg >>= fun err ->
@ -365,7 +365,7 @@ module Garbled_data = struct
let server _ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.raw_write_sync conn garbled_msg >>=? fun () ->
P2p_socket.read conn >>= fun err ->
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
@ -374,7 +374,7 @@ module Garbled_data = struct
let client _ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd encoding >>=? fun conn ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.read conn >>= fun err ->
_assert (is_decoding_error err) __LOC__ "" >>=? fun () ->
P2p_socket.close conn >>= fun _stat ->

View File

@ -8,10 +8,9 @@
(**************************************************************************)
module Message = Distributed_db_message
module Metadata = Distributed_db_metadata
type p2p = (Message.t, Metadata.t) P2p.net
type connection = (Message.t, Metadata.t) P2p.connection
type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net
type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection
type 'a request_param = {
data: 'a ;

View File

@ -15,9 +15,8 @@ type t
type db = t
module Message = Distributed_db_message
module Metadata = Distributed_db_metadata
type p2p = (Message.t, Metadata.t) P2p.net
type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net
val create: State.t -> p2p -> t
val state: db -> State.t

View File

@ -65,19 +65,31 @@ type t = {
shutdown: unit -> unit Lwt.t ;
}
let peer_metadata_cfg : _ P2p.peer_meta_config = {
peer_meta_encoding = Peer_metadata.encoding ;
peer_meta_initial = () ;
score = fun _ -> 0. ;
}
let connection_metadata_cfg : _ P2p.conn_meta_config = {
conn_meta_encoding = Peer_metadata.encoding ;
conn_meta_value = fun _ -> () ;
}
let init_p2p p2p_params =
match p2p_params with
| None ->
lwt_log_notice "P2P layer is disabled" >>= fun () ->
Error_monad.return (P2p.faked_network Distributed_db_metadata.cfg)
return (P2p.faked_network peer_metadata_cfg)
| Some (config, limits) ->
lwt_log_notice "bootstraping chain..." >>= fun () ->
P2p.create
~config ~limits
Distributed_db_metadata.cfg
peer_metadata_cfg
connection_metadata_cfg
Distributed_db_message.cfg >>=? fun p2p ->
Lwt.async (fun () -> P2p.maintain p2p) ;
Error_monad.return p2p
return p2p
type config = {
genesis: State.Chain.genesis ;

View File

@ -8,8 +8,4 @@
(**************************************************************************)
type t = unit
let initial = ()
let encoding = Data_encoding.empty
let score () = 0.
let cfg : _ P2p.meta_config = { encoding ; initial ; score }

View File

@ -7,7 +7,7 @@
(* *)
(**************************************************************************)
(** Tezos Shell - All the (persistent) metadata associated to a peer. *)
(** All the metadata associated to a running connection. *)
type t = unit (* TODO *)
val cfg : t P2p.meta_config
val encoding: t Data_encoding.t

View File

@ -0,0 +1,11 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t = unit
let encoding = Data_encoding.empty

View File

@ -0,0 +1,13 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** All the (persistent) metadata associated to a peer. *)
type t = unit (* TODO *)
val encoding: t Data_encoding.t