From b087042d8328cd8a501d345b2e42c35a27a0de1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Thu, 2 Mar 2017 14:45:23 +0100 Subject: [PATCH] Shell: do not split the validation scheduler... --- src/node/shell/validator.ml | 329 +++++++++++++++++------------------- 1 file changed, 158 insertions(+), 171 deletions(-) diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 470e251e3..3dde9c655 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -169,102 +169,6 @@ let apply_block net db (** *) -module Validation_scheduler = struct - - type state = { - db: Distributed_db.net ; - running: Block_hash.Set.t ref ; - } - - let init_request { db } hash = - Distributed_db.Block_header.fetch db hash - - let process { db } v ~get:get_context ~set:set_context hash block = - let state = Distributed_db.state db in - get_context block.State.Block_header.shell.predecessor >>= function - | Error _ -> - set_context hash (Error [(* TODO *)]) - | Ok _context -> - lwt_debug "process %a" Block_hash.pp_short hash >>= fun () -> - begin - State.Valid_block.Current.genesis state >>= fun genesis -> - if Block_hash.equal genesis.hash block.shell.predecessor then - Lwt.return genesis - else - State.Valid_block.read_exn state block.shell.predecessor - end >>= fun pred -> - apply_block state db pred hash block >>= function - | Error ([State.Unknown_protocol _] as err) -> - lwt_log_error - "@[Ignoring block %a@ %a@]" - Block_hash.pp_short hash - Error_monad.pp_print_error err - | Error exns as error -> - set_context hash error >>= fun () -> - lwt_warn "Failed to validate block %a." - Block_hash.pp_short hash >>= fun () -> - lwt_debug "%a" Error_monad.pp_print_error exns >>= fun () -> - Lwt.return_unit - | Ok new_context -> - (* The sanity check `set_context` detects differences - between the computed fitness and the fitness announced - in the block header. Then `Valid_block.read` will - return an error. *) - set_context hash (Ok new_context) >>= fun () -> - State.Valid_block.read state hash >>= function - | Error err -> - lwt_log_error - "@[Ignoring block %a@ %a@]" - Block_hash.pp_short hash - Error_monad.pp_print_error err - | Ok block -> - lwt_debug - "validation of %a: reevaluate current block" - Block_hash.pp_short hash >>= fun () -> - Watcher.notify v.worker.valid_block_input block ; - Watcher.notify v.valid_block_input block ; - may_set_head v block - - let request state ~get ~set pendings = - let time = Time.now () in - let min_block b pb = - match pb with - | None -> Some b - | Some pb - when b.Store.Block_header.shell.timestamp - < pb.Store.Block_header.shell.timestamp -> - Some b - | Some _ as pb -> pb in - let next = - List.fold_left - (fun acc (hash, block, v) -> - match block with - | Error _ -> - acc - | Ok block -> - if Time.(block.Store.Block_header.shell.timestamp > time) then - min_block block acc - else begin - if not (Block_hash.Set.mem hash !(state.running)) then begin - state.running := Block_hash.Set.add hash !(state.running) ; - Lwt.async (fun () -> - process state v - ~get:(get v) ~set hash block >>= fun () -> - state.running := - Block_hash.Set.remove hash !(state.running) ; - Lwt.return_unit - ) - end ; - acc - end) - None - pendings in - match next with - | None -> 0. - | Some b -> Int64.to_float (Time.diff b.Store.Block_header.shell.timestamp time) - -end - module Context_db = struct type key = Block_hash.t @@ -273,32 +177,35 @@ module Context_db = struct type data = { validator: t ; state: [ `Inited of Store.Block_header.t tzresult - | `Initing of Store.Block_header.t tzresult Lwt.t ] ; + | `Initing of Store.Block_header.t tzresult Lwt.t + | `Running of State.Valid_block.t tzresult Lwt.t ] ; wakener: State.Valid_block.t tzresult Lwt.u } - type t = + type context = { tbl : data Block_hash.Table.t ; canceler : Lwt_utils.Canceler.t ; worker_trigger: unit -> unit; worker_waiter: unit -> unit Lwt.t ; worker: unit Lwt.t ; - vstate : Validation_scheduler.state } + net_db : Distributed_db.net ; + net_state : State.Net.t } let pending_requests { tbl } = Block_hash.Table.fold (fun h data acc -> match data.state with | `Initing _ -> acc - | `Inited d -> (h, d, data.validator) :: acc) + | `Running _ -> acc + | `Inited d -> (h, d, data) :: acc) tbl [] let pending { tbl } hash = Block_hash.Table.mem tbl hash - let request { tbl ; worker_trigger ; vstate } validator hash = + let request validator { tbl ; worker_trigger ; net_db } hash = assert (not (Block_hash.Table.mem tbl hash)); let waiter, wakener = Lwt.wait () in let data = - Distributed_db.Block_header.fetch vstate.db hash >>= return in + Distributed_db.Block_header.fetch net_db hash >>= return in match Lwt.state data with | Lwt.Return data -> let state = `Inited data in @@ -317,49 +224,45 @@ module Context_db = struct Lwt.return_unit) ; waiter - let prefetch ({ vstate ; tbl } as session) validator hash = - let state = Distributed_db.state vstate.db in + let prefetch validator ({ net_state ; tbl } as session) hash = Lwt.ignore_result - (State.Valid_block.known state hash >>= fun exists -> + (State.Valid_block.known net_state hash >>= fun exists -> if not exists && not (Block_hash.Table.mem tbl hash) then - request session validator hash >>= fun _ -> Lwt.return_unit + request validator session hash >>= fun _ -> Lwt.return_unit else Lwt.return_unit) - let known { vstate } hash = - let state = Distributed_db.state vstate.db in - State.Valid_block.known state hash + let known { net_state } hash = + State.Valid_block.known net_state hash - let read { vstate } hash = - let state = Distributed_db.state vstate.db in - State.Valid_block.read state hash + let read { net_state } hash = + State.Valid_block.read net_state hash - let fetch ({ vstate ; tbl } as session) validator hash = - let state = Distributed_db.state vstate.db in + let fetch ({ net_state ; tbl } as session) validator hash = try Lwt.waiter_of_wakener (Block_hash.Table.find tbl hash).wakener with Not_found -> - State.Valid_block.read_opt state hash >>= function - | Some op -> Lwt.return (Ok op) + State.Valid_block.read_opt net_state hash >>= function + | Some op -> + Lwt.return (Ok op) | None -> try Lwt.waiter_of_wakener (Block_hash.Table.find tbl hash).wakener - with Not_found -> request session validator hash + with Not_found -> request validator session hash - let store { vstate ; tbl } hash data = - let state = Distributed_db.state vstate.db in + let store { net_state ; net_db ; tbl } hash data = begin match data with | Ok data -> - Distributed_db.Block_header.commit vstate.db hash >>= fun () -> + Distributed_db.Block_header.commit net_db hash >>= fun () -> begin - State.Valid_block.store state hash data >>=? function + State.Valid_block.store net_state hash data >>=? function | None -> - State.Valid_block.read state hash >>=? fun block -> + State.Valid_block.read net_state hash >>=? fun block -> return (Ok block, false) | Some block -> return (Ok block, true) end | Error err -> - State.Block_header.mark_invalid state hash err >>= fun changed -> + State.Block_header.mark_invalid net_state hash err >>= fun changed -> return (Error err, changed) end >>= function | Ok (block, changed) -> @@ -373,48 +276,133 @@ module Context_db = struct Lwt.wakeup wakener err ; Lwt.return false - let create vstate = - let tbl = Block_hash.Table.create 50 in - let canceler = Lwt_utils.Canceler.create () in - let worker_trigger, worker_waiter = Lwt_utils.trigger () in - let session = - { tbl ; vstate ; worker = Lwt.return () ; - canceler ; worker_trigger ; worker_waiter } in - let worker = - let rec worker_loop () = - Lwt_utils.protect ~canceler begin fun () -> - worker_waiter () >>= return - end >>= function - | Error [Lwt_utils.Canceled] -> Lwt.return_unit - | Error err -> - lwt_log_error - "@[Unexpected error in validation:@ %a@]" - pp_print_error err >>= fun () -> - worker_loop () - | Ok () -> - begin - match pending_requests session with - | [] -> () - | requests -> - let get = fetch session - and set k v = - store session k v >>= fun _ -> Lwt.return_unit in - let timeout = - Validation_scheduler.request - vstate ~get ~set requests in - if timeout > 0. then - Lwt.ignore_result - (Lwt_unix.sleep timeout >|= worker_trigger); - end ; - worker_loop () - in - Lwt_utils.worker "validation" - ~run:worker_loop - ~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) in - { session with worker } + let process (v:t) ~get_context ~set_context hash block = + let state = Distributed_db.state v.net_db in + get_context v block.State.Block_header.shell.predecessor >>= function + | Error _ as error -> + set_context v hash (Error [(* TODO *)]) >>= fun () -> + Lwt.return error + | Ok _context -> + lwt_debug "process %a" Block_hash.pp_short hash >>= fun () -> + begin + State.Valid_block.Current.genesis state >>= fun genesis -> + if Block_hash.equal genesis.hash block.shell.predecessor then + Lwt.return genesis + else + State.Valid_block.read_exn state block.shell.predecessor + end >>= fun pred -> + apply_block state v.net_db pred hash block >>= function + | Error ([State.Unknown_protocol _] as err) as error -> + lwt_log_error + "@[Ignoring block %a@ %a@]" + Block_hash.pp_short hash + Error_monad.pp_print_error err >>= fun () -> + Lwt.return error + | Error exns as error -> + set_context v hash error >>= fun () -> + lwt_warn "Failed to validate block %a." + Block_hash.pp_short hash >>= fun () -> + lwt_debug "%a" Error_monad.pp_print_error exns >>= fun () -> + Lwt.return error + | Ok new_context -> + (* The sanity check `set_context` detects differences + between the computed fitness and the fitness announced + in the block header. Then `Valid_block.read` will + return an error. *) + set_context v hash (Ok new_context) >>= fun () -> + State.Valid_block.read state hash >>= function + | Error err as error -> + lwt_log_error + "@[Ignoring block %a@ %a@]" + Block_hash.pp_short hash + Error_monad.pp_print_error err >>= fun () -> + Lwt.return error + | Ok block -> + lwt_debug + "validation of %a: reevaluate current block" + Block_hash.pp_short hash >>= fun () -> + Watcher.notify v.worker.valid_block_input block ; + Watcher.notify v.valid_block_input block ; + may_set_head v block >>= fun () -> + return block - let shutdown { canceler ; worker } = - Lwt_utils.Canceler.cancel canceler >>= fun () -> worker + let request session ~get_context ~set_context pendings = + let time = Time.now () in + let min_block b pb = + match pb with + | None -> Some b + | Some pb + when b.Store.Block_header.shell.timestamp + < pb.Store.Block_header.shell.timestamp -> + Some b + | Some _ as pb -> pb in + let next = + List.fold_left + (fun acc (hash, block, (data : data)) -> + match block with + | Error _ -> + acc + | Ok block -> + if Time.(block.Store.Block_header.shell.timestamp > time) then + min_block block acc + else begin + Block_hash.Table.replace session.tbl hash { data with state = `Running begin + process data.validator ~get_context ~set_context hash block >>= fun res -> + Block_hash.Table.remove session.tbl hash ; + Lwt.return res + end } ; + acc + end) + None + pendings in + match next with + | None -> 0. + | Some b -> Int64.to_float (Time.diff b.Store.Block_header.shell.timestamp time) + + let create net_db = + let net_state = Distributed_db.state net_db in + let tbl = Block_hash.Table.create 50 in + let canceler = Lwt_utils.Canceler.create () in + let worker_trigger, worker_waiter = Lwt_utils.trigger () in + let session = + { tbl ; net_db ; net_state ; worker = Lwt.return () ; + canceler ; worker_trigger ; worker_waiter } in + let worker = + let rec worker_loop () = + Lwt_utils.protect ~canceler begin fun () -> + worker_waiter () >>= return + end >>= function + | Error [Lwt_utils.Canceled] -> Lwt.return_unit + | Error err -> + lwt_log_error + "@[Unexpected error in validation:@ %a@]" + pp_print_error err >>= fun () -> + worker_loop () + | Ok () -> + begin + match pending_requests session with + | [] -> () + | requests -> + let set_context _validator hash context = + store session hash context >>= fun _ -> + Lwt.return_unit in + let timeout = + request session + ~get_context:(fetch session) + ~set_context requests in + if timeout > 0. then + Lwt.ignore_result + (Lwt_unix.sleep timeout >|= worker_trigger); + end ; + worker_loop () + in + Lwt_utils.worker "validation" + ~run:worker_loop + ~cancel:(fun () -> Lwt_utils.Canceler.cancel canceler) in + { session with worker } + + let shutdown { canceler ; worker } = + Lwt_utils.Canceler.cancel canceler >>= fun () -> worker end @@ -444,8 +432,7 @@ let rec create_validator ?parent worker state db net = let net_id = State.Net.id net in let net_db = Distributed_db.activate ~callback db net in - let proxy = - Context_db.create { db = net_db ; running = ref Block_hash.Set.empty } in + let session = Context_db.create net_db in Prevalidator.create net_db >>= fun prevalidator -> current_ops := @@ -459,7 +446,7 @@ let rec create_validator ?parent worker state db net = Distributed_db.deactivate net_db >>= fun () -> Lwt_pipe.close queue ; Lwt.join [ - Context_db.shutdown proxy ; + Context_db.shutdown session ; !new_blocks ; Prevalidator.shutdown prevalidator ; ] @@ -509,11 +496,11 @@ let rec create_validator ?parent worker state db net = Block_hash.pp_short hash >>= fun () -> State.Valid_block.Current.head net >>= fun head -> if Fitness.compare head.fitness block.shell.fitness <= 0 then - Context_db.prefetch proxy v hash ; + Context_db.prefetch v session hash ; Lwt.return_unit and fetch_block hash = - Context_db.fetch proxy v hash + Context_db.fetch session v hash and create_child block = begin @@ -551,10 +538,10 @@ let rec create_validator ?parent worker state db net = let rec loop () = Lwt_pipe.pop queue >>= function | `Branch (_gid, locator) -> - List.iter (Context_db.prefetch proxy v) locator ; + List.iter (Context_db.prefetch v session) locator ; loop () | `Head (gid, head, ops) -> - Context_db.prefetch proxy v head ; + Context_db.prefetch v session head ; List.iter (Prevalidator.notify_operation prevalidator gid) ops ; loop () in