9ecc2e517c
Co-authored with `Grégoire Henry <gregoire.henry@tezos.com>`
271 lines
10 KiB
OCaml
271 lines
10 KiB
OCaml
(*****************************************************************************)
|
|
(* *)
|
|
(* Open Source License *)
|
|
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
|
(* *)
|
|
(* Permission is hereby granted, free of charge, to any person obtaining a *)
|
|
(* copy of this software and associated documentation files (the "Software"),*)
|
|
(* to deal in the Software without restriction, including without limitation *)
|
|
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
|
|
(* and/or sell copies of the Software, and to permit persons to whom the *)
|
|
(* Software is furnished to do so, subject to the following conditions: *)
|
|
(* *)
|
|
(* The above copyright notice and this permission notice shall be included *)
|
|
(* in all copies or substantial portions of the Software. *)
|
|
(* *)
|
|
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
|
|
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
|
|
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
|
|
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
|
|
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
|
|
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
|
|
(* DEALINGS IN THE SOFTWARE. *)
|
|
(* *)
|
|
(*****************************************************************************)
|
|
|
|
open Block_validator_worker_state
|
|
open Block_validator_errors
|
|
|
|
type limits = {
|
|
protocol_timeout: float ;
|
|
worker_limits : Worker_types.limits ;
|
|
}
|
|
|
|
type validator_kind = Block_validator_process.validator_kind =
|
|
| Internal of Context.index
|
|
|
|
module Name = struct
|
|
type t = unit
|
|
let encoding = Data_encoding.empty
|
|
let base = [ "validator.block" ]
|
|
let pp _ () = ()
|
|
end
|
|
|
|
module Types = struct
|
|
include Worker_state
|
|
type state = {
|
|
protocol_validator: Protocol_validator.t ;
|
|
validation_process: Block_validator_process.t ;
|
|
limits : limits ;
|
|
}
|
|
type parameters = limits * Distributed_db.t * Block_validator_process.validator_kind
|
|
let view _state _parameters = ()
|
|
end
|
|
|
|
module Request = struct
|
|
include Request
|
|
type 'a t =
|
|
| Request_validation : {
|
|
chain_db: Distributed_db.chain_db ;
|
|
notify_new_block: State.Block.t -> unit ;
|
|
canceler: Lwt_canceler.t option ;
|
|
peer: P2p_peer.Id.t option ;
|
|
hash: Block_hash.t ;
|
|
header: Block_header.t ;
|
|
operations: Operation.t list list ;
|
|
} -> State.Block.t option tzresult t
|
|
let view
|
|
: type a. a t -> view
|
|
= fun (Request_validation { chain_db ; peer ; hash }) ->
|
|
let chain_id = chain_db |> Distributed_db.chain_state |> State.Chain.id in
|
|
{ chain_id ; block = hash ; peer = peer }
|
|
end
|
|
|
|
module Worker = Worker.Make (Name) (Event) (Request) (Types)
|
|
|
|
type t = Worker.infinite Worker.queue Worker.t
|
|
type error += Closed = Worker.Closed
|
|
|
|
let debug w =
|
|
Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
|
|
|
|
let check_chain_liveness chain_db hash (header: Block_header.t) =
|
|
let chain_state = Distributed_db.chain_state chain_db in
|
|
match State.Chain.expiration chain_state with
|
|
| Some eol when Time.(eol <= header.shell.timestamp) ->
|
|
fail @@ invalid_block hash @@
|
|
Expired_chain { chain_id = State.Chain.id chain_state ;
|
|
expiration = eol ;
|
|
timestamp = header.shell.timestamp }
|
|
| None | Some _ -> return_unit
|
|
|
|
let get_proto pred hash =
|
|
State.Block.context pred >>= fun pred_context ->
|
|
Context.get_protocol pred_context >>= fun pred_protocol_hash ->
|
|
match Registered_protocol.get pred_protocol_hash with
|
|
| None ->
|
|
fail (Unavailable_protocol { block = hash ;
|
|
protocol = pred_protocol_hash })
|
|
| Some p -> return p
|
|
|
|
|
|
let on_request
|
|
: type r. t -> r Request.t -> r tzresult Lwt.t
|
|
= fun w
|
|
(Request.Request_validation
|
|
{ chain_db ; notify_new_block ; canceler ;
|
|
peer ; hash ; header ; operations }) ->
|
|
let bv = Worker.state w in
|
|
let chain_state = Distributed_db.chain_state chain_db in
|
|
State.Block.read_opt chain_state hash >>= function
|
|
| Some block ->
|
|
debug w "previously validated block %a (after pipe)"
|
|
Block_hash.pp_short hash ;
|
|
Protocol_validator.prefetch_and_compile_protocols
|
|
bv.protocol_validator
|
|
?peer ~timeout:bv.limits.protocol_timeout
|
|
block ;
|
|
return (Ok None)
|
|
| None ->
|
|
State.Block.read_invalid chain_state hash >>= function
|
|
| Some { errors } ->
|
|
return (Error errors)
|
|
| None ->
|
|
begin
|
|
debug w "validating block %a" Block_hash.pp_short hash ;
|
|
State.Block.read
|
|
chain_state header.shell.predecessor >>=? fun pred ->
|
|
(* TODO also protect with [Worker.canceler w]. *)
|
|
protect ?canceler begin fun () ->
|
|
Block_validator_process.apply_block
|
|
bv.validation_process
|
|
~predecessor:pred
|
|
header operations >>=? fun { validation_result ; block_metadata ;
|
|
ops_metadata ; context_hash } ->
|
|
let validation_store =
|
|
({ context_hash ;
|
|
message = validation_result.message ;
|
|
max_operations_ttl = validation_result.max_operations_ttl ;
|
|
last_allowed_fork_level = validation_result.last_allowed_fork_level} :
|
|
State.Block.validation_store) in
|
|
Distributed_db.commit_block
|
|
chain_db hash
|
|
header block_metadata operations ops_metadata
|
|
validation_store >>=? function
|
|
| None -> assert false (* should not happen *)
|
|
| Some block -> return block
|
|
end
|
|
end >>= function
|
|
| Ok block ->
|
|
Protocol_validator.prefetch_and_compile_protocols
|
|
bv.protocol_validator
|
|
?peer ~timeout:bv.limits.protocol_timeout
|
|
block ;
|
|
notify_new_block block ;
|
|
return (Ok (Some block))
|
|
(* TODO catch other temporary error (e.g. system errors)
|
|
and do not 'commit' them on disk... *)
|
|
| Error [Canceled | Unavailable_protocol _] as err ->
|
|
(* FIXME: Canceled can escape. Canceled is not registered. BOOM! *)
|
|
return err
|
|
| Error errors ->
|
|
Worker.protect w begin fun () ->
|
|
Distributed_db.commit_invalid_block
|
|
chain_db hash header errors
|
|
end >>=? fun commited ->
|
|
assert commited ;
|
|
return (Error errors)
|
|
|
|
let on_launch _ _ (limits, db, validation_kind) =
|
|
let protocol_validator = Protocol_validator.create db in
|
|
Block_validator_process.init validation_kind >>= fun validation_process ->
|
|
Lwt.return { Types.protocol_validator ; validation_process ; limits }
|
|
|
|
let on_error w r st errs =
|
|
Worker.record_event w (Validation_failure (r, st, errs)) ;
|
|
Lwt.return (Error errs)
|
|
|
|
let on_completion
|
|
: type a. t -> a Request.t -> a -> Worker_types.request_status -> unit Lwt.t
|
|
= fun w (Request.Request_validation _ as r) v st ->
|
|
match v with
|
|
| Ok (Some _) ->
|
|
Worker.record_event w
|
|
(Event.Validation_success (Request.view r, st)) ;
|
|
Lwt.return_unit
|
|
| Ok None ->
|
|
Lwt.return_unit
|
|
| Error errs ->
|
|
Worker.record_event w
|
|
(Event.Validation_failure (Request.view r, st, errs)) ;
|
|
Lwt.return_unit
|
|
|
|
let on_close w =
|
|
let bv = Worker.state w in
|
|
Block_validator_process.close bv.validation_process
|
|
|
|
let table = Worker.create_table Queue
|
|
|
|
let create limits db validation_process_kind =
|
|
let module Handlers = struct
|
|
type self = t
|
|
let on_launch = on_launch
|
|
let on_request = on_request
|
|
let on_close = on_close
|
|
let on_error = on_error
|
|
let on_completion = on_completion
|
|
let on_no_request _ = return_unit
|
|
end in
|
|
Worker.launch
|
|
table
|
|
limits.worker_limits
|
|
()
|
|
(limits, db, validation_process_kind)
|
|
(module Handlers)
|
|
|
|
let shutdown = Worker.shutdown
|
|
|
|
let validate w
|
|
?canceler ?peer ?(notify_new_block = fun _ -> ())
|
|
chain_db hash (header : Block_header.t) operations =
|
|
let bv = Worker.state w in
|
|
let chain_state = Distributed_db.chain_state chain_db in
|
|
State.Block.read_opt chain_state hash >>= function
|
|
| Some block ->
|
|
debug w "previously validated block %a (before pipe)"
|
|
Block_hash.pp_short hash ;
|
|
Protocol_validator.prefetch_and_compile_protocols
|
|
bv.protocol_validator
|
|
?peer ~timeout:bv.limits.protocol_timeout
|
|
block ;
|
|
return_none
|
|
| None ->
|
|
map_p (map_p (fun op ->
|
|
let op_hash = Operation.hash op in
|
|
return op_hash))
|
|
operations >>=? fun hashes ->
|
|
let computed_hash =
|
|
Operation_list_list_hash.compute
|
|
(List.map Operation_list_hash.compute hashes) in
|
|
fail_when
|
|
(Operation_list_list_hash.compare
|
|
computed_hash header.shell.operations_hash <> 0)
|
|
(Inconsistent_operations_hash {
|
|
block = hash ;
|
|
expected = header.shell.operations_hash ;
|
|
found = computed_hash ;
|
|
}) >>=? fun () ->
|
|
check_chain_liveness chain_db hash header >>=? fun () ->
|
|
Worker.push_request_and_wait w
|
|
(Request_validation
|
|
{ chain_db ; notify_new_block ; canceler ;
|
|
peer ; hash ; header ; operations }) >>=? fun result ->
|
|
Lwt.return result
|
|
|
|
let fetch_and_compile_protocol w =
|
|
let bv = Worker.state w in
|
|
Protocol_validator.fetch_and_compile_protocol bv.protocol_validator
|
|
|
|
let status = Worker.status
|
|
|
|
let running_worker () =
|
|
match Worker.list table with
|
|
| (_, single) :: _ -> single
|
|
| [] -> raise Not_found
|
|
|
|
let pending_requests t = Worker.pending_requests t
|
|
|
|
let current_request t = Worker.current_request t
|
|
|
|
let last_events = Worker.last_events
|