Node: switch the net validator to Tezos_worker

This commit is contained in:
Benjamin Canou 2018-01-22 17:21:23 +01:00 committed by Grégoire Henry
parent e5405c2f72
commit 0779221d04
12 changed files with 543 additions and 318 deletions

View File

@ -53,11 +53,10 @@ and log = {
} }
and shell = { and shell = {
bootstrap_threshold : int ;
block_validator_limits : Node.block_validator_limits ; block_validator_limits : Node.block_validator_limits ;
prevalidator_limits : Node.prevalidator_limits ; prevalidator_limits : Node.prevalidator_limits ;
timeout : Node.timeout ;
peer_validator_limits : Node.peer_validator_limits ; peer_validator_limits : Node.peer_validator_limits ;
net_validator_limits : Node.net_validator_limits ;
} }
let default_net_limits : P2p.limits = { let default_net_limits : P2p.limits = {
@ -106,7 +105,6 @@ let default_log = {
} }
let default_shell = { let default_shell = {
bootstrap_threshold = 4 ;
block_validator_limits = { block_validator_limits = {
protocol_timeout = 120. ; protocol_timeout = 120. ;
worker_limits = { worker_limits = {
@ -126,12 +124,6 @@ let default_shell = {
zombie_memory = 120. ; zombie_memory = 120. ;
} }
} ; } ;
timeout = {
block_header = 60. ;
block_operations = 60. ;
protocol = 120. ;
new_head_request = 90. ;
} ;
peer_validator_limits = { peer_validator_limits = {
block_header_timeout = 60. ; block_header_timeout = 60. ;
block_operations_timeout = 60. ; block_operations_timeout = 60. ;
@ -144,6 +136,15 @@ let default_shell = {
zombie_memory = 120. ; zombie_memory = 120. ;
} }
} ; } ;
net_validator_limits = {
bootstrap_threshold = 4 ;
worker_limits = {
backlog_size = 1000 ;
backlog_level = Logging.Info ;
zombie_lifetime = 600. ;
zombie_memory = 120. ;
}
}
} }
let default_config = { let default_config = {
@ -367,44 +368,39 @@ let peer_validator_limits_encoding =
default_limits.worker_limits.zombie_lifetime default_limits.worker_limits.zombie_lifetime
default_limits.worker_limits.zombie_memory)) default_limits.worker_limits.zombie_memory))
let timeout_encoding = let net_validator_limits_encoding =
let open Data_encoding in let open Data_encoding in
let uint8 = conv int_of_float float_of_int uint8 in
conv conv
(fun { Node.block_header ; block_operations ; (fun { Node.bootstrap_threshold ; worker_limits } ->
protocol ; new_head_request } -> (bootstrap_threshold, worker_limits))
(block_header, block_operations, (fun (bootstrap_threshold, worker_limits) ->
protocol, new_head_request)) { bootstrap_threshold ; worker_limits})
(fun (block_header, block_operations, (merge_objs
protocol, new_head_request) -> (obj1
{ block_header ; block_operations ; (dft "bootstrap_threshold" uint8
protocol ; new_head_request }) default_shell.net_validator_limits.bootstrap_threshold))
(obj4 (worker_limits_encoding
(dft "block_header" uint8 default_shell.timeout.block_header) default_shell.net_validator_limits.worker_limits.backlog_size
(dft "block_operations" uint8 default_shell.timeout.block_operations) default_shell.net_validator_limits.worker_limits.backlog_level
(dft "protocol" uint8 default_shell.timeout.protocol) default_shell.net_validator_limits.worker_limits.zombie_lifetime
(dft "new_head_request" uint8 default_shell.timeout.new_head_request) default_shell.net_validator_limits.worker_limits.zombie_memory))
)
let shell = let shell =
let open Data_encoding in let open Data_encoding in
conv conv
(fun { bootstrap_threshold ; timeout ; peer_validator_limits ; (fun { peer_validator_limits ; block_validator_limits ;
block_validator_limits ; prevalidator_limits } -> prevalidator_limits ; net_validator_limits } ->
bootstrap_threshold, timeout, peer_validator_limits, (peer_validator_limits, block_validator_limits,
block_validator_limits, prevalidator_limits) prevalidator_limits, net_validator_limits))
(fun (bootstrap_threshold, timeout, peer_validator_limits, (fun (peer_validator_limits, block_validator_limits,
block_validator_limits, prevalidator_limits) -> prevalidator_limits, net_validator_limits) ->
{ bootstrap_threshold ; timeout ; { peer_validator_limits ; block_validator_limits ;
peer_validator_limits ; prevalidator_limits ; net_validator_limits })
block_validator_limits ; (obj4
prevalidator_limits })
(obj5
(dft "bootstrap_threshold" uint8 default_shell.bootstrap_threshold)
(dft "timeout" timeout_encoding default_shell.timeout)
(dft "peer_validator" peer_validator_limits_encoding default_shell.peer_validator_limits) (dft "peer_validator" peer_validator_limits_encoding default_shell.peer_validator_limits)
(dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits) (dft "block_validator" block_validator_limits_encoding default_shell.block_validator_limits)
(dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits) (dft "prevalidator" prevalidator_limits_encoding default_shell.prevalidator_limits)
(dft "net_validator" net_validator_limits_encoding default_shell.net_validator_limits)
) )
let encoding = let encoding =
@ -518,14 +514,16 @@ let update
output = Option.unopt ~default:cfg.log.output log_output ; output = Option.unopt ~default:cfg.log.output log_output ;
} }
and shell : shell = { and shell : shell = {
bootstrap_threshold =
Option.unopt
~default:cfg.shell.bootstrap_threshold
bootstrap_threshold ;
timeout = cfg.shell.timeout ;
peer_validator_limits = cfg.shell.peer_validator_limits ; peer_validator_limits = cfg.shell.peer_validator_limits ;
block_validator_limits = cfg.shell.block_validator_limits ; block_validator_limits = cfg.shell.block_validator_limits ;
prevalidator_limits = cfg.shell.prevalidator_limits ; prevalidator_limits = cfg.shell.prevalidator_limits ;
net_validator_limits =
Option.unopt_map
~default:cfg.shell.net_validator_limits
~f:(fun bootstrap_threshold ->
{ cfg.shell.net_validator_limits
with bootstrap_threshold })
bootstrap_threshold
} }
in in
return { data_dir ; net ; rpc ; log ; shell } return { data_dir ; net ; rpc ; log ; shell }

