Node: switch the block validator to Tezos_worker

This commit is contained in:
Grégoire Henry 2018-01-26 13:10:20 +01:00
parent 4e04e233a0
commit 6d3f5af163
15 changed files with 700 additions and 465 deletions

View File

@ -54,6 +54,7 @@ and log = {
and shell = { and shell = {
bootstrap_threshold : int ; bootstrap_threshold : int ;
block_validator_limits : Node.block_validator_limits ;
prevalidator_limits : Node.prevalidator_limits ; prevalidator_limits : Node.prevalidator_limits ;
timeout : Node.timeout ; timeout : Node.timeout ;
} }
@ -105,6 +106,15 @@ let default_log = {
let default_shell = { let default_shell = {
bootstrap_threshold = 4 ; bootstrap_threshold = 4 ;
block_validator_limits = {
protocol_timeout = 120. ;
worker_limits = {
backlog_size = 1000 ;
backlog_level = Logging.Debug ;
zombie_lifetime = 3600. ;
zombie_memory = 1800. ;
}
} ;
prevalidator_limits = { prevalidator_limits = {
operation_timeout = 10. ; operation_timeout = 10. ;
max_refused_operations = 1000 ; max_refused_operations = 1000 ;
@ -284,6 +294,23 @@ let worker_limits_encoding
let timeout_encoding = let timeout_encoding =
Data_encoding.ranged_float 0. 500. Data_encoding.ranged_float 0. 500.
(** Encoding of the block validator limits.

    The protocol timeout is stored under the ["protocol_request_timeout"]
    key, with [default_shell.block_validator_limits.protocol_timeout] as
    its default value. It is merged into a single object with the worker
    limits, whose encoding is parameterised by the default worker limits
    taken from [default_shell]. *)
let block_validator_limits_encoding =
  let open Data_encoding in
  let default_limits = default_shell.block_validator_limits in
  let default_worker = default_limits.worker_limits in
  let timeout_obj =
    obj1
      (dft "protocol_request_timeout" timeout_encoding
         default_limits.protocol_timeout) in
  let worker_obj =
    worker_limits_encoding
      default_worker.backlog_size
      default_worker.backlog_level
      default_worker.zombie_lifetime
      default_worker.zombie_memory in
  (* Projection to, and injection from, the flat pair used on the wire. *)
  let to_pair { Node.protocol_timeout ; worker_limits } =
    (protocol_timeout, worker_limits) in
  let of_pair (protocol_timeout, worker_limits) =
    { Node.protocol_timeout ; worker_limits } in
  conv to_pair of_pair (merge_objs timeout_obj worker_obj)
let prevalidator_limits_encoding = let prevalidator_limits_encoding =
let open Data_encoding in let open Data_encoding in
conv conv
@ -325,13 +352,19 @@ let timeout_encoding =
let shell = let shell =
let open Data_encoding in let open Data_encoding in
conv conv
(fun { bootstrap_threshold ; timeout ; prevalidator_limits } -> (fun { bootstrap_threshold ; timeout ;
bootstrap_threshold, timeout, prevalidator_limits) block_validator_limits ; prevalidator_limits } ->
(fun (bootstrap_threshold, timeout, prevalidator_limits) -> bootstrap_threshold, timeout,
{ bootstrap_threshold ; timeout ; prevalidator_limits }) block_validator_limits, prevalidator_limits)
(obj3 (fun (bootstrap_threshold, timeout,
block_validator_limits, prevalidator_limits) ->
{ bootstrap_threshold ; timeout ;
block_validator_limits ;
prevalidator_limits })
(obj4
(dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold) (dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold)
(dft "timeout" timeout_encoding default_shell.timeout) (dft "timeout" timeout_encoding default_shell.timeout)
(dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits)
(dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits) (dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits)
) )
@ -451,6 +484,7 @@ let update
~default:cfg.shell.bootstrap_threshold ~default:cfg.shell.bootstrap_threshold
bootstrap_threshold ; bootstrap_threshold ;
timeout = cfg.shell.timeout ; timeout = cfg.shell.timeout ;
block_validator_limits = cfg.shell.block_validator_limits ;
prevalidator_limits = cfg.shell.prevalidator_limits ; prevalidator_limits = cfg.shell.prevalidator_limits ;
} }
in in

View File

@ -44,6 +44,7 @@ and log = {
and shell = { and shell = {
bootstrap_threshold : int ; bootstrap_threshold : int ;
block_validator_limits : Node.block_validator_limits ;
prevalidator_limits : Node.prevalidator_limits ; prevalidator_limits : Node.prevalidator_limits ;
timeout : Node.timeout ; timeout : Node.timeout ;
} }

View File

@ -173,6 +173,7 @@ let init_node ?sandbox (config : Node_config_file.t) =
Node.create Node.create
node_config node_config
config.shell.timeout config.shell.timeout
config.shell.block_validator_limits
config.shell.prevalidator_limits config.shell.prevalidator_limits
let () = let () =

View File

@ -7,294 +7,58 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
include Logging.Make(struct let name = "node.validator.block" end) open Block_validator_worker_state
open Block_validator_errors
(* Requests processed by the validation worker loop. The type parameter
   is the type of the answer handed back to the requester. *)
type 'a request =
| Request_validation: {
(* Network database used to read the state and to commit the block. *)
net_db: Distributed_db.net_db ;
(* Callback invoked with the block once it has been committed. *)
notify_new_block: State.Block.t -> unit ;
(* Optional canceler protecting the call to [apply_block]. *)
canceler: Lwt_canceler.t option ;
(* Optional peer, forwarded when prefetching protocols. *)
peer: P2p.Peer_id.t option ;
(* Hash, header and operations of the block to validate. *)
hash: Block_hash.t ;
header: Block_header.t ;
operations: Operation.t list list ;
} -> State.Block.t tzresult request
(* A request queued in the worker pipe, packed with an optional wakener
   that the worker loop wakes up with the answer to the request. *)
type message = Message: 'a request * 'a Lwt.u option -> message
(* State of the (pre-Tezos_worker) block validator. *)
type t = {
(* Validator used to prefetch and compile protocols. *)
protocol_validator: Protocol_validator.t ;
(* Timeout passed to [Protocol_validator.prefetch_and_compile_protocols]. *)
protocol_timeout: float ;
(* Thread running [worker_loop]; assigned by [create] after the record
   has been built, hence the mutable field. *)
mutable worker: unit Lwt.t ;
(* Pipe of pending requests; closed when [canceler] is cancelled. *)
messages: message Lwt_pipe.t ;
(* Canceler used to stop the worker. *)
canceler: Lwt_canceler.t ;
}
(** Block validation *)
(* Reasons for which a block may be declared invalid. Each value is
   carried by an [Invalid_block] error together with the block hash. *)
type block_error =
(* The given operation could not be parsed. *)
| Cannot_parse_operation of Operation_hash.t
(* The fitness found differs from the expected one. *)
| Invalid_fitness of { expected: Fitness.t ; found: Fitness.t }
| Non_increasing_timestamp
| Non_increasing_fitness
(* The level found differs from the expected one. *)
| Invalid_level of { expected: Int32.t ; found: Int32.t }
(* The protocol level found differs from the expected one. *)
| Invalid_proto_level of { expected: int ; found: int }
(* The operation was previously included in the chain. *)
| Replayed_operation of Operation_hash.t
(* The operation is outdated; [originating_block] is the block it
   originated in. *)
| Outdated_operation of
{ operation: Operation_hash.t;
originating_block: Block_hash.t }
(* The block timestamp is later than the expiration date of its network. *)
| Expired_network of
{ net_id: Net_id.t ;
expiration: Time.t ;
timestamp: Time.t ;
}
(* Invalid number of validation passes; the payload is the number found. *)
| Unexpected_number_of_validation_passes of int (* uint8 *)
(* Validation pass [pass] holds [found] operations, more than [max]. *)
| Too_many_operations of { pass: int; found: int; max: int }
(* The operation has size [size], larger than [max]. *)
| Oversized_operation of { operation: Operation_hash.t;
size: int; max: int }
(* Encoding of [block_error] as a tagged union.

   Every case is an object whose ["error"] field is a constant string
   identifying the constructor, followed by the payload of the
   constructor, if any.

   All the constructors of [block_error] must have a case here:
   a constructor without a case cannot be serialized. The case for
   [Expired_network] is appended with a fresh tag (11) so that the tags
   of the pre-existing cases are left untouched. *)
let block_error_encoding =
  let open Data_encoding in
  union
    [
      case (Tag 0)
        (obj2
           (req "error" (constant "cannot_parse_operation"))
           (req "operation" Operation_hash.encoding))
        (function Cannot_parse_operation operation -> Some ((), operation)
                | _ -> None)
        (fun ((), operation) -> Cannot_parse_operation operation) ;
      case (Tag 1)
        (obj3
           (req "error" (constant "invalid_fitness"))
           (req "expected" Fitness.encoding)
           (req "found" Fitness.encoding))
        (function
          | Invalid_fitness { expected ; found } ->
              Some ((), expected, found)
          | _ -> None)
        (fun ((), expected, found) -> Invalid_fitness { expected ; found }) ;
      case (Tag 2)
        (obj1
           (req "error" (constant "non_increasing_timestamp")))
        (function Non_increasing_timestamp -> Some ()
                | _ -> None)
        (fun () -> Non_increasing_timestamp) ;
      case (Tag 3)
        (obj1
           (req "error" (constant "non_increasing_fitness")))
        (function Non_increasing_fitness -> Some ()
                | _ -> None)
        (fun () -> Non_increasing_fitness) ;
      case (Tag 4)
        (obj3
           (req "error" (constant "invalid_level"))
           (req "expected" int32)
           (req "found" int32))
        (function
          | Invalid_level { expected ; found } ->
              Some ((), expected, found)
          | _ -> None)
        (fun ((), expected, found) -> Invalid_level { expected ; found }) ;
      case (Tag 5)
        (obj3
           (req "error" (constant "invalid_proto_level"))
           (req "expected" uint8)
           (req "found" uint8))
        (function
          | Invalid_proto_level { expected ; found } ->
              Some ((), expected, found)
          | _ -> None)
        (fun ((), expected, found) ->
           Invalid_proto_level { expected ; found }) ;
      case (Tag 6)
        (obj2
           (req "error" (constant "replayed_operation"))
           (req "operation" Operation_hash.encoding))
        (function Replayed_operation operation -> Some ((), operation)
                | _ -> None)
        (fun ((), operation) -> Replayed_operation operation) ;
      case (Tag 7)
        (obj3
           (req "error" (constant "outdated_operation"))
           (req "operation" Operation_hash.encoding)
           (req "originating_block" Block_hash.encoding))
        (function
          | Outdated_operation { operation ; originating_block } ->
              Some ((), operation, originating_block)
          | _ -> None)
        (fun ((), operation, originating_block) ->
           Outdated_operation { operation ; originating_block }) ;
      case (Tag 8)
        (obj2
           (req "error" (constant "unexpected_number_of_passes"))
           (req "found" uint8))
        (function
          | Unexpected_number_of_validation_passes n -> Some ((), n)
          | _ -> None)
        (fun ((), n) -> Unexpected_number_of_validation_passes n) ;
      case (Tag 9)
        (obj4
           (req "error" (constant "too_many_operations"))
           (req "validation_pass" uint8)
           (req "found" uint16)
           (req "max" uint16))
        (function
          | Too_many_operations { pass ; found ; max } ->
              Some ((), pass, found, max)
          | _ -> None)
        (fun ((), pass, found, max) ->
           Too_many_operations { pass ; found ; max }) ;
      case (Tag 10)
        (obj4
           (req "error" (constant "oversized_operation"))
           (req "operation" Operation_hash.encoding)
           (req "found" int31)
           (req "max" int31))
        (function
          | Oversized_operation { operation ; size ; max } ->
              Some ((), operation, size, max)
          | _ -> None)
        (fun ((), operation, size, max) ->
           Oversized_operation { operation ; size ; max }) ;
      (* Previously missing: [Expired_network] had no case, so an
         [Invalid_block] carrying it could not be encoded. *)
      case (Tag 11)
        (obj4
           (req "error" (constant "expired_network"))
           (req "net_id" Net_id.encoding)
           (req "expiration" Time.encoding)
           (req "timestamp" Time.encoding))
        (function
          | Expired_network { net_id ; expiration ; timestamp } ->
              Some ((), net_id, expiration, timestamp)
          | _ -> None)
        (fun ((), net_id, expiration, timestamp) ->
           Expired_network { net_id ; expiration ; timestamp }) ;
    ]
(* Prints on the formatter [ppf] a human readable description of a
   [block_error]. Every constructor has its own message. *)
let pp_block_error ppf error =
  match error with
  | Cannot_parse_operation op_hash ->
      Format.fprintf ppf
        "Failed to parse the operation %a."
        Operation_hash.pp_short op_hash
  | Invalid_fitness { expected = wanted ; found = got } ->
      Format.fprintf ppf
        "@[<v 2>Invalid fitness:@ \
         \ expected %a@ \
         \ found %a@]"
        Fitness.pp wanted
        Fitness.pp got
  | Non_increasing_timestamp ->
      Format.fprintf ppf "Non increasing timestamp"
  | Non_increasing_fitness ->
      Format.fprintf ppf "Non increasing fitness"
  | Invalid_level { expected = wanted ; found = got } ->
      Format.fprintf ppf
        "Invalid level:@ \
         \ expected %ld@ \
         \ found %ld"
        wanted got
  | Invalid_proto_level { expected = wanted ; found = got } ->
      Format.fprintf ppf
        "Invalid protocol level:@ \
         \ expected %d@ \
         \ found %d"
        wanted got
  | Replayed_operation op_hash ->
      Format.fprintf ppf
        "The operation %a was previously included in the chain."
        Operation_hash.pp_short op_hash
  | Outdated_operation { operation = op_hash ; originating_block = origin } ->
      Format.fprintf ppf
        "The operation %a is outdated (originated in block: %a)"
        Operation_hash.pp_short op_hash
        Block_hash.pp_short origin
  | Expired_network { net_id = net ; expiration = expires ; timestamp = stamp } ->
      (* The arguments follow the order of the message: timestamp first. *)
      Format.fprintf ppf
        "The block timestamp (%a) is later than \
         its network expiration date: %a (net: %a)."
        Time.pp_hum stamp
        Time.pp_hum expires
        Net_id.pp_short net
  | Unexpected_number_of_validation_passes passes ->
      Format.fprintf ppf
        "Invalid number of validation passes (found: %d)"
        passes
  | Too_many_operations { pass = pass_index ; found = count ; max = limit } ->
      Format.fprintf ppf
        "Too many operations in validation pass %d (found: %d, max: %d)"
        pass_index count limit
  | Oversized_operation { operation = op_hash ; size = bytes ; max = limit } ->
      Format.fprintf ppf
        "Oversized operation %a (size: %d, max: %d)"
        Operation_hash.pp_short op_hash bytes limit
(* Errors raised by the block validator, added to the extensible
   [error] type. *)
type error +=
(* The block [block] is invalid for the reason [error]. *)
| Invalid_block of
{ block: Block_hash.t ; error: block_error }
(* The protocol [protocol], required for validating [block], is missing. *)
| Unavailable_protocol of
{ block: Block_hash.t ; protocol: Protocol_hash.t }
(* The list of operations provided for [block] is inconsistent with the
   block header: its hash is [found] where [expected] was announced. *)
| Inconsistent_operations_hash of
{ block: Block_hash.t ;
expected: Operation_list_list_hash.t ;
found: Operation_list_list_hash.t }
let invalid_block block error = Invalid_block { block ; error } let invalid_block block error = Invalid_block { block ; error }
let () = type limits = {
Error_monad.register_error_kind protocol_timeout: float ;
`Permanent worker_limits : Worker_types.limits ;
~id:"validator.invalid_block" }
~title:"Invalid block"
~description:"Invalid block." module Name = struct
~pp:begin fun ppf (block, error) -> type t = unit
Format.fprintf ppf let encoding = Data_encoding.empty
"@[<v 2>Invalid block %a@ %a@]" let base = [ "validator.block" ]
Block_hash.pp_short block pp_block_error error let pp _ () = ()
end end
Data_encoding.(merge_objs
(obj1 (req "invalid_block" Block_hash.encoding)) module Types = struct
block_error_encoding) include Worker_state
(function Invalid_block { block ; error } -> type state = {
Some (block, error) | _ -> None) protocol_validator: Protocol_validator.t ;
(fun (block, error) -> limits : limits ;
Invalid_block { block ; error }) ; }
Error_monad.register_error_kind type parameters = limits * Distributed_db.t
`Temporary let view _state _parameters = ()
~id:"validator.unavailable_protocol" end
~title:"Missing protocol"
~description:"The protocol required for validating a block is missing." module Request = struct
~pp:begin fun ppf (block, protocol) -> include Request
Format.fprintf ppf type 'a t =
"Missing protocol (%a) when validating the block %a." | Request_validation : {
Protocol_hash.pp_short protocol net_db: Distributed_db.net_db ;
Block_hash.pp_short block notify_new_block: State.Block.t -> unit ;
end canceler: Lwt_canceler.t option ;
Data_encoding.( peer: P2p.Peer_id.t option ;
obj2 hash: Block_hash.t ;
(req "block" Block_hash.encoding) header: Block_header.t ;
(req "missing_protocol" Protocol_hash.encoding)) operations: Operation.t list list ;
(function } -> State.Block.t tzresult t
| Unavailable_protocol { block ; protocol } -> let view
Some (block, protocol) : type a. a t -> view
| _ -> None) = fun (Request_validation { net_db ; peer ; hash }) ->
(fun (block, protocol) -> Unavailable_protocol { block ; protocol }) ; let net_id = net_db |> Distributed_db.net_state |> State.Net.id in
Error_monad.register_error_kind { net_id ; block = hash ; peer = peer }
`Temporary end
~id:"validator.inconsistent_operations_hash"
~title:"Invalid merkle tree" module Worker = Worker.Make (Name) (Event) (Request) (Types)
~description:"The provided list of operations is inconsistent with \
the block header." type t = Worker.infinite Worker.queue Worker.t
~pp:begin fun ppf (block, expected, found) -> type error += Closed = Worker.Closed
Format.fprintf ppf
"@[<v 2>The provided list of operations for block %a \ let debug w =
\ is inconsistent with the block header@ \ Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
\ expected: %a@ \
\ found: %a@]"
Block_hash.pp_short block
Operation_list_list_hash.pp_short expected
Operation_list_list_hash.pp_short found
end
Data_encoding.(
obj3
(req "block" Block_hash.encoding)
(req "expected" Operation_list_list_hash.encoding)
(req "found" Operation_list_list_hash.encoding))
(function
| Inconsistent_operations_hash { block ; expected ; found } ->
Some (block, expected, found)
| _ -> None)
(fun (block, expected, found) ->
Inconsistent_operations_hash { block ; expected ; found })
let check_header let check_header
(pred: State.Block.t) hash (header: Block_header.t) = (pred: State.Block.t) hash (header: Block_header.t) =
@ -442,137 +206,124 @@ let get_proto pred hash =
protocol = pred_protocol_hash }) protocol = pred_protocol_hash })
| Some p -> return p | Some p -> return p
let rec worker_loop bv = let on_request
begin : type r. t -> r Request.t -> r tzresult Lwt.t
Lwt_utils.protect ~canceler:bv.canceler begin fun () -> = fun w
Lwt_pipe.pop bv.messages >>= return (Request.Request_validation
end >>=? fun (Message (request, wakener)) -> { net_db ; notify_new_block ; canceler ;
let may_wakeup = peer ; hash ; header ; operations }) ->
match wakener with let bv = Worker.state w in
| None -> (fun _ -> ()) let net_state = Distributed_db.net_state net_db in
| Some wakener -> (fun v -> Lwt.wakeup_later wakener v) State.Block.read_opt net_state hash >>= function
in | Some block ->
match request with debug w "previously validated block %a (after pipe)"
| Request_validation { net_db ; notify_new_block ; canceler ; Block_hash.pp_short hash ;
peer ; hash ; header ; operations } -> Protocol_validator.prefetch_and_compile_protocols
let net_state = Distributed_db.net_state net_db in bv.protocol_validator
State.Block.read_opt net_state hash >>= function ?peer ~timeout:bv.limits.protocol_timeout
| Some block -> block ;
lwt_debug "previously validated block %a (after pipe)" return (Ok block)
Block_hash.pp_short hash >>= fun () -> | None ->
Protocol_validator.prefetch_and_compile_protocols State.Block.read_invalid net_state hash >>= function
bv.protocol_validator | Some { errors } ->
?peer ~timeout:bv.protocol_timeout return (Error errors)
block ;
may_wakeup (Ok block) ;
return ()
| None -> | None ->
State.Block.read_invalid net_state hash >>= function begin
| Some { errors } -> debug w "validating block %a" Block_hash.pp_short hash ;
may_wakeup (Error errors) ; State.Block.read
return () net_state header.shell.predecessor >>=? fun pred ->
| None -> get_proto pred hash >>=? fun proto ->
begin (* TODO also protect with [Worker.canceler w]. *)
lwt_debug "validating block %a" Lwt_utils.protect ?canceler begin fun () ->
Block_hash.pp_short hash >>= fun () -> apply_block
State.Block.read (Distributed_db.net_state net_db)
net_state header.shell.predecessor >>=? fun pred -> pred proto hash header operations
get_proto pred hash >>=? fun proto -> end
(* TODO also protect with [bv.canceler]. *) end >>= function
Lwt_utils.protect ?canceler begin fun () -> | Ok result -> begin
apply_block Worker.protect w begin fun () ->
(Distributed_db.net_state net_db) Distributed_db.commit_block
pred proto hash header operations net_db hash header operations result
end end >>=? function
end >>= function | None ->
| Ok result -> begin assert false (* should not happen *)
Lwt_utils.protect ~canceler:bv.canceler begin fun () -> | Some block ->
Distributed_db.commit_block Protocol_validator.prefetch_and_compile_protocols
net_db hash header operations result bv.protocol_validator
end >>=? function ?peer ~timeout:bv.limits.protocol_timeout
| None -> block ;
assert false (* should not happen *) notify_new_block block ;
| Some block -> return (Ok block)
lwt_log_info "validated block %a" end
Block_hash.pp_short hash >>= fun () -> (* TODO catch other temporary error (e.g. system errors)
Protocol_validator.prefetch_and_compile_protocols and do not 'commit' them on disk... *)
bv.protocol_validator | Error [Lwt_utils.Canceled | Unavailable_protocol _] as err ->
?peer ~timeout:bv.protocol_timeout return err
block ; | Error errors ->
may_wakeup (Ok block) ; Worker.protect w begin fun () ->
notify_new_block block ; Distributed_db.commit_invalid_block
return () net_db hash header errors
end end >>=? fun commited ->
(* TODO catch other temporary error (e.g. system errors) assert commited ;
and do not 'commit' them on disk... *) return (Error errors)
| Error [Lwt_utils.Canceled | Unavailable_protocol _] as err ->
may_wakeup err ;
return ()
| Error errors as err ->
lwt_log_error "@[<v 2>Received invalid block %a:@ %a@]"
Block_hash.pp_short hash
Error_monad.pp_print_error errors >>= fun () ->
Lwt_utils.protect ~canceler:bv.canceler begin fun () ->
Distributed_db.commit_invalid_block
net_db hash header errors
end >>=? fun commited ->
assert commited ;
may_wakeup err ;
return ()
end >>= function
| Ok () ->
worker_loop bv
| Error [Exn (Unix.Unix_error _) as err] ->
lwt_log_error "validation failed with %a"
pp_print_error [err] >>= fun () ->
worker_loop bv
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
lwt_log_notice "terminating" >>= fun () ->
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error:@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel bv.canceler >>= fun () ->
Lwt.return_unit
let create ~protocol_timeout db = let on_launch _ _ (limits, db) =
let protocol_validator = Protocol_validator.create db in let protocol_validator = Protocol_validator.create db in
let canceler = Lwt_canceler.create () in Lwt.return { Types.protocol_validator ; limits }
let messages = Lwt_pipe.create () in
let bv = {
protocol_validator ;
protocol_timeout ;
canceler ; messages ;
worker = Lwt.return_unit } in
Lwt_canceler.on_cancel bv.canceler begin fun () ->
Lwt_pipe.close bv.messages ;
Lwt.return_unit
end ;
bv.worker <-
Lwt_utils.worker "block_validator"
~run:(fun () -> worker_loop bv)
~cancel:(fun () -> Lwt_canceler.cancel bv.canceler) ;
bv
let shutdown { canceler ; worker } = let on_error w r st errs =
Lwt_canceler.cancel canceler >>= fun () -> Worker.record_event w (Validation_failure (r, st, errs)) ;
worker Lwt.return (Error errs)
let validate { messages ; protocol_validator ; protocol_timeout } let on_completion
: type a. t -> a Request.t -> a -> Worker_types.request_status -> unit Lwt.t
= fun w (Request.Request_validation _ as r) v st ->
match v with
| Ok _ ->
Worker.record_event w
(Event.Validation_success (Request.view r, st)) ;
Lwt.return ()
| Error errs ->
Worker.record_event w
(Event.Validation_failure (Request.view r, st, errs)) ;
Lwt.return ()
(* Table in which the block validator worker is registered by [create]
   and looked up by [running_worker].
   NOTE(review): [Queue] presumably selects an unbounded request queue,
   matching [Worker.infinite Worker.queue] in the type [t] — confirm
   against the Tezos_worker interface. *)
let table = Worker.create_table Queue
(* [create limits db] launches the block validator worker and registers
   it in [table]. The worker is launched with [limits.worker_limits],
   the unit name [()], and the pair [(limits, db)] as parameters, which
   are received by [on_launch]. *)
let create limits db =
(* Callbacks of the worker; closing the worker and the absence of
   request trigger no specific action. *)
let module Handlers = struct
type self = t
let on_launch = on_launch
let on_request = on_request
let on_close _ = Lwt.return ()
let on_error = on_error
let on_completion = on_completion
let on_no_request _ = return ()
end in
Worker.launch
table
limits.worker_limits
()
(limits, db)
(module Handlers)
let shutdown = Worker.shutdown
let validate w
?canceler ?peer ?(notify_new_block = fun _ -> ()) ?canceler ?peer ?(notify_new_block = fun _ -> ())
net_db hash (header : Block_header.t) operations = net_db hash (header : Block_header.t) operations =
let bv = Worker.state w in
let net_state = Distributed_db.net_state net_db in let net_state = Distributed_db.net_state net_db in
State.Block.read_opt net_state hash >>= function State.Block.read_opt net_state hash >>= function
| Some block -> | Some block ->
lwt_debug "previously validated block %a (before pipe)" debug w "previously validated block %a (before pipe)"
Block_hash.pp_short hash >>= fun () -> Block_hash.pp_short hash ;
Protocol_validator.prefetch_and_compile_protocols Protocol_validator.prefetch_and_compile_protocols
protocol_validator bv.protocol_validator
?peer ~timeout:protocol_timeout ?peer ~timeout:bv.limits.protocol_timeout
block ; block ;
return block return block
| None -> | None ->
let res, wakener = Lwt.task () in
map_p (map_p (fun op -> map_p (map_p (fun op ->
let op_hash = Operation.hash op in let op_hash = Operation.hash op in
return op_hash)) return op_hash))
@ -589,14 +340,25 @@ let validate { messages ; protocol_validator ; protocol_timeout }
found = computed_hash ; found = computed_hash ;
}) >>=? fun () -> }) >>=? fun () ->
check_net_liveness net_db hash header >>=? fun () -> check_net_liveness net_db hash header >>=? fun () ->
lwt_debug "pushing validation request for block %a" Worker.push_request_and_wait w
Block_hash.pp_short hash >>= fun () -> (Request_validation
Lwt_pipe.push messages { net_db ; notify_new_block ; canceler ;
(Message (Request_validation peer ; hash ; header ; operations }) >>=? fun result ->
{ net_db ; notify_new_block ; canceler ; Lwt.return result
peer ; hash ; header ; operations },
Some wakener)) >>= fun () ->
res
let fetch_and_compile_protocol bv = let fetch_and_compile_protocol w =
let bv = Worker.state w in
Protocol_validator.fetch_and_compile_protocol bv.protocol_validator Protocol_validator.fetch_and_compile_protocol bv.protocol_validator
let status = Worker.status
(* Returns the first worker listed in [table]; raises [Not_found] when
   the table lists no worker. *)
let running_worker () =
  match Worker.list table with
  | [] -> raise Not_found
  | (_, worker) :: _ -> worker
let pending_requests t = Worker.pending_requests t
let current_request t = Worker.current_request t
let last_events = Worker.last_events

View File

@ -9,40 +9,15 @@
type t type t
type block_error = type limits = {
| Cannot_parse_operation of Operation_hash.t protocol_timeout: float ;
| Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } worker_limits : Worker_types.limits ;
| Non_increasing_timestamp }
| Non_increasing_fitness
| Invalid_level of { expected: Int32.t ; found: Int32.t }
| Invalid_proto_level of { expected: int ; found: int }
| Replayed_operation of Operation_hash.t
| Outdated_operation of
{ operation: Operation_hash.t;
originating_block: Block_hash.t }
| Expired_network of
{ net_id: Net_id.t ;
expiration: Time.t ;
timestamp: Time.t ;
}
| Unexpected_number_of_validation_passes of int (* uint8 *)
| Too_many_operations of { pass: int; found: int; max: int }
| Oversized_operation of { operation: Operation_hash.t;
size: int; max: int }
type error += type error += Closed of unit
| Invalid_block of
{ block: Block_hash.t ; error: block_error }
| Unavailable_protocol of
{ block: Block_hash.t ; protocol: Protocol_hash.t }
| Inconsistent_operations_hash of
{ block: Block_hash.t ;
expected: Operation_list_list_hash.t ;
found: Operation_list_list_hash.t }
val create: val create:
protocol_timeout:float -> limits -> Distributed_db.t -> t Lwt.t
Distributed_db.t -> t
val validate: val validate:
t -> t ->
@ -60,3 +35,10 @@ val fetch_and_compile_protocol:
Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
(** Returns the currently registered block validator.
    @raise Not_found when no block validator is registered. *)
val running_worker: unit -> t

(** Status of the worker underlying the block validator. *)
val status: t -> Worker_types.worker_status

(** Views of the requests waiting in the queue of the worker, each with
    a timestamp (presumably the time at which the request was pushed —
    confirm against the Tezos_worker interface). *)
val pending_requests : t -> (Time.t * Block_validator_worker_state.Request.view) list

(** View of the request being processed, if any, with two timestamps
    (presumably push time and treatment start time — confirm against
    the Tezos_worker interface). *)
val current_request : t -> (Time.t * Time.t * Block_validator_worker_state.Request.view) option

(** Events recorded by the worker, grouped by log level. *)
val last_events : t -> (Lwt_log_core.level * Block_validator_worker_state.Event.t list) list

View File

@ -170,8 +170,8 @@ let rec validation_worker_loop pipeline =
| Ok () -> validation_worker_loop pipeline | Ok () -> validation_worker_loop pipeline
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit Lwt.return_unit
| Error ([ Block_validator.Invalid_block _ | Error ([ Block_validator_errors.Invalid_block _
| Block_validator.Unavailable_protocol _ ] as err ) -> | Block_validator_errors.Unavailable_protocol _ ] as err ) ->
(* Propagate the error to the peer validator. *) (* Propagate the error to the peer validator. *)
pipeline.errors <- pipeline.errors @ err ; pipeline.errors <- pipeline.errors @ err ;
Lwt_canceler.cancel pipeline.canceler >>= fun () -> Lwt_canceler.cancel pipeline.canceler >>= fun () ->

View File

@ -102,6 +102,11 @@ and prevalidator_limits = Prevalidator.limits = {
worker_limits : Worker_types.limits ; worker_limits : Worker_types.limits ;
} }
and block_validator_limits = Block_validator.limits = {
protocol_timeout: float ;
worker_limits : Worker_types.limits ;
}
let may_create_net state genesis = let may_create_net state genesis =
State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function
| Ok net -> Lwt.return net | Ok net -> Lwt.return net
@ -112,13 +117,15 @@ let create { genesis ; store_root ; context_root ;
patch_context ; p2p = net_params ; patch_context ; p2p = net_params ;
test_network_max_tll = max_child_ttl ; test_network_max_tll = max_child_ttl ;
bootstrap_threshold } bootstrap_threshold }
timeout prevalidator_limits = timeout
block_validator_limits
prevalidator_limits =
init_p2p net_params >>=? fun p2p -> init_p2p net_params >>=? fun p2p ->
State.read State.read
~store_root ~context_root ?patch_context () >>=? fun state -> ~store_root ~context_root ?patch_context () >>=? fun state ->
let distributed_db = Distributed_db.create state p2p in let distributed_db = Distributed_db.create state p2p in
let validator = Validator.create state distributed_db timeout
Validator.create state distributed_db timeout prevalidator_limits in block_validator_limits prevalidator_limits >>= fun validator ->
may_create_net state genesis >>= fun mainnet_state -> may_create_net state genesis >>= fun mainnet_state ->
Validator.activate validator Validator.activate validator
~bootstrap_threshold ~bootstrap_threshold

View File

@ -30,8 +30,17 @@ and prevalidator_limits = {
operation_timeout: float ; operation_timeout: float ;
worker_limits : Worker_types.limits ; worker_limits : Worker_types.limits ;
} }
and block_validator_limits = {
protocol_timeout: float ;
worker_limits : Worker_types.limits ;
}
val create: config -> timeout -> prevalidator_limits -> t tzresult Lwt.t val create:
config ->
timeout ->
block_validator_limits ->
prevalidator_limits ->
t tzresult Lwt.t
module RPC : sig module RPC : sig

View File

@ -200,7 +200,7 @@ let rec worker_loop pv =
lwt_debug "%a" Error_monad.pp_print_error errors >>= fun () -> lwt_debug "%a" Error_monad.pp_print_error errors >>= fun () ->
Lwt_canceler.cancel pv.canceler >>= fun () -> Lwt_canceler.cancel pv.canceler >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Error [Block_validator.Unavailable_protocol { protocol } ] -> begin | Error [Block_validator_errors.Unavailable_protocol { protocol } ] -> begin
Block_validator.fetch_and_compile_protocol Block_validator.fetch_and_compile_protocol
pv.block_validator pv.block_validator
~peer:pv.peer_id ~peer:pv.peer_id

View File

@ -15,6 +15,7 @@ type t = {
db: Distributed_db.t ; db: Distributed_db.t ;
block_validator: Block_validator.t ; block_validator: Block_validator.t ;
timeout: Net_validator.timeout ; timeout: Net_validator.timeout ;
block_validator_limits: Block_validator.limits ;
prevalidator_limits: Prevalidator.limits ; prevalidator_limits: Prevalidator.limits ;
valid_block_input: State.Block.t Lwt_watcher.input ; valid_block_input: State.Block.t Lwt_watcher.input ;
@ -22,16 +23,17 @@ type t = {
} }
let create state db timeout prevalidator_limits = let create state db timeout
let block_validator = block_validator_limits
Block_validator.create prevalidator_limits =
~protocol_timeout:timeout.Net_validator.protocol Block_validator.create block_validator_limits db >>= fun block_validator ->
db in
let valid_block_input = Lwt_watcher.create_input () in let valid_block_input = Lwt_watcher.create_input () in
{ state ; db ; timeout ; prevalidator_limits ; block_validator ; Lwt.return
valid_block_input ; { state ; db ; timeout ; block_validator ;
active_nets = Net_id.Table.create 7 ; prevalidator_limits ; block_validator_limits ;
} valid_block_input ;
active_nets = Net_id.Table.create 7 ;
}
let activate v ?bootstrap_threshold ?max_child_ttl net_state = let activate v ?bootstrap_threshold ?max_child_ttl net_state =
let net_id = State.Net.id net_state in let net_id = State.Net.id net_state in

View File

@ -15,8 +15,9 @@ val create:
State.t -> State.t ->
Distributed_db.t -> Distributed_db.t ->
Net_validator.timeout -> Net_validator.timeout ->
Block_validator.limits ->
Prevalidator.limits -> Prevalidator.limits ->
t t Lwt.t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
(** Start the validation scheduler of a given network. *) (** Start the validation scheduler of a given network. *)

View File

@ -0,0 +1,271 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(* Reasons for which a block may be declared invalid. *)
type block_error =
(* The given operation could not be parsed. *)
| Cannot_parse_operation of Operation_hash.t
(* The fitness found differs from the expected one. *)
| Invalid_fitness of { expected: Fitness.t ; found: Fitness.t }
| Non_increasing_timestamp
| Non_increasing_fitness
(* The level found differs from the expected one. *)
| Invalid_level of { expected: Int32.t ; found: Int32.t }
(* The protocol level found differs from the expected one. *)
| Invalid_proto_level of { expected: int ; found: int }
(* The operation was previously included in the chain. *)
| Replayed_operation of Operation_hash.t
(* The operation is outdated; [originating_block] is the block it
   originated in. *)
| Outdated_operation of
{ operation: Operation_hash.t;
originating_block: Block_hash.t }
(* The block timestamp is later than the expiration date of its network. *)
| Expired_network of
{ net_id: Net_id.t ;
expiration: Time.t ;
timestamp: Time.t ;
}
(* Invalid number of validation passes; the payload is the number found. *)
| Unexpected_number_of_validation_passes of int (* uint8 *)
(* Validation pass [pass] holds [found] operations, more than [max]. *)
| Too_many_operations of { pass: int; found: int; max: int }
(* The operation has size [size], larger than [max]. *)
| Oversized_operation of { operation: Operation_hash.t;
size: int; max: int }
(** Encoding of [block_error] as a tagged union.

    Every case is an object whose ["error"] field is a constant string
    naming the constructor, followed by the constructor's payload.
    Tags 0 to 10 are kept as they were; [Expired_network] uses tag 11. *)
let block_error_encoding =
  let open Data_encoding in
  union
    [
      case (Tag 0)
        (obj2
           (req "error" (constant "cannot_parse_operation"))
           (req "operation" Operation_hash.encoding))
        (function Cannot_parse_operation operation -> Some ((), operation)
                | _ -> None)
        (fun ((), operation) -> Cannot_parse_operation operation) ;
      case (Tag 1)
        (obj3
           (req "error" (constant "invalid_fitness"))
           (req "expected" Fitness.encoding)
           (req "found" Fitness.encoding))
        (function
          | Invalid_fitness { expected ; found } ->
              Some ((), expected, found)
          | _ -> None)
        (fun ((), expected, found) -> Invalid_fitness { expected ; found }) ;
      case (Tag 2)
        (obj1
           (req "error" (constant "non_increasing_timestamp")))
        (function Non_increasing_timestamp -> Some ()
                | _ -> None)
        (fun () -> Non_increasing_timestamp) ;
      case (Tag 3)
        (obj1
           (req "error" (constant "non_increasing_fitness")))
        (function Non_increasing_fitness -> Some ()
                | _ -> None)
        (fun () -> Non_increasing_fitness) ;
      case (Tag 4)
        (obj3
           (req "error" (constant "invalid_level"))
           (req "expected" int32)
           (req "found" int32))
        (function
          | Invalid_level { expected ; found } ->
              Some ((), expected, found)
          | _ -> None)
        (fun ((), expected, found) -> Invalid_level { expected ; found }) ;
      case (Tag 5)
        (obj3
           (req "error" (constant "invalid_proto_level"))
           (req "expected" uint8)
           (req "found" uint8))
        (function
          | Invalid_proto_level { expected ; found } ->
              Some ((), expected, found)
          | _ -> None)
        (fun ((), expected, found) ->
           Invalid_proto_level { expected ; found }) ;
      case (Tag 6)
        (obj2
           (req "error" (constant "replayed_operation"))
           (req "operation" Operation_hash.encoding))
        (function Replayed_operation operation -> Some ((), operation)
                | _ -> None)
        (fun ((), operation) -> Replayed_operation operation) ;
      case (Tag 7)
        (obj3
           (req "error" (constant "outdated_operation"))
           (req "operation" Operation_hash.encoding)
           (req "originating_block" Block_hash.encoding))
        (function
          | Outdated_operation { operation ; originating_block } ->
              Some ((), operation, originating_block)
          | _ -> None)
        (fun ((), operation, originating_block) ->
           Outdated_operation { operation ; originating_block }) ;
      case (Tag 8)
        (obj2
           (req "error" (constant "unexpected_number_of_passes"))
           (req "found" uint8))
        (function
          | Unexpected_number_of_validation_passes n -> Some ((), n)
          | _ -> None)
        (fun ((), n) -> Unexpected_number_of_validation_passes n) ;
      case (Tag 9)
        (obj4
           (req "error" (constant "too_many_operations"))
           (req "validation_pass" uint8)
           (req "found" uint16)
           (req "max" uint16))
        (function
          | Too_many_operations { pass ; found ; max } ->
              Some ((), pass, found, max)
          | _ -> None)
        (fun ((), pass, found, max) ->
           Too_many_operations { pass ; found ; max }) ;
      case (Tag 10)
        (obj4
           (req "error" (constant "oversized_operation"))
           (req "operation" Operation_hash.encoding)
           (req "found" int31)
           (req "max" int31))
        (function
          | Oversized_operation { operation ; size ; max } ->
              Some ((), operation, size, max)
          | _ -> None)
        (fun ((), operation, size, max) ->
           Oversized_operation { operation ; size ; max }) ;
      (* [Expired_network] had no case in this union, so serializing it
         failed at runtime. It is appended under a fresh tag so that the
         binary representation of the other cases does not change. *)
      case (Tag 11)
        (obj4
           (req "error" (constant "expired_network"))
           (req "net_id" Net_id.encoding)
           (req "expiration" Time.encoding)
           (req "timestamp" Time.encoding))
        (function
          | Expired_network { net_id ; expiration ; timestamp } ->
              Some ((), net_id, expiration, timestamp)
          | _ -> None)
        (fun ((), net_id, expiration, timestamp) ->
           Expired_network { net_id ; expiration ; timestamp }) ;
    ]
(** [pp_block_error ppf err] writes a human-readable description of the
    block validation failure [err] on the formatter [ppf]. *)
let pp_block_error ppf err =
  (* Shorthand: every branch prints on the same formatter. *)
  let out fmt = Format.fprintf ppf fmt in
  match err with
  | Cannot_parse_operation op ->
      out "Failed to parse the operation %a."
        Operation_hash.pp_short op
  | Invalid_fitness { expected = wanted ; found = got } ->
      out
        "@[<v 2>Invalid fitness:@ \
         \ expected %a@ \
         \ found %a@]"
        Fitness.pp wanted
        Fitness.pp got
  | Non_increasing_timestamp ->
      out "Non increasing timestamp"
  | Non_increasing_fitness ->
      out "Non increasing fitness"
  | Invalid_level { expected = wanted ; found = got } ->
      out
        "Invalid level:@ \
         \ expected %ld@ \
         \ found %ld"
        wanted
        got
  | Invalid_proto_level { expected = wanted ; found = got } ->
      out
        "Invalid protocol level:@ \
         \ expected %d@ \
         \ found %d"
        wanted
        got
  | Replayed_operation op ->
      out "The operation %a was previously included in the chain."
        Operation_hash.pp_short op
  | Outdated_operation { operation = op ; originating_block = origin } ->
      out "The operation %a is outdated (originated in block: %a)"
        Operation_hash.pp_short op
        Block_hash.pp_short origin
  | Expired_network { net_id = net ; expiration = limit ; timestamp = stamp } ->
      out
        "The block timestamp (%a) is later than \
         its network expiration date: %a (net: %a)."
        Time.pp_hum stamp
        Time.pp_hum limit
        Net_id.pp_short net
  | Unexpected_number_of_validation_passes count ->
      out "Invalid number of validation passes (found: %d)"
        count
  | Too_many_operations { pass = index ; found = got ; max = limit } ->
      out "Too many operations in validation pass %d (found: %d, max: %d)"
        index got limit
  | Oversized_operation { operation = op ; size = bytes ; max = limit } ->
      out "Oversized operation %a (size: %d, max: %d)"
        Operation_hash.pp_short op bytes limit
(** Errors added to the extensible [error] type by this module. *)
type error +=
(* [block] is invalid for the reason [error]. *)
| Invalid_block of
{ block: Block_hash.t ; error: block_error }
(* The protocol [protocol] required for validating [block] is missing. *)
| Unavailable_protocol of
{ block: Block_hash.t ; protocol: Protocol_hash.t }
(* The list of operations provided for [block] is inconsistent with the
   block header: hash [found] instead of [expected]. *)
| Inconsistent_operations_hash of
{ block: Block_hash.t ;
expected: Operation_list_list_hash.t ;
found: Operation_list_list_hash.t }
(* Registers the three errors declared above with the error monad: each
   registration gives an identifier, a title, a description, a printer,
   an encoding of the payload, and the projection/injection pair between
   the error constructor and that payload. *)
let () =
  (* Invalid_block: the block hash merged with the encoded block error. *)
  Error_monad.register_error_kind
    `Permanent
    ~id:"validator.invalid_block"
    ~title:"Invalid block"
    ~description:"Invalid block."
    ~pp:(fun fmt (hash, reason) ->
        Format.fprintf fmt
          "@[<v 2>Invalid block %a@ %a@]"
          Block_hash.pp_short hash pp_block_error reason)
    (let open Data_encoding in
     merge_objs
       (obj1 (req "invalid_block" Block_hash.encoding))
       block_error_encoding)
    (function
      | Invalid_block { block = hash ; error = reason } ->
          Some (hash, reason)
      | _ -> None)
    (fun (hash, reason) ->
       Invalid_block { block = hash ; error = reason }) ;
  (* Unavailable_protocol: block hash and missing protocol hash. *)
  Error_monad.register_error_kind
    `Temporary
    ~id:"validator.unavailable_protocol"
    ~title:"Missing protocol"
    ~description:"The protocol required for validating a block is missing."
    ~pp:(fun fmt (hash, proto) ->
        Format.fprintf fmt
          "Missing protocol (%a) when validating the block %a."
          Protocol_hash.pp_short proto
          Block_hash.pp_short hash)
    (let open Data_encoding in
     obj2
       (req "block" Block_hash.encoding)
       (req "missing_protocol" Protocol_hash.encoding))
    (function
      | Unavailable_protocol { block = hash ; protocol = proto } ->
          Some (hash, proto)
      | _ -> None)
    (fun (hash, proto) ->
       Unavailable_protocol { block = hash ; protocol = proto }) ;
  (* Inconsistent_operations_hash: block hash, expected and found hashes. *)
  Error_monad.register_error_kind
    `Temporary
    ~id:"validator.inconsistent_operations_hash"
    ~title:"Invalid merkle tree"
    ~description:"The provided list of operations is inconsistent with \
                  the block header."
    ~pp:(fun fmt (hash, wanted, got) ->
        Format.fprintf fmt
          "@[<v 2>The provided list of operations for block %a \
           \ is inconsistent with the block header@ \
           \ expected: %a@ \
           \ found: %a@]"
          Block_hash.pp_short hash
          Operation_list_list_hash.pp_short wanted
          Operation_list_list_hash.pp_short got)
    (let open Data_encoding in
     obj3
       (req "block" Block_hash.encoding)
       (req "expected" Operation_list_list_hash.encoding)
       (req "found" Operation_list_list_hash.encoding))
    (function
      | Inconsistent_operations_hash
          { block = hash ; expected = wanted ; found = got } ->
          Some (hash, wanted, got)
      | _ -> None)
    (fun (hash, wanted, got) ->
       Inconsistent_operations_hash
         { block = hash ; expected = wanted ; found = got })

View File

@ -0,0 +1,39 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** Reasons for which a block can be reported as invalid. A value of this
    type is carried by the [Invalid_block] error. *)
type block_error =
(* An operation of the block could not be parsed. *)
| Cannot_parse_operation of Operation_hash.t
(* The fitness found differs from the expected one. *)
| Invalid_fitness of { expected: Fitness.t ; found: Fitness.t }
| Non_increasing_timestamp
| Non_increasing_fitness
(* The level found differs from the expected one. *)
| Invalid_level of { expected: Int32.t ; found: Int32.t }
(* The protocol level found differs from the expected one. *)
| Invalid_proto_level of { expected: int ; found: int }
(* The operation was previously included in the chain. *)
| Replayed_operation of Operation_hash.t
(* The operation is outdated; [originating_block] is the block it
   originated in. *)
| Outdated_operation of
{ operation: Operation_hash.t;
originating_block: Block_hash.t }
(* The block timestamp is later than the expiration date of its
   network. *)
| Expired_network of
{ net_id: Net_id.t ;
expiration: Time.t ;
timestamp: Time.t ;
}
(* Invalid number of validation passes; the payload is the number found. *)
| Unexpected_number_of_validation_passes of int (* uint8 *)
(* Validation pass [pass] holds [found] operations, more than [max]. *)
| Too_many_operations of { pass: int; found: int; max: int }
(* The operation has size [size], more than [max]. *)
| Oversized_operation of { operation: Operation_hash.t;
size: int; max: int }
(** Errors added to the extensible [error] type by this module. *)
type error +=
(* [block] is invalid for the reason [error]. *)
| Invalid_block of
{ block: Block_hash.t ; error: block_error }
(* The protocol [protocol] required for validating [block] is missing. *)
| Unavailable_protocol of
{ block: Block_hash.t ; protocol: Protocol_hash.t }
(* The list of operations provided for [block] is inconsistent with the
   block header: hash [found] instead of [expected]. *)
| Inconsistent_operations_hash of
{ block: Block_hash.t ;
expected: Operation_list_list_hash.t ;
found: Operation_list_list_hash.t }

View File

@ -0,0 +1,92 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** Description of a validation request: which block, on which network,
    and optionally the peer it is associated with. *)
module Request = struct

  type view = {
    net_id : Net_id.t ;
    block : Block_hash.t ;
    peer : P2p_types.Peer_id.t option ;
  }

  (** Object encoding with fields ["block"], ["net_id"] and the optional
      ["peer"]. *)
  let encoding =
    let open Data_encoding in
    conv
      (fun { net_id ; block ; peer } -> (block, net_id, peer))
      (fun (block, net_id, peer) -> { net_id ; block ; peer })
      (obj3
         (req "block" Block_hash.encoding)
         (req "net_id" Net_id.encoding)
         (opt "peer" P2p_types.Peer_id.encoding))

  (** Prints the request on one line; the peer, when present, is appended
      after the network identifier. *)
  let pp ppf { net_id ; block ; peer } =
    Format.fprintf ppf "Validation of %a (net: %a)"
      Block_hash.pp block
      Net_id.pp_short net_id ;
    match peer with
    | None -> ()
    | Some peer ->
        (* Leading space: this text directly follows the closing
           parenthesis printed above. *)
        Format.fprintf ppf " from peer %a"
          P2p_types.Peer_id.pp_short peer

end
(** Events logged about validation requests: the outcome of a request
    together with its timing status, or a free-form debug message. *)
module Event = struct

  type t =
    | Validation_success of Request.view * Worker_types.request_status
    | Validation_failure of Request.view * Worker_types.request_status * error list
    | Debug of string

  (** Log level of an event: [Debug] messages are logged at debug level,
      validation outcomes at notice level. *)
  let level req =
    match req with
    | Debug _ -> Logging.Debug
    | Validation_success _
    | Validation_failure _ -> Logging.Notice

  (** [encoding error_encoding] is the tagged-union encoding of events;
      [error_encoding] serializes the ["errors"] field of failures, which
      defaults to the empty list. *)
  let encoding error_encoding =
    let open Data_encoding in
    union
      [ case (Tag 0)
          (obj1 (req "message" string))
          (function Debug msg -> Some msg | _ -> None)
          (fun msg -> Debug msg) ;
        case (Tag 1)
          (obj2
             (req "successful_validation" Request.encoding)
             (req "status" Worker_types.request_status_encoding))
          (function Validation_success (r, s) -> Some (r, s) | _ -> None)
          (fun (r, s) -> Validation_success (r, s)) ;
        case (Tag 2)
          (obj3
             (req "failed_validation" Request.encoding)
             (req "status" Worker_types.request_status_encoding)
             (dft "errors" error_encoding []))
          (function Validation_failure (r, s, err) -> Some (r, s, err) | _ -> None)
          (fun (r, s, err) -> Validation_failure (r, s, err)) ]

  (** Prints an event; validation outcomes show the block hash and the
      pushed/treated/completed timestamps, failures also show the
      errors. *)
  let pp ppf = function
    | Debug msg -> Format.fprintf ppf "%s" msg
    | Validation_success (req, { pushed ; treated ; completed }) ->
        (* Message spelling fixed: was "succesfully". *)
        Format.fprintf ppf
          "@[<v 2>Block %a successfully validated@,\
           Pushed: %a, Treated: %a, Completed: %a@]"
          Block_hash.pp req.block
          Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
    | Validation_failure (req, { pushed ; treated ; completed }, errs)->
        Format.fprintf ppf
          "@[<v 2>Validation of block %a failed@,\
           Pushed: %a, Treated: %a, Completed: %a@,\
           Error: %a@]"
          Block_hash.pp req.block
          Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
          Error_monad.pp_print_error errs

end
(** View of the worker state. The view is [unit]: it carries no
    information, is encoded as the empty object and prints nothing. *)
module Worker_state = struct

  type view = unit

  let encoding : view Data_encoding.encoding = Data_encoding.empty

  (* Nothing to display: neither argument is used. *)
  let pp (_ : Format.formatter) (() : view) = ()

end

View File

@ -0,0 +1,34 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** Description of a validation request. *)
module Request : sig
(** The block to validate, the network it belongs to and, optionally,
    the peer the request is associated with. *)
type view = {
net_id : Net_id.t ;
block : Block_hash.t ;
peer: P2p_types.Peer_id.t option ;
}
(** Serialization of a request view. *)
val encoding : view Data_encoding.encoding
(** Pretty-printer for a request view. *)
val pp : Format.formatter -> view -> unit
end
(** Events logged about validation requests. *)
module Event : sig
(** Outcome of a validation request together with its timing status
    (and the errors in case of failure), or a free-form debug message. *)
type t =
| Validation_success of Request.view * Worker_types.request_status
| Validation_failure of Request.view * Worker_types.request_status * error list
| Debug of string
(** Log level at which an event is reported. *)
val level : t -> Logging.level
(** [encoding error_encoding] serializes events; [error_encoding] is used
    for the list of errors attached to a validation failure. *)
val encoding : error list Data_encoding.encoding -> t Data_encoding.encoding
(** Pretty-printer for events. *)
val pp : Format.formatter -> t -> unit
end
(** View of the worker state. *)
module Worker_state : sig
(** The view carries no information. *)
type view = unit
(** Serialization of the (empty) state view. *)
val encoding : view Data_encoding.encoding
(** Pretty-printer for the state view. *)
val pp : Format.formatter -> view -> unit
end