Node: store the mempool in State.

This simplifies the interaction between the `validator` and the
`distributed_db` by removing some "callbacks".
This commit is contained in:
Grégoire Henry 2017-09-29 18:43:13 +02:00 committed by Benjamin Canou
parent 57109435d5
commit 201b851f69
8 changed files with 60 additions and 38 deletions

View File

@ -82,7 +82,9 @@ let locked_set_head chain_store data block =
let set_head net_state block = let set_head net_state block =
update_chain_store net_state begin fun chain_store data -> update_chain_store net_state begin fun chain_store data ->
locked_set_head chain_store data block >>= fun () -> locked_set_head chain_store data block >>= fun () ->
Lwt.return (Some { current_head = block }, ()) Lwt.return (Some { current_head = block ;
current_reversed_mempool = [] },
())
end end
let test_and_set_head net_state ~old block = let test_and_set_head net_state ~old block =
@ -91,5 +93,18 @@ let test_and_set_head net_state ~old block =
Lwt.return (None, false) Lwt.return (None, false)
else else
locked_set_head chain_store data block >>= fun () -> locked_set_head chain_store data block >>= fun () ->
Lwt.return (Some { current_head = block }, true) Lwt.return (Some { current_head = block ;
current_reversed_mempool = [] },
true)
end
let set_reversed_mempool net_state current_reversed_mempool =
update_chain_store net_state begin fun _chain_store data ->
Lwt.return (Some { data with current_reversed_mempool },
())
end
let mempool net_state =
read_chain_store net_state begin fun _chain_store data ->
Lwt.return (List.rev data.current_reversed_mempool)
end end

View File

@ -19,10 +19,16 @@ val head: Net.t -> Block.t Lwt.t
val known_heads: Net.t -> Block.t list Lwt.t val known_heads: Net.t -> Block.t list Lwt.t
val mem: Net.t -> Block_hash.t -> bool Lwt.t
(** Test whether a block belongs to the current mainnet. *)
val set_head: Net.t -> Block.t -> unit Lwt.t val set_head: Net.t -> Block.t -> unit Lwt.t
(** Record a block as the current head of the network's blockchain. *) (** Record a block as the current head of the network's blockchain. *)
val mem: Net.t -> Block_hash.t -> bool Lwt.t val set_reversed_mempool: Net.t -> Operation_hash.t list -> unit Lwt.t
(** Record a list as the current list of pending operations. *)
val mempool: Net.t -> Operation_hash.t list Lwt.t
val test_and_set_head: val test_and_set_head:
Net.t -> old:Block.t -> Block.t -> bool Lwt.t Net.t -> old:Block.t -> Block.t -> bool Lwt.t

View File

