Shell: simplify the signature of Prevalidation

Co-authored-by: Raphaël Proust <code@bnwr.net>
Co-authored-by: Pietro Abate <pietro.abate@tezcore.com>
Co-authored-by: Grégoire Henry <gregoire@tezcore.com>
This commit is contained in:
Raphaël Proust 2018-10-18 16:07:52 +02:00 committed by Grégoire Henry
parent 02bc43b094
commit 7cbfcfa608
No known key found for this signature in database
GPG Key ID: 50D984F20BD445D2
4 changed files with 320 additions and 293 deletions

View File

@ -264,7 +264,6 @@ let build_raw_rpc_directory
~predecessor:block ~predecessor:block
~timestamp ~timestamp
~protocol_data ~protocol_data
~sort_operations:q#sort_operations
operations operations
end ; end ;

View File

@ -68,45 +68,45 @@ module type T = sig
module Proto: Registered_protocol.T module Proto: Registered_protocol.T
type state type t
type operation = private {
hash: Operation_hash.t ;
raw: Operation.t ;
protocol_data: Proto.operation_data ;
}
val compare: operation -> operation -> int
val parse: Operation.t -> operation tzresult
(** Creates a new prevalidation context w.r.t. the protocol associate to the (** Creates a new prevalidation context w.r.t. the protocol associate to the
predecessor block . When ?protocol_data is passed to this function, it will predecessor block . When ?protocol_data is passed to this function, it will
be used to create the new block *) be used to create the new block *)
val start_prevalidation : val create :
?protocol_data: MBytes.t -> ?protocol_data: MBytes.t ->
predecessor: State.Block.t -> predecessor: State.Block.t ->
timestamp: Time.t -> timestamp: Time.t ->
unit -> state tzresult Lwt.t unit -> t tzresult Lwt.t
(** Given a prevalidation context applies a list of operations, type result =
returns a new prevalidation context plus the preapply result containing the | Applied of t * Proto.operation_receipt
list of operations that cannot be applied to this context *) | Branch_delayed of error list
val prevalidate : | Branch_refused of error list
state -> sort:bool -> | Refused of error list
(Operation_hash.t * Operation.t) list -> | Duplicate
(state * error Preapply_result.t) Lwt.t | Outdated
val end_prevalidation : val apply_operation: t -> operation -> result Lwt.t
state -> val apply_operation_with_preapply_result:
Tezos_protocol_environment_shell.validation_result tzresult Lwt.t error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t
val notify_operation : type status = {
state -> applied_operations : (operation * Proto.operation_receipt) list ;
error Preapply_result.t -> block_result : Tezos_protocol_environment_shell.validation_result ;
unit block_metadata : Proto.block_header_metadata ;
}
val shutdown_operation_input : val status: t -> status tzresult Lwt.t
state ->
unit
type new_operation_input =
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input
val new_operation_input: state -> new_operation_input
end end
@ -114,16 +114,49 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc
module Proto = Proto module Proto = Proto
type state = type operation = {
hash: Operation_hash.t ;
raw: Operation.t ;
protocol_data: Proto.operation_data ;
}
type t =
{ state : Proto.validation_state ; { state : Proto.validation_state ;
max_number_of_operations : int ; applied : (operation * Proto.operation_receipt) list ;
new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * live_blocks : Block_hash.Set.t ;
Operation.shell_header * Proto.operation_data) Lwt_watcher.input ; live_operations : Operation_hash.Set.t ;
} }
let start_prevalidation type result =
?protocol_data | Applied of t * Proto.operation_receipt
~predecessor ~timestamp () = | Branch_delayed of error list
| Branch_refused of error list
| Refused of error list
| Duplicate
| Outdated
let parse (raw : Operation.t) =
let hash = Operation.hash raw in
let size = Data_encoding.Binary.length Operation.encoding raw in
if size > Proto.max_operation_data_length then
error
(Oversized_operation
{ size ; max = Proto.max_operation_data_length })
else
match Data_encoding.Binary.of_bytes
Proto.operation_data_encoding
raw.Operation.proto with
| None -> error Parse_error
| Some protocol_data ->
ok { hash ; raw ; protocol_data }
let compare op1 op2 =
Proto.compare_operations
{ shell = op1.raw.shell ; protocol_data = op1.protocol_data }
{ shell = op2.raw.shell ; protocol_data = op2.protocol_data }
let create ?protocol_data ~predecessor ~timestamp () =
let { Block_header.shell = let { Block_header.shell =
{ fitness = predecessor_fitness ; { fitness = predecessor_fitness ;
timestamp = predecessor_timestamp ; timestamp = predecessor_timestamp ;
@ -131,6 +164,14 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc
State.Block.header predecessor in State.Block.header predecessor in
State.Block.context predecessor >>= fun predecessor_context -> State.Block.context predecessor >>= fun predecessor_context ->
let predecessor_hash = State.Block.hash predecessor in let predecessor_hash = State.Block.hash predecessor in
Chain_traversal.live_blocks
predecessor
(State.Block.max_operations_ttl predecessor)
>>= fun (live_blocks, live_operations) ->
Context.reset_test_chain
predecessor_context predecessor_hash
timestamp >>= fun predecessor_context ->
Context.reset_test_chain Context.reset_test_chain
predecessor_context predecessor_hash predecessor_context predecessor_hash
timestamp >>= fun predecessor_context -> timestamp >>= fun predecessor_context ->
@ -158,99 +199,79 @@ module Make(Proto : Registered_protocol.T) : T with module Proto = Proto = struc
() ()
>>=? fun state -> >>=? fun state ->
(* FIXME arbitrary value, to be customisable *) (* FIXME arbitrary value, to be customisable *)
let max_number_of_operations = 1000 in return {
let new_operation_input = Lwt_watcher.create_input () in state ;
return { state ; max_number_of_operations ; new_operation_input ; } applied = [] ;
live_blocks ;
live_operations ;
}
let prevalidate let apply_operation pv op =
{ state ; max_number_of_operations ; new_operation_input ; } if Operation_hash.Set.mem op.hash pv.live_operations then
~sort (ops : (Operation_hash.t * Operation.t) list) = Lwt.return Outdated
let ops = else
List.map Proto.apply_operation pv.state
(fun (h, op) -> { shell = op.raw.shell ; protocol_data = op.protocol_data } >|= function
let parsed_op = | Ok (state, receipt) ->
match Data_encoding.Binary.of_bytes let pv =
Proto.operation_data_encoding { state ;
op.Operation.proto with applied = (op, receipt) :: pv.applied ;
| None -> error Parse_error live_blocks = pv.live_blocks ;
| Some protocol_data -> live_operations = Operation_hash.Set.add op.hash pv.live_operations ;
Ok ({ shell = op.shell ; protocol_data } : Proto.operation) in } in
(h, op, parsed_op)) Applied (pv, receipt)
ops in | Error errors ->
let invalid_ops = match classify_errors errors with
List.filter_map | `Branch -> Branch_refused errors
(fun (h, op, parsed_op) -> match parsed_op with | `Permanent -> Refused errors
| Ok _ -> None | `Temporary -> Branch_delayed errors
| Error err -> Some (h, op, err)) ops
and parsed_ops =
List.filter_map
(fun (h, op, parsed_op) -> match parsed_op with
| Ok parsed_op -> Some (h, op, parsed_op)
| Error _ -> None) ops in
ignore invalid_ops; (* FIXME *)
let sorted_ops =
if sort then
let compare (_, _, op1) (_, _, op2) = Proto.compare_operations op1 op2 in
List.sort compare parsed_ops
else parsed_ops in
let apply_operation state max_ops op (parse_op) =
let size = Data_encoding.Binary.length Operation.encoding op in
if max_ops <= 0 then
fail Too_many_operations
else if size > Proto.max_operation_data_length then
fail (Oversized_operation { size ; max = Proto.max_operation_data_length })
else
Proto.apply_operation state parse_op >>=? fun (state, receipt) ->
return (state, receipt) in
apply_operations
apply_operation
state Preapply_result.empty max_number_of_operations
~sort sorted_ops >>= fun (state, max_number_of_operations, r) ->
let r =
{ r with
applied = List.rev r.applied ;
branch_refused =
List.fold_left
(fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map)
r.branch_refused invalid_ops } in
Lwt.return ({ state ; max_number_of_operations ; new_operation_input ; }, r)
let end_prevalidation { state } = let apply_operation_with_preapply_result preapp t op =
Proto.finalize_block state >>=? fun (result, _metadata) ->
return result
let notify_operation { new_operation_input } result =
let open Preapply_result in let open Preapply_result in
let { applied ; refused ; branch_refused ; branch_delayed } = result in apply_operation t op >>= function
(* Notify new opperations *) | Applied (t, _) ->
let map_op kind { Operation.shell ; proto } = let applied = (op.hash, op.raw) :: preapp.applied in
let protocol_data = Lwt.return ({ preapp with applied }, t)
Data_encoding.Binary.of_bytes_exn | Branch_delayed errors ->
Proto.operation_data_encoding let branch_delayed =
proto in Operation_hash.Map.add
kind, shell, protocol_data in op.hash
let fold_op kind _k (op, _error) acc = map_op kind op :: acc in (op.raw, errors)
let applied = List.map (map_op `Applied) (List.map snd applied) in preapp.branch_delayed in
let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in Lwt.return ({ preapp with branch_delayed }, t)
let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in | Branch_refused errors ->
let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in let branch_refused =
let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in Operation_hash.Map.add
List.iter (Lwt_watcher.notify new_operation_input) ops op.hash
(op.raw, errors)
preapp.branch_refused in
Lwt.return ({ preapp with branch_refused }, t)
| Refused errors ->
let refused =
Operation_hash.Map.add
op.hash
(op.raw, errors)
preapp.refused in
Lwt.return ({ preapp with refused }, t)
| Duplicate | Outdated -> Lwt.return (preapp, t)
let shutdown_operation_input { new_operation_input } = type status = {
Lwt_watcher.shutdown_input new_operation_input applied_operations : (operation * Proto.operation_receipt) list ;
block_result : Tezos_protocol_environment_shell.validation_result ;
block_metadata : Proto.block_header_metadata ;
}
type new_operation_input = let status pv =
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] * Proto.finalize_block pv.state >>=? fun (block_result, block_metadata) ->
Operation.shell_header * return {
Proto.operation_data block_metadata ;
) Lwt_watcher.input block_result ;
applied_operations = pv.applied ;
let new_operation_input { new_operation_input } = new_operation_input }
end end
let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops = let preapply ~predecessor ~timestamp ~protocol_data operations =
State.Block.context predecessor >>= fun predecessor_context -> State.Block.context predecessor >>= fun predecessor_context ->
Context.get_protocol predecessor_context >>= fun protocol -> Context.get_protocol predecessor_context >>= fun protocol ->
begin begin
@ -264,27 +285,37 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops =
return protocol return protocol
end >>=? fun (module Proto) -> end >>=? fun (module Proto) ->
let module Prevalidation = Make(Proto) in let module Prevalidation = Make(Proto) in
Prevalidation.start_prevalidation Prevalidation.create
~protocol_data ~predecessor ~timestamp () >>=? fun validation_state -> ~protocol_data ~predecessor ~timestamp () >>=? fun validation_state ->
let ops = List.map (List.map (fun x -> Operation.hash x, x)) ops in
Lwt_list.fold_left_s Lwt_list.fold_left_s
(fun (validation_state, rs) ops -> (fun (acc_validation_result, acc_validation_state) operations ->
Prevalidation.prevalidate Lwt_list.fold_left_s
validation_state ~sort ops >>= fun (validation_state, r) -> (fun (acc_validation_result, acc_validation_state) op ->
Lwt.return (validation_state, rs @ [r])) match Prevalidation.parse op with
(validation_state, []) ops >>= fun (validation_state, rs) -> | Error _ ->
(* FIXME *)
Lwt.return (acc_validation_result, acc_validation_state)
| Ok op ->
Prevalidation.apply_operation_with_preapply_result
acc_validation_result acc_validation_state op)
(Preapply_result.empty, acc_validation_state)
operations
>>= fun (new_validation_result, new_validation_state) ->
Lwt.return (acc_validation_result @ [new_validation_result], new_validation_state)
) ([], validation_state) operations
>>= fun (validation_result_list, validation_state) ->
let operations_hash = let operations_hash =
Operation_list_list_hash.compute Operation_list_list_hash.compute
(List.map (List.map (fun r ->
(fun r -> Operation_list_hash.compute
Operation_list_hash.compute (List.map fst r.Preapply_result.applied)
(List.map fst r.Preapply_result.applied)) ) validation_result_list)
rs) in in
Prevalidation.end_prevalidation validation_state >>=? fun validation_result -> Prevalidation.status validation_state >>=? fun { block_result ; _ } ->
let pred_shell_header = State.Block.shell_header predecessor in let pred_shell_header = State.Block.shell_header predecessor in
let level = Int32.succ pred_shell_header.level in let level = Int32.succ pred_shell_header.level in
Block_validator.may_patch_protocol Block_validator.may_patch_protocol
~level validation_result >>=? fun { fitness ; context ; message } -> ~level block_result >>=? fun { fitness ; context ; message } ->
State.Block.protocol_hash predecessor >>= fun pred_protocol -> State.Block.protocol_hash predecessor >>= fun pred_protocol ->
Context.get_protocol context >>= fun protocol -> Context.get_protocol context >>= fun protocol ->
let proto_level = let proto_level =
@ -297,7 +328,7 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops =
proto_level ; proto_level ;
predecessor = State.Block.hash predecessor ; predecessor = State.Block.hash predecessor ;
timestamp ; timestamp ;
validation_passes = List.length rs ; validation_passes = List.length validation_result_list ;
operations_hash ; operations_hash ;
fitness ; fitness ;
context = Context_hash.zero ; (* place holder *) context = Context_hash.zero ; (* place holder *)
@ -315,4 +346,4 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops =
return (context, message) return (context, message)
end >>=? fun (context, message) -> end >>=? fun (context, message) ->
Context.hash ?message ~time:timestamp context >>= fun context -> Context.hash ?message ~time:timestamp context >>= fun context ->
return ({ shell_header with context }, rs) return ({ shell_header with context }, validation_result_list)

View File

@ -32,45 +32,45 @@ module type T = sig
module Proto: Registered_protocol.T module Proto: Registered_protocol.T
type state type t
type operation = private {
hash: Operation_hash.t ;
raw: Operation.t ;
protocol_data: Proto.operation_data ;
}
val compare: operation -> operation -> int
val parse: Operation.t -> operation tzresult
(** Creates a new prevalidation context w.r.t. the protocol associate to the (** Creates a new prevalidation context w.r.t. the protocol associate to the
predecessor block . When ?protocol_data is passed to this function, it will predecessor block . When ?protocol_data is passed to this function, it will
be used to create the new block *) be used to create the new block *)
val start_prevalidation : val create :
?protocol_data: MBytes.t -> ?protocol_data: MBytes.t ->
predecessor: State.Block.t -> predecessor: State.Block.t ->
timestamp: Time.t -> timestamp: Time.t ->
unit -> state tzresult Lwt.t unit -> t tzresult Lwt.t
(** Given a prevalidation context applies a list of operations, type result =
returns a new prevalidation context plus the preapply result containing the | Applied of t * Proto.operation_receipt
list of operations that cannot be applied to this context *) | Branch_delayed of error list
val prevalidate : | Branch_refused of error list
state -> sort:bool -> | Refused of error list
(Operation_hash.t * Operation.t) list -> | Duplicate
(state * error Preapply_result.t) Lwt.t | Outdated
val end_prevalidation : val apply_operation: t -> operation -> result Lwt.t
state -> val apply_operation_with_preapply_result:
Tezos_protocol_environment_shell.validation_result tzresult Lwt.t error Preapply_result.t -> t -> operation -> (error Preapply_result.t * t) Lwt.t
val notify_operation : type status = {
state -> applied_operations : (operation * Proto.operation_receipt) list ;
error Preapply_result.t -> block_result : Tezos_protocol_environment_shell.validation_result ;
unit block_metadata : Proto.block_header_metadata ;
}
val shutdown_operation_input : val status: t -> status tzresult Lwt.t
state ->
unit
type new_operation_input =
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input
val new_operation_input: state -> new_operation_input
end end
@ -81,6 +81,5 @@ val preapply :
predecessor:State.Block.t -> predecessor:State.Block.t ->
timestamp:Time.t -> timestamp:Time.t ->
protocol_data:MBytes.t -> protocol_data:MBytes.t ->
sort_operations:bool ->
Operation.t list list -> Operation.t list list ->
(Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t (Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t

View File

@ -53,7 +53,12 @@ module type T = sig
mutable mempool : Mempool.t ; mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ; mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ; mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.state tzresult ; mutable validation_state : Prevalidation.t tzresult ;
mutable operation_stream :
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input;
mutable advertisement : [ `Pending of Mempool.t | `None ] ; mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : types_state RPC_directory.t lazy_t ; mutable rpc_directory : types_state RPC_directory.t lazy_t ;
} }
@ -103,7 +108,12 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
mutable mempool : Mempool.t ; mutable mempool : Mempool.t ;
mutable in_mempool : Operation_hash.Set.t ; mutable in_mempool : Operation_hash.Set.t ;
mutable validation_result : error Preapply_result.t ; mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.state tzresult ; mutable validation_state : Prevalidation.t tzresult ;
mutable operation_stream :
([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
Operation.shell_header *
Proto.operation_data
) Lwt_watcher.input;
mutable advertisement : [ `Pending of Mempool.t | `None ] ; mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : types_state RPC_directory.t lazy_t ; mutable rpc_directory : types_state RPC_directory.t lazy_t ;
} }
@ -171,6 +181,27 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
= Worker.Make (Name) (Prevalidator_worker_state.Event) = Worker.Make (Name) (Prevalidator_worker_state.Event)
(Prevalidator_worker_state.Request) (Types) (Prevalidator_worker_state.Request) (Types)
(** Centralised operation stream for the RPCs *)
let notify_operation { operation_stream } result =
let open Preapply_result in
let { applied ; refused ; branch_refused ; branch_delayed } = result in
(* Notify new opperations *)
let map_op kind { Operation.shell ; proto } =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
proto in
kind, shell, protocol_data in
let fold_op kind _k (op, _error) acc = map_op kind op :: acc in
let applied = List.map (map_op `Applied) (List.map snd applied) in
let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in
let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in
let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in
let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
List.iter (Lwt_watcher.notify operation_stream) ops
open Types open Types
type worker = Worker.infinite Worker.queue Worker.t type worker = Worker.infinite Worker.queue Worker.t
@ -296,15 +327,26 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
pv.pending <- pv.pending <-
Operation_hash.Map.empty ; Operation_hash.Map.empty ;
Lwt.return_unit Lwt.return_unit
| Ok validation_state -> | Ok state ->
match Operation_hash.Map.cardinal pv.pending with match Operation_hash.Map.cardinal pv.pending with
| 0 -> Lwt.return_unit | 0 -> Lwt.return_unit
| n -> debug w "processing %d operations" n ; | n ->
Prevalidation.prevalidate validation_state ~sort:true debug w "processing %d operations" n ;
(Operation_hash.Map.bindings pv.pending) let operations = List.map snd (Operation_hash.Map.bindings pv.pending) in
>>= fun (validation_state, validation_result) -> Lwt_list.fold_left_s
pv.validation_state <- (fun (acc_validation_result, acc_validation_state) op ->
Ok validation_state ; match Prevalidation.parse op with
| Error _ ->
(* FIXME *)
Lwt.return (acc_validation_result, acc_validation_state)
| Ok op ->
Prevalidation.apply_operation_with_preapply_result
acc_validation_result acc_validation_state op)
(pv.validation_result, state)
operations
>>= fun (new_result, new_state) ->
pv.validation_state <- Ok new_state ;
pv.validation_result <- new_result ;
pv.in_mempool <- pv.in_mempool <-
(Operation_hash.Map.fold (Operation_hash.Map.fold
(fun h _ in_mempool -> Operation_hash.Set.add h in_mempool) (fun h _ in_mempool -> Operation_hash.Set.add h in_mempool)
@ -323,15 +365,10 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
Operation_hash.Map.iter Operation_hash.Map.iter
(fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph) (fun oph _ -> Distributed_db.Operation.clear_or_cancel pv.chain_db oph)
pv.validation_result.refused ; pv.validation_result.refused ;
pv.validation_result <- pv.pending <- Operation_hash.Map.empty ;
merge_validation_results
~old:pv.validation_result
~neu:validation_result ;
pv.pending <-
Operation_hash.Map.empty ;
advertise w pv advertise w pv
(mempool_of_prevalidation_result validation_result) ; (mempool_of_prevalidation_result new_result) ;
Prevalidation.notify_operation validation_state validation_result ; notify_operation pv new_result ;
Lwt.return_unit Lwt.return_unit
end >>= fun () -> end >>= fun () ->
pv.mempool <- pv.mempool <-
@ -399,62 +436,57 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
dir := RPC_directory.gen_register !dir dir := RPC_directory.gen_register !dir
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root) (Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
begin fun { validation_state ; validation_result = current_mempool } params () -> begin fun { validation_result = current_mempool ; operation_stream } params () ->
match validation_state with let open Preapply_result in
| Error _ -> assert false let op_stream, stopper = Lwt_watcher.create_stream operation_stream in
| Ok pv -> (* Convert ops *)
let new_operation_input = Prevalidation.new_operation_input pv in let map_op op =
let open Preapply_result in let protocol_data =
let operation_stream, stopper = Data_encoding.Binary.of_bytes_exn
Lwt_watcher.create_stream new_operation_input in Proto.operation_data_encoding
(* Convert ops *) op.Operation.proto in
let map_op op = Proto.{ shell = op.shell ; protocol_data } in
let protocol_data = let fold_op _k (op, _error) acc = map_op op :: acc in
Data_encoding.Binary.of_bytes_exn (* First call : retrieve the current set of op from the mempool *)
Proto.operation_data_encoding let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in
op.Operation.proto in let applied = if params#applied then List.map map_op (List.map snd applied) else [] in
Proto.{ shell = op.shell ; protocol_data } in let refused = if params#refused then
let fold_op _k (op, _error) acc = map_op op :: acc in Operation_hash.Map.fold fold_op refused [] else [] in
(* First call : retrieve the current set of op from the mempool *) let branch_refused = if params#branch_refused then
let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in Operation_hash.Map.fold fold_op branch_refused [] else [] in
let applied = if params#applied then List.map map_op (List.map snd applied) else [] in let branch_delayed = if params#branch_delayed then
let refused = if params#refused then Operation_hash.Map.fold fold_op branch_delayed [] else [] in
Operation_hash.Map.fold fold_op refused [] else [] in let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
let branch_refused = if params#branch_refused then let current_mempool = ref (Some current_mempool) in
Operation_hash.Map.fold fold_op branch_refused [] else [] in let filter_result = function
let branch_delayed = if params#branch_delayed then | `Applied -> params#applied
Operation_hash.Map.fold fold_op branch_delayed [] else [] in | `Refused -> params#branch_refused
let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in | `Branch_refused -> params#refused
let current_mempool = ref (Some current_mempool) in | `Branch_delayed -> params#branch_delayed
let filter_result = function in
| `Applied -> params#applied let next () =
| `Refused -> params#branch_refused match !current_mempool with
| `Branch_refused -> params#refused | Some mempool -> begin
| `Branch_delayed -> params#branch_delayed current_mempool := None ;
in Lwt.return_some mempool
let next () = end
match !current_mempool with | None -> begin
| Some mempool -> begin Lwt_stream.get op_stream >>= function
current_mempool := None ; | Some (kind, shell, protocol_data) when filter_result kind ->
Lwt.return_some mempool (* NOTE: Should the protocol change, a new Prevalidation
end * context would be created. Thus, we use the same Proto. *)
| None -> begin let bytes = Data_encoding.Binary.to_bytes_exn
Lwt_stream.get operation_stream >>= function Proto.operation_data_encoding
| Some (kind, shell, protocol_data) when filter_result kind -> protocol_data in
(* NOTE: Should the protocol change, a new Prevalidation let protocol_data = Data_encoding.Binary.of_bytes_exn
* context would be created. Thus, we use the same Proto. *) Proto.operation_data_encoding
let bytes = Data_encoding.Binary.to_bytes_exn bytes in
Proto.operation_data_encoding Lwt.return_some [ { Proto.shell ; protocol_data } ]
protocol_data in | _ -> Lwt.return_none
let protocol_data = Data_encoding.Binary.of_bytes_exn end
Proto.operation_data_encoding in
bytes in let shutdown () = Lwt_watcher.shutdown stopper in
Lwt.return_some [ { Proto.shell ; protocol_data } ] RPC_answer.return_stream { next ; shutdown }
| _ -> Lwt.return_none
end
in
let shutdown () = Lwt_watcher.shutdown stopper in
RPC_answer.return_stream { next ; shutdown }
end ; end ;
!dir !dir
@ -475,38 +507,18 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
let on_inject pv op = let on_inject pv op =
let oph = Operation.hash op in let oph = Operation.hash op in
begin if already_handled pv oph then
if already_handled pv oph then return_unit (* FIXME : is this an error ? *)
return pv.validation_result
else
Lwt.return pv.validation_state >>=? fun validation_state ->
Prevalidation.prevalidate
validation_state ~sort:false [ (oph, op) ] >>= fun (_, result) ->
match result.applied with
| [ app_oph, _ ] when Operation_hash.equal app_oph oph ->
Distributed_db.inject_operation pv.chain_db oph op >>= fun (_ : bool) ->
pv.pending <- Operation_hash.Map.add oph op pv.pending ;
return result
| _ ->
return result
end >>=? fun result ->
if List.mem_assoc oph result.applied then
return_unit
else else
let try_in_map map proj or_else = Lwt.return pv.validation_state >>=? fun validation_state ->
try Lwt.return (Prevalidation.parse op) >>=? fun parsed_op ->
Lwt.return (Error (proj (Operation_hash.Map.find oph map))) Prevalidation.apply_operation validation_state parsed_op >>= function
with Not_found -> or_else () in | Applied (_, _result) ->
try_in_map pv.refusals (fun h -> h) @@ fun () -> Distributed_db.inject_operation pv.chain_db oph op >>= fun (_ : bool) ->
try_in_map result.refused snd @@ fun () -> pv.pending <- Operation_hash.Map.add parsed_op.hash op pv.pending ;
try_in_map result.branch_refused snd @@ fun () -> return_unit
try_in_map result.branch_delayed snd @@ fun () -> | _ ->
if Operation_hash.Set.mem oph pv.live_operations then failwith "Error while applying operation %a" Operation_hash.pp oph
failwith "Injected operation %a included in a previous block."
Operation_hash.pp oph
else
failwith "Injected operation %a is not in prevalidation result."
Operation_hash.pp oph
let on_notify w pv peer mempool = let on_notify w pv peer mempool =
let all_ophs = let all_ophs =
@ -526,21 +538,15 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
to_fetch to_fetch
let on_flush w pv predecessor = let on_flush w pv predecessor =
Lwt_watcher.shutdown_input pv.operation_stream;
list_pendings list_pendings
~maintain_chain_db:pv.chain_db ~maintain_chain_db:pv.chain_db
~from_block:pv.predecessor ~to_block:predecessor ~from_block:pv.predecessor ~to_block:predecessor
(Preapply_result.operations pv.validation_result) (Preapply_result.operations pv.validation_result)
>>= fun (pending, new_live_blocks, new_live_operations) -> >>= fun (pending, new_live_blocks, new_live_operations) ->
let timestamp = Time.now () in let timestamp = Time.now () in
Prevalidation.start_prevalidation Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state ->
~predecessor ~timestamp () >>= fun validation_state -> let validation_result = Preapply_result.empty in
begin match validation_state with
| Error _ -> Lwt.return (validation_state, Preapply_result.empty)
| Ok validation_state ->
Prevalidation.prevalidate
validation_state ~sort:false [] >>= fun (state, result) ->
Lwt.return (Ok state, result)
end >>= fun (validation_state, validation_result) ->
debug w "%d operations were not washed by the flush" debug w "%d operations were not washed by the flush"
(Operation_hash.Map.cardinal pending) ; (Operation_hash.Map.cardinal pending) ;
pv.predecessor <- predecessor ; pv.predecessor <- predecessor ;
@ -552,6 +558,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
pv.in_mempool <- Operation_hash.Set.empty ; pv.in_mempool <- Operation_hash.Set.empty ;
pv.validation_result <- validation_result ; pv.validation_result <- validation_result ;
pv.validation_state <- validation_state ; pv.validation_state <- validation_state ;
pv.operation_stream <- Lwt_watcher.create_input () ;
return_unit return_unit
let on_advertise pv = let on_advertise pv =
@ -571,9 +578,6 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
(* TODO: rebase the advertisement instead *) (* TODO: rebase the advertisement instead *)
let chain_state = Distributed_db.chain_state pv.chain_db in let chain_state = Distributed_db.chain_state pv.chain_db in
State.Block.read chain_state hash >>=? fun block -> State.Block.read chain_state hash >>=? fun block ->
begin match pv.validation_state with
| Ok pv -> Prevalidation.shutdown_operation_input pv
| Error _ -> () end ;
on_flush w pv block >>=? fun () -> on_flush w pv block >>=? fun () ->
return (() : r) return (() : r)
| Request.Notify (peer, mempool) -> | Request.Notify (peer, mempool) ->
@ -604,16 +608,8 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
{ current_head = predecessor ; current_mempool = mempool ; { current_head = predecessor ; current_mempool = mempool ;
live_blocks ; live_operations } -> live_blocks ; live_operations } ->
let timestamp = Time.now () in let timestamp = Time.now () in
Prevalidation.start_prevalidation Prevalidation.create ~predecessor ~timestamp () >>= fun validation_state ->
~predecessor ~timestamp () >>= fun validation_state -> let validation_result = Preapply_result.empty in
begin match validation_state with
| Error _ -> Lwt.return (validation_state, Preapply_result.empty)
| Ok validation_state ->
Prevalidation.prevalidate validation_state ~sort:false []
>>= fun (validation_state, validation_result) ->
Lwt.return (Ok validation_state, validation_result)
end >>= fun (validation_state, validation_result) ->
let fetching = let fetching =
List.fold_left List.fold_left
(fun s h -> Operation_hash.Set.add h s) (fun s h -> Operation_hash.Set.add h s)
@ -627,7 +623,9 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
fetching ; fetching ;
pending = Operation_hash.Map.empty ; pending = Operation_hash.Map.empty ;
in_mempool = Operation_hash.Set.empty ; in_mempool = Operation_hash.Set.empty ;
validation_result ; validation_state ; validation_result ;
validation_state ;
operation_stream = Lwt_watcher.create_input () ;
advertisement = `None ; advertisement = `None ;
rpc_directory = rpc_directory ; rpc_directory = rpc_directory ;
} in } in