Shell: export Distributed_db.*.clear

This commit is contained in:
Grégoire Henry 2017-06-09 17:54:08 +02:00
parent 90780f3374
commit 747cdb1963
5 changed files with 33 additions and 38 deletions

View File

@ -188,8 +188,8 @@ module Raw_operation_hashes = struct
let read_all table hash n = let read_all table hash n =
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1)) map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
let remove_all table hash n = let clear_all table hash n =
Lwt_list.iter_p (fun i -> Table.remove table (hash, i)) (0 -- (n-1)) List.iter (fun i -> Table.clear table (hash, i)) (0 -- (n-1))
end end
@ -250,8 +250,8 @@ module Raw_operations = struct
let read_all table hash n = let read_all table hash n =
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1)) map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
let remove_all table hash n = let clear_all table hash n =
Lwt_list.iter_p (fun i -> Table.remove table (hash, i)) (0 -- (n-1)) List.iter (fun i -> Table.clear table (hash, i)) (0 -- (n-1))
end end
@ -713,31 +713,27 @@ let commit_block net_db hash n validation_result =
read_all_operations net_db hash n >>=? fun operations -> read_all_operations net_db hash n >>=? fun operations ->
State.Block.store State.Block.store
net_db.net_state header operations validation_result >>=? fun res -> net_db.net_state header operations validation_result >>=? fun res ->
Raw_block_header.Table.remove Raw_block_header.Table.clear net_db.block_header_db.table hash ;
net_db.block_header_db.table hash >>= fun () -> Raw_operation_hashes.clear_all
Raw_operation_hashes.remove_all net_db.operation_hashes_db.table hash n ;
net_db.operation_hashes_db.table hash n >>= fun () -> Raw_operations.clear_all
Raw_operations.remove_all net_db.operations_db.table hash n ;
net_db.operations_db.table hash n >>= fun () ->
(* TODO: proper handling of the operations table by the prevalidator. *) (* TODO: proper handling of the operations table by the prevalidator. *)
Lwt_list.iter_p List.iter
(Lwt_list.iter_p (List.iter
(fun op -> Raw_operation.Table.remove (fun op -> Raw_operation.Table.clear
net_db.operation_db.table net_db.operation_db.table
(Operation.hash op))) (Operation.hash op)))
operations >>= fun () -> operations ;
return res return res
let commit_invalid_block net_db hash n = let commit_invalid_block net_db hash n =
Raw_block_header.Table.read Raw_block_header.Table.read
net_db.block_header_db.table hash >>=? fun header -> net_db.block_header_db.table hash >>=? fun header ->
State.Block.store_invalid net_db.net_state header >>=? fun res -> State.Block.store_invalid net_db.net_state header >>=? fun res ->
Raw_block_header.Table.remove Raw_block_header.Table.clear net_db.block_header_db.table hash ;
net_db.block_header_db.table hash >>= fun () -> Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
Raw_operation_hashes.remove_all Raw_operations.clear_all net_db.operations_db.table hash n ;
net_db.operation_hashes_db.table hash n >>= fun () ->
Raw_operations.remove_all
net_db.operations_db.table hash n >>= fun () ->
return res return res
let inject_operation net_db h op = let inject_operation net_db h op =
@ -753,7 +749,7 @@ let inject_protocol db h p =
let commit_protocol db h = let commit_protocol db h =
Raw_protocol.Table.read db.protocol_db.table h >>=? fun p -> Raw_protocol.Table.read db.protocol_db.table h >>=? fun p ->
State.Protocol.store db.disk p >>= fun res -> State.Protocol.store db.disk p >>= fun res ->
Raw_protocol.Table.remove db.protocol_db.table h >>= fun () -> Raw_protocol.Table.clear db.protocol_db.table h ;
return (res <> None) return (res <> None)
type operation = type operation =
@ -803,12 +799,10 @@ let inject_block db bytes operations =
net_db.operations_db.table hash operations >>= fun _ -> net_db.operations_db.table hash operations >>= fun _ ->
return (hash, block) return (hash, block)
let remove_block net_db hash n = let clear_block net_db hash n =
Raw_operations.remove_all Raw_operations.clear_all net_db.operations_db.table hash n ;
net_db.operations_db.table hash n >>= fun () -> Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
Raw_operation_hashes.remove_all Raw_block_header.Table.clear net_db.block_header_db.table hash
net_db.operation_hashes_db.table hash n >>= fun () ->
Raw_block_header.Table.remove net_db.block_header_db.table hash
let broadcast_head net_db head mempool = let broadcast_head net_db head mempool =
let msg : Message.t = let msg : Message.t =
@ -848,6 +842,7 @@ module type DISTRIBUTED_DB = sig
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
val clear: t -> key -> unit
end end
module Make module Make
@ -867,9 +862,10 @@ module Make
let read_exn t k = Table.read_exn (Kind.proj t) k let read_exn t k = Table.read_exn (Kind.proj t) k
let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p
let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p
let remove t k = Table.remove (Kind.proj t) k let clear t k = Table.clear (Kind.proj t) k
let inject t k v = Table.inject (Kind.proj t) k v let inject t k v = Table.inject (Kind.proj t) k v
let watch t = Table.watch (Kind.proj t) let watch t = Table.watch (Kind.proj t)
let clear t k = Table.clear (Kind.proj t) k
end end
module Block_header = module Block_header =

