diff --git a/src/client/client_node_rpcs.ml b/src/client/client_node_rpcs.ml index 6d65d0d42..155faf6ac 100644 --- a/src/client/client_node_rpcs.ml +++ b/src/client/client_node_rpcs.ml @@ -18,9 +18,6 @@ let errors cctxt = let forge_block_header cctxt header = call_service0 cctxt Services.forge_block_header header -let validate_block cctxt net block = - call_err_service0 cctxt Services.validate_block (net, block) - type operation = Node_rpc_services.operation = | Blob of Operation.t | Hash of Operation_hash.t diff --git a/src/client/client_node_rpcs.mli b/src/client/client_node_rpcs.mli index b2001a47d..5fdf17909 100644 --- a/src/client/client_node_rpcs.mli +++ b/src/client/client_node_rpcs.mli @@ -17,11 +17,6 @@ val forge_block_header: Block_header.t -> MBytes.t tzresult Lwt.t -val validate_block: - config -> - Net_id.t -> Block_hash.t -> - unit tzresult Lwt.t - type operation = | Blob of Operation.t | Hash of Operation_hash.t diff --git a/src/node/main/node_run_command.ml b/src/node/main/node_run_command.ml index d378e2cf9..538e56aa5 100644 --- a/src/node/main/node_run_command.ml +++ b/src/node/main/node_run_command.ml @@ -142,6 +142,7 @@ let init_node ?sandbox (config : Node_config_file.t) = context_root = context_dir config.data_dir ; p2p = p2p_config ; test_network_max_tll = Some (48 * 3600) ; (* 2 days *) + bootstrap_threshold = 4 ; (* TODO add parameter *) } in Node.create node_config diff --git a/src/node/shell/block_validator.ml b/src/node/shell/block_validator.ml new file mode 100644 index 000000000..b6ca655fa --- /dev/null +++ b/src/node/shell/block_validator.ml @@ -0,0 +1,545 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make(struct let name = "node.validator.block" end) +module Canceler = Lwt_utils.Canceler + +type 'a request = + | Request_validation: { + net_db: Distributed_db.net_db ; + notify_new_block: State.Block.t -> unit ; + canceler: Canceler.t option ; + peer: P2p.Peer_id.t option ; + hash: Block_hash.t ; + header: Block_header.t ; + operations: Operation.t list list ; + } -> State.Block.t tzresult request + +type message = Message: 'a request * 'a Lwt.u option -> message + +type t = { + protocol_validator: Protocol_validator.t ; + mutable worker: unit Lwt.t ; + messages: message Lwt_pipe.t ; + canceler: Canceler.t ; +} + +(** Block validation *) + +type block_error = + | Cannot_parse_operation of Operation_hash.t + | Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } + | Inconsistent_netid of { operation: Operation_hash.t ; + expected: Net_id.t ; found: Net_id.t } + | Non_increasing_timestamp + | Non_increasing_fitness + | Invalid_level of { expected: Int32.t ; found: Int32.t } + | Invalid_proto_level of { expected: int ; found: int } + | Replayed_operation of Operation_hash.t + | Outdated_operation of + { operation: Operation_hash.t; + originating_block: Block_hash.t } + | Expired_network of + { net_id: Net_id.t ; + expiration: Time.t ; + timestamp: Time.t ; + } + | Unexpected_number_of_validation_passes of int (* uint8 *) + +let block_error_encoding = + let open Data_encoding in + union + [ + case + (obj2 + (req "error" (constant "cannot_parse_operation")) + (req "operation" Operation_hash.encoding)) + (function Cannot_parse_operation operation -> Some ((), operation) + | _ -> None) + (fun ((), operation) -> Cannot_parse_operation operation) ; + case + (obj3 + (req "error" (constant "invalid_fitness")) + (req "expected" Fitness.encoding) + (req "found" Fitness.encoding)) + (function + | Invalid_fitness { expected ; found } -> + Some ((), expected, found) + | _ -> None) + (fun ((), expected, found) -> Invalid_fitness { expected ; found }) ; + case + (obj1 + (req "error" (constant "non_increasing_timestamp"))) + (function Non_increasing_timestamp -> Some () + | _ -> None) + (fun () -> Non_increasing_timestamp) ; + case + (obj1 + (req "error" (constant "non_increasing_fitness"))) + (function Non_increasing_fitness -> Some () + | _ -> None) + (fun () -> Non_increasing_fitness) ; + case + (obj3 + (req "error" (constant "invalid_level")) + (req "expected" int32) + (req "found" int32)) + (function + | Invalid_level { expected ; found } -> + Some ((), expected, found) + | _ -> None) + (fun ((), expected, found) -> Invalid_level { expected ; found }) ; + case + (obj3 + (req "error" (constant "invalid_proto_level")) + (req "expected" uint8) + (req "found" uint8)) + (function + | Invalid_proto_level { expected ; found } -> + Some ((), expected, found) + | _ -> None) + (fun ((), expected, found) -> + Invalid_proto_level { expected ; found }) ; + case + (obj2 + (req "error" (constant "replayed_operation")) + (req "operation" Operation_hash.encoding)) + (function Replayed_operation operation -> Some ((), operation) + | _ -> None) + (fun ((), operation) -> Replayed_operation operation) ; + case + (obj3 + (req "error" (constant "outdated_operation")) + (req "operation" Operation_hash.encoding) + (req "originating_block" Block_hash.encoding)) + (function + | Outdated_operation { operation ; originating_block } -> + Some ((), operation, originating_block) + | _ -> None) + (fun ((), operation, originating_block) -> + Outdated_operation { operation ; originating_block }) ; + case + (obj2 + (req "error" (constant "unexpected_number_of_passes")) + (req "found" uint8)) + (function + | Unexpected_number_of_validation_passes n -> Some ((), n) + | _ -> None) + (fun ((), n) -> Unexpected_number_of_validation_passes n) ; + ] + +let pp_block_error ppf = function + | Cannot_parse_operation oph -> + Format.fprintf ppf + "Failed to parse the operation %a." + Operation_hash.pp_short oph + | Invalid_fitness { expected ; found } -> + Format.fprintf ppf + "@[Invalid fitness:@ \ + \ expected %a@ \ + \ found %a@]" + Fitness.pp expected + Fitness.pp found + | Inconsistent_netid { operation ; expected ; found } -> + Format.fprintf ppf + "@[The network identifier of the operation %a is not \ + \ constitent with the network identifier of the block: @ \ + \ expected: %a@ \ + \ found: %a@]" + Operation_hash.pp_short operation + Net_id.pp expected + Net_id.pp found + | Non_increasing_timestamp -> + Format.fprintf ppf "Non increasing timestamp" + | Non_increasing_fitness -> + Format.fprintf ppf "Non increasing fitness" + | Invalid_level { expected ; found } -> + Format.fprintf ppf + "Invalid level:@ \ + \ expected %ld@ \ + \ found %ld" + expected + found + | Invalid_proto_level { expected ; found } -> + Format.fprintf ppf + "Invalid protocol level:@ \ + \ expected %d@ \ + \ found %d" + expected + found + | Replayed_operation oph -> + Format.fprintf ppf + "The operation %a was previously included in the chain." + Operation_hash.pp_short oph + | Outdated_operation { operation ; originating_block } -> + Format.fprintf ppf + "The operation %a is outdated (originated in block: %a)" + Operation_hash.pp_short operation + Block_hash.pp_short originating_block + | Expired_network { net_id ; expiration ; timestamp } -> + Format.fprintf ppf + "The block timestamp (%a) is later than \ + its network expiration date: %a (net: %a)." + Time.pp_hum timestamp + Time.pp_hum expiration + Net_id.pp_short net_id + | Unexpected_number_of_validation_passes n -> + Format.fprintf ppf + "Invalid number of validation passes (found: %d)" + n + +type error += + | Invalid_block of + { block: Block_hash.t ; error: block_error } + | Unavailable_protocol of + { block: Block_hash.t ; protocol: Protocol_hash.t } + | Inconsistent_operations_hash of + { block: Block_hash.t ; + expected: Operation_list_list_hash.t ; + found: Operation_list_list_hash.t } + +let invalid_block block error = Invalid_block { block ; error } + +let () = + Error_monad.register_error_kind + `Permanent + ~id:"validator.invalid_block" + ~title:"Invalid block" + ~description:"Invalid block." + ~pp:begin fun ppf (block, error) -> + Format.fprintf ppf + "@[Invalid block %a@ %a@]" + Block_hash.pp_short block pp_block_error error + end + Data_encoding.(merge_objs + (obj1 (req "invalid_block" Block_hash.encoding)) + block_error_encoding) + (function Invalid_block { block ; error } -> + Some (block, error) | _ -> None) + (fun (block, error) -> + Invalid_block { block ; error }) ; + Error_monad.register_error_kind + `Temporary + ~id:"validator.unavailable_protocol" + ~title:"Missing protocol" + ~description:"The protocol required for validating a block is missing." + ~pp:begin fun ppf (block, protocol) -> + Format.fprintf ppf + "Missing protocol (%a) when validating the block %a." + Protocol_hash.pp_short protocol + Block_hash.pp_short block + end + Data_encoding.( + obj2 + (req "block" Block_hash.encoding) + (req "missing_protocol" Protocol_hash.encoding)) + (function + | Unavailable_protocol { block ; protocol } -> + Some (block, protocol) + | _ -> None) + (fun (block, protocol) -> Unavailable_protocol { block ; protocol }) ; + Error_monad.register_error_kind + `Temporary + ~id:"validator.inconsistent_operations_hash" + ~title:"Invalid merkle tree" + ~description:"The provided list of operations is inconsistent with \ + the block header." + ~pp:begin fun ppf (block, expected, found) -> + Format.fprintf ppf + "@[The provided list of operations for block %a \ + \ is inconsistent with the block header@ \ + \ expected: %a@ \ + \ found: %a@]" + Block_hash.pp_short block + Operation_list_list_hash.pp_short expected + Operation_list_list_hash.pp_short found + end + Data_encoding.( + obj3 + (req "block" Block_hash.encoding) + (req "expected" Operation_list_list_hash.encoding) + (req "found" Operation_list_list_hash.encoding)) + (function + | Inconsistent_operations_hash { block ; expected ; found } -> + Some (block, expected, found) + | _ -> None) + (fun (block, expected, found) -> + Inconsistent_operations_hash { block ; expected ; found }) + +let check_header + (pred_header: Block_header.t) hash (header: Block_header.t) = + fail_unless + (Int32.succ pred_header.shell.level = header.shell.level) + (invalid_block hash @@ + Invalid_level { expected = Int32.succ pred_header.shell.level ; + found = header.shell.level }) >>=? fun () -> + fail_unless + Time.(pred_header.shell.timestamp < header.shell.timestamp) + (invalid_block hash Non_increasing_timestamp) >>=? fun () -> + fail_unless + Fitness.(pred_header.shell.fitness < header.shell.fitness) + (invalid_block hash Non_increasing_fitness) >>=? fun () -> + fail_unless + (header.shell.validation_passes <= 1) (* FIXME to be found in Proto *) + (invalid_block hash + (Unexpected_number_of_validation_passes header.shell.validation_passes) + ) >>=? fun () -> + return () + +let assert_no_duplicate_operations block live_operations operation_hashes = + fold_left_s (fold_left_s (fun live_operations oph -> + fail_when (Operation_hash.Set.mem oph live_operations) + (invalid_block block @@ Replayed_operation oph) >>=? fun () -> + return (Operation_hash.Set.add oph live_operations))) + live_operations operation_hashes >>=? fun _ -> + return () + +let assert_operation_liveness block live_blocks operations = + iter_s (iter_s (fun op -> + fail_unless + (Block_hash.Set.mem op.Operation.shell.branch live_blocks) + (invalid_block block @@ + Outdated_operation { operation = Operation.hash op ; + originating_block = op.shell.branch }))) + operations + +let check_liveness pred hash operations_hashes operations = + Chain_traversal.live_blocks + pred (State.Block.max_operations_ttl pred) >>= fun (live_blocks, + live_operations) -> + assert_no_duplicate_operations + hash live_operations operations_hashes >>=? fun () -> + assert_operation_liveness hash live_blocks operations >>=? fun () -> + return () + +let apply_block + pred (module Proto : State.Registred_protocol.T) + hash (header: Block_header.t) + operations = + let pred_header = State.Block.header pred + and pred_hash = State.Block.hash pred in + check_header pred_header hash header >>=? fun () -> + let operation_hashes = List.map (List.map Operation.hash) operations in + check_liveness pred hash operation_hashes operations >>=? fun () -> + iter_p (iter_p (fun op -> + let op_hash = Operation.hash op in + fail_unless + (Net_id.equal op.shell.net_id header.shell.net_id) + (invalid_block hash @@ Inconsistent_netid { + operation = op_hash ; + expected = header.shell.net_id ; + found = op.shell.net_id ; + }))) + operations >>=? fun () -> + map2_s (map2_s begin fun op_hash raw -> + Lwt.return (Proto.parse_operation op_hash raw) + |> trace (invalid_block hash (Cannot_parse_operation op_hash)) + end) + operation_hashes + operations >>=? fun parsed_operations -> + State.Block.context pred >>= fun pred_context -> + Context.reset_test_network + pred_context pred_hash header.shell.timestamp >>= fun context -> + (* TODO wrap 'proto_error' into 'block_error' *) + Proto.begin_application + ~predecessor_context:context + ~predecessor_timestamp:pred_header.shell.timestamp + ~predecessor_fitness:pred_header.shell.fitness + header >>=? fun state -> + fold_left_s (fold_left_s (fun state op -> + Proto.apply_operation state op >>=? fun state -> + return state)) + state parsed_operations >>=? fun state -> + Proto.finalize_block state >>=? fun new_context -> + Context.get_protocol new_context.context >>= fun new_protocol -> + let expected_proto_level = + if Protocol_hash.equal new_protocol Proto.hash then + pred_header.shell.proto_level + else + (pred_header.shell.proto_level + 1) mod 256 in + fail_when (header.shell.proto_level <> expected_proto_level) + (invalid_block hash @@ Invalid_proto_level { + found = header.shell.proto_level ; + expected = expected_proto_level ; + }) >>=? fun () -> + fail_when + Fitness.(new_context.fitness <> header.shell.fitness) + (invalid_block hash @@ Invalid_fitness { + expected = header.shell.fitness ; + found = new_context.fitness ; + }) >>=? fun () -> + let max_operations_ttl = + max 0 + (min + ((State.Block.max_operations_ttl pred)+1) + new_context.max_operations_ttl) in + let new_context = + { new_context with max_operations_ttl } in + return new_context + +let check_net_liveness net_db hash (header: Block_header.t) = + let net_state = Distributed_db.net_state net_db in + match State.Net.expiration net_state with + | Some eol when Time.(eol <= header.shell.timestamp) -> + fail @@ invalid_block hash @@ + Expired_network { net_id = State.Net.id net_state ; + expiration = eol ; + timestamp = header.shell.timestamp } + | None | Some _ -> return () + +let get_proto pred hash = + State.Block.context pred >>= fun pred_context -> + Context.get_protocol pred_context >>= fun pred_protocol_hash -> + match State.Registred_protocol.get pred_protocol_hash with + | None -> + fail (Unavailable_protocol { block = hash ; + protocol = pred_protocol_hash }) + | Some p -> return p + +let rec worker_loop bv = + begin + Lwt_utils.protect ~canceler:bv.canceler begin fun () -> + Lwt_pipe.pop bv.messages >>= return + end >>=? fun (Message (request, wakener)) -> + let may_wakeup = + match wakener with + | None -> (fun _ -> ()) + | Some wakener -> (fun v -> Lwt.wakeup_later wakener v) + in + match request with + | Request_validation { net_db ; notify_new_block ; canceler ; + peer ; hash ; header ; operations } -> + let net_state = Distributed_db.net_state net_db in + State.Block.read_opt net_state hash >>= function + | Some block -> + lwt_debug "previously validated block %a (after pipe)" + Block_hash.pp_short hash >>= fun () -> + Protocol_validator.prefetch_and_compile_protocols + bv.protocol_validator ?peer ~timeout:60. block ; + may_wakeup (Ok block) ; + return () + | None -> + begin + lwt_debug "validating block %a" + Block_hash.pp_short hash >>= fun () -> + State.Block.read + net_state header.shell.predecessor >>=? fun pred -> + get_proto pred hash >>=? fun proto -> + (* TODO also protect with [bv.canceler]. *) + Lwt_utils.protect ?canceler begin fun () -> + apply_block pred proto hash header operations + end + end >>= function + | Ok result -> begin + lwt_log_info "validated block %a" + Block_hash.pp_short hash >>= fun () -> + Lwt_utils.protect ~canceler:bv.canceler begin fun () -> + Distributed_db.commit_block + net_db hash header operations result + end >>=? function + | None -> + assert false (* should not happen *) + | Some block -> + Protocol_validator.prefetch_and_compile_protocols + bv.protocol_validator ?peer ~timeout:60. block ; + may_wakeup (Ok block) ; + notify_new_block block ; + return () + end + (* TODO catch other temporary error (e.g. system errors) + and do not 'commit' them on disk... *) + | Error [Lwt_utils.Canceled | Unavailable_protocol _] as err -> + may_wakeup err ; + return () + | Error errors as err -> + Lwt_utils.protect ~canceler:bv.canceler begin fun () -> + Distributed_db.commit_invalid_block + net_db hash header errors + end >>=? fun commited -> + assert commited ; + may_wakeup err ; + return () + end >>= function + | Ok () -> + worker_loop bv + | Error [Exn (Unix.Unix_error _) as err] -> + lwt_log_error "validation failed with %a" + pp_print_error [err] >>= fun () -> + worker_loop bv + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + lwt_log_notice "terminating" >>= fun () -> + Lwt.return_unit + | Error err -> + lwt_log_error "@[Unexpected error:@ %a@]" + pp_print_error err >>= fun () -> + Canceler.cancel bv.canceler >>= fun () -> + Lwt.return_unit + +let create db = + let protocol_validator = Protocol_validator.create db in + let canceler = Canceler.create () in + let messages = Lwt_pipe.create () in + let bv = { + protocol_validator ; + canceler ; messages ; + worker = Lwt.return_unit } in + Canceler.on_cancel bv.canceler begin fun () -> + Lwt_pipe.close bv.messages ; + Lwt.return_unit + end ; + bv.worker <- + Lwt_utils.worker "block_validator" + ~run:(fun () -> worker_loop bv) + ~cancel:(fun () -> Canceler.cancel bv.canceler) ; + bv + +let shutdown { canceler ; worker } = + Canceler.cancel canceler >>= fun () -> + worker + +let validate { messages ; protocol_validator } + ?canceler ?peer ?(notify_new_block = fun _ -> ()) + net_db hash (header : Block_header.t) operations = + let net_state = Distributed_db.net_state net_db in + State.Block.read_opt net_state hash >>= function + | Some block -> + lwt_debug "previously validated block %a (before pipe)" + Block_hash.pp_short hash >>= fun () -> + Protocol_validator.prefetch_and_compile_protocols + protocol_validator ?peer ~timeout:60. block ; + return block + | None -> + let res, wakener = Lwt.task () in + map_p (map_p (fun op -> + let op_hash = Operation.hash op in + return op_hash)) + operations >>=? fun hashes -> + let computed_hash = + Operation_list_list_hash.compute + (List.map Operation_list_hash.compute hashes) in + fail_when + (Operation_list_list_hash.compare + computed_hash header.shell.operations_hash <> 0) + (Inconsistent_operations_hash { + block = hash ; + expected = header.shell.operations_hash ; + found = computed_hash ; + }) >>=? fun () -> + check_net_liveness net_db hash header >>=? fun () -> + lwt_debug "pushing validation request for block %a" + Block_hash.pp_short hash >>= fun () -> + Lwt_pipe.push messages + (Message (Request_validation + { net_db ; notify_new_block ; canceler ; + peer ; hash ; header ; operations }, + Some wakener)) >>= fun () -> + res + +let fetch_and_compile_protocol bv = + Protocol_validator.fetch_and_compile_protocol bv.protocol_validator diff --git a/src/node/shell/block_validator.mli b/src/node/shell/block_validator.mli new file mode 100644 index 000000000..4d3d129ae --- /dev/null +++ b/src/node/shell/block_validator.mli @@ -0,0 +1,59 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t + +type block_error = + | Cannot_parse_operation of Operation_hash.t + | Invalid_fitness of { expected: Fitness.t ; found: Fitness.t } + | Inconsistent_netid of { operation: Operation_hash.t ; + expected: Net_id.t ; found: Net_id.t } + | Non_increasing_timestamp + | Non_increasing_fitness + | Invalid_level of { expected: Int32.t ; found: Int32.t } + | Invalid_proto_level of { expected: int ; found: int } + | Replayed_operation of Operation_hash.t + | Outdated_operation of + { operation: Operation_hash.t; + originating_block: Block_hash.t } + | Expired_network of + { net_id: Net_id.t ; + expiration: Time.t ; + timestamp: Time.t ; + } + | Unexpected_number_of_validation_passes of int (* uint8 *) + +type error += + | Invalid_block of + { block: Block_hash.t ; error: block_error } + | Unavailable_protocol of + { block: Block_hash.t ; protocol: Protocol_hash.t } + | Inconsistent_operations_hash of + { block: Block_hash.t ; + expected: Operation_list_list_hash.t ; + found: Operation_list_list_hash.t } + +val create: Distributed_db.t -> t + +val validate: + t -> + ?canceler:Lwt_utils.Canceler.t -> + ?peer:P2p.Peer_id.t -> + ?notify_new_block:(State.Block.t -> unit) -> + Distributed_db.net_db -> + Block_hash.t -> Block_header.t -> Operation.t list list -> + State.Block.t tzresult Lwt.t + +val fetch_and_compile_protocol: + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t + +val shutdown: t -> unit Lwt.t diff --git a/src/node/shell/bootstrap_pipeline.ml b/src/node/shell/bootstrap_pipeline.ml new file mode 100644 index 000000000..0bcd99a02 --- /dev/null +++ b/src/node/shell/bootstrap_pipeline.ml @@ -0,0 +1,231 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make(struct let name = "node.validator.bootstrap_pipeline" end) +module Canceler = Lwt_utils.Canceler + +type t = { + canceler: Canceler.t ; + mutable headers_fetch_worker: unit Lwt.t ; + mutable operations_fetch_worker: unit Lwt.t ; + mutable validation_worker: unit Lwt.t ; + peer_id: P2p.Peer_id.t ; + net_db: Distributed_db.net_db ; + locator: Block_locator.t ; + block_validator: Block_validator.t ; + notify_new_block: State.Block.t -> unit ; + fetched_headers: + (Block_hash.t * Block_header.t) Lwt_pipe.t ; + fetched_blocks: + (Block_hash.t * Block_header.t * Operation.t list list) Lwt_pipe.t ; + (* HACK, a worker should be able to return the 'error'. *) + mutable errors: Error_monad.error list ; +} + +let fetch_step pipeline (step : Block_locator.step) = + lwt_log_info "fetching step %a -> %a (%d%s) from peer %a." + Block_hash.pp_short step.block + Block_hash.pp_short step.predecessor + step.step + (if step.strict_step then "" else " max") + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + let rec fetch_loop acc hash cpt = + Lwt_unix.yield () >>= fun () -> + if cpt < 0 then + lwt_log_info "invalid step from peer %a (too long)." + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + fail (Block_locator.Invalid_locator + (pipeline.peer_id, pipeline.locator)) + else if Block_hash.equal hash step.predecessor then + if step.strict_step && cpt <> 0 then + lwt_log_info "invalid step from peer %a (too short)." + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + fail (Block_locator.Invalid_locator + (pipeline.peer_id, pipeline.locator)) + else + return acc + else + lwt_debug "fetching block header %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Distributed_db.Block_header.fetch + ~timeout:60. (* TODO allow to adjust the constant ... *) + pipeline.net_db ~peer:pipeline.peer_id + hash () + end >>=? fun header -> + lwt_debug "fetched block header %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + fetch_loop ((hash, header) :: acc) header.shell.predecessor (cpt - 1) + in + fetch_loop [] step.block step.step >>=? fun headers -> + iter_s + begin fun header -> + Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_pipe.push pipeline.fetched_headers header >>= return + end + end + headers >>=? fun () -> + return () + +let headers_fetch_worker_loop pipeline = + begin + let steps = Block_locator.to_steps pipeline.locator in + iter_s (fetch_step pipeline) steps >>=? fun () -> + return () + end >>= function + | Ok () -> + lwt_log_info "fetched all step from peer %a." + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + Lwt_pipe.close pipeline.fetched_headers ; + Lwt.return_unit + | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + Lwt.return_unit + | Error err -> + pipeline.errors <- pipeline.errors @ err ; + lwt_log_error "@[Unexpected error (headers fetch):@ %a@]" + pp_print_error err >>= fun () -> + Canceler.cancel pipeline.canceler >>= fun () -> + Lwt.return_unit + +let rec operations_fetch_worker_loop pipeline = + begin + Lwt_unix.yield () >>= fun () -> + Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_pipe.pop pipeline.fetched_headers >>= return + end >>=? fun (hash, header) -> + lwt_log_info "fetching operations of block %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + map_p + (fun i -> + Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Distributed_db.Operations.fetch + ~timeout:60. (* TODO allow to adjust the constant ... *) + pipeline.net_db ~peer:pipeline.peer_id + (hash, i) header.shell.operations_hash + end) + (0 -- (header.shell.validation_passes - 1)) >>=? fun operations -> + lwt_log_info "fetched operations of block %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_pipe.push pipeline.fetched_blocks + (hash, header, operations) >>= return + end + end >>= function + | Ok () -> + operations_fetch_worker_loop pipeline + | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + Lwt_pipe.close pipeline.fetched_blocks ; + Lwt.return_unit + | Error err -> + pipeline.errors <- pipeline.errors @ err ; + lwt_log_error "@[Unexpected error (operations fetch):@ %a@]" + pp_print_error err >>= fun () -> + Canceler.cancel pipeline.canceler >>= fun () -> + Lwt.return_unit + +let rec validation_worker_loop pipeline = + begin + Lwt_unix.yield () >>= fun () -> + Lwt_utils.protect ~canceler:pipeline.canceler begin fun () -> + Lwt_pipe.pop pipeline.fetched_blocks >>= return + end >>=? fun (hash, header, operations) -> + lwt_log_info "requesting validation for block %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + Block_validator.validate + ~canceler:pipeline.canceler + ~notify_new_block:pipeline.notify_new_block + pipeline.block_validator + pipeline.net_db hash header operations >>=? fun _block -> + lwt_log_info "validated block %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pipeline.peer_id >>= fun () -> + return () + end >>= function + | Ok () -> validation_worker_loop pipeline + | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + Lwt.return_unit + | Error ([ Block_validator.Invalid_block _ + | Block_validator.Unavailable_protocol _ ] as err ) -> + (* Propagate the error to the peer validator. *) + pipeline.errors <- pipeline.errors @ err ; + Canceler.cancel pipeline.canceler >>= fun () -> + Lwt.return_unit + | Error err -> + pipeline.errors <- pipeline.errors @ err ; + lwt_log_error "@[Unexpected error (validator):@ %a@]" + pp_print_error err >>= fun () -> + Canceler.cancel pipeline.canceler >>= fun () -> + Lwt.return_unit + +let create + ?(notify_new_block = fun _ -> ()) + block_validator peer_id net_db locator = + let canceler = Canceler.create () in + let fetched_headers = + Lwt_pipe.create ~size:(50, fun _ -> 1) () in + let fetched_blocks = + Lwt_pipe.create ~size:(50, fun _ -> 1) () in + let pipeline = { + canceler ; + headers_fetch_worker = Lwt.return_unit ; + operations_fetch_worker = Lwt.return_unit ; + validation_worker = Lwt.return_unit ; + notify_new_block ; + peer_id ; net_db ; locator ; + block_validator ; + fetched_headers ; fetched_blocks ; + errors = [] ; + } in + Canceler.on_cancel pipeline.canceler begin fun () -> + Lwt_pipe.close fetched_blocks ; + Lwt_pipe.close fetched_headers ; + Lwt.return_unit + end ; + let head, _ = (pipeline.locator : Block_locator.t :> _ * _) in + let hash = Block_header.hash head in + pipeline.headers_fetch_worker <- + Lwt_utils.worker + (Format.asprintf "bootstrap_pipeline-headers_fetch.%a.%a" + P2p.Peer_id.pp_short peer_id Block_hash.pp_short hash) + ~run:(fun () -> headers_fetch_worker_loop pipeline) + ~cancel:(fun () -> Canceler.cancel pipeline.canceler) ; + pipeline.operations_fetch_worker <- + Lwt_utils.worker + (Format.asprintf "bootstrap_pipeline-operations_fetch.%a.%a" + P2p.Peer_id.pp_short peer_id Block_hash.pp_short hash) + ~run:(fun () -> operations_fetch_worker_loop pipeline) + ~cancel:(fun () -> Canceler.cancel pipeline.canceler) ; + pipeline.validation_worker <- + Lwt_utils.worker + (Format.asprintf "bootstrap_pipeline-validation.%a.%a" + P2p.Peer_id.pp_short peer_id Block_hash.pp_short hash) + ~run:(fun () -> validation_worker_loop pipeline) + ~cancel:(fun () -> Canceler.cancel pipeline.canceler) ; + pipeline + +let wait_workers pipeline = + pipeline.headers_fetch_worker >>= fun () -> + pipeline.operations_fetch_worker >>= fun () -> + pipeline.validation_worker >>= fun () -> + Lwt.return_unit + +let wait pipeline = + wait_workers pipeline >>= fun () -> + match pipeline.errors with + | [] -> return () + | errors -> Lwt.return_error errors + +let cancel pipeline = + Canceler.cancel pipeline.canceler >>= fun () -> + wait_workers pipeline diff --git a/src/node/shell/bootstrap_pipeline.mli b/src/node/shell/bootstrap_pipeline.mli new file mode 100644 index 000000000..dd0290543 --- /dev/null +++ b/src/node/shell/bootstrap_pipeline.mli @@ -0,0 +1,20 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t + +val create: + ?notify_new_block: (State.Block.t -> unit) -> + Block_validator.t -> + P2p.Peer_id.t -> Distributed_db.net_db -> + Block_locator.t -> t + +val wait: t -> unit tzresult Lwt.t + +val cancel: t -> unit Lwt.t diff --git a/src/node/shell/chain.ml b/src/node/shell/chain.ml index 33b7fa8af..1c3e4c51a 100644 --- a/src/node/shell/chain.ml +++ b/src/node/shell/chain.ml @@ -65,7 +65,7 @@ let set_head net_state block = locked_set_head chain_store data block >>= fun () -> Lwt.return (Some { current_head = block ; current_reversed_mempool = [] }, - ()) + data.current_head) end let test_and_set_head net_state ~old block = diff --git a/src/node/shell/chain.mli b/src/node/shell/chain.mli index 43f50c6b5..09f34d723 100644 --- a/src/node/shell/chain.mli +++ b/src/node/shell/chain.mli @@ -22,8 +22,9 @@ val known_heads: Net.t -> Block.t list Lwt.t val mem: Net.t -> Block_hash.t -> bool Lwt.t (** Test whether a block belongs to the current mainnet. *) -val set_head: Net.t -> Block.t -> unit Lwt.t -(** Record a block as the current head of the network's blockchain. *) +val set_head: Net.t -> Block.t -> Block.t Lwt.t +(** Record a block as the current head of the network's blockchain. + It returns the previous head. *) val set_reversed_mempool: Net.t -> Operation_hash.t list -> unit Lwt.t (** Record a list as the current list of pending operations. *) diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index b4241660f..d86f3e05b 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -516,24 +516,10 @@ module P2p_reader = struct | Operation_hashes_for_block (net_id, block, ofs, ops, path) -> begin may_handle state net_id @@ fun net_db -> (* TODO early detection of non-requested list. *) - let found_hash, found_ofs = - Operation_list_list_hash.check_path - path (Operation_list_hash.compute ops) in - if found_ofs <> ofs then - Lwt.return_unit - else - Raw_block_header.Table.read_opt - net_db.block_header_db.table block >>= function - | None -> Lwt.return_unit - | Some bh -> - if Operation_list_list_hash.compare - found_hash bh.shell.operations_hash <> 0 then - Lwt.return_unit - else - Raw_operation_hashes.Table.notify - net_db.operation_hashes_db.table state.gid - (block, ofs) (ops, path) >>= fun () -> - Lwt.return_unit + Raw_operation_hashes.Table.notify + net_db.operation_hashes_db.table state.gid + (block, ofs) (ops, path) >>= fun () -> + Lwt.return_unit end | Get_operations_for_blocks (net_id, blocks) -> @@ -555,24 +541,10 @@ module P2p_reader = struct | Operations_for_block (net_id, block, ofs, ops, path) -> may_handle state net_id @@ fun net_db -> (* TODO early detection of non-requested operations. *) - let found_hash, found_ofs = - Operation_list_list_hash.check_path - path (Operation_list_hash.compute (List.map Operation.hash ops)) in - if found_ofs <> ofs then - Lwt.return_unit - else - Raw_block_header.Table.read_opt - net_db.block_header_db.table block >>= function - | None -> Lwt.return_unit - | Some bh -> - if Operation_list_list_hash.compare - found_hash bh.shell.operations_hash <> 0 then - Lwt.return_unit - else - Raw_operations.Table.notify - net_db.operations_db.table state.gid - (block, ofs) (ops, path) >>= fun () -> - Lwt.return_unit + Raw_operations.Table.notify + net_db.operations_db.table state.gid + (block, ofs) (ops, path) >>= fun () -> + Lwt.return_unit let rec worker_loop global_db state = Lwt_utils.protect ~canceler:state.canceler begin fun () -> @@ -601,7 +573,9 @@ module P2p_reader = struct end) db.active_nets ; state.worker <- - Lwt_utils.worker "db_network_reader" + Lwt_utils.worker + (Format.asprintf "db_network_reader.%a" + P2p.Peer_id.pp_short gid) ~run:(fun () -> worker_loop db state) ~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) ; P2p.Peer_id.Table.add db.p2p_readers gid state @@ -738,36 +712,41 @@ let read_all_operations net_db hash n = map_p (Raw_operation.Table.read net_db.operation_db.table) hashes) operations -let commit_block net_db hash validation_result = - Raw_block_header.Table.read - net_db.block_header_db.table hash >>=? fun header -> - read_all_operations net_db - hash header.shell.validation_passes >>=? fun operations -> - State.Block.store - net_db.net_state header operations validation_result >>=? fun res -> - Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ; - Raw_operation_hashes.clear_all - net_db.operation_hashes_db.table hash header.shell.validation_passes ; - Raw_operations.clear_all - net_db.operations_db.table hash header.shell.validation_passes ; - (* TODO: proper handling of the operations table by the prevalidator. *) - List.iter - (List.iter - (fun op -> Raw_operation.Table.clear_or_cancel - net_db.operation_db.table - (Operation.hash op))) - operations ; +let clear_block net_db hash n = + (* TODO use a reference counter ?? *) + Raw_operations.clear_all net_db.operations_db.table hash n ; + Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ; + Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash + +let commit_block net_db hash header operations result = + assert (Block_hash.equal hash (Block_header.hash header)) ; + assert (Net_id.equal (State.Net.id net_db.net_state) header.shell.net_id) ; + assert (List.length operations = header.shell.validation_passes) ; + State.Block.store net_db.net_state header operations result >>=? fun res -> + clear_block net_db hash header.shell.validation_passes ; return res -let commit_invalid_block net_db hash = - Raw_block_header.Table.read - net_db.block_header_db.table hash >>=? fun header -> +let commit_invalid_block net_db hash header _err = + assert (Block_hash.equal hash (Block_header.hash header)) ; + assert (Net_id.equal (State.Net.id net_db.net_state) header.shell.net_id) ; State.Block.store_invalid net_db.net_state header >>=? fun res -> - Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ; - Raw_operation_hashes.clear_all - net_db.operation_hashes_db.table hash header.shell.validation_passes ; - Raw_operations.clear_all - net_db.operations_db.table hash header.shell.validation_passes ; + clear_block net_db hash header.shell.validation_passes ; + return res + +let clear_operations net_db operations = + List.iter + (List.iter + (Raw_operation.Table.clear_or_cancel net_db.operation_db.table)) + operations + +let inject_block_header net_db h b = + fail_unless + (Net_id.equal + b.Block_header.shell.net_id + (State.Net.id net_db.net_state)) + (failure "Inconsitent net_id in operation") >>=? fun () -> + Raw_block_header.Table.inject + net_db.block_header_db.table h b >>= fun res -> return res let inject_operation net_db h op = @@ -780,8 +759,7 @@ let inject_operation net_db h op = let inject_protocol db h p = Raw_protocol.Table.inject db.protocol_db.table h p -let commit_protocol db h = - Raw_protocol.Table.read db.protocol_db.table h >>=? fun p -> +let commit_protocol db h p = State.Protocol.store db.disk p >>= fun res -> Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ; return (res <> None) @@ -795,49 +773,10 @@ let resolve_operation net_db = function fail_unless (Net_id.equal op.shell.net_id (State.Net.id net_db.net_state)) (failure "Inconsistent net_id in operation.") >>=? fun () -> - return (Operation.hash op, op) + return op | Hash oph -> Raw_operation.Table.read net_db.operation_db.table oph >>=? fun op -> - return (oph, op) - -let inject_block db bytes operations = - let hash = Block_hash.hash_bytes [bytes] in - match Block_header.of_bytes bytes with - | None -> - failwith "Cannot parse block header." - | Some block -> - match get_net db block.shell.net_id with - | None -> - failwith "Unknown network." - | Some net_db -> - map_p - (map_p (resolve_operation net_db)) - operations >>=? fun operations -> - let hashes = List.map (List.map fst) operations in - let operations = List.map (List.map snd) operations in - let computed_hash = - Operation_list_list_hash.compute - (List.map Operation_list_hash.compute hashes) in - fail_when - (Operation_list_list_hash.compare - computed_hash block.shell.operations_hash <> 0) - (Exn (Failure "Incoherent operation list")) >>=? fun () -> - Raw_block_header.Table.inject - net_db.block_header_db.table hash block >>= function - | false -> - failwith "Previously injected block." - | true -> - Raw_operation_hashes.inject_all - net_db.operation_hashes_db.table hash hashes >>= fun _ -> - Raw_operations.inject_all - net_db.operations_db.table hash operations >>= fun _ -> - return (hash, block) - -let clear_block net_db hash n = - Raw_operations.clear_all net_db.operations_db.table hash n ; - Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ; - Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash - + return op let watch_block_header { block_input } = Watcher.create_stream block_input diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index 272f1595f..1b3f8a024 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -43,24 +43,30 @@ type operation = | Hash of Operation_hash.t val resolve_operation: - net_db -> operation -> (Operation_hash.t * Operation.t) tzresult Lwt.t + net_db -> operation -> Operation.t tzresult Lwt.t val commit_block: - net_db -> Block_hash.t -> Updater.validation_result -> + net_db -> + Block_hash.t -> + Block_header.t -> Operation.t list list -> + Updater.validation_result -> State.Block.t option tzresult Lwt.t + val commit_invalid_block: - net_db -> Block_hash.t -> + net_db -> + Block_hash.t -> Block_header.t -> Error_monad.error list -> bool tzresult Lwt.t -val inject_block: - t -> MBytes.t -> operation list list -> - (Block_hash.t * Block_header.t) tzresult Lwt.t -val clear_block: net_db -> Block_hash.t -> int -> unit + +val clear_operations: net_db -> Operation_hash.t list list -> unit + +val inject_block_header: + net_db -> Block_hash.t -> Block_header.t -> bool tzresult Lwt.t val inject_operation: net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t val commit_protocol: - db -> Protocol_hash.t -> bool tzresult Lwt.t + db -> Protocol_hash.t -> Protocol.t -> bool tzresult Lwt.t val inject_protocol: db -> Protocol_hash.t -> Protocol.t -> bool Lwt.t @@ -110,6 +116,10 @@ module Operations : and type value = Operation.t list and type param := Operation_list_list_hash.t +val read_all_operations: + net_db -> Block_hash.t -> int -> Operation.t list list tzresult Lwt.t + + module Operation_hashes : DISTRIBUTED_DB with type t = net_db and type key = Block_hash.t * int diff --git a/src/node/shell/net_validator.ml b/src/node/shell/net_validator.ml new file mode 100644 index 000000000..e42073b50 --- /dev/null +++ b/src/node/shell/net_validator.ml @@ -0,0 +1,325 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make(struct let name = "node.validator.net" end) +module Canceler = Lwt_utils.Canceler + +type t = { + + db: Distributed_db.t ; + net_state: State.Net.t ; + net_db: Distributed_db.net_db ; + block_validator: Block_validator.t ; + + bootstrap_threshold: int ; + mutable bootstrapped: bool ; + bootstrapped_wakener: unit Lwt.u ; + valid_block_input: State.Block.t Watcher.input ; + global_valid_block_input: State.Block.t Watcher.input ; + new_head_input: State.Block.t Watcher.input ; + + parent: t option ; + max_child_ttl: int option ; + + mutable child: t option ; + prevalidator: Prevalidator.t ; + active_peers: Peer_validator.t Lwt.t P2p.Peer_id.Table.t ; + bootstrapped_peers: unit P2p.Peer_id.Table.t ; + + mutable worker: unit Lwt.t ; + queue: State.Block.t Lwt_pipe.t ; + canceler: Canceler.t ; + +} + +let rec shutdown nv = + Canceler.cancel nv.canceler >>= fun () -> + Distributed_db.deactivate nv.net_db >>= fun () -> + Lwt.join + ( nv.worker :: + Prevalidator.shutdown nv.prevalidator :: + Lwt_utils.may ~f:shutdown nv.child :: + P2p.Peer_id.Table.fold + (fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc) + nv.active_peers [] ) >>= fun () -> + Lwt.return_unit + +let shutdown_child nv = + Lwt_utils.may ~f:shutdown nv.child + +let notify_new_block nv block = + iter_option nv.parent + ~f:(fun nv -> Watcher.notify nv.valid_block_input block) ; + Watcher.notify nv.valid_block_input block ; + Watcher.notify nv.global_valid_block_input block ; + assert (Lwt_pipe.push_now nv.queue block) + +let may_toggle_bootstrapped_network nv = + if not nv.bootstrapped && + P2p.Peer_id.Table.length nv.bootstrapped_peers >= nv.bootstrap_threshold + then begin + nv.bootstrapped <- true ; + Lwt.wakeup_later nv.bootstrapped_wakener () ; + end + +let may_activate_peer_validator nv peer_id = + try P2p.Peer_id.Table.find nv.active_peers peer_id + with Not_found -> + let pv = + Peer_validator.create + ~notify_new_block:(notify_new_block nv) + ~notify_bootstrapped: begin fun () -> + P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ; + may_toggle_bootstrapped_network nv + end + ~notify_termination: begin fun _pv -> + P2p.Peer_id.Table.remove nv.active_peers peer_id ; + P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ; + end + nv.block_validator nv.net_db peer_id in + P2p.Peer_id.Table.add nv.active_peers peer_id pv ; + pv + +let broadcast_head nv ~previous block = + if not nv.bootstrapped then + Lwt.return_unit + else begin + begin + State.Block.predecessor block >>= function + | None -> Lwt.return_true + | Some predecessor -> + Lwt.return (State.Block.equal predecessor previous) + end >>= fun successor -> + if successor then begin + Distributed_db.Advertise.current_head nv.net_db block ; + Lwt.return_unit + end else begin + Distributed_db.Advertise.current_branch nv.net_db block + end + end + + +let rec create + ?max_child_ttl ?parent + ?(bootstrap_threshold = 1) + block_validator + global_valid_block_input db net_state = + let net_db = Distributed_db.activate db net_state in + Prevalidator.create net_db >>= fun prevalidator -> + let valid_block_input = Watcher.create_input () in + let new_head_input = Watcher.create_input () in + let canceler = Canceler.create () in + let _, bootstrapped_wakener = Lwt.wait () in + let nv = { + db ; net_state ; net_db ; block_validator ; + prevalidator ; + valid_block_input ; global_valid_block_input ; + new_head_input ; + parent ; max_child_ttl ; child = None ; + bootstrapped = (bootstrap_threshold <= 0) ; + bootstrapped_wakener ; + bootstrap_threshold ; + active_peers = + P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *) + bootstrapped_peers = + P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *) + worker = Lwt.return_unit ; + queue = Lwt_pipe.create () ; + canceler ; + } in + if nv.bootstrapped then Lwt.wakeup_later bootstrapped_wakener () ; + Distributed_db.set_callback net_db { + notify_branch = begin fun peer_id locator -> + Lwt.async begin fun () -> + may_activate_peer_validator nv peer_id >>= fun pv -> + Peer_validator.notify_branch pv locator ; + Lwt.return_unit + end + end ; + notify_head = begin fun peer_id block ops -> + Lwt.async begin fun () -> + may_activate_peer_validator nv peer_id >>= fun pv -> + Peer_validator.notify_head pv block ; + (* TODO notify prevalidator only if head is known ??? *) + Prevalidator.notify_operations nv.prevalidator peer_id ops ; + Lwt.return_unit + end; + end ; + disconnection = begin fun peer_id -> + Lwt.async begin fun () -> + may_activate_peer_validator nv peer_id >>= fun pv -> + Peer_validator.shutdown pv >>= fun () -> + Lwt.return_unit + end + end ; + } ; + nv.worker <- + Lwt_utils.worker + (Format.asprintf "net_validator.%a" Net_id.pp (State.Net.id net_state)) + ~run:(fun () -> worker_loop nv) + ~cancel:(fun () -> Canceler.cancel nv.canceler) ; + Lwt.return nv + +(** Current block computation *) + +and worker_loop nv = + begin + Lwt_utils.protect ~canceler:nv.canceler begin fun () -> + Lwt_pipe.pop nv.queue >>= return + end >>=? fun block -> + Chain.head nv.net_state >>= fun head -> + let head_header = State.Block.header head + and head_hash = State.Block.hash head + and block_header = State.Block.header block + and block_hash = State.Block.hash block in + if + Fitness.(block_header.shell.fitness <= head_header.shell.fitness) + then + return () + else begin + Chain.set_head nv.net_state block >>= fun previous -> + broadcast_head nv ~previous block >>= fun () -> + Prevalidator.flush nv.prevalidator block ; (* FIXME *) + may_switch_test_network nv block >>= fun () -> + Watcher.notify nv.new_head_input block ; + lwt_log_notice "update current head %a %a %a(%t)" + Block_hash.pp_short block_hash + Fitness.pp block_header.shell.fitness + Time.pp_hum block_header.shell.timestamp + (fun ppf -> + if Block_hash.equal head_hash block_header.shell.predecessor then + Format.fprintf ppf "same branch" + else + Format.fprintf ppf "changing branch") >>= fun () -> + return () + end + end >>= function + | Ok () -> + worker_loop nv + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + Lwt.return_unit + | Error err -> + lwt_log_error "@[Unexpected error:@ %a@]" + pp_print_error err >>= fun () -> + Canceler.cancel nv.canceler >>= fun () -> + Lwt.return_unit + +and may_switch_test_network nv block = + + let create_child genesis protocol expiration = + if State.Net.allow_forked_network nv.net_state then begin + shutdown_child nv >>= fun () -> + begin + let net_id = Net_id.of_block_hash (State.Block.hash genesis) in + State.Net.get + (State.Net.global_state nv.net_state) net_id >>= function + | Ok net_state -> return net_state + | Error _ -> + State.fork_testnet + genesis protocol expiration >>=? fun net_state -> + Chain.head net_state >>= fun new_genesis_block -> + Watcher.notify nv.global_valid_block_input new_genesis_block ; + Watcher.notify nv.valid_block_input new_genesis_block ; + return net_state + end >>=? fun net_state -> + create + ~parent:nv nv.block_validator + nv.global_valid_block_input + nv.db net_state >>= fun child -> + nv.child <- Some child ; + return () + end else begin + (* Ignoring request... *) + return () + end in + + let check_child genesis protocol expiration current_time = + let activated = + match nv.child with + | None -> false + | Some child -> + Block_hash.equal + (State.Net.genesis child.net_state).block + genesis in + State.Block.read nv.net_state genesis >>=? fun genesis -> + begin + match nv.max_child_ttl with + | None -> Lwt.return expiration + | Some ttl -> + Lwt.return + (Time.min expiration + (Time.add (State.Block.timestamp genesis) (Int64.of_int ttl))) + end >>= fun local_expiration -> + let expired = Time.(local_expiration <= current_time) in + if expired && activated then + shutdown_child nv >>= return + else if not activated && not expired then + create_child genesis protocol expiration + else + return () in + + begin + let block_header = State.Block.header block in + State.Block.test_network block >>= function + | Not_running -> shutdown_child nv >>= return + | Running { genesis ; protocol ; expiration } -> + check_child genesis protocol expiration + block_header.shell.timestamp + | Forking { protocol ; expiration } -> + create_child block protocol expiration + end >>= function + | Ok () -> Lwt.return_unit + | Error err -> + lwt_log_error "@[Error while switch test network:@ %a@]" + Error_monad.pp_print_error err >>= fun () -> + Lwt.return_unit + + +(* TODO check the initial sequence of message when connecting to a new + peer, and the one when activating a network. *) + + +let create + ?max_child_ttl + ?bootstrap_threshold + block_validator global_valid_block_input global_db state = + (* hide the optional ?parent *) + create + ?max_child_ttl + ?bootstrap_threshold + block_validator global_valid_block_input global_db state + +let net_id { net_state } = State.Net.id net_state +let net_state { net_state } = net_state +let prevalidator { prevalidator } = prevalidator +let net_db { net_db } = net_db +let child { child } = child + +let validate_block nv ?(force = false) hash block operations = + assert (Block_hash.equal hash (Block_header.hash block)) ; + Chain.head nv.net_state >>= fun head -> + let head = State.Block.header head in + if + force || Fitness.(head.shell.fitness <= block.shell.fitness) + then + Block_validator.validate + ~canceler:nv.canceler + ~notify_new_block:(notify_new_block nv) + nv.block_validator nv.net_db hash block operations + else + failwith "Fitness too low" + +let bootstrapped { bootstrapped_wakener } = + Lwt.protected (Lwt.waiter_of_wakener bootstrapped_wakener) + +let valid_block_watcher { valid_block_input } = + Watcher.create_stream valid_block_input + +let new_head_watcher { new_head_input } = + Watcher.create_stream new_head_input diff --git a/src/node/shell/net_validator.mli b/src/node/shell/net_validator.mli new file mode 100644 index 000000000..76c46fdaa --- /dev/null +++ b/src/node/shell/net_validator.mli @@ -0,0 +1,39 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t + +val create: + ?max_child_ttl:int -> + ?bootstrap_threshold:int -> + Block_validator.t -> + State.Block.t Watcher.input -> + Distributed_db.t -> + State.Net.t -> + t Lwt.t + +val bootstrapped: t -> unit Lwt.t + +val net_id: t -> Net_id.t +val net_state: t -> State.Net.t +val prevalidator: t -> Prevalidator.t +val net_db: t -> Distributed_db.net_db +val child: t -> t option + +val validate_block: + t -> + ?force:bool -> + Block_hash.t -> Block_header.t -> Operation.t list list -> + State.Block.t tzresult Lwt.t + +val shutdown: t -> unit Lwt.t + +val valid_block_watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper +val new_head_watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper + diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index 9c2a252b0..a064d2d9f 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -17,7 +17,7 @@ let inject_operation validator ?force bytes = | Some operation -> Validator.get validator operation.shell.net_id >>=? fun net_validator -> - let pv = Validator.prevalidator net_validator in + let pv = Net_validator.prevalidator net_validator in Prevalidator.inject_operation pv ?force operation in let hash = Operation_hash.hash_bytes [bytes] in Lwt.return (hash, t) @@ -44,17 +44,14 @@ let inject_protocol state ?force:_ proto = let inject_block validator ?force bytes operations = Validator.inject_block - validator ?force - bytes operations >>=? fun (hash, block) -> + validator ?force bytes operations >>=? fun (hash, block) -> return (hash, (block >>=? fun _ -> return ())) type t = { state: State.t ; distributed_db: Distributed_db.t ; validator: Validator.t ; - mainnet_db: Distributed_db.net_db ; - mainnet_net: State.Net.t ; - mainnet_validator: Validator.net_validator ; + mainnet_validator: Net_validator.t ; inject_block: ?force:bool -> MBytes.t -> Distributed_db.operation list list -> @@ -90,6 +87,7 @@ type config = { patch_context: (Context.t -> Context.t Lwt.t) option ; p2p: (P2p.config * P2p.limits) option ; test_network_max_tll: int option ; + bootstrap_threshold: int ; } let may_create_net state genesis = @@ -101,16 +99,17 @@ let may_create_net state genesis = let create { genesis ; store_root ; context_root ; patch_context ; p2p = net_params ; - test_network_max_tll = max_child_ttl } = + test_network_max_tll = max_child_ttl ; + bootstrap_threshold } = init_p2p net_params >>=? fun p2p -> State.read ~store_root ~context_root ?patch_context () >>=? fun state -> let distributed_db = Distributed_db.create state p2p in let validator = Validator.create state distributed_db in - may_create_net state genesis >>= fun mainnet_net -> + may_create_net state genesis >>= fun mainnet_state -> Validator.activate validator - ?max_child_ttl mainnet_net >>= fun mainnet_validator -> - let mainnet_db = Validator.net_db mainnet_validator in + ~bootstrap_threshold + ?max_child_ttl mainnet_state >>= fun mainnet_validator -> let shutdown () = State.close state >>= fun () -> P2p.shutdown p2p >>= fun () -> @@ -121,8 +120,6 @@ let create { genesis ; store_root ; context_root ; state ; distributed_db ; validator ; - mainnet_db ; - mainnet_net ; mainnet_validator ; inject_block = inject_block validator ; inject_operation = inject_operation validator ; @@ -190,35 +187,29 @@ module RPC = struct Block_hash.of_b58check_exn "BLockPrevaLidationPrevaLidationPrevaLidationPrZ4mr6" - let get_net node = function - | `Genesis | `Head _ | `Prevalidation -> - node.mainnet_validator, node.mainnet_db - | `Test_head _ | `Test_prevalidation -> - match Validator.test_validator node.mainnet_validator with - | None -> raise Not_found - | Some v -> v - let get_validator node = function | `Genesis | `Head _ | `Prevalidation -> node.mainnet_validator | `Test_head _ | `Test_prevalidation -> - match Validator.test_validator node.mainnet_validator with + match Net_validator.child node.mainnet_validator with | None -> raise Not_found - | Some (v, _) -> v + | Some v -> v let get_validator_per_hash node hash = State.read_block_exn node.state hash >>= fun block -> let header = State.Block.header block in if Net_id.equal - (State.Net.id node.mainnet_net) + (Net_validator.net_id node.mainnet_validator) header.shell.net_id then - Lwt.return (Some (node.mainnet_validator, node.mainnet_db)) + Lwt.return (Some node.mainnet_validator) else - match Validator.test_validator node.mainnet_validator with - | Some (test_validator, net_db) - when Net_id.equal - (State.Net.id (Validator.net_state test_validator)) - header.shell.net_id -> - Lwt.return (Some (node.mainnet_validator, net_db)) + match Net_validator.child node.mainnet_validator with + | Some test_validator -> + if Net_id.equal + (Net_validator.net_id test_validator) + header.shell.net_id then + Lwt.return_some test_validator + else + Lwt.return_none | _ -> Lwt.return_none let read_valid_block node h = @@ -238,19 +229,20 @@ module RPC = struct let block_info node (block: block) = match block with | `Genesis -> - Chain.genesis node.mainnet_net >>= convert + let net_state = Net_validator.net_state node.mainnet_validator in + Chain.genesis net_state >>= convert | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in - let net_db = Validator.net_db validator in - let net_state = Validator.net_state validator in + let net_db = Net_validator.net_db validator in + let net_state = Net_validator.net_state validator in Chain.head net_state >>= fun head -> predecessor net_db n head >>= convert | `Hash h -> read_valid_block_exn node h >>= convert | ( `Prevalidation | `Test_prevalidation ) as block -> let validator = get_validator node block in - let pv = Validator.prevalidator validator in - let net_state = Validator.net_state validator in + let pv = Net_validator.prevalidator validator in + let net_state = Net_validator.net_state validator in Chain.head net_state >>= fun head -> let head_header = State.Block.header head in let head_hash = State.Block.hash head in @@ -301,13 +293,14 @@ module RPC = struct let get_rpc_context node block = match block with | `Genesis -> - Chain.genesis node.mainnet_net >>= fun block -> + let net_state = Net_validator.net_state node.mainnet_validator in + Chain.genesis net_state >>= fun block -> rpc_context block >>= fun ctxt -> Lwt.return (Some ctxt) | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in - let net_state = Validator.net_state validator in - let net_db = Validator.net_db validator in + let net_state = Net_validator.net_state validator in + let net_db = Net_validator.net_db validator in Chain.head net_state >>= fun head -> predecessor net_db n head >>= fun block -> rpc_context block >>= fun ctxt -> @@ -321,9 +314,10 @@ module RPC = struct Lwt.return (Some ctxt) end | ( `Prevalidation | `Test_prevalidation ) as block -> - let validator, net_db = get_net node block in - let pv = Validator.prevalidator validator in - let net_state = Validator.net_state validator in + let validator = get_validator node block in + let pv = Net_validator.prevalidator validator in + let net_db = Net_validator.net_db validator in + let net_state = Net_validator.net_state validator in Chain.head net_state >>= fun head -> let head_header = State.Block.header head in let head_hash = State.Block.hash head in @@ -374,14 +368,14 @@ module RPC = struct | `Genesis -> Lwt.return [] | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in - let net_state = Validator.net_state validator in - let net_db = Validator.net_db validator in + let net_state = Net_validator.net_state validator in + let net_db = Net_validator.net_db validator in Chain.head net_state >>= fun head -> predecessor net_db n head >>= fun block -> State.Block.all_operation_hashes block | (`Prevalidation | `Test_prevalidation) as block -> - let validator, _net = get_net node block in - let pv = Validator.prevalidator validator in + let validator = get_validator node block in + let pv = Net_validator.prevalidator validator in let { Prevalidation.applied }, _ = Prevalidator.operations pv in Lwt.return [applied] | `Hash hash -> @@ -395,14 +389,15 @@ module RPC = struct | `Genesis -> Lwt.return [] | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in - let net_state = Validator.net_state validator in - let net_db = Validator.net_db validator in + let net_state = Net_validator.net_state validator in + let net_db = Net_validator.net_db validator in Chain.head net_state >>= fun head -> predecessor net_db n head >>= fun block -> State.Block.all_operations block | (`Prevalidation | `Test_prevalidation) as block -> - let validator, net_db = get_net node block in - let pv = Validator.prevalidator validator in + let validator = get_validator node block in + let net_db = Net_validator.net_db validator in + let pv = Net_validator.prevalidator validator in let { Prevalidation.applied }, _ = Prevalidator.operations pv in Lwt_list.map_p (Distributed_db.Operation.read_exn net_db) applied >>= fun applied -> @@ -417,32 +412,32 @@ module RPC = struct match block with | ( `Head 0 | `Prevalidation | `Test_head 0 | `Test_prevalidation ) as block -> - let validator, _net = get_net node block in - let pv = Validator.prevalidator validator in + let validator = get_validator node block in + let pv = Net_validator.prevalidator validator in Lwt.return (Prevalidator.operations pv) | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in - let prevalidator = Validator.prevalidator validator in - let net_state = Validator.net_state validator in - let net_db = Validator.net_db validator in + let prevalidator = Net_validator.prevalidator validator in + let net_state = Net_validator.net_state validator in + let net_db = Net_validator.net_db validator in Chain.head net_state >>= fun head -> predecessor net_db n head >>= fun b -> Prevalidator.pending ~block:b prevalidator >|= fun ops -> Prevalidation.empty_result, ops | `Genesis -> - let net = node.mainnet_net in - Chain.genesis net >>= fun b -> - let validator = get_validator node `Genesis in - let prevalidator = Validator.prevalidator validator in + let net_state = Net_validator.net_state node.mainnet_validator in + let prevalidator = + Net_validator.prevalidator node.mainnet_validator in + Chain.genesis net_state >>= fun b -> Prevalidator.pending ~block:b prevalidator >|= fun ops -> Prevalidation.empty_result, ops | `Hash h -> begin get_validator_per_hash node h >>= function | None -> Lwt.return (Prevalidation.empty_result, Operation_hash.Set.empty) - | Some (validator, net_db) -> - let net_state = Distributed_db.net_state net_db in - let prevalidator = Validator.prevalidator validator in + | Some validator -> + let net_state = Net_validator.net_state validator in + let prevalidator = Net_validator.prevalidator validator in State.Block.read_exn net_state h >>= fun block -> Prevalidator.pending ~block prevalidator >|= fun ops -> Prevalidation.empty_result, ops @@ -461,17 +456,17 @@ module RPC = struct begin match block with | `Genesis -> - let net = node.mainnet_net in - Chain.genesis net >>= return + let net_state = Net_validator.net_state node.mainnet_validator in + Chain.genesis net_state >>= return | ( `Head 0 | `Prevalidation | `Test_head 0 | `Test_prevalidation ) as block -> let validator = get_validator node block in - let net_state = Validator.net_state validator in + let net_state = Net_validator.net_state validator in Chain.head net_state >>= return | `Head n | `Test_head n as block -> begin let validator = get_validator node block in - let net_state = Validator.net_state validator in - let net_db = Validator.net_db validator in + let net_state = Net_validator.net_state validator in + let net_db = Net_validator.net_db validator in Chain.head net_state >>= fun head -> predecessor net_db n head >>= return end @@ -480,10 +475,11 @@ module RPC = struct | None -> Lwt.return (error_exn Not_found) | Some data -> return data end >>=? fun predecessor -> - let net_db = Validator.net_db node.mainnet_validator in + let net_db = Net_validator.net_db node.mainnet_validator in map_p (Distributed_db.resolve_operation net_db) ops >>=? fun rops -> Prevalidation.start_prevalidation ~proto_header ~predecessor ~timestamp () >>=? fun validation_state -> + let rops = List.map (fun x -> Operation.hash x, x) rops in Prevalidation.prevalidate validation_state ~sort rops >>= fun (validation_state, r) -> let operations_hash = @@ -535,12 +531,14 @@ module RPC = struct Lwt.return (Some (RPC.map (fun _ -> ()) dir)) let heads node = - Chain.known_heads node.mainnet_net >>= fun heads -> + let net_state = Net_validator.net_state node.mainnet_validator in + Chain.known_heads net_state >>= fun heads -> begin - match Validator.test_validator node.mainnet_validator with + match Net_validator.child node.mainnet_validator with | None -> Lwt.return_nil - | Some (_, net_db) -> - Chain.known_heads (Distributed_db.net_state net_db) + | Some test_validator -> + let net_state = Net_validator.net_state test_validator in + Chain.known_heads net_state end >>= fun test_heads -> Lwt_list.fold_left_s (fun map block -> @@ -600,7 +598,7 @@ module RPC = struct Distributed_db.watch_block_header node.distributed_db let block_watcher node = - let stream, shutdown = Validator.global_watcher node.validator in + let stream, shutdown = Validator.watcher node.validator in Lwt_stream.map_s (fun block -> convert block) stream, shutdown @@ -610,19 +608,15 @@ module RPC = struct let protocol_watcher node = Distributed_db.watch_protocol node.distributed_db - let validate node net_id block = - Validator.get node.validator net_id >>=? fun net_v -> - Validator.fetch_block net_v block >>=? fun _ -> - return () - let bootstrapped node = let block_stream, stopper = - Validator.new_head_watcher node.mainnet_validator in + Net_validator.new_head_watcher node.mainnet_validator in let first_run = ref true in let next () = if !first_run then begin first_run := false ; - Chain.head node.mainnet_net >>= fun head -> + let net_state = Net_validator.net_state node.mainnet_validator in + Chain.head net_state >>= fun head -> let head_hash = State.Block.hash head in let head_header = State.Block.header head in Lwt.return (Some (head_hash, head_header.shell.timestamp)) @@ -631,7 +625,7 @@ module RPC = struct ( Lwt_stream.get block_stream >|= map_option ~f:(fun b -> (State.Block.hash b, (State.Block.header b).shell.timestamp)) ) ; - (Validator.bootstrapped node.mainnet_validator >|= fun () -> None) ; + (Net_validator.bootstrapped node.mainnet_validator >|= fun () -> None) ; ] end in let shutdown () = Watcher.shutdown stopper in diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index f357bbdac..89f58a306 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -16,6 +16,7 @@ type config = { patch_context: (Context.t -> Context.t Lwt.t) option ; p2p: (P2p.config * P2p.limits) option ; test_network_max_tll: int option ; + bootstrap_threshold: int ; } val create: config -> t tzresult Lwt.t @@ -84,8 +85,6 @@ module RPC : sig sort_operations:bool -> Distributed_db.operation list -> (Block_header.shell_header * error Prevalidation.preapply_result) tzresult Lwt.t - val validate: t -> Net_id.t -> Block_hash.t -> unit tzresult Lwt.t - val context_dir: t -> block -> 'a RPC.directory option Lwt.t diff --git a/src/node/shell/node_rpc.ml b/src/node/shell/node_rpc.ml index 32c63d784..82af36aec 100644 --- a/src/node/shell/node_rpc.ml +++ b/src/node/shell/node_rpc.ml @@ -391,11 +391,6 @@ let build_rpc_directory node = Data_encoding.Binary.to_bytes Block_header.encoding header in RPC.Answer.return res in RPC.register0 dir Services.forge_block_header implementation in - let dir = - let implementation (net_id, block_hash) = - Node.RPC.validate node net_id block_hash >>= fun res -> - RPC.Answer.return res in - RPC.register0 dir Services.validate_block implementation in let dir = let implementation { Node_rpc_services.raw ; blocking ; force ; operations } = diff --git a/src/node/shell/node_rpc_services.ml b/src/node/shell/node_rpc_services.ml index 66dbc8278..9ab346827 100644 --- a/src/node/shell/node_rpc_services.ml +++ b/src/node/shell/node_rpc_services.ml @@ -619,18 +619,6 @@ let forge_block_header = ~output: (obj1 (req "block" bytes)) RPC.Path.(root / "forge_block_header") -let validate_block = - RPC.service - ~description: - "Force the node to fetch and validate the given block hash." - ~input: - (obj2 - (req "net" Net_id.encoding) - (req "hash" Block_hash.encoding)) - ~output: - (Error.wrap @@ empty) - RPC.Path.(root / "validate_block") - type inject_block_param = { raw: MBytes.t ; blocking: bool ; diff --git a/src/node/shell/node_rpc_services.mli b/src/node/shell/node_rpc_services.mli index d85f3799e..d1830dbe6 100644 --- a/src/node/shell/node_rpc_services.mli +++ b/src/node/shell/node_rpc_services.mli @@ -179,9 +179,6 @@ end val forge_block_header: (unit, unit, Block_header.t, MBytes.t) RPC.service -val validate_block: - (unit, unit, Net_id.t * Block_hash.t, unit tzresult) RPC.service - type inject_block_param = { raw: MBytes.t ; blocking: bool ; diff --git a/src/node/shell/peer_validator.ml b/src/node/shell/peer_validator.ml new file mode 100644 index 000000000..8c1a9a45f --- /dev/null +++ b/src/node/shell/peer_validator.ml @@ -0,0 +1,289 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* FIXME ignore/postpone fetching/validating of block in the future... *) + +include Logging.Make(struct let name = "node.validator.peer" end) +module Canceler = Lwt_utils.Canceler + +type msg = + | New_head of Block_hash.t * Block_header.t + | New_branch of Block_hash.t * Block_locator.t + +type t = { + + peer_id: P2p.Peer_id.t ; + net_db: Distributed_db.net_db ; + block_validator: Block_validator.t ; + + (* callback to net_validator *) + notify_new_block: State.Block.t -> unit ; + notify_bootstrapped: unit -> unit ; + + mutable bootstrapped: bool ; + mutable last_validated_head: Block_hash.t ; + mutable last_advertised_head: Block_hash.t ; + + mutable worker: unit Lwt.t ; + dropbox: msg Lwt_dropbox.t ; + canceler: Canceler.t ; + +} + +type error += + | Unknown_ancestor + | Known_invalid + +let set_bootstrapped pv = + if not pv.bootstrapped then begin + pv.bootstrapped <- true ; + pv.notify_bootstrapped () ; + end + +let bootstrap_new_branch pv _ancestor _head unknown_prefix = + let len = Block_locator.estimated_length unknown_prefix in + lwt_log_info + "validating new branch from peer %a (approx. %d blocks)" + P2p.Peer_id.pp_short pv.peer_id len >>= fun () -> + let pipeline = + Bootstrap_pipeline.create + ~notify_new_block:pv.notify_new_block + pv.block_validator + pv.peer_id pv.net_db unknown_prefix in + Lwt_utils.protect ~canceler:pv.canceler + ~on_error:begin fun error -> + (* if the peer_validator is killed, let's cancel the pipeline *) + Bootstrap_pipeline.cancel pipeline >>= fun () -> + Lwt.return_error error + end + begin fun () -> + Bootstrap_pipeline.wait pipeline + end >>=? fun () -> + set_bootstrapped pv ; + lwt_log_info + "done validating new branch from peer %a." + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + return () + +let validate_new_head pv hash (header : Block_header.t) = + let net_state = Distributed_db.net_state pv.net_db in + State.Block.known net_state header.shell.predecessor >>= function + | false -> + lwt_debug + "missing predecessor for new head %a from peer %a" + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + Distributed_db.Request.current_branch pv.net_db ~peer:pv.peer_id () ; + return () + | true -> + lwt_debug + "fetching operations for new head %a from peer %a" + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + Distributed_db.inject_block_header pv.net_db hash header >>=? fun _ -> + (* TODO look for predownloaded (individual) + operations in the prevalidator ?? *) + map_p + (fun i -> + Lwt_utils.protect ~canceler:pv.canceler begin fun () -> + Distributed_db.Operations.fetch + ~timeout:60. (* TODO allow to adjust the constant ... *) + pv.net_db ~peer:pv.peer_id + (hash, i) header.shell.operations_hash + end) + (0 -- (header.shell.validation_passes - 1)) >>=? fun operations -> + lwt_debug + "requesting validation for new head %a from peer %a" + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + Block_validator.validate + ~notify_new_block:pv.notify_new_block + pv.block_validator pv.net_db + hash header operations >>=? fun _block -> + lwt_debug "end of validation for new head %a from peer %a" + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + set_bootstrapped pv ; + return () + +let may_validate_new_head pv hash header = + let net_state = Distributed_db.net_state pv.net_db in + State.Block.known net_state hash >>= function + | true -> begin + State.Block.known_valid net_state hash >>= function + | true -> + lwt_debug + "ignoring previously validated block %a from peer %a" + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + set_bootstrapped pv ; + pv.last_validated_head <- hash ; + return () + | false -> + lwt_log_info + "ignoring known invalid block %a from peer %a" + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + fail Known_invalid + end + | false -> + validate_new_head pv hash header + +let may_validate_new_branch pv distant_hash locator = + let distant_header, _ = (locator : Block_locator.t :> Block_header.t * _) in + let net_state = Distributed_db.net_state pv.net_db in + Chain.head net_state >>= fun local_header -> + if Fitness.compare + distant_header.Block_header.shell.fitness + (State.Block.fitness local_header) < 0 then begin + set_bootstrapped pv ; + lwt_debug + "ignoring branch %a with low fitness from peer: %a." + Block_hash.pp_short distant_hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + (* Don't bother with downloading a branch with a low fitness. *) + return () + end else begin + let net_state = Distributed_db.net_state pv.net_db in + Block_locator.known_ancestor net_state locator >>= function + | None -> + lwt_log_info + "ignoring branch %a without common ancestor from peer: %a." + Block_hash.pp_short distant_hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + fail Unknown_ancestor + | Some (ancestor, unknown_prefix) -> + bootstrap_new_branch pv ancestor distant_header unknown_prefix + end + +let rec worker_loop pv = + begin + Lwt_utils.protect ~canceler:pv.canceler begin fun () -> + (* TODO should the timeout be protocol dependent ?? *) + (* TODO or setup by the local admin ?? or a mix ??*) + Lwt_dropbox.take_with_timeout 90. pv.dropbox >>= return + end >>=? function + | None -> + lwt_log_info "no new head from peer %a for 90 seconds." + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + Distributed_db.Request.current_head pv.net_db ~peer:pv.peer_id () ; + return () + | Some (New_head (hash, header)) -> + lwt_log_info "processing new head %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + may_validate_new_head pv hash header + | Some (New_branch (hash, locator)) -> + (* TODO penalize empty locator... ?? *) + lwt_log_info "processing new branch %a from peer %a." + Block_hash.pp_short hash + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + may_validate_new_branch pv hash locator + end >>= function + | Ok () -> + worker_loop pv + | Error (( Unknown_ancestor + | Block_locator.Invalid_locator _ + | Block_validator.Invalid_block _ ) :: _) -> + (* TODO ban the peer_id... *) + lwt_log_info "Terminating the validation worker for peer %a (kickban)." + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + Canceler.cancel pv.canceler >>= fun () -> + Lwt.return_unit + | Error [Block_validator.Unavailable_protocol { protocol } ] -> begin + Block_validator.fetch_and_compile_protocol + pv.block_validator + ~peer:pv.peer_id ~timeout:60. protocol >>= function + | Ok _ -> worker_loop pv + | Error _ -> + (* TODO penality... *) + lwt_log_info "Terminating the validation worker for peer %a \ + \ (missing protocol %a)." + P2p.Peer_id.pp_short pv.peer_id + Protocol_hash.pp_short protocol >>= fun () -> + Canceler.cancel pv.canceler >>= fun () -> + Lwt.return_unit + end + | Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_dropbox.Closed] -> + lwt_log_info "Terminating the validation worker for peer %a." + P2p.Peer_id.pp_short pv.peer_id >>= fun () -> + Lwt.return_unit + | Error err -> + lwt_log_error + "@[Unexpected error in the validation worker for peer %a:@ \ + \ %a@]" + P2p.Peer_id.pp_short pv.peer_id + pp_print_error err >>= fun () -> + Canceler.cancel pv.canceler >>= fun () -> + Lwt.return_unit + +let create + ?notify_new_block:(external_notify_new_block = fun _ -> ()) + ?(notify_bootstrapped = fun () -> ()) + ?(notify_termination = fun _ -> ()) + block_validator net_db peer_id = + lwt_debug "creating validator for peer %a." + P2p.Peer_id.pp_short peer_id >>= fun () -> + let canceler = Canceler.create () in + let dropbox = Lwt_dropbox.create () in + let net_state = Distributed_db.net_state net_db in + let genesis = (State.Net.genesis net_state).block in + let rec notify_new_block block = + pv.last_validated_head <- State.Block.hash block ; + external_notify_new_block block + and pv = { + block_validator ; + notify_new_block ; + notify_bootstrapped ; + net_db ; + peer_id ; + bootstrapped = false ; + last_validated_head = genesis ; + last_advertised_head = genesis ; + canceler ; + dropbox ; + worker = Lwt.return_unit ; + } in + Canceler.on_cancel pv.canceler begin fun () -> + Lwt_dropbox.close pv.dropbox ; + Distributed_db.disconnect pv.net_db pv.peer_id >>= fun () -> + notify_termination pv ; + Lwt.return_unit + end ; + pv.worker <- + Lwt_utils.worker + (Format.asprintf "peer_validator.%a.%a" + Net_id.pp (State.Net.id net_state) P2p.Peer_id.pp_short peer_id) + ~run:(fun () -> worker_loop pv) + ~cancel:(fun () -> Canceler.cancel pv.canceler) ; + Lwt.return pv + +let notify_branch pv locator = + let head, _ = (locator : Block_locator.t :> _ * _) in + let hash = Block_header.hash head in + pv.last_advertised_head <- hash ; + try Lwt_dropbox.put pv.dropbox (New_branch (hash, locator)) + with Lwt_dropbox.Closed -> () + +let notify_head pv header = + let hash = Block_header.hash header in + pv.last_advertised_head <- hash ; + match Lwt_dropbox.peek pv.dropbox with + | Some (New_branch _) -> () (* ignore *) + | None | Some (New_head _) -> + try Lwt_dropbox.put pv.dropbox (New_head (hash, header)) + with Lwt_dropbox.Closed -> () + +let shutdown pv = + Canceler.cancel pv.canceler >>= fun () -> + pv.worker + +let peer_id pv = pv.peer_id +let bootstrapped pv = pv.bootstrapped +let current_head pv = pv.last_validated_head diff --git a/src/node/shell/peer_validator.mli b/src/node/shell/peer_validator.mli new file mode 100644 index 000000000..824c403f4 --- /dev/null +++ b/src/node/shell/peer_validator.mli @@ -0,0 +1,25 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t + +val peer_id: t -> P2p.Peer_id.t +val bootstrapped: t -> bool +val current_head: t -> Block_hash.t + +val create: + ?notify_new_block: (State.Block.t -> unit) -> + ?notify_bootstrapped: (unit -> unit) -> + ?notify_termination: (t -> unit) -> + Block_validator.t -> + Distributed_db.net_db -> P2p.Peer_id.t -> t Lwt.t +val shutdown: t -> unit Lwt.t + +val notify_branch: t -> Block_locator.t -> unit +val notify_head: t -> Block_header.t -> unit diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index bb953e102..758e51bd1 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -9,23 +9,31 @@ open Logging.Node.Prevalidator -let list_pendings ~from_block ~to_block old_mempool = +let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool = let rec pop_blocks ancestor block mempool = let hash = State.Block.hash block in if Block_hash.equal hash ancestor then Lwt.return mempool else - State.Block.all_operation_hashes block >>= fun operations -> - let mempool = - List.fold_left - (List.fold_left (fun mempool h -> Operation_hash.Set.add h mempool)) - mempool operations in + State.Block.all_operations block >>= fun operations -> + Lwt_list.fold_left_s + (Lwt_list.fold_left_s (fun mempool op -> + let h = Operation.hash op in + Lwt_utils.may maintain_net_db + ~f:begin fun net_db -> + Distributed_db.inject_operation net_db h op >>= fun _ -> + Lwt.return_unit + end >>= fun () -> + Lwt.return (Operation_hash.Set.add h mempool))) + mempool operations >>= fun mempool -> State.Block.predecessor block >>= function | None -> assert false | Some predecessor -> pop_blocks ancestor predecessor mempool in let push_block mempool block = State.Block.all_operation_hashes block >|= fun operations -> + iter_option maintain_net_db + ~f:(fun net_db -> Distributed_db.clear_operations net_db operations) ; List.fold_left (List.fold_left (fun mempool h -> Operation_hash.Set.remove h mempool)) mempool operations @@ -238,21 +246,23 @@ let create net_db = Operation_hash.Table.mem pending op || Operation_hash.Set.mem op !live_operations) ops in - let fetch op = + let fetch h = Distributed_db.Operation.fetch ~timeout:10. (* TODO allow to adjust the constant ... *) - net_db ~peer:gid op () >>= function + net_db ~peer:gid h () >>= function | Ok _op -> - push_to_worker (`Handle op) ; + push_to_worker (`Handle h) ; Lwt.return_unit | Error [ Distributed_db.Operation.Canceled _ ] -> lwt_debug "operation %a included before being prevalidated" - Operation_hash.pp_short op >>= fun () -> + Operation_hash.pp_short h >>= fun () -> + Operation_hash.Table.remove pending h ; Lwt.return_unit | Error _ -> - (* should not happen *) - Lwt.return_unit in + Operation_hash.Table.remove pending h ; + Lwt.return_unit + in List.iter (fun op -> Operation_hash.Table.add pending op (fetch op)) unknown_ops ; @@ -272,7 +282,9 @@ let create net_db = lwt_debug "register %a" Operation_hash.pp_short op >>= fun () -> Lwt.return_unit | `Flush (new_head : State.Block.t) -> - list_pendings ~from_block:!head ~to_block:new_head + list_pendings + ~maintain_net_db:net_db + ~from_block:!head ~to_block:new_head (preapply_result_operations !operations) >>= fun new_mempool -> Chain_traversal.live_blocks new_head @@ -294,7 +306,10 @@ let create net_db = q >>= fun () -> worker_loop () in - Lwt_utils.worker "prevalidator" ~run:worker_loop ~cancel in + Lwt_utils.worker + (Format.asprintf "prevalidator.%a" + Net_id.pp (State.Net.id net_state)) + ~run:worker_loop ~cancel in let flush head = push_to_worker (`Flush head) ; diff --git a/src/node/shell/protocol_validator.ml b/src/node/shell/protocol_validator.ml new file mode 100644 index 000000000..fd2abb575 --- /dev/null +++ b/src/node/shell/protocol_validator.ml @@ -0,0 +1,196 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make(struct let name = "node.validator.block" end) +module Canceler = Lwt_utils.Canceler + +type 'a request = + | Request_validation: { + hash: Protocol_hash.t ; + protocol: Protocol.t ; + } -> State.Registred_protocol.t tzresult request + +type message = Message: 'a request * 'a Lwt.u option -> message + +type t = { + db: Distributed_db.t ; + mutable worker: unit Lwt.t ; + messages: message Lwt_pipe.t ; + canceler: Canceler.t ; +} + +(** Block validation *) + +type protocol_error = + | Compilation_failed + | Dynlinking_failed + +let protocol_error_encoding = + let open Data_encoding in + union + [ + case + (obj1 + (req "error" (constant "compilation_failed"))) + (function Compilation_failed -> Some () + | _ -> None) + (fun () -> Compilation_failed) ; + case + (obj1 + (req "error" (constant "dynlinking_failed"))) + (function Dynlinking_failed -> Some () + | _ -> None) + (fun () -> Dynlinking_failed) ; + ] + +let pp_protocol_error ppf = function + | Compilation_failed -> + Format.fprintf ppf "compilation error" + | Dynlinking_failed -> + Format.fprintf ppf "dynlinking error" + +type error += + | Invalid_protocol of { hash: Protocol_hash.t ; error: protocol_error } + +let () = + Error_monad.register_error_kind + `Permanent + ~id:"validator.invalid_protocol" + ~title:"Invalid protocol" + ~description:"Invalid protocol." + ~pp:begin fun ppf (protocol, error) -> + Format.fprintf ppf + "@[Invalid protocol %a@ %a@]" + Protocol_hash.pp_short protocol pp_protocol_error error + end + Data_encoding.(merge_objs + (obj1 (req "invalid_protocol" Protocol_hash.encoding)) + protocol_error_encoding) + (function Invalid_protocol { hash ; error } -> + Some (hash, error) | _ -> None) + (fun (hash, error) -> + Invalid_protocol { hash ; error }) + +let rec worker_loop bv = + begin + Lwt_utils.protect ~canceler:bv.canceler begin fun () -> + Lwt_pipe.pop bv.messages >>= return + end >>=? function Message (request, wakener) -> + match request with + | Request_validation { hash ; protocol } -> + Updater.compile hash protocol >>= fun valid -> + begin + if valid then + Distributed_db.commit_protocol bv.db hash protocol + else + (* no need to tag 'invalid' protocol on disk, + the economic protocol prevents us from + being spammed with protocol validation. *) + return true + end >>=? fun _ -> + match wakener with + | None -> + return () + | Some wakener -> + if valid then + match State.Registred_protocol.get hash with + | Some protocol -> + Lwt.wakeup_later wakener (Ok protocol) + | None -> + Lwt.wakeup_later wakener + (Error + [Invalid_protocol { hash ; + error = Dynlinking_failed }]) + else + Lwt.wakeup_later wakener + (Error + [Invalid_protocol { hash ; + error = Compilation_failed }]) ; + return () + end >>= function + | Ok () -> + worker_loop bv + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + lwt_log_notice "terminating" >>= fun () -> + Lwt.return_unit + | Error err -> + lwt_log_error "@[Unexpected error (worker):@ %a@]" + pp_print_error err >>= fun () -> + Canceler.cancel bv.canceler >>= fun () -> + Lwt.return_unit + +let create db = + let canceler = Canceler.create () in + let messages = Lwt_pipe.create () in + let bv = { + canceler ; messages ; db ; + worker = Lwt.return_unit } in + Canceler.on_cancel bv.canceler begin fun () -> + Lwt_pipe.close bv.messages ; + Lwt.return_unit + end ; + bv.worker <- + Lwt_utils.worker "block_validator" + ~run:(fun () -> worker_loop bv) + ~cancel:(fun () -> Canceler.cancel bv.canceler) ; + bv + +let shutdown { canceler ; worker } = + Canceler.cancel canceler >>= fun () -> + worker + +let validate { messages } hash protocol = + match State.Registred_protocol.get hash with + | Some protocol -> + lwt_debug "previously validated protocol %a (before pipe)" + Protocol_hash.pp_short hash >>= fun () -> + return protocol + | None -> + let res, wakener = Lwt.task () in + lwt_debug "pushing validation request for protocol %a" + Protocol_hash.pp_short hash >>= fun () -> + Lwt_pipe.push messages + (Message (Request_validation { hash ; protocol }, + Some wakener)) >>= fun () -> + res + +let fetch_and_compile_protocol pv ?peer ?timeout hash = + match State.Registred_protocol.get hash with + | Some proto -> return proto + | None -> + begin + Distributed_db.Protocol.read_opt pv.db hash >>= function + | Some protocol -> return protocol + | None -> + lwt_log_notice "Fetching protocol %a from peer " + Protocol_hash.pp_short hash >>= fun () -> + Distributed_db.Protocol.fetch pv.db ?peer ?timeout hash () + end >>=? fun protocol -> + validate pv hash protocol >>=? fun proto -> + return proto + +let fetch_and_compile_protocols pv ?peer ?timeout (block: State.Block.t) = + State.Block.context block >>= fun context -> + let protocol = + Context.get_protocol context >>= fun protocol_hash -> + fetch_and_compile_protocol pv ?peer ?timeout protocol_hash >>=? fun _ -> + return () + and test_protocol = + Context.get_test_network context >>= function + | Not_running -> return () + | Forking { protocol } + | Running { protocol } -> + fetch_and_compile_protocol pv ?peer ?timeout protocol >>=? fun _ -> + return () in + protocol >>=? fun () -> + test_protocol >>=? fun () -> + return () + +let prefetch_and_compile_protocols pv ?peer ?timeout block = + try ignore (fetch_and_compile_protocols pv ?peer ?timeout block) with _ -> () diff --git a/src/node/shell/protocol_validator.mli b/src/node/shell/protocol_validator.mli new file mode 100644 index 000000000..f0f9ac8b9 --- /dev/null +++ b/src/node/shell/protocol_validator.mli @@ -0,0 +1,46 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type t + +type protocol_error = + | Compilation_failed + | Dynlinking_failed + +type error += + | Invalid_protocol of + { hash: Protocol_hash.t ; error: protocol_error } + +val create: Distributed_db.t -> t + +val validate: + t -> + Protocol_hash.t -> Protocol.t -> + State.Registred_protocol.t tzresult Lwt.t + +val shutdown: t -> unit Lwt.t + +val fetch_and_compile_protocol: + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + Protocol_hash.t -> State.Registred_protocol.t tzresult Lwt.t + +val fetch_and_compile_protocols: + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + State.Block.t -> unit tzresult Lwt.t + +val prefetch_and_compile_protocols: + t -> + ?peer:P2p.Peer_id.t -> + ?timeout:float -> + State.Block.t -> unit + diff --git a/src/node/shell/state.ml b/src/node/shell/state.ml index 90aad5ecb..727c326bc 100644 --- a/src/node/shell/state.ml +++ b/src/node/shell/state.ml @@ -338,6 +338,8 @@ module Block = struct let max_operations_ttl { contents = { max_operations_ttl } } = max_operations_ttl + let is_genesis b = Block_hash.equal b.hash b.net_state.genesis.block + let known_valid net_state hash = Shared.use net_state.block_store begin fun store -> Store.Block.Contents.known (store, hash) diff --git a/src/node/shell/state.mli b/src/node/shell/state.mli index ec0480c3d..333b8f533 100644 --- a/src/node/shell/state.mli +++ b/src/node/shell/state.mli @@ -126,6 +126,7 @@ module Block : sig val message: t -> string val max_operations_ttl: t -> int + val is_genesis: t -> bool val predecessor: t -> block option Lwt.t val context: t -> Context.t Lwt.t diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 2e4a54394..a0c21ef6c 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -7,923 +7,72 @@ (* *) (**************************************************************************) -open Logging.Node.Validator +include Logging.Make(struct let name = "node.validator" end) +module Canceler = Lwt_utils.Canceler type t = { - activate: ?parent:net_validator -> ?max_child_ttl:int -> State.Net.t -> net_validator Lwt.t ; - get: Net_id.t -> net_validator tzresult Lwt.t ; - get_exn: Net_id.t -> net_validator Lwt.t ; - deactivate: net_validator -> unit Lwt.t ; - inject_block: - ?force:bool -> - MBytes.t -> Distributed_db.operation list list -> - (Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t ; - notify_block: Block_hash.t -> Block_header.t -> unit Lwt.t ; - shutdown: unit -> unit Lwt.t ; - valid_block_input: State.Block.t Watcher.input ; + + state: State.t ; db: Distributed_db.t ; + block_validator: Block_validator.t ; + + valid_block_input: State.Block.t Watcher.input ; + active_nets: Net_validator.t Lwt.t Net_id.Table.t ; + } -and net_validator = { - net: State.Net.t ; - worker: t ; - parent: net_validator option ; - mutable child: net_validator option ; - prevalidator: Prevalidator.t ; - net_db: Distributed_db.net_db ; - notify_block: Block_hash.t -> Block_header.t -> unit Lwt.t ; - fetch_block: Block_hash.t -> State.Block.t tzresult Lwt.t ; - create_child: - State.Block.t -> Protocol_hash.t -> Time.t -> unit tzresult Lwt.t ; - check_child: - Block_hash.t -> Protocol_hash.t -> Time.t -> Time.t -> unit tzresult Lwt.t ; - deactivate_child: unit -> unit Lwt.t ; - test_validator: unit -> (net_validator * Distributed_db.net_db) option ; - shutdown: unit -> unit Lwt.t ; - valid_block_input_for_net: State.Block.t Watcher.input ; - new_head_input: State.Block.t Watcher.input ; - bootstrapped: unit Lwt.t ; -} - -let net_state { net } = net -let net_db { net_db } = net_db - -let activate w ?max_child_ttl net = w.activate ?max_child_ttl net -let deactivate net_validator = net_validator.worker.deactivate net_validator -let get w = w.get -let get_exn w = w.get_exn -let notify_block w = w.notify_block -let inject_block w = w.inject_block -let shutdown w = w.shutdown () -let test_validator w = w.test_validator () - -let fetch_block v = v.fetch_block -let prevalidator v = v.prevalidator -let bootstrapped v = v.bootstrapped - -(** Current block computation *) - -let fetch_protocol v hash = - lwt_log_notice "Fetching protocol %a" - Protocol_hash.pp_short hash >>= fun () -> - Distributed_db.Protocol.fetch v.worker.db hash () >>=? fun protocol -> - Updater.compile hash protocol >>= fun valid -> - if valid then begin - lwt_log_notice "Successfully compiled protocol %a" - Protocol_hash.pp_short hash >>= fun () -> - Distributed_db.commit_protocol v.worker.db hash >>=? fun _ -> - return true - end else begin - lwt_log_error "Failed to compile protocol %a" - Protocol_hash.pp_short hash >>= fun () -> - failwith "Cannot compile the protocol %a" Protocol_hash.pp_short hash - end - -let fetch_protocols v (block: State.Block.t) = - State.Block.context block >>= fun context -> - let proto_updated = - Context.get_protocol context >>= fun protocol_hash -> - match State.Registred_protocol.get protocol_hash with - | Some _ -> return false - | None -> fetch_protocol v protocol_hash - and test_proto_updated = - Context.get_test_network context >>= function - | Not_running -> return false - | Forking { protocol } - | Running { protocol } -> - match State.Registred_protocol.get protocol with - | Some _ -> return false - | None -> fetch_protocol v protocol in - proto_updated >>=? fun proto_updated -> - test_proto_updated >>=? fun test_proto_updated -> - return (proto_updated && test_proto_updated) - -let rec may_set_head v (block: State.Block.t) = - Chain.head v.net >>= fun head -> - let head_header = State.Block.header head - and head_hash = State.Block.hash head - and block_header = State.Block.header block - and block_hash = State.Block.hash block in - if - Fitness.compare - head_header.shell.fitness block_header.shell.fitness >= 0 - then - Lwt.return_unit - else begin - Chain.test_and_set_head v.net ~old:head block >>= function - | false -> may_set_head v block - | true -> - Distributed_db.Advertise.current_head v.net_db block ; - Prevalidator.flush v.prevalidator block ; - begin - begin - State.Block.test_network block >>= function - | Not_running -> v.deactivate_child () >>= return - | Running { genesis ; protocol ; expiration } -> - v.check_child genesis protocol expiration - block_header.shell.timestamp - | Forking { protocol ; expiration } -> - v.create_child block protocol expiration - end >>= function - | Ok () -> Lwt.return_unit - | Error err -> - lwt_log_error "@[Error while switch test network:@ %a@]" - Error_monad.pp_print_error err - end >>= fun () -> - Watcher.notify v.new_head_input block ; - lwt_log_notice "update current head %a %a %a(%t)" - Block_hash.pp_short block_hash - Fitness.pp block_header.shell.fitness - Time.pp_hum block_header.shell.timestamp - (fun ppf -> - if Block_hash.equal head_hash block_header.shell.predecessor then - Format.fprintf ppf "same branch" - else - Format.fprintf ppf "changing branch") >>= fun () -> - Lwt.return_unit - end - -(** Block validation *) - -type error += - | Invalid_operation of Operation_hash.t - | Invalid_fitness of { block: Block_hash.t ; - expected: Fitness.t ; - found: Fitness.t } - | Unknown_protocol - | Non_increasing_timestamp - | Non_increasing_fitness - | Wrong_level of Int32.t * Int32.t - | Wrong_proto_level of int * int - | Replayed_operation of Operation_hash.t - | Outdated_operation of Operation_hash.t * Block_hash.t - -let () = - Error_monad.register_error_kind - `Permanent - ~id:"validator.invalid_fitness" - ~title:"Invalid fitness" - ~description:"The computed fitness differs from the fitness found \ - \ in the block header." - ~pp:(fun ppf (block, expected, found) -> - Format.fprintf ppf - "@[Invalid fitness for block %a@ \ - \ expected %a@ \ - \ found %a" - Block_hash.pp_short block - Fitness.pp expected - Fitness.pp found) - Data_encoding.(obj3 - (req "block" Block_hash.encoding) - (req "expected" Fitness.encoding) - (req "found" Fitness.encoding)) - (function Invalid_fitness { block ; expected ; found } -> - Some (block, expected, found) | _ -> None) - (fun (block, expected, found) -> - Invalid_fitness { block ; expected ; found }) ; - register_error_kind - `Permanent - ~id:"validator.wrong_level" - ~title:"Wrong level" - ~description:"The block level is not the expected one" - ~pp:(fun ppf (e, g) -> - Format.fprintf ppf - "The declared level %ld is not %ld" g e) - Data_encoding.(obj2 - (req "expected" int32) - (req "provided" int32)) - (function Wrong_level (e, g) -> Some (e, g) | _ -> None) - (fun (e, g) -> Wrong_level (e, g)) ; - register_error_kind - `Permanent - ~id:"validator.wrong_proto_level" - ~title:"Wrong protocol level" - ~description:"The protocol level is not the expected one" - ~pp:(fun ppf (e, g) -> - Format.fprintf ppf - "The declared protocol level %d is not %d" g e) - Data_encoding.(obj2 - (req "expected" uint8) - (req "provided" uint8)) - (function Wrong_proto_level (e, g) -> Some (e, g) | _ -> None) - (fun (e, g) -> Wrong_proto_level (e, g)) ; - register_error_kind - `Permanent - ~id:"validator.replayed_operation" - ~title:"Replayed operation" - ~description:"The block contains an operation that was previously \ - included in the chain" - ~pp:(fun ppf oph -> - Format.fprintf ppf - "The operation %a was previously included in the chain." - Operation_hash.pp oph) - Data_encoding.(obj1 (req "hash" Operation_hash.encoding)) - (function Replayed_operation oph -> Some oph | _ -> None) - (function oph -> Replayed_operation oph) ; - register_error_kind - `Permanent - ~id:"validator.outdated_operations" - ~title:"Outdated operation" - ~description:"The block contains an operation which is outdated." - ~pp:(fun ppf (oph, bh)-> - Format.fprintf ppf - "The operation %a is outdated (%a)" - Operation_hash.pp oph - Block_hash.pp bh) - Data_encoding.(obj2 - (req "operation" Operation_hash.encoding) - (req "block" Block_hash.encoding)) - (function Outdated_operation (oph, bh) -> Some (oph, bh) | _ -> None) - (function (oph, bh) -> Outdated_operation (oph, bh)) - -let apply_block net_state db - (pred: State.Block.t) hash (block: Block_header.t) = - let pred_header = State.Block.header pred - and pred_hash = State.Block.hash pred in - State.Block.context pred >>= fun pred_context -> - let id = State.Net.id net_state in - lwt_log_notice "validate block %a (after %a), net %a" - Block_hash.pp_short hash - Block_hash.pp_short block.shell.predecessor - Net_id.pp id >>= fun () -> - fail_unless - (Int32.succ pred_header.shell.level = block.shell.level) - (Wrong_level (Int32.succ pred_header.shell.level, - block.shell.level)) >>=? fun () -> - lwt_log_info "validation of %a: looking for dependencies..." - Block_hash.pp_short hash >>= fun () -> - Distributed_db.Operations.fetch - db (hash, 0) block.shell.operations_hash >>=? fun operations -> - fail_unless (block.shell.validation_passes <= 1) - (* TODO constant to be exported from the protocol... *) - (failure "unexpected error (TO BE REMOVED)") >>=? fun () -> - let operation_hashes = List.map Operation.hash operations in - lwt_debug "validation of %a: found operations" - Block_hash.pp_short hash >>= fun () -> - begin (* Are we validating a block in an expired test network ? *) - match State.Net.expiration net_state with - | Some eol when Time.(eol <= block.shell.timestamp) -> - failwith "This test network expired..." - | None | Some _ -> return () - end >>=? fun () -> - begin - if Time.(pred_header.shell.timestamp >= block.shell.timestamp) then - fail Non_increasing_timestamp - else - return () - end >>=? fun () -> - begin - if Fitness.compare pred_header.shell.fitness block.shell.fitness >= 0 then - fail Non_increasing_fitness - else - return () - end >>=? fun () -> - begin - Chain_traversal.live_blocks - pred (State.Block.max_operations_ttl pred) >>= fun (live_blocks, - live_operations) -> - let rec assert_no_duplicates live_operations = function - | [] -> return () - | oph :: ophs -> - if Operation_hash.Set.mem oph live_operations then - fail (Replayed_operation oph) - else - assert_no_duplicates - (Operation_hash.Set.add oph live_operations) ophs in - let assert_live operations = - List.fold_left - (fun acc op -> - acc >>=? fun () -> - fail_unless - (Block_hash.Set.mem op.Operation.shell.branch live_blocks) - (Outdated_operation (Operation.hash op, op.shell.branch))) - (return ()) operations in - assert_no_duplicates live_operations operation_hashes >>=? fun () -> - assert_live operations - end >>=? fun () -> - Context.get_protocol pred_context >>= fun pred_protocol_hash -> - begin - match State.Registred_protocol.get pred_protocol_hash with - | None -> fail Unknown_protocol - | Some p -> return p - end >>=? fun (module Proto) -> - lwt_debug "validation of %a: Proto %a" - Block_hash.pp_short hash - Protocol_hash.pp_short Proto.hash >>= fun () -> - lwt_debug "validation of %a: parsing header..." - Block_hash.pp_short hash >>= fun () -> - lwt_debug "validation of %a: parsing operations..." - Block_hash.pp_short hash >>= fun () -> - map2_s - (fun op_hash raw -> - Lwt.return (Proto.parse_operation op_hash raw) - |> trace (Invalid_operation op_hash)) - operation_hashes - operations >>=? fun parsed_operations -> - lwt_debug "validation of %a: applying block..." - Block_hash.pp_short hash >>= fun () -> - Context.reset_test_network - pred_context pred_hash block.shell.timestamp >>= fun context -> - Proto.begin_application - ~predecessor_context:context - ~predecessor_timestamp:pred_header.shell.timestamp - ~predecessor_fitness:pred_header.shell.fitness - block >>=? fun state -> - fold_left_s (fun state op -> - Proto.apply_operation state op >>=? fun state -> - return state) - state parsed_operations >>=? fun state -> - Proto.finalize_block state >>=? fun new_context -> - Context.get_protocol new_context.context >>= fun new_protocol -> - let expected_proto_level = - if Protocol_hash.equal new_protocol pred_protocol_hash then - pred_header.shell.proto_level - else - (pred_header.shell.proto_level + 1) mod 256 in - fail_when (block.shell.proto_level <> expected_proto_level) - (Wrong_proto_level (block.shell.proto_level, expected_proto_level)) - >>=? fun () -> - fail_unless - (Fitness.equal new_context.fitness block.shell.fitness) - (Invalid_fitness - { block = hash ; - expected = block.shell.fitness ; - found = new_context.fitness ; - }) >>=? fun () -> - let max_operations_ttl = - max 0 - (min - ((State.Block.max_operations_ttl pred)+1) - new_context.max_operations_ttl) in - let new_context = - { new_context with max_operations_ttl } in - lwt_log_info "validation of %a: success" - Block_hash.pp_short hash >>= fun () -> - return new_context - -(** *) - -module Context_db = struct - - type data = - { validator: net_validator ; - state: [ `Inited of Block_header.t tzresult - | `Initing of Block_header.t tzresult Lwt.t - | `Running of State.Block.t tzresult Lwt.t ] ; - wakener: State.Block.t tzresult Lwt.u } - - type context = - { tbl : data Block_hash.Table.t ; - canceler : Lwt_utils.Canceler.t ; - worker_trigger: unit -> unit; - worker_waiter: unit -> unit Lwt.t ; - worker: unit Lwt.t ; - net_db : Distributed_db.net_db ; - net_state : State.Net.t } - - let pending_requests { tbl } = - Block_hash.Table.fold - (fun h data acc -> - match data.state with - | `Initing _ -> acc - | `Running _ -> acc - | `Inited d -> (h, d, data) :: acc) - tbl [] - - let pending { tbl } hash = Block_hash.Table.mem tbl hash - - let request validator { tbl ; worker_trigger ; net_db } hash = - assert (not (Block_hash.Table.mem tbl hash)); - let waiter, wakener = Lwt.wait () in - let data = - Distributed_db.Block_header.fetch net_db hash () in - match Lwt.state data with - | Lwt.Return data -> - let state = `Inited data in - Block_hash.Table.add tbl hash { validator ; state ; wakener } ; - worker_trigger () ; - waiter - | _ -> - let state = `Initing data in - Block_hash.Table.add tbl hash { validator ; state ; wakener } ; - Lwt.async - (fun () -> - data >>= fun data -> - let state = `Inited data in - Block_hash.Table.replace tbl hash { validator ; state ; wakener } ; - worker_trigger () ; - Lwt.return_unit) ; - waiter - - let prefetch validator ({ net_state ; tbl } as session) hash = - Lwt.ignore_result - (State.Block.known_valid net_state hash >>= fun exists -> - if not exists && not (Block_hash.Table.mem tbl hash) then - request validator session hash >>= fun _ -> Lwt.return_unit - else - Lwt.return_unit) - - let known { net_state } hash = - State.Block.known_valid net_state hash - - let read { net_state } hash = - State.Block.read net_state hash - - let fetch ({ net_state ; tbl } as session) validator hash = - try Lwt.waiter_of_wakener (Block_hash.Table.find tbl hash).wakener - with Not_found -> - State.Block.known_invalid net_state hash >>= fun known_invalid -> - if known_invalid then - Lwt.return (Error [failure "Invalid predecessor"]) - else - State.Block.read_opt net_state hash >>= function - | Some op -> - Lwt.return (Ok op) - | None -> - try Lwt.waiter_of_wakener (Block_hash.Table.find tbl hash).wakener - with Not_found -> request validator session hash - - let store { net_db ; tbl } hash data = - begin - match data with - | Ok data -> begin - Distributed_db.commit_block net_db hash data >>=? function - | None -> - (* Should not happen if the block is not validated twice *) - assert false - | Some block -> - return (Ok block) - end - | Error err -> - Distributed_db.commit_invalid_block net_db hash >>=? fun changed -> - assert changed ; - return (Error err) - end >>= function - | Ok block -> - let wakener = (Block_hash.Table.find tbl hash).wakener in - Block_hash.Table.remove tbl hash; - Lwt.wakeup wakener block ; - Lwt.return_unit - | Error _ as err -> - let wakener = (Block_hash.Table.find tbl hash).wakener in - Block_hash.Table.remove tbl hash; - Lwt.wakeup wakener err ; - Lwt.return_unit - - let process (v: net_validator) ~get_context ~set_context hash block = - let net_state = Distributed_db.net_state v.net_db in - get_context v block.Block_header.shell.predecessor >>= function - | Error _ as error -> - Lwt_unix.yield () >>= fun () -> - set_context v hash (Error [(* TODO *)]) >>= fun () -> - Lwt.return error - | Ok _context -> - lwt_debug "process %a" Block_hash.pp_short hash >>= fun () -> - begin - Chain.genesis net_state >>= fun genesis -> - if Block_hash.equal (State.Block.hash genesis) - block.shell.predecessor then - Lwt.return genesis - else - State.Block.read_exn net_state block.shell.predecessor - end >>= fun pred -> - apply_block net_state v.net_db pred hash block >>= function - | Error ([Unknown_protocol] as err) as error -> - lwt_log_error - "@[Ignoring block %a@ %a@]" - Block_hash.pp_short hash - Error_monad.pp_print_error err >>= fun () -> - Lwt.return error - | Error exns as error -> - set_context v hash error >>= fun () -> - lwt_warn "Failed to validate block %a." - Block_hash.pp_short hash >>= fun () -> - lwt_debug "%a" Error_monad.pp_print_error exns >>= fun () -> - Lwt.return error - | Ok new_context -> - (* The sanity check `set_context` detects differences - between the computed fitness and the fitness announced - in the block header. Then `Block.read` will - return an error. *) - set_context v hash (Ok new_context) >>= fun () -> - State.Block.read net_state hash >>= function - | Error err as error -> - lwt_log_error - "@[Ignoring block %a@ %a@]" - Block_hash.pp_short hash - Error_monad.pp_print_error err >>= fun () -> - Lwt.return error - | Ok block -> - lwt_debug - "validation of %a: reevaluate current block" - Block_hash.pp_short hash >>= fun () -> - Watcher.notify v.worker.valid_block_input block ; - Watcher.notify v.valid_block_input_for_net block ; - fetch_protocols v block >>=? fun _fetched -> - may_set_head v block >>= fun () -> - return block - - let request session ~get_context ~set_context pendings = - let time = Time.now () in - let min_block b pb = - match pb with - | None -> Some b - | Some pb - when b.Block_header.shell.timestamp - < pb.Block_header.shell.timestamp -> - Some b - | Some _ as pb -> pb in - let next = - List.fold_left - (fun acc (hash, block, (data : data)) -> - match block with - | Error _ -> - acc - | Ok block -> - if Time.(block.Block_header.shell.timestamp > time) then - min_block block acc - else begin - Block_hash.Table.replace session.tbl hash { data with state = `Running begin - Lwt_main.yield () >>= fun () -> - process data.validator ~get_context ~set_context hash block >>= fun res -> - Block_hash.Table.remove session.tbl hash ; - Lwt.return res - end } ; - acc - end) - None - pendings in - match next with - | None -> 0. - | Some b -> Int64.to_float (Time.diff b.Block_header.shell.timestamp time) - - let create net_db = - let net_state = Distributed_db.net_state net_db in - let tbl = Block_hash.Table.create 50 in - let canceler = Lwt_utils.Canceler.create () in - let worker_trigger, worker_waiter = Lwt_utils.trigger () in - let session = - { tbl ; net_db ; net_state ; worker = Lwt.return () ; - canceler ; worker_trigger ; worker_waiter } in - let worker = - let rec worker_loop () = - Lwt_utils.protect ~canceler begin fun () -> - worker_waiter () >>= return - end >>= function - | Error [Lwt_utils.Canceled] -> Lwt.return_unit - | Error err -> - lwt_log_error - "@[Unexpected error in validation:@ %a@]" - pp_print_error err >>= fun () -> - worker_loop () - | Ok () -> - begin - match pending_requests session with - | [] -> () - | requests -> - let set_context _validator hash context = - store session hash context >>= fun _ -> - Lwt.return_unit in - let timeout = - request session - ~get_context:(fetch session) - ~set_context requests in - if timeout > 0. then - Lwt.ignore_result - (Lwt_unix.sleep timeout >|= worker_trigger); - end ; - worker_loop () - in - Lwt_utils.worker "validation" - ~run:worker_loop - ~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) in - { session with worker } - - let shutdown { canceler ; worker } = - Lwt_utils.Canceler.cancel canceler >>= fun () -> worker - -end - - -let create_validator ?parent worker ?max_child_ttl state db net = - - let net_id = State.Net.id net in - let net_db = Distributed_db.activate db net in - let session = Context_db.create net_db in - - let queue = Lwt_pipe.create () in - Prevalidator.create net_db >>= fun prevalidator -> - let new_blocks = ref Lwt.return_unit in - - Distributed_db.set_callback net_db { - notify_branch = begin fun gid locator -> - Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator))) - end ; - notify_head = begin fun gid block ops -> - Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, Block_header.hash block, ops))) ; - end ; - disconnection = (fun _gid -> ()) ; - } ; - - let shutdown () = - lwt_log_notice "shutdown %a" Net_id.pp net_id >>= fun () -> - Distributed_db.deactivate net_db >>= fun () -> - Lwt_pipe.close queue ; - Lwt.join [ - Context_db.shutdown session ; - !new_blocks ; - Prevalidator.shutdown prevalidator ; - ] - in - - let valid_block_input_for_net = Watcher.create_input () in - let new_head_input = Watcher.create_input () in - - let bootstrapped = - (* TODO improve by taking current peers count and current - locators into account... *) - let stream, stopper = - Watcher.create_stream valid_block_input_for_net in - let rec wait () = - Lwt.pick [ ( Lwt_stream.get stream ) ; - ( Lwt_unix.sleep 30. >|= fun () -> None) ] >>= function - | Some block when - Time.((State.Block.header block).shell.timestamp < add (Time.now ()) (-60L)) -> - wait () - | _ -> - Chain.head net >>= fun head -> - Chain.genesis net >>= fun genesis -> - if State.Block.equal head genesis then - wait () - else - Lwt.return_unit in - let net_validator = - wait () >>= fun () -> - Watcher.shutdown stopper ; - Lwt.return_unit in - Lwt.no_cancel net_validator - in - - let rec v = { - net ; - worker ; - parent ; - child = None ; - prevalidator ; - net_db ; - shutdown ; - notify_block ; - fetch_block ; - create_child ; - check_child ; - deactivate_child ; - test_validator ; - bootstrapped ; - new_head_input ; - valid_block_input_for_net ; - } - - and notify_block hash block = - lwt_debug "-> Validator.notify_block %a" - Block_hash.pp_short hash >>= fun () -> - Chain.head net >>= fun head -> - let head_header = State.Block.header head in - if Fitness.compare head_header.shell.fitness block.shell.fitness <= 0 then - Context_db.prefetch v session hash ; - Lwt.return_unit - - and fetch_block hash = - Context_db.fetch session v hash - - and create_child block protocol expiration = - if State.Net.allow_forked_network net then begin - deactivate_child () >>= fun () -> - begin - State.Net.get state net_id >>= function - | Ok net_store -> return net_store - | Error _ -> - State.fork_testnet block protocol expiration >>=? fun net_store -> - Chain.head net_store >>= fun block -> - Watcher.notify v.worker.valid_block_input block ; - return net_store - end >>=? fun net_store -> - worker.activate ~parent:v net_store >>= fun child -> - v.child <- Some child ; - return () - end else begin - (* Ignoring request... *) - return () - end - - and deactivate_child () = - match v.child with - | None -> Lwt.return_unit - | Some child -> - v.child <- None ; - deactivate child - - and check_child genesis protocol expiration current_time = - let activated = - match v.child with - | None -> false - | Some child -> - Block_hash.equal (State.Net.genesis child.net).block genesis in - begin - match max_child_ttl with - | None -> return expiration - | Some ttl -> - Distributed_db.Block_header.fetch net_db genesis () >>=? fun genesis -> - return - (Time.min expiration - (Time.add genesis.shell.timestamp (Int64.of_int ttl))) - end >>=? fun local_expiration -> - let expired = Time.(local_expiration <= current_time) in - if expired && activated then - deactivate_child () >>= return - else if not activated && not expired then - fetch_block genesis >>=? fun genesis -> - create_child genesis protocol expiration - else - return () - - and test_validator () = - match v.child with - | None -> None - | Some child -> Some (child, child.net_db) - in - - new_blocks := begin - let rec loop () = - Lwt_pipe.pop queue >>= function - | `Branch (_gid, locator) -> - let head, hist = (locator : Block_locator.t :> _ * _) in - List.iter - (Context_db.prefetch v session) - (Block_header.hash head :: hist) ; - loop () - | `Head (gid, head, ops) -> - Context_db.prefetch v session head ; - Prevalidator.notify_operations prevalidator gid ops ; - loop () - in - Lwt.catch loop - (function Lwt_pipe.Closed -> Lwt.return_unit - | exn -> Lwt.fail exn) - end ; - - Lwt.return v - let create state db = - - let validators : net_validator Lwt.t Net_id.Table.t = - Net_id.Table.create 7 in - + let block_validator = Block_validator.create db in let valid_block_input = Watcher.create_input () in - - let get_exn net = Net_id.Table.find validators net in - let get net = - try get_exn net >>= fun v -> return v - with Not_found -> fail (State.Unknown_network net) in - let remove net = Net_id.Table.remove validators net in - - let deactivate { net } = - let id = State.Net.id net in - get id >>= function - | Error _ -> Lwt.return_unit - | Ok v -> - lwt_log_notice "deactivate network %a" Net_id.pp id >>= fun () -> - remove id ; - v.shutdown () - in - - let notify_block hash (block : Block_header.t) = - match get_exn block.shell.net_id with - | exception Not_found -> Lwt.return_unit - | net -> - net >>= fun net -> - net.notify_block hash block in - - let cancelation, cancel, _on_cancel = Lwt_utils.canceler () in - - let maintenance_worker = - let next_net_maintenance = ref (Time.now ()) in - let net_maintenance () = - lwt_log_info "net maintenance" >>= fun () -> - let time = Time.now () in - Net_id.Table.fold - (fun _ v acc -> - v >>= fun v -> - acc >>= fun () -> - match State.Net.expiration v.net with - | Some eol when Time.(eol <= time) -> deactivate v - | Some _ | None -> Lwt.return_unit) - validators Lwt.return_unit >>= fun () -> - State.Net.all state >>= fun all_net -> - Lwt_list.iter_p - (fun net -> - match State.Net.expiration net with - | Some eol when Time.(eol <= time) -> - lwt_log_notice "destroy network %a" - Net_id.pp (State.Net.id net) >>= fun () -> - State.Net.destroy state net - | Some _ | None -> Lwt.return_unit) - all_net >>= fun () -> - next_net_maintenance := Time.add (Time.now ()) (Int64.of_int 55) ; - Lwt.return_unit in - let next_head_maintenance = ref (Time.now ()) in - let head_maintenance () = - lwt_log_info "head maintenance" >>= fun () -> - (* TODO *) - next_head_maintenance := Time.add (Time.now ()) (Int64.of_int 55) ; - Lwt.return_unit in - let rec worker_loop () = - let timeout = - let next = min !next_head_maintenance !next_net_maintenance in - let delay = Time.(diff next (now ())) in - if delay <= 0L then - Lwt.return_unit - else - Lwt_unix.sleep (Int64.to_float delay) in - Lwt.pick [(timeout >|= fun () -> `Process); - (cancelation () >|= fun () -> `Cancel)] >>= function - | `Cancel -> Lwt.return_unit - | `Process -> - begin - if !next_net_maintenance < Time.now () then - net_maintenance () - else - Lwt.return () - end >>= fun () -> - begin - if !next_head_maintenance < Time.now () then - head_maintenance () - else - Lwt.return () - end >>= fun () -> - worker_loop () - in - Lwt_utils.worker "validator_maintenance" ~run:worker_loop ~cancel in - - let shutdown () = - cancel () >>= fun () -> - let validators = - Net_id.Table.fold - (fun _ (v: net_validator Lwt.t) acc -> (v >>= fun v -> v.shutdown ()) :: acc) - validators [] in - Lwt.join (maintenance_worker :: validators) in - - let inject_block ?(force = false) bytes operations = - Distributed_db.inject_block db bytes operations >>=? fun (hash, block) -> - get block.shell.net_id >>=? fun net -> - let validation = - protect - ~on_error: begin fun err -> - Distributed_db.clear_block - net.net_db hash (List.length operations) ; - Lwt.return (Error err) - end - begin fun () -> - Chain.head net.net >>= fun head -> - let head_header = State.Block.header head in - if force || - Fitness.compare head_header.shell.fitness block.shell.fitness <= 0 - then - fetch_block net hash - else - failwith "Fitness is below the current one" - end in - return (hash, validation) in - - let rec activate ?parent ?max_child_ttl net = - let net_id = State.Net.id net in - lwt_log_notice "activate network %a" - Net_id.pp net_id >>= fun () -> - get net_id >>= function - | Error _ -> - let v = create_validator ?max_child_ttl ?parent worker state db net in - Net_id.Table.add validators net_id v ; - v - | Ok v -> Lwt.return v - - and worker = { - get ; get_exn ; - activate ; deactivate ; - notify_block ; - inject_block ; - shutdown ; + { state ; db ; block_validator ; valid_block_input ; - db ; + active_nets = Net_id.Table.create 7 ; } - in +let activate v ?bootstrap_threshold ?max_child_ttl net_state = + let net_id = State.Net.id net_state in + lwt_log_notice "activate network %a" Net_id.pp net_id >>= fun () -> + try Net_id.Table.find v.active_nets net_id + with Not_found -> + let nv = + Net_validator.create + ?bootstrap_threshold + ?max_child_ttl + v.block_validator v.valid_block_input v.db net_state in + Net_id.Table.add v.active_nets net_id nv ; + nv - worker +let get_exn { active_nets } net_id = + Net_id.Table.find active_nets net_id -let new_head_watcher { new_head_input } = - Watcher.create_stream new_head_input +type error += + | Inactive_network of Net_id.t -let watcher { valid_block_input_for_net } = - Watcher.create_stream valid_block_input_for_net +let get v net_id = + try get_exn v net_id >>= fun nv -> return nv + with Not_found -> fail (Inactive_network net_id) -let global_watcher ({ valid_block_input } : t) = +let inject_block v ?force bytes operations = + let hash = Block_hash.hash_bytes [bytes] in + match Block_header.of_bytes bytes with + | None -> failwith "Cannot parse block header." + | Some block -> + get v block.shell.net_id >>=? fun nv -> + (* TODO... remove `Distributed_db.operation` + and only accept raw operations ??? *) + let validation = + map_p (map_p (Distributed_db.resolve_operation (Net_validator.net_db nv))) operations >>=? fun operations -> + Net_validator.validate_block nv ?force hash block operations in + return (hash, validation) + +let shutdown { active_nets ; block_validator } = + let jobs = + Block_validator.shutdown block_validator :: + Net_id.Table.fold + (fun _ nv acc -> (nv >>= Net_validator.shutdown) :: acc) + active_nets [] in + Lwt.join jobs >>= fun () -> + Lwt.return_unit + +let watcher { valid_block_input } = Watcher.create_stream valid_block_input diff --git a/src/node/shell/validator.mli b/src/node/shell/validator.mli index 07cc8917a..5c02d68e8 100644 --- a/src/node/shell/validator.mli +++ b/src/node/shell/validator.mli @@ -12,35 +12,21 @@ type t val create: State.t -> Distributed_db.t -> t val shutdown: t -> unit Lwt.t -val notify_block: t -> Block_hash.t -> Block_header.t -> unit Lwt.t - -type net_validator +val activate: + t -> + ?bootstrap_threshold:int -> + ?max_child_ttl:int -> + State.Net.t -> Net_validator.t Lwt.t type error += - | Non_increasing_timestamp - | Non_increasing_fitness - -val activate: t -> ?max_child_ttl:int -> State.Net.t -> net_validator Lwt.t -val get: t -> Net_id.t -> net_validator tzresult Lwt.t -val get_exn: t -> Net_id.t -> net_validator Lwt.t -val deactivate: net_validator -> unit Lwt.t - -val net_state: net_validator -> State.Net.t -val net_db: net_validator -> Distributed_db.net_db - -val fetch_block: - net_validator -> Block_hash.t -> State.Block.t tzresult Lwt.t + | Inactive_network of Net_id.t +val get: t -> Net_id.t -> Net_validator.t tzresult Lwt.t +val get_exn: t -> Net_id.t -> Net_validator.t Lwt.t val inject_block: - t -> ?force:bool -> + t -> + ?force:bool -> MBytes.t -> Distributed_db.operation list list -> (Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t -val prevalidator: net_validator -> Prevalidator.t -val test_validator: net_validator -> (net_validator * Distributed_db.net_db) option - -val watcher: net_validator -> State.Block.t Lwt_stream.t * Watcher.stopper -val new_head_watcher: net_validator -> State.Block.t Lwt_stream.t * Watcher.stopper -val global_watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper - -val bootstrapped: net_validator -> unit Lwt.t +val watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper diff --git a/src/utils/lwt_utils.ml b/src/utils/lwt_utils.ml index 19fc3c5b1..9fdf86406 100644 --- a/src/utils/lwt_utils.ml +++ b/src/utils/lwt_utils.ml @@ -468,7 +468,8 @@ let protect ?on_error ?canceler t = let err = if canceled then [Canceled] else err in match on_error with | None -> Lwt.return (Error err) - | Some on_error -> on_error err + | Some on_error -> + Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn)) type error += Timeout diff --git a/test/test_multinode.sh b/test/test_multinode.sh index b886367fe..b13800291 100755 --- a/test/test_multinode.sh +++ b/test/test_multinode.sh @@ -5,7 +5,7 @@ set -e test_dir="$(cd "$(dirname "$0")" && echo "$(pwd -P)")" source $test_dir/lib/test_lib.inc.sh -expected_connections=3 +expected_connections=4 max_peer_id=8 for i in $(seq 1 $max_peer_id); do echo @@ -15,18 +15,18 @@ for i in $(seq 1 $max_peer_id); do echo done -## waiting for the node to establich connections -sleep 2 +## waiting for the node to establish connections + for client in "${client_instances[@]}"; do echo echo "### $client network stat" echo + $client bootstrapped $client network stat echo done activate_alpha -sleep 5 assert_propagation_level() { level=$1 @@ -38,12 +38,16 @@ assert_propagation_level() { done } -printf "\n\nAsserting protocol propagation\n" -for client in "${client_instances[@]}"; do - $client rpc call /blocks/head/protocol \ - | assert_in_output "ProtoALphaALphaALphaALphaALphaALphaALphaALphaDdp3zK" -done +assert_protocol() { + proto=$1 + printf "\n\nAsserting protocol propagation\n" + for client in "${client_instances[@]}"; do + ( $client rpc call /blocks/head/protocol | assert_in_output "$proto" ) \ + || exit 2 + done +} + printf "\n\n" @@ -66,6 +70,8 @@ retry() { done } +retry 2 15 assert_protocol "ProtoALphaALphaALphaALphaALphaALphaALphaALphaDdp3zK" + $client1 mine for bootstrap1 -max-priority 512 retry 2 15 assert_propagation_level 2 @@ -81,7 +87,8 @@ retry 2 15 assert_propagation_level 5 endorse_hash=$($client3 endorse for bootstrap3 | extract_operation_hash) transfer_hash=$($client4 transfer 500 from bootstrap1 to bootstrap3 | extract_operation_hash) -retry 2 15 $client4 mine for bootstrap4 -max-priority 512 +# wait for the propagation of operations +sleep 2 assert_contains_operation() { hash="$1" @@ -93,8 +100,9 @@ assert_contains_operation() { done } +$client4 mine for bootstrap4 -max-priority 512 retry 2 15 assert_contains_operation $endorse_hash -assert_contains_operation $transfer_hash +retry 2 15 assert_contains_operation $transfer_hash echo echo End of test