Mempool_worker: error management at create

This commit is contained in:
Raphaël Proust 2018-11-08 09:33:14 +08:00 committed by MBourgoin
parent 0d22209028
commit 7e687f8608
No known key found for this signature in database
GPG Key ID: 4B3F7008ABB5B2D0

View File

@ -365,7 +365,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
parameters : parameters ; parameters : parameters ;
} }
type view = { cache : ValidatedCache.t } type view = { cache : Cache.t }
let view (state : state) _ : view = { cache = state.cache } let view (state : state) _ : view = { cache = state.cache }
@ -374,7 +374,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
conv conv
(fun { cache } -> cache) (fun { cache } -> cache)
(fun cache -> { cache }) (fun cache -> { cache })
ValidatedCache.encoding Cache.encoding
let pp ppf _view = let pp ppf _view =
Format.fprintf ppf "lots of operations" Format.fprintf ppf "lots of operations"
@ -491,14 +491,23 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
(* memoization is done only at on_* level *) (* memoization is done only at on_* level *)
let on_validate w parsed_op = let on_validate w parsed_op =
let state = Worker.state w in let state = Worker.state w in
match ValidatedCache.find_opt state.cache parsed_op with match Cache.find_validated_opt state.cache parsed_op with
| None | Some ((Branch_delayed _),_) -> | None | Some (Branch_delayed _) ->
validate_helper w parsed_op >>= fun result -> validate_helper w parsed_op >>=? fun result ->
ValidatedCache.add state.cache parsed_op (result, parsed_op.raw); Cache.add_validated state.cache parsed_op result;
(* operations are notified only the first time *) (* operations are notified only the first time *)
notify_helper w result parsed_op.raw ; notify_helper w result parsed_op.raw ;
Lwt.return result return result
| Some (result,_) -> Lwt.return result | Some result -> return result
let on_parse w raw_op =
let state = Worker.state w in
match Cache.find_parsed_opt state.cache raw_op with
| None ->
parse_helper w raw_op >>= fun parsed_op ->
Cache.add_parsed state.cache raw_op parsed_op;
Lwt.return parsed_op
| Some parsed_op -> Lwt.return parsed_op
(* worker's handlers *) (* worker's handlers *)
let on_request : let on_request :
@ -508,13 +517,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) = let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) =
let chain_state = Distributed_db.chain_state chain_db in let chain_state = Distributed_db.chain_state chain_db in
Chain.data chain_state >>= fun { Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } ->
current_mempool = _mempool ;
live_blocks ; live_operations } ->
(* remove all operations that are already included *)
Operation_hash.Set.iter (fun hash ->
ParsedCache.rem parsed_cache hash
) live_operations;
Lwt.return { Lwt.return {
validation_state ; validation_state ;
cache = ValidatedCache.create () ; cache = ValidatedCache.create () ;
@ -527,11 +530,11 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
let on_close w = let on_close w =
let state = Worker.state w in let state = Worker.state w in
Lwt_watcher.shutdown_input state.operation_stream; Lwt_watcher.shutdown_input state.operation_stream;
ValidatedCache.iter (fun hash _ -> Cache.iter_validated (fun hash _ ->
Distributed_db.Operation.clear_or_cancel Distributed_db.Operation.clear_or_cancel
state.parameters.chain_db hash) state.parameters.chain_db hash)
state.cache ; state.cache ;
ValidatedCache.clear state.cache; Cache.clear state.cache;
Lwt.return_unit Lwt.return_unit
let on_error w r st errs = let on_error w r st errs =
@ -591,7 +594,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
(Proto_services.S.Mempool.pending_operations RPC_path.open_root) (Proto_services.S.Mempool.pending_operations RPC_path.open_root)
(fun w () () -> (fun w () () ->
let state = Worker.state w in let state = Worker.state w in
RPC_answer.return (ValidatedCache.to_mempool state.cache) RPC_answer.return (Cache.to_mempool state.cache)
) )
let monitor_rpc_directory : t RPC_directory.t = let monitor_rpc_directory : t RPC_directory.t =
@ -602,19 +605,18 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
let state = Worker.state w in let state = Worker.state w in
let filter_result = function let filter_result = function
| Applied _ -> params#applied | Applied _ -> params#applied
| Refused _ -> params#refused | Refused _ -> params#branch_refused
| Branch_refused _ -> params#branch_refused | Branch_refused _ -> params#refused
| Branch_delayed _ -> params#branch_delayed | Branch_delayed _ -> params#branch_delayed
| _ -> false in | _ -> false in
let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in
let shutdown () = Lwt_watcher.shutdown stopper in let shutdown () = Lwt_watcher.shutdown stopper in
let rec next () = let next () =
Lwt_stream.get op_stream >>= function Lwt_stream.get op_stream >>= function
| Some (kind, shell, protocol_data) when filter_result kind -> | Some (kind, shell, protocol_data) when filter_result kind ->
Lwt.return_some [ { Proto.shell ; protocol_data } ] Lwt.return_some [ { Proto.shell ; protocol_data } ]
| Some _ -> next () | _ -> Lwt.return_none in
| None -> Lwt.return_none in
RPC_answer.return_stream { next ; shutdown } RPC_answer.return_stream { next ; shutdown }
) )