185 lines
5.8 KiB
OCaml
185 lines
5.8 KiB
OCaml
(**************************************************************************)
|
|
(* *)
|
|
(* Copyright (c) 2014 - 2018. *)
|
|
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
|
(* *)
|
|
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
|
(* *)
|
|
(**************************************************************************)
|
|
|
|
(** Tezos Shell - High-level API for the Gossip network and local
|
|
storage (helpers). *)
|
|
|
|
(** {1 Indexes} *)
|
|
|
|
(** Generic interface for a "distributed" index.
|
|
|
|
By "distributed", it means that this interface abstract the p2p
|
|
gossip layer and it is able to fetch missing data from known
|
|
peers in a "synchronous" interface.
|
|
|
|
*)
|
|
module type DISTRIBUTED_DB = sig
|
|
|
|
type t
|
|
|
|
(** The index key *)
|
|
type key
|
|
|
|
(** The indexed data *)
|
|
type value
|
|
|
|
(** An extra parameter for the network lookup, usually
|
|
used for prevalidating data. *)
|
|
type param
|
|
|
|
(** Is the value known locally? *)
|
|
val known: t -> key -> bool Lwt.t
|
|
|
|
type error += Missing_data of key
|
|
type error += Canceled of key
|
|
type error += Timeout of key
|
|
|
|
(** Return the value if it is known locally, otherwise fail with
|
|
the error [Missing_data]. *)
|
|
val read: t -> key -> value tzresult Lwt.t
|
|
|
|
(** Return the value if it is known locally, otherwise fail with
|
|
the value [None]. *)
|
|
val read_opt: t -> key -> value option Lwt.t
|
|
|
|
(** Return the value if it is known locally, otherwise fail with
|
|
the exception [Not_found]. *)
|
|
val read_exn: t -> key -> value Lwt.t
|
|
|
|
(** Same as `fetch` but the call is non-blocking: the data will be
|
|
stored in the local index when received. *)
|
|
val prefetch:
|
|
t ->
|
|
?peer:P2p_peer.Id.t ->
|
|
?timeout:float ->
|
|
key -> param -> unit
|
|
|
|
(** Return the value if it is known locally, or block until the data
|
|
is received from the network. By default, the data will be
|
|
requested to all the active peers in the network; if the [peer]
|
|
argument is provided, the data will only be requested to the
|
|
provided peer. By default, the resulting promise will block
|
|
forever if the data is never received. If [timeout] is provided
|
|
the promise will be resolved with the error [Timeout] after the
|
|
provided amount of seconds.
|
|
|
|
A internal scheduler is able to re-send the request with an
|
|
exponential back-off until the data is received. If the function
|
|
is called multiple time with the same key but with disctinct
|
|
peers, the internal scheduler randomly chooses the requested
|
|
peer (at each retry). *)
|
|
val fetch:
|
|
t ->
|
|
?peer:P2p_peer.Id.t ->
|
|
?timeout:float ->
|
|
key -> param -> value tzresult Lwt.t
|
|
|
|
(** Remove the data from the local index or cancel all pending
|
|
request. Any pending [fetch] promises are resolved with the
|
|
error [Canceled]. *)
|
|
val clear_or_cancel: t -> key -> unit
|
|
|
|
val inject: t -> key -> value -> bool Lwt.t
|
|
|
|
(** Monitor all the fetched data. A given data will appear only
|
|
once. *)
|
|
val watch: t -> (key * value) Lwt_stream.t * Lwt_watcher.stopper
|
|
|
|
val pending: t -> key -> bool
|
|
|
|
end
|
|
|
|
module type DISK_TABLE = sig
|
|
type store
|
|
type key
|
|
type value
|
|
val known: store -> key -> bool Lwt.t
|
|
val read: store -> key -> value tzresult Lwt.t
|
|
val read_opt: store -> key -> value option Lwt.t
|
|
val read_exn: store -> key -> value Lwt.t
|
|
end
|
|
|
|
module type MEMORY_TABLE = sig
|
|
(* A subtype of Hashtbl.S *)
|
|
type 'a t
|
|
type key
|
|
val create: int -> 'a t
|
|
val find: 'a t -> key -> 'a
|
|
val add: 'a t -> key -> 'a -> unit
|
|
val replace: 'a t -> key -> 'a -> unit
|
|
val remove: 'a t -> key -> unit
|
|
val fold: (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b
|
|
end
|
|
|
|
module type SCHEDULER_EVENTS = sig
|
|
type t
|
|
type key
|
|
val request: t -> P2p_peer.Id.t option -> key -> unit
|
|
val notify: t -> P2p_peer.Id.t -> key -> unit
|
|
val notify_cancelation: t -> key -> unit
|
|
val notify_unrequested: t -> P2p_peer.Id.t -> key -> unit
|
|
val notify_duplicate: t -> P2p_peer.Id.t -> key -> unit
|
|
val notify_invalid: t -> P2p_peer.Id.t -> key -> unit
|
|
end
|
|
|
|
module type PRECHECK = sig
|
|
type key
|
|
type param
|
|
type notified_value
|
|
type value
|
|
val precheck: key -> param -> notified_value -> value option
|
|
end
|
|
|
|
module Make_table
|
|
(Hash : sig
|
|
type t
|
|
val name : string
|
|
val encoding : t Data_encoding.t
|
|
val pp : Format.formatter -> t -> unit
|
|
end)
|
|
(Disk_table : DISK_TABLE with type key := Hash.t)
|
|
(Memory_table : MEMORY_TABLE with type key := Hash.t)
|
|
(Scheduler : SCHEDULER_EVENTS with type key := Hash.t)
|
|
(Precheck : PRECHECK with type key := Hash.t
|
|
and type value := Disk_table.value) : sig
|
|
|
|
include DISTRIBUTED_DB with type key = Hash.t
|
|
and type value = Disk_table.value
|
|
and type param = Precheck.param
|
|
val create:
|
|
?global_input:(key * value) Lwt_watcher.input ->
|
|
Scheduler.t -> Disk_table.store -> t
|
|
val notify: t -> P2p_peer.Id.t -> key -> Precheck.notified_value -> unit Lwt.t
|
|
|
|
end
|
|
|
|
module type REQUEST = sig
|
|
type key
|
|
type param
|
|
val active : param -> P2p_peer.Set.t
|
|
val send : param -> P2p_peer.Id.t -> key list -> unit
|
|
end
|
|
|
|
module Make_request_scheduler
|
|
(Hash : sig
|
|
type t
|
|
val name : string
|
|
val encoding : t Data_encoding.t
|
|
val pp : Format.formatter -> t -> unit
|
|
end)
|
|
(Table : MEMORY_TABLE with type key := Hash.t)
|
|
(Request : REQUEST with type key := Hash.t) : sig
|
|
|
|
type t
|
|
val create: Request.param -> t
|
|
val shutdown: t -> unit Lwt.t
|
|
include SCHEDULER_EVENTS with type t := t and type key := Hash.t
|
|
|
|
end
|