diff --git a/src/proto_alpha/lib_delegate/client_baking_endorsement.ml b/src/proto_alpha/lib_delegate/client_baking_endorsement.ml index ab970b0a1..9e17ca55a 100644 --- a/src/proto_alpha/lib_delegate/client_baking_endorsement.ml +++ b/src/proto_alpha/lib_delegate/client_baking_endorsement.ml @@ -248,41 +248,65 @@ let check_error f = | Ok () -> Lwt.return_unit | Error errs -> lwt_log_error "Error while endorsing:@\n%a" pp_print_error errs -let create (cctxt : #Proto_alpha.full) ?(max_past=110L) ~delay contracts (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = +let create + (cctxt: #Proto_alpha.full) + ~max_past + ~delay + contracts + block_stream + bi = lwt_log_info "Preparing endorsement daemon" >>= fun () -> - Lwt_stream.get block_stream >>= function - | None | Some (Error _) -> - cctxt#error "Can't fetch the current block head." - | Some (Ok head) -> - let last_get_block = ref None in - let get_block () = - match !last_get_block with - | None -> - let t = Lwt_stream.get block_stream in - last_get_block := Some t ; - t - | Some t -> t in - let state = create_state contracts (Int64.of_int delay) in + (* statefulness setup *) + let last_get_block = ref None in + let get_block () = + match !last_get_block with + | None -> + let t = Lwt_stream.get block_stream in + last_get_block := Some t ; + t + | Some t -> t in + let state = create_state contracts (Int64.of_int delay) in - let rec worker_loop () = - begin - let timeout = compute_timeout state in - Lwt.choose [ (timeout >|= fun () -> `Timeout) ; - (get_block () >|= fun b -> `Hash b) ] >>= function - | `Hash (None | Some (Error _)) -> - Lwt.cancel timeout; - last_get_block := None; - Lwt.return_unit - | `Hash (Some (Ok bi)) -> - Lwt.cancel timeout; - last_get_block := None; - check_error (prepare_endorsement cctxt ~max_past state bi) - | `Timeout -> - check_error (endorse_for cctxt state.to_endorse) - end >>= fun () -> - worker_loop () in + (* main loop *) + let rec worker_loop () = + begin + let timeout = compute_timeout state in + Lwt.choose [ (timeout >|= fun () -> `Timeout) ; + (get_block () >|= fun b -> `Hash b) ] >>= function + | `Hash (None | Some (Error _)) -> + Lwt.cancel timeout; + last_get_block := None; + Lwt.return_unit + | `Hash (Some (Ok bi)) -> + Lwt.cancel timeout; + last_get_block := None; + check_error (prepare_endorsement cctxt ~max_past state bi) + | `Timeout -> + check_error (endorse_for cctxt state.to_endorse) + end >>= fun () -> + worker_loop () in - check_error (prepare_endorsement cctxt ~max_past state head) >>= fun () -> - lwt_log_info "Starting endorsement daemon" >>= fun () -> - worker_loop () + (* ignition *) + check_error (prepare_endorsement cctxt ~max_past state bi) >>= fun () -> + lwt_log_info "Starting endorsement daemon" >>= fun () -> + worker_loop () + +(* A wrapper around the main create function (above) to wait for the initial + block. *) +let create + (cctxt: #Proto_alpha.full) + ?(max_past=110L) + ~delay + contracts + (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) = + let rec wait_for_first_block () = + Lwt_stream.get block_stream >>= function + | None | Some (Error _) -> + cctxt#message "Can't fetch the current block head. Retrying soon." >>= fun () -> + (* NOTE: this is not a tight loop because of Lwt_stream.get *) + wait_for_first_block () + | Some (Ok bi) -> + create cctxt ~max_past ~delay contracts block_stream bi + in + wait_for_first_block ()