add protocol store + rpcs
This commit is contained in:
parent
d11e44dead
commit
488373551b
@ -148,6 +148,8 @@ let inject_block ?(wait = true) ?force block =
|
||||
call_service0 Services.inject_block (block, wait, force)
|
||||
let inject_operation ?(wait = true) ?force operation =
|
||||
call_service0 Services.inject_operation (operation, wait, force)
|
||||
let inject_protocol ?(wait = true) ?force protocol =
|
||||
call_service0 Services.inject_protocol (protocol, wait, force)
|
||||
let describe ?recurse path =
|
||||
let prefix, arg = RPC.forge_request Services.describe () recurse in
|
||||
get_json (prefix @ path) arg >>=
|
||||
|
@ -25,6 +25,8 @@ val inject_block:
|
||||
Block_hash.t tzresult Lwt.t
|
||||
val inject_operation:
|
||||
?wait:bool -> ?force:bool -> MBytes.t -> Operation_hash.t tzresult Lwt.t
|
||||
val inject_protocol:
|
||||
?wait:bool -> ?force:bool -> Store.protocol -> Protocol_hash.t tzresult Lwt.t
|
||||
|
||||
module Blocks : sig
|
||||
|
||||
|
@ -216,7 +216,7 @@ let remove_rec (module View : VIEW) key =
|
||||
GitStore.FunView.remove_rec View.v (data_key key) >>= fun v ->
|
||||
Lwt.return (pack (module GitStore) View.s v)
|
||||
|
||||
|
||||
let keys (module View : VIEW) = Store.undefined_key_fn
|
||||
|
||||
(*-- Initialisation ----------------------------------------------------------*)
|
||||
|
||||
|
@ -23,6 +23,8 @@ module type DISTRIBUTED_DB = sig
|
||||
val update: t -> key -> value -> bool Lwt.t
|
||||
val remove: t -> key -> bool Lwt.t
|
||||
val shutdown: t -> unit Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
type operation_state = {
|
||||
@ -106,3 +108,42 @@ module Block =
|
||||
Persist.MakeImperativeProxy
|
||||
(Store.Faked_functional_block)
|
||||
(Block_hash_table) (Block_scheduler)
|
||||
|
||||
type protocol_state = {
|
||||
request_protocols: Protocol_hash.t list -> unit ;
|
||||
}
|
||||
|
||||
module Protocol_scheduler = struct
|
||||
let name = "protocol_scheduler"
|
||||
type rdata = Store.net_id
|
||||
type data = float ref
|
||||
type state = protocol_state
|
||||
let init_request _ _ = Lwt.return (ref 0.0)
|
||||
let request net ~get:_ ~set:_ pendings =
|
||||
let current_time = Unix.gettimeofday () in
|
||||
let time = current_time -. (3. +. Random.float 8.) in
|
||||
let protocols =
|
||||
List.fold_left
|
||||
(fun acc (hash, last_request, Store.Net net_id) ->
|
||||
if !last_request < time then begin
|
||||
last_request := current_time ;
|
||||
let prev =
|
||||
try Block_hash_map.find net_id acc
|
||||
with Not_found -> [] in
|
||||
Block_hash_map.add net_id (hash :: prev) acc
|
||||
end else
|
||||
acc)
|
||||
Block_hash_map.empty
|
||||
pendings in
|
||||
if Block_hash_map.is_empty protocols then
|
||||
0.
|
||||
else begin
|
||||
Block_hash_map.iter (fun _net_id -> net.request_protocols) protocols ;
|
||||
1. +. Random.float 4.
|
||||
end
|
||||
end
|
||||
|
||||
module Protocol =
|
||||
Persist.MakeImperativeProxy
|
||||
(Store.Faked_functional_protocol)
|
||||
(Protocol_hash_table) (Protocol_scheduler)
|
||||
|
@ -23,6 +23,8 @@ module type DISTRIBUTED_DB = sig
|
||||
val update: t -> key -> value -> bool Lwt.t
|
||||
val remove: t -> key -> bool Lwt.t
|
||||
val shutdown: t -> unit Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
type operation_state = {
|
||||
@ -44,3 +46,13 @@ module Block :
|
||||
and type key := Store.Block.key
|
||||
and type value := Store.Block.value
|
||||
and type state := block_state
|
||||
|
||||
|
||||
type protocol_state = {
|
||||
request_protocols: Protocol_hash.t list -> unit ;
|
||||
}
|
||||
module Protocol :
|
||||
DISTRIBUTED_DB with type store := Store.Protocol.t
|
||||
and type key := Store.Protocol.key
|
||||
and type value := Store.Protocol.value
|
||||
and type state := protocol_state
|
||||
|
@ -24,6 +24,8 @@ module type STORE = sig
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val keys : t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module type BYTES_STORE = sig
|
||||
@ -35,6 +37,8 @@ module type BYTES_STORE = sig
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val keys : t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module type TYPED_STORE = sig
|
||||
@ -45,6 +49,8 @@ module type TYPED_STORE = sig
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module type KEY = sig
|
||||
@ -146,6 +152,7 @@ module MakeBytesStore
|
||||
let remove_rec s k =
|
||||
S.remove_rec s (to_path k)
|
||||
|
||||
let keys s = S.keys s >|= List.map of_path
|
||||
end
|
||||
|
||||
module MakeTypedStore
|
||||
@ -167,6 +174,7 @@ module MakeTypedStore
|
||||
|
||||
let raw_get = S.get
|
||||
|
||||
let keys = S.keys
|
||||
end
|
||||
|
||||
module RawKey = struct
|
||||
@ -369,6 +377,8 @@ module type IMPERATIVE_PROXY = sig
|
||||
val fetch: t -> rdata -> Store.key -> Store.value Lwt.t
|
||||
val pending: t -> Store.key -> bool
|
||||
val shutdown: t -> unit Lwt.t
|
||||
|
||||
val keys: t -> Store.key list Lwt.t
|
||||
end
|
||||
|
||||
module type IMPERATIVE_PROXY_SCHEDULER = sig
|
||||
@ -457,6 +467,8 @@ module MakeImperativeProxy
|
||||
let known { store } hash =
|
||||
use store (fun store -> Store.mem store hash)
|
||||
|
||||
let keys { store } = use store Store.keys
|
||||
|
||||
let read { store } hash =
|
||||
use store (fun store -> Store.get store hash)
|
||||
|
||||
@ -528,6 +540,8 @@ module MakeImperativeProxy
|
||||
let shutdown { cancel ; worker } =
|
||||
cancel () >>= fun () -> worker
|
||||
|
||||
let keys { store } =
|
||||
use store (fun store -> Store.keys store)
|
||||
end
|
||||
|
||||
(*-- Predefined Instances ----------------------------------------------------*)
|
||||
|
@ -27,6 +27,8 @@ module type STORE = sig
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val keys : t -> key list Lwt.t
|
||||
end
|
||||
|
||||
(** Projection of OCaml keys of some abstract type to concrete storage
|
||||
@ -55,6 +57,8 @@ module type BYTES_STORE = sig
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val keys : t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module MakeBytesStore (S : STORE) (K : KEY) :
|
||||
@ -82,6 +86,8 @@ module type TYPED_STORE = sig
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t (** Not always relevant, BEWARE! *)
|
||||
end
|
||||
|
||||
(** Gives a typed view of a store (values of a given type stored under
|
||||
@ -91,6 +97,7 @@ end
|
||||
module MakeTypedStore (S : STORE) (K : KEY) (C : VALUE) :
|
||||
TYPED_STORE with type t = S.t and type key = K.t and type value = C.t
|
||||
|
||||
|
||||
(** {2 Persistent Sets} ******************************************************)
|
||||
|
||||
(** Signature of a set as returned by {!MakePersistentSet} *)
|
||||
@ -194,6 +201,8 @@ module type IMPERATIVE_PROXY = sig
|
||||
val fetch: t -> rdata -> Store.key -> Store.value Lwt.t
|
||||
val pending: t -> Store.key -> bool
|
||||
val shutdown: t -> unit Lwt.t
|
||||
|
||||
val keys: t -> Store.key list Lwt.t
|
||||
end
|
||||
|
||||
module type IMPERATIVE_PROXY_SCHEDULER = sig
|
||||
|
@ -92,11 +92,13 @@ type generic_store = FS.t
|
||||
type block_store = FS.t
|
||||
type blockchain_store = FS.t
|
||||
type operation_store = FS.t
|
||||
type protocol_store = FS.t
|
||||
|
||||
type store = {
|
||||
block: block_store Persist.shared_ref ;
|
||||
blockchain: blockchain_store Persist.shared_ref ;
|
||||
operation: operation_store Persist.shared_ref ;
|
||||
protocol: protocol_store Persist.shared_ref ;
|
||||
global_store: generic_store Persist.shared_ref ;
|
||||
net_init: ?expiration:Time.t -> genesis -> net_store Lwt.t ;
|
||||
net_read: net_id -> net_store tzresult Lwt.t ;
|
||||
@ -126,6 +128,8 @@ module type TYPED_IMPERATIVE_STORE = sig
|
||||
val get_exn: t -> key -> value Lwt.t
|
||||
val set: t -> key -> value -> unit Lwt.t
|
||||
val del: t -> key -> unit Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module type IMPERATIVE_STORE = sig
|
||||
@ -146,6 +150,14 @@ module type KEY = sig
|
||||
val to_path: t -> string list
|
||||
end
|
||||
|
||||
module type HASHKEY = sig
|
||||
type t
|
||||
val to_path: t -> string list
|
||||
val of_path: string list -> t
|
||||
val prefix : string list
|
||||
val length : int
|
||||
end
|
||||
|
||||
module Raw_key = struct
|
||||
type t = string list
|
||||
let to_path x = x
|
||||
@ -187,6 +199,7 @@ module Errors_value = struct
|
||||
let of_bytes b = Data_encoding.(Binary.of_bytes (list (error_encoding ()))) b
|
||||
end
|
||||
|
||||
let undefined_key_fn = Lwt.fail_invalid_arg "function keys cannot be implemented in this module"
|
||||
|
||||
module Make (K : KEY) (V : Persist.VALUE) = struct
|
||||
type t = FS.t
|
||||
@ -205,6 +218,8 @@ module Make (K : KEY) (V : Persist.VALUE) = struct
|
||||
let del t k = FS.del t (K.to_path k)
|
||||
let list t ks = FS.list t (List.map K.to_path ks)
|
||||
let remove_rec t k = FS.remove_rec t (K.to_path k)
|
||||
|
||||
let keys _t = undefined_key_fn
|
||||
end
|
||||
|
||||
module Data_store : IMPERATIVE_STORE with type t = FS.t =
|
||||
@ -212,6 +227,7 @@ module Data_store : IMPERATIVE_STORE with type t = FS.t =
|
||||
|
||||
include Data_store
|
||||
|
||||
|
||||
(*-- Typed block store under "blocks/" ---------------------------------------*)
|
||||
|
||||
type shell_block = {
|
||||
@ -350,6 +366,7 @@ module Block = struct
|
||||
|
||||
let raw_get t k = Raw_block.get t k
|
||||
|
||||
let keys _t = undefined_key_fn (** We never list keys here *)
|
||||
end
|
||||
|
||||
module Blockchain_succ_key = struct
|
||||
@ -484,9 +501,111 @@ module Operation = struct
|
||||
let to_bytes = Raw_operation_value.to_bytes
|
||||
let hash op = Operation_hash.hash_bytes [to_bytes op]
|
||||
let raw_get t k = Raw_operation_data.get t k
|
||||
|
||||
let keys _t = undefined_key_fn (** We never list keys here *)
|
||||
end
|
||||
|
||||
|
||||
(*-- Typed operation store under "protocols/" -------------------------------*)
|
||||
|
||||
type component = {
|
||||
name : string ;
|
||||
interface : string option ;
|
||||
implementation : string ;
|
||||
}
|
||||
|
||||
let component_encoding =
|
||||
let open Data_encoding in
|
||||
conv
|
||||
(fun { name ; interface; implementation } -> (name, interface, implementation))
|
||||
(fun (name, interface, implementation) -> { name ; interface ; implementation })
|
||||
(obj3
|
||||
(req "name" string)
|
||||
(opt "interface" string)
|
||||
(req "implementation" string))
|
||||
|
||||
type protocol = component list
|
||||
let protocol_encoding = Data_encoding.list component_encoding
|
||||
|
||||
module Raw_protocol_value = struct
|
||||
type t = protocol
|
||||
let to_bytes v = Data_encoding.Binary.to_bytes protocol_encoding v
|
||||
let of_bytes b = Data_encoding.Binary.of_bytes protocol_encoding b
|
||||
end
|
||||
|
||||
module Raw_protocol_key = struct
|
||||
type t = Protocol_hash.t
|
||||
let to_path p = "protocols" :: Protocol_hash.to_path p @ [ "contents" ]
|
||||
end
|
||||
|
||||
module Protocol_data = Make (Raw_protocol_key) (Raw_protocol_value)
|
||||
module Raw_protocol_data = Make (Raw_protocol_key) (Raw_value)
|
||||
|
||||
module Protocol_time_key = struct
|
||||
type t = Protocol_hash.t
|
||||
let to_path p = "protocols" :: Protocol_hash.to_path p @ [ "discovery_time" ]
|
||||
end
|
||||
module Protocol_time = Make (Protocol_time_key) (Time_value)
|
||||
|
||||
module Protocol_errors_key = struct
|
||||
type t = Protocol_hash.t
|
||||
let to_path p = "protocols" :: Protocol_hash.to_path p @ [ "errors" ]
|
||||
end
|
||||
module Protocol_errors = Make (Protocol_errors_key) (Errors_value)
|
||||
|
||||
module Protocol = struct
|
||||
type t = FS.t
|
||||
type key = Protocol_hash.t
|
||||
type value = protocol tzresult Time.timed_data
|
||||
let mem = Protocol_data.mem
|
||||
let get s k =
|
||||
Protocol_time.get s k >>= function
|
||||
| None -> Lwt.return_none
|
||||
| Some time ->
|
||||
Protocol_errors.get s k >>= function
|
||||
| Some exns -> Lwt.return (Some { Time.data = Error exns ; time })
|
||||
| None ->
|
||||
Protocol_data.get s k >>= function
|
||||
| None -> Lwt.return_none
|
||||
| Some bytes -> Lwt.return (Some { Time.data = Ok bytes ; time })
|
||||
let get_exn s k =
|
||||
get s k >>= function
|
||||
| None -> Lwt.fail Not_found
|
||||
| Some x -> Lwt.return x
|
||||
let set s k { Time.data ; time } =
|
||||
Protocol_time.set s k time >>= fun () ->
|
||||
match data with
|
||||
| Ok bytes ->
|
||||
Protocol_data.set s k bytes >>= fun () ->
|
||||
Protocol_errors.del s k
|
||||
| Error exns ->
|
||||
Protocol_errors.set s k exns >>= fun () ->
|
||||
Protocol_data.del s k
|
||||
let del s k =
|
||||
Protocol_time.del s k >>= fun () ->
|
||||
Protocol_data.del s k >>= fun () ->
|
||||
Protocol_errors.del s k
|
||||
let of_bytes = Raw_protocol_value.of_bytes
|
||||
let to_bytes = Raw_protocol_value.to_bytes
|
||||
let hash proto = Protocol_hash.hash_bytes [to_bytes proto]
|
||||
let compare p1 p2 =
|
||||
Protocol_hash.(compare (hash_bytes [to_bytes p1]) (hash_bytes [to_bytes p2]))
|
||||
let equal b1 b2 = compare b1 b2 = 0
|
||||
let raw_get t k = Raw_protocol_data.get t k
|
||||
|
||||
let fold s x ~f =
|
||||
let rec dig i root acc =
|
||||
if i <= 0 then
|
||||
f (Protocol_hash.of_path @@ List.tl root) acc
|
||||
else
|
||||
FS.list s [root] >>= fun roots ->
|
||||
Lwt_list.fold_right_s (dig (i - 1)) roots acc
|
||||
in
|
||||
dig Protocol_hash.path_len ["protocols"] x
|
||||
|
||||
let keys s = fold s [] ~f:(fun k a -> Lwt.return @@ k :: a)
|
||||
end
|
||||
|
||||
(*- Genesis and initialization -----------------------------------------------*)
|
||||
|
||||
let genesis_encoding =
|
||||
@ -620,6 +739,7 @@ let init root =
|
||||
{ block = Persist.share t ;
|
||||
blockchain = Persist.share t ;
|
||||
operation = Persist.share t ;
|
||||
protocol = Persist.share t ;
|
||||
global_store = Persist.share t ;
|
||||
net_init = net_init ~root ;
|
||||
net_read = net_read ~root ;
|
||||
@ -638,6 +758,7 @@ end
|
||||
|
||||
module Faked_functional_operation = Faked_functional_typed_store (Operation)
|
||||
module Faked_functional_block = Faked_functional_typed_store (Block)
|
||||
module Faked_functional_protocol = Faked_functional_typed_store (Protocol)
|
||||
|
||||
module Faked_functional_store : Persist.STORE with type t = t
|
||||
= struct
|
||||
@ -645,4 +766,6 @@ module Faked_functional_store : Persist.STORE with type t = t
|
||||
let set s k v = Data_store.set s k v >>= fun () -> Lwt.return s
|
||||
let del s k = Data_store.del s k >>= fun () -> Lwt.return s
|
||||
let remove_rec s k = Data_store.remove_rec s k >>= fun () -> Lwt.return s
|
||||
|
||||
let keys _s = invalid_arg "function keys not implementable here" (** We never use list here. *)
|
||||
end
|
||||
|
@ -21,6 +21,7 @@ module type TYPED_IMPERATIVE_STORE = sig
|
||||
val get_exn: t -> key -> value Lwt.t
|
||||
val set: t -> key -> value -> unit Lwt.t
|
||||
val del: t -> key -> unit Lwt.t
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module type IMPERATIVE_STORE = sig
|
||||
@ -39,11 +40,13 @@ type generic_store
|
||||
type block_store
|
||||
type blockchain_store
|
||||
type operation_store
|
||||
type protocol_store
|
||||
|
||||
type store = private {
|
||||
block: block_store Persist.shared_ref ;
|
||||
blockchain: blockchain_store Persist.shared_ref ;
|
||||
operation: operation_store Persist.shared_ref ;
|
||||
protocol: protocol_store Persist.shared_ref ;
|
||||
global_store: generic_store Persist.shared_ref ;
|
||||
net_init: ?expiration:Time.t -> genesis -> net_store Lwt.t ;
|
||||
net_read: net_id -> net_store tzresult Lwt.t ;
|
||||
@ -70,6 +73,9 @@ val pp_net_id: Format.formatter -> net_id -> unit
|
||||
(** Open or initialize a store at a given path. *)
|
||||
val init: string -> store Lwt.t
|
||||
|
||||
(** Lwt exn returned when function keys is not implemented *)
|
||||
val undefined_key_fn : 'a Lwt.t
|
||||
|
||||
(** {2 Generic interface} ****************************************************)
|
||||
|
||||
(** The generic primitives do work on the direct root, but in a
|
||||
@ -107,6 +113,16 @@ type block = {
|
||||
val shell_block_encoding: shell_block Data_encoding.t
|
||||
val block_encoding: block Data_encoding.t
|
||||
|
||||
(** Protocol *)
|
||||
type component = {
|
||||
name : string ;
|
||||
interface : string option ;
|
||||
implementation : string ;
|
||||
}
|
||||
|
||||
type protocol = component list
|
||||
val protocol_encoding : protocol Data_encoding.t
|
||||
|
||||
(** {2 Block and operations store} ********************************************)
|
||||
|
||||
module Block : sig
|
||||
@ -177,6 +193,19 @@ module Operation : sig
|
||||
|
||||
end
|
||||
|
||||
module Protocol : sig
|
||||
val of_bytes: MBytes.t -> protocol option
|
||||
val to_bytes: protocol -> MBytes.t
|
||||
val hash: protocol -> Protocol_hash.t
|
||||
|
||||
include TYPED_IMPERATIVE_STORE
|
||||
with type t = protocol_store
|
||||
and type key = Protocol_hash.t
|
||||
and type value = protocol tzresult Time.timed_data
|
||||
|
||||
val raw_get: t -> Protocol_hash.t -> MBytes.t option Lwt.t
|
||||
end
|
||||
|
||||
(**/**) (* For testing only *)
|
||||
|
||||
(* module LwtUnixStore : sig *)
|
||||
@ -198,4 +227,9 @@ module Faked_functional_block :
|
||||
and type value = Block.value
|
||||
and type key = Block.key
|
||||
|
||||
module Faked_functional_protocol :
|
||||
Persist.TYPED_STORE with type t = Protocol.t
|
||||
and type value = Protocol.value
|
||||
and type key = Protocol.key
|
||||
|
||||
module Faked_functional_store : Persist.STORE with type t = t
|
||||
|
@ -25,8 +25,8 @@ type message =
|
||||
| Get_operations of Operation_hash.t list
|
||||
| Operation of MBytes.t
|
||||
|
||||
| Current_protocol of net_id
|
||||
| Protocol_inventory of Protocol_hash.t
|
||||
| Get_protocols of Protocol_hash.t list
|
||||
| Protocol of MBytes.t
|
||||
|
||||
|
||||
let to_frame msg =
|
||||
@ -54,10 +54,10 @@ let to_frame msg =
|
||||
| Operation b ->
|
||||
[ S 2703 ; B b ]
|
||||
|
||||
| Current_protocol (Net net_id) ->
|
||||
[ S 2800 ; bh net_id ]
|
||||
| Protocol_inventory p ->
|
||||
[ S 2801 ; ph p ]
|
||||
| Get_protocols protos ->
|
||||
[ S 2800 ; F (List.map ph protos) ]
|
||||
| Protocol p ->
|
||||
[ S 2801 ; B p ]
|
||||
|
||||
let from_frame msg =
|
||||
|
||||
@ -82,9 +82,9 @@ let from_frame msg =
|
||||
Some (Get_operations (List.map oph ops))
|
||||
| [ S 2703 ; B contents ] -> Some (Operation contents)
|
||||
|
||||
| [ S 2800 ; B netid ] -> Some (Current_protocol (net netid))
|
||||
| [ S 2800 ; F protos ] -> Some (Get_protocols (List.map ph protos))
|
||||
|
||||
| [ S 2801 ; p ] -> Some (Protocol_inventory (ph p))
|
||||
| [ S 2801 ; B contents ] -> Some (Protocol contents)
|
||||
|
||||
| _ -> None
|
||||
|
||||
|
@ -22,8 +22,9 @@ type message =
|
||||
| Get_operations of Operation_hash.t list
|
||||
| Operation of MBytes.t
|
||||
|
||||
| Current_protocol of Store.net_id
|
||||
| Protocol_inventory of Protocol_hash.t
|
||||
| Get_protocols of Protocol_hash.t list
|
||||
| Protocol of MBytes.t
|
||||
|
||||
|
||||
(** Converts a high level message to a network frame *)
|
||||
val to_frame: message -> Netbits.frame
|
||||
|
@ -24,6 +24,13 @@ let inject_operation validator ?force bytes =
|
||||
let hash = Operation_hash.hash_bytes [bytes] in
|
||||
Lwt.return (hash, t)
|
||||
|
||||
let inject_protocol state ?force:_ proto =
|
||||
(* TODO: Validate the protocol *)
|
||||
let proto_bytes = Store.Protocol.to_bytes proto in
|
||||
let hash = Protocol_hash.hash_bytes [proto_bytes] in
|
||||
let t = State.Protocol.store state proto_bytes >>|? ignore in
|
||||
Lwt.return (hash, t)
|
||||
|
||||
let process_operation state validator bytes =
|
||||
State.Operation.store state bytes >>= function
|
||||
| Error _ | Ok None -> Lwt.return_unit
|
||||
@ -38,6 +45,13 @@ let process_operation state validator bytes =
|
||||
Prevalidator.register_operation prevalidator hash ;
|
||||
Lwt.return_unit
|
||||
|
||||
let process_protocol state _validator bytes =
|
||||
State.Protocol.store state bytes >>= function
|
||||
| Error _ | Ok None -> Lwt.return_unit
|
||||
| Ok (Some (hash, _proto)) ->
|
||||
(* TODO: Store only pending protocols... *)
|
||||
lwt_log_info "process Protocol %a" Protocol_hash.pp_short hash
|
||||
|
||||
let process_block state validator bytes =
|
||||
State.Block.store state bytes >>= function
|
||||
| Error _ | Ok None -> Lwt.return_unit
|
||||
@ -144,23 +158,20 @@ let process state validator msg =
|
||||
process_operation state validator content >>= fun () ->
|
||||
Lwt.return_nil
|
||||
|
||||
| Current_protocol net_id ->
|
||||
lwt_log_info "process Current_protocol" >>= fun () ->
|
||||
if not (State.Net.is_active state net_id) then
|
||||
Lwt.return_nil
|
||||
else begin
|
||||
match State.Net.get state net_id with
|
||||
| Error _ -> Lwt.return_nil
|
||||
| Ok net ->
|
||||
State.Net.Blockchain.head net >>= fun head ->
|
||||
Lwt.return [Protocol_inventory head.protocol_hash]
|
||||
end
|
||||
| Get_protocols protos ->
|
||||
lwt_log_info "process Get_protocols" >>= fun () ->
|
||||
Lwt_list.map_p (State.Protocol.raw_read state) protos >>= fun protos ->
|
||||
let cons_protocol acc = function
|
||||
| Some proto -> Protocol proto :: acc
|
||||
| None -> acc in
|
||||
Lwt.return (List.fold_left cons_protocol [] protos)
|
||||
|
||||
| Protocol_inventory _ ->
|
||||
lwt_log_info "process Protocol_inventory" >>= fun () ->
|
||||
(* TODO... *)
|
||||
| Protocol content ->
|
||||
lwt_log_info "process Protocol" >>= fun () ->
|
||||
process_protocol state validator content >>= fun () ->
|
||||
Lwt.return_nil
|
||||
|
||||
|
||||
type t = {
|
||||
state: State.t ;
|
||||
validator: Validator.worker ;
|
||||
@ -170,6 +181,8 @@ type t = {
|
||||
?force:bool -> MBytes.t -> (Block_hash.t * unit tzresult Lwt.t) Lwt.t ;
|
||||
inject_operation:
|
||||
?force:bool -> MBytes.t -> (Operation_hash.t * unit tzresult Lwt.t) Lwt.t ;
|
||||
inject_protocol:
|
||||
?force:bool -> Store.protocol -> (Protocol_hash.t * unit tzresult Lwt.t) Lwt.t ;
|
||||
shutdown: unit -> unit Lwt.t ;
|
||||
}
|
||||
|
||||
@ -184,6 +197,11 @@ let request_blocks net _net_id blocks =
|
||||
For now simply broadcast the request to all our neighbours. *)
|
||||
P2p.broadcast (Messages.(to_frame (Get_blocks blocks))) net
|
||||
|
||||
let request_protocols net protocols =
|
||||
(* TODO improve the lookup strategy.
|
||||
For now simply broadcast the request to all our neighbours. *)
|
||||
P2p.broadcast (Messages.(to_frame (Get_protocols protocols))) net
|
||||
|
||||
let init_p2p net_params =
|
||||
match net_params with
|
||||
| None ->
|
||||
@ -200,8 +218,9 @@ let create
|
||||
lwt_log_info "reading state..." >>= fun () ->
|
||||
let request_operations = request_operations p2p in
|
||||
let request_blocks = request_blocks p2p in
|
||||
let request_protocols = request_protocols p2p in
|
||||
State.read
|
||||
~request_operations ~request_blocks
|
||||
~request_operations ~request_blocks ~request_protocols
|
||||
~store_root ~context_root ~ttl:(48 * 3600) (* 2 days *)
|
||||
?patch_context () >>= fun state ->
|
||||
let validator = Validator.create_worker p2p state in
|
||||
@ -264,6 +283,7 @@ let create
|
||||
global_validator ;
|
||||
inject_block = inject_block state validator ;
|
||||
inject_operation = inject_operation validator ;
|
||||
inject_protocol = inject_protocol state ;
|
||||
shutdown ;
|
||||
}
|
||||
|
||||
@ -310,6 +330,7 @@ module RPC = struct
|
||||
|
||||
let inject_block node = node.inject_block
|
||||
let inject_operation node = node.inject_operation
|
||||
let inject_protocol node = node.inject_protocol
|
||||
|
||||
let raw_block_info node hash =
|
||||
State.Valid_block.read_exn node.state hash >|= convert
|
||||
@ -449,6 +470,11 @@ module RPC = struct
|
||||
State.Net.Mempool.for_block net b >|= fun ops ->
|
||||
Updater.empty_result, ops
|
||||
|
||||
let protocols { state } = State.Protocol.keys state
|
||||
|
||||
let protocol_content node hash =
|
||||
State.Protocol.read node.state hash
|
||||
|
||||
let preapply node block ~timestamp ~sort ops =
|
||||
begin
|
||||
match block with
|
||||
@ -539,6 +565,9 @@ module RPC = struct
|
||||
let operation_watcher node =
|
||||
State.Operation.create_watcher node.state
|
||||
|
||||
let protocol_watcher node =
|
||||
State.Protocol.create_watcher node.state
|
||||
|
||||
let validate node net_id block =
|
||||
Validator.get node.validator net_id >>=? fun net_v ->
|
||||
Validator.fetch_block net_v block >>=? fun _ ->
|
||||
|
@ -29,6 +29,8 @@ module RPC : sig
|
||||
t -> ?force:bool -> MBytes.t -> (Block_hash.t * unit tzresult Lwt.t) Lwt.t
|
||||
val inject_operation:
|
||||
t -> ?force:bool -> MBytes.t -> (Operation_hash.t * unit tzresult Lwt.t) Lwt.t
|
||||
val inject_protocol:
|
||||
t -> ?force:bool -> Store.protocol -> (Protocol_hash.t * unit tzresult Lwt.t) Lwt.t
|
||||
|
||||
val raw_block_info:
|
||||
t -> Block_hash.t -> block_info Lwt.t
|
||||
@ -54,6 +56,13 @@ module RPC : sig
|
||||
val pending_operations:
|
||||
t -> block -> (error Updater.preapply_result * Operation_hash_set.t) Lwt.t
|
||||
|
||||
val protocols:
|
||||
t -> Protocol_hash.t list Lwt.t
|
||||
val protocol_content:
|
||||
t -> Protocol_hash.t -> Store.protocol tzresult Time.timed_data option Lwt.t
|
||||
val protocol_watcher:
|
||||
t -> (Protocol_hash.t * Store.protocol) Lwt_stream.t * (unit -> unit)
|
||||
|
||||
val context_dir:
|
||||
t -> block -> 'a RPC.directory option Lwt.t
|
||||
|
||||
|
@ -332,6 +332,42 @@ let get_operations node hash () =
|
||||
| Some bytes -> RPC.Answer.return bytes
|
||||
| None -> raise Not_found
|
||||
|
||||
let list_protocols node {Services.Protocols.monitor; contents} =
|
||||
let monitor = match monitor with None -> false | Some x -> x in
|
||||
let include_contents = match contents with None -> false | Some x -> x in
|
||||
Node.RPC.protocols node >>= fun protocols ->
|
||||
Lwt_list.map_p
|
||||
(fun hash ->
|
||||
if include_contents then
|
||||
Node.RPC.protocol_content node hash >>= function
|
||||
| None | Some { Time.data = Error _ } -> Lwt.return (hash, None)
|
||||
| Some { Time.data = Ok bytes }->
|
||||
Lwt.return (hash, Some bytes)
|
||||
else
|
||||
Lwt.return (hash, None))
|
||||
protocols >>= fun protocols ->
|
||||
if not monitor then
|
||||
RPC.Answer.return protocols
|
||||
else
|
||||
let stream, shutdown = Node.RPC.protocol_watcher node in
|
||||
let first_request = ref true in
|
||||
let next () =
|
||||
if not !first_request then
|
||||
Lwt_stream.get stream >>= function
|
||||
| None -> Lwt.return_none
|
||||
| Some (h, op) when include_contents -> Lwt.return (Some [h, Some op])
|
||||
| Some (h, _) -> Lwt.return (Some [h, None])
|
||||
else begin
|
||||
first_request := false ;
|
||||
Lwt.return (Some protocols)
|
||||
end in
|
||||
RPC.Answer.return_stream { next ; shutdown }
|
||||
|
||||
let get_protocols node hash () =
|
||||
Node.RPC.protocol_content node hash >>= function
|
||||
| Some bytes -> RPC.Answer.return bytes
|
||||
| None -> raise Not_found
|
||||
|
||||
let build_rpc_directory node =
|
||||
let dir = RPC.empty in
|
||||
let dir = RPC.register0 dir Services.Blocks.list (list_blocks node) in
|
||||
@ -351,6 +387,10 @@ let build_rpc_directory node =
|
||||
RPC.register0 dir Services.Operations.list (list_operations node) in
|
||||
let dir =
|
||||
RPC.register1 dir Services.Operations.bytes (get_operations node) in
|
||||
let dir =
|
||||
RPC.register0 dir Services.Protocols.list (list_protocols node) in
|
||||
let dir =
|
||||
RPC.register1 dir Services.Protocols.bytes (get_protocols node) in
|
||||
let dir =
|
||||
let implementation (net_id, pred, time, fitness, operations, header) =
|
||||
Node.RPC.block_info node (`Head 0) >>= fun bi ->
|
||||
@ -383,6 +423,13 @@ let build_rpc_directory node =
|
||||
(if blocking then wait else return ()) >>=? fun () -> return hash
|
||||
end >>= RPC.Answer.return in
|
||||
RPC.register0 dir Services.inject_operation implementation in
|
||||
let dir =
|
||||
let implementation (proto, blocking, force) =
|
||||
Node.RPC.inject_protocol ?force node proto >>= fun (hash, wait) ->
|
||||
begin
|
||||
(if blocking then wait else return ()) >>=? fun () -> return hash
|
||||
end >>= RPC.Answer.return in
|
||||
RPC.register0 dir Services.inject_protocol implementation in
|
||||
let dir =
|
||||
let implementation () =
|
||||
RPC.Answer.return Data_encoding.Json.(schema (Error_monad.error_encoding ())) in
|
||||
|
@ -383,6 +383,56 @@ module Operations = struct
|
||||
|
||||
end
|
||||
|
||||
module Protocols = struct
|
||||
let protocols_arg =
|
||||
let name = "protocol_id" in
|
||||
let descr =
|
||||
"A protocol identifier in hexadecimal." in
|
||||
let construct = Protocol_hash.to_b48check in
|
||||
let destruct h =
|
||||
try Ok (Protocol_hash.of_b48check h)
|
||||
with _ -> Error "Can't parse hash" in
|
||||
RPC.Arg.make ~name ~descr ~construct ~destruct ()
|
||||
|
||||
let bytes =
|
||||
RPC.service
|
||||
~input: empty
|
||||
~output:
|
||||
(obj1 (req "data"
|
||||
(describe ~title: "Tezos protocol"
|
||||
(Time.timed_encoding @@
|
||||
RPC.Error.wrap @@
|
||||
Store.protocol_encoding))))
|
||||
RPC.Path.(root / "protocols" /: protocols_arg)
|
||||
|
||||
type list_param = {
|
||||
contents: bool option ;
|
||||
monitor: bool option ;
|
||||
}
|
||||
|
||||
let list_param_encoding =
|
||||
conv
|
||||
(fun {contents; monitor} -> (contents, monitor))
|
||||
(fun (contents, monitor) -> {contents; monitor})
|
||||
(obj2
|
||||
(opt "contents" bool)
|
||||
(opt "monitor" bool))
|
||||
|
||||
let list =
|
||||
RPC.service
|
||||
~input: list_param_encoding
|
||||
~output:
|
||||
(obj1
|
||||
(req "protocols"
|
||||
(list
|
||||
(obj2
|
||||
(req "hash" Protocol_hash.encoding)
|
||||
(opt "contents"
|
||||
(dynamic_size Store.protocol_encoding)))
|
||||
)))
|
||||
RPC.Path.(root / "protocols")
|
||||
end
|
||||
|
||||
let forge_block =
|
||||
RPC.service
|
||||
~description: "Forge a block header"
|
||||
@ -480,6 +530,59 @@ let inject_operation =
|
||||
(obj1 (req "injectedOperation" Operation_hash.encoding)))
|
||||
RPC.Path.(root / "inject_operation")
|
||||
|
||||
let inject_protocol =
|
||||
let proto =
|
||||
(list
|
||||
(obj3
|
||||
(req "name"
|
||||
(describe ~title:"OCaml module name"
|
||||
string))
|
||||
(opt "interface"
|
||||
(describe
|
||||
~description:"Content of the .mli file"
|
||||
string))
|
||||
(req "implementation"
|
||||
(describe
|
||||
~description:"Content of the .ml file"
|
||||
string))))
|
||||
in
|
||||
let proto_of_rpc =
|
||||
List.map (fun (name, interface, implementation) ->
|
||||
{ Store.name; interface; implementation })
|
||||
in
|
||||
let rpc_of_proto =
|
||||
List.map (fun { Store.name; interface; implementation } ->
|
||||
(name, interface, implementation))
|
||||
in
|
||||
RPC.service
|
||||
~description:
|
||||
"Inject a protocol in node. Returns the ID of the protocol."
|
||||
~input:
|
||||
(conv
|
||||
(fun (proto, blocking, force) -> (rpc_of_proto proto, Some blocking, force))
|
||||
(fun (proto, blocking, force) -> (proto_of_rpc proto, unopt true blocking, force))
|
||||
(obj3
|
||||
(req "protocol"
|
||||
(describe ~title: "Tezos protocol"
|
||||
proto))
|
||||
(opt "blocking"
|
||||
(describe
|
||||
~description:
|
||||
"Should the RPC wait for the protocol to be \
|
||||
validated before to answer. (default: true)"
|
||||
bool))
|
||||
(opt "force"
|
||||
(describe
|
||||
~description:
|
||||
"Should we inject protocol that is invalid. (default: false)"
|
||||
bool))))
|
||||
~output:
|
||||
(RPC.Error.wrap @@
|
||||
describe
|
||||
~title: "Hash of the injected protocol" @@
|
||||
(obj1 (req "injectedProtocol" Protocol_hash.encoding)))
|
||||
RPC.Path.(root / "inject_protocol")
|
||||
|
||||
let describe =
|
||||
RPC.Description.service
|
||||
~description: "RPCs documentation and input/output schema"
|
||||
|
@ -97,6 +97,19 @@ module Operations : sig
|
||||
list_param, (Operation_hash.t * Store.operation option) list) RPC.service
|
||||
end
|
||||
|
||||
module Protocols : sig
|
||||
val bytes:
|
||||
(unit, unit * Protocol_hash.t, unit,
|
||||
Store.protocol tzresult Time.timed_data) RPC.service
|
||||
type list_param = {
|
||||
contents: bool option ;
|
||||
monitor: bool option ;
|
||||
}
|
||||
val list:
|
||||
(unit, unit,
|
||||
list_param, (Protocol_hash.t * Store.protocol option) list) RPC.service
|
||||
end
|
||||
|
||||
val forge_block:
|
||||
(unit, unit,
|
||||
Updater.net_id option * Block_hash.t option * Time.t option *
|
||||
@ -115,5 +128,9 @@ val inject_operation:
|
||||
(unit, unit,
|
||||
(MBytes.t * bool * bool option), Operation_hash.t tzresult) RPC.service
|
||||
|
||||
val inject_protocol:
|
||||
(unit, unit,
|
||||
(Store.protocol * bool * bool option), Protocol_hash.t tzresult) RPC.service
|
||||
|
||||
val describe:
|
||||
(unit, unit, bool option, RPC.Description.directory_descr) RPC.service
|
||||
|
@ -82,6 +82,9 @@ type t = {
|
||||
operation_db: Db_proxy.Operation.t ;
|
||||
operation_watchers:
|
||||
(Operation_hash.t * Store.operation) Watcher.t list ref ;
|
||||
protocol_db: Db_proxy.Protocol.t ;
|
||||
protocol_watchers:
|
||||
(Protocol_hash.t * Store.protocol) Watcher.t list ref ;
|
||||
valid_block_state: valid_block_state Persist.shared_ref ;
|
||||
}
|
||||
|
||||
@ -154,6 +157,15 @@ module InvalidOperations =
|
||||
Persist.MakeBufferedPersistentSet
|
||||
(Store.Faked_functional_store) (InvalidOperations_key) (Operation_hash_set)
|
||||
|
||||
module InvalidProtocols_key = struct
|
||||
include Protocol_hash
|
||||
let prefix = ["state"; "invalid_protocols"]
|
||||
let length = path_len
|
||||
end
|
||||
module InvalidProtocols =
|
||||
Persist.MakeBufferedPersistentSet
|
||||
(Store.Faked_functional_store) (InvalidProtocols_key) (Protocol_hash_set)
|
||||
|
||||
module InvalidBlocks_key = struct
|
||||
include Block_hash
|
||||
let prefix = ["state"; "invalid_blocks"]
|
||||
@ -236,6 +248,66 @@ module Operation = struct
|
||||
|
||||
end
|
||||
|
||||
module Protocol = struct
|
||||
type key = Store.Protocol.key
|
||||
|
||||
type component = Store.component = {
|
||||
name: string;
|
||||
interface: string option;
|
||||
implementation: string
|
||||
}
|
||||
|
||||
type t = Store.protocol
|
||||
|
||||
type protocol = t
|
||||
exception Invalid of key * error list
|
||||
let of_bytes = Store.Protocol.of_bytes
|
||||
let to_bytes = Store.Protocol.to_bytes
|
||||
let known t k = Db_proxy.Protocol.known t.protocol_db k
|
||||
let read t k = Db_proxy.Protocol.read t.protocol_db k
|
||||
let read_exn t k =
|
||||
Db_proxy.Protocol.read t.protocol_db k >>= function
|
||||
| None -> Lwt.fail Not_found
|
||||
| Some { data = Error e } -> Lwt.fail (Invalid (k, e))
|
||||
| Some { data = Ok data ; time } -> Lwt.return { Time.data ; time }
|
||||
let hash = Store.Protocol.hash
|
||||
let raw_read t k =
|
||||
Persist.use t.store.Store.protocol
|
||||
(fun store -> Store.Protocol.raw_get store k)
|
||||
let prefetch t net_id ks =
|
||||
List.iter (Db_proxy.Protocol.prefetch t.protocol_db net_id) ks
|
||||
let fetch t net_id k = Db_proxy.Protocol.fetch t.protocol_db net_id k
|
||||
let store t bytes =
|
||||
match of_bytes bytes with
|
||||
| None -> fail Cannot_parse
|
||||
| Some proto ->
|
||||
let h = hash proto in
|
||||
Db_proxy.Protocol.store t.protocol_db h (Time.make_timed (Ok proto))
|
||||
>>= function
|
||||
| true ->
|
||||
Watcher.notify !(t.protocol_watchers) (h, proto) ;
|
||||
return (Some (h, proto))
|
||||
| false ->
|
||||
return None
|
||||
let mark_invalid t k e =
|
||||
Db_proxy.Protocol.update t.protocol_db k (Time.make_timed (Error e))
|
||||
>>= function
|
||||
| true ->
|
||||
Persist.update t.store.global_store (fun store ->
|
||||
InvalidProtocols.set store k >>= fun store ->
|
||||
Lwt.return (Some store)) >>= fun _ ->
|
||||
Lwt.return true
|
||||
| false -> Lwt.return false
|
||||
|
||||
let invalid state =
|
||||
Persist.use state.store.global_store InvalidProtocols.read
|
||||
|
||||
let create_watcher t = Watcher.create_stream t.protocol_watchers ()
|
||||
|
||||
let keys { protocol_db } = Db_proxy.Protocol.keys protocol_db
|
||||
|
||||
end
|
||||
|
||||
let iter_predecessors
|
||||
(type t)
|
||||
(compare: t -> t -> int)
|
||||
@ -458,7 +530,7 @@ module Valid_block = struct
|
||||
hash: Block_hash.t ;
|
||||
pred: Block_hash.t ;
|
||||
timestamp: Time.t ;
|
||||
fitness: Protocol.fitness ;
|
||||
fitness: Fitness.fitness ;
|
||||
operations: Operation_hash.t list ;
|
||||
discovery_time: Time.t ;
|
||||
protocol_hash: Protocol_hash.t ;
|
||||
@ -785,6 +857,8 @@ module Valid_block = struct
|
||||
| Error exns ->
|
||||
locked_store_invalid vstate hash exns >>= fun _changed ->
|
||||
Lwt.return vstate
|
||||
|
||||
let keys _ = Store.undefined_key_fn
|
||||
end
|
||||
|
||||
let iter_predecessors =
|
||||
@ -1216,12 +1290,14 @@ let () =
|
||||
(** Whole protocol state : read and store. *)
|
||||
|
||||
let read
|
||||
~request_operations ~request_blocks
|
||||
~request_operations ~request_blocks ~request_protocols
|
||||
~store_root ~context_root ~ttl ?patch_context () =
|
||||
Store.init store_root >>= fun store ->
|
||||
lwt_log_info "Initialising the distributed database..." >>= fun () ->
|
||||
let operation_db =
|
||||
Db_proxy.Operation.create { request_operations } store.operation in
|
||||
let protocol_db =
|
||||
Db_proxy.Protocol.create { request_protocols } store.protocol in
|
||||
let block_db =
|
||||
Db_proxy.Block.create { request_blocks } store.block in
|
||||
Valid_block.create
|
||||
@ -1233,6 +1309,8 @@ let read
|
||||
nets = Block_hash_table.create 7 ;
|
||||
operation_db ;
|
||||
operation_watchers = ref [] ;
|
||||
protocol_db ;
|
||||
protocol_watchers = ref [] ;
|
||||
block_db ; block_watchers = ref [] ;
|
||||
valid_block_state ;
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ type error +=
|
||||
val read:
|
||||
request_operations: (net_id -> Operation_hash.t list -> unit) ->
|
||||
request_blocks: (net_id -> Block_hash.t list -> unit) ->
|
||||
request_protocols: (Protocol_hash.t list -> unit) ->
|
||||
store_root:string ->
|
||||
context_root:string ->
|
||||
ttl:int ->
|
||||
@ -342,6 +343,78 @@ module Valid_block : sig
|
||||
|
||||
end
|
||||
|
||||
(** {2 Protocol database} ****************************************************)
|
||||
|
||||
(** The local and distributed database of protocols. *)
|
||||
module Protocol : sig
|
||||
|
||||
type key = Protocol_hash.t
|
||||
|
||||
type component = Store.component = {
|
||||
name : string ;
|
||||
interface : string option ;
|
||||
implementation : string ;
|
||||
}
|
||||
|
||||
type t = Store.protocol
|
||||
|
||||
type protocol = t
|
||||
|
||||
(** Is a protocol stored in the local database ? *)
|
||||
val known: state -> key -> bool Lwt.t
|
||||
|
||||
(** Read a protocol in the local database. This returns [None]
|
||||
when the protocol does not exist in the local database; this returns
|
||||
[Some (Error _)] when [mark_invalid] was used. This also returns
|
||||
the time when the protocol was stored on the local database. *)
|
||||
val read:
|
||||
state -> key -> protocol tzresult Time.timed_data option Lwt.t
|
||||
|
||||
(** Read a protocol in the local database. This throws [Not_found]
|
||||
when the protocol does not exist in the local database or when
|
||||
[mark_invalid] was used. *)
|
||||
val read_exn:
|
||||
state -> key -> protocol Time.timed_data Lwt.t
|
||||
exception Invalid of key * error list
|
||||
|
||||
(** Read an operation in the local database (without parsing). *)
|
||||
val raw_read: state -> key -> MBytes.t option Lwt.t
|
||||
|
||||
(** Read a protocol from the distributed database. This may block
|
||||
while the block is fetched from the P2P network. *)
|
||||
val fetch:
|
||||
state -> Store.net_id -> key -> protocol tzresult Time.timed_data Lwt.t
|
||||
|
||||
(** Request protocols on the P2P network without waiting for answers. *)
|
||||
val prefetch: state -> Store.net_id -> key list -> unit
|
||||
|
||||
(** Add a protocol to the local database. This returns [Ok None]
|
||||
if the protocol was already stored in the database, or returns
|
||||
the parsed operation if not. It may also fails when the shell
|
||||
part of the operation cannot be parsed or when the operation
|
||||
does not belong to an active "network". For a given sequence of
|
||||
bytes, it is guaranted that at most one call to [store] returns
|
||||
[Some _]. *)
|
||||
val store:
|
||||
state -> MBytes.t -> (Protocol_hash.t * protocol) option tzresult Lwt.t
|
||||
|
||||
(** Mark a protocol as invalid in the local database. This returns
|
||||
[false] if the protocol was previously stored in the local
|
||||
database. The protocol is not removed from the local database,
|
||||
but its content is replaced by a list of errors. *)
|
||||
val mark_invalid: state -> key -> error list -> bool Lwt.t
|
||||
|
||||
(** Returns the list known-invalid procols. *)
|
||||
val invalid: state -> Protocol_hash_set.t Lwt.t
|
||||
|
||||
(** Create a stream of all the newly locally-stored protocols.
|
||||
The returned function allows to terminate the stream. *)
|
||||
val create_watcher:
|
||||
state -> (key * protocol) Lwt_stream.t * (unit -> unit)
|
||||
|
||||
val keys: state -> key list Lwt.t
|
||||
end
|
||||
|
||||
(** {2 Network} ****************************************************************)
|
||||
|
||||
(** Data specific to a given network. *)
|
||||
|
@ -131,7 +131,7 @@ let get_basedir () =
|
||||
let init dir =
|
||||
basedir := Some dir
|
||||
|
||||
type component = {
|
||||
type component = Store.component = {
|
||||
name : string ;
|
||||
interface : string option ;
|
||||
implementation : string ;
|
||||
|
@ -18,6 +18,8 @@ module type STORE = sig
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
(** Projection of OCaml keys of some abstract type to concrete storage
|
||||
@ -57,6 +59,8 @@ module type BYTES_STORE = sig
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
module MakeBytesStore (S : STORE) (K : KEY) :
|
||||
@ -73,6 +77,8 @@ module type TYPED_STORE = sig
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
|
||||
val keys: t -> key list Lwt.t
|
||||
end
|
||||
|
||||
(** Gives a typed view of a store (values of a given type stored under
|
||||
|
Loading…
Reference in New Issue
Block a user