Client: implements ./tezos-client bootstrapped

It wait for the node to be synchronized with the network. The heuristic
is currently:

- the timestamp of current head is less than 1 minute old ;
- there was a period of 30 seconds without new block discovered.
This commit is contained in:
Grégoire Henry 2017-02-28 00:48:22 +01:00
parent de050bfee1
commit 768cf91cd6
10 changed files with 141 additions and 30 deletions

View File

@ -34,5 +34,19 @@ let commands () = Cli_entries.[
| _ :: _ :: _ when !unique -> Pervasives.exit 3 | _ :: _ :: _ when !unique -> Pervasives.exit 3
| completions -> | completions ->
List.iter print_endline completions ; List.iter print_endline completions ;
Lwt.return_unit) Lwt.return_unit) ;
command
~desc: "Wait for the node to be bootstrapped."
~args: []
(prefixes [ "bootstrapped" ] @@
stop)
(fun cctxt ->
Client_node_rpcs.bootstrapped cctxt >>= fun stream ->
Lwt_stream.iter_s (fun (hash, time) ->
cctxt.message "Current head: %a (%a)"
Block_hash.pp_short hash
Time.pp_hum time
) stream >>= fun () ->
cctxt.answer "Bootstrapped."
)
] ]

View File

@ -141,6 +141,8 @@ let inject_operation cctxt ?(wait = true) ?force operation =
call_service0 cctxt Services.inject_operation (operation, wait, force) call_service0 cctxt Services.inject_operation (operation, wait, force)
let inject_protocol cctxt ?(wait = true) ?force protocol = let inject_protocol cctxt ?(wait = true) ?force protocol =
call_service0 cctxt Services.inject_protocol (protocol, wait, force) call_service0 cctxt Services.inject_protocol (protocol, wait, force)
let bootstrapped cctxt =
call_streamed_service0 cctxt Services.bootstrapped ()
let complete cctxt ?block prefix = let complete cctxt ?block prefix =
match block with match block with
| None -> | None ->

View File

@ -149,6 +149,9 @@ module Protocols : sig
(Protocol_hash.t * Store.Protocol.t option) list Lwt.t (Protocol_hash.t * Store.Protocol.t option) list Lwt.t
end end
val bootstrapped:
Client_commands.context -> (Block_hash.t * Time.t) Lwt_stream.t Lwt.t
val complete: val complete:
Client_commands.context -> Client_commands.context ->
?block:Blocks.block -> string -> string list Lwt.t ?block:Blocks.block -> string -> string list Lwt.t

View File

