Shell: implement P2p_{maintenance,discovery}.

This commit is contained in:
Grégoire Henry 2017-01-14 13:14:11 +01:00
parent b694a62810
commit d9fc93a5c0
8 changed files with 395 additions and 0 deletions

View File

@ -32,6 +32,7 @@ PKG cstruct
PKG dynlink PKG dynlink
PKG ezjsonm PKG ezjsonm
PKG git PKG git
PKG ipv6-multicast
PKG irmin PKG irmin
PKG lwt PKG lwt
PKG mtime.os PKG mtime.os

View File

@ -263,6 +263,8 @@ NODE_LIB_INTFS := \
node/net/p2p_connection_pool_types.mli \ node/net/p2p_connection_pool_types.mli \
node/net/p2p_connection_pool.mli \ node/net/p2p_connection_pool.mli \
node/net/p2p_welcome.mli \ node/net/p2p_welcome.mli \
node/net/p2p_discovery.mli \
node/net/p2p_maintenance.mli \
node/net/p2p.mli \ node/net/p2p.mli \
node/net/RPC_server.mli \ node/net/RPC_server.mli \
\ \
@ -299,6 +301,8 @@ NODE_LIB_IMPLS := \
node/net/p2p_connection_pool_types.ml \ node/net/p2p_connection_pool_types.ml \
node/net/p2p_connection_pool.ml \ node/net/p2p_connection_pool.ml \
node/net/p2p_welcome.ml \ node/net/p2p_welcome.ml \
node/net/p2p_discovery.ml \
node/net/p2p_maintenance.ml \
node/net/p2p.ml \ node/net/p2p.ml \
\ \
node/net/RPC_server.ml \ node/net/RPC_server.ml \
@ -335,6 +339,7 @@ NODE_PACKAGES := \
cohttp.lwt \ cohttp.lwt \
dynlink \ dynlink \
git \ git \
ipv6-multicast \
irmin.unix \ irmin.unix \
ocplib-resto.directory \ ocplib-resto.directory \
cmdliner \ cmdliner \

View File

