From 536e64d93fcff021434374985f5034eccbd45cff Mon Sep 17 00:00:00 2001 From: Benjamin Canou Date: Fri, 26 Oct 2018 02:18:16 +0200 Subject: [PATCH] Mempool: add simple limits to the mempool --- src/lib_shell/prevalidator.ml | 205 ++++++++++++++++++---------------- 1 file changed, 110 insertions(+), 95 deletions(-) diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index a9971fb74..97c5d8427 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -47,12 +47,17 @@ module type T = sig mutable live_blocks : Block_hash.Set.t ; mutable live_operations : Operation_hash.Set.t ; refused : Operation_hash.t Ring.t ; - mutable refusals : error list Operation_hash.Map.t ; + mutable refusals : (Operation.t * error list) Operation_hash.Map.t ; + branch_refused : Operation_hash.t Ring.t ; + mutable branch_refusals : (Operation.t * error list) Operation_hash.Map.t; + branch_delayed : Operation_hash.t Ring.t ; + mutable branch_delays : (Operation.t * error list) Operation_hash.Map.t; mutable fetching : Operation_hash.Set.t ; mutable pending : Operation.t Operation_hash.Map.t ; mutable mempool : Mempool.t ; mutable in_mempool : Operation_hash.Set.t ; - mutable validation_result : error Preapply_result.t ; + mutable applied : (Operation_hash.t * Operation.t) list; + mutable applied_count : int ; mutable validation_state : Prevalidation.t tzresult ; mutable operation_stream : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * @@ -76,7 +81,7 @@ module type T = sig to_block:State.Block.t -> Operation.t Operation_hash.Map.t -> (Operation.t Operation_hash.Map.t * Block_hash.Set.t * Operation_hash.Set.t) Lwt.t - + val validation_result: types_state -> error Preapply_result.t val worker: worker Lwt.t end @@ -102,12 +107,17 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct mutable live_blocks : Block_hash.Set.t ; (* just a cache *) mutable live_operations : Operation_hash.Set.t ; (* just a cache *) refused : Operation_hash.t Ring.t ; - mutable refusals : error list Operation_hash.Map.t ; + mutable refusals : (Operation.t * error list) Operation_hash.Map.t ; + branch_refused : Operation_hash.t Ring.t ; + mutable branch_refusals : (Operation.t * error list) Operation_hash.Map.t; + branch_delayed : Operation_hash.t Ring.t ; + mutable branch_delays : (Operation.t * error list) Operation_hash.Map.t; mutable fetching : Operation_hash.Set.t ; mutable pending : Operation.t Operation_hash.Map.t ; mutable mempool : Mempool.t ; mutable in_mempool : Operation_hash.Set.t ; - mutable validation_result : error Preapply_result.t ; + mutable applied : (Operation_hash.t * Operation.t) list; + mutable applied_count : int ; mutable validation_state : Prevalidation.t tzresult ; mutable operation_stream : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * @@ -163,11 +173,11 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct applied = List.rev (List.map (fun (h, _) -> h) - state.validation_result.applied) ; + state.applied) ; delayed = Operation_hash.Set.union - (domain state.validation_result.branch_delayed) - (domain state.validation_result.branch_refused) } + (domain state.branch_delays) + (domain state.branch_refusals) } end @@ -256,6 +266,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct || Operation_hash.Set.mem oph pv.live_operations || Operation_hash.Set.mem oph pv.in_mempool + let validation_result (state : types_state) = + { Preapply_result.applied = List.rev state.applied ; + branch_delayed = state.branch_delays ; + branch_refused = state.branch_refusals ; + refused = Operation_hash.Map.empty } + let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t = { Mempool.known_valid = List.map fst r.applied ; pending = @@ -305,12 +321,17 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct let handle_unprocessed w pv = begin match pv.validation_state with | Error err -> - pv.validation_result <- - { Preapply_result.empty with - branch_delayed = - Operation_hash.Map.fold - (fun h op m -> Operation_hash.Map.add h (op, err) m) - pv.pending Operation_hash.Map.empty } ; + Operation_hash.Map.iter + (fun h op -> + Option.iter (Ring.add_and_return_erased pv.branch_delayed h) + ~f:(fun e -> + pv.branch_delays <- Operation_hash.Map.remove e pv.branch_delays ; + pv.in_mempool <- Operation_hash.Set.remove e pv.in_mempool) ; + pv.in_mempool <- + Operation_hash.Set.add h pv.in_mempool ; + pv.branch_delays <- + Operation_hash.Map.add h (op, err) pv.branch_delays) + pv.pending ; pv.pending <- Operation_hash.Map.empty ; Lwt.return_unit @@ -320,70 +341,57 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct | n -> debug w "processing %d operations" n ; let operations = List.map snd (Operation_hash.Map.bindings pv.pending) in - Lwt_list.fold_left_s - (fun (acc_validation_result, acc_validation_state, acc_mempool) op -> - match Prevalidation.parse op with - | Error _ -> - (* FIXME *) - Lwt.return (acc_validation_result, acc_validation_state, acc_mempool) - | Ok op -> - let open Preapply_result in - Prevalidation.apply_operation acc_validation_state op >>= function - | Applied (new_acc_validation_state, _) -> - notify_operation pv `Applied op.raw ; - let new_mempool = Mempool.{ acc_mempool with known_valid = op.hash :: acc_mempool.known_valid } in - let applied = (op.hash, op.raw) :: acc_validation_result.applied in - Lwt.return ({ acc_validation_result with applied }, new_acc_validation_state, new_mempool) - | Branch_delayed errors -> - notify_operation pv `Branch_delayed op.raw ; - let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in - let branch_delayed = - Operation_hash.Map.add - op.hash - (op.raw, errors) - acc_validation_result.branch_delayed in - Lwt.return ({ acc_validation_result with branch_delayed }, acc_validation_state, new_mempool) - | Branch_refused errors -> - notify_operation pv `Branch_refused op.raw ; - let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in - let branch_refused = - Operation_hash.Map.add - op.hash - (op.raw, errors) - acc_validation_result.branch_refused in - Lwt.return ({ acc_validation_result with branch_refused }, acc_validation_state, new_mempool) - | Refused errors -> - notify_operation pv `Refused op.raw ; - let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in - let refused = - Operation_hash.Map.add - op.hash - (op.raw, errors) - acc_validation_result.refused in - Lwt.return ({ acc_validation_result with refused }, acc_validation_state, new_mempool) - | Duplicate | Outdated -> Lwt.return (acc_validation_result, acc_validation_state, acc_mempool)) - (pv.validation_result, state, Mempool.empty) - operations >>= fun (new_result, new_state, advertised_mempool) -> - pv.validation_state <- Ok new_state ; - pv.validation_result <- new_result ; - pv.in_mempool <- - (Operation_hash.Map.fold - (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) - pv.pending @@ - Operation_hash.Map.fold - (fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool) - pv.validation_result.refused @@ - pv.in_mempool) ; - Operation_hash.Map.iter - (fun h (_, errs) -> - Option.iter (Ring.add_and_return_erased pv.refused h) - ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; - pv.refusals <- - Operation_hash.Map.add h errs pv.refusals) - pv.validation_result.refused ; - Operation_hash.Map.iter - (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph) - pv.validation_result.refused ; + Lwt_list.fold_left_s (fun (acc_validation_state, acc_mempool) op -> + let refused hash raw errors = + notify_operation pv `Refused raw ; + let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add hash acc_mempool.pending } in + Option.iter (Ring.add_and_return_erased pv.refused hash) + ~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ; + pv.refusals <- Operation_hash.Map.add hash (raw, errors) pv.refusals ; + Distributed_db.Operation.clear_or_cancel pv.chain_db hash ; + Lwt.return (acc_validation_state, new_mempool) in + match Prevalidation.parse op with + | Error errors -> + refused (Operation.hash op) op errors + | Ok op -> + let open Preapply_result in + Prevalidation.apply_operation state op >>= function + | Applied (new_acc_validation_state, _) -> + if pv.applied_count <= 2000 (* this test is a quick fix while we wait for the new mempool *) + || Proto.acceptable_passes { shell = op.raw.shell ; protocol_data = op.protocol_data } = [0] then begin + notify_operation pv `Applied op.raw ; + let new_mempool = Mempool.{ acc_mempool with known_valid = op.hash :: acc_mempool.known_valid } in + pv.applied <- (op.hash, op.raw) :: pv.applied ; + pv.in_mempool <- Operation_hash.Set.add op.hash pv.in_mempool ; + Lwt.return (new_acc_validation_state, new_mempool) + end else + Lwt.return (acc_validation_state, acc_mempool) + | Branch_delayed errors -> + notify_operation pv `Branch_delayed op.raw ; + let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in + Option.iter (Ring.add_and_return_erased pv.branch_delayed op.hash) + ~f:(fun e -> + pv.branch_delays <- Operation_hash.Map.remove e pv.branch_delays ; + pv.in_mempool <- Operation_hash.Set.remove e pv.in_mempool) ; + pv.in_mempool <- Operation_hash.Set.add op.hash pv.in_mempool ; + pv.branch_delays <- Operation_hash.Map.add op.hash (op.raw, errors) pv.branch_delays ; + Lwt.return (acc_validation_state, new_mempool) + | Branch_refused errors -> + notify_operation pv `Branch_refused op.raw ; + let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in + Option.iter (Ring.add_and_return_erased pv.branch_refused op.hash) + ~f:(fun e -> + pv.branch_refusals <- Operation_hash.Map.remove e pv.branch_refusals ; + pv.in_mempool <- Operation_hash.Set.remove e pv.in_mempool) ; + pv.in_mempool <- Operation_hash.Set.add op.hash pv.in_mempool ; + pv.branch_refusals <- Operation_hash.Map.add op.hash (op.raw, errors) pv.branch_refusals ; + Lwt.return (acc_validation_state, new_mempool) + | Refused errors -> + refused op.hash op.raw errors + | Duplicate | Outdated -> Lwt.return (acc_validation_state, acc_mempool)) + (state, Mempool.empty) + operations >>= fun (state, advertised_mempool) -> + pv.validation_state <- Ok state ; pv.pending <- Operation_hash.Map.empty ; advertise w pv { advertised_mempool with known_valid = List.rev advertised_mempool.known_valid } ; @@ -391,14 +399,14 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct end >>= fun () -> pv.mempool <- { Mempool.known_valid = - List.rev_map fst pv.validation_result.applied ; + List.rev_map fst pv.applied ; pending = Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) - pv.validation_result.branch_delayed @@ + pv.branch_delays @@ Operation_hash.Map.fold (fun k _ s -> Operation_hash.Set.add k s) - pv.validation_result.branch_refused @@ + pv.branch_refusals @@ Operation_hash.Set.empty } ; State.Current_mempool.set (Distributed_db.chain_state pv.chain_db) ~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () -> @@ -441,20 +449,20 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct Proto_services.Mempool.applied = List.map (fun (hash, op) -> (hash, map_op op)) - (List.rev pv.validation_result.applied) ; + (List.rev pv.applied) ; refused = - Operation_hash.Map.map map_op_error pv.validation_result.refused ; + Operation_hash.Map.map map_op_error pv.branch_refusals ; branch_refused = - Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ; + Operation_hash.Map.map map_op_error pv.branch_refusals ; branch_delayed = - Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; + Operation_hash.Map.map map_op_error pv.branch_delays ; unprocessed = Operation_hash.Map.map map_op pv.pending ; }) ; dir := RPC_directory.gen_register !dir (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) - begin fun { validation_result = current_mempool ; operation_stream } params () -> + begin fun { applied ; refusals = refused ; branch_refusals = branch_refused ; branch_delays = branch_delayed ; operation_stream } params () -> let open Preapply_result in let op_stream, stopper = Lwt_watcher.create_stream operation_stream in (* Convert ops *) @@ -466,7 +474,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct Proto.{ shell = op.shell ; protocol_data } in let fold_op _k (op, _error) acc = map_op op :: acc in (* First call : retrieve the current set of op from the mempool *) - let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in let applied = if params#applied then List.map map_op (List.map snd applied) else [] in let refused = if params#refused then Operation_hash.Map.fold fold_op refused [] else [] in @@ -560,11 +567,10 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct list_pendings ~maintain_chain_db:pv.chain_db ~from_block:pv.predecessor ~to_block:predecessor - (Preapply_result.operations pv.validation_result) + (Preapply_result.operations (validation_result pv)) >>= fun (pending, new_live_blocks, new_live_operations) -> let timestamp = Time.now () in Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state -> - let validation_result = Preapply_result.empty in debug w "%d operations were not washed by the flush" (Operation_hash.Map.cardinal pending) ; pv.predecessor <- predecessor ; @@ -574,7 +580,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty }; pv.pending <- pending ; pv.in_mempool <- Operation_hash.Set.empty ; - pv.validation_result <- validation_result ; + Ring.clear pv.branch_delayed ; + pv.branch_delays <- Operation_hash.Map.empty ; + Ring.clear pv.branch_refused ; + pv.branch_refusals <- Operation_hash.Map.empty ; + pv.applied <- [] ; + pv.applied_count <- 0 ; pv.validation_state <- validation_state ; pv.operation_stream <- Lwt_watcher.create_input () ; return_unit @@ -627,7 +638,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct live_blocks ; live_operations } -> let timestamp = Time.now () in Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state -> - let validation_result = Preapply_result.empty in let fetching = List.fold_left (fun s h -> Operation_hash.Set.add h s) @@ -641,7 +651,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct fetching ; pending = Operation_hash.Map.empty ; in_mempool = Operation_hash.Set.empty ; - validation_result ; + applied = [] ; + applied_count = 0 ; + branch_refused = Ring.create limits.max_refused_operations ; + branch_refusals = Operation_hash.Map.empty ; + branch_delayed = Ring.create limits.max_refused_operations ; + branch_delays = Operation_hash.Map.empty ; validation_state ; operation_stream = Lwt_watcher.create_input () ; advertisement = `None ; @@ -734,15 +749,15 @@ let operations (t:t) = (Preapply_result.empty, Operation_hash.Map.empty) | Lwt.Return w -> let pv = Prevalidator.Worker.state w in - ({ pv.Prevalidator.validation_result with - applied = List.rev pv.validation_result.applied }, + ({ (Prevalidator.validation_result pv) with + applied = List.rev pv.applied }, pv.pending) let pending ?block (t:t) = let module Prevalidator: T = (val t) in Prevalidator.worker >>= fun w -> let pv = Prevalidator.Worker.state w in - let ops = Preapply_result.operations pv.validation_result in + let ops = Preapply_result.operations (Prevalidator.validation_result pv) in match block with | Some to_block -> Prevalidator.list_pendings