Node: switch the peer validator to Tezos_worker
This commit is contained in:
parent
50b1957714
commit
ecbf1805e1
@ -57,6 +57,7 @@ and shell = {
|
||||
block_validator_limits : Node.block_validator_limits ;
|
||||
prevalidator_limits : Node.prevalidator_limits ;
|
||||
timeout : Node.timeout ;
|
||||
peer_validator_limits : Node.peer_validator_limits ;
|
||||
}
|
||||
|
||||
let default_net_limits : P2p.limits = {
|
||||
@ -130,7 +131,19 @@ let default_shell = {
|
||||
block_operations = 60. ;
|
||||
protocol = 120. ;
|
||||
new_head_request = 90. ;
|
||||
}
|
||||
} ;
|
||||
peer_validator_limits = {
|
||||
block_header_timeout = 60. ;
|
||||
block_operations_timeout = 60. ;
|
||||
protocol_timeout = 120. ;
|
||||
new_head_request_timeout = 90. ;
|
||||
worker_limits = {
|
||||
backlog_size = 1000 ;
|
||||
backlog_level = Logging.Info ;
|
||||
zombie_lifetime = 600. ;
|
||||
zombie_memory = 120. ;
|
||||
}
|
||||
} ;
|
||||
}
|
||||
|
||||
let default_config = {
|
||||
@ -330,6 +343,30 @@ let prevalidator_limits_encoding =
|
||||
default_shell.prevalidator_limits.worker_limits.zombie_lifetime
|
||||
default_shell.prevalidator_limits.worker_limits.zombie_memory))
|
||||
|
||||
let peer_validator_limits_encoding =
|
||||
let open Data_encoding in
|
||||
let default_limits = default_shell.peer_validator_limits in
|
||||
conv
|
||||
(fun { Node.block_header_timeout ; block_operations_timeout ;
|
||||
protocol_timeout ; new_head_request_timeout ; worker_limits } ->
|
||||
((block_header_timeout, block_operations_timeout,
|
||||
protocol_timeout, new_head_request_timeout), worker_limits))
|
||||
(fun ((block_header_timeout, block_operations_timeout,
|
||||
protocol_timeout, new_head_request_timeout), worker_limits) ->
|
||||
{ block_header_timeout ; block_operations_timeout ;
|
||||
protocol_timeout ; new_head_request_timeout ; worker_limits })
|
||||
(merge_objs
|
||||
(obj4
|
||||
(dft "block_header_request_timeout" timeout_encoding default_limits.block_header_timeout)
|
||||
(dft "block_operations_request_timeout" timeout_encoding default_limits.block_operations_timeout)
|
||||
(dft "protocol_request_timeout" timeout_encoding default_limits.protocol_timeout)
|
||||
(dft "new_head_request_timeout" timeout_encoding default_limits.new_head_request_timeout))
|
||||
(worker_limits_encoding
|
||||
default_limits.worker_limits.backlog_size
|
||||
default_limits.worker_limits.backlog_level
|
||||
default_limits.worker_limits.zombie_lifetime
|
||||
default_limits.worker_limits.zombie_memory))
|
||||
|
||||
let timeout_encoding =
|
||||
let open Data_encoding in
|
||||
let uint8 = conv int_of_float float_of_int uint8 in
|
||||
@ -352,18 +389,20 @@ let timeout_encoding =
|
||||
let shell =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun { bootstrap_threshold ; timeout ;
|
||||
(fun { bootstrap_threshold ; timeout ; peer_validator_limits ;
|
||||
block_validator_limits ; prevalidator_limits } ->
|
||||
bootstrap_threshold, timeout,
|
||||
bootstrap_threshold, timeout, peer_validator_limits,
|
||||
block_validator_limits, prevalidator_limits)
|
||||
(fun (bootstrap_threshold, timeout,
|
||||
(fun (bootstrap_threshold, timeout, peer_validator_limits,
|
||||
block_validator_limits, prevalidator_limits) ->
|
||||
{ bootstrap_threshold ; timeout ;
|
||||
peer_validator_limits ;
|
||||
block_validator_limits ;
|
||||
prevalidator_limits })
|
||||
(obj4
|
||||
(obj5
|
||||
(dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold)
|
||||
(dft "timeout" timeout_encoding default_shell.timeout)
|
||||
(dft "peer_validator" peer_validator_limits_encoding default_shell.peer_validator_limits)
|
||||
(dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits)
|
||||
(dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits)
|
||||
)
|
||||
@ -484,6 +523,7 @@ let update
|
||||
~default:cfg.shell.bootstrap_threshold
|
||||
bootstrap_threshold ;
|
||||
timeout = cfg.shell.timeout ;
|
||||
peer_validator_limits = cfg.shell.peer_validator_limits ;
|
||||
block_validator_limits = cfg.shell.block_validator_limits ;
|
||||
prevalidator_limits = cfg.shell.prevalidator_limits ;
|
||||
}
|
||||
|
@ -47,6 +47,7 @@ and shell = {
|
||||
block_validator_limits : Node.block_validator_limits ;
|
||||
prevalidator_limits : Node.prevalidator_limits ;
|
||||
timeout : Node.timeout ;
|
||||
peer_validator_limits : Node.peer_validator_limits ;
|
||||
}
|
||||
|
||||
val default_data_dir: string
|
||||
|
@ -173,6 +173,7 @@ let init_node ?sandbox (config : Node_config_file.t) =
|
||||
Node.create
|
||||
node_config
|
||||
config.shell.timeout
|
||||
config.shell.peer_validator_limits
|
||||
config.shell.block_validator_limits
|
||||
config.shell.prevalidator_limits
|
||||
|
||||
|
@ -18,6 +18,7 @@ type t = {
|
||||
|
||||
timeout: timeout ;
|
||||
prevalidator_limits: Prevalidator.limits ;
|
||||
peer_validator_limits: Peer_validator.limits ;
|
||||
bootstrap_threshold: int ;
|
||||
mutable bootstrapped: bool ;
|
||||
bootstrapped_waiter: unit Lwt.t ;
|
||||
@ -83,10 +84,6 @@ let may_activate_peer_validator nv peer_id =
|
||||
with Not_found ->
|
||||
let pv =
|
||||
Peer_validator.create
|
||||
~new_head_request_timeout:nv.timeout.new_head_request
|
||||
~block_header_timeout:nv.timeout.block_header
|
||||
~block_operations_timeout:nv.timeout.block_operations
|
||||
~protocol_timeout:nv.timeout.protocol
|
||||
~notify_new_block:(notify_new_block nv)
|
||||
~notify_bootstrapped: begin fun () ->
|
||||
P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ;
|
||||
@ -96,6 +93,7 @@ let may_activate_peer_validator nv peer_id =
|
||||
P2p.Peer_id.Table.remove nv.active_peers peer_id ;
|
||||
P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ;
|
||||
end
|
||||
nv.peer_validator_limits
|
||||
nv.block_validator nv.net_db peer_id in
|
||||
P2p.Peer_id.Table.add nv.active_peers peer_id pv ;
|
||||
pv
|
||||
@ -122,7 +120,7 @@ let broadcast_head nv ~previous block =
|
||||
let rec create
|
||||
?max_child_ttl ?parent
|
||||
?(bootstrap_threshold = 1)
|
||||
timeout prevalidator_limits block_validator
|
||||
timeout peer_validator_limits prevalidator_limits block_validator
|
||||
global_valid_block_input db net_state =
|
||||
Chain.init_head net_state >>= fun () ->
|
||||
let net_db = Distributed_db.activate db net_state in
|
||||
@ -135,7 +133,7 @@ let rec create
|
||||
let nv = {
|
||||
db ; net_state ; net_db ; block_validator ;
|
||||
prevalidator ;
|
||||
timeout ; prevalidator_limits ;
|
||||
timeout ; prevalidator_limits ; peer_validator_limits ;
|
||||
valid_block_input ; global_valid_block_input ;
|
||||
new_head_input ;
|
||||
parent ; max_child_ttl ; child = None ;
|
||||
@ -251,7 +249,8 @@ and may_switch_test_network nv block =
|
||||
return net_state
|
||||
end >>=? fun net_state ->
|
||||
create
|
||||
~parent:nv nv.timeout nv.prevalidator_limits nv.block_validator
|
||||
~parent:nv nv.timeout nv.peer_validator_limits
|
||||
nv.prevalidator_limits nv.block_validator
|
||||
nv.global_valid_block_input
|
||||
nv.db net_state >>= fun child ->
|
||||
nv.child <- Some child ;
|
||||
|
@ -20,6 +20,7 @@ val create:
|
||||
?max_child_ttl:int ->
|
||||
?bootstrap_threshold:int ->
|
||||
timeout ->
|
||||
Peer_validator.limits ->
|
||||
Prevalidator.limits ->
|
||||
Block_validator.t ->
|
||||
State.Block.t Lwt_watcher.input ->
|
||||
|
@ -95,6 +95,13 @@ and timeout = Net_validator.timeout = {
|
||||
protocol: float ;
|
||||
new_head_request: float ;
|
||||
}
|
||||
and peer_validator_limits = Peer_validator.limits = {
|
||||
new_head_request_timeout: float ;
|
||||
block_header_timeout: float ;
|
||||
block_operations_timeout: float ;
|
||||
protocol_timeout: float ;
|
||||
worker_limits: Worker_types.limits
|
||||
}
|
||||
|
||||
and prevalidator_limits = Prevalidator.limits = {
|
||||
max_refused_operations: int ;
|
||||
@ -118,6 +125,7 @@ let create { genesis ; store_root ; context_root ;
|
||||
test_network_max_tll = max_child_ttl ;
|
||||
bootstrap_threshold }
|
||||
timeout
|
||||
peer_validator_limits
|
||||
block_validator_limits
|
||||
prevalidator_limits =
|
||||
init_p2p net_params >>=? fun p2p ->
|
||||
@ -125,6 +133,7 @@ let create { genesis ; store_root ; context_root ;
|
||||
~store_root ~context_root ?patch_context () >>=? fun state ->
|
||||
let distributed_db = Distributed_db.create state p2p in
|
||||
Validator.create state distributed_db timeout
|
||||
peer_validator_limits
|
||||
block_validator_limits prevalidator_limits >>= fun validator ->
|
||||
may_create_net state genesis >>= fun mainnet_state ->
|
||||
Validator.activate validator
|
||||
|
@ -25,6 +25,13 @@ and timeout = {
|
||||
protocol: float ;
|
||||
new_head_request: float ;
|
||||
}
|
||||
and peer_validator_limits = {
|
||||
new_head_request_timeout: float ;
|
||||
block_header_timeout: float ;
|
||||
block_operations_timeout: float ;
|
||||
protocol_timeout: float ;
|
||||
worker_limits: Worker_types.limits
|
||||
}
|
||||
and prevalidator_limits = {
|
||||
max_refused_operations: int ;
|
||||
operation_timeout: float ;
|
||||
@ -38,6 +45,7 @@ and block_validator_limits = {
|
||||
val create:
|
||||
config ->
|
||||
timeout ->
|
||||
peer_validator_limits ->
|
||||
block_validator_limits ->
|
||||
prevalidator_limits ->
|
||||
t tzresult Lwt.t
|
||||
|
@ -9,37 +9,78 @@
|
||||
|
||||
(* FIXME ignore/postpone fetching/validating of block in the future... *)
|
||||
|
||||
include Logging.Make(struct let name = "node.validator.peer" end)
|
||||
open Peer_validator_worker_state
|
||||
|
||||
type msg =
|
||||
| New_head of Block_hash.t * Block_header.t
|
||||
| New_branch of Block_hash.t * Block_locator.t
|
||||
module Name = struct
|
||||
type t = Net_id.t * P2p.Peer_id.t
|
||||
let encoding =
|
||||
Data_encoding.tup2 Net_id.encoding P2p.Peer_id.encoding
|
||||
let base = [ "peer_validator" ]
|
||||
let pp ppf (net, peer) =
|
||||
Format.fprintf ppf "%a:%a"
|
||||
Net_id.pp_short net P2p.Peer_id.pp_short peer
|
||||
end
|
||||
|
||||
type t = {
|
||||
module Request = struct
|
||||
include Request
|
||||
|
||||
peer_id: P2p.Peer_id.t ;
|
||||
net_db: Distributed_db.net_db ;
|
||||
block_validator: Block_validator.t ;
|
||||
type _ t =
|
||||
| New_head : Block_hash.t * Block_header.t -> unit t
|
||||
| New_branch : Block_hash.t * Block_locator.t -> unit t
|
||||
|
||||
let view (type a) (req : a t) : view = match req with
|
||||
| New_head (hash, _) ->
|
||||
New_head hash
|
||||
| New_branch (hash, locator) ->
|
||||
New_branch (hash, Block_locator_iterator.estimated_length locator)
|
||||
end
|
||||
|
||||
type limits = {
|
||||
new_head_request_timeout: float ;
|
||||
block_header_timeout: float ;
|
||||
block_operations_timeout: float ;
|
||||
protocol_timeout: float ;
|
||||
|
||||
(* callback to net_validator *)
|
||||
notify_new_block: State.Block.t -> unit ;
|
||||
notify_bootstrapped: unit -> unit ;
|
||||
|
||||
mutable bootstrapped: bool ;
|
||||
mutable last_validated_head: Block_header.t ;
|
||||
mutable last_advertised_head: Block_header.t ;
|
||||
|
||||
mutable worker: unit Lwt.t ;
|
||||
dropbox: msg Lwt_dropbox.t ;
|
||||
canceler: Lwt_canceler.t ;
|
||||
|
||||
worker_limits: Worker_types.limits
|
||||
}
|
||||
|
||||
module Types = struct
|
||||
include Worker_state
|
||||
|
||||
type parameters = {
|
||||
net_db: Distributed_db.net_db ;
|
||||
block_validator: Block_validator.t ;
|
||||
(* callback to net_validator *)
|
||||
notify_new_block: State.Block.t -> unit ;
|
||||
notify_bootstrapped: unit -> unit ;
|
||||
notify_termination: unit -> unit ;
|
||||
limits: limits;
|
||||
}
|
||||
|
||||
type state = {
|
||||
peer_id: P2p.Peer_id.t ;
|
||||
parameters : parameters ;
|
||||
mutable bootstrapped: bool ;
|
||||
mutable last_validated_head: Block_header.t ;
|
||||
mutable last_advertised_head: Block_header.t ;
|
||||
}
|
||||
|
||||
let view (state : state) _ : view =
|
||||
let { bootstrapped ; last_validated_head ; last_advertised_head } = state in
|
||||
{ bootstrapped ;
|
||||
last_validated_head = Block_header.hash last_validated_head ;
|
||||
last_advertised_head = Block_header.hash last_advertised_head }
|
||||
|
||||
end
|
||||
|
||||
module Worker = Worker.Make (Name) (Event) (Request) (Types)
|
||||
|
||||
open Types
|
||||
|
||||
type t = Worker.dropbox Worker.t
|
||||
|
||||
let debug w =
|
||||
Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
|
||||
|
||||
type error +=
|
||||
| Unknown_ancestor
|
||||
| Known_invalid
|
||||
@ -47,22 +88,23 @@ type error +=
|
||||
let set_bootstrapped pv =
|
||||
if not pv.bootstrapped then begin
|
||||
pv.bootstrapped <- true ;
|
||||
pv.notify_bootstrapped () ;
|
||||
pv.parameters.notify_bootstrapped () ;
|
||||
end
|
||||
|
||||
let bootstrap_new_branch pv _ancestor _head unknown_prefix =
|
||||
let bootstrap_new_branch w _ancestor _head unknown_prefix =
|
||||
let pv = Worker.state w in
|
||||
let len = Block_locator_iterator.estimated_length unknown_prefix in
|
||||
lwt_log_info
|
||||
debug w
|
||||
"validating new branch from peer %a (approx. %d blocks)"
|
||||
P2p.Peer_id.pp_short pv.peer_id len >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id len ;
|
||||
let pipeline =
|
||||
Bootstrap_pipeline.create
|
||||
~notify_new_block:pv.notify_new_block
|
||||
~block_header_timeout:pv.block_header_timeout
|
||||
~block_operations_timeout:pv.block_operations_timeout
|
||||
pv.block_validator
|
||||
pv.peer_id pv.net_db unknown_prefix in
|
||||
Lwt_utils.protect ~canceler:pv.canceler
|
||||
~notify_new_block:pv.parameters.notify_new_block
|
||||
~block_header_timeout:pv.parameters.limits.block_header_timeout
|
||||
~block_operations_timeout:pv.parameters.limits.block_operations_timeout
|
||||
pv.parameters.block_validator
|
||||
pv.peer_id pv.parameters.net_db unknown_prefix in
|
||||
Worker.protect w
|
||||
~on_error:begin fun error ->
|
||||
(* if the peer_validator is killed, let's cancel the pipeline *)
|
||||
Bootstrap_pipeline.cancel pipeline >>= fun () ->
|
||||
@ -72,235 +114,263 @@ let bootstrap_new_branch pv _ancestor _head unknown_prefix =
|
||||
Bootstrap_pipeline.wait pipeline
|
||||
end >>=? fun () ->
|
||||
set_bootstrapped pv ;
|
||||
lwt_log_info
|
||||
debug w
|
||||
"done validating new branch from peer %a."
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
return ()
|
||||
|
||||
let validate_new_head pv hash (header : Block_header.t) =
|
||||
let net_state = Distributed_db.net_state pv.net_db in
|
||||
let validate_new_head w hash (header : Block_header.t) =
|
||||
let pv = Worker.state w in
|
||||
let net_state = Distributed_db.net_state pv.parameters.net_db in
|
||||
State.Block.known net_state header.shell.predecessor >>= function
|
||||
| false ->
|
||||
lwt_debug
|
||||
debug w
|
||||
"missing predecessor for new head %a from peer %a"
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
Distributed_db.Request.current_branch pv.net_db ~peer:pv.peer_id () ;
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
Distributed_db.Request.current_branch pv.parameters.net_db ~peer:pv.peer_id () ;
|
||||
return ()
|
||||
| true ->
|
||||
lwt_debug
|
||||
debug w
|
||||
"fetching operations for new head %a from peer %a"
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
map_p
|
||||
(fun i ->
|
||||
Lwt_utils.protect ~canceler:pv.canceler begin fun () ->
|
||||
Worker.protect w begin fun () ->
|
||||
Distributed_db.Operations.fetch
|
||||
~timeout:pv.block_operations_timeout
|
||||
pv.net_db ~peer:pv.peer_id
|
||||
~timeout:pv.parameters.limits.block_operations_timeout
|
||||
pv.parameters.net_db ~peer:pv.peer_id
|
||||
(hash, i) header.shell.operations_hash
|
||||
end)
|
||||
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
|
||||
lwt_debug
|
||||
debug w
|
||||
"requesting validation for new head %a from peer %a"
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
Block_validator.validate
|
||||
~notify_new_block:pv.notify_new_block
|
||||
pv.block_validator pv.net_db
|
||||
~notify_new_block:pv.parameters.notify_new_block
|
||||
pv.parameters.block_validator pv.parameters.net_db
|
||||
hash header operations >>=? fun _block ->
|
||||
lwt_debug "end of validation for new head %a from peer %a"
|
||||
debug w
|
||||
"end of validation for new head %a from peer %a"
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
set_bootstrapped pv ;
|
||||
return ()
|
||||
|
||||
let may_validate_new_head pv hash header =
|
||||
let net_state = Distributed_db.net_state pv.net_db in
|
||||
let may_validate_new_head w hash header =
|
||||
let pv = Worker.state w in
|
||||
let net_state = Distributed_db.net_state pv.parameters.net_db in
|
||||
State.Block.known net_state hash >>= function
|
||||
| true -> begin
|
||||
State.Block.known_valid net_state hash >>= function
|
||||
| true ->
|
||||
lwt_debug
|
||||
debug w
|
||||
"ignoring previously validated block %a from peer %a"
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
set_bootstrapped pv ;
|
||||
pv.last_validated_head <- header ;
|
||||
return ()
|
||||
| false ->
|
||||
lwt_log_info
|
||||
debug w
|
||||
"ignoring known invalid block %a from peer %a"
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
fail Known_invalid
|
||||
end
|
||||
| false ->
|
||||
validate_new_head pv hash header
|
||||
validate_new_head w hash header
|
||||
|
||||
let may_validate_new_branch pv distant_hash locator =
|
||||
let may_validate_new_branch w distant_hash locator =
|
||||
let pv = Worker.state w in
|
||||
let distant_header, _ = (locator : Block_locator.t :> Block_header.t * _) in
|
||||
let net_state = Distributed_db.net_state pv.net_db in
|
||||
let net_state = Distributed_db.net_state pv.parameters.net_db in
|
||||
Chain.head net_state >>= fun local_header ->
|
||||
if Fitness.compare
|
||||
distant_header.Block_header.shell.fitness
|
||||
(State.Block.fitness local_header) < 0 then begin
|
||||
set_bootstrapped pv ;
|
||||
lwt_debug
|
||||
debug w
|
||||
"ignoring branch %a with low fitness from peer: %a."
|
||||
Block_hash.pp_short distant_hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
(* Don't bother with downloading a branch with a low fitness. *)
|
||||
return ()
|
||||
end else begin
|
||||
let net_state = Distributed_db.net_state pv.net_db in
|
||||
let net_state = Distributed_db.net_state pv.parameters.net_db in
|
||||
Block_locator_iterator.known_ancestor net_state locator >>= function
|
||||
| None ->
|
||||
lwt_log_info
|
||||
debug w
|
||||
"ignoring branch %a without common ancestor from peer: %a."
|
||||
Block_hash.pp_short distant_hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
fail Unknown_ancestor
|
||||
| Some (ancestor, unknown_prefix) ->
|
||||
bootstrap_new_branch pv ancestor distant_header unknown_prefix
|
||||
bootstrap_new_branch w ancestor distant_header unknown_prefix
|
||||
end
|
||||
|
||||
let rec worker_loop pv =
|
||||
begin
|
||||
Lwt_utils.protect ~canceler:pv.canceler begin fun () ->
|
||||
Lwt_dropbox.take_with_timeout
|
||||
pv.new_head_request_timeout
|
||||
pv.dropbox >>= return
|
||||
end >>=? function
|
||||
| None ->
|
||||
lwt_log_info "no new head from peer %a for 90 seconds."
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
Distributed_db.Request.current_head pv.net_db ~peer:pv.peer_id () ;
|
||||
return ()
|
||||
| Some (New_head (hash, header)) ->
|
||||
lwt_log_info "processing new head %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
may_validate_new_head pv hash header
|
||||
| Some (New_branch (hash, locator)) ->
|
||||
(* TODO penalize empty locator... ?? *)
|
||||
lwt_log_info "processing new branch %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
may_validate_new_branch pv hash locator
|
||||
end >>= function
|
||||
| Ok () ->
|
||||
worker_loop pv
|
||||
| Error ((( Unknown_ancestor
|
||||
| Bootstrap_pipeline.Invalid_locator _
|
||||
| Block_validator.Invalid_block _ ) :: _) as errors ) ->
|
||||
let on_no_request w =
|
||||
let pv = Worker.state w in
|
||||
debug w "no new head from peer %a for %g seconds."
|
||||
P2p.Peer_id.pp_short pv.peer_id
|
||||
pv.parameters.limits.new_head_request_timeout ;
|
||||
Distributed_db.Request.current_head pv.parameters.net_db ~peer:pv.peer_id () ;
|
||||
return ()
|
||||
|
||||
let on_request (type a) w (req : a Request.t) : a tzresult Lwt.t =
|
||||
let pv = Worker.state w in
|
||||
match req with
|
||||
| Request.New_head (hash, header) ->
|
||||
debug w
|
||||
"processing new head %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
may_validate_new_head w hash header
|
||||
| Request.New_branch (hash, locator) ->
|
||||
(* TODO penalize empty locator... ?? *)
|
||||
debug w "processing new branch %a from peer %a."
|
||||
Block_hash.pp_short hash
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
may_validate_new_branch w hash locator
|
||||
|
||||
let on_completion w r _ st =
|
||||
Worker.record_event w (Event.Request (Request.view r, st, None )) ;
|
||||
Lwt.return ()
|
||||
|
||||
let on_error w r st errs =
|
||||
let pv = Worker.state w in
|
||||
match errs with
|
||||
((( Unknown_ancestor
|
||||
| Bootstrap_pipeline.Invalid_locator _
|
||||
| Block_validator_errors.Invalid_block _ ) :: _) as errors ) ->
|
||||
(* TODO ban the peer_id... *)
|
||||
lwt_log_info "Terminating the validation worker for peer %a (kickban)."
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
lwt_debug "%a" Error_monad.pp_print_error errors >>= fun () ->
|
||||
Lwt_canceler.cancel pv.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error [Block_validator_errors.Unavailable_protocol { protocol } ] -> begin
|
||||
debug w
|
||||
"Terminating the validation worker for peer %a (kickban)."
|
||||
P2p.Peer_id.pp_short pv.peer_id ;
|
||||
debug w "%a" Error_monad.pp_print_error errors ;
|
||||
Worker.trigger_shutdown w ;
|
||||
Worker.record_event w (Event.Request (r, st, Some errs)) ;
|
||||
Lwt.return (Error errs)
|
||||
| [Block_validator_errors.Unavailable_protocol { protocol } ] -> begin
|
||||
Block_validator.fetch_and_compile_protocol
|
||||
pv.block_validator
|
||||
pv.parameters.block_validator
|
||||
~peer:pv.peer_id
|
||||
~timeout:pv.protocol_timeout
|
||||
~timeout:pv.parameters.limits.protocol_timeout
|
||||
protocol >>= function
|
||||
| Ok _ -> worker_loop pv
|
||||
| Ok _ -> return ()
|
||||
| Error _ ->
|
||||
(* TODO penality... *)
|
||||
lwt_log_info "Terminating the validation worker for peer %a \
|
||||
\ (missing protocol %a)."
|
||||
debug w
|
||||
"Terminating the validation worker for peer %a \
|
||||
(missing protocol %a)."
|
||||
P2p.Peer_id.pp_short pv.peer_id
|
||||
Protocol_hash.pp_short protocol >>= fun () ->
|
||||
Lwt_canceler.cancel pv.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
Protocol_hash.pp_short protocol ;
|
||||
Worker.record_event w (Event.Request (r, st, Some errs)) ;
|
||||
Lwt.return (Error errs)
|
||||
end
|
||||
| Error [Exn Lwt.Canceled | Lwt_utils.Canceled | Exn Lwt_dropbox.Closed] ->
|
||||
lwt_log_info "Terminating the validation worker for peer %a."
|
||||
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| Error err ->
|
||||
lwt_log_error
|
||||
"@[<v 2>Unexpected error in the validation worker for peer %a:@ \
|
||||
\ %a@]"
|
||||
P2p.Peer_id.pp_short pv.peer_id
|
||||
pp_print_error err >>= fun () ->
|
||||
Lwt_canceler.cancel pv.canceler >>= fun () ->
|
||||
Lwt.return_unit
|
||||
| _ ->
|
||||
Worker.record_event w (Event.Request (r, st, Some errs)) ;
|
||||
Lwt.return (Error errs)
|
||||
|
||||
let create
|
||||
?notify_new_block:(external_notify_new_block = fun _ -> ())
|
||||
?(notify_bootstrapped = fun () -> ())
|
||||
?(notify_termination = fun _ -> ())
|
||||
~new_head_request_timeout
|
||||
~block_header_timeout
|
||||
~block_operations_timeout
|
||||
~protocol_timeout
|
||||
block_validator net_db peer_id =
|
||||
lwt_debug "creating validator for peer %a."
|
||||
P2p.Peer_id.pp_short peer_id >>= fun () ->
|
||||
let canceler = Lwt_canceler.create () in
|
||||
let dropbox = Lwt_dropbox.create () in
|
||||
let net_state = Distributed_db.net_state net_db in
|
||||
let on_close w =
|
||||
let pv = Worker.state w in
|
||||
pv.parameters.notify_termination () ;
|
||||
Distributed_db.disconnect pv.parameters.net_db pv.peer_id >>= fun () ->
|
||||
Lwt.return ()
|
||||
|
||||
let on_launch _ name parameters =
|
||||
let net_state = Distributed_db.net_state parameters.net_db in
|
||||
State.Block.read_exn net_state
|
||||
(State.Net.genesis net_state).block >>= fun genesis ->
|
||||
let rec notify_new_block block =
|
||||
pv.last_validated_head <- State.Block.header block ;
|
||||
external_notify_new_block block
|
||||
and pv = {
|
||||
block_validator ;
|
||||
notify_new_block ;
|
||||
notify_bootstrapped ;
|
||||
new_head_request_timeout ;
|
||||
block_header_timeout ;
|
||||
block_operations_timeout ;
|
||||
protocol_timeout ;
|
||||
net_db ;
|
||||
peer_id ;
|
||||
let rec pv = {
|
||||
peer_id = snd name ;
|
||||
parameters = { parameters with notify_new_block } ;
|
||||
bootstrapped = false ;
|
||||
last_validated_head = State.Block.header genesis ;
|
||||
last_advertised_head = State.Block.header genesis ;
|
||||
canceler ;
|
||||
dropbox ;
|
||||
worker = Lwt.return_unit ;
|
||||
} in
|
||||
Lwt_canceler.on_cancel pv.canceler begin fun () ->
|
||||
Lwt_dropbox.close pv.dropbox ;
|
||||
Distributed_db.disconnect pv.net_db pv.peer_id >>= fun () ->
|
||||
notify_termination pv ;
|
||||
Lwt.return_unit
|
||||
end ;
|
||||
pv.worker <-
|
||||
Lwt_utils.worker
|
||||
(Format.asprintf "peer_validator.%a.%a"
|
||||
Net_id.pp (State.Net.id net_state) P2p.Peer_id.pp_short peer_id)
|
||||
~run:(fun () -> worker_loop pv)
|
||||
~cancel:(fun () -> Lwt_canceler.cancel pv.canceler) ;
|
||||
}
|
||||
and notify_new_block block =
|
||||
pv.last_validated_head <- State.Block.header block ;
|
||||
parameters.notify_new_block block in
|
||||
Lwt.return pv
|
||||
|
||||
let notify_branch pv locator =
|
||||
let table =
|
||||
let merge w (Worker.Any_request neu) old =
|
||||
let pv = Worker.state w in
|
||||
match neu with
|
||||
| Request.New_branch (_, locator) ->
|
||||
let header, _ = (locator : Block_locator.t :> _ * _) in
|
||||
pv.last_advertised_head <- header ;
|
||||
Some (Worker.Any_request neu)
|
||||
| Request.New_head (_, header) ->
|
||||
pv.last_advertised_head <- header ;
|
||||
(* TODO penalize decreasing fitness *)
|
||||
match old with
|
||||
| Some (Worker.Any_request (Request.New_branch _) as old) ->
|
||||
Some old (* ignore *)
|
||||
| Some (Worker.Any_request (Request.New_head _)) ->
|
||||
Some (Any_request neu)
|
||||
| None ->
|
||||
Some (Any_request neu) in
|
||||
Worker.create_table (Dropbox { merge })
|
||||
|
||||
let create
|
||||
?(notify_new_block = fun _ -> ())
|
||||
?(notify_bootstrapped = fun () -> ())
|
||||
?(notify_termination = fun _ -> ())
|
||||
limits block_validator net_db peer_id =
|
||||
let name = (State.Net.id (Distributed_db.net_state net_db), peer_id) in
|
||||
let parameters = {
|
||||
net_db ;
|
||||
notify_termination ;
|
||||
block_validator ;
|
||||
notify_new_block ;
|
||||
notify_bootstrapped ;
|
||||
limits ;
|
||||
} in
|
||||
let module Handlers = struct
|
||||
type self = t
|
||||
let on_launch = on_launch
|
||||
let on_request = on_request
|
||||
let on_close = on_close
|
||||
let on_error = on_error
|
||||
let on_completion = on_completion
|
||||
let on_no_request _ = return ()
|
||||
end in
|
||||
Worker.launch table ~timeout: limits.new_head_request_timeout limits.worker_limits
|
||||
name parameters
|
||||
(module Handlers)
|
||||
|
||||
let notify_branch w locator =
|
||||
let header, _ = (locator : Block_locator.t :> _ * _) in
|
||||
let hash = Block_header.hash header in
|
||||
(* TODO penalize decreasing fitness *)
|
||||
pv.last_advertised_head <- header ;
|
||||
try Lwt_dropbox.put pv.dropbox (New_branch (hash, locator))
|
||||
with Lwt_dropbox.Closed -> ()
|
||||
Worker.drop_request w (New_branch (hash, locator))
|
||||
|
||||
let notify_head pv header =
|
||||
let notify_head w header =
|
||||
let hash = Block_header.hash header in
|
||||
pv.last_advertised_head <- header ;
|
||||
(* TODO penalize decreasing fitness *)
|
||||
match Lwt_dropbox.peek pv.dropbox with
|
||||
| Some (New_branch _) -> () (* ignore *)
|
||||
| None | Some (New_head _) ->
|
||||
try Lwt_dropbox.put pv.dropbox (New_head (hash, header))
|
||||
with Lwt_dropbox.Closed -> ()
|
||||
Worker.drop_request w (New_head (hash, header))
|
||||
|
||||
let shutdown pv =
|
||||
Lwt_canceler.cancel pv.canceler >>= fun () ->
|
||||
pv.worker
|
||||
let shutdown w =
|
||||
Worker.shutdown w
|
||||
|
||||
let peer_id pv = pv.peer_id
|
||||
let bootstrapped pv = pv.bootstrapped
|
||||
let current_head pv = pv.last_validated_head
|
||||
let peer_id w =
|
||||
let pv = Worker.state w in
|
||||
pv.peer_id
|
||||
|
||||
let bootstrapped w =
|
||||
let pv = Worker.state w in
|
||||
pv.bootstrapped
|
||||
|
||||
let current_head w =
|
||||
let pv = Worker.state w in
|
||||
pv.last_validated_head
|
||||
|
||||
let status = Worker.status
|
||||
|
||||
let running_workers () = Worker.list table
|
||||
|
||||
let current_request t = Worker.current_request t
|
||||
|
||||
let last_events = Worker.last_events
|
||||
|
@ -9,6 +9,14 @@
|
||||
|
||||
type t
|
||||
|
||||
type limits = {
|
||||
new_head_request_timeout: float ;
|
||||
block_header_timeout: float ;
|
||||
block_operations_timeout: float ;
|
||||
protocol_timeout: float ;
|
||||
worker_limits: Worker_types.limits
|
||||
}
|
||||
|
||||
val peer_id: t -> P2p.Peer_id.t
|
||||
val bootstrapped: t -> bool
|
||||
val current_head: t -> Block_header.t
|
||||
@ -16,14 +24,17 @@ val current_head: t -> Block_header.t
|
||||
val create:
|
||||
?notify_new_block: (State.Block.t -> unit) ->
|
||||
?notify_bootstrapped: (unit -> unit) ->
|
||||
?notify_termination: (t -> unit) ->
|
||||
new_head_request_timeout:float ->
|
||||
block_header_timeout:float ->
|
||||
block_operations_timeout:float ->
|
||||
protocol_timeout:float ->
|
||||
?notify_termination: (unit -> unit) ->
|
||||
limits ->
|
||||
Block_validator.t ->
|
||||
Distributed_db.net_db -> P2p.Peer_id.t -> t Lwt.t
|
||||
val shutdown: t -> unit Lwt.t
|
||||
|
||||
val notify_branch: t -> Block_locator.t -> unit
|
||||
val notify_head: t -> Block_header.t -> unit
|
||||
|
||||
val running_workers: unit -> ((Net_id.t * P2p.Peer_id.t) * t) list
|
||||
val status: t -> Worker_types.worker_status
|
||||
|
||||
val current_request : t -> (Time.t * Time.t * Peer_validator_worker_state.Request.view) option
|
||||
val last_events : t -> (Lwt_log_core.level * Peer_validator_worker_state.Event.t list) list
|
||||
|
@ -15,6 +15,7 @@ type t = {
|
||||
db: Distributed_db.t ;
|
||||
block_validator: Block_validator.t ;
|
||||
timeout: Net_validator.timeout ;
|
||||
peer_validator_limits: Peer_validator.limits ;
|
||||
block_validator_limits: Block_validator.limits ;
|
||||
prevalidator_limits: Prevalidator.limits ;
|
||||
|
||||
@ -24,13 +25,15 @@ type t = {
|
||||
}
|
||||
|
||||
let create state db timeout
|
||||
peer_validator_limits
|
||||
block_validator_limits
|
||||
prevalidator_limits =
|
||||
Block_validator.create block_validator_limits db >>= fun block_validator ->
|
||||
let valid_block_input = Lwt_watcher.create_input () in
|
||||
Lwt.return
|
||||
{ state ; db ; timeout ; block_validator ;
|
||||
prevalidator_limits ; block_validator_limits ;
|
||||
prevalidator_limits ;
|
||||
peer_validator_limits ; block_validator_limits ;
|
||||
valid_block_input ;
|
||||
active_nets = Net_id.Table.create 7 ;
|
||||
}
|
||||
@ -44,7 +47,7 @@ let activate v ?bootstrap_threshold ?max_child_ttl net_state =
|
||||
Net_validator.create
|
||||
?bootstrap_threshold
|
||||
?max_child_ttl
|
||||
v.timeout v.prevalidator_limits
|
||||
v.timeout v.peer_validator_limits v.prevalidator_limits
|
||||
v.block_validator v.valid_block_input v.db net_state in
|
||||
Net_id.Table.add v.active_nets net_id nv ;
|
||||
nv
|
||||
|
@ -15,6 +15,7 @@ val create:
|
||||
State.t ->
|
||||
Distributed_db.t ->
|
||||
Net_validator.timeout ->
|
||||
Peer_validator.limits ->
|
||||
Block_validator.limits ->
|
||||
Prevalidator.limits ->
|
||||
t Lwt.t
|
||||
|
115
src/lib_node_shell_base/peer_validator_worker_state.ml
Normal file
115
src/lib_node_shell_base/peer_validator_worker_state.ml
Normal file
@ -0,0 +1,115 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2017. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
module Request = struct
|
||||
type view =
|
||||
| New_head of Block_hash.t
|
||||
| New_branch of Block_hash.t * int
|
||||
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
union
|
||||
[ case (Tag 0)
|
||||
(obj2
|
||||
(req "request" (constant "new_head"))
|
||||
(req "block" Block_hash.encoding))
|
||||
(function New_head h -> Some ((), h) | _ -> None)
|
||||
(fun ((), h) -> New_head h) ;
|
||||
case (Tag 1)
|
||||
(obj3
|
||||
(req "request" (constant "new_branch"))
|
||||
(req "block" Block_hash.encoding)
|
||||
(req "locator_length" uint16))
|
||||
(function New_branch (h, l) -> Some ((), h, l) | _ -> None)
|
||||
(fun ((), h, l) -> New_branch (h, l)) ]
|
||||
|
||||
let pp ppf = function
|
||||
| New_head hash ->
|
||||
Format.fprintf ppf "New head %a" Block_hash.pp hash
|
||||
| New_branch (hash, len) ->
|
||||
Format.fprintf ppf "New branch %a, locator length %d"
|
||||
Block_hash.pp hash len
|
||||
end
|
||||
|
||||
module Event = struct
|
||||
type t =
|
||||
| Request of (Request.view * Worker_types.request_status * error list option)
|
||||
| Debug of string
|
||||
|
||||
let level req =
|
||||
match req with
|
||||
| Debug _ -> Logging.Info
|
||||
| Request _ -> Logging.Notice
|
||||
|
||||
let encoding error_encoding =
|
||||
let open Data_encoding in
|
||||
union
|
||||
[ case (Tag 0)
|
||||
(obj1 (req "message" string))
|
||||
(function Debug msg -> Some msg | _ -> None)
|
||||
(fun msg -> Debug msg) ;
|
||||
case (Tag 1)
|
||||
(obj2
|
||||
(req "request" Request.encoding)
|
||||
(req "status" Worker_types.request_status_encoding))
|
||||
(function Request (req, t, None) -> Some (req, t) | _ -> None)
|
||||
(fun (req, t) -> Request (req, t, None)) ;
|
||||
case (Tag 2)
|
||||
(obj3
|
||||
(req "error" error_encoding)
|
||||
(req "failed_request" Request.encoding)
|
||||
(req "status" Worker_types.request_status_encoding))
|
||||
(function Request (req, t, Some errs) -> Some (errs, req, t) | _ -> None)
|
||||
(fun (errs, req, t) -> Request (req, t, Some errs)) ]
|
||||
|
||||
let pp ppf = function
|
||||
| Debug msg -> Format.fprintf ppf "%s" msg
|
||||
| Request (view, { pushed ; treated ; completed }, None) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>%a@,\
|
||||
Pushed: %a, Treated: %a, Completed: %a@]"
|
||||
Request.pp view
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
| Request (view, { pushed ; treated ; completed }, Some errors) ->
|
||||
Format.fprintf ppf
|
||||
"@[<v 2>%a@,\
|
||||
Pushed: %a, Treated: %a, Failed: %a@,\
|
||||
Error: %a@]"
|
||||
Request.pp view
|
||||
Time.pp_hum pushed Time.pp_hum treated Time.pp_hum completed
|
||||
Error_monad.pp_print_error errors
|
||||
end
|
||||
|
||||
module Worker_state = struct
|
||||
type view =
|
||||
{ bootstrapped : bool ;
|
||||
mutable last_validated_head: Block_hash.t ;
|
||||
mutable last_advertised_head: Block_hash.t }
|
||||
let encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(function { bootstrapped ; last_validated_head ; last_advertised_head } ->
|
||||
(bootstrapped, last_validated_head, last_advertised_head))
|
||||
(function (bootstrapped, last_validated_head, last_advertised_head) ->
|
||||
{ bootstrapped ; last_validated_head ; last_advertised_head })
|
||||
(obj3
|
||||
(req "bootstrapped" bool)
|
||||
(req "last_validated_head" Block_hash.encoding)
|
||||
(req "last_advertised_head" Block_hash.encoding))
|
||||
|
||||
let pp ppf state =
|
||||
Format.fprintf ppf
|
||||
"@[<v 0>Bootstrapped: %s@,\
|
||||
Last validated head: %a@,\
|
||||
Last advertised head: %a@]"
|
||||
(if state.bootstrapped then "yes" else "no")
|
||||
Block_hash.pp state.last_validated_head
|
||||
Block_hash.pp state.last_advertised_head
|
||||
|
||||
end
|
34
src/lib_node_shell_base/peer_validator_worker_state.mli
Normal file
34
src/lib_node_shell_base/peer_validator_worker_state.mli
Normal file
@ -0,0 +1,34 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2017. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
module Request : sig
|
||||
type view =
|
||||
| New_head of Block_hash.t
|
||||
| New_branch of Block_hash.t * int
|
||||
val encoding : view Data_encoding.encoding
|
||||
val pp : Format.formatter -> view -> unit
|
||||
end
|
||||
|
||||
module Event : sig
|
||||
type t =
|
||||
| Request of (Request.view * Worker_types.request_status * error list option)
|
||||
| Debug of string
|
||||
val level : t -> Logging.level
|
||||
val encoding : error list Data_encoding.encoding -> t Data_encoding.encoding
|
||||
val pp : Format.formatter -> t -> unit
|
||||
end
|
||||
|
||||
module Worker_state : sig
|
||||
type view =
|
||||
{ bootstrapped : bool ;
|
||||
mutable last_validated_head: Block_hash.t ;
|
||||
mutable last_advertised_head: Block_hash.t }
|
||||
val encoding : view Data_encoding.encoding
|
||||
val pp : Format.formatter -> view -> unit
|
||||
end
|
Loading…
Reference in New Issue
Block a user