View File

@ -52,7 +52,7 @@ val commit_invalid_block:
val inject_block: val inject_block:
t -> MBytes.t -> operation list list -> t -> MBytes.t -> operation list list ->
(Block_hash.t * Block_header.t) tzresult Lwt.t (Block_hash.t * Block_header.t) tzresult Lwt.t
val remove_block: net_db -> Block_hash.t -> int -> unit Lwt.t val clear_block: net_db -> Block_hash.t -> int -> unit
val inject_operation: val inject_operation:
net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t net_db -> Operation_hash.t -> Operation.t -> bool tzresult Lwt.t
@ -83,6 +83,7 @@ module type DISTRIBUTED_DB = sig
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
val clear: t -> key -> unit
end end
module Block_header : module Block_header :

View File

@ -24,7 +24,7 @@ module type DISTRIBUTED_DB = sig
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
val remove: t -> key -> unit Lwt.t val clear: t -> key -> unit
val inject: t -> key -> value -> bool Lwt.t val inject: t -> key -> value -> bool Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
@ -190,13 +190,11 @@ end = struct
| Found _ -> | Found _ ->
Lwt.return_false Lwt.return_false
let remove s k = let clear s k =
match Memory_table.find s.memory k with match Memory_table.find s.memory k with
| exception Not_found -> Lwt.return_unit | exception Not_found -> ()
| Pending _ -> assert false | Pending _ -> assert false
| Found _ -> | Found _ -> Memory_table.remove s.memory k
Memory_table.remove s.memory k ;
Lwt.return_unit
let watch s = Watcher.create_stream s.input let watch s = Watcher.create_stream s.input

View File

@ -24,7 +24,7 @@ module type DISTRIBUTED_DB = sig
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
val remove: t -> key -> unit Lwt.t val clear: t -> key -> unit
val inject: t -> key -> value -> bool Lwt.t val inject: t -> key -> value -> bool Lwt.t
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper

View File

@ -832,8 +832,8 @@ let create_worker ?max_ttl state db =
let validation = let validation =
protect protect
~on_error: begin fun err -> ~on_error: begin fun err ->
Distributed_db.remove_block Distributed_db.clear_block
net.net_db hash (List.length operations) >>= fun () -> net.net_db hash (List.length operations) ;
Lwt.return (Error err) Lwt.return (Error err)
end end
begin fun () -> begin fun () ->