Alpha/Bake,Endorse,Denounce: outsource scheduling

This makes the scheduling its own separate problem the solution of which
can be tackled separately from the specificities of the three binaries.
This commit is contained in:
Raphaël Proust 2018-06-20 11:20:50 +08:00
parent 14ee040e97
commit d4974aefa8
8 changed files with 216 additions and 203 deletions

View File

@ -10,27 +10,13 @@
include Logging.Make(struct let name = "client.denunciation" end)
let create cctxt endorsement_stream =
let last_get_endorsement = ref None in
let get_endorsement () =
match !last_get_endorsement with
| None ->
let t = Lwt_stream.get endorsement_stream in
last_get_endorsement := Some t ;
t
| Some t -> t in
let rec worker_loop () =
(* let timeout = compute_timeout state in *)
Lwt.choose [
(* (timeout >|= fun () -> `Timeout) ; *)
(get_endorsement () >|= fun e -> `Endorsement e) ;
] >>= function
| `Endorsement (None | Some (Error _)) ->
lwt_log_error "Connection to node lost, exiting." >>= fun () ->
exit 1
| `Endorsement (Some (Ok e)) ->
last_get_endorsement := None ;
Client_keys.Public_key_hash.name cctxt
let never_ends = Lwt_utils.never_ending () in
let event_k cctxt () e =
(* TODO: more than just logging *)
Client_keys.Public_key_hash.name
cctxt
e.Client_baking_operations.source >>= function
| Ok source ->
lwt_debug
@ -38,14 +24,22 @@ let create cctxt endorsement_stream =
Block_hash.pp_short e.block
source
Format.(pp_print_list pp_print_int) e.slots >>= fun () ->
worker_loop ()
return ()
| Error errs ->
lwt_log_error "Error whilst checking the endorsment %a/%a:@\n%a"
Block_hash.pp_short e.block
Format.(pp_print_list pp_print_int) e.slots
pp_print_error errs >>= fun () ->
worker_loop ()
return ()
in
lwt_log_info "Starting denunciation daemon" >>= fun () ->
worker_loop ()
Client_baking_scheduling.main
~name:"denunciator"
~cctxt
~stream:endorsement_stream
~state_maker:(fun _ _ -> return ())
~pre_loop:(fun _ _ _ -> return ())
~compute_timeout:(fun () -> never_ends)
~timeout_k:(fun _ _ () -> return ())
~event_k

View File

@ -10,4 +10,4 @@
val create:
#Proto_alpha.full ->
Client_baking_operations.valid_endorsement tzresult Lwt_stream.t ->
unit Lwt.t
unit tzresult Lwt.t

View File

@ -133,7 +133,7 @@ let allowed_to_endorse cctxt bi delegate =
| false ->
return true
let prepare_endorsement (cctxt : #Proto_alpha.full) ~(max_past:int64) state bi =
let prepare_endorsement ~(max_past:int64) () (cctxt : #Proto_alpha.full) state bi =
if Time.diff (Time.now ()) bi.Client_baking_blocks.timestamp > max_past then
lwt_log_info "Ignore block %a: forged too far the past"
Block_hash.pp_short bi.hash >>= fun () ->
@ -158,7 +158,7 @@ let compute_timeout state =
| None -> Lwt_utils.never_ending ()
| Some { timeout ; block ; delegates } ->
timeout >>= fun () ->
Lwt.return (`Timeout (block, delegates))
Lwt.return (block, delegates)
let check_error f =
f >>= function
@ -167,22 +167,13 @@ let check_error f =
let create
(cctxt: #Proto_alpha.full)
~max_past
?(max_past=110L)
~delay
contracts
block_stream
bi =
lwt_log_info "Preparing endorsement daemon" >>= fun () ->
(* statefulness setup *)
let last_get_block = ref None in
let get_block () =
match !last_get_block with
| None ->
let t = Lwt_stream.get block_stream in
last_get_block := Some t ;
t
| Some t -> t in
=
let state_maker _ _ =
let contracts = match contracts with
| [] ->
tzlazy (fun () ->
@ -192,43 +183,24 @@ let create
| _ :: _ ->
tzlazy (fun () -> return contracts) in
let state = create_state contracts (Int64.of_int delay) in
return state
in
(* main loop *)
let rec worker_loop () =
begin
Lwt.choose [ compute_timeout state ;
(get_block () >|= fun b -> `Hash b) ] >>= function
| `Hash None ->
last_get_block := None ;
lwt_log_error "Connection to node lost, exiting." >>= fun () ->
exit 1
| `Hash (Some (Error _)) ->
last_get_block := None ;
Lwt.return_unit
| `Hash (Some (Ok bi)) ->
last_get_block := None ;
let timeout_k cctxt state (block, delegates) =
state.pending <- None ;
check_error @@ prepare_endorsement cctxt ~max_past state bi
| `Timeout (block, delegates) ->
iter_p (endorse_for_delegate cctxt block) delegates
in
let event_k cctxt state bi =
state.pending <- None ;
check_error @@ iter_p (endorse_for_delegate cctxt block) delegates
end >>= fun () ->
worker_loop () in
prepare_endorsement ~max_past () cctxt state bi
in
(* ignition *)
check_error (prepare_endorsement cctxt ~max_past state bi) >>= fun () ->
lwt_log_notice "Starting endorsement daemon" >>= fun () ->
worker_loop ()
(* A wrapper around the main create function (above) to wait for the initial
block. *)
let create
(cctxt: #Proto_alpha.full)
?(max_past=110L)
~delay
contracts
(block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) =
Client_baking_scheduling.wait_for_first_block
~info:lwt_log_info
block_stream
(create cctxt ~max_past ~delay contracts block_stream)
Client_baking_scheduling.main
~name:"endorser"
~cctxt
~stream:block_stream
~state_maker
~pre_loop:(prepare_endorsement ~max_past ())
~compute_timeout
~timeout_k
~event_k

View File

@ -24,4 +24,4 @@ val create :
?max_past:int64 (* number of seconds *) ->
delay:int ->
public_key_hash list ->
Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit Lwt.t
Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit tzresult Lwt.t

View File

@ -441,8 +441,9 @@ let safe_get_unrevealed_nonces cctxt block =
let insert_block
(cctxt: #Proto_alpha.full)
?max_priority
()
(cctxt: #Proto_alpha.full)
state
(bi: Client_baking_blocks.block_info) =
begin
@ -654,7 +655,12 @@ let pp_operation_list_list =
(* [bake] create a single block when woken up to do so. All the necessary
information (e.g., slot) is available in the [state]. *)
let bake (cctxt : #Proto_alpha.full) ?threshold state =
let bake
?threshold
()
(cctxt : #Proto_alpha.full)
state
() =
let slots = pop_baking_slots state in
lwt_log_info "Found %d current slots and %d future slots."
(List.length slots)
@ -715,11 +721,6 @@ let bake (cctxt : #Proto_alpha.full) ?threshold state =
lwt_debug "No valid candidates." >>= fun () ->
return ()
let check_error p =
p >>= function
| Ok () -> Lwt.return_unit
| Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs
(* [create] starts the main loop of the baker. The loop monitors new blocks and
@ -732,22 +733,9 @@ let create
~(context_path: string)
(delegates: public_key_hash list)
(block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t)
(bi: Client_baking_blocks.block_info) =
=
lwt_log_info "Setting up before the baker can start." >>= fun () ->
Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash ->
(* statefulness *)
let last_get_block = ref None in
let get_block () =
match !last_get_block with
| None ->
let t = Lwt_stream.get block_stream in
last_get_block := Some t ;
t
| Some t -> t in
lwt_debug "Opening shell context" >>= fun () ->
Client_baking_simulator.load_context ~context_path >>= fun index ->
let state_maker genesis_hash bi =
let delegates = match delegates with
| [] ->
tzlazy (fun () ->
@ -758,54 +746,18 @@ let create
| _ :: _ -> tzlazy (fun () -> return delegates) in
let constants =
tzlazy (fun () -> Alpha_services.Constants.all cctxt (`Main, `Head 0)) in
Client_baking_simulator.load_context ~context_path >>= fun index ->
let state = create_state genesis_hash index delegates constants bi in
check_error @@ insert_block cctxt ?max_priority state bi >>= fun () ->
return state
in
(* main loop *)
let rec worker_loop () =
begin
(* event construction *)
let timeout = compute_timeout state in
Lwt.choose [ (timeout >|= fun () -> `Timeout) ;
(get_block () >|= fun b -> `Hash b) ;
] >>= function
(* event matching *)
| `Hash (None | Some (Error _)) ->
(* exit when the node is unavailable *)
last_get_block := None ;
lwt_log_error "Connection to node lost, exiting." >>= fun () ->
exit 1
| `Hash (Some (Ok bi)) -> begin
(* new block: cancel everything and bake on the new head *)
last_get_block := None ;
lwt_debug
"Discoverered block: %a"
Block_hash.pp_short bi.Client_baking_blocks.hash >>= fun () ->
check_error @@ insert_block cctxt ?max_priority state bi
end
| `Timeout ->
(* main event: it's baking time *)
lwt_debug "Waking up for baking..." >>= fun () ->
(* core functionality *)
check_error @@ bake cctxt ?threshold state
end >>= fun () ->
(* and restart *)
worker_loop () in
Client_baking_scheduling.main
~name:"baker"
~cctxt
~stream:block_stream
~state_maker
~pre_loop:(insert_block ?max_priority ())
~compute_timeout
~timeout_k:(bake ?threshold ())
~event_k:(insert_block ?max_priority ())
(* ignition *)
lwt_log_info "Starting baking daemon" >>= fun () ->
worker_loop ()
(* Wrapper around previous [create] function that handles the case of
unavailable blocks (empty block chain). *)
let create
(cctxt : #Proto_alpha.full)
?threshold
?max_priority
~(context_path: string)
(delegates: public_key_hash list)
(block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) =
Client_baking_scheduling.wait_for_first_block
~info:lwt_log_info
block_stream
(create cctxt ?threshold ?max_priority ~context_path delegates block_stream)

View File

@ -7,6 +7,7 @@
(* *)
(**************************************************************************)
include Logging.Make(struct let name = "client.scheduling" end)
let sleep_until time =
let delay = Time.diff time (Time.now ()) in
@ -15,17 +16,89 @@ let sleep_until time =
else
Some (Lwt_unix.sleep (Int64.to_float delay))
let wait_for_first_block
?(info = fun (_: (unit Lwt.t, unit) Client_context.lwt_format) -> Lwt.return_unit)
(block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t)
k =
let rec wait_for_first_block () =
Lwt_stream.get block_stream >>= function
let rec wait_for_first_event stream =
Lwt_stream.get stream >>= function
| None | Some (Error _) ->
info "Can't fetch the current block head. Retrying soon." >>= fun () ->
lwt_log_info "Can't fetch the current event. Waiting for new event." >>= fun () ->
(* NOTE: this is not a tight loop because of Lwt_stream.get *)
wait_for_first_block ()
wait_for_first_event stream
| Some (Ok bi) ->
k bi
in
wait_for_first_block ()
Lwt.return bi
let log_errors_and_continue p =
p >>= function
| Ok () -> Lwt.return_unit
| Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs
let main
~(name: string)
~(cctxt: #Proto_alpha.full)
~(stream: 'event tzresult Lwt_stream.t)
~(state_maker: (Block_hash.t ->
'event ->
'state tzresult Lwt.t))
~(pre_loop: (#Proto_alpha.full ->
'state ->
'event ->
unit tzresult Lwt.t))
~(compute_timeout: ('state -> 'timesup Lwt.t))
~(timeout_k: (#Proto_alpha.full ->
'state ->
'timesup ->
unit tzresult Lwt.t))
~(event_k: (#Proto_alpha.full ->
'state ->
'event ->
unit tzresult Lwt.t))
=
lwt_log_info "Setting up before the %s can start." name >>= fun () ->
wait_for_first_event stream >>= fun first_event ->
Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash ->
(* statefulness *)
let last_get_event = ref None in
let get_event () =
match !last_get_event with
| None ->
let t = Lwt_stream.get stream in
last_get_event := Some t ;
t
| Some t -> t in
state_maker genesis_hash first_event >>=? fun state ->
log_errors_and_continue @@ pre_loop cctxt state first_event >>= fun () ->
(* main loop *)
let rec worker_loop () =
begin
(* event construction *)
let timeout = compute_timeout state in
Lwt.choose [ (timeout >|= fun timesup -> `Timeout timesup) ;
(get_event () >|= fun e -> `Event e) ;
] >>= function
(* event matching *)
| `Event (None | Some (Error _)) ->
(* exit when the node is unavailable *)
last_get_event := None ;
lwt_log_error "Connection to node lost, %s exiting." name >>= fun () ->
exit 1
| `Event (Some (Ok event)) -> begin
(* new event: cancel everything and execute callback *)
last_get_event := None ;
(* TODO: pretty-print events (requires passing a pp as argument) *)
log_errors_and_continue @@ event_k cctxt state event
end
| `Timeout timesup ->
(* main event: it's time *)
lwt_debug "Waking up for %s." name >>= fun () ->
(* core functionality *)
log_errors_and_continue @@ timeout_k cctxt state timesup
end >>= fun () ->
(* and restart *)
worker_loop () in
(* ignition *)
lwt_log_info "Starting %s daemon" name >>= fun () ->
worker_loop ()

View File

@ -10,8 +10,30 @@
val sleep_until: Time.t -> unit Lwt.t option
val wait_for_first_block:
?info:((unit Lwt.t, unit) Client_context.lwt_format -> unit Lwt.t) ->
Client_baking_blocks.block_info tzresult Lwt_stream.t ->
(Client_baking_blocks.block_info -> 'a Lwt.t) ->
'a Lwt.t
val wait_for_first_event:
'event tzresult Lwt_stream.t ->
'event Lwt.t
val main :
name:string ->
cctxt:(#Proto_alpha.full as 'a) ->
stream:'event tzresult Lwt_stream.t ->
state_maker:(Block_hash.t -> 'event -> 'state tzresult Lwt.t) ->
pre_loop:('a -> 'state -> 'event -> unit tzresult Lwt.t) ->
compute_timeout:('state -> 'timesup Lwt.t) ->
timeout_k:('a -> 'state -> 'timesup -> unit tzresult Lwt.t) ->
event_k:('a -> 'state -> 'event -> unit tzresult Lwt.t) ->
unit tzresult Lwt.t
(** [main ~name ~cctxt ~stream ~state_maker ~pre_loop ~timeout_maker ~timeout_k
~event_k] is an infinitely running loop that
monitors new events arriving on [stream]. The loop exits when the
[stream] gives an error.
The function [pre_loop] is called before the loop starts.
The loop maintains a state (of type ['state]) initialized by [state_maker]
and passed to the callbacks [timeout_maker] (used to set up waking-up
timeouts), [timeout_k] (when a computed timeout happens), and [event_k]
(when a new event arrives on the stream).
*)

View File

@ -12,7 +12,7 @@ module Endorser = struct
let run (cctxt : #Proto_alpha.full) ~delay ?min_date delegates =
Client_baking_blocks.monitor_heads
cctxt `Main >>=? fun block_stream ->
Client_baking_endorsement.create cctxt ~delay delegates block_stream >>= fun () ->
Client_baking_endorsement.create cctxt ~delay delegates block_stream >>=? fun () ->
ignore min_date;
return ()
@ -35,7 +35,7 @@ module Accuser = struct
let run (cctxt : #Proto_alpha.full) =
Client_baking_operations.monitor_endorsement
cctxt >>=? fun endorsement_stream ->
Client_baking_denunciation.create cctxt endorsement_stream >>= fun () ->
Client_baking_denunciation.create cctxt endorsement_stream >>=? fun () ->
return ()
end