Distributed_db: use the error monad for fetch.

This commit is contained in:
Grégoire Henry 2017-11-08 12:42:09 +01:00 committed by Benjamin Canou
parent f40c418d0f
commit a8a906b1ae
5 changed files with 27 additions and 17 deletions

View File

@ -875,12 +875,14 @@ module type DISTRIBUTED_DB = sig
type param type param
val known: t -> key -> bool Lwt.t val known: t -> key -> bool Lwt.t
type error += Missing_data of key type error += Missing_data of key
type error += Canceled of key
val read: t -> key -> value tzresult Lwt.t val read: t -> key -> value tzresult Lwt.t
val read_opt: t -> key -> value option Lwt.t val read_opt: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t val read_exn: t -> key -> value Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch:
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
val clear_or_cancel: t -> key -> unit val clear_or_cancel: t -> key -> unit
end end
@ -896,6 +898,7 @@ module Make
type param = Table.param type param = Table.param
let known t k = Table.known (Kind.proj t) k let known t k = Table.known (Kind.proj t) k
type error += Missing_data = Table.Missing_data type error += Missing_data = Table.Missing_data
type error += Canceled = Table.Canceled
let read t k = Table.read (Kind.proj t) k let read t k = Table.read (Kind.proj t) k
let read_opt t k = Table.read_opt (Kind.proj t) k let read_opt t k = Table.read_opt (Kind.proj t) k
let read_exn t k = Table.read_exn (Kind.proj t) k let read_exn t k = Table.read_exn (Kind.proj t) k

View File

@ -78,12 +78,14 @@ module type DISTRIBUTED_DB = sig
type param type param
val known: t -> key -> bool Lwt.t val known: t -> key -> bool Lwt.t
type error += Missing_data of key type error += Missing_data of key
type error += Canceled of key
val read: t -> key -> value tzresult Lwt.t val read: t -> key -> value tzresult Lwt.t
val read_opt: t -> key -> value option Lwt.t val read_opt: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t val read_exn: t -> key -> value Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch:
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
val clear_or_cancel: t -> key -> unit val clear_or_cancel: t -> key -> unit
end end

View File

@ -19,12 +19,14 @@ module type DISTRIBUTED_DB = sig
val known: t -> key -> bool Lwt.t val known: t -> key -> bool Lwt.t
type error += Missing_data of key type error += Missing_data of key
type error += Canceled of key
val read: t -> key -> value tzresult Lwt.t val read: t -> key -> value tzresult Lwt.t
val read_opt: t -> key -> value option Lwt.t val read_opt: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t val read_exn: t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch:
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
val clear_or_cancel: t -> key -> unit val clear_or_cancel: t -> key -> unit
val inject: t -> key -> value -> bool Lwt.t val inject: t -> key -> value -> bool Lwt.t
@ -108,7 +110,7 @@ end = struct
} }
and status = and status =
| Pending of { wakener : value Lwt.u ; | Pending of { wakener : value tzresult Lwt.u ;
mutable waiters : int ; mutable waiters : int ;
param : param } param : param }
| Found of value | Found of value
@ -132,6 +134,7 @@ end = struct
| Pending _ -> Lwt.fail Not_found | Pending _ -> Lwt.fail Not_found
type error += Missing_data of key type error += Missing_data of key
type error += Canceled of key
let () = let () =
Error_monad.register_error_kind `Permanent Error_monad.register_error_kind `Permanent
@ -171,7 +174,7 @@ end = struct
match Memory_table.find s.memory k with match Memory_table.find s.memory k with
| exception Not_found -> begin | exception Not_found -> begin
Disk_table.read_opt s.disk k >>= function Disk_table.read_opt s.disk k >>= function
| Some v -> Lwt.return v | Some v -> return v
| None -> | None ->
match Memory_table.find s.memory k with match Memory_table.find s.memory k with
| exception Not_found -> begin | exception Not_found -> begin
@ -185,13 +188,13 @@ end = struct
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
data.waiters <- data.waiters + 1 ; data.waiters <- data.waiters + 1 ;
wrap s k (Lwt.waiter_of_wakener data.wakener) wrap s k (Lwt.waiter_of_wakener data.wakener)
| Found v -> Lwt.return v | Found v -> return v
end end
| Pending data -> | Pending data ->
Scheduler.request s.scheduler peer k ; Scheduler.request s.scheduler peer k ;
data.waiters <- data.waiters + 1 ; data.waiters <- data.waiters + 1 ;
wrap s k (Lwt.waiter_of_wakener data.wakener) wrap s k (Lwt.waiter_of_wakener data.wakener)
| Found v -> Lwt.return v | Found v -> return v
let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param) let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)
@ -214,7 +217,7 @@ end = struct
| Some v -> | Some v ->
Scheduler.notify s.scheduler p k ; Scheduler.notify s.scheduler p k ;
Memory_table.replace s.memory k (Found v) ; Memory_table.replace s.memory k (Found v) ;
Lwt.wakeup w v ; Lwt.wakeup_later w (Ok v) ;
iter_option s.global_input iter_option s.global_input
~f:(fun input -> Watcher.notify input (k, v)) ; ~f:(fun input -> Watcher.notify input (k, v)) ;
Watcher.notify s.input (k, v) ; Watcher.notify s.input (k, v) ;
@ -244,7 +247,7 @@ end = struct
| Pending { wakener = w ; _ } -> | Pending { wakener = w ; _ } ->
Scheduler.notify_cancelation s.scheduler k ; Scheduler.notify_cancelation s.scheduler k ;
Memory_table.remove s.memory k ; Memory_table.remove s.memory k ;
Lwt.wakeup_later_exn w Lwt.Canceled Lwt.wakeup_later w (Error [Canceled k])
| Found _ -> Memory_table.remove s.memory k | Found _ -> Memory_table.remove s.memory k
let watch s = Watcher.create_stream s.input let watch s = Watcher.create_stream s.input

