diff --git a/src/lib_shell/distributed_db_functors.ml b/src/lib_shell/distributed_db_functors.ml index 230bf9f8b..8cab19058 100644 --- a/src/lib_shell/distributed_db_functors.ml +++ b/src/lib_shell/distributed_db_functors.ml @@ -469,57 +469,57 @@ end = struct let worker_loop state = let shutdown = Lwt_canceler.cancelation state.canceler in let rec loop state = - let timeout = compute_timeout state in - Lwt.choose - [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> - if Lwt.state shutdown <> Lwt.Sleep then - lwt_debug "terminating" >>= fun () -> - Lwt.return_unit - else if Lwt.state state.events <> Lwt.Sleep then - let now = Unix.gettimeofday () in - state.events >>= fun events -> - state.events <- Lwt_pipe.pop_all state.queue ; - Lwt_list.iter_s (process_event state now) events >>= fun () -> - loop state - else - lwt_debug "timeout" >>= fun () -> - let now = Unix.gettimeofday () in - let active_peers = Request.active state.param in - let requests = - Table.fold - (fun key { peers ; next_request ; delay } acc -> - if next_request > now +. 0.2 then - acc - else - let remaining_peers = - P2p_peer.Set.inter peers active_peers in - if P2p_peer.Set.is_empty remaining_peers && - not (P2p_peer.Set.is_empty peers) then - ( Table.remove state.pending key ; acc ) + let timeout = compute_timeout state in + Lwt.choose + [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> + if Lwt.state shutdown <> Lwt.Sleep then + lwt_debug "terminating" >>= fun () -> + Lwt.return_unit + else if Lwt.state state.events <> Lwt.Sleep then + let now = Unix.gettimeofday () in + state.events >>= fun events -> + state.events <- Lwt_pipe.pop_all state.queue ; + Lwt_list.iter_s (process_event state now) events >>= fun () -> + loop state + else + lwt_debug "timeout" >>= fun () -> + let now = Unix.gettimeofday () in + let active_peers = Request.active state.param in + let requests = + Table.fold + (fun key { peers ; next_request ; delay } acc -> + if next_request > now +. 0.2 then + acc else - let requested_peer = - P2p_peer.Id.Set.random_elt - (if P2p_peer.Set.is_empty remaining_peers - then active_peers - else remaining_peers) in - let next = { peers = remaining_peers ; - next_request = now +. delay ; - delay = delay *. 1.5 } in - Table.replace state.pending key next ; - let requests = - try key :: P2p_peer.Map.find requested_peer acc - with Not_found -> [key] in - P2p_peer.Map.add requested_peer requests acc) - state.pending P2p_peer.Map.empty in - P2p_peer.Map.iter (Request.send state.param) requests ; - P2p_peer.Map.fold begin fun peer request acc -> - acc >>= fun () -> - Lwt_list.iter_s (fun key -> - lwt_debug "requested %a from %a" - Hash.pp key P2p_peer.Id.pp_short peer) - request - end requests Lwt.return_unit >>= fun () -> - loop state + let remaining_peers = + P2p_peer.Set.inter peers active_peers in + if P2p_peer.Set.is_empty remaining_peers && + not (P2p_peer.Set.is_empty peers) then + ( Table.remove state.pending key ; acc ) + else + let requested_peer = + P2p_peer.Id.Set.random_elt + (if P2p_peer.Set.is_empty remaining_peers + then active_peers + else remaining_peers) in + let next = { peers = remaining_peers ; + next_request = now +. delay ; + delay = delay *. 1.5 } in + Table.replace state.pending key next ; + let requests = + try key :: P2p_peer.Map.find requested_peer acc + with Not_found -> [key] in + P2p_peer.Map.add requested_peer requests acc) + state.pending P2p_peer.Map.empty in + P2p_peer.Map.iter (Request.send state.param) requests ; + P2p_peer.Map.fold begin fun peer request acc -> + acc >>= fun () -> + Lwt_list.iter_s (fun key -> + lwt_debug "requested %a from %a" + Hash.pp key P2p_peer.Id.pp_short peer) + request + end requests Lwt.return_unit >>= fun () -> + loop state in loop state