View File

@ -43,11 +43,10 @@ and log = {
} }
and shell = { and shell = {
bootstrap_threshold : int ;
block_validator_limits : Node.block_validator_limits ; block_validator_limits : Node.block_validator_limits ;
prevalidator_limits : Node.prevalidator_limits ; prevalidator_limits : Node.prevalidator_limits ;
timeout : Node.timeout ;
peer_validator_limits : Node.peer_validator_limits ; peer_validator_limits : Node.peer_validator_limits ;
net_validator_limits : Node.net_validator_limits ;
} }
val default_data_dir: string val default_data_dir: string

View File

@ -168,14 +168,13 @@ let init_node ?sandbox (config : Node_config_file.t) =
context_root = context_dir config.data_dir ; context_root = context_dir config.data_dir ;
p2p = p2p_config ; p2p = p2p_config ;
test_network_max_tll = Some (48 * 3600) ; (* 2 days *) test_network_max_tll = Some (48 * 3600) ; (* 2 days *)
bootstrap_threshold = config.shell.bootstrap_threshold ;
} in } in
Node.create Node.create
node_config node_config
config.shell.timeout
config.shell.peer_validator_limits config.shell.peer_validator_limits
config.shell.block_validator_limits config.shell.block_validator_limits
config.shell.prevalidator_limits config.shell.prevalidator_limits
config.shell.net_validator_limits
let () = let () =
let old_hook = !Lwt.async_exception_hook in let old_hook = !Lwt.async_exception_hook in

View File

