Distributed_db: proper logging
New category: - node.distributed_db.p2p_reader log all incoming message, from any peer (debug) - node.distributed_db.scheduler.* log the request scheduler of the given ressources (notice/debug), where '*' might be: - Operation_hash (individual operation) - Block_hash (block header) - operation_hashes (aggregated operation_hashes of a block) - operations (aggregated operations of a block) - Protocol_hash (protocol)
This commit is contained in:
parent
616ca33498
commit
06e4ec4d9b
@ -162,7 +162,7 @@ module Raw_operation_hashes = struct
|
||||
Make_raw
|
||||
(struct
|
||||
type t = Block_hash.t * int
|
||||
let name = "raw_operation_hash"
|
||||
let name = "operation_hashes"
|
||||
let pp ppf (h, n) = Format.fprintf ppf "%a:%d" Block_hash.pp h n
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
@ -231,7 +231,7 @@ module Raw_operations = struct
|
||||
Make_raw
|
||||
(struct
|
||||
type t = Block_hash.t * int
|
||||
let name = "raw_operation"
|
||||
let name = "operations"
|
||||
let pp ppf (h, n) = Format.fprintf ppf "%a:%d" Block_hash.pp h n
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
@ -390,7 +390,9 @@ module P2p_reader = struct
|
||||
let handle_msg global_db state msg =
|
||||
|
||||
let open Message in
|
||||
let open Logging.Node.Worker in
|
||||
let module Logging =
|
||||
Logging.Make(struct let name = "node.distributed_db.p2p_reader" end) in
|
||||
let open Logging in
|
||||
|
||||
lwt_debug "Read message from %a: %a"
|
||||
P2p.Peer_id.pp_short state.gid Message.pp_json msg >>= fun () ->
|
||||
|
@ -238,7 +238,12 @@ module type REQUEST = sig
|
||||
end
|
||||
|
||||
module Make_request_scheduler
|
||||
(Hash : sig type t end)
|
||||
(Hash : sig
|
||||
type t
|
||||
val name : string
|
||||
val encoding : t Data_encoding.t
|
||||
val pp : Format.formatter -> t -> unit
|
||||
end)
|
||||
(Table : MEMORY_TABLE with type key := Hash.t)
|
||||
(Request : REQUEST with type key := Hash.t) : sig
|
||||
|
||||
@ -249,6 +254,8 @@ module Make_request_scheduler
|
||||
|
||||
end = struct
|
||||
|
||||
include Logging.Make(struct let name = "node.distributed_db.scheduler." ^ Hash.name end)
|
||||
|
||||
type key = Hash.t
|
||||
type param = Request.param
|
||||
|
||||
@ -268,12 +275,20 @@ end = struct
|
||||
let request t p k =
|
||||
t.push_to_worker (Request (p, k))
|
||||
let notify t p k =
|
||||
debug "push received %a from %a"
|
||||
Hash.pp k P2p.Peer_id.pp_short p ;
|
||||
t.push_to_worker (Notify (p, k))
|
||||
let notify_invalid t p k =
|
||||
debug "push received invalid %a from %a"
|
||||
Hash.pp k P2p.Peer_id.pp_short p ;
|
||||
t.push_to_worker (Notify_invalid (p, k))
|
||||
let notify_duplicate t p k =
|
||||
debug "push received duplicate %a from %a"
|
||||
Hash.pp k P2p.Peer_id.pp_short p ;
|
||||
t.push_to_worker (Notify_duplicate (p, k))
|
||||
let notify_unrequested t p k =
|
||||
debug "push received unrequested %a from %a"
|
||||
Hash.pp k P2p.Peer_id.pp_short p ;
|
||||
t.push_to_worker (Notify_unrequested (p, k))
|
||||
|
||||
type worker_state = {
|
||||
@ -297,13 +312,22 @@ end = struct
|
||||
state.pending infinity in
|
||||
let now = Unix.gettimeofday () in
|
||||
let delay = next -. now in
|
||||
if delay <= 0. then Lwt.return_unit else Lwt_unix.sleep delay
|
||||
if delay <= 0. then Lwt.return_unit else begin
|
||||
(* lwt_debug "waiting at least %.2fs" delay >>= fun () -> *)
|
||||
Lwt_unix.sleep delay
|
||||
end
|
||||
|
||||
let may_pp_peer ppf = function
|
||||
| None -> ()
|
||||
| Some peer -> P2p.Peer_id.pp_short ppf peer
|
||||
|
||||
(* TODO should depend on the ressource kind... *)
|
||||
let initial_delay = 0.1
|
||||
|
||||
let process_event state now = function
|
||||
| Request (peer, key) -> begin
|
||||
lwt_debug "registering request %a from %a"
|
||||
Hash.pp key may_pp_peer peer >>= fun () ->
|
||||
try
|
||||
let data = Table.find state.pending key in
|
||||
let peers =
|
||||
@ -315,6 +339,8 @@ end = struct
|
||||
next_request = min data.next_request (now +. initial_delay) ;
|
||||
peers ;
|
||||
} ;
|
||||
lwt_debug "registering request %a from %a -> replaced"
|
||||
Hash.pp key may_pp_peer peer >>= fun () ->
|
||||
Lwt.return_unit
|
||||
with Not_found ->
|
||||
let peers =
|
||||
@ -326,14 +352,28 @@ end = struct
|
||||
next_request = now ;
|
||||
delay = initial_delay ;
|
||||
} ;
|
||||
lwt_debug "registering request %a from %a -> added"
|
||||
Hash.pp key may_pp_peer peer >>= fun () ->
|
||||
Lwt.return_unit
|
||||
end
|
||||
| Notify (_gid, key) ->
|
||||
| Notify (peer, key) ->
|
||||
Table.remove state.pending key ;
|
||||
lwt_debug "received %a from %a"
|
||||
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Notify_invalid _
|
||||
| Notify_unrequested _
|
||||
| Notify_duplicate _ ->
|
||||
| Notify_invalid (peer, key) ->
|
||||
lwt_debug "received invalid %a from %a"
|
||||
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
||||
(* TODO *)
|
||||
Lwt.return_unit
|
||||
| Notify_unrequested (peer, key) ->
|
||||
lwt_debug "received unrequested %a from %a"
|
||||
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
||||
(* TODO *)
|
||||
Lwt.return_unit
|
||||
| Notify_duplicate (peer, key) ->
|
||||
lwt_debug "received duplicate %a from %a"
|
||||
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
||||
(* TODO *)
|
||||
Lwt.return_unit
|
||||
|
||||
@ -342,6 +382,7 @@ end = struct
|
||||
and timeout = compute_timeout state in
|
||||
Lwt.choose [ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () ->
|
||||
if Lwt.state shutdown <> Lwt.Sleep then
|
||||
lwt_debug "terminating" >>= fun () ->
|
||||
Lwt.return_unit
|
||||
else if Lwt.state state.events <> Lwt.Sleep then
|
||||
let now = Unix.gettimeofday () in
|
||||
@ -350,6 +391,7 @@ end = struct
|
||||
Lwt_list.iter_s (process_event state now) events >>= fun () ->
|
||||
worker_loop state
|
||||
else
|
||||
lwt_debug "timeout" >>= fun () ->
|
||||
let now = Unix.gettimeofday () in
|
||||
let active_peers = Request.active state.param in
|
||||
let requests =
|
||||
@ -379,6 +421,13 @@ end = struct
|
||||
P2p_types.Peer_id.Map.add requested_peer requests acc)
|
||||
state.pending P2p_types.Peer_id.Map.empty in
|
||||
P2p_types.Peer_id.Map.iter (Request.send state.param) requests ;
|
||||
P2p_types.Peer_id.Map.fold begin fun peer request acc ->
|
||||
acc >>= fun () ->
|
||||
Lwt_list.iter_s (fun key ->
|
||||
lwt_debug "requested %a from %a"
|
||||
Hash.pp key P2p.Peer_id.pp_short peer)
|
||||
request
|
||||
end requests Lwt.return_unit >>= fun () ->
|
||||
worker_loop state
|
||||
|
||||
let create param =
|
||||
|
@ -101,7 +101,12 @@ module type REQUEST = sig
|
||||
end
|
||||
|
||||
module Make_request_scheduler
|
||||
(Hash : sig type t end)
|
||||
(Hash : sig
|
||||
type t
|
||||
val name : string
|
||||
val encoding : t Data_encoding.t
|
||||
val pp : Format.formatter -> t -> unit
|
||||
end)
|
||||
(Table : MEMORY_TABLE with type key := Hash.t)
|
||||
(Request : REQUEST with type key := Hash.t) : sig
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user