View File

@ -17,12 +17,14 @@ module type DISTRIBUTED_DB = sig
val known: t -> key -> bool Lwt.t val known: t -> key -> bool Lwt.t
type error += Missing_data of key type error += Missing_data of key
type error += Canceled of key
val read: t -> key -> value tzresult Lwt.t val read: t -> key -> value tzresult Lwt.t
val read_opt: t -> key -> value option Lwt.t val read_opt: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t val read_exn: t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch:
t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t
val clear_or_cancel: t -> key -> unit val clear_or_cancel: t -> key -> unit
val inject: t -> key -> value -> bool Lwt.t val inject: t -> key -> value -> bool Lwt.t

View File

@ -66,7 +66,7 @@ let bootstrapped v = v.bootstrapped
let fetch_protocol v hash = let fetch_protocol v hash =
lwt_log_notice "Fetching protocol %a" lwt_log_notice "Fetching protocol %a"
Protocol_hash.pp_short hash >>= fun () -> Protocol_hash.pp_short hash >>= fun () ->
Distributed_db.Protocol.fetch v.worker.db hash () >>= fun protocol -> Distributed_db.Protocol.fetch v.worker.db hash () >>=? fun protocol ->
Updater.compile hash protocol >>= fun valid -> Updater.compile hash protocol >>= fun valid ->
if valid then begin if valid then begin
lwt_log_notice "Successfully compiled protocol %a" lwt_log_notice "Successfully compiled protocol %a"
@ -253,7 +253,7 @@ let apply_block net_state db
lwt_log_info "validation of %a: looking for dependencies..." lwt_log_info "validation of %a: looking for dependencies..."
Block_hash.pp_short hash >>= fun () -> Block_hash.pp_short hash >>= fun () ->
Distributed_db.Operations.fetch Distributed_db.Operations.fetch
db (hash, 0) block.shell.operations_hash >>= fun operations -> db (hash, 0) block.shell.operations_hash >>=? fun operations ->
fail_unless (block.shell.validation_passes <= 1) fail_unless (block.shell.validation_passes <= 1)
(* TODO constant to be exported from the protocol... *) (* TODO constant to be exported from the protocol... *)
(failure "unexpected error (TO BE REMOVED)") >>=? fun () -> (failure "unexpected error (TO BE REMOVED)") >>=? fun () ->
@ -399,7 +399,7 @@ module Context_db = struct
assert (not (Block_hash.Table.mem tbl hash)); assert (not (Block_hash.Table.mem tbl hash));
let waiter, wakener = Lwt.wait () in let waiter, wakener = Lwt.wait () in
let data = let data =
Distributed_db.Block_header.fetch net_db hash () >>= return in Distributed_db.Block_header.fetch net_db hash () in
match Lwt.state data with match Lwt.state data with
| Lwt.Return data -> | Lwt.Return data ->
let state = `Inited data in let state = `Inited data in
@ -733,13 +733,13 @@ let rec create_validator ?parent worker ?max_child_ttl state db net =
Block_hash.equal (State.Net.genesis child.net).block genesis in Block_hash.equal (State.Net.genesis child.net).block genesis in
begin begin
match max_child_ttl with match max_child_ttl with
| None -> Lwt.return expiration | None -> return expiration
| Some ttl -> | Some ttl ->
Distributed_db.Block_header.fetch net_db genesis () >>= fun genesis -> Distributed_db.Block_header.fetch net_db genesis () >>=? fun genesis ->
Lwt.return return
(Time.min expiration (Time.min expiration
(Time.add genesis.shell.timestamp (Int64.of_int ttl))) (Time.add genesis.shell.timestamp (Int64.of_int ttl)))
end >>= fun local_expiration -> end >>=? fun local_expiration ->
let expired = Time.(local_expiration <= current_time) in let expired = Time.(local_expiration <= current_time) in
if expired && activated then if expired && activated then
deactivate_child () >>= return deactivate_child () >>= return