@ -0,0 +1,138 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open P2p_types
include Logging.Make (struct let name = "p2p.discovery" end)
type t = ()
let create _pool = ()
let restart () = (() : unit)
let shutdown () = Lwt.return_unit
let inet_addr = Unix.inet_addr_of_string "ff0e::54:455a:3053"
module Message = struct
let encoding =
Data_encoding.(tup3 (Fixed.string 10) Gid.encoding int16)
let length = Data_encoding.Binary.fixed_length_exn encoding
let make gid port =
Data_encoding.Binary.to_bytes encoding ("DISCOMAGIC", gid, port)
end
(* Sends discover messages into space in an exponentially delayed loop,
restartable using a condition *)
let sender sock saddr my_gid inco_port cancelation restart =
let buf = Message.make my_gid inco_port in
let rec loop delay n =
Lwt.catch
(fun () ->
Lwt_bytes.sendto sock buf 0 Message.length [] saddr >>= fun _nb_sent ->
Lwt.return_unit)
(fun exn ->
lwt_debug "(%a) error broadcasting a discovery request: %a"
Gid.pp my_gid Error_monad.pp (Exn exn)) >>= fun () ->
Lwt.pick
[ (Lwt_unix.sleep delay >>= fun () -> Lwt.return (Some (delay, n + 1))) ;
(cancelation () >>= fun () -> Lwt.return_none) ;
(Lwt_condition.wait restart >>= fun () -> Lwt.return (Some (0.1, 0))) ]
>>= function
| Some (delay, n) when n = 10 -> loop delay 9
| Some (delay, n) -> loop (delay *. 2.) n
| None -> Lwt.return_unit
in
loop 0.2 1
let create_socket (iface, disco_addr, disco_port) =
let usock = Unix.socket PF_INET6 SOCK_DGRAM 0 in
let sock = Lwt_unix.of_unix_file_descr ~blocking:false usock in
let saddr = Unix.ADDR_INET (disco_addr, disco_port) in
Unix.setsockopt usock SO_REUSEADDR true ;
Ipv6_multicast.Unix.bind ?iface usock saddr ;
Ipv6_multicast.Unix.membership ?iface usock disco_addr `Join ;
iface, sock, saddr
(*
module Answerer = struct
(* Launch an answer machine for the discovery mechanism, takes a
callback to fill the answers and returns a canceler function *)
let answerer sock my_gid cancelation callback =
(* the answering function *)
let buf = MBytes.create Message.length in
let rec step () =
Lwt.pick
[ (cancelation () >>= fun () -> Lwt.return_none) ;
(Lwt_bytes.recvfrom sock buf 0 Message.length [] >>= fun r ->
Lwt.return (Some r)) ] >>= function
| None -> Lwt.return_unit
| Some (len', Lwt_unix.ADDR_INET (remote_addr, _mcast_port))
when len' = Message.length -> begin
match (Data_encoding.Binary.of_bytes Message.encoding buf) with
| Some ("DISCOMAGIC", remote_gid, remote_inco_port)
when remote_gid <> my_gid ->
Lwt.catch
(fun () -> callback ~remote_addr ~remote_inco_port)
(fun exn ->
lwt_debug "Error processing a discovery request: %a"
pp_exn exn) >>=
step
| _ ->
step ()
end
| Some _ -> step ()
in
step ()
let worker_loop st =
let callback ~remote_addr ~remote_inco_port =
let remote_uaddr = Ipaddr_unix.V6.of_inet_addr_exn remote_addr in
P2p_connection_loop.notify_new_peer
in
Lwt.catch
(fun () ->
Lwt_utils.worker
(Format.asprintf "(%a) discovery answerer" Gid.pp my_gid)
(fun () -> answerer fd my_gid cancelation callback)
cancel)
(fun exn ->
lwt_log_error "Discovery answerer not started: %a"
Error_monad.pp (Exn exn))
end
let discovery_sender =
match config.pending_authentification_port with
| None -> Lwt.return_unit
| Some inco_port ->
Lwt.catch
(fun () ->
let sender () =
Discovery.sender fd
saddr my_gid inco_port cancelation restart_discovery in
Lwt_utils.worker
(Format.asprintf "(%a) discovery sender" Gid.pp my_gid)
sender cancel)
(fun exn ->
lwt_log_error "Discovery sender not started: %a"
Error_monad.pp (Exn exn))
let discovery_answerer, discovery_sender =
match map_option ~f:create_socket st.config.local_discovery with
| exception exn ->
log_error "Error creating discovery socket: %a" Error_monad.pp (Exn exn) ;
(Lwt.return_unit, Lwt.return_unit)
| None -> Lwt.return_unit, Lwt.return_unit
| Some (iface, fd, saddr) ->
discovery_answerer, discovery_sender
*)

View File

@ -0,0 +1,13 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
val create : ('msg, 'meta) P2p_connection_pool.pool -> t
val restart : t -> unit
val shutdown : t -> unit Lwt.t

View File

@ -0,0 +1,191 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open P2p_types
open P2p_connection_pool_types
include Logging.Make (struct let name = "p2p.maintenance" end)
type bounds = {
min_threshold: int ;
min_target: int ;
max_target: int ;
max_threshold: int ;
}
type 'meta pool = Pool : ('msg, 'meta) P2p_connection_pool.t -> 'meta pool
type 'meta t = {
canceler: Canceler.t ;
connection_timeout: float ;
bounds: bounds ;
pool: 'meta pool ;
disco: P2p_discovery.t option ;
just_maintained: unit Lwt_condition.t ;
please_maintain: unit Lwt_condition.t ;
mutable worker : unit Lwt.t ;
}
(** Select [expected] points amongst the disconnected known points.
It ignores points which are greylisted, or for which a connection
failed after [start_time]. It first selects points with the oldest
last tentative. *)
let connectable st start_time expected =
let now = Time.now () in
let module Bounded_point_info =
Utils.Bounded(struct
type t = (Time.t option * Point.t)
let compare (t1, _) (t2, _) =
match t1, t2 with
| None, None -> 0
| None, Some _ -> 1
| Some _, None -> -1
| Some t1, Some t2 -> Time.compare t2 t1
end) in
let acc = Bounded_point_info.create expected in
let Pool pool = st.pool in
P2p_connection_pool.Points.fold_known
pool ~init:()
~f:begin fun point pi () ->
match Point_info.State.get pi with
| Disconnected -> begin
match Point_info.last_miss pi with
| Some last when Time.(start_time < last)
&& not (Point_info.greylisted ~now pi) -> ()
| last ->
Bounded_point_info.insert (last, point) acc
end
| _ -> ()
end ;
List.map snd (Bounded_point_info.get acc)
(** Try to create connections to new peers. It tries to create at
least [min_to_contact] connections, and will never creates more
than [max_to_contact]. But, if after trying once all disconnected
peers, it returns [false]. *)
let rec try_to_contact
st ?(start_time = Time.now ())
min_to_contact max_to_contact =
let Pool pool = st.pool in
if min_to_contact <= 0 then
Lwt.return_true
else
let contactable =
connectable st start_time max_to_contact in
if contactable = [] then
Lwt.return_false
else
List.fold_left
(fun acc point ->
P2p_connection_pool.connect
~timeout:st.connection_timeout pool point >>= function
| Ok _ -> acc >|= succ
| Error _ -> acc)
(Lwt.return 0)
contactable >>= fun established ->
try_to_contact st ~start_time
(min_to_contact - established) (max_to_contact - established)
(** Do a maintenance step. It will terminate only when the number
of connections is between `min_threshold` and `max_threshold`. *)
let rec maintain st =
let Pool pool = st.pool in
let n_connected = P2p_connection_pool.active_connections pool in
if n_connected < st.bounds.min_threshold then
too_few_connections st n_connected
else if st.bounds.max_threshold < n_connected then
too_many_connections st n_connected
else begin
(* end of maintenance when enough users have been reached *)
Lwt_condition.broadcast st.just_maintained () ;
lwt_debug "Maintenance step ended" >>= fun () ->
return ()
end
and too_few_connections st n_connected =
let Pool pool = st.pool in
(* too few connections, try and contact many peers *)
lwt_debug "Too few connections (%d)" n_connected >>= fun () ->
let min_to_contact = st.bounds.min_target - n_connected in
let max_to_contact = st.bounds.max_target - n_connected in
try_to_contact st min_to_contact max_to_contact >>= fun continue ->
if not continue then begin
maintain st
end else begin
(* not enough contacts, ask the pals of our pals,
discover the local network and then wait *)
iter_option ~f:P2p_discovery.restart st.disco ;
P2p_connection_pool.broadcast_bootstrap_msg pool ;
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt.pick [
P2p_connection_pool.Events.new_point pool ;
Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
or wait for the existence of a
non grey-listed peer ?? *)
] >>= return
end >>=? fun () ->
maintain st
end
and too_many_connections st n_connected =
let Pool pool = st.pool in
(* too many connections, start the russian roulette *)
let to_kill = n_connected - st.bounds.max_target in
lwt_debug "Too many connections, will kill %d" to_kill >>= fun () ->
snd @@ P2p_connection_pool.fold_connections pool
~init:(to_kill, Lwt.return_unit)
~f:(fun _ conn (i, t) ->
if i = 0 then (0, t)
else (i - 1, t >>= fun () -> P2p_connection_pool.disconnect conn))
>>= fun () ->
maintain st
let rec worker_loop st =
begin
let Pool pool = st.pool in
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt.pick [
Lwt_unix.sleep 120. ; (* every two minutes *)
Lwt_condition.wait st.please_maintain ; (* when asked *)
P2p_connection_pool.Events.too_few_connections pool ; (* limits *)
P2p_connection_pool.Events.too_many_connections pool
] >>= fun () ->
return ()
end >>=? fun () ->
maintain st
end >>= function
| Ok () -> worker_loop st
| Error [Lwt_utils.Canceled] -> Lwt.return_unit
| Error _ -> Lwt.return_unit
let run ?(connection_timeout = 5.) bounds pool disco =
let canceler = Canceler.create () in
let st = {
canceler ; connection_timeout ;
bounds ; pool = Pool pool ; disco ;
just_maintained = Lwt_condition.create () ;
please_maintain = Lwt_condition.create () ;
worker = Lwt.return_unit ;
} in
st.worker <-
Lwt_utils.worker "maintenance"
(fun () -> worker_loop st)
(fun () -> Canceler.cancel canceler);
st
let maintain { just_maintained ; please_maintain } =
let wait = Lwt_condition.wait just_maintained in
Lwt_condition.broadcast please_maintain () ;
wait
let shutdown { canceler ; worker ; just_maintained } =
Canceler.cancel canceler >>= fun () ->
worker >>= fun () ->
Lwt_condition.broadcast just_maintained () ;
Lwt.return_unit

View File

@ -0,0 +1,45 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(* min <= min_threshold <= min_target <= max_target <= max_threshold <= max *)
(* The 'pool' urges the maintainer to work when the number of
connections reaches `max` or is below `min`. Otherwise, the
maintener is lazy and only lookup for connection every two
minutes. The [maintain] function is another way to signal the
maintainer that a maintenance step is desired.
When the maintener detects that the number of connections is over
`max_threshold`, it randomly kills connections to reach `max_target`.
When the maintener detects that the number of connections is below
`min_threshold`, it creates enough connection to reach at least
`min_target` (and never more than `max_target`). In the process, it
might ask its actual peers for new peers. *)
type bounds = {
min_threshold: int ;
min_target: int ;
max_target: int ;
max_threshold: int ;
}
type 'meta t
(** Type of a maintenance worker. *)
val run:
?connection_timeout:float ->
bounds ->
('msg, 'meta) P2p_connection_pool.t ->
P2p_discovery.t option ->
'meta t
val maintain: 'meta t -> unit Lwt.t
val shutdown: 'meta t -> unit Lwt.t

View File

@ -21,6 +21,7 @@ depends: [
"conduit" "conduit"
"git" "git"
"git-unix" "git-unix"
"ipv6-multicast"
"irmin-watcher" (* for `irmin.unix` *) "irmin-watcher" (* for `irmin.unix` *)
"irmin" {>= "0.12" } "irmin" {>= "0.12" }
"lwt" {>= "2.7.0" } "lwt" {>= "2.7.0" }

View File

@ -39,6 +39,7 @@ PACKAGES := \
dynlink \ dynlink \
ezjsonm \ ezjsonm \
git \ git \
ipv6-multicast \
irmin.unix \ irmin.unix \
lwt \ lwt \
lwt.unix \ lwt.unix \