diff --git a/src/lib_shell/mempool_worker.ml b/src/lib_shell/mempool_worker.ml index b5d303f36..2575a32d3 100644 --- a/src/lib_shell/mempool_worker.ml +++ b/src/lib_shell/mempool_worker.ml @@ -365,7 +365,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) parameters : parameters ; } - type view = { cache : ValidatedCache.t } + type view = { cache : Cache.t } let view (state : state) _ : view = { cache = state.cache } @@ -374,7 +374,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) conv (fun { cache } -> cache) (fun cache -> { cache }) - ValidatedCache.encoding + Cache.encoding let pp ppf _view = Format.fprintf ppf "lots of operations" @@ -471,7 +471,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) (* this function update the internal state of the worker *) let validate_helper w parsed_op = let state = Worker.state w in - apply_operation state parsed_op >>= fun (validation_state, result) -> + apply_operation state parsed_op >>= fun (validation_state,result) -> begin match validation_state with | Some validation_state -> state.validation_state <- validation_state @@ -491,14 +491,23 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) (* memoization is done only at on_* level *) let on_validate w parsed_op = let state = Worker.state w in - match ValidatedCache.find_opt state.cache parsed_op with - | None | Some ((Branch_delayed _),_) -> - validate_helper w parsed_op >>= fun result -> - ValidatedCache.add state.cache parsed_op (result, parsed_op.raw); + match Cache.find_validated_opt state.cache parsed_op with + | None | Some (Branch_delayed _) -> + validate_helper w parsed_op >>=? fun result -> + Cache.add_validated state.cache parsed_op result; (* operations are notified only the first time *) notify_helper w result parsed_op.raw ; - Lwt.return result - | Some (result,_) -> Lwt.return result + return result + | Some result -> return result + + let on_parse w raw_op = + let state = Worker.state w in + match Cache.find_parsed_opt state.cache raw_op with + | None -> + parse_helper w raw_op >>= fun parsed_op -> + Cache.add_parsed state.cache raw_op parsed_op; + Lwt.return parsed_op + | Some parsed_op -> Lwt.return parsed_op (* worker's handlers *) let on_request : @@ -508,13 +517,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) let on_launch (_ : t) (_ : Name.t) ( { chain_db ; validation_state } as parameters ) = let chain_state = Distributed_db.chain_state chain_db in - Chain.data chain_state >>= fun { - current_mempool = _mempool ; - live_blocks ; live_operations } -> - (* remove all operations that are already included *) - Operation_hash.Set.iter (fun hash -> - ParsedCache.rem parsed_cache hash - ) live_operations; + Chain.data chain_state >>= fun { current_mempool = _mempool ; live_blocks ; live_operations } -> Lwt.return { validation_state ; cache = ValidatedCache.create () ; @@ -527,11 +530,11 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) let on_close w = let state = Worker.state w in Lwt_watcher.shutdown_input state.operation_stream; - ValidatedCache.iter (fun hash _ -> + Cache.iter_validated (fun hash _ -> Distributed_db.Operation.clear_or_cancel state.parameters.chain_db hash) state.cache ; - ValidatedCache.clear state.cache; + Cache.clear state.cache; Lwt.return_unit let on_error w r st errs = @@ -591,7 +594,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) (Proto_services.S.Mempool.pending_operations RPC_path.open_root) (fun w () () -> let state = Worker.state w in - RPC_answer.return (ValidatedCache.to_mempool state.cache) + RPC_answer.return (Cache.to_mempool state.cache) ) let monitor_rpc_directory : t RPC_directory.t = @@ -602,19 +605,18 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T) let state = Worker.state w in let filter_result = function | Applied _ -> params#applied - | Refused _ -> params#refused - | Branch_refused _ -> params#branch_refused + | Refused _ -> params#branch_refused + | Branch_refused _ -> params#refused | Branch_delayed _ -> params#branch_delayed | _ -> false in let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in let shutdown () = Lwt_watcher.shutdown stopper in - let rec next () = + let next () = Lwt_stream.get op_stream >>= function | Some (kind, shell, protocol_data) when filter_result kind -> Lwt.return_some [ { Proto.shell ; protocol_data } ] - | Some _ -> next () - | None -> Lwt.return_none in + | _ -> Lwt.return_none in RPC_answer.return_stream { next ; shutdown } )