Shell: use one 'validation worker' per peer.

The single validation module is split in multiple (simpler)
modules. In the process, we introduce one "validation worker" per
peer. This worker handle all the `New_head` and `New_branch`
advertised by a given peer. For so, it sends "fetching request" and
"validation request" to respectively the `Distributed_db` and and the
`Block_validator`. These two global workers are responsible of the
'fair' allocation of network and CPU ressources amongst the connected
'peers'.
This commit is contained in:
Grégoire Henry 2017-11-11 03:34:12 +01:00 committed by Benjamin Canou
parent c5b5a87ab7
commit f3555488c7
29 changed files with 2037 additions and 1184 deletions

View File

@ -18,9 +18,6 @@ let errors cctxt =
let forge_block_header cctxt header = let forge_block_header cctxt header =
call_service0 cctxt Services.forge_block_header header call_service0 cctxt Services.forge_block_header header
let validate_block cctxt net block =
call_err_service0 cctxt Services.validate_block (net, block)
type operation = Node_rpc_services.operation = type operation = Node_rpc_services.operation =
| Blob of Operation.t | Blob of Operation.t
| Hash of Operation_hash.t | Hash of Operation_hash.t

View File

@ -17,11 +17,6 @@ val forge_block_header:
Block_header.t -> Block_header.t ->
MBytes.t tzresult Lwt.t MBytes.t tzresult Lwt.t
val validate_block:
config ->
Net_id.t -> Block_hash.t ->
unit tzresult Lwt.t
type operation = type operation =
| Blob of Operation.t | Blob of Operation.t
| Hash of Operation_hash.t | Hash of Operation_hash.t

View File

@ -142,6 +142,7 @@ let init_node ?sandbox (config : Node_config_file.t) =
context_root = context_dir config.data_dir ; context_root = context_dir config.data_dir ;
p2p = p2p_config ; p2p = p2p_config ;
test_network_max_tll = Some (48 * 3600) ; (* 2 days *) test_network_max_tll = Some (48 * 3600) ; (* 2 days *)
bootstrap_threshold = 4 ; (* TODO add parameter *)
} in } in
Node.create node_config Node.create node_config

View File

