diff --git a/src/.merlin b/src/.merlin index d351d972f..1224c6ffd 100644 --- a/src/.merlin +++ b/src/.merlin @@ -32,6 +32,7 @@ PKG cstruct PKG dynlink PKG ezjsonm PKG git +PKG ipv6-multicast PKG irmin PKG lwt PKG mtime.os diff --git a/src/Makefile b/src/Makefile index 49aaaf097..64584fdbe 100644 --- a/src/Makefile +++ b/src/Makefile @@ -263,6 +263,8 @@ NODE_LIB_INTFS := \ node/net/p2p_connection_pool_types.mli \ node/net/p2p_connection_pool.mli \ node/net/p2p_welcome.mli \ + node/net/p2p_discovery.mli \ + node/net/p2p_maintenance.mli \ node/net/p2p.mli \ node/net/RPC_server.mli \ \ @@ -299,6 +301,8 @@ NODE_LIB_IMPLS := \ node/net/p2p_connection_pool_types.ml \ node/net/p2p_connection_pool.ml \ node/net/p2p_welcome.ml \ + node/net/p2p_discovery.ml \ + node/net/p2p_maintenance.ml \ node/net/p2p.ml \ \ node/net/RPC_server.ml \ @@ -335,6 +339,7 @@ NODE_PACKAGES := \ cohttp.lwt \ dynlink \ git \ + ipv6-multicast \ irmin.unix \ ocplib-resto.directory \ cmdliner \ diff --git a/src/node/net/p2p_discovery.ml b/src/node/net/p2p_discovery.ml new file mode 100644 index 000000000..2f61b286c --- /dev/null +++ b/src/node/net/p2p_discovery.ml @@ -0,0 +1,138 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_types +include Logging.Make (struct let name = "p2p.discovery" end) + +type t = () +let create _pool = () +let restart () = (() : unit) +let shutdown () = Lwt.return_unit + +let inet_addr = Unix.inet_addr_of_string "ff0e::54:455a:3053" + +module Message = struct + + let encoding = + Data_encoding.(tup3 (Fixed.string 10) Gid.encoding int16) + + let length = Data_encoding.Binary.fixed_length_exn encoding + + let make gid port = + Data_encoding.Binary.to_bytes encoding ("DISCOMAGIC", gid, port) + +end + +(* Sends discover messages into space in an exponentially delayed loop, + restartable using a condition *) +let sender sock saddr my_gid inco_port cancelation restart = + let buf = Message.make my_gid inco_port in + let rec loop delay n = + Lwt.catch + (fun () -> + Lwt_bytes.sendto sock buf 0 Message.length [] saddr >>= fun _nb_sent -> + Lwt.return_unit) + (fun exn -> + lwt_debug "(%a) error broadcasting a discovery request: %a" + Gid.pp my_gid Error_monad.pp (Exn exn)) >>= fun () -> + Lwt.pick + [ (Lwt_unix.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ; + (cancelation () >>= fun () -> Lwt.return_none) ; + (Lwt_condition.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ] + >>= function + | Some (delay, n) when n = 10 -> loop delay 9 + | Some (delay, n) -> loop (delay *. 2.) n + | None -> Lwt.return_unit + in + loop 0.2 1 + +let create_socket (iface, disco_addr, disco_port) = + let usock = Unix.socket PF_INET6 SOCK_DGRAM 0 in + let sock = Lwt_unix.of_unix_file_descr ~blocking:false usock in + let saddr = Unix.ADDR_INET (disco_addr, disco_port) in + Unix.setsockopt usock SO_REUSEADDR true ; + Ipv6_multicast.Unix.bind ?iface usock saddr ; + Ipv6_multicast.Unix.membership ?iface usock disco_addr `Join ; + iface, sock, saddr + +(* +module Answerer = struct + + (* Launch an answer machine for the discovery mechanism, takes a + callback to fill the answers and returns a canceler function *) + let answerer sock my_gid cancelation callback = + (* the answering function *) + let buf = MBytes.create Message.length in + let rec step () = + Lwt.pick + [ (cancelation () >>= fun () -> Lwt.return_none) ; + (Lwt_bytes.recvfrom sock buf 0 Message.length [] >>= fun r -> + Lwt.return (Some r)) ] >>= function + | None -> Lwt.return_unit + | Some (len', Lwt_unix.ADDR_INET (remote_addr, _mcast_port)) + when len' = Message.length -> begin + match (Data_encoding.Binary.of_bytes Message.encoding buf) with + | Some ("DISCOMAGIC", remote_gid, remote_inco_port) + when remote_gid <> my_gid -> + Lwt.catch + (fun () -> callback ~remote_addr ~remote_inco_port) + (fun exn -> + lwt_debug "Error processing a discovery request: %a" + pp_exn exn) >>= + step + | _ -> + step () + end + | Some _ -> step () + in + step () + + let worker_loop st = + let callback ~remote_addr ~remote_inco_port = + let remote_uaddr = Ipaddr_unix.V6.of_inet_addr_exn remote_addr in + P2p_connection_loop.notify_new_peer + in + Lwt.catch + (fun () -> + Lwt_utils.worker + (Format.asprintf "(%a) discovery answerer" Gid.pp my_gid) + (fun () -> answerer fd my_gid cancelation callback) + cancel) + (fun exn -> + lwt_log_error "Discovery answerer not started: %a" + Error_monad.pp (Exn exn)) + +end +let discovery_sender = + match config.pending_authentification_port with + | None -> Lwt.return_unit + | Some inco_port -> + Lwt.catch + (fun () -> + let sender () = + Discovery.sender fd + saddr my_gid inco_port cancelation restart_discovery in + Lwt_utils.worker + (Format.asprintf "(%a) discovery sender" Gid.pp my_gid) + sender cancel) + (fun exn -> + lwt_log_error "Discovery sender not started: %a" + Error_monad.pp (Exn exn)) + + +let discovery_answerer, discovery_sender = + match map_option ~f:create_socket st.config.local_discovery with + | exception exn -> + log_error "Error creating discovery socket: %a" Error_monad.pp (Exn exn) ; + (Lwt.return_unit, Lwt.return_unit) + | None -> Lwt.return_unit, Lwt.return_unit + | Some (iface, fd, saddr) -> + discovery_answerer, discovery_sender + +*) diff --git a/src/node/net/p2p_discovery.mli b/src/node/net/p2p_discovery.mli new file mode 100644 index 000000000..d9c639ab8 --- /dev/null +++ b/src/node/net/p2p_discovery.mli @@ -0,0 +1,13 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t +val create : ('msg, 'meta) P2p_connection_pool.pool -> t +val restart : t -> unit +val shutdown : t -> unit Lwt.t diff --git a/src/node/net/p2p_maintenance.ml b/src/node/net/p2p_maintenance.ml new file mode 100644 index 000000000..2cae195dc --- /dev/null +++ b/src/node/net/p2p_maintenance.ml @@ -0,0 +1,191 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open P2p_types +open P2p_connection_pool_types + +include Logging.Make (struct let name = "p2p.maintenance" end) + +type bounds = { + min_threshold: int ; + min_target: int ; + max_target: int ; + max_threshold: int ; +} + +type 'meta pool = Pool : ('msg, 'meta) P2p_connection_pool.t -> 'meta pool + +type 'meta t = { + canceler: Canceler.t ; + connection_timeout: float ; + bounds: bounds ; + pool: 'meta pool ; + disco: P2p_discovery.t option ; + just_maintained: unit Lwt_condition.t ; + please_maintain: unit Lwt_condition.t ; + mutable worker : unit Lwt.t ; +} + +(** Select [expected] points amongst the disconnected known points. + It ignores points which are greylisted, or for which a connection + failed after [start_time]. It first selects points with the oldest + last tentative. *) +let connectable st start_time expected = + let now = Time.now () in + let module Bounded_point_info = + Utils.Bounded(struct + type t = (Time.t option * Point.t) + let compare (t1, _) (t2, _) = + match t1, t2 with + | None, None -> 0 + | None, Some _ -> 1 + | Some _, None -> -1 + | Some t1, Some t2 -> Time.compare t2 t1 + end) in + let acc = Bounded_point_info.create expected in + let Pool pool = st.pool in + P2p_connection_pool.Points.fold_known + pool ~init:() + ~f:begin fun point pi () -> + match Point_info.State.get pi with + | Disconnected -> begin + match Point_info.last_miss pi with + | Some last when Time.(start_time < last) + && not (Point_info.greylisted ~now pi) -> () + | last -> + Bounded_point_info.insert (last, point) acc + end + | _ -> () + end ; + List.map snd (Bounded_point_info.get acc) + +(** Try to create connections to new peers. It tries to create at + least [min_to_contact] connections, and will never creates more + than [max_to_contact]. But, if after trying once all disconnected + peers, it returns [false]. *) +let rec try_to_contact + st ?(start_time = Time.now ()) + min_to_contact max_to_contact = + let Pool pool = st.pool in + if min_to_contact <= 0 then + Lwt.return_true + else + let contactable = + connectable st start_time max_to_contact in + if contactable = [] then + Lwt.return_false + else + List.fold_left + (fun acc point -> + P2p_connection_pool.connect + ~timeout:st.connection_timeout pool point >>= function + | Ok _ -> acc >|= succ + | Error _ -> acc) + (Lwt.return 0) + contactable >>= fun established -> + try_to_contact st ~start_time + (min_to_contact - established) (max_to_contact - established) + +(** Do a maintenance step. It will terminate only when the number + of connections is between `min_threshold` and `max_threshold`. *) +let rec maintain st = + let Pool pool = st.pool in + let n_connected = P2p_connection_pool.active_connections pool in + if n_connected < st.bounds.min_threshold then + too_few_connections st n_connected + else if st.bounds.max_threshold < n_connected then + too_many_connections st n_connected + else begin + (* end of maintenance when enough users have been reached *) + Lwt_condition.broadcast st.just_maintained () ; + lwt_debug "Maintenance step ended" >>= fun () -> + return () + end + +and too_few_connections st n_connected = + let Pool pool = st.pool in + (* too few connections, try and contact many peers *) + lwt_debug "Too few connections (%d)" n_connected >>= fun () -> + let min_to_contact = st.bounds.min_target - n_connected in + let max_to_contact = st.bounds.max_target - n_connected in + try_to_contact st min_to_contact max_to_contact >>= fun continue -> + if not continue then begin + maintain st + end else begin + (* not enough contacts, ask the pals of our pals, + discover the local network and then wait *) + iter_option ~f:P2p_discovery.restart st.disco ; + P2p_connection_pool.broadcast_bootstrap_msg pool ; + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt.pick [ + P2p_connection_pool.Events.new_point pool ; + Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? + or wait for the existence of a + non grey-listed peer ?? *) + ] >>= return + end >>=? fun () -> + maintain st + end + +and too_many_connections st n_connected = + let Pool pool = st.pool in + (* too many connections, start the russian roulette *) + let to_kill = n_connected - st.bounds.max_target in + lwt_debug "Too many connections, will kill %d" to_kill >>= fun () -> + snd @@ P2p_connection_pool.fold_connections pool + ~init:(to_kill, Lwt.return_unit) + ~f:(fun _ conn (i, t) -> + if i = 0 then (0, t) + else (i - 1, t >>= fun () -> P2p_connection_pool.disconnect conn)) + >>= fun () -> + maintain st + +let rec worker_loop st = + begin + let Pool pool = st.pool in + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Lwt.pick [ + Lwt_unix.sleep 120. ; (* every two minutes *) + Lwt_condition.wait st.please_maintain ; (* when asked *) + P2p_connection_pool.Events.too_few_connections pool ; (* limits *) + P2p_connection_pool.Events.too_many_connections pool + ] >>= fun () -> + return () + end >>=? fun () -> + maintain st + end >>= function + | Ok () -> worker_loop st + | Error [Lwt_utils.Canceled] -> Lwt.return_unit + | Error _ -> Lwt.return_unit + +let run ?(connection_timeout = 5.) bounds pool disco = + let canceler = Canceler.create () in + let st = { + canceler ; connection_timeout ; + bounds ; pool = Pool pool ; disco ; + just_maintained = Lwt_condition.create () ; + please_maintain = Lwt_condition.create () ; + worker = Lwt.return_unit ; + } in + st.worker <- + Lwt_utils.worker "maintenance" + (fun () -> worker_loop st) + (fun () -> Canceler.cancel canceler); + st + +let maintain { just_maintained ; please_maintain } = + let wait = Lwt_condition.wait just_maintained in + Lwt_condition.broadcast please_maintain () ; + wait + +let shutdown { canceler ; worker ; just_maintained } = + Canceler.cancel canceler >>= fun () -> + worker >>= fun () -> + Lwt_condition.broadcast just_maintained () ; + Lwt.return_unit diff --git a/src/node/net/p2p_maintenance.mli b/src/node/net/p2p_maintenance.mli new file mode 100644 index 000000000..1398d2527 --- /dev/null +++ b/src/node/net/p2p_maintenance.mli @@ -0,0 +1,45 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* min <= min_threshold <= min_target <= max_target <= max_threshold <= max *) + +(* The 'pool' urges the maintainer to work when the number of + connections reaches `max` or is below `min`. Otherwise, the + maintener is lazy and only lookup for connection every two + minutes. The [maintain] function is another way to signal the + maintainer that a maintenance step is desired. + + When the maintener detects that the number of connections is over + `max_threshold`, it randomly kills connections to reach `max_target`. + + When the maintener detects that the number of connections is below + `min_threshold`, it creates enough connection to reach at least + `min_target` (and never more than `max_target`). In the process, it + might ask its actual peers for new peers. *) + +type bounds = { + min_threshold: int ; + min_target: int ; + max_target: int ; + max_threshold: int ; +} + +type 'meta t +(** Type of a maintenance worker. *) + +val run: + ?connection_timeout:float -> + bounds -> + ('msg, 'meta) P2p_connection_pool.t -> + P2p_discovery.t option -> + 'meta t + +val maintain: 'meta t -> unit Lwt.t + +val shutdown: 'meta t -> unit Lwt.t diff --git a/src/tezos-deps.opam b/src/tezos-deps.opam index c1914a775..f078080d9 100644 --- a/src/tezos-deps.opam +++ b/src/tezos-deps.opam @@ -21,6 +21,7 @@ depends: [ "conduit" "git" "git-unix" + "ipv6-multicast" "irmin-watcher" (* for `irmin.unix` *) "irmin" {>= "0.12" } "lwt" {>= "2.7.0" } diff --git a/test/Makefile b/test/Makefile index ac52d5b75..0fe17dcb4 100644 --- a/test/Makefile +++ b/test/Makefile @@ -39,6 +39,7 @@ PACKAGES := \ dynlink \ ezjsonm \ git \ + ipv6-multicast \ irmin.unix \ lwt \ lwt.unix \