P2p: fix race condition in node initialisation

This commit is contained in:
Grégoire Henry 2019-02-19 16:29:38 +01:00
parent 1853889637
commit 15b61d6b84
No known key found for this signature in database
GPG Key ID: 50D984F20BD445D2
9 changed files with 61 additions and 28 deletions

View File

@ -158,13 +158,13 @@ let create_maintenance_worker limits pool =
~expected:limits.expected_connections ~expected:limits.expected_connections
~max:limits.max_connections ~max:limits.max_connections
in in
P2p_maintenance.run bounds pool P2p_maintenance.create bounds pool
let may_create_welcome_worker config limits pool = let may_create_welcome_worker config limits pool =
match config.listening_port with match config.listening_port with
| None -> Lwt.return_none | None -> Lwt.return_none
| Some port -> | Some port ->
P2p_welcome.run P2p_welcome.create
~backlog:limits.backlog pool ~backlog:limits.backlog pool
?addr:config.listening_addr ?addr:config.listening_addr
port >>= fun w -> port >>= fun w ->
@ -201,9 +201,21 @@ module Real = struct
let peer_id { config } = config.identity.peer_id let peer_id { config } = config.identity.peer_id
let maintain { maintenance } () = let maintain { maintenance } () =
P2p_maintenance.maintain maintenance P2p_maintenance.maintain maintenance
let activate t () =
log_info "activate";
begin
match t.welcome with
| None -> ()
| Some w -> P2p_welcome.activate w
end ;
P2p_maintenance.activate t.maintenance;
Lwt.async (fun () -> P2p_maintenance.maintain t.maintenance) ;
()
let roll _net () = Lwt.return_unit (* TODO implement *) let roll _net () = Lwt.return_unit (* TODO implement *)
(* returns when all workers have shutted down in the opposite (* returns when all workers have shutted down in the opposite
@ -377,6 +389,7 @@ type ('msg, 'peer_meta, 'conn_meta) t = {
on_new_connection : on_new_connection :
(P2p_peer.Id.t -> (P2p_peer.Id.t ->
('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ; ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit ;
activate : unit -> unit ;
} }
type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t
@ -446,8 +459,13 @@ let create ~config ~limits peer_cfg conn_cfg msg_cfg =
fold_connections = (fun ~init ~f -> Real.fold_connections net ~init ~f) ; fold_connections = (fun ~init ~f -> Real.fold_connections net ~init ~f) ;
iter_connections = Real.iter_connections net ; iter_connections = Real.iter_connections net ;
on_new_connection = Real.on_new_connection net ; on_new_connection = Real.on_new_connection net ;
activate = Real.activate net ;
} }
let activate t =
log_info "activate P2P layer !";
t.activate ()
let faked_network peer_cfg faked_metadata = { let faked_network peer_cfg faked_metadata = {
versions = [] ; versions = [] ;
peer_id = Fake.id.peer_id ; peer_id = Fake.id.peer_id ;
@ -472,7 +490,8 @@ let faked_network peer_cfg faked_metadata = {
iter_connections = (fun _f -> ()) ; iter_connections = (fun _f -> ()) ;
on_new_connection = (fun _f -> ()) ; on_new_connection = (fun _f -> ()) ;
broadcast = ignore ; broadcast = ignore ;
pool = None pool = None ;
activate = (fun _ -> ()) ;
} }
let peer_id net = net.peer_id let peer_id net = net.peer_id

View File

@ -173,6 +173,8 @@ val create :
'peer_meta peer_meta_config -> 'conn_meta conn_meta_config -> 'peer_meta peer_meta_config -> 'conn_meta conn_meta_config ->
'msg message_config -> ('msg, 'peer_meta, 'conn_meta) net tzresult Lwt.t 'msg message_config -> ('msg, 'peer_meta, 'conn_meta) net tzresult Lwt.t
val activate : ('msg, 'peer_meta, 'conn_meta) net -> unit
(** Return one's peer_id *) (** Return one's peer_id *)
val peer_id : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t val peer_id : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t

View File

@ -206,21 +206,20 @@ let rec worker_loop st =
| Error [ Canceled ] -> Lwt.return_unit | Error [ Canceled ] -> Lwt.return_unit
| Error _ -> Lwt.return_unit | Error _ -> Lwt.return_unit
let run bounds pool = let create bounds pool = {
let canceler = Lwt_canceler.create () in canceler = Lwt_canceler.create ();
let st = {
canceler ;
bounds ; bounds ;
pool = Pool pool ; pool = Pool pool ;
just_maintained = Lwt_condition.create () ; just_maintained = Lwt_condition.create () ;
please_maintain = Lwt_condition.create () ; please_maintain = Lwt_condition.create () ;
maintain_worker = Lwt.return_unit ; maintain_worker = Lwt.return_unit ;
} in }
let activate st =
st.maintain_worker <- st.maintain_worker <-
Lwt_utils.worker "maintenance" Lwt_utils.worker "maintenance"
~run:(fun () -> worker_loop st) ~run:(fun () -> worker_loop st)
~cancel:(fun () -> Lwt_canceler.cancel canceler) ; ~cancel:(fun () -> Lwt_canceler.cancel st.canceler)
st
let maintain { just_maintained ; please_maintain } = let maintain { just_maintained ; please_maintain } =
let wait = Lwt_condition.wait just_maintained in let wait = Lwt_condition.wait just_maintained in

View File

@ -52,9 +52,12 @@ type bounds = {
type 'meta t type 'meta t
(** Type of a maintenance worker. *) (** Type of a maintenance worker. *)
val run: bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t val create : bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t
(** [run ~greylist_timeout bounds pool] is a maintenance worker for (** [create ~greylist_timeout bounds pool] prepares a maintenance
[pool] with connection targets specified in [bounds]. *) worker for [pool] with connection targets specified in [bounds]. *)
val activate : 'meta t -> unit
(** [activate t] start the worker that will maintain connections *)
val maintain: 'meta t -> unit Lwt.t val maintain: 'meta t -> unit Lwt.t
(** [maintain t] gives a hint to maintenance worker [t] that (** [maintain t] gives a hint to maintenance worker [t] that

View File

@ -63,7 +63,7 @@ let create_listening_socket ~backlog ?(addr = Ipaddr.V6.unspecified) port =
Lwt_unix.listen main_socket backlog ; Lwt_unix.listen main_socket backlog ;
Lwt.return main_socket Lwt.return main_socket
let run ?addr ~backlog pool port = let create ?addr ~backlog pool port =
Lwt.catch begin fun () -> Lwt.catch begin fun () ->
create_listening_socket create_listening_socket
~backlog ?addr port >>= fun socket -> ~backlog ?addr port >>= fun socket ->
@ -75,10 +75,6 @@ let run ?addr ~backlog pool port =
socket ; canceler ; pool = Pool pool ; socket ; canceler ; pool = Pool pool ;
worker = Lwt.return_unit ; worker = Lwt.return_unit ;
} in } in
st.worker <-
Lwt_utils.worker "welcome"
~run:(fun () -> worker_loop st)
~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
Lwt.return st Lwt.return st
end begin fun exn -> end begin fun exn ->
lwt_log_error lwt_log_error
@ -87,6 +83,12 @@ let run ?addr ~backlog pool port =
Lwt.fail exn Lwt.fail exn
end end
let activate st =
st.worker <-
Lwt_utils.worker "welcome"
~run:(fun () -> worker_loop st)
~cancel:(fun () -> Lwt_canceler.cancel st.canceler)
let shutdown st = let shutdown st =
Lwt_canceler.cancel st.canceler >>= fun () -> Lwt_canceler.cancel st.canceler >>= fun () ->
st.worker st.worker

View File

@ -31,12 +31,15 @@
type t type t
(** Type of a welcome worker. *) (** Type of a welcome worker. *)
val run: val create :
?addr:P2p_addr.t -> backlog:int -> ?addr:P2p_addr.t -> backlog:int ->
('msg, 'meta, 'meta_conn) P2p_pool.t -> P2p_addr.port -> t Lwt.t ('msg, 'meta, 'meta_conn) P2p_pool.t -> P2p_addr.port -> t Lwt.t
(** [run ?addr ~backlog pool port] returns a running welcome worker (** [create ?addr ~backlog pool port] returns a running welcome worker
adding connections into [pool] listening on [addr:port]. [backlog] adding connections into [pool] listening on [addr:port]. [backlog]
is passed to [Lwt_unix.listen]. *) is passed to [Lwt_unix.listen]. *)
val activate : t -> unit
(** [activate t] start the worker that will accept connections *)
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
(** [shutdown t] returns when [t] has completed shutdown. *) (** [shutdown t] returns when [t] has completed shutdown. *)

View File

@ -111,7 +111,8 @@ let detach_node f points n =
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
P2p_pool.create P2p_pool.create
config peer_meta_config conn_meta_config msg_config sched >>= fun pool -> config peer_meta_config conn_meta_config msg_config sched >>= fun pool ->
P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> P2p_welcome.create ~backlog:10 pool ~addr port >>= fun welcome ->
P2p_welcome.activate welcome;
lwt_log_info "Node ready (port: %d)" port >>= fun () -> lwt_log_info "Node ready (port: %d)" port >>= fun () ->
sync channel >>=? fun () -> sync channel >>=? fun () ->
f channel pool points >>=? fun () -> f channel pool points >>=? fun () ->

View File

@ -120,6 +120,7 @@ let may_toggle_bootstrapped_chain w =
if not nv.bootstrapped && if not nv.bootstrapped &&
P2p_peer.Table.length nv.bootstrapped_peers >= nv.parameters.limits.bootstrap_threshold P2p_peer.Table.length nv.bootstrapped_peers >= nv.parameters.limits.bootstrap_threshold
then begin then begin
Log.log_info "bootstrapped";
nv.bootstrapped <- true ; nv.bootstrapped <- true ;
Lwt.wakeup_later nv.bootstrapped_wakener () ; Lwt.wakeup_later nv.bootstrapped_wakener () ;
end end

View File

@ -25,6 +25,8 @@
module Message = Distributed_db_message module Message = Distributed_db_message
include Logging.Make(struct let name = "node.distributed_db" end)
type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net
type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection
@ -837,11 +839,12 @@ let create disk p2p =
active_chains ; protocol_db ; active_chains ; protocol_db ;
block_input ; operation_input ; block_input ; operation_input ;
} in } in
P2p.on_new_connection p2p (P2p_reader.run db) ;
P2p.iter_connections p2p (P2p_reader.run db) ;
db db
let activate ({ p2p ; active_chains } as global_db) chain_state = let activate ({ p2p ; active_chains } as global_db) chain_state =
P2p.on_new_connection p2p (P2p_reader.run global_db) ;
P2p.iter_connections p2p (P2p_reader.run global_db) ;
P2p.activate p2p;
let chain_id = State.Chain.id chain_state in let chain_id = State.Chain.id chain_state in
match Chain_id.Table.find_opt active_chains chain_id with match Chain_id.Table.find_opt active_chains chain_id with
| None -> | None ->