From 15b61d6b840b2c2c1366dbc50ebf5e0ed057e8d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 19 Feb 2019 16:29:38 +0100 Subject: [PATCH] P2p: fix race condition in node initialisation --- src/lib_p2p/p2p.ml | 25 ++++++++++++++++++++++--- src/lib_p2p/p2p.mli | 2 ++ src/lib_p2p/p2p_maintenance.ml | 23 +++++++++++------------ src/lib_p2p/p2p_maintenance.mli | 9 ++++++--- src/lib_p2p/p2p_welcome.ml | 12 +++++++----- src/lib_p2p/p2p_welcome.mli | 7 +++++-- src/lib_p2p/test/test_p2p_pool.ml | 3 ++- src/lib_shell/chain_validator.ml | 1 + src/lib_shell/distributed_db.ml | 7 +++++-- 9 files changed, 61 insertions(+), 28 deletions(-) diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index d20879c6d..70bce3146 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -158,13 +158,13 @@ let create_maintenance_worker limits pool = ~expected:limits.expected_connections ~max:limits.max_connections in - P2p_maintenance.run bounds pool + P2p_maintenance.create bounds pool let may_create_welcome_worker config limits pool = match config.listening_port with | None -> Lwt.return_none | Some port -> - P2p_welcome.run + P2p_welcome.create ~backlog:limits.backlog pool ?addr:config.listening_addr port >>= fun w -> @@ -201,9 +201,21 @@ module Real = struct let peer_id { config } = config.identity.peer_id + let maintain { maintenance } () = P2p_maintenance.maintain maintenance + let activate t () = + log_info "activate"; + begin + match t.welcome with + | None -> () + | Some w -> P2p_welcome.activate w + end ; + P2p_maintenance.activate t.maintenance; + Lwt.async (fun () -> P2p_maintenance.maintain t.maintenance) ; + () + let roll _net () = Lwt.return_unit (* TODO implement *) (* returns when all workers have shutted down in the opposite @@ -377,6 +389,7 @@ type ('msg, 'peer_meta, 'conn_meta) t = { on_new_connection : (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ; + activate : unit -> unit ; } type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t @@ -446,8 +459,13 @@ let create ~config ~limits peer_cfg conn_cfg msg_cfg = fold_connections = (fun ~init ~f -> Real.fold_connections net ~init ~f) ; iter_connections = Real.iter_connections net ; on_new_connection = Real.on_new_connection net ; + activate = Real.activate net ; } +let activate t = + log_info "activate P2P layer !"; + t.activate () + let faked_network peer_cfg faked_metadata = { versions = [] ; peer_id = Fake.id.peer_id ; @@ -472,7 +490,8 @@ let faked_network peer_cfg faked_metadata = { iter_connections = (fun _f -> ()) ; on_new_connection = (fun _f -> ()) ; broadcast = ignore ; - pool = None + pool = None ; + activate = (fun _ -> ()) ; } let peer_id net = net.peer_id diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index fc160f507..9a810d860 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -173,6 +173,8 @@ val create : 'peer_meta peer_meta_config -> 'conn_meta conn_meta_config -> 'msg message_config -> ('msg, 'peer_meta, 'conn_meta) net tzresult Lwt.t +val activate : ('msg, 'peer_meta, 'conn_meta) net -> unit + (** Return one's peer_id *) val peer_id : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index 6a486f36f..52a855bad 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -206,21 +206,20 @@ let rec worker_loop st = | Error [ Canceled ] -> Lwt.return_unit | Error _ -> Lwt.return_unit -let run bounds pool = - let canceler = Lwt_canceler.create () in - let st = { - canceler ; - bounds ; - pool = Pool pool ; - just_maintained = Lwt_condition.create () ; - please_maintain = Lwt_condition.create () ; - maintain_worker = Lwt.return_unit ; - } in +let create bounds pool = { + canceler = Lwt_canceler.create (); + bounds ; + pool = Pool pool ; + just_maintained = Lwt_condition.create () ; + please_maintain = Lwt_condition.create () ; + maintain_worker = Lwt.return_unit ; +} + +let activate st = st.maintain_worker <- Lwt_utils.worker "maintenance" ~run:(fun () -> worker_loop st) - ~cancel:(fun () -> Lwt_canceler.cancel canceler) ; - st + ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) let maintain { just_maintained ; please_maintain } = let wait = Lwt_condition.wait just_maintained in diff --git a/src/lib_p2p/p2p_maintenance.mli b/src/lib_p2p/p2p_maintenance.mli index 1c0e853a9..b5f6f942f 100644 --- a/src/lib_p2p/p2p_maintenance.mli +++ b/src/lib_p2p/p2p_maintenance.mli @@ -52,9 +52,12 @@ type bounds = { type 'meta t (** Type of a maintenance worker. *) -val run: bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t -(** [run ~greylist_timeout bounds pool] is a maintenance worker for - [pool] with connection targets specified in [bounds]. *) +val create : bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t +(** [create ~greylist_timeout bounds pool] prepares a maintenance + worker for [pool] with connection targets specified in [bounds]. *) + +val activate : 'meta t -> unit +(** [activate t] start the worker that will maintain connections *) val maintain: 'meta t -> unit Lwt.t (** [maintain t] gives a hint to maintenance worker [t] that diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index bacaa6c30..f0edecc0a 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -63,7 +63,7 @@ let create_listening_socket ~backlog ?(addr = Ipaddr.V6.unspecified) port = Lwt_unix.listen main_socket backlog ; Lwt.return main_socket -let run ?addr ~backlog pool port = +let create ?addr ~backlog pool port = Lwt.catch begin fun () -> create_listening_socket ~backlog ?addr port >>= fun socket -> @@ -75,10 +75,6 @@ let run ?addr ~backlog pool port = socket ; canceler ; pool = Pool pool ; worker = Lwt.return_unit ; } in - st.worker <- - Lwt_utils.worker "welcome" - ~run:(fun () -> worker_loop st) - ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ; Lwt.return st end begin fun exn -> lwt_log_error @@ -87,6 +83,12 @@ let run ?addr ~backlog pool port = Lwt.fail exn end +let activate st = + st.worker <- + Lwt_utils.worker "welcome" + ~run:(fun () -> worker_loop st) + ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) + let shutdown st = Lwt_canceler.cancel st.canceler >>= fun () -> st.worker diff --git a/src/lib_p2p/p2p_welcome.mli b/src/lib_p2p/p2p_welcome.mli index 828596878..c1b81c18b 100644 --- a/src/lib_p2p/p2p_welcome.mli +++ b/src/lib_p2p/p2p_welcome.mli @@ -31,12 +31,15 @@ type t (** Type of a welcome worker. *) -val run: +val create : ?addr:P2p_addr.t -> backlog:int -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> P2p_addr.port -> t Lwt.t -(** [run ?addr ~backlog pool port] returns a running welcome worker +(** [create ?addr ~backlog pool port] returns a running welcome worker adding connections into [pool] listening on [addr:port]. [backlog] is passed to [Lwt_unix.listen]. *) +val activate : t -> unit +(** [activate t] start the worker that will accept connections *) + val shutdown: t -> unit Lwt.t (** [shutdown t] returns when [t] has completed shutdown. *) diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index b0cc3b21c..8265221d3 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -111,7 +111,8 @@ let detach_node f points n = let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in P2p_pool.create config peer_meta_config conn_meta_config msg_config sched >>= fun pool -> - P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> + P2p_welcome.create ~backlog:10 pool ~addr port >>= fun welcome -> + P2p_welcome.activate welcome; lwt_log_info "Node ready (port: %d)" port >>= fun () -> sync channel >>=? fun () -> f channel pool points >>=? fun () -> diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index 21f754b82..c0508dd26 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -120,6 +120,7 @@ let may_toggle_bootstrapped_chain w = if not nv.bootstrapped && P2p_peer.Table.length nv.bootstrapped_peers >= nv.parameters.limits.bootstrap_threshold then begin + Log.log_info "bootstrapped"; nv.bootstrapped <- true ; Lwt.wakeup_later nv.bootstrapped_wakener () ; end diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index 2d54cff6f..ff2dbc966 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -25,6 +25,8 @@ module Message = Distributed_db_message +include Logging.Make(struct let name = "node.distributed_db" end) + type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection @@ -837,11 +839,12 @@ let create disk p2p = active_chains ; protocol_db ; block_input ; operation_input ; } in - P2p.on_new_connection p2p (P2p_reader.run db) ; - P2p.iter_connections p2p (P2p_reader.run db) ; db let activate ({ p2p ; active_chains } as global_db) chain_state = + P2p.on_new_connection p2p (P2p_reader.run global_db) ; + P2p.iter_connections p2p (P2p_reader.run global_db) ; + P2p.activate p2p; let chain_id = State.Chain.id chain_state in match Chain_id.Table.find_opt active_chains chain_id with | None ->