diff --git a/src/lib_shell/prevalidation.ml b/src/lib_shell/prevalidation.ml index ed365c951..d87d8f977 100644 --- a/src/lib_shell/prevalidation.ml +++ b/src/lib_shell/prevalidation.ml @@ -66,6 +66,8 @@ let rec apply_operations apply_operation state r max_ops ~sort ops = module type T = sig + module Proto: Registered_protocol.T + type state (** Creates a new prevalidation context w.r.t. the protocol associate to the @@ -98,11 +100,19 @@ module type T = sig state -> unit - val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t + type new_operation_input = + ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * + Proto.operation_data + ) Lwt_watcher.input + + val new_operation_input: state -> new_operation_input end -module Make(Proto : Registered_protocol.T) : T = struct +module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struct + + module Proto = Proto type state = { state : Proto.validation_state ; @@ -230,72 +240,13 @@ module Make(Proto : Registered_protocol.T) : T = struct let shutdown_operation_input { new_operation_input } = Lwt_watcher.shutdown_input new_operation_input - let rpc_directory = - let module Proto_services = Block_services.Make(Proto)(Proto) in + type new_operation_input = + ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * + Proto.operation_data + ) Lwt_watcher.input - let dir : (state * Error_monad.error Preapply_result.t) RPC_directory.t ref = - ref RPC_directory.empty in - - let gen_register s f = - dir := RPC_directory.gen_register !dir s f in - - gen_register - (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) - begin fun ({ new_operation_input }, current_mempool) params () -> - let open Preapply_result in - let operation_stream, stopper = - Lwt_watcher.create_stream new_operation_input in - (* Convert ops *) - let map_op op = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - op.Operation.proto in - Proto.{ shell = op.shell ; protocol_data } in - let fold_op _k (op, _error) acc = map_op op :: acc in - (* First call : retrieve the current set of op from the mempool *) - let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in - let applied = if params#applied then List.map map_op (List.map snd applied) else [] in - let refused = if params#refused then - Operation_hash.Map.fold fold_op refused [] else [] in - let branch_refused = if params#branch_refused then - Operation_hash.Map.fold fold_op branch_refused [] else [] in - let branch_delayed = if params#branch_delayed then - Operation_hash.Map.fold fold_op branch_delayed [] else [] in - let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in - let current_mempool = ref (Some current_mempool) in - let filter_result = function - | `Applied -> params#applied - | `Refused -> params#branch_refused - | `Branch_refused -> params#refused - | `Branch_delayed -> params#branch_delayed - in - let next () = - match !current_mempool with - | Some mempool -> begin - current_mempool := None ; - Lwt.return_some mempool - end - | None -> begin - Lwt_stream.get operation_stream >>= function - | Some (kind, shell, protocol_data) when filter_result kind -> - (* NOTE: Should the protocol change, a new Prevalidation - * context would be created. Thus, we use the same Proto. *) - let bytes = Data_encoding.Binary.to_bytes_exn - Proto.operation_data_encoding - protocol_data in - let protocol_data = Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - bytes in - Lwt.return_some [ { Proto.shell ; protocol_data } ] - | _ -> Lwt.return_none - end - in - let shutdown () = Lwt_watcher.shutdown stopper in - RPC_answer.return_stream { next ; shutdown } - end ; - - return !dir + let new_operation_input { new_operation_input } = new_operation_input end diff --git a/src/lib_shell/prevalidation.mli b/src/lib_shell/prevalidation.mli index a534c393e..69ba4c90d 100644 --- a/src/lib_shell/prevalidation.mli +++ b/src/lib_shell/prevalidation.mli @@ -30,6 +30,8 @@ module type T = sig + module Proto: Registered_protocol.T + type state (** Creates a new prevalidation context w.r.t. the protocol associate to the @@ -62,11 +64,17 @@ module type T = sig state -> unit - val rpc_directory : (state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t + type new_operation_input = + ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * + Proto.operation_data + ) Lwt_watcher.input + + val new_operation_input: state -> new_operation_input end -module Make(Proto : Registered_protocol.T) : T +module Make(Proto : Registered_protocol.T) : T with module Proto = Proto (** Pre-apply creates a new block and returns it. *) val preapply : diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 42b4af54e..727b9a50b 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -38,7 +38,7 @@ module type T = sig module Proto: Registered_protocol.T val name: name_t val parameters: limits * Distributed_db.chain_db - module Prevalidation: Prevalidation.T + module Prevalidation: Prevalidation.T with module Proto = Proto type types_state = { chain_db : Distributed_db.chain_db ; limits : limits ; @@ -55,7 +55,7 @@ module type T = sig mutable validation_result : error Preapply_result.t ; mutable validation_state : Prevalidation.state tzresult ; mutable advertisement : [ `Pending of Mempool.t | `None ] ; - mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ; + mutable rpc_directory : types_state RPC_directory.t lazy_t ; } module Name: Worker.NAME with type t = name_t module Types: Worker.TYPES with type state = types_state @@ -105,7 +105,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct mutable validation_result : error Preapply_result.t ; mutable validation_state : Prevalidation.state tzresult ; mutable advertisement : [ `Pending of Mempool.t | `None ] ; - mutable rpc_directory : types_state RPC_directory.t tzresult Lwt.t lazy_t ; + mutable rpc_directory : types_state RPC_directory.t lazy_t ; } module Name = struct @@ -367,57 +367,98 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct | Error _ -> (* should not happen *) Lwt.return_unit - let rpc_directory_of_protocol protocol = - begin - match Registered_protocol.get protocol with - | None -> - (* FIXME. *) - (* This should not happen: it should be handled in the validator. *) - failwith "Prevalidation: missing protocol '%a' for the current block." - Protocol_hash.pp_short protocol - | Some protocol -> - return protocol - end >>=? fun (module Proto) -> + let rpc_directory = lazy ( + let dir : state RPC_directory.t ref = ref RPC_directory.empty in + let module Proto_services = Block_services.Make(Proto)(Proto) in - let dir : state RPC_directory.t ref = ref RPC_directory.empty in - let register s f = - dir := RPC_directory.register !dir s f in + dir := RPC_directory.register !dir + (Proto_services.S.Mempool.pending_operations RPC_path.open_root) + (fun pv () () -> + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + { Proto.shell = op.shell ; protocol_data } in + let map_op_error (op, error) = (map_op op, error) in + return { + Proto_services.Mempool.applied = + List.map + (fun (hash, op) -> (hash, map_op op)) + (List.rev pv.validation_result.applied) ; + refused = + Operation_hash.Map.map map_op_error pv.validation_result.refused ; + branch_refused = + Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ; + branch_delayed = + Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; + unprocessed = + Operation_hash.Map.map map_op pv.pending ; + }) ; - register - (Proto_services.S.Mempool.pending_operations RPC_path.open_root) - (fun pv () () -> - let map_op op = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - op.Operation.proto in - { Proto.shell = op.shell ; protocol_data } in - let map_op_error (op, error) = (map_op op, error) in - return { - Proto_services.Mempool.applied = - List.map - (fun (hash, op) -> (hash, map_op op)) - (List.rev pv.validation_result.applied) ; - refused = - Operation_hash.Map.map map_op_error pv.validation_result.refused ; - branch_refused = - Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ; - branch_delayed = - Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; - unprocessed = - Operation_hash.Map.map map_op pv.pending ; - }) ; - - Prevalidation.rpc_directory >>=? fun prevalidation_dir -> - let prevalidation_dir = - RPC_directory.map (fun state -> - match state.validation_state with + dir := RPC_directory.gen_register !dir + (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) + begin fun { validation_state ; validation_result = current_mempool } params () -> + match validation_state with | Error _ -> assert false - | Ok pv -> Lwt.return (pv, state.validation_result) - ) prevalidation_dir in + | Ok pv -> + let new_operation_input = Prevalidation.new_operation_input pv in + let open Preapply_result in + let operation_stream, stopper = + Lwt_watcher.create_stream new_operation_input in + (* Convert ops *) + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + Proto.{ shell = op.shell ; protocol_data } in + let fold_op _k (op, _error) acc = map_op op :: acc in + (* First call : retrieve the current set of op from the mempool *) + let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in + let applied = if params#applied then List.map map_op (List.map snd applied) else [] in + let refused = if params#refused then + Operation_hash.Map.fold fold_op refused [] else [] in + let branch_refused = if params#branch_refused then + Operation_hash.Map.fold fold_op branch_refused [] else [] in + let branch_delayed = if params#branch_delayed then + Operation_hash.Map.fold fold_op branch_delayed [] else [] in + let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Applied -> params#applied + | `Refused -> params#branch_refused + | `Branch_refused -> params#refused + | `Branch_delayed -> params#branch_delayed + in + let next () = + match !current_mempool with + | Some mempool -> begin + current_mempool := None ; + Lwt.return_some mempool + end + | None -> begin + Lwt_stream.get operation_stream >>= function + | Some (kind, shell, protocol_data) when filter_result kind -> + (* NOTE: Should the protocol change, a new Prevalidation + * context would be created. Thus, we use the same Proto. *) + let bytes = Data_encoding.Binary.to_bytes_exn + Proto.operation_data_encoding + protocol_data in + let protocol_data = Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + bytes in + Lwt.return_some [ { Proto.shell ; protocol_data } ] + | _ -> Lwt.return_none + end + in + let shutdown () = Lwt_watcher.shutdown stopper in + RPC_answer.return_stream { next ; shutdown } + end ; - return (RPC_directory.merge !dir prevalidation_dir) + !dir + ) module Handlers = struct @@ -502,8 +543,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct end >>= fun (validation_state, validation_result) -> debug w "%d operations were not washed by the flush" (Operation_hash.Map.cardinal pending) ; - State.Block.protocol_hash pv.predecessor >>= fun old_protocol -> - State.Block.protocol_hash predecessor >>= fun new_protocol -> pv.predecessor <- predecessor ; pv.live_blocks <- new_live_blocks ; pv.live_operations <- new_live_operations ; @@ -513,8 +552,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct pv.in_mempool <- Operation_hash.Set.empty ; pv.validation_result <- validation_result ; pv.validation_state <- validation_state ; - if not (Protocol_hash.equal old_protocol new_protocol) then - pv.rpc_directory <- lazy (rpc_directory_of_protocol new_protocol) ; return_unit let on_advertise pv = @@ -566,7 +603,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct Chain.data chain_state >>= fun { current_head = predecessor ; current_mempool = mempool ; live_blocks ; live_operations } -> - State.Block.protocol_hash predecessor >>= fun protocol -> let timestamp = Time.now () in Prevalidation.start_prevalidation ~predecessor ~timestamp () >>= fun validation_state -> @@ -593,7 +629,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct in_mempool = Operation_hash.Set.empty ; validation_result ; validation_state ; advertisement = `None ; - rpc_directory = lazy (rpc_directory_of_protocol protocol) ; + rpc_directory = rpc_directory ; } in List.iter (fun oph -> Lwt.ignore_result (fetch_operation w pv oph)) @@ -771,15 +807,10 @@ let rpc_directory : t option RPC_directory.t = (Block_services.mempool_path RPC_path.open_root) (function | None -> - Lwt.return - (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory) + Lwt.return (RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory) | Some t -> let module Prevalidator: T = (val t: T) in Prevalidator.worker >>= fun w -> let pv = Prevalidator.Worker.state w in - Lazy.force pv.rpc_directory >>= function - | Error _ -> - Lwt.return RPC_directory.empty - | Ok rpc_directory -> - Lwt.return - (RPC_directory.map (fun _ -> Lwt.return pv) rpc_directory)) + let pv_rpc_dir = Lazy.force pv.rpc_directory in + Lwt.return (RPC_directory.map (fun _ -> Lwt.return pv) pv_rpc_dir))