Client/Endorser: simpler state with lock
This commit is contained in:
parent
abc7b7338c
commit
d33568464a
@ -16,70 +16,58 @@ module State : sig
|
||||
|
||||
val get_endorsement:
|
||||
#Client_context.wallet ->
|
||||
Raw_level.t ->
|
||||
int ->
|
||||
(Block_hash.t * Operation_hash.t) option tzresult Lwt.t
|
||||
Signature.Public_key_hash.t ->
|
||||
Raw_level.t option tzresult Lwt.t
|
||||
|
||||
val record_endorsement:
|
||||
#Client_context.wallet ->
|
||||
Signature.Public_key_hash.t ->
|
||||
Raw_level.t ->
|
||||
Block_hash.t ->
|
||||
int -> Operation_hash.t -> unit tzresult Lwt.t
|
||||
unit tzresult Lwt.t
|
||||
|
||||
end = struct
|
||||
|
||||
module LevelMap = Map.Make(Raw_level)
|
||||
type t = (string * Raw_level.t) list
|
||||
|
||||
type t = (int * Block_hash.t * Operation_hash.t) list LevelMap.t
|
||||
let encoding : t Data_encoding.t =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun x -> LevelMap.bindings x)
|
||||
(fun l ->
|
||||
List.fold_left
|
||||
(fun x (y, z) -> LevelMap.add y z x)
|
||||
LevelMap.empty l)
|
||||
(list (obj2
|
||||
(req "level" Raw_level.encoding)
|
||||
(req "endorsement"
|
||||
(list (obj3
|
||||
(req "slot" int31)
|
||||
(req "block" Block_hash.encoding)
|
||||
(req "operation" Operation_hash.encoding))))))
|
||||
Data_encoding.(
|
||||
list (obj2
|
||||
(req "delegate" string)
|
||||
(req "last_level" Raw_level.encoding)
|
||||
))
|
||||
|
||||
let name =
|
||||
"endorsements"
|
||||
|
||||
let load (wallet : #Client_context.wallet) =
|
||||
wallet#load name encoding ~default:LevelMap.empty
|
||||
wallet#load name encoding ~default:[]
|
||||
|
||||
let save (wallet : #Client_context.wallet) map =
|
||||
wallet#write name encoding map
|
||||
let save (wallet : #Client_context.wallet) list =
|
||||
wallet#write name list encoding
|
||||
|
||||
let lock = Lwt_mutex.create ()
|
||||
|
||||
let get_endorsement (wallet : #Client_context.wallet) level slot =
|
||||
Lwt_mutex.with_lock lock
|
||||
let get_endorsement (wallet : #Client_context.wallet) (delegate_key:Signature.public_key_hash) =
|
||||
wallet#with_lock
|
||||
(fun () ->
|
||||
load wallet >>=? fun map ->
|
||||
try
|
||||
let _, block, op =
|
||||
LevelMap.find level map
|
||||
|> List.find (fun (slot',_,_) -> slot = slot') in
|
||||
return (Some (block, op))
|
||||
with Not_found -> return None)
|
||||
|
||||
let record_endorsement (wallet : #Client_context.wallet) level hash slot oph =
|
||||
Lwt_mutex.with_lock lock
|
||||
(fun () ->
|
||||
load wallet >>=? fun map ->
|
||||
let previous =
|
||||
try LevelMap.find level map
|
||||
with Not_found -> [] in
|
||||
wallet#write name
|
||||
(LevelMap.add level ((slot, hash, oph) :: previous) map)
|
||||
encoding)
|
||||
load wallet >>=? fun l ->
|
||||
return (List.assoc_opt (Signature.Public_key_hash.to_short_b58check delegate_key) l)
|
||||
)
|
||||
|
||||
let record_endorsement (wallet : #Client_context.wallet) (delegate:Signature.public_key_hash) (new_lvl:Raw_level.t) =
|
||||
begin
|
||||
wallet#with_lock (fun () ->
|
||||
begin
|
||||
load wallet >>=? fun l ->
|
||||
let delegate_key = Signature.Public_key_hash.to_short_b58check delegate
|
||||
in
|
||||
match List.assoc_opt delegate_key l with
|
||||
| None ->
|
||||
save wallet ((delegate_key, new_lvl)::l)
|
||||
| Some _ ->
|
||||
save wallet ((delegate_key, new_lvl)::
|
||||
List.remove_assoc delegate_key l)
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
let get_signing_slots cctxt ?(chain = `Main) block delegate level =
|
||||
@ -94,7 +82,7 @@ let get_signing_slots cctxt ?(chain = `Main) block delegate level =
|
||||
let inject_endorsement
|
||||
(cctxt : #Proto_alpha.full)
|
||||
?(chain = `Main) block level ?async
|
||||
src_sk slots =
|
||||
src_sk slots pkh =
|
||||
Shell_services.Blocks.hash cctxt ~chain ~block () >>=? fun hash ->
|
||||
Alpha_services.Forge.endorsement cctxt
|
||||
(chain, block)
|
||||
@ -106,25 +94,17 @@ let inject_endorsement
|
||||
Client_keys.append cctxt
|
||||
src_sk ~watermark:Endorsement bytes >>=? fun signed_bytes ->
|
||||
Shell_services.Injection.operation cctxt ?async ~chain signed_bytes >>=? fun oph ->
|
||||
iter_s
|
||||
(fun slot ->
|
||||
State.record_endorsement cctxt level hash slot oph)
|
||||
slots >>=? fun () ->
|
||||
State.record_endorsement cctxt pkh level >>=? fun () ->
|
||||
return oph
|
||||
|
||||
let previously_endorsed_slot cctxt level slot =
|
||||
State.get_endorsement cctxt level slot >>=? function
|
||||
| None -> return false
|
||||
| Some _ -> return true
|
||||
|
||||
let check_endorsement cctxt level slot =
|
||||
State.get_endorsement cctxt level slot >>=? function
|
||||
let check_endorsement cctxt level pkh =
|
||||
State.get_endorsement cctxt pkh >>=? function
|
||||
| None -> return ()
|
||||
| Some (block, _) ->
|
||||
Error_monad.failwith
|
||||
"Already signed block %a at level %a, slot %d"
|
||||
Block_hash.pp_short block Raw_level.pp level slot
|
||||
|
||||
| Some recorded_level ->
|
||||
if Raw_level.(level = recorded_level) then
|
||||
Error_monad.failwith "Level %a already endorsed" Raw_level.pp recorded_level
|
||||
else
|
||||
return ()
|
||||
|
||||
let forge_endorsement (cctxt : #Proto_alpha.full)
|
||||
?(chain = `Main) block
|
||||
@ -141,11 +121,8 @@ let forge_endorsement (cctxt : #Proto_alpha.full)
|
||||
| [] -> cctxt#error "No slot found at level %a" Raw_level.pp level
|
||||
| slots -> return slots
|
||||
end >>=? fun slots ->
|
||||
iter_s (check_endorsement cctxt level) slots >>=? fun () ->
|
||||
inject_endorsement cctxt
|
||||
~chain block level
|
||||
src_sk slots
|
||||
|
||||
check_endorsement cctxt level src_pkh >>=? fun () ->
|
||||
inject_endorsement cctxt ~chain block level src_sk slots src_pkh
|
||||
|
||||
(** Worker *)
|
||||
|
||||
@ -159,7 +136,7 @@ and endorsement = {
|
||||
time: Time.t ;
|
||||
delegate: public_key_hash ;
|
||||
block: Client_baking_blocks.block_info ;
|
||||
slot: int;
|
||||
slots: int list;
|
||||
}
|
||||
|
||||
let create_state delegates best delay =
|
||||
@ -197,64 +174,59 @@ let schedule_endorsements (cctxt : #Proto_alpha.full) state bi =
|
||||
let b = `Hash (block.hash, 0) in
|
||||
let level = block.level.level in
|
||||
get_signing_slots cctxt b delegate level >>=? fun slots ->
|
||||
lwt_debug "Found slots for %a/%s (%d)"
|
||||
Block_hash.pp_short block.hash name (List.length slots) >>= fun () ->
|
||||
iter_p
|
||||
(fun slot ->
|
||||
if Fitness.compare state.best.fitness block.fitness < 0 then begin
|
||||
state.best <- block ;
|
||||
drop_old_endorsement ~before:block.fitness state ;
|
||||
end ;
|
||||
previously_endorsed_slot cctxt level slot >>=? function
|
||||
| true ->
|
||||
lwt_debug "slot %d: previously endorsed." slot >>= fun () ->
|
||||
return ()
|
||||
| false ->
|
||||
try
|
||||
let same_slot e =
|
||||
e.block.level = block.level && e.slot = slot in
|
||||
let old = List.find same_slot state.to_endorse in
|
||||
if Fitness.compare old.block.fitness block.fitness < 0
|
||||
then begin
|
||||
lwt_log_info
|
||||
"Schedule endorsement for block %a \
|
||||
(level %a, slot %d, time %a) (replace block %a)"
|
||||
Block_hash.pp_short block.hash
|
||||
Raw_level.pp level
|
||||
slot
|
||||
Time.pp_hum time
|
||||
Block_hash.pp_short old.block.hash
|
||||
>>= fun () ->
|
||||
state.to_endorse <-
|
||||
insert
|
||||
{ time ; delegate ; block ; slot }
|
||||
(List.filter
|
||||
(fun e -> not (same_slot e))
|
||||
state.to_endorse) ;
|
||||
return ()
|
||||
end else begin
|
||||
lwt_debug
|
||||
"slot %d: better pending endorsement"
|
||||
slot >>= fun () ->
|
||||
return ()
|
||||
end
|
||||
with Not_found ->
|
||||
lwt_log_info
|
||||
"Schedule endorsement for block %a \
|
||||
(level %a, slot %d, time %a)"
|
||||
Block_hash.pp_short block.hash
|
||||
Raw_level.pp level
|
||||
slot
|
||||
Time.pp_hum time >>= fun () ->
|
||||
state.to_endorse <-
|
||||
insert { time ; delegate ; block ; slot } state.to_endorse ;
|
||||
return ())
|
||||
slots in
|
||||
lwt_debug "Found %d slots for %a/%s"
|
||||
(List.length slots) Block_hash.pp_short block.hash name >>= fun () ->
|
||||
|
||||
if Fitness.compare state.best.fitness block.fitness < 0 then begin
|
||||
state.best <- block ;
|
||||
drop_old_endorsement ~before:block.fitness state ;
|
||||
end ;
|
||||
begin try
|
||||
let same_slot endorsement =
|
||||
endorsement.block.level = block.level && endorsement.slots = slots in
|
||||
let old = List.find same_slot state.to_endorse in
|
||||
if Fitness.compare old.block.fitness block.fitness < 0
|
||||
then begin
|
||||
lwt_log_info
|
||||
"Schedule endorsement for block %a \
|
||||
(level %a, slots { %a }, time %a) (replace block %a)"
|
||||
Block_hash.pp_short block.hash
|
||||
Raw_level.pp level
|
||||
(Format.pp_print_list Format.pp_print_int) slots
|
||||
Time.pp_hum time
|
||||
Block_hash.pp_short old.block.hash
|
||||
>>= fun () ->
|
||||
state.to_endorse <-
|
||||
insert
|
||||
{ time ; delegate ; block ; slots }
|
||||
(List.filter
|
||||
(fun e -> not (same_slot e))
|
||||
state.to_endorse) ;
|
||||
return ()
|
||||
end else begin
|
||||
lwt_debug
|
||||
"slot { %a } : better pending endorsement"
|
||||
(Format.pp_print_list Format.pp_print_int) slots >>= fun () ->
|
||||
return ()
|
||||
end
|
||||
with Not_found ->
|
||||
lwt_log_info
|
||||
"Schedule endorsement for block %a \
|
||||
(level %a, slot { %a }, time %a)"
|
||||
Block_hash.pp_short block.hash
|
||||
Raw_level.pp level
|
||||
(Format.pp_print_list Format.pp_print_int) slots
|
||||
Time.pp_hum time >>= fun () ->
|
||||
state.to_endorse <-
|
||||
insert { time ; delegate ; block ; slots } state.to_endorse ;
|
||||
return ()
|
||||
end
|
||||
in
|
||||
|
||||
let time = Time.(add (now ()) state.delay) in
|
||||
get_delegates cctxt state >>=? fun delegates ->
|
||||
iter_p
|
||||
(fun delegate ->
|
||||
may_endorse bi delegate time)
|
||||
iter_s
|
||||
(fun delegate -> may_endorse bi delegate time)
|
||||
delegates
|
||||
|
||||
let schedule_endorsements (cctxt : #Proto_alpha.full) state bis =
|
||||
@ -278,29 +250,24 @@ let pop_endorsements state =
|
||||
|
||||
let endorse cctxt state =
|
||||
let to_endorse = pop_endorsements state in
|
||||
iter_p
|
||||
(fun { delegate ; block ; slot } ->
|
||||
let hash = block.hash in
|
||||
let b = `Hash (hash, 0) in
|
||||
let level = block.level.level in
|
||||
previously_endorsed_slot cctxt level slot >>=? function
|
||||
| true -> return ()
|
||||
| false ->
|
||||
Client_keys.get_key cctxt delegate >>=? fun (name, _pk, sk) ->
|
||||
lwt_debug "Endorsing %a for %s (slot %d)!"
|
||||
Block_hash.pp_short hash name slot >>= fun () ->
|
||||
inject_endorsement cctxt
|
||||
b level
|
||||
sk [slot] >>=? fun oph ->
|
||||
cctxt#message
|
||||
"Injected endorsement for block '%a' \
|
||||
(level %a, slot %d, contract %s) '%a'"
|
||||
Block_hash.pp_short hash
|
||||
Raw_level.pp level
|
||||
slot name
|
||||
Operation_hash.pp_short oph >>= fun () ->
|
||||
return ())
|
||||
to_endorse
|
||||
iter_p (fun { time = _ ; block ; slots ; delegate } ->
|
||||
let hash = block.hash in
|
||||
let b = `Hash (hash, 0) in
|
||||
let level = block.level.level in
|
||||
Client_keys.get_key cctxt delegate >>=? fun (name, pk, sk) ->
|
||||
let pkh = Signature.Public_key.hash pk in
|
||||
lwt_debug "Endorsing %a for %s (slots : { %a } )!"
|
||||
Block_hash.pp_short block.hash name
|
||||
(Format.pp_print_list Format.pp_print_int) slots >>= fun () ->
|
||||
inject_endorsement cctxt b block.level.level sk slots pkh >>=? fun oph ->
|
||||
cctxt#message
|
||||
"Injected endorsement for block '%a' \
|
||||
(level %a, slots { %a }, contract %s) '%a'"
|
||||
Block_hash.pp_short hash
|
||||
Raw_level.pp level
|
||||
(Format.pp_print_list Format.pp_print_int) slots name
|
||||
Operation_hash.pp_short oph >>= fun () -> return ()
|
||||
) to_endorse
|
||||
|
||||
let compute_timeout state =
|
||||
match state.to_endorse with
|
||||
|
Loading…
Reference in New Issue
Block a user