ligo/lib_node_p2p/p2p_connection_pool.mli

357 lines
11 KiB
OCaml
Raw Normal View History

(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** Pool of connections. This module manages the connection pool that
the shell needs to maintain in order to function correctly.
A pool and its connections are parametrized by the type of
messages exchanged over the connection and the type of
meta-information associated with a peer. The type [('msg, 'meta)
connection] is a wrapper on top of [P2p_connection.t] that adds
meta-information, a data-structure describing a fine-grained state
of the connection, as well as a new message queue (referred to as the
"app message queue") that will only contain the messages from the
internal [P2p_connection.t] that need to be examined by the
higher layers. Some messages are directly processed by an internal
worker and thus never propagated above.
*)
open P2p_types
open P2p_connection_pool_types
type 'msg encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg encoding
(** Descriptor of one case of the application message type ['msg].
    The constructor [Encoding] existentially hides the payload type
    ['a]: it bundles an ['a Data_encoding.t] with [wrap], which turns
    a payload into a ['msg], and [unwrap], which returns the payload
    of a ['msg] or [None].
    NOTE(review): [tag] presumably identifies the case in the
    serialized form, [unwrap] presumably returns [None] for messages
    belonging to another case, and [max_length] presumably bounds the
    size of an encoded payload - confirm against the implementation. *)
(** {1 Pool management} *)
type ('msg, 'meta) t
type ('msg, 'meta) pool = ('msg, 'meta) t
(** The type of a pool of connections, parametrized by resp. the type
    of messages and the type of the meta-information associated to an
    identity. [pool] is a synonym of [t], whose representation is kept
    abstract in this interface. *)
type config = {

  identity : Identity.t ;
  (** Our identity. *)

  proof_of_work_target : Crypto_box.target ;
  (** The proof of work target we require from peers. *)

  trusted_points : Point.t list ;
  (** List of hard-coded known peers to bootstrap the network from. *)

  peers_file : string ;
  (** The path to the JSON file where the metadata associated to
      peer_ids are loaded / stored. *)

  closed_network : bool ;
  (** If [true], the only accepted connections are from peers whose
      addresses are in [trusted_points]. *)

  listening_port : port option ;
  (** If provided, it will be passed to [P2p_connection.authenticate]
      when we authenticate against a new peer. *)

  min_connections : int ;
  (** Strict minimum number of connections
      (triggers [LogEvent.too_few_connections]). *)

  max_connections : int ;
  (** Max number of connections. If it's reached, [connect] and
      [accept] will fail, i.e. not add more connections
      (also triggers [LogEvent.too_many_connections]). *)

  max_incoming_connections : int ;
  (** Max not-yet-authenticated incoming connections.
      Above this number, [accept] will start dropping incoming
      connections. *)

  authentification_timeout : float ;
  (** Delay granted to a peer to perform authentication, in seconds. *)

  incoming_app_message_queue_size : int option ;
  (** Size of the message queue for user messages (messages returned
      by this module's [read] function). *)

  incoming_message_queue_size : int option ;
  (** Size of the incoming message queue internal of a peer's Reader
      (See [P2p_connection.accept]). *)

  outgoing_message_queue_size : int option ;
  (** Size of the outgoing message queue internal to a peer's Writer
      (See [P2p_connection.accept]). *)

  known_peer_ids_history_size : int ;
  (** Size of the known peer_ids log buffer (default: 50) *)

  known_points_history_size : int ;
  (** Size of the known points log buffer (default: 50) *)

  max_known_points : (int * int) option ;
  (** Parameters for the garbage collection of known points. If
      [None], no garbage collection is performed. Otherwise, the first
      integer of the couple limits the size of the "known points"
      table. When this number is reached, the table is purged of
      disconnected points, older first, to try to reach the amount of
      connections indicated by the second integer. *)

  max_known_peer_ids : (int * int) option ;
  (** Like [max_known_points], but for known peer_ids. *)

  swap_linger : float ;
  (** Peer swapping does not occur more than once during a timespan of
      [swap_linger] seconds. *)

  binary_chunks_size : int option ;
  (** Size (in bytes) of binary blocks that are sent to other
      peers. Default value is 64 kB. *)

}
(** Configuration of a pool, as passed to [create]. *)
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
(** Encoding of the meta-information. NOTE(review): presumably used
    to load / store it in the file [config.peers_file] - confirm. *)
initial : 'meta;
(** NOTE(review): presumably the meta-information given to a peer
    that has none recorded yet - confirm. *)
score : 'meta -> float;
(** Maps a meta-information to a float. NOTE(review): presumably the
    value reported by [Peer_ids.get_score] - confirm. *)
}
(** Description of the meta-information of type ['meta] attached to
    peers, as passed to [create]. *)
type 'msg message_config = {
encoding : 'msg encoding list ;
(** The list of ['msg encoding] descriptors for the application
    messages. *)
versions : P2p_types.Version.t list;
(** NOTE(review): presumably the protocol versions announced to peers
    during authentication - confirm. *)
}
(** Message-related configuration of a pool, as passed to [create]. *)
val create:
config ->
'meta meta_config ->
'msg message_config ->
P2p_io_scheduler.t ->
('msg, 'meta) pool Lwt.t
(** [create config meta_cfg msg_cfg io_sched] is a freshly minted
    pool, built from the pool configuration [config], the
    meta-information configuration [meta_cfg], the message
    configuration [msg_cfg] and the I/O scheduler [io_sched]. The
    pool is delivered through an Lwt promise. *)
val destroy: ('msg, 'meta) pool -> unit Lwt.t
(** [destroy pool] is a promise that resolves when the member
    connections of [pool] are either disconnected or canceled. *)
val active_connections: ('msg, 'meta) pool -> int
(** [active_connections pool] is the number of connections currently
    inside [pool]. *)
val pool_stat: ('msg, 'meta) pool -> Stat.t
(** [pool_stat pool] is a snapshot of the current bandwidth usage for
    the entire [pool] (see [Connection.stat] for a single
    connection). *)
2017-03-14 13:51:44 +04:00
val send_swap_request: ('msg, 'meta) pool -> unit
(** NOTE(review): undocumented in this interface. Presumably asks a
    connected peer to swap peers by sending it a
    [Message.Swap_request] (see also [config.swap_linger]) - confirm
    against the implementation. *)
(** {2 Pool events} *)
2017-03-14 13:51:44 +04:00
module Pool_event : sig
val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t
(** [wait_too_few_connections pool] is determined when the number of
    connections drops below the desired level. *)
val wait_too_many_connections: ('msg, 'meta) pool -> unit Lwt.t
(** [wait_too_many_connections pool] is determined when the number of
    connections exceeds the desired level. *)
val wait_new_peer: ('msg, 'meta) pool -> unit Lwt.t
(** [wait_new_peer pool] is determined when a new peer
    (i.e. authentication successful) gets added to the pool. *)
val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t
(** [wait_new_connection pool] is determined when a new connection is
    successfully established in the pool. *)
end
(** Promises that let a caller wait for an event to happen on a
    pool. *)
(** {1 Connections management} *)
type ('msg, 'meta) connection
(** Type of a connection to a peer, parametrized by the type of
    messages exchanged as well as the type of the meta-information
    associated to a peer. It mostly wraps a [P2p_connection.t],
    adding meta-information and data-structures describing a more
    fine-grained logical state of the connection. The representation
    is kept abstract in this interface. *)
(* Errors declared for connection establishment.
   NOTE(review): the exact conditions raising each of them are not
   visible in this interface; the list below is inferred from the
   names and from the documentation of [config] - confirm against the
   implementation.
   - [Pending_connection]: presumably, a connection attempt to the
     point is already in progress.
   - [Connected]: presumably, the point is already connected.
   - [Connection_refused]: presumably, the remote end refused the
     connection.
   - [Rejected peer_id]: presumably, the connection with [peer_id] was
     rejected during or after authentication.
   - [Too_many_connections]: presumably, [config.max_connections] has
     been reached.
   - [Closed_network]: presumably, [config.closed_network] is set and
     the point is not trusted. *)
type error += Pending_connection
type error += Connected
type error += Connection_refused
type error += Rejected of Peer_id.t
type error += Too_many_connections
type error += Closed_network
val connect:
timeout:float ->
('msg, 'meta) pool -> Point.t ->
('msg, 'meta) connection tzresult Lwt.t
(** [connect ~timeout pool point] tries to add a connection to
    [point] in [pool] in less than [timeout] seconds. The promise
    resolves to the new connection on success, and to an error
    otherwise. *)
val accept:
('msg, 'meta) pool -> Lwt_unix.file_descr -> Point.t -> unit
(** [accept pool fd point] instructs [pool] to start the process of
    accepting a connection from [fd], whose remote end is [point].
    The call returns [unit] immediately; no result of the acceptance
    is reported to the caller. Used by [P2p]. *)
val disconnect:
?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t
(** [disconnect conn] cleanly closes [conn] and returns after [conn]'s
    internal worker has returned.
    NOTE(review): the effect and the default value of [?wait] are not
    documented in this interface - confirm against the
    implementation. *)
module Connection : sig

  val info: ('msg, 'meta) connection -> Connection_info.t
  (** [info conn] is the [Connection_info.t] associated with [conn]. *)

  val stat: ('msg, 'meta) connection -> Stat.t
  (** [stat conn] is a snapshot of current bandwidth usage for
      [conn]. *)

  val fold:
    ('msg, 'meta) pool ->
    init:'a ->
    f:(Peer_id.t -> ('msg, 'meta) connection -> 'a -> 'a) ->
    'a
  (** [fold pool ~init ~f] combines, starting from [init], the
      connections of [pool] with [f], which also receives a
      [Peer_id.t] for each connection.
      NOTE(review): presumably iterates over the active connections,
      keyed by the identity of the remote peer, in an unspecified
      order - confirm. *)

  val list:
    ('msg, 'meta) pool -> (Peer_id.t * ('msg, 'meta) connection) list
  (** [list pool] is a list of connections of [pool], each paired with
      a [Peer_id.t].
      NOTE(review): presumably the same set of connections as the one
      traversed by [fold] - confirm. *)

  val find_by_point:
    ('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option
  (** [find_by_point pool point] is a connection of [pool] looked up
      by [point], if any. *)

  val find_by_peer_id:
    ('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) connection option
  (** [find_by_peer_id pool peer_id] is a connection of [pool] looked
      up by [peer_id], if any. *)

end
(** Accessors and iterators on the connections of a pool. *)
val on_new_connection:
('msg, 'meta) pool ->
(Peer_id.t -> ('msg, 'meta) connection -> unit) -> unit
(** NOTE(review): undocumented in this interface. Presumably
    [on_new_connection pool f] registers [f] as a callback, called
    with the peer identity and the connection each time a new
    connection is established in [pool] - confirm against the
    implementation. *)
(** {1 I/O on connections} *)
(* Error reported by [read] (see its documentation in this file).
   NOTE(review): presumably also reported by the other I/O functions
   of this section when used on a closed connection - confirm. *)
type error += Connection_closed
val read: ('msg, 'meta) connection -> 'msg tzresult Lwt.t
(** [read conn] returns a message popped from [conn]'s app message
    queue (the queue of messages that have to be examined by the
    higher layers), or fails with [Connection_closed]. *)
val is_readable: ('msg, 'meta) connection -> unit tzresult Lwt.t
(** [is_readable conn] returns when there is at least one message
    ready to be read. Judging by the signature, no message is
    returned; use [read] to get one.
    NOTE(review): the errors it can return are not documented here -
    confirm. *)
val write: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
(** [write conn msg] is [P2p_connection.write conn' msg] where [conn']
    is the internal [P2p_connection.t] inside [conn]. See
    [P2p_connection.write] for the precise semantics. *)
val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
(** [write_sync conn msg] is [P2p_connection.write_sync conn' msg]
    where [conn'] is the internal [P2p_connection.t] inside [conn].
    See [P2p_connection.write_sync] for the precise semantics. *)
(**/**)
(* Kept out of the generated documentation by the surrounding stop
   comments.
   NOTE(review): presumably the counterpart of [write_sync] for bytes
   that are already serialized - confirm against the
   implementation. *)
val raw_write_sync:
  ('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t
(**/**)
val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult
(** [write_now conn msg] is [P2p_connection.write_now conn' msg] where
    [conn'] is the internal [P2p_connection.t] inside [conn]. Unlike
    [write] and [write_sync], the result is not wrapped in an Lwt
    promise.
    NOTE(review): the meaning of the returned boolean is defined by
    [P2p_connection.write_now] - confirm there. *)
(** {2 Broadcast functions} *)
val write_all: ('msg, 'meta) pool -> 'msg -> unit
(** [write_all pool msg] is [write_now conn msg] for all member
    connections to [pool] in [Running] state. The results of the
    individual writes are not reported to the caller. *)
val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit
(** [broadcast_bootstrap_msg pool] is
    [P2p_connection.write_now conn Bootstrap] for all member
    connections to [pool] in [Running] state. *)
(** {1 Functions on [Peer_id]} *)

module Peer_ids : sig

  type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Peer_info.t
  (** Information about a peer: a [Peer_info.t] specialised to the
      connection and meta-information types of the pool. *)

  val info:
    ('msg, 'meta) pool -> Peer_id.t -> ('msg, 'meta) info option
  (** [info pool peer_id] is the information [pool] holds about
      [peer_id], if any. *)

  val get_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta
  (** [get_metadata pool peer_id] is the meta-information associated
      with [peer_id] in [pool].
      NOTE(review): the result for a peer unknown to [pool] is not
      visible here (presumably [meta_config.initial]) - confirm. *)

  val set_metadata: ('msg, 'meta) pool -> Peer_id.t -> 'meta -> unit
  (** [set_metadata pool peer_id meta] associates [meta] with
      [peer_id] in [pool].
      NOTE(review): behaviour for a peer unknown to [pool] is not
      visible here - confirm. *)

  val get_score: ('msg, 'meta) pool -> Peer_id.t -> float
  (** [get_score pool peer_id] is the score of [peer_id] in [pool].
      NOTE(review): presumably [meta_config.score] applied to the
      meta-information of the peer - confirm. *)

  val get_trusted: ('msg, 'meta) pool -> Peer_id.t -> bool
  (** [get_trusted pool peer_id] tells whether [peer_id] is marked as
      trusted in [pool]. *)

  val set_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit
  (** [set_trusted pool peer_id] marks [peer_id] as trusted in
      [pool]. *)

  val unset_trusted: ('msg, 'meta) pool -> Peer_id.t -> unit
  (** [unset_trusted pool peer_id] removes the trusted mark of
      [peer_id] in [pool]. *)

  val fold_known:
    ('msg, 'meta) pool ->
    init:'a ->
    f:(Peer_id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
    'a
  (** [fold_known pool ~init ~f] combines with [f], starting from
      [init], the information about the peers known to [pool].
      NOTE(review): iteration order is not specified here. *)

  val fold_connected:
    ('msg, 'meta) pool ->
    init:'a ->
    f:(Peer_id.t -> ('msg, 'meta) info -> 'a -> 'a) ->
    'a
  (** [fold_connected pool ~init ~f] is like [fold_known], restricted
      to the peers currently connected.
      NOTE(review): restriction inferred from the name - confirm. *)

end
(** {1 Functions on [Points]} *)
module Points : sig
type ('msg, 'meta) info = ('msg, 'meta) connection Point_info.t
(** Information about a point: a [Point_info.t] specialised to the
    connection type of the pool. *)
val info:
('msg, 'meta) pool -> Point.t -> ('msg, 'meta) info option
(** [info pool point] is the information [pool] holds about [point],
    if any. *)
val get_trusted: ('msg, 'meta) pool -> Point.t -> bool
(** [get_trusted pool point] tells whether [point] is marked as
    trusted in [pool]. *)
val set_trusted: ('msg, 'meta) pool -> Point.t -> unit
(** [set_trusted pool point] marks [point] as trusted in [pool]. *)
val unset_trusted: ('msg, 'meta) pool -> Point.t -> unit
(** [unset_trusted pool point] removes the trusted mark of [point] in
    [pool]. *)
val fold_known:
('msg, 'meta) pool ->
init:'a ->
f:(Point.t -> ('msg, 'meta) info -> 'a -> 'a) ->
'a
(** [fold_known pool ~init ~f] combines with [f], starting from
    [init], the information about the points known to [pool].
    NOTE(review): iteration order is not specified here. *)
val fold_connected:
('msg, 'meta) pool ->
init:'a ->
f:(Point.t -> ('msg, 'meta) info -> 'a -> 'a) ->
'a
(** [fold_connected pool ~init ~f] is like [fold_known], restricted
    to the points currently connected.
    NOTE(review): restriction inferred from the name - confirm. *)
end
module Log_event = Connection_pool_log_event
(** Alias for [Connection_pool_log_event]; [Log_event.t] is the type
    of the events delivered by [watch]. *)
val watch: ('msg, 'meta) pool -> Log_event.t Lwt_stream.t * Lwt_watcher.stopper
(** [watch pool] is a pair [(stream, stopper)], where [stream] is a
    stream of events of [pool] and [stopper] is the handle used to
    close this stream. *)
(**/**)
module Message : sig

  type 'msg t =
    | Bootstrap
    | Advertise of Point.t list
    | Swap_request of Point.t * Peer_id.t
    | Swap_ack of Point.t * Peer_id.t
    | Message of 'msg
    | Disconnect
  (** Messages exchanged over a connection of the pool. [Message msg]
      carries an application message of type ['msg]; [Bootstrap] is
      the message written by [broadcast_bootstrap_msg].
      NOTE(review): the constructors other than [Message] are
      presumably the messages processed by the internal worker and
      never propagated to the app message queue; presumably
      [Advertise] carries a list of known points and [Swap_request] /
      [Swap_ack] implement peer swapping (see [send_swap_request]) -
      confirm against the implementation. *)

  val encoding: 'msg encoding list -> 'msg t Data_encoding.t
  (** [encoding cases] is an encoding for ['msg t], given the list
      [cases] of descriptors of the application messages. *)

end