Node: cleanup logging of shell workers
This commit is contained in:
parent
40a26759fc
commit
8eee1c7a9c
@ -12,7 +12,7 @@ open Chain_validator_worker_state
|
||||
module Name = struct
|
||||
type t = Chain_id.t
|
||||
let encoding = Chain_id.encoding
|
||||
let base = [ "chain_validator" ]
|
||||
let base = [ "validator.chain" ]
|
||||
let pp = Chain_id.pp_short
|
||||
end
|
||||
|
||||
|
@ -15,7 +15,7 @@ module Name = struct
|
||||
type t = Chain_id.t * P2p_peer.Id.t
|
||||
let encoding =
|
||||
Data_encoding.tup2 Chain_id.encoding P2p_peer.Id.encoding
|
||||
let base = [ "peer_validator" ]
|
||||
let base = [ "validator.peer" ]
|
||||
let pp ppf (chain, peer) =
|
||||
Format.fprintf ppf "%a:%a"
|
||||
Chain_id.pp_short chain P2p_peer.Id.pp_short peer
|
||||
|
@ -49,8 +49,6 @@ module Make
|
||||
|
||||
let base_name = String.concat "." Name.base
|
||||
|
||||
module Logger = Logging.Make(struct let name = base_name end)
|
||||
|
||||
type message = Message: 'a Request.t * 'a tzresult Lwt.u option -> message
|
||||
|
||||
type 'a queue and bounded and infinite
|
||||
@ -80,6 +78,7 @@ module Make
|
||||
mutable (* only for init *) state : Types.state option ;
|
||||
buffer : 'kind buffer ;
|
||||
event_log : (Logging.level * Event.t Ring.t) list ;
|
||||
logger : (module Logging.LOG) ;
|
||||
canceler : Lwt_canceler.t ;
|
||||
name : Name.t ;
|
||||
id : int ;
|
||||
@ -206,6 +205,7 @@ module Make
|
||||
let canceler { canceler } = canceler
|
||||
|
||||
let log_event w evt =
|
||||
let (module Logger) = w.logger in
|
||||
let level = Event.level evt in
|
||||
let log =
|
||||
match level with
|
||||
@ -215,10 +215,9 @@ module Make
|
||||
| Warning -> Logger.lwt_warn
|
||||
| Error -> Logger.lwt_log_error
|
||||
| Fatal -> Logger.lwt_fatal_error in
|
||||
log "[%a] %a" Name.pp w.name Event.pp evt >>= fun () ->
|
||||
begin if level >= w.limits.backlog_level then
|
||||
Ring.add (List.assoc level w.event_log) evt
|
||||
end ;
|
||||
log "@[<v 0>%a@]" Event.pp evt >>= fun () ->
|
||||
if level >= w.limits.backlog_level then
|
||||
Ring.add (List.assoc level w.event_log) evt ;
|
||||
Lwt.return_unit
|
||||
|
||||
let record_event w evt =
|
||||
@ -248,17 +247,18 @@ module Make
|
||||
|
||||
let worker_loop (type kind) handlers (w : kind t) =
|
||||
let (module Handlers : HANDLERS with type self = kind t) = handlers in
|
||||
let (module Logger) = w.logger in
|
||||
let do_close errs =
|
||||
let t0 = match w.status with
|
||||
| Running t0 -> t0
|
||||
| _ -> assert false in
|
||||
w.status <- Closing (t0, Time.now ()) ;
|
||||
Handlers.on_close w >>= fun () ->
|
||||
close w ;
|
||||
Lwt_canceler.cancel w.canceler >>= fun () ->
|
||||
w.status <- Closed (t0, Time.now (), errs) ;
|
||||
w.state <- None ;
|
||||
Hashtbl.remove w.table.instances w.name ;
|
||||
Handlers.on_close w >>= fun () ->
|
||||
w.state <- None ;
|
||||
Hashtbl.add w.table.zombies w.id w ;
|
||||
Lwt.ignore_result
|
||||
(Lwt_unix.sleep w.limits.zombie_memory >>= fun () ->
|
||||
@ -277,8 +277,7 @@ module Make
|
||||
let current_request = Request.view request in
|
||||
let treated = Time.now () in
|
||||
w.current_request <- Some (pushed, treated, current_request) ;
|
||||
Logger.debug "[%a] request: @[%a@]"
|
||||
Name.pp w.name
|
||||
Logger.debug "@[<v 2>Request:@,%a@]"
|
||||
Request.pp current_request ;
|
||||
match u with
|
||||
| None ->
|
||||
@ -301,9 +300,7 @@ module Make
|
||||
| Ok () ->
|
||||
loop ()
|
||||
| Error [Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] ->
|
||||
Logger.lwt_log_info
|
||||
"[%a] worker terminated"
|
||||
Name.pp w.name >>= fun () ->
|
||||
Logger.lwt_log_notice "Worker terminated" >>= fun () ->
|
||||
do_close None
|
||||
| Error errs ->
|
||||
begin match w.current_request with
|
||||
@ -318,9 +315,8 @@ module Make
|
||||
loop ()
|
||||
| Error errs ->
|
||||
Logger.lwt_log_error
|
||||
"@[[%a] worker crashed:@ %a@]"
|
||||
Name.pp w.name
|
||||
pp_print_error errs >>= fun () ->
|
||||
"@[<v 0>Worker crashed:@,%a@]"
|
||||
(Format.pp_print_list Error_monad.pp) errs >>= fun () ->
|
||||
do_close (Some errs) in
|
||||
loop ()
|
||||
|
||||
@ -331,14 +327,17 @@ module Make
|
||||
(module HANDLERS with type self = kind t) ->
|
||||
kind t Lwt.t
|
||||
= fun table ?timeout limits name parameters (module Handlers) ->
|
||||
let name_s =
|
||||
Format.asprintf "%a" Name.pp name in
|
||||
let full_name =
|
||||
if name_s = "" then base_name else Format.asprintf "%s(%s)" base_name name_s in
|
||||
let id =
|
||||
table.last_id <- table.last_id + 1 ;
|
||||
table.last_id in
|
||||
let id_name =
|
||||
if name_s = "" then base_name else Format.asprintf "%s(%d)" base_name id in
|
||||
if Hashtbl.mem table.instances name then
|
||||
invalid_arg
|
||||
(Format.asprintf
|
||||
"Lwt_worker.launch: \
|
||||
duplicate worker %s[%a]" base_name Name.pp name) ;
|
||||
Logger.lwt_log_info
|
||||
"[%a] worker started"
|
||||
Name.pp name >>= fun () ->
|
||||
invalid_arg (Format.asprintf "Lwt_worker.launch: duplicate worker %s" full_name) ;
|
||||
let canceler = Lwt_canceler.create () in
|
||||
let buffer : kind buffer =
|
||||
match table.buffer_kind with
|
||||
@ -352,38 +351,53 @@ module Make
|
||||
let levels =
|
||||
[ Logging.Debug ; Info ; Notice ; Warning ; Error ; Fatal ] in
|
||||
List.map (fun l -> l, Ring.create limits.backlog_size) levels in
|
||||
let module Logger = Logging.Make(struct let name = id_name end) in
|
||||
let w = { limits ; parameters ; name ; canceler ;
|
||||
table ; buffer ;
|
||||
state = None ; id = (table.last_id <- table.last_id + 1; table.last_id) ;
|
||||
table ; buffer ; logger = (module Logger) ;
|
||||
state = None ; id ;
|
||||
worker = Lwt.return_unit ;
|
||||
event_log ; timeout ;
|
||||
current_request = None ;
|
||||
status = Launching (Time.now ())} in
|
||||
begin
|
||||
if id_name = base_name then
|
||||
Logger.lwt_log_notice "Worker started"
|
||||
else
|
||||
Logger.lwt_log_notice "Worker started for %s" name_s
|
||||
end >>= fun () ->
|
||||
Hashtbl.add table.instances name w ;
|
||||
Handlers.on_launch w name parameters >>= fun state ->
|
||||
w.status <- Running (Time.now ()) ;
|
||||
w.state <- Some state ;
|
||||
w.worker <-
|
||||
Lwt_utils.worker
|
||||
(Format.asprintf "%s[%a]"
|
||||
base_name
|
||||
Name.pp w.name)
|
||||
full_name
|
||||
~run:(fun () -> worker_loop (module Handlers) w)
|
||||
~cancel:(fun () -> Lwt_canceler.cancel w.canceler) ;
|
||||
Lwt.return w
|
||||
|
||||
let shutdown w =
|
||||
Logger.lwt_debug "triggering shutdown" >>= fun () ->
|
||||
let (module Logger) = w.logger in
|
||||
Logger.lwt_debug "Triggering shutdown" >>= fun () ->
|
||||
Lwt_canceler.cancel w.canceler >>= fun () ->
|
||||
w.worker
|
||||
|
||||
let state w =
|
||||
match w.state with
|
||||
| None ->
|
||||
match w.state, w.status with
|
||||
| None, Launching _ ->
|
||||
invalid_arg
|
||||
"Lwt_worker.state: \
|
||||
(Format.asprintf
|
||||
"Lwt_worker.state (%s[%a]): \
|
||||
state called before worker was initialized"
|
||||
| Some state -> state
|
||||
base_name Name.pp w.name)
|
||||
| None, (Closing _ | Closed _) ->
|
||||
invalid_arg
|
||||
(Format.asprintf
|
||||
"Lwt_worker.state (%s[%a]): \
|
||||
state called after worker was terminated"
|
||||
base_name Name.pp w.name)
|
||||
| None, _ -> assert false
|
||||
| Some state, _ -> state
|
||||
|
||||
let last_events w =
|
||||
List.map
|
||||
|
@ -71,18 +71,18 @@ module Event = struct
|
||||
| Debug msg -> Format.fprintf ppf "%s" msg
|
||||
| Validation_success (req, { pushed ; treated ; completed }) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>Block %a succesfully validated@,\
|
||||
"@[<v 0>Block %a succesfully validated@,\
|
||||
Pushed: %a, Treated: %a, Completed: %a@]"
|
||||
Block_hash.pp req.block
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
| Validation_failure (req, { pushed ; treated ; completed }, errs)->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>Validation of block %a failed@,\
|
||||
Pushed: %a, Treated: %a, Completed: %a@,\
|
||||
Error: %a@]"
|
||||
"@[<v 0>Validation of block %a failed@,\
|
||||
Pushed: %a, Treated: %a, Failed: %a@,\
|
||||
%a@]"
|
||||
Block_hash.pp req.block
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
Error_monad.pp_print_error errs
|
||||
(Format.pp_print_list Error_monad.pp) errs
|
||||
end
|
||||
|
||||
module Worker_state = struct
|
||||
|
@ -62,7 +62,7 @@ module Event = struct
|
||||
|
||||
let pp ppf = function
|
||||
| Processed_block req ->
|
||||
Format.fprintf ppf "@[<v 2>" ;
|
||||
Format.fprintf ppf "@[<v 0>" ;
|
||||
begin match req.update with
|
||||
| Ignored_head ->
|
||||
Format.fprintf ppf
|
||||
@ -82,8 +82,8 @@ module Event = struct
|
||||
Time.pp_hum req.request_status.treated
|
||||
Time.pp_hum req.request_status.completed
|
||||
| Could_not_switch_testchain err ->
|
||||
Format.fprintf ppf "@[<v 2>Error while switching test chain:@ %a@]"
|
||||
Error_monad.pp_print_error err
|
||||
Format.fprintf ppf "@[<v 0>Error while switching test chain:@ %a@]"
|
||||
(Format.pp_print_list Error_monad.pp) err
|
||||
|
||||
end
|
||||
|
||||
|
@ -72,18 +72,18 @@ module Event = struct
|
||||
| Debug msg -> Format.fprintf ppf "%s" msg
|
||||
| Request (view, { pushed ; treated ; completed }, None) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>%a@,\
|
||||
"@[<v 0>%a@,\
|
||||
Pushed: %a, Treated: %a, Completed: %a@]"
|
||||
Request.pp view
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
| Request (view, { pushed ; treated ; completed }, Some errors) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>%a@,\
|
||||
"@[<v 0>%a@,\
|
||||
Pushed: %a, Treated: %a, Failed: %a@,\
|
||||
Error: %a@]"
|
||||
%a@]"
|
||||
Request.pp view
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
Error_monad.pp_print_error errors
|
||||
(Format.pp_print_list Error_monad.pp) errors
|
||||
end
|
||||
|
||||
module Worker_state = struct
|
||||
|
@ -120,18 +120,18 @@ module Event = struct
|
||||
| Debug msg -> Format.fprintf ppf "%s" msg
|
||||
| Request (view, { pushed ; treated ; completed }, None) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>%a@,\
|
||||
"@[<v 0>%a@,\
|
||||
Pushed: %a, Treated: %a, Completed: %a@]"
|
||||
Request.pp view
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
| Request (view, { pushed ; treated ; completed }, Some errors) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>%a@,\
|
||||
"@[<v 0>%a@,\
|
||||
Pushed: %a, Treated: %a, Failed: %a@,\
|
||||
Error: %a@]"
|
||||
%a@]"
|
||||
Request.pp view
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
Error_monad.pp_print_error errors
|
||||
(Format.pp_print_list Error_monad.pp) errors
|
||||
end
|
||||
|
||||
module Worker_state = struct
|
||||
|
Loading…
Reference in New Issue
Block a user