From bb6983590f1666acf6fef9ce0d9c959608d09eb7 Mon Sep 17 00:00:00 2001 From: Vincent Botbol Date: Wed, 24 Oct 2018 17:59:02 +0200 Subject: [PATCH] Shell: fix notification of new operations in the mempool --- src/lib_shell/prevalidation.ml | 61 +++++++++++++-------------- src/lib_shell/prevalidation.mli | 2 - src/lib_shell/prevalidator.ml | 74 ++++++++++++++++++++------------- 3 files changed, 75 insertions(+), 62 deletions(-) diff --git a/src/lib_shell/prevalidation.ml b/src/lib_shell/prevalidation.ml index f518366e9..3829a5907 100644 --- a/src/lib_shell/prevalidation.ml +++ b/src/lib_shell/prevalidation.ml @@ -97,8 +97,6 @@ module type T = sig | Outdated val apply_operation: t -> operation -> result Lwt.t - val apply_operation_with_preapply_result: - error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t type status = { applied_operations : (operation * Proto.operation_receipt) list ; @@ -226,35 +224,6 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc | `Permanent -> Refused errors | `Temporary -> Branch_delayed errors - let apply_operation_with_preapply_result preapp t op = - let open Preapply_result in - apply_operation t op >>= function - | Applied (t, _) -> - let applied = (op.hash, op.raw) :: preapp.applied in - Lwt.return ({ preapp with applied }, t) - | Branch_delayed errors -> - let branch_delayed = - Operation_hash.Map.add - op.hash - (op.raw, errors) - preapp.branch_delayed in - Lwt.return ({ preapp with branch_delayed }, t) - | Branch_refused errors -> - let branch_refused = - Operation_hash.Map.add - op.hash - (op.raw, errors) - preapp.branch_refused in - Lwt.return ({ preapp with branch_refused }, t) - | Refused errors -> - let refused = - Operation_hash.Map.add - op.hash - (op.raw, errors) - preapp.refused in - Lwt.return ({ preapp with refused }, t) - | Duplicate | Outdated -> Lwt.return (preapp, t) - type status = { applied_operations : (operation * Proto.operation_receipt) list ; block_result : Tezos_protocol_environment_shell.validation_result ; @@ -285,6 +254,34 @@ let preapply ~predecessor ~timestamp ~protocol_data operations = return protocol end >>=? fun (module Proto) -> let module Prevalidation = Make(Proto) in + let apply_operation_with_preapply_result preapp t op = + let open Preapply_result in + Prevalidation.apply_operation t op >>= function + | Applied (t, _) -> + let applied = (op.hash, op.raw) :: preapp.applied in + Lwt.return ({ preapp with applied }, t) + | Branch_delayed errors -> + let branch_delayed = + Operation_hash.Map.add + op.hash + (op.raw, errors) + preapp.branch_delayed in + Lwt.return ({ preapp with branch_delayed }, t) + | Branch_refused errors -> + let branch_refused = + Operation_hash.Map.add + op.hash + (op.raw, errors) + preapp.branch_refused in + Lwt.return ({ preapp with branch_refused }, t) + | Refused errors -> + let refused = + Operation_hash.Map.add + op.hash + (op.raw, errors) + preapp.refused in + Lwt.return ({ preapp with refused }, t) + | Duplicate | Outdated -> Lwt.return (preapp, t) in Prevalidation.create ~protocol_data ~predecessor ~timestamp () >>=? fun validation_state -> Lwt_list.fold_left_s @@ -296,7 +293,7 @@ let preapply ~predecessor ~timestamp ~protocol_data operations = (* FIXME *) Lwt.return (acc_validation_result, acc_validation_state) | Ok op -> - Prevalidation.apply_operation_with_preapply_result + apply_operation_with_preapply_result acc_validation_result acc_validation_state op) (Preapply_result.empty, acc_validation_state) operations diff --git a/src/lib_shell/prevalidation.mli b/src/lib_shell/prevalidation.mli index 0c6ea93cb..d429aa818 100644 --- a/src/lib_shell/prevalidation.mli +++ b/src/lib_shell/prevalidation.mli @@ -61,8 +61,6 @@ module type T = sig | Outdated val apply_operation: t -> operation -> result Lwt.t - val apply_operation_with_preapply_result: - error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t type status = { applied_operations : (operation * Proto.operation_receipt) list ; diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 0b580ef1a..a9971fb74 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -181,26 +181,13 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct = Worker.Make (Name) (Prevalidator_worker_state.Event) (Prevalidator_worker_state.Request) (Types) - (** Centralised operation stream for the RPCs *) - let notify_operation { operation_stream } result = - let open Preapply_result in - let { applied ; refused ; branch_refused ; branch_delayed } = result in - (* Notify new opperations *) - let map_op kind { Operation.shell ; proto } = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - proto in - kind, shell, protocol_data in - let fold_op kind _k (op, _error) acc = map_op kind op :: acc in - let applied = List.map (map_op `Applied) (List.map snd applied) in - let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in - let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in - let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in - let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in - List.iter (Lwt_watcher.notify operation_stream) ops - + let notify_operation { operation_stream } result { Operation.shell ; proto } = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + proto in + Lwt_watcher.notify operation_stream (result, shell, protocol_data) open Types @@ -334,17 +321,49 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct debug w "processing %d operations" n ; let operations = List.map snd (Operation_hash.Map.bindings pv.pending) in Lwt_list.fold_left_s - (fun (acc_validation_result, acc_validation_state) op -> + (fun (acc_validation_result, acc_validation_state, acc_mempool) op -> match Prevalidation.parse op with | Error _ -> (* FIXME *) - Lwt.return (acc_validation_result, acc_validation_state) + Lwt.return (acc_validation_result, acc_validation_state, acc_mempool) | Ok op -> - Prevalidation.apply_operation_with_preapply_result - acc_validation_result acc_validation_state op) - (pv.validation_result, state) - operations - >>= fun (new_result, new_state) -> + let open Preapply_result in + Prevalidation.apply_operation acc_validation_state op >>= function + | Applied (new_acc_validation_state, _) -> + notify_operation pv `Applied op.raw ; + let new_mempool = Mempool.{ acc_mempool with known_valid = op.hash :: acc_mempool.known_valid } in + let applied = (op.hash, op.raw) :: acc_validation_result.applied in + Lwt.return ({ acc_validation_result with applied }, new_acc_validation_state, new_mempool) + | Branch_delayed errors -> + notify_operation pv `Branch_delayed op.raw ; + let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in + let branch_delayed = + Operation_hash.Map.add + op.hash + (op.raw, errors) + acc_validation_result.branch_delayed in + Lwt.return ({ acc_validation_result with branch_delayed }, acc_validation_state, new_mempool) + | Branch_refused errors -> + notify_operation pv `Branch_refused op.raw ; + let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in + let branch_refused = + Operation_hash.Map.add + op.hash + (op.raw, errors) + acc_validation_result.branch_refused in + Lwt.return ({ acc_validation_result with branch_refused }, acc_validation_state, new_mempool) + | Refused errors -> + notify_operation pv `Refused op.raw ; + let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in + let refused = + Operation_hash.Map.add + op.hash + (op.raw, errors) + acc_validation_result.refused in + Lwt.return ({ acc_validation_result with refused }, acc_validation_state, new_mempool) + | Duplicate | Outdated -> Lwt.return (acc_validation_result, acc_validation_state, acc_mempool)) + (pv.validation_result, state, Mempool.empty) + operations >>= fun (new_result, new_state, advertised_mempool) -> pv.validation_state <- Ok new_state ; pv.validation_result <- new_result ; pv.in_mempool <- @@ -367,8 +386,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct pv.validation_result.refused ; pv.pending <- Operation_hash.Map.empty ; advertise w pv - (mempool_of_prevalidation_result new_result) ; - notify_operation pv new_result ; + { advertised_mempool with known_valid = List.rev advertised_mempool.known_valid } ; Lwt.return_unit end >>= fun () -> pv.mempool <-