@ -301,9 +301,7 @@ module Raw_protocol =
type callback = { type callback = {
notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ; notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ;
current_branch: int -> Block_hash.t list Lwt.t ;
notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ; notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ;
current_head: int -> (Block_hash.t * Operation_hash.t list) Lwt.t ;
disconnection: P2p.Peer_id.t -> unit ; disconnection: P2p.Peer_id.t -> unit ;
} }
@ -324,7 +322,7 @@ and net_db = {
block_header_db: Raw_block_header.t ; block_header_db: Raw_block_header.t ;
operation_hashes_db: Raw_operation_hashes.t ; operation_hashes_db: Raw_operation_hashes.t ;
operations_db: Raw_operations.t ; operations_db: Raw_operations.t ;
callback: callback ; mutable callback: callback ;
active_peers: P2p.Peer_id.Set.t ref ; active_peers: P2p.Peer_id.Set.t ref ;
active_connections: p2p_reader P2p.Peer_id.Table.t ; active_connections: p2p_reader P2p.Peer_id.Table.t ;
} }
@ -337,6 +335,12 @@ and p2p_reader = {
mutable worker: unit Lwt.t ; mutable worker: unit Lwt.t ;
} }
let noop_callback = {
notify_branch = begin fun _gid _locator -> () end ;
notify_head = begin fun _gid _block _ops -> () end ;
disconnection = begin fun _gid -> () end ;
}
type t = db type t = db
let state { net_state } = net_state let state { net_state } = net_state
@ -399,7 +403,8 @@ module P2p_reader = struct
ignore ignore
@@ P2p.try_send global_db.p2p state.conn @@ P2p.try_send global_db.p2p state.conn
@@ Get_current_branch net_id ; @@ Get_current_branch net_id ;
net_db.callback.current_branch 200 >>= fun locator -> Chain.head net_db.net_state >>= fun head ->
Chain_traversal.block_locator head 200 >>= fun locator ->
ignore ignore
@@ P2p.try_send global_db.p2p state.conn @@ P2p.try_send global_db.p2p state.conn
@@ Current_branch (net_id, locator) ; @@ Current_branch (net_id, locator) ;
@ -423,10 +428,12 @@ module P2p_reader = struct
| Get_current_head net_id -> | Get_current_head net_id ->
may_handle state net_id @@ fun net_db -> may_handle state net_id @@ fun net_db ->
net_db.callback.current_head 200 >>= fun (head, mempool) -> Chain.head net_db.net_state >>= fun head ->
Chain.mempool net_db.net_state >>= fun mempool ->
ignore ignore
@@ P2p.try_send global_db.p2p state.conn @@ P2p.try_send global_db.p2p state.conn
@@ Current_head (net_id, head, mempool) ; @@ Current_head (net_id, State.Block.hash head,
Utils.list_sub mempool 200) ;
Lwt.return_unit Lwt.return_unit
| Current_head (net_id, head, mempool) -> | Current_head (net_id, head, mempool) ->
@ -640,7 +647,7 @@ let create disk p2p =
P2p.iter_connections p2p (P2p_reader.run db) ; P2p.iter_connections p2p (P2p_reader.run db) ;
db db
let activate ~callback ({ p2p ; active_nets } as global_db) net_state = let activate ({ p2p ; active_nets } as global_db) net_state =
let net_id = State.Net.id net_state in let net_id = State.Net.id net_state in
match Net_id.Table.find active_nets net_id with match Net_id.Table.find active_nets net_id with
| exception Not_found -> | exception Not_found ->
@ -663,7 +670,7 @@ let activate ~callback ({ p2p ; active_nets } as global_db) net_state =
let net = { let net = {
global_db ; operation_db ; block_header_db ; global_db ; operation_db ; block_header_db ;
operation_hashes_db ; operations_db ; operation_hashes_db ; operations_db ;
net_state ; callback ; active_peers ; net_state ; callback = noop_callback ; active_peers ;
active_connections = P2p.Peer_id.Table.create 53 ; active_connections = P2p.Peer_id.Table.create 53 ;
} in } in
P2p.iter_connections p2p (fun _peer_id conn -> P2p.iter_connections p2p (fun _peer_id conn ->
@ -675,6 +682,9 @@ let activate ~callback ({ p2p ; active_nets } as global_db) net_state =
| net -> | net ->
net net
let set_callback net_db callback =
net_db.callback <- callback
let deactivate net_db = let deactivate net_db =
let { active_nets ; p2p } = net_db.global_db in let { active_nets ; p2p } = net_db.global_db in
let net_id = State.Net.id net_db.net_state in let net_id = State.Net.id net_db.net_state in

View File

@ -24,13 +24,12 @@ val state: net_db -> State.Net.t
type callback = { type callback = {
notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ; notify_branch: P2p.Peer_id.t -> Block_hash.t list -> unit ;
current_branch: int -> Block_hash.t list Lwt.t ;
notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ; notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ;
current_head: int -> (Block_hash.t * Operation_hash.t list) Lwt.t ;
disconnection: P2p.Peer_id.t -> unit ; disconnection: P2p.Peer_id.t -> unit ;
} }
val activate: callback:callback -> t -> State.Net.t -> net_db val activate: t -> State.Net.t -> net_db
val set_callback: net_db -> callback -> unit
val deactivate: net_db -> unit Lwt.t val deactivate: net_db -> unit Lwt.t
val broadcast_head: val broadcast_head:

View File

@ -146,6 +146,7 @@ let create net_db =
(filter_out r.applied !operations.branch_delayed) (filter_out r.applied !operations.branch_delayed)
r.branch_delayed ; r.branch_delayed ;
} ; } ;
Chain.set_reversed_mempool net_state !operations.applied >>= fun () ->
if broadcast then broadcast_operation r.applied ; if broadcast then broadcast_operation r.applied ;
Lwt_list.iter_s Lwt_list.iter_s
(fun (_op, _exns) -> (fun (_op, _exns) ->
@ -200,6 +201,8 @@ let create net_db =
applied = h :: !operations.applied }; applied = h :: !operations.applied };
return () ) return () )
res.applied >>=? fun () -> res.applied >>=? fun () ->
Chain.set_reversed_mempool
net_state !operations.applied >>= fun () ->
broadcast_operation res.applied ; broadcast_operation res.applied ;
begin begin
if force then if force then

