Distributed_db: fix concurency issue

Some `events` might be lost in case of the timeout happens before
concomitantly. We avoid this by storing the `events` promise into the
worker state.
This commit is contained in:
Grégoire Henry 2017-09-29 18:43:13 +02:00 committed by Benjamin Canou
parent 5c03d92457
commit 79ae54625d

View File

@ -281,6 +281,7 @@ end = struct
pending: status Table.t ; pending: status Table.t ;
cancelation: unit -> unit Lwt.t ; cancelation: unit -> unit Lwt.t ;
wait_events: unit -> event list Lwt.t ; wait_events: unit -> event list Lwt.t ;
mutable events: event list Lwt.t ;
} }
and status = { and status = {
@ -329,62 +330,62 @@ end = struct
(* TODO *) (* TODO *)
Lwt.return_unit Lwt.return_unit
let worker_loop state = let rec worker_loop state =
let process = process_event state in let shutdown = state.cancelation ()
let rec loop () = and timeout = compute_timeout state in
let shutdown = state.cancelation () >|= fun () -> `Shutdown Lwt.choose [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () ->
and timeout = compute_timeout state >|= fun () -> `Timeout if Lwt.state shutdown <> Lwt.Sleep then
and events = state.wait_events () >|= fun events -> `Events events in Lwt.return_unit
Lwt.pick [ timeout ; events ; shutdown ] >>= function else if Lwt.state state.events <> Lwt.Sleep then
| `Shutdown -> Lwt.return_unit state.events >>= fun events ->
| `Events events -> state.events <- state.wait_events () ;
Lwt_list.iter_s process events >>= fun () -> Lwt_list.iter_s (process_event state) events >>= fun () ->
loop () worker_loop state
| `Timeout -> else
let now = Unix.gettimeofday () in let now = Unix.gettimeofday () in
let active_peers = Request.active state.param in let active_peers = Request.active state.param in
let requests = let requests =
Table.fold Table.fold
(fun key { peers ; next_request ; delay } acc -> (fun key { peers ; next_request ; delay } acc ->
if next_request > now +. 0.2 then if next_request > now +. 0.2 then
acc acc
else else
let still_peers = P2p.Peer_id.Set.inter peers active_peers in let remaining_peers =
if P2p.Peer_id.Set.is_empty still_peers && P2p.Peer_id.Set.inter peers active_peers in
not (P2p.Peer_id.Set.is_empty peers) then if P2p.Peer_id.Set.is_empty remaining_peers &&
( Table.remove state.pending key ; acc ) not (P2p.Peer_id.Set.is_empty peers) then
else ( Table.remove state.pending key ; acc )
let requested_peers = else
if P2p.Peer_id.Set.is_empty peers let requested_peers =
then active_peers if P2p.Peer_id.Set.is_empty remaining_peers
else peers in then active_peers
let next = { peers = still_peers ; else remaining_peers
next_request = now +. delay ; in
delay = delay *. 1.2 } in let next = { peers = remaining_peers ;
Table.replace state.pending key next ; next_request = now +. delay ;
P2p.Peer_id.Set.fold delay = delay *. 1.2 } in
(fun gid acc -> Table.replace state.pending key next ;
let requests = P2p.Peer_id.Set.fold
try key :: P2p_types.Peer_id.Map.find gid acc (fun gid acc ->
with Not_found -> [key] in let requests =
P2p_types.Peer_id.Map.add gid requests acc) try key :: P2p_types.Peer_id.Map.find gid acc
requested_peers with Not_found -> [key] in
acc) P2p_types.Peer_id.Map.add gid requests acc)
state.pending P2p_types.Peer_id.Map.empty in requested_peers
P2p_types.Peer_id.Map.iter (Request.send state.param) requests ; acc)
loop () state.pending P2p_types.Peer_id.Map.empty in
in P2p_types.Peer_id.Map.iter (Request.send state.param) requests ;
loop worker_loop state
let create param = let create param =
let cancelation, cancel_worker, _ = Lwt_utils.canceler () in let cancelation, cancel_worker, _ = Lwt_utils.canceler () in
let push_to_worker, wait_events = Lwt_utils.queue () in let push_to_worker, wait_events = Lwt_utils.queue () in
let pending = Table.create 17 in let pending = Table.create 17 in
let worker_state = let worker_state =
{ cancelation ; wait_events ; pending ; param } in { cancelation ; wait_events ; pending ; param ; events = wait_events () } in
let worker = let worker =
Lwt_utils.worker "db_request_scheduler" Lwt_utils.worker "db_request_scheduler"
~run:(worker_loop worker_state) ~run:(fun () -> worker_loop worker_state)
~cancel:cancel_worker in ~cancel:cancel_worker in
{ cancel_worker ; push_to_worker ; worker } { cancel_worker ; push_to_worker ; worker }