From a08d6b8cd972e1593a01a827a35bfd9488fab0ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Thu, 10 May 2018 16:30:41 +0200 Subject: [PATCH] Shell: update the checkpoint when updating the head The new checkpoint is the current `last_allowed_fork_level` of the new head. When updating the checkpoint the shell tags as invalid all blocks with a level strictly higher to the new checkpoint that are inconstant with it. And it removes from the disk all the block with a level lower or equal to the new checkpoint that do not belongs to the current chain. Though, the shell removes nothing from the disk when the current head is below the current checkpoint: this will allow to configure an expected checkpoint when bootstraping a node. The first patch is very conservative and only detects new incompatible blocks when they are stored on disk (i.e. after the validation). Fiture patches try to detect earlier such incompatible block. --- src/lib_shell/chain_validator.ml | 14 +++ src/lib_shell/state.ml | 171 ++++++++++++++++++++++++++++++- src/lib_shell/state.mli | 11 ++ 3 files changed, 195 insertions(+), 1 deletion(-) diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index 19aacf11e..b695adba5 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -127,6 +127,19 @@ let may_activate_peer_validator w peer_id = P2p_peer.Table.add nv.active_peers peer_id pv ; pv +let may_update_checkpoint chain_state new_head = + State.Chain.checkpoint chain_state >>= fun (old_level, _old_block) -> + let new_level = State.Block.last_allowed_fork_level new_head in + if new_level <= old_level then + Lwt.return_unit + else + let head_level = State.Block.level new_head in + State.Block.predecessor_n new_head + (Int32.to_int (Int32.sub head_level new_level)) >>= function + | None -> Lwt.return_unit (* should not happen *) + | Some new_block -> + State.Chain.set_checkpoint chain_state (new_level, new_block) + let may_switch_test_chain w spawn_child block = let nv = Worker.state w in let create_child genesis protocol expiration = @@ -234,6 +247,7 @@ let on_request (type a) w spawn_child (req : a Request.t) : a tzresult Lwt.t = return Event.Ignored_head else begin Chain.set_head nv.parameters.chain_state block >>= fun previous -> + may_update_checkpoint nv.parameters.chain_state block >>= fun () -> broadcast_head w ~previous block >>= fun () -> begin match nv.prevalidator with | Some prevalidator -> diff --git a/src/lib_shell/state.ml b/src/lib_shell/state.ml index e36be6e9b..83aa18358 100644 --- a/src/lib_shell/state.ml +++ b/src/lib_shell/state.ml @@ -35,6 +35,8 @@ and global_data = { } and chain_state = { + (* never take the lock on 'block_store' when holding + the lock on 'chain_data'. *) global_state: global_state ; chain_id: Chain_id.t ; genesis: genesis ; @@ -170,8 +172,10 @@ let predecessor_n (store: Store.Block.store) (block_hash: Block_hash.t) (distanc in (* actual predecessor function *) - if distance <= 0 then + if distance < 0 then invalid_arg ("State.predecessor: distance <= 0"^(string_of_int distance)) + else if distance = 0 then + Lwt.return_some block_hash else let rec loop block_hash distance = if distance = 1 @@ -228,8 +232,112 @@ module Locked_block = struct } >>= fun () -> Lwt.return header + (* Will that block is compatible with the current checkpoint. *) + let acceptable chain_data hash (header : Block_header.t) = + let level, block = chain_data.checkpoint in + if level < header.shell.level then + (* the predecessor is assumed compatible. *) + Lwt.return_true + else if level = header.shell.level then + Lwt.return (Block_hash.equal hash block) + else (* header.shell.level < level *) + (* valid only if the current head is lower than the checkpoint. *) + let head_level = + chain_data.data.current_head.contents.header.shell.level in + Lwt.return (head_level < level) + + (* Is a block still valid for a given checkpoint ? *) + let is_valid_for_checkpoint + block_store hash (header : Block_header.t) (level, block) = + if Compare.Int32.(header.shell.level < level) then + Lwt.return_true + else + predecessor_n block_store hash + (Int32.to_int @@ + Int32.sub header.shell.level level) >>= function + | None -> assert false + | Some pred -> + if Block_hash.equal pred block then + Lwt.return_true + else + Lwt.return_false + end +(* Find the branches that are still valid with a given checkpoint, i.e. + heads with lower level, or branches that goes through the checkpoint. *) +let locked_valid_heads_for_checkpoint block_store data checkpoint = + Store.Chain_data.Known_heads.read_all + data.chain_data_store >>= fun heads -> + Block_hash.Set.fold + (fun head acc -> + let valid_header = + Store.Block.Contents.read_exn + (block_store, head) >>= fun { header } -> + Locked_block.is_valid_for_checkpoint + block_store head header checkpoint >>= fun valid -> + Lwt.return (valid, header) in + acc >>= fun (valid_heads, invalid_heads) -> + valid_header >>= fun (valid, header) -> + if valid then + Lwt.return ((head, header) :: valid_heads, invalid_heads) + else + Lwt.return (valid_heads, (head, header) :: invalid_heads)) + heads + (Lwt.return ([], [])) + +(* Tag as invalid all blocks in `heads` and their predecessors whose + level strictly higher to 'level'. *) +let tag_invalid_heads block_store chain_store heads level = + let rec tag_invalid_head (hash, header) = + if header.Block_header.shell.level <= level then + Store.Chain_data.Known_heads.store chain_store hash >>= fun () -> + Lwt.return_some (hash, header) + else + let errors = [ Validation_errors.Checkpoint_error (hash, None) ] in + Store.Block.Invalid_block.store block_store hash + { level = header.shell.level ; errors } >>= fun () -> + Store.Block.Contents.remove (block_store, hash) >>= fun () -> + Store.Block.Operation_hashes.remove_all (block_store, hash) >>= fun () -> + Store.Block.Operation_path.remove_all (block_store, hash) >>= fun () -> + Store.Block.Operations.remove_all (block_store, hash) >>= fun () -> + Store.Block.Predecessors.remove_all (block_store, hash) >>= fun () -> + Store.Block.Contents.read_opt + (block_store, header.shell.predecessor) >>= function + | None -> + Lwt.return_none + | Some { header } -> + tag_invalid_head (Block_header.hash header, header) in + Lwt_list.iter_p + (fun (hash, _header) -> + Store.Chain_data.Known_heads.remove chain_store hash) + heads >>= fun () -> + Lwt_list.filter_map_s tag_invalid_head heads + +(* Remove all blocks that are not in the chain. *) +let cut_alternate_heads block_store chain_store heads = + let rec cut_alternate_head hash header = + Store.Chain_data.In_main_branch.known (chain_store, hash) >>= fun in_chain -> + if in_chain then + Lwt.return_unit + else + Store.Block.Contents.remove (block_store, hash) >>= fun () -> + Store.Block.Operation_hashes.remove_all (block_store, hash) >>= fun () -> + Store.Block.Operation_path.remove_all (block_store, hash) >>= fun () -> + Store.Block.Operations.remove_all (block_store, hash) >>= fun () -> + Store.Block.Predecessors.remove_all (block_store, hash) >>= fun () -> + Store.Block.Contents.read_opt + (block_store, header.Block_header.shell.predecessor) >>= function + | None -> + Lwt.return_unit + | Some { header } -> + cut_alternate_head (Block_header.hash header) header in + Lwt_list.iter_p + (fun (hash, header) -> + Store.Chain_data.Known_heads.remove chain_store hash >>= fun () -> + cut_alternate_head hash header) + heads + module Chain = struct type nonrec genesis = genesis = { @@ -418,6 +526,51 @@ module Chain = struct Lwt.return checkpoint end + let set_checkpoint chain_state ((level, _block) as checkpoint) = + Shared.use chain_state.block_store begin fun store -> + Shared.use chain_state.chain_data begin fun data -> + let head_header = + data.data.current_head.contents.header in + let head_hash = data.data.current_head.hash in + Locked_block.is_valid_for_checkpoint + store head_hash head_header checkpoint >>= fun valid -> + assert valid ; + (* Remove outdated invalid blocks. *) + Store.Block.Invalid_block.iter store ~f: begin fun hash iblock -> + if iblock.level <= level then + Store.Block.Invalid_block.remove store hash + else + Lwt.return_unit + end >>= fun () -> + (* Remove outdated heads and tag invalid branches. *) + begin + locked_valid_heads_for_checkpoint + store data checkpoint >>= fun (valid_heads, invalid_heads) -> + tag_invalid_heads store data.chain_data_store + invalid_heads level >>= fun outdated_invalid_heads -> + if head_header.shell.level < level then + Lwt.return_unit + else + let outdated_valid_heads = + List.filter + (fun (hash, { Block_header.shell } ) -> + shell.level <= level && + not (Block_hash.equal hash head_hash)) + valid_heads in + cut_alternate_heads store data.chain_data_store + outdated_valid_heads >>= fun () -> + cut_alternate_heads store data.chain_data_store + outdated_invalid_heads + end >>= fun () -> + (* Store the new checkpoint. *) + Store.Chain_data.Checkpoint.store + data.chain_data_store checkpoint >>= fun () -> + data.checkpoint <- checkpoint ; + (* TODO 'git fsck' in the context. *) + Lwt.return_unit + end + end + let destroy state chain = lwt_debug "destroy %a" Chain_id.pp (id chain) >>= fun () -> Shared.use state.global_data begin fun { global_store ; chains } -> @@ -576,6 +729,22 @@ module Block = struct if known then return None else begin + (* safety check: never ever commit a block that is not compatible + with the current checkpoint. *) + begin + let predecessor = block_header.shell.predecessor in + Store.Block.Contents.known + (store, predecessor) >>= fun valid_predecessor -> + if not valid_predecessor then + Lwt.return_false + else + Shared.use chain_state.chain_data begin fun chain_data -> + Locked_block.acceptable chain_data hash block_header + end + end >>= fun acceptable_block -> + fail_unless + acceptable_block + (Checkpoint_error (hash, None)) >>=? fun () -> Context.commit ~time:block_header.shell.timestamp ?message context >>= fun commit -> fail_unless diff --git a/src/lib_shell/state.mli b/src/lib_shell/state.mli index 05ae0c97d..5889419a7 100644 --- a/src/lib_shell/state.mli +++ b/src/lib_shell/state.mli @@ -73,6 +73,17 @@ module Chain : sig val checkpoint: chain_state -> (Int32.t * Block_hash.t) Lwt.t + (** Update the current checkpoint. The current head should be + consistent (i.e. it should either have a lower level or pass + through the checkpoint). In the process all the blocks from + invalid alternate heads are removed from the disk, either + completely (when `level <= checkpoint`) or still tagged as + invalid (when `level > checkpoint`). *) + val set_checkpoint: + chain_state -> + Int32.t * Block_hash.t -> + unit Lwt.t + end (** {2 Block database} *****************************************************)