@ -7,252 +7,152 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
include Logging.Make(struct let name = "node.validator.net" end) open Net_validator_worker_state
type t = { module Name = struct
type t = Net_id.t
let encoding = Net_id.encoding
let base = [ "net_validator" ]
let pp = Net_id.pp_short
end
db: Distributed_db.t ; module Request = struct
net_state: State.Net.t ; include Request
net_db: Distributed_db.net_db ; type _ t = Validated : State.Block.t -> Event.update t
block_validator: Block_validator.t ; let view (type a) (Validated block : a t) : view =
State.Block.hash block
end
timeout: timeout ; type limits = {
prevalidator_limits: Prevalidator.limits ;
peer_validator_limits: Peer_validator.limits ;
bootstrap_threshold: int ; bootstrap_threshold: int ;
mutable bootstrapped: bool ; worker_limits: Worker_types.limits
bootstrapped_waiter: unit Lwt.t ;
bootstrapped_wakener: unit Lwt.u ;
valid_block_input: State.Block.t Lwt_watcher.input ;
global_valid_block_input: State.Block.t Lwt_watcher.input ;
new_head_input: State.Block.t Lwt_watcher.input ;
parent: t option ;
max_child_ttl: int option ;
mutable child: t option ;
prevalidator: Prevalidator.t ;
active_peers: Peer_validator.t Lwt.t P2p.Peer_id.Table.t ;
bootstrapped_peers: unit P2p.Peer_id.Table.t ;
mutable worker: unit Lwt.t ;
queue: State.Block.t Lwt_pipe.t ;
canceler: Lwt_canceler.t ;
} }
and timeout = { module Types = struct
block_header: float ; include Worker_state
block_operations: float ;
protocol: float ;
new_head_request: float ;
}
type parameters = {
parent: Name.t option ;
db: Distributed_db.t ;
net_state: State.Net.t ;
net_db: Distributed_db.net_db ;
block_validator: Block_validator.t ;
global_valid_block_input: State.Block.t Lwt_watcher.input ;
let rec shutdown nv = prevalidator_limits: Prevalidator.limits ;
Lwt_canceler.cancel nv.canceler >>= fun () -> peer_validator_limits: Peer_validator.limits ;
Distributed_db.deactivate nv.net_db >>= fun () -> max_child_ttl: int option ;
Lwt.join limits: limits;
( nv.worker :: }
Prevalidator.shutdown nv.prevalidator ::
Lwt_utils.may ~f:shutdown nv.child :: type state = {
P2p.Peer_id.Table.fold parameters: parameters ;
(fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc)
nv.active_peers [] ) >>= fun () -> mutable bootstrapped: bool ;
Lwt.return_unit bootstrapped_waiter: unit Lwt.t ;
bootstrapped_wakener: unit Lwt.u ;
valid_block_input: State.Block.t Lwt_watcher.input ;
new_head_input: State.Block.t Lwt_watcher.input ;
mutable child:
(state * (unit -> unit Lwt.t (* shutdown *))) option ;
prevalidator: Prevalidator.t ;
active_peers: Peer_validator.t Lwt.t P2p.Peer_id.Table.t ;
bootstrapped_peers: unit P2p.Peer_id.Table.t ;
}
let view (state : state) _ : view =
let { bootstrapped ; active_peers ; bootstrapped_peers } = state in
{ bootstrapped ;
active_peers =
P2p.Peer_id.Table.fold (fun id _ l -> id :: l) active_peers [] ;
bootstrapped_peers =
P2p.Peer_id.Table.fold (fun id _ l -> id :: l) bootstrapped_peers [] }
end
module Worker = Worker.Make (Name) (Event) (Request) (Types)
open Types
type t = Worker.infinite Worker.queue Worker.t
let table = Worker.create_table Queue
let shutdown w =
Worker.shutdown w
let shutdown_child nv = let shutdown_child nv =
Lwt_utils.may ~f:shutdown nv.child Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child
let notify_new_block nv block = let notify_new_block w block =
Option.iter nv.parent let nv = Worker.state w in
~f:(fun nv -> Lwt_watcher.notify nv.valid_block_input block) ; Option.iter nv.parameters.parent
~f:(fun id -> try
let w = List.assoc id (Worker.list table) in
let nv = Worker.state w in
Lwt_watcher.notify nv.valid_block_input block
with Not_found -> ()) ;
Lwt_watcher.notify nv.valid_block_input block ; Lwt_watcher.notify nv.valid_block_input block ;
Lwt_watcher.notify nv.global_valid_block_input block ; Lwt_watcher.notify nv.parameters.global_valid_block_input block ;
assert (Lwt_pipe.push_now nv.queue block) Worker.push_request_now w (Validated block)
let may_toggle_bootstrapped_network nv = let may_toggle_bootstrapped_network w =
let nv = Worker.state w in
if not nv.bootstrapped && if not nv.bootstrapped &&
P2p.Peer_id.Table.length nv.bootstrapped_peers >= nv.bootstrap_threshold P2p.Peer_id.Table.length nv.bootstrapped_peers >= nv.parameters.limits.bootstrap_threshold
then begin then begin
nv.bootstrapped <- true ; nv.bootstrapped <- true ;
Lwt.wakeup_later nv.bootstrapped_wakener () ; Lwt.wakeup_later nv.bootstrapped_wakener () ;
end end
let may_activate_peer_validator nv peer_id = let may_activate_peer_validator w peer_id =
let nv = Worker.state w in
try P2p.Peer_id.Table.find nv.active_peers peer_id try P2p.Peer_id.Table.find nv.active_peers peer_id
with Not_found -> with Not_found ->
let pv = let pv =
Peer_validator.create Peer_validator.create
~notify_new_block:(notify_new_block nv) ~notify_new_block:(notify_new_block w)
~notify_bootstrapped: begin fun () -> ~notify_bootstrapped: begin fun () ->
P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ; P2p.Peer_id.Table.add nv.bootstrapped_peers peer_id () ;
may_toggle_bootstrapped_network nv may_toggle_bootstrapped_network w
end end
~notify_termination: begin fun _pv -> ~notify_termination: begin fun _pv ->
P2p.Peer_id.Table.remove nv.active_peers peer_id ; P2p.Peer_id.Table.remove nv.active_peers peer_id ;
P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ; P2p.Peer_id.Table.remove nv.bootstrapped_peers peer_id ;
end end
nv.peer_validator_limits nv.parameters.peer_validator_limits
nv.block_validator nv.net_db peer_id in nv.parameters.block_validator
nv.parameters.net_db
peer_id in
P2p.Peer_id.Table.add nv.active_peers peer_id pv ; P2p.Peer_id.Table.add nv.active_peers peer_id pv ;
pv pv
let broadcast_head nv ~previous block = let may_switch_test_network w spawn_child block =
if not nv.bootstrapped then let nv = Worker.state w in
Lwt.return_unit
else begin
begin
State.Block.predecessor block >>= function
| None -> Lwt.return_true
| Some predecessor ->
Lwt.return (State.Block.equal predecessor previous)
end >>= fun successor ->
if successor then begin
Distributed_db.Advertise.current_head nv.net_db block ;
Lwt.return_unit
end else begin
Chain.locator (Distributed_db.net_state nv.net_db) >>= fun locator ->
Distributed_db.Advertise.current_branch nv.net_db locator
end
end
let rec create
?max_child_ttl ?parent
?(bootstrap_threshold = 1)
timeout peer_validator_limits prevalidator_limits block_validator
global_valid_block_input db net_state =
Chain.init_head net_state >>= fun () ->
let net_db = Distributed_db.activate db net_state in
Prevalidator.create
prevalidator_limits net_db >>= fun prevalidator ->
let valid_block_input = Lwt_watcher.create_input () in
let new_head_input = Lwt_watcher.create_input () in
let canceler = Lwt_canceler.create () in
let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in
let nv = {
db ; net_state ; net_db ; block_validator ;
prevalidator ;
timeout ; prevalidator_limits ; peer_validator_limits ;
valid_block_input ; global_valid_block_input ;
new_head_input ;
parent ; max_child_ttl ; child = None ;
bootstrapped = (bootstrap_threshold <= 0) ;
bootstrapped_waiter ;
bootstrapped_wakener ;
bootstrap_threshold ;
active_peers =
P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *)
bootstrapped_peers =
P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *)
worker = Lwt.return_unit ;
queue = Lwt_pipe.create () ;
canceler ;
} in
if nv.bootstrapped then Lwt.wakeup_later bootstrapped_wakener () ;
Distributed_db.set_callback net_db {
notify_branch = begin fun peer_id locator ->
Lwt.async begin fun () ->
may_activate_peer_validator nv peer_id >>= fun pv ->
Peer_validator.notify_branch pv locator ;
Lwt.return_unit
end
end ;
notify_head = begin fun peer_id block ops ->
Lwt.async begin fun () ->
may_activate_peer_validator nv peer_id >>= fun pv ->
Peer_validator.notify_head pv block ;
(* TODO notify prevalidator only if head is known ??? *)
Prevalidator.notify_operations nv.prevalidator peer_id ops ;
Lwt.return_unit
end;
end ;
disconnection = begin fun peer_id ->
Lwt.async begin fun () ->
may_activate_peer_validator nv peer_id >>= fun pv ->
Peer_validator.shutdown pv >>= fun () ->
Lwt.return_unit
end
end ;
} ;
nv.worker <-
Lwt_utils.worker
(Format.asprintf "net_validator.%a" Net_id.pp (State.Net.id net_state))
~run:(fun () -> worker_loop nv)
~cancel:(fun () -> Lwt_canceler.cancel nv.canceler) ;
Lwt.return nv
(** Current block computation *)
and worker_loop nv =
begin
Lwt_utils.protect ~canceler:nv.canceler begin fun () ->
Lwt_pipe.pop nv.queue >>= return
end >>=? fun block ->
Chain.head nv.net_state >>= fun head ->
let head_header = State.Block.header head
and head_hash = State.Block.hash head
and block_header = State.Block.header block
and block_hash = State.Block.hash block in
if
Fitness.(block_header.shell.fitness <= head_header.shell.fitness)
then
lwt_log_info "current head is better than %a %a %a, we do not switch"
Block_hash.pp_short block_hash
Fitness.pp block_header.shell.fitness
Time.pp_hum block_header.shell.timestamp >>= fun () ->
return ()
else begin
Chain.set_head nv.net_state block >>= fun previous ->
broadcast_head nv ~previous block >>= fun () ->
Prevalidator.flush nv.prevalidator block_hash >>=? fun () ->
may_switch_test_network nv block >>= fun () ->
Lwt_watcher.notify nv.new_head_input block ;
lwt_log_notice "update current head %a %a %a(%t)"
Block_hash.pp_short block_hash
Fitness.pp block_header.shell.fitness
Time.pp_hum block_header.shell.timestamp
(fun ppf ->
if Block_hash.equal head_hash block_header.shell.predecessor then
Format.fprintf ppf "same branch"
else
Format.fprintf ppf "changing branch") >>= fun () ->
return ()
end
end >>= function
| Ok () ->
worker_loop nv
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit
| Error err ->
lwt_log_error "@[Unexpected error:@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel nv.canceler >>= fun () ->
Lwt.return_unit
and may_switch_test_network nv block =
let create_child genesis protocol expiration = let create_child genesis protocol expiration =
if State.Net.allow_forked_network nv.net_state then begin if State.Net.allow_forked_network nv.parameters.net_state then begin
shutdown_child nv >>= fun () -> shutdown_child nv >>= fun () ->
begin begin
let net_id = Net_id.of_block_hash (State.Block.hash genesis) in let net_id = Net_id.of_block_hash (State.Block.hash genesis) in
State.Net.get State.Net.get
(State.Net.global_state nv.net_state) net_id >>= function (State.Net.global_state nv.parameters.net_state) net_id >>= function
| Ok net_state -> return net_state | Ok net_state -> return net_state
| Error _ -> | Error _ ->
State.fork_testnet State.fork_testnet
genesis protocol expiration >>=? fun net_state -> genesis protocol expiration >>=? fun net_state ->
Chain.head net_state >>= fun new_genesis_block -> Chain.head net_state >>= fun new_genesis_block ->
Lwt_watcher.notify nv.global_valid_block_input new_genesis_block ; Lwt_watcher.notify nv.parameters.global_valid_block_input new_genesis_block ;
Lwt_watcher.notify nv.valid_block_input new_genesis_block ; Lwt_watcher.notify nv.valid_block_input new_genesis_block ;
return net_state return net_state
end >>=? fun net_state -> end >>=? fun net_state ->
create spawn_child
~parent:nv nv.timeout nv.peer_validator_limits ~parent:(State.Net.id net_state)
nv.prevalidator_limits nv.block_validator nv.parameters.peer_validator_limits
nv.global_valid_block_input nv.parameters.prevalidator_limits
nv.db net_state >>= fun child -> nv.parameters.block_validator
nv.parameters.global_valid_block_input
nv.parameters.db net_state
nv.parameters.limits (* TODO: different limits main/test ? *) >>= fun child ->
nv.child <- Some child ; nv.child <- Some child ;
return () return ()
end else begin end else begin
@ -264,13 +164,13 @@ and may_switch_test_network nv block =
let activated = let activated =
match nv.child with match nv.child with
| None -> false | None -> false
| Some child -> | Some (child , _) ->
Block_hash.equal Block_hash.equal
(State.Net.genesis child.net_state).block (State.Net.genesis child.parameters.net_state).block
genesis in genesis in
State.Block.read nv.net_state genesis >>=? fun genesis -> State.Block.read nv.parameters.net_state genesis >>=? fun genesis ->
begin begin
match nv.max_child_ttl with match nv.parameters.max_child_ttl with
| None -> Lwt.return expiration | None -> Lwt.return expiration
| Some ttl -> | Some ttl ->
Lwt.return Lwt.return
@ -297,51 +197,225 @@ and may_switch_test_network nv block =
end >>= function end >>= function
| Ok () -> Lwt.return_unit | Ok () -> Lwt.return_unit
| Error err -> | Error err ->
lwt_log_error "@[<v 2>Error while switch test network:@ %a@]" Worker.record_event w (Could_not_switch_testnet err) ;
Error_monad.pp_print_error err >>= fun () ->
Lwt.return_unit Lwt.return_unit
let broadcast_head w ~previous block =
let nv = Worker.state w in
if not nv.bootstrapped then
Lwt.return_unit
else begin
begin
State.Block.predecessor block >>= function
| None -> Lwt.return_true
| Some predecessor ->
Lwt.return (State.Block.equal predecessor previous)
end >>= fun successor ->
if successor then begin
Distributed_db.Advertise.current_head
nv.parameters.net_db block ;
Lwt.return_unit
end else begin
let net_state = Distributed_db.net_state nv.parameters.net_db in
Chain.locator net_state >>= fun locator ->
Distributed_db.Advertise.current_branch
nv.parameters.net_db locator
end
end
(* TODO check the initial sequence of message when connecting to a new let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t =
peer, and the one when activating a network. *) let Request.Validated block = req in
let nv = Worker.state w in
Chain.head nv.parameters.net_state >>= fun head ->
let head_header = State.Block.header head
and head_hash = State.Block.hash head
and block_header = State.Block.header block
and block_hash = State.Block.hash block in
if
Fitness.(block_header.shell.fitness <= head_header.shell.fitness)
then
return Event.Ignored_head
else begin
Chain.set_head nv.parameters.net_state block >>= fun previous ->
broadcast_head w ~previous block >>= fun () ->
Prevalidator.flush nv.prevalidator block_hash >>=? fun () ->
may_switch_test_network w spawn_child block >>= fun () ->
Lwt_watcher.notify nv.new_head_input block ;
if Block_hash.equal head_hash block_header.shell.predecessor then
return Event.Head_incrememt
else
return Event.Branch_switch
end
let on_completion (type a) w (req : a Request.t) (update : a) request_status =
let Request.Validated block = req in
let fitness = State.Block.fitness block in
let request = State.Block.hash block in
Worker.record_event w (Processed_block { request ; request_status ; update ; fitness }) ;
Lwt.return ()
let on_close w =
let nv = Worker.state w in
Distributed_db.deactivate nv.parameters.net_db >>= fun () ->
Lwt.join
(Prevalidator.shutdown nv.prevalidator ::
Lwt_utils.may ~f:(fun (_, shutdown) -> shutdown ()) nv.child ::
P2p.Peer_id.Table.fold
(fun _ pv acc -> (pv >>= Peer_validator.shutdown) :: acc)
nv.active_peers []) >>= fun () ->
Lwt.return_unit
let on_launch w _ parameters =
Chain.init_head parameters.net_state >>= fun () ->
Prevalidator.create
parameters.prevalidator_limits parameters.net_db >>= fun prevalidator ->
let valid_block_input = Lwt_watcher.create_input () in
let new_head_input = Lwt_watcher.create_input () in
let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in
let nv =
{ parameters ;
valid_block_input ;
new_head_input ;
bootstrapped_wakener ;
bootstrapped_waiter ;
bootstrapped = (parameters.limits.bootstrap_threshold <= 0) ;
active_peers =
P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *)
bootstrapped_peers =
P2p.Peer_id.Table.create 50 ; (* TODO use `2 * max_connection` *)
child = None ;
prevalidator } in
if nv.bootstrapped then Lwt.wakeup_later bootstrapped_wakener () ;
Distributed_db.set_callback parameters.net_db {
notify_branch = begin fun peer_id locator ->
Lwt.async begin fun () ->
may_activate_peer_validator w peer_id >>= fun pv ->
Peer_validator.notify_branch pv locator ;
Lwt.return_unit
end
end ;
notify_head = begin fun peer_id block ops ->
Lwt.async begin fun () ->
may_activate_peer_validator w peer_id >>= fun pv ->
Peer_validator.notify_head pv block ;
(* TODO notify prevalidator only if head is known ??? *)
Prevalidator.notify_operations nv.prevalidator peer_id ops ;
Lwt.return_unit
end;
end ;
disconnection = begin fun peer_id ->
Lwt.async begin fun () ->
may_activate_peer_validator w peer_id >>= fun pv ->
Peer_validator.shutdown pv >>= fun () ->
Lwt.return_unit
end
end ;
} ;
Lwt.return nv
let rec create
?max_child_ttl ?parent
peer_validator_limits prevalidator_limits block_validator
global_valid_block_input db net_state limits =
let spawn_child ~parent pvl pl bl gvbi db n l =
create ~parent pvl pl bl gvbi db n l >>= fun w ->
Lwt.return (Worker.state w, (fun () -> Worker.shutdown w)) in
let module Handlers = struct
type self = t
let on_launch = on_launch
let on_request w = on_request w spawn_child
let on_close = on_close
let on_error _ _ _ errs = Lwt.return (Error errs)
let on_completion = on_completion
let on_no_request _ = return ()
end in
let parameters =
{ max_child_ttl ;
parent ;
peer_validator_limits ;
prevalidator_limits ;
block_validator ;
global_valid_block_input ;
db ;
net_db = Distributed_db.activate db net_state ;
net_state ;
limits } in
Worker.launch table
prevalidator_limits.worker_limits
(State.Net.id net_state)
parameters
(module Handlers)
(** Current block computation *)
let create let create
?max_child_ttl ?max_child_ttl
?bootstrap_threshold peer_validator_limits prevalidator_limits
timeout block_validator global_valid_block_input global_db state limits =
block_validator global_valid_block_input global_db state =
(* hide the optional ?parent *) (* hide the optional ?parent *)
create create
?max_child_ttl ?max_child_ttl
?bootstrap_threshold peer_validator_limits prevalidator_limits
timeout block_validator global_valid_block_input global_db state block_validator global_valid_block_input global_db state limits
let net_id { net_state } = State.Net.id net_state let net_id w =
let net_state { net_state } = net_state let { parameters = { net_state } } = Worker.state w in
let prevalidator { prevalidator } = prevalidator State.Net.id net_state
let net_db { net_db } = net_db
let child { child } = child
let validate_block nv ?(force = false) hash block operations = let net_state w =
let { parameters = { net_state } } = Worker.state w in
net_state
let prevalidator w =
let { prevalidator } = Worker.state w in
prevalidator
let net_db w =
let { parameters = { net_db } } = Worker.state w in
net_db
let child w =
match (Worker.state w).child with
| None -> None
| Some ({ parameters = { net_state } }, _) ->
try Some (List.assoc (State.Net.id net_state) (Worker.list table))
with Not_found -> None
let validate_block w ?(force = false) hash block operations =
let nv = Worker.state w in
assert (Block_hash.equal hash (Block_header.hash block)) ; assert (Block_hash.equal hash (Block_header.hash block)) ;
Chain.head nv.net_state >>= fun head -> Chain.head nv.parameters.net_state >>= fun head ->
let head = State.Block.header head in let head = State.Block.header head in
if if
force || Fitness.(head.shell.fitness <= block.shell.fitness) force || Fitness.(head.shell.fitness <= block.shell.fitness)
then then
Block_validator.validate Block_validator.validate
~canceler:nv.canceler ~canceler:(Worker.canceler w)
~notify_new_block:(notify_new_block nv) ~notify_new_block:(notify_new_block w)
nv.block_validator nv.net_db hash block operations nv.parameters.block_validator
nv.parameters.net_db
hash block operations
else else
failwith "Fitness too low" failwith "Fitness too low"
let bootstrapped { bootstrapped_waiter } = let bootstrapped w =
let { bootstrapped_waiter } = Worker.state w in
Lwt.protected bootstrapped_waiter Lwt.protected bootstrapped_waiter
let valid_block_watcher { valid_block_input } = let valid_block_watcher w =
let{ valid_block_input } = Worker.state w in
Lwt_watcher.create_stream valid_block_input Lwt_watcher.create_stream valid_block_input
let new_head_watcher { new_head_input } = let new_head_watcher w =
let { new_head_input } = Worker.state w in
Lwt_watcher.create_stream new_head_input Lwt_watcher.create_stream new_head_input
let status = Worker.status
let running_workers () = Worker.list table
let pending_requests t = Worker.pending_requests t
let current_request t = Worker.current_request t
let last_events = Worker.last_events

