ligo/src/lib_shell/bootstrap_pipeline.ml

291 lines
12 KiB
OCaml
Raw Normal View History

(**************************************************************************)
(* *)
2018-02-06 00:17:03 +04:00
(* Copyright (c) 2014 - 2018. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make(struct let name = "node.validator.bootstrap_pipeline" end)
open Validation_errors
2017-12-17 22:51:06 +04:00
type t = {
canceler: Lwt_canceler.t ;
block_header_timeout: float ;
block_operations_timeout: float ;
mutable headers_fetch_worker: unit Lwt.t ;
mutable operations_fetch_worker: unit Lwt.t ;
mutable validation_worker: unit Lwt.t ;
peer_id: P2p_peer.Id.t ;
2018-02-16 04:26:24 +04:00
chain_db: Distributed_db.chain_db ;
locator: Block_locator.t ;
block_validator: Block_validator.t ;
notify_new_block: State.Block.t -> unit ;
fetched_headers:
(Block_hash.t * Block_header.t) Lwt_pipe.t ;
fetched_blocks:
(Block_hash.t * Block_header.t * Operation.t list list tzresult Lwt.t) Lwt_pipe.t ;
(* HACK, a worker should be able to return the 'error'. *)
mutable errors: Error_monad.error list ;
}
let assert_acceptable_header pipeline
?(first = false) hash (header : Block_header.t) =
let chain_state = Distributed_db.chain_state pipeline.chain_db in
let time_now = Time.now () in
fail_unless
(Time.(add time_now 15L >= header.shell.timestamp))
(Future_block_header { block = hash; time = time_now;
block_time = header.shell.timestamp }) >>=? fun () ->
State.Chain.checkpoint chain_state >>= fun (level, checkpoint) ->
fail_when
(Int32.equal header.shell.level level &&
not (Block_hash.equal checkpoint hash))
(Checkpoint_error (hash, Some pipeline.peer_id)) >>=? fun () ->
(* Early detection of fork point before the current checkpoint. *)
Chain.head chain_state >>= fun head ->
let bootstraping = (State.Block.header head).shell.level < level in
fail_when (first && not bootstraping && header.shell.level < level)
(Checkpoint_error (hash, Some pipeline.peer_id)) >>=? fun () ->
return ()
let fetch_step pipeline ?first (step : Block_locator.step) =
lwt_log_info "fetching step %a -> %a (%d%s) from peer %a."
Block_hash.pp_short step.block
Block_hash.pp_short step.predecessor
step.step
(if step.strict_step then "" else " max")
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
let rec fetch_loop ?first acc hash cpt =
Lwt_unix.yield () >>= fun () ->
if cpt < 0 then
lwt_log_info "invalid step from peer %a (too long)."
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
2017-12-17 22:51:06 +04:00
fail (Invalid_locator (pipeline.peer_id, pipeline.locator))
else if Block_hash.equal hash step.predecessor then
if step.strict_step && cpt <> 0 then
lwt_log_info "invalid step from peer %a (too short)."
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
2017-12-17 22:51:06 +04:00
fail (Invalid_locator (pipeline.peer_id, pipeline.locator))
else
return acc
else
lwt_debug "fetching block header %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Block_header.fetch
~timeout:pipeline.block_header_timeout
2018-02-16 04:26:24 +04:00
pipeline.chain_db ~peer:pipeline.peer_id
hash ()
end >>=? fun header ->
assert_acceptable_header ?first pipeline hash header >>=? fun () ->
lwt_debug "fetched block header %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
fetch_loop ((hash, header) :: acc) header.shell.predecessor (cpt - 1)
in
fetch_loop ?first [] step.block step.step >>=? fun headers ->
iter_s
begin fun header ->
protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_headers header >>= return
end
end
headers >>=? fun () ->
return ()
let headers_fetch_worker_loop pipeline =
begin
let sender_id = Distributed_db.my_peer_id pipeline.chain_db in
(* sender and receiver are inverted here because they are from
the point of view of the node sending the locator *)
let seed = {Block_locator.sender_id=pipeline.peer_id; receiver_id=sender_id } in
let steps = Block_locator.to_steps seed pipeline.locator in
match steps with
| [] -> return ()
| step :: steps ->
fetch_step pipeline ~first:true step >>=? fun () ->
iter_s (fetch_step pipeline) steps >>=? fun () ->
return ()
end >>= function
| Ok () ->
lwt_log_info "fetched all step from peer %a."
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_pipe.close pipeline.fetched_headers ;
Lwt.return_unit
| Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit
2017-11-28 18:07:34 +04:00
| Error [ Distributed_db.Block_header.Timeout bh ] ->
lwt_log_info "request for header %a from peer %a timed out."
Block_hash.pp_short bh
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
2017-11-28 18:07:34 +04:00
Lwt.return_unit
| Error [ Future_block_header { block; block_time; time } ] ->
lwt_log_notice "Block locator %a from peer %a contains future blocks. \
local time: %a, block time: %a"
Block_hash.pp_short block
Time.pp_hum time
Time.pp_hum block_time
2018-05-05 22:25:57 +04:00
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error "@[Unexpected error (headers fetch):@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
let rec operations_fetch_worker_loop pipeline =
begin
Lwt_unix.yield () >>= fun () ->
protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.pop pipeline.fetched_headers >>= return
end >>=? fun (hash, header) ->
lwt_log_info "fetching operations of block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
let operations =
map_p
(fun i ->
protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Operations.fetch
~timeout:pipeline.block_operations_timeout
pipeline.chain_db ~peer:pipeline.peer_id
(hash, i) header.shell.operations_hash
end)
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
lwt_log_info "fetched operations of block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
return operations in
protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_blocks
(hash, header, operations) >>= return
end
end >>= function
| Ok () ->
operations_fetch_worker_loop pipeline
| Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt_pipe.close pipeline.fetched_blocks ;
Lwt.return_unit
2017-11-28 18:07:34 +04:00
| Error [ Distributed_db.Operations.Timeout (bh, n) ] ->
lwt_log_info "request for operations %a:%d from peer %a timed out."
Block_hash.pp_short bh n
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
2017-11-28 18:07:34 +04:00
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error "@[Unexpected error (operations fetch):@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
let rec validation_worker_loop pipeline =
begin
Lwt_unix.yield () >>= fun () ->
protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.pop pipeline.fetched_blocks >>= return
end >>=? fun (hash, header, operations) ->
lwt_log_info "requesting validation for block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
operations >>=? fun operations ->
protect ~canceler:pipeline.canceler begin fun () ->
Block_validator.validate
~canceler:pipeline.canceler
~notify_new_block:pipeline.notify_new_block
pipeline.block_validator
2018-02-16 04:26:24 +04:00
pipeline.chain_db hash header operations
end >>=? fun _block ->
lwt_log_info "validated block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
return ()
end >>= function
| Ok () -> validation_worker_loop pipeline
| Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit
| Error ([ Block_validator_errors.Invalid_block _
| Block_validator_errors.Unavailable_protocol _ ] as err ) ->
(* Propagate the error to the peer validator. *)
pipeline.errors <- pipeline.errors @ err ;
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error "@[Unexpected error (validator):@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
let create
?(notify_new_block = fun _ -> ())
~block_header_timeout ~block_operations_timeout
2018-02-16 04:26:24 +04:00
block_validator peer_id chain_db locator =
let canceler = Lwt_canceler.create () in
let fetched_headers =
Lwt_pipe.create ~size:(1024, fun _ -> 1) () in
let fetched_blocks =
Lwt_pipe.create ~size:(128, fun _ -> 1) () in
let pipeline = {
canceler ;
block_header_timeout ; block_operations_timeout ;
headers_fetch_worker = Lwt.return_unit ;
operations_fetch_worker = Lwt.return_unit ;
validation_worker = Lwt.return_unit ;
notify_new_block ;
2018-02-16 04:26:24 +04:00
peer_id ; chain_db ; locator ;
block_validator ;
fetched_headers ; fetched_blocks ;
errors = [] ;
} in
Lwt_canceler.on_cancel pipeline.canceler begin fun () ->
Lwt_pipe.close fetched_blocks ;
Lwt_pipe.close fetched_headers ;
2018-05-05 22:25:57 +04:00
(* TODO proper cleanup of ressources... *)
Lwt.return_unit
end ;
let head, _ = (pipeline.locator : Block_locator.t :> _ * _) in
let hash = Block_header.hash head in
pipeline.headers_fetch_worker <-
Lwt_utils.worker
(Format.asprintf "bootstrap_pipeline-headers_fetch.%a.%a"
P2p_peer.Id.pp_short peer_id Block_hash.pp_short hash)
~run:(fun () -> headers_fetch_worker_loop pipeline)
~cancel:(fun () -> Lwt_canceler.cancel pipeline.canceler) ;
pipeline.operations_fetch_worker <-
Lwt_utils.worker
(Format.asprintf "bootstrap_pipeline-operations_fetch.%a.%a"
P2p_peer.Id.pp_short peer_id Block_hash.pp_short hash)
~run:(fun () -> operations_fetch_worker_loop pipeline)
~cancel:(fun () -> Lwt_canceler.cancel pipeline.canceler) ;
pipeline.validation_worker <-
Lwt_utils.worker
(Format.asprintf "bootstrap_pipeline-validation.%a.%a"
P2p_peer.Id.pp_short peer_id Block_hash.pp_short hash)
~run:(fun () -> validation_worker_loop pipeline)
~cancel:(fun () -> Lwt_canceler.cancel pipeline.canceler) ;
pipeline
let wait_workers pipeline =
pipeline.headers_fetch_worker >>= fun () ->
pipeline.operations_fetch_worker >>= fun () ->
pipeline.validation_worker >>= fun () ->
Lwt.return_unit
let wait pipeline =
wait_workers pipeline >>= fun () ->
match pipeline.errors with
| [] -> return ()
| errors -> Lwt.return_error errors
let cancel pipeline =
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
wait_workers pipeline