From fcd29a36f3f6e793e355a6807b9afab2f6ffa71d Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 23 May 2018 16:29:19 +0200 Subject: [PATCH] Client/Endorser: remove endorsing scheduler --- .../lib_baking/client_baking_endorsement.ml | 248 +++++++++--------- .../lib_baking/client_baking_endorsement.mli | 2 +- 2 files changed, 118 insertions(+), 132 deletions(-) diff --git a/src/proto_alpha/lib_baking/client_baking_endorsement.ml b/src/proto_alpha/lib_baking/client_baking_endorsement.ml index 559a3e158..f8e530dfc 100644 --- a/src/proto_alpha/lib_baking/client_baking_endorsement.ml +++ b/src/proto_alpha/lib_baking/client_baking_endorsement.ml @@ -123,25 +123,35 @@ let forge_endorsement (cctxt : #Proto_alpha.full) Alpha_block_services.metadata cctxt ~chain ~block () >>=? fun { protocol_data = { level = { level } } } -> check_endorsement cctxt level src_pkh >>=? fun () -> - begin - match slots with - | Some slots -> return slots - | None -> - get_signing_slots - cctxt ~chain block src_pkh level >>=? function - | [] -> cctxt#error "No slot found at level %a" Raw_level.pp level - | slots -> return slots - end >>=? fun slots -> - check_endorsement cctxt level src_pkh >>=? fun () -> - inject_endorsement cctxt ~chain ?async block level src_sk slots src_pkh + previously_endorsed_level cctxt src_pkh level >>=? function + | true -> + cctxt#error "Level %a : previously endorsed." + Raw_level.pp level + | false -> + begin + match slots with + | Some slots -> return slots + | None -> + get_signing_slots + cctxt ~chain block src_pkh level >>=? function + | [] -> cctxt#error "No slot found at level %a" Raw_level.pp level + | slots -> return slots + end >>=? fun slots -> + inject_endorsement cctxt ~chain ?async block level src_sk slots src_pkh >>=? fun oph -> + Client_keys.get_key cctxt src_pkh >>=? fun (name, _pk, _sk) -> + cctxt#message + "Injected endorsement level %a, contract %s '%a'" + Raw_level.pp level + name + Operation_hash.pp_short oph >>= + fun () -> return oph (** Worker *) type state = { delegates: public_key_hash list ; - mutable best: Client_baking_blocks.block_info ; - mutable to_endorse: endorsement list ; delay: int64; + mutable to_endorse : endorsement option } and endorsement = { time: Time.t ; @@ -150,11 +160,10 @@ and endorsement = { slots: int list; } -let create_state delegates best delay = +let create_state delegates delay = { delegates ; - best ; - to_endorse = [] ; delay ; + to_endorse = None ; } let rec insert ({time} as e) = function @@ -171,13 +180,29 @@ let get_delegates cctxt state = | _ :: _ as delegates -> return delegates -let drop_old_endorsement ~before state = - state.to_endorse <- - List.filter - (fun { block } -> Fitness.compare before block.fitness <= 0) - state.to_endorse +let endorse_for cctxt = function + None -> return () + | Some {delegate; block ; slots} -> + let hash = block.hash in + let b = `Hash (hash, 0) in + let level = block.level.level in + Client_keys.get_key cctxt delegate >>=? fun (name, _pk, sk) -> + lwt_debug "Endorsing %a for %s (level %a using %d slots)!" + Block_hash.pp_short hash name + Raw_level.pp level + (List.length slots) >>= fun () -> + inject_endorsement cctxt + b level + sk slots delegate >>=? fun oph -> + lwt_log_info + "Injected endorsement for block '%a' \ + (level %a, contract %s) '%a'" + Block_hash.pp_short hash + Raw_level.pp level + name + Operation_hash.pp_short oph >>= return -let schedule_endorsements (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state bis = +let prepare_endorsement (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state bis = let may_endorse (block: Client_baking_blocks.block_info) delegate time = Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> lwt_log_info "May endorse block %a for %s" @@ -185,59 +210,21 @@ let schedule_endorsements (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state b let b = `Hash (block.hash, 0) in let level = block.level.level in get_signing_slots cctxt b delegate level >>=? fun slots -> - lwt_debug "Found %d slots for %a/%s" - (List.length slots) Block_hash.pp_short block.hash name >>= fun () -> + lwt_debug "Found slots for %a/%s (%d)" + Block_hash.pp_short block.hash name (List.length slots) >>= fun () -> previously_endorsed_level cctxt delegate level >>=? function | true -> lwt_debug "Level %a : previously endorsed." Raw_level.pp level >>= return | false -> - if Fitness.compare state.best.fitness block.fitness < 0 then begin - state.best <- block ; - drop_old_endorsement ~before:block.fitness state ; - end ; - begin try - let same_slot endorsement = - endorsement.block.level = block.level && endorsement.slots = slots in - let old = List.find same_slot state.to_endorse in - if Fitness.compare old.block.fitness block.fitness < 0 - then begin - lwt_log_info - "Schedule endorsement for block %a \ - (level %a, slots { %a }, time %a) (replace block %a)" - Block_hash.pp_short block.hash - Raw_level.pp level - (Format.pp_print_list Format.pp_print_int) slots - Time.pp_hum time - Block_hash.pp_short old.block.hash - >>= fun () -> - state.to_endorse <- - insert - { time ; delegate ; block ; slots } - (List.filter - (fun e -> not (same_slot e)) - state.to_endorse) ; - return () - end else begin - lwt_debug - "slot { %a } : better pending endorsement" - (Format.pp_print_list Format.pp_print_int) slots >>= fun () -> - return () - end - with Not_found -> - lwt_log_info - "Schedule endorsement for block %a \ - (level %a, slot { %a }, time %a)" - Block_hash.pp_short block.hash - Raw_level.pp level - (Format.pp_print_list Format.pp_print_int) slots - Time.pp_hum time >>= fun () -> - state.to_endorse <- - insert { time ; delegate ; block ; slots } state.to_endorse ; - return () - end + return + (match state.to_endorse with + | None -> + state.to_endorse <- Some {time ; delegate ; block; slots} + | Some old -> + if Fitness.compare old.block.fitness block.fitness < 0 then + state.to_endorse <- Some {time ; delegate ; block; slots}) in - let time = Time.(add (now ()) state.delay) in get_delegates cctxt state >>=? fun delegates -> iter_p (fun delegate -> @@ -250,67 +237,75 @@ let schedule_endorsements (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state b lwt_log_info "Ignore block %a: forged too far the past" Block_hash.pp_short bi.hash >>= return else - may_endorse bi delegate time) - bis) + let time = Time.(add (now ()) state.delay) in + may_endorse bi delegate time + ) bis + ) delegates -let schedule_endorsements (cctxt : #Proto_alpha.full) ~max_past state bis = - schedule_endorsements cctxt ~max_past state bis >>= function - | Error exns -> - lwt_log_error - "@[Error(s) while scheduling endorsements@,%a@]" - pp_print_error exns - | Ok () -> Lwt.return_unit - -let pop_endorsements state = - let now = Time.now () in - let rec pop acc = function - | [] -> List.rev acc, [] - | {time} :: _ as slots when Time.compare now time <= 0 -> - List.rev acc, slots - | slot :: slots -> pop (slot :: acc) slots in - let to_endorse, future_endorsement = pop [] state.to_endorse in - state.to_endorse <- future_endorsement ; - to_endorse - -let endorse cctxt state = - let to_endorse = pop_endorsements state in - iter_p (fun { time = _ ; block ; slots ; delegate } -> - let hash = block.hash in - let b = `Hash (hash, 0) in - let level = block.level.level in - Client_keys.get_key cctxt delegate >>=? fun (name, pk, sk) -> - let pkh = Signature.Public_key.hash pk in - lwt_debug "Endorsing %a for %s (slots : { %a } )!" - Block_hash.pp_short block.hash name - (Format.pp_print_list Format.pp_print_int) slots >>= fun () -> - inject_endorsement cctxt b block.level.level sk slots pkh >>=? fun oph -> - cctxt#message - "Injected endorsement for block '%a' \ - (level %a, slots { %a }, contract %s) '%a'" - Block_hash.pp_short hash - Raw_level.pp level - (Format.pp_print_list Format.pp_print_int) slots name - Operation_hash.pp_short oph >>= fun () -> return () - ) to_endorse +(* let endorse (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state bis = + * let may_endorse (block: Client_baking_blocks.block_info) delegate = + * Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> + * lwt_log_info "May endorse block %a for %s" + * Block_hash.pp_short block.hash name >>= fun () -> + * let b = `Hash (block.hash, 0) in + * let level = block.level.level in + * get_signing_slots cctxt b delegate level >>=? fun slots -> + * lwt_debug "Found %d slots for %a/%s" + * (List.length slots) Block_hash.pp_short block.hash name >>= fun () -> + * previously_endorsed_level cctxt delegate level >>=? function + * | true -> + * lwt_debug "Level %a : previously endorsed." + * Raw_level.pp level >>= fun () -> + * return [] + * | false -> + * return slots + * in + * get_delegates cctxt state >>=? fun delegates -> + * iter_p + * (fun delegate -> + * iter_p + * (fun (bi : Client_baking_blocks.block_info) -> + * if Time.compare bi.timestamp (Time.now ()) > 0 then + * lwt_log_info "Ignore block %a: forged in the future" + * Block_hash.pp_short bi.hash >>= return + * else if Time.(min (now ()) bi.timestamp > max_past) then + * lwt_log_info "Ignore block %a: forged too far the past" + * Block_hash.pp_short bi.hash >>= return + * else + * may_endorse bi delegate >>=? function + * | [] -> + * return () + * | slots -> + * endorse_for cctxt delegate bi slots ) + * bis) + * delegates *) let compute_timeout state = match state.to_endorse with - | [] -> Lwt_utils.never_ending - | {time} :: _ -> + | None -> Lwt_utils.never_ending + | Some {time} -> let delay = (Time.diff time (Time.now ())) in if delay <= 0L then Lwt.return_unit else Lwt_unix.sleep (Int64.to_float delay) +let check_error f = + f >>= function + | Ok () -> Lwt.return_unit + | Error errs -> + lwt_log_error "Error while endorsing:@\n%a" + pp_print_error + errs >>= fun () -> + Lwt.return_unit let create (cctxt : #Proto_alpha.full) ?(max_past=(Time.of_seconds 110L)) ~delay contracts block_stream = lwt_log_info "Starting endorsement daemon" >>= fun () -> Lwt_stream.get block_stream >>= function - | None | Some (Error _) -> + | None | Some (Ok []) | Some (Error _) -> cctxt#error "Can't fetch the current block head." - | Some (Ok head) -> + | Some (Ok initial_heads) -> let last_get_block = ref None in let get_block () = match !last_get_block with @@ -319,29 +314,20 @@ let create (cctxt : #Proto_alpha.full) ?(max_past=(Time.of_seconds 110L)) ~delay last_get_block := Some t ; t | Some t -> t in - let state = create_state contracts head (Int64.of_int delay) in + let state = create_state contracts (Int64.of_int delay) in let rec worker_loop () = let timeout = compute_timeout state in Lwt.choose [ (timeout >|= fun () -> `Timeout) ; - (get_block () >|= fun b -> `Hash b) ] >>= function + (get_block () >|= fun b -> `Hash b) ] >>= function | `Hash (None | Some (Error _)) -> Lwt.return_unit - | `Hash (Some (Ok bi)) -> - Lwt.cancel timeout ; + | `Hash (Some (Ok bis)) -> + Lwt.cancel timeout; last_get_block := None ; - schedule_endorsements cctxt ~max_past state [ bi ] >>= fun () -> + check_error (prepare_endorsement cctxt ~max_past state bis) >>= fun () -> worker_loop () | `Timeout -> - begin - endorse cctxt state >>= function - | Ok () -> Lwt.return_unit - | Error errs -> - lwt_log_error "Error while endorsing:@\n%a" - pp_print_error - errs >>= fun () -> - Lwt.return_unit - end >>= fun () -> - worker_loop () - in - schedule_endorsements cctxt ~max_past state [ head ] >>= fun () -> + check_error (endorse_for cctxt state.to_endorse) >>= fun () -> + worker_loop () in + check_error (prepare_endorsement cctxt ~max_past state initial_heads) >>= fun () -> worker_loop () diff --git a/src/proto_alpha/lib_baking/client_baking_endorsement.mli b/src/proto_alpha/lib_baking/client_baking_endorsement.mli index 52eb8721b..ae106e122 100644 --- a/src/proto_alpha/lib_baking/client_baking_endorsement.mli +++ b/src/proto_alpha/lib_baking/client_baking_endorsement.mli @@ -25,4 +25,4 @@ val create : ?max_past:Time.t -> delay:int -> public_key_hash list -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit Lwt.t + Client_baking_blocks.block_info list tzresult Lwt_stream.t -> unit Lwt.t