From 1431bcd6c8ce49a83fad6bd1c7de228291f052df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Thu, 8 Nov 2018 12:08:11 +0800 Subject: [PATCH] Mempool_peer_worker: minor improvements --- src/lib_shell/mempool_peer_worker.ml | 30 +++++++++++++-------------- src/lib_shell/mempool_peer_worker.mli | 2 -- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/lib_shell/mempool_peer_worker.ml b/src/lib_shell/mempool_peer_worker.ml index 8ca677d90..8487fb736 100644 --- a/src/lib_shell/mempool_peer_worker.ml +++ b/src/lib_shell/mempool_peer_worker.ml @@ -53,8 +53,6 @@ module type T = sig * from a previous protocol, etc. *) val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t - val rpc_directory : t RPC_directory.t - end module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_worker.Proto = struct @@ -73,6 +71,8 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w | Mempool_result of Mempool_worker.result type output = result Operation_hash.Map.t + let pp_input = Format.pp_print_list Operation_hash.pp + module Log = Tezos_stdlib.Logging.Make(struct let name = "node.mempool.peer_worker" end) @@ -140,6 +140,12 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w results = Operation_hash.Map.empty; } + let cancel pipeline = + let cancel_snd (_, p) = Lwt.cancel p in + Queue.iter cancel_snd pipeline.downloading; + Queue.iter cancel_snd pipeline.parsing; + Queue.iter cancel_snd pipeline.applying + (* Exported interactions *) @@ -208,12 +214,7 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w loop () in let work = loop () in - Lwt.on_cancel work - (fun () -> - let cancel_snd (_, p) = Lwt.cancel p in - Queue.iter cancel_snd pipeline.downloading; - Queue.iter cancel_snd pipeline.parsing; - Queue.iter cancel_snd pipeline.applying); + Lwt.on_cancel work (fun () -> cancel pipeline); work end @@ -280,7 +281,7 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w | Start input -> Format.fprintf ppf "@[Starting: %a@]" - (Format.pp_print_list Operation_hash.pp) + pp_input input | End_ok (view, _) -> Format.fprintf ppf @@ -349,12 +350,9 @@ module Make (Mempool_worker: Mempool_worker.T) : T with module Proto = Mempool_w Worker.push_request_and_wait t (Request.Batch (mempool_worker, os)) let bypass_peer_workers mempool_worker input = - (* TODO: log, but not through the Worker's event system because no worker is available. *) - Work.work mempool_worker input - - - (* 5. Introspection *) - - let rpc_directory = Pervasives.failwith "TODO" + Log.lwt_log_info "Bypassing workers to work on: %a" pp_input input >>= fun () -> + Work.work mempool_worker input >>= fun output -> + Log.lwt_log_info "Finished work" >>= fun () -> + Lwt.return output end diff --git a/src/lib_shell/mempool_peer_worker.mli b/src/lib_shell/mempool_peer_worker.mli index fcd95bb50..016237516 100644 --- a/src/lib_shell/mempool_peer_worker.mli +++ b/src/lib_shell/mempool_peer_worker.mli @@ -82,8 +82,6 @@ module type T = sig * calls to [bypass_peer_workers] is not sequentialised. *) val bypass_peer_workers: Mempool_worker.t -> input -> output Lwt.t - val rpc_directory : t RPC_directory.t - end