Shell/Bootstrap: improve network paralellism

This commit is contained in:
Grégoire Henry 2018-04-20 20:43:28 +02:00 committed by Benjamin Canou
parent 6e4b2eab47
commit f2db5ffde3

View File

@ -26,7 +26,7 @@ type t = {
fetched_headers:
(Block_hash.t * Block_header.t) Lwt_pipe.t ;
fetched_blocks:
(Block_hash.t * Block_header.t * Operation.t list list) Lwt_pipe.t ;
(Block_hash.t * Block_header.t * Operation.t list list tzresult Lwt.t) Lwt_pipe.t ;
(* HACK, a worker should be able to return the 'error'. *)
mutable errors: Error_monad.error list ;
}
@ -115,18 +115,20 @@ let rec operations_fetch_worker_loop pipeline =
lwt_log_info "fetching operations of block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
map_p
(fun i ->
protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Operations.fetch
~timeout:pipeline.block_operations_timeout
pipeline.chain_db ~peer:pipeline.peer_id
(hash, i) header.shell.operations_hash
end)
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
lwt_log_info "fetched operations of block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
let operations =
map_p
(fun i ->
protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Operations.fetch
~timeout:pipeline.block_operations_timeout
pipeline.chain_db ~peer:pipeline.peer_id
(hash, i) header.shell.operations_hash
end)
(0 -- (header.shell.validation_passes - 1)) >>=? fun operations ->
lwt_log_info "fetched operations of block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
return operations in
protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_blocks
(hash, header, operations) >>= return
@ -159,6 +161,7 @@ let rec validation_worker_loop pipeline =
lwt_log_info "requesting validation for block %a from peer %a."
Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
operations >>=? fun operations ->
protect ~canceler:pipeline.canceler begin fun () ->
Block_validator.validate
~canceler:pipeline.canceler
@ -193,9 +196,9 @@ let create
block_validator peer_id chain_db locator =
let canceler = Lwt_canceler.create () in
let fetched_headers =
Lwt_pipe.create ~size:(50, fun _ -> 1) () in
Lwt_pipe.create ~size:(1024, fun _ -> 1) () in
let fetched_blocks =
Lwt_pipe.create ~size:(50, fun _ -> 1) () in
Lwt_pipe.create ~size:(128, fun _ -> 1) () in
let pipeline = {
canceler ;
block_header_timeout ; block_operations_timeout ;