Shell/Prevalidation: add the streaming of the newly validated operations
This commit is contained in:
parent
fee4e684c1
commit
e5b2927310
@ -64,12 +64,17 @@ let rec apply_operations apply_operation state r max_ops ~sort ops =
|
||||
Lwt.return (state, max_ops, r)
|
||||
|
||||
type prevalidation_state =
|
||||
State : { proto : 'a proto ; state : 'a ;
|
||||
max_number_of_operations : int }
|
||||
State : { proto : ('state, 'operation_data) proto ; state : 'state ;
|
||||
max_number_of_operations : int ;
|
||||
new_operation_input : ([ `Applied | `Refused | `Branch_refused | `Branch_delayed ] *
|
||||
Operation.shell_header * 'operation_data) Lwt_watcher.input ;
|
||||
}
|
||||
-> prevalidation_state
|
||||
|
||||
and 'a proto =
|
||||
(module Registered_protocol.T with type P.validation_state = 'a)
|
||||
and ('state, 'operation_data) proto =
|
||||
(module Registered_protocol.T
|
||||
with type P.validation_state = 'state
|
||||
and type P.operation_data = 'operation_data )
|
||||
|
||||
let start_prevalidation
|
||||
?protocol_data
|
||||
@ -120,14 +125,16 @@ let start_prevalidation
|
||||
>>=? fun state ->
|
||||
(* FIXME arbitrary value, to be customisable *)
|
||||
let max_number_of_operations = 1000 in
|
||||
let new_operation_input = Lwt_watcher.create_input () in
|
||||
return (State { proto = (module Proto) ; state ;
|
||||
max_number_of_operations })
|
||||
|
||||
max_number_of_operations ;
|
||||
new_operation_input ;
|
||||
})
|
||||
|
||||
let prevalidate
|
||||
(State { proto = (module Proto) ; state ;
|
||||
max_number_of_operations })
|
||||
~sort (ops : (Operation_hash.t * Operation.t) list)=
|
||||
max_number_of_operations ; new_operation_input })
|
||||
~sort (ops : (Operation_hash.t * Operation.t) list) =
|
||||
let ops =
|
||||
List.map
|
||||
(fun (h, op) ->
|
||||
@ -176,7 +183,7 @@ let prevalidate
|
||||
(fun map (h, op, err) -> Operation_hash.Map.add h (op, err) map)
|
||||
r.branch_refused invalid_ops } in
|
||||
Lwt.return (State { proto = (module Proto) ; state ;
|
||||
max_number_of_operations },
|
||||
max_number_of_operations ; new_operation_input },
|
||||
r)
|
||||
|
||||
let end_prevalidation (State { proto = (module Proto) ; state }) =
|
||||
@ -236,3 +243,97 @@ let preapply ~predecessor ~timestamp ~protocol_data ~sort_operations:sort ops =
|
||||
end >>=? fun (context, message) ->
|
||||
Context.commit ?message ~time:timestamp context >>= fun context ->
|
||||
return ({ shell_header with context }, rs)
|
||||
|
||||
let notify_operation (State { proto = (module Proto) ; new_operation_input ; }) result =
|
||||
let { applied ; refused ; branch_refused ; branch_delayed } = result in
|
||||
(* Notify new opperations *)
|
||||
let map_op kind { Operation.shell ; proto } =
|
||||
let protocol_data =
|
||||
Data_encoding.Binary.of_bytes_exn
|
||||
Proto.operation_data_encoding
|
||||
proto in
|
||||
kind, shell, protocol_data in
|
||||
let fold_op kind _k (op, _error) acc = map_op kind op :: acc in
|
||||
let applied = List.map (map_op `Applied) (List.map snd applied) in
|
||||
let refused = Operation_hash.Map.fold (fold_op `Refused) refused [] in
|
||||
let branch_refused = Operation_hash.Map.fold (fold_op `Branch_refused) branch_refused [] in
|
||||
let branch_delayed = Operation_hash.Map.fold (fold_op `Branch_delayed) branch_delayed [] in
|
||||
let ops = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
|
||||
List.iter (Lwt_watcher.notify new_operation_input) ops
|
||||
|
||||
let shutdown_operation_input (State { new_operation_input }) =
|
||||
Lwt_watcher.shutdown_input new_operation_input
|
||||
|
||||
let build_rpc_directory protocol =
|
||||
begin
|
||||
match Registered_protocol.get protocol with
|
||||
| None ->
|
||||
(* FIXME. *)
|
||||
(* This should not happen: it should be handled in the validator. *)
|
||||
failwith "Prevalidation: missing protocol '%a' for the current block."
|
||||
Protocol_hash.pp_short protocol
|
||||
| Some protocol ->
|
||||
return protocol
|
||||
end >>=? fun (module Proto) ->
|
||||
let module Proto_services = Block_services.Make(Proto)(Proto) in
|
||||
|
||||
let dir : (prevalidation_state * Error_monad.error Preapply_result.t) RPC_directory.t ref =
|
||||
ref RPC_directory.empty in
|
||||
|
||||
let gen_register s f =
|
||||
dir := RPC_directory.gen_register !dir s f in
|
||||
|
||||
gen_register
|
||||
(Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
|
||||
begin fun ((State { new_operation_input ; proto = (module Next_proto) }), current_mempool) params () ->
|
||||
let operation_stream, stopper =
|
||||
Lwt_watcher.create_stream new_operation_input in
|
||||
(* Convert ops *)
|
||||
let map_op op =
|
||||
let protocol_data =
|
||||
Data_encoding.Binary.of_bytes_exn
|
||||
Proto.operation_data_encoding
|
||||
op.Operation.proto in
|
||||
Proto.{ shell = op.shell ; protocol_data } in
|
||||
let fold_op _k (op, _error) acc = map_op op :: acc in
|
||||
(* First call : retrieve the current set of op from the mempool *)
|
||||
let { applied ; refused ; branch_refused ; branch_delayed } = current_mempool in
|
||||
let applied = if params#applied then List.map map_op (List.map snd applied) else [] in
|
||||
let refused = if params#refused then
|
||||
Operation_hash.Map.fold fold_op refused [] else [] in
|
||||
let branch_refused = if params#branch_refused then
|
||||
Operation_hash.Map.fold fold_op branch_refused [] else [] in
|
||||
let branch_delayed = if params#branch_delayed then
|
||||
Operation_hash.Map.fold fold_op branch_delayed [] else [] in
|
||||
let current_mempool = List.concat [ applied ; refused ; branch_refused ; branch_delayed ] in
|
||||
let current_mempool = ref (Some current_mempool) in
|
||||
let filter_result = function
|
||||
| `Applied -> params#applied
|
||||
| `Refused -> params#branch_refused
|
||||
| `Branch_refused -> params#refused
|
||||
| `Branch_delayed -> params#branch_delayed
|
||||
in
|
||||
let next () =
|
||||
match !current_mempool with
|
||||
| Some mempool -> begin
|
||||
current_mempool := None ;
|
||||
Lwt.return_some mempool
|
||||
end
|
||||
| None -> begin
|
||||
Lwt_stream.get operation_stream >>= function
|
||||
| Some (kind, shell, protocol_data) when filter_result kind ->
|
||||
let bytes = Data_encoding.Binary.to_bytes_exn
|
||||
Next_proto.operation_data_encoding
|
||||
protocol_data in
|
||||
let protocol_data = Data_encoding.Binary.of_bytes_exn
|
||||
Proto.operation_data_encoding
|
||||
bytes in
|
||||
Lwt.return_some [ { Proto.shell ; protocol_data } ]
|
||||
| _ -> Lwt.return_none
|
||||
end
|
||||
in
|
||||
let shutdown () = Lwt_watcher.shutdown stopper in
|
||||
RPC_answer.return_stream { next ; shutdown }
|
||||
end ;
|
||||
|
||||
return !dir
|
||||
|
@ -48,3 +48,15 @@ val preapply :
|
||||
Operation.t list list ->
|
||||
(Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t
|
||||
|
||||
val notify_operation :
|
||||
prevalidation_state ->
|
||||
error Preapply_result.t ->
|
||||
unit
|
||||
|
||||
val shutdown_operation_input :
|
||||
prevalidation_state ->
|
||||
unit
|
||||
|
||||
val build_rpc_directory :
|
||||
Protocol_hash.t ->
|
||||
(prevalidation_state * error Preapply_result.t) RPC_directory.t tzresult Lwt.t
|
||||
|
@ -122,9 +122,12 @@ let rpc_directory protocol =
|
||||
return protocol
|
||||
end >>=? fun (module Proto) ->
|
||||
let module Proto_services = Block_services.Make(Proto)(Proto) in
|
||||
return @@
|
||||
RPC_directory.register
|
||||
RPC_directory.empty
|
||||
|
||||
let dir : state RPC_directory.t ref = ref RPC_directory.empty in
|
||||
let register s f =
|
||||
dir := RPC_directory.register !dir s f in
|
||||
|
||||
register
|
||||
(Proto_services.S.Mempool.pending_operations RPC_path.open_root)
|
||||
(fun pv () () ->
|
||||
let map_op op =
|
||||
@ -147,7 +150,17 @@ let rpc_directory protocol =
|
||||
Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ;
|
||||
unprocessed =
|
||||
Operation_hash.Map.map map_op pv.pending ;
|
||||
})
|
||||
}) ;
|
||||
|
||||
Prevalidation.build_rpc_directory protocol >>=? fun prevalidation_dir ->
|
||||
let prevalidation_dir =
|
||||
RPC_directory.map (fun state ->
|
||||
match state.validation_state with
|
||||
| Error _ -> assert false
|
||||
| Ok pv -> Lwt.return (pv, state.validation_result)
|
||||
) prevalidation_dir in
|
||||
|
||||
return (RPC_directory.merge !dir prevalidation_dir)
|
||||
|
||||
let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool =
|
||||
let rec pop_blocks ancestor block mempool =
|
||||
@ -302,6 +315,7 @@ let handle_unprocessed w pv =
|
||||
Operation_hash.Map.empty ;
|
||||
advertise w pv
|
||||
(mempool_of_prevalidation_result validation_result) ;
|
||||
Prevalidation.notify_operation validation_state validation_result ;
|
||||
Lwt.return_unit
|
||||
end >>= fun () ->
|
||||
pv.mempool <-
|
||||
@ -448,6 +462,9 @@ let on_request
|
||||
(* TODO: rebase the advertisement instead *)
|
||||
let chain_state = Distributed_db.chain_state pv.chain_db in
|
||||
State.Block.read chain_state hash >>=? fun block ->
|
||||
begin match pv.validation_state with
|
||||
| Ok pv -> Prevalidation.shutdown_operation_input pv
|
||||
| Error _ -> () end ;
|
||||
on_flush w pv block >>=? fun () ->
|
||||
return (() : r)
|
||||
| Request.Notify (peer, mempool) ->
|
||||
|
@ -40,6 +40,15 @@ let create_input () =
|
||||
{ watchers = [];
|
||||
cpt = 0 }
|
||||
|
||||
let shutdown_input input =
|
||||
let { watchers ; _ } = input in
|
||||
List.iter (fun w ->
|
||||
w.active <- false ;
|
||||
w.push None
|
||||
) watchers ;
|
||||
input.cpt <- 0 ;
|
||||
input.watchers <- []
|
||||
|
||||
let create_fake_stream () =
|
||||
let str, push = Lwt_stream.create () in
|
||||
str, (fun () -> push None)
|
||||
|
@ -29,6 +29,7 @@ type 'a input
|
||||
type stopper
|
||||
|
||||
val create_input : unit -> 'a input
|
||||
val shutdown_input : 'a input -> unit
|
||||
val notify : 'a input -> 'a -> unit
|
||||
val create_stream : 'a input -> 'a Lwt_stream.t * stopper
|
||||
val create_fake_stream : unit -> 'a Lwt_stream.t * stopper
|
||||
|
Loading…
Reference in New Issue
Block a user