From d866b1bfa572b57392f074fd162384a80ae7b5ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 11 Nov 2017 03:34:12 +0100 Subject: [PATCH] Distributed_db: add primitive for sending 'message'. --- src/node/shell/distributed_db.ml | 56 ++++++++++++++++++++++++++----- src/node/shell/distributed_db.mli | 18 ++++++++-- src/node/shell/prevalidator.ml | 2 +- src/node/shell/validator.ml | 2 +- 4 files changed, 64 insertions(+), 14 deletions(-) diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index 84f308bcf..2ba817cbe 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -849,15 +849,6 @@ let clear_block net_db hash n = Raw_operation_hashes.clear_all net_db.operation_hashes_db.table hash n ; Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash -let broadcast_head net_db head mempool = - let net_id = State.Net.id net_db.net_state in - assert (Net_id.equal net_id (State.Block.net_id head)) ; - let msg : Message.t = - Current_head (net_id, State.Block.header head, mempool) in - P2p.Peer_id.Table.iter - (fun _peer_id state -> - ignore (P2p.try_send net_db.global_db.p2p state.conn msg)) - net_db.active_connections let watch_block_header { block_input } = Watcher.create_stream block_input @@ -958,3 +949,50 @@ module Protocol = let proj db = db.protocol_db.table end) + +let broadcast net_db msg = + P2p.Peer_id.Table.iter + (fun _peer_id state -> + ignore (P2p.try_send net_db.global_db.p2p state.conn msg)) + net_db.active_connections + +let try_send net_db peer_id msg = + try + let conn = P2p.Peer_id.Table.find net_db.active_connections peer_id in + ignore (P2p.try_send net_db.global_db.p2p conn.conn msg : bool) + with Not_found -> () + +let send net_db ?peer msg = + match peer with + | Some peer -> try_send net_db peer msg + | None -> broadcast net_db msg + +module Request = struct + + let current_head net_db ?peer () = + let net_id = State.Net.id net_db.net_state in + send net_db ?peer @@ Get_current_head net_id + + let current_branch net_db ?peer () = + let net_id = State.Net.id net_db.net_state in + send net_db ?peer @@ Get_current_branch net_id + +end + +module Advertise = struct + + let current_head net_db ?peer ?(mempool = []) head = + let net_id = State.Net.id net_db.net_state in + assert (Net_id.equal net_id (State.Block.net_id head)) ; + send net_db ?peer @@ + Current_head (net_id, State.Block.header head, mempool) + + let current_branch net_db ?peer head = + let net_id = State.Net.id net_db.net_state in + assert (Net_id.equal net_id (State.Block.net_id head)) ; + Block_locator.compute head 200 >>= fun locator -> + send net_db ?peer @@ Current_branch (net_id, locator) ; + Lwt.return_unit + +end + diff --git a/src/node/shell/distributed_db.mli b/src/node/shell/distributed_db.mli index fb80ef29e..92eea4021 100644 --- a/src/node/shell/distributed_db.mli +++ b/src/node/shell/distributed_db.mli @@ -38,9 +38,6 @@ val deactivate: net_db -> unit Lwt.t val disconnect: net_db -> P2p.Peer_id.t -> unit Lwt.t -val broadcast_head: - net_db -> State.Block.t -> Operation_hash.t list -> unit - type operation = | Blob of Operation.t | Hash of Operation_hash.t @@ -135,3 +132,18 @@ module Raw : sig val encoding: Message.t P2p.Raw.t Data_encoding.t val supported_versions: P2p_types.Version.t list end + +module Request : sig + val current_branch: net_db -> ?peer:P2p.Peer_id.t -> unit -> unit + val current_head: net_db -> ?peer:P2p.Peer_id.t -> unit -> unit +end + +module Advertise : sig + val current_head: + net_db -> ?peer:P2p.Peer_id.t -> + ?mempool:Operation_hash.t list -> State.Block.t -> unit + val current_branch: + net_db -> ?peer:P2p.Peer_id.t -> + State.Block.t -> unit Lwt.t +end + diff --git a/src/node/shell/prevalidator.ml b/src/node/shell/prevalidator.ml index 26b43ed2b..059c1d362 100644 --- a/src/node/shell/prevalidator.ml +++ b/src/node/shell/prevalidator.ml @@ -97,7 +97,7 @@ let create net_db = Lwt.return_unit in let broadcast_operation ops = - Distributed_db.broadcast_head net_db !head ops in + Distributed_db.Advertise.current_head net_db ~mempool:ops !head in let handle_unprocessed () = if Operation_hash.Set.is_empty !unprocessed then diff --git a/src/node/shell/validator.ml b/src/node/shell/validator.ml index a7710620f..4273be039 100644 --- a/src/node/shell/validator.ml +++ b/src/node/shell/validator.ml @@ -113,7 +113,7 @@ let rec may_set_head v (block: State.Block.t) = Chain.test_and_set_head v.net ~old:head block >>= function | false -> may_set_head v block | true -> - Distributed_db.broadcast_head v.net_db block [] ; + Distributed_db.Advertise.current_head v.net_db block ; Prevalidator.flush v.prevalidator block ; begin begin