diff --git a/src/lib_shell/block_directory.ml b/src/lib_shell/block_directory.ml index 7e4af4e8f..ea821fd49 100644 --- a/src/lib_shell/block_directory.ml +++ b/src/lib_shell/block_directory.ml @@ -264,7 +264,6 @@ let build_raw_rpc_directory ~predecessor:block ~timestamp ~protocol_data - ~sort_operations:q#sort_operations operations end ; diff --git a/src/lib_shell/prevalidation.ml b/src/lib_shell/prevalidation.ml index d87d8f977..f518366e9 100644 --- a/src/lib_shell/prevalidation.ml +++ b/src/lib_shell/prevalidation.ml @@ -68,45 +68,45 @@ module type T = sig module Proto: Registered_protocol.T - type state + type t + + type operation = private { + hash: Operation_hash.t ; + raw: Operation.t ; + protocol_data: Proto.operation_data ; + } + val compare: operation -> operation -> int + + val parse: Operation.t -> operation tzresult (** Creates a new prevalidation context w.r.t. the protocol associate to the predecessor block . When ?protocol_data is passed to this function, it will be used to create the new block *) - val start_prevalidation : + val create : ?protocol_data: MBytes.t -> predecessor: State.Block.t -> timestamp: Time.t -> - unit -> state tzresult Lwt.t + unit -> t tzresult Lwt.t - (** Given a prevalidation context applies a list of operations, - returns a new prevalidation context plus the preapply result containing the - list of operations that cannot be applied to this context *) - val prevalidate : - state -> sort:bool -> - (Operation_hash.t * Operation.t) list -> - (state * error Preapply_result.t) Lwt.t + type result = + | Applied of t * Proto.operation_receipt + | Branch_delayed of error list + | Branch_refused of error list + | Refused of error list + | Duplicate + | Outdated - val end_prevalidation : - state -> - Tezos_protocol_environment_shell.validation_result tzresult Lwt.t + val apply_operation: t -> operation -> result Lwt.t + val apply_operation_with_preapply_result: + error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t - val notify_operation : - state -> - error Preapply_result.t -> - unit + type status = { + applied_operations : (operation * Proto.operation_receipt) list ; + block_result : Tezos_protocol_environment_shell.validation_result ; + block_metadata : Proto.block_header_metadata ; + } - val shutdown_operation_input : - state -> - unit - - type new_operation_input = - ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * - Operation.shell_header * - Proto.operation_data - ) Lwt_watcher.input - - val new_operation_input: state -> new_operation_input + val status: t -> status tzresult Lwt.t end @@ -114,16 +114,49 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc module Proto = Proto - type state = + type operation = { + hash: Operation_hash.t ; + raw: Operation.t ; + protocol_data: Proto.operation_data ; + } + + + type t = { state : Proto.validation_state ; - max_number_of_operations : int ; - new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * - Operation.shell_header * Proto.operation_data) Lwt_watcher.input ; + applied : (operation * Proto.operation_receipt) list ; + live_blocks : Block_hash.Set.t ; + live_operations : Operation_hash.Set.t ; } - let start_prevalidation - ?protocol_data - ~predecessor ~timestamp () = + type result = + | Applied of t * Proto.operation_receipt + | Branch_delayed of error list + | Branch_refused of error list + | Refused of error list + | Duplicate + | Outdated + + let parse (raw : Operation.t) = + let hash = Operation.hash raw in + let size = Data_encoding.Binary.length Operation.encoding raw in + if size > Proto.max_operation_data_length then + error + (Oversized_operation + { size ; max = Proto.max_operation_data_length }) + else + match Data_encoding.Binary.of_bytes + Proto.operation_data_encoding + raw.Operation.proto with + | None -> error Parse_error + | Some protocol_data -> + ok { hash ; raw ; protocol_data } + + let compare op1 op2 = + Proto.compare_operations + { shell = op1.raw.shell ; protocol_data = op1.protocol_data } + { shell = op2.raw.shell ; protocol_data = op2.protocol_data } + + let create ?protocol_data ~predecessor ~timestamp () = let { Block_header.shell = { fitness = predecessor_fitness ; timestamp = predecessor_timestamp ; @@ -131,6 +164,14 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc State.Block.header predecessor in State.Block.context predecessor >>= fun predecessor_context -> let predecessor_hash = State.Block.hash predecessor in + Chain_traversal.live_blocks + predecessor + (State.Block.max_operations_ttl predecessor) + >>= fun (live_blocks, live_operations) -> + Context.reset_test_chain + predecessor_context predecessor_hash + timestamp >>= fun predecessor_context -> + Context.reset_test_chain predecessor_context predecessor_hash timestamp >>= fun predecessor_context -> @@ -158,99 +199,79 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc () >>=? fun state -> (* FIXME arbitrary value, to be customisable *) - let max_number_of_operations = 1000 in - let new_operation_input = Lwt_watcher.create_input () in - return { state ; max_number_of_operations ; new_operation_input ; } + return { + state ; + applied = [] ; + live_blocks ; + live_operations ; + } - let prevalidate - { state ; max_number_of_operations ; new_operation_input ; } - ~sort (ops : (Operation_hash.t * Operation.t) list) = - let ops = - List.map - (fun (h, op) -> - let parsed_op = - match Data_encoding.Binary.of_bytes - Proto.operation_data_encoding - op.Operation.proto with - | None -> error Parse_error - | Some protocol_data -> - Ok ({ shell = op.shell ; protocol_data } : Proto.operation) in - (h, op, parsed_op)) - ops in - let invalid_ops = - List.filter_map - (fun (h, op, parsed_op) -> match parsed_op with - | Ok _ -> None - | Error err -> Some (h, op, err)) ops - and parsed_ops = - List.filter_map - (fun (h, op, parsed_op) -> match parsed_op with - | Ok parsed_op -> Some (h, op, parsed_op) - | Error _ -> None) ops in - ignore invalid_ops; (* FIXME *) - let sorted_ops = - if sort then - let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in - List.sort compare parsed_ops - else parsed_ops in - let apply_operation state max_ops op (parse_op) = - let size = Data_encoding.Binary.length Operation.encoding op in - if max_ops <= 0 then - fail Too_many_operations - else if size > Proto.max_operation_data_length then - fail (Oversized_operation { size ; max = Proto.max_operation_data_length }) - else - Proto.apply_operation state parse_op >>=? fun (state, receipt) -> - return (state, receipt) in - apply_operations - apply_operation - state Preapply_result.empty max_number_of_operations - ~sort sorted_ops >>= fun (state, max_number_of_operations, r) -> - let r = - { r with - applied = List.rev r.applied ; - branch_refused = - List.fold_left - (fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map) - r.branch_refused invalid_ops } in - Lwt.return ({ state ; max_number_of_operations ; new_operation_input ; }, r) + let apply_operation pv op = + if Operation_hash.Set.mem op.hash pv.live_operations then + Lwt.return Outdated + else + Proto.apply_operation pv.state + { shell = op.raw.shell ; protocol_data = op.protocol_data } >|= function + | Ok (state, receipt) -> + let pv = + { state ; + applied = (op, receipt) :: pv.applied ; + live_blocks = pv.live_blocks ; + live_operations = Operation_hash.Set.add op.hash pv.live_operations ; + } in + Applied (pv, receipt) + | Error errors -> + match classify_errors errors with + | `Branch -> Branch_refused errors + | `Permanent -> Refused errors + | `Temporary -> Branch_delayed errors - let end_prevalidation { state } = - Proto.finalize_block state >>=? fun (result, _metadata) -> - return result - - let notify_operation { new_operation_input } result = + let apply_operation_with_preapply_result preapp t op = let open Preapply_result in - let { applied ; refused ; branch_refused ; branch_delayed } = result in - (* Notify new opperations *) - let map_op kind { Operation.shell ; proto } = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - proto in - kind, shell, protocol_data in - let fold_op kind _k (op, _error) acc = map_op kind op :: acc in - let applied = List.map (map_op `Applied) (List.map snd applied) in - let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in - let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in - let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in - let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in - List.iter (Lwt_watcher.notify new_operation_input) ops + apply_operation t op >>= function + | Applied (t, _) -> + let applied = (op.hash, op.raw) :: preapp.applied in + Lwt.return ({ preapp with applied }, t) + | Branch_delayed errors -> + let branch_delayed = + Operation_hash.Map.add + op.hash + (op.raw, errors) + preapp.branch_delayed in + Lwt.return ({ preapp with branch_delayed }, t) + | Branch_refused errors -> + let branch_refused = + Operation_hash.Map.add + op.hash + (op.raw, errors) + preapp.branch_refused in + Lwt.return ({ preapp with branch_refused }, t) + | Refused errors -> + let refused = + Operation_hash.Map.add + op.hash + (op.raw, errors) + preapp.refused in + Lwt.return ({ preapp with refused }, t) + | Duplicate | Outdated -> Lwt.return (preapp, t) - let shutdown_operation_input { new_operation_input } = - Lwt_watcher.shutdown_input new_operation_input + type status = { + applied_operations : (operation * Proto.operation_receipt) list ; + block_result : Tezos_protocol_environment_shell.validation_result ; + block_metadata : Proto.block_header_metadata ; + } - type new_operation_input = - ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * - Operation.shell_header * - Proto.operation_data - ) Lwt_watcher.input - - let new_operation_input { new_operation_input } = new_operation_input + let status pv = + Proto.finalize_block pv.state >>=? fun (block_result, block_metadata) -> + return { + block_metadata ; + block_result ; + applied_operations = pv.applied ; + } end -let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = +let preapply ~predecessor ~timestamp ~protocol_data operations = State.Block.context predecessor >>= fun predecessor_context -> Context.get_protocol predecessor_context >>= fun protocol -> begin @@ -264,27 +285,37 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = return protocol end >>=? fun (module Proto) -> let module Prevalidation = Make(Proto) in - Prevalidation.start_prevalidation + Prevalidation.create ~protocol_data ~predecessor ~timestamp () >>=? fun validation_state -> - let ops = List.map (List.map (fun x -> Operation.hash x, x)) ops in Lwt_list.fold_left_s - (fun (validation_state, rs) ops -> - Prevalidation.prevalidate - validation_state ~sort ops >>= fun (validation_state, r) -> - Lwt.return (validation_state, rs @ [r])) - (validation_state, []) ops >>= fun (validation_state, rs) -> + (fun (acc_validation_result, acc_validation_state) operations -> + Lwt_list.fold_left_s + (fun (acc_validation_result, acc_validation_state) op -> + match Prevalidation.parse op with + | Error _ -> + (* FIXME *) + Lwt.return (acc_validation_result, acc_validation_state) + | Ok op -> + Prevalidation.apply_operation_with_preapply_result + acc_validation_result acc_validation_state op) + (Preapply_result.empty, acc_validation_state) + operations + >>= fun (new_validation_result, new_validation_state) -> + Lwt.return (acc_validation_result @ [new_validation_result], new_validation_state) + ) ([], validation_state) operations + >>= fun (validation_result_list, validation_state) -> let operations_hash = Operation_list_list_hash.compute - (List.map - (fun r -> - Operation_list_hash.compute - (List.map fst r.Preapply_result.applied)) - rs) in - Prevalidation.end_prevalidation validation_state >>=? fun validation_result -> + (List.map (fun r -> + Operation_list_hash.compute + (List.map fst r.Preapply_result.applied) + ) validation_result_list) + in + Prevalidation.status validation_state >>=? fun { block_result ; _ } -> let pred_shell_header = State.Block.shell_header predecessor in let level = Int32.succ pred_shell_header.level in Block_validator.may_patch_protocol - ~level validation_result >>=? fun { fitness ; context ; message } -> + ~level block_result >>=? fun { fitness ; context ; message } -> State.Block.protocol_hash predecessor >>= fun pred_protocol -> Context.get_protocol context >>= fun protocol -> let proto_level = @@ -297,7 +328,7 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = proto_level ; predecessor = State.Block.hash predecessor ; timestamp ; - validation_passes = List.length rs ; + validation_passes = List.length validation_result_list ; operations_hash ; fitness ; context = Context_hash.zero ; (* place holder *) @@ -315,4 +346,4 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = return (context, message) end >>=? fun (context, message) -> Context.hash ?message ~time:timestamp context >>= fun context -> - return ({ shell_header with context }, rs) + return ({ shell_header with context }, validation_result_list) diff --git a/src/lib_shell/prevalidation.mli b/src/lib_shell/prevalidation.mli index 69ba4c90d..0c6ea93cb 100644 --- a/src/lib_shell/prevalidation.mli +++ b/src/lib_shell/prevalidation.mli @@ -32,45 +32,45 @@ module type T = sig module Proto: Registered_protocol.T - type state + type t + + type operation = private { + hash: Operation_hash.t ; + raw: Operation.t ; + protocol_data: Proto.operation_data ; + } + val compare: operation -> operation -> int + + val parse: Operation.t -> operation tzresult (** Creates a new prevalidation context w.r.t. the protocol associate to the predecessor block . When ?protocol_data is passed to this function, it will be used to create the new block *) - val start_prevalidation : + val create : ?protocol_data: MBytes.t -> predecessor: State.Block.t -> timestamp: Time.t -> - unit -> state tzresult Lwt.t + unit -> t tzresult Lwt.t - (** Given a prevalidation context applies a list of operations, - returns a new prevalidation context plus the preapply result containing the - list of operations that cannot be applied to this context *) - val prevalidate : - state -> sort:bool -> - (Operation_hash.t * Operation.t) list -> - (state * error Preapply_result.t) Lwt.t + type result = + | Applied of t * Proto.operation_receipt + | Branch_delayed of error list + | Branch_refused of error list + | Refused of error list + | Duplicate + | Outdated - val end_prevalidation : - state -> - Tezos_protocol_environment_shell.validation_result tzresult Lwt.t + val apply_operation: t -> operation -> result Lwt.t + val apply_operation_with_preapply_result: + error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t - val notify_operation : - state -> - error Preapply_result.t -> - unit + type status = { + applied_operations : (operation * Proto.operation_receipt) list ; + block_result : Tezos_protocol_environment_shell.validation_result ; + block_metadata : Proto.block_header_metadata ; + } - val shutdown_operation_input : - state -> - unit - - type new_operation_input = - ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * - Operation.shell_header * - Proto.operation_data - ) Lwt_watcher.input - - val new_operation_input: state -> new_operation_input + val status: t -> status tzresult Lwt.t end @@ -81,6 +81,5 @@ val preapply : predecessor:State.Block.t -> timestamp:Time.t -> protocol_data:MBytes.t -> - sort_operations:bool -> Operation.t list list -> (Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 727b9a50b..0b580ef1a 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -53,7 +53,12 @@ module type T = sig mutable mempool : Mempool.t ; mutable in_mempool : Operation_hash.Set.t ; mutable validation_result : error Preapply_result.t ; - mutable validation_state : Prevalidation.state tzresult ; + mutable validation_state : Prevalidation.t tzresult ; + mutable operation_stream : + ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * + Proto.operation_data + ) Lwt_watcher.input; mutable advertisement : [ `Pending of Mempool.t | `None ] ; mutable rpc_directory : types_state RPC_directory.t lazy_t ; } @@ -103,7 +108,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct mutable mempool : Mempool.t ; mutable in_mempool : Operation_hash.Set.t ; mutable validation_result : error Preapply_result.t ; - mutable validation_state : Prevalidation.state tzresult ; + mutable validation_state : Prevalidation.t tzresult ; + mutable operation_stream : + ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * + Operation.shell_header * + Proto.operation_data + ) Lwt_watcher.input; mutable advertisement : [ `Pending of Mempool.t | `None ] ; mutable rpc_directory : types_state RPC_directory.t lazy_t ; } @@ -171,6 +181,27 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct = Worker.Make (Name) (Prevalidator_worker_state.Event) (Prevalidator_worker_state.Request) (Types) + + (** Centralised operation stream for the RPCs *) + let notify_operation { operation_stream } result = + let open Preapply_result in + let { applied ; refused ; branch_refused ; branch_delayed } = result in + (* Notify new opperations *) + let map_op kind { Operation.shell ; proto } = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + proto in + kind, shell, protocol_data in + let fold_op kind _k (op, _error) acc = map_op kind op :: acc in + let applied = List.map (map_op `Applied) (List.map snd applied) in + let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in + let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in + let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in + let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + List.iter (Lwt_watcher.notify operation_stream) ops + + open Types type worker = Worker.infinite Worker.queue Worker.t @@ -296,15 +327,26 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct pv.pending <- Operation_hash.Map.empty ; Lwt.return_unit - | Ok validation_state -> + | Ok state -> match Operation_hash.Map.cardinal pv.pending with | 0 -> Lwt.return_unit - | n -> debug w "processing %d operations" n ; - Prevalidation.prevalidate validation_state ~sort:true - (Operation_hash.Map.bindings pv.pending) - >>= fun (validation_state, validation_result) -> - pv.validation_state <- - Ok validation_state ; + | n -> + debug w "processing %d operations" n ; + let operations = List.map snd (Operation_hash.Map.bindings pv.pending) in + Lwt_list.fold_left_s + (fun (acc_validation_result, acc_validation_state) op -> + match Prevalidation.parse op with + | Error _ -> + (* FIXME *) + Lwt.return (acc_validation_result, acc_validation_state) + | Ok op -> + Prevalidation.apply_operation_with_preapply_result + acc_validation_result acc_validation_state op) + (pv.validation_result, state) + operations + >>= fun (new_result, new_state) -> + pv.validation_state <- Ok new_state ; + pv.validation_result <- new_result ; pv.in_mempool <- (Operation_hash.Map.fold (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) @@ -323,15 +365,10 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct Operation_hash.Map.iter (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph) pv.validation_result.refused ; - pv.validation_result <- - merge_validation_results - ~old:pv.validation_result - ~neu:validation_result ; - pv.pending <- - Operation_hash.Map.empty ; + pv.pending <- Operation_hash.Map.empty ; advertise w pv - (mempool_of_prevalidation_result validation_result) ; - Prevalidation.notify_operation validation_state validation_result ; + (mempool_of_prevalidation_result new_result) ; + notify_operation pv new_result ; Lwt.return_unit end >>= fun () -> pv.mempool <- @@ -399,62 +436,57 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct dir := RPC_directory.gen_register !dir (Proto_services.S.Mempool.monitor_operations RPC_path.open_root) - begin fun { validation_state ; validation_result = current_mempool } params () -> - match validation_state with - | Error _ -> assert false - | Ok pv -> - let new_operation_input = Prevalidation.new_operation_input pv in - let open Preapply_result in - let operation_stream, stopper = - Lwt_watcher.create_stream new_operation_input in - (* Convert ops *) - let map_op op = - let protocol_data = - Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - op.Operation.proto in - Proto.{ shell = op.shell ; protocol_data } in - let fold_op _k (op, _error) acc = map_op op :: acc in - (* First call : retrieve the current set of op from the mempool *) - let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in - let applied = if params#applied then List.map map_op (List.map snd applied) else [] in - let refused = if params#refused then - Operation_hash.Map.fold fold_op refused [] else [] in - let branch_refused = if params#branch_refused then - Operation_hash.Map.fold fold_op branch_refused [] else [] in - let branch_delayed = if params#branch_delayed then - Operation_hash.Map.fold fold_op branch_delayed [] else [] in - let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in - let current_mempool = ref (Some current_mempool) in - let filter_result = function - | `Applied -> params#applied - | `Refused -> params#branch_refused - | `Branch_refused -> params#refused - | `Branch_delayed -> params#branch_delayed - in - let next () = - match !current_mempool with - | Some mempool -> begin - current_mempool := None ; - Lwt.return_some mempool - end - | None -> begin - Lwt_stream.get operation_stream >>= function - | Some (kind, shell, protocol_data) when filter_result kind -> - (* NOTE: Should the protocol change, a new Prevalidation - * context would be created. Thus, we use the same Proto. *) - let bytes = Data_encoding.Binary.to_bytes_exn - Proto.operation_data_encoding - protocol_data in - let protocol_data = Data_encoding.Binary.of_bytes_exn - Proto.operation_data_encoding - bytes in - Lwt.return_some [ { Proto.shell ; protocol_data } ] - | _ -> Lwt.return_none - end - in - let shutdown () = Lwt_watcher.shutdown stopper in - RPC_answer.return_stream { next ; shutdown } + begin fun { validation_result = current_mempool ; operation_stream } params () -> + let open Preapply_result in + let op_stream, stopper = Lwt_watcher.create_stream operation_stream in + (* Convert ops *) + let map_op op = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + op.Operation.proto in + Proto.{ shell = op.shell ; protocol_data } in + let fold_op _k (op, _error) acc = map_op op :: acc in + (* First call : retrieve the current set of op from the mempool *) + let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in + let applied = if params#applied then List.map map_op (List.map snd applied) else [] in + let refused = if params#refused then + Operation_hash.Map.fold fold_op refused [] else [] in + let branch_refused = if params#branch_refused then + Operation_hash.Map.fold fold_op branch_refused [] else [] in + let branch_delayed = if params#branch_delayed then + Operation_hash.Map.fold fold_op branch_delayed [] else [] in + let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Applied -> params#applied + | `Refused -> params#branch_refused + | `Branch_refused -> params#refused + | `Branch_delayed -> params#branch_delayed + in + let next () = + match !current_mempool with + | Some mempool -> begin + current_mempool := None ; + Lwt.return_some mempool + end + | None -> begin + Lwt_stream.get op_stream >>= function + | Some (kind, shell, protocol_data) when filter_result kind -> + (* NOTE: Should the protocol change, a new Prevalidation + * context would be created. Thus, we use the same Proto. *) + let bytes = Data_encoding.Binary.to_bytes_exn + Proto.operation_data_encoding + protocol_data in + let protocol_data = Data_encoding.Binary.of_bytes_exn + Proto.operation_data_encoding + bytes in + Lwt.return_some [ { Proto.shell ; protocol_data } ] + | _ -> Lwt.return_none + end + in + let shutdown () = Lwt_watcher.shutdown stopper in + RPC_answer.return_stream { next ; shutdown } end ; !dir @@ -475,38 +507,18 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct let on_inject pv op = let oph = Operation.hash op in - begin - if already_handled pv oph then - return pv.validation_result - else - Lwt.return pv.validation_state >>=? fun validation_state -> - Prevalidation.prevalidate - validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) -> - match result.applied with - | [ app_oph, _ ] when Operation_hash.equal app_oph oph -> - Distributed_db.inject_operation pv.chain_db oph op >>= fun (_ : bool) -> - pv.pending <- Operation_hash.Map.add oph op pv.pending ; - return result - | _ -> - return result - end >>=? fun result -> - if List.mem_assoc oph result.applied then - return_unit + if already_handled pv oph then + return_unit (* FIXME : is this an error ? *) else - let try_in_map map proj or_else = - try - Lwt.return (Error (proj (Operation_hash.Map.find oph map))) - with Not_found -> or_else () in - try_in_map pv.refusals (fun h -> h) @@ fun () -> - try_in_map result.refused snd @@ fun () -> - try_in_map result.branch_refused snd @@ fun () -> - try_in_map result.branch_delayed snd @@ fun () -> - if Operation_hash.Set.mem oph pv.live_operations then - failwith "Injected operation %a included in a previous block." - Operation_hash.pp oph - else - failwith "Injected operation %a is not in prevalidation result." - Operation_hash.pp oph + Lwt.return pv.validation_state >>=? fun validation_state -> + Lwt.return (Prevalidation.parse op) >>=? fun parsed_op -> + Prevalidation.apply_operation validation_state parsed_op >>= function + | Applied (_, _result) -> + Distributed_db.inject_operation pv.chain_db oph op >>= fun (_ : bool) -> + pv.pending <- Operation_hash.Map.add parsed_op.hash op pv.pending ; + return_unit + | _ -> + failwith "Error while applying operation %a" Operation_hash.pp oph let on_notify w pv peer mempool = let all_ophs = @@ -526,21 +538,15 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct to_fetch let on_flush w pv predecessor = + Lwt_watcher.shutdown_input pv.operation_stream; list_pendings ~maintain_chain_db:pv.chain_db ~from_block:pv.predecessor ~to_block:predecessor (Preapply_result.operations pv.validation_result) >>= fun (pending, new_live_blocks, new_live_operations) -> let timestamp = Time.now () in - Prevalidation.start_prevalidation - ~predecessor ~timestamp () >>= fun validation_state -> - begin match validation_state with - | Error _ -> Lwt.return (validation_state, Preapply_result.empty) - | Ok validation_state -> - Prevalidation.prevalidate - validation_state ~sort:false [] >>= fun (state, result) -> - Lwt.return (Ok state, result) - end >>= fun (validation_state, validation_result) -> + Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state -> + let validation_result = Preapply_result.empty in debug w "%d operations were not washed by the flush" (Operation_hash.Map.cardinal pending) ; pv.predecessor <- predecessor ; @@ -552,6 +558,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct pv.in_mempool <- Operation_hash.Set.empty ; pv.validation_result <- validation_result ; pv.validation_state <- validation_state ; + pv.operation_stream <- Lwt_watcher.create_input () ; return_unit let on_advertise pv = @@ -571,9 +578,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct (* TODO: rebase the advertisement instead *) let chain_state = Distributed_db.chain_state pv.chain_db in State.Block.read chain_state hash >>=? fun block -> - begin match pv.validation_state with - | Ok pv -> Prevalidation.shutdown_operation_input pv - | Error _ -> () end ; on_flush w pv block >>=? fun () -> return (() : r) | Request.Notify (peer, mempool) -> @@ -604,16 +608,8 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct { current_head = predecessor ; current_mempool = mempool ; live_blocks ; live_operations } -> let timestamp = Time.now () in - Prevalidation.start_prevalidation - ~predecessor ~timestamp () >>= fun validation_state -> - begin match validation_state with - | Error _ -> Lwt.return (validation_state, Preapply_result.empty) - | Ok validation_state -> - Prevalidation.prevalidate validation_state ~sort:false [] - >>= fun (validation_state, validation_result) -> - - Lwt.return (Ok validation_state, validation_result) - end >>= fun (validation_state, validation_result) -> + Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state -> + let validation_result = Preapply_result.empty in let fetching = List.fold_left (fun s h -> Operation_hash.Set.add h s) @@ -627,7 +623,9 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct fetching ; pending = Operation_hash.Map.empty ; in_mempool = Operation_hash.Set.empty ; - validation_result ; validation_state ; + validation_result ; + validation_state ; + operation_stream = Lwt_watcher.create_input () ; advertisement = `None ; rpc_directory = rpc_directory ; } in