diff --git a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml index 466f4e763..311333f00 100644 --- a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml @@ -10,42 +10,36 @@ include Logging.Make(struct let name = "client.denunciation" end) let create cctxt endorsement_stream = - let last_get_endorsement = ref None in - let get_endorsement () = - match !last_get_endorsement with - | None -> - let t = Lwt_stream.get endorsement_stream in - last_get_endorsement := Some t ; - t - | Some t -> t in - let rec worker_loop () = - (* let timeout = compute_timeout state in *) - Lwt.choose [ - (* (timeout >|= fun () -> `Timeout) ; *) - (get_endorsement () >|= fun e -> `Endorsement e) ; - ] >>= function - | `Endorsement (None | Some (Error _)) -> - lwt_log_error "Connection to node lost, exiting." >>= fun () -> - exit 1 - | `Endorsement (Some (Ok e)) -> - last_get_endorsement := None ; - Client_keys.Public_key_hash.name cctxt - e.Client_baking_operations.source >>= function - | Ok source -> - lwt_debug - "Discovered endorsement for block %a by %s (slot @[%a@])" - Block_hash.pp_short e.block - source - Format.(pp_print_list pp_print_int) e.slots >>= fun () -> - worker_loop () - | Error errs -> - lwt_log_error "Error whilst checking the endorsment %a/%a:@\n%a" - Block_hash.pp_short e.block - Format.(pp_print_list pp_print_int) e.slots - pp_print_error errs >>= fun () -> - worker_loop () + let never_ends = Lwt_utils.never_ending () in + + let event_k cctxt () e = + (* TODO: more than just logging *) + Client_keys.Public_key_hash.name + cctxt + e.Client_baking_operations.source >>= function + | Ok source -> + lwt_debug + "Discovered endorsement for block %a by %s (slot @[%a@])" + Block_hash.pp_short e.block + source + Format.(pp_print_list pp_print_int) e.slots >>= fun () -> + return () + | Error errs -> + lwt_log_error "Error whilst checking the endorsment %a/%a:@\n%a" + Block_hash.pp_short e.block + Format.(pp_print_list pp_print_int) e.slots + pp_print_error errs >>= fun () -> + return () in - lwt_log_info "Starting denunciation daemon" >>= fun () -> - worker_loop () + Client_baking_scheduling.main + ~name:"denunciator" + ~cctxt + ~stream:endorsement_stream + ~state_maker:(fun _ _ -> return ()) + ~pre_loop:(fun _ _ _ -> return ()) + ~compute_timeout:(fun () -> never_ends) + ~timeout_k:(fun _ _ () -> return ()) + ~event_k + diff --git a/src/proto_alpha/lib_delegate/client_baking_denunciation.mli b/src/proto_alpha/lib_delegate/client_baking_denunciation.mli index 873ecec16..ebbcf6a52 100644 --- a/src/proto_alpha/lib_delegate/client_baking_denunciation.mli +++ b/src/proto_alpha/lib_delegate/client_baking_denunciation.mli @@ -10,4 +10,4 @@ val create: #Proto_alpha.full -> Client_baking_operations.valid_endorsement tzresult Lwt_stream.t -> - unit Lwt.t + unit tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_baking_endorsement.ml b/src/proto_alpha/lib_delegate/client_baking_endorsement.ml index 2eb1bcbfc..47d6420e0 100644 --- a/src/proto_alpha/lib_delegate/client_baking_endorsement.ml +++ b/src/proto_alpha/lib_delegate/client_baking_endorsement.ml @@ -133,7 +133,7 @@ let allowed_to_endorse cctxt bi delegate = | false -> return true -let prepare_endorsement (cctxt : #Proto_alpha.full) ~(max_past:int64) state bi = +let prepare_endorsement ~(max_past:int64) () (cctxt : #Proto_alpha.full) state bi = if Time.diff (Time.now ()) bi.Client_baking_blocks.timestamp > max_past then lwt_log_info "Ignore block %a: forged too far the past" Block_hash.pp_short bi.hash >>= fun () -> @@ -158,77 +158,49 @@ let compute_timeout state = | None -> Lwt_utils.never_ending () | Some { timeout ; block ; delegates } -> timeout >>= fun () -> - Lwt.return (`Timeout (block, delegates)) + Lwt.return (block, delegates) let check_error f = f >>= function | Ok () -> Lwt.return_unit | Error errs -> lwt_log_error "Error while endorsing:@\n%a" pp_print_error errs -let create - (cctxt: #Proto_alpha.full) - ~max_past - ~delay - contracts - block_stream - bi = - lwt_log_info "Preparing endorsement daemon" >>= fun () -> - (* statefulness setup *) - let last_get_block = ref None in - let get_block () = - match !last_get_block with - | None -> - let t = Lwt_stream.get block_stream in - last_get_block := Some t ; - t - | Some t -> t in - - let contracts = match contracts with - | [] -> - tzlazy (fun () -> - Client_keys.get_keys cctxt >>=? fun keys -> - return (List.map (fun (_, pkh, _, _) -> pkh) keys) - ) - | _ :: _ -> - tzlazy (fun () -> return contracts) in - let state = create_state contracts (Int64.of_int delay) in - - (* main loop *) - let rec worker_loop () = - begin - Lwt.choose [ compute_timeout state ; - (get_block () >|= fun b -> `Hash b) ] >>= function - | `Hash None -> - last_get_block := None ; - lwt_log_error "Connection to node lost, exiting." >>= fun () -> - exit 1 - | `Hash (Some (Error _)) -> - last_get_block := None ; - Lwt.return_unit - | `Hash (Some (Ok bi)) -> - last_get_block := None ; - state.pending <- None ; - check_error @@ prepare_endorsement cctxt ~max_past state bi - | `Timeout (block, delegates) -> - state.pending <- None ; - check_error @@ iter_p (endorse_for_delegate cctxt block) delegates - end >>= fun () -> - worker_loop () in - - (* ignition *) - check_error (prepare_endorsement cctxt ~max_past state bi) >>= fun () -> - lwt_log_notice "Starting endorsement daemon" >>= fun () -> - worker_loop () - -(* A wrapper around the main create function (above) to wait for the initial - block. *) let create (cctxt: #Proto_alpha.full) ?(max_past=110L) ~delay contracts - (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) = - Client_baking_scheduling.wait_for_first_block - ~info:lwt_log_info block_stream - (create cctxt ~max_past ~delay contracts block_stream) + = + + let state_maker _ _ = + let contracts = match contracts with + | [] -> + tzlazy (fun () -> + Client_keys.get_keys cctxt >>=? fun keys -> + return (List.map (fun (_, pkh, _, _) -> pkh) keys) + ) + | _ :: _ -> + tzlazy (fun () -> return contracts) in + let state = create_state contracts (Int64.of_int delay) in + return state + in + + let timeout_k cctxt state (block, delegates) = + state.pending <- None ; + iter_p (endorse_for_delegate cctxt block) delegates + in + let event_k cctxt state bi = + state.pending <- None ; + prepare_endorsement ~max_past () cctxt state bi + in + + Client_baking_scheduling.main + ~name:"endorser" + ~cctxt + ~stream:block_stream + ~state_maker + ~pre_loop:(prepare_endorsement ~max_past ()) + ~compute_timeout + ~timeout_k + ~event_k diff --git a/src/proto_alpha/lib_delegate/client_baking_endorsement.mli b/src/proto_alpha/lib_delegate/client_baking_endorsement.mli index 4c8dd535e..030ad6265 100644 --- a/src/proto_alpha/lib_delegate/client_baking_endorsement.mli +++ b/src/proto_alpha/lib_delegate/client_baking_endorsement.mli @@ -24,4 +24,4 @@ val create : ?max_past:int64 (* number of seconds *) -> delay:int -> public_key_hash list -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit Lwt.t + Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_baking_forge.ml b/src/proto_alpha/lib_delegate/client_baking_forge.ml index be780b450..a2615077a 100644 --- a/src/proto_alpha/lib_delegate/client_baking_forge.ml +++ b/src/proto_alpha/lib_delegate/client_baking_forge.ml @@ -441,8 +441,9 @@ let safe_get_unrevealed_nonces cctxt block = let insert_block - (cctxt: #Proto_alpha.full) ?max_priority + () + (cctxt: #Proto_alpha.full) state (bi: Client_baking_blocks.block_info) = begin @@ -654,7 +655,12 @@ let pp_operation_list_list = (* [bake] create a single block when woken up to do so. All the necessary information (e.g., slot) is available in the [state]. *) -let bake (cctxt : #Proto_alpha.full) ?threshold state = +let bake + ?threshold + () + (cctxt : #Proto_alpha.full) + state + () = let slots = pop_baking_slots state in lwt_log_info "Found %d current slots and %d future slots." (List.length slots) @@ -715,11 +721,6 @@ let bake (cctxt : #Proto_alpha.full) ?threshold state = lwt_debug "No valid candidates." >>= fun () -> return () -let check_error p = - p >>= function - | Ok () -> Lwt.return_unit - | Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs - (* [create] starts the main loop of the baker. The loop monitors new blocks and @@ -732,80 +733,31 @@ let create ~(context_path: string) (delegates: public_key_hash list) (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) - (bi: Client_baking_blocks.block_info) = + = - lwt_log_info "Setting up before the baker can start." >>= fun () -> - Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash -> + let state_maker genesis_hash bi = + let delegates = match delegates with + | [] -> + tzlazy (fun () -> + Client_keys.get_keys cctxt >>=? fun keys -> + let delegates = List.map (fun (_,pkh,_,_) -> pkh) keys in + return delegates + ) + | _ :: _ -> tzlazy (fun () -> return delegates) in + let constants = + tzlazy (fun () -> Alpha_services.Constants.all cctxt (`Main, `Head 0)) in + Client_baking_simulator.load_context ~context_path >>= fun index -> + let state = create_state genesis_hash index delegates constants bi in + return state + in - (* statefulness *) - let last_get_block = ref None in - let get_block () = - match !last_get_block with - | None -> - let t = Lwt_stream.get block_stream in - last_get_block := Some t ; - t - | Some t -> t in - lwt_debug "Opening shell context" >>= fun () -> - Client_baking_simulator.load_context ~context_path >>= fun index -> - let delegates = match delegates with - | [] -> - tzlazy (fun () -> - Client_keys.get_keys cctxt >>=? fun keys -> - let delegates = List.map (fun (_,pkh,_,_) -> pkh) keys in - return delegates - ) - | _ :: _ -> tzlazy (fun () -> return delegates) in - let constants = - tzlazy (fun () -> Alpha_services.Constants.all cctxt (`Main, `Head 0)) in - let state = create_state genesis_hash index delegates constants bi in - check_error @@ insert_block cctxt ?max_priority state bi >>= fun () -> + Client_baking_scheduling.main + ~name:"baker" + ~cctxt + ~stream:block_stream + ~state_maker + ~pre_loop:(insert_block ?max_priority ()) + ~compute_timeout + ~timeout_k:(bake ?threshold ()) + ~event_k:(insert_block ?max_priority ()) - (* main loop *) - let rec worker_loop () = - begin - (* event construction *) - let timeout = compute_timeout state in - Lwt.choose [ (timeout >|= fun () -> `Timeout) ; - (get_block () >|= fun b -> `Hash b) ; - ] >>= function - (* event matching *) - | `Hash (None | Some (Error _)) -> - (* exit when the node is unavailable *) - last_get_block := None ; - lwt_log_error "Connection to node lost, exiting." >>= fun () -> - exit 1 - | `Hash (Some (Ok bi)) -> begin - (* new block: cancel everything and bake on the new head *) - last_get_block := None ; - lwt_debug - "Discoverered block: %a" - Block_hash.pp_short bi.Client_baking_blocks.hash >>= fun () -> - check_error @@ insert_block cctxt ?max_priority state bi - end - | `Timeout -> - (* main event: it's baking time *) - lwt_debug "Waking up for baking..." >>= fun () -> - (* core functionality *) - check_error @@ bake cctxt ?threshold state - end >>= fun () -> - (* and restart *) - worker_loop () in - - (* ignition *) - lwt_log_info "Starting baking daemon" >>= fun () -> - worker_loop () - -(* Wrapper around previous [create] function that handles the case of - unavailable blocks (empty block chain). *) -let create - (cctxt : #Proto_alpha.full) - ?threshold - ?max_priority - ~(context_path: string) - (delegates: public_key_hash list) - (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) = - Client_baking_scheduling.wait_for_first_block - ~info:lwt_log_info - block_stream - (create cctxt ?threshold ?max_priority ~context_path delegates block_stream) diff --git a/src/proto_alpha/lib_delegate/client_baking_scheduling.ml b/src/proto_alpha/lib_delegate/client_baking_scheduling.ml index bef74c213..6be4bfb21 100644 --- a/src/proto_alpha/lib_delegate/client_baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/client_baking_scheduling.ml @@ -7,6 +7,7 @@ (* *) (**************************************************************************) +include Logging.Make(struct let name = "client.scheduling" end) let sleep_until time = let delay = Time.diff time (Time.now ()) in @@ -15,17 +16,89 @@ let sleep_until time = else Some (Lwt_unix.sleep (Int64.to_float delay)) -let wait_for_first_block - ?(info = fun (_: (unit Lwt.t, unit) Client_context.lwt_format) -> Lwt.return_unit) - (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) - k = - let rec wait_for_first_block () = - Lwt_stream.get block_stream >>= function - | None | Some (Error _) -> - info "Can't fetch the current block head. Retrying soon." >>= fun () -> - (* NOTE: this is not a tight loop because of Lwt_stream.get *) - wait_for_first_block () - | Some (Ok bi) -> - k bi - in - wait_for_first_block () +let rec wait_for_first_event stream = + Lwt_stream.get stream >>= function + | None | Some (Error _) -> + lwt_log_info "Can't fetch the current event. Waiting for new event." >>= fun () -> + (* NOTE: this is not a tight loop because of Lwt_stream.get *) + wait_for_first_event stream + | Some (Ok bi) -> + Lwt.return bi + +let log_errors_and_continue p = + p >>= function + | Ok () -> Lwt.return_unit + | Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs + +let main + ~(name: string) + ~(cctxt: #Proto_alpha.full) + ~(stream: 'event tzresult Lwt_stream.t) + ~(state_maker: (Block_hash.t -> + 'event -> + 'state tzresult Lwt.t)) + ~(pre_loop: (#Proto_alpha.full -> + 'state -> + 'event -> + unit tzresult Lwt.t)) + ~(compute_timeout: ('state -> 'timesup Lwt.t)) + ~(timeout_k: (#Proto_alpha.full -> + 'state -> + 'timesup -> + unit tzresult Lwt.t)) + ~(event_k: (#Proto_alpha.full -> + 'state -> + 'event -> + unit tzresult Lwt.t)) + = + + lwt_log_info "Setting up before the %s can start." name >>= fun () -> + + wait_for_first_event stream >>= fun first_event -> + Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash -> + + (* statefulness *) + let last_get_event = ref None in + let get_event () = + match !last_get_event with + | None -> + let t = Lwt_stream.get stream in + last_get_event := Some t ; + t + | Some t -> t in + state_maker genesis_hash first_event >>=? fun state -> + + log_errors_and_continue @@ pre_loop cctxt state first_event >>= fun () -> + + (* main loop *) + let rec worker_loop () = + begin + (* event construction *) + let timeout = compute_timeout state in + Lwt.choose [ (timeout >|= fun timesup -> `Timeout timesup) ; + (get_event () >|= fun e -> `Event e) ; + ] >>= function + (* event matching *) + | `Event (None | Some (Error _)) -> + (* exit when the node is unavailable *) + last_get_event := None ; + lwt_log_error "Connection to node lost, %s exiting." name >>= fun () -> + exit 1 + | `Event (Some (Ok event)) -> begin + (* new event: cancel everything and execute callback *) + last_get_event := None ; + (* TODO: pretty-print events (requires passing a pp as argument) *) + log_errors_and_continue @@ event_k cctxt state event + end + | `Timeout timesup -> + (* main event: it's time *) + lwt_debug "Waking up for %s." name >>= fun () -> + (* core functionality *) + log_errors_and_continue @@ timeout_k cctxt state timesup + end >>= fun () -> + (* and restart *) + worker_loop () in + + (* ignition *) + lwt_log_info "Starting %s daemon" name >>= fun () -> + worker_loop () diff --git a/src/proto_alpha/lib_delegate/client_baking_scheduling.mli b/src/proto_alpha/lib_delegate/client_baking_scheduling.mli index 0a079ec36..24cffc0af 100644 --- a/src/proto_alpha/lib_delegate/client_baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/client_baking_scheduling.mli @@ -10,8 +10,30 @@ val sleep_until: Time.t -> unit Lwt.t option -val wait_for_first_block: - ?info:((unit Lwt.t, unit) Client_context.lwt_format -> unit Lwt.t) -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> - (Client_baking_blocks.block_info -> 'a Lwt.t) -> - 'a Lwt.t +val wait_for_first_event: + 'event tzresult Lwt_stream.t -> + 'event Lwt.t + +val main : + name:string -> + cctxt:(#Proto_alpha.full as 'a) -> + stream:'event tzresult Lwt_stream.t -> + state_maker:(Block_hash.t -> 'event -> 'state tzresult Lwt.t) -> + pre_loop:('a -> 'state -> 'event -> unit tzresult Lwt.t) -> + compute_timeout:('state -> 'timesup Lwt.t) -> + timeout_k:('a -> 'state -> 'timesup -> unit tzresult Lwt.t) -> + event_k:('a -> 'state -> 'event -> unit tzresult Lwt.t) -> + unit tzresult Lwt.t + +(** [main ~name ~cctxt ~stream ~state_maker ~pre_loop ~timeout_maker ~timeout_k + ~event_k] is an infinitely running loop that + monitors new events arriving on [stream]. The loop exits when the + [stream] gives an error. + + The function [pre_loop] is called before the loop starts. + + The loop maintains a state (of type ['state]) initialized by [state_maker] + and passed to the callbacks [timeout_maker] (used to set up waking-up + timeouts), [timeout_k] (when a computed timeout happens), and [event_k] + (when a new event arrives on the stream). +*) diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index 8a0c4fd53..8203eaa9f 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -12,7 +12,7 @@ module Endorser = struct let run (cctxt : #Proto_alpha.full) ~delay ?min_date delegates = Client_baking_blocks.monitor_heads cctxt `Main >>=? fun block_stream -> - Client_baking_endorsement.create cctxt ~delay delegates block_stream >>= fun () -> + Client_baking_endorsement.create cctxt ~delay delegates block_stream >>=? fun () -> ignore min_date; return () @@ -35,7 +35,7 @@ module Accuser = struct let run (cctxt : #Proto_alpha.full) = Client_baking_operations.monitor_endorsement cctxt >>=? fun endorsement_stream -> - Client_baking_denunciation.create cctxt endorsement_stream >>= fun () -> + Client_baking_denunciation.create cctxt endorsement_stream >>=? fun () -> return () end