From f40c418d0fa10a4fe854dfad2f29bd251eed8948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Wed, 8 Nov 2017 12:06:20 +0100 Subject: [PATCH] Distributed_db: remove the last "lol-object". --- src/node/shell/distributed_db_functors.ml | 75 ++++++++++++----------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index 984c613f5..60cda7335 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -7,6 +7,8 @@ (* *) (**************************************************************************) +module Canceler = Lwt_utils.Canceler + module type DISTRIBUTED_DB = sig type t @@ -284,9 +286,20 @@ end = struct type param = Request.param type t = { - push_to_worker: event -> unit ; - cancel_worker: unit -> unit Lwt.t ; - worker: unit Lwt.t ; + param: Request.param ; + pending: status Table.t ; + + queue: event Lwt_pipe.t ; + mutable events: event list Lwt.t ; + + canceler: Canceler.t ; + mutable worker: unit Lwt.t ; + } + + and status = { + peers: P2p.Peer_id.Set.t ; + next_request: float ; + delay: float ; } and event = @@ -298,41 +311,27 @@ end = struct | Notify_unrequested of P2p.Peer_id.t * key let request t p k = - t.push_to_worker (Request (p, k)) + assert (Lwt_pipe.push_now t.queue (Request (p, k))) let notify t p k = debug "push received %a from %a" Hash.pp k P2p.Peer_id.pp_short p ; - t.push_to_worker (Notify (p, k)) + assert (Lwt_pipe.push_now t.queue (Notify (p, k))) let notify_cancelation t k = debug "push cancelation %a" Hash.pp k ; - t.push_to_worker (Notify_cancelation k) + assert (Lwt_pipe.push_now t.queue (Notify_cancelation k)) let notify_invalid t p k = debug "push received invalid %a from %a" Hash.pp k P2p.Peer_id.pp_short p ; - t.push_to_worker (Notify_invalid (p, k)) + assert (Lwt_pipe.push_now t.queue (Notify_invalid (p, k))) let notify_duplicate t p k = debug "push received duplicate %a from %a" Hash.pp k P2p.Peer_id.pp_short p ; - t.push_to_worker (Notify_duplicate (p, k)) + assert (Lwt_pipe.push_now t.queue (Notify_duplicate (p, k))) let notify_unrequested t p k = debug "push received unrequested %a from %a" Hash.pp k P2p.Peer_id.pp_short p ; - t.push_to_worker (Notify_unrequested (p, k)) - - type worker_state = { - param: Request.param ; - pending: status Table.t ; - cancelation: unit -> unit Lwt.t ; - wait_events: unit -> event list Lwt.t ; - mutable events: event list Lwt.t ; - } - - and status = { - peers: P2p.Peer_id.Set.t ; - next_request: float ; - delay: float ; - } + assert (Lwt_pipe.push_now t.queue (Notify_unrequested (p, k))) let compute_timeout state = let next = @@ -412,16 +411,17 @@ end = struct Lwt.return_unit let rec worker_loop state = - let shutdown = state.cancelation () + let shutdown = Canceler.cancelation state.canceler and timeout = compute_timeout state in - Lwt.choose [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> + Lwt.choose + [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> if Lwt.state shutdown <> Lwt.Sleep then lwt_debug "terminating" >>= fun () -> Lwt.return_unit else if Lwt.state state.events <> Lwt.Sleep then let now = Unix.gettimeofday () in state.events >>= fun events -> - state.events <- state.wait_events () ; + state.events <- Lwt_pipe.pop_all state.queue ; Lwt_list.iter_s (process_event state now) events >>= fun () -> worker_loop state else @@ -465,19 +465,22 @@ end = struct worker_loop state let create param = - let cancelation, cancel_worker, _ = Lwt_utils.canceler () in - let push_to_worker, wait_events = Lwt_utils.queue () in - let pending = Table.create 17 in - let worker_state = - { cancelation ; wait_events ; pending ; param ; events = wait_events () } in - let worker = + let state = { + param ; + queue = Lwt_pipe.create () ; + pending = Table.create 17 ; + events = Lwt.return [] ; + canceler = Canceler.create () ; + worker = Lwt.return_unit ; + } in + state.worker <- Lwt_utils.worker "db_request_scheduler" - ~run:(fun () -> worker_loop worker_state) - ~cancel:cancel_worker in - { cancel_worker ; push_to_worker ; worker } + ~run:(fun () -> worker_loop state) + ~cancel:(fun () -> Canceler.cancel state.canceler) ; + state let shutdown s = - s.cancel_worker () >>= fun () -> + Canceler.cancel s.canceler >>= fun () -> s.worker end