@ -50,9 +50,9 @@ type t = {
state: State.t ; state: State.t ;
distributed_db: Distributed_db.t ; distributed_db: Distributed_db.t ;
validator: Validator.worker ; validator: Validator.worker ;
global_db: Distributed_db.net ; mainnet_db: Distributed_db.net ;
global_net: State.Net.t ; mainnet_net: State.Net.t ;
global_validator: Validator.t ; mainnet_validator: Validator.t ;
inject_block: inject_block:
?force:bool -> MBytes.t -> ?force:bool -> MBytes.t ->
(Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t ; (Block_hash.t * unit tzresult Lwt.t) tzresult Lwt.t ;
@ -99,9 +99,9 @@ let create { genesis ; store_root ; context_root ;
State.Net.create state State.Net.create state
?test_protocol ?test_protocol
~forked_network_ttl:(48 * 3600) (* 2 days *) ~forked_network_ttl:(48 * 3600) (* 2 days *)
genesis >>= fun global_net -> genesis >>= fun mainnet_net ->
Validator.activate validator global_net >>= fun global_validator -> Validator.activate validator mainnet_net >>= fun mainnet_validator ->
let global_db = Validator.net_db global_validator in let mainnet_db = Validator.net_db mainnet_validator in
let shutdown () = let shutdown () =
P2p.shutdown p2p >>= fun () -> P2p.shutdown p2p >>= fun () ->
Validator.shutdown validator >>= fun () -> Validator.shutdown validator >>= fun () ->
@ -111,9 +111,9 @@ let create { genesis ; store_root ; context_root ;
state ; state ;
distributed_db ; distributed_db ;
validator ; validator ;
global_db ; mainnet_db ;
global_net ; mainnet_net ;
global_validator ; mainnet_validator ;
inject_block = inject_block validator ; inject_block = inject_block validator ;
inject_operation = inject_operation validator ; inject_operation = inject_operation validator ;
inject_protocol = inject_protocol state ; inject_protocol = inject_protocol state ;
@ -181,16 +181,16 @@ module RPC = struct
let get_net node = function let get_net node = function
| `Genesis | `Head _ | `Prevalidation -> | `Genesis | `Head _ | `Prevalidation ->
node.global_validator, node.global_db node.mainnet_validator, node.mainnet_db
| `Test_head _ | `Test_prevalidation -> | `Test_head _ | `Test_prevalidation ->
match Validator.test_validator node.global_validator with match Validator.test_validator node.mainnet_validator with
| None -> raise Not_found | None -> raise Not_found
| Some v -> v | Some v -> v
let get_validator node = function let get_validator node = function
| `Genesis | `Head _ | `Prevalidation -> node.global_validator | `Genesis | `Head _ | `Prevalidation -> node.mainnet_validator
| `Test_head _ | `Test_prevalidation -> | `Test_head _ | `Test_prevalidation ->
match Validator.test_validator node.global_validator with match Validator.test_validator node.mainnet_validator with
| None -> raise Not_found | None -> raise Not_found
| Some (v, _) -> v | Some (v, _) -> v
@ -198,16 +198,16 @@ module RPC = struct
Distributed_db.read_block_exn Distributed_db.read_block_exn
node.distributed_db hash >>= fun (_net_db, block) -> node.distributed_db hash >>= fun (_net_db, block) ->
if State.Net_id.equal if State.Net_id.equal
(State.Net.id node.global_net) (State.Net.id node.mainnet_net)
block.shell.net_id then block.shell.net_id then
Lwt.return (Some (node.global_validator, node.global_db)) Lwt.return (Some (node.mainnet_validator, node.mainnet_db))
else else
match Validator.test_validator node.global_validator with match Validator.test_validator node.mainnet_validator with
| Some (test_validator, net_db) | Some (test_validator, net_db)
when State.Net_id.equal when State.Net_id.equal
(State.Net.id (Validator.net_state test_validator)) (State.Net.id (Validator.net_state test_validator))
block.shell.net_id -> block.shell.net_id ->
Lwt.return (Some (node.global_validator, net_db)) Lwt.return (Some (node.mainnet_validator, net_db))
| _ -> Lwt.return_none | _ -> Lwt.return_none
let read_valid_block node h = let read_valid_block node h =
@ -246,7 +246,7 @@ module RPC = struct
let block_info node (block: block) = let block_info node (block: block) =
match block with match block with
| `Genesis -> | `Genesis ->
State.Valid_block.Current.genesis node.global_net >|= convert State.Valid_block.Current.genesis node.mainnet_net >|= convert
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
let net_db = Validator.net_db validator in let net_db = Validator.net_db validator in
@ -271,7 +271,7 @@ module RPC = struct
let get_context node block = let get_context node block =
match block with match block with
| `Genesis -> | `Genesis ->
State.Valid_block.Current.genesis node.global_net >>= fun block -> State.Valid_block.Current.genesis node.mainnet_net >>= fun block ->
Lwt.return (Some block.context) Lwt.return (Some block.context)
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
@ -293,7 +293,7 @@ module RPC = struct
let operations node block = let operations node block =
match block with match block with
| `Genesis -> | `Genesis ->
State.Valid_block.Current.genesis node.global_net >>= fun { operations } -> State.Valid_block.Current.genesis node.mainnet_net >>= fun { operations } ->
Lwt.return operations Lwt.return operations
| ( `Head n | `Test_head n ) as block -> | ( `Head n | `Test_head n ) as block ->
let validator = get_validator node block in let validator = get_validator node block in
@ -334,7 +334,7 @@ module RPC = struct
Prevalidator.pending ~block:b prevalidator >|= fun ops -> Prevalidator.pending ~block:b prevalidator >|= fun ops ->
Updater.empty_result, ops Updater.empty_result, ops
| `Genesis -> | `Genesis ->
let net = node.global_net in let net = node.mainnet_net in
State.Valid_block.Current.genesis net >>= fun b -> State.Valid_block.Current.genesis net >>= fun b ->
let validator = get_validator node `Genesis in let validator = get_validator node `Genesis in
let prevalidator = Validator.prevalidator validator in let prevalidator = Validator.prevalidator validator in
@ -363,7 +363,7 @@ module RPC = struct
begin begin
match block with match block with
| `Genesis -> | `Genesis ->
let net = node.global_net in let net = node.mainnet_net in
State.Valid_block.Current.genesis net >>= return State.Valid_block.Current.genesis net >>= return
| ( `Head 0 | `Prevalidation | ( `Head 0 | `Prevalidation
| `Test_head 0 | `Test_prevalidation ) as block -> | `Test_head 0 | `Test_prevalidation ) as block ->
@ -387,7 +387,7 @@ module RPC = struct
| None -> failwith "Unknown protocol version" | None -> failwith "Unknown protocol version"
| Some protocol -> return protocol | Some protocol -> return protocol
end >>=? fun ((module Proto) as protocol) -> end >>=? fun ((module Proto) as protocol) ->
let net_db = Validator.net_db node.global_validator in let net_db = Validator.net_db node.mainnet_validator in
Prevalidator.preapply Prevalidator.preapply
net_db context protocol hash timestamp sort ops >>=? fun (ctxt, r) -> net_db context protocol hash timestamp sort ops >>=? fun (ctxt, r) ->
Context.get_fitness ctxt >>= fun fitness -> Context.get_fitness ctxt >>= fun fitness ->
@ -417,9 +417,9 @@ module RPC = struct
Lwt.return (Some (RPC.map (fun _ -> ()) dir)) Lwt.return (Some (RPC.map (fun _ -> ()) dir))
let heads node = let heads node =
State.Valid_block.known_heads node.global_net >>= fun heads -> State.Valid_block.known_heads node.mainnet_net >>= fun heads ->
begin begin
match Validator.test_validator node.global_validator with match Validator.test_validator node.mainnet_validator with
| None -> Lwt.return_nil | None -> Lwt.return_nil
| Some (_, net_db) -> | Some (_, net_db) ->
State.Valid_block.known_heads (Distributed_db.state net_db) State.Valid_block.known_heads (Distributed_db.state net_db)
@ -492,7 +492,7 @@ module RPC = struct
shutdown shutdown
let valid_block_watcher node = let valid_block_watcher node =
let stream, shutdown = Validator.watcher node.validator in let stream, shutdown = Validator.global_watcher node.validator in
Lwt_stream.map (fun block -> convert block) stream, Lwt_stream.map (fun block -> convert block) stream,
shutdown shutdown
@ -507,7 +507,27 @@ module RPC = struct
Validator.fetch_block net_v block >>=? fun _ -> Validator.fetch_block net_v block >>=? fun _ ->
return () return ()
let bootstrapped node =
let block_stream, stopper =
Validator.new_head_watcher node.mainnet_validator in
let first_run = ref true in
let rec next () =
if !first_run then begin
first_run := false ;
State.Valid_block.Current.head node.mainnet_net >>= fun head ->
Lwt.return (Some (head.hash, head.timestamp))
end else begin
Lwt.pick [
( Lwt_stream.get block_stream >|=
map_option ~f:(fun b -> (b.State.Valid_block.hash, b.timestamp)) ) ;
(Validator.bootstrapped node.mainnet_validator >|= fun () -> None) ;
]
end in
let shutdown () = Watcher.shutdown stopper in
RPC.Answer.{ next ; shutdown }
module Network = struct module Network = struct
let stat (node : t) = let stat (node : t) =
P2p.RPC.stat node.p2p P2p.RPC.stat node.p2p
@ -518,6 +538,7 @@ module RPC = struct
P2p.RPC.connect node.p2p P2p.RPC.connect node.p2p
module Connection = struct module Connection = struct
let info (node : t) = let info (node : t) =
P2p.RPC.Connection.info node.p2p P2p.RPC.Connection.info node.p2p
@ -529,9 +550,11 @@ module RPC = struct
let count (node : t) = let count (node : t) =
P2p.RPC.Connection.count node.p2p P2p.RPC.Connection.count node.p2p
end end
module Point = struct module Point = struct
let info (node : t) = let info (node : t) =
P2p.RPC.Point.info node.p2p P2p.RPC.Point.info node.p2p
@ -543,9 +566,11 @@ module RPC = struct
let watch (node : t) = let watch (node : t) =
P2p.RPC.Point.watch node.p2p P2p.RPC.Point.watch node.p2p
end end
module Peer_id = struct module Peer_id = struct
let info (node : t) = let info (node : t) =
P2p.RPC.Peer_id.info node.p2p P2p.RPC.Peer_id.info node.p2p
@ -557,6 +582,9 @@ module RPC = struct
let watch (node : t) = let watch (node : t) =
P2p.RPC.Peer_id.watch node.p2p P2p.RPC.Peer_id.watch node.p2p
end end
end end
end end

View File

@ -86,7 +86,11 @@ module RPC : sig
val complete: val complete:
t -> ?block:block -> string -> string list Lwt.t t -> ?block:block -> string -> string list Lwt.t
val bootstrapped:
t -> (Block_hash.t * Time.t) RPC.Answer.stream
module Network : sig module Network : sig
val stat : t -> P2p.Stat.t val stat : t -> P2p.Stat.t
val watch : t -> P2p.RPC.Event.t Lwt_stream.t * Watcher.stopper val watch : t -> P2p.RPC.Event.t Lwt_stream.t * Watcher.stopper
val connect : t -> P2p.Point.t -> float -> unit tzresult Lwt.t val connect : t -> P2p.Point.t -> float -> unit tzresult Lwt.t
@ -115,7 +119,9 @@ module RPC : sig
val watch : t -> P2p.Point.t -> val watch : t -> P2p.Point.t ->
P2p.RPC.Point.Event.t Lwt_stream.t * Watcher.stopper P2p.RPC.Point.Event.t Lwt_stream.t * Watcher.stopper
end end
end end
end end
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t

View File

@ -437,7 +437,12 @@ let build_rpc_directory node =
RPC.register0 dir Services.inject_protocol implementation in RPC.register0 dir Services.inject_protocol implementation in
let dir = let dir =
let implementation () = let implementation () =
RPC.Answer.return Data_encoding.Json.(schema (Error_monad.error_encoding ())) in RPC.Answer.return_stream (Node.RPC.bootstrapped node) in
RPC.register0 dir Services.bootstrapped implementation in
let dir =
let implementation () =
RPC.Answer.return
Data_encoding.Json.(schema (Error_monad.error_encoding ())) in
RPC.register0 dir Services.Error.service implementation in RPC.register0 dir Services.Error.service implementation in
let dir = let dir =
RPC.register1 dir Services.complete RPC.register1 dir Services.complete

View File

@ -763,6 +763,15 @@ let inject_protocol =
(obj1 (req "injectedProtocol" Protocol_hash.encoding))) (obj1 (req "injectedProtocol" Protocol_hash.encoding)))
RPC.Path.(root / "inject_protocol") RPC.Path.(root / "inject_protocol")
let bootstrapped =
RPC.service
~description:""
~input: empty
~output: (obj2
(req "block" Block_hash.encoding)
(req "timestamp" Time.encoding))
RPC.Path.(root / "bootstrapped")
let complete = let complete =
let prefix_arg = let prefix_arg =
let destruct s = Ok s let destruct s = Ok s

View File

@ -183,6 +183,8 @@ val inject_protocol:
(Tezos_compiler.Protocol.t * bool * bool option), (Tezos_compiler.Protocol.t * bool * bool option),
Protocol_hash.t tzresult) RPC.service Protocol_hash.t tzresult) RPC.service
val bootstrapped: (unit, unit, unit, Block_hash.t * Time.t) RPC.service
val complete: (unit, unit * string, unit, string list) RPC.service val complete: (unit, unit * string, unit, string list) RPC.service
val describe: val describe:

