Mempool_worker: error management at create

This commit is contained in:
Raphaël Proust 2018-11-08 09:33:14 +08:00
parent 89372a8e28
commit f593677e99
No known key found for this signature in database
GPG Key ID: F4B685504488CEC0
2 changed files with 59 additions and 73 deletions

View File

@ -47,7 +47,7 @@ module type T = sig
| Not_in_branch | Not_in_branch
(** Creates/tear-down a new mempool validator context. *) (** Creates/tear-down a new mempool validator context. *)
val create : limits -> Distributed_db.chain_db -> t Lwt.t val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t
val shutdown : t -> unit Lwt.t val shutdown : t -> unit Lwt.t
(** parse a new operation and add it to the mempool context *) (** parse a new operation and add it to the mempool context *)
@ -56,7 +56,7 @@ module type T = sig
(** validate a new operation and add it to the mempool context *) (** validate a new operation and add it to the mempool context *)
val validate : t -> operation -> result tzresult Lwt.t val validate : t -> operation -> result tzresult Lwt.t
val chain_db : t -> Distributed_db.chain_db tzresult val chain_db : t -> Distributed_db.chain_db
val rpc_directory : t RPC_directory.t val rpc_directory : t RPC_directory.t
@ -352,11 +352,12 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
type parameters = { type parameters = {
limits : limits ; limits : limits ;
chain_db : Distributed_db.chain_db chain_db : Distributed_db.chain_db ;
validation_state : Proto.validation_state ;
} }
(* internal worker state *) (* internal worker state *)
type worker_state = type state =
{ {
(* state of the validator. this is updated at each apply_operation *) (* state of the validator. this is updated at each apply_operation *)
mutable validation_state : Proto.validation_state ; mutable validation_state : Proto.validation_state ;
@ -375,22 +376,17 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
parameters : parameters ; parameters : parameters ;
} }
type state = worker_state tzresult
type worker_view = { cache : Cache.t } type view = { cache : Cache.t }
type view = worker_view tzresult
let view (state : state) _ : view = let view (state : state) _ : view = { cache = state.cache }
state >|? fun state -> { cache = state.cache }
let encoding = let encoding =
let open Data_encoding in let open Data_encoding in
Error_monad.result_encoding ( conv
conv (fun { cache } -> cache)
(fun { cache } -> cache) (fun cache -> { cache })
(fun cache -> { cache }) Cache.encoding
Cache.encoding
)
let pp ppf _view = let pp ppf _view =
Format.fprintf ppf "lots of operations" Format.fprintf ppf "lots of operations"
@ -484,7 +480,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
(* this function update the internal state of the worker *) (* this function update the internal state of the worker *)
let validate_helper w parsed_op = let validate_helper w parsed_op =
Lwt.return (Worker.state w) >>=? fun state -> let state = Worker.state w in
apply_operation state parsed_op >>= fun (validation_state,result) -> apply_operation state parsed_op >>= fun (validation_state,result) ->
begin begin
match validation_state with match validation_state with
@ -494,19 +490,17 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
return result return result
let notify_helper w result { Operation.shell ; proto } = let notify_helper w result { Operation.shell ; proto } =
match Worker.state w with let state = Worker.state w in
(* this function is called by on_validate where we take care of the error *) (* this function is called by on_validate where we take care of the error *)
| Error _err -> () let protocol_data =
| Ok state -> Data_encoding.Binary.of_bytes_exn
let protocol_data = Proto.operation_data_encoding
Data_encoding.Binary.of_bytes_exn proto in
Proto.operation_data_encoding Lwt_watcher.notify state.operation_stream (result, shell, protocol_data)
proto in
Lwt_watcher.notify state.operation_stream (result, shell, protocol_data)
(* memoization is done only at on_* level *) (* memoization is done only at on_* level *)
let on_validate w parsed_op = let on_validate w parsed_op =
Lwt.return (Worker.state w) >>=? fun state -> let state = Worker.state w in
match Cache.find_validated_opt state.cache parsed_op with match Cache.find_validated_opt state.cache parsed_op with
| None | Some (Branch_delayed _) -> | None | Some (Branch_delayed _) ->
validate_helper w parsed_op >>=? fun result -> validate_helper w parsed_op >>=? fun result ->
@ -517,7 +511,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
| Some result -> return result | Some result -> return result
let on_parse w raw_op = let on_parse w raw_op =
Lwt.return (Worker.state w) >>=? fun state -> let state = Worker.state w in
match Cache.find_parsed_opt state.cache raw_op with match Cache.find_parsed_opt state.cache raw_op with
| None -> | None ->
parse_helper w raw_op >>= fun parsed_op -> parse_helper w raw_op >>= fun parsed_op ->
@ -532,14 +526,10 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
| Request.Parse raw_op -> on_parse w raw_op | Request.Parse raw_op -> on_parse w raw_op
| Request.Validate parsed_op -> on_validate w parsed_op | Request.Validate parsed_op -> on_validate w parsed_op
let on_launch (_ : t) (_ : Name.t) ( { chain_db } as parameters ) = let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) =
let chain_state = Distributed_db.chain_state chain_db in let chain_state = Distributed_db.chain_state chain_db in
Chain.data chain_state >>= fun Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } ->
{ current_head = predecessor ; current_mempool = _mempool ; Lwt.return {
live_blocks ; live_operations } ->
let timestamp = Time.now () in
create ~predecessor ~timestamp () >>=? fun validation_state ->
return {
validation_state ; validation_state ;
cache = Cache.create () ; cache = Cache.create () ;
live_blocks ; live_blocks ;
@ -549,16 +539,14 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
} }
let on_close w = let on_close w =
match Worker.state w with let state = Worker.state w in
| Error _err -> Lwt.return_unit Lwt_watcher.shutdown_input state.operation_stream;
| Ok state -> Cache.iter_validated (fun hash _ ->
Lwt_watcher.shutdown_input state.operation_stream; Distributed_db.Operation.clear_or_cancel
Cache.iter_validated (fun hash _ -> state.parameters.chain_db hash)
Distributed_db.Operation.clear_or_cancel state.cache ;
state.parameters.chain_db hash) Cache.clear state.cache;
state.cache ; Lwt.return_unit
Cache.clear state.cache;
Lwt.return_unit
let on_error w r st errs = let on_error w r st errs =
Worker.record_event w (Event.Request (r, st, Some errs)) ; Worker.record_event w (Event.Request (r, st, Some errs)) ;
@ -575,7 +563,6 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let chain_id = State.Chain.id chain_state in let chain_id = State.Chain.id chain_state in
let module Handlers = struct let module Handlers = struct
type self = t type self = t
let on_launch = on_launch let on_launch = on_launch
let on_close = on_close let on_close = on_close
let on_error = on_error let on_error = on_error
@ -583,12 +570,15 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let on_no_request _ = return_unit let on_no_request _ = return_unit
let on_request = on_request let on_request = on_request
end in end in
Worker.launch Chain.data chain_state >>= fun { current_head = predecessor } ->
table let timestamp = Time.now () in
limits.worker_limits create ~predecessor ~timestamp () >>=? fun validation_state ->
(chain_id, Proto.hash) (Worker.launch
{ limits ; chain_db } table
(module Handlers) limits.worker_limits
(chain_id, Proto.hash)
{ limits ; chain_db ; validation_state }
(module Handlers) >>= return)
(* Exporting functions *) (* Exporting functions *)
@ -600,7 +590,6 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let chain_db t = let chain_db t =
let state = Worker.state t in let state = Worker.state t in
state >|? fun state ->
state.parameters.chain_db state.parameters.chain_db
let pending_rpc_directory : t RPC_directory.t = let pending_rpc_directory : t RPC_directory.t =
@ -608,9 +597,8 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
RPC_directory.empty RPC_directory.empty
(Proto_services.S.Mempool.pending_operations RPC_path.open_root) (Proto_services.S.Mempool.pending_operations RPC_path.open_root)
(fun w () () -> (fun w () () ->
match Worker.state w with let state = Worker.state w in
| Error err -> RPC_answer.fail err RPC_answer.return (Cache.to_mempool state.cache)
| Ok state -> RPC_answer.return (Cache.to_mempool state.cache)
) )
let monitor_rpc_directory : t RPC_directory.t = let monitor_rpc_directory : t RPC_directory.t =
@ -618,24 +606,22 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
RPC_directory.empty RPC_directory.empty
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root) (Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
(fun w params () -> (fun w params () ->
match Worker.state w with let state = Worker.state w in
| Error err -> RPC_answer.fail err let filter_result = function
| Ok state -> | Applied _ -> params#applied
let filter_result = function | Refused _ -> params#branch_refused
| Applied _ -> params#applied | Branch_refused _ -> params#refused
| Refused _ -> params#branch_refused | Branch_delayed _ -> params#branch_delayed
| Branch_refused _ -> params#refused | _ -> false in
| Branch_delayed _ -> params#branch_delayed
| _ -> false in
let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in
let shutdown () = Lwt_watcher.shutdown stopper in let shutdown () = Lwt_watcher.shutdown stopper in
let next () = let next () =
Lwt_stream.get op_stream >>= function Lwt_stream.get op_stream >>= function
| Some (kind, shell, protocol_data) when filter_result kind -> | Some (kind, shell, protocol_data) when filter_result kind ->
Lwt.return_some [ { Proto.shell ; protocol_data } ] Lwt.return_some [ { Proto.shell ; protocol_data } ]
| _ -> Lwt.return_none in | _ -> Lwt.return_none in
RPC_answer.return_stream { next ; shutdown } RPC_answer.return_stream { next ; shutdown }
) )
(* /mempool/<chain_id>/pending (* /mempool/<chain_id>/pending

View File

@ -48,7 +48,7 @@ module type T = sig
| Not_in_branch | Not_in_branch
(** Creates/tear-down a new mempool validator context. *) (** Creates/tear-down a new mempool validator context. *)
val create : limits -> Distributed_db.chain_db -> t Lwt.t val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t
val shutdown : t -> unit Lwt.t val shutdown : t -> unit Lwt.t
(** parse a new operation and add it to the mempool context *) (** parse a new operation and add it to the mempool context *)
@ -57,7 +57,7 @@ module type T = sig
(** validate a new operation and add it to the mempool context *) (** validate a new operation and add it to the mempool context *)
val validate : t -> operation -> result tzresult Lwt.t val validate : t -> operation -> result tzresult Lwt.t
val chain_db : t -> Distributed_db.chain_db tzresult val chain_db : t -> Distributed_db.chain_db
val rpc_directory : t RPC_directory.t val rpc_directory : t RPC_directory.t