diff --git a/docs/index.rst b/docs/index.rst index 96f8ffa15..b5eeb5216 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -60,6 +60,7 @@ license when the main network lunches. :caption: White doc: whitedoc/the_big_picture + whitedoc/p2p whitedoc/validation whitedoc/michelson whitedoc/proof_of_stake diff --git a/docs/whitedoc/p2p.rst b/docs/whitedoc/p2p.rst new file mode 100644 index 000000000..565433cc0 --- /dev/null +++ b/docs/whitedoc/p2p.rst @@ -0,0 +1,114 @@ +.. _p2p: + +The peer-to-peer layer +====================== + +This document explains the inner workings of the peer-to-peer layer of +the Tezos shell. This part is in charge of establishing and +maintaining network connections with other nodes (gossip). + +The P2P layer is instantiated by the node. It is parametrized by the +type of messages that are exchanged over the network (to allow +different P2P protocol versions/extensions), and the type of metadata +associated with each peer. The latter is useful to compute a score for +each peer that reflects the level of trust we have in it. Different +policies can be used when communicating with peers with different +score values. + +The P2P layer comprises a pool of connections, a set of +operations on those connections, and a set of workers following the +worker pattern pervasively used in the code base. + +The P2P layer is packaged in :package:`tezos-p2p`, which has +documentation for all modules. + +General operation +----------------- + +I/O Scheduling +~~~~~~~~~~~~~~ + +The P2P layer uses I/O scheduling in order to control its +bandwidth usage and to apply different policies +(e.g. read/write quotas) to different peers. For now, each peer is +granted a fair share of the globally allocated bandwidth, but it is +planned for the bandwidth allocated to each peer to be a +function of the peer's score. + +Encryption +~~~~~~~~~~ + +The connection between peers is encrypted using `NaCl` +authenticated-encryption `API `__. 
This +is done to provide an additional level of security and tamper-proof +guarantees in the communication between peers. + +Message queues +~~~~~~~~~~~~~~ + +On top of basic I/O scheduling, two finite-size typed message queues +are used to store incoming (resp. outgoing) messages for each +peer. This further restricts the speed at which communication is +possible with a peer; when a queue is full, it is not possible to read +(resp. write) an additional message. The high-level +`P2p_socket.connection +<../api/odoc/tezos-p2p/Tezos_p2p/P2p_socket/index.html#type-connection>`__ +type used by the P2P layer is basically a UNIX socket upgraded with I/O +scheduling, peer metadata, cryptographic keys and two message queues +operated by dedicated workers. + +Pool of connections +~~~~~~~~~~~~~~~~~~~ + +All the above modules are used in `P2p_pool +<../api/odoc/tezos-p2p/Tezos_p2p/P2p_pool/index.html>`__, which +constitutes the core of the P2P layer, together with the worker +processes described below. It comprises various tables of connections +as well as methods to query them. Connections are also extended with +another message queue where lower-level messages (like responses to +ping) are filtered out and only application-level messages are kept. + +The main entry point of the P2P layer is in module `P2p +<../api/odoc/tezos-p2p/Tezos_p2p/P2p/index.html>`__. See below +for a description of workers acting on the P2P layer. + +Welcome worker +-------------- + +The welcome worker is responsible for accepting incoming connections +and registering them in the pool of connections managed by the P2P +layer. It basically runs the ``accept(2)`` syscall and calls +`P2p_pool.accept +<../api/odoc/tezos-p2p/Tezos_p2p/P2p_pool/index.html#val-accept>`__ so +that the pool is made aware of an incoming connection. From there, the pool +will decide how this new connection must be handled. 
+ +Maintenance worker +------------------ + +The maintenance worker is in charge of establishing an appropriate +number of connections with other nodes in order to guarantee a +realistic view of the state of the blockchain. It is created with a +set of targets to reach regarding the desired amount of peers it needs +to keep an active connection to. + +At the pool level, the minimum (resp. maximum) acceptable number of +connections is defined. + +At the maintenance worker level, two other sets of thresholds are +defined: ``target`` (min and max) and ``threshold`` (min and max). + +Given these bounds, the maintenance worker: + +* Will be triggered every two minutes, when asked by the shell, or + when the minimum or maximum number of acceptable connections is + reached, whichever happens first. + +* Will perform the following actions when triggered: if the number of + connections is above ``max_threshold``, it will kill connections + randomly until it reaches ``max_target`` connections. If the number of + connections is below ``min_threshold``, it will attempt to connect to + peers until it reaches at least ``min_target`` connections (and never + more than ``max_target`` connections). + + diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index a8a8a3a4f..bb8a8cfcd 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -60,7 +60,8 @@ and shell = { } let default_p2p_limits : P2p.limits = { - authentification_timeout = 5. ; + connection_timeout = 10. ; + authentication_timeout = 5. 
; min_connections = 10 ; expected_connections = 50 ; max_connections = 100 ; @@ -158,7 +159,7 @@ let default_config = { let limit : P2p.limits Data_encoding.t = let open Data_encoding in conv - (fun { P2p.authentification_timeout ; + (fun { P2p.connection_timeout ; authentication_timeout ; min_connections ; expected_connections ; max_connections ; backlog ; max_incoming_connections ; max_download_speed ; max_upload_speed ; @@ -169,27 +170,27 @@ let limit : P2p.limits Data_encoding.t = max_known_points ; max_known_peer_ids ; swap_linger ; binary_chunks_size } -> - ( ( authentification_timeout, min_connections, expected_connections, - max_connections, backlog, max_incoming_connections, - max_download_speed, max_upload_speed, swap_linger, - binary_chunks_size) , - ( read_buffer_size, read_queue_size, write_queue_size, - incoming_app_message_queue_size, - incoming_message_queue_size, outgoing_message_queue_size, - known_points_history_size, known_peer_ids_history_size, - max_known_points, max_known_peer_ids - ))) - (fun ( ( authentification_timeout, min_connections, expected_connections, - max_connections, backlog, max_incoming_connections, - max_download_speed, max_upload_speed, swap_linger, - binary_chunks_size) , - ( read_buffer_size, read_queue_size, write_queue_size, - incoming_app_message_queue_size, - incoming_message_queue_size, outgoing_message_queue_size, - known_points_history_size, known_peer_ids_history_size, - max_known_points, max_known_peer_ids - ) ) -> - { authentification_timeout ; min_connections ; expected_connections ; + (((( connection_timeout, + authentication_timeout, min_connections, expected_connections, + max_connections, backlog, max_incoming_connections, + max_download_speed, max_upload_speed, swap_linger), + ( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size, + incoming_app_message_queue_size, + incoming_message_queue_size, outgoing_message_queue_size, + known_points_history_size, known_peer_ids_history_size, + 
max_known_points)), + max_known_peer_ids))) + (fun (((( connection_timeout, + authentication_timeout, min_connections, expected_connections, + max_connections, backlog, max_incoming_connections, + max_download_speed, max_upload_speed, swap_linger), + ( binary_chunks_size, read_buffer_size, read_queue_size, write_queue_size, + incoming_app_message_queue_size, + incoming_message_queue_size, outgoing_message_queue_size, + known_points_history_size, known_peer_ids_history_size, + max_known_points)), + max_known_peer_ids)) -> + { connection_timeout ; authentication_timeout ; min_connections ; expected_connections ; max_connections ; backlog ; max_incoming_connections ; max_download_speed ; max_upload_speed ; read_buffer_size ; read_queue_size ; write_queue_size ; @@ -200,71 +201,78 @@ let limit : P2p.limits Data_encoding.t = binary_chunks_size }) (merge_objs - (obj10 - (dft "authentification-timeout" - (Data_encoding.describe - ~description: "Delay granted to a peer to perform authentication, \ - in seconds." - float) default_p2p_limits.authentification_timeout) - (dft "min-connections" - (Data_encoding.describe - ~description: "Strict minimum number of connections (triggers an \ - urgent maintenance)." - uint16) - default_p2p_limits.min_connections) - (dft "expected-connections" - (Data_encoding.describe - ~description: "Targeted number of connections to reach when \ - bootstraping / maintaining." - uint16) - default_p2p_limits.expected_connections) - (dft "max-connections" - (Data_encoding.describe - ~description: "Maximum number of connections (exceeding peers are \ - disconnected)." - uint16) - default_p2p_limits.max_connections) - (dft "backlog" - (Data_encoding.describe - ~description: "Number above which pending incoming connections are \ - immediately rejected." - uint8) - default_p2p_limits.backlog) - (dft "max-incoming-connections" - (Data_encoding.describe - ~description: "Number above which pending incoming connections are \ - immediately rejected." 
- uint8) - default_p2p_limits.max_incoming_connections) - (opt "max-download-speed" - (Data_encoding.describe - ~description: "Max download speeds in KiB/s." - int31)) - (opt "max-upload-speed" - (Data_encoding.describe - ~description: "Max upload speeds in KiB/s." + (merge_objs + (obj10 + (dft "connection-timeout" + (Data_encoding.describe + ~description: "Delay acceptable when initiating a \ + connection to a new peer, in seconds." + float) default_p2p_limits.connection_timeout) + (dft "authentication-timeout" + (Data_encoding.describe + ~description: "Delay granted to a peer to perform authentication, \ + in seconds." + float) default_p2p_limits.authentication_timeout) + (dft "min-connections" + (Data_encoding.describe + ~description: "Strict minimum number of connections (triggers an \ + urgent maintenance)." + uint16) + default_p2p_limits.min_connections) + (dft "expected-connections" + (Data_encoding.describe + ~description: "Targeted number of connections to reach when \ + bootstraping / maintaining." + uint16) + default_p2p_limits.expected_connections) + (dft "max-connections" + (Data_encoding.describe + ~description: "Maximum number of connections (exceeding peers are \ + disconnected)." + uint16) + default_p2p_limits.max_connections) + (dft "backlog" + (Data_encoding.describe + ~description: "Number above which pending incoming connections are \ + immediately rejected." + uint8) + default_p2p_limits.backlog) + (dft "max-incoming-connections" + (Data_encoding.describe + ~description: "Number above which pending incoming connections are \ + immediately rejected." + uint8) + default_p2p_limits.max_incoming_connections) + (opt "max-download-speed" + (Data_encoding.describe + ~description: "Max download speeds in KiB/s." + int31)) + (opt "max-upload-speed" + (Data_encoding.describe + ~description: "Max upload speeds in KiB/s." 
- int31)) - (dft "swap-linger" float default_p2p_limits.swap_linger) - (opt "binary-chunks-size" uint8)) - (obj10 - (dft "read-buffer-size" - (Data_encoding.describe - ~description: "Size of the buffer passed to read(2)." - int31) - default_p2p_limits.read_buffer_size) - (opt "read-queue-size" int31) - (opt "write-queue-size" int31) - (opt "incoming-app-message-queue-size" int31) - (opt "incoming-message-queue-size" int31) - (opt "outgoing-message-queue-size" int31) - (dft "known_points_history_size" uint16 - default_p2p_limits.known_points_history_size) - (dft "known_peer_ids_history_size" uint16 - default_p2p_limits.known_points_history_size) - (opt "max_known_points" (tup2 uint16 uint16)) - (opt "max_known_peer_ids" (tup2 uint16 uint16)) - )) + int31)) + (dft "swap-linger" float default_p2p_limits.swap_linger)) + (obj10 + (opt "binary-chunks-size" uint8) + (dft "read-buffer-size" + (Data_encoding.describe + ~description: "Size of the buffer passed to read(2)." + int31) + default_p2p_limits.read_buffer_size) + (opt "read-queue-size" int31) + (opt "write-queue-size" int31) + (opt "incoming-app-message-queue-size" int31) + (opt "incoming-message-queue-size" int31) + (opt "outgoing-message-queue-size" int31) + (dft "known_points_history_size" uint16 + default_p2p_limits.known_points_history_size) + (dft "known_peer_ids_history_size" uint16 + default_p2p_limits.known_points_history_size) + (opt "max_known_points" (tup2 uint16 uint16)) + )) + (obj1 + (opt "max_known_peer_ids" (tup2 uint16 uint16)))) let p2p = let open Data_encoding in diff --git a/src/lib_p2p/moving_average.mli b/src/lib_p2p/moving_average.mli index 8dc6a76fd..e40e4625d 100644 --- a/src/lib_p2p/moving_average.mli +++ b/src/lib_p2p/moving_average.mli @@ -7,18 +7,44 @@ (* *) (**************************************************************************) +(** Moving averages. + + This module implements bandwidth counters based on (cumulative) + exponential moving average. 
Each counter is identified by an + integer. They are stored in an internal hash table. + + See e.g. + https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + for the algorithm. +*) + type t +(** Type of one bandwidth counter. *) val create: init:int -> alpha:float -> t +(** [create ~init ~alpha] is a counter with initial value [init] and + factor [alpha]. *) + val destroy: t -> unit +(** [destroy t] removes counter [t] from the internal hash table. *) val add: t -> int -> unit +(** [add t id] adds [t] in the internal hash table under identifier + [id]. *) val on_update: (unit -> unit) -> unit +(** [on_update f] registers [f] to be called on each update of the + internal worker (currently every 1s). *) + val updated: unit Lwt_condition.t +(** [updated] is a condition variable that gets signaled on each + update of the internal worker (currently every 1s). *) type stat = { total: int64 ; average: int ; } + val stat: t -> stat +(** [stat t] is a stat record reflecting the state of [t] at the time + of the call. 
*) diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 3d8c39317..985e5d243 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -41,7 +41,8 @@ type config = { type limits = { - authentification_timeout : float ; + connection_timeout : float ; + authentication_timeout : float ; min_connections : int ; expected_connections : int ; @@ -94,7 +95,8 @@ let create_connection_pool config limits meta_cfg msg_cfg io_sched = min_connections = limits.min_connections ; max_connections = limits.max_connections ; max_incoming_connections = limits.max_incoming_connections ; - authentification_timeout = limits.authentification_timeout ; + connection_timeout = limits.connection_timeout ; + authentication_timeout = limits.authentication_timeout ; incoming_app_message_queue_size = limits.incoming_app_message_queue_size ; incoming_message_queue_size = limits.incoming_message_queue_size ; outgoing_message_queue_size = limits.outgoing_message_queue_size ; @@ -123,19 +125,14 @@ let bounds ~min ~expected ~max = max_threshold = max - step_max ; } -let may_create_discovery_worker _config pool = - Some (P2p_discovery.create pool) - -let create_maintenance_worker limits pool disco = +let create_maintenance_worker limits pool = let bounds = bounds ~min:limits.min_connections ~expected:limits.expected_connections ~max:limits.max_connections in - P2p_maintenance.run - ~connection_timeout:limits.authentification_timeout - bounds pool disco + P2p_maintenance.run bounds pool let may_create_welcome_worker config limits pool = match config.listening_port with @@ -156,7 +153,6 @@ module Real = struct limits: limits ; io_sched: P2p_io_scheduler.t ; pool: ('msg, 'meta) P2p_pool.t ; - discoverer: P2p_discovery.t option ; maintenance: 'meta P2p_maintenance.t ; welcome: P2p_welcome.t option ; } @@ -165,15 +161,13 @@ module Real = struct let io_sched = create_scheduler limits in create_connection_pool config limits meta_cfg msg_cfg io_sched >>= fun pool -> - let discoverer = 
may_create_discovery_worker config pool in - let maintenance = create_maintenance_worker limits pool discoverer in + let maintenance = create_maintenance_worker limits pool in may_create_welcome_worker config limits pool >>= fun welcome -> return { config ; limits ; io_sched ; pool ; - discoverer ; maintenance ; welcome ; } @@ -190,7 +184,6 @@ module Real = struct let shutdown net () = Lwt_utils.may ~f:P2p_welcome.shutdown net.welcome >>= fun () -> P2p_maintenance.shutdown net.maintenance >>= fun () -> - Lwt_utils.may ~f:P2p_discovery.shutdown net.discoverer >>= fun () -> P2p_pool.destroy net.pool >>= fun () -> P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched @@ -352,8 +345,8 @@ let check_limits = Error_monad.failwith "value of option %S cannot be negative@." orig in fun c -> - fail_1 c.authentification_timeout - "authentification-timeout" >>=? fun () -> + fail_1 c.authentication_timeout + "authentication-timeout" >>=? fun () -> fail_2 c.min_connections "min-connections" >>=? fun () -> fail_2 c.expected_connections diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index 54fe15c81..d28725507 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -7,7 +7,13 @@ (* *) (**************************************************************************) -(** Tezos Shell Net - Low level API for the Gossip network *) +(** Tezos Shell Net - Low level API for the Gossip network + + This is the entry point of the peer-to-peer layer. + + It is used by the Shell as the API to communicate with other + nodes. +*) type 'meta meta_config = { encoding : 'meta Data_encoding.t; @@ -61,7 +67,10 @@ type config = { (** Network capacities *) type limits = { - authentification_timeout : float ; + connection_timeout : float ; + (** Maximum time allowed to the establishment of a connection. *) + + authentication_timeout : float ; (** Delay granted to a peer to perform authentication, in seconds. 
*) min_connections : int ; @@ -113,6 +122,10 @@ type limits = { } +(** Type of a P2P layer instance, parametrized by: + ['msg]: type of messages exchanged between peers + ['meta]: type of the metadata associated with peers (score, etc.) +*) type ('msg, 'meta) t type ('msg, 'meta) net = ('msg, 'meta) t diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml deleted file mode 100644 index 54e889b84..000000000 --- a/src/lib_p2p/p2p_discovery.ml +++ /dev/null @@ -1,137 +0,0 @@ -(**************************************************************************) -(* *) -(* Copyright (c) 2014 - 2018. *) -(* Dynamic Ledger Solutions, Inc. *) -(* *) -(* All rights reserved. No warranty, explicit or implicit, provided. *) -(* *) -(**************************************************************************) - -include Logging.Make (struct let name = "p2p.discovery" end) - -type t = () -let create _pool = () -let restart () = (() : unit) -let shutdown () = Lwt.return_unit - -let inet_addr = Unix.inet_addr_of_string "ff0e::54:455a:3053" - -(* -module Message = struct - - let encoding = - Data_encoding.(tup3 (Fixed.string 10) P2p_peer.Id.encoding int16) - - let length = Data_encoding.Binary.fixed_length_exn encoding - - let make peer_id port = - Data_encoding.Binary.to_bytes encoding ("DISCOMAGIC", peer_id, port) - -end - -(* Sends discover messages into space in an exponentially delayed loop, - restartable using a condition *) -let sender sock saddr my_peer_id inco_port cancelation restart = - let buf = Message.make my_peer_id inco_port in - let rec loop delay n = - Lwt.catch - (fun () -> - Lwt_bytes.sendto sock buf 0 Message.length [] saddr >>= fun _nb_sent -> - Lwt.return_unit) - (fun exn -> - lwt_debug "(%a) error broadcasting a discovery request: %a" - P2p_peer.Id.pp my_peer_id Error_monad.pp (Exn exn)) >>= fun () -> - Lwt.pick - [ (Lwt_unix.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ; - (cancelation () >>= fun () -> Lwt.return_none) ; - 
(Lwt_condition.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ] - >>= function - | Some (delay, n) when n = 10 -> loop delay 9 - | Some (delay, n) -> loop (delay *. 2.) n - | None -> Lwt.return_unit - in - loop 0.2 1 - -let create_socket (iface, disco_addr, disco_port) = - let usock = Unix.socket PF_INET6 SOCK_DGRAM 0 in - let sock = Lwt_unix.of_unix_file_descr ~blocking:false usock in - let saddr = Unix.ADDR_INET (disco_addr, disco_port) in - Unix.setsockopt usock SO_REUSEADDR true ; - Ipv6_multicast.Unix.bind ?iface usock saddr ; - Ipv6_multicast.Unix.membership ?iface usock disco_addr `Join ; - iface, sock, saddr - -module Answerer = struct - - (* Launch an answer machine for the discovery mechanism, takes a - callback to fill the answers and returns a canceler function *) - let answerer sock my_peer_id cancelation callback = - (* the answering function *) - let buf = MBytes.create Message.length in - let rec step () = - Lwt.pick - [ (cancelation () >>= fun () -> Lwt.return_none) ; - (Lwt_bytes.recvfrom sock buf 0 Message.length [] >>= fun r -> - Lwt.return (Some r)) ] >>= function - | None -> Lwt.return_unit - | Some (len', Lwt_unix.ADDR_INET (remote_addr, _mcast_port)) - when len' = Message.length -> begin - match (Data_encoding.Binary.of_bytes Message.encoding buf) with - | Some ("DISCOMAGIC", remote_peer_id, remote_inco_port) - when remote_peer_id <> my_peer_id -> - Lwt.catch - (fun () -> callback ~remote_addr ~remote_inco_port) - (fun exn -> - lwt_debug "Error processing a discovery request: %a" - pp_exn exn) >>= - step - | _ -> - step () - end - | Some _ -> step () - in - step () - - let worker_loop st = - let callback ~remote_addr ~remote_inco_port = - let remote_uaddr = Ipaddr_unix.V6.of_inet_addr_exn remote_addr in - P2p_connection_loop.notify_new_peer - in - Lwt.catch - (fun () -> - Lwt_utils.worker - (Format.asprintf "(%a) discovery answerer" P2p_peer.Id.pp my_peer_id) - (fun () -> answerer fd my_peer_id cancelation callback) - cancel) - (fun 
exn -> - lwt_log_error "Discovery answerer not started: %a" - Error_monad.pp (Exn exn)) - -end -let discovery_sender = - match config.pending_authentification_port with - | None -> Lwt.return_unit - | Some inco_port -> - Lwt.catch - (fun () -> - let sender () = - Discovery.sender fd - saddr my_peer_id inco_port cancelation restart_discovery in - Lwt_utils.worker - (Format.asprintf "(%a) discovery sender" P2p_peer.Id.pp my_peer_id) - sender cancel) - (fun exn -> - lwt_log_error "Discovery sender not started: %a" - Error_monad.pp (Exn exn)) - - -let discovery_answerer, discovery_sender = - match map_option ~f:create_socket st.config.local_discovery with - | exception exn -> - log_error "Error creating discovery socket: %a" Error_monad.pp (Exn exn) ; - (Lwt.return_unit, Lwt.return_unit) - | None -> Lwt.return_unit, Lwt.return_unit - | Some (iface, fd, saddr) -> - discovery_answerer, discovery_sender - -*) diff --git a/src/lib_p2p/p2p_discovery.mli b/src/lib_p2p/p2p_discovery.mli deleted file mode 100644 index 14b41de9d..000000000 --- a/src/lib_p2p/p2p_discovery.mli +++ /dev/null @@ -1,13 +0,0 @@ -(**************************************************************************) -(* *) -(* Copyright (c) 2014 - 2018. *) -(* Dynamic Ledger Solutions, Inc. *) -(* *) -(* All rights reserved. No warranty, explicit or implicit, provided. *) -(* *) -(**************************************************************************) - -type t -val create : ('msg, 'meta) P2p_pool.t -> t -val restart : t -> unit -val shutdown : t -> unit Lwt.t diff --git a/src/lib_p2p/p2p_io_scheduler.mli b/src/lib_p2p/p2p_io_scheduler.mli index 45adf73c7..52436ef1f 100644 --- a/src/lib_p2p/p2p_io_scheduler.mli +++ b/src/lib_p2p/p2p_io_scheduler.mli @@ -7,12 +7,13 @@ (* *) (**************************************************************************) -(** IO Scheduling. This module implements generic IO scheduling - between file descriptors. 
In order to use IO scheduling, the - [register] function must be used to make a file descriptor managed - by a [scheduler].. It will return a value of type [connection] - that must be used to perform IO on the managed file descriptor - using this module's dedicated IO functions (read, write, etc.). +(** Generic IO scheduling between file descriptors. + + In order to use IO scheduling, the [register] function must be + used to make a file descriptor managed by a [scheduler].. It will + return a value of type [connection] that must be used to perform IO + on the managed file descriptor using this module's dedicated IO + functions (read, write, etc.). Each connection is allowed a read (resp. write) quota, which is for now fairly distributed among connections. diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index bf56abe70..dac5243e7 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -20,10 +20,8 @@ type 'meta pool = Pool : ('msg, 'meta) P2p_pool.t -> 'meta pool type 'meta t = { canceler: Lwt_canceler.t ; - connection_timeout: float ; bounds: bounds ; pool: 'meta pool ; - disco: P2p_discovery.t option ; just_maintained: unit Lwt_condition.t ; please_maintain: unit Lwt_condition.t ; mutable maintain_worker : unit Lwt.t ; @@ -80,8 +78,7 @@ let rec try_to_contact else List.fold_left (fun acc point -> - P2p_pool.connect - ~timeout:st.connection_timeout pool point >>= function + P2p_pool.connect pool point >>= function | Ok _ -> acc >|= succ | Error _ -> acc) (Lwt.return 0) @@ -115,9 +112,7 @@ and too_few_connections st n_connected = if success then begin maintain st end else begin - (* not enough contacts, ask the pals of our pals, - discover the local network and then wait *) - Option.iter ~f:P2p_discovery.restart st.disco ; + (* not enough contacts, ask the pals of our pals, and then wait *) P2p_pool.broadcast_bootstrap_msg pool ; protect ~canceler:st.canceler begin fun () -> Lwt.pick [ @@ -168,14 +163,12 @@ 
let rec worker_loop st = | Error [ Canceled ] -> Lwt.return_unit | Error _ -> Lwt.return_unit -let run ~connection_timeout bounds pool disco = +let run bounds pool = let canceler = Lwt_canceler.create () in let st = { canceler ; - connection_timeout ; bounds ; pool = Pool pool ; - disco ; just_maintained = Lwt_condition.create () ; please_maintain = Lwt_condition.create () ; maintain_worker = Lwt.return_unit ; diff --git a/src/lib_p2p/p2p_maintenance.mli b/src/lib_p2p/p2p_maintenance.mli index 57c780c78..58c5b1025 100644 --- a/src/lib_p2p/p2p_maintenance.mli +++ b/src/lib_p2p/p2p_maintenance.mli @@ -9,19 +9,22 @@ (* min <= min_threshold <= min_target <= max_target <= max_threshold <= max *) -(* The 'pool' urges the maintainer to work when the number of - connections reaches `max` or is below `min`. Otherwise, the - maintener is lazy and only lookup for connection every two - minutes. The [maintain] function is another way to signal the - maintainer that a maintenance step is desired. +(** P2P maintenance worker. - When the maintener detects that the number of connections is over - `max_threshold`, it randomly kills connections to reach `max_target`. + The P2P layer urges the maintainer to work when the number of + connections reaches `max` or is below `min`. Otherwise, the + maintainer is lazy and only looks for connections every two + minutes (hardcoded constant). The [maintain] function is another + way to signal the maintainer that a maintenance step is desired. - When the maintener detects that the number of connections is below - `min_threshold`, it creates enough connection to reach at least - `min_target` (and never more than `max_target`). In the process, it - might ask its actual peers for new peers. *) + When the maintainer detects that the number of connections is over + `max_threshold`, it randomly kills connections to reach + `max_target`. 
+ + When the maintainer detects that the number of connections is below + `min_threshold`, it creates enough connections to reach at least + `min_target` (and never more than `max_target`). In the process, it + might ask its current peers for new peers. *) type bounds = { min_threshold: int ; @@ -34,12 +37,15 @@ type 'meta t (** Type of a maintenance worker. *) val run: - connection_timeout:float -> - bounds -> - ('msg, 'meta) P2p_pool.t -> - P2p_discovery.t option -> - 'meta t + bounds -> ('msg, 'meta) P2p_pool.t -> 'meta t +(** [run bounds pool] is a maintenance worker for [pool] with + connection targets specified in [bounds]. *) val maintain: 'meta t -> unit Lwt.t +(** [maintain t] gives a hint to maintenance worker [t] that + maintenance is needed and returns once [t] has done a + maintenance cycle. *) val shutdown: 'meta t -> unit Lwt.t +(** [shutdown t] is a thread that returns once [t] has + successfully shut down. *) diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index e631d5f0b..87ed5c5d8 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -160,7 +160,8 @@ type config = { min_connections : int ; max_connections : int ; max_incoming_connections : int ; - authentification_timeout : float ; + connection_timeout : float ; + authentication_timeout : float ; incoming_app_message_queue_size : int option ; incoming_message_queue_size : int option ; @@ -555,7 +556,9 @@ let compare_known_point_info p1 p2 = | true, false -> 1 | true, true -> compare_last_seen p2 p1 -let rec connect ~timeout pool point = +let rec connect ?timeout pool point = + let timeout = + Option.unopt ~default:pool.config.connection_timeout timeout in fail_unless (active_connections pool <= pool.config.max_connections) P2p_errors.Too_many_connections >>=? 
fun () -> @@ -858,7 +861,7 @@ and swap_ack pool conn new_point _new_peer_id = and swap pool conn current_peer_id new_point = let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in pool.latest_accepted_swap <- Time.now () ; - connect ~timeout:10. pool new_point >>= function + connect pool new_point >>= function | Ok _new_conn -> begin pool.latest_succesfull_swap <- Time.now () ; log pool (Swap_success { source = source_peer_id }) ; @@ -891,7 +894,7 @@ let accept pool fd point = P2p_point.Table.add pool.incoming point canceler ; Lwt.async begin fun () -> with_timeout - ~canceler (Lwt_unix.sleep pool.config.authentification_timeout) + ~canceler (Lwt_unix.sleep pool.config.authentication_timeout) (fun canceler -> authenticate pool canceler fd point) end diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index ad3d3fee2..7db6b658b 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -8,19 +8,19 @@ (**************************************************************************) (** Pool of connections. This module manages the connection pool that - the shell needs to maintain in order to function correctly. + the peer-to-peer layer needs to maintain in order to function + correctly. A pool and its connections are parametrized by the type of messages exchanged over the connection and the type of meta-information associated with a peer. The type [('msg, 'meta) - connection] is a wrapper on top of [P2p_connection.t] that adds - meta-information, a data-structure describing a fine-grained state - of the connection, as well as a new message queue (referred to - "app message queue") that will only contain the messages from the - internal [P2p_connection.t] that needs to be examined by the - higher layers. Some messages are directly processed by an internal - worker and thus never propagated above. 
-*) + connection] is a wrapper on top of [P2p_socket.t] that adds + meta-information, a data-structure describing the detailed state of + the connection, as well as a new message queue (referred to as the "app + message queue") that will only contain the messages from the + internal [P2p_socket.t] that need to be examined by the higher + layers. Some messages are directly processed by an internal worker + and thus never propagated above. *) type 'msg encoding = Encoding : { tag: int ; @@ -75,7 +75,10 @@ type config = { Above this number, [accept] will start dropping incoming connections. *) - authentification_timeout : float ; + connection_timeout : float ; + (** Maximum time allowed to the establishment of a connection. *) + + authentication_timeout : float ; (** Delay granted to a peer to perform authentication, in seconds. *) incoming_app_message_queue_size : int option ; @@ -181,11 +184,11 @@ type ('msg, 'meta) connection fine-grained logical state of the connection. *) val connect: - timeout:float -> + ?timeout:float -> ('msg, 'meta) pool -> P2p_point.Id.t -> ('msg, 'meta) connection tzresult Lwt.t -(** [connect ~timeout pool point] tries to add a - connection to [point] in [pool] in less than [timeout] seconds. *) +(** [connect ?timeout pool point] tries to add a connection to [point] + in [pool] in less than [timeout] seconds. *) val accept: ('msg, 'meta) pool -> Lwt_unix.file_descr -> P2p_point.Id.t -> unit diff --git a/src/lib_p2p/p2p_socket.mli b/src/lib_p2p/p2p_socket.mli index 9c852dba2..a3cb36338 100644 --- a/src/lib_p2p/p2p_socket.mli +++ b/src/lib_p2p/p2p_socket.mli @@ -7,7 +7,9 @@ (* *) (**************************************************************************) -(** This modules adds message encoding and encryption to +(** Typed and encrypted connections to peers. + + This module adds message encoding and encryption to [P2p_io_scheduler]'s generic throttled connections. Each connection have an associated internal read (resp. 
write) diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index 213b7c9e6..b17c47685 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -47,7 +47,7 @@ let create_listening_socket ~backlog ?(addr = Ipaddr.V6.unspecified) port = Lwt_unix.listen main_socket backlog ; Lwt.return main_socket -let run ~backlog pool ?addr port = +let run ?addr ~backlog pool port = Lwt.catch begin fun () -> create_listening_socket ~backlog ?addr port >>= fun socket -> diff --git a/src/lib_p2p/p2p_welcome.mli b/src/lib_p2p/p2p_welcome.mli index 3f5ff10c8..3e83d6daf 100644 --- a/src/lib_p2p/p2p_welcome.mli +++ b/src/lib_p2p/p2p_welcome.mli @@ -7,19 +7,20 @@ (* *) (**************************************************************************) -(** Welcome worker. Accept incoming connections and add them to its - connection pool. *) +(** Welcome worker. + + Accept incoming connections and add them to the pool. +*) type t -(** Type of a welcome worker, parametrized like a - [P2p_connection_pool.pool]. *) +(** Type of a welcome worker. *) val run: - backlog:int -> - ('msg, 'meta) P2p_pool.t -> - ?addr:P2p_addr.t -> P2p_addr.port -> t Lwt.t -(** [run ~backlog ~addr pool port] returns a running welcome worker - feeding [pool] listening at [(addr, port)]. [backlog] is the - argument passed to [Lwt_unix.accept]. *) + ?addr:P2p_addr.t -> backlog:int -> + ('msg, 'meta) P2p_pool.t -> P2p_addr.port -> t Lwt.t +(** [run ?addr ~backlog pool port] returns a running welcome worker + adding connections into [pool] listening on [addr:port]. [backlog] + is passed to [Lwt_unix.listen]. *) val shutdown: t -> unit Lwt.t +(** [shutdown t] returns when [t] has completed shutdown. 
*) diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index c919f25d9..8fb822fac 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -69,7 +69,8 @@ let detach_node f points n = min_connections = nb_points ; max_connections = nb_points ; max_incoming_connections = nb_points ; - authentification_timeout = 2. ; + connection_timeout = 10. ; + authentication_timeout = 2. ; incoming_app_message_queue_size = None ; incoming_message_queue_size = None ; outgoing_message_queue_size = None ;
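
Reviewer note (not part of the patch): the threshold behaviour documented in docs/whitedoc/p2p.rst and p2p_maintenance.mli can be summarised by the small standalone sketch below. It is a simplified model under stated assumptions, not the actual P2p_maintenance code: the `action` type and the `decide` function are hypothetical names introduced only for illustration, and the `bounds` record simply mirrors the four fields named in the ordering comment of p2p_maintenance.mli.

```ocaml
(* Hypothetical, simplified model of one maintenance step.
   Assumes min_threshold <= min_target <= max_target <= max_threshold,
   as stated in the comment at the top of p2p_maintenance.mli. *)
type bounds = {
  min_threshold : int ;
  min_target : int ;
  max_target : int ;
  max_threshold : int ;
}

(* Illustrative result type: what the worker would try to do. *)
type action =
  | Kill of int     (* number of connections to close *)
  | Connect of int  (* minimum number of connections to open *)
  | Nothing         (* connection count is within the thresholds *)

let decide bounds n_connected =
  if n_connected > bounds.max_threshold then
    (* Too many connections: shrink down to max_target. *)
    Kill (n_connected - bounds.max_target)
  else if n_connected < bounds.min_threshold then
    (* Too few connections: grow to at least min_target. *)
    Connect (bounds.min_target - n_connected)
  else
    Nothing
```

The gap between each threshold and its target gives the worker hysteresis: once it has acted, the connection count sits away from the threshold, so small fluctuations do not immediately trigger another round of kills or connection attempts.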