Mempool_peer_worker: minor improvements

This commit is contained in:
Raphaël Proust 2018-11-08 12:08:11 +08:00 committed by MBourgoin
parent 460262130e
commit 1431bcd6c8
No known key found for this signature in database
GPG Key ID: 4B3F7008ABB5B2D0
2 changed files with 14 additions and 18 deletions

View File

@ -53,8 +53,6 @@ module type T = sig
* from a previous protocol, etc. *)
val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t
val rpc_directory : t RPC_directory.t
end
module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_worker.Proto = struct
@ -73,6 +71,8 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
| Mempool_result of Mempool_worker.result
type output = result Operation_hash.Map.t
let pp_input = Format.pp_print_list Operation_hash.pp
module Log = Tezos_stdlib.Logging.Make(struct
let name = "node.mempool.peer_worker"
end)
@ -140,6 +140,12 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
results = Operation_hash.Map.empty;
}
let cancel pipeline =
let cancel_snd (_, p) = Lwt.cancel p in
Queue.iter cancel_snd pipeline.downloading;
Queue.iter cancel_snd pipeline.parsing;
Queue.iter cancel_snd pipeline.applying
(* Exported interactions *)
@ -208,12 +214,7 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
loop ()
in
let work = loop () in
Lwt.on_cancel work
(fun () ->
let cancel_snd (_, p) = Lwt.cancel p in
Queue.iter cancel_snd pipeline.downloading;
Queue.iter cancel_snd pipeline.parsing;
Queue.iter cancel_snd pipeline.applying);
Lwt.on_cancel work (fun () -> cancel pipeline);
work
end
@ -280,7 +281,7 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
| Start input ->
Format.fprintf ppf
"@[<v 0>Starting: %a@]"
(Format.pp_print_list Operation_hash.pp)
pp_input
input
| End_ok (view, _) ->
Format.fprintf ppf
@ -349,12 +350,9 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w
Worker.push_request_and_wait t (Request.Batch (mempool_worker, os))
let bypass_peer_workers mempool_worker input =
(* TODO: log, but not through the Worker's event system because no worker is available. *)
Work.work mempool_worker input
(* 5. Introspection *)
let rpc_directory = Pervasives.failwith "TODO"
Log.lwt_log_info "Bypassing workers to work on: %a" pp_input input >>= fun () ->
Work.work mempool_worker input >>= fun output ->
Log.lwt_log_info "Finished work" >>= fun () ->
Lwt.return output
end

View File

@ -82,8 +82,6 @@ module type T = sig
* calls to [bypass_peer_workers] is not sequentialised. *)
val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t
val rpc_directory : t RPC_directory.t
end