Shell: mempool RPC fixes

This commit is contained in:
Tom Jack 2018-12-11 21:31:28 +00:00 committed by Grégoire Henry
parent 1357aff0ba
commit d53918451b
2 changed files with 11 additions and 9 deletions

View File

@ -602,18 +602,19 @@ module Make(Static: STATIC)(Proto: Registered_protocol.T)
let state = Worker.state w in let state = Worker.state w in
let filter_result = function let filter_result = function
| Applied _ -> params#applied | Applied _ -> params#applied
| Refused _ -> params#branch_refused | Refused _ -> params#refused
| Branch_refused _ -> params#refused | Branch_refused _ -> params#branch_refused
| Branch_delayed _ -> params#branch_delayed | Branch_delayed _ -> params#branch_delayed
| _ -> false in | _ -> false in
let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in let op_stream, stopper = Lwt_watcher.create_stream state.operation_stream in
let shutdown () = Lwt_watcher.shutdown stopper in let shutdown () = Lwt_watcher.shutdown stopper in
let next () = let rec next () =
Lwt_stream.get op_stream >>= function Lwt_stream.get op_stream >>= function
| Some (kind, shell, protocol_data) when filter_result kind -> | Some (kind, shell, protocol_data) when filter_result kind ->
Lwt.return_some [ { Proto.shell ; protocol_data } ] Lwt.return_some [ { Proto.shell ; protocol_data } ]
| _ -> Lwt.return_none in | Some _ -> next ()
| None -> Lwt.return_none in
RPC_answer.return_stream { next ; shutdown } RPC_answer.return_stream { next ; shutdown }
) )

View File

@ -470,7 +470,7 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
(fun (hash, op) -> (hash, map_op op)) (fun (hash, op) -> (hash, map_op op))
(List.rev pv.applied) ; (List.rev pv.applied) ;
refused = refused =
Operation_hash.Map.map map_op_error pv.branch_refusals ; Operation_hash.Map.map map_op_error pv.refusals ;
branch_refused = branch_refused =
Operation_hash.Map.map map_op_error pv.branch_refusals ; Operation_hash.Map.map map_op_error pv.branch_refusals ;
branch_delayed = branch_delayed =
@ -510,11 +510,11 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
let current_mempool = ref (Some current_mempool) in let current_mempool = ref (Some current_mempool) in
let filter_result = function let filter_result = function
| `Applied -> params#applied | `Applied -> params#applied
| `Refused -> params#branch_refused | `Refused -> params#refused
| `Branch_refused -> params#refused | `Branch_refused -> params#branch_refused
| `Branch_delayed -> params#branch_delayed | `Branch_delayed -> params#branch_delayed
in in
let next () = let rec next () =
match !current_mempool with match !current_mempool with
| Some mempool -> begin | Some mempool -> begin
current_mempool := None ; current_mempool := None ;
@ -532,7 +532,8 @@ module Make(Proto: Registered_protocol.T)(Arg: ARG): T = struct
Proto.operation_data_encoding Proto.operation_data_encoding
bytes in bytes in
Lwt.return_some [ { Proto.shell ; protocol_data } ] Lwt.return_some [ { Proto.shell ; protocol_data } ]
| _ -> Lwt.return_none | Some _ -> next ()
| None -> Lwt.return_none
end end
in in
let shutdown () = Lwt_watcher.shutdown stopper in let shutdown () = Lwt_watcher.shutdown stopper in