From e5b292731085488ab3ea1d9e815aa6246d2cf930 Mon Sep 17 00:00:00 2001 From: Vincent Botbol Date: Thu, 16 Aug 2018 18:03:10 +0200 Subject: [PATCH] Shell/Prevalidation: add the streaming of the newly validated operations --- src/lib_shell/prevalidation.ml | 119 +++++++++++++++++++++++++++++--- src/lib_shell/prevalidation.mli | 12 ++++ src/lib_shell/prevalidator.ml | 25 +++++-- src/lib_stdlib/lwt_watcher.ml | 9 +++ src/lib_stdlib/lwt_watcher.mli | 1 + 5 files changed, 153 insertions(+), 13 deletions(-) diff --git a/src/lib_shell/prevalidation.ml b/src/lib_shell/prevalidation.ml index 0c36c121f..aa359041a 100644 --- a/src/lib_shell/prevalidation.ml +++ b/src/lib_shell/prevalidation.ml @@ -64,12 +64,17 @@ let rec apply_operations apply_operation state r max_ops ~sort ops = Lwt.return (state, max_ops, r) type prevalidation_state = - State : { proto : 'a proto ; state : 'a ; - max_number_of_operations : int } + State : { proto : ('state, 'operation_data) proto ; state : 'state ; + max_number_of_operations : int ; + new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * 'operation_data) Lwt_watcher.input ; + } -> prevalidation_state -and 'a proto = - (module Registered_protocol.T with type P.validation_state = 'a) +and ('state, 'operation_data) proto = + (module Registered_protocol.T + with type P.validation_state = 'state + and type P.operation_data = 'operation_data ) let start_prevalidation ?protocol_data @@ -120,14 +125,16 @@ let start_prevalidation >>=? fun state -> (* FIXME arbitrary value, to be customisable *) let max_number_of_operations = 1000 in + let new_operation_input = Lwt_watcher.create_input () in return (State { proto = (module Proto) ; state ; - max_number_of_operations }) - + max_number_of_operations ; + new_operation_input ; + }) let prevalidate (State { proto = (module Proto) ; state ; - max_number_of_operations }) - ~sort (ops : (Operation_hash.t * Operation.t) list)= + max_number_of_operations ; new_operation_input }) + ~sort (ops : (Operation_hash.t * Operation.t) list) = let ops = List.map (fun (h, op) -> @@ -176,7 +183,7 @@ let prevalidate (fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map) r.branch_refused invalid_ops } in Lwt.return (State { proto = (module Proto) ; state ; - max_number_of_operations }, + max_number_of_operations ; new_operation_input }, r) let end_prevalidation (State { proto = (module Proto) ; state }) = @@ -236,3 +243,97 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = end >>=? fun (context, message) -> Context.commit ?message ~time:timestamp context >>= fun context -> return ({ shell_header with context }, rs) + +let notify_operation (State { proto = (module Proto) ; new_operation_input ; }) result = + let { applied ; refused ; branch_refused ; branch_delayed } = result in + (* Notify new opperations *) + let map_op kind { Operation.shell ; proto } = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + proto in + kind, shell, protocol_data in + let fold_op kind _k (op, _error) acc = map_op kind op :: acc in + let applied = List.map (map_op `Applied) (List.map snd applied) in + let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in + let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in + let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in + let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + List.iter (Lwt_watcher.notify new_operation_input) ops + +let shutdown_operation_input (State { new_operation_input }) = + Lwt_watcher.shutdown_input new_operation_input + +let build_rpc_directory protocol = + begin + match Registered_protocol.get protocol with + | None -> + (* FIXME. *) + (* This should not happen: it should be handled in the validator. *) + failwith "Prevalidation: missing protocol '%a' for the current block." + Protocol_hash.pp_short protocol + | Some protocol -> + return protocol + end >>=? fun (module Proto) -> + let module Proto_services = Block_services.Make(Proto)(Proto) in + + let dir : (prevalidation_state * Error_monad.error Preapply_result.t) RPC_directory.t ref = + ref RPC_directory.empty in + + let gen_register s f = + dir := RPC_directory.gen_register !dir s f in + + gen_register + (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) + begin fun ((State { new_operation_input ; proto = (module Next_proto) }), current_mempool) params () -> + let operation_stream, stopper = + Lwt_watcher.create_stream new_operation_input in + (* Convert ops *) + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + Proto.{ shell = op.shell ; protocol_data } in + let fold_op _k (op, _error) acc = map_op op :: acc in + (* First call : retrieve the current set of op from the mempool *) + let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in + let applied = if params#applied then List.map map_op (List.map snd applied) else [] in + let refused = if params#refused then + Operation_hash.Map.fold fold_op refused [] else [] in + let branch_refused = if params#branch_refused then + Operation_hash.Map.fold fold_op branch_refused [] else [] in + let branch_delayed = if params#branch_delayed then + Operation_hash.Map.fold fold_op branch_delayed [] else [] in + let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Applied -> params#applied + | `Refused -> params#branch_refused + | `Branch_refused -> params#refused + | `Branch_delayed -> params#branch_delayed + in + let next () = + match !current_mempool with + | Some mempool -> begin + current_mempool := None ; + Lwt.return_some mempool + end + | None -> begin + Lwt_stream.get operation_stream >>= function + | Some (kind, shell, protocol_data) when filter_result kind -> + let bytes = Data_encoding.Binary.to_bytes_exn + Next_proto.operation_data_encoding + protocol_data in + let protocol_data = Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + bytes in + Lwt.return_some [ { Proto.shell ; protocol_data } ] + | _ -> Lwt.return_none + end + in + let shutdown () = Lwt_watcher.shutdown stopper in + RPC_answer.return_stream { next ; shutdown } + end ; + + return !dir diff --git a/src/lib_shell/prevalidation.mli b/src/lib_shell/prevalidation.mli index 30f4c1668..edf2649f0 100644 --- a/src/lib_shell/prevalidation.mli +++ b/src/lib_shell/prevalidation.mli @@ -48,3 +48,15 @@ val preapply : Operation.t list list -> (Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t +val notify_operation : + prevalidation_state -> + error Preapply_result.t -> + unit + +val shutdown_operation_input : + prevalidation_state -> + unit + +val build_rpc_directory : + Protocol_hash.t -> + (prevalidation_state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 637ffa638..9157c32e6 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -122,9 +122,12 @@ let rpc_directory protocol = return protocol end >>=? fun (module Proto) -> let module Proto_services = Block_services.Make(Proto)(Proto) in - return @@ - RPC_directory.register - RPC_directory.empty + + let dir : state RPC_directory.t ref = ref RPC_directory.empty in + let register s f = + dir := RPC_directory.register !dir s f in + + register (Proto_services.S.Mempool.pending_operations RPC_path.open_root) (fun pv () () -> let map_op op = @@ -147,7 +150,17 @@ let rpc_directory protocol = Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ; unprocessed = Operation_hash.Map.map map_op pv.pending ; - }) + }) ; + + Prevalidation.build_rpc_directory protocol >>=? fun prevalidation_dir -> + let prevalidation_dir = + RPC_directory.map (fun state -> + match state.validation_state with + | Error _ -> assert false + | Ok pv -> Lwt.return (pv, state.validation_result) + ) prevalidation_dir in + + return (RPC_directory.merge !dir prevalidation_dir) let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool = let rec pop_blocks ancestor block mempool = @@ -302,6 +315,7 @@ let handle_unprocessed w pv = Operation_hash.Map.empty ; advertise w pv (mempool_of_prevalidation_result validation_result) ; + Prevalidation.notify_operation validation_state validation_result ; Lwt.return_unit end >>= fun () -> pv.mempool <- @@ -448,6 +462,9 @@ let on_request (* TODO: rebase the advertisement instead *) let chain_state = Distributed_db.chain_state pv.chain_db in State.Block.read chain_state hash >>=? fun block -> + begin match pv.validation_state with + | Ok pv -> Prevalidation.shutdown_operation_input pv + | Error _ -> () end ; on_flush w pv block >>=? fun () -> return (() : r) | Request.Notify (peer, mempool) -> diff --git a/src/lib_stdlib/lwt_watcher.ml b/src/lib_stdlib/lwt_watcher.ml index be479ee95..ad594e923 100644 --- a/src/lib_stdlib/lwt_watcher.ml +++ b/src/lib_stdlib/lwt_watcher.ml @@ -40,6 +40,15 @@ let create_input () = { watchers = []; cpt = 0 } +let shutdown_input input = + let { watchers ; _ } = input in + List.iter (fun w -> + w.active <- false ; + w.push None + ) watchers ; + input.cpt <- 0 ; + input.watchers <- [] + let create_fake_stream () = let str, push = Lwt_stream.create () in str, (fun () -> push None) diff --git a/src/lib_stdlib/lwt_watcher.mli b/src/lib_stdlib/lwt_watcher.mli index dce0a63dc..b384b2a20 100644 --- a/src/lib_stdlib/lwt_watcher.mli +++ b/src/lib_stdlib/lwt_watcher.mli @@ -29,6 +29,7 @@ type 'a input type stopper val create_input : unit -> 'a input +val shutdown_input : 'a input -> unit val notify : 'a input -> 'a -> unit val create_stream : 'a input -> 'a Lwt_stream.t * stopper val create_fake_stream : unit -> 'a Lwt_stream.t * stopper