@ -0,0 +1,545 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make(struct let name = "node.validator.block" end)
module Canceler = Lwt_utils.Canceler
type 'a request =
| Request_validation: {
net_db: Distributed_db.net_db ;
notify_new_block: State.Block.t -> unit ;
canceler: Canceler.t option ;
peer: P2p.Peer_id.t option ;
hash: Block_hash.t ;
header: Block_header.t ;
operations: Operation.t list list ;
} -> State.Block.t tzresult request
type message = Message: 'a request * 'a Lwt.u option -> message
type t = {
protocol_validator: Protocol_validator.t ;
mutable worker: unit Lwt.t ;
messages: message Lwt_pipe.t ;
canceler: Canceler.t ;
}
(** Block validation *)
type block_error =
| Cannot_parse_operation of Operation_hash.t
| Invalid_fitness of { expected: Fitness.t ; found: Fitness.t }
| Inconsistent_netid of { operation: Operation_hash.t ;
expected: Net_id.t ; found: Net_id.t }
| Non_increasing_timestamp
| Non_increasing_fitness
| Invalid_level of { expected: Int32.t ; found: Int32.t }
| Invalid_proto_level of { expected: int ; found: int }
| Replayed_operation of Operation_hash.t
| Outdated_operation of
{ operation: Operation_hash.t;
originating_block: Block_hash.t }
| Expired_network of
{ net_id: Net_id.t ;
expiration: Time.t ;
timestamp: Time.t ;
}
| Unexpected_number_of_validation_passes of int (* uint8 *)
let block_error_encoding =
let open Data_encoding in
union
[
case
(obj2
(req "error" (constant "cannot_parse_operation"))
(req "operation" Operation_hash.encoding))
(function Cannot_parse_operation operation -> Some ((), operation)
| _ -> None)
(fun ((), operation) -> Cannot_parse_operation operation) ;
case
(obj3
(req "error" (constant "invalid_fitness"))
(req "expected" Fitness.encoding)
(req "found" Fitness.encoding))
(function
| Invalid_fitness { expected ; found } ->
Some ((), expected, found)
| _ -> None)
(fun ((), expected, found) -> Invalid_fitness { expected ; found }) ;
case
(obj1
(req "error" (constant "non_increasing_timestamp")))
(function Non_increasing_timestamp -> Some ()
| _ -> None)
(fun () -> Non_increasing_timestamp) ;
case
(obj1
(req "error" (constant "non_increasing_fitness")))
(function Non_increasing_fitness -> Some ()
| _ -> None)
(fun () -> Non_increasing_fitness) ;
case
(obj3
(req "error" (constant "invalid_level"))
(req "expected" int32)
(req "found" int32))
(function
| Invalid_level { expected ; found } ->
Some ((), expected, found)
| _ -> None)
(fun ((), expected, found) -> Invalid_level { expected ; found }) ;
case
(obj3
(req "error" (constant "invalid_proto_level"))
(req "expected" uint8)
(req "found" uint8))
(function
| Invalid_proto_level { expected ; found } ->
Some ((), expected, found)
| _ -> None)
(fun ((), expected, found) ->
Invalid_proto_level { expected ; found }) ;
case
(obj2
(req "error" (constant "replayed_operation"))
(req "operation" Operation_hash.encoding))
(function Replayed_operation operation -> Some ((), operation)
| _ -> None)
(fun ((), operation) -> Replayed_operation operation) ;
case
(obj3
(req "error" (constant "outdated_operation"))
(req "operation" Operation_hash.encoding)
(req "originating_block" Block_hash.encoding))
(function
| Outdated_operation { operation ; originating_block } ->
Some ((), operation, originating_block)
| _ -> None)
(fun ((), operation, originating_block) ->
Outdated_operation { operation ; originating_block }) ;
case
(obj2
(req "error" (constant "unexpected_number_of_passes"))
(req "found" uint8))
(function
| Unexpected_number_of_validation_passes n -> Some ((), n)
| _ -> None)
(fun ((), n) -> Unexpected_number_of_validation_passes n) ;
]
let pp_block_error ppf = function
| Cannot_parse_operation oph ->
Format.fprintf ppf
"Failed to parse the operation %a."
Operation_hash.pp_short oph
| Invalid_fitness { expected ; found } ->
Format.fprintf ppf
"@[<v 2>Invalid fitness:@ \
\ expected %a@ \
\ found %a@]"
Fitness.pp expected
Fitness.pp found
| Inconsistent_netid { operation ; expected ; found } ->
Format.fprintf ppf
"@[<v 2>The network identifier of the operation %a is not \
\ constitent with the network identifier of the block: @ \
\ expected: %a@ \
\ found: %a@]"
Operation_hash.pp_short operation
Net_id.pp expected
Net_id.pp found
| Non_increasing_timestamp ->
Format.fprintf ppf "Non increasing timestamp"
| Non_increasing_fitness ->
Format.fprintf ppf "Non increasing fitness"
| Invalid_level { expected ; found } ->
Format.fprintf ppf
"Invalid level:@ \
\ expected %ld@ \
\ found %ld"
expected
found
| Invalid_proto_level { expected ; found } ->
Format.fprintf ppf
"Invalid protocol level:@ \
\ expected %d@ \
\ found %d"
expected
found
| Replayed_operation oph ->
Format.fprintf ppf
"The operation %a was previously included in the chain."
Operation_hash.pp_short oph
| Outdated_operation { operation ; originating_block } ->
Format.fprintf ppf
"The operation %a is outdated (originated in block: %a)"
Operation_hash.pp_short operation
Block_hash.pp_short originating_block
| Expired_network { net_id ; expiration ; timestamp } ->
Format.fprintf ppf
"The block timestamp (%a) is later than \
its network expiration date: %a (net: %a)."
Time.pp_hum timestamp
Time.pp_hum expiration
Net_id.pp_short net_id
| Unexpected_number_of_validation_passes n ->
Format.fprintf ppf
"Invalid number of validation passes (found: %d)"
n
type error +=
| Invalid_block of
{ block: Block_hash.t ; error: block_error }
| Unavailable_protocol of
{ block: Block_hash.t ; protocol: Protocol_hash.t }
| Inconsistent_operations_hash of
{ block: Block_hash.t ;
expected: Operation_list_list_hash.t ;
found: Operation_list_list_hash.t }
let invalid_block block error = Invalid_block { block ; error }
let () =
Error_monad.register_error_kind
`Permanent
~id:"validator.invalid_block"
~title:"Invalid block"
~description:"Invalid block."
~pp:begin fun ppf (block, error) ->
Format.fprintf ppf
"@[<v 2>Invalid block %a@ %a@]"
Block_hash.pp_short block pp_block_error error
end
Data_encoding.(merge_objs
(obj1 (req "invalid_block" Block_hash.encoding))
block_error_encoding)
(function Invalid_block { block ; error } ->
Some (block, error) | _ -> None)
(fun (block, error) ->
Invalid_block { block ; error }) ;
Error_monad.register_error_kind
`Temporary
~id:"validator.unavailable_protocol"
~title:"Missing protocol"
~description:"The protocol required for validating a block is missing."
~pp:begin fun ppf (block, protocol) ->
Format.fprintf ppf
"Missing protocol (%a) when validating the block %a."
Protocol_hash.pp_short protocol
Block_hash.pp_short block
end
Data_encoding.(
obj2
(req "block" Block_hash.encoding)
(req "missing_protocol" Protocol_hash.encoding))
(function
| Unavailable_protocol { block ; protocol } ->
Some (block, protocol)
| _ -> None)
(fun (block, protocol) -> Unavailable_protocol { block ; protocol }) ;
Error_monad.register_error_kind
`Temporary
~id:"validator.inconsistent_operations_hash"
~title:"Invalid merkle tree"
~description:"The provided list of operations is inconsistent with \
the block header."
~pp:begin fun ppf (block, expected, found) ->
Format.fprintf ppf
"@[<v 2>The provided list of operations for block %a \
\ is inconsistent with the block header@ \
\ expected: %a@ \
\ found: %a@]"
Block_hash.pp_short block
Operation_list_list_hash.pp_short expected
Operation_list_list_hash.pp_short found
end
Data_encoding.(
obj3
(req "block" Block_hash.encoding)
(req "expected" Operation_list_list_hash.encoding)
(req "found" Operation_list_list_hash.encoding))
(function
| Inconsistent_operations_hash { block ; expected ; found } ->
Some (block, expected, found)
| _ -> None)
(fun (block, expected, found) ->
Inconsistent_operations_hash { block ; expected ; found })
let check_header
(pred_header: Block_header.t) hash (header: Block_header.t) =
fail_unless
(Int32.succ pred_header.shell.level = header.shell.level)
(invalid_block hash @@
Invalid_level { expected = Int32.succ pred_header.shell.level ;
found = header.shell.level }) >>=? fun () ->
fail_unless
Time.(pred_header.shell.timestamp < header.shell.timestamp)
(invalid_block hash Non_increasing_timestamp) >>=? fun () ->
fail_unless
Fitness.(pred_header.shell.fitness < header.shell.fitness)
(invalid_block hash Non_increasing_fitness) >>=? fun () ->
fail_unless
(header.shell.validation_passes <= 1) (* FIXME to be found in Proto *)
(invalid_block hash
(Unexpected_number_of_validation_passes header.shell.validation_passes)
) >>=? fun () ->
return ()
let assert_no_duplicate_operations block live_operations operation_hashes =
fold_left_s (fold_left_s (fun live_operations oph ->
fail_when (Operation_hash.Set.mem oph live_operations)
(invalid_block block @@ Replayed_operation oph) >>=? fun () ->
return (Operation_hash.Set.add oph live_operations)))
live_operations operation_hashes >>=? fun _ ->
return ()
let assert_operation_liveness block live_blocks operations =
iter_s (iter_s (fun op ->
fail_unless
(Block_hash.Set.mem op.Operation.shell.branch live_blocks)
(invalid_block block @@
Outdated_operation { operation = Operation.hash op ;
originating_block = op.shell.branch })))
operations
let check_liveness pred hash operations_hashes operations =
Chain_traversal.live_blocks
pred (State.Block.max_operations_ttl pred) >>= fun (live_blocks,
live_operations) ->
assert_no_duplicate_operations
hash live_operations operations_hashes >>=? fun () ->
assert_operation_liveness hash live_blocks operations >>=? fun () ->
return ()
let apply_block
pred (module Proto : State.Registred_protocol.T)
hash (header: Block_header.t)
operations =
let pred_header = State.Block.header pred
and pred_hash = State.Block.hash pred in
check_header pred_header hash header >>=? fun () ->
let operation_hashes = List.map (List.map Operation.hash) operations in
check_liveness pred hash operation_hashes operations >>=? fun () ->
iter_p (iter_p (fun op ->
let op_hash = Operation.hash op in
fail_unless
(Net_id.equal op.shell.net_id header.shell.net_id)
(invalid_block hash @@ Inconsistent_netid {
operation = op_hash ;
expected = header.shell.net_id ;
found = op.shell.net_id ;
})))
operations >>=? fun () ->
map2_s (map2_s begin fun op_hash raw ->
Lwt.return (Proto.parse_operation op_hash raw)
|> trace (invalid_block hash (Cannot_parse_operation op_hash))
end)
operation_hashes
operations >>=? fun parsed_operations ->
State.Block.context pred >>= fun pred_context ->
Context.reset_test_network
pred_context pred_hash header.shell.timestamp >>= fun context ->
(* TODO wrap 'proto_error' into 'block_error' *)
Proto.begin_application
~predecessor_context:context
~predecessor_timestamp:pred_header.shell.timestamp
~predecessor_fitness:pred_header.shell.fitness
header >>=? fun state ->
fold_left_s (fold_left_s (fun state op ->
Proto.apply_operation state op >>=? fun state ->
return state))
state parsed_operations >>=? fun state ->
Proto.finalize_block state >>=? fun new_context ->
Context.get_protocol new_context.context >>= fun new_protocol ->
let expected_proto_level =
if Protocol_hash.equal new_protocol Proto.hash then
pred_header.shell.proto_level
else
(pred_header.shell.proto_level + 1) mod 256 in
fail_when (header.shell.proto_level <> expected_proto_level)
(invalid_block hash @@ Invalid_proto_level {
found = header.shell.proto_level ;
expected = expected_proto_level ;
}) >>=? fun () ->
fail_when
Fitness.(new_context.fitness <> header.shell.fitness)
(invalid_block hash @@ Invalid_fitness {
expected = header.shell.fitness ;
found = new_context.fitness ;
}) >>=? fun () ->
let max_operations_ttl =
max 0
(min
((State.Block.max_operations_ttl pred)+1)
new_context.max_operations_ttl) in
let new_context =
{ new_context with max_operations_ttl } in
return new_context
let check_net_liveness net_db hash (header: Block_header.t) =
let net_state = Distributed_db.net_state net_db in
match State.Net.expiration net_state with
| Some eol when Time.(eol <= header.shell.timestamp) ->
fail @@ invalid_block hash @@
Expired_network { net_id = State.Net.id net_state ;
expiration = eol ;
timestamp = header.shell.timestamp }
| None | Some _ -> return ()
let get_proto pred hash =
State.Block.context pred >>= fun pred_context ->
Context.get_protocol pred_context >>= fun pred_protocol_hash ->
match State.Registred_protocol.get pred_protocol_hash with
| None ->
fail (Unavailable_protocol { block = hash ;
protocol = pred_protocol_hash })
| Some p -> return p
let rec worker_loop bv =
begin
Lwt_utils.protect ~canceler:bv.canceler begin fun () ->
Lwt_pipe.pop bv.messages >>= return
end >>=? fun (Message (request, wakener)) ->
let may_wakeup =
match wakener with
| None -> (fun _ -> ())
| Some wakener -> (fun v -> Lwt.wakeup_later wakener v)
in
match request with
| Request_validation { net_db ; notify_new_block ; canceler ;
peer ; hash ; header ; operations } ->
let net_state = Distributed_db.net_state net_db in
State.Block.read_opt net_state hash >>= function
| Some block ->
lwt_debug "previously validated block %a (after pipe)"
Block_hash.pp_short hash >>= fun () ->
Protocol_validator.prefetch_and_compile_protocols
bv.protocol_validator ?peer ~timeout:60. block ;
may_wakeup (Ok block) ;
return ()
| None ->
begin
lwt_debug "validating block %a"
Block_hash.pp_short hash >>= fun () ->
State.Block.read
net_state header.shell.predecessor >>=? fun pred ->
get_proto pred hash >>=? fun proto ->
(* TODO also protect with [bv.canceler]. *)
Lwt_utils.protect ?canceler begin fun () ->
apply_block pred proto hash header operations
end
end >>= function
| Ok result -> begin
lwt_log_info "validated block %a"
Block_hash.pp_short hash >>= fun () ->
Lwt_utils.protect ~canceler:bv.canceler begin fun () ->
Distributed_db.commit_block
net_db hash header operations result
end >>=? function
| None ->
assert false (* should not happen *)
| Some block ->
Protocol_validator.prefetch_and_compile_protocols
bv.protocol_validator ?peer ~timeout:60. block ;
may_wakeup (Ok block) ;
notify_new_block block ;
return ()
end
(* TODO catch other temporary error (e.g. system errors)
and do not 'commit' them on disk... *)
| Error [Lwt_utils.Canceled | Unavailable_protocol _] as err ->
may_wakeup err ;
return ()
| Error errors as err ->
Lwt_utils.protect ~canceler:bv.canceler begin fun () ->
Distributed_db.commit_invalid_block
net_db hash header errors
end >>=? fun commited ->
assert commited ;
may_wakeup err ;
return ()
end >>= function
| Ok () ->
worker_loop bv
| Error [Exn (Unix.Unix_error _) as err] ->
lwt_log_error "validation failed with %a"
pp_print_error [err] >>= fun () ->
worker_loop bv
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
lwt_log_notice "terminating" >>= fun () ->
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error:@ %a@]"
pp_print_error err >>= fun () ->
Canceler.cancel bv.canceler >>= fun () ->
Lwt.return_unit
let create db =
let protocol_validator = Protocol_validator.create db in
let canceler = Canceler.create () in
let messages = Lwt_pipe.create () in
let bv = {
protocol_validator ;
canceler ; messages ;
worker = Lwt.return_unit } in
Canceler.on_cancel bv.canceler begin fun () ->
Lwt_pipe.close bv.messages ;
Lwt.return_unit
end ;
bv.worker <-
Lwt_utils.worker "block_validator"
~run:(fun () -> worker_loop bv)
~cancel:(fun () -> Canceler.cancel bv.canceler) ;
bv
let shutdown { canceler ; worker } =
Canceler.cancel canceler >>= fun () ->
worker
let validate { messages ; protocol_validator }
?canceler ?peer ?(notify_new_block = fun _ -> ())
net_db hash (header : Block_header.t) operations =
let net_state = Distributed_db.net_state net_db in
State.Block.read_opt net_state hash >>= function
| Some block ->
lwt_debug "previously validated block %a (before pipe)"
Block_hash.pp_short hash >>= fun () ->
Protocol_validator.prefetch_and_compile_protocols
protocol_validator ?peer ~timeout:60. block ;
return block
| None ->
let res, wakener = Lwt.task () in
map_p (map_p (fun op ->
let op_hash = Operation.hash op in
return op_hash))
operations >>=? fun hashes ->
let computed_hash =
Operation_list_list_hash.compute
(List.map Operation_list_hash.compute hashes) in
fail_when
(Operation_list_list_hash.compare
computed_hash header.shell.operations_hash <> 0)
(Inconsistent_operations_hash {
block = hash ;
expected = header.shell.operations_hash ;
found = computed_hash ;
}) >>=? fun () ->
check_net_liveness net_db hash header >>=? fun () ->
lwt_debug "pushing validation request for block %a"
Block_hash.pp_short hash >>= fun () ->
Lwt_pipe.push messages
(Message (Request_validation
{ net_db ; notify_new_block ; canceler ;
peer ; hash ; header ; operations },
Some wakener)) >>= fun () ->
res
let fetch_and_compile_protocol bv =
Protocol_validator.fetch_and_compile_protocol bv.protocol_validator

View File

@ -0,0 +1,59 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
type block_error =
| Cannot_parse_operation of Operation_hash.t
| Invalid_fitness of { expected: Fitness.t ; found: Fitness.t }
| Inconsistent_netid of { operation: Operation_hash.t ;
expected: Net_id.t ; found: Net_id.t }
| Non_increasing_timestamp
| Non_increasing_fitness
| Invalid_level of { expected: Int32.t ; found: Int32.t }
| Invalid_proto_level of { expected: int ; found: int }
| Replayed_operation of Operation_hash.t
| Outdated_operation of
{ operation: Operation_hash.t;
originating_block: Block_hash.t }
| Expired_network of
{ net_id: Net_id.t ;
expiration: Time.t ;
timestamp: Time.t ;
}
| Unexpected_number_of_validation_passes of int (* uint8 *)
type error +=
| Invalid_block of
{ block: Block_hash.t ; error: block_error }
| Unavailable_protocol of
{ block: Block_hash.t ; protocol: Protocol_hash.t }
| Inconsistent_operations_hash of
{ block: Block_hash.t ;
expected: Operation_list_list_hash.t ;
found: Operation_list_list_hash.t }
val create: Distributed_db.t -> t
val validate:
t ->
?canceler:Lwt_utils.Canceler.t ->
?peer:P2p.Peer_id.t ->
?notify_new_block:(State.Block.t -> unit) ->
Distributed_db.net_db ->
Block_hash.t -> Block_header.t -> Operation.t list list ->
State.Block.t tzresult Lwt.t
val fetch_and_compile_protocol:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t
val shutdown: t -> unit Lwt.t

View File

@ -0,0 +1,231 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make(struct let name = "node.validator.bootstrap_pipeline" end)
module Canceler = Lwt_utils.Canceler
type t = {
canceler: Canceler.t ;
mutable headers_fetch_worker: unit Lwt.t ;
mutable operations_fetch_worker: unit Lwt.t ;
mutable validation_worker: unit Lwt.t ;
peer_id: P2p.Peer_id.t ;
net_db: Distributed_db.net_db ;
locator: Block_locator.t ;
block_validator: Block_validator.t ;
notify_new_block: State.Block.t -> unit ;
fetched_headers:
(Block_hash.t * Block_header.t) Lwt_pipe.t ;
fetched_blocks:
(Block_hash.t * Block_header.t * Operation.t list list) Lwt_pipe.t ;
(* HACK, a worker should be able to return the 'error'. *)
mutable errors: Error_monad.error list ;
}
let fetch_step pipeline (step : Block_locator.step) =
lwt_log_info "fetching step %a -> %a (%d%s) from peer %a."
Block_hash.pp_short step.block
Block_hash.pp_short step.predecessor
step.step
(if step.strict_step then "" else " max")
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
let rec fetch_loop acc hash cpt =
Lwt_unix.yield () >>= fun () ->
if cpt < 0 then
lwt_log_info "invalid step from peer %a (too long)."
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
fail (Block_locator.Invalid_locator
(pipeline.peer_id, pipeline.locator))
else if Block_hash.equal hash step.predecessor then
if step.strict_step && cpt <> 0 then
lwt_log_info "invalid step from peer %a (too short)."
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
fail (Block_locator.Invalid_locator
(pipeline.peer_id, pipeline.locator))
else
return acc
else
lwt_debug "fetching block header %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Block_header.fetch
~timeout:60. (* TODO allow to adjust the constant ... *)
pipeline.net_db ~peer:pipeline.peer_id
hash ()
end >>=? fun header ->
lwt_debug "fetched block header %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
fetch_loop ((hash, header) :: acc) header.shell.predecessor (cpt - 1)
in
fetch_loop [] step.block step.step >>=? fun headers ->
iter_s
begin fun header ->
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_headers header >>= return
end
end
headers >>=? fun () ->
return ()
let headers_fetch_worker_loop pipeline =
begin
let steps = Block_locator.to_steps pipeline.locator in
iter_s (fetch_step pipeline) steps >>=? fun () ->
return ()
end >>= function
| Ok () ->
lwt_log_info "fetched all step from peer %a."
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
Lwt_pipe.close pipeline.fetched_headers ;
Lwt.return_unit
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error "@[Unexpected error (headers fetch):@ %a@]"
pp_print_error err >>= fun () ->
Canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
let rec operations_fetch_worker_loop pipeline =
begin
Lwt_unix.yield () >>= fun () ->
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.pop pipeline.fetched_headers >>= return
end >>=? fun (hash, header) ->
lwt_log_info "fetching operations of block %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
map_p
(fun i ->
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Operations.fetch
~timeout:60. (* TODO allow to adjust the constant ... *)
pipeline.net_db ~peer:pipeline.peer_id
(hash, i) header.shell.operations_hash
end)
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
lwt_log_info "fetched operations of block %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_blocks
(hash, header, operations) >>= return
end
end >>= function
| Ok () ->
operations_fetch_worker_loop pipeline
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
Lwt_pipe.close pipeline.fetched_blocks ;
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error "@[Unexpected error (operations fetch):@ %a@]"
pp_print_error err >>= fun () ->
Canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
let rec validation_worker_loop pipeline =
begin
Lwt_unix.yield () >>= fun () ->
Lwt_utils.protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.pop pipeline.fetched_blocks >>= return
end >>=? fun (hash, header, operations) ->
lwt_log_info "requesting validation for block %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
Block_validator.validate
~canceler:pipeline.canceler
~notify_new_block:pipeline.notify_new_block
pipeline.block_validator
pipeline.net_db hash header operations >>=? fun _block ->
lwt_log_info "validated block %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pipeline.peer_id >>= fun () ->
return ()
end >>= function
| Ok () -> validation_worker_loop pipeline
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit
| Error ([ Block_validator.Invalid_block _
| Block_validator.Unavailable_protocol _ ] as err ) ->
(* Propagate the error to the peer validator. *)
pipeline.errors <- pipeline.errors @ err ;
Canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error "@[Unexpected error (validator):@ %a@]"
pp_print_error err >>= fun () ->
Canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
let create
?(notify_new_block = fun _ -> ())
block_validator peer_id net_db locator =
let canceler = Canceler.create () in
let fetched_headers =
Lwt_pipe.create ~size:(50, fun _ -> 1) () in
let fetched_blocks =
Lwt_pipe.create ~size:(50, fun _ -> 1) () in
let pipeline = {
canceler ;
headers_fetch_worker = Lwt.return_unit ;
operations_fetch_worker = Lwt.return_unit ;
validation_worker = Lwt.return_unit ;
notify_new_block ;
peer_id ; net_db ; locator ;
block_validator ;
fetched_headers ; fetched_blocks ;
errors = [] ;
} in
Canceler.on_cancel pipeline.canceler begin fun () ->
Lwt_pipe.close fetched_blocks ;
Lwt_pipe.close fetched_headers ;
Lwt.return_unit
end ;
let head, _ = (pipeline.locator : Block_locator.t :> _ * _) in
let hash = Block_header.hash head in
pipeline.headers_fetch_worker <-
Lwt_utils.worker
(Format.asprintf "bootstrap_pipeline-headers_fetch.%a.%a"
P2p.Peer_id.pp_short peer_id Block_hash.pp_short hash)
~run:(fun () -> headers_fetch_worker_loop pipeline)
~cancel:(fun () -> Canceler.cancel pipeline.canceler) ;
pipeline.operations_fetch_worker <-
Lwt_utils.worker
(Format.asprintf "bootstrap_pipeline-operations_fetch.%a.%a"
P2p.Peer_id.pp_short peer_id Block_hash.pp_short hash)
~run:(fun () -> operations_fetch_worker_loop pipeline)
~cancel:(fun () -> Canceler.cancel pipeline.canceler) ;
pipeline.validation_worker <-
Lwt_utils.worker
(Format.asprintf "bootstrap_pipeline-validation.%a.%a"
P2p.Peer_id.pp_short peer_id Block_hash.pp_short hash)
~run:(fun () -> validation_worker_loop pipeline)
~cancel:(fun () -> Canceler.cancel pipeline.canceler) ;
pipeline
let wait_workers pipeline =
pipeline.headers_fetch_worker >>= fun () ->
pipeline.operations_fetch_worker >>= fun () ->
pipeline.validation_worker >>= fun () ->
Lwt.return_unit
let wait pipeline =
wait_workers pipeline >>= fun () ->
match pipeline.errors with
| [] -> return ()
| errors -> Lwt.return_error errors
let cancel pipeline =
Canceler.cancel pipeline.canceler >>= fun () ->
wait_workers pipeline

View File

@ -0,0 +1,20 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
val create:
?notify_new_block: (State.Block.t -> unit) ->
Block_validator.t ->
P2p.Peer_id.t -> Distributed_db.net_db ->
Block_locator.t -> t
val wait: t -> unit tzresult Lwt.t
val cancel: t -> unit Lwt.t

View File

@ -65,7 +65,7 @@ let set_head net_state block =
locked_set_head chain_store data block >>= fun () -> locked_set_head chain_store data block >>= fun () ->
Lwt.return (Some { current_head = block ; Lwt.return (Some { current_head = block ;
current_reversed_mempool = [] }, current_reversed_mempool = [] },
()) data.current_head)
end end
let test_and_set_head net_state ~old block = let test_and_set_head net_state ~old block =

View File

@ -22,8 +22,9 @@ val known_heads: Net.t -> Block.t list Lwt.t
val mem: Net.t -> Block_hash.t -> bool Lwt.t val mem: Net.t -> Block_hash.t -> bool Lwt.t
(** Test whether a block belongs to the current mainnet. *) (** Test whether a block belongs to the current mainnet. *)
val set_head: Net.t -> Block.t -> unit Lwt.t val set_head: Net.t -> Block.t -> Block.t Lwt.t
(** Record a block as the current head of the network's blockchain. *) (** Record a block as the current head of the network's blockchain.
It returns the previous head. *)
val set_reversed_mempool: Net.t -> Operation_hash.t list -> unit Lwt.t val set_reversed_mempool: Net.t -> Operation_hash.t list -> unit Lwt.t
(** Record a list as the current list of pending operations. *) (** Record a list as the current list of pending operations. *)

View File

@ -516,20 +516,6 @@ module P2p_reader = struct
| Operation_hashes_for_block (net_id, block, ofs, ops, path) -> begin | Operation_hashes_for_block (net_id, block, ofs, ops, path) -> begin
may_handle state net_id @@ fun net_db -> may_handle state net_id @@ fun net_db ->
(* TODO early detection of non-requested list. *) (* TODO early detection of non-requested list. *)
let found_hash, found_ofs =
Operation_list_list_hash.check_path
path (Operation_list_hash.compute ops) in
if found_ofs <> ofs then
Lwt.return_unit
else
Raw_block_header.Table.read_opt
net_db.block_header_db.table block >>= function
| None -> Lwt.return_unit
| Some bh ->
if Operation_list_list_hash.compare
found_hash bh.shell.operations_hash <> 0 then
Lwt.return_unit
else
Raw_operation_hashes.Table.notify Raw_operation_hashes.Table.notify
net_db.operation_hashes_db.table state.gid net_db.operation_hashes_db.table state.gid
(block, ofs) (ops, path) >>= fun () -> (block, ofs) (ops, path) >>= fun () ->
@ -555,20 +541,6 @@ module P2p_reader = struct
| Operations_for_block (net_id, block, ofs, ops, path) -> | Operations_for_block (net_id, block, ofs, ops, path) ->
may_handle state net_id @@ fun net_db -> may_handle state net_id @@ fun net_db ->
(* TODO early detection of non-requested operations. *) (* TODO early detection of non-requested operations. *)
let found_hash, found_ofs =
Operation_list_list_hash.check_path
path (Operation_list_hash.compute (List.map Operation.hash ops)) in
if found_ofs <> ofs then
Lwt.return_unit
else
Raw_block_header.Table.read_opt
net_db.block_header_db.table block >>= function
| None -> Lwt.return_unit
| Some bh ->
if Operation_list_list_hash.compare
found_hash bh.shell.operations_hash <> 0 then
Lwt.return_unit
else
Raw_operations.Table.notify Raw_operations.Table.notify
net_db.operations_db.table state.gid net_db.operations_db.table state.gid
(block, ofs) (ops, path) >>= fun () -> (block, ofs) (ops, path) >>= fun () ->
@ -601,7 +573,9 @@ module P2p_reader = struct
end) end)
db.active_nets ; db.active_nets ;
state.worker <- state.worker <-
Lwt_utils.worker "db_network_reader" Lwt_utils.worker
(Format.asprintf "db_network_reader.%a"
P2p.Peer_id.pp_short gid)
~run:(fun () -> worker_loop db state) ~run:(fun () -> worker_loop db state)
~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) ; ~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) ;
P2p.Peer_id.Table.add db.p2p_readers gid state P2p.Peer_id.Table.add db.p2p_readers gid state
@ -738,36 +712,41 @@ let read_all_operations net_db hash n =
map_p (Raw_operation.Table.read net_db.operation_db.table) hashes) map_p (Raw_operation.Table.read net_db.operation_db.table) hashes)
operations operations
let commit_block net_db hash validation_result = let clear_block net_db hash n =
Raw_block_header.Table.read (* TODO use a reference counter ?? *)
net_db.block_header_db.table hash >>=? fun header -> Raw_operations.clear_all net_db.operations_db.table hash n ;
read_all_operations net_db Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
hash header.shell.validation_passes >>=? fun operations -> Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash
State.Block.store
net_db.net_state header operations validation_result >>=? fun res -> let commit_block net_db hash header operations result =
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ; assert (Block_hash.equal hash (Block_header.hash header)) ;
Raw_operation_hashes.clear_all assert (Net_id.equal (State.Net.id net_db.net_state) header.shell.net_id) ;
net_db.operation_hashes_db.table hash header.shell.validation_passes ; assert (List.length operations = header.shell.validation_passes) ;
Raw_operations.clear_all State.Block.store net_db.net_state header operations result >>=? fun res ->
net_db.operations_db.table hash header.shell.validation_passes ; clear_block net_db hash header.shell.validation_passes ;
(* TODO: proper handling of the operations table by the prevalidator. *)
List.iter
(List.iter
(fun op -> Raw_operation.Table.clear_or_cancel
net_db.operation_db.table
(Operation.hash op)))
operations ;
return res return res
let commit_invalid_block net_db hash = let commit_invalid_block net_db hash header _err =
Raw_block_header.Table.read assert (Block_hash.equal hash (Block_header.hash header)) ;
net_db.block_header_db.table hash >>=? fun header -> assert (Net_id.equal (State.Net.id net_db.net_state) header.shell.net_id) ;
State.Block.store_invalid net_db.net_state header >>=? fun res -> State.Block.store_invalid net_db.net_state header >>=? fun res ->
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ; clear_block net_db hash header.shell.validation_passes ;
Raw_operation_hashes.clear_all return res
net_db.operation_hashes_db.table hash header.shell.validation_passes ;
Raw_operations.clear_all let clear_operations net_db operations =
net_db.operations_db.table hash header.shell.validation_passes ; List.iter
(List.iter
(Raw_operation.Table.clear_or_cancel net_db.operation_db.table))
operations
let inject_block_header net_db h b =
fail_unless
(Net_id.equal
b.Block_header.shell.net_id
(State.Net.id net_db.net_state))
(failure "Inconsitent net_id in operation") >>=? fun () ->
Raw_block_header.Table.inject
net_db.block_header_db.table h b >>= fun res ->
return res return res
let inject_operation net_db h op = let inject_operation net_db h op =
@ -780,8 +759,7 @@ let inject_operation net_db h op =
let inject_protocol db h p = let inject_protocol db h p =
Raw_protocol.Table.inject db.protocol_db.table h p Raw_protocol.Table.inject db.protocol_db.table h p
let commit_protocol db h = let commit_protocol db h p =
Raw_protocol.Table.read db.protocol_db.table h >>=? fun p ->
State.Protocol.store db.disk p >>= fun res -> State.Protocol.store db.disk p >>= fun res ->
Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ; Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
return (res <> None) return (res <> None)
@ -795,49 +773,10 @@ let resolve_operation net_db = function
fail_unless fail_unless
(Net_id.equal op.shell.net_id (State.Net.id net_db.net_state)) (Net_id.equal op.shell.net_id (State.Net.id net_db.net_state))
(failure "Inconsistent net_id in operation.") >>=? fun () -> (failure "Inconsistent net_id in operation.") >>=? fun () ->
return (Operation.hash op, op) return op
| Hash oph -> | Hash oph ->
Raw_operation.Table.read net_db.operation_db.table oph >>=? fun op -> Raw_operation.Table.read net_db.operation_db.table oph >>=? fun op ->
return (oph, op) return op
let inject_block db bytes operations =
let hash = Block_hash.hash_bytes [bytes] in
match Block_header.of_bytes bytes with
| None ->
failwith "Cannot parse block header."
| Some block ->
match get_net db block.shell.net_id with
| None ->
failwith "Unknown network."
| Some net_db ->
map_p
(map_p (resolve_operation net_db))
operations >>=? fun operations ->
let hashes = List.map (List.map fst) operations in
let operations = List.map (List.map snd) operations in
let computed_hash =
Operation_list_list_hash.compute
(List.map Operation_list_hash.compute hashes) in
fail_when
(Operation_list_list_hash.compare
computed_hash block.shell.operations_hash <> 0)
(Exn (Failure "Incoherent operation list")) >>=? fun () ->
Raw_block_header.Table.inject
net_db.block_header_db.table hash block >>= function
| false ->
failwith "Previously injected block."
| true ->
Raw_operation_hashes.inject_all
net_db.operation_hashes_db.table hash hashes >>= fun _ ->
Raw_operations.inject_all
net_db.operations_db.table hash operations >>= fun _ ->
return (hash, block)
let clear_block net_db hash n =
Raw_operations.clear_all net_db.operations_db.table hash n ;
Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash
let watch_block_header { block_input } = let watch_block_header { block_input } =
Watcher.create_stream block_input Watcher.create_stream block_input

View File

@ -43,24 +43,30 @@ type operation =
| Hash of Operation_hash.t | Hash of Operation_hash.t
val resolve_operation: val resolve_operation:
net_db -> operation -> (Operation_hash.t * Operation.t) tzresult Lwt.t net_db -> operation -> Operation.t tzresult Lwt.t
val commit_block: val commit_block:
net_db -> Block_hash.t -> Updater.validation_result -> net_db ->
Block_hash.t ->
Block_header.t -> Operation.t list list ->
Updater.validation_result ->
State.Block.t option tzresult Lwt.t State.Block.t option tzresult Lwt.t
val commit_invalid_block: val commit_invalid_block:
net_db -> Block_hash.t -> net_db ->
Block_hash.t -> Block_header.t -> Error_monad.error list ->
bool tzresult Lwt.t bool tzresult Lwt.t
val inject_block:
t -> MBytes.t -> operation list list -> val clear_operations: net_db -> Operation_hash.t list list -> unit
(Block_hash.t * Block_header.t) tzresult Lwt.t
val clear_block: net_db -> Block_hash.t -> int -> unit val inject_block_header:
net_db -> Block_hash.t -> Block_header.t -> bool tzresult Lwt.t
val inject_operation: val inject_operation:
net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t
val commit_protocol: val commit_protocol:
db -> Protocol_hash.t -> bool tzresult Lwt.t db -> Protocol_hash.t -> Protocol.t -> bool tzresult Lwt.t
val inject_protocol: val inject_protocol:
db -> Protocol_hash.t -> Protocol.t -> bool Lwt.t db -> Protocol_hash.t -> Protocol.t -> bool Lwt.t
@ -110,6 +116,10 @@ module Operations :
and type value = Operation.t list and type value = Operation.t list
and type param := Operation_list_list_hash.t and type param := Operation_list_list_hash.t
val read_all_operations:
net_db -> Block_hash.t -> int -> Operation.t list list tzresult Lwt.t
module Operation_hashes : module Operation_hashes :
DISTRIBUTED_DB with type t = net_db DISTRIBUTED_DB with type t = net_db
and type key = Block_hash.t * int and type key = Block_hash.t * int

View File

@ -0,0 +1,325 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make(struct let name = "node.validator.net" end)
module Canceler = Lwt_utils.Canceler
type t = {
db: Distributed_db.t ;
net_state: State.Net.t ;
net_db: Distributed_db.net_db ;
block_validator: Block_validator.t ;
bootstrap_threshold: int ;
mutable bootstrapped: bool ;
bootstrapped_wakener: unit Lwt.u ;
valid_block_input: State.Block.t Watcher.input ;
global_valid_block_input: State.Block.t Watcher.input ;
new_head_input: State.Block.t Watcher.input ;
parent: t option ;
max_child_ttl: int option ;
mutable child: t option ;
prevalidator: Prevalidator.t ;
active_peers: Peer_validator.t Lwt.t P2p.Peer_id.Table.t ;
bootstrapped_peers: unit P2p.Peer_id.Table.t ;
mutable worker: unit Lwt.t ;
queue: State.Block.t Lwt_pipe.t ;
canceler: Canceler.t ;
}
let rec shutdown nv =
Canceler.cancel nv.canceler >>= fun () ->
Distributed_db.deactivate nv.net_db >>= fun () ->
Lwt.join
( nv.worker ::
Prevalidator.shutdown nv.prevalidator ::
Lwt_utils.may ~f:shutdown nv.child ::
P2p.Peer_id.Table.fold
(fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc)
nv.active_peers [] ) >>= fun () ->
Lwt.return_unit
let shutdown_child nv =
Lwt_utils.may ~f:shutdown nv.child
let notify_new_block nv block =
iter_option nv.parent
~f:(fun nv -> Watcher.notify nv.valid_block_input block) ;
Watcher.notify nv.valid_block_input block ;
Watcher.notify nv.global_valid_block_input block ;
assert (Lwt_pipe.push_now nv.queue block)
let may_toggle_bootstrapped_network nv =
if not nv.bootstrapped &&
P2p.Peer_id.Table.length nv.bootstrapped_peers >= nv.bootstrap_threshold
then begin
nv.bootstrapped <- true ;
Lwt.wakeup_later nv.bootstrapped_wakener () ;
end
let may_activate_peer_validator nv peer_id =
try P2p.Peer_id.Table.find nv.active_peers peer_id
with Not_found ->
let pv =
Peer_validator.create
~notify_new_block:(notify_new_block nv)
~notify_bootstrapped: begin fun () ->
P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ;
may_toggle_bootstrapped_network nv
end
~notify_termination: begin fun _pv ->
P2p.Peer_id.Table.remove nv.active_peers peer_id ;
P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ;
end
nv.block_validator nv.net_db peer_id in
P2p.Peer_id.Table.add nv.active_peers peer_id pv ;
pv
let broadcast_head nv ~previous block =
if not nv.bootstrapped then
Lwt.return_unit
else begin
begin
State.Block.predecessor block >>= function
| None -> Lwt.return_true
| Some predecessor ->
Lwt.return (State.Block.equal predecessor previous)
end >>= fun successor ->
if successor then begin
Distributed_db.Advertise.current_head nv.net_db block ;
Lwt.return_unit
end else begin
Distributed_db.Advertise.current_branch nv.net_db block
end
end
let rec create
?max_child_ttl ?parent
?(bootstrap_threshold = 1)
block_validator
global_valid_block_input db net_state =
let net_db = Distributed_db.activate db net_state in
Prevalidator.create net_db >>= fun prevalidator ->
let valid_block_input = Watcher.create_input () in
let new_head_input = Watcher.create_input () in
let canceler = Canceler.create () in
let _, bootstrapped_wakener = Lwt.wait () in
let nv = {
db ; net_state ; net_db ; block_validator ;
prevalidator ;
valid_block_input ; global_valid_block_input ;
new_head_input ;
parent ; max_child_ttl ; child = None ;
bootstrapped = (bootstrap_threshold <= 0) ;
bootstrapped_wakener ;
bootstrap_threshold ;
active_peers =
P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *)
bootstrapped_peers =
P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *)
worker = Lwt.return_unit ;
queue = Lwt_pipe.create () ;
canceler ;
} in
if nv.bootstrapped then Lwt.wakeup_later bootstrapped_wakener () ;
Distributed_db.set_callback net_db {
notify_branch = begin fun peer_id locator ->
Lwt.async begin fun () ->
may_activate_peer_validator nv peer_id >>= fun pv ->
Peer_validator.notify_branch pv locator ;
Lwt.return_unit
end
end ;
notify_head = begin fun peer_id block ops ->
Lwt.async begin fun () ->
may_activate_peer_validator nv peer_id >>= fun pv ->
Peer_validator.notify_head pv block ;
(* TODO notify prevalidator only if head is known ??? *)
Prevalidator.notify_operations nv.prevalidator peer_id ops ;
Lwt.return_unit
end;
end ;
disconnection = begin fun peer_id ->
Lwt.async begin fun () ->
may_activate_peer_validator nv peer_id >>= fun pv ->
Peer_validator.shutdown pv >>= fun () ->
Lwt.return_unit
end
end ;
} ;
nv.worker <-
Lwt_utils.worker
(Format.asprintf "net_validator.%a" Net_id.pp (State.Net.id net_state))
~run:(fun () -> worker_loop nv)
~cancel:(fun () -> Canceler.cancel nv.canceler) ;
Lwt.return nv
(** Current block computation *)
and worker_loop nv =
begin
Lwt_utils.protect ~canceler:nv.canceler begin fun () ->
Lwt_pipe.pop nv.queue >>= return
end >>=? fun block ->
Chain.head nv.net_state >>= fun head ->
let head_header = State.Block.header head
and head_hash = State.Block.hash head
and block_header = State.Block.header block
and block_hash = State.Block.hash block in
if
Fitness.(block_header.shell.fitness <= head_header.shell.fitness)
then
return ()
else begin
Chain.set_head nv.net_state block >>= fun previous ->
broadcast_head nv ~previous block >>= fun () ->
Prevalidator.flush nv.prevalidator block ; (* FIXME *)
may_switch_test_network nv block >>= fun () ->
Watcher.notify nv.new_head_input block ;
lwt_log_notice "update current head %a %a %a(%t)"
Block_hash.pp_short block_hash
Fitness.pp block_header.shell.fitness
Time.pp_hum block_header.shell.timestamp
(fun ppf ->
if Block_hash.equal head_hash block_header.shell.predecessor then
Format.fprintf ppf "same branch"
else
Format.fprintf ppf "changing branch") >>= fun () ->
return ()
end
end >>= function
| Ok () ->
worker_loop nv
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error:@ %a@]"
pp_print_error err >>= fun () ->
Canceler.cancel nv.canceler >>= fun () ->
Lwt.return_unit
and may_switch_test_network nv block =
let create_child genesis protocol expiration =
if State.Net.allow_forked_network nv.net_state then begin
shutdown_child nv >>= fun () ->
begin
let net_id = Net_id.of_block_hash (State.Block.hash genesis) in
State.Net.get
(State.Net.global_state nv.net_state) net_id >>= function
| Ok net_state -> return net_state
| Error _ ->
State.fork_testnet
genesis protocol expiration >>=? fun net_state ->
Chain.head net_state >>= fun new_genesis_block ->
Watcher.notify nv.global_valid_block_input new_genesis_block ;
Watcher.notify nv.valid_block_input new_genesis_block ;
return net_state
end >>=? fun net_state ->
create
~parent:nv nv.block_validator
nv.global_valid_block_input
nv.db net_state >>= fun child ->
nv.child <- Some child ;
return ()
end else begin
(* Ignoring request... *)
return ()
end in
let check_child genesis protocol expiration current_time =
let activated =
match nv.child with
| None -> false
| Some child ->
Block_hash.equal
(State.Net.genesis child.net_state).block
genesis in
State.Block.read nv.net_state genesis >>=? fun genesis ->
begin
match nv.max_child_ttl with
| None -> Lwt.return expiration
| Some ttl ->
Lwt.return
(Time.min expiration
(Time.add (State.Block.timestamp genesis) (Int64.of_int ttl)))
end >>= fun local_expiration ->
let expired = Time.(local_expiration <= current_time) in
if expired && activated then
shutdown_child nv >>= return
else if not activated && not expired then
create_child genesis protocol expiration
else
return () in
begin
let block_header = State.Block.header block in
State.Block.test_network block >>= function
| Not_running -> shutdown_child nv >>= return
| Running { genesis ; protocol ; expiration } ->
check_child genesis protocol expiration
block_header.shell.timestamp
| Forking { protocol ; expiration } ->
create_child block protocol expiration
end >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "@[<v 2>Error while switch test network:@ %a@]"
Error_monad.pp_print_error err >>= fun () ->
Lwt.return_unit
(* TODO check the initial sequence of message when connecting to a new
peer, and the one when activating a network. *)
let create
?max_child_ttl
?bootstrap_threshold
block_validator global_valid_block_input global_db state =
(* hide the optional ?parent *)
create
?max_child_ttl
?bootstrap_threshold
block_validator global_valid_block_input global_db state
let net_id { net_state } = State.Net.id net_state
let net_state { net_state } = net_state
let prevalidator { prevalidator } = prevalidator
let net_db { net_db } = net_db
let child { child } = child
let validate_block nv ?(force = false) hash block operations =
assert (Block_hash.equal hash (Block_header.hash block)) ;
Chain.head nv.net_state >>= fun head ->
let head = State.Block.header head in
if
force || Fitness.(head.shell.fitness <= block.shell.fitness)
then
Block_validator.validate
~canceler:nv.canceler
~notify_new_block:(notify_new_block nv)
nv.block_validator nv.net_db hash block operations
else
failwith "Fitness too low"
let bootstrapped { bootstrapped_wakener } =
Lwt.protected (Lwt.waiter_of_wakener bootstrapped_wakener)
let valid_block_watcher { valid_block_input } =
Watcher.create_stream valid_block_input
let new_head_watcher { new_head_input } =
Watcher.create_stream new_head_input

View File

@ -0,0 +1,39 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
val create:
?max_child_ttl:int ->
?bootstrap_threshold:int ->
Block_validator.t ->
State.Block.t Watcher.input ->
Distributed_db.t ->
State.Net.t ->
t Lwt.t
val bootstrapped: t -> unit Lwt.t
val net_id: t -> Net_id.t
val net_state: t -> State.Net.t
val prevalidator: t -> Prevalidator.t
val net_db: t -> Distributed_db.net_db
val child: t -> t option
val validate_block:
t ->
?force:bool ->
Block_hash.t -> Block_header.t -> Operation.t list list ->
State.Block.t tzresult Lwt.t
val shutdown: t -> unit Lwt.t
val valid_block_watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper
val new_head_watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper

View File

@ -17,7 +17,7 @@ let inject_operation validator ?force bytes =
| Some operation -> | Some operation ->
Validator.get Validator.get
validator operation.shell.net_id >>=? fun net_validator -> validator operation.shell.net_id >>=? fun net_validator ->
let pv = Validator.prevalidator net_validator in let pv = Net_validator.prevalidator net_validator in
Prevalidator.inject_operation pv ?force operation in Prevalidator.inject_operation pv ?force operation in
let hash = Operation_hash.hash_bytes [bytes] in let hash = Operation_hash.hash_bytes [bytes] in
Lwt.return (hash, t) Lwt.return (hash, t)
@ -44,17 +44,14 @@ let inject_protocol state ?force:_ proto =
let inject_block validator ?force bytes operations = let inject_block validator ?force bytes operations =
Validator.inject_block Validator.inject_block
validator ?force validator ?force bytes operations >>=? fun (hash, block) ->
bytes operations >>=? fun (hash, block) ->
return (hash, (block >>=? fun _ -> return ())) return (hash, (block >>=? fun _ -> return ()))
type t = { type t = {
state: State.t ; state: State.t ;
distributed_db: Distributed_db.t ; distributed_db: Distributed_db.t ;
validator: Validator.t ; validator: Validator.t ;
mainnet_db: Distributed_db.net_db ; mainnet_validator: Net_validator.t ;
mainnet_net: State.Net.t ;
mainnet_validator: Validator.net_validator ;
inject_block: inject_block:
?force:bool -> ?force:bool ->
MBytes.t -> Distributed_db.operation list list -> MBytes.t -> Distributed_db.operation list list ->
@ -90,6 +87,7 @@ type config = {
patch_context: (Context.t -> Context.t Lwt.t) option ; patch_context: (Context.t -> Context.t Lwt.t) option ;
p2p: (P2p.config * P2p.limits) option ; p2p: (P2p.config * P2p.limits) option ;
test_network_max_tll: int option ; test_network_max_tll: int option ;
bootstrap_threshold: int ;
} }
let may_create_net state genesis = let may_create_net state genesis =
@ -101,16 +99,17 @@ let may_create_net state genesis =
let create { genesis ; store_root ; context_root ; let create { genesis ; store_root ; context_root ;
patch_context ; p2p = net_params ; patch_context ; p2p = net_params ;
test_network_max_tll = max_child_ttl } = test_network_max_tll = max_child_ttl ;
bootstrap_threshold } =
init_p2p net_params >>=? fun p2p -> init_p2p net_params >>=? fun p2p ->
State.read State.read
~store_root ~context_root ?patch_context () >>=? fun state -> ~store_root ~context_root ?patch_context () >>=? fun state ->
let distributed_db = Distributed_db.create state p2p in let distributed_db = Distributed_db.create state p2p in
let validator = Validator.create state distributed_db in let validator = Validator.create state distributed_db in
may_create_net state genesis >>= fun mainnet_net -> may_create_net state genesis >>= fun mainnet_state ->
Validator.activate validator Validator.activate validator
?max_child_ttl mainnet_net >>= fun mainnet_validator -> ~bootstrap_threshold
let mainnet_db = Validator.net_db mainnet_validator in ?max_child_ttl mainnet_state >>= fun mainnet_validator ->
let shutdown () = let shutdown () =
State.close state >>= fun () -> State.close state >>= fun () ->
P2p.shutdown p2p >>= fun () -> P2p.shutdown p2p >>= fun () ->
@ -121,8 +120,6 @@ let create { genesis ; store_root ; context_root ;
state ; state ;
distributed_db ; distributed_db ;
validator ; validator ;
mainnet_db ;
mainnet_net ;
mainnet_validator ; mainnet_validator ;
inject_block = inject_block validator ; inject_block = inject_block validator ;
inject_operation = inject_operation validator ; inject_operation = inject_operation validator ;
@ -190,35 +187,29 @@ module RPC = struct
Block_hash.of_b58check_exn Block_hash.of_b58check_exn
"BLockPrevaLidationPrevaLidationPrevaLidationPrZ4mr6" "BLockPrevaLidationPrevaLidationPrevaLidationPrZ4mr6"
let get_net node = function
| `Genesis | `Head _ | `Prevalidation ->
node.mainnet_validator, node.mainnet_db
| `Test_head _ | `Test_prevalidation ->
match Validator.test_validator node.mainnet_validator with
| None -> raise Not_found
| Some v -> v
let get_validator node = function let get_validator node = function
| `Genesis | `Head _ | `Prevalidation -> node.mainnet_validator | `Genesis | `Head _ | `Prevalidation -> node.mainnet_validator
| `Test_head _ | `Test_prevalidation -> | `Test_head _ | `Test_prevalidation ->
match Validator.test_validator node.mainnet_validator with match Net_validator.child node.mainnet_validator with
| None -> raise Not_found | None -> raise Not_found
| Some (v, _) -> v | Some v -> v
let get_validator_per_hash node hash = let get_validator_per_hash node hash =
State.read_block_exn node.state hash >>= fun block -> State.read_block_exn node.state hash >>= fun block ->
let header = State.Block.header block in let header = State.Block.header block in
if Net_id.equal if Net_id.equal
(State.Net.id node.mainnet_net) (Net_validator.net_id node.mainnet_validator)
header.shell.net_id then header.shell.net_id then
Lwt.return (Some (node.mainnet_validator, node.mainnet_db)) Lwt.return (Some node.mainnet_validator)
else else
match Validator.test_validator node.mainnet_validator with match Net_validator.child node.mainnet_validator with
| Some (test_validator, net_db) | Some test_validator ->
when Net_id.equal if Net_id.equal
(State.Net.id (Validator.net_state test_validator)) (Net_validator.net_id test_validator)
header.shell.net_id -> header.shell.net_id then
Lwt.return (Some (node.mainnet_validator, net_db)) Lwt.return_some test_validator
else
Lwt.return_none
| _ -> Lwt.return_none | _ -> Lwt.return_none
let read_valid_block node h = let read_valid_block node h =
@ -238,19 +229,20 @@ module RPC = struct
let block_info node (block: block) = let block_info node (block: block) =
match block with match block with
| `Genesis -> | `Genesis ->
Chain.genesis node.mainnet_net >>= convert let net_state = Net_validator.net_state node.mainnet_validator in
Chain.genesis net_state >>= convert
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let net_db = Validator.net_db validator in let net_db = Net_validator.net_db validator in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
predecessor net_db n head >>= convert predecessor net_db n head >>= convert
| `Hash h -> | `Hash h ->
read_valid_block_exn node h >>= convert read_valid_block_exn node h >>= convert
| ( `Prevalidation | `Test_prevalidation ) as block -> | ( `Prevalidation | `Test_prevalidation ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let pv = Validator.prevalidator validator in let pv = Net_validator.prevalidator validator in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
let head_header = State.Block.header head in let head_header = State.Block.header head in
let head_hash = State.Block.hash head in let head_hash = State.Block.hash head in
@ -301,13 +293,14 @@ module RPC = struct
let get_rpc_context node block = let get_rpc_context node block =
match block with match block with
| `Genesis -> | `Genesis ->
Chain.genesis node.mainnet_net >>= fun block -> let net_state = Net_validator.net_state node.mainnet_validator in
Chain.genesis net_state >>= fun block ->
rpc_context block >>= fun ctxt -> rpc_context block >>= fun ctxt ->
Lwt.return (Some ctxt) Lwt.return (Some ctxt)
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
let net_db = Validator.net_db validator in let net_db = Net_validator.net_db validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
predecessor net_db n head >>= fun block -> predecessor net_db n head >>= fun block ->
rpc_context block >>= fun ctxt -> rpc_context block >>= fun ctxt ->
@ -321,9 +314,10 @@ module RPC = struct
Lwt.return (Some ctxt) Lwt.return (Some ctxt)
end end
| ( `Prevalidation | `Test_prevalidation ) as block -> | ( `Prevalidation | `Test_prevalidation ) as block ->
let validator, net_db = get_net node block in let validator = get_validator node block in
let pv = Validator.prevalidator validator in let pv = Net_validator.prevalidator validator in
let net_state = Validator.net_state validator in let net_db = Net_validator.net_db validator in
let net_state = Net_validator.net_state validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
let head_header = State.Block.header head in let head_header = State.Block.header head in
let head_hash = State.Block.hash head in let head_hash = State.Block.hash head in
@ -374,14 +368,14 @@ module RPC = struct
| `Genesis -> Lwt.return [] | `Genesis -> Lwt.return []
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
let net_db = Validator.net_db validator in let net_db = Net_validator.net_db validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
predecessor net_db n head >>= fun block -> predecessor net_db n head >>= fun block ->
State.Block.all_operation_hashes block State.Block.all_operation_hashes block
| (`Prevalidation | `Test_prevalidation) as block -> | (`Prevalidation | `Test_prevalidation) as block ->
let validator, _net = get_net node block in let validator = get_validator node block in
let pv = Validator.prevalidator validator in let pv = Net_validator.prevalidator validator in
let { Prevalidation.applied }, _ = Prevalidator.operations pv in let { Prevalidation.applied }, _ = Prevalidator.operations pv in
Lwt.return [applied] Lwt.return [applied]
| `Hash hash -> | `Hash hash ->
@ -395,14 +389,15 @@ module RPC = struct
| `Genesis -> Lwt.return [] | `Genesis -> Lwt.return []
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
let net_db = Validator.net_db validator in let net_db = Net_validator.net_db validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
predecessor net_db n head >>= fun block -> predecessor net_db n head >>= fun block ->
State.Block.all_operations block State.Block.all_operations block
| (`Prevalidation | `Test_prevalidation) as block -> | (`Prevalidation | `Test_prevalidation) as block ->
let validator, net_db = get_net node block in let validator = get_validator node block in
let pv = Validator.prevalidator validator in let net_db = Net_validator.net_db validator in
let pv = Net_validator.prevalidator validator in
let { Prevalidation.applied }, _ = Prevalidator.operations pv in let { Prevalidation.applied }, _ = Prevalidator.operations pv in
Lwt_list.map_p Lwt_list.map_p
(Distributed_db.Operation.read_exn net_db) applied >>= fun applied -> (Distributed_db.Operation.read_exn net_db) applied >>= fun applied ->
@ -417,32 +412,32 @@ module RPC = struct
match block with match block with
| ( `Head 0 | `Prevalidation | ( `Head 0 | `Prevalidation
| `Test_head 0 | `Test_prevalidation ) as block -> | `Test_head 0 | `Test_prevalidation ) as block ->
let validator, _net = get_net node block in let validator = get_validator node block in
let pv = Validator.prevalidator validator in let pv = Net_validator.prevalidator validator in
Lwt.return (Prevalidator.operations pv) Lwt.return (Prevalidator.operations pv)
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let prevalidator = Validator.prevalidator validator in let prevalidator = Net_validator.prevalidator validator in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
let net_db = Validator.net_db validator in let net_db = Net_validator.net_db validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
predecessor net_db n head >>= fun b -> predecessor net_db n head >>= fun b ->
Prevalidator.pending ~block:b prevalidator >|= fun ops -> Prevalidator.pending ~block:b prevalidator >|= fun ops ->
Prevalidation.empty_result, ops Prevalidation.empty_result, ops
| `Genesis -> | `Genesis ->
let net = node.mainnet_net in let net_state = Net_validator.net_state node.mainnet_validator in
Chain.genesis net >>= fun b -> let prevalidator =
let validator = get_validator node `Genesis in Net_validator.prevalidator node.mainnet_validator in
let prevalidator = Validator.prevalidator validator in Chain.genesis net_state >>= fun b ->
Prevalidator.pending ~block:b prevalidator >|= fun ops -> Prevalidator.pending ~block:b prevalidator >|= fun ops ->
Prevalidation.empty_result, ops Prevalidation.empty_result, ops
| `Hash h -> begin | `Hash h -> begin
get_validator_per_hash node h >>= function get_validator_per_hash node h >>= function
| None -> | None ->
Lwt.return (Prevalidation.empty_result, Operation_hash.Set.empty) Lwt.return (Prevalidation.empty_result, Operation_hash.Set.empty)
| Some (validator, net_db) -> | Some validator ->
let net_state = Distributed_db.net_state net_db in let net_state = Net_validator.net_state validator in
let prevalidator = Validator.prevalidator validator in let prevalidator = Net_validator.prevalidator validator in
State.Block.read_exn net_state h >>= fun block -> State.Block.read_exn net_state h >>= fun block ->
Prevalidator.pending ~block prevalidator >|= fun ops -> Prevalidator.pending ~block prevalidator >|= fun ops ->
Prevalidation.empty_result, ops Prevalidation.empty_result, ops
@ -461,17 +456,17 @@ module RPC = struct
begin begin
match block with match block with
| `Genesis -> | `Genesis ->
let net = node.mainnet_net in let net_state = Net_validator.net_state node.mainnet_validator in
Chain.genesis net >>= return Chain.genesis net_state >>= return
| ( `Head 0 | `Prevalidation | ( `Head 0 | `Prevalidation
| `Test_head 0 | `Test_prevalidation ) as block -> | `Test_head 0 | `Test_prevalidation ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
Chain.head net_state >>= return Chain.head net_state >>= return
| `Head n | `Test_head n as block -> begin | `Head n | `Test_head n as block -> begin
let validator = get_validator node block in let validator = get_validator node block in
let net_state = Validator.net_state validator in let net_state = Net_validator.net_state validator in
let net_db = Validator.net_db validator in let net_db = Net_validator.net_db validator in
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->
predecessor net_db n head >>= return predecessor net_db n head >>= return
end end
@ -480,10 +475,11 @@ module RPC = struct
| None -> Lwt.return (error_exn Not_found) | None -> Lwt.return (error_exn Not_found)
| Some data -> return data | Some data -> return data
end >>=? fun predecessor -> end >>=? fun predecessor ->
let net_db = Validator.net_db node.mainnet_validator in let net_db = Net_validator.net_db node.mainnet_validator in
map_p (Distributed_db.resolve_operation net_db) ops >>=? fun rops -> map_p (Distributed_db.resolve_operation net_db) ops >>=? fun rops ->
Prevalidation.start_prevalidation Prevalidation.start_prevalidation
~proto_header ~predecessor ~timestamp () >>=? fun validation_state -> ~proto_header ~predecessor ~timestamp () >>=? fun validation_state ->
let rops = List.map (fun x -> Operation.hash x, x) rops in
Prevalidation.prevalidate Prevalidation.prevalidate
validation_state ~sort rops >>= fun (validation_state, r) -> validation_state ~sort rops >>= fun (validation_state, r) ->
let operations_hash = let operations_hash =
@ -535,12 +531,14 @@ module RPC = struct
Lwt.return (Some (RPC.map (fun _ -> ()) dir)) Lwt.return (Some (RPC.map (fun _ -> ()) dir))
let heads node = let heads node =
Chain.known_heads node.mainnet_net >>= fun heads -> let net_state = Net_validator.net_state node.mainnet_validator in
Chain.known_heads net_state >>= fun heads ->
begin begin
match Validator.test_validator node.mainnet_validator with match Net_validator.child node.mainnet_validator with
| None -> Lwt.return_nil | None -> Lwt.return_nil
| Some (_, net_db) -> | Some test_validator ->
Chain.known_heads (Distributed_db.net_state net_db) let net_state = Net_validator.net_state test_validator in
Chain.known_heads net_state
end >>= fun test_heads -> end >>= fun test_heads ->
Lwt_list.fold_left_s Lwt_list.fold_left_s
(fun map block -> (fun map block ->
@ -600,7 +598,7 @@ module RPC = struct
Distributed_db.watch_block_header node.distributed_db Distributed_db.watch_block_header node.distributed_db
let block_watcher node = let block_watcher node =
let stream, shutdown = Validator.global_watcher node.validator in let stream, shutdown = Validator.watcher node.validator in
Lwt_stream.map_s (fun block -> convert block) stream, Lwt_stream.map_s (fun block -> convert block) stream,
shutdown shutdown
@ -610,19 +608,15 @@ module RPC = struct
let protocol_watcher node = let protocol_watcher node =
Distributed_db.watch_protocol node.distributed_db Distributed_db.watch_protocol node.distributed_db
let validate node net_id block =
Validator.get node.validator net_id >>=? fun net_v ->
Validator.fetch_block net_v block >>=? fun _ ->
return ()
let bootstrapped node = let bootstrapped node =
let block_stream, stopper = let block_stream, stopper =
Validator.new_head_watcher node.mainnet_validator in Net_validator.new_head_watcher node.mainnet_validator in
let first_run = ref true in let first_run = ref true in
let next () = let next () =
if !first_run then begin if !first_run then begin
first_run := false ; first_run := false ;
Chain.head node.mainnet_net >>= fun head -> let net_state = Net_validator.net_state node.mainnet_validator in
Chain.head net_state >>= fun head ->
let head_hash = State.Block.hash head in let head_hash = State.Block.hash head in
let head_header = State.Block.header head in let head_header = State.Block.header head in
Lwt.return (Some (head_hash, head_header.shell.timestamp)) Lwt.return (Some (head_hash, head_header.shell.timestamp))
@ -631,7 +625,7 @@ module RPC = struct
( Lwt_stream.get block_stream >|= ( Lwt_stream.get block_stream >|=
map_option ~f:(fun b -> map_option ~f:(fun b ->
(State.Block.hash b, (State.Block.header b).shell.timestamp)) ) ; (State.Block.hash b, (State.Block.header b).shell.timestamp)) ) ;
(Validator.bootstrapped node.mainnet_validator >|= fun () -> None) ; (Net_validator.bootstrapped node.mainnet_validator >|= fun () -> None) ;
] ]
end in end in
let shutdown () = Watcher.shutdown stopper in let shutdown () = Watcher.shutdown stopper in

View File

@ -16,6 +16,7 @@ type config = {
patch_context: (Context.t -> Context.t Lwt.t) option ; patch_context: (Context.t -> Context.t Lwt.t) option ;
p2p: (P2p.config * P2p.limits) option ; p2p: (P2p.config * P2p.limits) option ;
test_network_max_tll: int option ; test_network_max_tll: int option ;
bootstrap_threshold: int ;
} }
val create: config -> t tzresult Lwt.t val create: config -> t tzresult Lwt.t
@ -84,8 +85,6 @@ module RPC : sig
sort_operations:bool -> Distributed_db.operation list -> sort_operations:bool -> Distributed_db.operation list ->
(Block_header.shell_header * error Prevalidation.preapply_result) tzresult Lwt.t (Block_header.shell_header * error Prevalidation.preapply_result) tzresult Lwt.t
val validate: t -> Net_id.t -> Block_hash.t -> unit tzresult Lwt.t
val context_dir: val context_dir:
t -> block -> 'a RPC.directory option Lwt.t t -> block -> 'a RPC.directory option Lwt.t

View File

@ -391,11 +391,6 @@ let build_rpc_directory node =
Data_encoding.Binary.to_bytes Block_header.encoding header in Data_encoding.Binary.to_bytes Block_header.encoding header in
RPC.Answer.return res in RPC.Answer.return res in
RPC.register0 dir Services.forge_block_header implementation in RPC.register0 dir Services.forge_block_header implementation in
let dir =
let implementation (net_id, block_hash) =
Node.RPC.validate node net_id block_hash >>= fun res ->
RPC.Answer.return res in
RPC.register0 dir Services.validate_block implementation in
let dir = let dir =
let implementation let implementation
{ Node_rpc_services.raw ; blocking ; force ; operations } = { Node_rpc_services.raw ; blocking ; force ; operations } =

View File

@ -619,18 +619,6 @@ let forge_block_header =
~output: (obj1 (req "block" bytes)) ~output: (obj1 (req "block" bytes))
RPC.Path.(root / "forge_block_header") RPC.Path.(root / "forge_block_header")
let validate_block =
RPC.service
~description:
"Force the node to fetch and validate the given block hash."
~input:
(obj2
(req "net" Net_id.encoding)
(req "hash" Block_hash.encoding))
~output:
(Error.wrap @@ empty)
RPC.Path.(root / "validate_block")
type inject_block_param = { type inject_block_param = {
raw: MBytes.t ; raw: MBytes.t ;
blocking: bool ; blocking: bool ;

View File

@ -179,9 +179,6 @@ end
val forge_block_header: val forge_block_header:
(unit, unit, Block_header.t, MBytes.t) RPC.service (unit, unit, Block_header.t, MBytes.t) RPC.service
val validate_block:
(unit, unit, Net_id.t * Block_hash.t, unit tzresult) RPC.service
type inject_block_param = { type inject_block_param = {
raw: MBytes.t ; raw: MBytes.t ;
blocking: bool ; blocking: bool ;

View File

@ -0,0 +1,289 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(* FIXME ignore/postpone fetching/validating of block in the future... *)
include Logging.Make(struct let name = "node.validator.peer" end)
module Canceler = Lwt_utils.Canceler
type msg =
| New_head of Block_hash.t * Block_header.t
| New_branch of Block_hash.t * Block_locator.t
type t = {
peer_id: P2p.Peer_id.t ;
net_db: Distributed_db.net_db ;
block_validator: Block_validator.t ;
(* callback to net_validator *)
notify_new_block: State.Block.t -> unit ;
notify_bootstrapped: unit -> unit ;
mutable bootstrapped: bool ;
mutable last_validated_head: Block_hash.t ;
mutable last_advertised_head: Block_hash.t ;
mutable worker: unit Lwt.t ;
dropbox: msg Lwt_dropbox.t ;
canceler: Canceler.t ;
}
type error +=
| Unknown_ancestor
| Known_invalid
let set_bootstrapped pv =
if not pv.bootstrapped then begin
pv.bootstrapped <- true ;
pv.notify_bootstrapped () ;
end
let bootstrap_new_branch pv _ancestor _head unknown_prefix =
let len = Block_locator.estimated_length unknown_prefix in
lwt_log_info
"validating new branch from peer %a (approx. %d blocks)"
P2p.Peer_id.pp_short pv.peer_id len >>= fun () ->
let pipeline =
Bootstrap_pipeline.create
~notify_new_block:pv.notify_new_block
pv.block_validator
pv.peer_id pv.net_db unknown_prefix in
Lwt_utils.protect ~canceler:pv.canceler
~on_error:begin fun error ->
(* if the peer_validator is killed, let's cancel the pipeline *)
Bootstrap_pipeline.cancel pipeline >>= fun () ->
Lwt.return_error error
end
begin fun () ->
Bootstrap_pipeline.wait pipeline
end >>=? fun () ->
set_bootstrapped pv ;
lwt_log_info
"done validating new branch from peer %a."
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
return ()
let validate_new_head pv hash (header : Block_header.t) =
let net_state = Distributed_db.net_state pv.net_db in
State.Block.known net_state header.shell.predecessor >>= function
| false ->
lwt_debug
"missing predecessor for new head %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Distributed_db.Request.current_branch pv.net_db ~peer:pv.peer_id () ;
return ()
| true ->
lwt_debug
"fetching operations for new head %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Distributed_db.inject_block_header pv.net_db hash header >>=? fun _ ->
(* TODO look for predownloaded (individual)
operations in the prevalidator ?? *)
map_p
(fun i ->
Lwt_utils.protect ~canceler:pv.canceler begin fun () ->
Distributed_db.Operations.fetch
~timeout:60. (* TODO allow to adjust the constant ... *)
pv.net_db ~peer:pv.peer_id
(hash, i) header.shell.operations_hash
end)
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
lwt_debug
"requesting validation for new head %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Block_validator.validate
~notify_new_block:pv.notify_new_block
pv.block_validator pv.net_db
hash header operations >>=? fun _block ->
lwt_debug "end of validation for new head %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
set_bootstrapped pv ;
return ()
let may_validate_new_head pv hash header =
let net_state = Distributed_db.net_state pv.net_db in
State.Block.known net_state hash >>= function
| true -> begin
State.Block.known_valid net_state hash >>= function
| true ->
lwt_debug
"ignoring previously validated block %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
set_bootstrapped pv ;
pv.last_validated_head <- hash ;
return ()
| false ->
lwt_log_info
"ignoring known invalid block %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
fail Known_invalid
end
| false ->
validate_new_head pv hash header
let may_validate_new_branch pv distant_hash locator =
let distant_header, _ = (locator : Block_locator.t :> Block_header.t * _) in
let net_state = Distributed_db.net_state pv.net_db in
Chain.head net_state >>= fun local_header ->
if Fitness.compare
distant_header.Block_header.shell.fitness
(State.Block.fitness local_header) < 0 then begin
set_bootstrapped pv ;
lwt_debug
"ignoring branch %a with low fitness from peer: %a."
Block_hash.pp_short distant_hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
(* Don't bother with downloading a branch with a low fitness. *)
return ()
end else begin
let net_state = Distributed_db.net_state pv.net_db in
Block_locator.known_ancestor net_state locator >>= function
| None ->
lwt_log_info
"ignoring branch %a without common ancestor from peer: %a."
Block_hash.pp_short distant_hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
fail Unknown_ancestor
| Some (ancestor, unknown_prefix) ->
bootstrap_new_branch pv ancestor distant_header unknown_prefix
end
let rec worker_loop pv =
begin
Lwt_utils.protect ~canceler:pv.canceler begin fun () ->
(* TODO should the timeout be protocol dependent ?? *)
(* TODO or setup by the local admin ?? or a mix ??*)
Lwt_dropbox.take_with_timeout 90. pv.dropbox >>= return
end >>=? function
| None ->
lwt_log_info "no new head from peer %a for 90 seconds."
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Distributed_db.Request.current_head pv.net_db ~peer:pv.peer_id () ;
return ()
| Some (New_head (hash, header)) ->
lwt_log_info "processing new head %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
may_validate_new_head pv hash header
| Some (New_branch (hash, locator)) ->
(* TODO penalize empty locator... ?? *)
lwt_log_info "processing new branch %a from peer %a."
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
may_validate_new_branch pv hash locator
end >>= function
| Ok () ->
worker_loop pv
| Error (( Unknown_ancestor
| Block_locator.Invalid_locator _
| Block_validator.Invalid_block _ ) :: _) ->
(* TODO ban the peer_id... *)
lwt_log_info "Terminating the validation worker for peer %a (kickban)."
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Canceler.cancel pv.canceler >>= fun () ->
Lwt.return_unit
| Error [Block_validator.Unavailable_protocol { protocol } ] -> begin
Block_validator.fetch_and_compile_protocol
pv.block_validator
~peer:pv.peer_id ~timeout:60. protocol >>= function
| Ok _ -> worker_loop pv
| Error _ ->
(* TODO penality... *)
lwt_log_info "Terminating the validation worker for peer %a \
\ (missing protocol %a)."
P2p.Peer_id.pp_short pv.peer_id
Protocol_hash.pp_short protocol >>= fun () ->
Canceler.cancel pv.canceler >>= fun () ->
Lwt.return_unit
end
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_dropbox.Closed] ->
lwt_log_info "Terminating the validation worker for peer %a."
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Lwt.return_unit
| Error err ->
lwt_log_error
"@[<v 2>Unexpected error in the validation worker for peer %a:@ \
\ %a@]"
P2p.Peer_id.pp_short pv.peer_id
pp_print_error err >>= fun () ->
Canceler.cancel pv.canceler >>= fun () ->
Lwt.return_unit
let create
?notify_new_block:(external_notify_new_block = fun _ -> ())
?(notify_bootstrapped = fun () -> ())
?(notify_termination = fun _ -> ())
block_validator net_db peer_id =
lwt_debug "creating validator for peer %a."
P2p.Peer_id.pp_short peer_id >>= fun () ->
let canceler = Canceler.create () in
let dropbox = Lwt_dropbox.create () in
let net_state = Distributed_db.net_state net_db in
let genesis = (State.Net.genesis net_state).block in
let rec notify_new_block block =
pv.last_validated_head <- State.Block.hash block ;
external_notify_new_block block
and pv = {
block_validator ;
notify_new_block ;
notify_bootstrapped ;
net_db ;
peer_id ;
bootstrapped = false ;
last_validated_head = genesis ;
last_advertised_head = genesis ;
canceler ;
dropbox ;
worker = Lwt.return_unit ;
} in
Canceler.on_cancel pv.canceler begin fun () ->
Lwt_dropbox.close pv.dropbox ;
Distributed_db.disconnect pv.net_db pv.peer_id >>= fun () ->
notify_termination pv ;
Lwt.return_unit
end ;
pv.worker <-
Lwt_utils.worker
(Format.asprintf "peer_validator.%a.%a"
Net_id.pp (State.Net.id net_state) P2p.Peer_id.pp_short peer_id)
~run:(fun () -> worker_loop pv)
~cancel:(fun () -> Canceler.cancel pv.canceler) ;
Lwt.return pv
let notify_branch pv locator =
let head, _ = (locator : Block_locator.t :> _ * _) in
let hash = Block_header.hash head in
pv.last_advertised_head <- hash ;
try Lwt_dropbox.put pv.dropbox (New_branch (hash, locator))
with Lwt_dropbox.Closed -> ()
let notify_head pv header =
let hash = Block_header.hash header in
pv.last_advertised_head <- hash ;
match Lwt_dropbox.peek pv.dropbox with
| Some (New_branch _) -> () (* ignore *)
| None | Some (New_head _) ->
try Lwt_dropbox.put pv.dropbox (New_head (hash, header))
with Lwt_dropbox.Closed -> ()
let shutdown pv =
Canceler.cancel pv.canceler >>= fun () ->
pv.worker
let peer_id pv = pv.peer_id
let bootstrapped pv = pv.bootstrapped
let current_head pv = pv.last_validated_head

View File

@ -0,0 +1,25 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
val peer_id: t -> P2p.Peer_id.t
val bootstrapped: t -> bool
val current_head: t -> Block_hash.t
val create:
?notify_new_block: (State.Block.t -> unit) ->
?notify_bootstrapped: (unit -> unit) ->
?notify_termination: (t -> unit) ->
Block_validator.t ->
Distributed_db.net_db -> P2p.Peer_id.t -> t Lwt.t
val shutdown: t -> unit Lwt.t
val notify_branch: t -> Block_locator.t -> unit
val notify_head: t -> Block_header.t -> unit

View File

@ -9,23 +9,31 @@
open Logging.Node.Prevalidator open Logging.Node.Prevalidator
let list_pendings ~from_block ~to_block old_mempool = let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool =
let rec pop_blocks ancestor block mempool = let rec pop_blocks ancestor block mempool =
let hash = State.Block.hash block in let hash = State.Block.hash block in
if Block_hash.equal hash ancestor then if Block_hash.equal hash ancestor then
Lwt.return mempool Lwt.return mempool
else else
State.Block.all_operation_hashes block >>= fun operations -> State.Block.all_operations block >>= fun operations ->
let mempool = Lwt_list.fold_left_s
List.fold_left (Lwt_list.fold_left_s (fun mempool op ->
(List.fold_left (fun mempool h -> Operation_hash.Set.add h mempool)) let h = Operation.hash op in
mempool operations in Lwt_utils.may maintain_net_db
~f:begin fun net_db ->
Distributed_db.inject_operation net_db h op >>= fun _ ->
Lwt.return_unit
end >>= fun () ->
Lwt.return (Operation_hash.Set.add h mempool)))
mempool operations >>= fun mempool ->
State.Block.predecessor block >>= function State.Block.predecessor block >>= function
| None -> assert false | None -> assert false
| Some predecessor -> pop_blocks ancestor predecessor mempool | Some predecessor -> pop_blocks ancestor predecessor mempool
in in
let push_block mempool block = let push_block mempool block =
State.Block.all_operation_hashes block >|= fun operations -> State.Block.all_operation_hashes block >|= fun operations ->
iter_option maintain_net_db
~f:(fun net_db -> Distributed_db.clear_operations net_db operations) ;
List.fold_left List.fold_left
(List.fold_left (fun mempool h -> Operation_hash.Set.remove h mempool)) (List.fold_left (fun mempool h -> Operation_hash.Set.remove h mempool))
mempool operations mempool operations
@ -238,21 +246,23 @@ let create net_db =
Operation_hash.Table.mem pending op Operation_hash.Table.mem pending op
|| Operation_hash.Set.mem op !live_operations) || Operation_hash.Set.mem op !live_operations)
ops in ops in
let fetch op = let fetch h =
Distributed_db.Operation.fetch Distributed_db.Operation.fetch
~timeout:10. (* TODO allow to adjust the constant ... *) ~timeout:10. (* TODO allow to adjust the constant ... *)
net_db ~peer:gid op () >>= function net_db ~peer:gid h () >>= function
| Ok _op -> | Ok _op ->
push_to_worker (`Handle op) ; push_to_worker (`Handle h) ;
Lwt.return_unit Lwt.return_unit
| Error [ Distributed_db.Operation.Canceled _ ] -> | Error [ Distributed_db.Operation.Canceled _ ] ->
lwt_debug lwt_debug
"operation %a included before being prevalidated" "operation %a included before being prevalidated"
Operation_hash.pp_short op >>= fun () -> Operation_hash.pp_short h >>= fun () ->
Operation_hash.Table.remove pending h ;
Lwt.return_unit Lwt.return_unit
| Error _ -> | Error _ ->
(* should not happen *) Operation_hash.Table.remove pending h ;
Lwt.return_unit in Lwt.return_unit
in
List.iter List.iter
(fun op -> Operation_hash.Table.add pending op (fetch op)) (fun op -> Operation_hash.Table.add pending op (fetch op))
unknown_ops ; unknown_ops ;
@ -272,7 +282,9 @@ let create net_db =
lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> lwt_debug "register %a" Operation_hash.pp_short op >>= fun () ->
Lwt.return_unit Lwt.return_unit
| `Flush (new_head : State.Block.t) -> | `Flush (new_head : State.Block.t) ->
list_pendings ~from_block:!head ~to_block:new_head list_pendings
~maintain_net_db:net_db
~from_block:!head ~to_block:new_head
(preapply_result_operations !operations) >>= fun new_mempool -> (preapply_result_operations !operations) >>= fun new_mempool ->
Chain_traversal.live_blocks Chain_traversal.live_blocks
new_head new_head
@ -294,7 +306,10 @@ let create net_db =
q >>= fun () -> q >>= fun () ->
worker_loop () worker_loop ()
in in
Lwt_utils.worker "prevalidator" ~run:worker_loop ~cancel in Lwt_utils.worker
(Format.asprintf "prevalidator.%a"
Net_id.pp (State.Net.id net_state))
~run:worker_loop ~cancel in
let flush head = let flush head =
push_to_worker (`Flush head) ; push_to_worker (`Flush head) ;

View File

@ -0,0 +1,196 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make(struct let name = "node.validator.block" end)
module Canceler = Lwt_utils.Canceler
type 'a request =
| Request_validation: {
hash: Protocol_hash.t ;
protocol: Protocol.t ;
} -> State.Registred_protocol.t tzresult request
type message = Message: 'a request * 'a Lwt.u option -> message
type t = {
db: Distributed_db.t ;
mutable worker: unit Lwt.t ;
messages: message Lwt_pipe.t ;
canceler: Canceler.t ;
}
(** Block validation *)
type protocol_error =
| Compilation_failed
| Dynlinking_failed
let protocol_error_encoding =
let open Data_encoding in
union
[
case
(obj1
(req "error" (constant "compilation_failed")))
(function Compilation_failed -> Some ()
| _ -> None)
(fun () -> Compilation_failed) ;
case
(obj1
(req "error" (constant "dynlinking_failed")))
(function Dynlinking_failed -> Some ()
| _ -> None)
(fun () -> Dynlinking_failed) ;
]
let pp_protocol_error ppf = function
| Compilation_failed ->
Format.fprintf ppf "compilation error"
| Dynlinking_failed ->
Format.fprintf ppf "dynlinking error"
type error +=
| Invalid_protocol of { hash: Protocol_hash.t ; error: protocol_error }
let () =
Error_monad.register_error_kind
`Permanent
~id:"validator.invalid_protocol"
~title:"Invalid protocol"
~description:"Invalid protocol."
~pp:begin fun ppf (protocol, error) ->
Format.fprintf ppf
"@[<v 2>Invalid protocol %a@ %a@]"
Protocol_hash.pp_short protocol pp_protocol_error error
end
Data_encoding.(merge_objs
(obj1 (req "invalid_protocol" Protocol_hash.encoding))
protocol_error_encoding)
(function Invalid_protocol { hash ; error } ->
Some (hash, error) | _ -> None)
(fun (hash, error) ->
Invalid_protocol { hash ; error })
let rec worker_loop bv =
begin
Lwt_utils.protect ~canceler:bv.canceler begin fun () ->
Lwt_pipe.pop bv.messages >>= return
end >>=? function Message (request, wakener) ->
match request with
| Request_validation { hash ; protocol } ->
Updater.compile hash protocol >>= fun valid ->
begin
if valid then
Distributed_db.commit_protocol bv.db hash protocol
else
(* no need to tag 'invalid' protocol on disk,
the economic protocol prevents us from
being spammed with protocol validation. *)
return true
end >>=? fun _ ->
match wakener with
| None ->
return ()
| Some wakener ->
if valid then
match State.Registred_protocol.get hash with
| Some protocol ->
Lwt.wakeup_later wakener (Ok protocol)
| None ->
Lwt.wakeup_later wakener
(Error
[Invalid_protocol { hash ;
error = Dynlinking_failed }])
else
Lwt.wakeup_later wakener
(Error
[Invalid_protocol { hash ;
error = Compilation_failed }]) ;
return ()
end >>= function
| Ok () ->
worker_loop bv
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
lwt_log_notice "terminating" >>= fun () ->
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error (worker):@ %a@]"
pp_print_error err >>= fun () ->
Canceler.cancel bv.canceler >>= fun () ->
Lwt.return_unit
let create db =
let canceler = Canceler.create () in
let messages = Lwt_pipe.create () in
let bv = {
canceler ; messages ; db ;
worker = Lwt.return_unit } in
Canceler.on_cancel bv.canceler begin fun () ->
Lwt_pipe.close bv.messages ;
Lwt.return_unit
end ;
bv.worker <-
Lwt_utils.worker "block_validator"
~run:(fun () -> worker_loop bv)
~cancel:(fun () -> Canceler.cancel bv.canceler) ;
bv
let shutdown { canceler ; worker } =
Canceler.cancel canceler >>= fun () ->
worker
let validate { messages } hash protocol =
match State.Registred_protocol.get hash with
| Some protocol ->
lwt_debug "previously validated protocol %a (before pipe)"
Protocol_hash.pp_short hash >>= fun () ->
return protocol
| None ->
let res, wakener = Lwt.task () in
lwt_debug "pushing validation request for protocol %a"
Protocol_hash.pp_short hash >>= fun () ->
Lwt_pipe.push messages
(Message (Request_validation { hash ; protocol },
Some wakener)) >>= fun () ->
res
let fetch_and_compile_protocol pv ?peer ?timeout hash =
match State.Registred_protocol.get hash with
| Some proto -> return proto
| None ->
begin
Distributed_db.Protocol.read_opt pv.db hash >>= function
| Some protocol -> return protocol
| None ->
lwt_log_notice "Fetching protocol %a from peer "
Protocol_hash.pp_short hash >>= fun () ->
Distributed_db.Protocol.fetch pv.db ?peer ?timeout hash ()
end >>=? fun protocol ->
validate pv hash protocol >>=? fun proto ->
return proto
let fetch_and_compile_protocols pv ?peer ?timeout (block: State.Block.t) =
State.Block.context block >>= fun context ->
let protocol =
Context.get_protocol context >>= fun protocol_hash ->
fetch_and_compile_protocol pv ?peer ?timeout protocol_hash >>=? fun _ ->
return ()
and test_protocol =
Context.get_test_network context >>= function
| Not_running -> return ()
| Forking { protocol }
| Running { protocol } ->
fetch_and_compile_protocol pv ?peer ?timeout protocol >>=? fun _ ->
return () in
protocol >>=? fun () ->
test_protocol >>=? fun () ->
return ()
let prefetch_and_compile_protocols pv ?peer ?timeout block =
try ignore (fetch_and_compile_protocols pv ?peer ?timeout block) with _ -> ()

