Storage: better use of LMDB transactions for better perf and to force sync at commit
Review and edit by @klalplok, @vbot, @samoht.
This commit is contained in:
parent
7d681f66e3
commit
43bf1b4cc4
89
vendors/irmin-lmdb/irmin_lmdb.ml
vendored
89
vendors/irmin-lmdb/irmin_lmdb.ml
vendored
@ -42,12 +42,48 @@ open Lwt.Infix
|
|||||||
type t = {
|
type t = {
|
||||||
db: Lmdb.t ;
|
db: Lmdb.t ;
|
||||||
root: string ;
|
root: string ;
|
||||||
|
mutable wtxn: (Lmdb.rw Lmdb.txn * Lmdb.db) option;
|
||||||
}
|
}
|
||||||
|
|
||||||
let of_result = function
|
let of_result = function
|
||||||
| Ok v -> Lwt.return v
|
| Ok v -> Lwt.return v
|
||||||
| Error err -> Lwt.fail_with (Lmdb.string_of_error err)
|
| Error err -> Lwt.fail_with (Lmdb.string_of_error err)
|
||||||
|
|
||||||
|
let (|>>) v f =
|
||||||
|
match v with
|
||||||
|
| Ok v -> f v
|
||||||
|
| Error e -> Error e
|
||||||
|
|
||||||
|
let get_wtxn db =
|
||||||
|
match db.wtxn with
|
||||||
|
| Some t -> Ok t
|
||||||
|
| None ->
|
||||||
|
Lmdb.create_rw_txn db.db |>> fun txn ->
|
||||||
|
Lmdb.opendb txn |>> fun ddb ->
|
||||||
|
db.wtxn <- Some (txn, ddb);
|
||||||
|
Ok (txn, ddb)
|
||||||
|
|
||||||
|
let commit_wtxn db =
|
||||||
|
match db.wtxn with
|
||||||
|
| None -> Ok ()
|
||||||
|
| Some (t, _ddb) ->
|
||||||
|
db.wtxn <- None;
|
||||||
|
Lmdb.commit_txn t
|
||||||
|
|
||||||
|
let add db k v =
|
||||||
|
get_wtxn db |>> fun (txn, ddb) ->
|
||||||
|
Lmdb.put_string txn ddb k v
|
||||||
|
|
||||||
|
let add db k v =
|
||||||
|
of_result @@ add db k v
|
||||||
|
|
||||||
|
let add_cstruct db k v =
|
||||||
|
get_wtxn db |>> fun (txn, ddb) ->
|
||||||
|
Lmdb.put txn ddb k (Cstruct.to_bigarray v)
|
||||||
|
|
||||||
|
let add_cstruct db k v =
|
||||||
|
of_result @@ add_cstruct db k v
|
||||||
|
|
||||||
let src = Logs.Src.create "irmin.lmdb" ~doc:"Irmin in a Lmdb store"
|
let src = Logs.Src.create "irmin.lmdb" ~doc:"Irmin in a Lmdb store"
|
||||||
module Log = (val Logs.src_log src : Logs.LOG)
|
module Log = (val Logs.src_log src : Logs.LOG)
|
||||||
|
|
||||||
@ -81,27 +117,25 @@ let config
|
|||||||
let config = C.add config Conf.readonly readonly in
|
let config = C.add config Conf.readonly readonly in
|
||||||
Option.value_map mapsize ~default:config ~f:(C.add config Conf.mapsize)
|
Option.value_map mapsize ~default:config ~f:(C.add config Conf.mapsize)
|
||||||
|
|
||||||
|
type ('r) reader = { f : 'k. 'k Lmdb.txn -> Lmdb.db -> ('r, Lmdb.error) result } [@@unboxed]
|
||||||
|
|
||||||
|
let with_read_db db ~f =
|
||||||
|
match db.wtxn with
|
||||||
|
| None ->
|
||||||
|
Lmdb.with_ro_db db.db ~f:f.f
|
||||||
|
| Some (txn, ddb) ->
|
||||||
|
f.f txn ddb
|
||||||
|
|
||||||
let mem db k =
|
let mem db k =
|
||||||
Lmdb.with_ro_db db ~f:(fun txn db -> Lmdb.mem txn db k) |>
|
with_read_db db ~f:{ f = fun txn db -> Lmdb.mem txn db k } |>
|
||||||
of_result
|
of_result
|
||||||
|
|
||||||
let find_bind db k ~f =
|
let find_bind db k ~f =
|
||||||
match Lmdb.with_ro_db db
|
match with_read_db db ~f:{ f = fun txn db -> Result.map ~f (Lmdb.get txn db k) } with
|
||||||
~f:(fun txn db -> Result.map ~f (Lmdb.get txn db k)) with
|
|
||||||
| Error KeyNotFound -> Lwt.return_none
|
| Error KeyNotFound -> Lwt.return_none
|
||||||
| Error err -> Lwt.fail_with (Lmdb.string_of_error err)
|
| Error err -> Lwt.fail_with (Lmdb.string_of_error err)
|
||||||
| Ok v -> Lwt.return v
|
| Ok v -> Lwt.return v
|
||||||
|
|
||||||
let add db k v =
|
|
||||||
Lmdb.with_rw_db db ~f:begin fun txn db ->
|
|
||||||
Lmdb.put_string txn db k v ;
|
|
||||||
end |> of_result
|
|
||||||
|
|
||||||
let add_cstruct db k v =
|
|
||||||
Lmdb.with_rw_db db ~f:begin fun txn db ->
|
|
||||||
Lmdb.put txn db k (Cstruct.to_bigarray v) ;
|
|
||||||
end |> of_result
|
|
||||||
|
|
||||||
module Irmin_value_store
|
module Irmin_value_store
|
||||||
(M: Irmin.Metadata.S)
|
(M: Irmin.Metadata.S)
|
||||||
(H: Irmin.Hash.S)
|
(H: Irmin.Hash.S)
|
||||||
@ -117,11 +151,11 @@ module Irmin_value_store
|
|||||||
let lmdb_of_key h =
|
let lmdb_of_key h =
|
||||||
"contents/" ^ Cstruct.to_string (H.to_raw h)
|
"contents/" ^ Cstruct.to_string (H.to_raw h)
|
||||||
|
|
||||||
let mem { db ; _ } key =
|
let mem db key =
|
||||||
let key = lmdb_of_key key in
|
let key = lmdb_of_key key in
|
||||||
mem db key
|
mem db key
|
||||||
|
|
||||||
let find { db ; _ } key =
|
let find db key =
|
||||||
let key = lmdb_of_key key in
|
let key = lmdb_of_key key in
|
||||||
find_bind db key ~f:begin fun v ->
|
find_bind db key ~f:begin fun v ->
|
||||||
Option.of_result (C.of_string Cstruct.(to_string (of_bigarray v)))
|
Option.of_result (C.of_string Cstruct.(to_string (of_bigarray v)))
|
||||||
@ -129,7 +163,7 @@ module Irmin_value_store
|
|||||||
|
|
||||||
let to_string = Fmt.to_to_string C.pp
|
let to_string = Fmt.to_to_string C.pp
|
||||||
|
|
||||||
let add { db ; _ } v =
|
let add db v =
|
||||||
let k = H.digest C.t v in
|
let k = H.digest C.t v in
|
||||||
let k_lmdb = lmdb_of_key k in
|
let k_lmdb = lmdb_of_key k in
|
||||||
let v = to_string v in
|
let v = to_string v in
|
||||||
@ -376,7 +410,7 @@ module Irmin_value_store
|
|||||||
let lmdb_of_key h =
|
let lmdb_of_key h =
|
||||||
"node/" ^ Cstruct.to_string (H.to_raw h)
|
"node/" ^ Cstruct.to_string (H.to_raw h)
|
||||||
|
|
||||||
let mem { db ; _ } key =
|
let mem db key =
|
||||||
let key = lmdb_of_key key in
|
let key = lmdb_of_key key in
|
||||||
mem db key
|
mem db key
|
||||||
|
|
||||||
@ -384,11 +418,11 @@ module Irmin_value_store
|
|||||||
Irmin.Type.decode_cstruct (Irmin.Type.list Val.entry_t) v |>
|
Irmin.Type.decode_cstruct (Irmin.Type.list Val.entry_t) v |>
|
||||||
Option.of_result
|
Option.of_result
|
||||||
|
|
||||||
let find { db ; _ } key =
|
let find db key =
|
||||||
let key = lmdb_of_key key in
|
let key = lmdb_of_key key in
|
||||||
find_bind db key ~f:(fun v -> of_cstruct (cstruct_of_ba_copy v))
|
find_bind db key ~f:(fun v -> of_cstruct (cstruct_of_ba_copy v))
|
||||||
|
|
||||||
let add { db ; _ } v =
|
let add db v =
|
||||||
let v = Irmin.Type.encode_cstruct (Irmin.Type.list Val.entry_t) v in
|
let v = Irmin.Type.encode_cstruct (Irmin.Type.list Val.entry_t) v in
|
||||||
let k = H.digest Irmin.Type.cstruct v in
|
let k = H.digest Irmin.Type.cstruct v in
|
||||||
let k_lmdb = lmdb_of_key k in
|
let k_lmdb = lmdb_of_key k in
|
||||||
@ -439,7 +473,7 @@ module Irmin_value_store
|
|||||||
type key = H.t
|
type key = H.t
|
||||||
type value = Val.t
|
type value = Val.t
|
||||||
|
|
||||||
let mem { db ; _ } key =
|
let mem db key =
|
||||||
let key = lmdb_of_key key in
|
let key = lmdb_of_key key in
|
||||||
mem db key
|
mem db key
|
||||||
|
|
||||||
@ -447,15 +481,16 @@ module Irmin_value_store
|
|||||||
Irmin.Type.decode_cstruct Val.t v |>
|
Irmin.Type.decode_cstruct Val.t v |>
|
||||||
Option.of_result
|
Option.of_result
|
||||||
|
|
||||||
let find { db ; _ } key =
|
let find db key =
|
||||||
let key = lmdb_of_key key in
|
let key = lmdb_of_key key in
|
||||||
find_bind db key ~f:(fun v -> of_cstruct (cstruct_of_ba_copy v))
|
find_bind db key ~f:(fun v -> of_cstruct (cstruct_of_ba_copy v))
|
||||||
|
|
||||||
let add { db ; _ } v =
|
let add db v =
|
||||||
let v = Irmin.Type.encode_cstruct Val.t v in
|
let v = Irmin.Type.encode_cstruct Val.t v in
|
||||||
let k = H.digest Irmin.Type.cstruct v in
|
let k = H.digest Irmin.Type.cstruct v in
|
||||||
let k_lmdb = lmdb_of_key k in
|
let k_lmdb = lmdb_of_key k in
|
||||||
add_cstruct db k_lmdb v >|= fun () -> k
|
add_cstruct db k_lmdb v >>= fun () ->
|
||||||
|
of_result @@ commit_wtxn db >|= fun () -> k
|
||||||
|
|
||||||
end
|
end
|
||||||
include AO
|
include AO
|
||||||
@ -508,10 +543,10 @@ module Irmin_branch_store (B: Branch) (H: Irmin.Hash.S) = struct
|
|||||||
|
|
||||||
let lmdb_of_branch r = Fmt.to_to_string B.pp_ref r
|
let lmdb_of_branch r = Fmt.to_to_string B.pp_ref r
|
||||||
|
|
||||||
let mem { db ; _ } r =
|
let mem db r =
|
||||||
mem db.db (lmdb_of_branch r)
|
mem db.db (lmdb_of_branch r)
|
||||||
|
|
||||||
let find { db ; _ } r =
|
let find db r =
|
||||||
find_bind db.db (lmdb_of_branch r)
|
find_bind db.db (lmdb_of_branch r)
|
||||||
~f:(fun v -> Some (H.of_raw (cstruct_of_ba_copy v)))
|
~f:(fun v -> Some (H.of_raw (cstruct_of_ba_copy v)))
|
||||||
|
|
||||||
@ -642,12 +677,12 @@ module Make
|
|||||||
let root = match root with None -> "irmin.ldb" | Some root -> root in
|
let root = match root with None -> "irmin.ldb" | Some root -> root in
|
||||||
if not (Sys.file_exists root) then Unix.mkdir root 0o755 ;
|
if not (Sys.file_exists root) then Unix.mkdir root 0o755 ;
|
||||||
let flags = if readonly then [ Lmdb.RdOnly ] else [] in
|
let flags = if readonly then [ Lmdb.RdOnly ] else [] in
|
||||||
let flags = Lmdb.NoMetaSync :: Lmdb.NoRdAhead :: Lmdb.NoTLS :: flags in
|
let flags = Lmdb.NoRdAhead :: Lmdb.NoTLS :: flags in
|
||||||
let file_flags = if readonly then 0o444 else 0o644 in
|
let file_flags = if readonly then 0o444 else 0o644 in
|
||||||
match Lmdb.opendir ~mapsize ~flags root file_flags with
|
match Lmdb.opendir ~mapsize ~flags root file_flags with
|
||||||
| Error err -> Lwt.fail_with (Lmdb.string_of_error err)
|
| Error err -> Lwt.fail_with (Lmdb.string_of_error err)
|
||||||
| Ok db ->
|
| Ok db ->
|
||||||
let db = { db ; root } in
|
let db = { db ; root ; wtxn = None } in
|
||||||
Branch.v db >|= fun branch ->
|
Branch.v db >|= fun branch ->
|
||||||
{ db; branch; config = conf }
|
{ db; branch; config = conf }
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user