From a3379065eaff2012d3b07e73dc4579b734b7ce3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 4 Mar 2019 19:12:39 +0100 Subject: [PATCH] Distributed_db: fix concurency issue on `Peer_validator` creation --- src/lib_shell/chain_validator.ml | 63 +++++++++++++++++++------------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index c0508dd26..d97d67469 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -76,7 +76,7 @@ module Types = struct mutable child: (state * (unit -> unit Lwt.t (* shutdown *))) option ; mutable prevalidator: Prevalidator.t option ; - active_peers: Peer_validator.t P2p_peer.Table.t ; + active_peers: Peer_validator.t tzresult Lwt.t P2p_peer.Table.t ; bootstrapped_peers: unit P2p_peer.Table.t ; } @@ -128,25 +128,26 @@ let may_toggle_bootstrapped_chain w = let may_activate_peer_validator w peer_id = let nv = Worker.state w in match P2p_peer.Table.find_opt nv.active_peers peer_id with - |Some pv -> return pv - |None -> - Peer_validator.create - ~notify_new_block:(notify_new_block w) - ~notify_bootstrapped: begin fun () -> - P2p_peer.Table.add nv.bootstrapped_peers peer_id () ; - may_toggle_bootstrapped_chain w - end - ~notify_termination: begin fun _pv -> - P2p_peer.Table.remove nv.active_peers peer_id ; - P2p_peer.Table.remove nv.bootstrapped_peers peer_id ; - end - nv.parameters.peer_validator_limits - nv.parameters.block_validator - nv.parameters.chain_db - peer_id - >>=? fun pv -> + | Some pv -> + pv + | None -> + let pv = + Peer_validator.create + ~notify_new_block:(notify_new_block w) + ~notify_bootstrapped: begin fun () -> + P2p_peer.Table.add nv.bootstrapped_peers peer_id () ; + may_toggle_bootstrapped_chain w + end + ~notify_termination: begin fun _pv -> + P2p_peer.Table.remove nv.active_peers peer_id ; + P2p_peer.Table.remove nv.bootstrapped_peers peer_id ; + end + nv.parameters.peer_validator_limits + nv.parameters.block_validator + nv.parameters.chain_db + peer_id in P2p_peer.Table.add nv.active_peers peer_id pv ; - return pv + pv let may_update_checkpoint chain_state new_head = State.Chain.checkpoint chain_state >>= fun (old_level, _old_block) -> @@ -339,15 +340,22 @@ let on_completion (type a) w (req : a Request.t) (update : a) request_status = let on_close w = let nv = Worker.state w in Distributed_db.deactivate nv.parameters.chain_db >>= fun () -> + begin + P2p_peer.Table.fold + (fun _ pv acc -> + acc >>= fun acc -> + pv >|= function + | Ok pv -> Peer_validator.shutdown pv :: acc + | Error _ -> acc) + nv.active_peers (Lwt.return []) + end >>= fun pvs -> Lwt.join (begin match nv.prevalidator with | Some prevalidator -> Prevalidator.shutdown prevalidator | None -> Lwt.return_unit end :: Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child :: - P2p_peer.Table.fold - (fun _ pv acc -> Peer_validator.shutdown pv :: acc) - nv.active_peers []) >>= fun () -> + pvs) >>= fun () -> Lwt.return_unit let on_launch start_prevalidator w _ parameters = @@ -413,9 +421,14 @@ let on_launch start_prevalidator w _ parameters = end ; disconnection = begin fun peer_id -> Lwt.async begin fun () -> - may_activate_peer_validator w peer_id >>=? fun pv -> - Peer_validator.shutdown pv >>= fun () -> - return_unit + let nv = Worker.state w in + match P2p_peer.Table.find_opt nv.active_peers peer_id with + | Some pv -> + pv >>=? fun pv -> + Peer_validator.shutdown pv >>= fun () -> + return_unit + | None -> + return_unit end end ; } ;