View File

@ -0,0 +1,46 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
type t
type protocol_error =
| Compilation_failed
| Dynlinking_failed
type error +=
| Invalid_protocol of
{ hash: Protocol_hash.t ; error: protocol_error }
val create: Distributed_db.t -> t
val validate:
t ->
Protocol_hash.t -> Protocol.t ->
State.Registred_protocol.t tzresult Lwt.t
val shutdown: t -> unit Lwt.t
val fetch_and_compile_protocol:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t
val fetch_and_compile_protocols:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
State.Block.t -> unit tzresult Lwt.t
val prefetch_and_compile_protocols:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
State.Block.t -> unit

View File

@ -338,6 +338,8 @@ module Block = struct
let max_operations_ttl { contents = { max_operations_ttl } } = let max_operations_ttl { contents = { max_operations_ttl } } =
max_operations_ttl max_operations_ttl
let is_genesis b = Block_hash.equal b.hash b.net_state.genesis.block
let known_valid net_state hash = let known_valid net_state hash =
Shared.use net_state.block_store begin fun store -> Shared.use net_state.block_store begin fun store ->
Store.Block.Contents.known (store, hash) Store.Block.Contents.known (store, hash)

View File

@ -126,6 +126,7 @@ module Block : sig
val message: t -> string val message: t -> string
val max_operations_ttl: t -> int val max_operations_ttl: t -> int
val is_genesis: t -> bool
val predecessor: t -> block option Lwt.t val predecessor: t -> block option Lwt.t
val context: t -> Context.t Lwt.t val context: t -> Context.t Lwt.t

