From 41c82d7481c8ee4a803f082b910f8bb9889e06d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Wed, 8 Nov 2017 11:59:05 +0100 Subject: [PATCH] Distributed_db: handle `cancelation` of call to `fetch`. We properly count the number of waiters and cleanup the table when the count drops to zero. --- src/node/shell/distributed_db_functors.ml | 31 ++++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index 272a1f161..984c613f5 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -107,6 +107,7 @@ end = struct and status = | Pending of { wakener : value Lwt.u ; + mutable waiters : int ; param : param } | Found of value @@ -149,6 +150,21 @@ end = struct | Found v -> return v | Pending _ -> fail (Missing_data k) + let wrap s k t = + let t = Lwt.protected t in + Lwt.on_cancel t begin fun () -> + match Memory_table.find s.memory k with + | exception Not_found -> () + | Found _ -> () + | Pending data -> + data.waiters <- data.waiters - 1 ; + if data.waiters = 0 then begin + Memory_table.remove s.memory k ; + Scheduler.notify_cancelation s.scheduler k ; + end + end ; + t + let fetch s ?peer k param = match Memory_table.find s.memory k with | exception Not_found -> begin @@ -158,18 +174,21 @@ end = struct match Memory_table.find s.memory k with | exception Not_found -> begin let waiter, wakener = Lwt.wait () in - Memory_table.add s.memory k (Pending { wakener ; param }) ; + Memory_table.add s.memory k + (Pending { wakener ; waiters = 1 ; param }) ; Scheduler.request s.scheduler peer k ; - waiter + wrap s k waiter end - | Pending { wakener = w ; _ } -> + | Pending data -> Scheduler.request s.scheduler peer k ; - Lwt.waiter_of_wakener w + data.waiters <- data.waiters + 1 ; + wrap s k (Lwt.waiter_of_wakener data.wakener) | Found v -> Lwt.return v end - | Pending { wakener = w ; _ } -> + | Pending data -> Scheduler.request s.scheduler peer k ; - Lwt.waiter_of_wakener w + data.waiters <- data.waiters + 1 ; + wrap s k (Lwt.waiter_of_wakener data.wakener) | Found v -> Lwt.return v let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)