diff --git a/src/lib_node_shell/worker.ml b/src/lib_node_shell/worker.ml new file mode 100644 index 000000000..d4d78e285 --- /dev/null +++ b/src/lib_node_shell/worker.ml @@ -0,0 +1,414 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module type NAME = sig + val base : string list + type t + val encoding : t Data_encoding.t + val pp : Format.formatter -> t -> unit +end + +module type EVENT = sig + type t + + val level : t -> Logging.level + val encoding : error list Data_encoding.t -> t Data_encoding.t + val pp : Format.formatter -> t -> unit +end + +module type REQUEST = sig + type 'a t + type view + + val view : 'a t -> view + val encoding : view Data_encoding.t + val pp : Format.formatter -> view -> unit +end + +module type TYPES = sig + type state + type parameters + type view + + val view : state -> parameters -> view + val encoding : view Data_encoding.t + val pp : Format.formatter -> view -> unit +end + +module Make + (Name : NAME) + (Event : EVENT) + (Request : REQUEST) + (Types : TYPES) = struct + + let base_name = String.concat "." Name.base + + module Logger = Logging.Make(struct let name = base_name end) + + type message = Message: 'a Request.t * 'a tzresult Lwt.u option -> message + + type 'a queue and bounded and infinite + type dropbox + + type _ buffer_kind = + | Queue : infinite queue buffer_kind + | Bounded : { size : int } -> bounded queue buffer_kind + | Dropbox : + { merge : (dropbox t -> + any_request -> + any_request option -> + any_request option) } + -> dropbox buffer_kind + and any_request = Any_request : _ Request.t -> any_request + + and _ buffer = + | Queue_buffer : (Time.t * message) Lwt_pipe.t -> infinite queue buffer + | Bounded_buffer : (Time.t * message) Lwt_pipe.t -> bounded queue buffer + | Dropbox_buffer : (Time.t * message) Lwt_dropbox.t -> dropbox buffer + + and 'kind t = { + limits : Worker_state.limits ; + timeout : float option ; + parameters : Types.parameters ; + mutable (* only for init *) worker : unit Lwt.t ; + mutable (* only for init *) state : Types.state option ; + buffer : 'kind buffer ; + event_log : (Logging.level * Event.t Ring.t) list ; + canceler : Lwt_canceler.t ; + name : Name.t ; + id : int ; + mutable status : Worker_state.worker_status ; + mutable current_request : (Time.t * Time.t * Request.view) option ; + table : 'kind table ; + } + and 'kind table = { + buffer_kind : 'kind buffer_kind ; + mutable last_id : int ; + instances : (Name.t, 'kind t) Hashtbl.t ; + zombies : (int, 'kind t) Hashtbl.t + } + + type error += Closed of Name.t + + let () = + register_error_kind `Permanent + ~id:("worker." ^ base_name ^ ".closed") + ~title:("Worker " ^ base_name ^ " closed") + ~description: + ("An operation on a " ^ base_name ^ + " worker could not complete \ + before it was shut down.") + ~pp: (fun ppf name -> + Format.fprintf ppf + "Worker %s[%a] has been shut down." + base_name Name.pp name) + Data_encoding.(obj1 (req "worker_id" Name.encoding)) + (function Closed net_id -> Some net_id | _ -> None) + (fun net_id -> Closed net_id) + + let queue_item ?u r = + Time.now (), + Message (r, u) + + let drop_request (w : dropbox t) request = + let Dropbox { merge } = w.table.buffer_kind in + let Dropbox_buffer message_box = w.buffer in + try + match + match Lwt_dropbox.peek message_box with + | None -> + merge w (Any_request request) None + | Some (_, Message (old, _)) -> + Lwt.ignore_result (Lwt_dropbox.take message_box) ; + merge w (Any_request request) (Some (Any_request old)) + with + | None -> () + | Some (Any_request neu) -> + Lwt_dropbox.put message_box (Time.now (), Message (neu, None)) + with Lwt_dropbox.Closed -> () + + let push_request (type a) (w : a queue t) request = + match w.buffer with + | Queue_buffer message_queue -> + Lwt_pipe.push message_queue (queue_item request) + | Bounded_buffer message_queue -> + Lwt_pipe.push message_queue (queue_item request) + + let push_request_now (w : infinite queue t) request = + let Queue_buffer message_queue = w.buffer in + Lwt_pipe.push_now_exn message_queue (queue_item request) + + let try_push_request_now (w : bounded queue t) request = + let Bounded_buffer message_queue = w.buffer in + Lwt_pipe.push_now message_queue (queue_item request) + + let push_request_and_wait (type a) (w : a queue t) request = + let message_queue = match w.buffer with + | Queue_buffer message_queue -> message_queue + | Bounded_buffer message_queue -> message_queue in + let t, u = Lwt.wait () in + Lwt.catch + (fun () -> + Lwt_pipe.push message_queue (queue_item ~u request) >>= fun () -> + t) + (function + | Lwt_pipe.Closed -> fail (Closed w.name) + | exn -> fail (Exn exn)) + + let close (type a) (w : a t) = + let wakeup = function + | _, Message (_, Some u) -> + Lwt.wakeup_later u (Error [ Closed w.name ]) + | _ -> () in + let close_queue message_queue = + let messages = Lwt_pipe.pop_all_now message_queue in + List.iter wakeup messages ; + Lwt_pipe.close message_queue in + match w.buffer with + | Queue_buffer message_queue -> close_queue message_queue + | Bounded_buffer message_queue -> close_queue message_queue + | Dropbox_buffer message_box -> + Option.iter ~f:wakeup (Lwt_dropbox.peek message_box) ; + Lwt_dropbox.close message_box + + let pop (type a) (w : a t) = + let pop_queue message_queue = + match w.timeout with + | None -> + Lwt_pipe.pop message_queue >>= fun m -> + return (Some m) + | Some timeout -> + Lwt_pipe.pop_with_timeout timeout message_queue >>= fun m -> + return m in + match w.buffer with + | Queue_buffer message_queue -> pop_queue message_queue + | Bounded_buffer message_queue -> pop_queue message_queue + | Dropbox_buffer message_box -> + match w.timeout with + | None -> + Lwt_dropbox.take message_box >>= fun m -> + return (Some m) + | Some timeout -> + Lwt_dropbox.take_with_timeout timeout message_box >>= fun m -> + return m + + let trigger_shutdown w = + Lwt.ignore_result (Lwt_canceler.cancel w.canceler) + + let protect { canceler } ?on_error f = + Lwt_utils.protect ?on_error ~canceler f + + let canceler { canceler } = canceler + + let log_event w evt = + let level = Event.level evt in + let log = + match level with + | Debug -> Logger.lwt_debug + | Info -> Logger.lwt_log_info + | Notice -> Logger.lwt_log_notice + | Warning -> Logger.lwt_warn + | Error -> Logger.lwt_log_error + | Fatal -> Logger.lwt_fatal_error in + log "[%a] %a" Name.pp w.name Event.pp evt >>= fun () -> + begin if level >= w.limits.backlog_level then + Ring.add (List.assoc level w.event_log) evt + end ; + Lwt.return_unit + + let record_event w evt = + Lwt.ignore_result (log_event w evt) + + module type HANDLERS = sig + type self + val on_launch : + self -> Name.t -> Types.parameters -> Types.state Lwt.t + val on_request : + self -> 'a Request.t -> 'a tzresult Lwt.t + val on_no_request : + self -> unit tzresult Lwt.t + val on_close : + self -> unit Lwt.t + val on_error : + self -> Request.view -> Worker_state.request_status -> error list -> unit tzresult Lwt.t + val on_completion : + self -> 'a Request.t -> 'a -> Worker_state.request_status -> unit Lwt.t + end + + let create_table buffer_kind = + { buffer_kind ; + last_id = 0 ; + instances = Hashtbl.create 10 ; + zombies = Hashtbl.create 10 } + + let worker_loop (type kind) handlers (w : kind t) = + let (module Handlers : HANDLERS with type self = kind t) = handlers in + let do_close errs = + let t0 = match w.status with + | Running t0 -> t0 + | _ -> assert false in + w.status <- Closing (t0, Time.now ()) ; + Handlers.on_close w >>= fun () -> + close w ; + Lwt_canceler.cancel w.canceler >>= fun () -> + w.status <- Closed (t0, Time.now (), errs) ; + w.state <- None ; + Hashtbl.remove w.table.instances w.name ; + Hashtbl.add w.table.zombies w.id w ; + Lwt.ignore_result + (Lwt_unix.sleep w.limits.zombie_memory >>= fun () -> + List.iter (fun (_, ring) -> Ring.clear ring) w.event_log ; + Lwt_unix.sleep (w.limits.zombie_lifetime -. w.limits.zombie_memory) >>= fun () -> + Hashtbl.remove w.table.zombies w.id ; + Lwt.return ()) ; + Lwt.return_unit in + let rec loop () = + begin + Lwt_utils.protect ~canceler:w.canceler begin fun () -> + pop w + end >>=? function + | None -> Handlers.on_no_request w + | Some (pushed, Message (request, u)) -> + let current_request = Request.view request in + let treated = Time.now () in + w.current_request <- Some (pushed, treated, current_request) ; + Logger.debug "[%a] request: @[%a@]" + Name.pp w.name + Request.pp current_request ; + match u with + | None -> + Handlers.on_request w request >>=? fun res -> + let completed = Time.now () in + w.current_request <- None ; + Handlers.on_completion w + request res Worker_state.{ pushed ; treated ; completed } >>= fun () -> + return () + | Some u -> + Handlers.on_request w request >>= fun res -> + Lwt.wakeup_later u res ; + Lwt.return res >>=? fun res -> + let completed = Time.now () in + w.current_request <- None ; + Handlers.on_completion w + request res Worker_state.{ pushed ; treated ; completed } >>= fun () -> + return () + end >>= function + | Ok () -> + loop () + | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] -> + Logger.lwt_log_info + "[%a] worker terminated" + Name.pp w.name >>= fun () -> + do_close None + | Error errs -> + begin match w.current_request with + | Some (pushed, treated, request) -> + let completed = Time.now () in + w.current_request <- None ; + Handlers.on_error w + request Worker_state.{ pushed ; treated ; completed } errs + | None -> assert false + end >>= function + | Ok () -> + loop () + | Error errs -> + Logger.lwt_log_error + "@[[%a] worker crashed:@ %a@]" + Name.pp w.name + pp_print_error errs >>= fun () -> + do_close (Some errs) in + loop () + + let launch + : type kind. + kind table -> ?timeout:float -> + Worker_state.limits -> Name.t -> Types.parameters -> + (module HANDLERS with type self = kind t) -> + kind t Lwt.t + = fun table ?timeout limits name parameters (module Handlers) -> + if Hashtbl.mem table.instances name then + invalid_arg + (Format.asprintf + "Lwt_worker.launch: \ + duplicate worker %s[%a]" base_name Name.pp name) ; + Logger.lwt_log_info + "[%a] worker started" + Name.pp name >>= fun () -> + let canceler = Lwt_canceler.create () in + let buffer : kind buffer = + match table.buffer_kind with + | Queue -> + Queue_buffer (Lwt_pipe.create ()) + | Bounded { size } -> + Bounded_buffer (Lwt_pipe.create ~size:(size, (fun _ -> 1)) ()) + | Dropbox _ -> + Dropbox_buffer (Lwt_dropbox.create ()) in + let event_log = + let levels = + [ Logging.Debug ; Info ; Notice ; Warning ; Error ; Fatal ] in + List.map (fun l -> l, Ring.create limits.backlog_size) levels in + let w = { limits ; parameters ; name ; canceler ; + table ; buffer ; + state = None ; id = (table.last_id <- table.last_id + 1; table.last_id) ; + worker = Lwt.return_unit ; + event_log ; timeout ; + current_request = None ; + status = Launching (Time.now ())} in + Hashtbl.add table.instances name w ; + Handlers.on_launch w name parameters >>= fun state -> + w.status <- Running (Time.now ()) ; + w.state <- Some state ; + w.worker <- + Lwt_utils.worker + (Format.asprintf "%s[%a]" + base_name + Name.pp w.name) + ~run:(fun () -> worker_loop (module Handlers) w) + ~cancel:(fun () -> Lwt_canceler.cancel w.canceler) ; + Lwt.return w + + let shutdown w = + Logger.lwt_debug "triggering shutdown" >>= fun () -> + Lwt_canceler.cancel w.canceler >>= fun () -> + w.worker + + let state w = + match w.state with + | None -> + invalid_arg + "Lwt_worker.state: \ + state called before worker was initialized" + | Some state -> state + + let last_events w = + List.map + (fun (level, ring) -> (level, Ring.elements ring)) + w.event_log + + let pending_requests (type a) (w : a queue t) = + let message_queue = match w.buffer with + | Queue_buffer message_queue -> message_queue + | Bounded_buffer message_queue -> message_queue in + List.map + (function (t, Message (req, _)) -> t, Request.view req) + (Lwt_pipe.peek_all message_queue) + + let status { status } = status + + let current_request { current_request } = current_request + + let view w = + Types.view (state w) w.parameters + + let list { instances } = + Hashtbl.fold + (fun n w acc -> (n, w) :: acc) + instances [] + +end diff --git a/src/lib_node_shell/worker.mli b/src/lib_node_shell/worker.mli new file mode 100644 index 000000000..129cbb87c --- /dev/null +++ b/src/lib_node_shell/worker.mli @@ -0,0 +1,275 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Lwt based local event loops with automated introspection *) + +(** {2 Parameters to build a worker group} *) + +(** The name of the group of workers corresponding to an instanciation + of {!Make}, as well as the name of each worker in that group. *) +module type NAME = sig + + (** The name/path of the worker group *) + val base : string list + + (** The abstract name of a single worker *) + type t + + (** Serializer for the introspection RPCs *) + val encoding : t Data_encoding.t + + (** Pretty printer for displaying the worker name *) + val pp : Format.formatter -> t -> unit + +end + +(** Events that are used for logging and introspection. + Events are pretty printed immediately in the log, and stored in + the worker's event backlog for introspection. *) +module type EVENT = sig + + (** The type of an event. *) + type t + + (** Assigns a logging level to each event. + Events can be ignored for logging w.r.t. the global node configuration. + Events can be ignored for introspection w.r.t. to the worker's + {!Worker_state.limits}. *) + val level : t -> Logging.level + + (** Serializer for the introspection RPCs *) + val encoding : error list Data_encoding.t -> t Data_encoding.t + + (** Pretty printer, also used for logging *) + val pp : Format.formatter -> t -> unit + +end + +(** The type of messages that are fed to the worker's event loop. *) +module type REQUEST = sig + + (** The type of events. + It is possible to wait for an event to be processed from outside + the worker using {!push_request_and_wait}. In this case, the + handler for this event can return a value. The parameter is the + type of this value. *) + type 'a t + + (** As requests can contain arbitrary data that may not be + serializable and are polymorphic, this view type is a + monomorphic projection sufficient for introspection. *) + type view + + (** The projection function from full request to simple views. *) + val view : 'a t -> view + + (** Serializer for the introspection RPCs *) + val encoding : view Data_encoding.t + + (** Pretty printer, also used for logging by {!Request_event}. *) + val pp : Format.formatter -> view -> unit + +end + +(** The (imperative) state of the event loop. *) +module type TYPES = sig + + (** The internal state that is passed to the event handlers. *) + type state + + (** The parameters provided when launching a new worker. *) + type parameters + + (** A simplified view of the worker's state for introspection. *) + type view + + (** The projection function from full state to simple views. *) + val view : state -> parameters -> view + + (** Serializer for the introspection RPCs *) + val encoding : view Data_encoding.t + + (** Pretty printer for introspection. *) + val pp : Format.formatter -> view -> unit + +end + +(** {2 Worker group maker} *) + +(** Functor to build a group of workers. + At that point, all the types are fixed and introspectable, + but the actual parameters and event handlers can be tweaked + for each individual worker. *) +module Make + (Name : NAME) + (Event : EVENT) + (Request : REQUEST) + (Types : TYPES) : sig + + (** A handle to a specific worker, parameterized by the type of + internal message buffer. *) + type 'kind t + + (** A handle to a table of workers. *) + type 'kind table + + (** Internal buffer kinds used as parameters to {!t}. *) + type 'a queue and bounded and infinite + type dropbox + + + (** Supported kinds of internal buffers. *) + type _ buffer_kind = + | Queue : infinite queue buffer_kind + | Bounded : { size : int } -> bounded queue buffer_kind + | Dropbox : + { merge : (dropbox t -> + any_request -> + any_request option -> + any_request option) } + -> dropbox buffer_kind + and any_request = Any_request : _ Request.t -> any_request + + (** Create a table of workers. *) + val create_table : 'kind buffer_kind -> 'kind table + + (** An error returned when trying to communicate with a worker that + has been closed. *) + type Error_monad.error += Closed of Name.t + + (** The callback handlers specific to each worker instance. *) + module type HANDLERS = sig + + (** Placeholder replaced with {!t} with the right parameters + provided by the type of buffer chosen at {!launch}.*) + type self + + (** Builds the initial internal state of a worker at launch. + It is possible to initialize the message queue. + Of course calling {!state} will fail at that point. *) + val on_launch : + self -> Name.t -> Types.parameters -> Types.state Lwt.t + + (** The main request processor, i.e. the body of the event loop. *) + val on_request : + self -> 'a Request.t -> 'a tzresult Lwt.t + + (** Called when no request has been made before the timeout, if + the parameter has been passed to {!launch}. *) + val on_no_request : + self -> unit tzresult Lwt.t + + (** A function called when terminating a worker. *) + val on_close : + self -> unit Lwt.t + + (** A function called at the end of the worker loop in case of an + abnormal error. This function can handle the error by + returning [Ok ()], or leave the default unexpected error + behaviour by returning its parameter. A possibility is to + handle the error for ad-hoc logging, and still use + {!trigger_shutdown} to kill the worker. *) + val on_error : + self -> + Request.view -> + Worker_state.request_status -> + error list -> + unit tzresult Lwt.t + + (** A function called at the end of the worker loop in case of a + successful treatment of the current request. *) + val on_completion : + self -> + 'a Request.t -> 'a -> + Worker_state.request_status -> + unit Lwt.t + end + + (** Creates a new worker instance. + Parameter [queue_size] not passed means unlimited queue. *) + val launch : + 'kind table -> ?timeout:float -> + Worker_state.limits -> Name.t -> Types.parameters -> + (module HANDLERS with type self = 'kind t) -> + 'kind t Lwt.t + + (** Triggers a worker termination and waits for its completion. + Cannot be called from within the handlers. *) + val shutdown : + _ t -> unit Lwt.t + + (** Adds a message to the queue and waits for its result. + Cannot be called from within the handlers. *) + val push_request_and_wait : + _ queue t -> 'a Request.t -> 'a tzresult Lwt.t + + (** Adds a message to the queue. *) + val push_request : + _ queue t -> 'a Request.t -> unit Lwt.t + + (** Adds a message to the queue immediately. + Returns [false] if the queue is full. *) + val try_push_request_now : + bounded queue t -> 'a Request.t -> bool + + (** Adds a message to the queue immediately. *) + val push_request_now : + infinite queue t -> 'a Request.t -> unit + + (** Sets the current request. *) + val drop_request : + dropbox t -> 'a Request.t -> unit + + (** Detects cancelation from within the request handler to stop + asynchronous operations. *) + val protect : + _ t -> + ?on_error: (error list -> 'b tzresult Lwt.t) -> + (unit -> 'b tzresult Lwt.t) -> + 'b tzresult Lwt.t + + (** Exports the canceler to allow cancelation of other tasks when this + worker is shutdowned or when it dies. *) + val canceler : _ t -> Lwt_canceler.t + + (** Triggers a worker termination. *) + val trigger_shutdown : _ t -> unit + + (** Recod an event in the backlog. *) + val record_event : _ t -> Event.t -> unit + + (** Record an event and make sure it is logged. *) + val log_event : _ t -> Event.t -> unit Lwt.t + + (** Access the internal state, once initialized. *) + val state : _ t -> Types.state + + (** Access the event backlog. *) + val last_events : _ t -> (Logging.level * Event.t list) list + + (** Introspect the message queue, gives the times requests were pushed. *) + val pending_requests : _ queue t -> (Time.t * Request.view) list + + (** Get the running status of a worker. *) + val status : _ t -> Worker_state.worker_status + + (** Get the request being treated by a worker. + Gives the time the request was pushed, and the time its + treatment started. *) + val current_request : _ t -> (Time.t * Time.t * Request.view) option + + (** Introspect the state of a worker. *) + val view : _ t -> Types.view + + (** Lists the running workers in this group. + After they are killed, workers are kept in the table + for a number of seconds given in the {!Worker_state.limits}. *) + val list : 'a table -> (Name.t * 'a t) list + +end diff --git a/src/lib_node_shell/worker_state.ml b/src/lib_node_shell/worker_state.ml new file mode 100644 index 000000000..7069815a8 --- /dev/null +++ b/src/lib_node_shell/worker_state.ml @@ -0,0 +1,109 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +type limits = + { backlog_size : int ; + backlog_level : Logging.level ; + zombie_lifetime : float ; + zombie_memory : float } + +type worker_status = + | Launching of Time.t + | Running of Time.t + | Closing of Time.t * Time.t + | Closed of Time.t * Time.t * error list option + +let worker_status_encoding error_encoding = + let open Data_encoding in + union + [ case (Tag 0) + (obj2 + (req "phase" (constant "launching")) + (req "since" Time.encoding)) + (function Launching t -> Some ((), t) | _ -> None) + (fun ((), t) -> Launching t) ; + case (Tag 1) + (obj2 + (req "phase" (constant "running")) + (req "since" Time.encoding)) + (function Running t -> Some ((), t) | _ -> None) + (fun ((), t) -> Running t) ; + case (Tag 2) + (obj3 + (req "phase" (constant "closing")) + (req "birth" Time.encoding) + (req "since" Time.encoding)) + (function Closing (t0, t) -> Some ((), t0, t) | _ -> None) + (fun ((), t0, t) -> Closing (t0, t)) ; + case (Tag 3) + (obj3 + (req "phase" (constant "closed")) + (req "birth" Time.encoding) + (req "since" Time.encoding)) + (function Closed (t0, t, None) -> Some ((), t0, t) | _ -> None) + (fun ((), t0, t) -> Closed (t0, t, None)) ; + case (Tag 4) + (obj4 + (req "phase" (constant "crashed")) + (req "birth" Time.encoding) + (req "since" Time.encoding) + (req "errors" error_encoding)) + (function Closed (t0, t, Some errs) -> Some ((), t0, t, errs) | _ -> None) + (fun ((), t0, t, errs) -> Closed (t0, t, Some errs )) ] + +type request_status = + { pushed : Time.t ; + treated : Time.t ; + completed : Time.t } + +let request_status_encoding = + let open Data_encoding in + conv + (fun { pushed ; treated ; completed } -> + (pushed, treated, completed)) + (fun (pushed, treated, completed) -> + { pushed ; treated ; completed }) + (obj3 + (req "pushed" Time.encoding) + (req "treated" Time.encoding) + (req "completed" Time.encoding)) + +type ('req, 'evt) full_status = + { status : worker_status ; + pending_requests : (Time.t * 'req) list ; + backlog : (Logging.level * 'evt list) list ; + current_request : (Time.t * Time.t * 'req) option } + +let full_status_encoding req_encoding evt_encoding error_encoding = + let open Data_encoding in + let requests_encoding = + list + (obj2 + (req "pushed" Time.encoding) + (req "request" (dynamic_size req_encoding))) in + let events_encoding = + list + (obj2 + (req "level" Logging.level_encoding) + (req "events" (dynamic_size (list (dynamic_size evt_encoding))))) in + let current_request_encoding = + obj3 + (req "pushed" Time.encoding) + (req "treated" Time.encoding) + (req "request" req_encoding) in + conv + (fun { status ; pending_requests ; backlog ; current_request } -> + (status, pending_requests, backlog, current_request)) + (fun (status, pending_requests, backlog, current_request) -> + { status ; pending_requests ; backlog ; current_request }) + (obj4 + (req "status" (worker_status_encoding error_encoding)) + (req "pending_requests" requests_encoding) + (req "backlog" events_encoding) + (opt "current_request" current_request_encoding)) diff --git a/src/lib_node_shell/worker_state.mli b/src/lib_node_shell/worker_state.mli new file mode 100644 index 000000000..0a25473fe --- /dev/null +++ b/src/lib_node_shell/worker_state.mli @@ -0,0 +1,52 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Some memory and time limits. *) +type limits = + { backlog_size : int + (** Number of event stored in the backlog for each debug level. *) ; + backlog_level : Logging.level + (** Stores events at least as important as this value. *) ; + zombie_lifetime : float + (** How long dead workers are kept in the introspection table. *) ; + zombie_memory : float + (** How long zombie workers' logs are kept. *) } + +(** The running status of an individual worker. *) +type worker_status = + | Launching of Time.t + | Running of Time.t + | Closing of Time.t * Time.t + | Closed of Time.t * Time.t * error list option + +(** Worker status serializer for RPCs. *) +val worker_status_encoding : error list Data_encoding.t -> worker_status Data_encoding.t + +(** The runnning status of an individual request. *) +type request_status = + { pushed : Time.t ; + treated : Time.t ; + completed : Time.t } + +(** Request status serializer for RPCs. *) +val request_status_encoding : request_status Data_encoding.t + +(** The full status of an individual worker. *) +type ('req, 'evt) full_status = + { status : worker_status ; + pending_requests : (Time.t * 'req) list ; + backlog : (Logging.level * 'evt list) list ; + current_request : (Time.t * Time.t * 'req) option } + +(** Full worker status serializer for RPCs. *) +val full_status_encoding : + 'req Data_encoding.t -> + 'evt Data_encoding.t -> + error list Data_encoding.t -> + ('req, 'evt) full_status Data_encoding.t diff --git a/src/lib_stdlib_lwt/logging.ml b/src/lib_stdlib_lwt/logging.ml index 2092b950b..f0e39f639 100644 --- a/src/lib_stdlib_lwt/logging.ml +++ b/src/lib_stdlib_lwt/logging.ml @@ -23,6 +23,7 @@ module type LOG = sig val lwt_log_notice: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a val lwt_warn: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a val lwt_log_error: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a + val lwt_fatal_error: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a end @@ -63,6 +64,7 @@ module Make(S : sig val name: string end) : LOG = struct let lwt_log_notice fmt = log_f ~section ~level:Lwt_log.Notice fmt let lwt_warn fmt = log_f ~section ~level:Lwt_log.Warning fmt let lwt_log_error fmt = log_f ~section ~level:Lwt_log.Error fmt + let lwt_fatal_error fmt = log_f ~section ~level:Lwt_log.Fatal fmt end diff --git a/src/lib_stdlib_lwt/logging.mli b/src/lib_stdlib_lwt/logging.mli index 46059bbaa..5919b8714 100644 --- a/src/lib_stdlib_lwt/logging.mli +++ b/src/lib_stdlib_lwt/logging.mli @@ -21,6 +21,7 @@ module type LOG = sig val lwt_log_notice: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a val lwt_warn: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a val lwt_log_error: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a + val lwt_fatal_error: ('a, Format.formatter, unit, unit Lwt.t) format4 -> 'a end