Shell: add docstrings for the validator.

This commit is contained in:
Grégoire Henry 2017-11-13 19:06:30 +01:00 committed by Benjamin Canou
parent 119f724e64
commit 6a00c55c4e
14 changed files with 282 additions and 171 deletions

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
(** Tezos Shell Net - Low level API for the Gossip network *)
(** A peer connection address *)
type addr = Ipaddr.V6.t

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
(** Tezos Shell Module - Chain Traversal API *)
open State
val path: Block.t -> Block.t -> Block.t list option Lwt.t

View File

@ -696,23 +696,7 @@ let shutdown { p2p ; p2p_readers ; active_nets } =
P2p.shutdown p2p >>= fun () ->
Lwt.return_unit
let read_all_operations net_db hash n =
Lwt_list.map_p
(fun i ->
Raw_operations.Table.read_opt net_db.operations_db.table (hash, i))
(0 -- (n-1)) >>= fun operations ->
mapi_p
(fun i ops ->
match ops with
| Some ops -> return ops
| None ->
Raw_operation_hashes.Table.read
net_db.operation_hashes_db.table (hash, i) >>=? fun hashes ->
map_p (Raw_operation.Table.read net_db.operation_db.table) hashes)
operations
let clear_block net_db hash n =
(* TODO use a reference counter ?? *)
Raw_operations.clear_all net_db.operations_db.table hash n ;
Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash
@ -732,12 +716,6 @@ let commit_invalid_block net_db hash header _err =
clear_block net_db hash header.shell.validation_passes ;
return res
let clear_operations net_db operations =
List.iter
(List.iter
(Raw_operation.Table.clear_or_cancel net_db.operation_db.table))
operations
let inject_block_header net_db h b =
fail_unless
(Net_id.equal
@ -755,9 +733,6 @@ let inject_operation net_db h op =
Raw_operation.Table.inject net_db.operation_db.table h op >>= fun res ->
return res
let inject_protocol db h p =
Raw_protocol.Table.inject db.protocol_db.table h p
let commit_protocol db h p =
State.Protocol.store db.disk p >>= fun res ->
Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
@ -767,8 +742,6 @@ let watch_block_header { block_input } =
Watcher.create_stream block_input
let watch_operation { operation_input } =
Watcher.create_stream operation_input
let watch_protocol { protocol_db } =
Raw_protocol.Table.watch protocol_db.table
module Raw = struct
let encoding = P2p.Raw.encoding Message.cfg.encoding
@ -779,26 +752,26 @@ module type DISTRIBUTED_DB = sig
type t
type key
type value
type param
val known: t -> key -> bool Lwt.t
type error += Missing_data of key
type error += Canceled of key
type error += Timeout of key
val read: t -> key -> value tzresult Lwt.t
val read_opt: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> unit
type param
type error += Timeout of key
val fetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> value tzresult Lwt.t
val prefetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> unit
type error += Canceled of key
val clear_or_cancel: t -> key -> unit
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
end
module Make
@ -807,7 +780,6 @@ module Make
type t
val proj: t -> Table.t
end) = struct
type t = Kind.t
type key = Table.key
type value = Table.value
let known t k = Table.known (Kind.proj t) k
@ -826,11 +798,16 @@ module Make
let watch t = Table.watch (Kind.proj t)
end
module Block_header =
Make (Raw_block_header.Table) (struct
type t = net_db
let proj net = net.block_header_db.table
end)
module Block_header = struct
type t = Block_header.t
include (Make (Raw_block_header.Table) (struct
type t = net_db
let proj net = net.block_header_db.table
end) : DISTRIBUTED_DB with type t := net_db
and type key := Block_hash.t
and type value := Block_header.t
and type param := unit)
end
module Operation_hashes =
Make (Raw_operation_hashes.Table) (struct
@ -844,17 +821,27 @@ module Operations =
let proj net = net.operations_db.table
end)
module Operation =
Make (Raw_operation.Table) (struct
type t = net_db
let proj net = net.operation_db.table
end)
module Operation = struct
type t = Operation.t
include (Make (Raw_operation.Table) (struct
type t = net_db
let proj net = net.operation_db.table
end) : DISTRIBUTED_DB with type t := net_db
and type key := Operation_hash.t
and type value := Operation.t
and type param := unit)
end
module Protocol =
Make (Raw_protocol.Table) (struct
type t = db
let proj db = db.protocol_db.table
end)
module Protocol = struct
type t = Protocol.t
include (Make (Raw_protocol.Table) (struct
type t = db
let proj db = db.protocol_db.table
end) : DISTRIBUTED_DB with type t := db
and type key := Protocol_hash.t
and type value := Protocol.t
and type param := unit)
end
let broadcast net_db msg =

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
(** Tezos Shell - High-level API for the Gossip network and local storage. *)
type t
type db = t
@ -19,25 +21,179 @@ val create: State.t -> p2p -> t
val state: db -> State.t
val shutdown: t -> unit Lwt.t
(** {1 Network database} *)
(** An instance of the distributed DB for a given network (mainnet,
current testnet, ...) *)
type net_db
val net_state: net_db -> State.Net.t
val db: net_db -> db
(** Activate a given network. The node will notify its neighbours that
it now handles the given network and that it expects notification
for new head or new operations. *)
val activate: t -> State.Net.t -> net_db
(** Deactivate a given network. The node will notify its neighbours
that it does not care anymore about this network. *)
val deactivate: net_db -> unit Lwt.t
type callback = {
notify_branch:
P2p.Peer_id.t -> Block_locator.t -> unit ;
notify_head:
P2p.Peer_id.t -> Block_header.t -> Mempool.t -> unit ;
notify_branch: P2p.Peer_id.t -> Block_locator.t -> unit ;
notify_head: P2p.Peer_id.t -> Block_header.t -> Mempool.t -> unit ;
disconnection: P2p.Peer_id.t -> unit ;
}
val activate: t -> State.Net.t -> net_db
(** Register all the possible callback from the distributed DB to the
validator. *)
val set_callback: net_db -> callback -> unit
val deactivate: net_db -> unit Lwt.t
(** Kick a given peer. *)
val disconnect: net_db -> P2p.Peer_id.t -> unit Lwt.t
(** Various accessors. *)
val net_state: net_db -> State.Net.t
val db: net_db -> db
(** {1 Sending messages} *)
module Request : sig
(** Send to a given peer, or to all known active peers for the
network, a friendly request "Hey, what's your current branch
?". The expected answer is a `Block_locator.t.`. *)
val current_branch: net_db -> ?peer:P2p.Peer_id.t -> unit -> unit
(** Send to a given peer, or to all known active peers for the
given network, a friendly request "Hey, what's your current
branch ?". The expected answer is a `Block_locator.t.`. *)
val current_head: net_db -> ?peer:P2p.Peer_id.t -> unit -> unit
end
module Advertise : sig
(** Notify a given peer, or all known active peers for the
network, of a new head and possibly of new operations. *)
val current_head:
net_db -> ?peer:P2p.Peer_id.t ->
?mempool:Mempool.t -> State.Block.t -> unit
(** Notify a given peer, or all known active peers for the
network, of a new head and its sparse history. *)
val current_branch:
net_db -> ?peer:P2p.Peer_id.t ->
State.Block.t -> unit Lwt.t
end
(** {1 Indexes} *)
(** Generic interface for a "distributed" index.
By "distributed", it means that this interface abstract the p2p
gossip layer and it is able to fetch missing data from known
peers in a "synchronous" interface.
*)
module type DISTRIBUTED_DB = sig
type t
type key
(** The index key *)
type value
(** The indexed data *)
(** Is the value known locally? *)
val known: t -> key -> bool Lwt.t
type error += Missing_data of key
(** Return the value if it is known locally, otherwise fail with
the error [Missing_data]. *)
val read: t -> key -> value tzresult Lwt.t
(** Return the value if it is known locally, otherwise fail with
the value [None]. *)
val read_opt: t -> key -> value option Lwt.t
(** Return the value if it is known locally, otherwise fail with
the exception [Not_found]. *)
val read_exn: t -> key -> value Lwt.t
type param (** An extra parameter for the network lookup, usually
used for prevalidating data. *)
type error += Timeout of key
(** Return the value if it is known locally, or block until the data
is received from the network. By default, the data will be
requested to all the active peers in the network; if the [peer]
argument is provided, the data will only be requested to the
provided peer. By default, the resulting promise will block
forever if the data is never received. If [timeout] is provided
the promise will be resolved with the error [Timeout] after the
provided amount of seconds.
A internal scheduler is able to re-send the request with an
exponential back-off until the data is received. If the function
is called multiple time with the same key but with disctinct
peers, the internal scheduler randomly chooses the requested
peer (at each retry). *)
val fetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> value tzresult Lwt.t
(** Same as `fetch` but the call is non-blocking: the data will be
stored in the local index when received. *)
val prefetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> unit
type error += Canceled of key
(** Remove the data from the local index or cancel all pending
request. Any pending [fetch] promises are resolved with the
error [Canceled]. *)
val clear_or_cancel: t -> key -> unit
(** Monitor all the fetched data. A given data will appear only
once. *)
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
end
(** {2 Block index} *)
(** Index of block headers. *)
module Block_header : sig
type t = Block_header.t (* avoid shadowing. *)
include DISTRIBUTED_DB with type t := net_db
and type key := Block_hash.t
and type value := Block_header.t
and type param := unit
end
(** Index of all the operations of a given block (per validation pass). *)
module Operations :
DISTRIBUTED_DB with type t := net_db
and type key = Block_hash.t * int
and type value = Operation.t list
and type param := Operation_list_list_hash.t
(** Index of all the hashes of operations of a given block (per
validation pass). *)
module Operation_hashes :
DISTRIBUTED_DB with type t := net_db
and type key = Block_hash.t * int
and type value = Operation_hash.t list
and type param := Operation_list_list_hash.t
(** Store on disk all the data associated to a valid block. *)
val commit_block:
net_db ->
Block_hash.t ->
@ -45,108 +201,55 @@ val commit_block:
Updater.validation_result ->
State.Block.t option tzresult Lwt.t
(** Store on disk all the data associated to an invalid block. *)
val commit_invalid_block:
net_db ->
Block_hash.t -> Block_header.t -> Error_monad.error list ->
bool tzresult Lwt.t
val clear_operations: net_db -> Operation_hash.t list list -> unit
(** Monitor all the fetched block headers (for all activate networks). *)
val watch_block_header:
t -> (Block_hash.t * Block_header.t) Lwt_stream.t * Watcher.stopper
val inject_block_header:
net_db -> Block_hash.t -> Block_header.t -> bool tzresult Lwt.t
(** {2 Operations index} *)
(** Index of operations (for the mempool). *)
module Operation : sig
type t = Operation.t (* avoid shadowing. *)
include DISTRIBUTED_DB with type t := net_db
and type key := Operation_hash.t
and type value := Operation.t
and type param := unit
end
(** Inject a new operation in the local index (memory only). *)
val inject_operation:
net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t
val commit_protocol:
db -> Protocol_hash.t -> Protocol.t -> bool tzresult Lwt.t
val inject_protocol:
db -> Protocol_hash.t -> Protocol.t -> bool Lwt.t
val watch_block_header:
t -> (Block_hash.t * Block_header.t) Lwt_stream.t * Watcher.stopper
(** Monitor all the fetched operations (for all activate networks). *)
val watch_operation:
t -> (Operation_hash.t * Operation.t) Lwt_stream.t * Watcher.stopper
val watch_protocol:
t -> (Protocol_hash.t * Protocol.t) Lwt_stream.t * Watcher.stopper
(** {2 Protocol index} *)
module type DISTRIBUTED_DB = sig
type t
type key
type value
type param
val known: t -> key -> bool Lwt.t
type error += Missing_data of key
type error += Canceled of key
type error += Timeout of key
val read: t -> key -> value tzresult Lwt.t
val read_opt: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> unit
val fetch:
t ->
?peer:P2p.Peer_id.t ->
?timeout:float ->
key -> param -> value tzresult Lwt.t
val clear_or_cancel: t -> key -> unit
(** Index of protocol sources. *)
module Protocol : sig
type t = Protocol.t (* avoid shadowing. *)
include DISTRIBUTED_DB with type t := db
and type key := Protocol_hash.t
and type value := Protocol.t
and type param := unit
end
module Block_header :
DISTRIBUTED_DB with type t = net_db
and type key := Block_hash.t
and type value := Block_header.t
and type param := unit
(** Store on disk protocol sources. *)
val commit_protocol:
db -> Protocol_hash.t -> Protocol.t -> bool tzresult Lwt.t
module Operations :
DISTRIBUTED_DB with type t = net_db
and type key = Block_hash.t * int
and type value = Operation.t list
and type param := Operation_list_list_hash.t
val read_all_operations:
net_db -> Block_hash.t -> int -> Operation.t list list tzresult Lwt.t
module Operation_hashes :
DISTRIBUTED_DB with type t = net_db
and type key = Block_hash.t * int
and type value = Operation_hash.t list
and type param := Operation_list_list_hash.t
module Operation :
DISTRIBUTED_DB with type t = net_db
and type key := Operation_hash.t
and type value := Operation.t
and type param := unit
module Protocol :
DISTRIBUTED_DB with type t = db
and type key := Protocol_hash.t
and type value := Protocol.t
and type param := unit
(**/**)
module Raw : sig
val encoding: Message.t P2p.Raw.t Data_encoding.t
val supported_versions: P2p_types.Version.t list
end
module Request : sig
val current_branch: net_db -> ?peer:P2p.Peer_id.t -> unit -> unit
val current_head: net_db -> ?peer:P2p.Peer_id.t -> unit -> unit
end
module Advertise : sig
val current_head:
net_db -> ?peer:P2p.Peer_id.t ->
?mempool:Mempool.t -> State.Block.t -> unit
val current_branch:
net_db -> ?peer:P2p.Peer_id.t ->
State.Block.t -> unit Lwt.t
end

View File

@ -7,6 +7,9 @@
(* *)
(**************************************************************************)
(** Tezos Shell - High-level API for the Gossip network and local
storage (helpers). *)
module type DISTRIBUTED_DB = sig
type t

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
(** Tezos Shell - Network message for the gossip P2P protocol. *)
type t =
| Get_current_branch of Net_id.t

View File

@ -7,5 +7,7 @@
(* *)
(**************************************************************************)
type t = unit
(** Tezos Shell - All the (persistent) metadata associated to a peer. *)
type t = unit (* TODO *)
val cfg : t P2p.meta_config

View File

@ -43,7 +43,7 @@ let inject_protocol state ?force:_ proto =
Lwt.return (hash, validation)
let inject_block validator ?force bytes operations =
Validator.inject_block
Validator.validate_block
validator ?force bytes operations >>=? fun (hash, block) ->
return (hash, (block >>=? fun _ -> return ()))
@ -607,7 +607,7 @@ module RPC = struct
Distributed_db.watch_operation node.distributed_db
let protocol_watcher node =
Distributed_db.watch_protocol node.distributed_db
Distributed_db.Protocol.watch node.distributed_db
let bootstrapped node =
let block_stream, stopper =

View File

@ -93,9 +93,6 @@ let validate_new_head pv hash (header : Block_header.t) =
"fetching operations for new head %a from peer %a"
Block_hash.pp_short hash
P2p.Peer_id.pp_short pv.peer_id >>= fun () ->
Distributed_db.inject_block_header pv.net_db hash header >>=? fun _ ->
(* TODO look for predownloaded (individual)
operations in the prevalidator ?? *)
map_p
(fun i ->
Lwt_utils.protect ~canceler:pv.canceler begin fun () ->

View File

@ -33,7 +33,10 @@ let list_pendings ?maintain_net_db ~from_block ~to_block old_mempool =
let push_block mempool block =
State.Block.all_operation_hashes block >|= fun operations ->
iter_option maintain_net_db
~f:(fun net_db -> Distributed_db.clear_operations net_db operations) ;
~f:(fun net_db ->
List.iter
(List.iter (Distributed_db.Operation.clear_or_cancel net_db))
operations) ;
List.fold_left
(List.fold_left (fun mempool h -> Operation_hash.Map.remove h mempool))
mempool operations

View File

@ -7,6 +7,8 @@
(* *)
(**************************************************************************)
(** Tezos Shell - Prevalidation of pending operations (a.k.a Mempool) *)
(** The prevalidation worker is in charge of the "mempool" (a.k.a. the
set of known not-invalid-for-sure operations that are not yet
included in the blockchain).
@ -37,7 +39,7 @@ val shutdown: t -> unit Lwt.t
val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit
(** Conditionnaly inject a new operation in the node: the operation will
be ignored when it is (strongly) refused This is the
be ignored when it is (strongly) refused. This is the
entry-point used by the P2P layer. The operation content has been
previously stored on disk. *)
val inject_operation: t -> ?force:bool -> Operation.t -> unit tzresult Lwt.t

View File

@ -7,9 +7,7 @@
(* *)
(**************************************************************************)
type t
type global_state = t
(** An abstraction over all the disk storage used by the node.
(** Tezos Shell - Abstraction over all the disk storage.
It encapsulates access to:
@ -18,14 +16,17 @@ type global_state = t
- the blockchain and its alternate heads of a "network";
- the pool of pending operations of a "network". *)
type t
type global_state = t
(** Read the internal state of the node and initialize
the databases. *)
val read:
?patch_context:(Context.t -> Context.t Lwt.t) ->
store_root:string ->
context_root:string ->
unit ->
global_state tzresult Lwt.t
(** Read the internal state of the node and initialize
the databases. *)
val close:
global_state -> unit Lwt.t
@ -38,12 +39,14 @@ type error +=
(** {2 Network} ************************************************************)
(** Data specific to a given network. *)
(** Data specific to a given network (e.g the mainnet or the current
test network). *)
module Net : sig
type t
type net_state = t
(** The chain starts from a genesis block associated to a seed protocol *)
type genesis = {
time: Time.t ;
block: Block_hash.t ;
@ -51,40 +54,40 @@ module Net : sig
}
val genesis_encoding: genesis Data_encoding.t
(** Initialize a network for a given [genesis]. By default,
the network does accept forking test network. When
[~allow_forked_network:true] is provided, test network are allowed. *)
val create:
global_state ->
?allow_forked_network:bool ->
genesis -> net_state Lwt.t
(** Initialize a network for a given [genesis]. By default,
the network does accept forking test network. When
[~allow_forked_network:true] is provided, test network are allowed. *)
val get: global_state -> Net_id.t -> net_state tzresult Lwt.t
(** Look up for a network by the hash of its genesis block. *)
val get: global_state -> Net_id.t -> net_state tzresult Lwt.t
val all: global_state -> net_state list Lwt.t
(** Returns all the known networks. *)
val all: global_state -> net_state list Lwt.t
val destroy: global_state -> net_state -> unit Lwt.t
(** Destroy a network: this completly removes from the local storage all
the data associated to the network (this includes blocks and
operations). *)
val destroy: global_state -> net_state -> unit Lwt.t
(** Various accessors. *)
val id: net_state -> Net_id.t
val genesis: net_state -> genesis
val global_state: net_state -> global_state
(** Hash of the faked block header of the genesis block. *)
val faked_genesis_hash: net_state -> Block_hash.t
(** Return the expiration timestamp of a test netwowk. *)
val expiration: net_state -> Time.t option
val allow_forked_network: net_state -> bool
(** Accessors. Respectively access to;
- the network id (the hash of its genesis block)
- its optional expiration time
- the associated global state. *)
val global_state: net_state -> global_state
end
(** {2 Block database} ********************************************************)
(** {2 Block database} *****************************************************)
module Block : sig

View File

@ -56,7 +56,7 @@ let get v net_id =
try get_exn v net_id >>= fun nv -> return nv
with Not_found -> fail (Inactive_network net_id)
let inject_block v ?force bytes operations =
let validate_block v ?force bytes operations =
let hash = Block_hash.hash_bytes [bytes] in
match Block_header.of_bytes bytes with
| None -> failwith "Cannot parse block header."

View File

@ -7,11 +7,14 @@
(* *)
(**************************************************************************)
(** Tezos Shell - Main entry point of the validation scheduler. *)
type t
val create: State.t -> Distributed_db.t -> Net_validator.timeout -> t
val shutdown: t -> unit Lwt.t
(** Start the validation scheduler of a given network. *)
val activate:
t ->
?bootstrap_threshold:int ->
@ -23,10 +26,12 @@ type error +=
val get: t -> Net_id.t -> Net_validator.t tzresult Lwt.t
val get_exn: t -> Net_id.t -> Net_validator.t Lwt.t
val inject_block:
(** Force the validation of a block. *)
val validate_block:
t ->
?force:bool ->
MBytes.t -> Operation.t list list ->
(Block_hash.t * State.Block.t tzresult Lwt.t) tzresult Lwt.t
(** Monitor all the valid block (for all activate networks). *)
val watcher: t -> State.Block.t Lwt_stream.t * Watcher.stopper