From b694a6281052101423f4755a501ad4a6a5a1efed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 14 Jan 2017 13:14:07 +0100 Subject: [PATCH] Shell: implement `P2p_{connection_pool,welcome}` --- .gitignore | 1 + src/Makefile | 6 + src/node/net/p2p_connection_pool.ml | 667 +++++++++++++++++++++ src/node/net/p2p_connection_pool.mli | 290 +++++++++ src/node/net/p2p_connection_pool_types.ml | 463 ++++++++++++++ src/node/net/p2p_connection_pool_types.mli | 265 ++++++++ src/node/net/p2p_welcome.ml | 77 +++ src/node/net/p2p_welcome.mli | 27 + test/Makefile | 15 + test/test_p2p_connection_pool.ml | 196 ++++++ 10 files changed, 2007 insertions(+) create mode 100644 src/node/net/p2p_connection_pool.ml create mode 100644 src/node/net/p2p_connection_pool.mli create mode 100644 src/node/net/p2p_connection_pool_types.ml create mode 100644 src/node/net/p2p_connection_pool_types.mli create mode 100644 src/node/net/p2p_welcome.ml create mode 100644 src/node/net/p2p_welcome.mli create mode 100644 test/test_p2p_connection_pool.ml diff --git a/.gitignore b/.gitignore index 199bb28a1..c1f626690 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ /test/test-data-encoding /test/test-p2p-io-scheduler /test/test-p2p-connection +/test/test-p2p-connection-pool /test/LOG *~ diff --git a/src/Makefile b/src/Makefile index 2e8d89a92..49aaaf097 100644 --- a/src/Makefile +++ b/src/Makefile @@ -260,6 +260,9 @@ NODE_LIB_INTFS := \ node/net/p2p_types.mli \ node/net/p2p_io_scheduler.mli \ node/net/p2p_connection.mli \ + node/net/p2p_connection_pool_types.mli \ + node/net/p2p_connection_pool.mli \ + node/net/p2p_welcome.mli \ node/net/p2p.mli \ node/net/RPC_server.mli \ \ @@ -293,6 +296,9 @@ NODE_LIB_IMPLS := \ node/net/p2p_types.ml \ node/net/p2p_io_scheduler.ml \ node/net/p2p_connection.ml \ + node/net/p2p_connection_pool_types.ml \ + node/net/p2p_connection_pool.ml \ + node/net/p2p_welcome.ml \ node/net/p2p.ml \ \ node/net/RPC_server.ml \ diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml new file mode 100644 index 000000000..dc437985a --- /dev/null +++ b/src/node/net/p2p_connection_pool.ml @@ -0,0 +1,667 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* TODO check version negotiation *) + +(* TODO Test cancelation of a (pending) connection *) + +(* TODO do not recompute list_known_points at each requests... but + only once in a while, e.g. every minutes or when a point + or the associated gid is blacklisted. *) + +(* TODO allow to track "requested gids" when we reconnect to a point. *) + +open P2p_types +open P2p_connection_pool_types + +include Logging.Make (struct let name = "p2p.connection-pool" end) + +type 'msg encoding = Encoding : { + tag: int ; + encoding: 'a Data_encoding.t ; + wrap: 'a -> 'msg ; + unwrap: 'msg -> 'a option ; + max_length: int option ; + } -> 'msg encoding + +module Message = struct + + type 'msg t = + | Bootstrap + | Advertise of Point.t list + | Message of 'msg + | Disconnect + + let encoding msg_encoding = + let open Data_encoding in + union ~tag_size:`Uint16 + ([ case ~tag:0x01 null + (function Disconnect -> Some () | _ -> None) + (fun () -> Disconnect); + case ~tag:0x02 null + (function Bootstrap -> Some () | _ -> None) + (fun () -> Bootstrap); + case ~tag:0x03 (Variable.list Point.encoding) + (function Advertise points -> Some points | _ -> None) + (fun points -> Advertise points); + ] @ + ListLabels.map msg_encoding + ~f:(function Encoding { tag ; encoding ; wrap ; unwrap } -> + case ~tag encoding + (function Message msg -> unwrap msg | _ -> None) + (fun msg -> Message (wrap msg)))) + +end + + +module Answerer = struct + + type 'msg callback = { + bootstrap: unit -> Point.t list Lwt.t ; + advertise: Point.t list -> unit Lwt.t ; + message: 'msg -> unit Lwt.t ; + } + + type 'msg t = { + canceler: Canceler.t ; + conn: 'msg Message.t P2p_connection.t ; + callback: 'msg callback ; + mutable worker: unit Lwt.t ; + } + + let rec worker_loop st = + Lwt_unix.yield () >>= fun () -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + P2p_connection.read st.conn + end >>= function + | Ok Bootstrap -> begin + st.callback.bootstrap () >>= function + | [] -> + worker_loop st + | points -> + match P2p_connection.write_now st.conn (Advertise points) with + | Ok _sent -> + (* if not sent then ?? TODO count dropped message ?? *) + worker_loop st + | Error _ -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + end + | Ok (Advertise points) -> + st.callback.advertise points >>= fun () -> + worker_loop st + | Ok (Message msg) -> + st.callback.message msg >>= fun () -> + worker_loop st + | Ok Disconnect | Error [P2p_io_scheduler.Connection_closed] -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + | Error [Lwt_utils.Canceled] -> + Lwt.return_unit + | Error err -> + lwt_log_error "@[Answerer unexpected error:@ %a@]" + Error_monad.pp_print_error err >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + + let run conn canceler callback = + let st = { + canceler ; conn ; callback ; + worker = Lwt.return_unit ; + } in + st.worker <- + Lwt_utils.worker "answerer" + (fun () -> worker_loop st) + (fun () -> Canceler.cancel canceler) ; + st + + let shutdown st = + Canceler.cancel st.canceler >>= fun () -> + st.worker + +end + +type config = { + + identity : Identity.t ; + proof_of_work_target : Crypto_box.target ; + + trusted_points : Point.t list ; + peers_file : string ; + closed_network : bool ; + + listening_port : port option ; + min_connections : int ; + max_connections : int ; + max_incoming_connections : int ; + authentification_timeout : float ; + + incoming_app_message_queue_size : int option ; + incoming_message_queue_size : int option ; + outgoing_message_queue_size : int option ; + +} + +type 'meta meta_config = { + encoding : 'meta Data_encoding.t; + initial : 'meta; +} + +type 'msg message_config = { + encoding : 'msg encoding list ; + versions : P2p_types.Version.t list; +} + +type ('msg, 'meta) t = { + config : config ; + meta_config : 'meta meta_config ; + message_config : 'msg message_config ; + my_id_points : unit Point.Table.t ; + known_gids : (('msg, 'meta) connection, 'meta) Gid_info.t Gid.Table.t ; + connected_gids : (('msg, 'meta) connection, 'meta) Gid_info.t Gid.Table.t ; + known_points : ('msg, 'meta) connection Point_info.t Point.Table.t ; + connected_points : ('msg, 'meta) connection Point_info.t Point.Table.t ; + incoming : Canceler.t Point.Table.t ; + io_sched : P2p_io_scheduler.t ; + encoding : 'msg Message.t Data_encoding.t ; + events : events ; +} + +and events = { + too_few_connections : unit Lwt_condition.t ; + too_many_connections : unit Lwt_condition.t ; + new_point : unit Lwt_condition.t ; +} + +and ('msg, 'meta) connection = { + canceler : Canceler.t ; + messages : 'msg Lwt_pipe.t ; + conn : 'msg Message.t P2p_connection.t ; + gid_info : (('msg, 'meta) connection, 'meta) Gid_info.t ; + point_info : ('msg, 'meta) connection Point_info.t option ; + answerer : 'msg Answerer.t ; + mutable wait_close : bool ; +} + +type ('msg, 'meta) pool = ('msg, 'meta) t + +let register_point pool ?trusted (addr, port as point) = + match Point.Table.find pool.known_points point with + | exception Not_found -> + let pi = Point_info.create ?trusted addr port in + Point.Table.add pool.known_points point pi ; + pi + | pi -> pi + +let register_peer pool gid = + match Gid.Table.find pool.known_gids gid with + | exception Not_found -> + Lwt_condition.broadcast pool.events.new_point () ; + let peer = Gid_info.create gid ~metadata:pool.meta_config.initial in + Gid.Table.add pool.known_gids gid peer ; + peer + | peer -> peer + +let register_new_point pool _gid point = + if not (Point.Table.mem pool.my_id_points point) then + ignore (register_point pool point) + +let register_new_points pool gid points = + List.iter (register_new_point pool gid) points ; + Lwt.return_unit + +let compare_known_point_info p1 p2 = + (* The most-recently disconnected peers are greater. *) + (* Then come long-standing connected peers. *) + let disconnected1 = Point_info.State.is_disconnected p1 + and disconnected2 = Point_info.State.is_disconnected p2 in + let compare_last_seen p1 p2 = + match Point_info.last_seen p1, Point_info.last_seen p2 with + | None, None -> Random.int 2 * 2 - 1 (* HACK... *) + | Some _, None -> 1 + | None, Some _ -> -1 + | Some (_, time1), Some (_, time2) -> + match compare time1 time2 with + | 0 -> Random.int 2 * 2 - 1 (* HACK... *) + | x -> x in + match disconnected1, disconnected2 with + | false, false -> compare_last_seen p1 p2 + | false, true -> -1 + | true, false -> 1 + | true, true -> compare_last_seen p2 p1 + +let list_known_points pool _gid () = + let knowns = + Point.Table.fold (fun _ pi acc -> pi :: acc) pool.known_points [] in + let best_knowns = + Utils.take_n ~compare:compare_known_point_info 50 knowns in + Lwt.return (List.map Point_info.point best_knowns) + +let active_connections pool = Gid.Table.length pool.connected_gids + +let create_connection pool conn id_point pi gi = + let gid = Gid_info.gid gi in + let canceler = Canceler.create () in + let messages = + Lwt_pipe.create ?size:pool.config.incoming_app_message_queue_size () in + let callback = + { Answerer.message = Lwt_pipe.push messages ; + advertise = register_new_points pool gid ; + bootstrap = list_known_points pool gid ; + } in + let answerer = Answerer.run conn canceler callback in + let conn = + { conn ; point_info = pi ; gid_info = gi ; + messages ; canceler ; answerer ; wait_close = false } in + iter_option pi ~f:begin fun pi -> + Point_info.State.set_running pi gid conn ; + Point.Table.add pool.connected_points (Point_info.point pi) pi ; + end ; + Gid_info.State.set_running gi id_point conn ; + Gid.Table.add pool.connected_gids gid gi ; + Canceler.on_cancel canceler begin fun () -> + lwt_debug "Disconnect: %a (%a)" + Gid.pp gid Id_point.pp id_point >>= fun () -> + iter_option ~f:Point_info.State.set_disconnected pi; + Gid_info.State.set_disconnected gi ; + iter_option pi ~f:begin fun pi -> + Point.Table.remove pool.connected_points (Point_info.point pi) ; + end ; + Gid.Table.remove pool.connected_gids gid ; + if pool.config.max_connections <= active_connections pool then + Lwt_condition.broadcast pool.events.too_many_connections () ; + P2p_connection.close ~wait:conn.wait_close conn.conn + end ; + if active_connections pool < pool.config.min_connections then + Lwt_condition.broadcast pool.events.too_few_connections () ; + conn + +let disconnect ?(wait = false) conn = + conn.wait_close <- wait ; + Canceler.cancel conn.canceler >>= fun () -> + conn.answerer.worker + +type error += Rejected of Gid.t +type error += Unexpected_point_state +type error += Unexpected_gid_state + +let may_register_my_id_point pool = function + | [P2p_connection.Myself (addr, Some port)] -> + Point.Table.add pool.my_id_points (addr, port) () ; + Point.Table.remove pool.known_points (addr, port) + | _ -> () + +let authenticate pool ?pi canceler fd point = + let incoming = pi = None in + lwt_debug "authenticate: %a%s" + Point.pp point + (if incoming then " incoming" else "") >>= fun () -> + Lwt_utils.protect ~canceler begin fun () -> + P2p_connection.authenticate + ~proof_of_work_target:pool.config.proof_of_work_target + ~incoming (P2p_io_scheduler.register pool.io_sched fd) point + ?listening_port:pool.config.listening_port + pool.config.identity pool.message_config.versions + end ~on_error: begin fun err -> + (* TODO do something when the error is Not_enough_proof_of_work ?? *) + lwt_debug "authenticate: %a%s -> failed %a" + Point.pp point + (if incoming then " incoming" else "") + pp_print_error err >>= fun () -> + may_register_my_id_point pool err ; + if incoming then + Point.Table.remove pool.incoming point + else + iter_option Point_info.State.set_disconnected pi ; + Lwt.return (Error err) + end >>=? fun (info, auth_fd) -> + lwt_debug "authenticate: %a -> auth %a" + Point.pp point + Connection_info.pp info >>= fun () -> + let remote_pi = + match info.id_point with + | addr, Some port + when not (Point.Table.mem pool.my_id_points (addr, port)) -> + Some (register_point pool (addr, port)) + | _ -> None in + let connection_pi = + match pi, remote_pi with + | None, None -> None + | Some _ as pi, _ | _, (Some _ as pi) -> pi in + let gi = register_peer pool info.gid in + let acceptable_point = + unopt_map connection_pi + ~default:(not pool.config.closed_network) + ~f:begin fun connection_pi -> + match Point_info.State.get connection_pi with + | Requested _ -> not incoming + | Disconnected -> + not pool.config.closed_network + || Point_info.trusted connection_pi + | Accepted _ | Running _ -> false + end + in + let acceptable_gid = + match Gid_info.State.get gi with + | Accepted _ -> + (* TODO: in some circumstances cancel and accept... *) + false + | Running _ -> false + | Disconnected -> true + in + if incoming then Point.Table.remove pool.incoming point ; + if not acceptable_gid || not acceptable_point then begin + lwt_debug "authenticate: %a -> kick %a point: %B gid: %B" + Point.pp point + Connection_info.pp info + acceptable_point acceptable_gid >>= fun () -> + P2p_connection.kick auth_fd >>= fun () -> + if not incoming then begin + iter_option ~f:Point_info.State.set_disconnected pi ; + (* FIXME Gid_info.State.set_disconnected ~requested:true gi ; *) + end ; + fail (Rejected info.gid) + end else begin + iter_option connection_pi + ~f:(fun pi -> Point_info.State.set_accepted pi info.gid canceler) ; + Gid_info.State.set_accepted gi info.id_point canceler ; + lwt_debug "authenticate: %a -> accept %a" + Point.pp point + Connection_info.pp info >>= fun () -> + Lwt_utils.protect ~canceler begin fun () -> + P2p_connection.accept + ?incoming_message_queue_size:pool.config.incoming_message_queue_size + ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size + auth_fd pool.encoding >>= fun conn -> + lwt_debug "authenticate: %a -> Connected %a" + Point.pp point + Connection_info.pp info >>= fun () -> + Lwt.return conn + end ~on_error: begin fun err -> + lwt_debug "authenticate: %a -> rejected %a" + Point.pp point + Connection_info.pp info >>= fun () -> + iter_option connection_pi ~f:Point_info.State.set_disconnected; + Gid_info.State.set_disconnected gi ; + Lwt.return (Error err) + end >>=? fun conn -> + let id_point = + match info.id_point, map_option Point_info.point pi with + | (addr, _), Some (_, port) -> addr, Some port + | id_point, None -> id_point in + return (create_connection pool conn id_point connection_pi gi) + end + +type error += Pending_connection +type error += Connected +type error += Connection_closed = P2p_io_scheduler.Connection_closed +type error += Connection_refused +type error += Closed_network + +let fail_unless_disconnected_point pi = + match Point_info.State.get pi with + | Disconnected -> return () + | Requested _ | Accepted _ -> fail Pending_connection + | Running _ -> fail Connected + +let fail_unless_disconnected_gid gi = + match Gid_info.State.get gi with + | Disconnected -> return () + | Accepted _ -> fail Pending_connection + | Running _ -> fail Connected + +let raw_connect canceler pool point = + let pi = register_point pool point in + let addr, port as point = Point_info.point pi in + fail_unless + (not pool.config.closed_network || Point_info.trusted pi) + Closed_network >>=? fun () -> + fail_unless_disconnected_point pi >>=? fun () -> + Point_info.State.set_requested pi canceler ; + let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let uaddr = + Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in + lwt_debug "connect: %a" Point.pp point >>= fun () -> + Lwt_utils.protect ~canceler begin fun () -> + Lwt_unix.connect fd uaddr >>= fun () -> + return () + end ~on_error: begin fun err -> + lwt_debug "connect: %a -> disconnect" Point.pp point >>= fun () -> + Point_info.State.set_disconnected pi ; + match err with + | [Exn (Unix.Unix_error (Unix.ECONNREFUSED, _, _))] -> + fail Connection_refused + | err -> Lwt.return (Error err) + end >>=? fun () -> + lwt_debug "connect: %a -> authenticate" Point.pp point >>= fun () -> + authenticate pool ~pi canceler fd point + +type error += Too_many_connections + +let connect ~timeout pool point = + fail_unless + (active_connections pool <= pool.config.max_connections) + Too_many_connections >>=? fun () -> + let canceler = Canceler.create () in + Lwt_utils.with_timeout ~canceler timeout begin fun canceler -> + raw_connect canceler pool point + end + +let accept pool fd point = + if pool.config.max_incoming_connections <= Point.Table.length pool.incoming + || pool.config.max_connections <= active_connections pool then + Lwt.async (fun () -> Lwt_utils.safe_close fd) + else + let canceler = Canceler.create () in + Point.Table.add pool.incoming point canceler ; + Lwt.async begin fun () -> + Lwt_utils.with_timeout + ~canceler pool.config.authentification_timeout + (fun canceler -> authenticate pool canceler fd point) + end + + +(***************************************************************************) + +let read { messages } = + Lwt.catch + (fun () -> Lwt_pipe.pop messages >>= return) + (fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed) + +let is_readable { messages } = + Lwt.catch + (fun () -> Lwt_pipe.values_available messages >>= return) + (fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed) + +let write { conn } msg = + P2p_connection.write conn (Message msg) + +let write_sync { conn } msg = + P2p_connection.write_sync conn (Message msg) + +let write_now { conn } msg = + P2p_connection.write_now conn (Message msg) + +let write_all pool msg = + Gid.Table.iter + (fun _gid gi -> + match Gid_info.State.get gi with + | Running { data = conn } -> + ignore (write_now conn msg : bool tzresult ) + | _ -> ()) + pool.connected_gids + +let broadcast_bootstrap_msg pool = + Gid.Table.iter + (fun _gid gi -> + match Gid_info.State.get gi with + | Running { data = { conn } } -> + ignore (P2p_connection.write_now conn Bootstrap : bool tzresult ) + | _ -> ()) + pool.connected_gids + + +(***************************************************************************) + +module Gids = struct + + type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Gid_info.t + + let info { known_gids } point = + try Some (Gid.Table.find known_gids point) + with Not_found -> None + + let get_metadata pool gid = + try Some (Gid_info.metadata (Gid.Table.find pool.known_gids gid)) + with Not_found -> None + + let set_metadata pool gid data = + Gid_info.set_metadata (register_peer pool gid) data + + let get_trusted pool gid = + try Gid_info.trusted (Gid.Table.find pool.known_gids gid) + with Not_found -> false + + let set_trusted pool gid = + try Gid_info.set_trusted (register_peer pool gid) + with Not_found -> () + + let unset_trusted pool gid = + try Gid_info.unset_trusted (Gid.Table.find pool.known_gids gid) + with Not_found -> () + + let find_connection pool gid = + apply_option + (info pool gid) + ~f:(fun p -> + match Gid_info.State.get p with + | Running { data } -> Some data + | _ -> None) + + let fold_known pool ~init ~f = + Gid.Table.fold f pool.known_gids init + let fold_connected pool ~init ~f = + Gid.Table.fold f pool.connected_gids init + +end + +let fold_connections pool ~init ~f = + Gids.fold_connected pool ~init ~f:begin fun gid gi acc -> + match Gid_info.State.get gi with + | Running { data } -> f gid data acc + | _ -> acc + end + +module Points = struct + + type ('msg, 'meta) info = ('msg, 'meta) connection Point_info.t + + let info { known_points } point = + try Some (Point.Table.find known_points point) + with Not_found -> None + + let get_trusted pool gid = + try Point_info.trusted (Point.Table.find pool.known_points gid) + with Not_found -> false + + let set_trusted pool gid = + try Point_info.set_trusted (register_point pool gid) + with Not_found -> () + + let unset_trusted pool gid = + try Point_info.unset_trusted (Point.Table.find pool.known_points gid) + with Not_found -> () + + let find_connection pool point = + apply_option + (info pool point) + ~f:(fun p -> + match Point_info.State.get p with + | Running { data } -> Some data + | _ -> None) + + let fold_known pool ~init ~f = + Point.Table.fold f pool.known_points init + + let fold_connected pool ~init ~f = + Point.Table.fold f pool.connected_points init + +end + +module Events = struct + let too_few_connections pool = + Lwt_condition.wait pool.events.too_few_connections + let too_many_connections pool = + Lwt_condition.wait pool.events.too_many_connections + let new_point pool = + Lwt_condition.wait pool.events.new_point +end + + +let connection_stat { conn } = + P2p_connection.stat conn + +let pool_stat { io_sched } = + P2p_io_scheduler.global_stat io_sched + +let connection_info { conn } = + P2p_connection.info conn + +(***************************************************************************) + +let create config meta_config message_config io_sched = + let events = { + too_few_connections = Lwt_condition.create () ; + too_many_connections = Lwt_condition.create () ; + new_point = Lwt_condition.create () ; + } in + let pool = { + config ; meta_config ; message_config ; + my_id_points = Point.Table.create 7 ; + known_gids = Gid.Table.create 53 ; + connected_gids = Gid.Table.create 53 ; + known_points = Point.Table.create 53 ; + connected_points = Point.Table.create 53 ; + incoming = Point.Table.create 53 ; + io_sched ; + encoding = Message.encoding message_config.encoding ; + events ; + } in + List.iter (Points.set_trusted pool) config.trusted_points ; + Lwt.catch + (fun () -> + Gid_info.File.load config.peers_file meta_config.encoding) + (fun _ -> + (* TODO log error *) + Lwt.return_nil) >>= fun gids -> + List.iter + (fun gi -> Gid.Table.add pool.known_gids (Gid_info.gid gi) gi) + gids ; + Lwt.return pool + +let destroy pool = + Point.Table.fold (fun _point pi acc -> + match Point_info.State.get pi with + | Requested { cancel } | Accepted { cancel } -> + Canceler.cancel cancel >>= fun () -> acc + | Running { data = conn } -> + disconnect conn >>= fun () -> acc + | Disconnected -> acc) + pool.known_points @@ + Gid.Table.fold (fun _gid gi acc -> + match Gid_info.State.get gi with + | Accepted { cancel } -> + Canceler.cancel cancel >>= fun () -> acc + | Running { data = conn } -> + disconnect conn >>= fun () -> acc + | Disconnected -> acc) + pool.known_gids @@ + Point.Table.fold (fun _point canceler acc -> + Canceler.cancel canceler >>= fun () -> acc) + pool.incoming Lwt.return_unit diff --git a/src/node/net/p2p_connection_pool.mli b/src/node/net/p2p_connection_pool.mli new file mode 100644 index 000000000..27ef938c0 --- /dev/null +++ b/src/node/net/p2p_connection_pool.mli @@ -0,0 +1,290 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Pool of connections. This module manages the connection pool that + the shell needs to maintain in order to function correctly. + + A pool and its connections are parametrized by the type of + messages exchanged over the connection and the type of + meta-information associated with a peer. The type [('msg, 'meta) + connection] is a wrapper on top of [P2p_connection.t] that adds + meta-information, a data-structure describing a fine-grained state + of the connection, as well as a new message queue (referred to + "app message queue") that will only contain the messages from the + internal [P2p_connection.t] that needs to be examined by the + higher layers. Some messages are directly processed by an internal + worker and thus never propagated above. +*) + +open P2p_types +open P2p_connection_pool_types + +type 'msg encoding = Encoding : { + tag: int ; + encoding: 'a Data_encoding.t ; + wrap: 'a -> 'msg ; + unwrap: 'msg -> 'a option ; + max_length: int option ; + } -> 'msg encoding + +(** {1 Pool management} *) + +type ('msg, 'meta) t + +type ('msg, 'meta) pool = ('msg, 'meta) t +(** The type of a pool of connections, parametrized by resp. the type + of messages and the meta-information associated to an identity. *) + +type config = { + + identity : Identity.t ; + (** Our identity. *) + + proof_of_work_target : Crypto_box.target ; + (** The proof of work target we require from peers. *) + + trusted_points : Point.t list ; + (** List of hard-coded known peers to bootstrap the network from. *) + + peers_file : string ; + (** The path to the JSON file where the metadata associated to + gids are loaded / stored. *) + + closed_network : bool ; + (** If [true], the only accepted connections are from peers whose + addresses are in [trusted_peers]. *) + + listening_port : port option ; + (** If provided, it will be passed to [P2p_connection.authenticate] + when we authenticate against a new peer. *) + + min_connections : int ; + (** Strict minimum number of connections + (triggers [Event.too_few_connections]). *) + + max_connections : int ; + (** Max number of connections. If it's reached, [connect] and + [accept] will fail, i.e. not add more connections + (also triggers [Event.too_many_connections]). *) + + max_incoming_connections : int ; + (** Max not-yet-authentified incoming connections. + Above this number, [accept] will start dropping incoming + connections. *) + + authentification_timeout : float ; + (** Delay granted to a peer to perform authentication, in seconds. *) + + incoming_app_message_queue_size : int option ; + (** Size of the message queue for user messages (messages returned + by this module's [read] function. *) + + incoming_message_queue_size : int option ; + (** Size of the incoming message queue internal of a peer's Reader + (See [P2p_connection.accept]). *) + + outgoing_message_queue_size : int option ; + (** Size of the outgoing message queue internal to a peer's Writer + (See [P2p_connection.accept]). *) +} + +type 'meta meta_config = { + encoding : 'meta Data_encoding.t; + initial : 'meta; +} + +type 'msg message_config = { + encoding : 'msg encoding list ; + versions : P2p_types.Version.t list; +} + +val create: + config -> + 'meta meta_config -> + 'msg message_config -> + P2p_io_scheduler.t -> + ('msg, 'meta) pool Lwt.t +(** [create config meta_cfg msg_cfg io_sched] is a freshly minted + pool. *) + +val destroy: ('msg, 'meta) pool -> unit Lwt.t +(** [destroy pool] returns when member connections are either + disconnected or canceled. *) + +val active_connections: ('msg, 'meta) pool -> int +(** [active_connections pool] is the number of connections inside + [pool]. *) + +val pool_stat: ('msg, 'meta) pool -> Stat.t +(** [pool_stat pool] is a snapshot of current bandwidth usage for the + entire [pool]. *) + +(** {2 Pool events} *) + +module Events : sig + val too_few_connections: ('msg, 'meta) pool -> unit Lwt.t + val too_many_connections: ('msg, 'meta) pool -> unit Lwt.t + val new_point: ('msg, 'meta) pool -> unit Lwt.t +end + +(** {1 Connections management} *) + +type ('msg, 'meta) connection +(** Type of a connection to a peer, parametrized by the type of + messages exchanged as well as meta-information associated to a + peer. It mostly wraps [P2p_connection.connection], adding + meta-information and data-structures describing a more + fine-grained logical state of the connection. *) + +type error += Pending_connection +type error += Connected +type error += Connection_refused +type error += Rejected of Gid.t +type error += Too_many_connections +type error += Closed_network + +val connect: + timeout:float -> + ('msg, 'meta) pool -> Point.t -> + ('msg, 'meta) connection tzresult Lwt.t +(** [connect ~timeout pool point] tries to add a + connection to [point] in [pool] in less than [timeout] seconds. *) + +val accept: + ('msg, 'meta) pool -> Lwt_unix.file_descr -> Point.t -> unit +(** [accept pool fd point] instructs [pool] to start the process of + accepting a connection from [fd]. Used by [P2p]. *) + +val disconnect: + ?wait:bool -> ('msg, 'meta) connection -> unit Lwt.t +(** [disconnect conn] cleanly closes [conn] and returns after [conn]'s + internal worker has returned. *) + +val connection_info: ('msg, 'meta) connection -> Connection_info.t + +val connection_stat: ('msg, 'meta) connection -> Stat.t +(** [stat conn] is a snapshot of current bandwidth usage for + [conn]. *) + +val fold_connections: + ('msg, 'meta) pool -> + init:'a -> + f:(Gid.t -> ('msg, 'meta) connection -> 'a -> 'a) -> + 'a + +(** {1 I/O on connections} *) + +type error += Connection_closed + +val read: ('msg, 'meta) connection -> 'msg tzresult Lwt.t +(** [read conn] returns a message popped from [conn]'s app message + queue, or fails with [Connection_closed]. *) + +val is_readable: ('msg, 'meta) connection -> unit tzresult Lwt.t +(** [is_readable conn] returns when there is at least one message + ready to be read. *) + +val write: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t +(** [write conn msg] is [P2p_connection.write conn' msg] where [conn'] + is the internal [P2p_connection.t] inside [conn]. *) + +val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t +(** [write_sync conn msg] is [P2p_connection.write_sync conn' msg] + where [conn'] is the internal [P2p_connection.t] inside [conn]. *) + +val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult +(** [write_now conn msg] is [P2p_connection.write_now conn' msg] where + [conn'] is the internal [P2p_connection.t] inside [conn]. *) + +(** {2 Broadcast functions} *) + +val write_all: ('msg, 'meta) pool -> 'msg -> unit +(** [write_all pool msg] is [write_now conn msg] for all member + connections to [pool] in [Running] state. *) + +val broadcast_bootstrap_msg: ('msg, 'meta) pool -> unit +(** [write_all pool msg] is [P2P_connection.write_now conn Bootstrap] + for all member connections to [pool] in [Running] state. *) + +(** {1 Functions on [Gid]} *) + +module Gids : sig + + type ('msg, 'meta) info = (('msg, 'meta) connection, 'meta) Gid_info.t + + val info: + ('msg, 'meta) pool -> Gid.t -> ('msg, 'meta) info option + + val get_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta option + val set_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta -> unit + + val get_trusted: ('msg, 'meta) pool -> Gid.t -> bool + val set_trusted: ('msg, 'meta) pool -> Gid.t -> unit + val unset_trusted: ('msg, 'meta) pool -> Gid.t -> unit + + val find_connection: + ('msg, 'meta) pool -> Gid.t -> ('msg, 'meta) connection option + + val fold_known: + ('msg, 'meta) pool -> + init:'a -> + f:(Gid.t -> ('msg, 'meta) info -> 'a -> 'a) -> + 'a + + val fold_connected: + ('msg, 'meta) pool -> + init:'a -> + f:(Gid.t -> ('msg, 'meta) info -> 'a -> 'a) -> + 'a + +end + +(** {1 Functions on [Points]} *) + +module Points : sig + + type ('msg, 'meta) info = ('msg, 'meta) connection Point_info.t + + val info: + ('msg, 'meta) pool -> Point.t -> ('msg, 'meta) info option + + val get_trusted: ('msg, 'meta) pool -> Point.t -> bool + val set_trusted: ('msg, 'meta) pool -> Point.t -> unit + val unset_trusted: ('msg, 'meta) pool -> Point.t -> unit + + val find_connection: + ('msg, 'meta) pool -> Point.t -> ('msg, 'meta) connection option + + val fold_known: + ('msg, 'meta) pool -> + init:'a -> + f:(Point.t -> ('msg, 'meta) info -> 'a -> 'a) -> + 'a + + val fold_connected: + ('msg, 'meta) pool -> + init:'a -> + f:(Point.t -> ('msg, 'meta) info -> 'a -> 'a) -> + 'a + +end + +(**/**) + +module Message : sig + + type 'msg t = + | Bootstrap + | Advertise of Point.t list + | Message of 'msg + | Disconnect + + val encoding: 'msg encoding list -> 'msg t Data_encoding.t + +end diff --git a/src/node/net/p2p_connection_pool_types.ml b/src/node/net/p2p_connection_pool_types.ml new file mode 100644 index 000000000..2e2bcd5f9 --- /dev/null +++ b/src/node/net/p2p_connection_pool_types.ml @@ -0,0 +1,463 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_types + +module Point_info = struct + + type 'data state = + | Requested of { cancel: Canceler.t } + | Accepted of { current_gid: Gid.t ; + cancel: Canceler.t } + | Running of { data: 'data ; + current_gid: Gid.t } + | Disconnected + + module Event = struct + + type kind = + | Outgoing_request + | Accepting_request of Gid.t + | Rejecting_request of Gid.t + | Request_rejected of Gid.t option + | Connection_established of Gid.t + | Disconnection of Gid.t + | External_disconnection of Gid.t + + type t = { + kind : kind ; + timestamp : Time.t ; + } + + end + + type greylisting_config = { + factor: float ; + initial_delay: int ; + disconnection_delay: int ; + } + + type 'data t = { + point : Point.t ; + mutable trusted : bool ; + mutable state : 'data state ; + mutable last_failed_connection : Time.t option ; + mutable last_rejected_connection : (Gid.t * Time.t) option ; + mutable last_established_connection : (Gid.t * Time.t) option ; + mutable last_disconnection : (Gid.t * Time.t) option ; + greylisting : greylisting_config ; + mutable greylisting_delay : float ; + mutable greylisting_end : Time.t ; + events : Event.t Ring.t ; + } + type 'data point_info = 'data t + + let compare pi1 pi2 = Point.compare pi1.point pi2.point + + let log_size = 100 + + let default_greylisting_config = { + factor = 1.2 ; + initial_delay = 1 ; + disconnection_delay = 60 ; + } + + let create + ?(trusted = false) + ?(greylisting_config = default_greylisting_config) addr port = { + point = (addr, port) ; + trusted ; + state = Disconnected ; + last_failed_connection = None ; + last_rejected_connection = None ; + last_established_connection = None ; + last_disconnection = None ; + events = Ring.create log_size ; + greylisting = greylisting_config ; + greylisting_delay = 1. ; + greylisting_end = Time.now () ; + } + + let point s = s.point + let trusted s = s.trusted + let set_trusted gi = gi.trusted <- true + let unset_trusted gi = gi.trusted <- false + let last_established_connection s = s.last_established_connection + let last_disconnection s = s.last_disconnection + let last_failed_connection s = s.last_failed_connection + let last_rejected_connection s = s.last_rejected_connection + let greylisted ?(now = Time.now ()) s = + Time.compare now s.greylisting_end <= 0 + + let recent a1 a2 = + match a1, a2 with + | (None, None) -> None + | (None, (Some _ as a)) + | (Some _ as a, None) -> a + | (Some (_, t1), Some (_, t2)) -> + if Time.compare t1 t2 < 0 then a2 else a1 + let last_seen s = + recent s.last_rejected_connection + (recent s.last_established_connection s.last_disconnection) + let last_miss s = + match + s.last_failed_connection, + (map_option ~f:(fun (_, time) -> time) @@ + recent s.last_rejected_connection s.last_disconnection) with + | (None, None) -> None + | (None, (Some _ as a)) + | (Some _ as a, None) -> a + | (Some t1 as a1 , (Some t2 as a2)) -> + if Time.compare t1 t2 < 0 then a2 else a1 + + let fold_events { events } ~init ~f = Ring.fold events ~init ~f + + let log { events } ?(timestamp = Time.now ()) kind = + Ring.add events { kind ; timestamp } + + let log_incoming_rejection ?timestamp point_info gid = + log point_info ?timestamp (Rejecting_request gid) + + module State = struct + + type 'data t = 'data state = + | Requested of { cancel: Canceler.t } + | Accepted of { current_gid: Gid.t ; + cancel: Canceler.t } + | Running of { data: 'data ; + current_gid: Gid.t } + | Disconnected + type 'data state = 'data t + + let pp ppf = function + | Requested _ -> + Format.fprintf ppf "requested" + | Accepted { current_gid } -> + Format.fprintf ppf "accepted %a" Gid.pp current_gid + | Running { current_gid } -> + Format.fprintf ppf "running %a" Gid.pp current_gid + | Disconnected -> + Format.fprintf ppf "disconnected" + + let get { state } = state + + let is_disconnected { state } = + match state with + | Disconnected -> true + | Requested _ | Accepted _ | Running _ -> false + + let set_requested ?timestamp point_info cancel = + assert begin + match point_info.state with + | Requested _ -> true + | Accepted _ | Running _ -> false + | Disconnected -> true + end ; + point_info.state <- Requested { cancel } ; + log point_info ?timestamp Outgoing_request + + let set_accepted + ?(timestamp = Time.now ()) + point_info current_gid cancel = + (* log_notice "SET_ACCEPTED %a@." Point.pp point_info.point ; *) + assert begin + match point_info.state with + | Accepted _ | Running _ -> false + | Requested _ | Disconnected -> true + end ; + point_info.state <- Accepted { current_gid ; cancel } ; + log point_info ~timestamp (Accepting_request current_gid) + + let set_running + ?(timestamp = Time.now ()) + point_info gid data = + assert begin + match point_info.state with + | Disconnected -> true (* request to unknown gid. *) + | Running _ -> false + | Accepted { current_gid } -> Gid.equal gid current_gid + | Requested _ -> true + end ; + point_info.state <- Running { data ; current_gid = gid } ; + point_info.last_established_connection <- Some (gid, timestamp) ; + log point_info ~timestamp (Connection_established gid) + + let set_greylisted timestamp point_info = + point_info.greylisting_end <- + Time.add + timestamp + (Int64.of_float point_info.greylisting_delay) ; + point_info.greylisting_delay <- + point_info.greylisting_delay *. point_info.greylisting.factor + + let set_disconnected + ?(timestamp = Time.now ()) ?(requested = false) point_info = + let event : Event.kind = + match point_info.state with + | Requested _ -> + set_greylisted timestamp point_info ; + point_info.last_failed_connection <- Some timestamp ; + Request_rejected None + | Accepted { current_gid } -> + set_greylisted timestamp point_info ; + point_info.last_rejected_connection <- + Some (current_gid, timestamp) ; + Request_rejected (Some current_gid) + | Running { current_gid } -> + point_info.greylisting_delay <- + float_of_int point_info.greylisting.initial_delay ; + point_info.greylisting_end <- + Time.add timestamp + (Int64.of_int point_info.greylisting.disconnection_delay) ; + point_info.last_disconnection <- Some (current_gid, timestamp) ; + if requested + then Disconnection current_gid + else External_disconnection current_gid + | Disconnected -> + assert false + in + point_info.state <- Disconnected ; + log point_info ~timestamp event + + end + +end + +module Gid_info = struct + + type 'data state = + | Accepted of { current_point: Id_point.t ; + cancel: Canceler.t } + | Running of { data: 'data ; + current_point: Id_point.t } + | Disconnected + + module Event = struct + + type kind = + | Accepting_request + | Rejecting_request + | Request_rejected + | Connection_established + | Disconnection + | External_disconnection + + let kind_encoding = + let open Data_encoding in + Data_encoding.string_enum [ + "incoming_request", Accepting_request ; + "rejecting_request", Rejecting_request ; + "request_rejected", Request_rejected ; + "connection_established", Connection_established ; + "disconnection", Disconnection ; + "external_disconnection", External_disconnection ; + ] + + type t = { + kind : kind ; + timestamp : Time.t ; + point : Id_point.t ; + } + + let encoding = + let open Data_encoding in + conv + (fun { kind ; timestamp ; point = (addr, port) } -> + (kind, timestamp, Ipaddr.V6.to_string addr, port)) + (fun (kind, timestamp, addr, port) -> + let addr = Ipaddr.V6.of_string_exn addr in + { kind ; timestamp ; point = (addr, port) }) + (obj4 + (req "kind" kind_encoding) + (req "timestamp" Time.encoding) + (req "addr" string) + (opt "port" int16)) + + end + + type ('conn, 'meta) t = { + gid : Gid.t ; + mutable state : 'conn state ; + mutable metadata : 'meta ; + mutable trusted : bool ; + mutable last_failed_connection : (Id_point.t * Time.t) option ; + mutable last_rejected_connection : (Id_point.t * Time.t) option ; + mutable last_established_connection : (Id_point.t * Time.t) option ; + mutable last_disconnection : (Id_point.t * Time.t) option ; + events : Event.t Ring.t ; + } + type ('conn, 'meta) gid_info = ('conn, 'meta) t + + let compare gi1 gi2 = Gid.compare gi1.gid gi2.gid + + let log_size = 100 + + let create ?(trusted = false) ~metadata gid = + { gid ; + state = Disconnected ; + metadata ; + trusted ; + events = Ring.create log_size ; + last_failed_connection = None ; + last_rejected_connection = None ; + last_established_connection = None ; + last_disconnection = None ; + } + + let encoding metadata_encoding = + let open Data_encoding in + conv + (fun { gid ; trusted ; metadata ; events ; + last_failed_connection ; last_rejected_connection ; + last_established_connection ; last_disconnection } -> + (gid, trusted, metadata, Ring.elements events, + last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection)) + (fun (gid, trusted, metadata, event_list, + last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection) -> + let info = create ~trusted ~metadata gid in + let events = Ring.create log_size in + Ring.add_list info.events event_list ; + { state = Disconnected ; + trusted ; gid ; metadata ; events ; + last_failed_connection ; + last_rejected_connection ; + last_established_connection ; + last_disconnection ; + }) + (obj8 + (req "gid" Gid.encoding) + (dft "trusted" bool false) + (req "metadata" metadata_encoding) + (dft "events" (list Event.encoding) []) + (opt "last_failed_connection" + (tup2 Id_point.encoding Time.encoding)) + (opt "last_rejected_connection" + (tup2 Id_point.encoding Time.encoding)) + (opt "last_established_connection" + (tup2 Id_point.encoding Time.encoding)) + (opt "last_disconnection" + (tup2 Id_point.encoding Time.encoding))) + + let gid { gid } = gid + let metadata { metadata } = metadata + let set_metadata gi metadata = gi.metadata <- metadata + let trusted { trusted } = trusted + let set_trusted gi = gi.trusted <- true + let unset_trusted gi = gi.trusted <- false + let fold_events { events } ~init ~f = Ring.fold events ~init ~f + + let last_established_connection s = s.last_established_connection + let last_disconnection s = s.last_disconnection + let last_failed_connection s = s.last_failed_connection + let last_rejected_connection s = s.last_rejected_connection + + let recent = Point_info.recent + let last_seen s = + recent + s.last_established_connection + (recent s.last_rejected_connection s.last_disconnection) + let last_miss s = + recent + s.last_failed_connection + (recent s.last_rejected_connection s.last_disconnection) + + let log { events } ?(timestamp = Time.now ()) point kind = + Ring.add events { kind ; timestamp ; point } + + let log_incoming_rejection ?timestamp gid_info point = + log gid_info ?timestamp point Rejecting_request + + module State = struct + + type 'data t = 'data state = + | Accepted of { current_point: Id_point.t ; + cancel: Canceler.t } + | Running of { data: 'data ; + current_point: Id_point.t } + | Disconnected + type 'data state = 'data t + + let pp ppf = function + | Accepted { current_point } -> + Format.fprintf ppf "accepted %a" Id_point.pp current_point + | Running { current_point } -> + Format.fprintf ppf "running %a" Id_point.pp current_point + | Disconnected -> + Format.fprintf ppf "disconnected" + + let get { state } = state + + let is_disconnected { state } = + match state with + | Disconnected -> true + | Accepted _ | Running _ -> false + + let set_accepted + ?(timestamp = Time.now ()) + gid_info current_point cancel = + assert begin + match gid_info.state with + | Accepted _ | Running _ -> false + | Disconnected -> true + end ; + gid_info.state <- Accepted { current_point ; cancel } ; + log gid_info ~timestamp current_point Accepting_request + + let set_running + ?(timestamp = Time.now ()) + gid_info point data = + assert begin + match gid_info.state with + | Disconnected -> true (* request to unknown gid. *) + | Running _ -> false + | Accepted { current_point } -> + Id_point.equal point current_point + end ; + gid_info.state <- Running { data ; current_point = point } ; + gid_info.last_established_connection <- Some (point, timestamp) ; + log gid_info ~timestamp point Connection_established + + let set_disconnected + ?(timestamp = Time.now ()) ?(requested = false) gid_info = + let current_point, (event : Event.kind) = + match gid_info.state with + | Accepted { current_point } -> + gid_info.last_rejected_connection <- + Some (current_point, timestamp) ; + current_point, Request_rejected + | Running { current_point } -> + gid_info.last_disconnection <- + Some (current_point, timestamp) ; + current_point, + if requested then Disconnection else External_disconnection + | Disconnected -> assert false + in + gid_info.state <- Disconnected ; + log gid_info ~timestamp current_point event + + end + + module File = struct + + let load path metadata_encoding = + let enc = Data_encoding.list (encoding metadata_encoding) in + Data_encoding_ezjsonm.read_file path >|= + map_option ~f:(Data_encoding.Json.destruct enc) >|= + unopt [] + + let save path metadata_encoding peers = + let open Data_encoding in + Data_encoding_ezjsonm.write_file path @@ + Json.construct (list (encoding metadata_encoding)) peers + + end + +end diff --git a/src/node/net/p2p_connection_pool_types.mli b/src/node/net/p2p_connection_pool_types.mli new file mode 100644 index 000000000..8c2c3a584 --- /dev/null +++ b/src/node/net/p2p_connection_pool_types.mli @@ -0,0 +1,265 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_types + +module Point_info : sig + + type 'conn t + type 'conn point_info = 'conn t + (** Type of info associated to a point. *) + + val compare : 'conn point_info -> 'conn point_info -> int + + type greylisting_config = { + factor: float ; + initial_delay: int ; + disconnection_delay: int ; + } + + val create : + ?trusted:bool -> + ?greylisting_config:greylisting_config -> + addr -> port -> 'conn point_info + (** [create ~trusted addr port] is a freshly minted point_info. If + [trusted] is true, this point is considered trusted and will + be treated as such. *) + + val trusted : 'conn point_info -> bool + (** [trusted pi] is [true] iff [pi] has is trusted, + i.e. "whitelisted". *) + + val set_trusted : 'conn point_info -> unit + val unset_trusted : 'conn point_info -> unit + + val last_failed_connection : + 'conn point_info -> Time.t option + val last_rejected_connection : + 'conn point_info -> (Gid.t * Time.t) option + val last_established_connection : + 'conn point_info -> (Gid.t * Time.t) option + val last_disconnection : + 'conn point_info -> (Gid.t * Time.t) option + + val last_seen : + 'conn point_info -> (Gid.t * Time.t) option + (** [last_seen pi] is the most recent of: + + * last established connection + * last rejected connection + * last disconnection + *) + + val last_miss : + 'conn point_info -> Time.t option + + val greylisted : + ?now:Time.t -> 'conn point_info -> bool + + val point : 'conn point_info -> Point.t + + module State : sig + + type 'conn t = + | Requested of { cancel: Canceler.t } + (** We initiated a connection. *) + | Accepted of { current_gid: Gid.t ; + cancel: Canceler.t } + (** We accepted a incoming connection. *) + | Running of { data: 'conn ; + current_gid: Gid.t } + (** Successfully authentificated connection, normal business. *) + | Disconnected + (** No connection established currently. *) + type 'conn state = 'conn t + + val pp : Format.formatter -> 'conn t -> unit + + val get : 'conn point_info -> 'conn state + + val is_disconnected : 'conn point_info -> bool + + val set_requested : + ?timestamp:Time.t -> + 'conn point_info -> Canceler.t -> unit + + val set_accepted : + ?timestamp:Time.t -> + 'conn point_info -> Gid.t -> Canceler.t -> unit + + val set_running : + ?timestamp:Time.t -> 'conn point_info -> Gid.t -> 'conn -> unit + + val set_disconnected : + ?timestamp:Time.t -> ?requested:bool -> 'conn point_info -> unit + + end + + module Event : sig + + type kind = + | Outgoing_request + (** We initiated a connection. *) + | Accepting_request of Gid.t + (** We accepted a connection after authentifying the remote peer. *) + | Rejecting_request of Gid.t + (** We rejected a connection after authentifying the remote peer. *) + | Request_rejected of Gid.t option + (** The remote peer rejected our connection. *) + | Connection_established of Gid.t + (** We succesfully established a authentified connection. *) + | Disconnection of Gid.t + (** We decided to close the connection. *) + | External_disconnection of Gid.t + (** The connection was closed for external reason. *) + + type t = { + kind : kind ; + timestamp : Time.t ; + } + + end + + val fold_events : + 'conn point_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a + + val log_incoming_rejection : + ?timestamp:Time.t -> 'conn point_info -> Gid.t -> unit + +end + + +(** Gid info: current and historical information about a gid *) + +module Gid_info : sig + + type ('conn, 'meta) t + type ('conn, 'meta) gid_info = ('conn, 'meta) t + + val compare : ('conn, 'meta) t -> ('conn, 'meta) t -> int + + val create : + ?trusted:bool -> + metadata:'meta -> + Gid.t -> ('conn, 'meta) gid_info + (** [create ~trusted ~meta gid] is a freshly minted gid info for + [gid]. *) + + val gid : ('conn, 'meta) gid_info -> Gid.t + + val metadata : ('conn, 'meta) gid_info -> 'meta + val set_metadata : ('conn, 'meta) gid_info -> 'meta -> unit + + val trusted : ('conn, 'meta) gid_info -> bool + val set_trusted : ('conn, 'meta) gid_info -> unit + val unset_trusted : ('conn, 'meta) gid_info -> unit + + val last_failed_connection : + ('conn, 'meta) gid_info -> (Id_point.t * Time.t) option + val last_rejected_connection : + ('conn, 'meta) gid_info -> (Id_point.t * Time.t) option + val last_established_connection : + ('conn, 'meta) gid_info -> (Id_point.t * Time.t) option + val last_disconnection : + ('conn, 'meta) gid_info -> (Id_point.t * Time.t) option + + val last_seen : + ('conn, 'meta) gid_info -> (Id_point.t * Time.t) option + (** [last_seen gi] is the most recent of: + + * last established connection + * last rejected connection + * last disconnection + *) + + val last_miss : + ('conn, 'meta) gid_info -> (Id_point.t * Time.t) option + (** [last_miss gi] is the most recent of: + + * last failed connection + * last rejected connection + * last disconnection + *) + + module State : sig + + type 'conn t = + | Accepted of { current_point: Id_point.t ; + cancel: Canceler.t } + (** We accepted a incoming connection, we greeted back and + we are waiting for an acknowledgement. *) + | Running of { data: 'conn ; + current_point: Id_point.t } + (** Successfully authentificated connection, normal business. *) + | Disconnected + (** No connection established currently. *) + type 'conn state = 'conn t + + val pp : Format.formatter -> 'conn t -> unit + + val get : ('conn, 'meta) gid_info -> 'conn state + + val is_disconnected : ('conn, 'meta) gid_info -> bool + + val set_accepted : + ?timestamp:Time.t -> + ('conn, 'meta) gid_info -> Id_point.t -> Canceler.t -> unit + + val set_running : + ?timestamp:Time.t -> + ('conn, 'meta) gid_info -> Id_point.t -> 'conn -> unit + + val set_disconnected : + ?timestamp:Time.t -> + ?requested:bool -> + ('conn, 'meta) gid_info -> unit + + end + + module Event : sig + + type kind = + | Accepting_request + (** We accepted a connection after authentifying the remote peer. *) + | Rejecting_request + (** We rejected a connection after authentifying the remote peer. *) + | Request_rejected + (** The remote peer rejected our connection. *) + | Connection_established + (** We succesfully established a authentified connection. *) + | Disconnection + (** We decided to close the connection. *) + | External_disconnection + (** The connection was closed for external reason. *) + + type t = { + kind : kind ; + timestamp : Time.t ; + point : Id_point.t ; + } + + end + + val fold_events : + ('conn, 'meta) gid_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a + + val log_incoming_rejection : + ?timestamp:Time.t -> + ('conn, 'meta) gid_info -> Id_point.t -> unit + + module File : sig + val load : + string -> 'meta Data_encoding.t -> + ('conn, 'meta) gid_info list Lwt.t + val save : + string -> 'meta Data_encoding.t -> + ('conn, 'meta) gid_info list -> bool Lwt.t + end + +end diff --git a/src/node/net/p2p_welcome.ml b/src/node/net/p2p_welcome.ml new file mode 100644 index 000000000..a30dadd29 --- /dev/null +++ b/src/node/net/p2p_welcome.ml @@ -0,0 +1,77 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make (struct let name = "p2p.welcome" end) +open P2p_types + +type pool = Pool : ('msg, 'meta) P2p_connection_pool.t -> pool + +type t = { + socket: Lwt_unix.file_descr ; + canceler: Canceler.t ; + pool: pool ; + mutable worker: unit Lwt.t ; +} + +let rec worker_loop st = + let Pool pool = st.pool in + Lwt_unix.yield () >>= fun () -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt_unix.accept st.socket >>= return + end >>= function + | Ok (fd, addr) -> + let point = + match addr with + | Lwt_unix.ADDR_UNIX _ -> assert false + | Lwt_unix.ADDR_INET (addr, port) -> + (Ipaddr_unix.V6.of_inet_addr_exn addr, port) in + P2p_connection_pool.accept pool fd point ; + worker_loop st + | Error [Lwt_utils.Canceled] -> + Lwt.return_unit + | Error err -> + lwt_log_error "@[Unexpected error in the Welcome worker@ %a@]" + pp_print_error err >>= fun () -> + Lwt.return_unit + +let create_listening_socket ~backlog ?(addr = Ipaddr.V6.unspecified) port = + let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in + Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ; + Lwt_unix.Versioned.bind_2 + main_socket (Point.to_sockaddr (addr, port)) >>= fun () -> + Lwt_unix.listen main_socket backlog ; + Lwt.return main_socket + +let run ~backlog pool ?addr port = + Lwt.catch begin fun () -> + create_listening_socket + ~backlog ?addr port >>= fun socket -> + let canceler = Canceler.create () in + Canceler.on_cancel canceler begin fun () -> + Lwt_utils.safe_close socket + end ; + let st = { + socket ; canceler ; pool = Pool pool ; + worker = Lwt.return_unit ; + } in + st.worker <- + Lwt_utils.worker "welcome" + (fun () -> worker_loop st) + (fun () -> Canceler.cancel st.canceler) ; + Lwt.return st + end begin fun exn -> + lwt_log_error + "@[Cannot accept incoming connections@ %a@]" + pp_exn exn >>= fun () -> + Lwt.fail exn + end + +let shutdown st = + Canceler.cancel st.canceler >>= fun () -> + st.worker diff --git a/src/node/net/p2p_welcome.mli b/src/node/net/p2p_welcome.mli new file mode 100644 index 000000000..9fd3853bb --- /dev/null +++ b/src/node/net/p2p_welcome.mli @@ -0,0 +1,27 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_types + +(** Welcome worker. Accept incoming connections and add them to its + connection pool. *) + +type t +(** Type of a welcome worker, parametrized like a + [P2p_connection_pool.pool]. *) + +val run: + backlog:int -> + ('msg, 'meta) P2p_connection_pool.t -> + ?addr:addr -> port -> t Lwt.t +(** [run ~backlog ~addr pool port] returns a running welcome worker + feeding [pool] listening at [(addr, port)]. [backlog] is the + argument passed to [Lwt_unix.accept]. *) + +val shutdown: t -> unit Lwt.t diff --git a/test/Makefile b/test/Makefile index dd5f5c8fc..ac52d5b75 100644 --- a/test/Makefile +++ b/test/Makefile @@ -5,6 +5,7 @@ TESTS := \ basic basic.sh \ p2p-io-scheduler \ p2p-connection \ + p2p-connection-pool all: test @@ -196,6 +197,11 @@ build-test-p2p-connection: test-p2p-connection run-test-p2p-connection: ./test-p2p-connection +.PHONY:build-test-p2p-connection-pool run-test-p2p-connection-pool +build-test-p2p-connection-pool: test-p2p-connection-pool +run-test-p2p-connection-pool: + ./test-p2p-connection-pool --clients 10 --repeat 5 + TEST_P2P_IO_SCHEDULER_IMPLS = \ lib/process.ml \ test_p2p_io_scheduler.ml @@ -204,6 +210,10 @@ TEST_P2P_CONNECTION_IMPLS = \ lib/process.ml \ test_p2p_connection.ml +TEST_P2P_CONNECTION_POOL_IMPLS = \ + lib/process.ml \ + test_p2p_connection_pool.ml + ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}: ${NODELIB} test-p2p-io-scheduler: ${NODELIB} ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx} ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ @@ -212,9 +222,14 @@ ${TEST_P2P_CONNECTION_IMPLS:.ml=.cmx}: ${NODELIB} test-p2p-connection: ${NODELIB} ${TEST_P2P_CONNECTION_IMPLS:.ml=.cmx} ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ +${TEST_P2P_CONNECTION_POOL_IMPLS:.ml=.cmx}: ${NODELIB} +test-p2p-connection-pool: ${NODELIB} ${TEST_P2P_CONNECTION_POOL_IMPLS:.ml=.cmx} + ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ + clean:: -rm -f test-p2p-io_scheduler -rm -f test-p2p-connection + -rm -f test-p2p-connection-pool ############################################################################ ## lwt pipe test program diff --git a/test/test_p2p_connection_pool.ml b/test/test_p2p_connection_pool.ml new file mode 100644 index 000000000..bf3e8b20c --- /dev/null +++ b/test/test_p2p_connection_pool.ml @@ -0,0 +1,196 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Error_monad +open P2p_types +include Logging.Make (struct let name = "test-p2p-connection-pool" end) + +type message = + | Ping + + +let msg_config : message P2p_connection_pool.message_config = { + encoding = [ + P2p_connection_pool.Encoding { + tag = 0x10 ; + encoding = Data_encoding.empty ; + wrap = (function () -> Ping) ; + unwrap = (function Ping -> Some ()) ; + max_length = None ; + } ; + ] ; + versions = Version.[ { name = "TEST" ; major = 0 ; minor = 0 } ] ; +} + +type metadata = unit + +let meta_config : metadata P2p_connection_pool.meta_config = { + encoding = Data_encoding.empty ; + initial = () ; +} + +let rec connect ~timeout pool point = + lwt_log_info "Connect to %a" Point.pp point >>= fun () -> + P2p_connection_pool.connect pool point ~timeout >>= function + | Error [P2p_connection_pool.Connected] -> begin + match P2p_connection_pool.Points.find_connection pool point with + | Some conn -> return conn + | None -> failwith "Woops..." + end + | Error ([ P2p_connection_pool.Connection_refused + | P2p_connection_pool.Pending_connection + | P2p_connection.Rejected + | Lwt_utils.Canceled + | Lwt_utils.Timeout + | P2p_connection_pool.Rejected _ + ] as err) -> + lwt_log_info "@[Connection to %a failed:@ %a@]" + Point.pp point pp_print_error err >>= fun () -> + Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () -> + connect ~timeout pool point + | Ok _ | Error _ as res -> Lwt.return res + +let connect_all ~timeout pool points = + map_p (connect ~timeout pool) points + +type error += Connect | Write | Read + +let write_all conns msg = + iter_p + (fun conn -> + trace Write @@ P2p_connection_pool.write_sync conn msg) + conns + +let read_all conns = + iter_p + (fun conn -> + trace Read @@ P2p_connection_pool.read conn >>=? fun Ping -> + return ()) + conns + +let rec connect_random pool total rem point n = + Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> + (trace Connect @@ connect ~timeout:2. pool point) >>=? fun conn -> + (trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ -> + (trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping -> + Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> + P2p_connection_pool.disconnect conn >>= fun () -> + begin + decr rem ; + if !rem mod total = 0 then + lwt_log_notice "Remaining: %d." (!rem / total) + else + Lwt.return () + end >>= fun () -> + if n > 1 then + connect_random pool total rem point (pred n) + else + return () + +let connect_random_all pool points n = + let total = List.length points in + let rem = ref (n * total) in + iter_p (fun point -> connect_random pool total rem point n) points + +let close_all conns = + Lwt_list.iter_p P2p_connection_pool.disconnect conns + + +let run_net config repeat points addr port = + Lwt_unix.sleep (Random.float 2.0) >>= fun () -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + P2p_connection_pool.create + config meta_config msg_config sched >>= fun pool -> + P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> + connect_all ~timeout:2. pool points >>=? fun conns -> + lwt_log_notice "Bootstrap OK" >>= fun () -> + write_all conns Ping >>=? fun () -> + lwt_log_notice "Sent all messages." >>= fun () -> + read_all conns >>=? fun () -> + lwt_log_notice "Read all messages." >>= fun () -> + close_all conns >>= fun () -> + lwt_log_notice "Begin random connections." >>= fun () -> + connect_random_all pool points repeat >>=? fun () -> + lwt_log_notice "Shutting down" >>= fun () -> + P2p_welcome.shutdown welcome >>= fun () -> + P2p_connection_pool.destroy pool >>= fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + lwt_log_notice "Shutdown Ok" >>= fun () -> + return () + +let make_net points repeat n = + let point, points = Utils.select n points in + let proof_of_work_target = Crypto_box.make_target [] in + let identity = Identity.generate proof_of_work_target in + let config = P2p_connection_pool.{ + identity ; + proof_of_work_target ; + trusted_points = points ; + peers_file = "/dev/null" ; + closed_network = true ; + listening_port = Some (snd point) ; + min_connections = List.length points ; + max_connections = List.length points ; + max_incoming_connections = List.length points ; + authentification_timeout = 2. ; + incoming_app_message_queue_size = None ; + incoming_message_queue_size = None ; + outgoing_message_queue_size = None ; + } in + Process.detach + ~prefix:(Format.asprintf "%a " Gid.pp identity.gid) + begin fun () -> + run_net config repeat points (fst point) (snd point) >>= function + | Ok () -> Lwt.return_unit + | Error err -> + lwt_log_error "@[Unexpected error: %d@ %a@]" + (List.length err) + pp_print_error err >>= fun () -> + exit 1 + end + +let addr = ref Ipaddr.V6.localhost +let port = ref (1024 + Random.int 8192) +let clients = ref 10 +let repeat = ref 5 + +let spec = Arg.[ + + "--port", Int (fun p -> port := p), " Listening port of the first peer."; + + "--addr", String (fun p -> addr := Ipaddr.V6.of_string_exn p), + " Listening addr"; + + "--clients", Set_int clients, " Number of concurrent clients." ; + + "--repeat", Set_int repeat, " Number of connections/disconnections." ; + + "-v", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Info)), + " Log up to info msgs" ; + + "-vv", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Debug)), + " Log up to debug msgs"; + + ] + +let main () = + let open Utils in + let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in + let usage_msg = "Usage: %s .\nArguments are:" in + Arg.parse spec anon_fun usage_msg ; + let ports = !port -- (!port + !clients - 1) in + let points = List.map (fun port -> !addr, port) ports in + Lwt_list.iter_p (make_net points !repeat) (0 -- (!clients - 1)) + +let () = + Sys.catch_break true ; + try + Logging.init Stderr ; + Lwt_main.run @@ main () + with _ -> ()