Shell/Distributed_db: allow to precheck data.

This commit is contained in:
Grégoire Henry 2017-03-28 13:31:41 +02:00
parent a6307c40cf
commit e273cfa07f
7 changed files with 234 additions and 78 deletions

View File

@ -21,13 +21,18 @@ type 'a request_param = {
}
module Make_raw
(Hash : HASH)
(Disk_table : State.DATA_STORE with type key := Hash.t)
(Memory_table : Hashtbl.S with type key := Hash.t)
(Hash : sig type t end)
(Disk_table :
Distributed_db_functors.DISK_TABLE with type key := Hash.t)
(Memory_table :
Distributed_db_functors.MEMORY_TABLE with type key := Hash.t)
(Request_message : sig
type param
val forge : param -> Hash.t list -> Message.t
end) = struct
end)
(Precheck : Distributed_db_functors.PRECHECK
with type key := Hash.t
and type value := Disk_table.value) = struct
type key = Hash.t
type value = Disk_table.value
@ -45,7 +50,7 @@ module Make_raw
(Hash) (Memory_table) (Request)
module Table =
Distributed_db_functors.Make_table
(Hash) (Disk_table) (Memory_table) (Scheduler)
(Hash) (Disk_table) (Memory_table) (Scheduler) (Precheck)
type t = {
scheduler: Scheduler.t ;
@ -62,23 +67,51 @@ module Make_raw
end
module No_precheck = struct
type param = unit
let precheck _ _ _ = true
end
module Raw_operation =
Make_raw (Operation_hash) (State.Operation) (Operation_hash.Table) (struct
type param = Net_id.t
let forge net_id keys = Message.Get_operations (net_id, keys)
end)
Make_raw
(Operation_hash)
(State.Operation)
(Operation_hash.Table)
(struct
type param = Net_id.t
let forge net_id keys = Message.Get_operations (net_id, keys)
end)
(No_precheck)
module Raw_block_header =
Make_raw (Block_hash) (State.Block_header) (Block_hash.Table) (struct
type param = Net_id.t
let forge net_id keys = Message.Get_block_headers (net_id, keys)
Make_raw
(Block_hash)
(State.Block_header)
(Block_hash.Table)
(struct
type param = Net_id.t
let forge net_id keys = Message.Get_block_headers (net_id, keys)
end)
(No_precheck)
module Operation_list_table =
Hashtbl.Make(struct
type t = Block_hash.t * int
let hash = Hashtbl.hash
let equal (b1, i1) (b2, i2) =
Block_hash.equal b1 b2 && i1 = i2
end)
module Raw_protocol =
Make_raw (Protocol_hash) (State.Protocol) (Protocol_hash.Table) (struct
type param = unit
let forge () keys = Message.Get_protocols keys
end)
Make_raw
(Protocol_hash)
(State.Protocol)
(Protocol_hash.Table)
(struct
type param = unit
let forge () keys = Message.Get_protocols keys
end)
(No_precheck)
type callback = {
notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ;
@ -403,10 +436,13 @@ let shutdown { p2p ; p2p_readers ; active_nets } =
P2p.shutdown p2p >>= fun () ->
Lwt.return_unit
module type DISTRIBUTED_DB = Distributed_db_functors.DISTRIBUTED_DB
module type PARAMETRIZED_DISTRIBUTED_DB =
Distributed_db_functors.PARAMETRIZED_DISTRIBUTED_DB
module type DISTRIBUTED_DB =
Distributed_db_functors.DISTRIBUTED_DB
module Make
(Table : DISTRIBUTED_DB)
(Table : PARAMETRIZED_DISTRIBUTED_DB with type param := unit)
(Kind : sig
type t
val proj: t -> Table.t
@ -417,8 +453,8 @@ module Make
let known t k = Table.known (Kind.proj t) k
let read t k = Table.read (Kind.proj t) k
let read_exn t k = Table.read_exn (Kind.proj t) k
let prefetch t ?peer k = Table.prefetch (Kind.proj t) ?peer k
let fetch t ?peer k = Table.fetch (Kind.proj t) ?peer k
let prefetch t ?peer k = Table.prefetch (Kind.proj t) ?peer k ()
let fetch t ?peer k = Table.fetch (Kind.proj t) ?peer k ()
let commit t k = Table.commit (Kind.proj t) k
let inject t k v = Table.inject (Kind.proj t) k v
let watch t = Table.watch (Kind.proj t)

View File

@ -40,11 +40,11 @@ module type DISTRIBUTED_DB = sig
val known: t -> key -> bool Lwt.t
val read: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t
val commit: t -> key -> unit Lwt.t
val inject: t -> key -> value -> bool Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t
end
module Operation :

View File

@ -7,19 +7,63 @@
(* *)
(**************************************************************************)
module type DISTRIBUTED_DB = sig
module type PARAMETRIZED_RO_DISTRIBUTED_DB = sig
type t
type key
type value
type param
val known: t -> key -> bool Lwt.t
val read: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
end
module type PARAMETRIZED_DISTRIBUTED_DB = sig
include PARAMETRIZED_RO_DISTRIBUTED_DB
val commit: t -> key -> unit Lwt.t
(* val commit_invalid: t -> key -> unit Lwt.t *) (* TODO *)
val inject: t -> key -> value -> bool Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
end
module type DISTRIBUTED_DB = sig
include PARAMETRIZED_DISTRIBUTED_DB with type param := unit
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t
end
module type DISK_TABLE = sig
type store
type key
type value
val known: store -> key -> bool Lwt.t
val read: store -> key -> value tzresult Lwt.t
val read_opt: store -> key -> value option Lwt.t
val read_exn: store -> key -> value Lwt.t
val store: store -> key -> value -> bool Lwt.t
val remove: store -> key -> bool Lwt.t
end
module type MEMORY_TABLE = sig
type 'a t
type key
val create: int -> 'a t
val find: 'a t -> key -> 'a
val add: 'a t -> key -> 'a -> unit
val replace: 'a t -> key -> 'a -> unit
val remove: 'a t -> key -> unit
val fold: (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b
end
module type SCHEDULER_EVENTS = sig
@ -29,16 +73,27 @@ module type SCHEDULER_EVENTS = sig
val notify: t -> P2p.Peer_id.t -> key -> unit
val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit
val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit
val notify_invalid: t -> P2p.Peer_id.t -> key -> unit
end
module type PRECHECK = sig
type key
type param
type value
val precheck: key -> param -> value -> bool
end
module Make_table
(Hash : HASH)
(Disk_table : State.DATA_STORE with type key := Hash.t)
(Memory_table : Hashtbl.S with type key := Hash.t)
(Scheduler : SCHEDULER_EVENTS with type key := Hash.t) : sig
(Hash : sig type t end)
(Disk_table : DISK_TABLE with type key := Hash.t)
(Memory_table : MEMORY_TABLE with type key := Hash.t)
(Scheduler : SCHEDULER_EVENTS with type key := Hash.t)
(Precheck : PRECHECK with type key := Hash.t
and type value := Disk_table.value) : sig
include DISTRIBUTED_DB with type key = Hash.t
and type value = Disk_table.value
include PARAMETRIZED_DISTRIBUTED_DB with type key = Hash.t
and type value = Disk_table.value
and type param = Precheck.param
val create:
?global_input:(key * value) Watcher.input ->
Scheduler.t -> Disk_table.store -> t
@ -48,6 +103,7 @@ end = struct
type key = Hash.t
type value = Disk_table.value
type param = Precheck.param
type t = {
scheduler: Scheduler.t ;
@ -58,7 +114,7 @@ end = struct
}
and status =
| Pending of value Lwt.u
| Pending of value Lwt.u * param
| Found of value
let known s k =
@ -79,24 +135,23 @@ end = struct
| Found v -> Lwt.return v
| Pending _ -> Lwt.fail Not_found
let fetch s ?peer k =
let fetch s ?peer k param =
match Memory_table.find s.memory k with
| exception Not_found -> begin
Disk_table.read_opt s.disk k >>= function
| None ->
let waiter, wakener = Lwt.wait () in
Memory_table.add s.memory k (Pending wakener) ;
Memory_table.add s.memory k (Pending (wakener, param)) ;
Scheduler.request s.scheduler peer k ;
waiter
| Some v -> Lwt.return v
end
| Pending w -> Lwt.waiter_of_wakener w
| Pending (w, _) -> Lwt.waiter_of_wakener w
| Found v -> Lwt.return v
let prefetch s ?peer k = Lwt.ignore_result (fetch s ?peer k)
let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param)
let notify s p k v =
Scheduler.notify s.scheduler p k ;
match Memory_table.find s.memory k with
| exception Not_found -> begin
Disk_table.known s.disk k >>= function
@ -107,13 +162,19 @@ end = struct
Scheduler.notify_unrequested s.scheduler p k ;
Lwt.return_unit
end
| Pending w ->
Memory_table.replace s.memory k (Found v) ;
Lwt.wakeup w v ;
iter_option s.global_input
~f:(fun input -> Watcher.notify input (k, v)) ;
Watcher.notify s.input (k, v) ;
Lwt.return_unit
| Pending (w, param) ->
if not (Precheck.precheck k param v) then begin
Scheduler.notify_invalid s.scheduler p k ;
Lwt.return_unit
end else begin
Scheduler.notify s.scheduler p k ;
Memory_table.replace s.memory k (Found v) ;
Lwt.wakeup w v ;
iter_option s.global_input
~f:(fun input -> Watcher.notify input (k, v)) ;
Watcher.notify s.input (k, v) ;
Lwt.return_unit
end
| Found _ ->
Scheduler.notify_duplicate s.scheduler p k ;
Lwt.return_unit
@ -137,7 +198,7 @@ end = struct
| exception Not_found -> Lwt.return_unit
| Pending _ -> assert false
| Found v ->
Disk_table.store s.disk v >>= fun _ ->
Disk_table.store s.disk k v >>= fun _ ->
Memory_table.remove s.memory k ;
Lwt.return_unit
@ -158,8 +219,8 @@ module type REQUEST = sig
end
module Make_request_scheduler
(Hash : HASH)
(Table : Hashtbl.S with type key := Hash.t)
(Hash : sig type t end)
(Table : MEMORY_TABLE with type key := Hash.t)
(Request : REQUEST with type key := Hash.t) : sig
type t
@ -181,6 +242,7 @@ end = struct
and event =
| Request of P2p.Peer_id.t option * key
| Notify of P2p.Peer_id.t * key
| Notify_invalid of P2p.Peer_id.t * key
| Notify_duplicate of P2p.Peer_id.t * key
| Notify_unrequested of P2p.Peer_id.t * key
@ -188,6 +250,8 @@ end = struct
t.push_to_worker (Request (p, k))
let notify t p k =
t.push_to_worker (Notify (p, k))
let notify_invalid t p k =
t.push_to_worker (Notify_invalid (p, k))
let notify_duplicate t p k =
t.push_to_worker (Notify_duplicate (p, k))
let notify_unrequested t p k =
@ -240,6 +304,7 @@ end = struct
| Notify (_gid, key) ->
Table.remove state.pending key ;
Lwt.return_unit
| Notify_invalid _
| Notify_unrequested _
| Notify_duplicate _ ->
(* TODO *)

View File

@ -7,19 +7,65 @@
(* *)
(**************************************************************************)
module type DISTRIBUTED_DB = sig
module type PARAMETRIZED_RO_DISTRIBUTED_DB = sig
type t
type key
type value
type param
val known: t -> key -> bool Lwt.t
val read: t -> key -> value option Lwt.t
val read_exn: t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
end
module type PARAMETRIZED_DISTRIBUTED_DB = sig
include PARAMETRIZED_RO_DISTRIBUTED_DB
val commit: t -> key -> unit Lwt.t
(* val commit_invalid: t -> key -> unit Lwt.t *) (* TODO *)
val inject: t -> key -> value -> bool Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
end
module type DISTRIBUTED_DB = sig
include PARAMETRIZED_DISTRIBUTED_DB with type param := unit
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t
end
module type DISK_TABLE = sig
(* A subtype of State.DATA_STORE *)
type store
type key
type value
val known: store -> key -> bool Lwt.t
val read: store -> key -> value tzresult Lwt.t
val read_opt: store -> key -> value option Lwt.t
val read_exn: store -> key -> value Lwt.t
val store: store -> key -> value -> bool Lwt.t
val remove: store -> key -> bool Lwt.t
end
module type MEMORY_TABLE = sig
(* A subtype of Hashtbl.S *)
type 'a t
type key
val create: int -> 'a t
val find: 'a t -> key -> 'a
val add: 'a t -> key -> 'a -> unit
val replace: 'a t -> key -> 'a -> unit
val remove: 'a t -> key -> unit
val fold: (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b
end
module type SCHEDULER_EVENTS = sig
@ -29,16 +75,27 @@ module type SCHEDULER_EVENTS = sig
val notify: t -> P2p.Peer_id.t -> key -> unit
val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit
val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit
val notify_invalid: t -> P2p.Peer_id.t -> key -> unit
end
module type PRECHECK = sig
type key
type param
type value
val precheck: key -> param -> value -> bool
end
module Make_table
(Hash : HASH)
(Disk_table : State.DATA_STORE with type key := Hash.t)
(Memory_table : Hashtbl.S with type key := Hash.t)
(Scheduler : SCHEDULER_EVENTS with type key := Hash.t) : sig
(Hash : sig type t end)
(Disk_table : DISK_TABLE with type key := Hash.t)
(Memory_table : MEMORY_TABLE with type key := Hash.t)
(Scheduler : SCHEDULER_EVENTS with type key := Hash.t)
(Precheck : PRECHECK with type key := Hash.t
and type value := Disk_table.value) : sig
include DISTRIBUTED_DB with type key = Hash.t
and type value = Disk_table.value
include PARAMETRIZED_DISTRIBUTED_DB with type key = Hash.t
and type value = Disk_table.value
and type param := Precheck.param
val create:
?global_input:(key * value) Watcher.input ->
Scheduler.t -> Disk_table.store -> t
@ -54,8 +111,8 @@ module type REQUEST = sig
end
module Make_request_scheduler
(Hash : HASH)
(Table : Hashtbl.S with type key := Hash.t)
(Hash : sig type t end)
(Table : MEMORY_TABLE with type key := Hash.t)
(Request : REQUEST with type key := Hash.t) : sig
type t

View File

@ -33,7 +33,7 @@ let inject_protocol state ?force:_ proto =
"Compilation failed (%a)"
Protocol_hash.pp_short hash
| true ->
State.Protocol.store state proto >>= function
State.Protocol.store state hash proto >>= function
| false ->
failwith
"Previously registred protocol (%a)"

View File

@ -176,7 +176,7 @@ module type DATA_STORE = sig
val read_discovery_time_opt: store -> key -> Time.t option Lwt.t
val read_discovery_time_exn: store -> key -> Time.t Lwt.t
val store: store -> value -> bool Lwt.t
val store: store -> key -> value -> bool Lwt.t
val store_raw: store -> key -> MBytes.t -> value option tzresult Lwt.t
val remove: store -> key -> bool Lwt.t
@ -263,14 +263,12 @@ end = struct
S.Contents.read_opt (s, k) >>= function
| None -> Lwt.return_none
| Some v -> Lwt.return (Some { Time.data = Ok v ; time })
let store s v =
let bytes = Data_encoding.Binary.to_bytes S.encoding v in
let k = S.hash_raw bytes in
let store s k v =
S.Discovery_time.known s k >>= function
| true -> Lwt.return_false
| false ->
let time = Time.now () in
S.RawContents.store (s, k) bytes >>= fun () ->
S.Contents.store (s, k) v >>= fun () ->
S.Discovery_time.store s k time >>= fun () ->
S.Pending.store s k >>= fun () ->
Lwt.return_true
@ -366,7 +364,7 @@ end = struct
let read_discovery_time = atomic2 Locked.read_discovery_time
let read_discovery_time_opt = atomic2 Locked.read_discovery_time_opt
let read_discovery_time_exn = atomic2 Locked.read_discovery_time_exn
let store = atomic2 Locked.store
let store = atomic3 Locked.store
let store_raw = atomic3 Locked.store_raw
let remove = atomic2 Locked.remove
let mark_valid = atomic2 Locked.mark_valid
@ -901,17 +899,17 @@ module Valid_block = struct
let store net hash context =
Shared.use net.state begin fun net_state ->
Shared.use net.block_header_store begin fun block_header_store ->
Context.exists net_state.context_index hash >>= function
| true -> return None (* Previously stored context. *)
| false ->
Raw_block_header.Locked.invalid
block_header_store hash >>= function
| Some _ -> return None (* Previously invalidated block. *)
| None ->
Locked.store
block_header_store net_state net.valid_block_watcher
hash context net.forked_network_ttl >>=? fun valid_block ->
return (Some valid_block)
Context.exists net_state.context_index hash >>= function
| true -> return None (* Previously stored context. *)
| false ->
Raw_block_header.Locked.invalid
block_header_store hash >>= function
| Some _ -> return None (* Previously invalidated block. *)
| None ->
Locked.store
block_header_store net_state net.valid_block_watcher
hash context net.forked_network_ttl >>=? fun valid_block ->
return (Some valid_block)
end
end

View File

@ -119,7 +119,7 @@ module type DATA_STORE = sig
returns [false] when the value is already stored, or [true]
otherwise. For a given value, only one call to `store` (or an
equivalent call to `store_raw`) might return [true]. *)
val store: store -> value -> bool Lwt.t
val store: store -> key -> value -> bool Lwt.t
(** Store a value in the local database (unparsed data). It returns
[Ok None] when the data is already stored, or [Ok (Some (hash,