Shell: use lmdb for disk storage

This commit is contained in:
Vincent Bernardoff 2018-05-25 16:57:44 +02:00 committed by Benjamin Canou
parent be459ef312
commit a6bc6333da
14 changed files with 247 additions and 131 deletions

View File

@ -889,11 +889,15 @@ let may_create_chain state chain genesis =
let read
?patch_context
?(store_mapsize=4_096_000_000_000L)
?(context_mapsize=40_960_000_000L)
~store_root
~context_root
genesis =
Store.init store_root >>=? fun global_store ->
Context.init ?patch_context ~root:context_root >>= fun context_index ->
Store.init ~mapsize:store_mapsize store_root >>=? fun global_store ->
Context.init
~mapsize:context_mapsize ?patch_context
context_root >>= fun context_index ->
let global_data = {
chains = Chain_id.Table.create 17 ;
global_store ;

View File

@ -233,6 +233,8 @@ end
the databases. *)
val read:
?patch_context:(Context.t -> Context.t Lwt.t) ->
?store_mapsize:int64 ->
?context_mapsize:int64 ->
store_root:string ->
context_root:string ->
Chain.genesis ->

View File

@ -261,8 +261,8 @@ module Protocol = struct
end
let init dir =
Raw_store.init dir >>=? fun s ->
let init ?mapsize dir =
Raw_store.init ?mapsize dir >>=? fun s ->
Block.register s ;
Protocol.register s ;
return s

View File

@ -12,8 +12,9 @@ open Store_sigs
type t
type global_store = t
(** Open or initialize a store at a given path. *)
val init: string -> t tzresult Lwt.t
(** [init ~mapsize path] returns an initialized store at [path] of
maximum capacity [mapsize] bytes. *)
val init: ?mapsize:int64 -> string -> t tzresult Lwt.t
val close : t -> unit

View File

@ -10,7 +10,9 @@
let fail expected given msg =
Format.kasprintf Pervasives.failwith
"@[%s@ expected: %s@ got: %s@]" msg expected given
let fail_msg fmt = Format.kasprintf (fail "" "") fmt
let fail_msg ?(expected="") ?(given="") fmt =
Format.kasprintf (fail expected given) fmt
let default_printer _ = ""
@ -47,11 +49,12 @@ let make_equal_list eq prn ?(msg="") x y =
if eq hd_x hd_y then
iter (succ i) tl_x tl_y
else
let fm = Printf.sprintf "%s (at index %d)" msg i in
fail (prn hd_x) (prn hd_y) fm
fail_msg ~expected:(prn hd_x) ~given:(prn hd_y)
"%s (at index %d)" msg i
| _ :: _, [] | [], _ :: _ ->
let fm = Printf.sprintf "%s (lists of different sizes)" msg in
fail_msg "%s" fm
fail_msg ~expected:"" ~given:""
"%s (lists of different sizes %d %d)" msg
(List.length x) (List.length y)
| [], [] ->
() in
iter 0 x y

View File

@ -152,6 +152,8 @@ let wrap_state_init f base_dir =
let store_root = base_dir // "store" in
let context_root = base_dir // "context" in
State.read
~store_mapsize:4_096_000_000L
~context_mapsize:4_096_000_000L
~store_root
~context_root
genesis >>=? fun (state, chain) ->

View File

