From 488373551b42cd05b05959ca1568127d15b95e3e Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Fri, 21 Oct 2016 14:01:20 +0200 Subject: [PATCH] add protocol store + rpcs --- src/client/client_node_rpcs.ml | 2 + src/client/client_node_rpcs.mli | 2 + src/node/db/context.ml | 2 +- src/node/db/db_proxy.ml | 41 +++++++++ src/node/db/db_proxy.mli | 12 +++ src/node/db/persist.ml | 14 +++ src/node/db/persist.mli | 9 ++ src/node/db/store.ml | 123 +++++++++++++++++++++++++++ src/node/db/store.mli | 34 ++++++++ src/node/shell/messages.ml | 16 ++-- src/node/shell/messages.mli | 5 +- src/node/shell/node.ml | 59 +++++++++---- src/node/shell/node.mli | 9 ++ src/node/shell/node_rpc.ml | 47 ++++++++++ src/node/shell/node_rpc_services.ml | 103 ++++++++++++++++++++++ src/node/shell/node_rpc_services.mli | 17 ++++ src/node/shell/state.ml | 82 +++++++++++++++++- src/node/shell/state.mli | 73 ++++++++++++++++ src/node/updater/updater.ml | 2 +- src/proto/environment/persist.mli | 6 ++ 20 files changed, 629 insertions(+), 29 deletions(-) diff --git a/src/client/client_node_rpcs.ml b/src/client/client_node_rpcs.ml index a0b7a567e..8197325af 100644 --- a/src/client/client_node_rpcs.ml +++ b/src/client/client_node_rpcs.ml @@ -148,6 +148,8 @@ let inject_block ?(wait = true) ?force block = call_service0 Services.inject_block (block, wait, force) let inject_operation ?(wait = true) ?force operation = call_service0 Services.inject_operation (operation, wait, force) +let inject_protocol ?(wait = true) ?force protocol = + call_service0 Services.inject_protocol (protocol, wait, force) let describe ?recurse path = let prefix, arg = RPC.forge_request Services.describe () recurse in get_json (prefix @ path) arg >>= diff --git a/src/client/client_node_rpcs.mli b/src/client/client_node_rpcs.mli index 2f9ec342b..3fa2349cc 100644 --- a/src/client/client_node_rpcs.mli +++ b/src/client/client_node_rpcs.mli @@ -25,6 +25,8 @@ val inject_block: Block_hash.t tzresult Lwt.t val inject_operation: ?wait:bool -> ?force:bool -> MBytes.t -> Operation_hash.t tzresult Lwt.t +val inject_protocol: + ?wait:bool -> ?force:bool -> Store.protocol -> Protocol_hash.t tzresult Lwt.t module Blocks : sig diff --git a/src/node/db/context.ml b/src/node/db/context.ml index a18795f2c..6f16c2b2d 100644 --- a/src/node/db/context.ml +++ b/src/node/db/context.ml @@ -216,7 +216,7 @@ let remove_rec (module View : VIEW) key = GitStore.FunView.remove_rec View.v (data_key key) >>= fun v -> Lwt.return (pack (module GitStore) View.s v) - +let keys (module View : VIEW) = Store.undefined_key_fn (*-- Initialisation ----------------------------------------------------------*) diff --git a/src/node/db/db_proxy.ml b/src/node/db/db_proxy.ml index 2777256e6..6e4f89bec 100644 --- a/src/node/db/db_proxy.ml +++ b/src/node/db/db_proxy.ml @@ -23,6 +23,8 @@ module type DISTRIBUTED_DB = sig val update: t -> key -> value -> bool Lwt.t val remove: t -> key -> bool Lwt.t val shutdown: t -> unit Lwt.t + + val keys: t -> key list Lwt.t end type operation_state = { @@ -106,3 +108,42 @@ module Block = Persist.MakeImperativeProxy (Store.Faked_functional_block) (Block_hash_table) (Block_scheduler) + +type protocol_state = { + request_protocols: Protocol_hash.t list -> unit ; +} + +module Protocol_scheduler = struct + let name = "protocol_scheduler" + type rdata = Store.net_id + type data = float ref + type state = protocol_state + let init_request _ _ = Lwt.return (ref 0.0) + let request net ~get:_ ~set:_ pendings = + let current_time = Unix.gettimeofday () in + let time = current_time -. (3. +. Random.float 8.) in + let protocols = + List.fold_left + (fun acc (hash, last_request, Store.Net net_id) -> + if !last_request < time then begin + last_request := current_time ; + let prev = + try Block_hash_map.find net_id acc + with Not_found -> [] in + Block_hash_map.add net_id (hash :: prev) acc + end else + acc) + Block_hash_map.empty + pendings in + if Block_hash_map.is_empty protocols then + 0. + else begin + Block_hash_map.iter (fun _net_id -> net.request_protocols) protocols ; + 1. +. Random.float 4. + end +end + +module Protocol = + Persist.MakeImperativeProxy + (Store.Faked_functional_protocol) + (Protocol_hash_table) (Protocol_scheduler) diff --git a/src/node/db/db_proxy.mli b/src/node/db/db_proxy.mli index 9370a1754..b69483202 100644 --- a/src/node/db/db_proxy.mli +++ b/src/node/db/db_proxy.mli @@ -23,6 +23,8 @@ module type DISTRIBUTED_DB = sig val update: t -> key -> value -> bool Lwt.t val remove: t -> key -> bool Lwt.t val shutdown: t -> unit Lwt.t + + val keys: t -> key list Lwt.t end type operation_state = { @@ -44,3 +46,13 @@ module Block : and type key := Store.Block.key and type value := Store.Block.value and type state := block_state + + +type protocol_state = { + request_protocols: Protocol_hash.t list -> unit ; +} +module Protocol : + DISTRIBUTED_DB with type store := Store.Protocol.t + and type key := Store.Protocol.key + and type value := Store.Protocol.value + and type state := protocol_state diff --git a/src/node/db/persist.ml b/src/node/db/persist.ml index 43811d6ab..b0ae55440 100644 --- a/src/node/db/persist.ml +++ b/src/node/db/persist.ml @@ -24,6 +24,8 @@ module type STORE = sig val del: t -> key -> t Lwt.t val list: t -> key list -> key list Lwt.t val remove_rec: t -> key -> t Lwt.t + + val keys : t -> key list Lwt.t end module type BYTES_STORE = sig @@ -35,6 +37,8 @@ module type BYTES_STORE = sig val del: t -> key -> t Lwt.t val list: t -> key list -> key list Lwt.t val remove_rec: t -> key -> t Lwt.t + + val keys : t -> key list Lwt.t end module type TYPED_STORE = sig @@ -45,6 +49,8 @@ module type TYPED_STORE = sig val get: t -> key -> value option Lwt.t val set: t -> key -> value -> t Lwt.t val del: t -> key -> t Lwt.t + + val keys: t -> key list Lwt.t end module type KEY = sig @@ -146,6 +152,7 @@ module MakeBytesStore let remove_rec s k = S.remove_rec s (to_path k) + let keys s = S.keys s >|= List.map of_path end module MakeTypedStore @@ -167,6 +174,7 @@ module MakeTypedStore let raw_get = S.get + let keys = S.keys end module RawKey = struct @@ -369,6 +377,8 @@ module type IMPERATIVE_PROXY = sig val fetch: t -> rdata -> Store.key -> Store.value Lwt.t val pending: t -> Store.key -> bool val shutdown: t -> unit Lwt.t + + val keys: t -> Store.key list Lwt.t end module type IMPERATIVE_PROXY_SCHEDULER = sig @@ -457,6 +467,8 @@ module MakeImperativeProxy let known { store } hash = use store (fun store -> Store.mem store hash) + let keys { store } = use store Store.keys + let read { store } hash = use store (fun store -> Store.get store hash) @@ -528,6 +540,8 @@ module MakeImperativeProxy let shutdown { cancel ; worker } = cancel () >>= fun () -> worker + let keys { store } = + use store (fun store -> Store.keys store) end (*-- Predefined Instances ----------------------------------------------------*) diff --git a/src/node/db/persist.mli b/src/node/db/persist.mli index 69380e0a6..ede8de0e4 100644 --- a/src/node/db/persist.mli +++ b/src/node/db/persist.mli @@ -27,6 +27,8 @@ module type STORE = sig val del: t -> key -> t Lwt.t val list: t -> key list -> key list Lwt.t val remove_rec: t -> key -> t Lwt.t + + val keys : t -> key list Lwt.t end (** Projection of OCaml keys of some abstract type to concrete storage @@ -55,6 +57,8 @@ module type BYTES_STORE = sig val del: t -> key -> t Lwt.t val list: t -> key list -> key list Lwt.t val remove_rec: t -> key -> t Lwt.t + + val keys : t -> key list Lwt.t end module MakeBytesStore (S : STORE) (K : KEY) : @@ -82,6 +86,8 @@ module type TYPED_STORE = sig val get: t -> key -> value option Lwt.t val set: t -> key -> value -> t Lwt.t val del: t -> key -> t Lwt.t + + val keys: t -> key list Lwt.t (** Not always relevant, BEWARE! *) end (** Gives a typed view of a store (values of a given type stored under @@ -91,6 +97,7 @@ end module MakeTypedStore (S : STORE) (K : KEY) (C : VALUE) : TYPED_STORE with type t = S.t and type key = K.t and type value = C.t + (** {2 Persistent Sets} ******************************************************) (** Signature of a set as returned by {!MakePersistentSet} *) @@ -194,6 +201,8 @@ module type IMPERATIVE_PROXY = sig val fetch: t -> rdata -> Store.key -> Store.value Lwt.t val pending: t -> Store.key -> bool val shutdown: t -> unit Lwt.t + + val keys: t -> Store.key list Lwt.t end module type IMPERATIVE_PROXY_SCHEDULER = sig diff --git a/src/node/db/store.ml b/src/node/db/store.ml index 4ffc3abd1..2447b001a 100644 --- a/src/node/db/store.ml +++ b/src/node/db/store.ml @@ -92,11 +92,13 @@ type generic_store = FS.t type block_store = FS.t type blockchain_store = FS.t type operation_store = FS.t +type protocol_store = FS.t type store = { block: block_store Persist.shared_ref ; blockchain: blockchain_store Persist.shared_ref ; operation: operation_store Persist.shared_ref ; + protocol: protocol_store Persist.shared_ref ; global_store: generic_store Persist.shared_ref ; net_init: ?expiration:Time.t -> genesis -> net_store Lwt.t ; net_read: net_id -> net_store tzresult Lwt.t ; @@ -126,6 +128,8 @@ module type TYPED_IMPERATIVE_STORE = sig val get_exn: t -> key -> value Lwt.t val set: t -> key -> value -> unit Lwt.t val del: t -> key -> unit Lwt.t + + val keys: t -> key list Lwt.t end module type IMPERATIVE_STORE = sig @@ -146,6 +150,14 @@ module type KEY = sig val to_path: t -> string list end +module type HASHKEY = sig + type t + val to_path: t -> string list + val of_path: string list -> t + val prefix : string list + val length : int +end + module Raw_key = struct type t = string list let to_path x = x @@ -187,6 +199,7 @@ module Errors_value = struct let of_bytes b = Data_encoding.(Binary.of_bytes (list (error_encoding ()))) b end +let undefined_key_fn = Lwt.fail_invalid_arg "function keys cannot be implemented in this module" module Make (K : KEY) (V : Persist.VALUE) = struct type t = FS.t @@ -205,6 +218,8 @@ module Make (K : KEY) (V : Persist.VALUE) = struct let del t k = FS.del t (K.to_path k) let list t ks = FS.list t (List.map K.to_path ks) let remove_rec t k = FS.remove_rec t (K.to_path k) + + let keys _t = undefined_key_fn end module Data_store : IMPERATIVE_STORE with type t = FS.t = @@ -212,6 +227,7 @@ module Data_store : IMPERATIVE_STORE with type t = FS.t = include Data_store + (*-- Typed block store under "blocks/" ---------------------------------------*) type shell_block = { @@ -350,6 +366,7 @@ module Block = struct let raw_get t k = Raw_block.get t k + let keys _t = undefined_key_fn (** We never list keys here *) end module Blockchain_succ_key = struct @@ -484,9 +501,111 @@ module Operation = struct let to_bytes = Raw_operation_value.to_bytes let hash op = Operation_hash.hash_bytes [to_bytes op] let raw_get t k = Raw_operation_data.get t k + + let keys _t = undefined_key_fn (** We never list keys here *) end +(*-- Typed operation store under "protocols/" -------------------------------*) + +type component = { + name : string ; + interface : string option ; + implementation : string ; +} + +let component_encoding = + let open Data_encoding in + conv + (fun { name ; interface; implementation } -> (name, interface, implementation)) + (fun (name, interface, implementation) -> { name ; interface ; implementation }) + (obj3 + (req "name" string) + (opt "interface" string) + (req "implementation" string)) + +type protocol = component list +let protocol_encoding = Data_encoding.list component_encoding + +module Raw_protocol_value = struct + type t = protocol + let to_bytes v = Data_encoding.Binary.to_bytes protocol_encoding v + let of_bytes b = Data_encoding.Binary.of_bytes protocol_encoding b +end + +module Raw_protocol_key = struct + type t = Protocol_hash.t + let to_path p = "protocols" :: Protocol_hash.to_path p @ [ "contents" ] +end + +module Protocol_data = Make (Raw_protocol_key) (Raw_protocol_value) +module Raw_protocol_data = Make (Raw_protocol_key) (Raw_value) + +module Protocol_time_key = struct + type t = Protocol_hash.t + let to_path p = "protocols" :: Protocol_hash.to_path p @ [ "discovery_time" ] +end +module Protocol_time = Make (Protocol_time_key) (Time_value) + +module Protocol_errors_key = struct + type t = Protocol_hash.t + let to_path p = "protocols" :: Protocol_hash.to_path p @ [ "errors" ] +end +module Protocol_errors = Make (Protocol_errors_key) (Errors_value) + +module Protocol = struct + type t = FS.t + type key = Protocol_hash.t + type value = protocol tzresult Time.timed_data + let mem = Protocol_data.mem + let get s k = + Protocol_time.get s k >>= function + | None -> Lwt.return_none + | Some time -> + Protocol_errors.get s k >>= function + | Some exns -> Lwt.return (Some { Time.data = Error exns ; time }) + | None -> + Protocol_data.get s k >>= function + | None -> Lwt.return_none + | Some bytes -> Lwt.return (Some { Time.data = Ok bytes ; time }) + let get_exn s k = + get s k >>= function + | None -> Lwt.fail Not_found + | Some x -> Lwt.return x + let set s k { Time.data ; time } = + Protocol_time.set s k time >>= fun () -> + match data with + | Ok bytes -> + Protocol_data.set s k bytes >>= fun () -> + Protocol_errors.del s k + | Error exns -> + Protocol_errors.set s k exns >>= fun () -> + Protocol_data.del s k + let del s k = + Protocol_time.del s k >>= fun () -> + Protocol_data.del s k >>= fun () -> + Protocol_errors.del s k + let of_bytes = Raw_protocol_value.of_bytes + let to_bytes = Raw_protocol_value.to_bytes + let hash proto = Protocol_hash.hash_bytes [to_bytes proto] + let compare p1 p2 = + Protocol_hash.(compare (hash_bytes [to_bytes p1]) (hash_bytes [to_bytes p2])) + let equal b1 b2 = compare b1 b2 = 0 + let raw_get t k = Raw_protocol_data.get t k + + let fold s x ~f = + let rec dig i root acc = + if i <= 0 then + f (Protocol_hash.of_path @@ List.tl root) acc + else + FS.list s [root] >>= fun roots -> + Lwt_list.fold_right_s (dig (i - 1)) roots acc + in + dig Protocol_hash.path_len ["protocols"] x + + let keys s = fold s [] ~f:(fun k a -> Lwt.return @@ k :: a) +end + (*- Genesis and initialization -----------------------------------------------*) let genesis_encoding = @@ -620,6 +739,7 @@ let init root = { block = Persist.share t ; blockchain = Persist.share t ; operation = Persist.share t ; + protocol = Persist.share t ; global_store = Persist.share t ; net_init = net_init ~root ; net_read = net_read ~root ; @@ -638,6 +758,7 @@ end module Faked_functional_operation = Faked_functional_typed_store (Operation) module Faked_functional_block = Faked_functional_typed_store (Block) +module Faked_functional_protocol = Faked_functional_typed_store (Protocol) module Faked_functional_store : Persist.STORE with type t = t = struct @@ -645,4 +766,6 @@ module Faked_functional_store : Persist.STORE with type t = t let set s k v = Data_store.set s k v >>= fun () -> Lwt.return s let del s k = Data_store.del s k >>= fun () -> Lwt.return s let remove_rec s k = Data_store.remove_rec s k >>= fun () -> Lwt.return s + + let keys _s = invalid_arg "function keys not implementable here" (** We never use list here. *) end diff --git a/src/node/db/store.mli b/src/node/db/store.mli index a91909889..8bbc1ebf7 100644 --- a/src/node/db/store.mli +++ b/src/node/db/store.mli @@ -21,6 +21,7 @@ module type TYPED_IMPERATIVE_STORE = sig val get_exn: t -> key -> value Lwt.t val set: t -> key -> value -> unit Lwt.t val del: t -> key -> unit Lwt.t + val keys: t -> key list Lwt.t end module type IMPERATIVE_STORE = sig @@ -39,11 +40,13 @@ type generic_store type block_store type blockchain_store type operation_store +type protocol_store type store = private { block: block_store Persist.shared_ref ; blockchain: blockchain_store Persist.shared_ref ; operation: operation_store Persist.shared_ref ; + protocol: protocol_store Persist.shared_ref ; global_store: generic_store Persist.shared_ref ; net_init: ?expiration:Time.t -> genesis -> net_store Lwt.t ; net_read: net_id -> net_store tzresult Lwt.t ; @@ -70,6 +73,9 @@ val pp_net_id: Format.formatter -> net_id -> unit (** Open or initialize a store at a given path. *) val init: string -> store Lwt.t +(** Lwt exn returned when function keys is not implemented *) +val undefined_key_fn : 'a Lwt.t + (** {2 Generic interface} ****************************************************) (** The generic primitives do work on the direct root, but in a @@ -107,6 +113,16 @@ type block = { val shell_block_encoding: shell_block Data_encoding.t val block_encoding: block Data_encoding.t +(** Protocol *) +type component = { + name : string ; + interface : string option ; + implementation : string ; +} + +type protocol = component list +val protocol_encoding : protocol Data_encoding.t + (** {2 Block and operations store} ********************************************) module Block : sig @@ -177,6 +193,19 @@ module Operation : sig end +module Protocol : sig + val of_bytes: MBytes.t -> protocol option + val to_bytes: protocol -> MBytes.t + val hash: protocol -> Protocol_hash.t + + include TYPED_IMPERATIVE_STORE + with type t = protocol_store + and type key = Protocol_hash.t + and type value = protocol tzresult Time.timed_data + + val raw_get: t -> Protocol_hash.t -> MBytes.t option Lwt.t +end + (**/**) (* For testing only *) (* module LwtUnixStore : sig *) @@ -198,4 +227,9 @@ module Faked_functional_block : and type value = Block.value and type key = Block.key +module Faked_functional_protocol : + Persist.TYPED_STORE with type t = Protocol.t + and type value = Protocol.value + and type key = Protocol.key + module Faked_functional_store : Persist.STORE with type t = t diff --git a/src/node/shell/messages.ml b/src/node/shell/messages.ml index eba85a1a0..22e32f68c 100644 --- a/src/node/shell/messages.ml +++ b/src/node/shell/messages.ml @@ -25,8 +25,8 @@ type message = | Get_operations of Operation_hash.t list | Operation of MBytes.t - | Current_protocol of net_id - | Protocol_inventory of Protocol_hash.t + | Get_protocols of Protocol_hash.t list + | Protocol of MBytes.t let to_frame msg = @@ -54,10 +54,10 @@ let to_frame msg = | Operation b -> [ S 2703 ; B b ] - | Current_protocol (Net net_id) -> - [ S 2800 ; bh net_id ] - | Protocol_inventory p -> - [ S 2801 ; ph p ] + | Get_protocols protos -> + [ S 2800 ; F (List.map ph protos) ] + | Protocol p -> + [ S 2801 ; B p ] let from_frame msg = @@ -82,9 +82,9 @@ let from_frame msg = Some (Get_operations (List.map oph ops)) | [ S 2703 ; B contents ] -> Some (Operation contents) - | [ S 2800 ; B netid ] -> Some (Current_protocol (net netid)) + | [ S 2800 ; F protos ] -> Some (Get_protocols (List.map ph protos)) - | [ S 2801 ; p ] -> Some (Protocol_inventory (ph p)) + | [ S 2801 ; B contents ] -> Some (Protocol contents) | _ -> None diff --git a/src/node/shell/messages.mli b/src/node/shell/messages.mli index 0f8c1601a..2c3df42cc 100644 --- a/src/node/shell/messages.mli +++ b/src/node/shell/messages.mli @@ -22,8 +22,9 @@ type message = | Get_operations of Operation_hash.t list | Operation of MBytes.t - | Current_protocol of Store.net_id - | Protocol_inventory of Protocol_hash.t + | Get_protocols of Protocol_hash.t list + | Protocol of MBytes.t + (** Converts a high level message to a network frame *) val to_frame: message -> Netbits.frame diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index f14d5e1b7..67291fac5 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -24,6 +24,13 @@ let inject_operation validator ?force bytes = let hash = Operation_hash.hash_bytes [bytes] in Lwt.return (hash, t) +let inject_protocol state ?force:_ proto = + (* TODO: Validate the protocol *) + let proto_bytes = Store.Protocol.to_bytes proto in + let hash = Protocol_hash.hash_bytes [proto_bytes] in + let t = State.Protocol.store state proto_bytes >>|? ignore in + Lwt.return (hash, t) + let process_operation state validator bytes = State.Operation.store state bytes >>= function | Error _ | Ok None -> Lwt.return_unit @@ -38,6 +45,13 @@ let process_operation state validator bytes = Prevalidator.register_operation prevalidator hash ; Lwt.return_unit +let process_protocol state _validator bytes = + State.Protocol.store state bytes >>= function + | Error _ | Ok None -> Lwt.return_unit + | Ok (Some (hash, _proto)) -> + (* TODO: Store only pending protocols... *) + lwt_log_info "process Protocol %a" Protocol_hash.pp_short hash + let process_block state validator bytes = State.Block.store state bytes >>= function | Error _ | Ok None -> Lwt.return_unit @@ -144,23 +158,20 @@ let process state validator msg = process_operation state validator content >>= fun () -> Lwt.return_nil - | Current_protocol net_id -> - lwt_log_info "process Current_protocol" >>= fun () -> - if not (State.Net.is_active state net_id) then - Lwt.return_nil - else begin - match State.Net.get state net_id with - | Error _ -> Lwt.return_nil - | Ok net -> - State.Net.Blockchain.head net >>= fun head -> - Lwt.return [Protocol_inventory head.protocol_hash] - end + | Get_protocols protos -> + lwt_log_info "process Get_protocols" >>= fun () -> + Lwt_list.map_p (State.Protocol.raw_read state) protos >>= fun protos -> + let cons_protocol acc = function + | Some proto -> Protocol proto :: acc + | None -> acc in + Lwt.return (List.fold_left cons_protocol [] protos) - | Protocol_inventory _ -> - lwt_log_info "process Protocol_inventory" >>= fun () -> - (* TODO... *) + | Protocol content -> + lwt_log_info "process Protocol" >>= fun () -> + process_protocol state validator content >>= fun () -> Lwt.return_nil + type t = { state: State.t ; validator: Validator.worker ; @@ -170,6 +181,8 @@ type t = { ?force:bool -> MBytes.t -> (Block_hash.t * unit tzresult Lwt.t) Lwt.t ; inject_operation: ?force:bool -> MBytes.t -> (Operation_hash.t * unit tzresult Lwt.t) Lwt.t ; + inject_protocol: + ?force:bool -> Store.protocol -> (Protocol_hash.t * unit tzresult Lwt.t) Lwt.t ; shutdown: unit -> unit Lwt.t ; } @@ -184,6 +197,11 @@ let request_blocks net _net_id blocks = For now simply broadcast the request to all our neighbours. *) P2p.broadcast (Messages.(to_frame (Get_blocks blocks))) net +let request_protocols net protocols = + (* TODO improve the lookup strategy. + For now simply broadcast the request to all our neighbours. *) + P2p.broadcast (Messages.(to_frame (Get_protocols protocols))) net + let init_p2p net_params = match net_params with | None -> @@ -200,8 +218,9 @@ let create lwt_log_info "reading state..." >>= fun () -> let request_operations = request_operations p2p in let request_blocks = request_blocks p2p in + let request_protocols = request_protocols p2p in State.read - ~request_operations ~request_blocks + ~request_operations ~request_blocks ~request_protocols ~store_root ~context_root ~ttl:(48 * 3600) (* 2 days *) ?patch_context () >>= fun state -> let validator = Validator.create_worker p2p state in @@ -264,6 +283,7 @@ let create global_validator ; inject_block = inject_block state validator ; inject_operation = inject_operation validator ; + inject_protocol = inject_protocol state ; shutdown ; } @@ -310,6 +330,7 @@ module RPC = struct let inject_block node = node.inject_block let inject_operation node = node.inject_operation + let inject_protocol node = node.inject_protocol let raw_block_info node hash = State.Valid_block.read_exn node.state hash >|= convert @@ -449,6 +470,11 @@ module RPC = struct State.Net.Mempool.for_block net b >|= fun ops -> Updater.empty_result, ops + let protocols { state } = State.Protocol.keys state + + let protocol_content node hash = + State.Protocol.read node.state hash + let preapply node block ~timestamp ~sort ops = begin match block with @@ -539,6 +565,9 @@ module RPC = struct let operation_watcher node = State.Operation.create_watcher node.state + let protocol_watcher node = + State.Protocol.create_watcher node.state + let validate node net_id block = Validator.get node.validator net_id >>=? fun net_v -> Validator.fetch_block net_v block >>=? fun _ -> diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index f1ffe52c6..d5368fa7c 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -29,6 +29,8 @@ module RPC : sig t -> ?force:bool -> MBytes.t -> (Block_hash.t * unit tzresult Lwt.t) Lwt.t val inject_operation: t -> ?force:bool -> MBytes.t -> (Operation_hash.t * unit tzresult Lwt.t) Lwt.t + val inject_protocol: + t -> ?force:bool -> Store.protocol -> (Protocol_hash.t * unit tzresult Lwt.t) Lwt.t val raw_block_info: t -> Block_hash.t -> block_info Lwt.t @@ -54,6 +56,13 @@ module RPC : sig val pending_operations: t -> block -> (error Updater.preapply_result * Operation_hash_set.t) Lwt.t + val protocols: + t -> Protocol_hash.t list Lwt.t + val protocol_content: + t -> Protocol_hash.t -> Store.protocol tzresult Time.timed_data option Lwt.t + val protocol_watcher: + t -> (Protocol_hash.t * Store.protocol) Lwt_stream.t * (unit -> unit) + val context_dir: t -> block -> 'a RPC.directory option Lwt.t diff --git a/src/node/shell/node_rpc.ml b/src/node/shell/node_rpc.ml index 9a58865f0..a938ec44c 100644 --- a/src/node/shell/node_rpc.ml +++ b/src/node/shell/node_rpc.ml @@ -332,6 +332,42 @@ let get_operations node hash () = | Some bytes -> RPC.Answer.return bytes | None -> raise Not_found +let list_protocols node {Services.Protocols.monitor; contents} = + let monitor = match monitor with None -> false | Some x -> x in + let include_contents = match contents with None -> false | Some x -> x in + Node.RPC.protocols node >>= fun protocols -> + Lwt_list.map_p + (fun hash -> + if include_contents then + Node.RPC.protocol_content node hash >>= function + | None | Some { Time.data = Error _ } -> Lwt.return (hash, None) + | Some { Time.data = Ok bytes }-> + Lwt.return (hash, Some bytes) + else + Lwt.return (hash, None)) + protocols >>= fun protocols -> + if not monitor then + RPC.Answer.return protocols + else + let stream, shutdown = Node.RPC.protocol_watcher node in + let first_request = ref true in + let next () = + if not !first_request then + Lwt_stream.get stream >>= function + | None -> Lwt.return_none + | Some (h, op) when include_contents -> Lwt.return (Some [h, Some op]) + | Some (h, _) -> Lwt.return (Some [h, None]) + else begin + first_request := false ; + Lwt.return (Some protocols) + end in + RPC.Answer.return_stream { next ; shutdown } + +let get_protocols node hash () = + Node.RPC.protocol_content node hash >>= function + | Some bytes -> RPC.Answer.return bytes + | None -> raise Not_found + let build_rpc_directory node = let dir = RPC.empty in let dir = RPC.register0 dir Services.Blocks.list (list_blocks node) in @@ -351,6 +387,10 @@ let build_rpc_directory node = RPC.register0 dir Services.Operations.list (list_operations node) in let dir = RPC.register1 dir Services.Operations.bytes (get_operations node) in + let dir = + RPC.register0 dir Services.Protocols.list (list_protocols node) in + let dir = + RPC.register1 dir Services.Protocols.bytes (get_protocols node) in let dir = let implementation (net_id, pred, time, fitness, operations, header) = Node.RPC.block_info node (`Head 0) >>= fun bi -> @@ -383,6 +423,13 @@ let build_rpc_directory node = (if blocking then wait else return ()) >>=? fun () -> return hash end >>= RPC.Answer.return in RPC.register0 dir Services.inject_operation implementation in + let dir = + let implementation (proto, blocking, force) = + Node.RPC.inject_protocol ?force node proto >>= fun (hash, wait) -> + begin + (if blocking then wait else return ()) >>=? fun () -> return hash + end >>= RPC.Answer.return in + RPC.register0 dir Services.inject_protocol implementation in let dir = let implementation () = RPC.Answer.return Data_encoding.Json.(schema (Error_monad.error_encoding ())) in diff --git a/src/node/shell/node_rpc_services.ml b/src/node/shell/node_rpc_services.ml index 75d8bea6e..4b85cd97e 100644 --- a/src/node/shell/node_rpc_services.ml +++ b/src/node/shell/node_rpc_services.ml @@ -383,6 +383,56 @@ module Operations = struct end +module Protocols = struct + let protocols_arg = + let name = "protocol_id" in + let descr = + "A protocol identifier in hexadecimal." in + let construct = Protocol_hash.to_b48check in + let destruct h = + try Ok (Protocol_hash.of_b48check h) + with _ -> Error "Can't parse hash" in + RPC.Arg.make ~name ~descr ~construct ~destruct () + + let bytes = + RPC.service + ~input: empty + ~output: + (obj1 (req "data" + (describe ~title: "Tezos protocol" + (Time.timed_encoding @@ + RPC.Error.wrap @@ + Store.protocol_encoding)))) + RPC.Path.(root / "protocols" /: protocols_arg) + + type list_param = { + contents: bool option ; + monitor: bool option ; + } + + let list_param_encoding = + conv + (fun {contents; monitor} -> (contents, monitor)) + (fun (contents, monitor) -> {contents; monitor}) + (obj2 + (opt "contents" bool) + (opt "monitor" bool)) + + let list = + RPC.service + ~input: list_param_encoding + ~output: + (obj1 + (req "protocols" + (list + (obj2 + (req "hash" Protocol_hash.encoding) + (opt "contents" + (dynamic_size Store.protocol_encoding))) + ))) + RPC.Path.(root / "protocols") +end + let forge_block = RPC.service ~description: "Forge a block header" @@ -480,6 +530,59 @@ let inject_operation = (obj1 (req "injectedOperation" Operation_hash.encoding))) RPC.Path.(root / "inject_operation") +let inject_protocol = + let proto = + (list + (obj3 + (req "name" + (describe ~title:"OCaml module name" + string)) + (opt "interface" + (describe + ~description:"Content of the .mli file" + string)) + (req "implementation" + (describe + ~description:"Content of the .ml file" + string)))) + in + let proto_of_rpc = + List.map (fun (name, interface, implementation) -> + { Store.name; interface; implementation }) + in + let rpc_of_proto = + List.map (fun { Store.name; interface; implementation } -> + (name, interface, implementation)) + in + RPC.service + ~description: + "Inject a protocol in node. Returns the ID of the protocol." + ~input: + (conv + (fun (proto, blocking, force) -> (rpc_of_proto proto, Some blocking, force)) + (fun (proto, blocking, force) -> (proto_of_rpc proto, unopt true blocking, force)) + (obj3 + (req "protocol" + (describe ~title: "Tezos protocol" + proto)) + (opt "blocking" + (describe + ~description: + "Should the RPC wait for the protocol to be \ + validated before to answer. (default: true)" + bool)) + (opt "force" + (describe + ~description: + "Should we inject protocol that is invalid. (default: false)" + bool)))) + ~output: + (RPC.Error.wrap @@ + describe + ~title: "Hash of the injected protocol" @@ + (obj1 (req "injectedProtocol" Protocol_hash.encoding))) + RPC.Path.(root / "inject_protocol") + let describe = RPC.Description.service ~description: "RPCs documentation and input/output schema" diff --git a/src/node/shell/node_rpc_services.mli b/src/node/shell/node_rpc_services.mli index 6aa778939..055de2ec6 100644 --- a/src/node/shell/node_rpc_services.mli +++ b/src/node/shell/node_rpc_services.mli @@ -97,6 +97,19 @@ module Operations : sig list_param, (Operation_hash.t * Store.operation option) list) RPC.service end +module Protocols : sig + val bytes: + (unit, unit * Protocol_hash.t, unit, + Store.protocol tzresult Time.timed_data) RPC.service + type list_param = { + contents: bool option ; + monitor: bool option ; + } + val list: + (unit, unit, + list_param, (Protocol_hash.t * Store.protocol option) list) RPC.service +end + val forge_block: (unit, unit, Updater.net_id option * Block_hash.t option * Time.t option * @@ -115,5 +128,9 @@ val inject_operation: (unit, unit, (MBytes.t * bool * bool option), Operation_hash.t tzresult) RPC.service +val inject_protocol: + (unit, unit, + (Store.protocol * bool * bool option), Protocol_hash.t tzresult) RPC.service + val describe: (unit, unit, bool option, RPC.Description.directory_descr) RPC.service diff --git a/src/node/shell/state.ml b/src/node/shell/state.ml index b4433eb6e..3fb1853dd 100644 --- a/src/node/shell/state.ml +++ b/src/node/shell/state.ml @@ -82,6 +82,9 @@ type t = { operation_db: Db_proxy.Operation.t ; operation_watchers: (Operation_hash.t * Store.operation) Watcher.t list ref ; + protocol_db: Db_proxy.Protocol.t ; + protocol_watchers: + (Protocol_hash.t * Store.protocol) Watcher.t list ref ; valid_block_state: valid_block_state Persist.shared_ref ; } @@ -154,6 +157,15 @@ module InvalidOperations = Persist.MakeBufferedPersistentSet (Store.Faked_functional_store) (InvalidOperations_key) (Operation_hash_set) +module InvalidProtocols_key = struct + include Protocol_hash + let prefix = ["state"; "invalid_protocols"] + let length = path_len +end +module InvalidProtocols = + Persist.MakeBufferedPersistentSet + (Store.Faked_functional_store) (InvalidProtocols_key) (Protocol_hash_set) + module InvalidBlocks_key = struct include Block_hash let prefix = ["state"; "invalid_blocks"] @@ -236,6 +248,66 @@ module Operation = struct end +module Protocol = struct + type key = Store.Protocol.key + + type component = Store.component = { + name: string; + interface: string option; + implementation: string + } + + type t = Store.protocol + + type protocol = t + exception Invalid of key * error list + let of_bytes = Store.Protocol.of_bytes + let to_bytes = Store.Protocol.to_bytes + let known t k = Db_proxy.Protocol.known t.protocol_db k + let read t k = Db_proxy.Protocol.read t.protocol_db k + let read_exn t k = + Db_proxy.Protocol.read t.protocol_db k >>= function + | None -> Lwt.fail Not_found + | Some { data = Error e } -> Lwt.fail (Invalid (k, e)) + | Some { data = Ok data ; time } -> Lwt.return { Time.data ; time } + let hash = Store.Protocol.hash + let raw_read t k = + Persist.use t.store.Store.protocol + (fun store -> Store.Protocol.raw_get store k) + let prefetch t net_id ks = + List.iter (Db_proxy.Protocol.prefetch t.protocol_db net_id) ks + let fetch t net_id k = Db_proxy.Protocol.fetch t.protocol_db net_id k + let store t bytes = + match of_bytes bytes with + | None -> fail Cannot_parse + | Some proto -> + let h = hash proto in + Db_proxy.Protocol.store t.protocol_db h (Time.make_timed (Ok proto)) + >>= function + | true -> + Watcher.notify !(t.protocol_watchers) (h, proto) ; + return (Some (h, proto)) + | false -> + return None + let mark_invalid t k e = + Db_proxy.Protocol.update t.protocol_db k (Time.make_timed (Error e)) + >>= function + | true -> + Persist.update t.store.global_store (fun store -> + InvalidProtocols.set store k >>= fun store -> + Lwt.return (Some store)) >>= fun _ -> + Lwt.return true + | false -> Lwt.return false + + let invalid state = + Persist.use state.store.global_store InvalidProtocols.read + + let create_watcher t = Watcher.create_stream t.protocol_watchers () + + let keys { protocol_db } = Db_proxy.Protocol.keys protocol_db + +end + let iter_predecessors (type t) (compare: t -> t -> int) @@ -458,7 +530,7 @@ module Valid_block = struct hash: Block_hash.t ; pred: Block_hash.t ; timestamp: Time.t ; - fitness: Protocol.fitness ; + fitness: Fitness.fitness ; operations: Operation_hash.t list ; discovery_time: Time.t ; protocol_hash: Protocol_hash.t ; @@ -785,6 +857,8 @@ module Valid_block = struct | Error exns -> locked_store_invalid vstate hash exns >>= fun _changed -> Lwt.return vstate + + let keys _ = Store.undefined_key_fn end let iter_predecessors = @@ -1216,12 +1290,14 @@ let () = (** Whole protocol state : read and store. *) let read - ~request_operations ~request_blocks + ~request_operations ~request_blocks ~request_protocols ~store_root ~context_root ~ttl ?patch_context () = Store.init store_root >>= fun store -> lwt_log_info "Initialising the distributed database..." >>= fun () -> let operation_db = Db_proxy.Operation.create { request_operations } store.operation in + let protocol_db = + Db_proxy.Protocol.create { request_protocols } store.protocol in let block_db = Db_proxy.Block.create { request_blocks } store.block in Valid_block.create @@ -1233,6 +1309,8 @@ let read nets = Block_hash_table.create 7 ; operation_db ; operation_watchers = ref [] ; + protocol_db ; + protocol_watchers = ref [] ; block_db ; block_watchers = ref [] ; valid_block_state ; } diff --git a/src/node/shell/state.mli b/src/node/shell/state.mli index e5084f6b9..7ea0dfdda 100644 --- a/src/node/shell/state.mli +++ b/src/node/shell/state.mli @@ -39,6 +39,7 @@ type error += val read: request_operations: (net_id -> Operation_hash.t list -> unit) -> request_blocks: (net_id -> Block_hash.t list -> unit) -> + request_protocols: (Protocol_hash.t list -> unit) -> store_root:string -> context_root:string -> ttl:int -> @@ -342,6 +343,78 @@ module Valid_block : sig end +(** {2 Protocol database} ****************************************************) + +(** The local and distributed database of protocols. *) +module Protocol : sig + + type key = Protocol_hash.t + + type component = Store.component = { + name : string ; + interface : string option ; + implementation : string ; + } + + type t = Store.protocol + + type protocol = t + + (** Is a protocol stored in the local database ? *) + val known: state -> key -> bool Lwt.t + + (** Read a protocol in the local database. This returns [None] + when the protocol does not exist in the local database; this returns + [Some (Error _)] when [mark_invalid] was used. This also returns + the time when the protocol was stored on the local database. *) + val read: + state -> key -> protocol tzresult Time.timed_data option Lwt.t + + (** Read a protocol in the local database. This throws [Not_found] + when the protocol does not exist in the local database or when + [mark_invalid] was used. *) + val read_exn: + state -> key -> protocol Time.timed_data Lwt.t + exception Invalid of key * error list + + (** Read an operation in the local database (without parsing). *) + val raw_read: state -> key -> MBytes.t option Lwt.t + + (** Read a protocol from the distributed database. This may block + while the block is fetched from the P2P network. *) + val fetch: + state -> Store.net_id -> key -> protocol tzresult Time.timed_data Lwt.t + + (** Request protocols on the P2P network without waiting for answers. *) + val prefetch: state -> Store.net_id -> key list -> unit + + (** Add a protocol to the local database. This returns [Ok None] + if the protocol was already stored in the database, or returns + the parsed operation if not. It may also fails when the shell + part of the operation cannot be parsed or when the operation + does not belong to an active "network". For a given sequence of + bytes, it is guaranted that at most one call to [store] returns + [Some _]. *) + val store: + state -> MBytes.t -> (Protocol_hash.t * protocol) option tzresult Lwt.t + + (** Mark a protocol as invalid in the local database. This returns + [false] if the protocol was previously stored in the local + database. The protocol is not removed from the local database, + but its content is replaced by a list of errors. *) + val mark_invalid: state -> key -> error list -> bool Lwt.t + + (** Returns the list known-invalid procols. *) + val invalid: state -> Protocol_hash_set.t Lwt.t + + (** Create a stream of all the newly locally-stored protocols. + The returned function allows to terminate the stream. *) + val create_watcher: + state -> (key * protocol) Lwt_stream.t * (unit -> unit) + + val keys: state -> key list Lwt.t +end + (** {2 Network} ****************************************************************) (** Data specific to a given network. *) diff --git a/src/node/updater/updater.ml b/src/node/updater/updater.ml index c7a7e2167..32a5971fd 100644 --- a/src/node/updater/updater.ml +++ b/src/node/updater/updater.ml @@ -131,7 +131,7 @@ let get_basedir () = let init dir = basedir := Some dir -type component = { +type component = Store.component = { name : string ; interface : string option ; implementation : string ; diff --git a/src/proto/environment/persist.mli b/src/proto/environment/persist.mli index 25657f2c8..e7fc1d792 100644 --- a/src/proto/environment/persist.mli +++ b/src/proto/environment/persist.mli @@ -18,6 +18,8 @@ module type STORE = sig val del: t -> key -> t Lwt.t val list: t -> key list -> key list Lwt.t val remove_rec: t -> key -> t Lwt.t + + val keys: t -> key list Lwt.t end (** Projection of OCaml keys of some abstract type to concrete storage @@ -57,6 +59,8 @@ module type BYTES_STORE = sig val del: t -> key -> t Lwt.t val list: t -> key list -> key list Lwt.t val remove_rec: t -> key -> t Lwt.t + + val keys: t -> key list Lwt.t end module MakeBytesStore (S : STORE) (K : KEY) : @@ -73,6 +77,8 @@ module type TYPED_STORE = sig val get: t -> key -> value option Lwt.t val set: t -> key -> value -> t Lwt.t val del: t -> key -> t Lwt.t + + val keys: t -> key list Lwt.t end (** Gives a typed view of a store (values of a given type stored under