Distributed_db: inline the header when broadcasting a new block.

This adds a small size overhead in the network message, but in most
cases it will avoid a subsequent 'fetch' of the header.
This commit is contained in:
Grégoire Henry 2017-11-11 03:34:12 +01:00 committed by Benjamin Canou
parent 06873da197
commit f63c5acbf5
11 changed files with 81 additions and 48 deletions

View File

@ -9,14 +9,16 @@
open State open State
type t = Block_hash.t list type t = Block_header.t * Block_hash.t list
type error += Invalid_locator of P2p.Peer_id.t * t type error += Invalid_locator of P2p.Peer_id.t * t
let encoding = let encoding =
let open Data_encoding in let open Data_encoding in
(* TODO add a [description] *) (* TODO add a [description] *)
list Block_hash.encoding (obj2
(req "current_head" (dynamic_size Block_header.encoding))
(req "history" (dynamic_size (list Block_hash.encoding))))
let compute (b: Block.t) sz = let compute (b: Block.t) sz =
let rec loop acc sz step cpt b = let rec loop acc sz step cpt b =
@ -35,9 +37,13 @@ let compute (b: Block.t) sz =
step (cpt - 1) predecessor step (cpt - 1) predecessor
else else
loop acc sz step (cpt - 1) predecessor in loop acc sz step (cpt - 1) predecessor in
loop [] sz 1 9 b Block.predecessor b >>= function
| None -> Lwt.return (State.Block.header b, [])
| Some p ->
loop [] sz 1 9 p >>= fun hist ->
Lwt.return (State.Block.header b, hist)
let estimated_length hist = let estimated_length (_head, hist) =
let rec loop acc step cpt = function let rec loop acc step cpt = function
| [] -> acc | [] -> acc
| _ :: hist -> | _ :: hist ->
@ -46,9 +52,9 @@ let estimated_length hist =
else else
loop (acc+step) step (cpt-1) hist loop (acc+step) step (cpt-1) hist
in in
loop 0 1 9 hist loop 1 1 9 hist
let fold ~f acc hist = let fold ~f acc (head, hist) =
let rec loop step cpt acc = function let rec loop step cpt acc = function
| [] | [_] -> acc | [] | [_] -> acc
| block :: (pred :: rem as hist) -> | block :: (pred :: rem as hist) ->
@ -60,7 +66,7 @@ let fold ~f acc hist =
let acc = f acc ~block ~pred ~step ~strict_step:(rem <> []) in let acc = f acc ~block ~pred ~step ~strict_step:(rem <> []) in
loop step cpt acc hist loop step cpt acc hist
in in
loop 1 9 acc hist loop 1 10 acc (Block_header.hash head :: hist)
type step = { type step = {
block: Block_hash.t ; block: Block_hash.t ;
@ -69,13 +75,13 @@ type step = {
strict_step: bool ; strict_step: bool ;
} }
let to_steps hist = let to_steps locator =
fold fold
~f:begin fun acc ~block ~pred ~step ~strict_step -> { ~f:begin fun acc ~block ~pred ~step ~strict_step -> {
block ; predecessor = pred ; step ; strict_step ; block ; predecessor = pred ; step ; strict_step ;
} :: acc } :: acc
end end
[] hist [] locator
let rec known_ancestor net_state acc hist = let rec known_ancestor net_state acc hist =
match hist with match hist with
@ -88,10 +94,22 @@ let rec known_ancestor net_state acc hist =
| true -> Lwt.return_none | true -> Lwt.return_none
| false -> known_ancestor net_state (h :: acc) hist | false -> known_ancestor net_state (h :: acc) hist
let known_ancestor net_state hist = let known_ancestor net_state (head, hist) =
known_ancestor net_state [] hist let hash = Block_header.hash head in
if Block_hash.equal hash (State.Net.faked_genesis_hash net_state) then
State.Block.read_exn
net_state (State.Net.genesis net_state).block >>= fun genesis ->
Lwt.return_some (genesis, (head, []))
else
State.Block.read_opt net_state hash >>= function
| Some ancestor -> Lwt.return_some (ancestor, (head, []))
| None ->
known_ancestor net_state [] hist >>= function
| None -> Lwt.return_none
| Some (ancestor, prefix) ->
Lwt.return_some (ancestor, (head, prefix))
let find_new net_state hist sz = let find_new net_state locator sz =
let rec path sz acc h = let rec path sz acc h =
if sz <= 0 then Lwt.return (List.rev acc) if sz <= 0 then Lwt.return (List.rev acc)
else else
@ -100,7 +118,7 @@ let find_new net_state hist sz =
end >>= function end >>= function
| None -> Lwt.return (List.rev acc) | None -> Lwt.return (List.rev acc)
| Some s -> path (sz-1) (s :: acc) s in | Some s -> path (sz-1) (s :: acc) s in
known_ancestor net_state hist >>= function known_ancestor net_state locator >>= function
| None -> Lwt.return_nil | None -> Lwt.return_nil
| Some (known, _) -> | Some (known, _) ->
Chain.head net_state >>= fun head -> Chain.head net_state >>= fun head ->

View File

@ -9,7 +9,7 @@
open State open State
type t = private Block_hash.t list type t = private (Block_header.t * Block_hash.t list)
(** A type for sparse block locator (/à la/ Bitcoin) *) (** A type for sparse block locator (/à la/ Bitcoin) *)
val encoding: t Data_encoding.t val encoding: t Data_encoding.t

View File

@ -300,8 +300,10 @@ module Raw_protocol =
end) end)
type callback = { type callback = {
notify_branch: P2p.Peer_id.t -> Block_locator.t -> unit ; notify_branch:
notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ; P2p.Peer_id.t -> Block_locator.t -> unit ;
notify_head:
P2p.Peer_id.t -> Block_header.t -> Operation_hash.t list -> unit ;
disconnection: P2p.Peer_id.t -> unit ; disconnection: P2p.Peer_id.t -> unit ;
} }
@ -416,9 +418,10 @@ module P2p_reader = struct
| Current_branch (net_id, locator) -> | Current_branch (net_id, locator) ->
may_activate global_db state net_id @@ fun net_db -> may_activate global_db state net_id @@ fun net_db ->
let head, hist = (locator :> Block_header.t * Block_hash.t list) in
Lwt_list.exists_p Lwt_list.exists_p
(State.Block.known_invalid net_db.net_state) (State.Block.known_invalid net_db.net_state)
(locator :> Block_hash.t list) >>= fun known_invalid -> (Block_header.hash head :: hist) >>= fun known_invalid ->
if not known_invalid then if not known_invalid then
net_db.callback.notify_branch state.gid locator ; net_db.callback.notify_branch state.gid locator ;
(* TODO Kickban *) (* TODO Kickban *)
@ -436,15 +439,16 @@ module P2p_reader = struct
Chain.mempool net_db.net_state >>= fun mempool -> Chain.mempool net_db.net_state >>= fun mempool ->
ignore ignore
@@ P2p.try_send global_db.p2p state.conn @@ P2p.try_send global_db.p2p state.conn
@@ Current_head (net_id, State.Block.hash head, @@ Current_head (net_id, State.Block.header head,
Utils.list_sub mempool 200) ; Utils.list_sub mempool 200) ;
Lwt.return_unit Lwt.return_unit
| Current_head (net_id, head, mempool) -> | Current_head (net_id, header, mempool) ->
may_handle state net_id @@ fun net_db -> may_handle state net_id @@ fun net_db ->
let head = Block_header.hash header in
State.Block.known_invalid net_db.net_state head >>= fun known_invalid -> State.Block.known_invalid net_db.net_state head >>= fun known_invalid ->
if not known_invalid then if not known_invalid then
net_db.callback.notify_head state.gid head mempool ; net_db.callback.notify_head state.gid header mempool ;
(* TODO Kickban *) (* TODO Kickban *)
Lwt.return_unit Lwt.return_unit
@ -846,8 +850,10 @@ let clear_block net_db hash n =
Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash Raw_block_header.Table.clear_or_cancel net_db.block_header_db.table hash
let broadcast_head net_db head mempool = let broadcast_head net_db head mempool =
let net_id = State.Net.id net_db.net_state in
assert (Net_id.equal net_id (State.Block.net_id head)) ;
let msg : Message.t = let msg : Message.t =
Current_head (State.Net.id net_db.net_state, head, mempool) in Current_head (net_id, State.Block.header head, mempool) in
P2p.Peer_id.Table.iter P2p.Peer_id.Table.iter
(fun _peer_id state -> (fun _peer_id state ->
ignore (P2p.try_send net_db.global_db.p2p state.conn msg)) ignore (P2p.try_send net_db.global_db.p2p state.conn msg))

