Prevalidator: move Prevalidation RPCs in Prevalidator

This commit is contained in:
Raphaël Proust 2018-10-11 10:46:19 +08:00
parent 45a07d534f
commit bd116f3db7
No known key found for this signature in database
GPG Key ID: F4B685504488CEC0
3 changed files with 122 additions and 132 deletions

View File

@ -66,6 +66,8 @@ let rec apply_operations apply_operation state r max_ops ~sort ops =
module type T = sig module type T = sig
module Proto: Registered_protocol.T
type state type state
(** Creates a new prevalidation context w.r.t. the protocol associate to the (** Creates a new prevalidation context w.r.t. the protocol associate to the
@ -98,11 +100,19 @@ module type T = sig
state -> state ->
unit unit
val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t type new_operation_input =
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input
val new_operation_input: state -> new_operation_input
end end
module Make(Proto : Registered_protocol.T) : T = struct module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struct
module Proto = Proto
type state = type state =
{ state : Proto.validation_state ; { state : Proto.validation_state ;
@ -230,72 +240,13 @@ module Make(Proto : Registered_protocol.T) : T = struct
let shutdown_operation_input { new_operation_input } = let shutdown_operation_input { new_operation_input } =
Lwt_watcher.shutdown_input new_operation_input Lwt_watcher.shutdown_input new_operation_input
let rpc_directory = type new_operation_input =
let module Proto_services = Block_services.Make(Proto)(Proto) in ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input
let dir : (state * Error_monad.error Preapply_result.t) RPC_directory.t ref = let new_operation_input { new_operation_input } = new_operation_input
ref RPC_directory.empty in
let gen_register s f =
dir := RPC_directory.gen_register !dir s f in
gen_register
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
begin fun ({ new_operation_input }, current_mempool) params () ->
let open Preapply_result in
let operation_stream, stopper =
Lwt_watcher.create_stream new_operation_input in
(* Convert ops *)
let map_op op =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
Proto.{ shell = op.shell ; protocol_data } in
let fold_op _k (op, _error) acc = map_op op :: acc in
(* First call : retrieve the current set of op from the mempool *)
let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in
let applied = if params#applied then List.map map_op (List.map snd applied) else [] in
let refused = if params#refused then
Operation_hash.Map.fold fold_op refused [] else [] in
let branch_refused = if params#branch_refused then
Operation_hash.Map.fold fold_op branch_refused [] else [] in
let branch_delayed = if params#branch_delayed then
Operation_hash.Map.fold fold_op branch_delayed [] else [] in
let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
let current_mempool = ref (Some current_mempool) in
let filter_result = function
| `Applied -> params#applied
| `Refused -> params#branch_refused
| `Branch_refused -> params#refused
| `Branch_delayed -> params#branch_delayed
in
let next () =
match !current_mempool with
| Some mempool -> begin
current_mempool := None ;
Lwt.return_some mempool
end
| None -> begin
Lwt_stream.get operation_stream >>= function
| Some (kind, shell, protocol_data) when filter_result kind ->
(* NOTE: Should the protocol change, a new Prevalidation
* context would be created. Thus, we use the same Proto. *)
let bytes = Data_encoding.Binary.to_bytes_exn
Proto.operation_data_encoding
protocol_data in
let protocol_data = Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
bytes in
Lwt.return_some [ { Proto.shell ; protocol_data } ]
| _ -> Lwt.return_none
end
in
let shutdown () = Lwt_watcher.shutdown stopper in
RPC_answer.return_stream { next ; shutdown }
end ;
return !dir
end end

View File

@ -30,6 +30,8 @@
module type T = sig module type T = sig
module Proto: Registered_protocol.T
type state type state
(** Creates a new prevalidation context w.r.t. the protocol associate to the (** Creates a new prevalidation context w.r.t. the protocol associate to the
@ -62,11 +64,17 @@ module type T = sig
state -> state ->
unit unit
val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t type new_operation_input =
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input
val new_operation_input: state -> new_operation_input
end end
module Make(Proto : Registered_protocol.T) : T module Make(Proto : Registered_protocol.T) : T with module Proto = Proto
(** Pre-apply creates a new block and returns it. *) (** Pre-apply creates a new block and returns it. *)
val preapply : val preapply :

View File

@ -38,7 +38,7 @@ module type T = sig
module Proto: Registered_protocol.T module Proto: Registered_protocol.T
val name: name_t val name: name_t
val parameters: limits * Distributed_db.chain_db val parameters: limits * Distributed_db.chain_db
module Prevalidation: Prevalidation.T module Prevalidation: Prevalidation.T with module Proto = Proto
type types_state = { type types_state = {
chain_db : Distributed_db.chain_db ; chain_db : Distributed_db.chain_db ;
limits : limits ; limits : limits ;
@ -55,7 +55,7 @@ module type T = sig
mutable validation_result : error Preapply_result.t ; mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.state tzresult ; mutable validation_state : Prevalidation.state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ; mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ; mutable rpc_directory : types_state RPC_directory.t lazy_t ;
} }
module Name: Worker.NAME with type t = name_t module Name: Worker.NAME with type t = name_t
module Types: Worker.TYPES with type state = types_state module Types: Worker.TYPES with type state = types_state
@ -105,7 +105,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
mutable validation_result : error Preapply_result.t ; mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.state tzresult ; mutable validation_state : Prevalidation.state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ; mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ; mutable rpc_directory : types_state RPC_directory.t lazy_t ;
} }
module Name = struct module Name = struct
@ -367,57 +367,98 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
| Error _ -> (* should not happen *) | Error _ -> (* should not happen *)
Lwt.return_unit Lwt.return_unit
let rpc_directory_of_protocol protocol = let rpc_directory = lazy (
begin let dir : state RPC_directory.t ref = ref RPC_directory.empty in
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
let module Proto_services = Block_services.Make(Proto)(Proto) in let module Proto_services = Block_services.Make(Proto)(Proto) in
let dir : state RPC_directory.t ref = ref RPC_directory.empty in dir := RPC_directory.register !dir
let register s f = (Proto_services.S.Mempool.pending_operations RPC_path.open_root)
dir := RPC_directory.register !dir s f in (fun pv () () ->
let map_op op =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
{ Proto.shell = op.shell ; protocol_data } in
let map_op_error (op, error) = (map_op op, error) in
return {
Proto_services.Mempool.applied =
List.map
(fun (hash, op) -> (hash, map_op op))
(List.rev pv.validation_result.applied) ;
refused =
Operation_hash.Map.map map_op_error pv.validation_result.refused ;
branch_refused =
Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ;
branch_delayed =
Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ;
unprocessed =
Operation_hash.Map.map map_op pv.pending ;
}) ;
register dir := RPC_directory.gen_register !dir
(Proto_services.S.Mempool.pending_operations RPC_path.open_root) (Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
(fun pv () () -> begin fun { validation_state ; validation_result = current_mempool } params () ->
let map_op op = match validation_state with
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
{ Proto.shell = op.shell ; protocol_data } in
let map_op_error (op, error) = (map_op op, error) in
return {
Proto_services.Mempool.applied =
List.map
(fun (hash, op) -> (hash, map_op op))
(List.rev pv.validation_result.applied) ;
refused =
Operation_hash.Map.map map_op_error pv.validation_result.refused ;
branch_refused =
Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ;
branch_delayed =
Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ;
unprocessed =
Operation_hash.Map.map map_op pv.pending ;
}) ;
Prevalidation.rpc_directory >>=? fun prevalidation_dir ->
let prevalidation_dir =
RPC_directory.map (fun state ->
match state.validation_state with
| Error _ -> assert false | Error _ -> assert false
| Ok pv -> Lwt.return (pv, state.validation_result) | Ok pv ->
) prevalidation_dir in let new_operation_input = Prevalidation.new_operation_input pv in
let open Preapply_result in
let operation_stream, stopper =
Lwt_watcher.create_stream new_operation_input in
(* Convert ops *)
let map_op op =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
Proto.{ shell = op.shell ; protocol_data } in
let fold_op _k (op, _error) acc = map_op op :: acc in
(* First call : retrieve the current set of op from the mempool *)
let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in
let applied = if params#applied then List.map map_op (List.map snd applied) else [] in
let refused = if params#refused then
Operation_hash.Map.fold fold_op refused [] else [] in
let branch_refused = if params#branch_refused then
Operation_hash.Map.fold fold_op branch_refused [] else [] in
let branch_delayed = if params#branch_delayed then
Operation_hash.Map.fold fold_op branch_delayed [] else [] in
let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
let current_mempool = ref (Some current_mempool) in
let filter_result = function
| `Applied -> params#applied
| `Refused -> params#branch_refused
| `Branch_refused -> params#refused
| `Branch_delayed -> params#branch_delayed
in
let next () =
match !current_mempool with
| Some mempool -> begin
current_mempool := None ;
Lwt.return_some mempool
end
| None -> begin
Lwt_stream.get operation_stream >>= function
| Some (kind, shell, protocol_data) when filter_result kind ->
(* NOTE: Should the protocol change, a new Prevalidation
* context would be created. Thus, we use the same Proto. *)
let bytes = Data_encoding.Binary.to_bytes_exn
Proto.operation_data_encoding
protocol_data in
let protocol_data = Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
bytes in
Lwt.return_some [ { Proto.shell ; protocol_data } ]
| _ -> Lwt.return_none
end
in
let shutdown () = Lwt_watcher.shutdown stopper in
RPC_answer.return_stream { next ; shutdown }
end ;
return (RPC_directory.merge !dir prevalidation_dir) !dir
)
module Handlers = struct module Handlers = struct
@ -502,8 +543,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
end >>= fun (validation_state, validation_result) -> end >>= fun (validation_state, validation_result) ->
debug w "%d operations were not washed by the flush" debug w "%d operations were not washed by the flush"
(Operation_hash.Map.cardinal pending) ; (Operation_hash.Map.cardinal pending) ;
State.Block.protocol_hash pv.predecessor >>= fun old_protocol ->
State.Block.protocol_hash predecessor >>= fun new_protocol ->
pv.predecessor <- predecessor ; pv.predecessor <- predecessor ;
pv.live_blocks <- new_live_blocks ; pv.live_blocks <- new_live_blocks ;
pv.live_operations <- new_live_operations ; pv.live_operations <- new_live_operations ;
@ -513,8 +552,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
pv.in_mempool <- Operation_hash.Set.empty ; pv.in_mempool <- Operation_hash.Set.empty ;
pv.validation_result <- validation_result ; pv.validation_result <- validation_result ;
pv.validation_state <- validation_state ; pv.validation_state <- validation_state ;
if not (Protocol_hash.equal old_protocol new_protocol) then
pv.rpc_directory <- lazy (rpc_directory_of_protocol new_protocol) ;
return_unit return_unit
let on_advertise pv = let on_advertise pv =
@ -566,7 +603,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
Chain.data chain_state >>= fun Chain.data chain_state >>= fun
{ current_head = predecessor ; current_mempool = mempool ; { current_head = predecessor ; current_mempool = mempool ;
live_blocks ; live_operations } -> live_blocks ; live_operations } ->
State.Block.protocol_hash predecessor >>= fun protocol ->
let timestamp = Time.now () in let timestamp = Time.now () in
Prevalidation.start_prevalidation Prevalidation.start_prevalidation
~predecessor ~timestamp () >>= fun validation_state -> ~predecessor ~timestamp () >>= fun validation_state ->
@ -593,7 +629,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
in_mempool = Operation_hash.Set.empty ; in_mempool = Operation_hash.Set.empty ;
validation_result ; validation_state ; validation_result ; validation_state ;
advertisement = `None ; advertisement = `None ;
rpc_directory = lazy (rpc_directory_of_protocol protocol) ; rpc_directory = rpc_directory ;
} in } in
List.iter List.iter
(fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) (fun oph -> Lwt.ignore_result (fetch_operation w pv oph))
@ -771,15 +807,10 @@ let rpc_directory : t option RPC_directory.t =
(Block_services.mempool_path RPC_path.open_root) (Block_services.mempool_path RPC_path.open_root)
(function (function
| None -> | None ->
Lwt.return Lwt.return (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory)
(RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory)
| Some t -> | Some t ->
let module Prevalidator: T = (val t: T) in let module Prevalidator: T = (val t: T) in
Prevalidator.worker >>= fun w -> Prevalidator.worker >>= fun w ->
let pv = Prevalidator.Worker.state w in let pv = Prevalidator.Worker.state w in
Lazy.force pv.rpc_directory >>= function let pv_rpc_dir = Lazy.force pv.rpc_directory in
| Error _ -> Lwt.return (RPC_directory.map (fun _ -> Lwt.return pv) pv_rpc_dir))
Lwt.return RPC_directory.empty
| Ok rpc_directory ->
Lwt.return
(RPC_directory.map (fun _ -> Lwt.return pv) rpc_directory))