diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index 6b214323a..a3a073249 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -875,12 +875,14 @@ module type DISTRIBUTED_DB = sig type param val known: t -> key -> bool Lwt.t type error += Missing_data of key + type error += Canceled of key val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t + val fetch: + t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit end @@ -896,6 +898,7 @@ module Make type param = Table.param let known t k = Table.known (Kind.proj t) k type error += Missing_data = Table.Missing_data + type error += Canceled = Table.Canceled let read t k = Table.read (Kind.proj t) k let read_opt t k = Table.read_opt (Kind.proj t) k let read_exn t k = Table.read_exn (Kind.proj t) k diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index 7dec7d299..b90e16847 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -78,12 +78,14 @@ module type DISTRIBUTED_DB = sig type param val known: t -> key -> bool Lwt.t type error += Missing_data of key + type error += Canceled of key val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t + val fetch: + t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit end diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index 60cda7335..0a50d3a5a 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -19,12 +19,14 @@ module type DISTRIBUTED_DB = sig val known: t -> key -> bool Lwt.t type error += Missing_data of key + type error += Canceled of key val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t + val fetch: + t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit val inject: t -> key -> value -> bool Lwt.t @@ -108,7 +110,7 @@ end = struct } and status = - | Pending of { wakener : value Lwt.u ; + | Pending of { wakener : value tzresult Lwt.u ; mutable waiters : int ; param : param } | Found of value @@ -132,6 +134,7 @@ end = struct | Pending _ -> Lwt.fail Not_found type error += Missing_data of key + type error += Canceled of key let () = Error_monad.register_error_kind `Permanent @@ -171,7 +174,7 @@ end = struct match Memory_table.find s.memory k with | exception Not_found -> begin Disk_table.read_opt s.disk k >>= function - | Some v -> Lwt.return v + | Some v -> return v | None -> match Memory_table.find s.memory k with | exception Not_found -> begin @@ -185,13 +188,13 @@ end = struct Scheduler.request s.scheduler peer k ; data.waiters <- data.waiters + 1 ; wrap s k (Lwt.waiter_of_wakener data.wakener) - | Found v -> Lwt.return v + | Found v -> return v end | Pending data -> Scheduler.request s.scheduler peer k ; data.waiters <- data.waiters + 1 ; wrap s k (Lwt.waiter_of_wakener data.wakener) - | Found v -> Lwt.return v + | Found v -> return v let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param) @@ -214,7 +217,7 @@ end = struct | Some v -> Scheduler.notify s.scheduler p k ; Memory_table.replace s.memory k (Found v) ; - Lwt.wakeup w v ; + Lwt.wakeup_later w (Ok v) ; iter_option s.global_input ~f:(fun input -> Watcher.notify input (k, v)) ; Watcher.notify s.input (k, v) ; @@ -244,7 +247,7 @@ end = struct | Pending { wakener = w ; _ } -> Scheduler.notify_cancelation s.scheduler k ; Memory_table.remove s.memory k ; - Lwt.wakeup_later_exn w Lwt.Canceled + Lwt.wakeup_later w (Error [Canceled k]) | Found _ -> Memory_table.remove s.memory k let watch s = Watcher.create_stream s.input diff --git a/src/node/shell/distributed_db_functors.mli b/src/node/shell/distributed_db_functors.mli index 756e5a20e..9babd8762 100644 --- a/src/node/shell/distributed_db_functors.mli +++ b/src/node/shell/distributed_db_functors.mli @@ -17,12 +17,14 @@ module type DISTRIBUTED_DB = sig val known: t -> key -> bool Lwt.t type error += Missing_data of key + type error += Canceled of key val read: t -> key -> value tzresult Lwt.t val read_opt: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t + val fetch: + t -> ?peer:P2p.Peer_id.t -> key -> param -> value tzresult Lwt.t val clear_or_cancel: t -> key -> unit val inject: t -> key -> value -> bool Lwt.t diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 187cf7c30..78e61eb2d 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -66,7 +66,7 @@ let bootstrapped v = v.bootstrapped let fetch_protocol v hash = lwt_log_notice "Fetching protocol %a" Protocol_hash.pp_short hash >>= fun () -> - Distributed_db.Protocol.fetch v.worker.db hash () >>= fun protocol -> + Distributed_db.Protocol.fetch v.worker.db hash () >>=? fun protocol -> Updater.compile hash protocol >>= fun valid -> if valid then begin lwt_log_notice "Successfully compiled protocol %a" @@ -253,7 +253,7 @@ let apply_block net_state db lwt_log_info "validation of %a: looking for dependencies..." Block_hash.pp_short hash >>= fun () -> Distributed_db.Operations.fetch - db (hash, 0) block.shell.operations_hash >>= fun operations -> + db (hash, 0) block.shell.operations_hash >>=? fun operations -> fail_unless (block.shell.validation_passes <= 1) (* TODO constant to be exported from the protocol... *) (failure "unexpected error (TO BE REMOVED)") >>=? fun () -> @@ -399,7 +399,7 @@ module Context_db = struct assert (not (Block_hash.Table.mem tbl hash)); let waiter, wakener = Lwt.wait () in let data = - Distributed_db.Block_header.fetch net_db hash () >>= return in + Distributed_db.Block_header.fetch net_db hash () in match Lwt.state data with | Lwt.Return data -> let state = `Inited data in @@ -733,13 +733,13 @@ let rec create_validator ?parent worker ?max_child_ttl state db net = Block_hash.equal (State.Net.genesis child.net).block genesis in begin match max_child_ttl with - | None -> Lwt.return expiration + | None -> return expiration | Some ttl -> - Distributed_db.Block_header.fetch net_db genesis () >>= fun genesis -> - Lwt.return + Distributed_db.Block_header.fetch net_db genesis () >>=? fun genesis -> + return (Time.min expiration (Time.add genesis.shell.timestamp (Int64.of_int ttl))) - end >>= fun local_expiration -> + end >>=? fun local_expiration -> let expired = Time.(local_expiration <= current_time) in if expired && activated then deactivate_child () >>= return