Shell: implement P2p_{connection_pool,welcome}
This commit is contained in:
node/net/p2p_types.mli \
node/net/p2p_types.mli \
node/net/p2p_io_scheduler.mli \
node/net/p2p_io_scheduler.mli \
node/net/p2p_connection.mli \
node/net/p2p_connection.mli \
node/net/p2p_connection_pool_types.mli \
node/net/p2p_connection_pool.mli \
node/net/p2p_welcome.mli \
node/net/p2p.mli \
node/net/p2p.mli \
node/net/RPC_server.mli \
node/net/RPC_server.mli \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
node/net/ \
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(* TODO check version negotiation *)
(* TODO Test cancelation of a (pending) connection *)
(* TODO do not recompute list_known_points at each requests... but
only once in a while, e.g. every minutes or when a point
or the associated gid is blacklisted. *)
(* TODO allow to track "requested gids" when we reconnect to a point. *)
open P2p_types
open P2p_connection_pool_types
include Logging.Make (struct let name = "p2p.connection-pool" end)
type 'msg encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg encoding
module Message = struct
type 'msg t =
| Bootstrap
| Advertise of Point.t list
| Message of 'msg
| Disconnect
let encoding msg_encoding =
let open Data_encoding in
union ~tag_size:`Uint16
([ case ~tag:0x01 null
(function Disconnect -> Some () | _ -> None)
(fun () -> Disconnect);
case ~tag:0x02 null
(function Bootstrap -> Some () | _ -> None)
(fun () -> Bootstrap);
case ~tag:0x03 (Variable.list Point.encoding)
(function Advertise points -> Some points | _ -> None)
(fun points -> Advertise points);
] @
| msg_encoding
~f:(function Encoding { tag ; encoding ; wrap ; unwrap } ->
case ~tag encoding
(function Message msg -> unwrap msg | _ -> None)
(fun msg -> Message (wrap msg))))
module Answerer = struct
type 'msg callback = {
bootstrap: unit -> Point.t list Lwt.t ;
advertise: Point.t list -> unit Lwt.t ;
message: 'msg -> unit Lwt.t ;
type 'msg t = {
canceler: Canceler.t ;
conn: 'msg Message.t P2p_connection.t ;
callback: 'msg callback ;
mutable worker: unit Lwt.t ;
let rec worker_loop st =
Lwt_unix.yield () >>= fun () ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
| st.conn
end >>= function
| Ok Bootstrap -> begin
st.callback.bootstrap () >>= function
| [] ->
worker_loop st
| points ->
match P2p_connection.write_now st.conn (Advertise points) with
| Ok _sent ->
(* if not sent then ?? TODO count dropped message ?? *)
worker_loop st
| Error _ ->
Canceler.cancel st.canceler >>= fun () ->
| Ok (Advertise points) ->
st.callback.advertise points >>= fun () ->
worker_loop st
| Ok (Message msg) ->
st.callback.message msg >>= fun () ->
worker_loop st
| Ok Disconnect | Error [P2p_io_scheduler.Connection_closed] ->
Canceler.cancel st.canceler >>= fun () ->
| Error [Lwt_utils.Canceled] ->
| Error err ->
lwt_log_error "@[Answerer unexpected error:@ %a@]"
Error_monad.pp_print_error err >>= fun () ->
Canceler.cancel st.canceler >>= fun () ->
let run conn canceler callback =
let st = {
canceler ; conn ; callback ;
worker = Lwt.return_unit ;
} in
st.worker <-
Lwt_utils.worker "answerer"
(fun () -> worker_loop st)
(fun () -> Canceler.cancel canceler) ;
let shutdown st =
Canceler.cancel st.canceler >>= fun () ->
type config = {
identity : Identity.t ;
proof_of_work_target : ;
trusted_points : Point.t list ;
peers_file : string ;
closed_network : bool ;
listening_port : port option ;
min_connections : int ;
max_connections : int ;
max_incoming_connections : int ;
authentification_timeout : float ;
incoming_app_message_queue_size : int option ;
incoming_message_queue_size : int option ;
outgoing_message_queue_size : int option ;
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
type 'msg message_config = {
encoding : 'msg encoding list ;
versions : P2p_types.Version.t list;
type ('msg, 'meta) t = {
config : config ;
meta_config : 'meta meta_config ;
message_config : 'msg message_config ;
my_id_points : unit Point.Table.t ;
known_gids : (('msg, 'meta) connection, 'meta) Gid_info.t Gid.Table.t ;
connected_gids : (('msg, 'meta) connection, 'meta) Gid_info.t Gid.Table.t ;
known_points : ('msg, 'meta) connection Point_info.t Point.Table.t ;
connected_points : ('msg, 'meta) connection Point_info.t Point.Table.t ;
incoming : Canceler.t Point.Table.t ;
io_sched : P2p_io_scheduler.t ;
encoding : 'msg Message.t Data_encoding.t ;
events : events ;
and events = {
too_few_connections : unit Lwt_condition.t ;
too_many_connections : unit Lwt_condition.t ;
new_point : unit Lwt_condition.t ;
and ('msg, 'meta) connection = {
canceler : Canceler.t ;
messages : 'msg Lwt_pipe.t ;
conn : 'msg Message.t P2p_connection.t ;
gid_info : (('msg, 'meta) connection, 'meta) Gid_info.t ;
point_info : ('msg, 'meta) connection Point_info.t option ;
answerer : 'msg Answerer.t ;
mutable wait_close : bool ;
type ('msg, 'meta) pool = ('msg, 'meta) t
let register_point pool ?trusted (addr, port as point) =
match Point.Table.find pool.known_points point with
| exception Not_found ->
let pi = Point_info.create ?trusted addr port in
Point.Table.add pool.known_points point pi ;
| pi -> pi
let register_peer pool gid =
match Gid.Table.find pool.known_gids gid with
| exception Not_found ->
Lwt_condition.broadcast () ;
let peer = Gid_info.create gid ~metadata:pool.meta_config.initial in
Gid.Table.add pool.known_gids gid peer ;
| peer -> peer
let register_new_point pool _gid point =
if not (Point.Table.mem pool.my_id_points point) then
ignore (register_point pool point)
let register_new_points pool gid points =
List.iter (register_new_point pool gid) points ;
let compare_known_point_info p1 p2 =
(* The most-recently disconnected peers are greater. *)
(* Then come long-standing connected peers. *)
let disconnected1 = Point_info.State.is_disconnected p1
and disconnected2 = Point_info.State.is_disconnected p2 in
let compare_last_seen p1 p2 =
match Point_info.last_seen p1, Point_info.last_seen p2 with
| None, None -> 2 * 2 - 1 (* HACK... *)
| Some _, None -> 1
| None, Some _ -> -1
| Some (_, time1), Some (_, time2) ->
match compare time1 time2 with
| 0 -> 2 * 2 - 1 (* HACK... *)
| x -> x in
match disconnected1, disconnected2 with
| false, false -> compare_last_seen p1 p2
| false, true -> -1
| true, false -> 1
| true, true -> compare_last_seen p2 p1
let list_known_points pool _gid () =
let knowns =
Point.Table.fold (fun _ pi acc -> pi :: acc) pool.known_points [] in
let best_knowns =
Utils.take_n ~compare:compare_known_point_info 50 knowns in
Lwt.return ( Point_info.point best_knowns)
let active_connections pool = Gid.Table.length pool.connected_gids
let create_connection pool conn id_point pi gi =
let gid = Gid_info.gid gi in
let canceler = Canceler.create () in
let messages =
Lwt_pipe.create ?size:pool.config.incoming_app_message_queue_size () in
let callback =
{ Answerer.message = Lwt_pipe.push messages ;
advertise = register_new_points pool gid ;
bootstrap = list_known_points pool gid ;
} in
let answerer = conn canceler callback in
let conn =
{ conn ; point_info = pi ; gid_info = gi ;
messages ; canceler ; answerer ; wait_close = false } in
iter_option pi ~f:begin fun pi ->
Point_info.State.set_running pi gid conn ;
Point.Table.add pool.connected_points (Point_info.point pi) pi ;
end ;
Gid_info.State.set_running gi id_point conn ;
Gid.Table.add pool.connected_gids gid gi ;
Canceler.on_cancel canceler begin fun () ->
lwt_debug "Disconnect: %a (%a)"
Gid.pp gid Id_point.pp id_point >>= fun () ->
iter_option ~f:Point_info.State.set_disconnected pi;
Gid_info.State.set_disconnected gi ;
iter_option pi ~f:begin fun pi ->
Point.Table.remove pool.connected_points (Point_info.point pi) ;
end ;
Gid.Table.remove pool.connected_gids gid ;
if pool.config.max_connections <= active_connections pool then
Lwt_condition.broadcast () ;
P2p_connection.close ~wait:conn.wait_close conn.conn
end ;
if active_connections pool < pool.config.min_connections then
Lwt_condition.broadcast () ;
let disconnect ?(wait = false) conn =
conn.wait_close <- wait ;
Canceler.cancel conn.canceler >>= fun () ->
type error += Rejected of Gid.t
type error += Unexpected_point_state
let may_register_my_id_point pool = function
| [P2p_connection.Myself (addr, Some port)] ->
Point.Table.add pool.my_id_points (addr, port) () ;
Point.Table.remove pool.known_points (addr, port)
| _ -> ()
let authenticate pool ?pi canceler fd point =
let incoming = pi = None in
lwt_debug "authenticate: %a%s"
Point.pp point
(if incoming then " incoming" else "") >>= fun () ->
Lwt_utils.protect ~canceler begin fun () ->
~incoming (P2p_io_scheduler.register pool.io_sched fd) point
pool.config.identity pool.message_config.versions
end ~on_error: begin fun err ->
(* TODO do something when the error is Not_enough_proof_of_work ?? *)
lwt_debug "authenticate: %a%s -> failed %a"
Point.pp point
(if incoming then " incoming" else "")
pp_print_error err >>= fun () ->
may_register_my_id_point pool err ;
if incoming then
Point.Table.remove pool.incoming point
iter_option Point_info.State.set_disconnected pi ;
Lwt.return (Error err)
end >>=? fun (info, auth_fd) ->
lwt_debug "authenticate: %a -> auth %a"
Point.pp point
Connection_info.pp info >>= fun () ->
let remote_pi =
match info.id_point with
| addr, Some port
when not (Point.Table.mem pool.my_id_points (addr, port)) ->
Some (register_point pool (addr, port))
| _ -> None in
let connection_pi =
match pi, remote_pi with
| None, None -> None
| Some _ as pi, _ | _, (Some _ as pi) -> pi in
let gi = register_peer pool info.gid in
let acceptable_point =
unopt_map connection_pi
~default:(not pool.config.closed_network)
~f:begin fun connection_pi ->
match Point_info.State.get connection_pi with
| Requested _ -> not incoming
| Disconnected ->
not pool.config.closed_network
|| Point_info.trusted connection_pi
| Accepted _ | Running _ -> false
let acceptable_gid =
match Gid_info.State.get gi with
| Accepted _ ->
(* TODO: in some circumstances cancel and accept... *)
| Running _ -> false
| Disconnected -> true
if incoming then Point.Table.remove pool.incoming point ;
if not acceptable_gid || not acceptable_point then begin
lwt_debug "authenticate: %a -> kick %a point: %B gid: %B"
Point.pp point
Connection_info.pp info
acceptable_point acceptable_gid >>= fun () ->
P2p_connection.kick auth_fd >>= fun () ->
if not incoming then begin
iter_option ~f:Point_info.State.set_disconnected pi ;
(* FIXME Gid_info.State.set_disconnected ~requested:true gi ; *)
end ;
fail (Rejected info.gid)
end else begin
iter_option connection_pi
~f:(fun pi -> Point_info.State.set_accepted pi info.gid canceler) ;
Gid_info.State.set_accepted gi info.id_point canceler ;
lwt_debug "authenticate: %a -> accept %a"
Point.pp point
Connection_info.pp info >>= fun () ->
Lwt_utils.protect ~canceler begin fun () ->
auth_fd pool.encoding >>= fun conn ->
lwt_debug "authenticate: %a -> Connected %a"
Point.pp point
Connection_info.pp info >>= fun () ->
Lwt.return conn
end ~on_error: begin fun err ->
lwt_debug "authenticate: %a -> rejected %a"
Point.pp point
Connection_info.pp info >>= fun () ->
iter_option connection_pi ~f:Point_info.State.set_disconnected;
Gid_info.State.set_disconnected gi ;
Lwt.return (Error err)
end >>=? fun conn ->
let id_point =
match info.id_point, map_option Point_info.point pi with
| (addr, _), Some (_, port) -> addr, Some port
| id_point, None -> id_point in
return (create_connection pool conn id_point connection_pi gi)
type error += Pending_connection
type error += Connected
type error += Connection_closed = P2p_io_scheduler.Connection_closed
type error += Connection_refused
type error += Closed_network
let fail_unless_disconnected_point pi =
match Point_info.State.get pi with
| Disconnected -> return ()
| Requested _ | Accepted _ -> fail Pending_connection
| Running _ -> fail Connected
let fail_unless_disconnected_gid gi =
match Gid_info.State.get gi with
| Disconnected -> return ()
| Accepted _ -> fail Pending_connection
| Running _ -> fail Connected
let raw_connect canceler pool point =
let pi = register_point pool point in
let addr, port as point = Point_info.point pi in
(not pool.config.closed_network || Point_info.trusted pi)
Closed_network >>=? fun () ->
fail_unless_disconnected_point pi >>=? fun () ->
Point_info.State.set_requested pi canceler ;
let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in
let uaddr =
Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
lwt_debug "connect: %a" Point.pp point >>= fun () ->
Lwt_utils.protect ~canceler begin fun () ->
Lwt_unix.connect fd uaddr >>= fun () ->
return ()
end ~on_error: begin fun err ->
lwt_debug "connect: %a -> disconnect" Point.pp point >>= fun () ->
Point_info.State.set_disconnected pi ;
match err with
| [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] ->
fail Connection_refused
| err -> Lwt.return (Error err)
end >>=? fun () ->
lwt_debug "connect: %a -> authenticate" Point.pp point >>= fun () ->
authenticate pool ~pi canceler fd point
type error += Too_many_connections
let connect ~timeout pool point =
(active_connections pool <= pool.config.max_connections)
Too_many_connections >>=? fun () ->
let canceler = Canceler.create () in
Lwt_utils.with_timeout ~canceler timeout begin fun canceler ->
raw_connect canceler pool point
let accept pool fd point =
if pool.config.max_incoming_connections <= Point.Table.length pool.incoming
|| pool.config.max_connections <= active_connections pool then
Lwt.async (fun () -> Lwt_utils.safe_close fd)
let canceler = Canceler.create () in
Point.Table.add pool.incoming point canceler ;
Lwt.async begin fun () ->
~canceler pool.config.authentification_timeout
(fun canceler -> authenticate pool canceler fd point)
let read { messages } =
(fun () -> Lwt_pipe.pop messages >>= return)
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
let is_readable { messages } =
(fun () -> Lwt_pipe.values_available messages >>= return)
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
let write { conn } msg =
P2p_connection.write conn (Message msg)
let write_sync { conn } msg =
P2p_connection.write_sync conn (Message msg)
let write_now { conn } msg =
P2p_connection.write_now conn (Message msg)
let write_all pool msg =
(fun _gid gi ->
match Gid_info.State.get gi with
| Running { data = conn } ->
ignore (write_now conn msg : bool tzresult )
| _ -> ())
let broadcast_bootstrap_msg pool =
(fun _gid gi ->
match Gid_info.State.get gi with
| Running { data = { conn } } ->
ignore (P2p_connection.write_now conn Bootstrap : bool tzresult )
| _ -> ())
module Gids = struct
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Gid_info.t
let info { known_gids } point =
try Some (Gid.Table.find known_gids point)
with Not_found -> None
let get_metadata pool gid =
try Some (Gid_info.metadata (Gid.Table.find pool.known_gids gid))
with Not_found -> None
let set_metadata pool gid data =
Gid_info.set_metadata (register_peer pool gid) data
let get_trusted pool gid =
try Gid_info.trusted (Gid.Table.find pool.known_gids gid)
with Not_found -> false
let set_trusted pool gid =
try Gid_info.set_trusted (register_peer pool gid)
with Not_found -> ()
let unset_trusted pool gid =
try Gid_info.unset_trusted (Gid.Table.find pool.known_gids gid)
with Not_found -> ()
let find_connection pool gid =
(info pool gid)
~f:(fun p ->
match Gid_info.State.get p with
| Running { data } -> Some data
| _ -> None)
let fold_known pool ~init ~f =
Gid.Table.fold f pool.known_gids init
let fold_connected pool ~init ~f =
Gid.Table.fold f pool.connected_gids init
let fold_connections pool ~init ~f =
Gids.fold_connected pool ~init ~f:begin fun gid gi acc ->
match Gid_info.State.get gi with
| Running { data } -> f gid data acc
| _ -> acc
module Points = struct
type ('msg, 'meta) info = ('msg, 'meta) connection Point_info.t
let info { known_points } point =
try Some (Point.Table.find known_points point)
with Not_found -> None
let get_trusted pool gid =
try Point_info.trusted (Point.Table.find pool.known_points gid)
with Not_found -> false
let set_trusted pool gid =
try Point_info.set_trusted (register_point pool gid)
with Not_found -> ()
let unset_trusted pool gid =
try Point_info.unset_trusted (Point.Table.find pool.known_points gid)
with Not_found -> ()
let find_connection pool point =
(info pool point)
~f:(fun p ->
match Point_info.State.get p with
| Running { data } -> Some data
| _ -> None)
let fold_known pool ~init ~f =
Point.Table.fold f pool.known_points init
let fold_connected pool ~init ~f =
Point.Table.fold f pool.connected_points init
module Events = struct
let too_few_connections pool =
let too_many_connections pool =
let new_point pool =
let connection_stat { conn } =
P2p_connection.stat conn
let pool_stat { io_sched } =
P2p_io_scheduler.global_stat io_sched
let connection_info { conn } =
| conn
let create config meta_config message_config io_sched =
let events = {
too_few_connections = Lwt_condition.create () ;
too_many_connections = Lwt_condition.create () ;
new_point = Lwt_condition.create () ;
} in
let pool = {
config ; meta_config ; message_config ;
my_id_points = Point.Table.create 7 ;
known_gids = Gid.Table.create 53 ;
connected_gids = Gid.Table.create 53 ;
known_points = Point.Table.create 53 ;
connected_points = Point.Table.create 53 ;
incoming = Point.Table.create 53 ;
io_sched ;
encoding = Message.encoding message_config.encoding ;
events ;
} in
List.iter (Points.set_trusted pool) config.trusted_points ;
(fun () ->
Gid_info.File.load config.peers_file meta_config.encoding)
(fun _ ->
(* TODO log error *)
Lwt.return_nil) >>= fun gids ->
(fun gi -> Gid.Table.add pool.known_gids (Gid_info.gid gi) gi)
gids ;
Lwt.return pool
let destroy pool =
Point.Table.fold (fun _point pi acc ->
match Point_info.State.get pi with
| Requested { cancel } | Accepted { cancel } ->
Canceler.cancel cancel >>= fun () -> acc
| Running { data = conn } ->
disconnect conn >>= fun () -> acc
| Disconnected -> acc)
pool.known_points @@
Gid.Table.fold (fun _gid gi acc ->
match Gid_info.State.get gi with
| Accepted { cancel } ->
Canceler.cancel cancel >>= fun () -> acc
| Running { data = conn } ->
disconnect conn >>= fun () -> acc
| Disconnected -> acc)
pool.known_gids @@
Point.Table.fold (fun _point canceler acc ->
Canceler.cancel canceler >>= fun () -> acc)
pool.incoming Lwt.return_unit
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(** Pool of connections. This module manages the connection pool that
the shell needs to maintain in order to function correctly.
A pool and its connections are parametrized by the type of
messages exchanged over the connection and the type of
meta-information associated with a peer. The type [('msg, 'meta)
connection] is a wrapper on top of [P2p_connection.t] that adds
meta-information, a data-structure describing a fine-grained state
of the connection, as well as a new message queue (referred to
"app message queue") that will only contain the messages from the
internal [P2p_connection.t] that needs to be examined by the
higher layers. Some messages are directly processed by an internal
worker and thus never propagated above.
open P2p_types
open P2p_connection_pool_types
type 'msg encoding = Encoding : {
tag: int ;
encoding: 'a Data_encoding.t ;
wrap: 'a -> 'msg ;
unwrap: 'msg -> 'a option ;
max_length: int option ;
} -> 'msg encoding
(** {1 Pool management} *)
type ('msg, 'meta) t
type ('msg, 'meta) pool = ('msg, 'meta) t
(** The type of a pool of connections, parametrized by resp. the type
of messages and the meta-information associated to an identity. *)
type config = {
identity : Identity.t ;
(** Our identity. *)
proof_of_work_target : ;
(** The proof of work target we require from peers. *)
trusted_points : Point.t list ;
(** List of hard-coded known peers to bootstrap the network from. *)
peers_file : string ;
(** The path to the JSON file where the metadata associated to
gids are loaded / stored. *)
closed_network : bool ;
(** If [true], the only accepted connections are from peers whose
addresses are in [trusted_peers]. *)
listening_port : port option ;
(** If provided, it will be passed to [P2p_connection.authenticate]
when we authenticate against a new peer. *)
min_connections : int ;
(** Strict minimum number of connections
(triggers [Event.too_few_connections]). *)
max_connections : int ;
(** Max number of connections. If it's reached, [connect] and
[accept] will fail, i.e. not add more connections
(also triggers [Event.too_many_connections]). *)
max_incoming_connections : int ;
(** Max not-yet-authentified incoming connections.
Above this number, [accept] will start dropping incoming
connections. *)
authentification_timeout : float ;
(** Delay granted to a peer to perform authentication, in seconds. *)
incoming_app_message_queue_size : int option ;
(** Size of the message queue for user messages (messages returned
by this module's [read] function. *)
incoming_message_queue_size : int option ;
(** Size of the incoming message queue internal of a peer's Reader
(See [P2p_connection.accept]). *)
outgoing_message_queue_size : int option ;
(** Size of the outgoing message queue internal to a peer's Writer
(See [P2p_connection.accept]). *)
type 'meta meta_config = {
encoding : 'meta Data_encoding.t;
initial : 'meta;
type 'msg message_config = {
encoding : 'msg encoding list ;
versions : P2p_types.Version.t list;
val create:
config ->
'meta meta_config ->
'msg message_config ->
P2p_io_scheduler.t ->
('msg, 'meta) pool Lwt.t
(** [create config meta_cfg msg_cfg io_sched] is a freshly minted
pool. *)
val destroy: ('msg, 'meta) pool -> unit Lwt.t
(** [destroy pool] returns when member connections are either
disconnected or canceled. *)
val active_connections: ('msg, 'meta) pool -> int
(** [active_connections pool] is the number of connections inside
[pool]. *)
val pool_stat: ('msg, 'meta) pool -> Stat.t
(** [pool_stat pool] is a snapshot of current bandwidth usage for the
entire [pool]. *)
(** {2 Pool events} *)
module Events : sig
val too_few_connections: ('msg, 'meta) pool -> unit Lwt.t
val too_many_connections: ('msg, 'meta) pool -> unit Lwt.t
val new_point: ('msg, 'meta) pool -> unit Lwt.t
(** {1 Connections management} *)
type ('msg, 'meta) connection
(** Type of a connection to a peer, parametrized by the type of
messages exchanged as well as meta-information associated to a
peer. It mostly wraps [P2p_connection.connection], adding
meta-information and data-structures describing a more
fine-grained logical state of the connection. *)
type error += Pending_connection
type error += Connected
type error += Connection_refused
type error += Rejected of Gid.t
type error += Too_many_connections
type error += Closed_network
val connect:
timeout:float ->
('msg, 'meta) pool -> Point.t ->
('msg, 'meta) connection tzresult Lwt.t
(** [connect ~timeout pool point] tries to add a
connection to [point] in [pool] in less than [timeout] seconds. *)
val accept:
('msg, 'meta) pool -> Lwt_unix.file_descr -> Point.t -> unit
(** [accept pool fd point] instructs [pool] to start the process of
accepting a connection from [fd]. Used by [P2p]. *)
val disconnect:
?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t
(** [disconnect conn] cleanly closes [conn] and returns after [conn]'s
internal worker has returned. *)
val connection_info: ('msg, 'meta) connection -> Connection_info.t
val connection_stat: ('msg, 'meta) connection -> Stat.t
(** [stat conn] is a snapshot of current bandwidth usage for
[conn]. *)
val fold_connections:
('msg, 'meta) pool ->
init:'a ->
f:(Gid.t -> ('msg, 'meta) connection -> 'a -> 'a) ->
(** {1 I/O on connections} *)
type error += Connection_closed
val read: ('msg, 'meta) connection -> 'msg tzresult Lwt.t
(** [read conn] returns a message popped from [conn]'s app message
queue, or fails with [Connection_closed]. *)
val is_readable: ('msg, 'meta) connection -> unit tzresult Lwt.t
(** [is_readable conn] returns when there is at least one message
ready to be read. *)
val write: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
(** [write conn msg] is [P2p_connection.write conn' msg] where [conn']
is the internal [P2p_connection.t] inside [conn]. *)
val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t
(** [write_sync conn msg] is [P2p_connection.write_sync conn' msg]
where [conn'] is the internal [P2p_connection.t] inside [conn]. *)
val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult
(** [write_now conn msg] is [P2p_connection.write_now conn' msg] where
[conn'] is the internal [P2p_connection.t] inside [conn]. *)
(** {2 Broadcast functions} *)
val write_all: ('msg, 'meta) pool -> 'msg -> unit
(** [write_all pool msg] is [write_now conn msg] for all member
connections to [pool] in [Running] state. *)
val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit
(** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap]
for all member connections to [pool] in [Running] state. *)
(** {1 Functions on [Gid]} *)
module Gids : sig
type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Gid_info.t
val info:
('msg, 'meta) pool -> Gid.t -> ('msg, 'meta) info option
val get_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta option
val set_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta -> unit
val get_trusted: ('msg, 'meta) pool -> Gid.t -> bool
val set_trusted: ('msg, 'meta) pool -> Gid.t -> unit
val unset_trusted: ('msg, 'meta) pool -> Gid.t -> unit
val find_connection:
('msg, 'meta) pool -> Gid.t -> ('msg, 'meta) connection option
val fold_known:
('msg, 'meta) pool ->
init:'a ->
f:(Gid.t -> ('msg, 'meta) info -> 'a -> 'a) ->
val fold_connected:
('msg, 'meta) pool ->
init:'a ->
f:(Gid.t -> ('msg, 'meta) info -> 'a -> 'a) ->
(** {1 Functions on [Points]} *)
module Points : sig
type ('msg, 'meta) info = ('msg, 'meta) connection Point_info.t
val info:
('msg, 'meta) pool -> Point.t -> ('msg, 'meta) info option
val get_trusted: ('msg, 'meta) pool -> Point.t -> bool
val set_trusted: ('msg, 'meta) pool -> Point.t -> unit
val unset_trusted: ('msg, 'meta) pool -> Point.t -> unit
val find_connection:
('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option
val fold_known:
('msg, 'meta) pool ->
init:'a ->
f:(Point.t -> ('msg, 'meta) info -> 'a -> 'a) ->
val fold_connected:
('msg, 'meta) pool ->
init:'a ->
f:(Point.t -> ('msg, 'meta) info -> 'a -> 'a) ->
module Message : sig
type 'msg t =
| Bootstrap
| Advertise of Point.t list
| Message of 'msg
| Disconnect
val encoding: 'msg encoding list -> 'msg t Data_encoding.t
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
open P2p_types
module Point_info = struct
type 'data state =
| Requested of { cancel: Canceler.t }
| Accepted of { current_gid: Gid.t ;
cancel: Canceler.t }
| Running of { data: 'data ;
current_gid: Gid.t }
| Disconnected
module Event = struct
type kind =
| Outgoing_request
| Accepting_request of Gid.t
| Rejecting_request of Gid.t
| Request_rejected of Gid.t option
| Connection_established of Gid.t
| Disconnection of Gid.t
| External_disconnection of Gid.t
type t = {
kind : kind ;
timestamp : Time.t ;
type greylisting_config = {
factor: float ;
initial_delay: int ;
disconnection_delay: int ;
type 'data t = {
point : Point.t ;
mutable trusted : bool ;
mutable state : 'data state ;
mutable last_failed_connection : Time.t option ;
mutable last_rejected_connection : (Gid.t * Time.t) option ;
mutable last_established_connection : (Gid.t * Time.t) option ;
mutable last_disconnection : (Gid.t * Time.t) option ;
greylisting : greylisting_config ;
mutable greylisting_delay : float ;
mutable greylisting_end : Time.t ;
events : Event.t Ring.t ;
type 'data point_info = 'data t
let compare pi1 pi2 = pi1.point pi2.point
let log_size = 100
let default_greylisting_config = {
factor = 1.2 ;
initial_delay = 1 ;
disconnection_delay = 60 ;
let create
?(trusted = false)
?(greylisting_config = default_greylisting_config) addr port = {
point = (addr, port) ;
trusted ;
state = Disconnected ;
last_failed_connection = None ;
last_rejected_connection = None ;
last_established_connection = None ;
last_disconnection = None ;
events = Ring.create log_size ;
greylisting = greylisting_config ;
greylisting_delay = 1. ;
greylisting_end = () ;
let point s = s.point
let trusted s = s.trusted
let set_trusted gi = gi.trusted <- true
let unset_trusted gi = gi.trusted <- false
let last_established_connection s = s.last_established_connection
let last_disconnection s = s.last_disconnection
let last_failed_connection s = s.last_failed_connection
let last_rejected_connection s = s.last_rejected_connection
let greylisted ?(now = ()) s =
| now s.greylisting_end <= 0
let recent a1 a2 =
match a1, a2 with
| (None, None) -> None
| (None, (Some _ as a))
| (Some _ as a, None) -> a
| (Some (_, t1), Some (_, t2)) ->
if t1 t2 < 0 then a2 else a1
let last_seen s =
recent s.last_rejected_connection
(recent s.last_established_connection s.last_disconnection)
let last_miss s =
(map_option ~f:(fun (_, time) -> time) @@
recent s.last_rejected_connection s.last_disconnection) with
| (None, None) -> None
| (None, (Some _ as a))
| (Some _ as a, None) -> a
| (Some t1 as a1 , (Some t2 as a2)) ->
if t1 t2 < 0 then a2 else a1
let fold_events { events } ~init ~f = Ring.fold events ~init ~f
let log { events } ?(timestamp = ()) kind =
Ring.add events { kind ; timestamp }
let log_incoming_rejection ?timestamp point_info gid =
log point_info ?timestamp (Rejecting_request gid)
module State = struct
type 'data t = 'data state =
| Requested of { cancel: Canceler.t }
| Accepted of { current_gid: Gid.t ;
cancel: Canceler.t }
| Running of { data: 'data ;
current_gid: Gid.t }
| Disconnected
type 'data state = 'data t
let pp ppf = function
| Requested _ ->
Format.fprintf ppf "requested"
| Accepted { current_gid } ->
Format.fprintf ppf "accepted %a" Gid.pp current_gid
| Running { current_gid } ->
Format.fprintf ppf "running %a" Gid.pp current_gid
| Disconnected ->
Format.fprintf ppf "disconnected"
let get { state } = state
let is_disconnected { state } =
match state with
| Disconnected -> true
| Requested _ | Accepted _ | Running _ -> false
let set_requested ?timestamp point_info cancel =
assert begin
match point_info.state with
| Requested _ -> true
| Accepted _ | Running _ -> false
| Disconnected -> true
end ;
point_info.state <- Requested { cancel } ;
log point_info ?timestamp Outgoing_request
let set_accepted
?(timestamp = ())
point_info current_gid cancel =
(* log_notice "SET_ACCEPTED %a@." Point.pp point_info.point ; *)
assert begin
match point_info.state with
| Accepted _ | Running _ -> false
| Requested _ | Disconnected -> true
end ;
point_info.state <- Accepted { current_gid ; cancel } ;
log point_info ~timestamp (Accepting_request current_gid)
let set_running
?(timestamp = ())
point_info gid data =
assert begin
match point_info.state with
| Disconnected -> true (* request to unknown gid. *)
| Running _ -> false
| Accepted { current_gid } -> Gid.equal gid current_gid
| Requested _ -> true
end ;
point_info.state <- Running { data ; current_gid = gid } ;
point_info.last_established_connection <- Some (gid, timestamp) ;
log point_info ~timestamp (Connection_established gid)
let set_greylisted timestamp point_info =
point_info.greylisting_end <-
(Int64.of_float point_info.greylisting_delay) ;
point_info.greylisting_delay <-
point_info.greylisting_delay *. point_info.greylisting.factor
let set_disconnected
?(timestamp = ()) ?(requested = false) point_info =
let event : Event.kind =
match point_info.state with
| Requested _ ->
set_greylisted timestamp point_info ;
point_info.last_failed_connection <- Some timestamp ;
Request_rejected None
| Accepted { current_gid } ->
set_greylisted timestamp point_info ;
point_info.last_rejected_connection <-
Some (current_gid, timestamp) ;
Request_rejected (Some current_gid)
| Running { current_gid } ->
point_info.greylisting_delay <-
float_of_int point_info.greylisting.initial_delay ;
point_info.greylisting_end <-
Time.add timestamp
(Int64.of_int point_info.greylisting.disconnection_delay) ;
point_info.last_disconnection <- Some (current_gid, timestamp) ;
if requested
then Disconnection current_gid
else External_disconnection current_gid
| Disconnected ->
assert false
point_info.state <- Disconnected ;
log point_info ~timestamp event
module Gid_info = struct
type 'data state =
| Accepted of { current_point: Id_point.t ;
cancel: Canceler.t }
| Running of { data: 'data ;
current_point: Id_point.t }
| Disconnected
module Event = struct
type kind =
| Accepting_request
| Rejecting_request
| Request_rejected
| Connection_established
| Disconnection
| External_disconnection
let kind_encoding =
let open Data_encoding in
Data_encoding.string_enum [
"incoming_request", Accepting_request ;
"rejecting_request", Rejecting_request ;
"request_rejected", Request_rejected ;
"connection_established", Connection_established ;
"disconnection", Disconnection ;
"external_disconnection", External_disconnection ;
type t = {
kind : kind ;
timestamp : Time.t ;
point : Id_point.t ;
let encoding =
let open Data_encoding in
(fun { kind ; timestamp ; point = (addr, port) } ->
(kind, timestamp, Ipaddr.V6.to_string addr, port))
(fun (kind, timestamp, addr, port) ->
let addr = Ipaddr.V6.of_string_exn addr in
{ kind ; timestamp ; point = (addr, port) })
(req "kind" kind_encoding)
(req "timestamp" Time.encoding)
(req "addr" string)
(opt "port" int16))
type ('conn, 'meta) t = {
gid : Gid.t ;
mutable state : 'conn state ;
mutable metadata : 'meta ;
mutable trusted : bool ;
mutable last_failed_connection : (Id_point.t * Time.t) option ;
mutable last_rejected_connection : (Id_point.t * Time.t) option ;
mutable last_established_connection : (Id_point.t * Time.t) option ;
mutable last_disconnection : (Id_point.t * Time.t) option ;
events : Event.t Ring.t ;
type ('conn, 'meta) gid_info = ('conn, 'meta) t
let compare gi1 gi2 = gi1.gid gi2.gid
let log_size = 100
let create ?(trusted = false) ~metadata gid =
{ gid ;
state = Disconnected ;
metadata ;
trusted ;
events = Ring.create log_size ;
last_failed_connection = None ;
last_rejected_connection = None ;
last_established_connection = None ;
last_disconnection = None ;
let encoding metadata_encoding =
let open Data_encoding in
(fun { gid ; trusted ; metadata ; events ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection } ->
(gid, trusted, metadata, Ring.elements events,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection))
(fun (gid, trusted, metadata, event_list,
last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection) ->
let info = create ~trusted ~metadata gid in
let events = Ring.create log_size in
Ring.add_list event_list ;
{ state = Disconnected ;
trusted ; gid ; metadata ; events ;
last_failed_connection ;
last_rejected_connection ;
last_established_connection ;
last_disconnection ;
(req "gid" Gid.encoding)
(dft "trusted" bool false)
(req "metadata" metadata_encoding)
(dft "events" (list Event.encoding) [])
(opt "last_failed_connection"
(tup2 Id_point.encoding Time.encoding))
(opt "last_rejected_connection"
(tup2 Id_point.encoding Time.encoding))
(opt "last_established_connection"
(tup2 Id_point.encoding Time.encoding))
(opt "last_disconnection"
(tup2 Id_point.encoding Time.encoding)))
let gid { gid } = gid
let metadata { metadata } = metadata
let set_metadata gi metadata = gi.metadata <- metadata
let trusted { trusted } = trusted
let set_trusted gi = gi.trusted <- true
let unset_trusted gi = gi.trusted <- false
let fold_events { events } ~init ~f = Ring.fold events ~init ~f
let last_established_connection s = s.last_established_connection
let last_disconnection s = s.last_disconnection
let last_failed_connection s = s.last_failed_connection
let last_rejected_connection s = s.last_rejected_connection
let recent = Point_info.recent
let last_seen s =
(recent s.last_rejected_connection s.last_disconnection)
let last_miss s =
(recent s.last_rejected_connection s.last_disconnection)
let log { events } ?(timestamp = ()) point kind =
Ring.add events { kind ; timestamp ; point }
let log_incoming_rejection ?timestamp gid_info point =
log gid_info ?timestamp point Rejecting_request
module State = struct
type 'data t = 'data state =
| Accepted of { current_point: Id_point.t ;
cancel: Canceler.t }
| Running of { data: 'data ;
current_point: Id_point.t }
| Disconnected
type 'data state = 'data t
let pp ppf = function
| Accepted { current_point } ->
Format.fprintf ppf "accepted %a" Id_point.pp current_point
| Running { current_point } ->
Format.fprintf ppf "running %a" Id_point.pp current_point
| Disconnected ->
Format.fprintf ppf "disconnected"
let get { state } = state
let is_disconnected { state } =
match state with
| Disconnected -> true
| Accepted _ | Running _ -> false
let set_accepted
?(timestamp = ())
gid_info current_point cancel =
assert begin
match gid_info.state with
| Accepted _ | Running _ -> false
| Disconnected -> true
end ;
gid_info.state <- Accepted { current_point ; cancel } ;
log gid_info ~timestamp current_point Accepting_request
let set_running
?(timestamp = ())
gid_info point data =
assert begin
match gid_info.state with
| Disconnected -> true (* request to unknown gid. *)
| Running _ -> false
| Accepted { current_point } ->
Id_point.equal point current_point
end ;
gid_info.state <- Running { data ; current_point = point } ;
gid_info.last_established_connection <- Some (point, timestamp) ;
log gid_info ~timestamp point Connection_established
let set_disconnected
?(timestamp = ()) ?(requested = false) gid_info =
let current_point, (event : Event.kind) =
match gid_info.state with
| Accepted { current_point } ->
gid_info.last_rejected_connection <-
Some (current_point, timestamp) ;
current_point, Request_rejected
| Running { current_point } ->
gid_info.last_disconnection <-
Some (current_point, timestamp) ;
if requested then Disconnection else External_disconnection
| Disconnected -> assert false
gid_info.state <- Disconnected ;
log gid_info ~timestamp current_point event
module File = struct
let load path metadata_encoding =
let enc = Data_encoding.list (encoding metadata_encoding) in
Data_encoding_ezjsonm.read_file path >|=
map_option ~f:(Data_encoding.Json.destruct enc) >|=
unopt []
let save path metadata_encoding peers =
let open Data_encoding in
Data_encoding_ezjsonm.write_file path @@
Json.construct (list (encoding metadata_encoding)) peers
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
open P2p_types
module Point_info : sig
type 'conn t
type 'conn point_info = 'conn t
(** Type of info associated to a point. *)
val compare : 'conn point_info -> 'conn point_info -> int
type greylisting_config = {
factor: float ;
initial_delay: int ;
disconnection_delay: int ;
val create :
?trusted:bool ->
?greylisting_config:greylisting_config ->
addr -> port -> 'conn point_info
(** [create ~trusted addr port] is a freshly minted point_info. If
[trusted] is true, this point is considered trusted and will
be treated as such. *)
val trusted : 'conn point_info -> bool
(** [trusted pi] is [true] iff [pi] has is trusted,
i.e. "whitelisted". *)
val set_trusted : 'conn point_info -> unit
val unset_trusted : 'conn point_info -> unit
val last_failed_connection :
'conn point_info -> Time.t option
val last_rejected_connection :
'conn point_info -> (Gid.t * Time.t) option
val last_established_connection :
'conn point_info -> (Gid.t * Time.t) option
val last_disconnection :
'conn point_info -> (Gid.t * Time.t) option
val last_seen :
'conn point_info -> (Gid.t * Time.t) option
(** [last_seen pi] is the most recent of:
* last established connection
* last rejected connection
* last disconnection
val last_miss :
'conn point_info -> Time.t option
val greylisted :
?now:Time.t -> 'conn point_info -> bool
val point : 'conn point_info -> Point.t
module State : sig
type 'conn t =
| Requested of { cancel: Canceler.t }
(** We initiated a connection. *)
| Accepted of { current_gid: Gid.t ;
cancel: Canceler.t }
(** We accepted a incoming connection. *)
| Running of { data: 'conn ;
current_gid: Gid.t }
(** Successfully authentificated connection, normal business. *)
| Disconnected
(** No connection established currently. *)
type 'conn state = 'conn t
val pp : Format.formatter -> 'conn t -> unit
val get : 'conn point_info -> 'conn state
val is_disconnected : 'conn point_info -> bool
val set_requested :
?timestamp:Time.t ->
'conn point_info -> Canceler.t -> unit
val set_accepted :
?timestamp:Time.t ->
'conn point_info -> Gid.t -> Canceler.t -> unit
val set_running :
?timestamp:Time.t -> 'conn point_info -> Gid.t -> 'conn -> unit
val set_disconnected :
?timestamp:Time.t -> ?requested:bool -> 'conn point_info -> unit
module Event : sig
type kind =
| Outgoing_request
(** We initiated a connection. *)
| Accepting_request of Gid.t
(** We accepted a connection after authentifying the remote peer. *)
| Rejecting_request of Gid.t
(** We rejected a connection after authentifying the remote peer. *)
| Request_rejected of Gid.t option
(** The remote peer rejected our connection. *)
| Connection_established of Gid.t
(** We succesfully established a authentified connection. *)
| Disconnection of Gid.t
(** We decided to close the connection. *)
| External_disconnection of Gid.t
(** The connection was closed for external reason. *)
type t = {
kind : kind ;
timestamp : Time.t ;
val fold_events :
'conn point_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a
val log_incoming_rejection :
?timestamp:Time.t -> 'conn point_info -> Gid.t -> unit
(** Gid info: current and historical information about a gid *)
module Gid_info : sig
type ('conn, 'meta) t
type ('conn, 'meta) gid_info = ('conn, 'meta) t
val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int
val create :
?trusted:bool ->
metadata:'meta ->
Gid.t -> ('conn, 'meta) gid_info
(** [create ~trusted ~meta gid] is a freshly minted gid info for
[gid]. *)
val gid : ('conn, 'meta) gid_info -> Gid.t
val metadata : ('conn, 'meta) gid_info -> 'meta
val set_metadata : ('conn, 'meta) gid_info -> 'meta -> unit
val trusted : ('conn, 'meta) gid_info -> bool
val set_trusted : ('conn, 'meta) gid_info -> unit
val unset_trusted : ('conn, 'meta) gid_info -> unit
val last_failed_connection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
val last_rejected_connection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
val last_established_connection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
val last_disconnection :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
val last_seen :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
(** [last_seen gi] is the most recent of:
* last established connection
* last rejected connection
* last disconnection
val last_miss :
('conn, 'meta) gid_info -> (Id_point.t * Time.t) option
(** [last_miss gi] is the most recent of:
* last failed connection
* last rejected connection
* last disconnection
module State : sig
type 'conn t =
| Accepted of { current_point: Id_point.t ;
cancel: Canceler.t }
(** We accepted a incoming connection, we greeted back and
we are waiting for an acknowledgement. *)
| Running of { data: 'conn ;
current_point: Id_point.t }
(** Successfully authentificated connection, normal business. *)
| Disconnected
(** No connection established currently. *)
type 'conn state = 'conn t
val pp : Format.formatter -> 'conn t -> unit
val get : ('conn, 'meta) gid_info -> 'conn state
val is_disconnected : ('conn, 'meta) gid_info -> bool
val set_accepted :
?timestamp:Time.t ->
('conn, 'meta) gid_info -> Id_point.t -> Canceler.t -> unit
val set_running :
?timestamp:Time.t ->
('conn, 'meta) gid_info -> Id_point.t -> 'conn -> unit
val set_disconnected :
?timestamp:Time.t ->
?requested:bool ->
('conn, 'meta) gid_info -> unit
module Event : sig
type kind =
| Accepting_request
(** We accepted a connection after authentifying the remote peer. *)
| Rejecting_request
(** We rejected a connection after authentifying the remote peer. *)
| Request_rejected
(** The remote peer rejected our connection. *)
| Connection_established
(** We succesfully established a authentified connection. *)
| Disconnection
(** We decided to close the connection. *)
| External_disconnection
(** The connection was closed for external reason. *)
type t = {
kind : kind ;
timestamp : Time.t ;
point : Id_point.t ;
val fold_events :
('conn, 'meta) gid_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a
val log_incoming_rejection :
?timestamp:Time.t ->
('conn, 'meta) gid_info -> Id_point.t -> unit
module File : sig
val load :
string -> 'meta Data_encoding.t ->
('conn, 'meta) gid_info list Lwt.t
val save :
string -> 'meta Data_encoding.t ->
('conn, 'meta) gid_info list -> bool Lwt.t
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
include Logging.Make (struct let name = "p2p.welcome" end)
open P2p_types
type pool = Pool : ('msg, 'meta) P2p_connection_pool.t -> pool
type t = {
socket: Lwt_unix.file_descr ;
canceler: Canceler.t ;
pool: pool ;
mutable worker: unit Lwt.t ;
let rec worker_loop st =
let Pool pool = st.pool in
Lwt_unix.yield () >>= fun () ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt_unix.accept st.socket >>= return
end >>= function
| Ok (fd, addr) ->
let point =
match addr with
| Lwt_unix.ADDR_UNIX _ -> assert false
| Lwt_unix.ADDR_INET (addr, port) ->
(Ipaddr_unix.V6.of_inet_addr_exn addr, port) in
P2p_connection_pool.accept pool fd point ;
worker_loop st
| Error [Lwt_utils.Canceled] ->
| Error err ->
lwt_log_error "@[<v 2>Unexpected error in the Welcome worker@ %a@]"
pp_print_error err >>= fun () ->
let create_listening_socket ~backlog ?(addr = Ipaddr.V6.unspecified) port =
let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in
Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ;
main_socket (Point.to_sockaddr (addr, port)) >>= fun () ->
Lwt_unix.listen main_socket backlog ;
Lwt.return main_socket
let run ~backlog pool ?addr port =
Lwt.catch begin fun () ->
~backlog ?addr port >>= fun socket ->
let canceler = Canceler.create () in
Canceler.on_cancel canceler begin fun () ->
Lwt_utils.safe_close socket
end ;
let st = {
socket ; canceler ; pool = Pool pool ;
worker = Lwt.return_unit ;
} in
st.worker <-
Lwt_utils.worker "welcome"
(fun () -> worker_loop st)
(fun () -> Canceler.cancel st.canceler) ;
Lwt.return st
end begin fun exn ->
"@[<v 2>Cannot accept incoming connections@ %a@]"
pp_exn exn >>= fun () ->
| exn
let shutdown st =
Canceler.cancel st.canceler >>= fun () ->
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
open P2p_types
(** Welcome worker. Accept incoming connections and add them to its
connection pool. *)
type t
(** Type of a welcome worker, parametrized like a
[P2p_connection_pool.pool]. *)
val run:
backlog:int ->
('msg, 'meta) P2p_connection_pool.t ->
?addr:addr -> port -> t Lwt.t
(** [run ~backlog ~addr pool port] returns a running welcome worker
feeding [pool] listening at [(addr, port)]. [backlog] is the
argument passed to [Lwt_unix.accept]. *)
val shutdown: t -> unit Lwt.t
basic \
basic \
p2p-io-scheduler \
p2p-io-scheduler \
p2p-connection \
p2p-connection \
all: test
all: test
.PHONY:build-test-p2p-connection-pool run-test-p2p-connection-pool
build-test-p2p-connection-pool: test-p2p-connection-pool
./test-p2p-connection-pool --clients 10 --repeat 5
lib/ \
lib/ \
lib/ \
lib/ \
lib/ \
${}: ${NODELIB}
test-p2p-io-scheduler: ${NODELIB} ${}
test-p2p-io-scheduler: ${NODELIB} ${}
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
test-p2p-connection: ${NODELIB} ${}
test-p2p-connection: ${NODELIB} ${}
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
test-p2p-connection-pool: ${NODELIB} ${}
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
-rm -f test-p2p-io_scheduler
-rm -f test-p2p-io_scheduler
-rm -f test-p2p-connection
-rm -f test-p2p-connection
-rm -f test-p2p-connection-pool
## lwt pipe test program
## lwt pipe test program
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
open Error_monad
open P2p_types
include Logging.Make (struct let name = "test-p2p-connection-pool" end)
type message =
| Ping
let msg_config : message P2p_connection_pool.message_config = {
encoding = [
P2p_connection_pool.Encoding {
tag = 0x10 ;
encoding = Data_encoding.empty ;
wrap = (function () -> Ping) ;
unwrap = (function Ping -> Some ()) ;
max_length = None ;
} ;
] ;
versions = Version.[ { name = "TEST" ; major = 0 ; minor = 0 } ] ;
type metadata = unit
let meta_config : metadata P2p_connection_pool.meta_config = {
encoding = Data_encoding.empty ;
initial = () ;
let rec connect ~timeout pool point =
lwt_log_info "Connect to %a" Point.pp point >>= fun () ->
P2p_connection_pool.connect pool point ~timeout >>= function
| Error [P2p_connection_pool.Connected] -> begin
match P2p_connection_pool.Points.find_connection pool point with
| Some conn -> return conn
| None -> failwith "Woops..."
| Error ([ P2p_connection_pool.Connection_refused
| P2p_connection_pool.Pending_connection
| P2p_connection.Rejected
| Lwt_utils.Canceled
| Lwt_utils.Timeout
| P2p_connection_pool.Rejected _
] as err) ->
lwt_log_info "@[Connection to %a failed:@ %a@]"
Point.pp point pp_print_error err >>= fun () ->
Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () ->
connect ~timeout pool point
| Ok _ | Error _ as res -> Lwt.return res
let connect_all ~timeout pool points =
map_p (connect ~timeout pool) points
type error += Connect | Write | Read
let write_all conns msg =
(fun conn ->
trace Write @@ P2p_connection_pool.write_sync conn msg)
let read_all conns =
(fun conn ->
trace Read @@ conn >>=? fun Ping ->
return ())
let rec connect_random pool total rem point n =
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
(trace Connect @@ connect ~timeout:2. pool point) >>=? fun conn ->
(trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ ->
(trace Read @@ conn) >>=? fun Ping ->
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
P2p_connection_pool.disconnect conn >>= fun () ->
decr rem ;
if !rem mod total = 0 then
lwt_log_notice "Remaining: %d." (!rem / total)
Lwt.return ()
end >>= fun () ->
if n > 1 then
connect_random pool total rem point (pred n)
return ()
let connect_random_all pool points n =
let total = List.length points in
let rem = ref (n * total) in
iter_p (fun point -> connect_random pool total rem point n) points
let close_all conns =
Lwt_list.iter_p P2p_connection_pool.disconnect conns
let run_net config repeat points addr port =
Lwt_unix.sleep (Random.float 2.0) >>= fun () ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
config meta_config msg_config sched >>= fun pool ->
| ~backlog:10 pool ~addr port >>= fun welcome ->
connect_all ~timeout:2. pool points >>=? fun conns ->
lwt_log_notice "Bootstrap OK" >>= fun () ->
write_all conns Ping >>=? fun () ->
lwt_log_notice "Sent all messages." >>= fun () ->
read_all conns >>=? fun () ->
lwt_log_notice "Read all messages." >>= fun () ->
close_all conns >>= fun () ->
lwt_log_notice "Begin random connections." >>= fun () ->
connect_random_all pool points repeat >>=? fun () ->
lwt_log_notice "Shutting down" >>= fun () ->
P2p_welcome.shutdown welcome >>= fun () ->
P2p_connection_pool.destroy pool >>= fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
lwt_log_notice "Shutdown Ok" >>= fun () ->
return ()
let make_net points repeat n =
let point, points = n points in
let proof_of_work_target = Crypto_box.make_target [] in
let identity = Identity.generate proof_of_work_target in
let config = P2p_connection_pool.{
identity ;
proof_of_work_target ;
trusted_points = points ;
peers_file = "/dev/null" ;
closed_network = true ;
listening_port = Some (snd point) ;
min_connections = List.length points ;
max_connections = List.length points ;
max_incoming_connections = List.length points ;
authentification_timeout = 2. ;
incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
} in
~prefix:(Format.asprintf "%a " Gid.pp identity.gid)
begin fun () ->
run_net config repeat points (fst point) (snd point) >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "@[<v 2>Unexpected error: %d@ %a@]"
(List.length err)
pp_print_error err >>= fun () ->
exit 1
let addr = ref Ipaddr.V6.localhost
let port = ref (1024 + 8192)
let clients = ref 10
let repeat = ref 5
let spec = Arg.[
"--port", Int (fun p -> port := p), " Listening port of the first peer.";
"--addr", String (fun p -> addr := Ipaddr.V6.of_string_exn p),
" Listening addr";
"--clients", Set_int clients, " Number of concurrent clients." ;
"--repeat", Set_int repeat, " Number of connections/disconnections." ;
"-v", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Info)),
" Log up to info msgs" ;
"-vv", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Debug)),
" Log up to debug msgs";
let main () =
let open Utils in
let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in
let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg ;
let ports = !port -- (!port + !clients - 1) in
let points = (fun port -> !addr, port) ports in
Lwt_list.iter_p (make_net points !repeat) (0 -- (!clients - 1))
let () =
Sys.catch_break true ;
Logging.init Stderr ;
| @@ main ()
with _ -> ()
