Prevalidator/Prevalidation: move existential

Note that now, the chain_validator is responsible for swapping the
prevalidator when a new protocol arrives.

Co-authored-by: Raphaël Proust <code@bnwr.net>
Co-authored-by: Pietro Abate <pietro.abate@tezcore.com>
Co-authored-by: Grégoire Henry <gregoire.henry@tezos.com>
This commit is contained in:
Raphaël Proust 2018-10-10 14:32:42 +08:00
parent 129caccf4e
commit c4e65879fc
No known key found for this signature in database
GPG Key ID: F4B685504488CEC0
8 changed files with 1080 additions and 817 deletions

View File

@ -333,6 +333,7 @@ let on_request
(* TODO catch other temporary error (e.g. system errors)
and do not 'commit' them on disk... *)
| Error [Canceled | Unavailable_protocol _] as err ->
(* FIXME: Canceled can escape. Canceled is not registered. BOOM! *)
return err
| Error errors ->
Worker.protect w begin fun () ->

View File

@ -25,6 +25,8 @@
open Chain_validator_worker_state
module Log = Tezos_stdlib.Logging.Make(struct let name = "node.chain_validator" end)
module Name = struct
type t = Chain_id.t
let encoding = Chain_id.encoding
@ -72,7 +74,7 @@ module Types = struct
mutable child:
(state * (unit -> unit Lwt.t (* shutdown *))) option ;
prevalidator: Prevalidator.t option ;
mutable prevalidator: Prevalidator.t option ;
active_peers: Peer_validator.t Lwt.t P2p_peer.Table.t ;
bootstrapped_peers: unit P2p_peer.Table.t ;
}
@ -249,6 +251,16 @@ let broadcast_head w ~previous block =
end
end
let safe_get_protocol hash =
match Registered_protocol.get hash with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "chain_validator: missing protocol '%a' for the current block."
Protocol_hash.pp_short hash
| Some protocol ->
return protocol
let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t =
let Request.Validated block = req in
let nv = Worker.state w in
@ -266,8 +278,28 @@ let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t =
may_update_checkpoint nv.parameters.chain_state block >>= fun () ->
broadcast_head w ~previous block >>= fun () ->
begin match nv.prevalidator with
| Some prevalidator ->
Prevalidator.flush prevalidator block_hash
| Some old_prevalidator ->
State.Block.protocol_hash block >>= fun new_protocol ->
let old_protocol = Prevalidator.protocol_hash old_prevalidator in
begin
if not (Protocol_hash.equal old_protocol new_protocol) then begin
safe_get_protocol new_protocol >>=? fun (module Proto) ->
let (limits, chain_db) = Prevalidator.parameters old_prevalidator in
(* TODO inject in the new prevalidator the operation
from the previous one. *)
Prevalidator.create
limits
(module Proto)
chain_db >>= fun prevalidator ->
nv.prevalidator <- Some prevalidator ;
Prevalidator.shutdown old_prevalidator >>= fun () ->
return_unit
end else begin
Prevalidator.flush old_prevalidator block_hash >>=? fun () ->
return_unit
end
end >>=? fun () ->
return_unit
| None -> return_unit
end >>=? fun () ->
may_switch_test_chain w spawn_child block >>= fun () ->
@ -302,9 +334,20 @@ let on_close w =
let on_launch start_prevalidator w _ parameters =
Chain.init_head parameters.chain_state >>= fun () ->
(if start_prevalidator then
State.read_chain_data parameters.chain_state
(fun _ {State.current_head} -> Lwt.return current_head) >>= fun head ->
State.Block.protocol_hash head >>= fun head_hash ->
safe_get_protocol head_hash >>= function
| Ok (module Proto) ->
Prevalidator.create
parameters.prevalidator_limits parameters.chain_db >>= fun prevalidator ->
Lwt.return_some prevalidator
parameters.prevalidator_limits
(module Proto)
parameters.chain_db >>= fun prevalor ->
Lwt.return_some prevalor
| Error err ->
Log.lwt_log_error "@[Failed to instantiate prevalidator:@ %a@]"
pp_print_error err >>= fun () ->
Lwt.return_none
else Lwt.return_none) >>= fun prevalidator ->
let valid_block_input = Lwt_watcher.create_input () in
let new_head_input = Lwt_watcher.create_input () in
@ -336,12 +379,9 @@ let on_launch start_prevalidator w _ parameters =
may_activate_peer_validator w peer_id >>= fun pv ->
Peer_validator.notify_head pv block ;
(* TODO notify prevalidator only if head is known ??? *)
begin match nv.prevalidator with
| Some prevalidator ->
Prevalidator.notify_operations prevalidator peer_id ops
| None -> ()
end ;
Lwt.return_unit
match nv.prevalidator with
| Some prevalidator -> Prevalidator.notify_operations prevalidator peer_id ops
| None -> Lwt.return_unit
end;
end ;
disconnection = begin fun peer_id ->

