Distributed_db: handle cancelation of call to fetch.

We properly count the number of waiters and cleanup the table when the
count drops to zero.
This commit is contained in:
Grégoire Henry 2017-11-08 11:59:05 +01:00 committed by Benjamin Canou
parent 2b4e898407
commit 41c82d7481

View File

@ -107,6 +107,7 @@ end = struct
and status = and status =
| Pending of { wakener : value Lwt.u ; | Pending of { wakener : value Lwt.u ;
mutable waiters : int ;
param : param } param : param }
| Found of value | Found of value
@ -149,6 +150,21 @@ end = struct
| Found v -> return v | Found v -> return v
| Pending _ -> fail (Missing_data k) | Pending _ -> fail (Missing_data k)
let wrap s k t =
let t = Lwt.protected t in
Lwt.on_cancel t begin fun () ->
match Memory_table.find s.memory k with
| exception Not_found -> ()
| Found _ -> ()
| Pending data ->
data.waiters <- data.waiters - 1 ;
if data.waiters = 0 then begin
Memory_table.remove s.memory k ;
Scheduler.notify_cancelation s.scheduler k ;
end
end ;
t
let fetch s ?peer k param = let fetch s ?peer k param =
match Memory_table.find s.memory k with match Memory_table.find s.memory k with
| exception Not_found -> begin | exception Not_found -> begin
@ -158,18 +174,21 @@ end = struct
match Memory_table.find s.memory k with match Memory_table.find s.memory k with
| exception Not_found -> begin | exception Not_found -> begin
let waiter, wakener = Lwt.wait () in let waiter, wakener = Lwt.wait () in
Memory_table.add s.memory k (Pending { wakener ; param }) ; Memory_table.add s.memory k
(Pending { wakener ; waiters = 1 ; param }) ;
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
waiter wrap s k waiter
end end
| Pending { wakener = w ; _ } -> | Pending data ->
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
Lwt.waiter_of_wakener w data.waiters <- data.waiters + 1 ;
wrap s k (Lwt.waiter_of_wakener data.wakener)
| Found v -> Lwt.return v | Found v -> Lwt.return v
end end
| Pending { wakener = w ; _ } -> | Pending data ->
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
Lwt.waiter_of_wakener w data.waiters <- data.waiters + 1 ;
wrap s k (Lwt.waiter_of_wakener data.wakener)
| Found v -> Lwt.return v | Found v -> Lwt.return v
let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param) let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)