diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index dc2ebd230..9b93542f9 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -162,7 +162,7 @@ module Raw_operation_hashes = struct Make_raw (struct type t = Block_hash.t * int - let name = "raw_operation_hash" + let name = "operation_hashes" let pp ppf (h, n) = Format.fprintf ppf "%a:%d" Block_hash.pp h n let encoding = let open Data_encoding in @@ -231,7 +231,7 @@ module Raw_operations = struct Make_raw (struct type t = Block_hash.t * int - let name = "raw_operation" + let name = "operations" let pp ppf (h, n) = Format.fprintf ppf "%a:%d" Block_hash.pp h n let encoding = let open Data_encoding in @@ -390,7 +390,9 @@ module P2p_reader = struct let handle_msg global_db state msg = let open Message in - let open Logging.Node.Worker in + let module Logging = + Logging.Make(struct let name = "node.distributed_db.p2p_reader" end) in + let open Logging in lwt_debug "Read message from %a: %a" P2p.Peer_id.pp_short state.gid Message.pp_json msg >>= fun () -> @@ -575,7 +577,7 @@ module P2p_reader = struct Raw_operations.Table.notify net_db.operations_db.table state.gid (block, ofs) (ops, path) >>= fun () -> - Lwt.return_unit + Lwt.return_unit let rec worker_loop global_db state = Lwt_utils.protect ~canceler:state.canceler begin fun () -> diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index c50635b5e..d6e24125e 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -238,7 +238,12 @@ module type REQUEST = sig end module Make_request_scheduler - (Hash : sig type t end) + (Hash : sig + type t + val name : string + val encoding : t Data_encoding.t + val pp : Format.formatter -> t -> unit + end) (Table : MEMORY_TABLE with type key := Hash.t) (Request : REQUEST with type key := Hash.t) : sig @@ -249,6 +254,8 @@ module Make_request_scheduler end = struct + include Logging.Make(struct let name = "node.distributed_db.scheduler." ^ Hash.name end) + type key = Hash.t type param = Request.param @@ -268,12 +275,20 @@ end = struct let request t p k = t.push_to_worker (Request (p, k)) let notify t p k = + debug "push received %a from %a" + Hash.pp k P2p.Peer_id.pp_short p ; t.push_to_worker (Notify (p, k)) let notify_invalid t p k = + debug "push received invalid %a from %a" + Hash.pp k P2p.Peer_id.pp_short p ; t.push_to_worker (Notify_invalid (p, k)) let notify_duplicate t p k = + debug "push received duplicate %a from %a" + Hash.pp k P2p.Peer_id.pp_short p ; t.push_to_worker (Notify_duplicate (p, k)) let notify_unrequested t p k = + debug "push received unrequested %a from %a" + Hash.pp k P2p.Peer_id.pp_short p ; t.push_to_worker (Notify_unrequested (p, k)) type worker_state = { @@ -297,13 +312,22 @@ end = struct state.pending infinity in let now = Unix.gettimeofday () in let delay = next -. now in - if delay <= 0. then Lwt.return_unit else Lwt_unix.sleep delay + if delay <= 0. then Lwt.return_unit else begin + (* lwt_debug "waiting at least %.2fs" delay >>= fun () -> *) + Lwt_unix.sleep delay + end + + let may_pp_peer ppf = function + | None -> () + | Some peer -> P2p.Peer_id.pp_short ppf peer (* TODO should depend on the ressource kind... *) let initial_delay = 0.1 let process_event state now = function | Request (peer, key) -> begin + lwt_debug "registering request %a from %a" + Hash.pp key may_pp_peer peer >>= fun () -> try let data = Table.find state.pending key in let peers = @@ -315,6 +339,8 @@ end = struct next_request = min data.next_request (now +. initial_delay) ; peers ; } ; + lwt_debug "registering request %a from %a -> replaced" + Hash.pp key may_pp_peer peer >>= fun () -> Lwt.return_unit with Not_found -> let peers = @@ -326,14 +352,28 @@ end = struct next_request = now ; delay = initial_delay ; } ; + lwt_debug "registering request %a from %a -> added" + Hash.pp key may_pp_peer peer >>= fun () -> Lwt.return_unit end - | Notify (_gid, key) -> + | Notify (peer, key) -> Table.remove state.pending key ; + lwt_debug "received %a from %a" + Hash.pp key P2p.Peer_id.pp_short peer >>= fun () -> Lwt.return_unit - | Notify_invalid _ - | Notify_unrequested _ - | Notify_duplicate _ -> + | Notify_invalid (peer, key) -> + lwt_debug "received invalid %a from %a" + Hash.pp key P2p.Peer_id.pp_short peer >>= fun () -> + (* TODO *) + Lwt.return_unit + | Notify_unrequested (peer, key) -> + lwt_debug "received unrequested %a from %a" + Hash.pp key P2p.Peer_id.pp_short peer >>= fun () -> + (* TODO *) + Lwt.return_unit + | Notify_duplicate (peer, key) -> + lwt_debug "received duplicate %a from %a" + Hash.pp key P2p.Peer_id.pp_short peer >>= fun () -> (* TODO *) Lwt.return_unit @@ -342,6 +382,7 @@ end = struct and timeout = compute_timeout state in Lwt.choose [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () -> if Lwt.state shutdown <> Lwt.Sleep then + lwt_debug "terminating" >>= fun () -> Lwt.return_unit else if Lwt.state state.events <> Lwt.Sleep then let now = Unix.gettimeofday () in @@ -350,6 +391,7 @@ end = struct Lwt_list.iter_s (process_event state now) events >>= fun () -> worker_loop state else + lwt_debug "timeout" >>= fun () -> let now = Unix.gettimeofday () in let active_peers = Request.active state.param in let requests = @@ -379,6 +421,13 @@ end = struct P2p_types.Peer_id.Map.add requested_peer requests acc) state.pending P2p_types.Peer_id.Map.empty in P2p_types.Peer_id.Map.iter (Request.send state.param) requests ; + P2p_types.Peer_id.Map.fold begin fun peer request acc -> + acc >>= fun () -> + Lwt_list.iter_s (fun key -> + lwt_debug "requested %a from %a" + Hash.pp key P2p.Peer_id.pp_short peer) + request + end requests Lwt.return_unit >>= fun () -> worker_loop state let create param = diff --git a/src/node/shell/distributed_db_functors.mli b/src/node/shell/distributed_db_functors.mli index 083f4ce20..d11625c85 100644 --- a/src/node/shell/distributed_db_functors.mli +++ b/src/node/shell/distributed_db_functors.mli @@ -101,7 +101,12 @@ module type REQUEST = sig end module Make_request_scheduler - (Hash : sig type t end) + (Hash : sig + type t + val name : string + val encoding : t Data_encoding.t + val pp : Format.formatter -> t -> unit + end) (Table : MEMORY_TABLE with type key := Hash.t) (Request : REQUEST with type key := Hash.t) : sig