semantic logging for client baking
AMENDED: Syn has always been DSL, Semantic has always been Make_semantic.
This commit is contained in:
parent
98961c9335
commit
e7dba18980
@ -7,7 +7,10 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
let log = Signer_logging.lwt_log_notice
|
||||
open Signer_logging
|
||||
|
||||
let log = lwt_log_notice
|
||||
|
||||
|
||||
module Authorized_key =
|
||||
Client_aliases.Alias (struct
|
||||
@ -32,10 +35,12 @@ let sign
|
||||
(cctxt : #Client_context.wallet)
|
||||
Signer_messages.Sign.Request.{ pkh ; data ; signature }
|
||||
?magic_bytes ~require_auth =
|
||||
log "Request for signing %d bytes of data for key %a, magic byte = %02X"
|
||||
(MBytes.length data)
|
||||
Signature.Public_key_hash.pp pkh
|
||||
(MBytes.get_uint8 data 0) >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Request for signing %d bytes of data for key %a, magic byte = %02X"
|
||||
-% t event "request_for_signing"
|
||||
-% s num_bytes (MBytes.length data)
|
||||
-% a Signature.Public_key_hash.Logging.tag pkh
|
||||
-% s magic_byte (MBytes.get_uint8 data 0)) >>= fun () ->
|
||||
check_magic_byte magic_bytes data >>=? fun () ->
|
||||
begin match require_auth, signature with
|
||||
| false, _ -> return_unit
|
||||
@ -52,24 +57,36 @@ let sign
|
||||
failwith "invalid authentication signature"
|
||||
end >>=? fun () ->
|
||||
Client_keys.get_key cctxt pkh >>=? fun (name, _pkh, sk_uri) ->
|
||||
log "Signing data for key %s" name >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Signing data for key %s"
|
||||
-% t event "signing_data"
|
||||
-% s Client_keys.Logging.tag name) >>= fun () ->
|
||||
Client_keys.sign cctxt sk_uri data >>=? fun signature ->
|
||||
return signature
|
||||
|
||||
let public_key (cctxt : #Client_context.wallet) pkh =
|
||||
log "Request for public key %a"
|
||||
Signature.Public_key_hash.pp pkh >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Request for public key %a"
|
||||
-% t event "request_for_public_key"
|
||||
-% a Signature.Public_key_hash.Logging.tag pkh) >>= fun () ->
|
||||
Client_keys.list_keys cctxt >>=? fun all_keys ->
|
||||
match List.find (fun (_, h, _, _) -> Signature.Public_key_hash.equal h pkh) all_keys with
|
||||
| exception Not_found ->
|
||||
log "No public key found for hash %a"
|
||||
Signature.Public_key_hash.pp pkh >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "No public key found for hash %a"
|
||||
-% t event "not_found_public_key"
|
||||
-% a Signature.Public_key_hash.Logging.tag pkh) >>= fun () ->
|
||||
Lwt.fail Not_found
|
||||
| (_, _, None, _) ->
|
||||
log "No public key found for hash %a"
|
||||
Signature.Public_key_hash.pp pkh >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "No public key found for hash %a"
|
||||
-% t event "not_found_public_key"
|
||||
-% a Signature.Public_key_hash.Logging.tag pkh) >>= fun () ->
|
||||
Lwt.fail Not_found
|
||||
| (name, _, Some pk, _) ->
|
||||
log "Found public key for hash %a (name: %s)"
|
||||
Signature.Public_key_hash.pp pkh name >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Found public key for hash %a (name: %s)"
|
||||
-% t event "found_public_key"
|
||||
-% a Signature.Public_key_hash.Logging.tag pkh
|
||||
-% s Client_keys.Logging.tag name) >>= fun () ->
|
||||
return pk
|
||||
|
@ -8,6 +8,7 @@
|
||||
(**************************************************************************)
|
||||
|
||||
let log = Signer_logging.lwt_log_notice
|
||||
open Signer_logging
|
||||
|
||||
let run (cctxt : #Client_context.wallet) ~hosts ?magic_bytes ~require_auth mode =
|
||||
let dir = RPC_directory.empty in
|
||||
@ -32,7 +33,10 @@ let run (cctxt : #Client_context.wallet) ~hosts ?magic_bytes ~require_auth mode
|
||||
List.map
|
||||
(fun host ->
|
||||
let host = Ipaddr.V6.to_string host in
|
||||
log "Listening on address %s" host >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Listening on address %s"
|
||||
-% t event "signer_listening"
|
||||
-% s host_name host) >>= fun () ->
|
||||
RPC_server.launch ~host mode dir
|
||||
~media_types:Media_type.all_media_types
|
||||
>>= fun _server ->
|
||||
@ -49,7 +53,10 @@ let run_https (cctxt : #Client_context.wallet) ~host ~port ~cert ~key ?magic_byt
|
||||
failwith "Cannot resolve listening address: %S" host
|
||||
| points ->
|
||||
let hosts = fst (List.split points) in
|
||||
log "Accepting HTTPS requests on port %d" port >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Accepting HTTPS requests on port %d"
|
||||
-% t event "accepting_https_requests"
|
||||
-% s port_number port) >>= fun () ->
|
||||
let mode : Conduit_lwt_unix.server =
|
||||
`TLS (`Crt_file_path cert, `Key_file_path key, `No_password, `Port port) in
|
||||
run (cctxt : #Client_context.wallet) ~hosts ?magic_bytes ~require_auth mode
|
||||
@ -60,7 +67,10 @@ let run_http (cctxt : #Client_context.wallet) ~host ~port ?magic_bytes ~require_
|
||||
failwith "Cannot resolve listening address: %S" host
|
||||
| points ->
|
||||
let hosts = fst (List.split points) in
|
||||
log "Accepting HTTP requests on port %d" port >>= fun () ->
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Accepting HTTP requests on port %d"
|
||||
-% t event "accepting_http_requests"
|
||||
-% s port_number port) >>= fun () ->
|
||||
let mode : Conduit_lwt_unix.server =
|
||||
`TCP (`Port port) in
|
||||
run (cctxt : #Client_context.wallet) ~hosts ?magic_bytes ~require_auth mode
|
||||
|
@ -7,4 +7,10 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_stdlib.Logging.Make(struct let name = "client.signer" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "client.signer" end)
|
||||
|
||||
let host_name = Tag.def ~doc:"Host name" "host" Format.pp_print_text
|
||||
let magic_byte = Tag.def ~doc:"Magic byte" "magic_byte" Format.pp_print_int
|
||||
let num_bytes = Tag.def ~doc:"Number of bytes" "num_bytes" Format.pp_print_int
|
||||
let port_number = Tag.def ~doc:"Port number" "port" Format.pp_print_int
|
||||
let unix_socket_path = Tag.def ~doc:"UNIX socket file path" "unix_socket" Format.pp_print_text
|
||||
|
@ -7,4 +7,10 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_stdlib.Logging.LOG
|
||||
include Tezos_stdlib.Logging.SEMLOG
|
||||
|
||||
val host_name: string Tag.def
|
||||
val magic_byte: int Tag.def
|
||||
val num_bytes: int Tag.def
|
||||
val port_number: int Tag.def
|
||||
val unix_socket_path: string Tag.def
|
||||
|
@ -7,9 +7,10 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
open Signer_logging
|
||||
open Signer_messages
|
||||
|
||||
let log = Signer_logging.lwt_log_notice
|
||||
let log = lwt_log_notice
|
||||
|
||||
let run (cctxt : #Client_context.wallet) path ?magic_bytes ~require_auth =
|
||||
Lwt_utils_unix.Socket.bind path >>=? fun fd ->
|
||||
@ -46,13 +47,20 @@ let run (cctxt : #Client_context.wallet) path ?magic_bytes ~require_auth =
|
||||
begin
|
||||
match path with
|
||||
| Tcp (host, port) ->
|
||||
log "Accepting TCP requests on port %s:%d" host port
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Accepting TCP requests on port %s:%d"
|
||||
-% t event "accepting_tcp_requests"
|
||||
-% s host_name host
|
||||
-% s port_number port)
|
||||
| Unix path ->
|
||||
Sys.set_signal Sys.sigint (Signal_handle begin fun _ ->
|
||||
Format.printf "Removing the local socket file and quitting.@." ;
|
||||
Unix.unlink path ;
|
||||
exit 0
|
||||
end) ;
|
||||
log "Accepting UNIX requests on %s" path
|
||||
log Tag.DSL.(fun f ->
|
||||
f "Accepting UNIX requests on %s"
|
||||
-% t event "accepting_unix_requests"
|
||||
-% s unix_socket_path path)
|
||||
end >>= fun () ->
|
||||
loop ()
|
||||
|
@ -133,6 +133,8 @@ type step = {
|
||||
strict_step: bool ;
|
||||
}
|
||||
|
||||
let pp_step ppf step = Format.fprintf ppf "%d%s" step.step (if step.strict_step then "" else " max")
|
||||
|
||||
let to_steps seed locator =
|
||||
fold locator seed
|
||||
~init:[]
|
||||
|
@ -50,6 +50,8 @@ type step = {
|
||||
locator, and the expected difference of level between the two
|
||||
blocks (or an upper bounds when [strict_step = false]). *)
|
||||
|
||||
val pp_step: Format.formatter -> step -> unit
|
||||
|
||||
val to_steps: seed -> t -> step list
|
||||
(** Build all the 'steps' composing the locator using a given seed,
|
||||
starting with the oldest one (typically the predecessor of the
|
||||
|
@ -15,3 +15,17 @@ let rpc_arg =
|
||||
~descr:"A cryptographic node identity (Base58Check-encoded)"
|
||||
"peer_id"
|
||||
|
||||
let pp_source ppf = function
|
||||
| None -> ()
|
||||
| Some peer -> Format.fprintf ppf " from peer %a" pp peer
|
||||
|
||||
module Logging = struct
|
||||
open Tezos_stdlib.Logging
|
||||
include Make_semantic(struct let name = "node.distributed_db.p2p_reader" end)
|
||||
let mk_tag pp = Tag.def ~doc:"P2P peer ID" "p2p_peer_id" pp
|
||||
let tag = mk_tag pp_short
|
||||
let tag_opt = mk_tag (fun ppf -> function
|
||||
| None -> ()
|
||||
| Some peer -> pp_short ppf peer)
|
||||
let tag_source = Tag.def ~doc:"Peer which provided information" "p2p_peer_id_source" pp_source
|
||||
end
|
||||
|
@ -8,3 +8,9 @@
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_crypto.S.HASH with type t = Crypto_box.Public_key_hash.t
|
||||
|
||||
module Logging: sig
|
||||
val tag: t Tag.def
|
||||
val tag_opt: t option Tag.def
|
||||
val tag_source: t option Tag.def
|
||||
end
|
||||
|
@ -33,13 +33,19 @@ let () =
|
||||
(function Invalid_uri s -> Some (Uri.to_string s) | _ -> None)
|
||||
(fun s -> Invalid_uri (Uri.of_string s))
|
||||
|
||||
module Public_key_hash = Client_aliases.Alias (struct
|
||||
type t = Signature.Public_key_hash.t
|
||||
let encoding = Signature.Public_key_hash.encoding
|
||||
let of_source s = Lwt.return (Signature.Public_key_hash.of_b58check s)
|
||||
let to_source p = return (Signature.Public_key_hash.to_b58check p)
|
||||
let name = "public key hash"
|
||||
end)
|
||||
module Public_key_hash = struct
|
||||
include Client_aliases.Alias (struct
|
||||
type t = Signature.Public_key_hash.t
|
||||
let encoding = Signature.Public_key_hash.encoding
|
||||
let of_source s = Lwt.return (Signature.Public_key_hash.of_b58check s)
|
||||
let to_source p = return (Signature.Public_key_hash.to_b58check p)
|
||||
let name = "public key hash"
|
||||
end)
|
||||
end
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:"Identity" "pk_alias" Format.pp_print_text
|
||||
end
|
||||
|
||||
module type KEY = sig
|
||||
type t
|
||||
|
@ -29,6 +29,10 @@ module Public_key :
|
||||
module Secret_key :
|
||||
Client_aliases.Alias with type t = sk_uri
|
||||
|
||||
module Logging : sig
|
||||
val tag : string Tag.def
|
||||
end
|
||||
|
||||
(** {2 Interface for external signing modules.} *)
|
||||
|
||||
module type SIGNER = sig
|
||||
|
@ -14,5 +14,11 @@ include Blake2B.Make (Base58) (struct
|
||||
let size = None
|
||||
end)
|
||||
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:"Block Hash" "block_hash" pp_short
|
||||
let predecessor_tag = Tag.def ~doc:"Block Predecessor Hash" "predecessor_hash" pp_short
|
||||
end
|
||||
|
||||
let () =
|
||||
Base58.check_encoded_prefix b58check_encoding "B" 51
|
||||
|
@ -8,3 +8,8 @@
|
||||
(**************************************************************************)
|
||||
|
||||
include S.HASH
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
val predecessor_tag : t Tag.def
|
||||
end
|
||||
|
@ -9,13 +9,17 @@
|
||||
|
||||
open Error_monad
|
||||
|
||||
module Public_key_hash = Blake2B.Make(Base58)(struct
|
||||
let name = "Ed25519.Public_key_hash"
|
||||
let title = "An Ed25519 public key hash"
|
||||
let b58check_prefix = Base58.Prefix.ed25519_public_key_hash
|
||||
let size = Some 20
|
||||
end)
|
||||
|
||||
module Public_key_hash = struct
|
||||
include Blake2B.Make(Base58)(struct
|
||||
let name = "Ed25519.Public_key_hash"
|
||||
let title = "An Ed25519 public key hash"
|
||||
let b58check_prefix = Base58.Prefix.ed25519_public_key_hash
|
||||
let size = Some 20
|
||||
end)
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:title name pp
|
||||
end
|
||||
end
|
||||
let () =
|
||||
Base58.check_encoded_prefix Public_key_hash.b58check_encoding "tz1" 36
|
||||
|
||||
|
@ -17,3 +17,6 @@ include Blake2B.Make (Base58) (struct
|
||||
let () =
|
||||
Base58.check_encoded_prefix b58check_encoding "o" 51
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:title name pp
|
||||
end
|
||||
|
@ -8,3 +8,7 @@
|
||||
(**************************************************************************)
|
||||
|
||||
include S.HASH
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
|
@ -7,12 +7,18 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
module Public_key_hash = Blake2B.Make(Base58)(struct
|
||||
let name = "P256.Public_key_hash"
|
||||
let title = "A P256 public key hash"
|
||||
let b58check_prefix = Base58.Prefix.p256_public_key_hash
|
||||
let size = Some 20
|
||||
end)
|
||||
module Public_key_hash = struct
|
||||
include Blake2B.Make(Base58)(struct
|
||||
let name = "P256.Public_key_hash"
|
||||
let title = "A P256 public key hash"
|
||||
let b58check_prefix = Base58.Prefix.p256_public_key_hash
|
||||
let size = Some 20
|
||||
end)
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:title name pp
|
||||
end
|
||||
end
|
||||
|
||||
let () =
|
||||
Base58.check_encoded_prefix Public_key_hash.b58check_encoding "tz3" 36
|
||||
|
@ -17,3 +17,6 @@ include Blake2B.Make (Base58) (struct
|
||||
let () =
|
||||
Base58.check_encoded_prefix b58check_encoding "P" 51
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:title name pp
|
||||
end
|
||||
|
@ -8,3 +8,7 @@
|
||||
(**************************************************************************)
|
||||
|
||||
include S.HASH
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
|
@ -169,6 +169,9 @@ module type SIGNATURE = sig
|
||||
|
||||
val zero: t
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
end
|
||||
|
||||
module Public_key : sig
|
||||
|
@ -7,12 +7,17 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
module Public_key_hash = Blake2B.Make(Base58)(struct
|
||||
let name = "Secp256k1.Public_key_hash"
|
||||
let title = "A Secp256k1 public key hash"
|
||||
let b58check_prefix = Base58.Prefix.secp256k1_public_key_hash
|
||||
let size = Some 20
|
||||
end)
|
||||
module Public_key_hash = struct
|
||||
include Blake2B.Make(Base58)(struct
|
||||
let name = "Secp256k1.Public_key_hash"
|
||||
let title = "A Secp256k1 public key hash"
|
||||
let b58check_prefix = Base58.Prefix.secp256k1_public_key_hash
|
||||
let size = Some 20
|
||||
end)
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:title name pp
|
||||
end
|
||||
end
|
||||
|
||||
let () =
|
||||
Base58.check_encoded_prefix Public_key_hash.b58check_encoding "tz2" 36
|
||||
|
@ -193,6 +193,9 @@ module Public_key_hash = struct
|
||||
~descr:"A Secp256k1 of a Ed25519 public key hash (Base58Check-encoded)"
|
||||
"pkh"
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:title name pp
|
||||
end
|
||||
end
|
||||
|
||||
module Public_key = struct
|
||||
|
@ -711,3 +711,5 @@ let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f =
|
||||
Lwt_canceler.cancel canceler >>= fun () ->
|
||||
fail Timeout
|
||||
end
|
||||
|
||||
let errs_tag = Tag.def ~doc:"Errors" "errs" pp_print_error
|
||||
|
@ -69,3 +69,5 @@ module Make(Prefix : sig val id : string end) : Error_monad_sig.S
|
||||
|
||||
(**/**)
|
||||
val json_to_string : (Data_encoding.json -> string) ref
|
||||
|
||||
val errs_tag : error list Tag.def
|
||||
|
@ -7,7 +7,10 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Logging.Make(struct let name = "node.validator.bootstrap_pipeline" end)
|
||||
include Logging.Make_semantic(struct let name = "node.validator.bootstrap_pipeline" end)
|
||||
|
||||
let node_time_tag = Tag.def ~doc:"local time at this node" "node_time" Time.pp_hum
|
||||
let block_time_tag = Tag.def ~doc:"claimed creation time of block" "block_time" Time.pp_hum
|
||||
|
||||
open Validation_errors
|
||||
|
||||
@ -31,6 +34,8 @@ type t = {
|
||||
mutable errors: Error_monad.error list ;
|
||||
}
|
||||
|
||||
let operations_index_tag = Tag.def ~doc:"Operations index" "operations_index" Format.pp_print_int
|
||||
|
||||
let assert_acceptable_header pipeline
|
||||
hash (header : Block_header.t) =
|
||||
let chain_state = Distributed_db.chain_state pipeline.chain_db in
|
||||
@ -60,29 +65,36 @@ let assert_acceptable_header pipeline
|
||||
return_unit
|
||||
|
||||
let fetch_step pipeline (step : Block_locator.step) =
|
||||
lwt_log_info "fetching step %a -> %a (%d%s) from peer %a."
|
||||
Block_hash.pp_short step.block
|
||||
Block_hash.pp_short step.predecessor
|
||||
step.step
|
||||
(if step.strict_step then "" else " max")
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "fetching step %a -> %a (%a) from peer %a."
|
||||
-% t event "fetching_step_from_peer"
|
||||
-% a Block_hash.Logging.tag step.block
|
||||
-% a Block_hash.Logging.predecessor_tag step.predecessor
|
||||
-% a (Tag.def ~doc:"" "" Block_locator.pp_step) step
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
let rec fetch_loop acc hash cpt =
|
||||
Lwt_unix.yield () >>= fun () ->
|
||||
if cpt < 0 then
|
||||
lwt_log_info "invalid step from peer %a (too long)."
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "invalid step from peer %a (too long)."
|
||||
-% t event "step_too_long"
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
fail (Invalid_locator (pipeline.peer_id, pipeline.locator))
|
||||
else if Block_hash.equal hash step.predecessor then
|
||||
if step.strict_step && cpt <> 0 then
|
||||
lwt_log_info "invalid step from peer %a (too short)."
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "invalid step from peer %a (too short)."
|
||||
-% t event "step_too_short"
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
fail (Invalid_locator (pipeline.peer_id, pipeline.locator))
|
||||
else
|
||||
return acc
|
||||
else
|
||||
lwt_debug "fetching block header %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "fetching block header %a from peer %a."
|
||||
-% t event "fetching_block_header_from_peer"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
protect ~canceler:pipeline.canceler begin fun () ->
|
||||
Distributed_db.Block_header.fetch
|
||||
~timeout:pipeline.block_header_timeout
|
||||
@ -90,9 +102,11 @@ let fetch_step pipeline (step : Block_locator.step) =
|
||||
hash ()
|
||||
end >>=? fun header ->
|
||||
assert_acceptable_header pipeline hash header >>=? fun () ->
|
||||
lwt_debug "fetched block header %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "fetched block header %a from peer %a."
|
||||
-% t event "fetched_block_header_from_peer"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
fetch_loop ((hash, header) :: acc) header.shell.predecessor (cpt - 1)
|
||||
in
|
||||
fetch_loop [] step.block step.step >>=? fun headers ->
|
||||
@ -116,31 +130,39 @@ let headers_fetch_worker_loop pipeline =
|
||||
return_unit
|
||||
end >>= function
|
||||
| Ok () ->
|
||||
lwt_log_info "fetched all step from peer %a."
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "fetched all steps from peer %a."
|
||||
-% t event "fetched_all_steps_from_peer"
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
Lwt_pipe.close pipeline.fetched_headers ;
|
||||
Lwt.return_unit
|
||||
| Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
|
||||
Lwt.return_unit
|
||||
| Error [ Distributed_db.Block_header.Timeout bh ] ->
|
||||
lwt_log_info "request for header %a from peer %a timed out."
|
||||
Block_hash.pp_short bh
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "request for header %a from peer %a timed out."
|
||||
-% t event "header_request_timeout"
|
||||
-% a Block_hash.Logging.tag bh
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error [ Future_block_header { block; block_time; time } ] ->
|
||||
lwt_log_notice "Block locator %a from peer %a contains future blocks. \
|
||||
local time: %a, block time: %a"
|
||||
Block_hash.pp_short block
|
||||
Time.pp_hum time
|
||||
Time.pp_hum block_time
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Block locator %a from peer %a contains future blocks. \
|
||||
local time: %a, block time: %a"
|
||||
-% t event "locator_contains_future_blocks"
|
||||
-% a Block_hash.Logging.tag block
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id
|
||||
-% a node_time_tag time
|
||||
-% a block_time_tag block_time) >>= fun () ->
|
||||
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error err ->
|
||||
pipeline.errors <- pipeline.errors @ err ;
|
||||
lwt_log_error "@[Unexpected error (headers fetch):@ %a@]"
|
||||
pp_print_error err >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "@[Unexpected error (headers fetch):@ %a@]"
|
||||
-% t event "unexpected_error"
|
||||
-% a errs_tag err) >>= fun () ->
|
||||
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
|
||||
@ -150,9 +172,11 @@ let rec operations_fetch_worker_loop pipeline =
|
||||
protect ~canceler:pipeline.canceler begin fun () ->
|
||||
Lwt_pipe.pop pipeline.fetched_headers >>= return
|
||||
end >>=? fun (hash, header) ->
|
||||
lwt_log_info "fetching operations of block %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "fetching operations of block %a from peer %a."
|
||||
-% t event "fetching_operations"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
let operations =
|
||||
map_p
|
||||
(fun i ->
|
||||
@ -163,9 +187,11 @@ let rec operations_fetch_worker_loop pipeline =
|
||||
(hash, i) header.shell.operations_hash
|
||||
end)
|
||||
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
|
||||
lwt_log_info "fetched operations of block %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "fetched operations of block %a from peer %a."
|
||||
-% t event "fetched_operations"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
return operations in
|
||||
protect ~canceler:pipeline.canceler begin fun () ->
|
||||
Lwt_pipe.push pipeline.fetched_blocks
|
||||
@ -178,15 +204,20 @@ let rec operations_fetch_worker_loop pipeline =
|
||||
Lwt_pipe.close pipeline.fetched_blocks ;
|
||||
Lwt.return_unit
|
||||
| Error [ Distributed_db.Operations.Timeout (bh, n) ] ->
|
||||
lwt_log_info "request for operations %a:%d from peer %a timed out."
|
||||
Block_hash.pp_short bh n
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "request for operations %a:%d from peer %a timed out."
|
||||
-% t event "request_operations_timeout"
|
||||
-% a Block_hash.Logging.tag bh
|
||||
-% s operations_index_tag n
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error err ->
|
||||
pipeline.errors <- pipeline.errors @ err ;
|
||||
lwt_log_error "@[Unexpected error (operations fetch):@ %a@]"
|
||||
pp_print_error err >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "@[Unexpected error (operations fetch):@ %a@]"
|
||||
-% t event "unexpected_error"
|
||||
-% a errs_tag err) >>= fun () ->
|
||||
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
|
||||
@ -196,9 +227,11 @@ let rec validation_worker_loop pipeline =
|
||||
protect ~canceler:pipeline.canceler begin fun () ->
|
||||
Lwt_pipe.pop pipeline.fetched_blocks >>= return
|
||||
end >>=? fun (hash, header, operations) ->
|
||||
lwt_log_info "requesting validation for block %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "requesting validation for block %a from peer %a."
|
||||
-% t event "requesting_validation"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
operations >>=? fun operations ->
|
||||
protect ~canceler:pipeline.canceler begin fun () ->
|
||||
Block_validator.validate
|
||||
@ -207,9 +240,11 @@ let rec validation_worker_loop pipeline =
|
||||
pipeline.block_validator
|
||||
pipeline.chain_db hash header operations
|
||||
end >>=? fun _block ->
|
||||
lwt_log_info "validated block %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "validated block %a from peer %a."
|
||||
-% t event "validated_block"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
|
||||
return_unit
|
||||
end >>= function
|
||||
| Ok () -> validation_worker_loop pipeline
|
||||
@ -223,8 +258,10 @@ let rec validation_worker_loop pipeline =
|
||||
Lwt.return_unit
|
||||
| Error err ->
|
||||
pipeline.errors <- pipeline.errors @ err ;
|
||||
lwt_log_error "@[Unexpected error (validator):@ %a@]"
|
||||
pp_print_error err >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "@[Unexpected error (validator):@ %a@]"
|
||||
-% t event "unexpected_error"
|
||||
-% a errs_tag err) >>= fun () ->
|
||||
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
|
||||
|
@ -9,6 +9,8 @@
|
||||
|
||||
open State_logging
|
||||
|
||||
let block_hash_tag = Tag.def ~doc:"Block hash" "block_hash" Block_hash.pp_short
|
||||
|
||||
let mempool_encoding = Mempool.encoding
|
||||
|
||||
let genesis chain_state =
|
||||
@ -57,7 +59,10 @@ let locked_set_head chain_store data block =
|
||||
if Block_hash.equal hash ancestor then
|
||||
Lwt.return_unit
|
||||
else
|
||||
lwt_debug "pop_block %a" Block_hash.pp_short hash >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "pop_block %a"
|
||||
-% t event "pop_block"
|
||||
-% a block_hash_tag hash) >>= fun () ->
|
||||
Store.Chain_data.In_main_branch.remove (chain_store, hash) >>= fun () ->
|
||||
State.Block.predecessor block >>= function
|
||||
| Some predecessor ->
|
||||
@ -66,7 +71,10 @@ let locked_set_head chain_store data block =
|
||||
in
|
||||
let push_block pred_hash block =
|
||||
let hash = State.Block.hash block in
|
||||
lwt_debug "push_block %a" Block_hash.pp_short hash >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "push_block %a"
|
||||
-% t event "push_block"
|
||||
-% a block_hash_tag hash) >>= fun () ->
|
||||
Store.Chain_data.In_main_branch.store
|
||||
(chain_store, pred_hash) hash >>= fun () ->
|
||||
Lwt.return hash
|
||||
|
@ -24,6 +24,10 @@ module Make_raw
|
||||
val name : string
|
||||
val encoding : t Data_encoding.t
|
||||
val pp : Format.formatter -> t -> unit
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
end)
|
||||
(Disk_table :
|
||||
Distributed_db_functors.DISK_TABLE with type key := Hash.t)
|
||||
@ -164,6 +168,9 @@ module Raw_operation_hashes = struct
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
obj2 (req "block" Block_hash.encoding) (req "index" uint16)
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:"Operation hashes" "operation_hashes" pp
|
||||
end
|
||||
end)
|
||||
(Operation_hashes_storage)
|
||||
(Operations_table)
|
||||
@ -233,6 +240,9 @@ module Raw_operations = struct
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
obj2 (req "block" Block_hash.encoding) (req "index" uint16)
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:"Operations" "operations" pp
|
||||
end
|
||||
end)
|
||||
(Operations_storage)
|
||||
(Operations_table)
|
||||
@ -458,15 +468,18 @@ module P2p_reader = struct
|
||||
f chain_db
|
||||
|
||||
module Handle_msg_Logging =
|
||||
Logging.Make(struct let name = "node.distributed_db.p2p_reader" end)
|
||||
Tezos_stdlib.Logging.Make_semantic(struct let name = "node.distributed_db.p2p_reader" end)
|
||||
|
||||
let handle_msg global_db state msg =
|
||||
|
||||
let open Message in
|
||||
let open Handle_msg_Logging in
|
||||
|
||||
lwt_debug "Read message from %a: %a"
|
||||
P2p_peer.Id.pp_short state.gid Message.pp_json msg >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Read message from %a: %a"
|
||||
-% t event "read_message"
|
||||
-% a P2p_peer.Id.Logging.tag state.gid
|
||||
-% a Message.Logging.tag msg) >>= fun () ->
|
||||
|
||||
match msg with
|
||||
|
||||
@ -497,9 +510,11 @@ module P2p_reader = struct
|
||||
Lwt.return_unit
|
||||
end else if Time.(add (now ()) 15L < head.shell.timestamp) then begin
|
||||
(* TODO some penalty *)
|
||||
lwt_log_notice "Received future block %a from peer %a."
|
||||
Block_hash.pp_short (Block_header.hash head)
|
||||
P2p_peer.Id.pp_short state.gid >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Received future block %a from peer %a."
|
||||
-% t event "received_future_block"
|
||||
-% a Block_hash.Logging.tag (Block_header.hash head)
|
||||
-% a P2p_peer.Id.Logging.tag state.gid) >>= fun () ->
|
||||
Lwt.return_unit
|
||||
end else begin
|
||||
chain_db.callback.notify_branch state.gid locator ;
|
||||
@ -548,9 +563,11 @@ module P2p_reader = struct
|
||||
Lwt.return_unit
|
||||
end else if Time.(add (now ()) 15L < header.shell.timestamp) then begin
|
||||
(* TODO some penalty *)
|
||||
lwt_log_notice "Received future block %a from peer %a."
|
||||
Block_hash.pp_short head
|
||||
P2p_peer.Id.pp_short state.gid >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Received future block %a from peer %a."
|
||||
-% t event "received_future_block"
|
||||
-% a Block_hash.Logging.tag head
|
||||
-% a P2p_peer.Id.Logging.tag state.gid) >>= fun () ->
|
||||
Lwt.return_unit
|
||||
end else begin
|
||||
chain_db.callback.notify_head state.gid header mempool ;
|
||||
|
@ -320,6 +320,10 @@ module Make_request_scheduler
|
||||
val name : string
|
||||
val encoding : t Data_encoding.t
|
||||
val pp : Format.formatter -> t -> unit
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
end)
|
||||
(Table : MEMORY_TABLE with type key := Hash.t)
|
||||
(Request : REQUEST with type key := Hash.t) : sig
|
||||
@ -331,7 +335,7 @@ module Make_request_scheduler
|
||||
|
||||
end = struct
|
||||
|
||||
include Logging.Make(struct let name = "node.distributed_db.scheduler." ^ Hash.name end)
|
||||
include Logging.Make_semantic(struct let name = "node.distributed_db.scheduler." ^ Hash.name end)
|
||||
|
||||
type key = Hash.t
|
||||
|
||||
@ -363,24 +367,38 @@ end = struct
|
||||
let request t p k =
|
||||
assert (Lwt_pipe.push_now t.queue (Request (p, k)))
|
||||
let notify t p k =
|
||||
debug "push received %a from %a"
|
||||
Hash.pp k P2p_peer.Id.pp_short p ;
|
||||
debug Tag.DSL.(fun f ->
|
||||
f "push received %a from %a"
|
||||
-% t event "push_received"
|
||||
-% a Hash.Logging.tag k
|
||||
-% a P2p_peer.Id.Logging.tag p);
|
||||
assert (Lwt_pipe.push_now t.queue (Notify (p, k)))
|
||||
let notify_cancelation t k =
|
||||
debug "push cancelation %a"
|
||||
Hash.pp k ;
|
||||
debug Tag.DSL.(fun f ->
|
||||
f "push cancelation %a"
|
||||
-% t event "push_cancelation"
|
||||
-% a Hash.Logging.tag k);
|
||||
assert (Lwt_pipe.push_now t.queue (Notify_cancelation k))
|
||||
let notify_invalid t p k =
|
||||
debug "push received invalid %a from %a"
|
||||
Hash.pp k P2p_peer.Id.pp_short p ;
|
||||
debug Tag.DSL.(fun f ->
|
||||
f "push received invalid %a from %a"
|
||||
-% t event "push_received_invalid"
|
||||
-% a Hash.Logging.tag k
|
||||
-% a P2p_peer.Id.Logging.tag p);
|
||||
assert (Lwt_pipe.push_now t.queue (Notify_invalid (p, k)))
|
||||
let notify_duplicate t p k =
|
||||
debug "push received duplicate %a from %a"
|
||||
Hash.pp k P2p_peer.Id.pp_short p ;
|
||||
debug Tag.DSL.(fun f ->
|
||||
f "push received duplicate %a from %a"
|
||||
-% t event "push_received_duplicate"
|
||||
-% a Hash.Logging.tag k
|
||||
-% a P2p_peer.Id.Logging.tag p);
|
||||
assert (Lwt_pipe.push_now t.queue (Notify_duplicate (p, k)))
|
||||
let notify_unrequested t p k =
|
||||
debug "push received unrequested %a from %a"
|
||||
Hash.pp k P2p_peer.Id.pp_short p ;
|
||||
debug Tag.DSL.(fun f ->
|
||||
f "push received unrequested %a from %a"
|
||||
-% t event "push_received_unrequested"
|
||||
-% a Hash.Logging.tag k
|
||||
-% a P2p_peer.Id.Logging.tag p);
|
||||
assert (Lwt_pipe.push_now t.queue (Notify_unrequested (p, k)))
|
||||
|
||||
let compute_timeout state =
|
||||
@ -401,17 +419,16 @@ end = struct
|
||||
Lwt_unix.sleep delay
|
||||
end
|
||||
|
||||
let may_pp_peer ppf = function
|
||||
| None -> ()
|
||||
| Some peer -> P2p_peer.Id.pp_short ppf peer
|
||||
|
||||
(* TODO should depend on the ressource kind... *)
|
||||
let initial_delay = 0.5
|
||||
|
||||
let process_event state now = function
|
||||
| Request (peer, key) -> begin
|
||||
lwt_debug "registering request %a from %a"
|
||||
Hash.pp key may_pp_peer peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "registering request %a from %a"
|
||||
-% t event "registering_request"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag_opt peer) >>= fun () ->
|
||||
try
|
||||
let data = Table.find state.pending key in
|
||||
let peers =
|
||||
@ -423,8 +440,11 @@ end = struct
|
||||
next_request = min data.next_request (now +. initial_delay) ;
|
||||
peers ;
|
||||
} ;
|
||||
lwt_debug "registering request %a from %a -> replaced"
|
||||
Hash.pp key may_pp_peer peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "registering request %a from %a -> replaced"
|
||||
-% t event "registering_request_replaced"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag_opt peer) >>= fun () ->
|
||||
Lwt.return_unit
|
||||
with Not_found ->
|
||||
let peers =
|
||||
@ -436,33 +456,50 @@ end = struct
|
||||
next_request = now ;
|
||||
delay = initial_delay ;
|
||||
} ;
|
||||
lwt_debug "registering request %a from %a -> added"
|
||||
Hash.pp key may_pp_peer peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "registering request %a from %a -> added"
|
||||
-% t event "registering_request_added"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag_opt peer) >>= fun () ->
|
||||
Lwt.return_unit
|
||||
end
|
||||
| Notify (peer, key) ->
|
||||
Table.remove state.pending key ;
|
||||
lwt_debug "received %a from %a"
|
||||
Hash.pp key P2p_peer.Id.pp_short peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "received %a from %a"
|
||||
-% t event "received"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag peer) >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Notify_cancelation key ->
|
||||
Table.remove state.pending key ;
|
||||
lwt_debug "canceled %a"
|
||||
Hash.pp key >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "canceled %a"
|
||||
-% t event "canceled"
|
||||
-% a Hash.Logging.tag key) >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Notify_invalid (peer, key) ->
|
||||
lwt_debug "received invalid %a from %a"
|
||||
Hash.pp key P2p_peer.Id.pp_short peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "received invalid %a from %a"
|
||||
-% t event "received_invalid"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag peer) >>= fun () ->
|
||||
(* TODO *)
|
||||
Lwt.return_unit
|
||||
| Notify_unrequested (peer, key) ->
|
||||
lwt_debug "received unrequested %a from %a"
|
||||
Hash.pp key P2p_peer.Id.pp_short peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "received unrequested %a from %a"
|
||||
-% t event "received_unrequested"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag peer) >>= fun () ->
|
||||
(* TODO *)
|
||||
Lwt.return_unit
|
||||
| Notify_duplicate (peer, key) ->
|
||||
lwt_debug "received duplicate %a from %a"
|
||||
Hash.pp key P2p_peer.Id.pp_short peer >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "received duplicate %a from %a"
|
||||
-% t event "received_duplicate"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag peer) >>= fun () ->
|
||||
(* TODO *)
|
||||
Lwt.return_unit
|
||||
|
||||
@ -473,7 +510,8 @@ end = struct
|
||||
Lwt.choose
|
||||
[ (state.events >|= fun _ -> ()) ; timeout ; shutdown ] >>= fun () ->
|
||||
if Lwt.state shutdown <> Lwt.Sleep then
|
||||
lwt_debug "terminating" >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "terminating" -% t event "terminating") >>= fun () ->
|
||||
Lwt.return_unit
|
||||
else if Lwt.state state.events <> Lwt.Sleep then
|
||||
let now = Unix.gettimeofday () in
|
||||
@ -482,7 +520,8 @@ end = struct
|
||||
Lwt_list.iter_s (process_event state now) events >>= fun () ->
|
||||
loop state
|
||||
else
|
||||
lwt_debug "timeout" >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "timeout" -% t event "timeout") >>= fun () ->
|
||||
let now = Unix.gettimeofday () in
|
||||
let active_peers = Request.active state.param in
|
||||
let requests =
|
||||
@ -515,8 +554,11 @@ end = struct
|
||||
P2p_peer.Map.fold begin fun peer request acc ->
|
||||
acc >>= fun () ->
|
||||
Lwt_list.iter_s (fun key ->
|
||||
lwt_debug "requested %a from %a"
|
||||
Hash.pp key P2p_peer.Id.pp_short peer)
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "requested %a from %a"
|
||||
-% t event "requested"
|
||||
-% a Hash.Logging.tag key
|
||||
-% a P2p_peer.Id.Logging.tag peer))
|
||||
request
|
||||
end requests Lwt.return_unit >>= fun () ->
|
||||
loop state
|
||||
|
@ -172,6 +172,10 @@ module Make_request_scheduler
|
||||
val name : string
|
||||
val encoding : t Data_encoding.t
|
||||
val pp : Format.formatter -> t -> unit
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
end)
|
||||
(Table : MEMORY_TABLE with type key := Hash.t)
|
||||
(Request : REQUEST with type key := Hash.t) : sig
|
||||
|
@ -302,3 +302,7 @@ let raw_encoding = P2p.Raw.encoding encoding
|
||||
let pp_json ppf msg =
|
||||
Data_encoding.Json.pp ppf
|
||||
(Data_encoding.Json.construct raw_encoding (Message msg))
|
||||
|
||||
module Logging = struct
|
||||
let tag = Tag.def ~doc:"Message" "message" pp_json
|
||||
end
|
||||
|
@ -50,3 +50,7 @@ module Bounded_encoding : sig
|
||||
val set_protocol_max_size: int option -> unit
|
||||
val set_mempool_max_operations: int option -> unit
|
||||
end
|
||||
|
||||
module Logging : sig
|
||||
val tag : t Tag.def
|
||||
end
|
||||
|
@ -46,12 +46,14 @@ let init_p2p p2p_params =
|
||||
match p2p_params with
|
||||
| None ->
|
||||
let c_meta = init_connection_metadata None in
|
||||
lwt_log_notice "P2P layer is disabled" >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "P2P layer is disabled" -% t event "p2p_disabled") >>= fun () ->
|
||||
return (P2p.faked_network peer_metadata_cfg c_meta)
|
||||
| Some (config, limits) ->
|
||||
let c_meta = init_connection_metadata (Some config) in
|
||||
let conn_metadata_cfg = connection_metadata_cfg c_meta in
|
||||
lwt_log_notice "bootstrapping chain..." >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "bootstrapping chain..." -% t event "bootstrapping_chain") >>= fun () ->
|
||||
P2p.create
|
||||
~config ~limits
|
||||
peer_metadata_cfg
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
open Validation_errors
|
||||
|
||||
include Logging.Make(struct let name = "node.validator.block" end)
|
||||
include Logging.Make_semantic(struct let name = "node.validator.block" end)
|
||||
|
||||
type 'a request =
|
||||
| Request_validation: {
|
||||
@ -68,11 +68,14 @@ let rec worker_loop bv =
|
||||
| Ok () ->
|
||||
worker_loop bv
|
||||
| Error [Canceled | Exn Lwt_pipe.Closed] ->
|
||||
lwt_log_notice "terminating" >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "terminating" -% t event "terminating") >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error err ->
|
||||
lwt_log_error "@[Unexpected error (worker):@ %a@]"
|
||||
pp_print_error err >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "@[Unexpected error (worker):@ %a@]"
|
||||
-% t event "unexpected_error"
|
||||
-% a errs_tag err) >>= fun () ->
|
||||
Lwt_canceler.cancel bv.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
|
||||
@ -99,13 +102,17 @@ let shutdown { canceler ; worker } =
|
||||
let validate { messages } hash protocol =
|
||||
match Registered_protocol.get hash with
|
||||
| Some protocol ->
|
||||
lwt_debug "previously validated protocol %a (before pipe)"
|
||||
Protocol_hash.pp_short hash >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "previously validated protocol %a (before pipe)"
|
||||
-% t event "previously_validated_protocol"
|
||||
-% a Protocol_hash.Logging.tag hash) >>= fun () ->
|
||||
return protocol
|
||||
| None ->
|
||||
let res, wakener = Lwt.task () in
|
||||
lwt_debug "pushing validation request for protocol %a"
|
||||
Protocol_hash.pp_short hash >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "pushing validation request for protocol %a"
|
||||
-% t event "pushing_validation_request"
|
||||
-% a Protocol_hash.Logging.tag hash) >>= fun () ->
|
||||
Lwt_pipe.push messages
|
||||
(Message (Request_validation { hash ; protocol },
|
||||
Some wakener)) >>= fun () ->
|
||||
@ -119,14 +126,11 @@ let fetch_and_compile_protocol pv ?peer ?timeout hash =
|
||||
Distributed_db.Protocol.read_opt pv.db hash >>= function
|
||||
| Some protocol -> return protocol
|
||||
| None ->
|
||||
let may_print_peer ppf = function
|
||||
| None -> ()
|
||||
| Some peer ->
|
||||
Format.fprintf ppf " from peer %a"
|
||||
P2p_peer.Id.pp peer in
|
||||
lwt_log_notice "Fetching protocol %a%a"
|
||||
Protocol_hash.pp_short hash
|
||||
may_print_peer peer >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Fetching protocol %a%a"
|
||||
-% t event "fetching_protocol"
|
||||
-% a Protocol_hash.Logging.tag hash
|
||||
-% a P2p_peer.Id.Logging.tag_source peer) >>= fun () ->
|
||||
Distributed_db.Protocol.fetch pv.db ?peer ?timeout hash ()
|
||||
end >>=? fun protocol ->
|
||||
validate pv hash protocol >>=? fun proto ->
|
||||
|
@ -577,7 +577,10 @@ module Chain = struct
|
||||
end
|
||||
|
||||
let destroy state chain =
|
||||
lwt_debug "destroy %a" Chain_id.pp (id chain) >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "destroy %a"
|
||||
-% t event "destroy"
|
||||
-% a chain_id (id chain)) >>= fun () ->
|
||||
Shared.use state.global_data begin fun { global_store ; chains } ->
|
||||
Chain_id.Table.remove chains (id chain) ;
|
||||
Store.Chain.destroy global_store (id chain) >>= fun () ->
|
||||
|
@ -7,4 +7,6 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_stdlib.Logging.Make(struct let name = "node.state" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "node.state" end)
|
||||
|
||||
let chain_id = Tag.def ~doc:"Chain ID" "chain_id" Chain_id.pp
|
||||
|
@ -7,4 +7,6 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_stdlib.Logging.LOG
|
||||
include Tezos_stdlib.Logging.SEMLOG
|
||||
|
||||
val chain_id: Chain_id.t Tag.def
|
||||
|
@ -7,7 +7,7 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Logging.Make(struct let name = "node.validator" end)
|
||||
include Logging.Make_semantic(struct let name = "node.validator" end)
|
||||
|
||||
type t = {
|
||||
|
||||
@ -40,7 +40,10 @@ let create state db
|
||||
|
||||
let activate v ?max_child_ttl ~start_prevalidator chain_state =
|
||||
let chain_id = State.Chain.id chain_state in
|
||||
lwt_log_notice "activate chain %a" Chain_id.pp chain_id >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "activate chain %a"
|
||||
-% t event "active_chain"
|
||||
-% a State_logging.chain_id chain_id) >>= fun () ->
|
||||
try Chain_id.Table.find v.active_chains chain_id
|
||||
with Not_found ->
|
||||
let nv =
|
||||
|
@ -7,4 +7,4 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_stdlib.Logging.Make(struct let name = "node.worker" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "node.worker" end)
|
||||
|
@ -7,4 +7,4 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Tezos_stdlib.Logging.LOG
|
||||
include Tezos_stdlib.Logging.SEMLOG
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
open Client_keys
|
||||
|
||||
include Logging.Make(struct let name = "client.signer.ledger" end)
|
||||
include Tezos_stdlib.Logging.Make(struct let name = "client.signer.ledger" end)
|
||||
|
||||
let scheme = "ledger"
|
||||
|
||||
|
@ -7,12 +7,13 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Logging.Make(struct let name = "client.denunciation" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "client.denunciation" end)
|
||||
|
||||
open Proto_alpha
|
||||
open Alpha_context
|
||||
|
||||
open Client_baking_blocks
|
||||
open Logging
|
||||
|
||||
module HLevel = Hashtbl.Make(struct
|
||||
include Raw_level
|
||||
@ -55,7 +56,10 @@ let get_block_offset level =
|
||||
else
|
||||
`Head 5)
|
||||
| Error errs ->
|
||||
lwt_log_error "Invalid level conversion : %a" pp_print_error errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Invalid level conversion : %a"
|
||||
-% t event "invalid_level_conversion"
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
Lwt.return (`Head 0)
|
||||
|
||||
let process_endorsements (cctxt : #Proto_alpha.full) state ~chain
|
||||
@ -83,17 +87,24 @@ let process_endorsements (cctxt : #Proto_alpha.full) state ~chain
|
||||
~op1:existing_endorsement
|
||||
~op2:new_endorsement () >>=? fun bytes ->
|
||||
let bytes = Signature.concat bytes Signature.zero in
|
||||
lwt_log_notice "Double endorsement detected" >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Double endorsement detected"
|
||||
-% t event "double_endorsement_detected"
|
||||
-% t conflicting_endorsements_tag (existing_endorsement, new_endorsement)) >>= fun () ->
|
||||
(* A denunciation may have already occured *)
|
||||
Shell_services.Injection.operation cctxt ~chain bytes >>=? fun op_hash ->
|
||||
lwt_log_notice "Double endorsement evidence injected %a"
|
||||
Operation_hash.pp op_hash >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Double endorsement evidence injected %a"
|
||||
-% t event "double_endorsement_denounced"
|
||||
-% a Operation_hash.Logging.tag op_hash) >>= fun () ->
|
||||
return @@ HLevel.replace state.endorsements_table level
|
||||
(Delegate_Map.add delegate new_endorsement map)
|
||||
end
|
||||
| _ ->
|
||||
lwt_log_error "Inconsistent endorsement found %a"
|
||||
Operation_hash.pp hash >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Inconsistent endorsement found %a"
|
||||
-% t event "inconsistent_endorsement"
|
||||
-% a Operation_hash.Logging.tag hash) >>= fun () ->
|
||||
return_unit
|
||||
) endorsements >>=? fun () ->
|
||||
return_unit
|
||||
@ -108,7 +119,7 @@ let process_block (cctxt : #Proto_alpha.full) state ~chain (header : Alpha_block
|
||||
(Delegate_Map.add baker hash map)
|
||||
| Some existing_hash when Block_hash.(=) existing_hash hash ->
|
||||
(* This case should never happen *)
|
||||
lwt_debug "Double baking detected but block hashes are equivalent. Skipping..." >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f -> f "Double baking detected but block hashes are equivalent. Skipping..." -% t event "double_baking_but_not") >>= fun () ->
|
||||
return @@ HLevel.replace state.blocks_table level
|
||||
(Delegate_Map.add baker hash map)
|
||||
| Some existing_hash ->
|
||||
@ -126,11 +137,15 @@ let process_block (cctxt : #Proto_alpha.full) state ~chain (header : Alpha_block
|
||||
Alpha_services.Forge.double_baking_evidence cctxt (`Main, block) ~branch:block_hash
|
||||
~bh1 ~bh2 () >>=? fun bytes ->
|
||||
let bytes = Signature.concat bytes Signature.zero in
|
||||
lwt_log_notice "Double baking detected" >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Double baking detected"
|
||||
-% t event "double_baking_detected") >>= fun () ->
|
||||
(* A denunciation may have already occured *)
|
||||
Shell_services.Injection.operation cctxt ~chain bytes >>=? fun op_hash ->
|
||||
lwt_log_notice "Double baking evidence injected %a"
|
||||
Operation_hash.pp op_hash >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Double baking evidence injected %a"
|
||||
-% t event "double_baking_denounced"
|
||||
-% a Operation_hash.Logging.tag op_hash) >>= fun () ->
|
||||
return @@ HLevel.replace state.blocks_table level
|
||||
(Delegate_Map.add baker hash map)
|
||||
end
|
||||
@ -166,10 +181,18 @@ let endorsements_index = 0
|
||||
*)
|
||||
let process_new_block (cctxt : #Proto_alpha.full) state { hash ; chain_id ; level ; protocol ; next_protocol } =
|
||||
if Protocol_hash.(protocol <> next_protocol) then
|
||||
lwt_log_error "Protocol changing detected. Skipping the block." >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Protocol changing detected. Skipping the block."
|
||||
-% t event "protocol_change_detected"
|
||||
(* TODO which protocols -- in tag *)
|
||||
) >>= fun () ->
|
||||
return_unit
|
||||
else
|
||||
lwt_debug "Block level : %a" Raw_level.pp level >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Block level : %a"
|
||||
-% t event "accuser_saw_block"
|
||||
-% a level_tag level
|
||||
-% t Block_hash.Logging.tag hash) >>= fun () ->
|
||||
let chain = `Hash chain_id in
|
||||
let block = `Hash (hash, 0) in
|
||||
state.highest_level_encountered <- Raw_level.max level state.highest_level_encountered ;
|
||||
@ -179,9 +202,11 @@ let process_new_block (cctxt : #Proto_alpha.full) state { hash ; chain_id ; leve
|
||||
| Ok block_info ->
|
||||
process_block cctxt state ~chain block_info
|
||||
| Error errs ->
|
||||
lwt_log_error "Error while fetching operations in block %a@\n%a"
|
||||
Block_hash.pp_short hash
|
||||
pp_print_error errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while fetching operations in block %a@\n%a"
|
||||
-% t event "fetch_operations_error"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
return_unit
|
||||
end >>=? fun () ->
|
||||
(* Processing endorsements *)
|
||||
@ -192,9 +217,11 @@ let process_new_block (cctxt : #Proto_alpha.full) state { hash ; chain_id ; leve
|
||||
process_endorsements cctxt state ~chain endorsements level
|
||||
else return_unit
|
||||
| Error errs ->
|
||||
lwt_log_error "Error while fetching operations in block %a@\n%a"
|
||||
Block_hash.pp_short hash
|
||||
pp_print_error errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while fetching operations in block %a@\n%a"
|
||||
-% t event "fetch_operations_error"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
return_unit
|
||||
end >>=? fun () ->
|
||||
cleanup_old_operations state ;
|
||||
@ -205,14 +232,17 @@ let create (cctxt : #Proto_alpha.full) ~preserved_levels valid_blocks_stream =
|
||||
let process_block cctxt state bi =
|
||||
process_new_block cctxt state bi >>= function
|
||||
| Ok () ->
|
||||
lwt_log_notice
|
||||
"Block %a registered"
|
||||
Block_hash.pp_short bi.Client_baking_blocks.hash
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Block %a registered"
|
||||
-% t event "accuser_processed_block"
|
||||
-% a Block_hash.Logging.tag bi.Client_baking_blocks.hash)
|
||||
>>= return
|
||||
| Error errs ->
|
||||
lwt_log_error "Error while processing block %a@\n%a"
|
||||
Block_hash.pp_short bi.hash
|
||||
pp_print_error errs
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while processing block %a@\n%a"
|
||||
-% t event "accuser_block_error"
|
||||
-% a Block_hash.Logging.tag bi.hash
|
||||
-% a errs_tag errs)
|
||||
>>= return
|
||||
in
|
||||
|
||||
|
@ -10,7 +10,9 @@
|
||||
open Proto_alpha
|
||||
open Alpha_context
|
||||
|
||||
include Logging.Make(struct let name = "client.endorsement" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "client.endorsement" end)
|
||||
|
||||
open Logging
|
||||
|
||||
module State = Daemon_state.Make(struct let name = "endorsement" end)
|
||||
|
||||
@ -104,51 +106,71 @@ let endorse_for_delegate cctxt block delegate =
|
||||
let { Client_baking_blocks.hash ; level } = block in
|
||||
let b = `Hash (hash, 0) in
|
||||
Client_keys.get_key cctxt delegate >>=? fun (name, _pk, sk) ->
|
||||
lwt_debug "Endorsing %a for %s (level %a)!"
|
||||
Block_hash.pp_short hash name
|
||||
Raw_level.pp level >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Endorsing %a for %s (level %a)!"
|
||||
-% t event "endorsing"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% s Client_keys.Logging.tag name
|
||||
-% a level_tag level) >>= fun () ->
|
||||
inject_endorsement cctxt
|
||||
b hash level
|
||||
sk delegate >>=? fun oph ->
|
||||
lwt_log_notice
|
||||
"Injected endorsement for block '%a' \
|
||||
(level %a, contract %s) '%a'"
|
||||
Block_hash.pp_short hash
|
||||
Raw_level.pp level
|
||||
name
|
||||
Operation_hash.pp_short oph >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f ->
|
||||
f "Injected endorsement for block '%a' \
|
||||
(level %a, contract %s) '%a'"
|
||||
-% t event "injected_endorsement"
|
||||
-% a Block_hash.Logging.tag hash
|
||||
-% a level_tag level
|
||||
-% s Client_keys.Logging.tag name
|
||||
-% a Operation_hash.Logging.tag oph) >>= fun () ->
|
||||
return_unit
|
||||
|
||||
let allowed_to_endorse cctxt bi delegate =
|
||||
Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
|
||||
lwt_debug "Checking if allowed to endorse block %a for %s"
|
||||
Block_hash.pp_short bi.Client_baking_blocks.hash name >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Checking if allowed to endorse block %a for %s"
|
||||
-% t event "check_endorsement_ok"
|
||||
-% a Block_hash.Logging.tag bi.Client_baking_blocks.hash
|
||||
-% s Client_keys.Logging.tag name) >>= fun () ->
|
||||
let b = `Hash (bi.hash, 0) in
|
||||
let level = bi.level in
|
||||
get_signing_slots cctxt b delegate level >>=? function
|
||||
| None | Some [] ->
|
||||
lwt_debug "No slot found for %a/%s"
|
||||
Block_hash.pp_short bi.hash name >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "No slot found for %a/%s"
|
||||
-% t event "endorsement_no_slots_found"
|
||||
-% a Block_hash.Logging.tag bi.hash
|
||||
-% s Client_keys.Logging.tag name) >>= fun () ->
|
||||
return_false
|
||||
| Some (_ :: _ as slots) ->
|
||||
lwt_debug "Found slots for %a/%s (%d)"
|
||||
Block_hash.pp_short bi.hash name (List.length slots) >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Found slots for %a/%s (%a)"
|
||||
-% t event "endorsement_slots_found"
|
||||
-% a Block_hash.Logging.tag bi.hash
|
||||
-% s Client_keys.Logging.tag name
|
||||
-% a endorsement_slots_tag slots) >>= fun () ->
|
||||
previously_endorsed_level cctxt delegate level >>=? function
|
||||
| true ->
|
||||
lwt_debug "Level %a (or higher) previously endorsed: do not endorse."
|
||||
Raw_level.pp level >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Level %a (or higher) previously endorsed: do not endorse."
|
||||
-% t event "previously_endorsed"
|
||||
-% a level_tag level) >>= fun () ->
|
||||
return_false
|
||||
| false ->
|
||||
return_true
|
||||
|
||||
let prepare_endorsement ~(max_past:int64) () (cctxt : #Proto_alpha.full) state bi =
|
||||
if Time.diff (Time.now ()) bi.Client_baking_blocks.timestamp > max_past then
|
||||
lwt_log_info "Ignore block %a: forged too far the past"
|
||||
Block_hash.pp_short bi.hash >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Ignore block %a: forged too far the past"
|
||||
-% t event "endorsement_stale_block"
|
||||
-% a Block_hash.Logging.tag bi.hash) >>= fun () ->
|
||||
return_unit
|
||||
else
|
||||
lwt_log_info "Received new block %a"
|
||||
Block_hash.pp_short bi.hash >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Received new block %a"
|
||||
-% t event "endorsement_got_block"
|
||||
-% a Block_hash.Logging.tag bi.hash) >>= fun () ->
|
||||
let time = Time.(add (now ()) state.delay) in
|
||||
let timeout = Lwt_unix.sleep (Int64.to_float state.delay) in
|
||||
get_delegates cctxt state >>=? fun delegates ->
|
||||
@ -168,11 +190,6 @@ let compute_timeout state =
|
||||
timeout >>= fun () ->
|
||||
Lwt.return (block, delegates)
|
||||
|
||||
let check_error f =
|
||||
f >>= function
|
||||
| Ok () -> Lwt.return_unit
|
||||
| Error errs -> lwt_log_error "Error while endorsing:@\n%a" pp_print_error errs
|
||||
|
||||
let create
|
||||
(cctxt: #Proto_alpha.full)
|
||||
?(max_past=110L)
|
||||
|
@ -10,7 +10,8 @@
|
||||
open Proto_alpha
|
||||
open Alpha_context
|
||||
|
||||
include Logging.Make(struct let name = "client.baking" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "client.baking" end)
|
||||
open Logging
|
||||
|
||||
|
||||
(* The index of the different components of the protocol's validation passes *)
|
||||
@ -326,11 +327,13 @@ let forge_block cctxt ?(chain = `Main) block
|
||||
List.fold_left
|
||||
(fun acc r -> acc + List.length r.Preapply_result.applied)
|
||||
0 result in
|
||||
lwt_log_info "Found %d valid operations (%d refused) for timestamp %a"
|
||||
valid_op_count (total_op_count - valid_op_count)
|
||||
Time.pp_hum timestamp >>= fun () ->
|
||||
lwt_log_info "Computed fitness %a"
|
||||
Fitness.pp shell_header.fitness >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Found %d valid operations (%d refused) for timestamp %a@.Computed fitness %a"
|
||||
-% t event "found_valid_operations"
|
||||
-% s valid_ops valid_op_count
|
||||
-% s refused_ops (total_op_count - valid_op_count)
|
||||
-% a timestamp_tag timestamp
|
||||
-% a fitness_tag shell_header.fitness) >>= fun () ->
|
||||
|
||||
(* everything went well (or we don't care about errors): GO! *)
|
||||
if best_effort || all_ops_valid result then
|
||||
@ -359,6 +362,7 @@ let previously_baked_level cctxt pkh new_lvl =
|
||||
| Some last_lvl ->
|
||||
return (Raw_level.(last_lvl >= new_lvl))
|
||||
|
||||
|
||||
let get_baking_slot cctxt
|
||||
?max_priority (bi: Client_baking_blocks.block_info) delegates =
|
||||
let chain = `Hash bi.chain_id in
|
||||
@ -370,12 +374,16 @@ let get_baking_slot cctxt
|
||||
~delegates
|
||||
(chain, block) >>= function
|
||||
| Error errs ->
|
||||
lwt_log_error "Error while fetching baking possibilities:\n%a"
|
||||
pp_print_error errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while fetching baking possibilities:\n%a"
|
||||
-% t event "baking_slot_fetch_errors"
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
Lwt.return_nil
|
||||
| Ok [] ->
|
||||
lwt_log_info "Found no baking rights for level %a"
|
||||
Raw_level.pp level >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Found no baking rights for level %a"
|
||||
-% t event "no_baking_rights"
|
||||
-% a level_tag level) >>= fun () ->
|
||||
Lwt.return_nil
|
||||
| Ok slots ->
|
||||
let slots =
|
||||
@ -447,7 +455,11 @@ let safe_get_unrevealed_nonces cctxt block =
|
||||
get_unrevealed_nonces cctxt block >>= function
|
||||
| Ok r -> Lwt.return r
|
||||
| Error err ->
|
||||
lwt_warn "Cannot read nonces: %a@." pp_print_error err >>= fun () ->
|
||||
lwt_warn Tag.DSL.(fun f ->
|
||||
f "Cannot read nonces: %a@."
|
||||
-% t event "read_nonce_fail"
|
||||
-% a errs_tag err)
|
||||
>>= fun () ->
|
||||
Lwt.return_nil
|
||||
|
||||
let insert_block
|
||||
@ -470,16 +482,22 @@ let insert_block
|
||||
get_baking_slot cctxt ?max_priority bi delegates >>= function
|
||||
| [] ->
|
||||
lwt_debug
|
||||
"Can't compute slots for %a" Block_hash.pp_short bi.hash >>= fun () ->
|
||||
Tag.DSL.(fun f ->
|
||||
f "Can't compute slots for %a"
|
||||
-% t event "cannot_compute_slot"
|
||||
-% a Block_hash.Logging.tag bi.hash) >>= fun () ->
|
||||
return_unit
|
||||
| (_ :: _) as slots ->
|
||||
iter_p
|
||||
(fun ((timestamp, (_, _, delegate)) as slot) ->
|
||||
Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
|
||||
lwt_log_info "New baking slot at %a for %s after %a"
|
||||
Time.pp_hum timestamp
|
||||
name
|
||||
Block_hash.pp_short bi.hash >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "New baking slot at %a for %s after %a"
|
||||
-% t event "have_baking_slot"
|
||||
-% a timestamp_tag timestamp
|
||||
-% s Client_keys.Logging.tag name
|
||||
-% a Block_hash.Logging.tag bi.hash
|
||||
-% t Signature.Public_key_hash.Logging.tag delegate) >>= fun () ->
|
||||
state.future_slots <- insert_baking_slot slot state.future_slots ;
|
||||
return_unit
|
||||
)
|
||||
@ -498,14 +516,18 @@ let pop_baking_slots state =
|
||||
|
||||
let filter_invalid_operations (cctxt : #full) state block_info (operations : packed_operation list list) =
|
||||
let open Client_baking_simulator in
|
||||
lwt_debug "Starting client-side validation %a"
|
||||
Block_hash.pp block_info.Client_baking_blocks.hash >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Starting client-side validation %a"
|
||||
-% t event "baking_local_validation_start"
|
||||
-% a Block_hash.Logging.tag block_info.Client_baking_blocks.hash) >>= fun () ->
|
||||
begin begin_construction cctxt state.index block_info >>= function
|
||||
| Ok inc -> return inc
|
||||
| Error errs ->
|
||||
lwt_log_error "Error while fetching current context : %a"
|
||||
pp_print_error errs >>= fun () ->
|
||||
lwt_log_notice "Retrying to open the context" >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while fetching current context : %a"
|
||||
-% t event "context_fetch_error"
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
lwt_log_notice Tag.DSL.(fun f -> f "Retrying to open the context" -% t event "reopen_context") >>= fun () ->
|
||||
Client_baking_simulator.load_context ~context_path:state.context_path >>= fun index ->
|
||||
begin_construction cctxt index block_info >>=? fun inc ->
|
||||
state.index <- index;
|
||||
@ -518,9 +540,11 @@ let filter_invalid_operations (cctxt : #full) state block_info (operations : pac
|
||||
let validate_operation inc op =
|
||||
add_operation inc op >>= function
|
||||
| Error errs ->
|
||||
lwt_log_info "Client-side validation: invalid operation filtered %a\n%a"
|
||||
Operation_hash.pp (Operation.hash_packed op)
|
||||
pp_print_error errs
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Client-side validation: invalid operation filtered %a\n%a"
|
||||
-% t event "baking_rejected_invalid_operation"
|
||||
-% a Operation_hash.Logging.tag (Operation.hash_packed op)
|
||||
-% a errs_tag errs)
|
||||
>>= fun () ->
|
||||
return_none
|
||||
| Ok inc -> return_some inc
|
||||
@ -548,8 +572,10 @@ let filter_invalid_operations (cctxt : #full) state block_info (operations : pac
|
||||
filter_map_s (is_valid_endorsement inc) endorsements >>=? fun endorsements ->
|
||||
finalize_construction inc >>= function
|
||||
| Error errs ->
|
||||
lwt_log_error "Client-side validation: invalid block built. Building an empty block...\n%a"
|
||||
pp_print_error errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Client-side validation: invalid block built. Building an empty block...\n%a"
|
||||
-% t event "built_invalid_block_error"
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
return [ [] ; [] ; [] ; [] ]
|
||||
| Ok () ->
|
||||
let quota : Alpha_environment.Updater.quota list = Main.validation_passes in
|
||||
@ -586,11 +612,13 @@ let bake_slot
|
||||
else
|
||||
timestamp in
|
||||
Client_keys.Public_key_hash.name cctxt delegate >>=? fun name ->
|
||||
lwt_debug "Try baking after %a (slot %d) for %s (%a)"
|
||||
Block_hash.pp_short bi.hash
|
||||
priority
|
||||
name
|
||||
Time.pp_hum timestamp >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Try baking after %a (slot %d) for %s (%a)"
|
||||
-% t event "try_baking"
|
||||
-% a Block_hash.Logging.tag bi.hash
|
||||
-% s bake_priorty_tag priority
|
||||
-% s Client_keys.Logging.tag name
|
||||
-% a timestamp_tag timestamp) >>= fun () ->
|
||||
(* get and process operations *)
|
||||
Alpha_block_services.Mempool.pending_operations cctxt ~chain () >>=? fun mpool ->
|
||||
let operations = ops_of_mempool mpool in
|
||||
@ -611,9 +639,10 @@ let bake_slot
|
||||
return operations
|
||||
end >>= function
|
||||
| Error errs ->
|
||||
lwt_log_error "Client-side validation: error while filtering invalid operations :@\n%a"
|
||||
pp_print_error
|
||||
errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Client-side validation: error while filtering invalid operations :@\n%a"
|
||||
-% t event "client_side_validation_error"
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
return_none
|
||||
| Ok operations ->
|
||||
Alpha_block_services.Helpers.Preapply.block
|
||||
@ -621,21 +650,20 @@ let bake_slot
|
||||
~timestamp ~sort:true ~protocol_data operations
|
||||
>>= function
|
||||
| Error errs ->
|
||||
lwt_log_error "Error while prevalidating operations:@\n%a"
|
||||
pp_print_error
|
||||
errs >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while prevalidating operations:@\n%a"
|
||||
-% t event "prevalidate_operations_error"
|
||||
-% a errs_tag errs) >>= fun () ->
|
||||
return_none
|
||||
| Ok (shell_header, operations) ->
|
||||
lwt_debug
|
||||
"Computed candidate block after %a (slot %d): %a/%d fitness: %a"
|
||||
Block_hash.pp_short bi.hash priority
|
||||
(Format.pp_print_list
|
||||
~pp_sep:(fun ppf () -> Format.fprintf ppf "+")
|
||||
(fun ppf operations -> Format.fprintf ppf "%d"
|
||||
(List.length operations.Preapply_result.applied)))
|
||||
operations
|
||||
total_op_count
|
||||
Fitness.pp shell_header.fitness >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Computed candidate block after %a (slot %d): %a/%d fitness: %a"
|
||||
-% t event "candidate_block"
|
||||
-% a Block_hash.Logging.tag bi.hash
|
||||
-% s bake_priorty_tag priority
|
||||
-% a operations_tag operations
|
||||
-% s bake_op_count_tag total_op_count
|
||||
-% a fitness_tag shell_header.fitness) >>= fun () ->
|
||||
let operations =
|
||||
List.map (fun l -> List.map snd l.Preapply_result.applied) operations in
|
||||
return
|
||||
@ -674,9 +702,11 @@ let bake
|
||||
state
|
||||
() =
|
||||
let slots = pop_baking_slots state in
|
||||
lwt_log_info "Found %d current slots and %d future slots."
|
||||
(List.length slots)
|
||||
(List.length state.future_slots) >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Found %d current slots and %d future slots."
|
||||
-% t event "pop_baking_slots"
|
||||
-% s current_slots_tag (List.length slots)
|
||||
-% s future_slots_tag (List.length state.future_slots)) >>= fun () ->
|
||||
let seed_nonce = generate_seed_nonce () in
|
||||
let seed_nonce_hash = Nonce.hash seed_nonce in
|
||||
|
||||
@ -704,8 +734,10 @@ let bake
|
||||
|
||||
(* avoid double baking *)
|
||||
previously_baked_level cctxt src_pkh level >>=? function
|
||||
| true -> lwt_log_error "Level %a : previously baked"
|
||||
Raw_level.pp level >>= return
|
||||
| true -> lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Level %a : previously baked"
|
||||
-% t event "double_bake_near_miss"
|
||||
-% a level_tag level) >>= return
|
||||
| false ->
|
||||
inject_block cctxt
|
||||
~force:true ~chain
|
||||
@ -727,9 +759,9 @@ let bake
|
||||
pp_operation_list_list operations >>= fun () ->
|
||||
return_unit
|
||||
end
|
||||
|
||||
| _ -> (* no candidates, or none fit-enough *)
|
||||
lwt_debug "No valid candidates." >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "No valid candidates." -% t event "no_baking_candidates") >>= fun () ->
|
||||
return_unit
|
||||
|
||||
|
||||
|
@ -7,7 +7,9 @@
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
include Logging.Make(struct let name = "client.scheduling" end)
|
||||
include Tezos_stdlib.Logging.Make_semantic(struct let name = "client.scheduling" end)
|
||||
|
||||
open Logging
|
||||
|
||||
let sleep_until time =
|
||||
let delay = Time.diff time (Time.now ()) in
|
||||
@ -16,19 +18,26 @@ let sleep_until time =
|
||||
else
|
||||
Some (Lwt_unix.sleep (Int64.to_float delay))
|
||||
|
||||
let rec wait_for_first_event stream =
|
||||
let rec wait_for_first_event ~name stream =
|
||||
Lwt_stream.get stream >>= function
|
||||
| None | Some (Error _) ->
|
||||
lwt_log_info "Can't fetch the current event. Waiting for new event." >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Can't fetch the current event. Waiting for new event."
|
||||
-% t event "cannot_fetch_event"
|
||||
-% t worker_tag name) >>= fun () ->
|
||||
(* NOTE: this is not a tight loop because of Lwt_stream.get *)
|
||||
wait_for_first_event stream
|
||||
wait_for_first_event ~name stream
|
||||
| Some (Ok bi) ->
|
||||
Lwt.return bi
|
||||
|
||||
let log_errors_and_continue p =
|
||||
let log_errors_and_continue ~name p =
|
||||
p >>= function
|
||||
| Ok () -> Lwt.return_unit
|
||||
| Error errs -> lwt_log_error "Error while baking:@\n%a" pp_print_error errs
|
||||
| Error errs -> lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Error while baking:@\n%a"
|
||||
-% t event "daemon_error"
|
||||
-% t worker_tag name
|
||||
-% a errs_tag errs)
|
||||
|
||||
let main
|
||||
~(name: string)
|
||||
@ -52,9 +61,12 @@ let main
|
||||
unit tzresult Lwt.t))
|
||||
=
|
||||
|
||||
lwt_log_info "Setting up before the %s can start." name >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Setting up before the %s can start."
|
||||
-% t event "daemon_setup"
|
||||
-% s worker_tag name) >>= fun () ->
|
||||
|
||||
wait_for_first_event stream >>= fun first_event ->
|
||||
wait_for_first_event ~name stream >>= fun first_event ->
|
||||
Shell_services.Blocks.hash cctxt ~block:`Genesis () >>=? fun genesis_hash ->
|
||||
|
||||
(* statefulness *)
|
||||
@ -68,7 +80,7 @@ let main
|
||||
| Some t -> t in
|
||||
state_maker genesis_hash first_event >>=? fun state ->
|
||||
|
||||
log_errors_and_continue @@ pre_loop cctxt state first_event >>= fun () ->
|
||||
log_errors_and_continue ~name @@ pre_loop cctxt state first_event >>= fun () ->
|
||||
|
||||
(* main loop *)
|
||||
let rec worker_loop () =
|
||||
@ -82,23 +94,32 @@ let main
|
||||
| `Event (None | Some (Error _)) ->
|
||||
(* exit when the node is unavailable *)
|
||||
last_get_event := None ;
|
||||
lwt_log_error "Connection to node lost, %s exiting." name >>= fun () ->
|
||||
lwt_log_error Tag.DSL.(fun f ->
|
||||
f "Connection to node lost, %s exiting."
|
||||
-% t event "daemon_connection_lost"
|
||||
-% s worker_tag name) >>= fun () ->
|
||||
exit 1
|
||||
| `Event (Some (Ok event)) -> begin
|
||||
(* new event: cancel everything and execute callback *)
|
||||
last_get_event := None ;
|
||||
(* TODO: pretty-print events (requires passing a pp as argument) *)
|
||||
log_errors_and_continue @@ event_k cctxt state event
|
||||
log_errors_and_continue ~name @@ event_k cctxt state event
|
||||
end
|
||||
| `Timeout timesup ->
|
||||
(* main event: it's time *)
|
||||
lwt_debug "Waking up for %s." name >>= fun () ->
|
||||
lwt_debug Tag.DSL.(fun f ->
|
||||
f "Waking up for %s."
|
||||
-% t event "daemon_wakeup"
|
||||
-% s worker_tag name) >>= fun () ->
|
||||
(* core functionality *)
|
||||
log_errors_and_continue @@ timeout_k cctxt state timesup
|
||||
log_errors_and_continue ~name @@ timeout_k cctxt state timesup
|
||||
end >>= fun () ->
|
||||
(* and restart *)
|
||||
worker_loop () in
|
||||
|
||||
(* ignition *)
|
||||
lwt_log_info "Starting %s daemon" name >>= fun () ->
|
||||
lwt_log_info Tag.DSL.(fun f ->
|
||||
f "Starting %s daemon"
|
||||
-% t event "daemon_start"
|
||||
-% s worker_tag name) >>= fun () ->
|
||||
worker_loop ()
|
||||
|
@ -11,6 +11,7 @@
|
||||
val sleep_until: Time.t -> unit Lwt.t option
|
||||
|
||||
val wait_for_first_event:
|
||||
name:string ->
|
||||
'event tzresult Lwt_stream.t ->
|
||||
'event Lwt.t
|
||||
|
||||
|
38
src/proto_alpha/lib_delegate/logging.ml
Normal file
38
src/proto_alpha/lib_delegate/logging.ml
Normal file
@ -0,0 +1,38 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
open Proto_alpha
|
||||
open Alpha_context
|
||||
|
||||
let timestamp_tag = Tag.def ~doc:"Timestamp when event occurred" "timestamp" Time.pp_hum
|
||||
let valid_ops = Tag.def ~doc:"Valid Operations" "valid_ops" Format.pp_print_int
|
||||
let refused_ops = Tag.def ~doc:"Refused Operations" "refused_ops" Format.pp_print_int
|
||||
let bake_priorty_tag = Tag.def ~doc:"Baking Priority" "bake_priority" Format.pp_print_int
|
||||
let fitness_tag = Tag.def ~doc:"Fitness" "fitness" Fitness.pp
|
||||
let current_slots_tag = Tag.def ~doc:"Number of baking slots that can be baked at this time" "current_slots" Format.pp_print_int
|
||||
let future_slots_tag = Tag.def ~doc:"Number of baking slots in the foreseeable future but not yet bakeable" "future_slots" Format.pp_print_int
|
||||
|
||||
let operations_tag = Tag.def ~doc:"Block Operations" "operations"
|
||||
(Format.pp_print_list
|
||||
~pp_sep:(fun ppf () -> Format.fprintf ppf "+")
|
||||
(fun ppf operations -> Format.fprintf ppf "%d" (List.length operations.Preapply_result.applied)))
|
||||
|
||||
let bake_op_count_tag = Tag.def ~doc:"Bake Operation Count" "operation_count" Format.pp_print_int
|
||||
|
||||
let endorsement_slot_tag = Tag.def ~doc:"Endorsement Slot" "endorsement_slot" Format.pp_print_int
|
||||
let endorsement_slots_tag = Tag.def ~doc:"Endorsement Slots" "endorsement_slots" Format.(fun ppf v -> pp_print_int ppf (List.length v))
|
||||
let denounced_endorsements_slots_tag = Tag.def ~doc:"Endorsement Slots" "denounced_endorsement_slots" Format.(pp_print_list pp_print_int)
|
||||
let denouncement_source_tag = Tag.def ~doc:"Denounce Source" "source" Format.pp_print_text
|
||||
|
||||
let level_tag = Tag.def ~doc:"Level" "level" Raw_level.pp
|
||||
|
||||
let worker_tag = Tag.def ~doc:"Worker in which event occurred" "worker" Format.pp_print_text
|
||||
|
||||
let conflicting_endorsements_tag = Tag.def ~doc:"Two conflicting endorsements signed by the same key" "conflicting_endorsements" Format.(
|
||||
fun ppf (a,b) -> fprintf ppf "%a / %a" Operation_hash.pp (Operation.hash a) Operation_hash.pp (Operation.hash b))
|
28
src/proto_alpha/lib_delegate/logging.mli
Normal file
28
src/proto_alpha/lib_delegate/logging.mli
Normal file
@ -0,0 +1,28 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2018. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
val timestamp_tag : Time.t Tag.def
|
||||
val valid_ops : int Tag.def
|
||||
val refused_ops : int Tag.def
|
||||
val bake_priorty_tag : int Tag.def
|
||||
val fitness_tag : Fitness.t Tag.def
|
||||
val current_slots_tag : int Tag.def
|
||||
val future_slots_tag : int Tag.def
|
||||
|
||||
val operations_tag : error Preapply_result.t list Tag.def
|
||||
val bake_op_count_tag : int Tag.def
|
||||
val endorsement_slot_tag : int Tag.def
|
||||
val endorsement_slots_tag : int list Tag.def
|
||||
val denounced_endorsements_slots_tag : int list Tag.def
|
||||
val denouncement_source_tag : string Tag.def
|
||||
val level_tag : Proto_alpha.Alpha_context.Raw_level.t Tag.def
|
||||
val worker_tag : string Tag.def
|
||||
|
||||
open Proto_alpha.Alpha_context
|
||||
val conflicting_endorsements_tag : (Kind.endorsement operation * Kind.endorsement operation) Tag.def
|
Loading…
Reference in New Issue
Block a user