Mempool: cache result and operation in ValidatedCache

This commit is contained in:
Pietro Abate 2018-11-19 11:37:43 +01:00 committed by Benjamin Canou
parent 52d7215ed2
commit 2d5c56eeca

View File

@ -253,10 +253,15 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
(* validated operations' cache. used for memoization *) (* validated operations' cache. used for memoization *)
module ValidatedCache = struct module ValidatedCache = struct
type t = result Operation_hash.Table.t type t = (result * Operation.t) Operation_hash.Table.t
let encoding = let encoding =
Operation_hash.Table.encoding result_encoding let open Data_encoding in
Operation_hash.Table.encoding (
tup2
result_encoding
Operation.encoding
)
let create () = Operation_hash.Table.create 1000 let create () = Operation_hash.Table.create 1000
@ -269,7 +274,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let iter f t = let iter f t =
Operation_hash.Table.iter f t Operation_hash.Table.iter f t
let to_mempool t parsed_cache = let to_mempool t =
let empty = { let empty = {
Proto_services.Mempool.applied = [] ; Proto_services.Mempool.applied = [] ;
refused = Operation_hash.Map.empty ; refused = Operation_hash.Map.empty ;
@ -284,23 +289,20 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
op.Operation.proto in op.Operation.proto in
{ Proto.shell = op.shell ; protocol_data } in { Proto.shell = op.shell ; protocol_data } in
Operation_hash.Table.fold Operation_hash.Table.fold
(fun hash result acc -> (fun hash (result,raw_op) acc ->
match ParsedCache.find_hash_opt parsed_cache hash with let proto_op = map_op raw_op in
(* XXX this invariant should be better enforced *)
| None | Some (Error _) -> assert false
| Some (Ok op) -> begin
match result with match result with
| Applied _ -> { | Applied _ -> {
acc with acc with
Proto_services.Mempool.applied = Proto_services.Mempool.applied =
(hash, map_op op.raw)::acc.Proto_services.Mempool.applied (hash, proto_op)::acc.Proto_services.Mempool.applied
} }
| Branch_refused err -> { | Branch_refused err -> {
acc with acc with
Proto_services.Mempool.branch_refused = Proto_services.Mempool.branch_refused =
Operation_hash.Map.add Operation_hash.Map.add
hash hash
(map_op op.raw,err) (proto_op,err)
acc.Proto_services.Mempool.branch_refused acc.Proto_services.Mempool.branch_refused
} }
| Branch_delayed err -> { | Branch_delayed err -> {
@ -308,7 +310,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
Proto_services.Mempool.branch_delayed = Proto_services.Mempool.branch_delayed =
Operation_hash.Map.add Operation_hash.Map.add
hash hash
(map_op op.raw,err) (proto_op,err)
acc.Proto_services.Mempool.branch_delayed acc.Proto_services.Mempool.branch_delayed
} }
| Refused err -> { | Refused err -> {
@ -316,12 +318,11 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
Proto_services.Mempool.refused = Proto_services.Mempool.refused =
Operation_hash.Map.add Operation_hash.Map.add
hash hash
(map_op op.raw,err) (proto_op,err)
acc.Proto_services.Mempool.refused acc.Proto_services.Mempool.refused
} }
| _ -> acc | _ -> acc
end) ) t empty
t empty
let clear t = Operation_hash.Table.clear t let clear t = Operation_hash.Table.clear t
@ -483,13 +484,13 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
let on_validate w parsed_op = let on_validate w parsed_op =
let state = Worker.state w in let state = Worker.state w in
match ValidatedCache.find_opt state.cache parsed_op with match ValidatedCache.find_opt state.cache parsed_op with
| None | Some (Branch_delayed _) -> | None | Some ((Branch_delayed _),_) ->
validate_helper w parsed_op >>= fun result -> validate_helper w parsed_op >>= fun result ->
ValidatedCache.add state.cache parsed_op result; ValidatedCache.add state.cache parsed_op (result, parsed_op.raw);
(* operations are notified only the first time *) (* operations are notified only the first time *)
notify_helper w result parsed_op.raw ; notify_helper w result parsed_op.raw ;
Lwt.return result Lwt.return result
| Some result -> Lwt.return result | Some (result,_) -> Lwt.return result
(* worker's handlers *) (* worker's handlers *)
let on_request : let on_request :
@ -582,7 +583,7 @@ module Make(Proto: Registered_protocol.T) : T with module Proto = Proto = struct
(Proto_services.S.Mempool.pending_operations RPC_path.open_root) (Proto_services.S.Mempool.pending_operations RPC_path.open_root)
(fun w () () -> (fun w () () ->
let state = Worker.state w in let state = Worker.state w in
RPC_answer.return (ValidatedCache.to_mempool state.cache parsed_cache) RPC_answer.return (ValidatedCache.to_mempool state.cache)
) )
let monitor_rpc_directory : t RPC_directory.t = let monitor_rpc_directory : t RPC_directory.t =