View File

@ -23,10 +23,10 @@
(* *)
(*****************************************************************************)
open Preapply_result
open Validation_errors
let rec apply_operations apply_operation state r max_ops ~sort ops =
let open Preapply_result in
Lwt_list.fold_left_s
(fun (state, max_ops, r) (hash, op, parsed_op) ->
apply_operation state max_ops op parsed_op >>= function
@ -63,18 +63,53 @@ let rec apply_operations apply_operation state r max_ops ~sort ops =
| _ ->
Lwt.return (state, max_ops, r)
type prevalidation_state =
State : { proto : ('state, 'operation_data) proto ; state : 'state ;
module type T = sig
type state
(** Creates a new prevalidation context w.r.t. the protocol associate to the
predecessor block . When ?protocol_data is passed to this function, it will
be used to create the new block *)
val start_prevalidation :
?protocol_data: MBytes.t ->
predecessor: State.Block.t ->
timestamp: Time.t ->
unit -> state tzresult Lwt.t
(** Given a prevalidation context applies a list of operations,
returns a new prevalidation context plus the preapply result containing the
list of operations that cannot be applied to this context *)
val prevalidate :
state -> sort:bool ->
(Operation_hash.t * Operation.t) list ->
(state * error Preapply_result.t) Lwt.t
val end_prevalidation :
state ->
Tezos_protocol_environment_shell.validation_result tzresult Lwt.t
val notify_operation :
state ->
error Preapply_result.t ->
unit
val shutdown_operation_input :
state ->
unit
val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t
end
module Make(Proto : Registered_protocol.T) : T = struct
type state =
{ state : Proto.validation_state ;
max_number_of_operations : int ;
new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header * 'operation_data) Lwt_watcher.input ;
Operation.shell_header * Proto.operation_data) Lwt_watcher.input ;
}
-> prevalidation_state
and ('state, 'operation_data) proto =
(module Registered_protocol.T
with type P.validation_state = 'state
and type P.operation_data = 'operation_data )
let start_prevalidation
?protocol_data
@ -85,18 +120,7 @@ let start_prevalidation
level = predecessor_level } } =
State.Block.header predecessor in
State.Block.context predecessor >>= fun predecessor_context ->
Context.get_protocol predecessor_context >>= fun protocol ->
let predecessor_hash = State.Block.hash predecessor in
begin
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
Context.reset_test_chain
predecessor_context predecessor_hash
timestamp >>= fun predecessor_context ->
@ -126,14 +150,10 @@ let start_prevalidation
(* FIXME arbitrary value, to be customisable *)
let max_number_of_operations = 1000 in
let new_operation_input = Lwt_watcher.create_input () in
return (State { proto = (module Proto) ; state ;
max_number_of_operations ;
new_operation_input ;
})
return { state ; max_number_of_operations ; new_operation_input ; }
let prevalidate
(State { proto = (module Proto) ; state ;
max_number_of_operations ; new_operation_input })
{ state ; max_number_of_operations ; new_operation_input ; }
~sort (ops : (Operation_hash.t * Operation.t) list) =
let ops =
List.map
@ -157,6 +177,7 @@ let prevalidate
(fun (h, op, parsed_op) -> match parsed_op with
| Ok parsed_op -> Some (h, op, parsed_op)
| Error _ -> None) ops in
ignore invalid_ops; (* FIXME *)
let sorted_ops =
if sort then
let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in
@ -182,69 +203,14 @@ let prevalidate
List.fold_left
(fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map)
r.branch_refused invalid_ops } in
Lwt.return (State { proto = (module Proto) ; state ;
max_number_of_operations ; new_operation_input },
r)
Lwt.return ({ state ; max_number_of_operations ; new_operation_input ; }, r)
let end_prevalidation (State { proto = (module Proto) ; state }) =
let end_prevalidation { state } =
Proto.finalize_block state >>=? fun (result, _metadata) ->
return result
let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops =
start_prevalidation
~protocol_data ~predecessor ~timestamp () >>=? fun validation_state ->
let ops = List.map (List.map (fun x -> Operation.hash x, x)) ops in
Lwt_list.fold_left_s
(fun (validation_state, rs) ops ->
prevalidate
validation_state ~sort ops >>= fun (validation_state, r) ->
Lwt.return (validation_state, rs @ [r]))
(validation_state, []) ops >>= fun (validation_state, rs) ->
let operations_hash =
Operation_list_list_hash.compute
(List.map
(fun r ->
Operation_list_hash.compute
(List.map fst r.Preapply_result.applied))
rs) in
end_prevalidation validation_state >>=? fun validation_result ->
let pred_shell_header = State.Block.shell_header predecessor in
let level = Int32.succ pred_shell_header.level in
Block_validator.may_patch_protocol
~level validation_result >>=? fun { fitness ; context ; message } ->
State.Block.protocol_hash predecessor >>= fun pred_protocol ->
Context.get_protocol context >>= fun protocol ->
let proto_level =
if Protocol_hash.equal protocol pred_protocol then
pred_shell_header.proto_level
else
((pred_shell_header.proto_level + 1) mod 256) in
let shell_header : Block_header.shell_header = {
level ;
proto_level ;
predecessor = State.Block.hash predecessor ;
timestamp ;
validation_passes = List.length rs ;
operations_hash ;
fitness ;
context = Context_hash.zero ; (* place holder *)
} in
begin
if Protocol_hash.equal protocol pred_protocol then
return (context, message)
else
match Registered_protocol.get protocol with
| None ->
fail (Block_validator_errors.Unavailable_protocol
{ block = State.Block.hash predecessor ; protocol })
| Some (module NewProto) ->
NewProto.init context shell_header >>=? fun { context ; message ; _ } ->
return (context, message)
end >>=? fun (context, message) ->
Context.hash ?message ~time:timestamp context >>= fun context ->
return ({ shell_header with context }, rs)
let notify_operation (State { proto = (module Proto) ; new_operation_input ; }) result =
let notify_operation { new_operation_input } result =
let open Preapply_result in
let { applied ; refused ; branch_refused ; branch_delayed } = result in
(* Notify new opperations *)
let map_op kind { Operation.shell ; proto } =
@ -261,23 +227,13 @@ let notify_operation (State { proto = (module Proto) ; new_operation_input ; })
let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
List.iter (Lwt_watcher.notify new_operation_input) ops
let shutdown_operation_input (State { new_operation_input }) =
let shutdown_operation_input { new_operation_input } =
Lwt_watcher.shutdown_input new_operation_input
let build_rpc_directory protocol =
begin
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
let rpc_directory =
let module Proto_services = Block_services.Make(Proto)(Proto) in
let dir : (prevalidation_state * Error_monad.error Preapply_result.t) RPC_directory.t ref =
let dir : (state * Error_monad.error Preapply_result.t) RPC_directory.t ref =
ref RPC_directory.empty in
let gen_register s f =
@ -285,7 +241,8 @@ let build_rpc_directory protocol =
gen_register
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
begin fun ((State { new_operation_input ; proto = (module Next_proto) }), current_mempool) params () ->
begin fun ({ new_operation_input }, current_mempool) params () ->
let open Preapply_result in
let operation_stream, stopper =
Lwt_watcher.create_stream new_operation_input in
(* Convert ops *)
@ -322,8 +279,10 @@ let build_rpc_directory protocol =
| None -> begin
Lwt_stream.get operation_stream >>= function
| Some (kind, shell, protocol_data) when filter_result kind ->
(* NOTE: Should the protocol change, a new Prevalidation
* context would be created. Thus, we use the same Proto. *)
let bytes = Data_encoding.Binary.to_bytes_exn
Next_proto.operation_data_encoding
Proto.operation_data_encoding
protocol_data in
let protocol_data = Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
@ -337,3 +296,72 @@ let build_rpc_directory protocol =
end ;
return !dir
end
let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops =
State.Block.context predecessor >>= fun predecessor_context ->
Context.get_protocol predecessor_context >>= fun protocol ->
begin
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
let module Prevalidation = Make(Proto) in
Prevalidation.start_prevalidation
~protocol_data ~predecessor ~timestamp () >>=? fun validation_state ->
let ops = List.map (List.map (fun x -> Operation.hash x, x)) ops in
Lwt_list.fold_left_s
(fun (validation_state, rs) ops ->
Prevalidation.prevalidate
validation_state ~sort ops >>= fun (validation_state, r) ->
Lwt.return (validation_state, rs @ [r]))
(validation_state, []) ops >>= fun (validation_state, rs) ->
let operations_hash =
Operation_list_list_hash.compute
(List.map
(fun r ->
Operation_list_hash.compute
(List.map fst r.Preapply_result.applied))
rs) in
Prevalidation.end_prevalidation validation_state >>=? fun validation_result ->
let pred_shell_header = State.Block.shell_header predecessor in
let level = Int32.succ pred_shell_header.level in
Block_validator.may_patch_protocol
~level validation_result >>=? fun { fitness ; context ; message } ->
State.Block.protocol_hash predecessor >>= fun pred_protocol ->
Context.get_protocol context >>= fun protocol ->
let proto_level =
if Protocol_hash.equal protocol pred_protocol then
pred_shell_header.proto_level
else
((pred_shell_header.proto_level + 1) mod 256) in
let shell_header : Block_header.shell_header = {
level ;
proto_level ;
predecessor = State.Block.hash predecessor ;
timestamp ;
validation_passes = List.length rs ;
operations_hash ;
fitness ;
context = Context_hash.zero ; (* place holder *)
} in
begin
if Protocol_hash.equal protocol pred_protocol then
return (context, message)
else
match Registered_protocol.get protocol with
| None ->
fail (Block_validator_errors.Unavailable_protocol
{ block = State.Block.hash predecessor ; protocol })
| Some (module NewProto) ->
NewProto.init context shell_header >>=? fun { context ; message ; _ } ->
return (context, message)
end >>=? fun (context, message) ->
Context.hash ?message ~time:timestamp context >>= fun context ->
return ({ shell_header with context }, rs)

