Distributed_db: fix concurency issue on Peer_validator creation

This commit is contained in:
Grégoire Henry 2019-03-04 19:12:39 +01:00 committed by Pierre Boutillier
parent eba7f5ac8c
commit a3379065ea
No known key found for this signature in database
GPG Key ID: C2F73508B56A193C

View File

@ -76,7 +76,7 @@ module Types = struct
mutable child:
(state * (unit -> unit Lwt.t (* shutdown *))) option ;
mutable prevalidator: Prevalidator.t option ;
active_peers: Peer_validator.t P2p_peer.Table.t ;
active_peers: Peer_validator.t tzresult Lwt.t P2p_peer.Table.t ;
bootstrapped_peers: unit P2p_peer.Table.t ;
}
@ -128,25 +128,26 @@ let may_toggle_bootstrapped_chain w =
let may_activate_peer_validator w peer_id =
let nv = Worker.state w in
match P2p_peer.Table.find_opt nv.active_peers peer_id with
|Some pv -> return pv
|None ->
Peer_validator.create
~notify_new_block:(notify_new_block w)
~notify_bootstrapped: begin fun () ->
P2p_peer.Table.add nv.bootstrapped_peers peer_id () ;
may_toggle_bootstrapped_chain w
end
~notify_termination: begin fun _pv ->
P2p_peer.Table.remove nv.active_peers peer_id ;
P2p_peer.Table.remove nv.bootstrapped_peers peer_id ;
end
nv.parameters.peer_validator_limits
nv.parameters.block_validator
nv.parameters.chain_db
peer_id
>>=? fun pv ->
| Some pv ->
pv
| None ->
let pv =
Peer_validator.create
~notify_new_block:(notify_new_block w)
~notify_bootstrapped: begin fun () ->
P2p_peer.Table.add nv.bootstrapped_peers peer_id () ;
may_toggle_bootstrapped_chain w
end
~notify_termination: begin fun _pv ->
P2p_peer.Table.remove nv.active_peers peer_id ;
P2p_peer.Table.remove nv.bootstrapped_peers peer_id ;
end
nv.parameters.peer_validator_limits
nv.parameters.block_validator
nv.parameters.chain_db
peer_id in
P2p_peer.Table.add nv.active_peers peer_id pv ;
return pv
pv
let may_update_checkpoint chain_state new_head =
State.Chain.checkpoint chain_state >>= fun (old_level, _old_block) ->
@ -339,15 +340,22 @@ let on_completion (type a) w (req : a Request.t) (update : a) request_status =
let on_close w =
let nv = Worker.state w in
Distributed_db.deactivate nv.parameters.chain_db >>= fun () ->
begin
P2p_peer.Table.fold
(fun _ pv acc ->
acc >>= fun acc ->
pv >|= function
| Ok pv -> Peer_validator.shutdown pv :: acc
| Error _ -> acc)
nv.active_peers (Lwt.return [])
end >>= fun pvs ->
Lwt.join
(begin match nv.prevalidator with
| Some prevalidator -> Prevalidator.shutdown prevalidator
| None -> Lwt.return_unit
end ::
Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child ::
P2p_peer.Table.fold
(fun _ pv acc -> Peer_validator.shutdown pv :: acc)
nv.active_peers []) >>= fun () ->
pvs) >>= fun () ->
Lwt.return_unit
let on_launch start_prevalidator w _ parameters =
@ -413,9 +421,14 @@ let on_launch start_prevalidator w _ parameters =
end ;
disconnection = begin fun peer_id ->
Lwt.async begin fun () ->
may_activate_peer_validator w peer_id >>=? fun pv ->
Peer_validator.shutdown pv >>= fun () ->
return_unit
let nv = Worker.state w in
match P2p_peer.Table.find_opt nv.active_peers peer_id with
| Some pv ->
pv >>=? fun pv ->
Peer_validator.shutdown pv >>= fun () ->
return_unit
| None ->
return_unit
end
end ;
} ;