P2P: doc and associated minor changes.

This commit is contained in:
Vincent Bernardoff 2018-02-20 18:28:05 +01:00 committed by Benjamin Canou
parent 79e39b15eb
commit 684fe1110b
17 changed files with 332 additions and 317 deletions

View File

@ -60,6 +60,7 @@ license when the main network lunches.
:caption: White doc: :caption: White doc:
whitedoc/the_big_picture whitedoc/the_big_picture
whitedoc/p2p
whitedoc/validation whitedoc/validation
whitedoc/michelson whitedoc/michelson
whitedoc/proof_of_stake whitedoc/proof_of_stake

114
docs/whitedoc/p2p.rst Normal file
View File

@ -0,0 +1,114 @@
.. _p2p:
The peer-to-peer layer
======================
This document explains the inner workings of the peer-to-peer layer of
the Tezos shell. This part is in charge of establishing and
maintaining network connections with other nodes (gossip).
The P2P layer is instanciated by the node. It is parametrized by the
type of messages that are exchanged over the network (to allow
different P2P protocol versions/extensions), and the type of metadata
associated to each peer. The latter is useful to compute a score for
each peer that reflects the level of trust we have in it. Different
policies can be used when communicating with peers with different
score values.
The P2P layer is comprised of a pool of connections, a set of
operations on those connections, and a set of workers following the
worker pattern pervasively used in the code base.
The P2P layer is packaged in :package:`tezos-p2p`, which has
documentation for all modules.
General operation
-----------------
I/O Scheduling
~~~~~~~~~~~~~~
The P2P layer uses I/O scheduling in order to be able to control its
bandwidth usage as well as implementing different policies
(e.g. read/write quotas) to different peers. For now, each peer is
granted a fair share of the global allocated bandwidth, but it is
planned for the individual allocated bandwidth to each peer to be a
function of the peer's score.
Encryption
~~~~~~~~~~
The connection between each peer is encrypted using `NaCl`
authenticated-encryption `API <http://nacl.cr.yp.to/box.html>`__. This
is done to provide an additional level of security and tamper-proof
guarantees in the communication between peers.
Message queues
~~~~~~~~~~~~~~
On top of basic I/O scheduling, two finite-size typed message queues
are used to store incoming (resp. outgoing) messages for each
peer. This further restricts the speed at which communication is
possible with a peer; when a queue is full, it is not possible to read
(resp. write) an additional message. The high-level
`P2p_socket.connection
<../api/odoc/tezos-p2p/Tezos_p2p/P2p_socket/index.html#type-connection>`__
type by the P2P layer is basically a UNIX socket upgraded with I/O
scheduling, peer metadata, cryptographic keys and two messages queues
operated by dedicated workers which operate on those queues.
Pool of connections
~~~~~~~~~~~~~~~~~~~
All the above modules are used in `P2p_pool
<../api/odoc/tezos-p2p/Tezos_p2p/P2p_pool/index.html>`__, which
constitutes the core of the P2P layer, together with the worker
processes described below. It comprises various tables of connections
as well as methods to query them, also connections are extended with
another message queue where lower level messages (like responses to
ping) are filtered out and only application-level messages are kept.
The main entry point of the P2P layer is in module `P2p
<../api/odoc/tezos-p2p/Tezos_p2p/P2p/index.html>`__. See below
for a description of workers acting onto the P2P layer.
Welcome worker
--------------
The welcome worker is responsible for accepting incoming connections
and register them into the pool of connections managed by the P2P
layer. It basically runs the ``accept(2)`` syscall and call
`P2p_pool.accept
<../api/odoc/tezos-p2p/Tezos_p2p/P2p_pool/index.html#val-accept>`__ so
that it is made aware of an incoming connection. From there, the pool
will decide how this new connection must be handled.
Maintenance worker
------------------
The maintenance worker is in charge of establishing an appropriate
number of connections with other nodes in order to guarantee a
realistic view of the state of the blockchain. It is created with a
set of targets to reach regarding the desired amount of peers it needs
to keep an active connection to.
At the pool level, the minimum (resp. maximum) acceptable number of
connections is defined.
At the maintenance worker level, two other sets of thresholds are
defined: ``target`` (min and max) and ``threshold`` (min and max).
Given these bounds, the maintenance worker:
* Will be triggered every two minutes, when asked by the shell, or
when the minimum or maximum number of acceptable connections is
reached, whichever happens first.
* Will perform the following actions when triggered: if the number of
connections is above ``max_threshold``, it will kill connections
randomly until it reaches ``max_target`` connections. If the number of
connections is below ``min_threshold``, it will attempt to connect to
peers until it reaches at least ``min_target`` connections (and never
more than ``max_target`` connections).