View File

@ -9,23 +9,20 @@
type t type t
type timeout = { type limits = {
block_header: float ; bootstrap_threshold: int ;
block_operations: float ; worker_limits: Worker_types.limits
protocol: float ;
new_head_request: float ;
} }
val create: val create:
?max_child_ttl:int -> ?max_child_ttl:int ->
?bootstrap_threshold:int ->
timeout ->
Peer_validator.limits -> Peer_validator.limits ->
Prevalidator.limits -> Prevalidator.limits ->
Block_validator.t -> Block_validator.t ->
State.Block.t Lwt_watcher.input -> State.Block.t Lwt_watcher.input ->
Distributed_db.t -> Distributed_db.t ->
State.Net.t -> State.Net.t ->
limits ->
t Lwt.t t Lwt.t
val bootstrapped: t -> unit Lwt.t val bootstrapped: t -> unit Lwt.t
@ -47,3 +44,9 @@ val shutdown: t -> unit Lwt.t
val valid_block_watcher: t -> State.Block.t Lwt_stream.t * Lwt_watcher.stopper val valid_block_watcher: t -> State.Block.t Lwt_stream.t * Lwt_watcher.stopper
val new_head_watcher: t -> State.Block.t Lwt_stream.t * Lwt_watcher.stopper val new_head_watcher: t -> State.Block.t Lwt_stream.t * Lwt_watcher.stopper
val running_workers: unit -> (Net_id.t * t) list
val status: t -> Worker_types.worker_status
val pending_requests : t -> (Time.t * Net_validator_worker_state.Request.view) list
val current_request : t -> (Time.t * Time.t * Net_validator_worker_state.Request.view) option
val last_events : t -> (Lwt_log_core.level * Net_validator_worker_state.Event.t list) list