View File

@ -28,7 +28,9 @@
consistency. This module is stateless and creates and manupulates the
prevalidation_state. *)
type prevalidation_state
module type T = sig
type state
(** Creates a new prevalidation context w.r.t. the protocol associate to the
predecessor block . When ?protocol_data is passed to this function, it will
@ -37,22 +39,36 @@ val start_prevalidation :
?protocol_data: MBytes.t ->
predecessor: State.Block.t ->
timestamp: Time.t ->
unit -> prevalidation_state tzresult Lwt.t
unit -> state tzresult Lwt.t
(** Given a prevalidation context applies a list of operations,
returns a new prevalidation context plus the preapply result containing the
list of operations that cannot be applied to this context *)
val prevalidate :
prevalidation_state -> sort:bool ->
state -> sort:bool ->
(Operation_hash.t * Operation.t) list ->
(prevalidation_state * error Preapply_result.t) Lwt.t
(state * error Preapply_result.t) Lwt.t
val end_prevalidation :
prevalidation_state ->
state ->
Tezos_protocol_environment_shell.validation_result tzresult Lwt.t
(** Pre-apply creates a new block ( running start_prevalidation, prevalidate and
end_prevalidation), and returns a new block. *)
val notify_operation :
state ->
error Preapply_result.t ->
unit
val shutdown_operation_input :
state ->
unit
val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t
end
module Make(Proto : Registered_protocol.T) : T
(** Pre-apply creates a new block and returns it. *)
val preapply :
predecessor:State.Block.t ->
timestamp:Time.t ->
@ -60,16 +76,3 @@ val preapply :
sort_operations:bool ->
Operation.t list list ->
(Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t
val notify_operation :
prevalidation_state ->
error Preapply_result.t ->
unit
val shutdown_operation_input :
prevalidation_state ->
unit
val build_rpc_directory :
Protocol_hash.t ->
(prevalidation_state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t

View File

@ -31,21 +31,65 @@ type limits = {
worker_limits : Worker_types.limits ;
}
module Name = struct
type t = Chain_id.t
let encoding = Chain_id.encoding
let base = [ "prevalidator" ]
let pp = Chain_id.pp_short
type name_t = (Chain_id.t * Protocol_hash.t)
module type T = sig
module Proto: Registered_protocol.T
val name: name_t
val parameters: limits * Distributed_db.chain_db
module Prevalidation: Prevalidation.T
type types_state = {
chain_db : Distributed_db.chain_db ;
limits : limits ;
mutable predecessor : State.Block.t ;
mutable timestamp : Time.t ;
mutable live_blocks : Block_hash.Set.t ;
mutable live_operations : Operation_hash.Set.t ;
refused : Operation_hash.t Ring.t ;
mutable refusals : error list Operation_hash.Map.t ;
mutable fetching : Operation_hash.Set.t ;
mutable pending : Operation.t Operation_hash.Map.t ;
mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ;
}
module Name: Worker.NAME with type t = name_t
module Types: Worker.TYPES with type state = types_state
module Worker: Worker.T
with type Event.t = Event.t
and type 'a Request.t = 'a Request.t
and type Request.view = Request.view
and type Types.state = types_state
type worker = Worker.infinite Worker.queue Worker.t
val list_pendings:
?maintain_chain_db:Distributed_db.chain_db ->
from_block:State.Block.t ->
to_block:State.Block.t ->
Operation.t Operation_hash.Map.t ->
(Operation.t Operation_hash.Map.t * Block_hash.Set.t * Operation_hash.Set.t) Lwt.t
val worker: worker Lwt.t
end
module Types = struct
(* Invariants:
- an operation is in only one of these sets (map domains):
pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool
- pv.in_mempool is the domain of all fields of pv.prevalidation_result
- pv.prevalidation_result.refused = Ø, refused ops are in pv.refused
- the 'applied' operations in pv.validation_result are in reverse order. *)
type state = {
module type ARG = sig
val limits: limits
val chain_db: Distributed_db.chain_db
val chain_id: Chain_id.t
end
type t = (module T)
module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
module Proto = Proto
let name = (Arg.chain_id, Proto.hash)
let parameters = (Arg.limits, Arg.chain_db)
module Prevalidation = Prevalidation.Make(Proto)
type types_state = {
chain_db : Distributed_db.chain_db ;
limits : limits ;
mutable predecessor : State.Block.t ;
@ -59,10 +103,40 @@ module Types = struct
mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.prevalidation_state tzresult ;
mutable validation_state : Prevalidation.state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : state RPC_directory.t tzresult Lwt.t lazy_t ;
mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ;
}
module Name = struct
type t = name_t
let encoding =
Data_encoding.tup2
Chain_id.encoding
Protocol_hash.encoding
let chain_id_string =
let _: string = Format.flush_str_formatter () in
Chain_id.pp_short Format.str_formatter Arg.chain_id;
Format.flush_str_formatter ()
let proto_hash_string =
let _: string = Format.flush_str_formatter () in
Protocol_hash.pp_short Format.str_formatter Proto.hash;
Format.flush_str_formatter ()
let base = [ "prevalidator" ; chain_id_string ; proto_hash_string ]
let pp fmt (chain_id, proto_hash) =
Chain_id.pp_short fmt chain_id;
Format.pp_print_string fmt ".";
Protocol_hash.pp_short fmt proto_hash
end
module Types = struct
(* Invariants:
- an operation is in only one of these sets (map domains):
pv.refusals pv.pending pv.fetching pv.live_operations pv.in_mempool
- pv.in_mempool is the domain of all fields of pv.prevalidation_result
- pv.prevalidation_result.refused = Ø, refused ops are in pv.refused
- the 'applied' operations in pv.validation_result are in reverse order. *)
type state = types_state
type parameters = limits * Distributed_db.chain_db
include Worker_state
@ -87,81 +161,23 @@ module Types = struct
end
module Worker = Worker.Make (Name) (Event) (Request) (Types)
module Worker: Worker.T
with type Name.t = Name.t
and type Event.t = Event.t
and type 'a Request.t = 'a Request.t
and type Request.view = Request.view
and type Types.state = Types.state
and type Types.parameters = Types.parameters
= Worker.Make (Name) (Prevalidator_worker_state.Event)
(Prevalidator_worker_state.Request) (Types)
open Types
type t = Worker.infinite Worker.queue Worker.t
type error += Closed = Worker.Closed
type worker = Worker.infinite Worker.queue Worker.t
let debug w =
Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
let empty_rpc_directory : unit RPC_directory.t =
RPC_directory.register
RPC_directory.empty
(Block_services.Empty.S.Mempool.pending_operations RPC_path.open_root)
(fun _pv () () ->
return {
Block_services.Empty.Mempool.applied = [] ;
refused = Operation_hash.Map.empty ;
branch_refused = Operation_hash.Map.empty ;
branch_delayed = Operation_hash.Map.empty ;
unprocessed = Operation_hash.Map.empty ;
})
let rpc_directory protocol =
begin
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
let module Proto_services = Block_services.Make(Proto)(Proto) in
let dir : state RPC_directory.t ref = ref RPC_directory.empty in
let register s f =
dir := RPC_directory.register !dir s f in
register
(Proto_services.S.Mempool.pending_operations RPC_path.open_root)
(fun pv () () ->
let map_op op =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
{ Proto.shell = op.shell ; protocol_data } in
let map_op_error (op, error) = (map_op op, error) in
return {
Proto_services.Mempool.applied =
List.map
(fun (hash, op) -> (hash, map_op op))
(List.rev pv.validation_result.applied) ;
refused =
Operation_hash.Map.map map_op_error pv.validation_result.refused ;
branch_refused =
Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ;
branch_delayed =
Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ;
unprocessed =
Operation_hash.Map.map map_op pv.pending ;
}) ;
Prevalidation.build_rpc_directory protocol >>=? fun prevalidation_dir ->
let prevalidation_dir =
RPC_directory.map (fun state ->
match state.validation_state with
| Error _ -> assert false
| Ok pv -> Lwt.return (pv, state.validation_result)
) prevalidation_dir in
return (RPC_directory.merge !dir prevalidation_dir)
let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool =
let rec pop_blocks ancestor block mempool =
let hash = State.Block.hash block in
@ -254,7 +270,7 @@ let merge_validation_results ~old ~neu =
(filter_out neu.applied old.branch_delayed)
neu.branch_delayed }
let advertise (w : t) pv mempool =
let advertise (w : worker) pv mempool =
match pv.advertisement with
| `Pending { Mempool.known_valid ; pending } ->
pv.advertisement <-
@ -351,6 +367,62 @@ let fetch_operation w pv ?peer oph =
| Error _ -> (* should not happen *)
Lwt.return_unit
let rpc_directory_of_protocol protocol =
begin
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
let module Proto_services = Block_services.Make(Proto)(Proto) in
let dir : state RPC_directory.t ref = ref RPC_directory.empty in
let register s f =
dir := RPC_directory.register !dir s f in
register
(Proto_services.S.Mempool.pending_operations RPC_path.open_root)
(fun pv () () ->
let map_op op =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
{ Proto.shell = op.shell ; protocol_data } in
let map_op_error (op, error) = (map_op op, error) in
return {
Proto_services.Mempool.applied =
List.map
(fun (hash, op) -> (hash, map_op op))
(List.rev pv.validation_result.applied) ;
refused =
Operation_hash.Map.map map_op_error pv.validation_result.refused ;
branch_refused =
Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ;
branch_delayed =
Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ;
unprocessed =
Operation_hash.Map.map map_op pv.pending ;
}) ;
Prevalidation.rpc_directory >>=? fun prevalidation_dir ->
let prevalidation_dir =
RPC_directory.map (fun state ->
match state.validation_state with
| Error _ -> assert false
| Ok pv -> Lwt.return (pv, state.validation_result)
) prevalidation_dir in
return (RPC_directory.merge !dir prevalidation_dir)
module Handlers = struct
type self = worker
let on_operation_arrived (pv : state) oph op =
pv.fetching <- Operation_hash.Set.remove oph pv.fetching ;
if not (Block_hash.Set.mem op.Operation.shell.branch pv.live_blocks) then begin
@ -442,7 +514,7 @@ let on_flush w pv predecessor =
pv.validation_result <- validation_result ;
pv.validation_state <- validation_state ;
if not (Protocol_hash.equal old_protocol new_protocol) then
pv.rpc_directory <- lazy (rpc_directory new_protocol) ;
pv.rpc_directory <- lazy (rpc_directory_of_protocol new_protocol) ;
return_unit
let on_advertise pv =
@ -453,7 +525,7 @@ let on_advertise pv =
Distributed_db.Advertise.current_head pv.chain_db ~mempool pv.predecessor
let on_request
: type r. t -> r Request.t -> r tzresult Lwt.t
: type r. worker -> r Request.t -> r tzresult Lwt.t
= fun w request ->
let pv = Worker.state w in
begin match request with
@ -521,7 +593,7 @@ let on_launch w _ (limits, chain_db) =
in_mempool = Operation_hash.Set.empty ;
validation_result ; validation_state ;
advertisement = `None ;
rpc_directory = lazy (rpc_directory protocol) ;
rpc_directory = lazy (rpc_directory_of_protocol protocol) ;
} in
List.iter
(fun oph -> Lwt.ignore_result (fetch_operation w pv oph))
@ -538,64 +610,160 @@ let on_completion w r _ st =
Worker.record_event w (Event.Request (Request.view r, st, None)) ;
Lwt.return_unit
let on_no_request _ = return_unit
end
let table = Worker.create_table Queue
let create limits chain_db =
let chain_state = Distributed_db.chain_state chain_db in
let module Handlers = struct
type self = t
let on_launch = on_launch
let on_request = on_request
let on_close = on_close
let on_error = on_error
let on_completion = on_completion
let on_no_request _ = return_unit
end in
Worker.launch table limits.worker_limits
(State.Chain.id chain_state)
(limits, chain_db)
(* NOTE: we register a single worker for each instantiation of this Make
* functor (and thus a single worker for the single instantiaion of Worker).
* Whislt this is somewhat abusing the intended purpose of worker, it is part
* of a transition plan to a one-worker-per-peer architecture. *)
let worker =
Worker.launch table Arg.limits.worker_limits
name
(Arg.limits, Arg.chain_db)
(module Handlers)
let shutdown = Worker.shutdown
end
let flush w head =
Worker.push_request_and_wait w (Flush head)
module ChainProto_registry =
Registry.Make(struct
type v = t
type t = (Chain_id.t * Protocol_hash.t)
let compare (c1, p1) (c2, p2) =
let pc = Protocol_hash.compare p1 p2 in
if pc = 0 then
Chain_id.compare c1 c2
else
pc
end)
let notify_operations w peer mempool =
Worker.push_request_now w (Notify (peer, mempool))
let operations w =
let pv = Worker.state w in
{ pv.validation_result with
let create limits (module Proto: Registered_protocol.T) chain_db =
let chain_state = Distributed_db.chain_state chain_db in
let chain_id = State.Chain.id chain_state in
match ChainProto_registry.query (chain_id, Proto.hash) with
| None ->
let module Prevalidator =
Make(Proto)(struct
let limits = limits
let chain_db = chain_db
let chain_id = chain_id
end) in
Prevalidator.worker >>= fun _ ->
ChainProto_registry.register Prevalidator.name (module Prevalidator: T);
Lwt.return (module Prevalidator: T)
| Some p ->
Lwt.return p
let shutdown (t:t) =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
ChainProto_registry.remove Prevalidator.name;
Prevalidator.Worker.shutdown w
let flush (t:t) head =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
Prevalidator.Worker.push_request_and_wait w (Request.Flush head)
let notify_operations (t:t) peer mempool =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
Prevalidator.Worker.push_request w (Request.Notify (peer, mempool))
let operations (t:t) =
let module Prevalidator: T = (val t) in
match Lwt.state Prevalidator.worker with
| Lwt.Fail _ | Lwt.Sleep ->
(* FIXME: this shouldn't happen at all, here we return a safe value *)
(Preapply_result.empty, Operation_hash.Map.empty)
| Lwt.Return w ->
let pv = Prevalidator.Worker.state w in
({ pv.Prevalidator.validation_result with
applied = List.rev pv.validation_result.applied },
pv.pending
pv.pending)
let pending ?block w =
let pv = Worker.state w in
let pending ?block (t:t) =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
let pv = Prevalidator.Worker.state w in
let ops = Preapply_result.operations pv.validation_result in
match block with
| Some to_block ->
list_pendings
Prevalidator.list_pendings
~from_block:pv.predecessor ~to_block ops >>= fun (pending, _, _) ->
Lwt.return pending
| None -> Lwt.return ops
let timestamp w =
let pv = Worker.state w in
pv.timestamp
let timestamp (t:t) =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
let pv = Prevalidator.Worker.state w in
Lwt.return pv.timestamp
let inject_operation w op =
Worker.push_request_and_wait w (Inject op)
let inject_operation (t:t) op =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
Prevalidator.Worker.push_request_and_wait w (Inject op)
let status = Worker.status
let status (t:t) =
let module Prevalidator: T = (val t) in
Prevalidator.worker >>= fun w ->
Lwt.return (Prevalidator.Worker.status w)
let running_workers () = Worker.list table
let running_workers () =
ChainProto_registry.fold
(fun (id, proto) t acc -> (id, proto, t) :: acc)
[]
let pending_requests t = Worker.pending_requests t
let pending_requests (t:t) =
let module Prevalidator: T = (val t) in
match Lwt.state Prevalidator.worker with
| Lwt.Fail _ | Lwt.Sleep ->
(* FIXME: this shouldn't happen at all, here we return a safe value *)
[]
| Lwt.Return w -> Prevalidator.Worker.pending_requests w
let current_request t = Worker.current_request t
let current_request (t:t) =
let module Prevalidator: T = (val t) in
match Lwt.state Prevalidator.worker with
| Lwt.Fail _ | Lwt.Sleep ->
(* FIXME: this shouldn't happen at all, here we return a safe value *)
None
| Lwt.Return w -> Prevalidator.Worker.current_request w
let last_events (t:t) =
let module Prevalidator: T = (val t) in
match Lwt.state Prevalidator.worker with
| Lwt.Fail _ | Lwt.Sleep ->
(* FIXME: this shouldn't happen at all, here we return a safe value *)
[]
| Lwt.Return w -> Prevalidator.Worker.last_events w
let protocol_hash (t:t) =
let module Prevalidator: T = (val t) in
Prevalidator.Proto.hash
let parameters (t:t) =
let module Prevalidator: T = (val t) in
Prevalidator.parameters
let empty_rpc_directory : unit RPC_directory.t =
RPC_directory.register
RPC_directory.empty
(Block_services.Empty.S.Mempool.pending_operations RPC_path.open_root)
(fun _pv () () ->
return {
Block_services.Empty.Mempool.applied = [] ;
refused = Operation_hash.Map.empty ;
branch_refused = Operation_hash.Map.empty ;
branch_delayed = Operation_hash.Map.empty ;
unprocessed = Operation_hash.Map.empty ;
})
let last_events = Worker.last_events
let rpc_directory : t option RPC_directory.t =
RPC_directory.register_dynamic_directory
@ -605,8 +773,10 @@ let rpc_directory : t option RPC_directory.t =
| None ->
Lwt.return
(RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory)
| Some w ->
let pv = Worker.state w in
| Some t ->
let module Prevalidator: T = (val t: T) in
Prevalidator.worker >>= fun w ->
let pv = Prevalidator.Worker.state w in
Lazy.force pv.rpc_directory >>= function
| Error _ ->
Lwt.return RPC_directory.empty

View File

@ -25,25 +25,29 @@
(** Tezos Shell - Prevalidation of pending operations (a.k.a Mempool) *)
(** The prevalidation worker is in charge of the "mempool" (a.k.a. the
(** The prevalidator is in charge of the "mempool" (a.k.a. the
set of known not-invalid-for-sure operations that are not yet
included in the blockchain).
The worker also maintains a sorted subset of the mempool that
The prevalidator also maintains a sorted subset of the mempool that
might correspond to a valid block on top of the current head. The
"in-progress" context produced by the application of those
operations is called the (pre)validation context.
Before to include an operation into the mempool, the prevalidation
Before including an operation into the mempool, the prevalidation
worker tries to append the operation the prevalidation context. If
the operation is (strongly) refused, it will not be added into the
mempool and then it will be ignored by the node and never
broadcasted. If the operation is only "branch_refused" or
broadcast. If the operation is only "branch_refused" or
"branch_delayed", the operation won't be appended in the
prevalidation context, but still broadcasted.
prevalidation context, but still broadcast.
*)
(** An (abstract) prevalidator context. Separate prevalidator contexts should be
* used for separate chains (e.g., mainchain vs testchain). *)
type t
type limits = {
@ -52,29 +56,27 @@ type limits = {
worker_limits : Worker_types.limits ;
}
type error += Closed of Chain_id.t
(** Creates a new worker. Each chain is associated with a prevalidator. Typically,
this is the case for the main chain and a test chain *)
val create: limits -> Distributed_db.chain_db -> t Lwt.t
(** Creates/tear-down a new prevalidator context. *)
val create:
limits ->
(module Registered_protocol.T) ->
Distributed_db.chain_db ->
t Lwt.t
val shutdown: t -> unit Lwt.t
(** Notify the prevalidator worker of a set of operations (in the form of a mempool)
received from a peer. *)
val notify_operations: t -> P2p_peer.Id.t -> Mempool.t -> unit
(** Notify the prevalidator that the identified peer has sent a bunch of
* operations relevant to the specified context. *)
val notify_operations: t -> P2p_peer.Id.t -> Mempool.t -> unit Lwt.t
(** Notify the prevalidator worker of a new injected operation. This will be added
to the mempool of the worker *)
(** Notify the prevalidator worker of a new injected operation. *)
val inject_operation: t -> Operation.t -> unit tzresult Lwt.t
(** Notify the prevalidator worker that a new head was received. The new head will
cause the reset of the prevalidation context *)
(** Notify the prevalidator that a new head has been selected. *)
val flush: t -> Block_hash.t -> unit tzresult Lwt.t
(** Returns the timestamp of the prevalidator worker, that is the timestamp of the last
reset of the prevalidation context *)
val timestamp: t -> Time.t
val timestamp: t -> Time.t Lwt.t
(** Returns the list of valid operations known to this prevalidation worker *)
val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t
@ -82,12 +84,22 @@ val operations: t -> error Preapply_result.t * Operation.t Operation_hash.Map.t
(** Returns the list of pending operations known to this prevalidation worker *)
val pending: ?block:State.Block.t -> t -> Operation.t Operation_hash.Map.t Lwt.t
(** Returns the list of prevalidation workers running and their associated chain *)
val running_workers: unit -> (Chain_id.t * t) list
(** Returns the list of prevalidation contexts running and their associated chain *)
val running_workers: unit -> (Chain_id.t * Protocol_hash.t * t) list
(** Two functions that are useful for managing the prevalidator's transition
* from one protocol to the next. *)
(** Returns the hash of the protocol the prevalidator was instantiated with *)
val protocol_hash: t -> Protocol_hash.t
(** Returns the parameters the prevalidator was created with. *)
val parameters: t -> limits * Distributed_db.chain_db
(** Worker status and events *)
val status: t -> Worker_types.worker_status
(* None indicates the there are no workers for the current protocol. *)
val status: t -> Worker_types.worker_status Lwt.t
val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list
val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option
val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list

View File

@ -36,20 +36,29 @@ let build_rpc_directory state =
(* Workers : Prevalidators *)
register0 Worker_services.Prevalidators.S.list begin fun () () ->
return
(List.map
(fun (id, w) -> (id, Prevalidator.status w))
(Prevalidator.running_workers ()))
let workers = Prevalidator.running_workers () in
Lwt_list.map_p
(fun (chain_id, _, t) ->
Prevalidator.status t >>= fun status ->
Lwt.return (chain_id, status))
workers >>= fun info ->
return info
end ;
register1 Worker_services.Prevalidators.S.state begin fun chain () () ->
Chain_directory.get_chain_id state chain >>= fun chain_id ->
let w = List.assoc chain_id (Prevalidator.running_workers ()) in
let workers = Prevalidator.running_workers () in
let (_, _, t) =
(* NOTE: it is technically possible to use the Prevalidator interface to
* register multiple Prevalidator for a single chain (using distinct
* protocols). However, this is never done. *)
List.find (fun (c, _, _) -> Chain_id.equal c chain_id) workers in
Prevalidator.status t >>= fun status ->
return
{ Worker_types.status = Prevalidator.status w ;
pending_requests = Prevalidator.pending_requests w ;
backlog = Prevalidator.last_events w ;
current_request = Prevalidator.current_request w }
{ Worker_types.status = status ;
pending_requests = Prevalidator.pending_requests t ;
backlog = Prevalidator.last_events t ;
current_request = Prevalidator.current_request t }
end ;
(* Workers : Block_validator *)

View File

@ -42,7 +42,7 @@ module Prevalidators = struct
let state =
RPC_service.get_service
~description:"Introspect the state of a prevalidator worker."
~description:"Introspect the state of prevalidator workers."
~query: RPC_query.empty
~output:
(Worker_types.full_status_encoding