View File

@ -71,6 +71,7 @@ and chain_state = {
and chain_data = { and chain_data = {
current_head: block ; current_head: block ;
current_reversed_mempool: Operation_hash.t list ;
} }
and block = { and block = {
@ -155,7 +156,8 @@ module Net = struct
net_state ; net_state ;
hash = current_head ; hash = current_head ;
contents = current_block ; contents = current_block ;
} } ;
current_reversed_mempool = [] ;
} ; } ;
chain_store ; chain_store ;
} }

View File

@ -153,6 +153,7 @@ val fork_testnet:
type chain_data = { type chain_data = {
current_head: Block.t ; current_head: Block.t ;
current_reversed_mempool: Operation_hash.t list ;
} }
val read_chain_store: val read_chain_store:

View File

@ -607,37 +607,23 @@ end
let rec create_validator ?parent worker ?max_child_ttl state db net = let rec create_validator ?parent worker ?max_child_ttl state db net =
let queue = Lwt_pipe.create () in let net_id = State.Net.id net in
let current_ops = ref (fun () -> []) in let net_db = Distributed_db.activate db net in
let session = Context_db.create net_db in
let callback : Distributed_db.callback = { let queue = Lwt_pipe.create () in
Prevalidator.create net_db >>= fun prevalidator ->
let new_blocks = ref Lwt.return_unit in
Distributed_db.set_callback net_db {
notify_branch = begin fun gid locator -> notify_branch = begin fun gid locator ->
Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator))) Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator)))
end ; end ;
current_branch = begin fun size ->
Chain.head net >>= fun head ->
Chain_traversal.block_locator head size
end ;
notify_head = begin fun gid block ops -> notify_head = begin fun gid block ops ->
Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, block, ops))) ; Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, block, ops))) ;
end ; end ;
current_head = begin fun size ->
Chain.head net >>= fun head ->
Lwt.return (State.Block.hash head, Utils.list_sub (!current_ops ()) size)
end ;
disconnection = (fun _gid -> ()) ; disconnection = (fun _gid -> ()) ;
} in } ;
let net_id = State.Net.id net in
let net_db = Distributed_db.activate ~callback db net in
let session = Context_db.create net_db in
Prevalidator.create net_db >>= fun prevalidator ->
current_ops :=
(fun () ->
let res, _ = Prevalidator.operations prevalidator in
res.applied);
let new_blocks = ref Lwt.return_unit in
let shutdown () = let shutdown () =
lwt_log_notice "shutdown %a" Net_id.pp net_id >>= fun () -> lwt_log_notice "shutdown %a" Net_id.pp net_id >>= fun () ->