@ -28,13 +28,16 @@ let genesis_time =
(** *)
let mapsize = 4_096_000_000L (* ~4 GiB *)
let wrap_store_init f _ () =
Lwt_utils_unix.with_tempdir "tezos_test_" begin fun base_dir ->
let root = base_dir // "store" in
Store.init root >>= function
Store.init ~mapsize root >>= function
| Ok store ->
f store >>= fun () ->
Lwt.return ()
Lwt.finalize
(fun () -> f store)
(fun () -> Store.close store ; Lwt.return_unit)
| Error err ->
Format.kasprintf Pervasives.failwith
"@[Cannot initialize store:@ %a@]" pp_print_error err
@ -43,10 +46,11 @@ let wrap_store_init f _ () =
let wrap_raw_store_init f _ () =
Lwt_utils_unix.with_tempdir "tezos_test_" begin fun base_dir ->
let root = base_dir // "store" in
Raw_store.init root >>= function
Raw_store.init ~mapsize root >>= function
| Ok store ->
f store >>= fun () ->
Lwt.return ()
Lwt.finalize
(fun () -> f store)
(fun () -> Raw_store.close store ; Lwt.return_unit)
| Error err ->
Format.kasprintf Pervasives.failwith
"@[Cannot initialize store:@ %a@]" pp_print_error err
@ -153,11 +157,15 @@ let test_expand s =
let check (type t)
(module Store: Store_sigs.STORE with type t = t) (s: Store.t) k d =
Store.read_opt s k >|= fun d' ->
if d' <> Some d then begin
Assert.fail_msg
"Error while reading key %S\n%!" (String.concat Filename.dir_sep k) ;
end
Store.read_opt s k >|= function
| Some d' when MBytes.equal d d' -> ()
| Some d' ->
Assert.fail_msg ~expected:(MBytes.to_string d) ~given:(MBytes.to_string d')
"Error while reading key %d %S\n%!"
Cstruct.(compare (of_bigarray d) (of_bigarray d')) (String.concat Filename.dir_sep k)
| None ->
Assert.fail_msg ~expected:(MBytes.to_string d) ~given:""
"Error while reading key %S\n%!" (String.concat Filename.dir_sep k)
let check_none (type t)
(module Store: Store_sigs.STORE with type t = t) (s: Store.t) k =
@ -179,7 +187,7 @@ let test_generic (type t)
let list (type t)
(module Store: Store_sigs.STORE with type t = t) (s: Store.t) k =
Store.fold_keys s k ~init:[] ~f:(fun k acc -> Lwt.return (k :: acc))
Store.keys s k
let test_generic_list (type t)
(module Store: Store_sigs.STORE with type t = t) (s: Store.t) =
@ -216,12 +224,11 @@ let test_hashset (type t)
(Make_substore(Store)(struct let name = ["test_set"] end))
(Block_hash)
(BlockSet) in
let bhset : BlockSet.t = BlockSet.add bh2 (BlockSet.add bh1 BlockSet.empty) in
let bhset = BlockSet.(add bh2 (add bh1 empty)) in
StoreSet.store_all s bhset >>= fun () ->
StoreSet.read_all s >>= fun bhset' ->
Assert.equal_block_set ~msg:__LOC__ bhset bhset' ;
let bhset2 =
Pervasives.(bhset |> BlockSet.add bh3 |> BlockSet.remove bh1) in
let bhset2 = BlockSet.(bhset |> add bh3 |> remove bh1) in
StoreSet.store_all s bhset2 >>= fun () ->
StoreSet.read_all s >>= fun bhset2' ->
Assert.equal_block_set ~msg:__LOC__ bhset2 bhset2' ;
@ -252,14 +259,11 @@ let test_hashmap (type t)
end))
(BlockMap) in
let eq = (=) in
let map =
Pervasives.(BlockMap.empty |>
BlockMap.add bh1 (1, 'a') |> BlockMap.add bh2 (2, 'b')) in
let map = BlockMap.(empty |> add bh1 (1, 'a') |> add bh2 (2, 'b')) in
StoreMap.store_all s map >>= fun () ->
StoreMap.read_all s >>= fun map' ->
Assert.equal_block_map ~msg:__LOC__ ~eq map map' ;
let map2 =
Pervasives.(map |> BlockMap.add bh3 (3, 'c') |> BlockMap.remove bh1) in
let map2 = map |> BlockMap.add bh3 (3, 'c') |> BlockMap.remove bh1 in
StoreMap.store_all s map2 >>= fun () ->
StoreMap.read_all s >>= fun map2' ->
Assert.equal_block_map ~msg:__LOC__ ~eq map2 map2' ;
@ -324,16 +328,10 @@ let test_subblock s =
SubBlocksSet.known s bh2 >>= fun known ->
Assert.is_true ~msg:__LOC__ known ;
SubBlocksSet.read_all s >>= fun set ->
let set' =
Block_hash.Set.empty
|> Block_hash.Set.add bh1
|> Block_hash.Set.add bh2 in
let set' = Block_hash.Set.(empty |> add bh1 |> add bh2) in
Assert.equal_block_set ~msg:__LOC__ set set' ;
SubBlocksSet.remove s bh2 >>= fun () ->
let set =
Block_hash.Set.empty
|> Block_hash.Set.add bh3'
|> Block_hash.Set.add bh3 in
let set = Block_hash.Set.(empty |> add bh3' |> add bh3) in
SubBlocksSet.store_all s set >>= fun () ->
SubBlocksSet.elements s >>= fun elts ->
Assert.equal_block_hash_list ~msg:__LOC__
@ -355,10 +353,7 @@ let test_subblock s =
SubBlocksMap.read_opt s bh1 >>= fun v1' ->
Assert.equal ~msg:__LOC__ (Some v1) v1' ;
Assert.is_true ~msg:__LOC__ known ;
let map =
Block_hash.Map.empty
|> Block_hash.Map.add bh1 v1
|> Block_hash.Map.add bh2 v2 in
let map = Block_hash.Map.(empty |> add bh1 v1 |> add bh2 v2) in
SubBlocksMap.read_all s >>= fun map' ->
Assert.equal_block_map ~eq:(=) ~msg:__LOC__ map map' ;

View File

@ -66,7 +66,7 @@ module IrminBlake2B : Irmin.Hash.S with type t = Context_hash.t = struct
end
module GitStore =
Irmin_leveldb.Make
Irmin_lmdb.Make
(Metadata)
(MBytesContent)
(Irmin.Path.String_list)
@ -202,9 +202,9 @@ let fork_test_chain v ~protocol ~expiration =
(*-- Initialisation ----------------------------------------------------------*)
let init ?patch_context ~root =
let init ?patch_context ?mapsize root =
GitStore.Repo.v
(Irmin_leveldb.config root) >>= fun repo ->
(Irmin_lmdb.config ?mapsize root) >>= fun repo ->
Lwt.return {
path = root ;
repo ;

View File

@ -19,7 +19,8 @@ type context = t
(** Open or initialize a versioned store at a given path. *)
val init:
?patch_context:(context -> context Lwt.t) ->
root:string ->
?mapsize:int64 ->
string ->
index Lwt.t
val commit_genesis:

View File

@ -4,8 +4,8 @@
((name tezos_storage)
(public_name tezos-storage)
(libraries (tezos-base
leveldb
irmin-leveldb))
lmdb
irmin-lmdb))
(flags (:standard -w -9+27-30-32-40@8
-safe-string
-open Tezos_base__TzPervasives))))

