Mempool: add simple limits to the mempool
This commit is contained in:
@ -47,12 +47,17 @@ module type T = sig
mutable live_blocks : Block_hash.Set.t ;
mutable live_blocks : Block_hash.Set.t ;
mutable live_operations : Operation_hash.Set.t ;
mutable live_operations : Operation_hash.Set.t ;
refused : Operation_hash.t Ring.t ;
refused : Operation_hash.t Ring.t ;
mutable refusals : error list Operation_hash.Map.t ;
mutable refusals : (Operation.t * error list) Operation_hash.Map.t ;
branch_refused : Operation_hash.t Ring.t ;
mutable branch_refusals : (Operation.t * error list) Operation_hash.Map.t;
branch_delayed : Operation_hash.t Ring.t ;
mutable branch_delays : (Operation.t * error list) Operation_hash.Map.t;
mutable fetching : Operation_hash.Set.t ;
mutable fetching : Operation_hash.Set.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable mempool : Mempool.t ;
mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable applied : (Operation_hash.t * Operation.t) list;
mutable applied_count : int ;
mutable validation_state : Prevalidation.t tzresult ;
mutable validation_state : Prevalidation.t tzresult ;
mutable operation_stream :
mutable operation_stream :
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
@ -76,7 +81,7 @@ module type T = sig
to_block:State.Block.t ->
to_block:State.Block.t ->
Operation.t Operation_hash.Map.t ->
Operation.t Operation_hash.Map.t ->
(Operation.t Operation_hash.Map.t * Block_hash.Set.t * Operation_hash.Set.t) Lwt.t
(Operation.t Operation_hash.Map.t * Block_hash.Set.t * Operation_hash.Set.t) Lwt.t
val validation_result: types_state -> error Preapply_result.t
val worker: worker Lwt.t
val worker: worker Lwt.t
@ -102,12 +107,17 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
mutable live_blocks : Block_hash.Set.t ; (* just a cache *)
mutable live_blocks : Block_hash.Set.t ; (* just a cache *)
mutable live_operations : Operation_hash.Set.t ; (* just a cache *)
mutable live_operations : Operation_hash.Set.t ; (* just a cache *)
refused : Operation_hash.t Ring.t ;
refused : Operation_hash.t Ring.t ;
mutable refusals : error list Operation_hash.Map.t ;
mutable refusals : (Operation.t * error list) Operation_hash.Map.t ;
branch_refused : Operation_hash.t Ring.t ;
mutable branch_refusals : (Operation.t * error list) Operation_hash.Map.t;
branch_delayed : Operation_hash.t Ring.t ;
mutable branch_delays : (Operation.t * error list) Operation_hash.Map.t;
mutable fetching : Operation_hash.Set.t ;
mutable fetching : Operation_hash.Set.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable mempool : Mempool.t ;
mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable applied : (Operation_hash.t * Operation.t) list;
mutable applied_count : int ;
mutable validation_state : Prevalidation.t tzresult ;
mutable validation_state : Prevalidation.t tzresult ;
mutable operation_stream :
mutable operation_stream :
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
@ -163,11 +173,11 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
applied =
applied =
( (fun (h, _) -> h)
( (fun (h, _) -> h)
state.validation_result.applied) ;
state.applied) ;
delayed =
delayed =
(domain state.validation_result.branch_delayed)
(domain state.branch_delays)
(domain state.validation_result.branch_refused) }
(domain state.branch_refusals) }
@ -256,6 +266,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
|| Operation_hash.Set.mem oph pv.live_operations
|| Operation_hash.Set.mem oph pv.live_operations
|| Operation_hash.Set.mem oph pv.in_mempool
|| Operation_hash.Set.mem oph pv.in_mempool
let validation_result (state : types_state) =
{ Preapply_result.applied = List.rev state.applied ;
branch_delayed = state.branch_delays ;
branch_refused = state.branch_refusals ;
refused = Operation_hash.Map.empty }
let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t =
let mempool_of_prevalidation_result (r : error Preapply_result.t) : Mempool.t =
{ Mempool.known_valid = fst r.applied ;
{ Mempool.known_valid = fst r.applied ;
pending =
pending =
@ -305,12 +321,17 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
let handle_unprocessed w pv =
let handle_unprocessed w pv =
begin match pv.validation_state with
begin match pv.validation_state with
| Error err ->
| Error err ->
pv.validation_result <-
{ Preapply_result.empty with
(fun h op ->
branch_delayed =
Option.iter (Ring.add_and_return_erased pv.branch_delayed h)
~f:(fun e ->
(fun h op m -> Operation_hash.Map.add h (op, err) m)
pv.branch_delays <- Operation_hash.Map.remove e pv.branch_delays ;
pv.pending Operation_hash.Map.empty } ;
pv.in_mempool <- Operation_hash.Set.remove e pv.in_mempool) ;
pv.in_mempool <-
Operation_hash.Set.add h pv.in_mempool ;
pv.branch_delays <-
Operation_hash.Map.add h (op, err) pv.branch_delays)
pv.pending ;
pv.pending <-
pv.pending <-
Operation_hash.Map.empty ;
Operation_hash.Map.empty ;
@ -320,70 +341,57 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
| n ->
| n ->
debug w "processing %d operations" n ;
debug w "processing %d operations" n ;
let operations = snd (Operation_hash.Map.bindings pv.pending) in
let operations = snd (Operation_hash.Map.bindings pv.pending) in
Lwt_list.fold_left_s (fun (acc_validation_state, acc_mempool) op ->
(fun (acc_validation_result, acc_validation_state, acc_mempool) op ->
let refused hash raw errors =
notify_operation pv `Refused raw ;
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add hash acc_mempool.pending } in
Option.iter (Ring.add_and_return_erased pv.refused hash)
~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ;
pv.refusals <- Operation_hash.Map.add hash (raw, errors) pv.refusals ;
Distributed_db.Operation.clear_or_cancel pv.chain_db hash ;
Lwt.return (acc_validation_state, new_mempool) in
match Prevalidation.parse op with
match Prevalidation.parse op with
| Error _ ->
| Error errors ->
(* FIXME *)
refused (Operation.hash op) op errors
Lwt.return (acc_validation_result, acc_validation_state, acc_mempool)
| Ok op ->
| Ok op ->
let open Preapply_result in
let open Preapply_result in
Prevalidation.apply_operation acc_validation_state op >>= function
Prevalidation.apply_operation state op >>= function
| Applied (new_acc_validation_state, _) ->
| Applied (new_acc_validation_state, _) ->
if pv.applied_count <= 2000 (* this test is a quick fix while we wait for the new mempool *)
|| Proto.acceptable_passes { shell = ; protocol_data = op.protocol_data } = [0] then begin
notify_operation pv `Applied op.raw ;
notify_operation pv `Applied op.raw ;
let new_mempool = Mempool.{ acc_mempool with known_valid = op.hash :: acc_mempool.known_valid } in
let new_mempool = Mempool.{ acc_mempool with known_valid = op.hash :: acc_mempool.known_valid } in
let applied = (op.hash, op.raw) :: acc_validation_result.applied in
pv.applied <- (op.hash, op.raw) :: pv.applied ;
Lwt.return ({ acc_validation_result with applied }, new_acc_validation_state, new_mempool)
pv.in_mempool <- Operation_hash.Set.add op.hash pv.in_mempool ;
Lwt.return (new_acc_validation_state, new_mempool)
end else
Lwt.return (acc_validation_state, acc_mempool)
| Branch_delayed errors ->
| Branch_delayed errors ->
notify_operation pv `Branch_delayed op.raw ;
notify_operation pv `Branch_delayed op.raw ;
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let branch_delayed =
Option.iter (Ring.add_and_return_erased pv.branch_delayed op.hash)
~f:(fun e ->
pv.branch_delays <- Operation_hash.Map.remove e pv.branch_delays ;
(op.raw, errors)
pv.in_mempool <- Operation_hash.Set.remove e pv.in_mempool) ;
acc_validation_result.branch_delayed in
pv.in_mempool <- Operation_hash.Set.add op.hash pv.in_mempool ;
Lwt.return ({ acc_validation_result with branch_delayed }, acc_validation_state, new_mempool)
pv.branch_delays <- Operation_hash.Map.add op.hash (op.raw, errors) pv.branch_delays ;
Lwt.return (acc_validation_state, new_mempool)
| Branch_refused errors ->
| Branch_refused errors ->
notify_operation pv `Branch_refused op.raw ;
notify_operation pv `Branch_refused op.raw ;
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let branch_refused =
Option.iter (Ring.add_and_return_erased pv.branch_refused op.hash)
~f:(fun e ->
pv.branch_refusals <- Operation_hash.Map.remove e pv.branch_refusals ;
(op.raw, errors)
pv.in_mempool <- Operation_hash.Set.remove e pv.in_mempool) ;
acc_validation_result.branch_refused in
pv.in_mempool <- Operation_hash.Set.add op.hash pv.in_mempool ;
Lwt.return ({ acc_validation_result with branch_refused }, acc_validation_state, new_mempool)
pv.branch_refusals <- Operation_hash.Map.add op.hash (op.raw, errors) pv.branch_refusals ;
Lwt.return (acc_validation_state, new_mempool)
| Refused errors ->
| Refused errors ->
notify_operation pv `Refused op.raw ;
refused op.hash op.raw errors
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
| Duplicate | Outdated -> Lwt.return (acc_validation_state, acc_mempool))
let refused =
(state, Mempool.empty)
operations >>= fun (state, advertised_mempool) ->
pv.validation_state <- Ok state ;
(op.raw, errors)
acc_validation_result.refused in
Lwt.return ({ acc_validation_result with refused }, acc_validation_state, new_mempool)
| Duplicate | Outdated -> Lwt.return (acc_validation_result, acc_validation_state, acc_mempool))
(pv.validation_result, state, Mempool.empty)
operations >>= fun (new_result, new_state, advertised_mempool) ->
pv.validation_state <- Ok new_state ;
pv.validation_result <- new_result ;
pv.in_mempool <-
(fun h _ in_mempool -> Operation_hash.Set.add h in_mempool)
pv.pending @@
(fun h _ in_mempool -> Operation_hash.Set.remove h in_mempool)
pv.validation_result.refused @@
pv.in_mempool) ;
(fun h (_, errs) ->
Option.iter (Ring.add_and_return_erased pv.refused h)
~f:(fun e -> pv.refusals <- Operation_hash.Map.remove e pv.refusals) ;
pv.refusals <-
Operation_hash.Map.add h errs pv.refusals)
pv.validation_result.refused ;
(fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph)
pv.validation_result.refused ;
pv.pending <- Operation_hash.Map.empty ;
pv.pending <- Operation_hash.Map.empty ;
advertise w pv
advertise w pv
{ advertised_mempool with known_valid = List.rev advertised_mempool.known_valid } ;
{ advertised_mempool with known_valid = List.rev advertised_mempool.known_valid } ;
@ -391,14 +399,14 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
end >>= fun () ->
end >>= fun () ->
pv.mempool <-
pv.mempool <-
{ Mempool.known_valid =
{ Mempool.known_valid =
List.rev_map fst pv.validation_result.applied ;
List.rev_map fst pv.applied ;
pending =
pending =
(fun k _ s -> Operation_hash.Set.add k s)
(fun k _ s -> Operation_hash.Set.add k s)
pv.validation_result.branch_delayed @@
pv.branch_delays @@
(fun k _ s -> Operation_hash.Set.add k s)
(fun k _ s -> Operation_hash.Set.add k s)
pv.validation_result.branch_refused @@
pv.branch_refusals @@
Operation_hash.Set.empty } ;
Operation_hash.Set.empty } ;
State.Current_mempool.set (Distributed_db.chain_state pv.chain_db)
State.Current_mempool.set (Distributed_db.chain_state pv.chain_db)
~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () ->
~head:(State.Block.hash pv.predecessor) pv.mempool >>= fun () ->
@ -441,20 +449,20 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
Proto_services.Mempool.applied =
Proto_services.Mempool.applied =
(fun (hash, op) -> (hash, map_op op))
(fun (hash, op) -> (hash, map_op op))
(List.rev pv.validation_result.applied) ;
(List.rev pv.applied) ;
refused =
refused =
|||||| map_op_error pv.validation_result.refused ;
| map_op_error pv.branch_refusals ;
branch_refused =
branch_refused =
|||||| map_op_error pv.validation_result.branch_refused ;
| map_op_error pv.branch_refusals ;
branch_delayed =
branch_delayed =
|||||| map_op_error pv.validation_result.branch_delayed ;
| map_op_error pv.branch_delays ;
unprocessed =
unprocessed =
|||||| map_op pv.pending ;
| map_op pv.pending ;
}) ;
}) ;
dir := RPC_directory.gen_register !dir
dir := RPC_directory.gen_register !dir
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
begin fun { validation_result = current_mempool ; operation_stream } params () ->
begin fun { applied ; refusals = refused ; branch_refusals = branch_refused ; branch_delays = branch_delayed ; operation_stream } params () ->
let open Preapply_result in
let open Preapply_result in
let op_stream, stopper = Lwt_watcher.create_stream operation_stream in
let op_stream, stopper = Lwt_watcher.create_stream operation_stream in
(* Convert ops *)
(* Convert ops *)
@ -466,7 +474,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
Proto.{ shell = ; protocol_data } in
Proto.{ shell = ; protocol_data } in
let fold_op _k (op, _error) acc = map_op op :: acc in
let fold_op _k (op, _error) acc = map_op op :: acc in
(* First call : retrieve the current set of op from the mempool *)
(* First call : retrieve the current set of op from the mempool *)
let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in
let applied = if params#applied then map_op ( snd applied) else [] in
let applied = if params#applied then map_op ( snd applied) else [] in
let refused = if params#refused then
let refused = if params#refused then
Operation_hash.Map.fold fold_op refused [] else [] in
Operation_hash.Map.fold fold_op refused [] else [] in
@ -560,11 +567,10 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
~from_block:pv.predecessor ~to_block:predecessor
~from_block:pv.predecessor ~to_block:predecessor
(Preapply_result.operations pv.validation_result)
(Preapply_result.operations (validation_result pv))
>>= fun (pending, new_live_blocks, new_live_operations) ->
>>= fun (pending, new_live_blocks, new_live_operations) ->
let timestamp = () in
let timestamp = () in
Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state ->
Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state ->
let validation_result = Preapply_result.empty in
debug w "%d operations were not washed by the flush"
debug w "%d operations were not washed by the flush"
(Operation_hash.Map.cardinal pending) ;
(Operation_hash.Map.cardinal pending) ;
pv.predecessor <- predecessor ;
pv.predecessor <- predecessor ;
@ -574,7 +580,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty };
pv.mempool <- { known_valid = [] ; pending = Operation_hash.Set.empty };
pv.pending <- pending ;
pv.pending <- pending ;
pv.in_mempool <- Operation_hash.Set.empty ;
pv.in_mempool <- Operation_hash.Set.empty ;
pv.validation_result <- validation_result ;
Ring.clear pv.branch_delayed ;
pv.branch_delays <- Operation_hash.Map.empty ;
Ring.clear pv.branch_refused ;
pv.branch_refusals <- Operation_hash.Map.empty ;
pv.applied <- [] ;
pv.applied_count <- 0 ;
pv.validation_state <- validation_state ;
pv.validation_state <- validation_state ;
pv.operation_stream <- Lwt_watcher.create_input () ;
pv.operation_stream <- Lwt_watcher.create_input () ;
@ -627,7 +638,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
live_blocks ; live_operations } ->
live_blocks ; live_operations } ->
let timestamp = () in
let timestamp = () in
Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state ->
Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state ->
let validation_result = Preapply_result.empty in
let fetching =
let fetching =
(fun s h -> Operation_hash.Set.add h s)
(fun s h -> Operation_hash.Set.add h s)
@ -641,7 +651,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
fetching ;
fetching ;
pending = Operation_hash.Map.empty ;
pending = Operation_hash.Map.empty ;
in_mempool = Operation_hash.Set.empty ;
in_mempool = Operation_hash.Set.empty ;
validation_result ;
applied = [] ;
applied_count = 0 ;
branch_refused = Ring.create limits.max_refused_operations ;
branch_refusals = Operation_hash.Map.empty ;
branch_delayed = Ring.create limits.max_refused_operations ;
branch_delays = Operation_hash.Map.empty ;
validation_state ;
validation_state ;
operation_stream = Lwt_watcher.create_input () ;
operation_stream = Lwt_watcher.create_input () ;
advertisement = `None ;
advertisement = `None ;
@ -734,15 +749,15 @@ let operations (t:t) =
(Preapply_result.empty, Operation_hash.Map.empty)
(Preapply_result.empty, Operation_hash.Map.empty)
| Lwt.Return w ->
| Lwt.Return w ->
let pv = Prevalidator.Worker.state w in
let pv = Prevalidator.Worker.state w in
({ pv.Prevalidator.validation_result with
({ (Prevalidator.validation_result pv) with
applied = List.rev pv.validation_result.applied },
applied = List.rev pv.applied },
let pending ?block (t:t) =
let pending ?block (t:t) =
let module Prevalidator: T = (val t) in
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
Prevalidator.worker >>= fun w ->
let pv = Prevalidator.Worker.state w in
let pv = Prevalidator.Worker.state w in
let ops = Preapply_result.operations pv.validation_result in
let ops = Preapply_result.operations (Prevalidator.validation_result pv) in
match block with
match block with
| Some to_block ->
| Some to_block ->
Reference in New Issue
Block a user