Distributed_db: fix concurency issue

Concurent request of the same ressource might insert multiple 'Pending
request' in the request tracking table. Resulting, only one of them
will ever be satisfied and some worker might be stuck for ever. We
avoid this be removing any cooperation between lookup and insertion in
the table.
This commit is contained in:
Grégoire Henry 2017-09-29 18:43:13 +02:00 committed by Benjamin Canou
parent 5c1f96f3a1
commit 5c03d92457

View File

@ -151,14 +151,23 @@ end = struct
match Memory_table.find s.memory k with match Memory_table.find s.memory k with
| exception Not_found -> begin | exception Not_found -> begin
Disk_table.read_opt s.disk k >>= function Disk_table.read_opt s.disk k >>= function
| Some v -> Lwt.return v
| None -> | None ->
match Memory_table.find s.memory k with
| exception Not_found -> begin
let waiter, wakener = Lwt.wait () in let waiter, wakener = Lwt.wait () in
Memory_table.add s.memory k (Pending (wakener, param)) ; Memory_table.add s.memory k (Pending (wakener, param)) ;
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
waiter waiter
| Some v -> Lwt.return v
end end
| Pending (w, _) -> Lwt.waiter_of_wakener w | Pending (w, _) ->
Scheduler.request s.scheduler peer k ;
Lwt.waiter_of_wakener w
| Found v -> Lwt.return v
end
| Pending (w, _) ->
Scheduler.request s.scheduler peer k ;
Lwt.waiter_of_wakener w
| Found v -> Lwt.return v | Found v -> Lwt.return v
let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param) let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)