From 79ae54625d9f2184b45fe6373c3a5333bc5030d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Fri, 29 Sep 2017 18:43:13 +0200 Subject: [PATCH] Distributed_db: fix concurrency issue Some `events` might be lost when the timeout fires concomitantly with their arrival. We avoid this by storing the `events` promise in the worker state. --- src/node/shell/distributed_db_functors.ml | 97 ++++++++++++----------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index 8b70525fc..3d8e1f730 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -281,6 +281,7 @@ end = struct pending: status Table.t ; cancelation: unit -> unit Lwt.t ; wait_events: unit -> event list Lwt.t ; + mutable events: event list Lwt.t ; } and status = { @@ -329,62 +330,62 @@ end = struct (* TODO *) Lwt.return_unit - let worker_loop state = - let process = process_event state in - let rec loop () = - let shutdown = state.cancelation () >|= fun () -> `Shutdown - and timeout = compute_timeout state >|= fun () -> `Timeout - and events = state.wait_events () >|= fun events -> `Events events in - Lwt.pick [ timeout ; events ; shutdown ] >>= function - | `Shutdown -> Lwt.return_unit - | `Events events -> - Lwt_list.iter_s process events >>= fun () -> - loop () - | `Timeout -> - let now = Unix.gettimeofday () in - let active_peers = Request.active state.param in - let requests = - Table.fold - (fun key { peers ; next_request ; delay } acc -> - if next_request > now +. 0.2 then - acc - else - let still_peers = P2p.Peer_id.Set.inter peers active_peers in - if P2p.Peer_id.Set.is_empty still_peers && - not (P2p.Peer_id.Set.is_empty peers) then - ( Table.remove state.pending key ; acc ) - else - let requested_peers = - if P2p.Peer_id.Set.is_empty peers - then active_peers - else peers in - let next = { peers = still_peers ; - next_request = now +. 
delay ; - delay = delay *. 1.2 } in - Table.replace state.pending key next ; - P2p.Peer_id.Set.fold - (fun gid acc -> - let requests = - try key :: P2p_types.Peer_id.Map.find gid acc - with Not_found -> [key] in - P2p_types.Peer_id.Map.add gid requests acc) - requested_peers - acc) - state.pending P2p_types.Peer_id.Map.empty in - P2p_types.Peer_id.Map.iter (Request.send state.param) requests ; - loop () - in - loop + let rec worker_loop state = + let shutdown = state.cancelation () + and timeout = compute_timeout state in + Lwt.choose [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> + if Lwt.state shutdown <> Lwt.Sleep then + Lwt.return_unit + else if Lwt.state state.events <> Lwt.Sleep then + state.events >>= fun events -> + state.events <- state.wait_events () ; + Lwt_list.iter_s (process_event state) events >>= fun () -> + worker_loop state + else + let now = Unix.gettimeofday () in + let active_peers = Request.active state.param in + let requests = + Table.fold + (fun key { peers ; next_request ; delay } acc -> + if next_request > now +. 0.2 then + acc + else + let remaining_peers = + P2p.Peer_id.Set.inter peers active_peers in + if P2p.Peer_id.Set.is_empty remaining_peers && + not (P2p.Peer_id.Set.is_empty peers) then + ( Table.remove state.pending key ; acc ) + else + let requested_peers = + if P2p.Peer_id.Set.is_empty remaining_peers + then active_peers + else remaining_peers + in + let next = { peers = remaining_peers ; + next_request = now +. delay ; + delay = delay *. 
1.2 } in + Table.replace state.pending key next ; + P2p.Peer_id.Set.fold + (fun gid acc -> + let requests = + try key :: P2p_types.Peer_id.Map.find gid acc + with Not_found -> [key] in + P2p_types.Peer_id.Map.add gid requests acc) + requested_peers + acc) + state.pending P2p_types.Peer_id.Map.empty in + P2p_types.Peer_id.Map.iter (Request.send state.param) requests ; + worker_loop state let create param = let cancelation, cancel_worker, _ = Lwt_utils.canceler () in let push_to_worker, wait_events = Lwt_utils.queue () in let pending = Table.create 17 in let worker_state = - { cancelation ; wait_events ; pending ; param } in + { cancelation ; wait_events ; pending ; param ; events = wait_events () } in let worker = Lwt_utils.worker "db_request_scheduler" - ~run:(worker_loop worker_state) + ~run:(fun () -> worker_loop worker_state) ~cancel:cancel_worker in { cancel_worker ; push_to_worker ; worker }