View File

@ -60,7 +60,8 @@ and shell = {
} }
let default_p2p_limits : P2p.limits = { let default_p2p_limits : P2p.limits = {
authentification_timeout = 5. ; connection_timeout = 10. ;
authentication_timeout = 5. ;
min_connections = 10 ; min_connections = 10 ;
expected_connections = 50 ; expected_connections = 50 ;
max_connections = 100 ; max_connections = 100 ;
@ -158,7 +159,7 @@ let default_config = {
let limit : P2p.limits Data_encoding.t = let limit : P2p.limits Data_encoding.t =
let open Data_encoding in let open Data_encoding in
conv conv
(fun { P2p.authentification_timeout ; (fun { P2p.connection_timeout ; authentication_timeout ;
min_connections ; expected_connections ; max_connections ; min_connections ; expected_connections ; max_connections ;
backlog ; max_incoming_connections ; backlog ; max_incoming_connections ;
max_download_speed ; max_upload_speed ; max_download_speed ; max_upload_speed ;
@ -169,27 +170,27 @@ let limit : P2p.limits Data_encoding.t =
max_known_points ; max_known_peer_ids ; max_known_points ; max_known_peer_ids ;
swap_linger ; binary_chunks_size swap_linger ; binary_chunks_size
} -> } ->
( ( authentification_timeout, min_connections, expected_connections, (((( connection_timeout,
authentication_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections, max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed, swap_linger, max_download_speed, max_upload_speed, swap_linger),
binary_chunks_size) , ( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size,
( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size, incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_peer_ids_history_size, known_points_history_size, known_peer_ids_history_size,
max_known_points, max_known_peer_ids max_known_points)),
))) max_known_peer_ids)))
(fun ( ( authentification_timeout, min_connections, expected_connections, (fun (((( connection_timeout,
authentication_timeout, min_connections, expected_connections,
max_connections, backlog, max_incoming_connections, max_connections, backlog, max_incoming_connections,
max_download_speed, max_upload_speed, swap_linger, max_download_speed, max_upload_speed, swap_linger),
binary_chunks_size) , ( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size,
( read_buffer_size, read_queue_size, write_queue_size,
incoming_app_message_queue_size, incoming_app_message_queue_size,
incoming_message_queue_size, outgoing_message_queue_size, incoming_message_queue_size, outgoing_message_queue_size,
known_points_history_size, known_peer_ids_history_size, known_points_history_size, known_peer_ids_history_size,
max_known_points, max_known_peer_ids max_known_points)),
) ) -> max_known_peer_ids)) ->
{ authentification_timeout ; min_connections ; expected_connections ; { connection_timeout ; authentication_timeout ; min_connections ; expected_connections ;
max_connections ; backlog ; max_incoming_connections ; max_connections ; backlog ; max_incoming_connections ;
max_download_speed ; max_upload_speed ; max_download_speed ; max_upload_speed ;
read_buffer_size ; read_queue_size ; write_queue_size ; read_buffer_size ; read_queue_size ; write_queue_size ;
@ -199,13 +200,19 @@ let limit : P2p.limits Data_encoding.t =
max_known_points ; max_known_peer_ids ; swap_linger ; max_known_points ; max_known_peer_ids ; swap_linger ;
binary_chunks_size binary_chunks_size
}) })
(merge_objs
(merge_objs (merge_objs
(obj10 (obj10
(dft "authentification-timeout" (dft "connection-timeout"
(Data_encoding.describe
~description: "Delay acceptable when initiating a \
connection to a new peer, in seconds."
float) default_p2p_limits.authentication_timeout)
(dft "authentication-timeout"
(Data_encoding.describe (Data_encoding.describe
~description: "Delay granted to a peer to perform authentication, \ ~description: "Delay granted to a peer to perform authentication, \
in seconds." in seconds."
float) default_p2p_limits.authentification_timeout) float) default_p2p_limits.authentication_timeout)
(dft "min-connections" (dft "min-connections"
(Data_encoding.describe (Data_encoding.describe
~description: "Strict minimum number of connections (triggers an \ ~description: "Strict minimum number of connections (triggers an \
@ -245,9 +252,9 @@ let limit : P2p.limits Data_encoding.t =
~description: "Max upload speeds in KiB/s." ~description: "Max upload speeds in KiB/s."
int31)) int31))
(dft "swap-linger" float default_p2p_limits.swap_linger) (dft "swap-linger" float default_p2p_limits.swap_linger))
(opt "binary-chunks-size" uint8))
(obj10 (obj10
(opt "binary-chunks-size" uint8)
(dft "read-buffer-size" (dft "read-buffer-size"
(Data_encoding.describe (Data_encoding.describe
~description: "Size of the buffer passed to read(2)." ~description: "Size of the buffer passed to read(2)."
@ -263,8 +270,9 @@ let limit : P2p.limits Data_encoding.t =
(dft "known_peer_ids_history_size" uint16 (dft "known_peer_ids_history_size" uint16
default_p2p_limits.known_points_history_size) default_p2p_limits.known_points_history_size)
(opt "max_known_points" (tup2 uint16 uint16)) (opt "max_known_points" (tup2 uint16 uint16))
(opt "max_known_peer_ids" (tup2 uint16 uint16))
)) ))
(obj1
(opt "max_known_peer_ids" (tup2 uint16 uint16))))
let p2p = let p2p =
let open Data_encoding in let open Data_encoding in

View File

@ -7,18 +7,44 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** Moving averages.
This module implements bandwidth counters based on (cumulative)
exponential moving average. Each counter is identified by an
integer. They are stored in an internal hash table.
See i.e.
https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
for the algorithm.
*)
type t type t
(** Type of one bandwidth counter. *)
val create: init:int -> alpha:float -> t val create: init:int -> alpha:float -> t
(** [create ~init ~alpha] is a counter with initial value [init] and
factor [alpha]. *)
val destroy: t -> unit val destroy: t -> unit
(** [destroy t] removes counter [t] from the internal hash table. *)
val add: t -> int -> unit val add: t -> int -> unit
(** [add t id] adds [t] in the internal hash table under identifies
[id]. *)
val on_update: (unit -> unit) -> unit val on_update: (unit -> unit) -> unit
(** [of_update f] registers [f] to be called on each update of the
internal worker (currently every 1s). *)
val updated: unit Lwt_condition.t val updated: unit Lwt_condition.t
(** [updated] is a condition variable that gets signaled on each
update of the internal worker (currently every 1s). *)
type stat = { type stat = {
total: int64 ; total: int64 ;
average: int ; average: int ;
} }
val stat: t -> stat val stat: t -> stat
(** [stat t] is a stat record reflecting the state of [t] at the time
of the call. *)

View File

@ -41,7 +41,8 @@ type config = {
type limits = { type limits = {
authentification_timeout : float ; connection_timeout : float ;
authentication_timeout : float ;
min_connections : int ; min_connections : int ;
expected_connections : int ; expected_connections : int ;
@ -94,7 +95,8 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched =
min_connections = limits.min_connections ; min_connections = limits.min_connections ;
max_connections = limits.max_connections ; max_connections = limits.max_connections ;
max_incoming_connections = limits.max_incoming_connections ; max_incoming_connections = limits.max_incoming_connections ;
authentification_timeout = limits.authentification_timeout ; connection_timeout = limits.connection_timeout ;
authentication_timeout = limits.authentication_timeout ;
incoming_app_message_queue_size = limits.incoming_app_message_queue_size ; incoming_app_message_queue_size = limits.incoming_app_message_queue_size ;
incoming_message_queue_size = limits.incoming_message_queue_size ; incoming_message_queue_size = limits.incoming_message_queue_size ;
outgoing_message_queue_size = limits.outgoing_message_queue_size ; outgoing_message_queue_size = limits.outgoing_message_queue_size ;
@ -123,19 +125,14 @@ let bounds ~min ~expected ~max =
max_threshold = max - step_max ; max_threshold = max - step_max ;
} }
let may_create_discovery_worker _config pool = let create_maintenance_worker limits pool =
Some (P2p_discovery.create pool)
let create_maintenance_worker limits pool disco =
let bounds = let bounds =
bounds bounds
~min:limits.min_connections ~min:limits.min_connections
~expected:limits.expected_connections ~expected:limits.expected_connections
~max:limits.max_connections ~max:limits.max_connections
in in
P2p_maintenance.run P2p_maintenance.run bounds pool
~connection_timeout:limits.authentification_timeout
bounds pool disco
let may_create_welcome_worker config limits pool = let may_create_welcome_worker config limits pool =
match config.listening_port with match config.listening_port with
@ -156,7 +153,6 @@ module Real = struct
limits: limits ; limits: limits ;
io_sched: P2p_io_scheduler.t ; io_sched: P2p_io_scheduler.t ;
pool: ('msg, 'meta) P2p_pool.t ; pool: ('msg, 'meta) P2p_pool.t ;
discoverer: P2p_discovery.t option ;
maintenance: 'meta P2p_maintenance.t ; maintenance: 'meta P2p_maintenance.t ;
welcome: P2p_welcome.t option ; welcome: P2p_welcome.t option ;
} }
@ -165,15 +161,13 @@ module Real = struct
let io_sched = create_scheduler limits in let io_sched = create_scheduler limits in
create_connection_pool create_connection_pool
config limits meta_cfg msg_cfg io_sched >>= fun pool -> config limits meta_cfg msg_cfg io_sched >>= fun pool ->
let discoverer = may_create_discovery_worker config pool in let maintenance = create_maintenance_worker limits pool in
let maintenance = create_maintenance_worker limits pool discoverer in
may_create_welcome_worker config limits pool >>= fun welcome -> may_create_welcome_worker config limits pool >>= fun welcome ->
return { return {
config ; config ;
limits ; limits ;
io_sched ; io_sched ;
pool ; pool ;
discoverer ;
maintenance ; maintenance ;
welcome ; welcome ;
} }
@ -190,7 +184,6 @@ module Real = struct
let shutdown net () = let shutdown net () =
Lwt_utils.may ~f:P2p_welcome.shutdown net.welcome >>= fun () -> Lwt_utils.may ~f:P2p_welcome.shutdown net.welcome >>= fun () ->
P2p_maintenance.shutdown net.maintenance >>= fun () -> P2p_maintenance.shutdown net.maintenance >>= fun () ->
Lwt_utils.may ~f:P2p_discovery.shutdown net.discoverer >>= fun () ->
P2p_pool.destroy net.pool >>= fun () -> P2p_pool.destroy net.pool >>= fun () ->
P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched
@ -352,8 +345,8 @@ let check_limits =
Error_monad.failwith "value of option %S cannot be negative@." orig Error_monad.failwith "value of option %S cannot be negative@." orig
in in
fun c -> fun c ->
fail_1 c.authentification_timeout fail_1 c.authentication_timeout
"authentification-timeout" >>=? fun () -> "authentication-timeout" >>=? fun () ->
fail_2 c.min_connections fail_2 c.min_connections
"min-connections" >>=? fun () -> "min-connections" >>=? fun () ->
fail_2 c.expected_connections fail_2 c.expected_connections

View File

@ -7,7 +7,13 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** Tezos Shell Net - Low level API for the Gossip network *) (** Tezos Shell Net - Low level API for the Gossip network
This is the entry point of the peer-to-peer layer.
It is used by the Shell as the API to communicate with other
nodes.
*)
type 'meta meta_config = { type 'meta meta_config = {
encoding : 'meta Data_encoding.t; encoding : 'meta Data_encoding.t;
@ -61,7 +67,10 @@ type config = {
(** Network capacities *) (** Network capacities *)
type limits = { type limits = {
authentification_timeout : float ; connection_timeout : float ;
(** Maximum time allowed to the establishment of a connection. *)
authentication_timeout : float ;
(** Delay granted to a peer to perform authentication, in seconds. *) (** Delay granted to a peer to perform authentication, in seconds. *)
min_connections : int ; min_connections : int ;
@ -113,6 +122,10 @@ type limits = {
} }
(** Type of a P2P layer instance, parametrized by:
['msg]: type of messages exchanged between peers
['meta]: type of the metadata associated with peers (score, etc.)
*)
type ('msg, 'meta) t type ('msg, 'meta) t
type ('msg, 'meta) net = ('msg, 'meta) t type ('msg, 'meta) net = ('msg, 'meta) t

View File

@ -1,137 +0,0 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make (struct let name = "p2p.discovery" end)
type t = ()
let create _pool = ()
let restart () = (() : unit)
let shutdown () = Lwt.return_unit
let inet_addr = Unix.inet_addr_of_string "ff0e::54:455a:3053"
(*
module Message = struct
let encoding =
Data_encoding.(tup3 (Fixed.string 10) P2p_peer.Id.encoding int16)
let length = Data_encoding.Binary.fixed_length_exn encoding
let make peer_id port =
Data_encoding.Binary.to_bytes encoding ("DISCOMAGIC", peer_id, port)
end
(* Sends discover messages into space in an exponentially delayed loop,
restartable using a condition *)
let sender sock saddr my_peer_id inco_port cancelation restart =
let buf = Message.make my_peer_id inco_port in
let rec loop delay n =
Lwt.catch
(fun () ->
Lwt_bytes.sendto sock buf 0 Message.length [] saddr >>= fun _nb_sent ->
Lwt.return_unit)
(fun exn ->
lwt_debug "(%a) error broadcasting a discovery request: %a"
P2p_peer.Id.pp my_peer_id Error_monad.pp (Exn exn)) >>= fun () ->
Lwt.pick
[ (Lwt_unix.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ;
(cancelation () >>= fun () -> Lwt.return_none) ;
(Lwt_condition.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ]
>>= function
| Some (delay, n) when n = 10 -> loop delay 9
| Some (delay, n) -> loop (delay *. 2.) n
| None -> Lwt.return_unit
in
loop 0.2 1
let create_socket (iface, disco_addr, disco_port) =
let usock = Unix.socket PF_INET6 SOCK_DGRAM 0 in
let sock = Lwt_unix.of_unix_file_descr ~blocking:false usock in
let saddr = Unix.ADDR_INET (disco_addr, disco_port) in
Unix.setsockopt usock SO_REUSEADDR true ;
Ipv6_multicast.Unix.bind ?iface usock saddr ;
Ipv6_multicast.Unix.membership ?iface usock disco_addr `Join ;
iface, sock, saddr
module Answerer = struct
(* Launch an answer machine for the discovery mechanism, takes a
callback to fill the answers and returns a canceler function *)
let answerer sock my_peer_id cancelation callback =
(* the answering function *)
let buf = MBytes.create Message.length in
let rec step () =
Lwt.pick
[ (cancelation () >>= fun () -> Lwt.return_none) ;
(Lwt_bytes.recvfrom sock buf 0 Message.length [] >>= fun r ->
Lwt.return (Some r)) ] >>= function
| None -> Lwt.return_unit
| Some (len', Lwt_unix.ADDR_INET (remote_addr, _mcast_port))
when len' = Message.length -> begin
match (Data_encoding.Binary.of_bytes Message.encoding buf) with
| Some ("DISCOMAGIC", remote_peer_id, remote_inco_port)
when remote_peer_id <> my_peer_id ->
Lwt.catch
(fun () -> callback ~remote_addr ~remote_inco_port)
(fun exn ->
lwt_debug "Error processing a discovery request: %a"
pp_exn exn) >>=
step
| _ ->
step ()
end
| Some _ -> step ()
in
step ()
let worker_loop st =
let callback ~remote_addr ~remote_inco_port =
let remote_uaddr = Ipaddr_unix.V6.of_inet_addr_exn remote_addr in
P2p_connection_loop.notify_new_peer
in
Lwt.catch
(fun () ->
Lwt_utils.worker
(Format.asprintf "(%a) discovery answerer" P2p_peer.Id.pp my_peer_id)
(fun () -> answerer fd my_peer_id cancelation callback)
cancel)
(fun exn ->
lwt_log_error "Discovery answerer not started: %a"
Error_monad.pp (Exn exn))
end
let discovery_sender =
match config.pending_authentification_port with
| None -> Lwt.return_unit
| Some inco_port ->
Lwt.catch
(fun () ->
let sender () =
Discovery.sender fd
saddr my_peer_id inco_port cancelation restart_discovery in
Lwt_utils.worker
(Format.asprintf "(%a) discovery sender" P2p_peer.Id.pp my_peer_id)
sender cancel)
(fun exn ->
lwt_log_error "Discovery sender not started: %a"
Error_monad.pp (Exn exn))
let discovery_answerer, discovery_sender =
match map_option ~f:create_socket st.config.local_discovery with
| exception exn ->
log_error "Error creating discovery socket: %a" Error_monad.pp (Exn exn) ;
(Lwt.return_unit, Lwt.return_unit)
| None -> Lwt.return_unit, Lwt.return_unit
| Some (iface, fd, saddr) ->
discovery_answerer, discovery_sender
*)

View File

@ -1,13 +0,0 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
val create : ('msg, 'meta) P2p_pool.t -> t
val restart : t -> unit
val shutdown : t -> unit Lwt.t

View File

@ -7,12 +7,13 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** IO Scheduling. This module implements generic IO scheduling (** Generic IO scheduling between file descriptors.
between file descriptors. In order to use IO scheduling, the
[register] function must be used to make a file descriptor managed In order to use IO scheduling, the [register] function must be
by a [scheduler].. It will return a value of type [connection] used to make a file descriptor managed by a [scheduler].. It will
that must be used to perform IO on the managed file descriptor return a value of type [connection] that must be used to perform IO
using this module's dedicated IO functions (read, write, etc.). on the managed file descriptor using this module's dedicated IO
functions (read, write, etc.).
Each connection is allowed a read (resp. write) quota, which is Each connection is allowed a read (resp. write) quota, which is
for now fairly distributed among connections. for now fairly distributed among connections.

View File

@ -20,10 +20,8 @@ type 'meta pool = Pool : ('msg, 'meta) P2p_pool.t -> 'meta pool
type 'meta t = { type 'meta t = {
canceler: Lwt_canceler.t ; canceler: Lwt_canceler.t ;
connection_timeout: float ;
bounds: bounds ; bounds: bounds ;
pool: 'meta pool ; pool: 'meta pool ;
disco: P2p_discovery.t option ;
just_maintained: unit Lwt_condition.t ; just_maintained: unit Lwt_condition.t ;
please_maintain: unit Lwt_condition.t ; please_maintain: unit Lwt_condition.t ;
mutable maintain_worker : unit Lwt.t ; mutable maintain_worker : unit Lwt.t ;
@ -80,8 +78,7 @@ let rec try_to_contact
else else
List.fold_left List.fold_left
(fun acc point -> (fun acc point ->
P2p_pool.connect P2p_pool.connect pool point >>= function
~timeout:st.connection_timeout pool point >>= function
| Ok _ -> acc >|= succ | Ok _ -> acc >|= succ
| Error _ -> acc) | Error _ -> acc)
(Lwt.return 0) (Lwt.return 0)
@ -115,9 +112,7 @@ and too_few_connections st n_connected =
if success then begin if success then begin
maintain st maintain st
end else begin end else begin
(* not enough contacts, ask the pals of our pals, (* not enough contacts, ask the pals of our pals, and then wait *)
discover the local network and then wait *)
Option.iter ~f:P2p_discovery.restart st.disco ;
P2p_pool.broadcast_bootstrap_msg pool ; P2p_pool.broadcast_bootstrap_msg pool ;
protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt.pick [ Lwt.pick [
@ -168,14 +163,12 @@ let rec worker_loop st =
| Error [ Canceled ] -> Lwt.return_unit | Error [ Canceled ] -> Lwt.return_unit
| Error _ -> Lwt.return_unit | Error _ -> Lwt.return_unit
let run ~connection_timeout bounds pool disco = let run bounds pool =
let canceler = Lwt_canceler.create () in let canceler = Lwt_canceler.create () in
let st = { let st = {
canceler ; canceler ;
connection_timeout ;
bounds ; bounds ;
pool = Pool pool ; pool = Pool pool ;
disco ;
just_maintained = Lwt_condition.create () ; just_maintained = Lwt_condition.create () ;
please_maintain = Lwt_condition.create () ; please_maintain = Lwt_condition.create () ;
maintain_worker = Lwt.return_unit ; maintain_worker = Lwt.return_unit ;

View File

@ -9,14 +9,17 @@
(* min <= min_threshold <= min_target <= max_target <= max_threshold <= max *) (* min <= min_threshold <= min_target <= max_target <= max_threshold <= max *)
(* The 'pool' urges the maintainer to work when the number of (** P2P maintenance worker.
The P2P layer urges the maintainer to work when the number of
connections reaches `max` or is below `min`. Otherwise, the connections reaches `max` or is below `min`. Otherwise, the
maintener is lazy and only lookup for connection every two maintener is lazy and only looks up for connections every two
minutes. The [maintain] function is another way to signal the minutes (hardcoded constant). The [maintain] function is another
maintainer that a maintenance step is desired. way to signal the maintainer that a maintenance step is desired.
When the maintener detects that the number of connections is over When the maintener detects that the number of connections is over
`max_threshold`, it randomly kills connections to reach `max_target`. `max_threshold`, it randomly kills connections to reach
`max_target`.
When the maintener detects that the number of connections is below When the maintener detects that the number of connections is below
`min_threshold`, it creates enough connection to reach at least `min_threshold`, it creates enough connection to reach at least
@ -34,12 +37,15 @@ type 'meta t
(** Type of a maintenance worker. *) (** Type of a maintenance worker. *)
val run: val run:
connection_timeout:float -> bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t
bounds -> (** [run bounds pool] is a maintenance worker for [pool] with
('msg, 'meta) P2p_pool.t -> connection targets specified in [bounds]. *)
P2p_discovery.t option ->
'meta t
val maintain: 'meta t -> unit Lwt.t val maintain: 'meta t -> unit Lwt.t
(** [maintain t] gives a hint to maintenance worker [t] that
maintenance is needed and returns whenever [t] has done a
maintenance cycle. *)
val shutdown: 'meta t -> unit Lwt.t val shutdown: 'meta t -> unit Lwt.t
(** [shutdown t] is a thread that returns whenever [t] has
successfully shut down. *)

View File

@ -160,7 +160,8 @@ type config = {
min_connections : int ; min_connections : int ;
max_connections : int ; max_connections : int ;
max_incoming_connections : int ; max_incoming_connections : int ;
authentification_timeout : float ; connection_timeout : float ;
authentication_timeout : float ;
incoming_app_message_queue_size : int option ; incoming_app_message_queue_size : int option ;
incoming_message_queue_size : int option ; incoming_message_queue_size : int option ;
@ -555,7 +556,9 @@ let compare_known_point_info p1 p2 =
| true, false -> 1 | true, false -> 1
| true, true -> compare_last_seen p2 p1 | true, true -> compare_last_seen p2 p1
let rec connect ~timeout pool point = let rec connect ?timeout pool point =
let timeout =
Option.unopt ~default:pool.config.connection_timeout timeout in
fail_unless fail_unless
(active_connections pool <= pool.config.max_connections) (active_connections pool <= pool.config.max_connections)
P2p_errors.Too_many_connections >>=? fun () -> P2p_errors.Too_many_connections >>=? fun () ->
@ -858,7 +861,7 @@ and swap_ack pool conn new_point _new_peer_id =
and swap pool conn current_peer_id new_point = and swap pool conn current_peer_id new_point =
let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in
pool.latest_accepted_swap <- Time.now () ; pool.latest_accepted_swap <- Time.now () ;
connect ~timeout:10. pool new_point >>= function connect pool new_point >>= function
| Ok _new_conn -> begin | Ok _new_conn -> begin
pool.latest_succesfull_swap <- Time.now () ; pool.latest_succesfull_swap <- Time.now () ;
log pool (Swap_success { source = source_peer_id }) ; log pool (Swap_success { source = source_peer_id }) ;
@ -891,7 +894,7 @@ let accept pool fd point =
P2p_point.Table.add pool.incoming point canceler ; P2p_point.Table.add pool.incoming point canceler ;
Lwt.async begin fun () -> Lwt.async begin fun () ->
with_timeout with_timeout
~canceler (Lwt_unix.sleep pool.config.authentification_timeout) ~canceler (Lwt_unix.sleep pool.config.authentication_timeout)
(fun canceler -> authenticate pool canceler fd point) (fun canceler -> authenticate pool canceler fd point)
end end

View File

@ -8,19 +8,19 @@
(**************************************************************************) (**************************************************************************)
(** Pool of connections. This module manages the connection pool that (** Pool of connections. This module manages the connection pool that
the shell needs to maintain in order to function correctly. the peer-to-peer layer needs to maintain in order to function
correctly.
A pool and its connections are parametrized by the type of A pool and its connections are parametrized by the type of
messages exchanged over the connection and the type of messages exchanged over the connection and the type of
meta-information associated with a peer. The type [('msg, 'meta) meta-information associated with a peer. The type [('msg, 'meta)
connection] is a wrapper on top of [P2p_connection.t] that adds connection] is a wrapper on top of [P2p_socket.t] that adds
meta-information, a data-structure describing a fine-grained state meta-information, a data-structure describing the detailed state of
of the connection, as well as a new message queue (referred to the connection, as well as a new message queue (referred to "app
"app message queue") that will only contain the messages from the message queue") that will only contain the messages from the
internal [P2p_connection.t] that needs to be examined by the internal [P2p_socket.t] that needs to be examined by the higher
higher layers. Some messages are directly processed by an internal layers. Some messages are directly processed by an internal worker
worker and thus never propagated above. and thus never propagated above. *)
*)
type 'msg encoding = Encoding : { type 'msg encoding = Encoding : {
tag: int ; tag: int ;
@ -75,7 +75,10 @@ type config = {
Above this number, [accept] will start dropping incoming Above this number, [accept] will start dropping incoming
connections. *) connections. *)
authentification_timeout : float ; connection_timeout : float ;
(** Maximum time allowed to the establishment of a connection. *)
authentication_timeout : float ;
(** Delay granted to a peer to perform authentication, in seconds. *) (** Delay granted to a peer to perform authentication, in seconds. *)
incoming_app_message_queue_size : int option ; incoming_app_message_queue_size : int option ;
@ -181,11 +184,11 @@ type ('msg, 'meta) connection
fine-grained logical state of the connection. *) fine-grained logical state of the connection. *)
val connect: val connect:
timeout:float -> ?timeout:float ->
('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) pool -> P2p_point.Id.t ->
('msg, 'meta) connection tzresult Lwt.t ('msg, 'meta) connection tzresult Lwt.t
(** [connect ~timeout pool point] tries to add a (** [connect ?timeout pool point] tries to add a connection to [point]
connection to [point] in [pool] in less than [timeout] seconds. *) in [pool] in less than [timeout] seconds. *)
val accept: val accept:
('msg, 'meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit ('msg, 'meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit

View File

@ -7,7 +7,9 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** This modules adds message encoding and encryption to (** Typed and encrypted connections to peers.
This modules adds message encoding and encryption to
[P2p_io_scheduler]'s generic throttled connections. [P2p_io_scheduler]'s generic throttled connections.
Each connection have an associated internal read (resp. write) Each connection have an associated internal read (resp. write)

View File

@ -47,7 +47,7 @@ let create_listening_socket ~backlog ?(addr = Ipaddr.V6.unspecified) port =
Lwt_unix.listen main_socket backlog ; Lwt_unix.listen main_socket backlog ;
Lwt.return main_socket Lwt.return main_socket
let run ~backlog pool ?addr port = let run ?addr ~backlog pool port =
Lwt.catch begin fun () -> Lwt.catch begin fun () ->
create_listening_socket create_listening_socket
~backlog ?addr port >>= fun socket -> ~backlog ?addr port >>= fun socket ->

View File

@ -7,19 +7,20 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** Welcome worker. Accept incoming connections and add them to its (** Welcome worker.
connection pool. *)
Accept incoming connections and add them to the pool.
*)
type t type t
(** Type of a welcome worker, parametrized like a (** Type of a welcome worker. *)
[P2p_connection_pool.pool]. *)
val run: val run:
backlog:int -> ?addr:P2p_addr.t -> backlog:int ->
('msg, 'meta) P2p_pool.t -> ('msg, 'meta) P2p_pool.t -> P2p_addr.port -> t Lwt.t
?addr:P2p_addr.t -> P2p_addr.port -> t Lwt.t (** [run ?addr ~backlog pool port] returns a running welcome worker
(** [run ~backlog ~addr pool port] returns a running welcome worker adding connections into [pool] listening on [addr:port]. [backlog]
feeding [pool] listening at [(addr, port)]. [backlog] is the is passed to [Lwt_unix.listen]. *)
argument passed to [Lwt_unix.accept]. *)
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
(** [shutdown t] returns when [t] has completed shutdown. *)

View File

@ -69,7 +69,8 @@ let detach_node f points n =
min_connections = nb_points ; min_connections = nb_points ;
max_connections = nb_points ; max_connections = nb_points ;
max_incoming_connections = nb_points ; max_incoming_connections = nb_points ;
authentification_timeout = 2. ; connection_timeout = 10. ;
authentication_timeout = 2. ;
incoming_app_message_queue_size = None ; incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ; incoming_message_queue_size = None ;
outgoing_message_queue_size = None ; outgoing_message_queue_size = None ;