View File

@ -7,923 +7,72 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
open Logging.Node.Validator include Logging.Make(struct let name = "node.validator" end)
module Canceler = Lwt_utils.Canceler
type t = { type t = {
activate: ?parent:net_validator -> ?max_child_ttl:int -> State.Net.t -> net_validator Lwt.t ;
get: Net_id.t -> net_validator tzresult Lwt.t ; state: State.t ;
get_exn: Net_id.t -> net_validator Lwt.t ;
deactivate: net_validator -> unit Lwt.t ;
inject_block:
?force:bool ->
MBytes.t -> Distributed_db.operation list list ->
(Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t ;
notify_block: Block_hash.t -> Block_header.t -> unit Lwt.t ;
shutdown: unit -> unit Lwt.t ;
valid_block_input: State.Block.t Watcher.input ;
db: Distributed_db.t ; db: Distributed_db.t ;
block_validator: Block_validator.t ;
valid_block_input: State.Block.t Watcher.input ;
active_nets: Net_validator.t Lwt.t Net_id.Table.t ;
} }
and net_validator = {
net: State.Net.t ;
worker: t ;
parent: net_validator option ;
mutable child: net_validator option ;
prevalidator: Prevalidator.t ;
net_db: Distributed_db.net_db ;
notify_block: Block_hash.t -> Block_header.t -> unit Lwt.t ;
fetch_block: Block_hash.t -> State.Block.t tzresult Lwt.t ;
create_child:
State.Block.t -> Protocol_hash.t -> Time.t -> unit tzresult Lwt.t ;
check_child:
Block_hash.t -> Protocol_hash.t -> Time.t -> Time.t -> unit tzresult Lwt.t ;
deactivate_child: unit -> unit Lwt.t ;
test_validator: unit -> (net_validator * Distributed_db.net_db) option ;
shutdown: unit -> unit Lwt.t ;
valid_block_input_for_net: State.Block.t Watcher.input ;
new_head_input: State.Block.t Watcher.input ;
bootstrapped: unit Lwt.t ;
}
let net_state { net } = net
let net_db { net_db } = net_db
let activate w ?max_child_ttl net = w.activate ?max_child_ttl net
let deactivate net_validator = net_validator.worker.deactivate net_validator
let get w = w.get
let get_exn w = w.get_exn
let notify_block w = w.notify_block
let inject_block w = w.inject_block
let shutdown w = w.shutdown ()
let test_validator w = w.test_validator ()
let fetch_block v = v.fetch_block
let prevalidator v = v.prevalidator
let bootstrapped v = v.bootstrapped
(** Current block computation *)
let fetch_protocol v hash =
lwt_log_notice "Fetching protocol %a"
Protocol_hash.pp_short hash >>= fun () ->
Distributed_db.Protocol.fetch v.worker.db hash () >>=? fun protocol ->
Updater.compile hash protocol >>= fun valid ->
if valid then begin
lwt_log_notice "Successfully compiled protocol %a"
Protocol_hash.pp_short hash >>= fun () ->
Distributed_db.commit_protocol v.worker.db hash >>=? fun _ ->
return true
end else begin
lwt_log_error "Failed to compile protocol %a"
Protocol_hash.pp_short hash >>= fun () ->
failwith "Cannot compile the protocol %a" Protocol_hash.pp_short hash
end
let fetch_protocols v (block: State.Block.t) =
State.Block.context block >>= fun context ->
let proto_updated =
Context.get_protocol context >>= fun protocol_hash ->
match State.Registred_protocol.get protocol_hash with
| Some _ -> return false
| None -> fetch_protocol v protocol_hash
and test_proto_updated =
Context.get_test_network context >>= function
| Not_running -> return false
| Forking { protocol }
| Running { protocol } ->
match State.Registred_protocol.get protocol with
| Some _ -> return false
| None -> fetch_protocol v protocol in
proto_updated >>=? fun proto_updated ->
test_proto_updated >>=? fun test_proto_updated ->
return (proto_updated && test_proto_updated)
let rec may_set_head v (block: State.Block.t) =
Chain.head v.net >>= fun head ->
let head_header = State.Block.header head
and head_hash = State.Block.hash head
and block_header = State.Block.header block
and block_hash = State.Block.hash block in
if
Fitness.compare
head_header.shell.fitness block_header.shell.fitness >= 0
then
Lwt.return_unit
else begin
Chain.test_and_set_head v.net ~old:head block >>= function
| false -> may_set_head v block
| true ->
Distributed_db.Advertise.current_head v.net_db block ;
Prevalidator.flush v.prevalidator block ;
begin
begin
State.Block.test_network block >>= function
| Not_running -> v.deactivate_child () >>= return
| Running { genesis ; protocol ; expiration } ->
v.check_child genesis protocol expiration
block_header.shell.timestamp
| Forking { protocol ; expiration } ->
v.create_child block protocol expiration
end >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "@[<v 2>Error while switch test network:@ %a@]"
Error_monad.pp_print_error err
end >>= fun () ->
Watcher.notify v.new_head_input block ;
lwt_log_notice "update current head %a %a %a(%t)"
Block_hash.pp_short block_hash
Fitness.pp block_header.shell.fitness
Time.pp_hum block_header.shell.timestamp
(fun ppf ->
if Block_hash.equal head_hash block_header.shell.predecessor then
Format.fprintf ppf "same branch"
else
Format.fprintf ppf "changing branch") >>= fun () ->
Lwt.return_unit
end
(** Block validation *)
type error +=
| Invalid_operation of Operation_hash.t
| Invalid_fitness of { block: Block_hash.t ;
expected: Fitness.t ;
found: Fitness.t }
| Unknown_protocol
| Non_increasing_timestamp
| Non_increasing_fitness
| Wrong_level of Int32.t * Int32.t
| Wrong_proto_level of int * int
| Replayed_operation of Operation_hash.t
| Outdated_operation of Operation_hash.t * Block_hash.t
let () =
Error_monad.register_error_kind
`Permanent
~id:"validator.invalid_fitness"
~title:"Invalid fitness"
~description:"The computed fitness differs from the fitness found \
\ in the block header."
~pp:(fun ppf (block, expected, found) ->
Format.fprintf ppf
"@[<v 2>Invalid fitness for block %a@ \
\ expected %a@ \
\ found %a"
Block_hash.pp_short block
Fitness.pp expected
Fitness.pp found)
Data_encoding.(obj3
(req "block" Block_hash.encoding)
(req "expected" Fitness.encoding)
(req "found" Fitness.encoding))
(function Invalid_fitness { block ; expected ; found } ->
Some (block, expected, found) | _ -> None)
(fun (block, expected, found) ->
Invalid_fitness { block ; expected ; found }) ;
register_error_kind
`Permanent
~id:"validator.wrong_level"
~title:"Wrong level"
~description:"The block level is not the expected one"
~pp:(fun ppf (e, g) ->
Format.fprintf ppf
"The declared level %ld is not %ld" g e)
Data_encoding.(obj2
(req "expected" int32)
(req "provided" int32))
(function Wrong_level (e, g) -> Some (e, g) | _ -> None)
(fun (e, g) -> Wrong_level (e, g)) ;
register_error_kind
`Permanent
~id:"validator.wrong_proto_level"
~title:"Wrong protocol level"
~description:"The protocol level is not the expected one"
~pp:(fun ppf (e, g) ->
Format.fprintf ppf
"The declared protocol level %d is not %d" g e)
Data_encoding.(obj2
(req "expected" uint8)
(req "provided" uint8))
(function Wrong_proto_level (e, g) -> Some (e, g) | _ -> None)
(fun (e, g) -> Wrong_proto_level (e, g)) ;
register_error_kind
`Permanent
~id:"validator.replayed_operation"
~title:"Replayed operation"
~description:"The block contains an operation that was previously \
included in the chain"
~pp:(fun ppf oph ->
Format.fprintf ppf
"The operation %a was previously included in the chain."
Operation_hash.pp oph)
Data_encoding.(obj1 (req "hash" Operation_hash.encoding))
(function Replayed_operation oph -> Some oph | _ -> None)
(function oph -> Replayed_operation oph) ;
register_error_kind
`Permanent
~id:"validator.outdated_operations"
~title:"Outdated operation"
~description:"The block contains an operation which is outdated."
~pp:(fun ppf (oph, bh)->
Format.fprintf ppf
"The operation %a is outdated (%a)"
Operation_hash.pp oph
Block_hash.pp bh)
Data_encoding.(obj2
(req "operation" Operation_hash.encoding)
(req "block" Block_hash.encoding))
(function Outdated_operation (oph, bh) -> Some (oph, bh) | _ -> None)
(function (oph, bh) -> Outdated_operation (oph, bh))
let apply_block net_state db
(pred: State.Block.t) hash (block: Block_header.t) =
let pred_header = State.Block.header pred
and pred_hash = State.Block.hash pred in
State.Block.context pred >>= fun pred_context ->
let id = State.Net.id net_state in
lwt_log_notice "validate block %a (after %a), net %a"
Block_hash.pp_short hash
Block_hash.pp_short block.shell.predecessor
Net_id.pp id >>= fun () ->
fail_unless
(Int32.succ pred_header.shell.level = block.shell.level)
(Wrong_level (Int32.succ pred_header.shell.level,
block.shell.level)) >>=? fun () ->
lwt_log_info "validation of %a: looking for dependencies..."
Block_hash.pp_short hash >>= fun () ->
Distributed_db.Operations.fetch
db (hash, 0) block.shell.operations_hash >>=? fun operations ->
fail_unless (block.shell.validation_passes <= 1)
(* TODO constant to be exported from the protocol... *)
(failure "unexpected error (TO BE REMOVED)") >>=? fun () ->
let operation_hashes = List.map Operation.hash operations in
lwt_debug "validation of %a: found operations"
Block_hash.pp_short hash >>= fun () ->
begin (* Are we validating a block in an expired test network ? *)
match State.Net.expiration net_state with
| Some eol when Time.(eol <= block.shell.timestamp) ->
failwith "This test network expired..."
| None | Some _ -> return ()
end >>=? fun () ->
begin
if Time.(pred_header.shell.timestamp >= block.shell.timestamp) then
fail Non_increasing_timestamp
else
return ()
end >>=? fun () ->
begin
if Fitness.compare pred_header.shell.fitness block.shell.fitness >= 0 then
fail Non_increasing_fitness
else
return ()
end >>=? fun () ->
begin
Chain_traversal.live_blocks
pred (State.Block.max_operations_ttl pred) >>= fun (live_blocks,
live_operations) ->
let rec assert_no_duplicates live_operations = function
| [] -> return ()
| oph :: ophs ->
if Operation_hash.Set.mem oph live_operations then
fail (Replayed_operation oph)
else
assert_no_duplicates
(Operation_hash.Set.add oph live_operations) ophs in
let assert_live operations =
List.fold_left
(fun acc op ->
acc >>=? fun () ->
fail_unless
(Block_hash.Set.mem op.Operation.shell.branch live_blocks)
(Outdated_operation (Operation.hash op, op.shell.branch)))
(return ()) operations in
assert_no_duplicates live_operations operation_hashes >>=? fun () ->
assert_live operations
end >>=? fun () ->
Context.get_protocol pred_context >>= fun pred_protocol_hash ->
begin
match State.Registred_protocol.get pred_protocol_hash with
| None -> fail Unknown_protocol
| Some p -> return p
end >>=? fun (module Proto) ->
lwt_debug "validation of %a: Proto %a"
Block_hash.pp_short hash
Protocol_hash.pp_short Proto.hash >>= fun () ->
lwt_debug "validation of %a: parsing header..."
Block_hash.pp_short hash >>= fun () ->
lwt_debug "validation of %a: parsing operations..."
Block_hash.pp_short hash >>= fun () ->
map2_s
(fun op_hash raw ->
Lwt.return (Proto.parse_operation op_hash raw)
|> trace (Invalid_operation op_hash))
operation_hashes
operations >>=? fun parsed_operations ->
lwt_debug "validation of %a: applying block..."
Block_hash.pp_short hash >>= fun () ->
Context.reset_test_network
pred_context pred_hash block.shell.timestamp >>= fun context ->
Proto.begin_application
~predecessor_context:context
~predecessor_timestamp:pred_header.shell.timestamp
~predecessor_fitness:pred_header.shell.fitness
block >>=? fun state ->
fold_left_s (fun state op ->
Proto.apply_operation state op >>=? fun state ->
return state)
state parsed_operations >>=? fun state ->
Proto.finalize_block state >>=? fun new_context ->
Context.get_protocol new_context.context >>= fun new_protocol ->
let expected_proto_level =
if Protocol_hash.equal new_protocol pred_protocol_hash then
pred_header.shell.proto_level
else
(pred_header.shell.proto_level + 1) mod 256 in
fail_when (block.shell.proto_level <> expected_proto_level)
(Wrong_proto_level (block.shell.proto_level, expected_proto_level))
>>=? fun () ->
fail_unless
(Fitness.equal new_context.fitness block.shell.fitness)
(Invalid_fitness
{ block = hash ;
expected = block.shell.fitness ;
found = new_context.fitness ;
}) >>=? fun () ->
let max_operations_ttl =
max 0
(min
((State.Block.max_operations_ttl pred)+1)
new_context.max_operations_ttl) in
let new_context =
{ new_context with max_operations_ttl } in
lwt_log_info "validation of %a: success"
Block_hash.pp_short hash >>= fun () ->
return new_context
(** *)
module Context_db = struct
type data =
{ validator: net_validator ;
state: [ `Inited of Block_header.t tzresult
| `Initing of Block_header.t tzresult Lwt.t
| `Running of State.Block.t tzresult Lwt.t ] ;
wakener: State.Block.t tzresult Lwt.u }
type context =
{ tbl : data Block_hash.Table.t ;
canceler : Lwt_utils.Canceler.t ;
worker_trigger: unit -> unit;
worker_waiter: unit -> unit Lwt.t ;
worker: unit Lwt.t ;
net_db : Distributed_db.net_db ;
net_state : State.Net.t }
let pending_requests { tbl } =
Block_hash.Table.fold
(fun h data acc ->
match data.state with
| `Initing _ -> acc
| `Running _ -> acc
| `Inited d -> (h, d, data) :: acc)
tbl []
let pending { tbl } hash = Block_hash.Table.mem tbl hash
let request validator { tbl ; worker_trigger ; net_db } hash =
assert (not (Block_hash.Table.mem tbl hash));
let waiter, wakener = Lwt.wait () in
let data =
Distributed_db.Block_header.fetch net_db hash () in
match Lwt.state data with
| Lwt.Return data ->
let state = `Inited data in
Block_hash.Table.add tbl hash { validator ; state ; wakener } ;
worker_trigger () ;
waiter
| _ ->
let state = `Initing data in
Block_hash.Table.add tbl hash { validator ; state ; wakener } ;
Lwt.async
(fun () ->
data >>= fun data ->
let state = `Inited data in
Block_hash.Table.replace tbl hash { validator ; state ; wakener } ;
worker_trigger () ;
Lwt.return_unit) ;
waiter
let prefetch validator ({ net_state ; tbl } as session) hash =
Lwt.ignore_result
(State.Block.known_valid net_state hash >>= fun exists ->
if not exists && not (Block_hash.Table.mem tbl hash) then
request validator session hash >>= fun _ -> Lwt.return_unit
else
Lwt.return_unit)
let known { net_state } hash =
State.Block.known_valid net_state hash
let read { net_state } hash =
State.Block.read net_state hash
let fetch ({ net_state ; tbl } as session) validator hash =
try Lwt.waiter_of_wakener (Block_hash.Table.find tbl hash).wakener
with Not_found ->
State.Block.known_invalid net_state hash >>= fun known_invalid ->
if known_invalid then
Lwt.return (Error [failure "Invalid predecessor"])
else
State.Block.read_opt net_state hash >>= function
| Some op ->
Lwt.return (Ok op)
| None ->
try Lwt.waiter_of_wakener (Block_hash.Table.find tbl hash).wakener
with Not_found -> request validator session hash
let store { net_db ; tbl } hash data =
begin
match data with
| Ok data -> begin
Distributed_db.commit_block net_db hash data >>=? function
| None ->
(* Should not happen if the block is not validated twice *)
assert false
| Some block ->
return (Ok block)
end
| Error err ->
Distributed_db.commit_invalid_block net_db hash >>=? fun changed ->
assert changed ;
return (Error err)
end >>= function
| Ok block ->
let wakener = (Block_hash.Table.find tbl hash).wakener in
Block_hash.Table.remove tbl hash;
Lwt.wakeup wakener block ;
Lwt.return_unit
| Error _ as err ->
let wakener = (Block_hash.Table.find tbl hash).wakener in
Block_hash.Table.remove tbl hash;
Lwt.wakeup wakener err ;
Lwt.return_unit
let process (v: net_validator) ~get_context ~set_context hash block =
let net_state = Distributed_db.net_state v.net_db in
get_context v block.Block_header.shell.predecessor >>= function
| Error _ as error ->
Lwt_unix.yield () >>= fun () ->
set_context v hash (Error [(* TODO *)]) >>= fun () ->
Lwt.return error
| Ok _context ->
lwt_debug "process %a" Block_hash.pp_short hash >>= fun () ->
begin
Chain.genesis net_state >>= fun genesis ->
if Block_hash.equal (State.Block.hash genesis)
block.shell.predecessor then
Lwt.return genesis
else
State.Block.read_exn net_state block.shell.predecessor
end >>= fun pred ->
apply_block net_state v.net_db pred hash block >>= function
| Error ([Unknown_protocol] as err) as error ->
lwt_log_error
"@[<v 2>Ignoring block %a@ %a@]"
Block_hash.pp_short hash
Error_monad.pp_print_error err >>= fun () ->
Lwt.return error
| Error exns as error ->
set_context v hash error >>= fun () ->
lwt_warn "Failed to validate block %a."
Block_hash.pp_short hash >>= fun () ->
lwt_debug "%a" Error_monad.pp_print_error exns >>= fun () ->
Lwt.return error
| Ok new_context ->
(* The sanity check `set_context` detects differences
between the computed fitness and the fitness announced
in the block header. Then `Block.read` will
return an error. *)
set_context v hash (Ok new_context) >>= fun () ->
State.Block.read net_state hash >>= function
| Error err as error ->
lwt_log_error
"@[<v 2>Ignoring block %a@ %a@]"
Block_hash.pp_short hash
Error_monad.pp_print_error err >>= fun () ->
Lwt.return error
| Ok block ->
lwt_debug
"validation of %a: reevaluate current block"
Block_hash.pp_short hash >>= fun () ->
Watcher.notify v.worker.valid_block_input block ;
Watcher.notify v.valid_block_input_for_net block ;
fetch_protocols v block >>=? fun _fetched ->
may_set_head v block >>= fun () ->
return block
let request session ~get_context ~set_context pendings =
let time = Time.now () in
let min_block b pb =
match pb with
| None -> Some b
| Some pb
when b.Block_header.shell.timestamp
< pb.Block_header.shell.timestamp ->
Some b
| Some _ as pb -> pb in
let next =
List.fold_left
(fun acc (hash, block, (data : data)) ->
match block with
| Error _ ->
acc
| Ok block ->
if Time.(block.Block_header.shell.timestamp > time) then
min_block block acc
else begin
Block_hash.Table.replace session.tbl hash { data with state = `Running begin
Lwt_main.yield () >>= fun () ->
process data.validator ~get_context ~set_context hash block >>= fun res ->
Block_hash.Table.remove session.tbl hash ;
Lwt.return res
end } ;
acc
end)
None
pendings in
match next with
| None -> 0.
| Some b -> Int64.to_float (Time.diff b.Block_header.shell.timestamp time)
let create net_db =
let net_state = Distributed_db.net_state net_db in
let tbl = Block_hash.Table.create 50 in
let canceler = Lwt_utils.Canceler.create () in
let worker_trigger, worker_waiter = Lwt_utils.trigger () in
let session =
{ tbl ; net_db ; net_state ; worker = Lwt.return () ;
canceler ; worker_trigger ; worker_waiter } in
let worker =
let rec worker_loop () =
Lwt_utils.protect ~canceler begin fun () ->
worker_waiter () >>= return
end >>= function
| Error [Lwt_utils.Canceled] -> Lwt.return_unit
| Error err ->
lwt_log_error
"@[Unexpected error in validation:@ %a@]"
pp_print_error err >>= fun () ->
worker_loop ()
| Ok () ->
begin
match pending_requests session with
| [] -> ()
| requests ->
let set_context _validator hash context =
store session hash context >>= fun _ ->
Lwt.return_unit in
let timeout =
request session
~get_context:(fetch session)
~set_context requests in
if timeout > 0. then
Lwt.ignore_result
(Lwt_unix.sleep timeout >|= worker_trigger);
end ;
worker_loop ()
in
Lwt_utils.worker "validation"
~run:worker_loop
~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) in
{ session with worker }
let shutdown { canceler ; worker } =
Lwt_utils.Canceler.cancel canceler >>= fun () -> worker
end
let create_validator ?parent worker ?max_child_ttl state db net =
let net_id = State.Net.id net in
let net_db = Distributed_db.activate db net in
let session = Context_db.create net_db in
let queue = Lwt_pipe.create () in
Prevalidator.create net_db >>= fun prevalidator ->
let new_blocks = ref Lwt.return_unit in
Distributed_db.set_callback net_db {
notify_branch = begin fun gid locator ->
Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator)))
end ;
notify_head = begin fun gid block ops ->
Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, Block_header.hash block, ops))) ;
end ;
disconnection = (fun _gid -> ()) ;
} ;
let shutdown () =
lwt_log_notice "shutdown %a" Net_id.pp net_id >>= fun () ->
Distributed_db.deactivate net_db >>= fun () ->
Lwt_pipe.close queue ;
Lwt.join [
Context_db.shutdown session ;
!new_blocks ;
Prevalidator.shutdown prevalidator ;
]
in
let valid_block_input_for_net = Watcher.create_input () in
let new_head_input = Watcher.create_input () in
let bootstrapped =
(* TODO improve by taking current peers count and current
locators into account... *)
let stream, stopper =
Watcher.create_stream valid_block_input_for_net in
let rec wait () =
Lwt.pick [ ( Lwt_stream.get stream ) ;
( Lwt_unix.sleep 30. >|= fun () -> None) ] >>= function
| Some block when
Time.((State.Block.header block).shell.timestamp < add (Time.now ()) (-60L)) ->
wait ()
| _ ->
Chain.head net >>= fun head ->
Chain.genesis net >>= fun genesis ->
if State.Block.equal head genesis then
wait ()
else
Lwt.return_unit in
let net_validator =
wait () >>= fun () ->
Watcher.shutdown stopper ;
Lwt.return_unit in
Lwt.no_cancel net_validator
in
let rec v = {
net ;
worker ;
parent ;
child = None ;
prevalidator ;
net_db ;
shutdown ;
notify_block ;
fetch_block ;
create_child ;
check_child ;
deactivate_child ;
test_validator ;
bootstrapped ;
new_head_input ;
valid_block_input_for_net ;
}
and notify_block hash block =
lwt_debug "-> Validator.notify_block %a"
Block_hash.pp_short hash >>= fun () ->
Chain.head net >>= fun head ->
let head_header = State.Block.header head in
if Fitness.compare head_header.shell.fitness block.shell.fitness <= 0 then
Context_db.prefetch v session hash ;
Lwt.return_unit
and fetch_block hash =
Context_db.fetch session v hash
and create_child block protocol expiration =
if State.Net.allow_forked_network net then begin
deactivate_child () >>= fun () ->
begin
State.Net.get state net_id >>= function
| Ok net_store -> return net_store
| Error _ ->
State.fork_testnet block protocol expiration >>=? fun net_store ->
Chain.head net_store >>= fun block ->
Watcher.notify v.worker.valid_block_input block ;
return net_store
end >>=? fun net_store ->
worker.activate ~parent:v net_store >>= fun child ->
v.child <- Some child ;
return ()
end else begin
(* Ignoring request... *)
return ()
end
and deactivate_child () =
match v.child with
| None -> Lwt.return_unit
| Some child ->
v.child <- None ;
deactivate child
and check_child genesis protocol expiration current_time =
let activated =
match v.child with
| None -> false
| Some child ->
Block_hash.equal (State.Net.genesis child.net).block genesis in
begin
match max_child_ttl with
| None -> return expiration
| Some ttl ->
Distributed_db.Block_header.fetch net_db genesis () >>=? fun genesis ->
return
(Time.min expiration
(Time.add genesis.shell.timestamp (Int64.of_int ttl)))
end >>=? fun local_expiration ->
let expired = Time.(local_expiration <= current_time) in
if expired && activated then
deactivate_child () >>= return
else if not activated && not expired then
fetch_block genesis >>=? fun genesis ->
create_child genesis protocol expiration
else
return ()
and test_validator () =
match v.child with
| None -> None
| Some child -> Some (child, child.net_db)
in
new_blocks := begin
let rec loop () =
Lwt_pipe.pop queue >>= function
| `Branch (_gid, locator) ->
let head, hist = (locator : Block_locator.t :> _ * _) in
List.iter
(Context_db.prefetch v session)
(Block_header.hash head :: hist) ;
loop ()
| `Head (gid, head, ops) ->
Context_db.prefetch v session head ;
Prevalidator.notify_operations prevalidator gid ops ;
loop ()
in
Lwt.catch loop
(function Lwt_pipe.Closed -> Lwt.return_unit
| exn -> Lwt.fail exn)
end ;
Lwt.return v
let create state db = let create state db =
let block_validator = Block_validator.create db in
let validators : net_validator Lwt.t Net_id.Table.t =
Net_id.Table.create 7 in
let valid_block_input = Watcher.create_input () in let valid_block_input = Watcher.create_input () in
{ state ; db ; block_validator ;
let get_exn net = Net_id.Table.find validators net in
let get net =
try get_exn net >>= fun v -> return v
with Not_found -> fail (State.Unknown_network net) in
let remove net = Net_id.Table.remove validators net in
let deactivate { net } =
let id = State.Net.id net in
get id >>= function
| Error _ -> Lwt.return_unit
| Ok v ->
lwt_log_notice "deactivate network %a" Net_id.pp id >>= fun () ->
remove id ;
v.shutdown ()
in
let notify_block hash (block : Block_header.t) =
match get_exn block.shell.net_id with
| exception Not_found -> Lwt.return_unit
| net ->
net >>= fun net ->
net.notify_block hash block in
let cancelation, cancel, _on_cancel = Lwt_utils.canceler () in
let maintenance_worker =
let next_net_maintenance = ref (Time.now ()) in
let net_maintenance () =
lwt_log_info "net maintenance" >>= fun () ->
let time = Time.now () in
Net_id.Table.fold
(fun _ v acc ->
v >>= fun v ->
acc >>= fun () ->
match State.Net.expiration v.net with
| Some eol when Time.(eol <= time) -> deactivate v
| Some _ | None -> Lwt.return_unit)
validators Lwt.return_unit >>= fun () ->
State.Net.all state >>= fun all_net ->
Lwt_list.iter_p
(fun net ->
match State.Net.expiration net with
| Some eol when Time.(eol <= time) ->
lwt_log_notice "destroy network %a"
Net_id.pp (State.Net.id net) >>= fun () ->
State.Net.destroy state net
| Some _ | None -> Lwt.return_unit)
all_net >>= fun () ->
next_net_maintenance := Time.add (Time.now ()) (Int64.of_int 55) ;
Lwt.return_unit in
let next_head_maintenance = ref (Time.now ()) in
let head_maintenance () =
lwt_log_info "head maintenance" >>= fun () ->
(* TODO *)
next_head_maintenance := Time.add (Time.now ()) (Int64.of_int 55) ;
Lwt.return_unit in
let rec worker_loop () =
let timeout =
let next = min !next_head_maintenance !next_net_maintenance in
let delay = Time.(diff next (now ())) in
if delay <= 0L then
Lwt.return_unit
else
Lwt_unix.sleep (Int64.to_float delay) in
Lwt.pick [(timeout >|= fun () -> `Process);
(cancelation () >|= fun () -> `Cancel)] >>= function
| `Cancel -> Lwt.return_unit
| `Process ->
begin
if !next_net_maintenance < Time.now () then
net_maintenance ()
else
Lwt.return ()
end >>= fun () ->
begin
if !next_head_maintenance < Time.now () then
head_maintenance ()
else
Lwt.return ()
end >>= fun () ->
worker_loop ()
in
Lwt_utils.worker "validator_maintenance" ~run:worker_loop ~cancel in
let shutdown () =
cancel () >>= fun () ->
let validators =
Net_id.Table.fold
(fun _ (v: net_validator Lwt.t) acc -> (v >>= fun v -> v.shutdown ()) :: acc)
validators [] in
Lwt.join (maintenance_worker :: validators) in
let inject_block ?(force = false) bytes operations =
Distributed_db.inject_block db bytes operations >>=? fun (hash, block) ->
get block.shell.net_id >>=? fun net ->
let validation =
protect
~on_error: begin fun err ->
Distributed_db.clear_block
net.net_db hash (List.length operations) ;
Lwt.return (Error err)
end
begin fun () ->
Chain.head net.net >>= fun head ->
let head_header = State.Block.header head in
if force ||
Fitness.compare head_header.shell.fitness block.shell.fitness <= 0
then
fetch_block net hash
else
failwith "Fitness is below the current one"
end in
return (hash, validation) in
let rec activate ?parent ?max_child_ttl net =
let net_id = State.Net.id net in
lwt_log_notice "activate network %a"
Net_id.pp net_id >>= fun () ->
get net_id >>= function
| Error _ ->
let v = create_validator ?max_child_ttl ?parent worker state db net in
Net_id.Table.add validators net_id v ;
v
| Ok v -> Lwt.return v
and worker = {
get ; get_exn ;
activate ; deactivate ;
notify_block ;
inject_block ;
shutdown ;
valid_block_input ; valid_block_input ;
db ; active_nets = Net_id.Table.create 7 ;
} }
in let activate v ?bootstrap_threshold ?max_child_ttl net_state =
let net_id = State.Net.id net_state in
lwt_log_notice "activate network %a" Net_id.pp net_id >>= fun () ->
try Net_id.Table.find v.active_nets net_id
with Not_found ->
let nv =
Net_validator.create
?bootstrap_threshold
?max_child_ttl
v.block_validator v.valid_block_input v.db net_state in
Net_id.Table.add v.active_nets net_id nv ;
nv
worker let get_exn { active_nets } net_id =
Net_id.Table.find active_nets net_id
let new_head_watcher { new_head_input } = type error +=
Watcher.create_stream new_head_input | Inactive_network of Net_id.t
let watcher { valid_block_input_for_net } = let get v net_id =
Watcher.create_stream valid_block_input_for_net try get_exn v net_id >>= fun nv -> return nv
with Not_found -> fail (Inactive_network net_id)
let global_watcher ({ valid_block_input } : t) = let inject_block v ?force bytes operations =
let hash = Block_hash.hash_bytes [bytes] in
match Block_header.of_bytes bytes with
| None -> failwith "Cannot parse block header."
| Some block ->
get v block.shell.net_id >>=? fun nv ->
(* TODO... remove `Distributed_db.operation`
and only accept raw operations ??? *)
let validation =
map_p (map_p (Distributed_db.resolve_operation (Net_validator.net_db nv))) operations >>=? fun operations ->
Net_validator.validate_block nv ?force hash block operations in
return (hash, validation)
let shutdown { active_nets ; block_validator } =
let jobs =
Block_validator.shutdown block_validator ::
Net_id.Table.fold
(fun _ nv acc -> (nv >>= Net_validator.shutdown) :: acc)
active_nets [] in
Lwt.join jobs >>= fun () ->
Lwt.return_unit
let watcher { valid_block_input } =
Watcher.create_stream valid_block_input Watcher.create_stream valid_block_input

View File

@ -12,35 +12,21 @@ type t
val create: State.t -> Distributed_db.t -> t val create: State.t -> Distributed_db.t -> t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
val notify_block: t -> Block_hash.t -> Block_header.t -> unit Lwt.t val activate:
t ->
type net_validator ?bootstrap_threshold:int ->
?max_child_ttl:int ->
State.Net.t -> Net_validator.t Lwt.t
type error += type error +=
| Non_increasing_timestamp | Inactive_network of Net_id.t
| Non_increasing_fitness val get: t -> Net_id.t -> Net_validator.t tzresult Lwt.t
val get_exn: t -> Net_id.t -> Net_validator.t Lwt.t
val activate: t -> ?max_child_ttl:int -> State.Net.t -> net_validator Lwt.t
val get: t -> Net_id.t -> net_validator tzresult Lwt.t
val get_exn: t -> Net_id.t -> net_validator Lwt.t
val deactivate: net_validator -> unit Lwt.t
val net_state: net_validator -> State.Net.t
val net_db: net_validator -> Distributed_db.net_db
val fetch_block:
net_validator -> Block_hash.t -> State.Block.t tzresult Lwt.t
val inject_block: val inject_block:
t -> ?force:bool -> t ->
?force:bool ->
MBytes.t -> Distributed_db.operation list list -> MBytes.t -> Distributed_db.operation list list ->
(Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t (Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t
val prevalidator: net_validator -> Prevalidator.t val watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper
val test_validator: net_validator -> (net_validator * Distributed_db.net_db) option
val watcher: net_validator -> State.Block.t Lwt_stream.t * Watcher.stopper
val new_head_watcher: net_validator -> State.Block.t Lwt_stream.t * Watcher.stopper
val global_watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper
val bootstrapped: net_validator -> unit Lwt.t

View File

@ -468,7 +468,8 @@ let protect ?on_error ?canceler t =
let err = if canceled then [Canceled] else err in let err = if canceled then [Canceled] else err in
match on_error with match on_error with
| None -> Lwt.return (Error err) | None -> Lwt.return (Error err)
| Some on_error -> on_error err | Some on_error ->
Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn))
type error += Timeout type error += Timeout

View File

@ -5,7 +5,7 @@ set -e
test_dir="$(cd "$(dirname "$0")" && echo "$(pwd -P)")" test_dir="$(cd "$(dirname "$0")" && echo "$(pwd -P)")"
source $test_dir/lib/test_lib.inc.sh source $test_dir/lib/test_lib.inc.sh
expected_connections=3 expected_connections=4
max_peer_id=8 max_peer_id=8
for i in $(seq 1 $max_peer_id); do for i in $(seq 1 $max_peer_id); do
echo echo
@ -15,18 +15,18 @@ for i in $(seq 1 $max_peer_id); do
echo echo
done done
## waiting for the node to establich connections ## waiting for the node to establish connections
sleep 2
for client in "${client_instances[@]}"; do for client in "${client_instances[@]}"; do
echo echo
echo "### $client network stat" echo "### $client network stat"
echo echo
$client bootstrapped
$client network stat $client network stat
echo echo
done done
activate_alpha activate_alpha
sleep 5
assert_propagation_level() { assert_propagation_level() {
level=$1 level=$1
@ -38,12 +38,16 @@ assert_propagation_level() {
done done
} }
printf "\n\nAsserting protocol propagation\n"
for client in "${client_instances[@]}"; do assert_protocol() {
$client rpc call /blocks/head/protocol \ proto=$1
| assert_in_output "ProtoALphaALphaALphaALphaALphaALphaALphaALphaDdp3zK" printf "\n\nAsserting protocol propagation\n"
done for client in "${client_instances[@]}"; do
( $client rpc call /blocks/head/protocol | assert_in_output "$proto" ) \
|| exit 2
done
}
printf "\n\n" printf "\n\n"
@ -66,6 +70,8 @@ retry() {
done done
} }
retry 2 15 assert_protocol "ProtoALphaALphaALphaALphaALphaALphaALphaALphaDdp3zK"
$client1 mine for bootstrap1 -max-priority 512 $client1 mine for bootstrap1 -max-priority 512
retry 2 15 assert_propagation_level 2 retry 2 15 assert_propagation_level 2
@ -81,7 +87,8 @@ retry 2 15 assert_propagation_level 5
endorse_hash=$($client3 endorse for bootstrap3 | extract_operation_hash) endorse_hash=$($client3 endorse for bootstrap3 | extract_operation_hash)
transfer_hash=$($client4 transfer 500 from bootstrap1 to bootstrap3 | extract_operation_hash) transfer_hash=$($client4 transfer 500 from bootstrap1 to bootstrap3 | extract_operation_hash)
retry 2 15 $client4 mine for bootstrap4 -max-priority 512 # wait for the propagation of operations
sleep 2
assert_contains_operation() { assert_contains_operation() {
hash="$1" hash="$1"
@ -93,8 +100,9 @@ assert_contains_operation() {
done done
} }
$client4 mine for bootstrap4 -max-priority 512
retry 2 15 assert_contains_operation $endorse_hash retry 2 15 assert_contains_operation $endorse_hash
assert_contains_operation $transfer_hash retry 2 15 assert_contains_operation $transfer_hash
echo echo
echo End of test echo End of test