ligo/src/node/net/p2p_maintenance.ml

192 lines
6.7 KiB
OCaml
Raw Normal View History

(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open P2p_types
open P2p_connection_pool_types
include Logging.Make (struct let name = "p2p.maintenance" end)
type bounds = {
min_threshold: int ;
min_target: int ;
max_target: int ;
max_threshold: int ;
}
type 'meta pool = Pool : ('msg, 'meta) P2p_connection_pool.t -> 'meta pool
type 'meta t = {
canceler: Canceler.t ;
connection_timeout: float ;
bounds: bounds ;
pool: 'meta pool ;
disco: P2p_discovery.t option ;
just_maintained: unit Lwt_condition.t ;
please_maintain: unit Lwt_condition.t ;
mutable worker : unit Lwt.t ;
}
(** Select [expected] points amongst the disconnected known points.
It ignores points which are greylisted, or for which a connection
failed after [start_time]. It first selects points with the oldest
last tentative. *)
let connectable st start_time expected =
let now = Time.now () in
let module Bounded_point_info =
Utils.Bounded(struct
type t = (Time.t option * Point.t)
let compare (t1, _) (t2, _) =
match t1, t2 with
| None, None -> 0
| None, Some _ -> 1
| Some _, None -> -1
| Some t1, Some t2 -> Time.compare t2 t1
end) in
let acc = Bounded_point_info.create expected in
let Pool pool = st.pool in
P2p_connection_pool.Points.fold_known
pool ~init:()
~f:begin fun point pi () ->
match Point_info.State.get pi with
| Disconnected -> begin
match Point_info.last_miss pi with
| Some last when Time.(start_time < last)
&& not (Point_info.greylisted ~now pi) -> ()
| last ->
Bounded_point_info.insert (last, point) acc
end
| _ -> ()
end ;
List.map snd (Bounded_point_info.get acc)
(** Try to create connections to new peers. It tries to create at
least [min_to_contact] connections, and will never creates more
than [max_to_contact]. But, if after trying once all disconnected
peers, it returns [false]. *)
let rec try_to_contact
st ?(start_time = Time.now ())
min_to_contact max_to_contact =
let Pool pool = st.pool in
if min_to_contact <= 0 then
Lwt.return_true
else
let contactable =
connectable st start_time max_to_contact in
if contactable = [] then
Lwt.return_false
else
List.fold_left
(fun acc point ->
P2p_connection_pool.connect
~timeout:st.connection_timeout pool point >>= function
| Ok _ -> acc >|= succ
| Error _ -> acc)
(Lwt.return 0)
contactable >>= fun established ->
try_to_contact st ~start_time
(min_to_contact - established) (max_to_contact - established)
(** Do a maintenance step. It will terminate only when the number
of connections is between `min_threshold` and `max_threshold`. *)
let rec maintain st =
let Pool pool = st.pool in
let n_connected = P2p_connection_pool.active_connections pool in
if n_connected < st.bounds.min_threshold then
too_few_connections st n_connected
else if st.bounds.max_threshold < n_connected then
too_many_connections st n_connected
else begin
(* end of maintenance when enough users have been reached *)
Lwt_condition.broadcast st.just_maintained () ;
lwt_debug "Maintenance step ended" >>= fun () ->
return ()
end
and too_few_connections st n_connected =
let Pool pool = st.pool in
(* too few connections, try and contact many peers *)
lwt_debug "Too few connections (%d)" n_connected >>= fun () ->
let min_to_contact = st.bounds.min_target - n_connected in
let max_to_contact = st.bounds.max_target - n_connected in
try_to_contact st min_to_contact max_to_contact >>= fun continue ->
if not continue then begin
maintain st
end else begin
(* not enough contacts, ask the pals of our pals,
discover the local network and then wait *)
iter_option ~f:P2p_discovery.restart st.disco ;
P2p_connection_pool.broadcast_bootstrap_msg pool ;
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt.pick [
P2p_connection_pool.Events.new_point pool ;
Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
or wait for the existence of a
non grey-listed peer ?? *)
] >>= return
end >>=? fun () ->
maintain st
end
and too_many_connections st n_connected =
let Pool pool = st.pool in
(* too many connections, start the russian roulette *)
let to_kill = n_connected - st.bounds.max_target in
lwt_debug "Too many connections, will kill %d" to_kill >>= fun () ->
snd @@ P2p_connection_pool.fold_connections pool
~init:(to_kill, Lwt.return_unit)
~f:(fun _ conn (i, t) ->
if i = 0 then (0, t)
else (i - 1, t >>= fun () -> P2p_connection_pool.disconnect conn))
>>= fun () ->
maintain st
let rec worker_loop st =
begin
let Pool pool = st.pool in
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt.pick [
Lwt_unix.sleep 120. ; (* every two minutes *)
Lwt_condition.wait st.please_maintain ; (* when asked *)
P2p_connection_pool.Events.too_few_connections pool ; (* limits *)
P2p_connection_pool.Events.too_many_connections pool
] >>= fun () ->
return ()
end >>=? fun () ->
maintain st
end >>= function
| Ok () -> worker_loop st
| Error [Lwt_utils.Canceled] -> Lwt.return_unit
| Error _ -> Lwt.return_unit
let run ?(connection_timeout = 5.) bounds pool disco =
let canceler = Canceler.create () in
let st = {
canceler ; connection_timeout ;
bounds ; pool = Pool pool ; disco ;
just_maintained = Lwt_condition.create () ;
please_maintain = Lwt_condition.create () ;
worker = Lwt.return_unit ;
} in
st.worker <-
Lwt_utils.worker "maintenance"
(fun () -> worker_loop st)
(fun () -> Canceler.cancel canceler);
st
let maintain { just_maintained ; please_maintain } =
let wait = Lwt_condition.wait just_maintained in
Lwt_condition.broadcast please_maintain () ;
wait
let shutdown { canceler ; worker ; just_maintained } =
Canceler.cancel canceler >>= fun () ->
worker >>= fun () ->
Lwt_condition.broadcast just_maintained () ;
Lwt.return_unit