View File

@ -34,6 +34,9 @@ and t = {
create_child: State.Valid_block.t -> unit tzresult Lwt.t ; create_child: State.Valid_block.t -> unit tzresult Lwt.t ;
test_validator: unit -> (t * Distributed_db.net) option ; test_validator: unit -> (t * Distributed_db.net) option ;
shutdown: unit -> unit Lwt.t ; shutdown: unit -> unit Lwt.t ;
valid_block_input: State.Valid_block.t Watcher.input ;
new_head_input: State.Valid_block.t Watcher.input ;
bootstrapped: unit Lwt.t ;
} }
let net_state { net } = net let net_state { net } = net
@ -50,6 +53,7 @@ let test_validator w = w.test_validator ()
let fetch_block v = v.fetch_block let fetch_block v = v.fetch_block
let prevalidator v = v.prevalidator let prevalidator v = v.prevalidator
let bootstrapped v = v.bootstrapped
(** Current block computation *) (** Current block computation *)
@ -83,6 +87,7 @@ let rec may_set_head v (block: State.Valid_block.t) =
Distributed_db.broadcast_head v.net_db block.hash [] ; Distributed_db.broadcast_head v.net_db block.hash [] ;
Prevalidator.flush v.prevalidator block ; Prevalidator.flush v.prevalidator block ;
may_change_test_network v block >>= fun () -> may_change_test_network v block >>= fun () ->
Watcher.notify v.new_head_input block ;
lwt_log_notice "update current head %a %a %a(%t)" lwt_log_notice "update current head %a %a %a(%t)"
Block_hash.pp_short block.hash Block_hash.pp_short block.hash
Fitness.pp block.fitness Fitness.pp block.fitness
@ -202,6 +207,7 @@ module Validation_scheduler = struct
"validation of %a: reevaluate current block" "validation of %a: reevaluate current block"
Block_hash.pp_short hash >>= fun () -> Block_hash.pp_short hash >>= fun () ->
Watcher.notify v.worker.valid_block_input block ; Watcher.notify v.worker.valid_block_input block ;
Watcher.notify v.valid_block_input block ;
may_set_head v block may_set_head v block
let request state ~get ~set pendings = let request state ~get ~set pendings =
@ -444,6 +450,28 @@ let rec create_validator ?parent worker state db net =
] ]
in in
let valid_block_input = Watcher.create_input () in
let new_head_input = Watcher.create_input () in
let bootstrapped =
(* TODO improve by taking current peers count and current
locators into account... *)
let stream, stopper =
Watcher.create_stream valid_block_input in
let rec wait () =
Lwt.pick [ ( Lwt_stream.get stream ) ;
( Lwt_unix.sleep 30. >|= fun () -> None) ] >>= function
| Some block
when Time.(block.State.Valid_block.timestamp < add (Time.now ()) (-60L)) ->
wait ()
| Some _ | None -> Lwt.return_unit in
let t =
wait () >>= fun () ->
Watcher.shutdown stopper ;
Lwt.return_unit in
Lwt.no_cancel t
in
let rec v = { let rec v = {
net ; net ;
worker ; worker ;
@ -456,6 +484,9 @@ let rec create_validator ?parent worker state db net =
fetch_block ; fetch_block ;
create_child ; create_child ;
test_validator ; test_validator ;
bootstrapped ;
new_head_input ;
valid_block_input ;
} }
and notify_block hash block = and notify_block hash block =
@ -657,4 +688,11 @@ let create_worker state db =
worker worker
let watcher { valid_block_input } = Watcher.create_stream valid_block_input let new_head_watcher ({ new_head_input } : t) =
Watcher.create_stream new_head_input
let watcher ({ valid_block_input } : t) =
Watcher.create_stream valid_block_input
let global_watcher ({ valid_block_input } : worker) =
Watcher.create_stream valid_block_input

View File

@ -34,4 +34,8 @@ val inject_block:
val prevalidator: t -> Prevalidator.t val prevalidator: t -> Prevalidator.t
val test_validator: t -> (t * Distributed_db.net) option val test_validator: t -> (t * Distributed_db.net) option
val watcher: worker -> State.Valid_block.t Lwt_stream.t * Watcher.stopper val watcher: t -> State.Valid_block.t Lwt_stream.t * Watcher.stopper
val new_head_watcher: t -> State.Valid_block.t Lwt_stream.t * Watcher.stopper
val global_watcher: worker -> State.Valid_block.t Lwt_stream.t * Watcher.stopper
val bootstrapped: t -> unit Lwt.t