Shell: fixme broken invariant in Distributed_db_functors.
The invariant of the `clear` function was not properly inforced by the module interface. This patch remove the inappropriate invariant and properly rename the function.
This commit is contained in:
parent
673f70c5d0
commit
1a10504959
@ -201,7 +201,7 @@ module Raw_operation_hashes = struct
|
|||||||
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
|
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
|
||||||
|
|
||||||
let clear_all table hash n =
|
let clear_all table hash n =
|
||||||
List.iter (fun i -> Table.clear table (hash, i)) (0 -- (n-1))
|
List.iter (fun i -> Table.clear_or_cancel table (hash, i)) (0 -- (n-1))
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -270,7 +270,7 @@ module Raw_operations = struct
|
|||||||
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
|
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
|
||||||
|
|
||||||
let clear_all table hash n =
|
let clear_all table hash n =
|
||||||
List.iter (fun i -> Table.clear table (hash, i)) (0 -- (n-1))
|
List.iter (fun i -> Table.clear_or_cancel table (hash, i)) (0 -- (n-1))
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -750,7 +750,7 @@ let commit_block net_db hash validation_result =
|
|||||||
hash header.shell.validation_passes >>=? fun operations ->
|
hash header.shell.validation_passes >>=? fun operations ->
|
||||||
State.Block.store
|
State.Block.store
|
||||||
net_db.net_state header operations validation_result >>=? fun res ->
|
net_db.net_state header operations validation_result >>=? fun res ->
|
||||||
Raw_block_header.Table.clear net_db.block_header_db.table hash ;
|
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ;
|
||||||
Raw_operation_hashes.clear_all
|
Raw_operation_hashes.clear_all
|
||||||
net_db.operation_hashes_db.table hash header.shell.validation_passes ;
|
net_db.operation_hashes_db.table hash header.shell.validation_passes ;
|
||||||
Raw_operations.clear_all
|
Raw_operations.clear_all
|
||||||
@ -758,7 +758,7 @@ let commit_block net_db hash validation_result =
|
|||||||
(* TODO: proper handling of the operations table by the prevalidator. *)
|
(* TODO: proper handling of the operations table by the prevalidator. *)
|
||||||
List.iter
|
List.iter
|
||||||
(List.iter
|
(List.iter
|
||||||
(fun op -> Raw_operation.Table.clear
|
(fun op -> Raw_operation.Table.clear_or_cancel
|
||||||
net_db.operation_db.table
|
net_db.operation_db.table
|
||||||
(Operation.hash op)))
|
(Operation.hash op)))
|
||||||
operations ;
|
operations ;
|
||||||
@ -768,7 +768,7 @@ let commit_invalid_block net_db hash =
|
|||||||
Raw_block_header.Table.read
|
Raw_block_header.Table.read
|
||||||
net_db.block_header_db.table hash >>=? fun header ->
|
net_db.block_header_db.table hash >>=? fun header ->
|
||||||
State.Block.store_invalid net_db.net_state header >>=? fun res ->
|
State.Block.store_invalid net_db.net_state header >>=? fun res ->
|
||||||
Raw_block_header.Table.clear net_db.block_header_db.table hash ;
|
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ;
|
||||||
Raw_operation_hashes.clear_all
|
Raw_operation_hashes.clear_all
|
||||||
net_db.operation_hashes_db.table hash header.shell.validation_passes ;
|
net_db.operation_hashes_db.table hash header.shell.validation_passes ;
|
||||||
Raw_operations.clear_all
|
Raw_operations.clear_all
|
||||||
@ -788,7 +788,7 @@ let inject_protocol db h p =
|
|||||||
let commit_protocol db h =
|
let commit_protocol db h =
|
||||||
Raw_protocol.Table.read db.protocol_db.table h >>=? fun p ->
|
Raw_protocol.Table.read db.protocol_db.table h >>=? fun p ->
|
||||||
State.Protocol.store db.disk p >>= fun res ->
|
State.Protocol.store db.disk p >>= fun res ->
|
||||||
Raw_protocol.Table.clear db.protocol_db.table h ;
|
Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
|
||||||
return (res <> None)
|
return (res <> None)
|
||||||
|
|
||||||
type operation =
|
type operation =
|
||||||
@ -841,7 +841,7 @@ let inject_block db bytes operations =
|
|||||||
let clear_block net_db hash n =
|
let clear_block net_db hash n =
|
||||||
Raw_operations.clear_all net_db.operations_db.table hash n ;
|
Raw_operations.clear_all net_db.operations_db.table hash n ;
|
||||||
Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
|
Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ;
|
||||||
Raw_block_header.Table.clear net_db.block_header_db.table hash
|
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash
|
||||||
|
|
||||||
let broadcast_head net_db head mempool =
|
let broadcast_head net_db head mempool =
|
||||||
let msg : Message.t =
|
let msg : Message.t =
|
||||||
@ -881,7 +881,7 @@ module type DISTRIBUTED_DB = sig
|
|||||||
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
||||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||||
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
||||||
val clear: t -> key -> unit
|
val clear_or_cancel: t -> key -> unit
|
||||||
end
|
end
|
||||||
|
|
||||||
module Make
|
module Make
|
||||||
@ -901,10 +901,9 @@ module Make
|
|||||||
let read_exn t k = Table.read_exn (Kind.proj t) k
|
let read_exn t k = Table.read_exn (Kind.proj t) k
|
||||||
let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p
|
let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p
|
||||||
let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p
|
let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p
|
||||||
let clear t k = Table.clear (Kind.proj t) k
|
let clear_or_cancel t k = Table.clear_or_cancel (Kind.proj t) k
|
||||||
let inject t k v = Table.inject (Kind.proj t) k v
|
let inject t k v = Table.inject (Kind.proj t) k v
|
||||||
let watch t = Table.watch (Kind.proj t)
|
let watch t = Table.watch (Kind.proj t)
|
||||||
let clear t k = Table.clear (Kind.proj t) k
|
|
||||||
end
|
end
|
||||||
|
|
||||||
module Block_header =
|
module Block_header =
|
||||||
|
@ -84,7 +84,7 @@ module type DISTRIBUTED_DB = sig
|
|||||||
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
||||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||||
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
||||||
val clear: t -> key -> unit
|
val clear_or_cancel: t -> key -> unit
|
||||||
end
|
end
|
||||||
|
|
||||||
module Block_header :
|
module Block_header :
|
||||||
|
@ -24,7 +24,7 @@ module type DISTRIBUTED_DB = sig
|
|||||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||||
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
||||||
|
|
||||||
val clear: t -> key -> unit
|
val clear_or_cancel: t -> key -> unit
|
||||||
val inject: t -> key -> value -> bool Lwt.t
|
val inject: t -> key -> value -> bool Lwt.t
|
||||||
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
||||||
|
|
||||||
@ -56,6 +56,7 @@ module type SCHEDULER_EVENTS = sig
|
|||||||
type key
|
type key
|
||||||
val request: t -> P2p.Peer_id.t option -> key -> unit
|
val request: t -> P2p.Peer_id.t option -> key -> unit
|
||||||
val notify: t -> P2p.Peer_id.t -> key -> unit
|
val notify: t -> P2p.Peer_id.t -> key -> unit
|
||||||
|
val notify_cancelation: t -> key -> unit
|
||||||
val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit
|
val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit
|
||||||
val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit
|
val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit
|
||||||
val notify_invalid: t -> P2p.Peer_id.t -> key -> unit
|
val notify_invalid: t -> P2p.Peer_id.t -> key -> unit
|
||||||
@ -215,10 +216,13 @@ end = struct
|
|||||||
| Found _ ->
|
| Found _ ->
|
||||||
Lwt.return_false
|
Lwt.return_false
|
||||||
|
|
||||||
let clear s k =
|
let clear_or_cancel s k =
|
||||||
match Memory_table.find s.memory k with
|
match Memory_table.find s.memory k with
|
||||||
| exception Not_found -> ()
|
| exception Not_found -> ()
|
||||||
| Pending _ -> assert false
|
| Pending (w, _) ->
|
||||||
|
Scheduler.notify_cancelation s.scheduler k ;
|
||||||
|
Memory_table.remove s.memory k ;
|
||||||
|
Lwt.wakeup_later_exn w Lwt.Canceled
|
||||||
| Found _ -> Memory_table.remove s.memory k
|
| Found _ -> Memory_table.remove s.memory k
|
||||||
|
|
||||||
let watch s = Watcher.create_stream s.input
|
let watch s = Watcher.create_stream s.input
|
||||||
@ -268,6 +272,7 @@ end = struct
|
|||||||
and event =
|
and event =
|
||||||
| Request of P2p.Peer_id.t option * key
|
| Request of P2p.Peer_id.t option * key
|
||||||
| Notify of P2p.Peer_id.t * key
|
| Notify of P2p.Peer_id.t * key
|
||||||
|
| Notify_cancelation of key
|
||||||
| Notify_invalid of P2p.Peer_id.t * key
|
| Notify_invalid of P2p.Peer_id.t * key
|
||||||
| Notify_duplicate of P2p.Peer_id.t * key
|
| Notify_duplicate of P2p.Peer_id.t * key
|
||||||
| Notify_unrequested of P2p.Peer_id.t * key
|
| Notify_unrequested of P2p.Peer_id.t * key
|
||||||
@ -278,6 +283,10 @@ end = struct
|
|||||||
debug "push received %a from %a"
|
debug "push received %a from %a"
|
||||||
Hash.pp k P2p.Peer_id.pp_short p ;
|
Hash.pp k P2p.Peer_id.pp_short p ;
|
||||||
t.push_to_worker (Notify (p, k))
|
t.push_to_worker (Notify (p, k))
|
||||||
|
let notify_cancelation t k =
|
||||||
|
debug "push cancelation %a"
|
||||||
|
Hash.pp k ;
|
||||||
|
t.push_to_worker (Notify_cancelation k)
|
||||||
let notify_invalid t p k =
|
let notify_invalid t p k =
|
||||||
debug "push received invalid %a from %a"
|
debug "push received invalid %a from %a"
|
||||||
Hash.pp k P2p.Peer_id.pp_short p ;
|
Hash.pp k P2p.Peer_id.pp_short p ;
|
||||||
@ -361,6 +370,11 @@ end = struct
|
|||||||
lwt_debug "received %a from %a"
|
lwt_debug "received %a from %a"
|
||||||
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
| Notify_cancelation key ->
|
||||||
|
Table.remove state.pending key ;
|
||||||
|
lwt_debug "canceled %a"
|
||||||
|
Hash.pp key >>= fun () ->
|
||||||
|
Lwt.return_unit
|
||||||
| Notify_invalid (peer, key) ->
|
| Notify_invalid (peer, key) ->
|
||||||
lwt_debug "received invalid %a from %a"
|
lwt_debug "received invalid %a from %a"
|
||||||
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
Hash.pp key P2p.Peer_id.pp_short peer >>= fun () ->
|
||||||
|
@ -24,7 +24,7 @@ module type DISTRIBUTED_DB = sig
|
|||||||
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit
|
||||||
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t
|
||||||
|
|
||||||
val clear: t -> key -> unit
|
val clear_or_cancel: t -> key -> unit
|
||||||
val inject: t -> key -> value -> bool Lwt.t
|
val inject: t -> key -> value -> bool Lwt.t
|
||||||
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper
|
||||||
|
|
||||||
@ -57,6 +57,7 @@ module type SCHEDULER_EVENTS = sig
|
|||||||
type key
|
type key
|
||||||
val request: t -> P2p.Peer_id.t option -> key -> unit
|
val request: t -> P2p.Peer_id.t option -> key -> unit
|
||||||
val notify: t -> P2p.Peer_id.t -> key -> unit
|
val notify: t -> P2p.Peer_id.t -> key -> unit
|
||||||
|
val notify_cancelation: t -> key -> unit
|
||||||
val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit
|
val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit
|
||||||
val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit
|
val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit
|
||||||
val notify_invalid: t -> P2p.Peer_id.t -> key -> unit
|
val notify_invalid: t -> P2p.Peer_id.t -> key -> unit
|
||||||
|
Loading…
Reference in New Issue
Block a user