Client/Endorser: remove endorsing scheduler

This commit is contained in:
Mathias 2018-05-23 16:29:19 +02:00 committed by Grégoire Henry
parent 852acc4710
commit fcd29a36f3
2 changed files with 118 additions and 132 deletions

View File

@ -123,25 +123,35 @@ let forge_endorsement (cctxt : #Proto_alpha.full)
Alpha_block_services.metadata cctxt Alpha_block_services.metadata cctxt
~chain ~block () >>=? fun { protocol_data = { level = { level } } } -> ~chain ~block () >>=? fun { protocol_data = { level = { level } } } ->
check_endorsement cctxt level src_pkh >>=? fun () -> check_endorsement cctxt level src_pkh >>=? fun () ->
begin previously_endorsed_level cctxt src_pkh level >>=? function
match slots with | true ->
| Some slots -> return slots cctxt#error "Level %a : previously endorsed."
| None -> Raw_level.pp level
get_signing_slots | false ->
cctxt ~chain block src_pkh level >>=? function begin
| [] -> cctxt#error "No slot found at level %a" Raw_level.pp level match slots with
| slots -> return slots | Some slots -> return slots
end >>=? fun slots -> | None ->
check_endorsement cctxt level src_pkh >>=? fun () -> get_signing_slots
inject_endorsement cctxt ~chain ?async block level src_sk slots src_pkh cctxt ~chain block src_pkh level >>=? function
| [] -> cctxt#error "No slot found at level %a" Raw_level.pp level
| slots -> return slots
end >>=? fun slots ->
inject_endorsement cctxt ~chain ?async block level src_sk slots src_pkh >>=? fun oph ->
Client_keys.get_key cctxt src_pkh >>=? fun (name, _pk, _sk) ->
cctxt#message
"Injected endorsement level %a, contract %s '%a'"
Raw_level.pp level
name
Operation_hash.pp_short oph >>=
fun () -> return oph
(** Worker *) (** Worker *)
type state = { type state = {
delegates: public_key_hash list ; delegates: public_key_hash list ;
mutable best: Client_baking_blocks.block_info ;
mutable to_endorse: endorsement list ;
delay: int64; delay: int64;
mutable to_endorse : endorsement option
} }
and endorsement = { and endorsement = {
time: Time.t ; time: Time.t ;
@ -150,11 +160,10 @@ and endorsement = {
slots: int list; slots: int list;
} }
let create_state delegates best delay = let create_state delegates delay =
{ delegates ; { delegates ;
best ;
to_endorse = [] ;
delay ; delay ;
to_endorse = None ;
} }
let rec insert ({time} as e) = function let rec insert ({time} as e) = function
@ -171,13 +180,29 @@ let get_delegates cctxt state =
| _ :: _ as delegates -> | _ :: _ as delegates ->
return delegates return delegates
let drop_old_endorsement ~before state = let endorse_for cctxt = function
state.to_endorse <- None -> return ()
List.filter | Some {delegate; block ; slots} ->
(fun { block } -> Fitness.compare before block.fitness <= 0) let hash = block.hash in
state.to_endorse let b = `Hash (hash, 0) in
let level = block.level.level in
Client_keys.get_key cctxt delegate >>=? fun (name, _pk, sk) ->
lwt_debug "Endorsing %a for %s (level %a using %d slots)!"
Block_hash.pp_short hash name
Raw_level.pp level
(List.length slots) >>= fun () ->
inject_endorsement cctxt
b level
sk slots delegate >>=? fun oph ->
lwt_log_info
"Injected endorsement for block '%a' \
(level %a, contract %s) '%a'"
Block_hash.pp_short hash
Raw_level.pp level
name
Operation_hash.pp_short oph >>= return
let schedule_endorsements (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state bis = let prepare_endorsement (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state bis =
let may_endorse (block: Client_baking_blocks.block_info) delegate time = let may_endorse (block: Client_baking_blocks.block_info) delegate time =
Client_keys.Public_key_hash.name cctxt delegate >>=? fun name -> Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
lwt_log_info "May endorse block %a for %s" lwt_log_info "May endorse block %a for %s"
@ -185,59 +210,21 @@ let schedule_endorsements (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state b
let b = `Hash (block.hash, 0) in let b = `Hash (block.hash, 0) in
let level = block.level.level in let level = block.level.level in
get_signing_slots cctxt b delegate level >>=? fun slots -> get_signing_slots cctxt b delegate level >>=? fun slots ->
lwt_debug "Found %d slots for %a/%s" lwt_debug "Found slots for %a/%s (%d)"
(List.length slots) Block_hash.pp_short block.hash name >>= fun () -> Block_hash.pp_short block.hash name (List.length slots) >>= fun () ->
previously_endorsed_level cctxt delegate level >>=? function previously_endorsed_level cctxt delegate level >>=? function
| true -> | true ->
lwt_debug "Level %a : previously endorsed." lwt_debug "Level %a : previously endorsed."
Raw_level.pp level >>= return Raw_level.pp level >>= return
| false -> | false ->
if Fitness.compare state.best.fitness block.fitness < 0 then begin return
state.best <- block ; (match state.to_endorse with
drop_old_endorsement ~before:block.fitness state ; | None ->
end ; state.to_endorse <- Some {time ; delegate ; block; slots}
begin try | Some old ->
let same_slot endorsement = if Fitness.compare old.block.fitness block.fitness < 0 then
endorsement.block.level = block.level && endorsement.slots = slots in state.to_endorse <- Some {time ; delegate ; block; slots})
let old = List.find same_slot state.to_endorse in
if Fitness.compare old.block.fitness block.fitness < 0
then begin
lwt_log_info
"Schedule endorsement for block %a \
(level %a, slots { %a }, time %a) (replace block %a)"
Block_hash.pp_short block.hash
Raw_level.pp level
(Format.pp_print_list Format.pp_print_int) slots
Time.pp_hum time
Block_hash.pp_short old.block.hash
>>= fun () ->
state.to_endorse <-
insert
{ time ; delegate ; block ; slots }
(List.filter
(fun e -> not (same_slot e))
state.to_endorse) ;
return ()
end else begin
lwt_debug
"slot { %a } : better pending endorsement"
(Format.pp_print_list Format.pp_print_int) slots >>= fun () ->
return ()
end
with Not_found ->
lwt_log_info
"Schedule endorsement for block %a \
(level %a, slot { %a }, time %a)"
Block_hash.pp_short block.hash
Raw_level.pp level
(Format.pp_print_list Format.pp_print_int) slots
Time.pp_hum time >>= fun () ->
state.to_endorse <-
insert { time ; delegate ; block ; slots } state.to_endorse ;
return ()
end
in in
let time = Time.(add (now ()) state.delay) in
get_delegates cctxt state >>=? fun delegates -> get_delegates cctxt state >>=? fun delegates ->
iter_p iter_p
(fun delegate -> (fun delegate ->
@ -250,67 +237,75 @@ let schedule_endorsements (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state b
lwt_log_info "Ignore block %a: forged too far the past" lwt_log_info "Ignore block %a: forged too far the past"
Block_hash.pp_short bi.hash >>= return Block_hash.pp_short bi.hash >>= return
else else
may_endorse bi delegate time) let time = Time.(add (now ()) state.delay) in
bis) may_endorse bi delegate time
) bis
)
delegates delegates
let schedule_endorsements (cctxt : #Proto_alpha.full) ~max_past state bis = (* let endorse (cctxt : #Proto_alpha.full) ~(max_past:Time.t) state bis =
schedule_endorsements cctxt ~max_past state bis >>= function * let may_endorse (block: Client_baking_blocks.block_info) delegate =
| Error exns -> * Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
lwt_log_error * lwt_log_info "May endorse block %a for %s"
"@[<v 2>Error(s) while scheduling endorsements@,%a@]" * Block_hash.pp_short block.hash name >>= fun () ->
pp_print_error exns * let b = `Hash (block.hash, 0) in
| Ok () -> Lwt.return_unit * let level = block.level.level in
* get_signing_slots cctxt b delegate level >>=? fun slots ->
let pop_endorsements state = * lwt_debug "Found %d slots for %a/%s"
let now = Time.now () in * (List.length slots) Block_hash.pp_short block.hash name >>= fun () ->
let rec pop acc = function * previously_endorsed_level cctxt delegate level >>=? function
| [] -> List.rev acc, [] * | true ->
| {time} :: _ as slots when Time.compare now time <= 0 -> * lwt_debug "Level %a : previously endorsed."
List.rev acc, slots * Raw_level.pp level >>= fun () ->
| slot :: slots -> pop (slot :: acc) slots in * return []
let to_endorse, future_endorsement = pop [] state.to_endorse in * | false ->
state.to_endorse <- future_endorsement ; * return slots
to_endorse * in
* get_delegates cctxt state >>=? fun delegates ->
let endorse cctxt state = * iter_p
let to_endorse = pop_endorsements state in * (fun delegate ->
iter_p (fun { time = _ ; block ; slots ; delegate } -> * iter_p
let hash = block.hash in * (fun (bi : Client_baking_blocks.block_info) ->
let b = `Hash (hash, 0) in * if Time.compare bi.timestamp (Time.now ()) > 0 then
let level = block.level.level in * lwt_log_info "Ignore block %a: forged in the future"
Client_keys.get_key cctxt delegate >>=? fun (name, pk, sk) -> * Block_hash.pp_short bi.hash >>= return
let pkh = Signature.Public_key.hash pk in * else if Time.(min (now ()) bi.timestamp > max_past) then
lwt_debug "Endorsing %a for %s (slots : { %a } )!" * lwt_log_info "Ignore block %a: forged too far the past"
Block_hash.pp_short block.hash name * Block_hash.pp_short bi.hash >>= return
(Format.pp_print_list Format.pp_print_int) slots >>= fun () -> * else
inject_endorsement cctxt b block.level.level sk slots pkh >>=? fun oph -> * may_endorse bi delegate >>=? function
cctxt#message * | [] ->
"Injected endorsement for block '%a' \ * return ()
(level %a, slots { %a }, contract %s) '%a'" * | slots ->
Block_hash.pp_short hash * endorse_for cctxt delegate bi slots )
Raw_level.pp level * bis)
(Format.pp_print_list Format.pp_print_int) slots name * delegates *)
Operation_hash.pp_short oph >>= fun () -> return ()
) to_endorse
let compute_timeout state = let compute_timeout state =
match state.to_endorse with match state.to_endorse with
| [] -> Lwt_utils.never_ending | None -> Lwt_utils.never_ending
| {time} :: _ -> | Some {time} ->
let delay = (Time.diff time (Time.now ())) in let delay = (Time.diff time (Time.now ())) in
if delay <= 0L then if delay <= 0L then
Lwt.return_unit Lwt.return_unit
else else
Lwt_unix.sleep (Int64.to_float delay) Lwt_unix.sleep (Int64.to_float delay)
let check_error f =
f >>= function
| Ok () -> Lwt.return_unit
| Error errs ->
lwt_log_error "Error while endorsing:@\n%a"
pp_print_error
errs >>= fun () ->
Lwt.return_unit
let create (cctxt : #Proto_alpha.full) ?(max_past=(Time.of_seconds 110L)) ~delay contracts block_stream = let create (cctxt : #Proto_alpha.full) ?(max_past=(Time.of_seconds 110L)) ~delay contracts block_stream =
lwt_log_info "Starting endorsement daemon" >>= fun () -> lwt_log_info "Starting endorsement daemon" >>= fun () ->
Lwt_stream.get block_stream >>= function Lwt_stream.get block_stream >>= function
| None | Some (Error _) -> | None | Some (Ok []) | Some (Error _) ->
cctxt#error "Can't fetch the current block head." cctxt#error "Can't fetch the current block head."
| Some (Ok head) -> | Some (Ok initial_heads) ->
let last_get_block = ref None in let last_get_block = ref None in
let get_block () = let get_block () =
match !last_get_block with match !last_get_block with
@ -319,29 +314,20 @@ let create (cctxt : #Proto_alpha.full) ?(max_past=(Time.of_seconds 110L)) ~delay
last_get_block := Some t ; last_get_block := Some t ;
t t
| Some t -> t in | Some t -> t in
let state = create_state contracts head (Int64.of_int delay) in let state = create_state contracts (Int64.of_int delay) in
let rec worker_loop () = let rec worker_loop () =
let timeout = compute_timeout state in let timeout = compute_timeout state in
Lwt.choose [ (timeout >|= fun () -> `Timeout) ; Lwt.choose [ (timeout >|= fun () -> `Timeout) ;
(get_block () >|= fun b -> `Hash b) ] >>= function (get_block () >|= fun b -> `Hash b) ] >>= function
| `Hash (None | Some (Error _)) -> | `Hash (None | Some (Error _)) ->
Lwt.return_unit Lwt.return_unit
| `Hash (Some (Ok bi)) -> | `Hash (Some (Ok bis)) ->
Lwt.cancel timeout ; Lwt.cancel timeout;
last_get_block := None ; last_get_block := None ;
schedule_endorsements cctxt ~max_past state [ bi ] >>= fun () -> check_error (prepare_endorsement cctxt ~max_past state bis) >>= fun () ->
worker_loop () worker_loop ()
| `Timeout -> | `Timeout ->
begin check_error (endorse_for cctxt state.to_endorse) >>= fun () ->
endorse cctxt state >>= function worker_loop () in
| Ok () -> Lwt.return_unit check_error (prepare_endorsement cctxt ~max_past state initial_heads) >>= fun () ->
| Error errs ->
lwt_log_error "Error while endorsing:@\n%a"
pp_print_error
errs >>= fun () ->
Lwt.return_unit
end >>= fun () ->
worker_loop ()
in
schedule_endorsements cctxt ~max_past state [ head ] >>= fun () ->
worker_loop () worker_loop ()

View File

@ -25,4 +25,4 @@ val create :
?max_past:Time.t -> ?max_past:Time.t ->
delay:int -> delay:int ->
public_key_hash list -> public_key_hash list ->
Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit Lwt.t Client_baking_blocks.block_info list tzresult Lwt_stream.t -> unit Lwt.t