Shell: fix notification of new operations in the mempool

This commit is contained in:
Vincent Botbol 2018-10-24 17:59:02 +02:00
parent 1272b11ea2
commit bb6983590f
No known key found for this signature in database
GPG Key ID: A2CE1BDBED95DA38
3 changed files with 75 additions and 62 deletions

View File

@ -97,8 +97,6 @@ module type T = sig
| Outdated
val apply_operation: t -> operation -> result Lwt.t
val apply_operation_with_preapply_result:
error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t
type status = {
applied_operations : (operation * Proto.operation_receipt) list ;
@ -226,35 +224,6 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc
| `Permanent -> Refused errors
| `Temporary -> Branch_delayed errors
let apply_operation_with_preapply_result preapp t op =
let open Preapply_result in
apply_operation t op >>= function
| Applied (t, _) ->
let applied = (op.hash, op.raw) :: preapp.applied in
Lwt.return ({ preapp with applied }, t)
| Branch_delayed errors ->
let branch_delayed =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.branch_delayed in
Lwt.return ({ preapp with branch_delayed }, t)
| Branch_refused errors ->
let branch_refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.branch_refused in
Lwt.return ({ preapp with branch_refused }, t)
| Refused errors ->
let refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.refused in
Lwt.return ({ preapp with refused }, t)
| Duplicate | Outdated -> Lwt.return (preapp, t)
type status = {
applied_operations : (operation * Proto.operation_receipt) list ;
block_result : Tezos_protocol_environment_shell.validation_result ;
@ -285,6 +254,34 @@ let preapply ~predecessor ~timestamp ~protocol_data operations =
return protocol
end >>=? fun (module Proto) ->
let module Prevalidation = Make(Proto) in
let apply_operation_with_preapply_result preapp t op =
let open Preapply_result in
Prevalidation.apply_operation t op >>= function
| Applied (t, _) ->
let applied = (op.hash, op.raw) :: preapp.applied in
Lwt.return ({ preapp with applied }, t)
| Branch_delayed errors ->
let branch_delayed =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.branch_delayed in
Lwt.return ({ preapp with branch_delayed }, t)
| Branch_refused errors ->
let branch_refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.branch_refused in
Lwt.return ({ preapp with branch_refused }, t)
| Refused errors ->
let refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.refused in
Lwt.return ({ preapp with refused }, t)
| Duplicate | Outdated -> Lwt.return (preapp, t) in
Prevalidation.create
~protocol_data ~predecessor ~timestamp () >>=? fun validation_state ->
Lwt_list.fold_left_s
@ -296,7 +293,7 @@ let preapply ~predecessor ~timestamp ~protocol_data operations =
(* FIXME *)
Lwt.return (acc_validation_result, acc_validation_state)
| Ok op ->
Prevalidation.apply_operation_with_preapply_result
apply_operation_with_preapply_result
acc_validation_result acc_validation_state op)
(Preapply_result.empty, acc_validation_state)
operations

View File

@ -61,8 +61,6 @@ module type T = sig
| Outdated
val apply_operation: t -> operation -> result Lwt.t
val apply_operation_with_preapply_result:
error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t
type status = {
applied_operations : (operation * Proto.operation_receipt) list ;

View File

@ -181,26 +181,13 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
= Worker.Make (Name) (Prevalidator_worker_state.Event)
(Prevalidator_worker_state.Request) (Types)
(** Centralised operation stream for the RPCs *)
let notify_operation { operation_stream } result =
let open Preapply_result in
let { applied ; refused ; branch_refused ; branch_delayed } = result in
(* Notify new opperations *)
let map_op kind { Operation.shell ; proto } =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
proto in
kind, shell, protocol_data in
let fold_op kind _k (op, _error) acc = map_op kind op :: acc in
let applied = List.map (map_op `Applied) (List.map snd applied) in
let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in
let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in
let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in
let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
List.iter (Lwt_watcher.notify operation_stream) ops
let notify_operation { operation_stream } result { Operation.shell ; proto } =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
proto in
Lwt_watcher.notify operation_stream (result, shell, protocol_data)
open Types
@ -334,17 +321,49 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
debug w "processing %d operations" n ;
let operations = List.map snd (Operation_hash.Map.bindings pv.pending) in
Lwt_list.fold_left_s
(fun (acc_validation_result, acc_validation_state) op ->
(fun (acc_validation_result, acc_validation_state, acc_mempool) op ->
match Prevalidation.parse op with
| Error _ ->
(* FIXME *)
Lwt.return (acc_validation_result, acc_validation_state)
Lwt.return (acc_validation_result, acc_validation_state, acc_mempool)
| Ok op ->
Prevalidation.apply_operation_with_preapply_result
acc_validation_result acc_validation_state op)
(pv.validation_result, state)
operations
>>= fun (new_result, new_state) ->
let open Preapply_result in
Prevalidation.apply_operation acc_validation_state op >>= function
| Applied (new_acc_validation_state, _) ->
notify_operation pv `Applied op.raw ;
let new_mempool = Mempool.{ acc_mempool with known_valid = op.hash :: acc_mempool.known_valid } in
let applied = (op.hash, op.raw) :: acc_validation_result.applied in
Lwt.return ({ acc_validation_result with applied }, new_acc_validation_state, new_mempool)
| Branch_delayed errors ->
notify_operation pv `Branch_delayed op.raw ;
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let branch_delayed =
Operation_hash.Map.add
op.hash
(op.raw, errors)
acc_validation_result.branch_delayed in
Lwt.return ({ acc_validation_result with branch_delayed }, acc_validation_state, new_mempool)
| Branch_refused errors ->
notify_operation pv `Branch_refused op.raw ;
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let branch_refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
acc_validation_result.branch_refused in
Lwt.return ({ acc_validation_result with branch_refused }, acc_validation_state, new_mempool)
| Refused errors ->
notify_operation pv `Refused op.raw ;
let new_mempool = Mempool.{ acc_mempool with pending = Operation_hash.Set.add op.hash acc_mempool.pending } in
let refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
acc_validation_result.refused in
Lwt.return ({ acc_validation_result with refused }, acc_validation_state, new_mempool)
| Duplicate | Outdated -> Lwt.return (acc_validation_result, acc_validation_state, acc_mempool))
(pv.validation_result, state, Mempool.empty)
operations >>= fun (new_result, new_state, advertised_mempool) ->
pv.validation_state <- Ok new_state ;
pv.validation_result <- new_result ;
pv.in_mempool <-
@ -367,8 +386,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
pv.validation_result.refused ;
pv.pending <- Operation_hash.Map.empty ;
advertise w pv
(mempool_of_prevalidation_result new_result) ;
notify_operation pv new_result ;
{ advertised_mempool with known_valid = List.rev advertised_mempool.known_valid } ;
Lwt.return_unit
end >>= fun () ->
pv.mempool <-