View File

@ -86,15 +86,8 @@ type config = {
patch_context: (Context.t -> Context.t Lwt.t) option ; patch_context: (Context.t -> Context.t Lwt.t) option ;
p2p: (P2p.config * P2p.limits) option ; p2p: (P2p.config * P2p.limits) option ;
test_network_max_tll: int option ; test_network_max_tll: int option ;
bootstrap_threshold: int ;
} }
and timeout = Net_validator.timeout = {
block_header: float ;
block_operations: float ;
protocol: float ;
new_head_request: float ;
}
and peer_validator_limits = Peer_validator.limits = { and peer_validator_limits = Peer_validator.limits = {
new_head_request_timeout: float ; new_head_request_timeout: float ;
block_header_timeout: float ; block_header_timeout: float ;
@ -114,6 +107,11 @@ and block_validator_limits = Block_validator.limits = {
worker_limits : Worker_types.limits ; worker_limits : Worker_types.limits ;
} }
and net_validator_limits = Net_validator.limits = {
bootstrap_threshold: int ;
worker_limits : Worker_types.limits ;
}
let may_create_net state genesis = let may_create_net state genesis =
State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function State.Net.get state (Net_id.of_block_hash genesis.State.Net.block) >>= function
| Ok net -> Lwt.return net | Ok net -> Lwt.return net
@ -122,22 +120,22 @@ let may_create_net state genesis =
let create { genesis ; store_root ; context_root ; let create { genesis ; store_root ; context_root ;
patch_context ; p2p = net_params ; patch_context ; p2p = net_params ;
test_network_max_tll = max_child_ttl ; test_network_max_tll = max_child_ttl }
bootstrap_threshold }
timeout
peer_validator_limits peer_validator_limits
block_validator_limits block_validator_limits
prevalidator_limits = prevalidator_limits
net_validator_limits =
init_p2p net_params >>=? fun p2p -> init_p2p net_params >>=? fun p2p ->
State.read State.read
~store_root ~context_root ?patch_context () >>=? fun state -> ~store_root ~context_root ?patch_context () >>=? fun state ->
let distributed_db = Distributed_db.create state p2p in let distributed_db = Distributed_db.create state p2p in
Validator.create state distributed_db timeout Validator.create state distributed_db
peer_validator_limits peer_validator_limits
block_validator_limits prevalidator_limits >>= fun validator -> block_validator_limits
prevalidator_limits
net_validator_limits >>= fun validator ->
may_create_net state genesis >>= fun mainnet_state -> may_create_net state genesis >>= fun mainnet_state ->
Validator.activate validator Validator.activate validator
~bootstrap_threshold
?max_child_ttl mainnet_state >>= fun mainnet_validator -> ?max_child_ttl mainnet_state >>= fun mainnet_validator ->
let shutdown () = let shutdown () =
P2p.shutdown p2p >>= fun () -> P2p.shutdown p2p >>= fun () ->

View File

@ -16,15 +16,8 @@ type config = {
patch_context: (Context.t -> Context.t Lwt.t) option ; patch_context: (Context.t -> Context.t Lwt.t) option ;
p2p: (P2p.config * P2p.limits) option ; p2p: (P2p.config * P2p.limits) option ;
test_network_max_tll: int option ; test_network_max_tll: int option ;
bootstrap_threshold: int ;
} }
and timeout = {
block_header: float ;
block_operations: float ;
protocol: float ;
new_head_request: float ;
}
and peer_validator_limits = { and peer_validator_limits = {
new_head_request_timeout: float ; new_head_request_timeout: float ;
block_header_timeout: float ; block_header_timeout: float ;
@ -41,13 +34,17 @@ and block_validator_limits = {
protocol_timeout: float ; protocol_timeout: float ;
worker_limits : Worker_types.limits ; worker_limits : Worker_types.limits ;
} }
and net_validator_limits = {
bootstrap_threshold: int ;
worker_limits : Worker_types.limits ;
}
val create: val create:
config -> config ->
timeout ->
peer_validator_limits -> peer_validator_limits ->
block_validator_limits -> block_validator_limits ->
prevalidator_limits -> prevalidator_limits ->
net_validator_limits ->
t tzresult Lwt.t t tzresult Lwt.t
module RPC : sig module RPC : sig

View File

@ -14,7 +14,7 @@ type t = {
state: State.t ; state: State.t ;
db: Distributed_db.t ; db: Distributed_db.t ;
block_validator: Block_validator.t ; block_validator: Block_validator.t ;
timeout: Net_validator.timeout ; net_validator_limits: Net_validator.limits ;
peer_validator_limits: Peer_validator.limits ; peer_validator_limits: Peer_validator.limits ;
block_validator_limits: Block_validator.limits ; block_validator_limits: Block_validator.limits ;
prevalidator_limits: Prevalidator.limits ; prevalidator_limits: Prevalidator.limits ;
@ -24,31 +24,31 @@ type t = {
} }
let create state db timeout let create state db
peer_validator_limits peer_validator_limits
block_validator_limits block_validator_limits
prevalidator_limits = prevalidator_limits
net_validator_limits =
Block_validator.create block_validator_limits db >>= fun block_validator -> Block_validator.create block_validator_limits db >>= fun block_validator ->
let valid_block_input = Lwt_watcher.create_input () in let valid_block_input = Lwt_watcher.create_input () in
Lwt.return Lwt.return
{ state ; db ; timeout ; block_validator ; { state ; db ; block_validator ;
prevalidator_limits ; block_validator_limits ; prevalidator_limits ;
peer_validator_limits ; block_validator_limits ; peer_validator_limits ; net_validator_limits ;
valid_block_input ; valid_block_input ;
active_nets = Net_id.Table.create 7 ; active_nets = Net_id.Table.create 7 }
}
let activate v ?bootstrap_threshold ?max_child_ttl net_state = let activate v ?max_child_ttl net_state =
let net_id = State.Net.id net_state in let net_id = State.Net.id net_state in
lwt_log_notice "activate network %a" Net_id.pp net_id >>= fun () -> lwt_log_notice "activate network %a" Net_id.pp net_id >>= fun () ->
try Net_id.Table.find v.active_nets net_id try Net_id.Table.find v.active_nets net_id
with Not_found -> with Not_found ->
let nv = let nv =
Net_validator.create Net_validator.create
?bootstrap_threshold
?max_child_ttl ?max_child_ttl
v.timeout v.peer_validator_limits v.prevalidator_limits v.peer_validator_limits v.prevalidator_limits
v.block_validator v.valid_block_input v.db net_state in v.block_validator v.valid_block_input v.db net_state
v.net_validator_limits in
Net_id.Table.add v.active_nets net_id nv ; Net_id.Table.add v.active_nets net_id nv ;
nv nv

View File

@ -14,17 +14,16 @@ type t
val create: val create:
State.t -> State.t ->
Distributed_db.t -> Distributed_db.t ->
Net_validator.timeout ->
Peer_validator.limits -> Peer_validator.limits ->
Block_validator.limits -> Block_validator.limits ->
Prevalidator.limits -> Prevalidator.limits ->
Net_validator.limits ->
t Lwt.t t Lwt.t
val shutdown: t -> unit Lwt.t val shutdown: t -> unit Lwt.t
(** Start the validation scheduler of a given network. *) (** Start the validation scheduler of a given network. *)
val activate: val activate:
t -> t ->
?bootstrap_threshold:int ->
?max_child_ttl:int -> ?max_child_ttl:int ->
State.Net.t -> Net_validator.t Lwt.t State.Net.t -> Net_validator.t Lwt.t

View File

@ -161,7 +161,7 @@ module Make
self -> 'a Request.t -> 'a tzresult Lwt.t self -> 'a Request.t -> 'a tzresult Lwt.t
(** Called when no request has been made before the timeout, if (** Called when no request has been made before the timeout, if
the parameter has been passed to {!launch}. *) the parameter has been passed to {!launch}. *)
val on_no_request : val on_no_request :
self -> unit tzresult Lwt.t self -> unit tzresult Lwt.t
@ -183,12 +183,13 @@ module Make
unit tzresult Lwt.t unit tzresult Lwt.t
(** A function called at the end of the worker loop in case of a (** A function called at the end of the worker loop in case of a
successful treatment of the current request. *) successful treatment of the current request. *)
val on_completion : val on_completion :
self -> self ->
'a Request.t -> 'a -> 'a Request.t -> 'a ->
Worker_types.request_status -> Worker_types.request_status ->
unit Lwt.t unit Lwt.t
end end
(** Creates a new worker instance. (** Creates a new worker instance.

View File

@ -0,0 +1,117 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
module Request = struct
type view = Block_hash.t
let encoding = Block_hash.encoding
let pp = Block_hash.pp
end
module Event = struct
type update =
| Ignored_head
| Branch_switch
| Head_incrememt
type t =
| Processed_block of
{ request : Request.view ;
request_status : Worker_types.request_status ;
update : update ;
fitness : Fitness.t }
| Could_not_switch_testnet of error list
let level = function
| Processed_block req ->
begin match req.update with
| Ignored_head -> Logging.Info
| Branch_switch | Head_incrememt -> Logging.Notice
end
| Could_not_switch_testnet _ -> Logging.Error
let encoding error_encoding =
let open Data_encoding in
union
[ case (Tag 0)
(obj4
(req "request" Request.encoding)
(req "status" Worker_types.request_status_encoding)
(req "outcome"
(string_enum [ "ignored", Ignored_head ;
"branch", Branch_switch ;
"increment", Head_incrememt ]))
(req "fitness" Fitness.encoding))
(function
| Processed_block { request ; request_status ; update ; fitness } ->
Some (request, request_status, update, fitness)
| _ -> None)
(fun (request, request_status, update, fitness) ->
Processed_block { request ; request_status ; update ; fitness }) ;
case (Tag 1)
error_encoding
(function
| Could_not_switch_testnet err -> Some err
| _ -> None)
(fun err -> Could_not_switch_testnet err) ]
let pp ppf = function
| Processed_block req ->
Format.fprintf ppf "@[<v 2>" ;
begin match req.update with
| Ignored_head ->
Format.fprintf ppf
"Current head is better than %a (fitness %a), we do not switch@,"
| Branch_switch ->
Format.fprintf ppf
"Update current head to %a (fitness %a), changing branch@,"
| Head_incrememt ->
Format.fprintf ppf
"Update current head to %a (fitness %a), same branch@,"
end
Request.pp req.request
Fitness.pp req.fitness ;
Format.fprintf ppf
"Pushed: %a, Treated: %a, Completed: %a@]"
Time.pp_hum req.request_status.pushed
Time.pp_hum req.request_status.treated
Time.pp_hum req.request_status.completed
| Could_not_switch_testnet err ->
Format.fprintf ppf "@[<v 2>Error while switch test network:@ %a@]"
Error_monad.pp_print_error err
end
module Worker_state = struct
type view =
{ active_peers : P2p_types.Peer_id.t list ;
bootstrapped_peers : P2p_types.Peer_id.t list ;
bootstrapped : bool }
let encoding =
let open Data_encoding in
conv
(fun { bootstrapped ; bootstrapped_peers ; active_peers } ->
(bootstrapped, bootstrapped_peers, active_peers))
(fun (bootstrapped, bootstrapped_peers, active_peers) ->
{ bootstrapped ; bootstrapped_peers ; active_peers })
(obj3
(req "bootstrapped" bool)
(req "bootstrapped_peers" (list P2p_types.Peer_id.encoding))
(req "active_peers" (list P2p_types.Peer_id.encoding)))
let pp ppf { bootstrapped ; bootstrapped_peers ; active_peers } =
Format.fprintf ppf
"@[<v 0>Network is%s bootstrapped.@,\
@[<v 2>Active peers:%a@]@,\
@[<v 2>Bootstrapped peers:%a@]@]"
(if bootstrapped then "" else " not yet")
(fun ppf -> List.iter (Format.fprintf ppf "@,- %a" P2p_types.Peer_id.pp))
active_peers
(fun ppf -> List.iter (Format.fprintf ppf "@,- %a" P2p_types.Peer_id.pp))
bootstrapped_peers
end

View File

@ -0,0 +1,40 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
module Request : sig
type view = Block_hash.t
val encoding : view Data_encoding.encoding
val pp : Format.formatter -> view -> unit
end
module Event : sig
type update =
| Ignored_head
| Branch_switch
| Head_incrememt
type t =
| Processed_block of
{ request : Request.view ;
request_status : Worker_types.request_status ;
update : update ;
fitness : Fitness.t }
| Could_not_switch_testnet of error list
val level : t -> Logging.level
val encoding : error list Data_encoding.encoding -> t Data_encoding.encoding
val pp : Format.formatter -> t -> unit
end
module Worker_state : sig
type view =
{ active_peers : P2p_types.Peer_id.t list ;
bootstrapped_peers : P2p_types.Peer_id.t list ;
bootstrapped : bool }
val encoding : view Data_encoding.encoding
val pp : Format.formatter -> view -> unit
end