Move Persist
from the shell to the protocol.
It is not used anymore in the shell.
This commit is contained in:
parent
1d357587d0
commit
49b7db258d
@ -38,7 +38,6 @@
|
||||
|
||||
;; Tezos specifics
|
||||
v1/tezos_data.mli
|
||||
v1/persist.mli
|
||||
v1/context.mli
|
||||
v1/RPC.mli
|
||||
v1/updater.mli
|
||||
|
@ -10,7 +10,21 @@
|
||||
(** View over the context store, restricted to types, access and
|
||||
functional manipulation of an existing context. *)
|
||||
|
||||
include Persist.STORE
|
||||
type t
|
||||
|
||||
(** Keys in (kex x value) database implementations *)
|
||||
type key = string list
|
||||
|
||||
(** Values in (kex x value) database implementations *)
|
||||
type value = MBytes.t
|
||||
|
||||
val mem: t -> key -> bool Lwt.t
|
||||
val dir_mem: t -> key -> bool Lwt.t
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
|
||||
val register_resolver:
|
||||
'a Base58.encoding -> (t -> string -> 'a list Lwt.t) -> unit
|
||||
|
@ -140,6 +140,9 @@ let undata_key = function
|
||||
| "data" :: key -> key
|
||||
| _ -> assert false
|
||||
|
||||
type key = string list
|
||||
type value = MBytes.t
|
||||
|
||||
let mem ctxt key =
|
||||
Lwt_utils.Idle_waiter.task ctxt.index.repack_scheduler @@ fun () ->
|
||||
GitStore.Tree.mem ctxt.tree (data_key key) >>= fun v ->
|
||||
|
@ -40,7 +40,16 @@ val commit_test_network_genesis:
|
||||
|
||||
(** {2 Generic interface} ****************************************************)
|
||||
|
||||
include Persist.STORE with type t := context
|
||||
type key = string list
|
||||
type value = MBytes.t
|
||||
|
||||
val mem: context -> key -> bool Lwt.t
|
||||
val dir_mem: context -> key -> bool Lwt.t
|
||||
val get: context -> key -> value option Lwt.t
|
||||
val set: context -> key -> value -> t Lwt.t
|
||||
val del: context -> key -> t Lwt.t
|
||||
val list: context -> key list -> key list Lwt.t
|
||||
val remove_rec: context -> key -> t Lwt.t
|
||||
|
||||
(** {2 Accessing and Updating Versions} **************************************)
|
||||
|
||||
|
@ -1,212 +0,0 @@
|
||||
(**************************************************************************)
|
||||
(* *)
|
||||
(* Copyright (c) 2014 - 2017. *)
|
||||
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
||||
(* *)
|
||||
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** Tezos - Persistent structures on top of {!Context} *)
|
||||
|
||||
(** Keys in (kex x value) database implementations *)
|
||||
type key = string list
|
||||
|
||||
(** Values in (kex x value) database implementations *)
|
||||
type value = MBytes.t
|
||||
|
||||
(** Low level view over a (key x value) database implementation. *)
|
||||
module type STORE = sig
|
||||
type t
|
||||
val mem: t -> key -> bool Lwt.t
|
||||
val dir_mem: t -> key -> bool Lwt.t
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
end
|
||||
|
||||
(** Projection of OCaml keys of some abstract type to concrete storage
|
||||
keys. For practical reasons, all such keys must fall under a same
|
||||
{!prefix} and have the same relative {!length}. Functions
|
||||
{!to_path} and {!of_path} only take the relative part into account
|
||||
(the prefix is added and removed when needed). *)
|
||||
module type KEY = sig
|
||||
type t
|
||||
val prefix: key
|
||||
val length: int
|
||||
val to_path: t -> key
|
||||
val of_path: key -> t
|
||||
val compare: t -> t -> int
|
||||
end
|
||||
|
||||
(** A KEY instance for using raw implementation paths as keys *)
|
||||
module RawKey : KEY with type t = key
|
||||
|
||||
module type BYTES_STORE = sig
|
||||
type t
|
||||
type key
|
||||
val mem: t -> key -> bool Lwt.t
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
val list: t -> key list -> key list Lwt.t
|
||||
val remove_rec: t -> key -> t Lwt.t
|
||||
end
|
||||
|
||||
module MakeBytesStore (S : STORE) (K : KEY) :
|
||||
BYTES_STORE with type t = S.t and type key = K.t
|
||||
|
||||
(** {2 Typed Store Overlays} *************************************************)
|
||||
|
||||
(** Projection of OCaml values of some abstract type to concrete
|
||||
storage data. *)
|
||||
module type VALUE = sig
|
||||
type t
|
||||
val of_bytes: value -> t option
|
||||
val to_bytes: t -> value
|
||||
end
|
||||
|
||||
(** A VALUE instance for using the raw bytes values *)
|
||||
module RawValue : VALUE with type t = value
|
||||
|
||||
(** Signature of a typed store as returned by {!MakeTypedStore} *)
|
||||
module type TYPED_STORE = sig
|
||||
type t
|
||||
type key
|
||||
type value
|
||||
val mem: t -> key -> bool Lwt.t
|
||||
val get: t -> key -> value option Lwt.t
|
||||
val set: t -> key -> value -> t Lwt.t
|
||||
val del: t -> key -> t Lwt.t
|
||||
end
|
||||
|
||||
(** Gives a typed view of a store (values of a given type stored under
|
||||
keys of a given type). The view is also restricted to a prefix,
|
||||
(which can be empty). For all primitives to work as expected, all
|
||||
keys under this prefix must be homogeneously typed. *)
|
||||
module MakeTypedStore (S : STORE) (K : KEY) (C : VALUE) :
|
||||
TYPED_STORE with type t = S.t and type key = K.t and type value = C.t
|
||||
|
||||
|
||||
(** {2 Persistent Sets} ******************************************************)
|
||||
|
||||
(** Signature of a set as returned by {!MakePersistentSet} *)
|
||||
module type PERSISTENT_SET = sig
|
||||
type t and key
|
||||
val mem : t -> key -> bool Lwt.t
|
||||
val set : t -> key -> t Lwt.t
|
||||
val del : t -> key -> t Lwt.t
|
||||
val elements : t -> key list Lwt.t
|
||||
val clear : t -> t Lwt.t
|
||||
val iter : t -> f:(key -> unit Lwt.t) -> unit Lwt.t
|
||||
val fold : t -> 'a -> f:(key -> 'a -> 'a Lwt.t) -> 'a Lwt.t
|
||||
end
|
||||
|
||||
(** Signature of a buffered set as returned by {!MakeBufferedPersistentSet} *)
|
||||
module type BUFFERED_PERSISTENT_SET = sig
|
||||
include PERSISTENT_SET
|
||||
module Set : Set.S with type elt = key
|
||||
val read : t -> Set.t Lwt.t
|
||||
val write : t -> Set.t -> t Lwt.t
|
||||
end
|
||||
|
||||
(** Build a set in the (key x value) storage by encoding elements as
|
||||
keys and using the association of (any) data to these keys as
|
||||
membership. For this to work, the prefix passed must be reserved
|
||||
for the set (every key under it is considered a member). *)
|
||||
module MakePersistentSet (S : STORE) (K : KEY)
|
||||
: PERSISTENT_SET with type t := S.t and type key := K.t
|
||||
|
||||
(** Same as {!MakePersistentSet} but also provides a way to use an
|
||||
OCaml set as an explicitly synchronized in-memory buffer. *)
|
||||
module MakeBufferedPersistentSet
|
||||
(S : STORE) (K : KEY) (Set : Set.S with type elt = K.t)
|
||||
: BUFFERED_PERSISTENT_SET
|
||||
with type t := S.t
|
||||
and type key := K.t
|
||||
and module Set := Set
|
||||
|
||||
(** {2 Persistent Maps} ******************************************************)
|
||||
|
||||
(** Signature of a map as returned by {!MakePersistentMap} *)
|
||||
module type PERSISTENT_MAP = sig
|
||||
type t and key and value
|
||||
val mem : t -> key -> bool Lwt.t
|
||||
val get : t -> key -> value option Lwt.t
|
||||
val set : t -> key -> value -> t Lwt.t
|
||||
val del : t -> key -> t Lwt.t
|
||||
val bindings : t -> (key * value) list Lwt.t
|
||||
val clear : t -> t Lwt.t
|
||||
val iter : t -> f:(key -> value -> unit Lwt.t) -> unit Lwt.t
|
||||
val fold : t -> 'a -> f:(key -> value -> 'a -> 'a Lwt.t) -> 'a Lwt.t
|
||||
end
|
||||
|
||||
(** Signature of a buffered map as returned by {!MakeBufferedPersistentMap} *)
|
||||
module type BUFFERED_PERSISTENT_MAP = sig
|
||||
include PERSISTENT_MAP
|
||||
module Map : Map.S with type key = key
|
||||
val read : t -> value Map.t Lwt.t
|
||||
val write : t -> value Map.t -> t Lwt.t
|
||||
end
|
||||
|
||||
(** Build a map in the (key x value) storage. For this to work, the
|
||||
prefix passed must be reserved for the map (every key under it is
|
||||
considered the key of a binding). *)
|
||||
module MakePersistentMap (S : STORE) (K : KEY) (C : VALUE)
|
||||
: PERSISTENT_MAP
|
||||
with type t := S.t and type key := K.t and type value := C.t
|
||||
|
||||
(** Same as {!MakePersistentMap} but also provides a way to use an
|
||||
OCaml map as an explicitly synchronized in-memory buffer. *)
|
||||
module MakeBufferedPersistentMap
|
||||
(S : STORE) (K : KEY) (C : VALUE) (Map : Map.S with type key = K.t)
|
||||
: BUFFERED_PERSISTENT_MAP
|
||||
with type t := S.t
|
||||
and type key := K.t
|
||||
and type value := C.t
|
||||
and module Map := Map
|
||||
|
||||
|
||||
(** {2 Predefined Instances} *************************************************)
|
||||
|
||||
module MakePersistentBytesMap (S : STORE) (K : KEY)
|
||||
: PERSISTENT_MAP
|
||||
with type t := S.t and type key := K.t and type value := MBytes.t
|
||||
|
||||
module MakeBufferedPersistentBytesMap
|
||||
(S : STORE) (K : KEY) (Map : Map.S with type key = K.t)
|
||||
: BUFFERED_PERSISTENT_MAP
|
||||
with type t := S.t
|
||||
and type key := K.t
|
||||
and type value := MBytes.t
|
||||
and module Map := Map
|
||||
|
||||
module type TYPED_VALUE_REPR = sig
|
||||
type value
|
||||
val encoding: value Data_encoding.t
|
||||
end
|
||||
|
||||
module MakePersistentTypedMap (S : STORE) (K : KEY) (T : TYPED_VALUE_REPR)
|
||||
: PERSISTENT_MAP
|
||||
with type t := S.t and type key := K.t and type value := T.value
|
||||
|
||||
module MakeBufferedPersistentTypedMap
|
||||
(S : STORE) (K : KEY) (T : TYPED_VALUE_REPR) (Map : Map.S with type key = K.t)
|
||||
: BUFFERED_PERSISTENT_MAP
|
||||
with type t := S.t
|
||||
and type key := K.t
|
||||
and type value := T.value
|
||||
and module Map := Map
|
||||
|
||||
module MakeHashResolver
|
||||
(Store : sig
|
||||
type t
|
||||
val dir_mem: t -> string list -> bool Lwt.t
|
||||
val list: t -> string list list -> string list list Lwt.t
|
||||
val prefix: string list
|
||||
end)
|
||||
(H: HASH) : sig
|
||||
val resolve : Store.t -> string -> H.t list Lwt.t
|
||||
end
|
@ -254,7 +254,6 @@ module Make(Param : sig val name: string end)() = struct
|
||||
module Ed25519 = Ed25519
|
||||
module Hash = Hash
|
||||
module Tezos_data = Tezos_data
|
||||
module Persist = Persist
|
||||
module RPC = RPC
|
||||
module Micheline = Micheline
|
||||
module Fitness = Fitness
|
||||
|
@ -26,6 +26,7 @@
|
||||
"Manager_repr",
|
||||
"Block_header_repr",
|
||||
|
||||
"Persist",
|
||||
"Storage_sigs",
|
||||
"Storage_functors",
|
||||
"Storage",
|
||||
|
@ -64,3 +64,16 @@ let take n l =
|
||||
| [] -> None
|
||||
| x :: xs -> loop (x :: acc) (n-1) xs in
|
||||
loop [] n l
|
||||
|
||||
let remove_prefix ~prefix s =
|
||||
let x = String.length prefix in
|
||||
let n = String.length s in
|
||||
if Compare.Int.(n >= x) && Compare.String.(String.sub s 0 x = prefix) then
|
||||
Some (String.sub s x (n - x))
|
||||
else
|
||||
None
|
||||
|
||||
let rec remove_elem_from_list nb = function
|
||||
| [] -> []
|
||||
| l when Compare.Int.(nb <= 0) -> l
|
||||
| _ :: tl -> remove_elem_from_list (nb - 1) tl
|
||||
|
@ -20,3 +20,9 @@ val (--->) : Int32.t -> Int32.t -> Int32.t list
|
||||
val pp_print_paragraph : Format.formatter -> string -> unit
|
||||
|
||||
val take: int -> 'a list -> ('a list * 'a list) option
|
||||
|
||||
(** Some (input with [prefix] removed), if string has [prefix], else [None] **)
|
||||
val remove_prefix: prefix:string -> string -> string option
|
||||
|
||||
(** [remove nb list] remove the first [nb] elements from the list [list]. *)
|
||||
val remove_elem_from_list: int -> 'a list -> 'a list
|
||||
|
@ -9,8 +9,6 @@
|
||||
|
||||
(* Tezos - Persistent structures on top of {!Store} or {!Context} *)
|
||||
|
||||
open Lwt.Infix
|
||||
|
||||
(*-- Signatures --------------------------------------------------------------*)
|
||||
|
||||
type key = string list
|
||||
@ -108,7 +106,7 @@ let prefix prf key =
|
||||
let unprefix prf key =
|
||||
let rec eat = function
|
||||
| k :: key, p :: prefix ->
|
||||
assert (k = p) ;
|
||||
assert Compare.String.(k = p) ;
|
||||
eat (key, prefix)
|
||||
| key, [] -> key
|
||||
| _ -> assert false in
|
||||
@ -165,17 +163,17 @@ module MakeTypedStore
|
||||
let set s k v = S.set s k (C.to_bytes v)
|
||||
let del = S.del
|
||||
|
||||
let raw_get = S.get
|
||||
|
||||
end
|
||||
|
||||
module CompareStringList = Compare.List(Compare.String)
|
||||
|
||||
module RawKey = struct
|
||||
type t = key
|
||||
let prefix = []
|
||||
let length = 0
|
||||
let to_path p = p
|
||||
let of_path p = p
|
||||
let compare pa pb = Pervasives.compare pa pb
|
||||
let compare = CompareStringList.compare
|
||||
end
|
||||
module RawValue = struct
|
||||
type t = value
|
||||
@ -190,7 +188,7 @@ module MakePersistentSet
|
||||
|
||||
let to_path k =
|
||||
let suffix = K.to_path k in
|
||||
assert (List.length suffix = K.length) ;
|
||||
assert Compare.Int.(List.length suffix = K.length) ;
|
||||
prefix K.prefix suffix
|
||||
|
||||
let of_path p = K.of_path (unprefix K.prefix p)
|
||||
@ -216,9 +214,9 @@ module MakePersistentSet
|
||||
|
||||
let fold c x ~f =
|
||||
let rec dig i root acc =
|
||||
if root = inited_key then
|
||||
if CompareStringList.(root = inited_key) then
|
||||
Lwt.return acc
|
||||
else if i <= 0 then
|
||||
else if Compare.Int.(i <= 0) then
|
||||
f (of_path root) acc
|
||||
else
|
||||
S.list c [root] >>= fun roots ->
|
||||
@ -259,7 +257,7 @@ module MakePersistentMap
|
||||
|
||||
let to_path k =
|
||||
let suffix = K.to_path k in
|
||||
assert (List.length suffix = K.length) ;
|
||||
assert Compare.Int.(List.length suffix = K.length) ;
|
||||
prefix K.prefix suffix
|
||||
|
||||
let of_path p = K.of_path (unprefix K.prefix p)
|
||||
@ -290,9 +288,9 @@ module MakePersistentMap
|
||||
|
||||
let fold c x ~f =
|
||||
let rec dig i root acc =
|
||||
if root = inited_key then
|
||||
if CompareStringList.(root = inited_key) then
|
||||
Lwt.return acc
|
||||
else if i <= 0 then
|
||||
else if Compare.Int.(i <= 0) then
|
||||
S.get c root >>= function
|
||||
| None -> Lwt.return acc
|
||||
| Some b ->
|
||||
@ -327,206 +325,6 @@ module MakeBufferedPersistentMap
|
||||
|
||||
end
|
||||
|
||||
(*-- Imperative overlays ----------------------------------------------------*)
|
||||
|
||||
type 'a shared_ref =
|
||||
{ mutable contents : 'a ;
|
||||
lock : Lwt_mutex.t }
|
||||
let share contents =
|
||||
{ contents ;
|
||||
lock = Lwt_mutex.create () }
|
||||
let update r f =
|
||||
Lwt_mutex.with_lock r.lock
|
||||
(fun () -> f r.contents >>= function
|
||||
| None -> Lwt.return false
|
||||
| Some new_contents ->
|
||||
r.contents <- new_contents ;
|
||||
Lwt.return true)
|
||||
let update_with_res r f =
|
||||
Lwt_mutex.with_lock r.lock
|
||||
(fun () -> f r.contents >>= function
|
||||
| (None, x) -> Lwt.return (false, x)
|
||||
| (Some new_contents, x) ->
|
||||
r.contents <- new_contents ;
|
||||
Lwt.return (true, x))
|
||||
let use r f =
|
||||
Lwt_mutex.with_lock r.lock
|
||||
(fun () -> f r.contents)
|
||||
|
||||
module type IMPERATIVE_PROXY = sig
|
||||
module Store : TYPED_STORE
|
||||
|
||||
type t
|
||||
type rdata
|
||||
type state
|
||||
val create: state -> Store.t shared_ref -> t
|
||||
val known: t -> Store.key -> bool Lwt.t
|
||||
val read: t -> Store.key -> Store.value option Lwt.t
|
||||
val store: t -> Store.key -> Store.value -> bool Lwt.t
|
||||
val update: t -> Store.key -> Store.value -> bool Lwt.t
|
||||
val remove: t -> Store.key -> bool Lwt.t
|
||||
val prefetch: t -> rdata -> Store.key -> unit
|
||||
val fetch: t -> rdata -> Store.key -> Store.value Lwt.t
|
||||
val pending: t -> Store.key -> bool
|
||||
val shutdown: t -> unit Lwt.t
|
||||
end
|
||||
|
||||
module type IMPERATIVE_PROXY_SCHEDULER = sig
|
||||
module Store : TYPED_STORE
|
||||
type state
|
||||
type rdata
|
||||
type data
|
||||
|
||||
val name : string
|
||||
val init_request :
|
||||
state -> Store.key -> data Lwt.t
|
||||
val request :
|
||||
state ->
|
||||
get:(rdata -> Store.key -> Store.value Lwt.t) ->
|
||||
set:(Store.key -> Store.value -> unit Lwt.t) ->
|
||||
(Store.key * data * rdata) list -> float
|
||||
end
|
||||
|
||||
module MakeImperativeProxy
|
||||
(Store : TYPED_STORE)
|
||||
(Table : Hashtbl.S with type key = Store.key)
|
||||
(Scheduler : IMPERATIVE_PROXY_SCHEDULER with module Store := Store)
|
||||
: IMPERATIVE_PROXY with module Store := Store and type state = Scheduler.state and type rdata = Scheduler.rdata = struct
|
||||
|
||||
type rdata = Scheduler.rdata
|
||||
type data =
|
||||
{ rdata: rdata ;
|
||||
state: [ `Inited of Scheduler.data | `Initing of Scheduler.data Lwt.t ] ;
|
||||
wakener: Store.value Lwt.u }
|
||||
type state = Scheduler.state
|
||||
|
||||
type t =
|
||||
{ tbl : data Table.t ;
|
||||
store : Store.t shared_ref ;
|
||||
cancelation: unit -> unit Lwt.t ;
|
||||
cancel: unit -> unit Lwt.t ;
|
||||
on_cancel: (unit -> unit Lwt.t) -> unit ;
|
||||
worker_trigger: unit -> unit;
|
||||
worker_waiter: unit -> unit Lwt.t ;
|
||||
worker: unit Lwt.t ;
|
||||
gstate : state }
|
||||
|
||||
let pending_requests { tbl } =
|
||||
Table.fold
|
||||
(fun h data acc ->
|
||||
match data.state with
|
||||
| `Initing _ -> acc
|
||||
| `Inited d -> (h, d, data.rdata) :: acc)
|
||||
tbl []
|
||||
|
||||
let pending { tbl } hash = Table.mem tbl hash
|
||||
|
||||
let request { tbl ; worker_trigger ; gstate } rdata hash =
|
||||
assert (not (Table.mem tbl hash));
|
||||
let waiter, wakener = Lwt.wait () in
|
||||
let data = Scheduler.init_request gstate hash in
|
||||
match Lwt.state data with
|
||||
| Lwt.Return data ->
|
||||
let state = `Inited data in
|
||||
Table.add tbl hash { rdata ; state ; wakener } ;
|
||||
worker_trigger () ;
|
||||
waiter
|
||||
| _ ->
|
||||
let state = `Initing data in
|
||||
Table.add tbl hash { rdata ; state ; wakener } ;
|
||||
Lwt.async
|
||||
(fun () ->
|
||||
data >>= fun data ->
|
||||
let state = `Inited data in
|
||||
Table.add tbl hash { rdata ; state ; wakener } ;
|
||||
worker_trigger () ;
|
||||
Lwt.return_unit) ;
|
||||
waiter
|
||||
|
||||
let prefetch ({ store ; tbl } as session) rdata hash =
|
||||
Lwt.ignore_result
|
||||
(use store (fun store -> Store.mem store hash) >>= fun exists ->
|
||||
if not exists && not (Table.mem tbl hash) then
|
||||
request session rdata hash >>= fun _ -> Lwt.return_unit
|
||||
else
|
||||
Lwt.return_unit)
|
||||
|
||||
let known { store } hash =
|
||||
use store (fun store -> Store.mem store hash)
|
||||
|
||||
let read { store } hash =
|
||||
use store (fun store -> Store.get store hash)
|
||||
|
||||
let fetch ({ store ; tbl } as session) rdata hash =
|
||||
try Lwt.waiter_of_wakener (Table.find tbl hash).wakener
|
||||
with Not_found ->
|
||||
use store (fun store -> Store.get store hash) >>= function
|
||||
| Some op -> Lwt.return op
|
||||
| None ->
|
||||
try Lwt.waiter_of_wakener (Table.find tbl hash).wakener
|
||||
with Not_found -> request session rdata hash
|
||||
|
||||
let store { store ; tbl } hash data =
|
||||
update store (fun store ->
|
||||
Store.mem store hash >>= fun exists ->
|
||||
if exists then Lwt.return_none
|
||||
else ( Store.set store hash data >>= fun store ->
|
||||
Lwt.return (Some store) ) ) >>= fun changed ->
|
||||
try
|
||||
let wakener = (Table.find tbl hash).wakener in
|
||||
Table.remove tbl hash;
|
||||
Lwt.wakeup wakener data;
|
||||
Lwt.return changed
|
||||
with Not_found -> Lwt.return changed
|
||||
|
||||
let remove { store ; _ } hash =
|
||||
update store (fun store ->
|
||||
Store.mem store hash >>= fun exists ->
|
||||
if not exists then Lwt.return_none
|
||||
else ( Store.del store hash >>= fun store ->
|
||||
Lwt.return (Some store) ) )
|
||||
|
||||
let update { store ; _ } hash data =
|
||||
update store (fun store ->
|
||||
Store.mem store hash >>= fun exists ->
|
||||
if not exists then Lwt.return_none
|
||||
else ( Store.set store hash data >>= fun store ->
|
||||
Lwt.return (Some store) ) )
|
||||
|
||||
let create gstate st =
|
||||
let tbl = Table.create 50 in
|
||||
let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
|
||||
let worker_trigger, worker_waiter = Lwt_utils.trigger () in
|
||||
let session =
|
||||
{ tbl ; gstate ; store = st ; worker = Lwt.return () ;
|
||||
cancelation ; cancel ; on_cancel ;
|
||||
worker_trigger ; worker_waiter } in
|
||||
let worker =
|
||||
let rec worker_loop () =
|
||||
Lwt.pick [(worker_waiter () >|= fun () -> `Process);
|
||||
(cancelation () >|= fun () -> `Cancel)] >>= function
|
||||
| `Cancel -> Lwt.return_unit
|
||||
| `Process ->
|
||||
begin
|
||||
match pending_requests session with
|
||||
| [] -> ()
|
||||
| requests ->
|
||||
let get = fetch session
|
||||
and set k v = store session k v >>= fun _ -> Lwt.return_unit in
|
||||
let timeout = Scheduler.request gstate ~get ~set requests in
|
||||
if timeout > 0. then
|
||||
Lwt.ignore_result (Lwt_unix.sleep timeout >|= worker_trigger);
|
||||
end;
|
||||
worker_loop ()
|
||||
in
|
||||
Lwt_utils.worker Scheduler.name ~run:worker_loop ~cancel in
|
||||
{ session with worker }
|
||||
|
||||
let shutdown { cancel ; worker } =
|
||||
cancel () >>= fun () -> worker
|
||||
|
||||
end
|
||||
|
||||
(*-- Predefined Instances ----------------------------------------------------*)
|
||||
|
||||
module MBytesValue = struct
|
||||
@ -578,7 +376,7 @@ module MakeHashResolver
|
||||
let plen = List.length Store.prefix
|
||||
let build path =
|
||||
H.of_path_exn @@
|
||||
Utils.remove_elem_from_list plen path
|
||||
Misc.remove_elem_from_list plen path
|
||||
let resolve t p =
|
||||
let rec loop prefix = function
|
||||
| [] ->
|
||||
@ -590,7 +388,7 @@ module MakeHashResolver
|
||||
| [d] ->
|
||||
Store.list t [prefix] >>= fun prefixes ->
|
||||
Lwt_list.filter_map_p (fun prefix ->
|
||||
match remove_prefix ~prefix:d (List.hd (List.rev prefix)) with
|
||||
match Misc.remove_prefix ~prefix:d (List.hd (List.rev prefix)) with
|
||||
| None -> Lwt.return_none
|
||||
| Some _ -> Lwt.return (Some (build prefix))
|
||||
) prefixes
|
Loading…
Reference in New Issue
Block a user