Distributed_db: remove the last "lol-object".

This commit is contained in:
Grégoire Henry 2017-11-08 12:06:20 +01:00 committed by Benjamin Canou
parent 41c82d7481
commit f40c418d0f

View File

@ -7,6 +7,8 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
module Canceler = Lwt_utils.Canceler
module type DISTRIBUTED_DB = sig module type DISTRIBUTED_DB = sig
type t type t
@ -284,9 +286,20 @@ end = struct
type param = Request.param type param = Request.param
type t = { type t = {
push_to_worker: event -> unit ; param: Request.param ;
cancel_worker: unit -> unit Lwt.t ; pending: status Table.t ;
worker: unit Lwt.t ;
queue: event Lwt_pipe.t ;
mutable events: event list Lwt.t ;
canceler: Canceler.t ;
mutable worker: unit Lwt.t ;
}
and status = {
peers: P2p.Peer_id.Set.t ;
next_request: float ;
delay: float ;
} }
and event = and event =
@ -298,41 +311,27 @@ end = struct
| Notify_unrequested of P2p.Peer_id.t * key | Notify_unrequested of P2p.Peer_id.t * key
let request t p k = let request t p k =
t.push_to_worker (Request (p, k)) assert (Lwt_pipe.push_now t.queue (Request (p, k)))
let notify t p k = let notify t p k =
debug "push received %a from %a" debug "push received %a from %a"
Hash.pp k P2p.Peer_id.pp_short p ; Hash.pp k P2p.Peer_id.pp_short p ;
t.push_to_worker (Notify (p, k)) assert (Lwt_pipe.push_now t.queue (Notify (p, k)))
let notify_cancelation t k = let notify_cancelation t k =
debug "push cancelation %a" debug "push cancelation %a"
Hash.pp k ; Hash.pp k ;
t.push_to_worker (Notify_cancelation k) assert (Lwt_pipe.push_now t.queue (Notify_cancelation k))
let notify_invalid t p k = let notify_invalid t p k =
debug "push received invalid %a from %a" debug "push received invalid %a from %a"
Hash.pp k P2p.Peer_id.pp_short p ; Hash.pp k P2p.Peer_id.pp_short p ;
t.push_to_worker (Notify_invalid (p, k)) assert (Lwt_pipe.push_now t.queue (Notify_invalid (p, k)))
let notify_duplicate t p k = let notify_duplicate t p k =
debug "push received duplicate %a from %a" debug "push received duplicate %a from %a"
Hash.pp k P2p.Peer_id.pp_short p ; Hash.pp k P2p.Peer_id.pp_short p ;
t.push_to_worker (Notify_duplicate (p, k)) assert (Lwt_pipe.push_now t.queue (Notify_duplicate (p, k)))
let notify_unrequested t p k = let notify_unrequested t p k =
debug "push received unrequested %a from %a" debug "push received unrequested %a from %a"
Hash.pp k P2p.Peer_id.pp_short p ; Hash.pp k P2p.Peer_id.pp_short p ;
t.push_to_worker (Notify_unrequested (p, k)) assert (Lwt_pipe.push_now t.queue (Notify_unrequested (p, k)))
type worker_state = {
param: Request.param ;
pending: status Table.t ;
cancelation: unit -> unit Lwt.t ;
wait_events: unit -> event list Lwt.t ;
mutable events: event list Lwt.t ;
}
and status = {
peers: P2p.Peer_id.Set.t ;
next_request: float ;
delay: float ;
}
let compute_timeout state = let compute_timeout state =
let next = let next =
@ -412,16 +411,17 @@ end = struct
Lwt.return_unit Lwt.return_unit
let rec worker_loop state = let rec worker_loop state =
let shutdown = state.cancelation () let shutdown = Canceler.cancelation state.canceler
and timeout = compute_timeout state in and timeout = compute_timeout state in
Lwt.choose [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> Lwt.choose
[ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () ->
if Lwt.state shutdown <> Lwt.Sleep then if Lwt.state shutdown <> Lwt.Sleep then
lwt_debug "terminating" >>= fun () -> lwt_debug "terminating" >>= fun () ->
Lwt.return_unit Lwt.return_unit
else if Lwt.state state.events <> Lwt.Sleep then else if Lwt.state state.events <> Lwt.Sleep then
let now = Unix.gettimeofday () in let now = Unix.gettimeofday () in
state.events >>= fun events -> state.events >>= fun events ->
state.events <- state.wait_events () ; state.events <- Lwt_pipe.pop_all state.queue ;
Lwt_list.iter_s (process_event state now) events >>= fun () -> Lwt_list.iter_s (process_event state now) events >>= fun () ->
worker_loop state worker_loop state
else else
@ -465,19 +465,22 @@ end = struct
worker_loop state worker_loop state
let create param = let create param =
let cancelation, cancel_worker, _ = Lwt_utils.canceler () in let state = {
let push_to_worker, wait_events = Lwt_utils.queue () in param ;
let pending = Table.create 17 in queue = Lwt_pipe.create () ;
let worker_state = pending = Table.create 17 ;
{ cancelation ; wait_events ; pending ; param ; events = wait_events () } in events = Lwt.return [] ;
let worker = canceler = Canceler.create () ;
worker = Lwt.return_unit ;
} in
state.worker <-
Lwt_utils.worker "db_request_scheduler" Lwt_utils.worker "db_request_scheduler"
~run:(fun () -> worker_loop worker_state) ~run:(fun () -> worker_loop state)
~cancel:cancel_worker in ~cancel:(fun () -> Canceler.cancel state.canceler) ;
{ cancel_worker ; push_to_worker ; worker } state
let shutdown s = let shutdown s =
s.cancel_worker () >>= fun () -> Canceler.cancel s.canceler >>= fun () ->
s.worker s.worker
end end