View File

@ -7,9 +7,13 @@
(* *)
(**************************************************************************)
module List = ListLabels
open Rresult
type t = {
dir : Lmdb.t ;
parent : (Lmdb.rw Lmdb.txn * Lmdb.db * Lmdb.rw Lmdb.cursor) Lwt.key ;
}
type t = LevelDB.db
type key = string list
type value = MBytes.t
@ -32,60 +36,128 @@ let () =
let concat = String.concat "/"
let split = String.split_on_char '/'
let init path =
try
return (LevelDB.open_db path)
with exn ->
Lwt.return (error_exn exn)
let lwt_fail_error err =
Lwt.fail_with (Lmdb.string_of_error err)
let close t = LevelDB.close t
let of_result = function
| Ok res -> Lwt.return res
| Error err -> lwt_fail_error err
let known t key =
Lwt.return (LevelDB.mem t (concat key))
let (>>=?) v f =
match v with
| Error err -> lwt_fail_error err
| Ok v -> f v
let read_opt t key =
Lwt.return (Option.map ~f:MBytes.of_string (LevelDB.get t (concat key)))
let init ?mapsize path =
if not (Sys.file_exists path) then Unix.mkdir path 0o755 ;
match Lmdb.opendir ?mapsize ~flags:[NoTLS] path 0o644 with
| Ok dir -> return { dir ; parent = Lwt.new_key () }
| Error err -> failwith "%a" Lmdb.pp_error err
let read t key =
match LevelDB.get t (concat key) with
| None -> fail (Unknown key)
| Some k -> return (MBytes.of_string k)
let close { dir } = Lmdb.closedir dir
let read_exn t key =
Lwt.wrap2 LevelDB.get_exn t (concat key) >|= MBytes.of_string
let known { dir ; parent } key =
begin match Lwt.get parent with
| Some (txn, db, _cursor) -> Lmdb.mem txn db (concat key)
| None ->
Lmdb.with_ro_db dir ~f:begin fun txn db ->
Lmdb.mem txn db (concat key)
end
end |> of_result
let store t k v =
LevelDB.put t (concat k) (MBytes.to_string v) ;
Lwt.return_unit
let read_opt { dir ; parent } key =
begin match Lwt.get parent with
| Some (txn, db, _cursor) -> Lmdb.get txn db (concat key) >>| MBytes.copy
| None ->
Lmdb.with_ro_db dir ~f:begin fun txn db ->
Lmdb.get txn db (concat key) >>| MBytes.copy
end
end |> function
| Ok v -> Lwt.return_some v
| Error KeyNotFound -> Lwt.return_none
| Error err -> lwt_fail_error err
let remove t k =
LevelDB.delete t (concat k) ;
Lwt.return_unit
let read { dir ; parent } key =
begin match Lwt.get parent with
| Some (txn, db, _cursor) -> Lmdb.get txn db (concat key) >>| MBytes.copy
| None ->
Lmdb.with_ro_db dir ~f:begin fun txn db ->
Lmdb.get txn db (concat key) >>| MBytes.copy
end
end |> function
| Ok v -> return v
| Error _err -> fail (Unknown key)
let read_exn { dir ; parent } key =
begin match Lwt.get parent with
| Some (txn, db, _cursor) -> Lmdb.get txn db (concat key) >>| MBytes.copy
| None ->
Lmdb.with_ro_db dir ~f:begin fun txn db ->
Lmdb.get txn db (concat key) >>| MBytes.copy
end
end |> of_result
let store { dir ; parent } k v =
begin match Lwt.get parent with
| Some (txn, db, _cursor) -> Lmdb.put txn db (concat k) v
| None ->
Lmdb.with_rw_db dir ~f:begin fun txn db ->
Lmdb.put txn db (concat k) v
end
end |> of_result
let remove { dir ; parent } k =
let remove txn db =
match Lmdb.del txn db (concat k) with
| Ok () -> Ok ()
| Error KeyNotFound -> Ok ()
| Error err -> Error err in
begin match Lwt.get parent with
| Some (txn, db, _cursor) -> remove txn db
| None -> Lmdb.with_rw_db dir ~f:remove
end |> of_result
let is_prefix s s' =
String.(length s <= length s' && compare s (sub s' 0 (length s)) = 0)
let known_dir t k =
let ret = ref false in
let known_dir { dir ; parent } k =
let k = concat k in
LevelDB.iter_from begin fun kk _ ->
if is_prefix k kk then ret := true ;
false
end t k ;
Lwt.return !ret
let cursor_fun cursor =
Lmdb.cursor_at cursor k >>= fun () ->
Lmdb.cursor_get cursor >>| fun (first_k, _v) ->
(is_prefix k (MBytes.to_string first_k))
in
begin match Lwt.get parent with
| Some (txn, db, _cursor) ->
Lmdb.with_cursor txn db ~f:cursor_fun
| None ->
Lmdb.with_ro_db dir ~f:begin fun txn db ->
Lmdb.with_cursor txn db ~f:cursor_fun
end
end |> of_result
let remove_dir t k =
let remove_dir { dir ; parent } k =
let k = concat k in
let batch = LevelDB.Batch.make () in
LevelDB.iter_from begin fun kk _ ->
if is_prefix k kk then begin
LevelDB.Batch.delete batch kk ;
true
end
else false
end t k ;
LevelDB.Batch.write t batch ;
Lwt.return_unit
let cursor_fun cursor =
Lmdb.cursor_at cursor k >>= fun () ->
Lmdb.cursor_iter cursor ~f:begin fun (kk, _v) ->
let kk_string = MBytes.to_string kk in
if is_prefix k kk_string then begin
Lmdb.cursor_del cursor
end
else Error KeyNotFound
end in
begin match Lwt.get parent with
| Some (txn, db, _cursor) ->
Lmdb.with_cursor txn db ~f:cursor_fun
| None ->
Lmdb.with_rw_db dir ~f:begin fun txn db ->
Lmdb.with_cursor txn db ~f:cursor_fun
end
end |> function
| Error KeyNotFound
| Ok () -> Lwt.return_unit
| Error err -> lwt_fail_error err
let list_equal l1 l2 len =
if len < 0 || len > List.length l1 || len > List.length l2
@ -116,45 +188,81 @@ let list_sub l pos len =
else inner (h :: acc, pred n) t in
inner ([], len) l
let with_rw_cursor_lwt ?sync ?metasync ?flags ?name { dir ; parent } ~f =
let local_parent =
match Lwt.get parent with
| None -> None
| Some (txn, _db, _cursor) -> Some txn in
Lmdb.create_rw_txn
?sync ?metasync ?parent:local_parent dir >>=? fun txn ->
Lmdb.opendb ?flags ?name txn >>=? fun db ->
Lmdb.opencursor txn db >>=? fun cursor ->
Lwt.with_value parent (Some (txn, db, cursor)) begin fun () ->
Lwt.try_bind (fun () -> f cursor)
begin fun res ->
Lmdb.cursor_close cursor ;
Lmdb.commit_txn txn >>=? fun () ->
Lwt.return res
end
begin fun exn ->
Lmdb.cursor_close cursor ;
Lmdb.abort_txn txn ;
Lwt.fail exn
end
end
let cursor_next_lwt cursor acc f =
match Lmdb.cursor_next cursor with
| Error KeyNotFound -> acc
| Error err -> lwt_fail_error err
| Ok () -> Lwt.bind acc f
let fold t k ~init ~f =
let k_concat = concat k in
let base_len = List.length k in
let i = LevelDB.Iterator.make t in
LevelDB.Iterator.seek i k_concat 0 (String.length k_concat) ;
let returned = Hashtbl.create 31 in
let rec inner acc =
match LevelDB.Iterator.valid i with
let rec inner ht cursor acc =
Lmdb.cursor_get cursor >>=? fun (kk, _v) ->
let kk = MBytes.to_string kk in
let kk_split = split kk in
match is_child ~child:kk_split ~parent:k with
| false -> Lwt.return acc
| true ->
let kk = LevelDB.Iterator.get_key i in
let kk_split = split kk in
match is_child ~child:kk_split ~parent:k with
| false -> Lwt.return acc
| true ->
let cur_len = List.length kk_split in
LevelDB.Iterator.next i ;
if cur_len = succ base_len then begin
(f (`Key kk_split) acc) >>= inner
end
else begin
let dir = list_sub kk_split 0 (succ base_len) in
if Hashtbl.mem returned dir then
inner acc
else begin
Hashtbl.add returned dir () ;
(f (`Dir dir) acc) >>= inner
end
end ;
in
inner init
let cur_len = List.length kk_split in
if cur_len = succ base_len then begin
cursor_next_lwt cursor (f (`Key kk_split) acc) (inner ht cursor)
end
else begin
let dir = list_sub kk_split 0 (succ base_len) in
if Hashtbl.mem ht dir then
cursor_next_lwt cursor (Lwt.return acc) (inner ht cursor)
else begin
Hashtbl.add ht dir () ;
cursor_next_lwt cursor (f (`Dir dir) acc) (inner ht cursor)
end
end in
with_rw_cursor_lwt t ~f:begin fun cursor ->
match Lmdb.cursor_at cursor (concat k) with
| Error KeyNotFound -> Lwt.return init
| Error err -> lwt_fail_error err
| Ok () ->
let ht = Hashtbl.create 31 in
inner ht cursor init
end
let fold_keys s k ~init ~f =
let rec loop k acc =
fold s k ~init:acc
~f:(fun file acc ->
match file with
| `Key k -> f k acc
| `Dir k -> loop k acc) in
loop k init
let fold_keys t k ~init ~f =
with_rw_cursor_lwt t ~f:begin fun cursor ->
match Lmdb.cursor_at cursor (concat k) with
| Error KeyNotFound -> Lwt.return init
| Error err -> lwt_fail_error err
| Ok () ->
let rec inner acc =
Lmdb.cursor_get cursor >>=? fun (kk, _v) ->
let kk = MBytes.to_string kk in
let kk_split = split kk in
match is_child ~child:kk_split ~parent:k with
| false -> Lwt.return acc
| true -> cursor_next_lwt cursor (f kk_split acc) inner
in inner init
end
let keys t = fold_keys t ~init:[] ~f:(fun k acc -> Lwt.return (k :: acc))
let keys t =
fold_keys t ~init:[] ~f:(fun k acc -> Lwt.return (k :: acc))

View File

@ -11,6 +11,6 @@ open Store_sigs
include STORE
val init: string -> t tzresult Lwt.t
val init: ?mapsize:int64 -> string -> t tzresult Lwt.t
val close : t -> unit

View File

@ -87,7 +87,7 @@ type t = {
let wrap_context_init f _ () =
Lwt_utils_unix.with_tempdir "tezos_test_" begin fun base_dir ->
let root = base_dir // "context" in
Context.init ~root ?patch_context:None >>= fun idx ->
Context.init ~mapsize:4_096_000L root >>= fun idx ->
Context.commit_genesis idx
~chain_id
~time:genesis_time

View File

@ -10,8 +10,8 @@ depends: [
"ocamlfind" { build }
"jbuilder" { build & = "1.0+beta20" }
"tezos-base"
"leveldb"
"irmin-leveldb"
"lmdb"
"irmin-lmdb"
"tezos-stdlib-unix" { test }
"alcotest-lwt" { test }
]