Alpha/Baker: major æsthetics

- decompose long functions into small sub-functions
- add comments
- factor some code
- polish out some unecessary bits and bobs
This commit is contained in:
Raphaël Proust 2018-06-12 17:10:52 +08:00 committed by Grégoire Henry
parent 397d011ed9
commit 34aeaadb73

View File

@ -21,6 +21,7 @@ let generate_seed_nonce () =
| Error _ -> assert false | Error _ -> assert false
| Ok nonce -> nonce | Ok nonce -> nonce
let forge_block_header let forge_block_header
(cctxt : #Proto_alpha.full) (cctxt : #Proto_alpha.full)
?(chain = `Main) block delegate_sk shell priority seed_nonce_hash = ?(chain = `Main) block delegate_sk shell priority seed_nonce_hash =
@ -42,10 +43,12 @@ let forge_block_header
loop () in loop () in
loop () loop ()
let empty_proof_of_work_nonce = let empty_proof_of_work_nonce =
MBytes.of_string MBytes.of_string
(String.make Constants_repr.proof_of_work_nonce_size '\000') (String.make Constants_repr.proof_of_work_nonce_size '\000')
let forge_faked_protocol_data ~priority ~seed_nonce_hash = let forge_faked_protocol_data ~priority ~seed_nonce_hash =
Alpha_context.Block_header.{ Alpha_context.Block_header.{
contents = { priority ; seed_nonce_hash ; contents = { priority ; seed_nonce_hash ;
@ -53,6 +56,7 @@ let forge_faked_protocol_data ~priority ~seed_nonce_hash =
signature = Signature.zero signature = Signature.zero
} }
let assert_valid_operations_hash shell_header operations = let assert_valid_operations_hash shell_header operations =
let operations_hash = let operations_hash =
Operation_list_list_hash.compute Operation_list_list_hash.compute
@ -62,9 +66,8 @@ let assert_valid_operations_hash shell_header operations =
fail_unless fail_unless
(Operation_list_list_hash.equal (Operation_list_list_hash.equal
operations_hash shell_header.Tezos_base.Block_header.operations_hash) operations_hash shell_header.Tezos_base.Block_header.operations_hash)
(failure (failure "Client_baking_forge.inject_block: inconsistent header.")
"Client_baking_forge.inject_block: \
inconsistent header.")
let inject_block cctxt let inject_block cctxt
?force ?(chain = `Main) ?force ?(chain = `Main)
@ -100,6 +103,7 @@ let () =
| _ -> None) | _ -> None)
(fun (hash, err) -> Failed_to_preapply (hash, err)) (fun (hash, err) -> Failed_to_preapply (hash, err))
let classify_operations (ops: Proto_alpha.operation list) = let classify_operations (ops: Proto_alpha.operation list) =
let t = Array.make (List.length Proto_alpha.Main.validation_passes) [] in let t = Array.make (List.length Proto_alpha.Main.validation_passes) [] in
List.iter List.iter
@ -110,6 +114,7 @@ let classify_operations (ops: Proto_alpha.operation list) =
ops ; ops ;
Array.fold_right (fun ops acc -> List.rev ops :: acc) t [] Array.fold_right (fun ops acc -> List.rev ops :: acc) t []
let parse (op : Operation.raw) : Operation.packed = let parse (op : Operation.raw) : Operation.packed =
let protocol_data = let protocol_data =
Data_encoding.Binary.of_bytes_exn Data_encoding.Binary.of_bytes_exn
@ -128,30 +133,30 @@ let forge (op : Operation.packed) : Operation.raw = {
op.protocol_data op.protocol_data
} }
let all_operations (ops : Alpha_block_services.Mempool.t) = let ops_of_mempool (ops : Alpha_block_services.Mempool.t) =
List.map (fun (_, op) -> op) ops.applied @ List.map (fun (_, op) -> op) ops.applied @
Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.refused [] @ Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.refused [] @
Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_refused [] @ Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_refused [] @
Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_delayed [] @ Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_delayed [] @
Operation_hash.Map.fold (fun _ op acc -> op :: acc) ops.unprocessed [] Operation_hash.Map.fold (fun _ op acc -> op :: acc) ops.unprocessed []
let forge_block cctxt ?(chain = `Main) block let unopt_operations cctxt chain = function
?force
?operations ?(best_effort = operations = None) ?(sort = best_effort)
?timestamp
~priority
?seed_nonce_hash ~src_sk () =
begin
match operations with
| None -> | None ->
Alpha_block_services.Mempool.pending_operations Alpha_block_services.Mempool.pending_operations cctxt ~chain () >>=? fun mpool ->
cctxt ~chain () >>=? fun ops -> let ops = ops_of_mempool mpool in
return (all_operations ops) return ops
| Some operations -> | Some operations ->
return operations return operations
end >>=? fun operations ->
begin let all_ops_valid (results: error Preapply_result.t list) =
match priority with let open Operation_hash.Map in
List.for_all (fun (result: error Preapply_result.t) ->
is_empty result.refused
&& is_empty result.branch_refused
&& is_empty result.branch_delayed)
results
let decode_priority cctxt chain block = function
| `Set priority -> begin | `Set priority -> begin
Alpha_services.Delegate.Baking_rights.get cctxt Alpha_services.Delegate.Baking_rights.get cctxt
~all:true ~max_priority:(priority+1) (chain, block) >>=? fun rights -> ~all:true ~max_priority:(priority+1) (chain, block) >>=? fun rights ->
@ -173,16 +178,13 @@ let forge_block cctxt ?(chain = `Main) block
let { Alpha_services.Delegate.Baking_rights.priority = prio ; let { Alpha_services.Delegate.Baking_rights.priority = prio ;
timestamp = time } = timestamp = time } =
List.find List.find
(fun (p : Alpha_services.Delegate.Baking_rights.t) -> (fun p -> p.Alpha_services.Delegate.Baking_rights.level = level)
p.level = level)
possibilities in possibilities in
return (prio, time) return (prio, time)
with Not_found -> with Not_found ->
failwith "No slot found at level %a" Raw_level.pp level failwith "No slot found at level %a" Raw_level.pp level
end >>=? fun (priority, minimal_timestamp) ->
(* lwt_log_info "Baking block at level %a prio %d" *) let unopt_timestamp timestamp minimal_timestamp =
(* Raw_level.pp level priority >>= fun () -> *)
begin
match timestamp, minimal_timestamp with match timestamp, minimal_timestamp with
| None, None -> return (Time.now ()) | None, None -> return (Time.now ())
| None, Some timestamp -> return timestamp | None, Some timestamp -> return timestamp
@ -195,79 +197,82 @@ let forge_block cctxt ?(chain = `Main) block
Time.pp_hum minimal_timestamp Time.pp_hum minimal_timestamp
else else
return timestamp return timestamp
end >>=? fun timestamp ->
let request = List.length operations in let merge_preapps (old: error Preapply_result.t) (neu: error Preapply_result.t) =
let protocol_data = forge_faked_protocol_data ~priority ~seed_nonce_hash in let merge _ a b = (* merge ops *)
let operations = classify_operations operations in
Alpha_block_services.Helpers.Preapply.block
cctxt ~block ~timestamp ~sort ~protocol_data operations >>=?
fun (shell_header, result) ->
let valid =
List.fold_left
(fun acc r -> acc + List.length r.Preapply_result.applied)
0 result in
lwt_log_info "Found %d valid operations (%d refused) for timestamp %a"
valid (request - valid)
Time.pp_hum timestamp >>= fun () ->
lwt_log_info "Computed fitness %a"
Fitness.pp shell_header.fitness >>= fun () ->
if best_effort
|| List.for_all (fun l ->
Operation_hash.Map.is_empty l.Preapply_result.refused
&& Operation_hash.Map.is_empty l.branch_refused
&& Operation_hash.Map.is_empty l.branch_delayed )
result
then
let operations =
if not best_effort then
List.map (List.map forge) operations
else
List.map (fun l -> List.map snd l.Preapply_result.applied) result in
inject_block cctxt
?force ~chain ~shell_header ~priority ?seed_nonce_hash ~src_sk
operations
else
let result =
let merge old neu =
let open Preapply_result in
let merge _key a b =
match a, b with match a, b with
| None, None -> None | None, None -> None
| Some x, None -> Some x | Some x, None -> Some x
| _, Some y -> Some y in | _, Some y -> Some y in
{ applied = [] ; let merge = Operation_hash.Map.merge merge in (* merge op maps *)
refused = (* merge preapplies *)
Operation_hash.Map.merge merge { Preapply_result.applied = [] ;
old.refused refused = merge old.refused neu.refused ;
neu.refused ; branch_refused = merge old.branch_refused neu.branch_refused ;
branch_refused = branch_delayed = merge old.branch_delayed neu.branch_delayed }
Operation_hash.Map.merge merge
old.branch_refused let error_of_op (result: error Preapply_result.t) op =
neu.branch_refused ;
branch_delayed =
Operation_hash.Map.merge merge
old.branch_delayed
neu.branch_delayed } in
List.fold_left merge Preapply_result.empty result in
Lwt.return_error @@
List.filter_map
(fun op ->
let op = forge op in let op = forge op in
let h = Tezos_base.Operation.hash op in let h = Tezos_base.Operation.hash op in
try Some (Failed_to_preapply try Some (Failed_to_preapply (op, snd @@ Operation_hash.Map.find h result.refused))
(op, snd @@ Operation_hash.Map.find h result.refused))
with Not_found -> with Not_found ->
try Some (Failed_to_preapply try Some (Failed_to_preapply (op, snd @@ Operation_hash.Map.find h result.branch_refused))
(op, snd @@ Operation_hash.Map.find h result.branch_refused))
with Not_found -> with Not_found ->
try Some (Failed_to_preapply try Some (Failed_to_preapply (op, snd @@ Operation_hash.Map.find h result.branch_delayed))
(op, snd @@ Operation_hash.Map.find h result.branch_delayed)) with Not_found -> None
with Not_found -> None)
(List.concat operations)
let forge_block cctxt ?(chain = `Main) block
?force
?operations ?(best_effort = operations = None) ?(sort = best_effort)
?timestamp
~priority
?seed_nonce_hash ~src_sk () =
(* making the arguments usable *)
unopt_operations cctxt chain operations >>=? fun operations_arg ->
decode_priority cctxt chain block priority >>=? fun (priority, minimal_timestamp) ->
unopt_timestamp timestamp minimal_timestamp >>=? fun timestamp ->
(* get basic building blocks *)
let protocol_data = forge_faked_protocol_data ~priority ~seed_nonce_hash in
let operations = classify_operations operations_arg in
Alpha_block_services.Helpers.Preapply.block
cctxt ~block ~timestamp ~sort ~protocol_data operations >>=? fun (shell_header, result) ->
(* now for some logging *)
let total_op_count = List.length operations_arg in
let valid_op_count =
List.fold_left
(fun acc r -> acc + List.length r.Preapply_result.applied)
0 result in
lwt_log_info "Found %d valid operations (%d refused) for timestamp %a"
valid_op_count (total_op_count - valid_op_count)
Time.pp_hum timestamp >>= fun () ->
lwt_log_info "Computed fitness %a"
Fitness.pp shell_header.fitness >>= fun () ->
(* everything went well (or we don't care about errors): GO! *)
if best_effort || all_ops_valid result then
let operations =
if best_effort then
List.map (fun l -> List.map snd l.Preapply_result.applied) result
else
List.map (List.map forge) operations in
inject_block cctxt
?force ~chain ~shell_header ~priority ?seed_nonce_hash ~src_sk
operations
(* some errors (and we care about them) *)
else
let result = List.fold_left merge_preapps Preapply_result.empty result in
Lwt.return_error @@
List.filter_map (error_of_op result) (List.concat operations)
(** Worker *) (** Worker *)
module State : sig module State : sig
(* TODO: only [record_block] is ever used, and only once. Simplify. *)
val get_block: val get_block:
#Client_context.wallet -> #Client_context.wallet ->
@ -478,12 +483,12 @@ let insert_blocks cctxt ?max_priority state bi =
| Error err -> | Error err ->
lwt_log_error "Error: %a" pp_print_error err lwt_log_error "Error: %a" pp_print_error err
let bake (cctxt : #Proto_alpha.full) state = let bake_slot
let slots = pop_baking_slots state in cctxt
let seed_nonce = generate_seed_nonce () in state
let seed_nonce_hash = Nonce.hash seed_nonce in seed_nonce_hash
filter_map_s (timestamp, (bi, priority, delegate)) (* baking slot *)
(fun (timestamp, (bi, priority, delegate)) -> =
let chain = `Hash bi.Client_baking_blocks.chain_id in let chain = `Hash bi.Client_baking_blocks.chain_id in
let block = `Hash (bi.hash, 0) in let block = `Hash (bi.hash, 0) in
Alpha_services.Helpers.current_level cctxt Alpha_services.Helpers.current_level cctxt
@ -496,11 +501,14 @@ let bake (cctxt : #Proto_alpha.full) state =
Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
lwt_debug "Try baking after %a (slot %d) for %s (%a)" lwt_debug "Try baking after %a (slot %d) for %s (%a)"
Block_hash.pp_short bi.hash Block_hash.pp_short bi.hash
priority name Time.pp_hum timestamp >>= fun () -> priority
Alpha_block_services.Mempool.pending_operations name
cctxt ~chain () >>=? fun ops -> Time.pp_hum timestamp >>= fun () ->
let operations = all_operations ops in
let request = List.length operations in (* get and process operations *)
Alpha_block_services.Mempool.pending_operations cctxt ~chain () >>=? fun mpool ->
let operations = ops_of_mempool mpool in
let total_op_count = List.length operations in
let seed_nonce_hash = let seed_nonce_hash =
if next_level.expected_commitment then if next_level.expected_commitment then
Some seed_nonce_hash Some seed_nonce_hash
@ -525,75 +533,99 @@ let bake (cctxt : #Proto_alpha.full) state =
~pp_sep:(fun ppf () -> Format.fprintf ppf "+") ~pp_sep:(fun ppf () -> Format.fprintf ppf "+")
(fun ppf operations -> Format.fprintf ppf "%d" (List.length operations.Preapply_result.applied))) (fun ppf operations -> Format.fprintf ppf "%d" (List.length operations.Preapply_result.applied)))
operations operations
request total_op_count
Fitness.pp shell_header.fitness >>= fun () -> Fitness.pp shell_header.fitness >>= fun () ->
let operations = let operations =
List.map (fun l -> List.map snd l.Preapply_result.applied) operations in List.map (fun l -> List.map snd l.Preapply_result.applied) operations in
return return
(Some (bi, priority, shell_header, operations, delegate, seed_nonce_hash))) (Some (bi, priority, shell_header, operations, delegate, seed_nonce_hash))
slots >>=? fun candidates ->
let candidates = let fittest
List.sort (_, _, (h1: Block_header.shell_header), _, _, _)
(fun (_,_,h1,_,_,_) (_,_,h2,_,_,_) -> (_, _, (h2: Block_header.shell_header), _, _, _) =
match match Fitness.compare h1.fitness h2.fitness with
Fitness.compare h1.Tezos_base.Block_header.fitness h2.fitness | 0 -> Time.compare h1.timestamp h2.timestamp
with | cmp -> ~- cmp
| 0 ->
Time.compare h1.timestamp h2.timestamp let fit_enough (state: state) (shell_header: Block_header.shell_header) =
| cmp -> ~- cmp) Fitness.compare state.best.fitness shell_header.fitness < 0
candidates in || (Fitness.compare state.best.fitness shell_header.fitness = 0
&& Time.compare shell_header.timestamp state.best.timestamp < 0)
let record_nonce_hash cctxt level block_hash seed_nonce seed_nonce_hash =
if seed_nonce_hash <> None then
State.record_block cctxt level block_hash seed_nonce
|> trace_exn (Failure "Error while recording block")
else
return ()
let pp_operation_list_list =
Format.pp_print_list
~pp_sep:(fun ppf () -> Format.fprintf ppf "+")
(fun ppf operations -> Format.fprintf ppf "%d" (List.length operations))
(* [bake] create a single block when woken up to do so. All the necessary
information (e.g., slot) is available in the [state]. *)
let bake (cctxt : #Proto_alpha.full) state =
let slots = pop_baking_slots state in
let seed_nonce = generate_seed_nonce () in
let seed_nonce_hash = Nonce.hash seed_nonce in
(* baking for each slot *)
filter_map_s (bake_slot cctxt state seed_nonce_hash) slots >>=? fun candidates ->
(* selecting the candidate baked block *)
let candidates = List.sort fittest candidates in
match candidates with match candidates with
| (bi, priority, shell_header, operations, delegate, seed_nonce_hash) :: _ | (bi, priority, shell_header, operations, delegate, seed_nonce_hash) :: _
when Fitness.compare state.best.fitness shell_header.fitness < 0 || when fit_enough state shell_header -> begin
(Fitness.compare state.best.fitness shell_header.fitness = 0 &&
Time.compare shell_header.timestamp state.best.timestamp < 0) -> begin
let level = Raw_level.succ bi.level.level in let level = Raw_level.succ bi.level.level in
cctxt#message cctxt#message
"Select candidate block after %a (slot %d) fitness: %a" "Select candidate block after %a (slot %d) fitness: %a"
Block_hash.pp_short bi.hash priority Block_hash.pp_short bi.hash priority
Fitness.pp shell_header.fitness >>= fun () -> Fitness.pp shell_header.fitness >>= fun () ->
(* core function *)
Client_keys.get_key cctxt delegate >>=? fun (_,_,src_sk) -> Client_keys.get_key cctxt delegate >>=? fun (_,_,src_sk) ->
let chain = `Hash bi.Client_baking_blocks.chain_id in let chain = `Hash bi.Client_baking_blocks.chain_id in
inject_block cctxt inject_block cctxt
~force:true ~chain ~force:true ~chain
~shell_header ~priority ?seed_nonce_hash ~src_sk ~shell_header ~priority ?seed_nonce_hash ~src_sk
operations operations
(* /core function; back to logging and info *)
|> trace_exn (Failure "Error while injecting block") >>=? fun block_hash -> |> trace_exn (Failure "Error while injecting block") >>=? fun block_hash ->
begin record_nonce_hash cctxt level block_hash seed_nonce seed_nonce_hash >>=? fun () ->
if seed_nonce_hash <> None then
State.record_block cctxt level block_hash seed_nonce
|> trace_exn (Failure "Error while recording block")
else
return ()
end >>=? fun () ->
Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
cctxt#message cctxt#message
"Injected block %a for %s after %a \ "Injected block %a for %s after %a (level %a, slot %d, fitness %a, operations %a)"
\ (level %a, slot %d, fitness %a, operations %a)"
Block_hash.pp_short block_hash Block_hash.pp_short block_hash
name name
Block_hash.pp_short bi.hash Block_hash.pp_short bi.hash
Raw_level.pp level priority Raw_level.pp level priority
Fitness.pp shell_header.fitness Fitness.pp shell_header.fitness
(Format.pp_print_list pp_operation_list_list operations >>= fun () ->
~pp_sep:(fun ppf () -> Format.fprintf ppf "+")
(fun ppf operations -> Format.fprintf ppf "%d" (List.length operations)))
operations >>= fun () ->
return () return ()
end end
| _ -> | _ ->
lwt_debug "No valid candidates." >>= fun () -> lwt_debug "No valid candidates." >>= fun () ->
return () return ()
(* [create] starts the main loop of the baker. The loop monitors new blocks and
starts individual baking operations when baking-slots are available to any of
the [delegates] *)
let create let create
(cctxt : #Proto_alpha.full) ?max_priority delegates (cctxt : #Proto_alpha.full)
(block_stream: ?max_priority
Client_baking_blocks.block_info tzresult Lwt_stream.t) = (delegates: public_key_hash list)
Lwt_stream.get block_stream >>= function (block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t)
| None | Some (Error _) -> (bi: Client_baking_blocks.block_info) =
cctxt#error "Can't fetch the current block head."
| Some (Ok bi) ->
Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash -> Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash ->
(* statefulness *)
let last_get_block = ref None in let last_get_block = ref None in
let get_block () = let get_block () =
match !last_get_block with match !last_get_block with
@ -604,34 +636,62 @@ let create
| Some t -> t in | Some t -> t in
let state = create_state genesis_hash delegates bi in let state = create_state genesis_hash delegates bi in
insert_blocks cctxt ?max_priority state bi >>= fun () -> insert_blocks cctxt ?max_priority state bi >>= fun () ->
(* main loop *)
let rec worker_loop () = let rec worker_loop () =
begin
(* event construction *)
let timeout = compute_timeout state in let timeout = compute_timeout state in
Lwt.choose [ (timeout >|= fun () -> `Timeout) ; Lwt.choose [ (timeout >|= fun () -> `Timeout) ;
(get_block () >|= fun b -> `Hash b) ; (get_block () >|= fun b -> `Hash b) ;
] >>= function ] >>= function
(* event matching *)
| `Hash (None | Some (Error _)) -> | `Hash (None | Some (Error _)) ->
(* return to restart *)
Lwt.return_unit Lwt.return_unit
| `Hash (Some (Ok bi)) -> begin | `Hash (Some (Ok bi)) -> begin
(* new block: cancel everything and bake on the new head *)
Lwt.cancel timeout ; Lwt.cancel timeout ;
last_get_block := None ; last_get_block := None ;
lwt_debug lwt_debug
"Discoverered block: %a" "Discoverered block: %a"
Block_hash.pp_short bi.Client_baking_blocks.hash >>= fun () -> Block_hash.pp_short bi.Client_baking_blocks.hash >>= fun () ->
insert_blocks cctxt ?max_priority state bi >>= fun () -> insert_blocks cctxt ?max_priority state bi
worker_loop ()
end end
| `Timeout -> | `Timeout ->
(* main event: it's baking time *)
lwt_debug "Waking up for baking..." >>= fun () -> lwt_debug "Waking up for baking..." >>= fun () ->
begin begin
(* core functionality *)
bake cctxt state >>= function bake cctxt state >>= function
| Ok () -> Lwt.return_unit | Ok () -> Lwt.return_unit
| Error errs -> | Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs
lwt_log_error "Error while baking:@\n%a" end
pp_print_error
errs >>= fun () ->
Lwt.return_unit
end >>= fun () -> end >>= fun () ->
(* and restart *)
worker_loop () in worker_loop () in
(* ignition *)
lwt_log_info "Starting baking daemon" >>= fun () -> lwt_log_info "Starting baking daemon" >>= fun () ->
worker_loop () >>= fun () -> worker_loop ()
return ()
(* Wrapper around previous [create] function that handles the case of
unavailable blocks (empty block chain). *)
let create
(cctxt : #Proto_alpha.full)
?max_priority
(delegates: public_key_hash list)
(block_stream: Client_baking_blocks.block_info tzresult Lwt_stream.t) =
Lwt_stream.get block_stream >>= function
| None | Some (Error _) ->
cctxt#error "Can't fetch the current block head."
| Some (Ok bi) ->
create
cctxt ?max_priority delegates
block_stream bi