Distributed_db: handle timeout in fetch
.
This commit is contained in:
parent
a8a906b1ae
commit
ed75bc5acd
@ -876,13 +876,17 @@ module type DISTRIBUTED_DB = sig
|
||||
val known: t -> key -> bool Lwt.t
|
||||
type error += Missing_data of key
|
||||
type error += Canceled of key
|
||||
type error += Timeout of key
|
||||
val read: t -> key -> value tzresult Lwt.t
|
||||
val read_opt: t -> key -> value option Lwt.t
|
||||
val read_exn: t -> key -> value Lwt.t
|
||||
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||
val fetch:
|
||||
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
|
||||
t ->
|
||||
?peer:P2p.Peer_id.t ->
|
||||
?timeout:float ->
|
||||
key -> param -> value tzresult Lwt.t
|
||||
val clear_or_cancel: t -> key -> unit
|
||||
end
|
||||
|
||||
@ -899,11 +903,13 @@ module Make
|
||||
let known t k = Table.known (Kind.proj t) k
|
||||
type error += Missing_data = Table.Missing_data
|
||||
type error += Canceled = Table.Canceled
|
||||
type error += Timeout = Table.Timeout
|
||||
let read t k = Table.read (Kind.proj t) k
|
||||
let read_opt t k = Table.read_opt (Kind.proj t) k
|
||||
let read_exn t k = Table.read_exn (Kind.proj t) k
|
||||
let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p
|
||||
let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p
|
||||
let fetch t ?peer ?timeout k p =
|
||||
Table.fetch (Kind.proj t) ?peer ?timeout k p
|
||||
let clear_or_cancel t k = Table.clear_or_cancel (Kind.proj t) k
|
||||
let inject t k v = Table.inject (Kind.proj t) k v
|
||||
let watch t = Table.watch (Kind.proj t)
|
||||
|
@ -79,13 +79,17 @@ module type DISTRIBUTED_DB = sig
|
||||
val known: t -> key -> bool Lwt.t
|
||||
type error += Missing_data of key
|
||||
type error += Canceled of key
|
||||
type error += Timeout of key
|
||||
val read: t -> key -> value tzresult Lwt.t
|
||||
val read_opt: t -> key -> value option Lwt.t
|
||||
val read_exn: t -> key -> value Lwt.t
|
||||
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||
val fetch:
|
||||
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
|
||||
t ->
|
||||
?peer:P2p.Peer_id.t ->
|
||||
?timeout:float ->
|
||||
key -> param -> value tzresult Lwt.t
|
||||
val clear_or_cancel: t -> key -> unit
|
||||
end
|
||||
|
||||
|
@ -20,13 +20,18 @@ module type DISTRIBUTED_DB = sig
|
||||
|
||||
type error += Missing_data of key
|
||||
type error += Canceled of key
|
||||
type error += Timeout of key
|
||||
|
||||
val read: t -> key -> value tzresult Lwt.t
|
||||
val read_opt: t -> key -> value option Lwt.t
|
||||
val read_exn: t -> key -> value Lwt.t
|
||||
|
||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||
val fetch:
|
||||
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
|
||||
t ->
|
||||
?peer:P2p.Peer_id.t ->
|
||||
?timeout:float ->
|
||||
key -> param -> value tzresult Lwt.t
|
||||
|
||||
val clear_or_cancel: t -> key -> unit
|
||||
val inject: t -> key -> value -> bool Lwt.t
|
||||
@ -135,6 +140,7 @@ end = struct
|
||||
|
||||
type error += Missing_data of key
|
||||
type error += Canceled of key
|
||||
type error += Timeout of key
|
||||
|
||||
let () =
|
||||
Error_monad.register_error_kind `Permanent
|
||||
@ -155,7 +161,7 @@ end = struct
|
||||
| Found v -> return v
|
||||
| Pending _ -> fail (Missing_data k)
|
||||
|
||||
let wrap s k t =
|
||||
let wrap s k ?timeout t =
|
||||
let t = Lwt.protected t in
|
||||
Lwt.on_cancel t begin fun () ->
|
||||
match Memory_table.find s.memory k with
|
||||
@ -168,9 +174,14 @@ end = struct
|
||||
Scheduler.notify_cancelation s.scheduler k ;
|
||||
end
|
||||
end ;
|
||||
t
|
||||
match timeout with
|
||||
| None -> t
|
||||
| Some delay ->
|
||||
let timeout =
|
||||
Lwt_unix.sleep delay >>= fun () -> fail (Timeout k) in
|
||||
Lwt.pick [ t ; timeout ]
|
||||
|
||||
let fetch s ?peer k param =
|
||||
let fetch s ?peer ?timeout k param =
|
||||
match Memory_table.find s.memory k with
|
||||
| exception Not_found -> begin
|
||||
Disk_table.read_opt s.disk k >>= function
|
||||
@ -182,18 +193,18 @@ end = struct
|
||||
Memory_table.add s.memory k
|
||||
(Pending { wakener ; waiters = 1 ; param }) ;
|
||||
Scheduler.request s.scheduler peer k ;
|
||||
wrap s k waiter
|
||||
wrap s k ?timeout waiter
|
||||
end
|
||||
| Pending data ->
|
||||
Scheduler.request s.scheduler peer k ;
|
||||
data.waiters <- data.waiters + 1 ;
|
||||
wrap s k (Lwt.waiter_of_wakener data.wakener)
|
||||
wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener)
|
||||
| Found v -> return v
|
||||
end
|
||||
| Pending data ->
|
||||
Scheduler.request s.scheduler peer k ;
|
||||
data.waiters <- data.waiters + 1 ;
|
||||
wrap s k (Lwt.waiter_of_wakener data.wakener)
|
||||
wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener)
|
||||
| Found v -> return v
|
||||
|
||||
let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)
|
||||
|
@ -18,13 +18,18 @@ module type DISTRIBUTED_DB = sig
|
||||
|
||||
type error += Missing_data of key
|
||||
type error += Canceled of key
|
||||
type error += Timeout of key
|
||||
|
||||
val read: t -> key -> value tzresult Lwt.t
|
||||
val read_opt: t -> key -> value option Lwt.t
|
||||
val read_exn: t -> key -> value Lwt.t
|
||||
|
||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||
val fetch:
|
||||
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
|
||||
t ->
|
||||
?peer:P2p.Peer_id.t ->
|
||||
?timeout:float ->
|
||||
key -> param -> value tzresult Lwt.t
|
||||
|
||||
val clear_or_cancel: t -> key -> unit
|
||||
val inject: t -> key -> value -> bool Lwt.t
|
||||
|
Loading…
Reference in New Issue
Block a user