Mempool: cache result and operation in ValidatedCache
This commit is contained in:
parent
1420715ab9
commit
0bb2f3d4d1
@ -259,10 +259,15 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
|
|||||||
(* validated operations' cache. used for memoization *)
|
(* validated operations' cache. used for memoization *)
|
||||||
module ValidatedCache = struct
|
module ValidatedCache = struct
|
||||||
|
|
||||||
type t = result Operation_hash.Table.t
|
type t = (result * Operation.t) Operation_hash.Table.t
|
||||||
|
|
||||||
let encoding =
|
let encoding =
|
||||||
Operation_hash.Table.encoding result_encoding
|
let open Data_encoding in
|
||||||
|
Operation_hash.Table.encoding (
|
||||||
|
tup2
|
||||||
|
result_encoding
|
||||||
|
Operation.encoding
|
||||||
|
)
|
||||||
|
|
||||||
let create () = Operation_hash.Table.create 1000
|
let create () = Operation_hash.Table.create 1000
|
||||||
|
|
||||||
@ -275,7 +280,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
|
|||||||
let iter f t =
|
let iter f t =
|
||||||
Operation_hash.Table.iter f t
|
Operation_hash.Table.iter f t
|
||||||
|
|
||||||
let to_mempool t parsed_cache =
|
let to_mempool t =
|
||||||
let empty = {
|
let empty = {
|
||||||
Proto_services.Mempool.applied = [] ;
|
Proto_services.Mempool.applied = [] ;
|
||||||
refused = Operation_hash.Map.empty ;
|
refused = Operation_hash.Map.empty ;
|
||||||
@ -290,44 +295,40 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
|
|||||||
op.Operation.proto in
|
op.Operation.proto in
|
||||||
{ Proto.shell = op.shell ; protocol_data } in
|
{ Proto.shell = op.shell ; protocol_data } in
|
||||||
Operation_hash.Table.fold
|
Operation_hash.Table.fold
|
||||||
(fun hash result acc ->
|
(fun hash (result,raw_op) acc ->
|
||||||
match ParsedCache.find_hash_opt parsed_cache hash with
|
let proto_op = map_op raw_op in
|
||||||
(* XXX this invariant should be better enforced *)
|
match result with
|
||||||
| None | Some (Error _) -> assert false
|
| Applied _ -> {
|
||||||
| Some (Ok op) -> begin
|
acc with
|
||||||
match result with
|
Proto_services.Mempool.applied =
|
||||||
| Applied _ -> {
|
(hash, proto_op)::acc.Proto_services.Mempool.applied
|
||||||
acc with
|
}
|
||||||
Proto_services.Mempool.applied =
|
| Branch_refused err -> {
|
||||||
(hash, map_op op.raw)::acc.Proto_services.Mempool.applied
|
acc with
|
||||||
}
|
Proto_services.Mempool.branch_refused =
|
||||||
| Branch_refused err -> {
|
Operation_hash.Map.add
|
||||||
acc with
|
hash
|
||||||
Proto_services.Mempool.branch_refused =
|
(proto_op,err)
|
||||||
Operation_hash.Map.add
|
acc.Proto_services.Mempool.branch_refused
|
||||||
hash
|
}
|
||||||
(map_op op.raw,err)
|
| Branch_delayed err -> {
|
||||||
acc.Proto_services.Mempool.branch_refused
|
acc with
|
||||||
}
|
Proto_services.Mempool.branch_delayed =
|
||||||
| Branch_delayed err -> {
|
Operation_hash.Map.add
|
||||||
acc with
|
hash
|
||||||
Proto_services.Mempool.branch_delayed =
|
(proto_op,err)
|
||||||
Operation_hash.Map.add
|
acc.Proto_services.Mempool.branch_delayed
|
||||||
hash
|
}
|
||||||
(map_op op.raw,err)
|
| Refused err -> {
|
||||||
acc.Proto_services.Mempool.branch_delayed
|
acc with
|
||||||
}
|
Proto_services.Mempool.refused =
|
||||||
| Refused err -> {
|
Operation_hash.Map.add
|
||||||
acc with
|
hash
|
||||||
Proto_services.Mempool.refused =
|
(proto_op,err)
|
||||||
Operation_hash.Map.add
|
acc.Proto_services.Mempool.refused
|
||||||
hash
|
}
|
||||||
(map_op op.raw,err)
|
| _ -> acc
|
||||||
acc.Proto_services.Mempool.refused
|
) t empty
|
||||||
}
|
|
||||||
| _ -> acc
|
|
||||||
end)
|
|
||||||
t empty
|
|
||||||
|
|
||||||
let clear t = Operation_hash.Table.clear t
|
let clear t = Operation_hash.Table.clear t
|
||||||
|
|
||||||
@ -489,13 +490,13 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
|
|||||||
let on_validate w parsed_op =
|
let on_validate w parsed_op =
|
||||||
let state = Worker.state w in
|
let state = Worker.state w in
|
||||||
match ValidatedCache.find_opt state.cache parsed_op with
|
match ValidatedCache.find_opt state.cache parsed_op with
|
||||||
| None | Some (Branch_delayed _) ->
|
| None | Some ((Branch_delayed _),_) ->
|
||||||
validate_helper w parsed_op >>= fun result ->
|
validate_helper w parsed_op >>= fun result ->
|
||||||
ValidatedCache.add state.cache parsed_op result;
|
ValidatedCache.add state.cache parsed_op (result, parsed_op.raw);
|
||||||
(* operations are notified only the first time *)
|
(* operations are notified only the first time *)
|
||||||
notify_helper w result parsed_op.raw ;
|
notify_helper w result parsed_op.raw ;
|
||||||
Lwt.return result
|
Lwt.return result
|
||||||
| Some result -> Lwt.return result
|
| Some (result,_) -> Lwt.return result
|
||||||
|
|
||||||
(* worker's handlers *)
|
(* worker's handlers *)
|
||||||
let on_request :
|
let on_request :
|
||||||
@ -588,7 +589,7 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
|
|||||||
(Proto_services.S.Mempool.pending_operations RPC_path.open_root)
|
(Proto_services.S.Mempool.pending_operations RPC_path.open_root)
|
||||||
(fun w () () ->
|
(fun w () () ->
|
||||||
let state = Worker.state w in
|
let state = Worker.state w in
|
||||||
RPC_answer.return (ValidatedCache.to_mempool state.cache parsed_cache)
|
RPC_answer.return (ValidatedCache.to_mempool state.cache)
|
||||||
)
|
)
|
||||||
|
|
||||||
let monitor_rpc_directory : t RPC_directory.t =
|
let monitor_rpc_directory : t RPC_directory.t =
|
||||||
|
Loading…
Reference in New Issue
Block a user