From 768cf91cd6d141fa808c6955ad674b269d7ffd27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 28 Feb 2017 00:48:22 +0100 Subject: [PATCH] Client: implements `./tezos-client bootstrapped` It wait for the node to be synchronized with the network. The heuristic is currently: - the timestamp of current head is less than 1 minute old ; - there was a period of 30 seconds without new block discovered. --- src/client/client_helpers.ml | 16 +++++- src/client/client_node_rpcs.ml | 2 + src/client/client_node_rpcs.mli | 3 ++ src/node/shell/node.ml | 80 +++++++++++++++++++--------- src/node/shell/node.mli | 6 +++ src/node/shell/node_rpc.ml | 7 ++- src/node/shell/node_rpc_services.ml | 9 ++++ src/node/shell/node_rpc_services.mli | 2 + src/node/shell/validator.ml | 40 +++++++++++++- src/node/shell/validator.mli | 6 ++- 10 files changed, 141 insertions(+), 30 deletions(-) diff --git a/src/client/client_helpers.ml b/src/client/client_helpers.ml index 333ba361b..444cae07e 100644 --- a/src/client/client_helpers.ml +++ b/src/client/client_helpers.ml @@ -34,5 +34,19 @@ let commands () = Cli_entries.[ | _ :: _ :: _ when !unique -> Pervasives.exit 3 | completions -> List.iter print_endline completions ; - Lwt.return_unit) + Lwt.return_unit) ; + command + ~desc: "Wait for the node to be bootstrapped." + ~args: [] + (prefixes [ "bootstrapped" ] @@ + stop) + (fun cctxt -> + Client_node_rpcs.bootstrapped cctxt >>= fun stream -> + Lwt_stream.iter_s (fun (hash, time) -> + cctxt.message "Current head: %a (%a)" + Block_hash.pp_short hash + Time.pp_hum time + ) stream >>= fun () -> + cctxt.answer "Bootstrapped." +) ] diff --git a/src/client/client_node_rpcs.ml b/src/client/client_node_rpcs.ml index 996770581..9c8c3c148 100644 --- a/src/client/client_node_rpcs.ml +++ b/src/client/client_node_rpcs.ml @@ -141,6 +141,8 @@ let inject_operation cctxt ?(wait = true) ?force operation = call_service0 cctxt Services.inject_operation (operation, wait, force) let inject_protocol cctxt ?(wait = true) ?force protocol = call_service0 cctxt Services.inject_protocol (protocol, wait, force) +let bootstrapped cctxt = + call_streamed_service0 cctxt Services.bootstrapped () let complete cctxt ?block prefix = match block with | None -> diff --git a/src/client/client_node_rpcs.mli b/src/client/client_node_rpcs.mli index c8724bfa4..7d31a156e 100644 --- a/src/client/client_node_rpcs.mli +++ b/src/client/client_node_rpcs.mli @@ -149,6 +149,9 @@ module Protocols : sig (Protocol_hash.t * Store.Protocol.t option) list Lwt.t end +val bootstrapped: + Client_commands.context -> (Block_hash.t * Time.t) Lwt_stream.t Lwt.t + val complete: Client_commands.context -> ?block:Blocks.block -> string -> string list Lwt.t diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index a08209734..4030e0e4b 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -50,9 +50,9 @@ type t = { state: State.t ; distributed_db: Distributed_db.t ; validator: Validator.worker ; - global_db: Distributed_db.net ; - global_net: State.Net.t ; - global_validator: Validator.t ; + mainnet_db: Distributed_db.net ; + mainnet_net: State.Net.t ; + mainnet_validator: Validator.t ; inject_block: ?force:bool -> MBytes.t -> (Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t ; @@ -99,9 +99,9 @@ let create { genesis ; store_root ; context_root ; State.Net.create state ?test_protocol ~forked_network_ttl:(48 * 3600) (* 2 days *) - genesis >>= fun global_net -> - Validator.activate validator global_net >>= fun global_validator -> - let global_db = Validator.net_db global_validator in + genesis >>= fun mainnet_net -> + Validator.activate validator mainnet_net >>= fun mainnet_validator -> + let mainnet_db = Validator.net_db mainnet_validator in let shutdown () = P2p.shutdown p2p >>= fun () -> Validator.shutdown validator >>= fun () -> @@ -111,9 +111,9 @@ let create { genesis ; store_root ; context_root ; state ; distributed_db ; validator ; - global_db ; - global_net ; - global_validator ; + mainnet_db ; + mainnet_net ; + mainnet_validator ; inject_block = inject_block validator ; inject_operation = inject_operation validator ; inject_protocol = inject_protocol state ; @@ -181,16 +181,16 @@ module RPC = struct let get_net node = function | `Genesis | `Head _ | `Prevalidation -> - node.global_validator, node.global_db + node.mainnet_validator, node.mainnet_db | `Test_head _ | `Test_prevalidation -> - match Validator.test_validator node.global_validator with + match Validator.test_validator node.mainnet_validator with | None -> raise Not_found | Some v -> v let get_validator node = function - | `Genesis | `Head _ | `Prevalidation -> node.global_validator + | `Genesis | `Head _ | `Prevalidation -> node.mainnet_validator | `Test_head _ | `Test_prevalidation -> - match Validator.test_validator node.global_validator with + match Validator.test_validator node.mainnet_validator with | None -> raise Not_found | Some (v, _) -> v @@ -198,16 +198,16 @@ module RPC = struct Distributed_db.read_block_exn node.distributed_db hash >>= fun (_net_db, block) -> if State.Net_id.equal - (State.Net.id node.global_net) + (State.Net.id node.mainnet_net) block.shell.net_id then - Lwt.return (Some (node.global_validator, node.global_db)) + Lwt.return (Some (node.mainnet_validator, node.mainnet_db)) else - match Validator.test_validator node.global_validator with + match Validator.test_validator node.mainnet_validator with | Some (test_validator, net_db) when State.Net_id.equal (State.Net.id (Validator.net_state test_validator)) block.shell.net_id -> - Lwt.return (Some (node.global_validator, net_db)) + Lwt.return (Some (node.mainnet_validator, net_db)) | _ -> Lwt.return_none let read_valid_block node h = @@ -246,7 +246,7 @@ module RPC = struct let block_info node (block: block) = match block with | `Genesis -> - State.Valid_block.Current.genesis node.global_net >|= convert + State.Valid_block.Current.genesis node.mainnet_net >|= convert | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in let net_db = Validator.net_db validator in @@ -271,7 +271,7 @@ module RPC = struct let get_context node block = match block with | `Genesis -> - State.Valid_block.Current.genesis node.global_net >>= fun block -> + State.Valid_block.Current.genesis node.mainnet_net >>= fun block -> Lwt.return (Some block.context) | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in @@ -293,7 +293,7 @@ module RPC = struct let operations node block = match block with | `Genesis -> - State.Valid_block.Current.genesis node.global_net >>= fun { operations } -> + State.Valid_block.Current.genesis node.mainnet_net >>= fun { operations } -> Lwt.return operations | ( `Head n | `Test_head n ) as block -> let validator = get_validator node block in @@ -334,7 +334,7 @@ module RPC = struct Prevalidator.pending ~block:b prevalidator >|= fun ops -> Updater.empty_result, ops | `Genesis -> - let net = node.global_net in + let net = node.mainnet_net in State.Valid_block.Current.genesis net >>= fun b -> let validator = get_validator node `Genesis in let prevalidator = Validator.prevalidator validator in @@ -363,7 +363,7 @@ module RPC = struct begin match block with | `Genesis -> - let net = node.global_net in + let net = node.mainnet_net in State.Valid_block.Current.genesis net >>= return | ( `Head 0 | `Prevalidation | `Test_head 0 | `Test_prevalidation ) as block -> @@ -387,7 +387,7 @@ module RPC = struct | None -> failwith "Unknown protocol version" | Some protocol -> return protocol end >>=? fun ((module Proto) as protocol) -> - let net_db = Validator.net_db node.global_validator in + let net_db = Validator.net_db node.mainnet_validator in Prevalidator.preapply net_db context protocol hash timestamp sort ops >>=? fun (ctxt, r) -> Context.get_fitness ctxt >>= fun fitness -> @@ -417,9 +417,9 @@ module RPC = struct Lwt.return (Some (RPC.map (fun _ -> ()) dir)) let heads node = - State.Valid_block.known_heads node.global_net >>= fun heads -> + State.Valid_block.known_heads node.mainnet_net >>= fun heads -> begin - match Validator.test_validator node.global_validator with + match Validator.test_validator node.mainnet_validator with | None -> Lwt.return_nil | Some (_, net_db) -> State.Valid_block.known_heads (Distributed_db.state net_db) @@ -492,7 +492,7 @@ module RPC = struct shutdown let valid_block_watcher node = - let stream, shutdown = Validator.watcher node.validator in + let stream, shutdown = Validator.global_watcher node.validator in Lwt_stream.map (fun block -> convert block) stream, shutdown @@ -507,7 +507,27 @@ module RPC = struct Validator.fetch_block net_v block >>=? fun _ -> return () + let bootstrapped node = + let block_stream, stopper = + Validator.new_head_watcher node.mainnet_validator in + let first_run = ref true in + let rec next () = + if !first_run then begin + first_run := false ; + State.Valid_block.Current.head node.mainnet_net >>= fun head -> + Lwt.return (Some (head.hash, head.timestamp)) + end else begin + Lwt.pick [ + ( Lwt_stream.get block_stream >|= + map_option ~f:(fun b -> (b.State.Valid_block.hash, b.timestamp)) ) ; + (Validator.bootstrapped node.mainnet_validator >|= fun () -> None) ; + ] + end in + let shutdown () = Watcher.shutdown stopper in + RPC.Answer.{ next ; shutdown } + module Network = struct + let stat (node : t) = P2p.RPC.stat node.p2p @@ -518,6 +538,7 @@ module RPC = struct P2p.RPC.connect node.p2p module Connection = struct + let info (node : t) = P2p.RPC.Connection.info node.p2p @@ -529,9 +550,11 @@ module RPC = struct let count (node : t) = P2p.RPC.Connection.count node.p2p + end module Point = struct + let info (node : t) = P2p.RPC.Point.info node.p2p @@ -543,9 +566,11 @@ module RPC = struct let watch (node : t) = P2p.RPC.Point.watch node.p2p + end module Peer_id = struct + let info (node : t) = P2p.RPC.Peer_id.info node.p2p @@ -557,6 +582,9 @@ module RPC = struct let watch (node : t) = P2p.RPC.Peer_id.watch node.p2p + end + end + end diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index 398bacb3f..194e4ad12 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -86,7 +86,11 @@ module RPC : sig val complete: t -> ?block:block -> string -> string list Lwt.t + val bootstrapped: + t -> (Block_hash.t * Time.t) RPC.Answer.stream + module Network : sig + val stat : t -> P2p.Stat.t val watch : t -> P2p.RPC.Event.t Lwt_stream.t * Watcher.stopper val connect : t -> P2p.Point.t -> float -> unit tzresult Lwt.t @@ -115,7 +119,9 @@ module RPC : sig val watch : t -> P2p.Point.t -> P2p.RPC.Point.Event.t Lwt_stream.t * Watcher.stopper end + end + end val shutdown: t -> unit Lwt.t diff --git a/src/node/shell/node_rpc.ml b/src/node/shell/node_rpc.ml index 3c7ce2651..194a442c3 100644 --- a/src/node/shell/node_rpc.ml +++ b/src/node/shell/node_rpc.ml @@ -437,7 +437,12 @@ let build_rpc_directory node = RPC.register0 dir Services.inject_protocol implementation in let dir = let implementation () = - RPC.Answer.return Data_encoding.Json.(schema (Error_monad.error_encoding ())) in + RPC.Answer.return_stream (Node.RPC.bootstrapped node) in + RPC.register0 dir Services.bootstrapped implementation in + let dir = + let implementation () = + RPC.Answer.return + Data_encoding.Json.(schema (Error_monad.error_encoding ())) in RPC.register0 dir Services.Error.service implementation in let dir = RPC.register1 dir Services.complete diff --git a/src/node/shell/node_rpc_services.ml b/src/node/shell/node_rpc_services.ml index 86cf7afce..69f1f721c 100644 --- a/src/node/shell/node_rpc_services.ml +++ b/src/node/shell/node_rpc_services.ml @@ -763,6 +763,15 @@ let inject_protocol = (obj1 (req "injectedProtocol" Protocol_hash.encoding))) RPC.Path.(root / "inject_protocol") +let bootstrapped = + RPC.service + ~description:"" + ~input: empty + ~output: (obj2 + (req "block" Block_hash.encoding) + (req "timestamp" Time.encoding)) + RPC.Path.(root / "bootstrapped") + let complete = let prefix_arg = let destruct s = Ok s diff --git a/src/node/shell/node_rpc_services.mli b/src/node/shell/node_rpc_services.mli index 8e1e801ef..937e8aba8 100644 --- a/src/node/shell/node_rpc_services.mli +++ b/src/node/shell/node_rpc_services.mli @@ -183,6 +183,8 @@ val inject_protocol: (Tezos_compiler.Protocol.t * bool * bool option), Protocol_hash.t tzresult) RPC.service +val bootstrapped: (unit, unit, unit, Block_hash.t * Time.t) RPC.service + val complete: (unit, unit * string, unit, string list) RPC.service val describe: diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 4ee276149..8db0c0ff6 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -34,6 +34,9 @@ and t = { create_child: State.Valid_block.t -> unit tzresult Lwt.t ; test_validator: unit -> (t * Distributed_db.net) option ; shutdown: unit -> unit Lwt.t ; + valid_block_input: State.Valid_block.t Watcher.input ; + new_head_input: State.Valid_block.t Watcher.input ; + bootstrapped: unit Lwt.t ; } let net_state { net } = net @@ -50,6 +53,7 @@ let test_validator w = w.test_validator () let fetch_block v = v.fetch_block let prevalidator v = v.prevalidator +let bootstrapped v = v.bootstrapped (** Current block computation *) @@ -83,6 +87,7 @@ let rec may_set_head v (block: State.Valid_block.t) = Distributed_db.broadcast_head v.net_db block.hash [] ; Prevalidator.flush v.prevalidator block ; may_change_test_network v block >>= fun () -> + Watcher.notify v.new_head_input block ; lwt_log_notice "update current head %a %a %a(%t)" Block_hash.pp_short block.hash Fitness.pp block.fitness @@ -202,6 +207,7 @@ module Validation_scheduler = struct "validation of %a: reevaluate current block" Block_hash.pp_short hash >>= fun () -> Watcher.notify v.worker.valid_block_input block ; + Watcher.notify v.valid_block_input block ; may_set_head v block let request state ~get ~set pendings = @@ -444,6 +450,28 @@ let rec create_validator ?parent worker state db net = ] in + let valid_block_input = Watcher.create_input () in + let new_head_input = Watcher.create_input () in + + let bootstrapped = + (* TODO improve by taking current peers count and current + locators into account... *) + let stream, stopper = + Watcher.create_stream valid_block_input in + let rec wait () = + Lwt.pick [ ( Lwt_stream.get stream ) ; + ( Lwt_unix.sleep 30. >|= fun () -> None) ] >>= function + | Some block + when Time.(block.State.Valid_block.timestamp < add (Time.now ()) (-60L)) -> + wait () + | Some _ | None -> Lwt.return_unit in + let t = + wait () >>= fun () -> + Watcher.shutdown stopper ; + Lwt.return_unit in + Lwt.no_cancel t + in + let rec v = { net ; worker ; @@ -456,6 +484,9 @@ let rec create_validator ?parent worker state db net = fetch_block ; create_child ; test_validator ; + bootstrapped ; + new_head_input ; + valid_block_input ; } and notify_block hash block = @@ -657,4 +688,11 @@ let create_worker state db = worker -let watcher { valid_block_input } = Watcher.create_stream valid_block_input +let new_head_watcher ({ new_head_input } : t) = + Watcher.create_stream new_head_input + +let watcher ({ valid_block_input } : t) = + Watcher.create_stream valid_block_input + +let global_watcher ({ valid_block_input } : worker) = + Watcher.create_stream valid_block_input diff --git a/src/node/shell/validator.mli b/src/node/shell/validator.mli index 67793febb..5f2913ede 100644 --- a/src/node/shell/validator.mli +++ b/src/node/shell/validator.mli @@ -34,4 +34,8 @@ val inject_block: val prevalidator: t -> Prevalidator.t val test_validator: t -> (t * Distributed_db.net) option -val watcher: worker -> State.Valid_block.t Lwt_stream.t * Watcher.stopper +val watcher: t -> State.Valid_block.t Lwt_stream.t * Watcher.stopper +val new_head_watcher: t -> State.Valid_block.t Lwt_stream.t * Watcher.stopper +val global_watcher: worker -> State.Valid_block.t Lwt_stream.t * Watcher.stopper + +val bootstrapped: t -> unit Lwt.t