diff --git a/src/lib_shell/bootstrap_pipeline.ml b/src/lib_shell/bootstrap_pipeline.ml index 3c38609a7..75f52912a 100644 --- a/src/lib_shell/bootstrap_pipeline.ml +++ b/src/lib_shell/bootstrap_pipeline.ml @@ -26,7 +26,7 @@ type t = { fetched_headers: (Block_hash.t * Block_header.t) Lwt_pipe.t ; fetched_blocks: - (Block_hash.t * Block_header.t * Operation.t list list) Lwt_pipe.t ; + (Block_hash.t * Block_header.t * Operation.t list list tzresult Lwt.t) Lwt_pipe.t ; (* HACK, a worker should be able to return the 'error'. *) mutable errors: Error_monad.error list ; } @@ -115,18 +115,20 @@ let rec operations_fetch_worker_loop pipeline = lwt_log_info "fetching operations of block %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - map_p - (fun i -> - protect ~canceler:pipeline.canceler begin fun () -> - Distributed_db.Operations.fetch - ~timeout:pipeline.block_operations_timeout - pipeline.chain_db ~peer:pipeline.peer_id - (hash, i) header.shell.operations_hash - end) - (0 -- (header.shell.validation_passes - 1)) >>=? fun operations -> - lwt_log_info "fetched operations of block %a from peer %a." - Block_hash.pp_short hash - P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> + let operations = + map_p + (fun i -> + protect ~canceler:pipeline.canceler begin fun () -> + Distributed_db.Operations.fetch + ~timeout:pipeline.block_operations_timeout + pipeline.chain_db ~peer:pipeline.peer_id + (hash, i) header.shell.operations_hash + end) + (0 -- (header.shell.validation_passes - 1)) >>=? fun operations -> + lwt_log_info "fetched operations of block %a from peer %a." + Block_hash.pp_short hash + P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> + return operations in protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.push pipeline.fetched_blocks (hash, header, operations) >>= return @@ -159,6 +161,7 @@ let rec validation_worker_loop pipeline = lwt_log_info "requesting validation for block %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> + operations >>=? fun operations -> protect ~canceler:pipeline.canceler begin fun () -> Block_validator.validate ~canceler:pipeline.canceler @@ -193,9 +196,9 @@ let create block_validator peer_id chain_db locator = let canceler = Lwt_canceler.create () in let fetched_headers = - Lwt_pipe.create ~size:(50, fun _ -> 1) () in + Lwt_pipe.create ~size:(1024, fun _ -> 1) () in let fetched_blocks = - Lwt_pipe.create ~size:(50, fun _ -> 1) () in + Lwt_pipe.create ~size:(128, fun _ -> 1) () in let pipeline = { canceler ; block_header_timeout ; block_operations_timeout ;