P2p: implements peer swapping

This commit is contained in:
Grégoire Henry 2017-03-14 10:51:44 +01:00
parent cf0d6d2580
commit 826f2ea4ba
13 changed files with 789 additions and 511 deletions

View File

@ -27,7 +27,7 @@ node="$src_dir/tezos-node"
cleanup () { cleanup () {
set +e set +e
echo Cleaning up... echo Cleaning up...
# rm -rf "$data_dir" rm -rf "$data_dir"
} }
trap cleanup EXIT INT trap cleanup EXIT INT

View File

@ -72,6 +72,7 @@ let default_net_limits : P2p.limits = {
known_peer_ids_history_size = 500 ; known_peer_ids_history_size = 500 ;
max_known_points = Some (400, 300) ; max_known_points = Some (400, 300) ;
max_known_peer_ids = Some (400, 300) ; max_known_peer_ids = Some (400, 300) ;
swap_linger = 30. ;
} }
let default_net = { let default_net = {
@ -115,10 +116,11 @@ let limit : P2p.limits Data_encoding.t =
incoming_message_queue_size ; outgoing_message_queue_size ; incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_peer_ids_history_size ; known_points_history_size ; known_peer_ids_history_size ;
max_known_points ; max_known_peer_ids ; max_known_points ; max_known_peer_ids ;
swap_linger ;
} -> } ->
( ( authentification_timeout, min_connections, expected_connections, ( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections, max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed) , max_download_speed, max_upload_speed, swap_linger) ,
( read_buffer_size, read_queue_size, write_queue_size, ( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size, incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size,
@ -127,7 +129,7 @@ let limit : P2p.limits Data_encoding.t =
))) )))
(fun ( ( authentification_timeout, min_connections, expected_connections, (fun ( ( authentification_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections, max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed) , max_download_speed, max_upload_speed, swap_linger) ,
( read_buffer_size, read_queue_size, write_queue_size, ( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size, incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size,
@ -141,10 +143,9 @@ let limit : P2p.limits Data_encoding.t =
incoming_app_message_queue_size ; incoming_app_message_queue_size ;
incoming_message_queue_size ; outgoing_message_queue_size ; incoming_message_queue_size ; outgoing_message_queue_size ;
known_points_history_size ; known_peer_ids_history_size ; known_points_history_size ; known_peer_ids_history_size ;
max_known_points ; max_known_peer_ids max_known_points ; max_known_peer_ids ; swap_linger })
})
(merge_objs (merge_objs
(obj8 (obj9
(dft "authentification-timeout" (dft "authentification-timeout"
float default_net_limits.authentification_timeout) float default_net_limits.authentification_timeout)
(dft "min-connections" uint16 (dft "min-connections" uint16
@ -158,7 +159,8 @@ let limit : P2p.limits Data_encoding.t =
(dft "max-incoming-connections" uint8 (dft "max-incoming-connections" uint8
default_net_limits.max_incoming_connections) default_net_limits.max_incoming_connections)
(opt "max-download-speed" int31) (opt "max-download-speed" int31)
(opt "max-upload-speed" int31)) (opt "max-upload-speed" int31)
(dft "swap-linger" float default_net_limits.swap_linger))
(obj10 (obj10
(dft "read-buffer-size" int31 (dft "read-buffer-size" int31
default_net_limits.read_buffer_size) default_net_limits.read_buffer_size)

View File

@ -66,6 +66,9 @@ type limits = {
known_points_history_size : int ; known_points_history_size : int ;
max_known_peer_ids : (int * int) option ; max_known_peer_ids : (int * int) option ;
max_known_points : (int * int) option ; max_known_points : (int * int) option ;
swap_linger : float ;
} }
let create_scheduler limits = let create_scheduler limits =
@ -100,6 +103,7 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched =
known_points_history_size = limits.known_points_history_size ; known_points_history_size = limits.known_points_history_size ;
max_known_points = limits.max_known_points ; max_known_points = limits.max_known_points ;
max_known_peer_ids = limits.max_known_peer_ids ; max_known_peer_ids = limits.max_known_peer_ids ;
swap_linger = limits.swap_linger ;
} }
in in
let pool = let pool =
@ -130,7 +134,8 @@ let create_maintenance_worker limits pool disco =
limits.max_connections limits.max_connections
in in
P2p_maintenance.run P2p_maintenance.run
~connection_timeout:limits.authentification_timeout bounds pool disco ~connection_timeout:limits.authentification_timeout
bounds pool disco
let may_create_welcome_worker config limits pool = let may_create_welcome_worker config limits pool =
match config.listening_port with match config.listening_port with
@ -190,14 +195,14 @@ module Real = struct
P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched
let connections { pool } () = let connections { pool } () =
P2p_connection_pool.fold_connections pool P2p_connection_pool.Connection.fold pool
~init:[] ~f:(fun _peer_id c acc -> c :: acc) ~init:[] ~f:(fun _peer_id c acc -> c :: acc)
let find_connection { pool } peer_id = let find_connection { pool } peer_id =
P2p_connection_pool.Peer_ids.find_connection pool peer_id P2p_connection_pool.Connection.find_by_peer_id pool peer_id
let connection_info _net conn = let connection_info _net conn =
P2p_connection_pool.connection_info conn P2p_connection_pool.Connection.info conn
let connection_stat _net conn = let connection_stat _net conn =
P2p_connection_pool.connection_stat conn P2p_connection_pool.Connection.stat conn
let global_stat { pool } () = let global_stat { pool } () =
P2p_connection_pool.pool_stat pool P2p_connection_pool.pool_stat pool
let set_metadata { pool } conn meta = let set_metadata { pool } conn meta =
@ -209,12 +214,12 @@ module Real = struct
P2p_connection_pool.read conn >>=? fun msg -> P2p_connection_pool.read conn >>=? fun msg ->
lwt_debug "message read from %a" lwt_debug "message read from %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () -> (P2p_connection_pool.Connection.info conn) >>= fun () ->
return msg return msg
let rec recv_any net () = let rec recv_any net () =
let pipes = let pipes =
P2p_connection_pool.fold_connections P2p_connection_pool.Connection.fold
net.pool ~init:[] net.pool ~init:[]
~f:begin fun _peer_id conn acc -> ~f:begin fun _peer_id conn acc ->
(P2p_connection_pool.is_readable conn >>= function (P2p_connection_pool.is_readable conn >>= function
@ -222,7 +227,7 @@ module Real = struct
| Error _ -> Lwt_utils.never_ending) :: acc | Error _ -> Lwt_utils.never_ending) :: acc
end in end in
Lwt.pick ( Lwt.pick (
( P2p_connection_pool.PoolEvent.wait_new_connection net.pool >>= fun () -> ( P2p_connection_pool.Pool_event.wait_new_connection net.pool >>= fun () ->
Lwt.return_none ):: Lwt.return_none )::
pipes) >>= function pipes) >>= function
| None -> recv_any net () | None -> recv_any net ()
@ -231,12 +236,12 @@ module Real = struct
| Ok msg -> | Ok msg ->
lwt_debug "message read from %a" lwt_debug "message read from %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () -> (P2p_connection_pool.Connection.info conn) >>= fun () ->
Lwt.return (conn, msg) Lwt.return (conn, msg)
| Error _ -> | Error _ ->
lwt_debug "error reading message from %a" lwt_debug "error reading message from %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () -> (P2p_connection_pool.Connection.info conn) >>= fun () ->
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
recv_any net () recv_any net ()
@ -245,12 +250,12 @@ module Real = struct
| Ok () -> | Ok () ->
lwt_debug "message sent to %a" lwt_debug "message sent to %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () -> (P2p_connection_pool.Connection.info conn) >>= fun () ->
return () return ()
| Error err -> | Error err ->
lwt_debug "error sending message from %a: %a" lwt_debug "error sending message from %a: %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) (P2p_connection_pool.Connection.info conn)
pp_print_error err >>= fun () -> pp_print_error err >>= fun () ->
Lwt.return (Error err) Lwt.return (Error err)
@ -259,12 +264,12 @@ module Real = struct
| Ok v -> | Ok v ->
debug "message trysent to %a" debug "message trysent to %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) ; (P2p_connection_pool.Connection.info conn) ;
v v
| Error err -> | Error err ->
debug "error trysending message to %a@ %a" debug "error trysending message to %a@ %a"
Connection_info.pp Connection_info.pp
(P2p_connection_pool.connection_info conn) (P2p_connection_pool.Connection.info conn)
pp_print_error err ; pp_print_error err ;
false false
@ -273,10 +278,10 @@ module Real = struct
debug "message broadcasted" debug "message broadcasted"
let fold_connections { pool } ~init ~f = let fold_connections { pool } ~init ~f =
P2p_connection_pool.fold_connections pool ~init ~f P2p_connection_pool.Connection.fold pool ~init ~f
let iter_connections { pool } f = let iter_connections { pool } f =
P2p_connection_pool.fold_connections pool P2p_connection_pool.Connection.fold pool
~init:() ~init:()
~f:(fun gid conn () -> f gid conn) ~f:(fun gid conn () -> f gid conn)
@ -315,7 +320,7 @@ type ('msg, 'meta) t = {
connection_info : ('msg, 'meta) connection -> Connection_info.t ; connection_info : ('msg, 'meta) connection -> Connection_info.t ;
connection_stat : ('msg, 'meta) connection -> Stat.t ; connection_stat : ('msg, 'meta) connection -> Stat.t ;
global_stat : unit -> Stat.t ; global_stat : unit -> Stat.t ;
get_metadata : Peer_id.t -> 'meta option ; get_metadata : Peer_id.t -> 'meta ;
set_metadata : Peer_id.t -> 'meta -> unit ; set_metadata : Peer_id.t -> 'meta -> unit ;
recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ; recv : ('msg, 'meta) connection -> 'msg tzresult Lwt.t ;
recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ; recv_any : unit -> (('msg, 'meta) connection * 'msg) Lwt.t ;
@ -355,7 +360,7 @@ let create ~config ~limits meta_cfg msg_cfg =
on_new_connection = Real.on_new_connection net ; on_new_connection = Real.on_new_connection net ;
} }
let faked_network = { let faked_network meta_config = {
peer_id = Fake.id.peer_id ; peer_id = Fake.id.peer_id ;
maintain = Lwt.return ; maintain = Lwt.return ;
roll = Lwt.return ; roll = Lwt.return ;
@ -365,7 +370,7 @@ let faked_network = {
connection_info = (fun _ -> Fake.connection_info) ; connection_info = (fun _ -> Fake.connection_info) ;
connection_stat = (fun _ -> Fake.empty_stat) ; connection_stat = (fun _ -> Fake.empty_stat) ;
global_stat = (fun () -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ;
get_metadata = (fun _ -> None) ; get_metadata = (fun _ -> meta_config.initial) ;
set_metadata = (fun _ _ -> ()) ; set_metadata = (fun _ _ -> ()) ;
recv = (fun _ -> Lwt_utils.never_ending) ; recv = (fun _ -> Lwt_utils.never_ending) ;
recv_any = (fun () -> Lwt_utils.never_ending) ; recv_any = (fun () -> Lwt_utils.never_ending) ;
@ -402,6 +407,8 @@ module Raw = struct
type 'a t = 'a P2p_connection_pool.Message.t = type 'a t = 'a P2p_connection_pool.Message.t =
| Bootstrap | Bootstrap
| Advertise of P2p_types.Point.t list | Advertise of P2p_types.Point.t list
| Swap_request of Point.t * Peer_id.t
| Swap_ack of Point.t * Peer_id.t
| Message of 'a | Message of 'a
| Disconnect | Disconnect
let encoding = P2p_connection_pool.Message.encoding let encoding = P2p_connection_pool.Message.encoding
@ -414,7 +421,7 @@ module RPC = struct
| None -> Stat.empty | None -> Stat.empty
| Some pool -> P2p_connection_pool.pool_stat pool | Some pool -> P2p_connection_pool.pool_stat pool
module Event = P2p_connection_pool.LogEvent module Event = P2p_connection_pool.Log_event
let watch net = let watch net =
match net.pool with match net.pool with
@ -433,14 +440,14 @@ module RPC = struct
| None -> None | None -> None
| Some pool -> | Some pool ->
map_option map_option
(P2p_connection_pool.Peer_ids.find_connection pool peer_id) (P2p_connection_pool.Connection.find_by_peer_id pool peer_id)
~f:P2p_connection_pool.connection_info ~f:P2p_connection_pool.Connection.info
let kick net peer_id wait = let kick net peer_id wait =
match net.pool with match net.pool with
| None -> Lwt.return_unit | None -> Lwt.return_unit
| Some pool -> | Some pool ->
match P2p_connection_pool.Peer_ids.find_connection pool peer_id with match P2p_connection_pool.Connection.find_by_peer_id pool peer_id with
| None -> Lwt.return_unit | None -> Lwt.return_unit
| Some conn -> P2p_connection_pool.disconnect ~wait conn | Some conn -> P2p_connection_pool.disconnect ~wait conn
@ -448,10 +455,10 @@ module RPC = struct
match net.pool with match net.pool with
| None -> [] | None -> []
| Some pool -> | Some pool ->
P2p_connection_pool.fold_connections P2p_connection_pool.Connection.fold
pool ~init:[] pool ~init:[]
~f:begin fun _peer_id c acc -> ~f:begin fun _peer_id c acc ->
P2p_connection_pool.connection_info c :: acc P2p_connection_pool.Connection.info c :: acc
end end
let count net = let count net =
@ -703,12 +710,11 @@ module RPC = struct
| Disconnected -> Disconnected, None | Disconnected -> Disconnected, None
in in
let peer_id = Peer_info.peer_id i in let peer_id = Peer_info.peer_id i in
let meta = Peer_info.metadata i in let score = Peer_ids.get_score pool peer_id in
let score = P2p_connection_pool.score pool meta in
let stat = let stat =
match P2p_connection_pool.Peer_ids.find_connection pool peer_id with match P2p_connection_pool.Connection.find_by_peer_id pool peer_id with
| None -> Stat.empty | None -> Stat.empty
| Some conn -> P2p_connection_pool.connection_stat conn | Some conn -> P2p_connection_pool.Connection.stat conn
in Peer_info.{ in Peer_info.{
score ; score ;
trusted = trusted i ; trusted = trusted i ;

View File

@ -122,6 +122,11 @@ type limits = {
max_known_peer_ids : (int * int) option ; max_known_peer_ids : (int * int) option ;
max_known_points : (int * int) option ; max_known_points : (int * int) option ;
(** Optional limitation of internal hashtables (max, target) *) (** Optional limitation of internal hashtables (max, target) *)
swap_linger : float ;
(** Peer swapping does not occur more than once during a timespan of
[swap_linger] seconds. *)
} }
type ('msg, 'meta) t type ('msg, 'meta) t
@ -129,7 +134,7 @@ type ('msg, 'meta) net = ('msg, 'meta) t
(** A faked p2p layer, which do not initiate any connection (** A faked p2p layer, which do not initiate any connection
nor open any listening socket *) nor open any listening socket *)
val faked_network : ('msg, 'meta) net val faked_network : 'meta meta_config -> ('msg, 'meta) net
(** Main network initialisation function *) (** Main network initialisation function *)
val create : val create :
@ -165,7 +170,7 @@ val connection_stat :
val global_stat : ('msg, 'meta) net -> Stat.t val global_stat : ('msg, 'meta) net -> Stat.t
(** Accessors for meta information about a global identifier *) (** Accessors for meta information about a global identifier *)
val get_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta option val get_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta
val set_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta -> unit val set_metadata : ('msg, 'meta) net -> Peer_id.t -> 'meta -> unit
(** Wait for a message from a given connection. *) (** Wait for a message from a given connection. *)
@ -193,7 +198,7 @@ module RPC : sig
val stat : ('msg, 'meta) net -> Stat.t val stat : ('msg, 'meta) net -> Stat.t
module Event = P2p_connection_pool.LogEvent module Event = P2p_connection_pool.Log_event
val watch : ('msg, 'meta) net -> Event.t Lwt_stream.t * Watcher.stopper val watch : ('msg, 'meta) net -> Event.t Lwt_stream.t * Watcher.stopper
val connect : ('msg, 'meta) net -> Point.t -> float -> unit tzresult Lwt.t val connect : ('msg, 'meta) net -> Point.t -> float -> unit tzresult Lwt.t
@ -301,6 +306,8 @@ module Raw : sig
type 'a t = type 'a t =
| Bootstrap | Bootstrap
| Advertise of P2p_types.Point.t list | Advertise of P2p_types.Point.t list
| Swap_request of Point.t * Peer_id.t
| Swap_ack of Point.t * Peer_id.t
| Message of 'a | Message of 'a
| Disconnect | Disconnect
val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t

View File

@ -206,11 +206,16 @@ let authenticate
return (info, (fd, info, cryptobox_data)) return (info, (fd, info, cryptobox_data))
type connection = { type connection = {
id : int ;
info : Connection_info.t ; info : Connection_info.t ;
fd : P2p_io_scheduler.connection ; fd : P2p_io_scheduler.connection ;
cryptobox_data : Crypto.data ; cryptobox_data : Crypto.data ;
} }
let next_conn_id =
let cpt = ref 0 in
fun () -> incr cpt ;!cpt
module Reader = struct module Reader = struct
type 'msg t = { type 'msg t = {
@ -349,6 +354,9 @@ type 'msg t = {
writer : 'msg Writer.t ; writer : 'msg Writer.t ;
} }
let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2
let pp ppf { conn } = Connection_info.pp ppf conn.info let pp ppf { conn } = Connection_info.pp ppf conn.info
let info { conn } = conn.info let info { conn } = conn.info
@ -367,7 +375,7 @@ let accept
end >>=? fun accepted -> end >>=? fun accepted ->
fail_unless accepted Rejected >>=? fun () -> fail_unless accepted Rejected >>=? fun () ->
let canceler = Canceler.create () in let canceler = Canceler.create () in
let conn = { fd ; info ; cryptobox_data } in let conn = { id = next_conn_id (); fd ; info ; cryptobox_data } in
let reader = let reader =
Reader.run ?size:incoming_message_queue_size conn encoding canceler Reader.run ?size:incoming_message_queue_size conn encoding canceler
and writer = and writer =

View File

@ -36,6 +36,8 @@ type 'msg t
(** Type of an accepted connection, parametrized by the type of (** Type of an accepted connection, parametrized by the type of
messages exchanged between peers. *) messages exchanged between peers. *)
val equal: 'mst t -> 'msg t -> bool
val pp : Format.formatter -> 'msg t -> unit val pp : Format.formatter -> 'msg t -> unit
val info: 'msg t -> Connection_info.t val info: 'msg t -> Connection_info.t

File diff suppressed because it is too large Load Diff

View File

@ -108,6 +108,11 @@ type config = {
max_known_peer_ids : (int * int) option ; max_known_peer_ids : (int * int) option ;
(** Like [max_known_points], but for known peer_ids. *) (** Like [max_known_points], but for known peer_ids. *)
swap_linger : float ;
(** Peer swapping does not occur more than once during a timespan of
[spap_linger] seconds. *)
} }
type 'meta meta_config = { type 'meta meta_config = {
@ -142,13 +147,12 @@ val pool_stat: ('msg, 'meta) pool -> Stat.t
(** [pool_stat pool] is a snapshot of current bandwidth usage for the (** [pool_stat pool] is a snapshot of current bandwidth usage for the
entire [pool]. *) entire [pool]. *)
val score: ('msg, 'meta) pool -> 'meta -> float val send_swap_request: ('msg, 'meta) pool -> unit
(** [score pool meta] is the floating-point score of [meta] using
[pool]'s metrics. *)
(** {2 Pool events} *) (** {2 Pool events} *)
module PoolEvent : sig module Pool_event : sig
val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t
(** [wait_too_few_connections pool] is determined when the number of (** [wait_too_few_connections pool] is determined when the number of
connections drops below the desired level. *) connections drops below the desired level. *)
@ -164,54 +168,9 @@ module PoolEvent : sig
val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t
(** [wait_new_connection pool] is determined when a new connection is (** [wait_new_connection pool] is determined when a new connection is
succesfully established in the pool. *) succesfully established in the pool. *)
end end
module LogEvent : sig
type t =
(** Pool-level events *)
| Too_few_connections
| Too_many_connections
| New_point of Point.t
| New_peer of Peer_id.t
(** Connection-level events *)
| Incoming_connection of Point.t
(** We accept(2)-ed an incoming connection *)
| Outgoing_connection of Point.t
(** We connect(2)-ed to a remote endpoint *)
| Authentication_failed of Point.t
(** Remote point failed authentication *)
| Accepting_request of Point.t * Id_point.t * Peer_id.t
(** We accepted a connection after authentifying the remote peer. *)
| Rejecting_request of Point.t * Id_point.t * Peer_id.t
(** We rejected a connection after authentifying the remote peer. *)
| Request_rejected of Point.t * (Id_point.t * Peer_id.t) option
(** The remote peer rejected our connection. *)
| Connection_established of Id_point.t * Peer_id.t
(** We succesfully established a authentified connection. *)
| Disconnection of Peer_id.t
(** We decided to close the connection. *)
| External_disconnection of Peer_id.t
(** The connection was closed for external reason. *)
| Gc_points
(** Garbage collection of known point table has been triggered. *)
| Gc_peer_ids
(** Garbage collection of known peer_ids table has been triggered. *)
val encoding : t Data_encoding.t
end
val watch: ('msg, 'meta) pool -> LogEvent.t Lwt_stream.t * Watcher.stopper
(** [watch pool] is a [stream, close] a [stream] of events and a
[close] function for this stream. *)
(** {1 Connections management} *) (** {1 Connections management} *)
type ('msg, 'meta) connection type ('msg, 'meta) connection
@ -245,18 +204,31 @@ val disconnect:
(** [disconnect conn] cleanly closes [conn] and returns after [conn]'s (** [disconnect conn] cleanly closes [conn] and returns after [conn]'s
internal worker has returned. *) internal worker has returned. *)
val connection_info: ('msg, 'meta) connection -> Connection_info.t module Connection : sig
val connection_stat: ('msg, 'meta) connection -> Stat.t val info: ('msg, 'meta) connection -> Connection_info.t
val stat: ('msg, 'meta) connection -> Stat.t
(** [stat conn] is a snapshot of current bandwidth usage for (** [stat conn] is a snapshot of current bandwidth usage for
[conn]. *) [conn]. *)
val fold_connections: val fold:
('msg, 'meta) pool -> ('msg, 'meta) pool ->
init:'a -> init:'a ->
f:(Peer_id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> f:(Peer_id.t -> ('msg, 'meta) connection -> 'a -> 'a) ->
'a 'a
val list:
('msg, 'meta) pool -> (Peer_id.t * ('msg, 'meta) connection) list
val find_by_point:
('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option
val find_by_peer_id:
('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) connection option
end
val on_new_connection: val on_new_connection:
('msg, 'meta) pool -> ('msg, 'meta) pool ->
(Peer_id.t -> ('msg, 'meta) connection -> unit) -> unit (Peer_id.t -> ('msg, 'meta) connection -> unit) -> unit
@ -304,17 +276,14 @@ module Peer_ids : sig
val info: val info:
('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) info option ('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) info option
val get_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta option val get_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta
val set_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta -> unit val set_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta -> unit
val get_score: ('msg, 'meta) pool -> Peer_id.t -> float option val get_score: ('msg, 'meta) pool -> Peer_id.t -> float
val get_trusted: ('msg, 'meta) pool -> Peer_id.t -> bool val get_trusted: ('msg, 'meta) pool -> Peer_id.t -> bool
val set_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit val set_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit
val unset_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit val unset_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit
val find_connection:
('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) connection option
val fold_known: val fold_known:
('msg, 'meta) pool -> ('msg, 'meta) pool ->
init:'a -> init:'a ->
@ -342,9 +311,6 @@ module Points : sig
val set_trusted: ('msg, 'meta) pool -> Point.t -> unit val set_trusted: ('msg, 'meta) pool -> Point.t -> unit
val unset_trusted: ('msg, 'meta) pool -> Point.t -> unit val unset_trusted: ('msg, 'meta) pool -> Point.t -> unit
val find_connection:
('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option
val fold_known: val fold_known:
('msg, 'meta) pool -> ('msg, 'meta) pool ->
init:'a -> init:'a ->
@ -359,6 +325,70 @@ module Points : sig
end end
module Log_event : sig
type t =
(** Pool-level events *)
| Too_few_connections
| Too_many_connections
| New_point of Point.t
| New_peer of Peer_id.t
| Gc_points
(** Garbage collection of known point table has been triggered. *)
| Gc_peer_ids
(** Garbage collection of known peer_ids table has been triggered. *)
(** Connection-level events *)
| Incoming_connection of Point.t
(** We accept(2)-ed an incoming connection *)
| Outgoing_connection of Point.t
(** We connect(2)-ed to a remote endpoint *)
| Authentication_failed of Point.t
(** Remote point failed authentication *)
| Accepting_request of Point.t * Id_point.t * Peer_id.t
(** We accepted a connection after authentifying the remote peer. *)
| Rejecting_request of Point.t * Id_point.t * Peer_id.t
(** We rejected a connection after authentifying the remote peer. *)
| Request_rejected of Point.t * (Id_point.t * Peer_id.t) option
(** The remote peer rejected our connection. *)
| Connection_established of Id_point.t * Peer_id.t
(** We succesfully established a authentified connection. *)
| Swap_request_received of { source : Peer_id.t }
(** A swap request has been received. *)
| Swap_ack_received of { source : Peer_id.t }
(** A swap ack has been received *)
| Swap_request_sent of { source : Peer_id.t }
(** A swap request has been sent *)
| Swap_ack_sent of { source : Peer_id.t }
(** A swap ack has been sent *)
| Swap_request_ignored of { source : Peer_id.t }
(** A swap request has been ignored *)
| Swap_success of { source : Peer_id.t }
(** A swap operation has succeeded *)
| Swap_failure of { source : Peer_id.t }
(** A swap operation has failed *)
| Disconnection of Peer_id.t
(** We decided to close the connection. *)
| External_disconnection of Peer_id.t
(** The connection was closed for external reason. *)
val encoding : t Data_encoding.t
end
val watch: ('msg, 'meta) pool -> Log_event.t Lwt_stream.t * Watcher.stopper
(** [watch pool] is a [stream, close] a [stream] of events and a
[close] function for this stream. *)
(**/**) (**/**)
module Message : sig module Message : sig
@ -366,6 +396,8 @@ module Message : sig
type 'msg t = type 'msg t =
| Bootstrap | Bootstrap
| Advertise of Point.t list | Advertise of Point.t list
| Swap_request of Point.t * Peer_id.t
| Swap_ack of Point.t * Peer_id.t
| Message of 'msg | Message of 'msg
| Disconnect | Disconnect

View File

@ -133,6 +133,7 @@ module Point_info : sig
} }
val encoding : t Data_encoding.t val encoding : t Data_encoding.t
end end
val fold_events : val fold_events :

View File

@ -29,7 +29,7 @@ type 'meta t = {
disco: P2p_discovery.t option ; disco: P2p_discovery.t option ;
just_maintained: unit Lwt_condition.t ; just_maintained: unit Lwt_condition.t ;
please_maintain: unit Lwt_condition.t ; please_maintain: unit Lwt_condition.t ;
mutable worker : unit Lwt.t ; mutable maintain_worker : unit Lwt.t ;
} }
(** Select [expected] points amongst the disconnected known points. (** Select [expected] points amongst the disconnected known points.
@ -37,6 +37,7 @@ type 'meta t = {
failed after [start_time]. It first selects points with the oldest failed after [start_time]. It first selects points with the oldest
last tentative. *) last tentative. *)
let connectable st start_time expected = let connectable st start_time expected =
let Pool pool = st.pool in
let now = Time.now () in let now = Time.now () in
let module Bounded_point_info = let module Bounded_point_info =
Utils.Bounded(struct Utils.Bounded(struct
@ -49,9 +50,7 @@ let connectable st start_time expected =
| Some t1, Some t2 -> Time.compare t2 t1 | Some t1, Some t2 -> Time.compare t2 t1
end) in end) in
let acc = Bounded_point_info.create expected in let acc = Bounded_point_info.create expected in
let Pool pool = st.pool in P2p_connection_pool.Points.fold_known pool ~init:()
P2p_connection_pool.Points.fold_known
pool ~init:()
~f:begin fun point pi () -> ~f:begin fun point pi () ->
match Point_info.State.get pi with match Point_info.State.get pi with
| Disconnected -> begin | Disconnected -> begin
@ -125,7 +124,7 @@ and too_few_connections st n_connected =
P2p_connection_pool.broadcast_bootstrap_msg pool ; P2p_connection_pool.broadcast_bootstrap_msg pool ;
Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt.pick [ Lwt.pick [
P2p_connection_pool.PoolEvent.wait_new_peer pool ; P2p_connection_pool.Pool_event.wait_new_peer pool ;
Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
or wait for the existence of a or wait for the existence of a
non grey-listed peer ?? *) non grey-listed peer ?? *)
@ -139,7 +138,7 @@ and too_many_connections st n_connected =
(* too many connections, start the russian roulette *) (* too many connections, start the russian roulette *)
let to_kill = n_connected - st.bounds.max_target in let to_kill = n_connected - st.bounds.max_target in
lwt_debug "Too many connections, will kill %d" to_kill >>= fun () -> lwt_debug "Too many connections, will kill %d" to_kill >>= fun () ->
snd @@ P2p_connection_pool.fold_connections pool snd @@ P2p_connection_pool.Connection.fold pool
~init:(to_kill, Lwt.return_unit) ~init:(to_kill, Lwt.return_unit)
~f:(fun _ conn (i, t) -> ~f:(fun _ conn (i, t) ->
if i = 0 then (0, t) if i = 0 then (0, t)
@ -148,33 +147,43 @@ and too_many_connections st n_connected =
maintain st maintain st
let rec worker_loop st = let rec worker_loop st =
begin
let Pool pool = st.pool in let Pool pool = st.pool in
begin
Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt.pick [ Lwt.pick [
Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_unix.sleep 120. ; (* every two minutes *)
Lwt_condition.wait st.please_maintain ; (* when asked *) Lwt_condition.wait st.please_maintain ; (* when asked *)
P2p_connection_pool.PoolEvent.wait_too_few_connections pool ; (* limits *) P2p_connection_pool.Pool_event.wait_too_few_connections pool ; (* limits *)
P2p_connection_pool.PoolEvent.wait_too_many_connections pool P2p_connection_pool.Pool_event.wait_too_many_connections pool
] >>= fun () -> ] >>= fun () ->
return () return ()
end >>=? fun () -> end >>=? fun () ->
let n_connected = P2p_connection_pool.active_connections pool in
if n_connected < st.bounds.min_threshold
|| st.bounds.max_threshold < n_connected then
maintain st maintain st
else begin
P2p_connection_pool.send_swap_request pool ;
return ()
end
end >>= function end >>= function
| Ok () -> worker_loop st | Ok () -> worker_loop st
| Error [Lwt_utils.Canceled] -> Lwt.return_unit | Error [Lwt_utils.Canceled] -> Lwt.return_unit
| Error _ -> Lwt.return_unit | Error _ -> Lwt.return_unit
let run ?(connection_timeout = 5.) bounds pool disco = let run ~connection_timeout bounds pool disco =
let canceler = Canceler.create () in let canceler = Canceler.create () in
let st = { let st = {
canceler ; connection_timeout ; canceler ;
bounds ; pool = Pool pool ; disco ; connection_timeout ;
bounds ;
pool = Pool pool ;
disco ;
just_maintained = Lwt_condition.create () ; just_maintained = Lwt_condition.create () ;
please_maintain = Lwt_condition.create () ; please_maintain = Lwt_condition.create () ;
worker = Lwt.return_unit ; maintain_worker = Lwt.return_unit ;
} in } in
st.worker <- st.maintain_worker <-
Lwt_utils.worker "maintenance" Lwt_utils.worker "maintenance"
(fun () -> worker_loop st) (fun () -> worker_loop st)
(fun () -> Canceler.cancel canceler) ; (fun () -> Canceler.cancel canceler) ;
@ -185,8 +194,12 @@ let maintain { just_maintained ; please_maintain } =
Lwt_condition.broadcast please_maintain () ; Lwt_condition.broadcast please_maintain () ;
wait wait
let shutdown { canceler ; worker ; just_maintained } = let shutdown {
canceler ;
maintain_worker ;
just_maintained } =
Canceler.cancel canceler >>= fun () -> Canceler.cancel canceler >>= fun () ->
worker >>= fun () -> maintain_worker >>= fun () ->
Lwt_condition.broadcast just_maintained () ; Lwt_condition.broadcast just_maintained () ;
Lwt.return_unit Lwt.return_unit

View File

@ -34,7 +34,7 @@ type 'meta t
(** Type of a maintenance worker. *) (** Type of a maintenance worker. *)
val run: val run:
?connection_timeout:float -> connection_timeout:float ->
bounds -> bounds ->
('msg, 'meta) P2p_connection_pool.t -> ('msg, 'meta) P2p_connection_pool.t ->
P2p_discovery.t option -> P2p_discovery.t option ->

View File

@ -70,7 +70,7 @@ let init_p2p net_params =
match net_params with match net_params with
| None -> | None ->
lwt_log_notice "P2P layer is disabled" >>= fun () -> lwt_log_notice "P2P layer is disabled" >>= fun () ->
Lwt.return P2p.faked_network Lwt.return (P2p.faked_network Distributed_db_metadata.cfg)
| Some (config, limits) -> | Some (config, limits) ->
lwt_log_notice "bootstraping network..." >>= fun () -> lwt_log_notice "bootstraping network..." >>= fun () ->
P2p.create P2p.create

View File

@ -40,7 +40,7 @@ let rec connect ~timeout pool point =
lwt_log_info "Connect to %a" Point.pp point >>= fun () -> lwt_log_info "Connect to %a" Point.pp point >>= fun () ->
P2p_connection_pool.connect pool point ~timeout >>= function P2p_connection_pool.connect pool point ~timeout >>= function
| Error [P2p_connection_pool.Connected] -> begin | Error [P2p_connection_pool.Connected] -> begin
match P2p_connection_pool.Points.find_connection pool point with match P2p_connection_pool.Connection.find_by_point pool point with
| Some conn -> return conn | Some conn -> return conn
| None -> failwith "Woops..." | None -> failwith "Woops..."
end end
@ -148,6 +148,7 @@ let make_net points repeat n =
known_points_history_size = 100 ; known_points_history_size = 100 ;
max_known_points = None ; max_known_points = None ;
max_known_peer_ids = None ; max_known_peer_ids = None ;
swap_linger = 0. ;
} in } in
Process.detach Process.detach
~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id) ~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id)