From e273cfa07ff436a4214b602794538b19cb390d5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 28 Mar 2017 13:31:41 +0200 Subject: [PATCH] Shell/Distributed_db: allow to `precheck` data. --- src/node/shell/distributed_db.ml | 76 ++++++++++---- src/node/shell/distributed_db.mli | 4 +- src/node/shell/distributed_db_functors.ml | 115 ++++++++++++++++----- src/node/shell/distributed_db_functors.mli | 79 ++++++++++++-- src/node/shell/node.ml | 2 +- src/node/shell/state.ml | 34 +++--- src/node/shell/state.mli | 2 +- 7 files changed, 234 insertions(+), 78 deletions(-) diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index 34fcf1f1e..1d6cafca3 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -21,13 +21,18 @@ type 'a request_param = { } module Make_raw - (Hash : HASH) - (Disk_table : State.DATA_STORE with type key := Hash.t) - (Memory_table : Hashtbl.S with type key := Hash.t) + (Hash : sig type t end) + (Disk_table : + Distributed_db_functors.DISK_TABLE with type key := Hash.t) + (Memory_table : + Distributed_db_functors.MEMORY_TABLE with type key := Hash.t) (Request_message : sig type param val forge : param -> Hash.t list -> Message.t - end) = struct + end) + (Precheck : Distributed_db_functors.PRECHECK + with type key := Hash.t + and type value := Disk_table.value) = struct type key = Hash.t type value = Disk_table.value @@ -45,7 +50,7 @@ module Make_raw (Hash) (Memory_table) (Request) module Table = Distributed_db_functors.Make_table - (Hash) (Disk_table) (Memory_table) (Scheduler) + (Hash) (Disk_table) (Memory_table) (Scheduler) (Precheck) type t = { scheduler: Scheduler.t ; @@ -62,23 +67,51 @@ module Make_raw end +module No_precheck = struct + type param = unit + let precheck _ _ _ = true +end + module Raw_operation = - Make_raw (Operation_hash) (State.Operation) (Operation_hash.Table) (struct - type param = Net_id.t - let forge net_id keys = Message.Get_operations 
(net_id, keys) - end) + Make_raw + (Operation_hash) + (State.Operation) + (Operation_hash.Table) + (struct + type param = Net_id.t + let forge net_id keys = Message.Get_operations (net_id, keys) + end) + (No_precheck) module Raw_block_header = - Make_raw (Block_hash) (State.Block_header) (Block_hash.Table) (struct - type param = Net_id.t - let forge net_id keys = Message.Get_block_headers (net_id, keys) + Make_raw + (Block_hash) + (State.Block_header) + (Block_hash.Table) + (struct + type param = Net_id.t + let forge net_id keys = Message.Get_block_headers (net_id, keys) + end) + (No_precheck) + +module Operation_list_table = + Hashtbl.Make(struct + type t = Block_hash.t * int + let hash = Hashtbl.hash + let equal (b1, i1) (b2, i2) = + Block_hash.equal b1 b2 && i1 = i2 end) module Raw_protocol = - Make_raw (Protocol_hash) (State.Protocol) (Protocol_hash.Table) (struct - type param = unit - let forge () keys = Message.Get_protocols keys - end) + Make_raw + (Protocol_hash) + (State.Protocol) + (Protocol_hash.Table) + (struct + type param = unit + let forge () keys = Message.Get_protocols keys + end) + (No_precheck) type callback = { notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ; @@ -403,10 +436,13 @@ let shutdown { p2p ; p2p_readers ; active_nets } = P2p.shutdown p2p >>= fun () -> Lwt.return_unit -module type DISTRIBUTED_DB = Distributed_db_functors.DISTRIBUTED_DB +module type PARAMETRIZED_DISTRIBUTED_DB = + Distributed_db_functors.PARAMETRIZED_DISTRIBUTED_DB +module type DISTRIBUTED_DB = + Distributed_db_functors.DISTRIBUTED_DB module Make - (Table : DISTRIBUTED_DB) + (Table : PARAMETRIZED_DISTRIBUTED_DB with type param := unit) (Kind : sig type t val proj: t -> Table.t @@ -417,8 +453,8 @@ module Make let known t k = Table.known (Kind.proj t) k let read t k = Table.read (Kind.proj t) k let read_exn t k = Table.read_exn (Kind.proj t) k - let prefetch t ?peer k = Table.prefetch (Kind.proj t) ?peer k - let fetch t ?peer k = Table.fetch (Kind.proj t) ?peer 
k + let prefetch t ?peer k = Table.prefetch (Kind.proj t) ?peer k () + let fetch t ?peer k = Table.fetch (Kind.proj t) ?peer k () let commit t k = Table.commit (Kind.proj t) k let inject t k v = Table.inject (Kind.proj t) k v let watch t = Table.watch (Kind.proj t) diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index e3d21c687..6efeb5a30 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -40,11 +40,11 @@ module type DISTRIBUTED_DB = sig val known: t -> key -> bool Lwt.t val read: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t - val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t val commit: t -> key -> unit Lwt.t val inject: t -> key -> value -> bool Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper + val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit + val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t end module Operation : diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index f1d6f7b89..df4a1e960 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -7,19 +7,63 @@ (* *) (**************************************************************************) -module type DISTRIBUTED_DB = sig +module type PARAMETRIZED_RO_DISTRIBUTED_DB = sig + type t type key type value + type param + val known: t -> key -> bool Lwt.t val read: t -> key -> value option Lwt.t val read_exn: t -> key -> value Lwt.t - val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t + + val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit + val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t + +end + +module type PARAMETRIZED_DISTRIBUTED_DB = sig + + include PARAMETRIZED_RO_DISTRIBUTED_DB + val commit: t -> key -> unit Lwt.t (* val 
commit_invalid: t -> key -> unit Lwt.t *) (* TODO *) val inject: t -> key -> value -> bool Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper + +end + +module type DISTRIBUTED_DB = sig + + include PARAMETRIZED_DISTRIBUTED_DB with type param := unit + + val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit + val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t + +end + +module type DISK_TABLE = sig + type store + type key + type value + val known: store -> key -> bool Lwt.t + val read: store -> key -> value tzresult Lwt.t + val read_opt: store -> key -> value option Lwt.t + val read_exn: store -> key -> value Lwt.t + val store: store -> key -> value -> bool Lwt.t + val remove: store -> key -> bool Lwt.t +end + +module type MEMORY_TABLE = sig + type 'a t + type key + val create: int -> 'a t + val find: 'a t -> key -> 'a + val add: 'a t -> key -> 'a -> unit + val replace: 'a t -> key -> 'a -> unit + val remove: 'a t -> key -> unit + val fold: (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b end module type SCHEDULER_EVENTS = sig @@ -29,16 +73,27 @@ module type SCHEDULER_EVENTS = sig val notify: t -> P2p.Peer_id.t -> key -> unit val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit + val notify_invalid: t -> P2p.Peer_id.t -> key -> unit +end + +module type PRECHECK = sig + type key + type param + type value + val precheck: key -> param -> value -> bool end module Make_table - (Hash : HASH) - (Disk_table : State.DATA_STORE with type key := Hash.t) - (Memory_table : Hashtbl.S with type key := Hash.t) - (Scheduler : SCHEDULER_EVENTS with type key := Hash.t) : sig + (Hash : sig type t end) + (Disk_table : DISK_TABLE with type key := Hash.t) + (Memory_table : MEMORY_TABLE with type key := Hash.t) + (Scheduler : SCHEDULER_EVENTS with type key := Hash.t) + (Precheck : PRECHECK with type key := Hash.t + and type value := Disk_table.value) : sig - include DISTRIBUTED_DB with type key = Hash.t - and type 
value = Disk_table.value + include PARAMETRIZED_DISTRIBUTED_DB with type key = Hash.t + and type value = Disk_table.value + and type param = Precheck.param val create: ?global_input:(key * value) Watcher.input -> Scheduler.t -> Disk_table.store -> t @@ -48,6 +103,7 @@ end = struct type key = Hash.t type value = Disk_table.value + type param = Precheck.param type t = { scheduler: Scheduler.t ; @@ -58,7 +114,7 @@ end = struct } and status = - | Pending of value Lwt.u + | Pending of value Lwt.u * param | Found of value let known s k = @@ -79,24 +135,23 @@ end = struct | Found v -> Lwt.return v | Pending _ -> Lwt.fail Not_found - let fetch s ?peer k = + let fetch s ?peer k param = match Memory_table.find s.memory k with | exception Not_found -> begin Disk_table.read_opt s.disk k >>= function | None -> let waiter, wakener = Lwt.wait () in - Memory_table.add s.memory k (Pending wakener) ; + Memory_table.add s.memory k (Pending (wakener, param)) ; Scheduler.request s.scheduler peer k ; waiter | Some v -> Lwt.return v end - | Pending w -> Lwt.waiter_of_wakener w + | Pending (w, _) -> Lwt.waiter_of_wakener w | Found v -> Lwt.return v - let prefetch s ?peer k = Lwt.ignore_result (fetch s ?peer k) + let prefetch s ?peer k param = Lwt.ignore_result (fetch s ?peer k param) let notify s p k v = - Scheduler.notify s.scheduler p k ; match Memory_table.find s.memory k with | exception Not_found -> begin Disk_table.known s.disk k >>= function @@ -107,13 +162,19 @@ end = struct Scheduler.notify_unrequested s.scheduler p k ; Lwt.return_unit end - | Pending w -> - Memory_table.replace s.memory k (Found v) ; - Lwt.wakeup w v ; - iter_option s.global_input - ~f:(fun input -> Watcher.notify input (k, v)) ; - Watcher.notify s.input (k, v) ; - Lwt.return_unit + | Pending (w, param) -> + if not (Precheck.precheck k param v) then begin + Scheduler.notify_invalid s.scheduler p k ; + Lwt.return_unit + end else begin + Scheduler.notify s.scheduler p k ; + Memory_table.replace s.memory k (Found 
v) ; + Lwt.wakeup w v ; + iter_option s.global_input + ~f:(fun input -> Watcher.notify input (k, v)) ; + Watcher.notify s.input (k, v) ; + Lwt.return_unit + end | Found _ -> Scheduler.notify_duplicate s.scheduler p k ; Lwt.return_unit @@ -137,7 +198,7 @@ end = struct | exception Not_found -> Lwt.return_unit | Pending _ -> assert false | Found v -> - Disk_table.store s.disk v >>= fun _ -> + Disk_table.store s.disk k v >>= fun _ -> Memory_table.remove s.memory k ; Lwt.return_unit @@ -158,8 +219,8 @@ module type REQUEST = sig end module Make_request_scheduler - (Hash : HASH) - (Table : Hashtbl.S with type key := Hash.t) + (Hash : sig type t end) + (Table : MEMORY_TABLE with type key := Hash.t) (Request : REQUEST with type key := Hash.t) : sig type t @@ -181,6 +242,7 @@ end = struct and event = | Request of P2p.Peer_id.t option * key | Notify of P2p.Peer_id.t * key + | Notify_invalid of P2p.Peer_id.t * key | Notify_duplicate of P2p.Peer_id.t * key | Notify_unrequested of P2p.Peer_id.t * key @@ -188,6 +250,8 @@ end = struct t.push_to_worker (Request (p, k)) let notify t p k = t.push_to_worker (Notify (p, k)) + let notify_invalid t p k = + t.push_to_worker (Notify_invalid (p, k)) let notify_duplicate t p k = t.push_to_worker (Notify_duplicate (p, k)) let notify_unrequested t p k = @@ -240,6 +304,7 @@ end = struct | Notify (_gid, key) -> Table.remove state.pending key ; Lwt.return_unit + | Notify_invalid _ | Notify_unrequested _ | Notify_duplicate _ -> (* TODO *) diff --git a/src/node/shell/distributed_db_functors.mli b/src/node/shell/distributed_db_functors.mli index e16e5cd05..1a57848f6 100644 --- a/src/node/shell/distributed_db_functors.mli +++ b/src/node/shell/distributed_db_functors.mli @@ -7,19 +7,65 @@ (* *) (**************************************************************************) -module type DISTRIBUTED_DB = sig +module type PARAMETRIZED_RO_DISTRIBUTED_DB = sig + type t type key type value + type param + val known: t -> key -> bool Lwt.t val read: t -> key -> 
value option Lwt.t val read_exn: t -> key -> value Lwt.t - val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit - val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t + + val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit + val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t + +end + +module type PARAMETRIZED_DISTRIBUTED_DB = sig + + include PARAMETRIZED_RO_DISTRIBUTED_DB + val commit: t -> key -> unit Lwt.t (* val commit_invalid: t -> key -> unit Lwt.t *) (* TODO *) val inject: t -> key -> value -> bool Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper + +end + +module type DISTRIBUTED_DB = sig + + include PARAMETRIZED_DISTRIBUTED_DB with type param := unit + + val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> unit + val fetch: t -> ?peer:P2p.Peer_id.t -> key -> value Lwt.t + +end + +module type DISK_TABLE = sig + (* A subset of the State.DATA_STORE signature *) + type store + type key + type value + val known: store -> key -> bool Lwt.t + val read: store -> key -> value tzresult Lwt.t + val read_opt: store -> key -> value option Lwt.t + val read_exn: store -> key -> value Lwt.t + val store: store -> key -> value -> bool Lwt.t + val remove: store -> key -> bool Lwt.t +end + +module type MEMORY_TABLE = sig + (* A subset of the Hashtbl.S signature *) + type 'a t + type key + val create: int -> 'a t + val find: 'a t -> key -> 'a + val add: 'a t -> key -> 'a -> unit + val replace: 'a t -> key -> 'a -> unit + val remove: 'a t -> key -> unit + val fold: (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b end module type SCHEDULER_EVENTS = sig @@ -29,16 +75,27 @@ module type SCHEDULER_EVENTS = sig val notify: t -> P2p.Peer_id.t -> key -> unit val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit + val notify_invalid: t -> P2p.Peer_id.t -> key -> unit +end + +module type PRECHECK = sig + type key + type param + type value + val precheck: key -> param -> value -> bool end module Make_table - (Hash : 
HASH) - (Disk_table : State.DATA_STORE with type key := Hash.t) - (Memory_table : Hashtbl.S with type key := Hash.t) - (Scheduler : SCHEDULER_EVENTS with type key := Hash.t) : sig + (Hash : sig type t end) + (Disk_table : DISK_TABLE with type key := Hash.t) + (Memory_table : MEMORY_TABLE with type key := Hash.t) + (Scheduler : SCHEDULER_EVENTS with type key := Hash.t) + (Precheck : PRECHECK with type key := Hash.t + and type value := Disk_table.value) : sig - include DISTRIBUTED_DB with type key = Hash.t - and type value = Disk_table.value + include PARAMETRIZED_DISTRIBUTED_DB with type key = Hash.t + and type value = Disk_table.value + and type param := Precheck.param val create: ?global_input:(key * value) Watcher.input -> Scheduler.t -> Disk_table.store -> t @@ -54,8 +111,8 @@ module type REQUEST = sig end module Make_request_scheduler - (Hash : HASH) - (Table : Hashtbl.S with type key := Hash.t) + (Hash : sig type t end) + (Table : MEMORY_TABLE with type key := Hash.t) (Request : REQUEST with type key := Hash.t) : sig type t diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index d2f6f3ad5..f2f5d5c5c 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -33,7 +33,7 @@ let inject_protocol state ?force:_ proto = "Compilation failed (%a)" Protocol_hash.pp_short hash | true -> - State.Protocol.store state proto >>= function + State.Protocol.store state hash proto >>= function | false -> failwith "Previously registred protocol (%a)" diff --git a/src/node/shell/state.ml b/src/node/shell/state.ml index 343f650e1..71757001d 100644 --- a/src/node/shell/state.ml +++ b/src/node/shell/state.ml @@ -176,7 +176,7 @@ module type DATA_STORE = sig val read_discovery_time_opt: store -> key -> Time.t option Lwt.t val read_discovery_time_exn: store -> key -> Time.t Lwt.t - val store: store -> value -> bool Lwt.t + val store: store -> key -> value -> bool Lwt.t val store_raw: store -> key -> MBytes.t -> value option tzresult Lwt.t val remove: store -> key -> 
bool Lwt.t @@ -263,14 +263,12 @@ end = struct S.Contents.read_opt (s, k) >>= function | None -> Lwt.return_none | Some v -> Lwt.return (Some { Time.data = Ok v ; time }) - let store s v = - let bytes = Data_encoding.Binary.to_bytes S.encoding v in - let k = S.hash_raw bytes in + let store s k v = S.Discovery_time.known s k >>= function | true -> Lwt.return_false | false -> let time = Time.now () in - S.RawContents.store (s, k) bytes >>= fun () -> + S.Contents.store (s, k) v >>= fun () -> S.Discovery_time.store s k time >>= fun () -> S.Pending.store s k >>= fun () -> Lwt.return_true @@ -366,7 +364,7 @@ end = struct let read_discovery_time = atomic2 Locked.read_discovery_time let read_discovery_time_opt = atomic2 Locked.read_discovery_time_opt let read_discovery_time_exn = atomic2 Locked.read_discovery_time_exn - let store = atomic2 Locked.store + let store = atomic3 Locked.store let store_raw = atomic3 Locked.store_raw let remove = atomic2 Locked.remove let mark_valid = atomic2 Locked.mark_valid @@ -443,7 +441,7 @@ module Raw_block_header = struct } in Locked.store_raw store genesis.block bytes >>= fun _created -> Lwt.return shell - + end module Raw_helpers = struct @@ -901,17 +899,17 @@ module Valid_block = struct let store net hash context = Shared.use net.state begin fun net_state -> Shared.use net.block_header_store begin fun block_header_store -> - Context.exists net_state.context_index hash >>= function - | true -> return None (* Previously stored context. *) - | false -> - Raw_block_header.Locked.invalid - block_header_store hash >>= function - | Some _ -> return None (* Previously invalidated block. *) - | None -> - Locked.store - block_header_store net_state net.valid_block_watcher - hash context net.forked_network_ttl >>=? fun valid_block -> - return (Some valid_block) + Context.exists net_state.context_index hash >>= function + | true -> return None (* Previously stored context. 
*) + | false -> + Raw_block_header.Locked.invalid + block_header_store hash >>= function + | Some _ -> return None (* Previously invalidated block. *) + | None -> + Locked.store + block_header_store net_state net.valid_block_watcher + hash context net.forked_network_ttl >>=? fun valid_block -> + return (Some valid_block) end end diff --git a/src/node/shell/state.mli b/src/node/shell/state.mli index 8e2c314e9..4f325e61e 100644 --- a/src/node/shell/state.mli +++ b/src/node/shell/state.mli @@ -119,7 +119,7 @@ module type DATA_STORE = sig returns [false] when the value is already stored, or [true] otherwise. For a given value, only one call to `store` (or an equivalent call to `store_raw`) might return [true]. *) - val store: store -> value -> bool Lwt.t + val store: store -> key -> value -> bool Lwt.t (** Store a value in the local database (unparsed data). It returns [Ok None] when the data is already stored, or [Ok (Some (hash,