View File

@ -25,8 +25,10 @@ val net_state: net_db -> State.Net.t
val db: net_db -> db val db: net_db -> db
type callback = { type callback = {
notify_branch: P2p.Peer_id.t -> Block_locator.t -> unit ; notify_branch:
notify_head: P2p.Peer_id.t -> Block_hash.t -> Operation_hash.t list -> unit ; P2p.Peer_id.t -> Block_locator.t -> unit ;
notify_head:
P2p.Peer_id.t -> Block_header.t -> Operation_hash.t list -> unit ;
disconnection: P2p.Peer_id.t -> unit ; disconnection: P2p.Peer_id.t -> unit ;
} }
@ -37,7 +39,7 @@ val deactivate: net_db -> unit Lwt.t
val disconnect: net_db -> P2p.Peer_id.t -> unit Lwt.t val disconnect: net_db -> P2p.Peer_id.t -> unit Lwt.t
val broadcast_head: val broadcast_head:
net_db -> Block_hash.t -> Operation_hash.t list -> unit net_db -> State.Block.t -> Operation_hash.t list -> unit
type operation = type operation =
| Blob of Operation.t | Blob of Operation.t

View File

@ -14,7 +14,7 @@ type t =
| Deactivate of Net_id.t | Deactivate of Net_id.t
| Get_current_head of Net_id.t | Get_current_head of Net_id.t
| Current_head of Net_id.t * Block_hash.t * Operation_hash.t list | Current_head of Net_id.t * Block_header.t * Operation_hash.t list
| Get_block_headers of Net_id.t * Block_hash.t list | Get_block_headers of Net_id.t * Block_hash.t list
| Block_header of Block_header.t | Block_header of Block_header.t
@ -53,9 +53,9 @@ let encoding =
(req "net_id" Net_id.encoding) (req "net_id" Net_id.encoding)
(req "current_branch" Block_locator.encoding)) (req "current_branch" Block_locator.encoding))
(function (function
| Current_branch (net_id, bhs) -> Some (net_id, bhs) | Current_branch (net_id, locator) -> Some (net_id, locator)
| _ -> None) | _ -> None)
(fun (net_id, bhs) -> Current_branch (net_id, bhs)) ; (fun (net_id, locator) -> Current_branch (net_id, locator)) ;
case ~tag:0x12 case ~tag:0x12
(obj1 (obj1
@ -76,7 +76,7 @@ let encoding =
case ~tag:0x14 case ~tag:0x14
(obj3 (obj3
(req "net_id" Net_id.encoding) (req "net_id" Net_id.encoding)
(req "current_head" Block_hash.encoding) (req "current_block_header" (dynamic_size Block_header.encoding))
(req "current_mempool" (list Operation_hash.encoding))) (req "current_mempool" (list Operation_hash.encoding)))
(function (function
| Current_head (net_id, bh, ops) -> Some (net_id, bh, ops) | Current_head (net_id, bh, ops) -> Some (net_id, bh, ops)
@ -184,4 +184,5 @@ let raw_encoding = P2p.Raw.encoding encoding
let pp_json ppf msg = let pp_json ppf msg =
Format.pp_print_string ppf Format.pp_print_string ppf
(Data_encoding_ezjsonm.to_string (Data_encoding.Json.construct raw_encoding (Message msg))) (Data_encoding_ezjsonm.to_string
(Data_encoding.Json.construct raw_encoding (Message msg)))

View File

@ -14,7 +14,7 @@ type t =
| Deactivate of Net_id.t | Deactivate of Net_id.t
| Get_current_head of Net_id.t | Get_current_head of Net_id.t
| Current_head of Net_id.t * Block_hash.t * Operation_hash.t list | Current_head of Net_id.t * Block_header.t * Operation_hash.t list
| Get_block_headers of Net_id.t * Block_hash.t list | Get_block_headers of Net_id.t * Block_hash.t list
| Block_header of Block_header.t | Block_header of Block_header.t

View File

@ -97,8 +97,7 @@ let create net_db =
Lwt.return_unit in Lwt.return_unit in
let broadcast_operation ops = let broadcast_operation ops =
let hash = State.Block.hash !head in Distributed_db.broadcast_head net_db !head ops in
Distributed_db.broadcast_head net_db hash ops in
let handle_unprocessed () = let handle_unprocessed () =
if Operation_hash.Set.is_empty !unprocessed then if Operation_hash.Set.is_empty !unprocessed then

View File

@ -65,6 +65,7 @@ and net_state = {
global_state: global_state ; global_state: global_state ;
net_id: Net_id.t ; net_id: Net_id.t ;
genesis: genesis ; genesis: genesis ;
faked_genesis_hash: Block_hash.t ;
expiration: Time.t option ; expiration: Time.t option ;
allow_forked_network: bool ; allow_forked_network: bool ;
block_store: Store.Block.store Shared.t ; block_store: Store.Block.store Shared.t ;
@ -160,7 +161,7 @@ module Net = struct
type net_state = t type net_state = t
let allocate let allocate
~genesis ~expiration ~allow_forked_network ~genesis ~faked_genesis_hash ~expiration ~allow_forked_network
~current_head ~current_head
global_state context_index chain_store block_store = global_state context_index chain_store block_store =
Store.Block.Contents.read_exn Store.Block.Contents.read_exn
@ -180,7 +181,7 @@ module Net = struct
global_state ; global_state ;
net_id = Net_id.of_block_hash genesis.block ; net_id = Net_id.of_block_hash genesis.block ;
chain_state = { Shared.data = chain_state ; lock = Lwt_mutex.create () } ; chain_state = { Shared.data = chain_state ; lock = Lwt_mutex.create () } ;
genesis ; genesis ; faked_genesis_hash ;
expiration ; expiration ;
allow_forked_network ; allow_forked_network ;
block_store = Shared.create block_store ; block_store = Shared.create block_store ;
@ -212,9 +213,10 @@ module Net = struct
Lwt.return_unit Lwt.return_unit
end >>= fun () -> end >>= fun () ->
Locked_block.store_genesis Locked_block.store_genesis
block_store genesis commit >>= fun _genesis_header -> block_store genesis commit >>= fun genesis_header ->
allocate allocate
~genesis ~genesis
~faked_genesis_hash:(Block_header.hash genesis_header)
~current_head:genesis.block ~current_head:genesis.block
~expiration ~expiration
~allow_forked_network ~allow_forked_network
@ -250,11 +252,13 @@ module Net = struct
Store.Net.Expiration.read_opt net_store >>= fun expiration -> Store.Net.Expiration.read_opt net_store >>= fun expiration ->
Store.Net.Allow_forked_network.known Store.Net.Allow_forked_network.known
data.global_store id >>= fun allow_forked_network -> data.global_store id >>= fun allow_forked_network ->
Store.Block.Contents.read (block_store, genesis_hash) >>=? fun genesis_header ->
let genesis = { time ; protocol ; block = genesis_hash } in let genesis = { time ; protocol ; block = genesis_hash } in
Store.Chain.Current_head.read chain_store >>=? fun current_head -> Store.Chain.Current_head.read chain_store >>=? fun current_head ->
try try
allocate allocate
~genesis ~genesis
~faked_genesis_hash:(Block_header.hash genesis_header.header)
~current_head ~current_head
~expiration ~expiration
~allow_forked_network ~allow_forked_network
@ -293,6 +297,7 @@ module Net = struct
let id { net_id } = net_id let id { net_id } = net_id
let genesis { genesis } = genesis let genesis { genesis } = genesis
let faked_genesis_hash { faked_genesis_hash } = faked_genesis_hash
let expiration { expiration } = expiration let expiration { expiration } = expiration
let allow_forked_network { allow_forked_network } = allow_forked_network let allow_forked_network { allow_forked_network } = allow_forked_network
let global_state { global_state } = global_state let global_state { global_state } = global_state

View File

@ -72,6 +72,7 @@ module Net : sig
val id: net_state -> Net_id.t val id: net_state -> Net_id.t
val genesis: net_state -> genesis val genesis: net_state -> genesis
val faked_genesis_hash: net_state -> Block_hash.t
val expiration: net_state -> Time.t option val expiration: net_state -> Time.t option
val allow_forked_network: net_state -> bool val allow_forked_network: net_state -> bool
(** Accessors. Respectively access to; (** Accessors. Respectively access to;

View File

@ -113,7 +113,7 @@ let rec may_set_head v (block: State.Block.t) =
Chain.test_and_set_head v.net ~old:head block >>= function Chain.test_and_set_head v.net ~old:head block >>= function
| false -> may_set_head v block | false -> may_set_head v block
| true -> | true ->
Distributed_db.broadcast_head v.net_db block_hash [] ; Distributed_db.broadcast_head v.net_db block [] ;
Prevalidator.flush v.prevalidator block ; Prevalidator.flush v.prevalidator block ;
begin begin
begin begin
@ -623,7 +623,7 @@ let rec create_validator ?parent worker ?max_child_ttl state db net =
Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator))) Lwt.async (fun () -> Lwt_pipe.push queue (`Branch (gid, locator)))
end ; end ;
notify_head = begin fun gid block ops -> notify_head = begin fun gid block ops ->
Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, block, ops))) ; Lwt.async (fun () -> Lwt_pipe.push queue (`Head (gid, Block_header.hash block, ops))) ;
end ; end ;
disconnection = (fun _gid -> ()) ; disconnection = (fun _gid -> ()) ;
} ; } ;
@ -759,9 +759,10 @@ let rec create_validator ?parent worker ?max_child_ttl state db net =
let rec loop () = let rec loop () =
Lwt_pipe.pop queue >>= function Lwt_pipe.pop queue >>= function
| `Branch (_gid, locator) -> | `Branch (_gid, locator) ->
let head, hist = (locator : Block_locator.t :> _ * _) in
List.iter List.iter
(Context_db.prefetch v session) (Context_db.prefetch v session)
(locator :> Block_hash.t list) ; (Block_header.hash head :: hist) ;
loop () loop ()
| `Head (gid, head, ops) -> | `Head (gid, head, ops) ->
Context_db.prefetch v session head ; Context_db.prefetch v session head ;

View File

@ -279,7 +279,7 @@ let test_locator s =
let check_locator h1 expected = let check_locator h1 expected =
Block_locator.compute Block_locator.compute
(vblock s h1) (List.length expected) >>= fun l -> (vblock s h1) (List.length expected) >>= fun l ->
let l = (l :> Block_hash.t list) in let _, l = (l : Block_locator.t :> _ * _) in
if List.length l <> List.length expected then if List.length l <> List.length expected then
Assert.fail_msg Assert.fail_msg
"Invalid locator length %s (found: %d, expected: %d)" "Invalid locator length %s (found: %d, expected: %d)"
@ -287,12 +287,12 @@ let test_locator s =
List.iter2 List.iter2
(fun h h2 -> (fun h h2 ->
if not (Block_hash.equal h (State.Block.hash @@ vblock s h2)) then if not (Block_hash.equal h (State.Block.hash @@ vblock s h2)) then
Assert.fail_msg "Invalid locator %s (expectd: %s)" h1 h2) Assert.fail_msg "Invalid locator %s (expected: %s)" h1 h2)
l expected ; l expected ;
Lwt.return_unit in Lwt.return_unit in
check_locator "A8" ["A8";"A7";"A6";"A5";"A4";"A3";"A2"] >>= fun () -> check_locator "A8" ["A7";"A6";"A5";"A4";"A3";"A2"] >>= fun () ->
check_locator "B8" ["B8";"B7";"B6";"B5";"B4";"B3";"B2";"B1";"A3"] >>= fun () -> check_locator "B8" ["B7";"B6";"B5";"B4";"B3";"B2";"B1";"A3"] >>= fun () ->
check_locator "B8" ["B8";"B7";"B6";"B5";"B4"] >>= fun () -> check_locator "B8" ["B7";"B6";"B5";"B4"] >>= fun () ->
return () return ()
@ -397,7 +397,7 @@ let test_new_blocks s =
and from_block = vblock s h in and from_block = vblock s h in
Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, blocks) -> Chain_traversal.new_blocks ~from_block ~to_block >>= fun (ancestor, blocks) ->
if not (Block_hash.equal (State.Block.hash ancestor) (State.Block.hash @@ vblock s expected_ancestor)) then if not (Block_hash.equal (State.Block.hash ancestor) (State.Block.hash @@ vblock s expected_ancestor)) then
Assert.fail_msg "Invalid locator %s (expected: %s)" h expected_ancestor ; Assert.fail_msg "Invalid ancestor %s -> %s (expected: %s)" head h expected_ancestor ;
if List.length blocks <> List.length expected then if List.length blocks <> List.length expected then
Assert.fail_msg Assert.fail_msg
"Invalid locator length %s (found: %d, expected: %d)" "Invalid locator length %s (found: %d, expected: %d)"
@ -405,7 +405,7 @@ let test_new_blocks s =
List.iter2 List.iter2
(fun h1 h2 -> (fun h1 h2 ->
if not (Block_hash.equal (State.Block.hash h1) (State.Block.hash @@ vblock s h2)) then if not (Block_hash.equal (State.Block.hash h1) (State.Block.hash @@ vblock s h2)) then
Assert.fail_msg "Invalid locator %s (expected: %s)" h h2) Assert.fail_msg "Invalid new blocks %s -> %s (expected: %s)" head h h2)
blocks expected ; blocks expected ;
Lwt.return_unit Lwt.return_unit
in in
@ -425,17 +425,17 @@ let test_find_new s =
Block_locator.find_new s.net loc (List.length expected) >>= fun blocks -> Block_locator.find_new s.net loc (List.length expected) >>= fun blocks ->
if List.length blocks <> List.length expected then if List.length blocks <> List.length expected then
Assert.fail_msg Assert.fail_msg
"Invalid locator length %s (found: %d, expected: %d)" "Invalid find new length %s (found: %d, expected: %d)"
h (List.length blocks) (List.length expected) ; h (List.length blocks) (List.length expected) ;
List.iter2 List.iter2
(fun h1 h2 -> (fun h1 h2 ->
if not (Block_hash.equal h1 (State.Block.hash @@ vblock s h2)) then if not (Block_hash.equal h1 (State.Block.hash @@ vblock s h2)) then
Assert.fail_msg "Invalid locator %s (expected: %s)" h h2) Assert.fail_msg "Invalid find new %s.%d (expected: %s)" h (List.length expected) h2)
blocks expected ; blocks expected ;
Lwt.return_unit Lwt.return_unit
in in
test s "A6" [] >>= fun () ->
Chain.set_head s.net (vblock s "A8") >>= fun _ -> Chain.set_head s.net (vblock s "A8") >>= fun _ ->
test s "A6" [] >>= fun () ->
test s "A6" ["A7";"A8"] >>= fun () -> test s "A6" ["A7";"A8"] >>= fun () ->
test s "A6" ["A7"] >>= fun () -> test s "A6" ["A7"] >>= fun () ->
test s "B4" ["A4"] >>= fun () -> test s "B4" ["A4"] >>= fun () ->