From 1a10504959a6a9becfe520d9d5aeb280f54fb99e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 6 Nov 2017 15:23:06 +0100 Subject: [PATCH] Shell: fixme broken invariant in Distributed_db_functors. The invariant of the `clear` function was not properly inforced by the module interface. This patch remove the inappropriate invariant and properly rename the function. --- src/node/shell/distributed_db.ml | 19 +++++++++---------- src/node/shell/distributed_db.mli | 2 +- src/node/shell/distributed_db_functors.ml | 20 +++++++++++++++++--- src/node/shell/distributed_db_functors.mli | 3 ++- 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index e4ad7369f..6b214323a 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -201,7 +201,7 @@ module Raw_operation_hashes = struct map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1)) let clear_all table hash n = - List.iter (fun i -> Table.clear table (hash, i)) (0 -- (n-1)) + List.iter (fun i -> Table.clear_or_cancel table (hash, i)) (0 -- (n-1)) end @@ -270,7 +270,7 @@ module Raw_operations = struct map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1)) let clear_all table hash n = - List.iter (fun i -> Table.clear table (hash, i)) (0 -- (n-1)) + List.iter (fun i -> Table.clear_or_cancel table (hash, i)) (0 -- (n-1)) end @@ -750,7 +750,7 @@ let commit_block net_db hash validation_result = hash header.shell.validation_passes >>=? fun operations -> State.Block.store net_db.net_state header operations validation_result >>=? fun res -> - Raw_block_header.Table.clear net_db.block_header_db.table hash ; + Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ; Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash header.shell.validation_passes ; Raw_operations.clear_all @@ -758,7 +758,7 @@ let commit_block net_db hash validation_result = (* TODO: proper handling of the operations table by the prevalidator. *) List.iter (List.iter - (fun op -> Raw_operation.Table.clear + (fun op -> Raw_operation.Table.clear_or_cancel net_db.operation_db.table (Operation.hash op))) operations ; @@ -768,7 +768,7 @@ let commit_invalid_block net_db hash = Raw_block_header.Table.read net_db.block_header_db.table hash >>=? fun header -> State.Block.store_invalid net_db.net_state header >>=? fun res -> - Raw_block_header.Table.clear net_db.block_header_db.table hash ; + Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash ; Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash header.shell.validation_passes ; Raw_operations.clear_all @@ -788,7 +788,7 @@ let inject_protocol db h p = let commit_protocol db h = Raw_protocol.Table.read db.protocol_db.table h >>=? fun p -> State.Protocol.store db.disk p >>= fun res -> - Raw_protocol.Table.clear db.protocol_db.table h ; + Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ; return (res <> None) type operation = @@ -841,7 +841,7 @@ let inject_block db bytes operations = let clear_block net_db hash n = Raw_operations.clear_all net_db.operations_db.table hash n ; Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ; - Raw_block_header.Table.clear net_db.block_header_db.table hash + Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash let broadcast_head net_db head mempool = let msg : Message.t = @@ -881,7 +881,7 @@ module type DISTRIBUTED_DB = sig val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t - val clear: t -> key -> unit + val clear_or_cancel: t -> key -> unit end module Make @@ -901,10 +901,9 @@ module Make let read_exn t k = Table.read_exn (Kind.proj t) k let prefetch t ?peer k p = Table.prefetch (Kind.proj t) ?peer k p let fetch t ?peer k p = Table.fetch (Kind.proj t) ?peer k p - let clear t k = Table.clear (Kind.proj t) k + let clear_or_cancel t k = Table.clear_or_cancel (Kind.proj t) k let inject t k v = Table.inject (Kind.proj t) k v let watch t = Table.watch (Kind.proj t) - let clear t k = Table.clear (Kind.proj t) k end module Block_header = diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index 4c1abf4dc..7dec7d299 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -84,7 +84,7 @@ module type DISTRIBUTED_DB = sig val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t - val clear: t -> key -> unit + val clear_or_cancel: t -> key -> unit end module Block_header : diff --git a/src/node/shell/distributed_db_functors.ml b/src/node/shell/distributed_db_functors.ml index d6e24125e..cde6c9cf8 100644 --- a/src/node/shell/distributed_db_functors.ml +++ b/src/node/shell/distributed_db_functors.ml @@ -24,7 +24,7 @@ module type DISTRIBUTED_DB = sig val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t - val clear: t -> key -> unit + val clear_or_cancel: t -> key -> unit val inject: t -> key -> value -> bool Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper @@ -56,6 +56,7 @@ module type SCHEDULER_EVENTS = sig type key val request: t -> P2p.Peer_id.t option -> key -> unit val notify: t -> P2p.Peer_id.t -> key -> unit + val notify_cancelation: t -> key -> unit val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit val notify_invalid: t -> P2p.Peer_id.t -> key -> unit @@ -215,10 +216,13 @@ end = struct | Found _ -> Lwt.return_false - let clear s k = + let clear_or_cancel s k = match Memory_table.find s.memory k with | exception Not_found -> () - | Pending _ -> assert false + | Pending (w, _) -> + Scheduler.notify_cancelation s.scheduler k ; + Memory_table.remove s.memory k ; + Lwt.wakeup_later_exn w Lwt.Canceled | Found _ -> Memory_table.remove s.memory k let watch s = Watcher.create_stream s.input @@ -268,6 +272,7 @@ end = struct and event = | Request of P2p.Peer_id.t option * key | Notify of P2p.Peer_id.t * key + | Notify_cancelation of key | Notify_invalid of P2p.Peer_id.t * key | Notify_duplicate of P2p.Peer_id.t * key | Notify_unrequested of P2p.Peer_id.t * key @@ -278,6 +283,10 @@ end = struct debug "push received %a from %a" Hash.pp k P2p.Peer_id.pp_short p ; t.push_to_worker (Notify (p, k)) + let notify_cancelation t k = + debug "push cancelation %a" + Hash.pp k ; + t.push_to_worker (Notify_cancelation k) let notify_invalid t p k = debug "push received invalid %a from %a" Hash.pp k P2p.Peer_id.pp_short p ; @@ -361,6 +370,11 @@ end = struct lwt_debug "received %a from %a" Hash.pp key P2p.Peer_id.pp_short peer >>= fun () -> Lwt.return_unit + | Notify_cancelation key -> + Table.remove state.pending key ; + lwt_debug "canceled %a" + Hash.pp key >>= fun () -> + Lwt.return_unit | Notify_invalid (peer, key) -> lwt_debug "received invalid %a from %a" Hash.pp key P2p.Peer_id.pp_short peer >>= fun () -> diff --git a/src/node/shell/distributed_db_functors.mli b/src/node/shell/distributed_db_functors.mli index d11625c85..756e5a20e 100644 --- a/src/node/shell/distributed_db_functors.mli +++ b/src/node/shell/distributed_db_functors.mli @@ -24,7 +24,7 @@ module type DISTRIBUTED_DB = sig val prefetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> unit val fetch: t -> ?peer:P2p.Peer_id.t -> key -> param -> value Lwt.t - val clear: t -> key -> unit + val clear_or_cancel: t -> key -> unit val inject: t -> key -> value -> bool Lwt.t val watch: t -> (key * value) Lwt_stream.t * Watcher.stopper @@ -57,6 +57,7 @@ module type SCHEDULER_EVENTS = sig type key val request: t -> P2p.Peer_id.t option -> key -> unit val notify: t -> P2p.Peer_id.t -> key -> unit + val notify_cancelation: t -> key -> unit val notify_unrequested: t -> P2p.Peer_id.t -> key -> unit val notify_duplicate: t -> P2p.Peer_id.t -> key -> unit val notify_invalid: t -> P2p.Peer_id.t -> key -> unit