diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index 272a1f161..984c613f5 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -107,6 +107,7 @@ end = struct and status = | Pending of { wakener : value Lwt.u ; + mutable waiters : int ; param : param } | Found of value @@ -149,6 +150,21 @@ end = struct | Found v -> return v | Pending _ -> fail (Missing_data k) + let wrap s k t = + let t = Lwt.protected t in + Lwt.on_cancel t begin fun () -> + match Memory_table.find s.memory k with + | exception Not_found -> () + | Found _ -> () + | Pending data -> + data.waiters <- data.waiters - 1 ; + if data.waiters = 0 then begin + Memory_table.remove s.memory k ; + Scheduler.notify_cancelation s.scheduler k ; + end + end ; + t + let fetch s ?peer k param = match Memory_table.find s.memory k with | exception Not_found -> begin @@ -158,18 +174,21 @@ end = struct match Memory_table.find s.memory k with | exception Not_found -> begin let waiter, wakener = Lwt.wait () in - Memory_table.add s.memory k (Pending { wakener ; param }) ; + Memory_table.add s.memory k + (Pending { wakener ; waiters = 1 ; param }) ; Scheduler.request s.scheduler peer k ; - waiter + wrap s k waiter end - | Pending { wakener = w ; _ } -> + | Pending data -> Scheduler.request s.scheduler peer k ; - Lwt.waiter_of_wakener w + data.waiters <- data.waiters + 1 ; + wrap s k (Lwt.waiter_of_wakener data.wakener) | Found v -> Lwt.return v end - | Pending { wakener = w ; _ } -> + | Pending data -> Scheduler.request s.scheduler peer k ; - Lwt.waiter_of_wakener w + data.waiters <- data.waiters + 1 ; + wrap s k (Lwt.waiter_of_wakener data.wakener) | Found v -> Lwt.return v let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)