From cd25bb416088cadc8f5c913ac9df226d991866c0 Mon Sep 17 00:00:00 2001 From: Pietro Abate Date: Fri, 9 Nov 2018 15:49:26 +0100 Subject: [PATCH] Shell: Worker initialization in error monad MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Pietro Abate Co-authored-by: Raphaƫl Proust Co-authored-by: Mathias Bourgoin --- scripts/gen_genesis.ml | 4 +- src/lib_shell/block_validator.ml | 3 +- src/lib_shell/block_validator.mli | 3 +- src/lib_shell/chain_directory.ml | 8 ++- src/lib_shell/chain_validator.ml | 61 ++++++++++-------- src/lib_shell/chain_validator.mli | 3 +- src/lib_shell/mempool_peer_worker.ml | 5 +- src/lib_shell/mempool_peer_worker.mli | 2 +- src/lib_shell/mempool_worker.ml | 15 ++--- src/lib_shell/mempool_worker.mli | 2 +- src/lib_shell/monitor_directory.ml | 63 ++++++++++--------- src/lib_shell/node.ml | 5 +- src/lib_shell/peer_validator.ml | 3 +- src/lib_shell/peer_validator.mli | 3 +- src/lib_shell/prevalidator.ml | 91 ++++++++++++++------------- src/lib_shell/prevalidator.mli | 9 +-- src/lib_shell/validator.ml | 36 ++++++----- src/lib_shell/validator.mli | 9 +-- src/lib_shell/worker.ml | 13 ++-- src/lib_shell/worker.mli | 5 +- src/lib_shell/worker_directory.ml | 26 ++++---- 21 files changed, 203 insertions(+), 166 deletions(-) diff --git a/scripts/gen_genesis.ml b/scripts/gen_genesis.ml index 40eb2d98c..d4696c404 100644 --- a/scripts/gen_genesis.ml +++ b/scripts/gen_genesis.ml @@ -52,7 +52,7 @@ let sed = let () = Lwt_main.run (Lwt_process.exec (Lwt_process.shell sed) >>= fun _ -> - Lwt_unix.unlink "../src/bin_node/node_run_command.ml.old") + Lwt_unix.unlink "../src/bin_node/node_run_command.ml.old") let sed = Format.sprintf @@ -63,4 +63,4 @@ let sed = let () = Lwt_main.run (Lwt_process.exec (Lwt_process.shell sed) >>= fun _ -> - Lwt_unix.unlink "../src/lib_shell/distributed_db_message.ml.old") + Lwt_unix.unlink "../src/lib_shell/distributed_db_message.ml.old") diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index a66d41be4..c3e0a497d 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -169,7 +170,7 @@ let on_request let on_launch _ _ (limits, db, validation_kind) = let protocol_validator = Protocol_validator.create db in Block_validator_process.init validation_kind >>= fun validation_process -> - Lwt.return { Types.protocol_validator ; validation_process ; limits } + return { Types.protocol_validator ; validation_process ; limits } let on_error w r st errs = Worker.record_event w (Validation_failure (r, st, errs)) ; diff --git a/src/lib_shell/block_validator.mli b/src/lib_shell/block_validator.mli index dfce572da..3184a0845 100644 --- a/src/lib_shell/block_validator.mli +++ b/src/lib_shell/block_validator.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -37,7 +38,7 @@ type error += Closed of unit val create: limits -> Distributed_db.t -> validator_kind -> - t Lwt.t + t tzresult Lwt.t val validate: t -> diff --git a/src/lib_shell/chain_directory.ml b/src/lib_shell/chain_directory.ml index fa7a57f07..153b41498 100644 --- a/src/lib_shell/chain_directory.ml +++ b/src/lib_shell/chain_directory.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -162,9 +163,10 @@ let build_rpc_directory validator = merge (RPC_directory.map (fun chain -> - Validator.get_exn validator - (State.Chain.id chain) >>= fun chain_validator -> - Lwt.return (Chain_validator.prevalidator chain_validator)) + match Validator.get validator (State.Chain.id chain) with + | Error _ -> Lwt.fail Not_found + | Ok chain_validator -> + Lwt.return (Chain_validator.prevalidator chain_validator)) Prevalidator.rpc_directory) ; RPC_directory.prefix Chain_services.path @@ diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index 539d3a649..47ce3764a 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -75,7 +76,7 @@ module Types = struct mutable child: (state * (unit -> unit Lwt.t (* shutdown *))) option ; mutable prevalidator: Prevalidator.t option ; - active_peers: Peer_validator.t Lwt.t P2p_peer.Table.t ; + active_peers: Peer_validator.t P2p_peer.Table.t ; bootstrapped_peers: unit P2p_peer.Table.t ; } @@ -125,9 +126,9 @@ let may_toggle_bootstrapped_chain w = let may_activate_peer_validator w peer_id = let nv = Worker.state w in - try P2p_peer.Table.find nv.active_peers peer_id - with Not_found -> - let pv = + match P2p_peer.Table.find_opt nv.active_peers peer_id with + |Some pv -> return pv + |None -> Peer_validator.create ~notify_new_block:(notify_new_block w) ~notify_bootstrapped: begin fun () -> @@ -141,9 +142,10 @@ let may_activate_peer_validator w peer_id = nv.parameters.peer_validator_limits nv.parameters.block_validator nv.parameters.chain_db - peer_id in - P2p_peer.Table.add nv.active_peers peer_id pv ; - pv + peer_id + >>=? fun pv -> + P2p_peer.Table.add nv.active_peers peer_id pv ; + return pv let may_update_checkpoint chain_state new_head = State.Chain.checkpoint chain_state >>= fun (old_level, _old_block) -> @@ -183,7 +185,7 @@ let may_switch_test_chain w spawn_child block = nv.parameters.block_validator nv.parameters.global_valid_block_input nv.parameters.db chain_state - nv.parameters.limits (* TODO: different limits main/test ? *) >>= fun child -> + nv.parameters.limits (* TODO: different limits main/test ? *) >>=? fun child -> nv.child <- Some child ; return_unit end else begin @@ -302,7 +304,7 @@ let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t = Prevalidator.create limits (module Proto) - chain_db >>= fun prevalidator -> + chain_db >>=? fun prevalidator -> nv.prevalidator <- Some prevalidator ; Prevalidator.shutdown old_prevalidator >>= fun () -> return_unit @@ -339,7 +341,7 @@ let on_close w = end :: Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child :: P2p_peer.Table.fold - (fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc) + (fun _ pv acc -> Peer_validator.shutdown pv :: acc) nv.active_peers []) >>= fun () -> Lwt.return_unit @@ -350,17 +352,23 @@ let on_launch start_prevalidator w _ parameters = (fun _ {State.current_head} -> Lwt.return current_head) >>= fun head -> State.Block.protocol_hash head >>= fun head_hash -> safe_get_protocol head_hash >>= function - | Ok (module Proto) -> + | Ok (module Proto) -> begin Prevalidator.create parameters.prevalidator_limits (module Proto) - parameters.chain_db >>= fun prevalor -> - Lwt.return_some prevalor + parameters.chain_db >>= function + | Error err -> + Log.lwt_log_error "@[Failed to instantiate prevalidator:@ %a@]" + pp_print_error err >>= fun () -> + return_none + | Ok prevalidator -> + return_some prevalidator + end | Error err -> Log.lwt_log_error "@[Failed to instantiate prevalidator:@ %a@]" pp_print_error err >>= fun () -> - Lwt.return_none - else Lwt.return_none) >>= fun prevalidator -> + return_none + else return_none) >>=? fun prevalidator -> let valid_block_input = Lwt_watcher.create_input () in let new_head_input = Lwt_watcher.create_input () in let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in @@ -381,38 +389,41 @@ let on_launch start_prevalidator w _ parameters = Distributed_db.set_callback parameters.chain_db { notify_branch = begin fun peer_id locator -> Lwt.async begin fun () -> - may_activate_peer_validator w peer_id >>= fun pv -> + may_activate_peer_validator w peer_id >>=? fun pv -> Peer_validator.notify_branch pv locator ; - Lwt.return_unit + return_unit end end ; notify_head = begin fun peer_id block ops -> Lwt.async begin fun () -> - may_activate_peer_validator w peer_id >>= fun pv -> + may_activate_peer_validator w peer_id >>=? fun pv -> Peer_validator.notify_head pv block ; (* TODO notify prevalidator only if head is known ??? *) match nv.prevalidator with - | Some prevalidator -> Prevalidator.notify_operations prevalidator peer_id ops - | None -> Lwt.return_unit + | Some prevalidator -> + Prevalidator.notify_operations prevalidator peer_id ops >>= fun () -> + return_unit + | None -> return_unit end; end ; disconnection = begin fun peer_id -> Lwt.async begin fun () -> - may_activate_peer_validator w peer_id >>= fun pv -> + may_activate_peer_validator w peer_id >>=? fun pv -> Peer_validator.shutdown pv >>= fun () -> - Lwt.return_unit + return_unit end end ; } ; - Lwt.return nv + return nv let rec create ?max_child_ttl ~start_prevalidator ?parent peer_validator_limits prevalidator_limits block_validator global_valid_block_input db chain_state limits = let spawn_child ~parent pvl pl bl gvbi db n l = - create ~start_prevalidator ~parent pvl pl bl gvbi db n l >>= fun w -> - Lwt.return (Worker.state w, (fun () -> Worker.shutdown w)) in + create ~start_prevalidator ~parent pvl pl bl gvbi db n l >>=? fun w -> + return (Worker.state w, (fun () -> Worker.shutdown w)) + in let module Handlers = struct type self = t let on_launch = on_launch start_prevalidator diff --git a/src/lib_shell/chain_validator.mli b/src/lib_shell/chain_validator.mli index cc8845d27..a4c0be7e3 100644 --- a/src/lib_shell/chain_validator.mli +++ b/src/lib_shell/chain_validator.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -40,7 +41,7 @@ val create: Distributed_db.t -> State.Chain.t -> limits -> - t Lwt.t + t tzresult Lwt.t val bootstrapped: t -> unit Lwt.t diff --git a/src/lib_shell/mempool_peer_worker.ml b/src/lib_shell/mempool_peer_worker.ml index f97a9431e..3b370a771 100644 --- a/src/lib_shell/mempool_peer_worker.ml +++ b/src/lib_shell/mempool_peer_worker.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -37,7 +38,7 @@ module type T = sig type t type input = Operation_hash.t list - val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t + val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t tzresult Lwt.t val shutdown: t -> input Lwt.t val validate: t -> input -> unit tzresult Lwt.t @@ -336,7 +337,7 @@ module Make (Static: STATIC) (Mempool_worker: Mempool_worker.T) type self = t let on_launch _ _ (mempool_worker, pool_size) = - Lwt.return Types.{ mempool_worker; pool_size } + return Types.{ mempool_worker; pool_size } let on_request : type a. self -> a Request.t -> a tzresult Lwt.t = fun t (Request.Batch os) -> diff --git a/src/lib_shell/mempool_peer_worker.mli b/src/lib_shell/mempool_peer_worker.mli index 96553c174..7eb63cdb5 100644 --- a/src/lib_shell/mempool_peer_worker.mli +++ b/src/lib_shell/mempool_peer_worker.mli @@ -49,7 +49,7 @@ module type T = sig to be used for validating batches of operations sent by the peer [peer_id]. The validation of each operations is delegated to the associated [mempool_worker]. *) - val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t Lwt.t + val create: limits -> P2p_peer.Id.t -> Mempool_worker.t -> t tzresult Lwt.t (** [shutdown t] closes the peer worker [t]. It returns a list of operation hashes that can be recycled when a new worker is created for the same peer. diff --git a/src/lib_shell/mempool_worker.ml b/src/lib_shell/mempool_worker.ml index b6d266a98..39d7304a7 100644 --- a/src/lib_shell/mempool_worker.ml +++ b/src/lib_shell/mempool_worker.ml @@ -1,6 +1,7 @@ (*****************************************************************************) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -515,7 +516,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) Operation_hash.Set.iter (fun hash -> ParsedCache.rem parsed_cache hash ) live_operations; - Lwt.return { + return { validation_state ; cache = ValidatedCache.create () ; live_blocks ; @@ -559,12 +560,12 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) Chain.data chain_state >>= fun { current_head = predecessor } -> let timestamp = Time.now () in create ~predecessor ~timestamp () >>=? fun validation_state -> - (Worker.launch - table - limits.worker_limits - chain_id - { limits ; chain_db ; validation_state } - (module Handlers) >>= return) + Worker.launch + table + limits.worker_limits + chain_id + { limits ; chain_db ; validation_state } + (module Handlers) (* Exporting functions *) diff --git a/src/lib_shell/mempool_worker.mli b/src/lib_shell/mempool_worker.mli index 922c9faee..61fc1e96b 100644 --- a/src/lib_shell/mempool_worker.mli +++ b/src/lib_shell/mempool_worker.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -69,4 +70,3 @@ module type STATIC = sig end module Make (Static : STATIC) (Proto : Registered_protocol.T) : T with module Proto = Proto - diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml index 7f8f9f4f8..00df8668b 100644 --- a/src/lib_shell/monitor_directory.ml +++ b/src/lib_shell/monitor_directory.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -105,36 +106,38 @@ let build_rpc_directory validator mainchain_validator = (* TODO: when `chain = `Test`, should we reset then stream when the `testnet` change, or dias we currently do ?? *) Chain_directory.get_chain state chain >>= fun chain -> - Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator -> - let block_stream, stopper = Chain_validator.new_head_watcher chain_validator in - Chain.head chain >>= fun head -> - let shutdown () = Lwt_watcher.shutdown stopper in - let in_next_protocols block = - match q#next_protocols with - | [] -> Lwt.return_true - | protocols -> - State.Block.context block >>= fun context -> - Context.get_protocol context >>= fun next_protocol -> - Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in - let stream = - Lwt_stream.filter_map_s - (fun block -> - in_next_protocols block >>= fun in_next_protocols -> - if in_next_protocols then - Lwt.return_some (State.Block.hash block, State.Block.header block) - else - Lwt.return_none) - block_stream in - in_next_protocols head >>= fun first_block_is_among_next_protocols -> - let first_call = - (* Skip the first block if this is false *) - ref first_block_is_among_next_protocols in - let next () = - if !first_call then begin - first_call := false ; Lwt.return_some (State.Block.hash head, State.Block.header head) - end else - Lwt_stream.get stream in - RPC_answer.return_stream { next ; shutdown } + match Validator.get validator (State.Chain.id chain) with + | Error _ -> Lwt.fail Not_found + | Ok chain_validator -> + let block_stream, stopper = Chain_validator.new_head_watcher chain_validator in + Chain.head chain >>= fun head -> + let shutdown () = Lwt_watcher.shutdown stopper in + let in_next_protocols block = + match q#next_protocols with + | [] -> Lwt.return_true + | protocols -> + State.Block.context block >>= fun context -> + Context.get_protocol context >>= fun next_protocol -> + Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) in + let stream = + Lwt_stream.filter_map_s + (fun block -> + in_next_protocols block >>= fun in_next_protocols -> + if in_next_protocols then + Lwt.return_some (State.Block.hash block, State.Block.header block) + else + Lwt.return_none) + block_stream in + in_next_protocols head >>= fun first_block_is_among_next_protocols -> + let first_call = + (* Skip the first block if this is false *) + ref first_block_is_among_next_protocols in + let next () = + if !first_call then begin + first_call := false ; Lwt.return_some (State.Block.hash head, State.Block.header head) + end else + Lwt_stream.get stream in + RPC_answer.return_stream { next ; shutdown } end ; gen_register0 Monitor_services.S.protocols begin fun () () -> diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index 1c4dd04b6..bb6df9cb2 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -199,9 +200,9 @@ let create (Block_validator.Internal context_index) prevalidator_limits chain_validator_limits - >>= fun validator -> + >>=? fun validator -> Validator.activate validator - ?max_child_ttl ~start_prevalidator mainchain_state >>= fun mainchain_validator -> + ?max_child_ttl ~start_prevalidator mainchain_state >>=? fun mainchain_validator -> let shutdown () = P2p.shutdown p2p >>= fun () -> Validator.shutdown validator >>= fun () -> diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index c608d4fd0..1b7522c03 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -342,7 +343,7 @@ let on_launch _ name parameters = and notify_new_block block = pv.last_validated_head <- State.Block.header block ; parameters.notify_new_block block in - Lwt.return pv + return pv let table = let merge w (Worker.Any_request neu) old = diff --git a/src/lib_shell/peer_validator.mli b/src/lib_shell/peer_validator.mli index cd7e2a644..416a3a6bb 100644 --- a/src/lib_shell/peer_validator.mli +++ b/src/lib_shell/peer_validator.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -43,7 +44,7 @@ val create: ?notify_termination: (unit -> unit) -> limits -> Block_validator.t -> - Distributed_db.chain_db -> P2p_peer.Id.t -> t Lwt.t + Distributed_db.chain_db -> P2p_peer.Id.t -> t tzresult Lwt.t val shutdown: t -> unit Lwt.t val notify_branch: t -> Block_locator.t -> unit diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 91575c80c..d3467a0aa 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -84,7 +85,8 @@ module type T = sig val validation_result: types_state -> error Preapply_result.t val fitness: unit -> Fitness.t Lwt.t - val worker: worker Lwt.t + val initialization_errors: unit tzresult Lwt.t + val worker: worker Lazy.t end @@ -691,7 +693,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct List.iter (fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) mempool.known_valid ; - Lwt.return pv + return pv let on_error w r st errs = Worker.record_event w (Event.Request (r, st, Some errs)) ; @@ -713,14 +715,23 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct * functor (and thus a single worker for the single instantiaion of Worker). * Whislt this is somewhat abusing the intended purpose of worker, it is part * of a transition plan to a one-worker-per-peer architecture. *) - let worker = + let worker_promise = Worker.launch table Arg.limits.worker_limits name (Arg.limits, Arg.chain_db) (module Handlers) + let initialization_errors = + worker_promise >>=? fun _ -> return_unit + + let worker = lazy begin + match Lwt.state worker_promise with + | Lwt.Return (Ok worker) -> worker + | Lwt.Return (Error _) | Lwt.Fail _ | Lwt.Sleep -> assert false + end + let fitness () = - worker >>= fun w -> + let w = Lazy.force worker in let pv = Worker.state w in begin Lwt.return pv.validation_state >>=? fun state -> @@ -757,43 +768,40 @@ let create limits (module Proto: Registered_protocol.T) chain_db = let chain_db = chain_db let chain_id = chain_id end) in - Prevalidator.worker >>= fun _ -> + (* Checking initialization errors before giving a reference to dnagerous + * `worker` value to caller. *) + Prevalidator.initialization_errors >>=? fun () -> ChainProto_registry.register Prevalidator.name (module Prevalidator: T); - Lwt.return (module Prevalidator: T) + return (module Prevalidator: T) | Some p -> - Lwt.return p + return p let shutdown (t:t) = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> + let w = Lazy.force Prevalidator.worker in ChainProto_registry.remove Prevalidator.name; Prevalidator.Worker.shutdown w let flush (t:t) head = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> + let w = Lazy.force Prevalidator.worker in Prevalidator.Worker.push_request_and_wait w (Request.Flush head) let notify_operations (t:t) peer mempool = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> + let w = Lazy.force Prevalidator.worker in Prevalidator.Worker.push_request w (Request.Notify (peer, mempool)) let operations (t:t) = let module Prevalidator: T = (val t) in - match Lwt.state Prevalidator.worker with - | Lwt.Fail _ | Lwt.Sleep -> - (* FIXME: this shouldn't happen at all, here we return a safe value *) - (Preapply_result.empty, Operation_hash.Map.empty) - | Lwt.Return w -> - let pv = Prevalidator.Worker.state w in - ({ (Prevalidator.validation_result pv) with - applied = List.rev pv.applied }, - pv.pending) + let w = Lazy.force Prevalidator.worker in + let pv = Prevalidator.Worker.state w in + ({ (Prevalidator.validation_result pv) with applied = List.rev pv.applied }, + pv.pending) let pending ?block (t:t) = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> + let w = Lazy.force Prevalidator.worker in let pv = Prevalidator.Worker.state w in let ops = Preapply_result.operations (Prevalidator.validation_result pv) in match block with @@ -805,9 +813,9 @@ let pending ?block (t:t) = let timestamp (t:t) = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> + let w = Lazy.force Prevalidator.worker in let pv = Prevalidator.Worker.state w in - Lwt.return pv.timestamp + pv.timestamp let fitness (t:t) = let module Prevalidator: T = (val t) in @@ -815,13 +823,13 @@ let fitness (t:t) = let inject_operation (t:t) op = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> + let w = Lazy.force Prevalidator.worker in Prevalidator.Worker.push_request_and_wait w (Inject op) let status (t:t) = let module Prevalidator: T = (val t) in - Prevalidator.worker >>= fun w -> - Lwt.return (Prevalidator.Worker.status w) + let w = Lazy.force Prevalidator.worker in + Prevalidator.Worker.status w let running_workers () = ChainProto_registry.fold @@ -830,27 +838,18 @@ let running_workers () = let pending_requests (t:t) = let module Prevalidator: T = (val t) in - match Lwt.state Prevalidator.worker with - | Lwt.Fail _ | Lwt.Sleep -> - (* FIXME: this shouldn't happen at all, here we return a safe value *) - [] - | Lwt.Return w -> Prevalidator.Worker.pending_requests w + let w = Lazy.force Prevalidator.worker in + Prevalidator.Worker.pending_requests w let current_request (t:t) = let module Prevalidator: T = (val t) in - match Lwt.state Prevalidator.worker with - | Lwt.Fail _ | Lwt.Sleep -> - (* FIXME: this shouldn't happen at all, here we return a safe value *) - None - | Lwt.Return w -> Prevalidator.Worker.current_request w + let w = Lazy.force Prevalidator.worker in + Prevalidator.Worker.current_request w let last_events (t:t) = let module Prevalidator: T = (val t) in - match Lwt.state Prevalidator.worker with - | Lwt.Fail _ | Lwt.Sleep -> - (* FIXME: this shouldn't happen at all, here we return a safe value *) - [] - | Lwt.Return w -> Prevalidator.Worker.last_events w + let w = Lazy.force Prevalidator.worker in + Prevalidator.Worker.last_events w let protocol_hash (t:t) = let module Prevalidator: T = (val t) in @@ -883,7 +882,11 @@ let rpc_directory : t option RPC_directory.t = Lwt.return (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory) | Some t -> let module Prevalidator: T = (val t: T) in - Prevalidator.worker >>= fun w -> - let pv = Prevalidator.Worker.state w in - let pv_rpc_dir = Lazy.force pv.rpc_directory in - Lwt.return (RPC_directory.map (fun _ -> Lwt.return pv) pv_rpc_dir)) + Prevalidator.initialization_errors >>= function + | Error _ -> + Lwt.return (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory) + | Ok () -> + let w = Lazy.force Prevalidator.worker in + let pv = Prevalidator.Worker.state w in + let pv_rpc_dir = Lazy.force pv.rpc_directory in + Lwt.return (RPC_directory.map (fun _ -> Lwt.return pv) pv_rpc_dir)) diff --git a/src/lib_shell/prevalidator.mli b/src/lib_shell/prevalidator.mli index 14c33022a..e6050486a 100644 --- a/src/lib_shell/prevalidator.mli +++ b/src/lib_shell/prevalidator.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -61,7 +62,7 @@ val create: limits -> (module Registered_protocol.T) -> Distributed_db.chain_db -> - t Lwt.t + t tzresult Lwt.t val shutdown: t -> unit Lwt.t (** Notify the prevalidator that the identified peer has sent a bunch of @@ -76,13 +77,13 @@ val flush: t -> Block_hash.t -> unit tzresult Lwt.t (** Returns the timestamp of the prevalidator worker, that is the timestamp of the last reset of the prevalidation context *) -val timestamp: t -> Time.t Lwt.t +val timestamp: t -> Time.t (** Returns the fitness of the current prevalidation context *) val fitness: t -> Fitness.t Lwt.t (** Returns the list of valid operations known to this prevalidation worker *) -val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t +val operations: t -> (error Preapply_result.t * Operation.t Operation_hash.Map.t) (** Returns the list of pending operations known to this prevalidation worker *) val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t @@ -102,7 +103,7 @@ val parameters: t -> limits * Distributed_db.chain_db (** Worker status and events *) (* None indicates the there are no workers for the current protocol. *) -val status: t -> Worker_types.worker_status Lwt.t +val status: t -> Worker_types.worker_status val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list diff --git a/src/lib_shell/validator.ml b/src/lib_shell/validator.ml index ffa210c9d..f499f94e6 100644 --- a/src/lib_shell/validator.ml +++ b/src/lib_shell/validator.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -36,7 +37,7 @@ type t = { prevalidator_limits: Prevalidator.limits ; valid_block_input: State.Block.t Lwt_watcher.input ; - active_chains: Chain_validator.t Lwt.t Chain_id.Table.t ; + active_chains: Chain_validator.t Chain_id.Table.t ; } @@ -47,9 +48,9 @@ let create state db prevalidator_limits chain_validator_limits = - Block_validator.create block_validator_limits db block_validator_kind >>= fun block_validator -> + Block_validator.create block_validator_limits db block_validator_kind >>=? fun block_validator -> let valid_block_input = Lwt_watcher.create_input () in - Lwt.return + return { state ; db ; block_validator ; block_validator_limits ; prevalidator_limits ; @@ -63,25 +64,26 @@ let activate v ?max_child_ttl ~start_prevalidator chain_state = f "activate chain %a" -% t event "active_chain" -% a State_logging.chain_id chain_id) >>= fun () -> - try Chain_id.Table.find v.active_chains chain_id - with Not_found -> - let nv = + match Chain_id.Table.find_opt v.active_chains chain_id with + |Some nv -> return nv + |None -> Chain_validator.create ?max_child_ttl ~start_prevalidator v.peer_validator_limits v.prevalidator_limits v.block_validator v.valid_block_input v.db chain_state - v.chain_validator_limits in - Chain_id.Table.add v.active_chains chain_id nv ; - nv + v.chain_validator_limits >>=? fun nv -> + Chain_id.Table.add v.active_chains chain_id nv ; + return nv let get_exn { active_chains } chain_id = Chain_id.Table.find active_chains chain_id -let get v chain_id = - try get_exn v chain_id >>= fun nv -> return nv - with Not_found -> fail (Validation_errors.Inactive_chain chain_id) +let get { active_chains } chain_id = + match Chain_id.Table.find_opt active_chains chain_id with + |Some nv -> Ok nv + |None -> error (Validation_errors.Inactive_chain chain_id) let validate_block v ?(force = false) ?chain_id bytes operations = let hash = Block_hash.hash_bytes [bytes] in @@ -96,10 +98,10 @@ let validate_block v ?(force = false) ?chain_id bytes operations = | None -> failwith "Unknown predecessor (%a), cannot inject the block." Block_hash.pp_short block.shell.predecessor - | Some (chain_id, _bh) -> get v chain_id + | Some (chain_id, _bh) -> Lwt.return (get v chain_id) end | Some chain_id -> - get v chain_id >>=? fun nv -> + Lwt.return (get v chain_id) >>=? fun nv -> if force then return nv else @@ -120,7 +122,7 @@ let shutdown { active_chains ; block_validator } = let jobs = Block_validator.shutdown block_validator :: Chain_id.Table.fold - (fun _ nv acc -> (nv >>= Chain_validator.shutdown) :: acc) + (fun _ nv acc -> Chain_validator.shutdown nv :: acc) active_chains [] in Lwt.join jobs >>= fun () -> Lwt.return_unit @@ -137,10 +139,10 @@ let inject_operation v ?chain_id op = | None -> failwith "Unknown branch (%a), cannot inject the operation." Block_hash.pp_short op.shell.branch - | Some (chain_id, _bh) -> get v chain_id + | Some (chain_id, _bh) -> Lwt.return (get v chain_id) end | Some chain_id -> - get v chain_id >>=? fun nv -> + Lwt.return (get v chain_id) >>=? fun nv -> Distributed_db.Block_header.known (Chain_validator.chain_db nv) op.shell.branch >>= function diff --git a/src/lib_shell/validator.mli b/src/lib_shell/validator.mli index 515d4e409..aa2855f95 100644 --- a/src/lib_shell/validator.mli +++ b/src/lib_shell/validator.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -35,7 +36,7 @@ val create: Block_validator.validator_kind -> Prevalidator.limits -> Chain_validator.limits -> - t Lwt.t + t tzresult Lwt.t val shutdown: t -> unit Lwt.t (** Start the validation scheduler of a given chain. *) @@ -43,10 +44,10 @@ val activate: t -> ?max_child_ttl:int -> start_prevalidator:bool -> - State.Chain.t -> Chain_validator.t Lwt.t + State.Chain.t -> Chain_validator.t tzresult Lwt.t -val get: t -> Chain_id.t -> Chain_validator.t tzresult Lwt.t -val get_exn: t -> Chain_id.t -> Chain_validator.t Lwt.t +val get: t -> Chain_id.t -> Chain_validator.t tzresult +val get_exn: t -> Chain_id.t -> Chain_validator.t (** Force the validation of a block. *) val validate_block: diff --git a/src/lib_shell/worker.ml b/src/lib_shell/worker.ml index 514d0d2ea..a8f32c38a 100644 --- a/src/lib_shell/worker.ml +++ b/src/lib_shell/worker.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -106,7 +107,7 @@ module type T = sig It is possible to initialize the message queue. Of course calling {!state} will fail at that point. *) val on_launch : - self -> Name.t -> Types.parameters -> Types.state Lwt.t + self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t (** The main request processor, i.e. the body of the event loop. *) val on_request : @@ -150,7 +151,7 @@ module type T = sig 'kind table -> ?timeout:float -> Worker_types.limits -> Name.t -> Types.parameters -> (module HANDLERS with type self = 'kind t) -> - 'kind t Lwt.t + 'kind t tzresult Lwt.t (** Triggers a worker termination and waits for its completion. Cannot be called from within the handlers. *) @@ -416,7 +417,7 @@ module Make module type HANDLERS = sig type self val on_launch : - self -> Name.t -> Types.parameters -> Types.state Lwt.t + self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t val on_request : self -> 'a Request.t -> 'a tzresult Lwt.t val on_no_request : @@ -515,7 +516,7 @@ module Make kind table -> ?timeout:float -> Worker_types.limits -> Name.t -> Types.parameters -> (module HANDLERS with type self = kind t) -> - kind t Lwt.t + kind t tzresult Lwt.t = fun table ?timeout limits name parameters (module Handlers) -> let name_s = Format.asprintf "%a" Name.pp name in @@ -556,7 +557,7 @@ module Make Logger.lwt_log_notice "Worker started for %s" name_s end >>= fun () -> Hashtbl.add table.instances name w ; - Handlers.on_launch w name parameters >>= fun state -> + Handlers.on_launch w name parameters >>=? fun state -> w.status <- Running (Time.now ()) ; w.state <- Some state ; w.worker <- @@ -564,7 +565,7 @@ module Make full_name ~run:(fun () -> worker_loop (module Handlers) w) ~cancel:(fun () -> Lwt_canceler.cancel w.canceler) ; - Lwt.return w + return w let shutdown w = let (module Logger) = w.logger in diff --git a/src/lib_shell/worker.mli b/src/lib_shell/worker.mli index b35d7700b..b03606938 100644 --- a/src/lib_shell/worker.mli +++ b/src/lib_shell/worker.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -171,7 +172,7 @@ module type T = sig It is possible to initialize the message queue. Of course calling {!state} will fail at that point. *) val on_launch : - self -> Name.t -> Types.parameters -> Types.state Lwt.t + self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t (** The main request processor, i.e. the body of the event loop. *) val on_request : @@ -215,7 +216,7 @@ module type T = sig 'kind table -> ?timeout:float -> Worker_types.limits -> Name.t -> Types.parameters -> (module HANDLERS with type self = 'kind t) -> - 'kind t Lwt.t + 'kind t tzresult Lwt.t (** Triggers a worker termination and waits for its completion. Cannot be called from within the handlers. *) diff --git a/src/lib_shell/worker_directory.ml b/src/lib_shell/worker_directory.ml index 5f963c6cc..89ee83adb 100644 --- a/src/lib_shell/worker_directory.ml +++ b/src/lib_shell/worker_directory.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2018 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -37,12 +38,11 @@ let build_rpc_directory state = register0 Worker_services.Prevalidators.S.list begin fun () () -> let workers = Prevalidator.running_workers () in - Lwt_list.map_p - (fun (chain_id, _, t) -> - Prevalidator.status t >>= fun status -> - Lwt.return (chain_id, status)) - workers >>= fun info -> - return info + let statuses = + List.map + (fun (chain_id, _, t) -> (chain_id, Prevalidator.status t)) + workers in + return statuses end ; register1 Worker_services.Prevalidators.S.state begin fun chain () () -> @@ -53,12 +53,16 @@ let build_rpc_directory state = * register multiple Prevalidator for a single chain (using distinct * protocols). However, this is never done. *) List.find (fun (c, _, _) -> Chain_id.equal c chain_id) workers in - Prevalidator.status t >>= fun status -> + let status = Prevalidator.status t in + let pending_requests = Prevalidator.pending_requests t in + let backlog = Prevalidator.last_events t in + let current_request = Prevalidator.current_request t in return - { Worker_types.status = status ; - pending_requests = Prevalidator.pending_requests t ; - backlog = Prevalidator.last_events t ; - current_request = Prevalidator.current_request t } + { Worker_types. + status ; + pending_requests ; + backlog ; + current_request } end ; (* Workers : Block_validator *)