Alpha/Endorser: wait for first block
This commit is contained in:
parent
f4cadd37a5
commit
b0e9e44673
@ -248,41 +248,65 @@ let check_error f =
|
||||
| Ok () -> Lwt.return_unit
|
||||
| Error errs -> lwt_log_error "Error while endorsing:@\n%a" pp_print_error errs
|
||||
|
||||
let create (cctxt : #Proto_alpha.full) ?(max_past=110L) ~delay contracts (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) =
|
||||
let create
|
||||
(cctxt: #Proto_alpha.full)
|
||||
~max_past
|
||||
~delay
|
||||
contracts
|
||||
block_stream
|
||||
bi =
|
||||
lwt_log_info "Preparing endorsement daemon" >>= fun () ->
|
||||
Lwt_stream.get block_stream >>= function
|
||||
| None | Some (Error _) ->
|
||||
cctxt#error "Can't fetch the current block head."
|
||||
| Some (Ok head) ->
|
||||
|
||||
let last_get_block = ref None in
|
||||
let get_block () =
|
||||
match !last_get_block with
|
||||
| None ->
|
||||
let t = Lwt_stream.get block_stream in
|
||||
last_get_block := Some t ;
|
||||
t
|
||||
| Some t -> t in
|
||||
let state = create_state contracts (Int64.of_int delay) in
|
||||
(* statefulness setup *)
|
||||
let last_get_block = ref None in
|
||||
let get_block () =
|
||||
match !last_get_block with
|
||||
| None ->
|
||||
let t = Lwt_stream.get block_stream in
|
||||
last_get_block := Some t ;
|
||||
t
|
||||
| Some t -> t in
|
||||
let state = create_state contracts (Int64.of_int delay) in
|
||||
|
||||
let rec worker_loop () =
|
||||
begin
|
||||
let timeout = compute_timeout state in
|
||||
Lwt.choose [ (timeout >|= fun () -> `Timeout) ;
|
||||
(get_block () >|= fun b -> `Hash b) ] >>= function
|
||||
| `Hash (None | Some (Error _)) ->
|
||||
Lwt.cancel timeout;
|
||||
last_get_block := None;
|
||||
Lwt.return_unit
|
||||
| `Hash (Some (Ok bi)) ->
|
||||
Lwt.cancel timeout;
|
||||
last_get_block := None;
|
||||
check_error (prepare_endorsement cctxt ~max_past state bi)
|
||||
| `Timeout ->
|
||||
check_error (endorse_for cctxt state.to_endorse)
|
||||
end >>= fun () ->
|
||||
worker_loop () in
|
||||
(* main loop *)
|
||||
let rec worker_loop () =
|
||||
begin
|
||||
let timeout = compute_timeout state in
|
||||
Lwt.choose [ (timeout >|= fun () -> `Timeout) ;
|
||||
(get_block () >|= fun b -> `Hash b) ] >>= function
|
||||
| `Hash (None | Some (Error _)) ->
|
||||
Lwt.cancel timeout;
|
||||
last_get_block := None;
|
||||
Lwt.return_unit
|
||||
| `Hash (Some (Ok bi)) ->
|
||||
Lwt.cancel timeout;
|
||||
last_get_block := None;
|
||||
check_error (prepare_endorsement cctxt ~max_past state bi)
|
||||
| `Timeout ->
|
||||
check_error (endorse_for cctxt state.to_endorse)
|
||||
end >>= fun () ->
|
||||
worker_loop () in
|
||||
|
||||
check_error (prepare_endorsement cctxt ~max_past state head) >>= fun () ->
|
||||
lwt_log_info "Starting endorsement daemon" >>= fun () ->
|
||||
worker_loop ()
|
||||
(* ignition *)
|
||||
check_error (prepare_endorsement cctxt ~max_past state bi) >>= fun () ->
|
||||
lwt_log_info "Starting endorsement daemon" >>= fun () ->
|
||||
worker_loop ()
|
||||
|
||||
(* A wrapper around the main create function (above) to wait for the initial
|
||||
block. *)
|
||||
let create
|
||||
(cctxt: #Proto_alpha.full)
|
||||
?(max_past=110L)
|
||||
~delay
|
||||
contracts
|
||||
(block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) =
|
||||
let rec wait_for_first_block () =
|
||||
Lwt_stream.get block_stream >>= function
|
||||
| None | Some (Error _) ->
|
||||
cctxt#message "Can't fetch the current block head. Retrying soon." >>= fun () ->
|
||||
(* NOTE: this is not a tight loop because of Lwt_stream.get *)
|
||||
wait_for_first_block ()
|
||||
| Some (Ok bi) ->
|
||||
create cctxt ~max_past ~delay contracts block_stream bi
|
||||
in
|
||||
wait_for_first_block ()
|
||||
|
Loading…
Reference in New Issue
Block a user