From 119f724e64c9e421e8a6687a0c5a5dec24647e03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 13 Nov 2017 14:33:39 +0100 Subject: [PATCH] Mempool: also broadcast 'branch_{delayed,refused}' operations --- src/node/shell/chain.ml | 18 ++---- src/node/shell/chain.mli | 8 +-- src/node/shell/distributed_db.ml | 11 ++-- src/node/shell/distributed_db.mli | 4 +- src/node/shell/distributed_db_message.ml | 8 +-- src/node/shell/distributed_db_message.mli | 2 +- src/node/shell/mempool.ml | 34 ++++++++++++ src/node/shell/mempool.mli | 32 +++++++++++ src/node/shell/prevalidator.ml | 67 +++++++++++++++++------ src/node/shell/prevalidator.mli | 2 +- src/node/shell/state.ml | 23 +++++++- src/node/shell/state.mli | 10 +++- 12 files changed, 167 insertions(+), 52 deletions(-) create mode 100644 src/node/shell/mempool.ml create mode 100644 src/node/shell/mempool.mli diff --git a/src/node/shell/chain.ml b/src/node/shell/chain.ml index 1c3e4c51a..9b2f2baeb 100644 --- a/src/node/shell/chain.ml +++ b/src/node/shell/chain.ml @@ -7,10 +7,11 @@ (* *) (**************************************************************************) - open Logging.Node.State open State +let mempool_encoding = State.mempool_encoding + let genesis net_state = let genesis = Net.genesis net_state in Block.read_exn net_state genesis.block @@ -64,7 +65,7 @@ let set_head net_state block = update_chain_store net_state begin fun chain_store data -> locked_set_head chain_store data block >>= fun () -> Lwt.return (Some { current_head = block ; - current_reversed_mempool = [] }, + current_mempool = State.empty_mempool }, data.current_head) end @@ -75,17 +76,6 @@ let test_and_set_head net_state ~old block = else locked_set_head chain_store data block >>= fun () -> Lwt.return (Some { current_head = block ; - current_reversed_mempool = [] }, + current_mempool = State.empty_mempool }, true) end - -let set_reversed_mempool net_state current_reversed_mempool = - update_chain_store net_state begin fun _chain_store data -> - Lwt.return (Some { data with current_reversed_mempool }, - ()) - end - -let mempool net_state = - read_chain_store net_state begin fun _chain_store data -> - Lwt.return (List.rev data.current_reversed_mempool) - end diff --git a/src/node/shell/chain.mli b/src/node/shell/chain.mli index 09f34d723..a25464adf 100644 --- a/src/node/shell/chain.mli +++ b/src/node/shell/chain.mli @@ -7,6 +7,7 @@ (* *) (**************************************************************************) +(** Tezos Shell Module - Manging the current head. *) open State @@ -18,6 +19,7 @@ val head: Net.t -> Block.t Lwt.t (** The current head of the network's blockchain. *) val known_heads: Net.t -> Block.t list Lwt.t +(** The current head and all the known (valid) alternate heads. *) val mem: Net.t -> Block_hash.t -> bool Lwt.t (** Test whether a block belongs to the current mainnet. *) @@ -26,13 +28,9 @@ val set_head: Net.t -> Block.t -> Block.t Lwt.t (** Record a block as the current head of the network's blockchain. It returns the previous head. *) -val set_reversed_mempool: Net.t -> Operation_hash.t list -> unit Lwt.t -(** Record a list as the current list of pending operations. *) - -val mempool: Net.t -> Operation_hash.t list Lwt.t - val test_and_set_head: Net.t -> old:Block.t -> Block.t -> bool Lwt.t (** Atomically change the current head of the network's blockchain. This returns [true] whenever the change succeeded, or [false] when the current head os not equal to the [old] argument. *) + diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index 698c1ec0e..d6a41f0eb 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -294,7 +294,7 @@ type callback = { notify_branch: P2p.Peer_id.t -> Block_locator.t -> unit ; notify_head: - P2p.Peer_id.t -> Block_header.t -> Operation_hash.t list -> unit ; + P2p.Peer_id.t -> Block_header.t -> Mempool.t -> unit ; disconnection: P2p.Peer_id.t -> unit ; } @@ -424,12 +424,11 @@ module P2p_reader = struct | Get_current_head net_id -> may_handle state net_id @@ fun net_db -> - Chain.head net_db.net_state >>= fun head -> - Chain.mempool net_db.net_state >>= fun mempool -> + Mempool.get net_db.net_state >>= fun (head, mempool) -> + (* TODO bound the sent mempool size *) ignore @@ P2p.try_send global_db.p2p state.conn - @@ Current_head (net_id, State.Block.header head, - Utils.list_sub mempool 200) ; + @@ Current_head (net_id, head, mempool) ; Lwt.return_unit | Current_head (net_id, header, mempool) -> @@ -889,7 +888,7 @@ end module Advertise = struct - let current_head net_db ?peer ?(mempool = []) head = + let current_head net_db ?peer ?(mempool = Mempool.empty) head = let net_id = State.Net.id net_db.net_state in assert (Net_id.equal net_id (State.Block.net_id head)) ; send net_db ?peer @@ diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index 45eff9d9c..40f09e2f7 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -28,7 +28,7 @@ type callback = { notify_branch: P2p.Peer_id.t -> Block_locator.t -> unit ; notify_head: - P2p.Peer_id.t -> Block_header.t -> Operation_hash.t list -> unit ; + P2p.Peer_id.t -> Block_header.t -> Mempool.t -> unit ; disconnection: P2p.Peer_id.t -> unit ; } @@ -144,7 +144,7 @@ end module Advertise : sig val current_head: net_db -> ?peer:P2p.Peer_id.t -> - ?mempool:Operation_hash.t list -> State.Block.t -> unit + ?mempool:Mempool.t -> State.Block.t -> unit val current_branch: net_db -> ?peer:P2p.Peer_id.t -> State.Block.t -> unit Lwt.t diff --git a/src/node/shell/distributed_db_message.ml b/src/node/shell/distributed_db_message.ml index 205c29299..9dc2d1ed0 100644 --- a/src/node/shell/distributed_db_message.ml +++ b/src/node/shell/distributed_db_message.ml @@ -14,7 +14,7 @@ type t = | Deactivate of Net_id.t | Get_current_head of Net_id.t - | Current_head of Net_id.t * Block_header.t * Operation_hash.t list + | Current_head of Net_id.t * Block_header.t * Mempool.t | Get_block_headers of Net_id.t * Block_hash.t list | Block_header of Block_header.t @@ -77,11 +77,11 @@ let encoding = (obj3 (req "net_id" Net_id.encoding) (req "current_block_header" (dynamic_size Block_header.encoding)) - (req "current_mempool" (list Operation_hash.encoding))) + (req "current_mempool" Mempool.encoding)) (function - | Current_head (net_id, bh, ops) -> Some (net_id, bh, ops) + | Current_head (net_id, bh, mempool) -> Some (net_id, bh, mempool) | _ -> None) - (fun (net_id, bh, ops) -> Current_head (net_id, bh, ops)) ; + (fun (net_id, bh, mempool) -> Current_head (net_id, bh, mempool)) ; case ~tag:0x20 (obj2 diff --git a/src/node/shell/distributed_db_message.mli b/src/node/shell/distributed_db_message.mli index 687f0727b..3ffcd55a9 100644 --- a/src/node/shell/distributed_db_message.mli +++ b/src/node/shell/distributed_db_message.mli @@ -14,7 +14,7 @@ type t = | Deactivate of Net_id.t | Get_current_head of Net_id.t - | Current_head of Net_id.t * Block_header.t * Operation_hash.t list + | Current_head of Net_id.t * Block_header.t * Mempool.t | Get_block_headers of Net_id.t * Block_hash.t list | Block_header of Block_header.t diff --git a/src/node/shell/mempool.ml b/src/node/shell/mempool.ml new file mode 100644 index 000000000..b6eda1408 --- /dev/null +++ b/src/node/shell/mempool.ml @@ -0,0 +1,34 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open State + +type t = State.mempool = { + known_valid: Operation_hash.t list ; + pending: Operation_hash.Set.t ; +} +type mempool = t + +let encoding = State.mempool_encoding +let empty = State.empty_mempool + +let set net_state ~head mempool = + update_chain_store net_state begin fun _chain_store data -> + if Block_hash.equal head (Block.hash data.current_head) then + Lwt.return (Some { data with current_mempool = mempool }, + ()) + else + Lwt.return (None, ()) + end + +let get net_state = + read_chain_store net_state begin fun _chain_store data -> + Lwt.return (Block.header data.current_head, data.current_mempool) + end + diff --git a/src/node/shell/mempool.mli b/src/node/shell/mempool.mli new file mode 100644 index 000000000..e82771917 --- /dev/null +++ b/src/node/shell/mempool.mli @@ -0,0 +1,32 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** Tezos Shell Module - Mempool, a.k.a. the operations safe to be + broadcasted. *) + +type t = State.mempool = { + known_valid: Operation_hash.t list ; + (** A valid sequence of operations on top of the current head. *) + pending: Operation_hash.Set.t ; + (** Set of known not-invalid operation. *) +} +type mempool = t + +val encoding: mempool Data_encoding.t + +val empty: mempool +(** Empty mempool. *) + +val get: State.Net.t -> (Block_header.t * mempool) Lwt.t +(** The current mempool, *) + +val set: State.Net.t -> head:Block_hash.t -> mempool -> unit Lwt.t +(** Set the current mempool. It is ignored if the current head is + not the provided one. *) + diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index 77f1344c5..5df84f29d 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -53,7 +53,7 @@ open Prevalidation type t = { net_db: Distributed_db.net_db ; flush: State.Block.t -> unit; - notify_operations: P2p.Peer_id.t -> Operation_hash.t list -> unit ; + notify_operations: P2p.Peer_id.t -> Mempool.t -> unit ; prevalidate_operations: bool -> Operation.t list -> (Operation_hash.t list * error preapply_result) tzresult Lwt.t ; @@ -84,7 +84,7 @@ let create (start_prevalidation ~predecessor:head ~timestamp:!timestamp () >|= ref) >>= fun validation_state -> let pending = Operation_hash.Table.create 53 in let head = ref head in - let mempool = ref [] in + let mempool = ref Mempool.empty in let operations = ref empty_result in Chain_traversal.live_blocks !head @@ -105,8 +105,25 @@ let create validation_state := state; Lwt.return_unit in - let broadcast_operation ops = - Distributed_db.Advertise.current_head net_db ~mempool:ops !head in + let broadcast_new_operations r = + Distributed_db.Advertise.current_head + net_db + ~mempool:{ + known_valid = [] ; + pending = + List.fold_right + (fun (k, _) s -> Operation_hash.Set.add k s) + r.applied @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_delayed @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_refused @@ + Operation_hash.Set.empty ; + } + !head + in let handle_unprocessed () = if Operation_hash.Map.is_empty !unprocessed then @@ -145,10 +162,21 @@ let create ops Operation_hash.Map.empty ; } in Lwt.return (!validation_state, r) end >>= fun (state, r) -> + let filter_out s m = + List.fold_right (fun (h, _op) -> Operation_hash.Set.remove h) s m in + mempool := { + known_valid = !mempool.known_valid @ List.rev_map fst r.applied ; + pending = + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_delayed @@ + Operation_hash.Map.fold + (fun k _ s -> Operation_hash.Set.add k s) + r.branch_refused @@ + filter_out r.applied !mempool.pending ; + } ; let filter_out s m = List.fold_right (fun (h, _op) -> Operation_hash.Map.remove h) s m in - let new_ops = List.map fst r.applied in - mempool := List.rev_append new_ops !mempool ; operations := { applied = List.rev_append r.applied !operations.applied ; refused = Operation_hash.Map.empty ; @@ -162,8 +190,9 @@ let create (filter_out r.applied !operations.branch_delayed) r.branch_delayed ; } ; - Chain.set_reversed_mempool net_state !mempool >>= fun () -> - if broadcast then broadcast_operation new_ops ; + Mempool.set net_state + ~head:(State.Block.hash !head) !mempool >>= fun () -> + if broadcast then broadcast_new_operations r ; Lwt_list.iter_s (fun (_op, _exns) -> (* FIXME *) @@ -212,14 +241,17 @@ let create iter_s (fun (h, op) -> register h op >>=? fun () -> - mempool := h :: !mempool ; + mempool := { !mempool with + known_valid = + !mempool.known_valid @ [h] } ; operations := { !operations with - applied = (h, op) :: !operations.applied }; + applied = (h, op) :: !operations.applied } ; return () ) res.applied >>=? fun () -> - Chain.set_reversed_mempool net_state !mempool >>= fun () -> - broadcast_operation (List.map fst res.applied) ; + Mempool.set net_state + ~head:(State.Block.hash !head) !mempool >>= fun () -> + broadcast_new_operations res ; begin if force then iter_p @@ -250,7 +282,10 @@ let create Lwt.wakeup w result ; Lwt.return_unit end - | `Register (gid, ops) -> + | `Register (gid, mempool) -> + let ops = + Operation_hash.Set.elements mempool.Mempool.pending @ + mempool.known_valid in let known_ops, unknown_ops = List.partition (fun op -> @@ -305,7 +340,7 @@ let create (Operation_hash.Map.cardinal new_mempool) >>= fun () -> (* Reset the pre-validation context *) head := new_head ; - mempool := [] ; + mempool := Mempool.empty ; operations := empty_result ; broadcast_unprocessed := false ; unprocessed := new_mempool ; @@ -327,9 +362,9 @@ let create if not (Lwt.is_sleeping !running_validation) then Lwt.cancel !running_validation in - let notify_operations gid ops = + let notify_operations gid mempool = Lwt.async begin fun () -> - push_to_worker (`Register (gid, ops)) ; + push_to_worker (`Register (gid, mempool)) ; Lwt.return_unit end in let prevalidate_operations force raw_ops = diff --git a/src/node/shell/prevalidator.mli b/src/node/shell/prevalidator.mli index ec01fc995..cf8fb9120 100644 --- a/src/node/shell/prevalidator.mli +++ b/src/node/shell/prevalidator.mli @@ -34,7 +34,7 @@ val create: Distributed_db.net_db -> t Lwt.t val shutdown: t -> unit Lwt.t -val notify_operations: t -> P2p.Peer_id.t -> Operation_hash.t list -> unit +val notify_operations: t -> P2p.Peer_id.t -> Mempool.t -> unit (** Conditionnaly inject a new operation in the node: the operation will be ignored when it is (strongly) refused This is the diff --git a/src/node/shell/state.ml b/src/node/shell/state.ml index 727c326bc..c663519a7 100644 --- a/src/node/shell/state.ml +++ b/src/node/shell/state.ml @@ -87,7 +87,12 @@ and chain_state = { and chain_data = { current_head: block ; - current_reversed_mempool: Operation_hash.t list ; + current_mempool: mempool ; +} + +and mempool = { + known_valid: Operation_hash.t list ; + pending: Operation_hash.Set.t ; } and block = { @@ -96,6 +101,20 @@ and block = { contents: Store.Block.contents ; } +let mempool_encoding = + let open Data_encoding in + conv + (fun { known_valid ; pending } -> (known_valid, pending)) + (fun (known_valid, pending) -> { known_valid ; pending }) + (obj2 + (req "known_valid" (dynamic_size (list Operation_hash.encoding))) + (req "pending" (dynamic_size Operation_hash.Set.encoding))) + +let empty_mempool = { + known_valid = [] ; + pending = Operation_hash.Set.empty ; +} + let read_chain_store { chain_state } f = Shared.use chain_state begin fun state -> f state.chain_store state.data @@ -173,7 +192,7 @@ module Net = struct hash = current_head ; contents = current_block ; } ; - current_reversed_mempool = [] ; + current_mempool = empty_mempool ; } ; chain_store ; } diff --git a/src/node/shell/state.mli b/src/node/shell/state.mli index 333b8f533..4e0c1f9e9 100644 --- a/src/node/shell/state.mli +++ b/src/node/shell/state.mli @@ -155,9 +155,17 @@ val read_block_exn: val fork_testnet: Block.t -> Protocol_hash.t -> Time.t -> Net.t tzresult Lwt.t +type mempool = { + known_valid: Operation_hash.t list ; + pending: Operation_hash.Set.t ; +} + +val empty_mempool: mempool +val mempool_encoding: mempool Data_encoding.t + type chain_data = { current_head: Block.t ; - current_reversed_mempool: Operation_hash.t list ; + current_mempool: mempool ; } val read_chain_store: