diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index a3a073249..615f128e3 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -876,13 +876,17 @@ module type DISTRIBUTED_DB = sig val known: t -> key -> bool Lwt.t type error += Missing_data of key type error += Canceled of key + type error += Timeout of key val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: - t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit end @@ -899,11 +903,13 @@ module Make let known t k = Table.known (Kind.proj t) k type error += Missing_data = Table.Missing_data type error += Canceled = Table.Canceled + type error += Timeout = Table.Timeout let read t k = Table.read (Kind.proj t) k let read_opt t k = Table.read_opt (Kind.proj t) k let read_exn t k = Table.read_exn (Kind.proj t) k let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p - let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p + let fetch t ?peer ?timeout k p = + Table.fetch (Kind.proj t) ?peer ?timeout k p let clear_or_cancel t k = Table.clear_or_cancel (Kind.proj t) k let inject t k v = Table.inject (Kind.proj t) k v let watch t = Table.watch (Kind.proj t) diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index b90e16847..586b786fc 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -79,13 +79,17 @@ module type DISTRIBUTED_DB = sig val known: t -> key -> bool Lwt.t type error += Missing_data of key type error += Canceled of key + type error += Timeout of key val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: - t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit end diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index 0a50d3a5a..e09656800 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -20,13 +20,18 @@ module type DISTRIBUTED_DB = sig type error += Missing_data of key type error += Canceled of key + type error += Timeout of key + val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: - t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit val inject: t -> key -> value -> bool Lwt.t @@ -135,6 +140,7 @@ end = struct type error += Missing_data of key type error += Canceled of key + type error += Timeout of key let () = Error_monad.register_error_kind `Permanent @@ -155,7 +161,7 @@ end = struct | Found v -> return v | Pending _ -> fail (Missing_data k) - let wrap s k t = + let wrap s k ?timeout t = let t = Lwt.protected t in Lwt.on_cancel t begin fun () -> match Memory_table.find s.memory k with @@ -168,9 +174,14 @@ end = struct Scheduler.notify_cancelation s.scheduler k ; end end ; - t + match timeout with + | None -> t + | Some delay -> + let timeout = + Lwt_unix.sleep delay >>= fun () -> fail (Timeout k) in + Lwt.pick [ t ; timeout ] - let fetch s ?peer k param = + let fetch s ?peer ?timeout k param = match Memory_table.find s.memory k with | exception Not_found -> begin Disk_table.read_opt s.disk k >>= function @@ -182,18 +193,18 @@ end = struct Memory_table.add s.memory k (Pending { wakener ; waiters = 1 ; param }) ; Scheduler.request s.scheduler peer k ; - wrap s k waiter + wrap s k ?timeout waiter end | Pending data -> Scheduler.request s.scheduler peer k ; data.waiters <- data.waiters + 1 ; - wrap s k (Lwt.waiter_of_wakener data.wakener) + wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener) | Found v -> return v end | Pending data -> Scheduler.request s.scheduler peer k ; data.waiters <- data.waiters + 1 ; - wrap s k (Lwt.waiter_of_wakener data.wakener) + wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener) | Found v -> return v let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param) diff --git a/src/node/shell/distributed_db_functors.mli b/src/node/shell/distributed_db_functors.mli index 9babd8762..75813cd3d 100644 --- a/src/node/shell/distributed_db_functors.mli +++ b/src/node/shell/distributed_db_functors.mli @@ -18,13 +18,18 @@ module type DISTRIBUTED_DB = sig type error += Missing_data of key type error += Canceled of key + type error += Timeout of key + val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: - t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit val inject: t -> key -> value -> bool Lwt.t