From 34aeaadb73996138943bf69ace1810fe9f60d992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 12 Jun 2018 17:10:52 +0800 Subject: [PATCH] =?UTF-8?q?Alpha/Baker:=20major=20=C3=A6sthetics?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - decompose long functions into small sub-functions - add comments - factor some code - polish out some unecessary bits and bobs --- .../lib_delegate/client_baking_forge.ml | 524 ++++++++++-------- 1 file changed, 292 insertions(+), 232 deletions(-) diff --git a/src/proto_alpha/lib_delegate/client_baking_forge.ml b/src/proto_alpha/lib_delegate/client_baking_forge.ml index 8dff44082..86d21554b 100644 --- a/src/proto_alpha/lib_delegate/client_baking_forge.ml +++ b/src/proto_alpha/lib_delegate/client_baking_forge.ml @@ -21,6 +21,7 @@ let generate_seed_nonce () = | Error _ -> assert false | Ok nonce -> nonce + let forge_block_header (cctxt : #Proto_alpha.full) ?(chain = `Main) block delegate_sk shell priority seed_nonce_hash = @@ -42,10 +43,12 @@ let forge_block_header loop () in loop () + let empty_proof_of_work_nonce = MBytes.of_string (String.make Constants_repr.proof_of_work_nonce_size '\000') + let forge_faked_protocol_data ~priority ~seed_nonce_hash = Alpha_context.Block_header.{ contents = { priority ; seed_nonce_hash ; @@ -53,6 +56,7 @@ let forge_faked_protocol_data ~priority ~seed_nonce_hash = signature = Signature.zero } + let assert_valid_operations_hash shell_header operations = let operations_hash = Operation_list_list_hash.compute @@ -62,9 +66,8 @@ let assert_valid_operations_hash shell_header operations = fail_unless (Operation_list_list_hash.equal operations_hash shell_header.Tezos_base.Block_header.operations_hash) - (failure - "Client_baking_forge.inject_block: \ - inconsistent header.") + (failure "Client_baking_forge.inject_block: inconsistent header.") + let inject_block cctxt ?force ?(chain = `Main) @@ -100,6 +103,7 @@ let () = | _ -> None) (fun (hash, err) -> Failed_to_preapply (hash, err)) + let classify_operations (ops: Proto_alpha.operation list) = let t = Array.make (List.length Proto_alpha.Main.validation_passes) [] in List.iter @@ -110,6 +114,7 @@ let classify_operations (ops: Proto_alpha.operation list) = ops ; Array.fold_right (fun ops acc -> List.rev ops :: acc) t [] + let parse (op : Operation.raw) : Operation.packed = let protocol_data = Data_encoding.Binary.of_bytes_exn @@ -128,146 +133,146 @@ let forge (op : Operation.packed) : Operation.raw = { op.protocol_data } -let all_operations (ops : Alpha_block_services.Mempool.t) = +let ops_of_mempool (ops : Alpha_block_services.Mempool.t) = List.map (fun (_, op) -> op) ops.applied @ Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.refused [] @ Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_refused [] @ Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_delayed [] @ Operation_hash.Map.fold (fun _ op acc -> op :: acc) ops.unprocessed [] +let unopt_operations cctxt chain = function + | None -> + Alpha_block_services.Mempool.pending_operations cctxt ~chain () >>=? fun mpool -> + let ops = ops_of_mempool mpool in + return ops + | Some operations -> + return operations + +let all_ops_valid (results: error Preapply_result.t list) = + let open Operation_hash.Map in + List.for_all (fun (result: error Preapply_result.t) -> + is_empty result.refused + && is_empty result.branch_refused + && is_empty result.branch_delayed) + results + +let decode_priority cctxt chain block = function + | `Set priority -> begin + Alpha_services.Delegate.Baking_rights.get cctxt + ~all:true ~max_priority:(priority+1) (chain, block) >>=? fun rights -> + let time = + Option.apply + ~f:(fun r -> r.Alpha_services.Delegate.Baking_rights.timestamp) + (List.nth_opt rights priority) in + return (priority, time) + end + | `Auto (src_pkh, max_priority) -> + Alpha_services.Helpers.current_level + cctxt ~offset:1l (chain, block)>>=? fun { level } -> + Alpha_services.Delegate.Baking_rights.get cctxt + ?max_priority + ~levels:[level] + ~delegates:[src_pkh] + (chain, block) >>=? fun possibilities -> + try + let { Alpha_services.Delegate.Baking_rights.priority = prio ; + timestamp = time } = + List.find + (fun p -> p.Alpha_services.Delegate.Baking_rights.level = level) + possibilities in + return (prio, time) + with Not_found -> + failwith "No slot found at level %a" Raw_level.pp level + +let unopt_timestamp timestamp minimal_timestamp = + match timestamp, minimal_timestamp with + | None, None -> return (Time.now ()) + | None, Some timestamp -> return timestamp + | Some timestamp, None -> return timestamp + | Some timestamp, Some minimal_timestamp -> + if timestamp < minimal_timestamp then + failwith + "Proposed timestamp %a is earlier than minimal timestamp %a" + Time.pp_hum timestamp + Time.pp_hum minimal_timestamp + else + return timestamp + +let merge_preapps (old: error Preapply_result.t) (neu: error Preapply_result.t) = + let merge _ a b = (* merge ops *) + match a, b with + | None, None -> None + | Some x, None -> Some x + | _, Some y -> Some y in + let merge = Operation_hash.Map.merge merge in (* merge op maps *) + (* merge preapplies *) + { Preapply_result.applied = [] ; + refused = merge old.refused neu.refused ; + branch_refused = merge old.branch_refused neu.branch_refused ; + branch_delayed = merge old.branch_delayed neu.branch_delayed } + +let error_of_op (result: error Preapply_result.t) op = + let op = forge op in + let h = Tezos_base.Operation.hash op in + try Some (Failed_to_preapply (op, snd @@ Operation_hash.Map.find h result.refused)) + with Not_found -> + try Some (Failed_to_preapply (op, snd @@ Operation_hash.Map.find h result.branch_refused)) + with Not_found -> + try Some (Failed_to_preapply (op, snd @@ Operation_hash.Map.find h result.branch_delayed)) + with Not_found -> None + + let forge_block cctxt ?(chain = `Main) block ?force ?operations ?(best_effort = operations = None) ?(sort = best_effort) ?timestamp ~priority ?seed_nonce_hash ~src_sk () = - begin - match operations with - | None -> - Alpha_block_services.Mempool.pending_operations - cctxt ~chain () >>=? fun ops -> - return (all_operations ops) - | Some operations -> - return operations - end >>=? fun operations -> - begin - match priority with - | `Set priority -> begin - Alpha_services.Delegate.Baking_rights.get cctxt - ~all:true ~max_priority:(priority+1) (chain, block) >>=? fun rights -> - let time = - Option.apply - ~f:(fun r -> r.Alpha_services.Delegate.Baking_rights.timestamp) - (List.nth_opt rights priority) in - return (priority, time) - end - | `Auto (src_pkh, max_priority) -> - Alpha_services.Helpers.current_level - cctxt ~offset:1l (chain, block)>>=? fun { level } -> - Alpha_services.Delegate.Baking_rights.get cctxt - ?max_priority - ~levels:[level] - ~delegates:[src_pkh] - (chain, block) >>=? fun possibilities -> - try - let { Alpha_services.Delegate.Baking_rights.priority = prio ; - timestamp = time } = - List.find - (fun (p : Alpha_services.Delegate.Baking_rights.t) -> - p.level = level) - possibilities in - return (prio, time) - with Not_found -> - failwith "No slot found at level %a" Raw_level.pp level - end >>=? fun (priority, minimal_timestamp) -> - (* lwt_log_info "Baking block at level %a prio %d" *) - (* Raw_level.pp level priority >>= fun () -> *) - begin - match timestamp, minimal_timestamp with - | None, None -> return (Time.now ()) - | None, Some timestamp -> return timestamp - | Some timestamp, None -> return timestamp - | Some timestamp, Some minimal_timestamp -> - if timestamp < minimal_timestamp then - failwith - "Proposed timestamp %a is earlier than minimal timestamp %a" - Time.pp_hum timestamp - Time.pp_hum minimal_timestamp - else - return timestamp - end >>=? fun timestamp -> - let request = List.length operations in + + (* making the arguments usable *) + unopt_operations cctxt chain operations >>=? fun operations_arg -> + decode_priority cctxt chain block priority >>=? fun (priority, minimal_timestamp) -> + unopt_timestamp timestamp minimal_timestamp >>=? fun timestamp -> + + (* get basic building blocks *) let protocol_data = forge_faked_protocol_data ~priority ~seed_nonce_hash in - let operations = classify_operations operations in + let operations = classify_operations operations_arg in Alpha_block_services.Helpers.Preapply.block - cctxt ~block ~timestamp ~sort ~protocol_data operations >>=? - fun (shell_header, result) -> - let valid = + cctxt ~block ~timestamp ~sort ~protocol_data operations >>=? fun (shell_header, result) -> + + (* now for some logging *) + let total_op_count = List.length operations_arg in + let valid_op_count = List.fold_left (fun acc r -> acc + List.length r.Preapply_result.applied) 0 result in lwt_log_info "Found %d valid operations (%d refused) for timestamp %a" - valid (request - valid) + valid_op_count (total_op_count - valid_op_count) Time.pp_hum timestamp >>= fun () -> lwt_log_info "Computed fitness %a" Fitness.pp shell_header.fitness >>= fun () -> - if best_effort - || List.for_all (fun l -> - Operation_hash.Map.is_empty l.Preapply_result.refused - && Operation_hash.Map.is_empty l.branch_refused - && Operation_hash.Map.is_empty l.branch_delayed ) - result - then + + (* everything went well (or we don't care about errors): GO! *) + if best_effort || all_ops_valid result then let operations = - if not best_effort then - List.map (List.map forge) operations + if best_effort then + List.map (fun l -> List.map snd l.Preapply_result.applied) result else - List.map (fun l -> List.map snd l.Preapply_result.applied) result in + List.map (List.map forge) operations in inject_block cctxt ?force ~chain ~shell_header ~priority ?seed_nonce_hash ~src_sk operations + + (* some errors (and we care about them) *) else - let result = - let merge old neu = - let open Preapply_result in - let merge _key a b = - match a, b with - | None, None -> None - | Some x, None -> Some x - | _, Some y -> Some y in - { applied = [] ; - refused = - Operation_hash.Map.merge merge - old.refused - neu.refused ; - branch_refused = - Operation_hash.Map.merge merge - old.branch_refused - neu.branch_refused ; - branch_delayed = - Operation_hash.Map.merge merge - old.branch_delayed - neu.branch_delayed } in - List.fold_left merge Preapply_result.empty result in + let result = List.fold_left merge_preapps Preapply_result.empty result in Lwt.return_error @@ - List.filter_map - (fun op -> - let op = forge op in - let h = Tezos_base.Operation.hash op in - try Some (Failed_to_preapply - (op, snd @@ Operation_hash.Map.find h result.refused)) - with Not_found -> - try Some (Failed_to_preapply - (op, snd @@ Operation_hash.Map.find h result.branch_refused)) - with Not_found -> - try Some (Failed_to_preapply - (op, snd @@ Operation_hash.Map.find h result.branch_delayed)) - with Not_found -> None) - (List.concat operations) + List.filter_map (error_of_op result) (List.concat operations) (** Worker *) module State : sig + (* TODO: only [record_block] is ever used, and only once. Simplify. *) val get_block: #Client_context.wallet -> @@ -478,160 +483,215 @@ let insert_blocks cctxt ?max_priority state bi = | Error err -> lwt_log_error "Error: %a" pp_print_error err +let bake_slot + cctxt + state + seed_nonce_hash + (timestamp, (bi, priority, delegate)) (* baking slot *) + = + let chain = `Hash bi.Client_baking_blocks.chain_id in + let block = `Hash (bi.hash, 0) in + Alpha_services.Helpers.current_level cctxt + ~offset:1l (chain, block) >>=? fun next_level -> + let timestamp = + if Block_hash.equal bi.Client_baking_blocks.hash state.genesis then + Time.now () + else + timestamp in + Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> + lwt_debug "Try baking after %a (slot %d) for %s (%a)" + Block_hash.pp_short bi.hash + priority + name + Time.pp_hum timestamp >>= fun () -> + + (* get and process operations *) + Alpha_block_services.Mempool.pending_operations cctxt ~chain () >>=? fun mpool -> + let operations = ops_of_mempool mpool in + let total_op_count = List.length operations in + let seed_nonce_hash = + if next_level.expected_commitment then + Some seed_nonce_hash + else + None in + let protocol_data = + forge_faked_protocol_data ~priority ~seed_nonce_hash in + let operations = classify_operations operations in + Alpha_block_services.Helpers.Preapply.block + cctxt ~chain ~block + ~timestamp ~sort:true ~protocol_data operations >>= function + | Error errs -> + lwt_log_error "Error while prevalidating operations:@\n%a" + pp_print_error + errs >>= fun () -> + return None + | Ok (shell_header, operations) -> + lwt_debug + "Computed candidate block after %a (slot %d): %a/%d fitness: %a" + Block_hash.pp_short bi.hash priority + (Format.pp_print_list + ~pp_sep:(fun ppf () -> Format.fprintf ppf "+") + (fun ppf operations -> Format.fprintf ppf "%d" (List.length operations.Preapply_result.applied))) + operations + total_op_count + Fitness.pp shell_header.fitness >>= fun () -> + let operations = + List.map (fun l -> List.map snd l.Preapply_result.applied) operations in + return + (Some (bi, priority, shell_header, operations, delegate, seed_nonce_hash)) + +let fittest + (_, _, (h1: Block_header.shell_header), _, _, _) + (_, _, (h2: Block_header.shell_header), _, _, _) = + match Fitness.compare h1.fitness h2.fitness with + | 0 -> Time.compare h1.timestamp h2.timestamp + | cmp -> ~- cmp + +let fit_enough (state: state) (shell_header: Block_header.shell_header) = + Fitness.compare state.best.fitness shell_header.fitness < 0 + || (Fitness.compare state.best.fitness shell_header.fitness = 0 + && Time.compare shell_header.timestamp state.best.timestamp < 0) + +let record_nonce_hash cctxt level block_hash seed_nonce seed_nonce_hash = + if seed_nonce_hash <> None then + State.record_block cctxt level block_hash seed_nonce + |> trace_exn (Failure "Error while recording block") + else + return () + +let pp_operation_list_list = + Format.pp_print_list + ~pp_sep:(fun ppf () -> Format.fprintf ppf "+") + (fun ppf operations -> Format.fprintf ppf "%d" (List.length operations)) + +(* [bake] create a single block when woken up to do so. All the necessary + information (e.g., slot) is available in the [state]. *) let bake (cctxt : #Proto_alpha.full) state = let slots = pop_baking_slots state in let seed_nonce = generate_seed_nonce () in let seed_nonce_hash = Nonce.hash seed_nonce in - filter_map_s - (fun (timestamp, (bi, priority, delegate)) -> - let chain = `Hash bi.Client_baking_blocks.chain_id in - let block = `Hash (bi.hash, 0) in - Alpha_services.Helpers.current_level cctxt - ~offset:1l (chain, block) >>=? fun next_level -> - let timestamp = - if Block_hash.equal bi.Client_baking_blocks.hash state.genesis then - Time.now () - else - timestamp in - Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> - lwt_debug "Try baking after %a (slot %d) for %s (%a)" - Block_hash.pp_short bi.hash - priority name Time.pp_hum timestamp >>= fun () -> - Alpha_block_services.Mempool.pending_operations - cctxt ~chain () >>=? fun ops -> - let operations = all_operations ops in - let request = List.length operations in - let seed_nonce_hash = - if next_level.expected_commitment then - Some seed_nonce_hash - else - None in - let protocol_data = - forge_faked_protocol_data ~priority ~seed_nonce_hash in - let operations = classify_operations operations in - Alpha_block_services.Helpers.Preapply.block - cctxt ~chain ~block - ~timestamp ~sort:true ~protocol_data operations >>= function - | Error errs -> - lwt_log_error "Error while prevalidating operations:@\n%a" - pp_print_error - errs >>= fun () -> - return None - | Ok (shell_header, operations) -> - lwt_debug - "Computed candidate block after %a (slot %d): %a/%d fitness: %a" - Block_hash.pp_short bi.hash priority - (Format.pp_print_list - ~pp_sep:(fun ppf () -> Format.fprintf ppf "+") - (fun ppf operations -> Format.fprintf ppf "%d" (List.length operations.Preapply_result.applied))) - operations - request - Fitness.pp shell_header.fitness >>= fun () -> - let operations = - List.map (fun l -> List.map snd l.Preapply_result.applied) operations in - return - (Some (bi, priority, shell_header, operations, delegate, seed_nonce_hash))) - slots >>=? fun candidates -> - let candidates = - List.sort - (fun (_,_,h1,_,_,_) (_,_,h2,_,_,_) -> - match - Fitness.compare h1.Tezos_base.Block_header.fitness h2.fitness - with - | 0 -> - Time.compare h1.timestamp h2.timestamp - | cmp -> ~- cmp) - candidates in + + (* baking for each slot *) + filter_map_s (bake_slot cctxt state seed_nonce_hash) slots >>=? fun candidates -> + + (* selecting the candidate baked block *) + let candidates = List.sort fittest candidates in match candidates with | (bi, priority, shell_header, operations, delegate, seed_nonce_hash) :: _ - when Fitness.compare state.best.fitness shell_header.fitness < 0 || - (Fitness.compare state.best.fitness shell_header.fitness = 0 && - Time.compare shell_header.timestamp state.best.timestamp < 0) -> begin + when fit_enough state shell_header -> begin let level = Raw_level.succ bi.level.level in cctxt#message "Select candidate block after %a (slot %d) fitness: %a" Block_hash.pp_short bi.hash priority Fitness.pp shell_header.fitness >>= fun () -> + + (* core function *) Client_keys.get_key cctxt delegate >>=? fun (_,_,src_sk) -> let chain = `Hash bi.Client_baking_blocks.chain_id in inject_block cctxt ~force:true ~chain ~shell_header ~priority ?seed_nonce_hash ~src_sk operations + (* /core function; back to logging and info *) + |> trace_exn (Failure "Error while injecting block") >>=? fun block_hash -> - begin - if seed_nonce_hash <> None then - State.record_block cctxt level block_hash seed_nonce - |> trace_exn (Failure "Error while recording block") - else - return () - end >>=? fun () -> + record_nonce_hash cctxt level block_hash seed_nonce seed_nonce_hash >>=? fun () -> Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> cctxt#message - "Injected block %a for %s after %a \ - \ (level %a, slot %d, fitness %a, operations %a)" + "Injected block %a for %s after %a (level %a, slot %d, fitness %a, operations %a)" Block_hash.pp_short block_hash name Block_hash.pp_short bi.hash Raw_level.pp level priority Fitness.pp shell_header.fitness - (Format.pp_print_list - ~pp_sep:(fun ppf () -> Format.fprintf ppf "+") - (fun ppf operations -> Format.fprintf ppf "%d" (List.length operations))) - operations >>= fun () -> + pp_operation_list_list operations >>= fun () -> return () end + | _ -> lwt_debug "No valid candidates." >>= fun () -> return () + +(* [create] starts the main loop of the baker. The loop monitors new blocks and + starts individual baking operations when baking-slots are available to any of + the [delegates] *) let create - (cctxt : #Proto_alpha.full) ?max_priority delegates - (block_stream: - Client_baking_blocks.block_info tzresult Lwt_stream.t) = + (cctxt : #Proto_alpha.full) + ?max_priority + (delegates: public_key_hash list) + (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) + (bi: Client_baking_blocks.block_info) = + + Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash -> + + (* statefulness *) + let last_get_block = ref None in + let get_block () = + match !last_get_block with + | None -> + let t = Lwt_stream.get block_stream in + last_get_block := Some t ; + t + | Some t -> t in + let state = create_state genesis_hash delegates bi in + insert_blocks cctxt ?max_priority state bi >>= fun () -> + + (* main loop *) + let rec worker_loop () = + begin + (* event construction *) + let timeout = compute_timeout state in + Lwt.choose [ (timeout >|= fun () -> `Timeout) ; + (get_block () >|= fun b -> `Hash b) ; + ] >>= function + (* event matching *) + | `Hash (None | Some (Error _)) -> + (* return to restart *) + Lwt.return_unit + + | `Hash (Some (Ok bi)) -> begin + (* new block: cancel everything and bake on the new head *) + Lwt.cancel timeout ; + last_get_block := None ; + lwt_debug + "Discoverered block: %a" + Block_hash.pp_short bi.Client_baking_blocks.hash >>= fun () -> + insert_blocks cctxt ?max_priority state bi + end + + | `Timeout -> + (* main event: it's baking time *) + lwt_debug "Waking up for baking..." >>= fun () -> + begin + (* core functionality *) + bake cctxt state >>= function + | Ok () -> Lwt.return_unit + | Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs + end + + end >>= fun () -> + (* and restart *) + worker_loop () in + + (* ignition *) + lwt_log_info "Starting baking daemon" >>= fun () -> + worker_loop () + + + +(* Wrapper around previous [create] function that handles the case of + unavailable blocks (empty block chain). *) +let create + (cctxt : #Proto_alpha.full) + ?max_priority + (delegates: public_key_hash list) + (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) = Lwt_stream.get block_stream >>= function | None | Some (Error _) -> cctxt#error "Can't fetch the current block head." | Some (Ok bi) -> - Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash -> - let last_get_block = ref None in - let get_block () = - match !last_get_block with - | None -> - let t = Lwt_stream.get block_stream in - last_get_block := Some t ; - t - | Some t -> t in - let state = create_state genesis_hash delegates bi in - insert_blocks cctxt ?max_priority state bi >>= fun () -> - let rec worker_loop () = - let timeout = compute_timeout state in - Lwt.choose [ (timeout >|= fun () -> `Timeout) ; - (get_block () >|= fun b -> `Hash b) ; - ] >>= function - | `Hash (None | Some (Error _)) -> - Lwt.return_unit - | `Hash (Some (Ok bi)) -> begin - Lwt.cancel timeout ; - last_get_block := None ; - lwt_debug - "Discoverered block: %a" - Block_hash.pp_short bi.Client_baking_blocks.hash >>= fun () -> - insert_blocks cctxt ?max_priority state bi >>= fun () -> - worker_loop () - end - | `Timeout -> - lwt_debug "Waking up for baking..." >>= fun () -> - begin - bake cctxt state >>= function - | Ok () -> Lwt.return_unit - | Error errs -> - lwt_log_error "Error while baking:@\n%a" - pp_print_error - errs >>= fun () -> - Lwt.return_unit - end >>= fun () -> - worker_loop () in - lwt_log_info "Starting baking daemon" >>= fun () -> - worker_loop () >>= fun () -> - return () + create + cctxt ?max_priority delegates + block_stream bi +