From 201b851f69080c6bc3974e80d795c8eb76867fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Fri, 29 Sep 2017 18:43:13 +0200 Subject: [PATCH] Node: store the mempool in `State`. This simplifies the interaction between the `validator` and the `distributed_db` by removing some "callbacks". --- src/node/shell/chain.ml | 19 ++++++++++++++++-- src/node/shell/chain.mli | 8 +++++++- src/node/shell/distributed_db.ml | 26 +++++++++++++++++-------- src/node/shell/distributed_db.mli | 5 ++--- src/node/shell/prevalidator.ml | 3 +++ src/node/shell/state.ml | 4 +++- src/node/shell/state.mli | 1 + src/node/shell/validator.ml | 32 +++++++++---------------------- 8 files changed, 60 insertions(+), 38 deletions(-) diff --git a/src/node/shell/chain.ml b/src/node/shell/chain.ml index 0a4b56b6f..13fca8455 100644 --- a/src/node/shell/chain.ml +++ b/src/node/shell/chain.ml @@ -82,7 +82,9 @@ let locked_set_head chain_store data block = let set_head net_state block = update_chain_store net_state begin fun chain_store data -> locked_set_head chain_store data block >>= fun () -> - Lwt.return (Some { current_head = block }, ()) + Lwt.return (Some { current_head = block ; + current_reversed_mempool = [] }, + ()) end let test_and_set_head net_state ~old block = @@ -91,5 +93,18 @@ let test_and_set_head net_state ~old block = Lwt.return (None, false) else locked_set_head chain_store data block >>= fun () -> - Lwt.return (Some { current_head = block }, true) + Lwt.return (Some { current_head = block ; + current_reversed_mempool = [] }, + true) + end + +let set_reversed_mempool net_state current_reversed_mempool = + update_chain_store net_state begin fun _chain_store data -> + Lwt.return (Some { data with current_reversed_mempool }, + ()) + end + +let mempool net_state = + read_chain_store net_state begin fun _chain_store data -> + Lwt.return (List.rev data.current_reversed_mempool) end diff --git a/src/node/shell/chain.mli b/src/node/shell/chain.mli index ba3c8b7b7..49ffda47e 100644 --- a/src/node/shell/chain.mli +++ b/src/node/shell/chain.mli @@ -19,10 +19,16 @@ val head: Net.t -> Block.t Lwt.t val known_heads: Net.t -> Block.t list Lwt.t +val mem: Net.t -> Block_hash.t -> bool Lwt.t +(** Test whether a block belongs to the current mainnet. *) + val set_head: Net.t -> Block.t -> unit Lwt.t (** Record a block as the current head of the network's blockchain. *) -val mem: Net.t -> Block_hash.t -> bool Lwt.t +val set_reversed_mempool: Net.t -> Operation_hash.t list -> unit Lwt.t +(** Record a list as the current list of pending operations. *) + +val mempool: Net.t -> Operation_hash.t list Lwt.t val test_and_set_head: Net.t -> old:Block.t -> Block.t -> bool Lwt.t diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index 1cd1fda88..72d59146a 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -301,9 +301,7 @@ module Raw_protocol = type callback = { notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ; - current_branch: int -> Block_hash.t list Lwt.t ; notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ; - current_head: int -> (Block_hash.t * Operation_hash.t list) Lwt.t ; disconnection: P2p.Peer_id.t -> unit ; } @@ -324,7 +322,7 @@ and net_db = { block_header_db: Raw_block_header.t ; operation_hashes_db: Raw_operation_hashes.t ; operations_db: Raw_operations.t ; - callback: callback ; + mutable callback: callback ; active_peers: P2p.Peer_id.Set.t ref ; active_connections: p2p_reader P2p.Peer_id.Table.t ; } @@ -337,6 +335,12 @@ and p2p_reader = { mutable worker: unit Lwt.t ; } +let noop_callback = { + notify_branch = begin fun _gid _locator -> () end ; + notify_head = begin fun _gid _block _ops -> () end ; + disconnection = begin fun _gid -> () end ; + } + type t = db let state { net_state } = net_state @@ -399,7 +403,8 @@ module P2p_reader = struct ignore @@ P2p.try_send global_db.p2p state.conn @@ Get_current_branch net_id ; - net_db.callback.current_branch 200 >>= fun locator -> + Chain.head net_db.net_state >>= fun head -> + Chain_traversal.block_locator head 200 >>= fun locator -> ignore @@ P2p.try_send global_db.p2p state.conn @@ Current_branch (net_id, locator) ; @@ -423,10 +428,12 @@ module P2p_reader = struct | Get_current_head net_id -> may_handle state net_id @@ fun net_db -> - net_db.callback.current_head 200 >>= fun (head, mempool) -> + Chain.head net_db.net_state >>= fun head -> + Chain.mempool net_db.net_state >>= fun mempool -> ignore @@ P2p.try_send global_db.p2p state.conn - @@ Current_head (net_id, head, mempool) ; + @@ Current_head (net_id, State.Block.hash head, + Utils.list_sub mempool 200) ; Lwt.return_unit | Current_head (net_id, head, mempool) -> @@ -640,7 +647,7 @@ let create disk p2p = P2p.iter_connections p2p (P2p_reader.run db) ; db -let activate ~callback ({ p2p ; active_nets } as global_db) net_state = +let activate ({ p2p ; active_nets } as global_db) net_state = let net_id = State.Net.id net_state in match Net_id.Table.find active_nets net_id with | exception Not_found -> @@ -663,7 +670,7 @@ let activate ~callback ({ p2p ; active_nets } as global_db) net_state = let net = { global_db ; operation_db ; block_header_db ; operation_hashes_db ; operations_db ; - net_state ; callback ; active_peers ; + net_state ; callback = noop_callback ; active_peers ; active_connections = P2p.Peer_id.Table.create 53 ; } in P2p.iter_connections p2p (fun _peer_id conn -> @@ -675,6 +682,9 @@ let activate ~callback ({ p2p ; active_nets } as global_db) net_state = | net -> net +let set_callback net_db callback = + net_db.callback <- callback + let deactivate net_db = let { active_nets ; p2p } = net_db.global_db in let net_id = State.Net.id net_db.net_state in diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index e8e8e5b7e..54feb2dcb 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -24,13 +24,12 @@ val state: net_db -> State.Net.t type callback = { notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ; - current_branch: int -> Block_hash.t list Lwt.t ; notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ; - current_head: int -> (Block_hash.t * Operation_hash.t list) Lwt.t ; disconnection: P2p.Peer_id.t -> unit ; } -val activate: callback:callback -> t -> State.Net.t -> net_db +val activate: t -> State.Net.t -> net_db +val set_callback: net_db -> callback -> unit val deactivate: net_db -> unit Lwt.t val broadcast_head: diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index eed486588..f37ce025c 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -146,6 +146,7 @@ let create net_db = (filter_out r.applied !operations.branch_delayed) r.branch_delayed ; } ; + Chain.set_reversed_mempool net_state !operations.applied >>= fun () -> if broadcast then broadcast_operation r.applied ; Lwt_list.iter_s (fun (_op, _exns) -> @@ -200,6 +201,8 @@ let create net_db = applied = h :: !operations.applied }; return () ) res.applied >>=? fun () -> + Chain.set_reversed_mempool + net_state !operations.applied >>= fun () -> broadcast_operation res.applied ; begin if force then diff --git a/src/node/shell/state.ml b/src/node/shell/state.ml index e26c857f3..3cdd2f829 100644 --- a/src/node/shell/state.ml +++ b/src/node/shell/state.ml @@ -71,6 +71,7 @@ and chain_state = { and chain_data = { current_head: block ; + current_reversed_mempool: Operation_hash.t list ; } and block = { @@ -155,7 +156,8 @@ module Net = struct net_state ; hash = current_head ; contents = current_block ; - } + } ; + current_reversed_mempool = [] ; } ; chain_store ; } diff --git a/src/node/shell/state.mli b/src/node/shell/state.mli index 8859fdb65..362732233 100644 --- a/src/node/shell/state.mli +++ b/src/node/shell/state.mli @@ -153,6 +153,7 @@ val fork_testnet: type chain_data = { current_head: Block.t ; + current_reversed_mempool: Operation_hash.t list ; } val read_chain_store: diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index 3e33febb7..2dfff9d5c 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -607,37 +607,23 @@ end let rec create_validator ?parent worker ?max_child_ttl state db net = - let queue = Lwt_pipe.create () in - let current_ops = ref (fun () -> []) in + let net_id = State.Net.id net in + let net_db = Distributed_db.activate db net in + let session = Context_db.create net_db in - let callback : Distributed_db.callback = { + let queue = Lwt_pipe.create () in + Prevalidator.create net_db >>= fun prevalidator -> + let new_blocks = ref Lwt.return_unit in + + Distributed_db.set_callback net_db { notify_branch = begin fun gid locator -> Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator))) end ; - current_branch = begin fun size -> - Chain.head net >>= fun head -> - Chain_traversal.block_locator head size - end ; notify_head = begin fun gid block ops -> Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, block, ops))) ; end ; - current_head = begin fun size -> - Chain.head net >>= fun head -> - Lwt.return (State.Block.hash head, Utils.list_sub (!current_ops ()) size) - end ; disconnection = (fun _gid -> ()) ; - } in - - let net_id = State.Net.id net in - let net_db = Distributed_db.activate ~callback db net in - let session = Context_db.create net_db in - - Prevalidator.create net_db >>= fun prevalidator -> - current_ops := - (fun () -> - let res, _ = Prevalidator.operations prevalidator in - res.applied); - let new_blocks = ref Lwt.return_unit in + } ; let shutdown () = lwt_log_notice "shutdown %a" Net_id.pp net_id >>= fun () ->