d5925f088c
This allows a node to specify that it does not want to receive operations that are not included in a block. To do so, one should use the new --disable-mempool option. A disabled-mempool node announces its configuration during the ACK exchange. This configuration should be easily extensible with other options. A node keeps track of the ACK-exchange configuration of each node it is connected to and will not propagate its mempool to disabled-mempool nodes. A disabled-mempool node will ignore any non-empty mempool it receives. In the future, this incident should be punished by a decrement of the sender's score and/or a ban.
1003 lines
33 KiB
OCaml
1003 lines
33 KiB
OCaml
(**************************************************************************)
|
|
(* *)
|
|
(* Copyright (c) 2014 - 2018. *)
|
|
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
|
|
(* *)
|
|
(* All rights reserved. No warranty, explicit or implicit, provided. *)
|
|
(* *)
|
|
(**************************************************************************)
|
|
|
|
(** Short alias for the message module used throughout this file. *)
module Message = Distributed_db_message

(** P2p network handle specialised to the message, peer-metadata and
    connection-metadata types used here. *)
type p2p =
  (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net

(** A single p2p connection, specialised with the same parameters as
    {!p2p}. *)
type connection =
  (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection
|
|
|
|
(** Parameters given to a request scheduler built by [Make_raw]. *)
type 'a request_param = {
  data : 'a ;
  (** Value handed to [Request_message.forge] when building a request. *)
  active : unit -> P2p_peer.Set.t ;
  (** Returns the set of peers requests may currently be sent to. *)
  send : P2p_peer.Id.t -> Message.t -> unit ;
  (** Sends a message to the given peer. *)
}
|
|
|
|
(** [Make_raw] wires together, for one kind of resource, a request
    scheduler and the table built on top of it, using the functors of
    [Distributed_db_functors]. *)
module Make_raw
    (Hash : sig
       type t
       val name : string
       val encoding : t Data_encoding.t
       val pp : Format.formatter -> t -> unit
     end)
    (Disk_table :
       Distributed_db_functors.DISK_TABLE with type key := Hash.t)
    (Memory_table :
       Distributed_db_functors.MEMORY_TABLE with type key := Hash.t)
    (Request_message : sig
       type param
       val forge : param -> Hash.t list -> Message.t
     end)
    (Precheck : Distributed_db_functors.PRECHECK
     with type key := Hash.t
      and type value := Disk_table.value) = struct

  (** Adapter turning a {!request_param} into the request interface
      expected by the scheduler functor. *)
  module Request = struct
    type param = Request_message.param request_param

    (* Current set of peers, as reported by the [active] callback. *)
    let active (p : param) = p.active ()

    (* Forge the request message for [keys] and send it to [gid]. *)
    let send (p : param) gid keys =
      let msg = Request_message.forge p.data keys in
      p.send gid msg
  end

  module Scheduler =
    Distributed_db_functors.Make_request_scheduler
      (Hash) (Memory_table) (Request)

  module Table =
    Distributed_db_functors.Make_table
      (Hash) (Disk_table) (Memory_table) (Scheduler) (Precheck)

  type t = {
    scheduler: Scheduler.t ;
    table: Table.t ;
  }

  (** [create ?global_input request_param param] creates the scheduler
      first, then the table that uses it. *)
  let create ?global_input request_param param =
    let scheduler = Scheduler.create request_param in
    let table = Table.create ?global_input scheduler param in
    { scheduler = scheduler ; table = table }

  (** Shuts down the scheduler; the table itself is left untouched. *)
  let shutdown t =
    Scheduler.shutdown t.scheduler

end
|
|
|
|
(** Disk-table stub for operations: every lookup reports the operation as
    absent. *)
module Fake_operation_storage = struct
  type store = State.Chain.t
  type value = Operation.t

  (* Never known. *)
  let known _store _hash = Lwt.return_false

  (* Always fails with [Not_found] wrapped as an error. *)
  let read _store _hash = Lwt.return (Error_monad.error_exn Not_found)

  (* Always [None]. *)
  let read_opt _store _hash = Lwt.return_none

  (* Always raises [Not_found]. *)
  let read_exn _store _hash = raise Not_found
end
|
|
|
|
(** Table of individual operations, keyed by operation hash and requested
    from peers with [Get_operations]. *)
module Raw_operation =
  Make_raw
    (Operation_hash)
    (Fake_operation_storage)
    (Operation_hash.Table)
    (struct
      type param = unit
      let forge () hashes = Message.Get_operations hashes
    end)
    (struct
      type param = unit
      type notified_value = Operation.t
      (* Any received operation is accepted as is. *)
      let precheck _key _param op = Some op
    end)
|
|
|
|
(** Disk table reading block headers out of the blocks stored in a chain
    state. *)
module Block_header_storage = struct
  type store = State.Chain.t
  type value = Block_header.t

  let known = State.Block.known_valid

  (* Header of the stored block, in the error monad. *)
  let read chain_state hash =
    State.Block.read chain_state hash >>=? fun block ->
    return (State.Block.header block)

  (* Header of the stored block, or [None] when the block is absent. *)
  let read_opt chain_state hash =
    State.Block.read_opt chain_state hash >>= fun block_opt ->
    Lwt.return (Option.map ~f:State.Block.header block_opt)

  (* Header of the stored block, through [State.Block.read_exn]. *)
  let read_exn chain_state hash =
    State.Block.read_exn chain_state hash >>= fun block ->
    Lwt.return (State.Block.header block)
end
|
|
|
|
(** Table of block headers, keyed by block hash and requested from peers
    with [Get_block_headers]. *)
module Raw_block_header =
  Make_raw
    (Block_hash)
    (Block_header_storage)
    (Block_hash.Table)
    (struct
      type param = unit
      let forge () hashes = Message.Get_block_headers hashes
    end)
    (struct
      type param = unit
      type notified_value = Block_header.t
      (* Any received header is accepted as is. *)
      let precheck _key _param header = Some header
    end)
|
|
|
|
(** Disk table giving, for a key [(block_hash, index)], the list of
    operation hashes returned by [State.Block.operation_hashes] for that
    index of the stored block. *)
module Operation_hashes_storage = struct
  type store = State.Chain.t
  type value = Operation_hash.t list

  (* A key is known as soon as its block is known valid. *)
  let known chain_state (block_hash, _index) =
    State.Block.known_valid chain_state block_hash

  let read chain_state (block_hash, index) =
    State.Block.read chain_state block_hash >>=? fun block ->
    State.Block.operation_hashes block index >>= fun (hashes, _path) ->
    return hashes

  let read_opt chain_state (block_hash, index) =
    State.Block.read_opt chain_state block_hash >>= fun block_opt ->
    match block_opt with
    | Some block ->
        State.Block.operation_hashes block index >>= fun (hashes, _path) ->
        Lwt.return (Some hashes)
    | None -> Lwt.return_none

  let read_exn chain_state (block_hash, index) =
    State.Block.read_exn chain_state block_hash >>= fun block ->
    State.Block.operation_hashes block index >>= fun (hashes, _path) ->
    Lwt.return hashes
end
|
|
|
|
(** Hash table keyed by [(block_hash, index)] pairs. *)
module Operations_table =
  Hashtbl.Make(struct
    type t = Block_hash.t * int
    let hash = Hashtbl.hash
    (* Two keys are equal when both the index and the block hash match. *)
    let equal (block_a, index_a) (block_b, index_b) =
      index_a = index_b && Block_hash.equal block_a block_b
  end)
|
|
|
|
(** Table of operation-hash lists, keyed by [(block_hash, index)] and
    requested from peers with [Get_operation_hashes_for_blocks]. *)
module Raw_operation_hashes = struct

  include
    Make_raw
      (struct
        type t = Block_hash.t * int
        let name = "operation_hashes"
        let pp ppf (block_hash, index) =
          Format.fprintf ppf "%a:%d" Block_hash.pp block_hash index
        let encoding =
          let open Data_encoding in
          obj2 (req "block" Block_hash.encoding) (req "index" uint16)
      end)
      (Operation_hashes_storage)
      (Operations_table)
      (struct
        type param = unit
        let forge () keys =
          Message.Get_operation_hashes_for_blocks keys
      end)
      (struct
        type param = Operation_list_list_hash.t
        type notified_value =
          Operation_hash.t list * Operation_list_list_hash.path
        (* Accept [ops] only when the received path leads to the expected
           root hash at the expected index. *)
        let precheck (_block, expected_ofs) expected_hash (ops, path) =
          let ops_hash = Operation_list_hash.compute ops in
          let received_hash, received_ofs =
            Operation_list_list_hash.check_path path ops_hash in
          let same_offset = received_ofs = expected_ofs in
          let same_root =
            Operation_list_list_hash.compare expected_hash received_hash = 0 in
          if same_offset && same_root then Some ops else None
      end)

  (** Injects the [i]-th element of [operations] under key [(hash, i)] and
      returns the conjunction of the individual injection results. *)
  let inject_all table hash operations =
    Lwt_list.mapi_p
      (fun index ops -> Table.inject table (hash, index) ops)
      operations >>= fun injected ->
    Lwt_list.for_all_s Lwt.return injected

  (** Reads the entries [(hash, 0)] to [(hash, n-1)]. *)
  let read_all table hash n =
    let indexes = 0 -- (n-1) in
    map_p (fun index -> Table.read table (hash, index)) indexes

  (** Clears or cancels the entries [(hash, 0)] to [(hash, n-1)]. *)
  let clear_all table hash n =
    let indexes = 0 -- (n-1) in
    List.iter (fun index -> Table.clear_or_cancel table (hash, index)) indexes

end
|
|
|
|
(** Disk table giving, for a key [(block_hash, index)], the list of
    operations returned by [State.Block.operations] for that index of the
    stored block. *)
module Operations_storage = struct
  type store = State.Chain.t
  type value = Operation.t list

  (* A key is known as soon as its block is known valid. *)
  let known chain_state (block_hash, _index) =
    State.Block.known_valid chain_state block_hash

  let read chain_state (block_hash, index) =
    State.Block.read chain_state block_hash >>=? fun block ->
    State.Block.operations block index >>= fun (operations, _path) ->
    return operations

  let read_opt chain_state (block_hash, index) =
    State.Block.read_opt chain_state block_hash >>= fun block_opt ->
    match block_opt with
    | Some block ->
        State.Block.operations block index >>= fun (operations, _path) ->
        Lwt.return (Some operations)
    | None -> Lwt.return_none

  let read_exn chain_state (block_hash, index) =
    State.Block.read_exn chain_state block_hash >>= fun block ->
    State.Block.operations block index >>= fun (operations, _path) ->
    Lwt.return operations
end
|
|
|
|
(** Table of operation lists, keyed by [(block_hash, index)] and requested
    from peers with [Get_operations_for_blocks]. *)
module Raw_operations = struct

  include
    Make_raw
      (struct
        type t = Block_hash.t * int
        let name = "operations"
        let pp ppf (block_hash, index) =
          Format.fprintf ppf "%a:%d" Block_hash.pp block_hash index
        let encoding =
          let open Data_encoding in
          obj2 (req "block" Block_hash.encoding) (req "index" uint16)
      end)
      (Operations_storage)
      (Operations_table)
      (struct
        type param = unit
        let forge () keys =
          Message.Get_operations_for_blocks keys
      end)
      (struct
        type param = Operation_list_list_hash.t
        type notified_value = Operation.t list * Operation_list_list_hash.path
        (* Accept [ops] only when the hashes of the received operations,
           combined with the received path, lead to the expected root hash
           at the expected index. *)
        let precheck (_block, expected_ofs) expected_hash (ops, path) =
          let ops_hash =
            Operation_list_hash.compute (List.map Operation.hash ops) in
          let received_hash, received_ofs =
            Operation_list_list_hash.check_path path ops_hash in
          let same_offset = received_ofs = expected_ofs in
          let same_root =
            Operation_list_list_hash.compare expected_hash received_hash = 0 in
          if same_offset && same_root then Some ops else None
      end)

  (** Injects the [i]-th element of [operations] under key [(hash, i)] and
      returns the conjunction of the individual injection results. *)
  let inject_all table hash operations =
    Lwt_list.mapi_p
      (fun index ops -> Table.inject table (hash, index) ops)
      operations >>= fun injected ->
    Lwt_list.for_all_s Lwt.return injected

  (** Reads the entries [(hash, 0)] to [(hash, n-1)]. *)
  let read_all table hash n =
    let indexes = 0 -- (n-1) in
    map_p (fun index -> Table.read table (hash, index)) indexes

  (** Clears or cancels the entries [(hash, 0)] to [(hash, n-1)]. *)
  let clear_all table hash n =
    let indexes = 0 -- (n-1) in
    List.iter (fun index -> Table.clear_or_cancel table (hash, index)) indexes

end
|
|
|
|
(** Disk table for protocols: a direct re-export of the
    [State.Protocol] accessors over the global state. *)
module Protocol_storage = struct
  type store = State.t
  type value = Protocol.t

  let known = State.Protocol.known

  let read = State.Protocol.read

  let read_opt = State.Protocol.read_opt

  let read_exn = State.Protocol.read_exn
end
|
|
|
|
(** Table of protocols, keyed by protocol hash and requested from peers
    with [Get_protocols]. *)
module Raw_protocol =
  Make_raw
    (Protocol_hash)
    (Protocol_storage)
    (Protocol_hash.Table)
    (struct
      type param = unit
      let forge () hashes = Message.Get_protocols hashes
    end)
    (struct
      type param = unit
      type notified_value = Protocol.t
      (* Any received protocol is accepted as is. *)
      let precheck _key _param protocol = Some protocol
    end)
|
|
|
|
(** Hooks invoked by the p2p reader on behalf of a chain. *)
type callback = {
  notify_branch : P2p_peer.Id.t -> Block_locator.t -> unit ;
  (** Called when a peer sends a [Current_branch] that is not known
      invalid. *)
  notify_head : P2p_peer.Id.t -> Block_header.t -> Mempool.t -> unit ;
  (** Called when a peer sends an acceptable [Current_head]. *)
  disconnection : P2p_peer.Id.t -> unit ;
  (** Called when a peer is deactivated for the chain. *)
}
|
|
|
|
(** Global state of the distributed DB, shared by all chains. *)
type db = {
  p2p : p2p ;
  p2p_readers : p2p_reader P2p_peer.Table.t ;
  (** One reader per connected peer, registered by [P2p_reader.run]. *)
  disk : State.t ;
  active_chains : chain_db Chain_id.Table.t ;
  (** Chains registered by [activate]. *)
  protocol_db : Raw_protocol.t ;
  block_input : (Block_hash.t * Block_header.t) Lwt_watcher.input ;
  operation_input : (Operation_hash.t * Operation.t) Lwt_watcher.input ;
  connection_metadata_value : (P2p_peer.Id.t -> Connection_metadata.t)
  (** Queried with the sender's id when a [Current_head] is received. *)
}

(** Per-chain state. *)
and chain_db = {
  chain_state : State.Chain.t ;
  global_db : db ;
  operation_db : Raw_operation.t ;
  block_header_db : Raw_block_header.t ;
  operation_hashes_db : Raw_operation_hashes.t ;
  operations_db : Raw_operations.t ;
  mutable callback : callback ;
  active_peers : P2p_peer.Set.t ref ;
  (** Peers for which this chain has been activated. *)
  active_connections : p2p_reader P2p_peer.Table.t ;
  (** Readers of the peers listed in [active_peers]. *)
}

(** Per-connection state of the worker reading messages from a peer. *)
and p2p_reader = {
  gid : P2p_peer.Id.t ;
  conn : connection ;
  peer_active_chains : chain_db Chain_id.Table.t ;
  (** Chains activated for this peer. *)
  canceler : Lwt_canceler.t ;
  mutable worker : unit Lwt.t ;
  (** Promise of the reader worker, set by [P2p_reader.run]. *)
}
|
|
|
|
(** Callback record whose three hooks all do nothing; used as the initial
    callback of a freshly activated chain. *)
let noop_callback = {
  notify_branch = (fun _gid _locator -> ()) ;
  notify_head = (fun _gid _block _ops -> ()) ;
  disconnection = (fun _gid -> ()) ;
}
|
|
|
|
type t = db

(** Global on-disk state of a distributed DB. *)
let state (global : db) = global.disk

(** Chain state attached to a chain DB. *)
let chain_state (chain : chain_db) = chain.chain_state

(** Global DB a chain DB belongs to. *)
let db (chain : chain_db) = chain.global_db

(** Peer id reported by [P2p.peer_id] for the network of [chain_db]. *)
let my_peer_id chain_db =
  let global = chain_db.global_db in
  P2p.peer_id global.p2p
|
|
|
|
(** [read_block_header db h] reads block [h] from the global state and
    returns its chain id together with its header, or [None] when the block
    is not found. *)
let read_block_header { disk } h =
  State.read_block disk h >>= fun block_opt ->
  let describe block =
    (State.Block.chain_id block, State.Block.header block) in
  Lwt.return (Option.map ~f:describe block_opt)
|
|
|
|
(** Returns the first chain, among those active for this peer, whose
    block-header table has [h] pending, if any. *)
let find_pending_block_header { peer_active_chains } h =
  let is_pending chain_db =
    Raw_block_header.Table.pending chain_db.block_header_db.table h in
  Chain_id.Table.fold
    (fun _chain_id chain_db found ->
       match found with
       | Some _ -> found
       | None -> if is_pending chain_db then Some chain_db else None)
    peer_active_chains
    None
|
|
|
|
(** Returns the first chain, among those active for this peer, whose
    operations table has [(h, i)] pending, if any. *)
let find_pending_operations { peer_active_chains } h i =
  let is_pending chain_db =
    Raw_operations.Table.pending chain_db.operations_db.table (h, i) in
  Chain_id.Table.fold
    (fun _chain_id chain_db found ->
       match found with
       | Some _ -> found
       | None -> if is_pending chain_db then Some chain_db else None)
    peer_active_chains
    None
|
|
|
|
(** Returns the first chain, among those active for this peer, whose
    operation-hashes table has [(h, i)] pending, if any. *)
let find_pending_operation_hashes { peer_active_chains } h i =
  let is_pending chain_db =
    Raw_operation_hashes.Table.pending
      chain_db.operation_hashes_db.table (h, i) in
  Chain_id.Table.fold
    (fun _chain_id chain_db found ->
       match found with
       | Some _ -> found
       | None -> if is_pending chain_db then Some chain_db else None)
    peer_active_chains
    None
|
|
|
|
(** Returns the first chain, among those active for this peer, whose
    operation table has [h] pending, if any. *)
let find_pending_operation { peer_active_chains } h =
  let is_pending chain_db =
    Raw_operation.Table.pending chain_db.operation_db.table h in
  Chain_id.Table.fold
    (fun _chain_id chain_db found ->
       match found with
       | Some _ -> found
       | None -> if is_pending chain_db then Some chain_db else None)
    peer_active_chains
    None
|
|
|
|
(** [read_operation db h] looks [h] up in the operation table of each
    active chain in turn and returns the first hit together with the id of
    the chain it was found in. *)
let read_operation { active_chains } h =
  let lookup chain_id chain_db found =
    found >>= fun previous ->
    match previous with
    | Some _ -> found
    | None ->
        Raw_operation.Table.read_opt
          chain_db.operation_db.table h >>= fun op_opt ->
        Lwt.return (Option.map ~f:(fun op -> (chain_id, op)) op_opt)
  in
  Chain_id.Table.fold lookup active_chains Lwt.return_none
|
|
|
|
module P2p_reader = struct
|
|
|
|
(** [may_activate global_db state chain_id f] runs [f] on the chain DB of
    [chain_id]. If the chain is not yet active for this peer but is active
    globally, the peer is first registered on the chain. If the chain is
    unknown, nothing happens. *)
let may_activate global_db state chain_id f =
  match Chain_id.Table.find state.peer_active_chains chain_id with
  | chain_db -> f chain_db
  | exception Not_found ->
      begin
        match Chain_id.Table.find global_db.active_chains chain_id with
        | exception Not_found ->
            (* TODO decrease peer score. *)
            Lwt.return_unit
        | chain_db ->
            (* Register the peer on the chain, then the chain on the peer. *)
            let peers = chain_db.active_peers in
            peers := P2p_peer.Set.add state.gid !peers ;
            P2p_peer.Table.add chain_db.active_connections state.gid state ;
            Chain_id.Table.add state.peer_active_chains chain_id chain_db ;
            f chain_db
      end
|
|
|
|
(** [deactivate state chain_db] calls the chain's [disconnection] callback
    for the peer, then removes the peer from the chain's active peers and
    active connections. The reader's own [peer_active_chains] is not
    touched here. *)
let deactivate state chain_db =
  let peer = state.gid in
  chain_db.callback.disconnection peer ;
  let peers = chain_db.active_peers in
  peers := P2p_peer.Set.remove peer !peers ;
  P2p_peer.Table.remove chain_db.active_connections peer
|
|
|
|
(** [may_handle state chain_id f] runs [f] on the chain DB of [chain_id]
    when that chain is active for this peer, and does nothing otherwise. *)
let may_handle state chain_id f =
  match Chain_id.Table.find state.peer_active_chains chain_id with
  | chain_db -> f chain_db
  | exception Not_found ->
      (* TODO decrease peer score *)
      Lwt.return_unit
|
|
|
|
(** [may_handle_global global_db chain_id f] runs [f] on the chain DB of
    [chain_id] when that chain is globally active, and does nothing
    otherwise. *)
let may_handle_global global_db chain_id f =
  match Chain_id.Table.find global_db.active_chains chain_id with
  | chain_db -> f chain_db
  | exception Not_found -> Lwt.return_unit
|
|
|
|
(** [handle_msg global_db state msg] logs [msg], received from the peer
    described by [state], then dispatches on its constructor. [Get_*]
    requests are answered with best-effort sends on the peer's connection.
    Data messages are forwarded to the table that has a matching pending
    request, or dropped when there is none. *)
let handle_msg (global_db : db) (state : p2p_reader) msg =

  let open Message in
  let module Logging =
    Logging.Make(struct let name = "node.distributed_db.p2p_reader" end) in
  let open Logging in

  (* Best-effort answer on this peer's connection; the outcome reported
     by [P2p.try_send] is deliberately dropped. *)
  let reply answer =
    ignore (P2p.try_send global_db.p2p state.conn answer) in

  lwt_debug "Read message from %a: %a"
    P2p_peer.Id.pp_short state.gid Message.pp_json msg >>= fun () ->

  match msg with

  | Get_current_branch chain_id ->
      may_handle_global global_db chain_id @@ fun chain_db ->
      (* Ask for the peer's branch in return when the chain is not yet
         active for this peer. *)
      if not (Chain_id.Table.mem state.peer_active_chains chain_id) then
        reply (Get_current_branch chain_id) ;
      let seed = {
        Block_locator.receiver_id = state.gid ;
        sender_id = my_peer_id chain_db ;
      } in
      Chain.locator chain_db.chain_state seed >>= fun locator ->
      reply (Current_branch (chain_id, locator)) ;
      Lwt.return_unit

  | Current_branch (chain_id, locator) ->
      may_activate global_db state chain_id @@ fun chain_db ->
      let head, hist = (locator :> Block_header.t * Block_hash.t list) in
      let advertised = Block_header.hash head :: hist in
      Lwt_list.exists_p
        (State.Block.known_invalid chain_db.chain_state)
        advertised >>= fun known_invalid ->
      if known_invalid then
        (* Kickban *)
        P2p.greylist_peer global_db.p2p state.gid
      else
        chain_db.callback.notify_branch state.gid locator ;
      Lwt.return_unit

  | Deactivate chain_id ->
      may_handle state chain_id @@ fun chain_db ->
      deactivate state chain_db ;
      Chain_id.Table.remove state.peer_active_chains chain_id ;
      Lwt.return_unit

  | Get_current_head chain_id ->
      may_handle state chain_id @@ fun chain_db ->
      let peer_meta =
        P2p.connection_metadata chain_db.global_db.p2p state.conn in
      (* When the connection metadata has [disable_mempool] set, only the
         head is sent, with an empty mempool. *)
      let head_and_mempool =
        if peer_meta.Connection_metadata.disable_mempool then
          (Chain.head chain_db.chain_state >>= fun head ->
           Lwt.return (State.Block.header head, Mempool.empty))
        else
          State.Current_mempool.get chain_db.chain_state
      in
      head_and_mempool >>= fun (head, mempool) ->
      (* TODO bound the sent mempool size *)
      reply (Current_head (chain_id, head, mempool)) ;
      Lwt.return_unit

  | Current_head (chain_id, header, mempool) ->
      may_handle state chain_id @@ fun chain_db ->
      let head = Block_header.hash header in
      State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid ->
      (* NOTE(review): [connection_metadata_value] presumably yields the
         local node's own configuration -- confirm against the caller of
         [create]. *)
      let local_meta =
        chain_db.global_db.connection_metadata_value state.gid in
      (* A non-empty mempool received while [disable_mempool] is set is
         handled exactly like a known-invalid head: the callback is not
         notified and the sender is greylisted. *)
      let rejected =
        known_invalid ||
        (local_meta.Connection_metadata.disable_mempool
         && mempool <> Mempool.empty)
      in
      if rejected then
        (* Kickban *)
        P2p.greylist_peer global_db.p2p state.gid
      else
        chain_db.callback.notify_head state.gid header mempool ;
      Lwt.return_unit

  | Get_block_headers hashes ->
      Lwt_list.iter_p
        (fun hash ->
           read_block_header global_db hash >>= fun found ->
           match found with
           | Some (_chain_id, header) ->
               reply (Block_header header) ;
               Lwt.return_unit
           | None ->
               (* TODO: Blame request of unadvertised blocks ? *)
               Lwt.return_unit)
        hashes

  | Block_header block -> begin
      let hash = Block_header.hash block in
      match find_pending_block_header state hash with
      | Some chain_db ->
          Raw_block_header.Table.notify
            chain_db.block_header_db.table state.gid hash block
      | None ->
          (* TODO some penalty. *)
          Lwt.return_unit
    end

  | Get_operations hashes ->
      Lwt_list.iter_p
        (fun hash ->
           read_operation global_db hash >>= fun found ->
           match found with
           | Some (_chain_id, op) ->
               reply (Operation op) ;
               Lwt.return_unit
           | None ->
               (* TODO: Blame request of unadvertised operations ? *)
               Lwt.return_unit)
        hashes

  | Operation operation -> begin
      let hash = Operation.hash operation in
      match find_pending_operation state hash with
      | Some chain_db ->
          Raw_operation.Table.notify
            chain_db.operation_db.table state.gid hash operation
      | None ->
          (* TODO some penalty. *)
          Lwt.return_unit
    end

  | Get_protocols hashes ->
      Lwt_list.iter_p
        (fun hash ->
           State.Protocol.read_opt global_db.disk hash >>= fun found ->
           match found with
           | Some p ->
               reply (Protocol p) ;
               Lwt.return_unit
           | None ->
               (* TODO: Blame request of unadvertised protocol ? *)
               Lwt.return_unit)
        hashes

  | Protocol protocol ->
      let hash = Protocol.hash protocol in
      Raw_protocol.Table.notify
        global_db.protocol_db.table state.gid hash protocol

  | Get_operation_hashes_for_blocks blocks ->
      Lwt_list.iter_p
        (fun (hash, ofs) ->
           State.read_block global_db.disk hash >>= fun found ->
           match found with
           | None -> Lwt.return_unit
           | Some block ->
               State.Block.operation_hashes
                 block ofs >>= fun (hashes, path) ->
               reply (Operation_hashes_for_block (hash, ofs, hashes, path)) ;
               Lwt.return_unit)
        blocks

  | Operation_hashes_for_block (block, ofs, ops, path) -> begin
      match find_pending_operation_hashes state block ofs with
      | Some chain_db ->
          Raw_operation_hashes.Table.notify
            chain_db.operation_hashes_db.table state.gid
            (block, ofs) (ops, path)
      | None ->
          (* TODO some penalty. *)
          Lwt.return_unit
    end

  | Get_operations_for_blocks blocks ->
      Lwt_list.iter_p
        (fun (hash, ofs) ->
           State.read_block global_db.disk hash >>= fun found ->
           match found with
           | None -> Lwt.return_unit
           | Some block ->
               State.Block.operations
                 block ofs >>= fun (ops, path) ->
               reply (Operations_for_block (hash, ofs, ops, path)) ;
               Lwt.return_unit)
        blocks

  | Operations_for_block (block, ofs, ops, path) -> begin
      match find_pending_operations state block ofs with
      | Some chain_db ->
          Raw_operations.Table.notify
            chain_db.operations_db.table state.gid
            (block, ofs) (ops, path)
      | None ->
          (* TODO some penalty. *)
          Lwt.return_unit
    end
|
|
|
|
(** Receives and handles messages from the peer until [P2p.recv] (run
    under the reader's canceler) reports an error. The peer is then
    deactivated on each of its active chains and the reader is
    unregistered from the global DB. *)
let rec worker_loop global_db state =
  let receive () = P2p.recv global_db.p2p state.conn in
  protect ~canceler:state.canceler receive >>= fun received ->
  match received with
  | Error _ ->
      Chain_id.Table.iter
        (fun _chain_id chain_db -> deactivate state chain_db)
        state.peer_active_chains ;
      P2p_peer.Table.remove global_db.p2p_readers state.gid ;
      Lwt.return_unit
  | Ok msg ->
      handle_msg global_db state msg >>= fun () ->
      worker_loop global_db state
|
|
|
|
(** [run db gid conn] creates the reader state for a connection, asks the
    peer for its current branch on every active chain, starts the reader
    worker and registers the reader in [db.p2p_readers]. *)
let run db gid conn =
  let canceler = Lwt_canceler.create () in
  let state = {
    gid ;
    conn ;
    peer_active_chains = Chain_id.Table.create 17 ;
    canceler ;
    worker = Lwt.return_unit ;
  } in
  let request_branch chain_id _chain_db =
    Lwt.async
      (fun () -> P2p.send db.p2p conn (Message.Get_current_branch chain_id)) in
  Chain_id.Table.iter request_branch db.active_chains ;
  let worker_name =
    Format.asprintf "db_network_reader.%a" P2p_peer.Id.pp_short gid in
  state.worker <-
    Lwt_utils.worker
      worker_name
      ~run:(fun () -> worker_loop db state)
      ~cancel:(fun () -> Lwt_canceler.cancel canceler) ;
  P2p_peer.Table.add db.p2p_readers gid state
|
|
|
|
(** Triggers the reader's canceler, then waits for its worker promise. *)
let shutdown reader =
  Lwt_canceler.cancel reader.canceler >>= fun () ->
  reader.worker
|
|
|
|
end
|
|
|
|
(** [active_peer_ids p2p ()] is the set of peer ids of all the connections
    currently listed by [P2p.connections]. *)
let active_peer_ids p2p () =
  let add_peer peers conn =
    let info = P2p.connection_info p2p conn in
    P2p_peer.Set.add info.P2p_connection.Info.peer_id peers in
  List.fold_left add_peer P2p_peer.Set.empty (P2p.connections p2p)
|
|
|
|
(** [raw_try_send p2p peer_id msg] makes a best-effort send of [msg] to
    [peer_id]. Nothing happens when no connection to that peer is found,
    and the outcome of [P2p.try_send] is dropped. *)
let raw_try_send p2p peer_id msg =
  match P2p.find_connection p2p peer_id with
  | Some conn ->
      let (_sent : bool) = P2p.try_send p2p conn msg in
      ()
  | None -> ()
|
|
|
|
|
|
(** [create disk p2p connection_metadata_value] builds the global
    distributed DB and starts a p2p reader for every future and every
    existing connection. *)
let create disk p2p connection_metadata_value =
  (* Protocol requests may be sent to any connected peer. *)
  let global_request = {
    data = () ;
    active = active_peer_ids p2p ;
    send = raw_try_send p2p ;
  } in
  let protocol_db = Raw_protocol.create global_request disk in
  let active_chains = Chain_id.Table.create 17 in
  let p2p_readers = P2p_peer.Table.create 17 in
  let block_input = Lwt_watcher.create_input () in
  let operation_input = Lwt_watcher.create_input () in
  let db = {
    p2p ;
    p2p_readers ;
    disk ;
    active_chains ;
    protocol_db ;
    block_input ;
    operation_input ;
    connection_metadata_value ;
  } in
  let start_reader = P2p_reader.run db in
  P2p.on_new_connection p2p start_reader ;
  P2p.iter_connections p2p start_reader ;
  db
|
|
|
|
(** [activate db chain_state] returns the chain DB of the chain, creating
    and registering it when it is not active yet. On creation, every
    connected peer is asked for its current branch. *)
let activate ({ p2p ; active_chains } as global_db) chain_state =
  let chain_id = State.Chain.id chain_state in
  match Chain_id.Table.find active_chains chain_id with
  | chain -> chain
  | exception Not_found ->
      let active_peers = ref P2p_peer.Set.empty in
      (* Requests for this chain only target the peers that activated it. *)
      let p2p_request = {
        data = () ;
        active = (fun () -> !active_peers) ;
        send = raw_try_send p2p ;
      } in
      let operation_db =
        Raw_operation.create
          ~global_input:global_db.operation_input p2p_request chain_state in
      let block_header_db =
        Raw_block_header.create
          ~global_input:global_db.block_input p2p_request chain_state in
      let operation_hashes_db =
        Raw_operation_hashes.create p2p_request chain_state in
      let operations_db =
        Raw_operations.create p2p_request chain_state in
      let chain = {
        chain_state ;
        global_db ;
        operation_db ;
        block_header_db ;
        operation_hashes_db ;
        operations_db ;
        callback = noop_callback ;
        active_peers ;
        active_connections = P2p_peer.Table.create 53 ;
      } in
      let request_branch _peer_id conn =
        Lwt.async
          (fun () -> P2p.send p2p conn (Message.Get_current_branch chain_id)) in
      P2p.iter_connections p2p request_branch ;
      Chain_id.Table.add active_chains chain_id chain ;
      chain
|
|
|
|
(** Replaces the callback record of a chain DB. *)
let set_callback (chain_db : chain_db) callback =
  chain_db.callback <- callback
|
|
|
|
(** [deactivate chain_db] unregisters the chain from the global DB,
    deactivates every peer connected on it (sending each of them a
    [Deactivate] message in the background), then shuts down the operation
    and block-header schedulers of the chain.

    The readers are first copied out of [active_connections]:
    [P2p_reader.deactivate] removes entries from that very table, and the
    behaviour of a hash-table iteration is unspecified when the table is
    modified while it is being traversed. *)
let deactivate chain_db =
  let { active_chains ; p2p } = chain_db.global_db in
  let chain_id = State.Chain.id chain_db.chain_state in
  Chain_id.Table.remove active_chains chain_id ;
  (* Snapshot, restored to the table's traversal order. *)
  let readers =
    List.rev
      (P2p_peer.Table.fold
         (fun _peer_id reader acc -> reader :: acc)
         chain_db.active_connections
         []) in
  List.iter
    (fun reader ->
       P2p_reader.deactivate reader chain_db ;
       Lwt.async begin fun () ->
         P2p.send p2p reader.conn (Message.Deactivate chain_id)
       end)
    readers ;
  (* NOTE(review): only these two schedulers are shut down here, as in the
     previous version; [operation_hashes_db] and [operations_db] are left
     running -- confirm whether that is intended. *)
  Raw_operation.shutdown chain_db.operation_db >>= fun () ->
  Raw_block_header.shutdown chain_db.block_header_db
|
|
|
|
(** Chain DB registered for [chain_id], if the chain is active. *)
let get_chain { active_chains } chain_id =
  match Chain_id.Table.find active_chains chain_id with
  | chain_db -> Some chain_db
  | exception Not_found -> None
|
|
|
|
(** Greylists [peer_id] on the p2p network of the chain's global DB. *)
let greylist chain_db peer_id =
  P2p.greylist_peer chain_db.global_db.p2p peer_id ;
  Lwt.return_unit
|
|
|
|
(** Disconnects the connection to [peer_id], if one is found. *)
let disconnect chain_db peer_id =
  let p2p = chain_db.global_db.p2p in
  match P2p.find_connection p2p peer_id with
  | Some conn -> P2p.disconnect p2p conn
  | None -> Lwt.return_unit
|
|
|
|
(** Shuts down every p2p reader, then the operation and block-header
    schedulers of every active chain, and finally the p2p layer itself. *)
let shutdown { p2p ; p2p_readers ; active_chains } =
  let stop_reader _peer_id reader pending =
    P2p_reader.shutdown reader >>= fun () -> pending in
  let stop_chain _chain_id chain_db pending =
    Raw_operation.shutdown chain_db.operation_db >>= fun () ->
    Raw_block_header.shutdown chain_db.block_header_db >>= fun () ->
    pending in
  P2p_peer.Table.fold stop_reader p2p_readers Lwt.return_unit >>= fun () ->
  Chain_id.Table.fold stop_chain active_chains Lwt.return_unit >>= fun () ->
  P2p.shutdown p2p
|
|
|
|
(** [clear_block chain_db hash n] clears or cancels the [n] operation
    lists and the [n] operation-hash lists of block [hash], then its
    header. *)
let clear_block chain_db hash n =
  let operations = chain_db.operations_db.table in
  let operation_hashes = chain_db.operation_hashes_db.table in
  let headers = chain_db.block_header_db.table in
  Raw_operations.clear_all operations hash n ;
  Raw_operation_hashes.clear_all operation_hashes hash n ;
  Raw_block_header.Table.clear_or_cancel headers hash
|
|
|
|
(** Stores a block with [State.Block.store]; on success, clears the
    corresponding table entries. Asserts that [hash] is the hash of
    [header] and that there is one operation list per validation pass. *)
let commit_block chain_db hash
    header header_data operations operations_data result =
  assert (Block_hash.equal hash (Block_header.hash header)) ;
  assert (List.length operations = header.shell.validation_passes) ;
  State.Block.store chain_db.chain_state
    header header_data operations operations_data result >>=? fun stored ->
  clear_block chain_db hash header.shell.validation_passes ;
  return stored
|
|
|
|
(** Records a block as invalid with [State.Block.store_invalid]; on
    success, clears the corresponding table entries. Asserts that [hash]
    is the hash of [header]. *)
let commit_invalid_block chain_db hash header errors =
  assert (Block_hash.equal hash (Block_header.hash header)) ;
  State.Block.store_invalid
    chain_db.chain_state header errors >>=? fun stored ->
  clear_block chain_db hash header.shell.validation_passes ;
  return stored
|
|
|
|
(** Injects [op] under hash [h] in the chain's operation table. Asserts
    that [h] is the hash of [op]. *)
let inject_operation chain_db h op =
  assert (Operation_hash.equal h (Operation.hash op)) ;
  let table = chain_db.operation_db.table in
  Raw_operation.Table.inject table h op
|
|
|
|
(** Stores protocol [p], clears or cancels the table entry for [h], and
    returns whether [State.Protocol.store] yielded a value. *)
let commit_protocol db h p =
  State.Protocol.store db.disk p >>= fun stored ->
  Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
  match stored with
  | Some _ -> return true
  | None -> return false
|
|
|
|
(** New stream over the global block-header input. *)
let watch_block_header (global : db) =
  Lwt_watcher.create_stream global.block_input

(** New stream over the global operation input. *)
let watch_operation (global : db) =
  Lwt_watcher.create_stream global.operation_input
|
|
|
|
(** Raw wire-level parameters taken from the message configuration. *)
module Raw = struct
  (* Raw p2p encoding built from the message encoding. *)
  let encoding = P2p.Raw.encoding Message.cfg.encoding

  (* Versions listed in the message configuration. *)
  let supported_versions = Message.cfg.versions
end
|
|
|
|
(** [Make (Table) (Kind)] re-exports the operations of [Table] on values
    of type [Kind.t], going through [Kind.proj] to reach the underlying
    table. *)
module Make
    (Table : Distributed_db_functors.DISTRIBUTED_DB)
    (Kind : sig
       type t
       val proj: t -> Table.t
     end) = struct

  type key = Table.key
  type value = Table.value

  let known kind key = Table.known (Kind.proj kind) key

  type error += Missing_data = Table.Missing_data
  type error += Canceled = Table.Canceled
  type error += Timeout = Table.Timeout

  let read kind key = Table.read (Kind.proj kind) key
  let read_opt kind key = Table.read_opt (Kind.proj kind) key
  let read_exn kind key = Table.read_exn (Kind.proj kind) key

  let prefetch kind ?peer ?timeout key param =
    Table.prefetch (Kind.proj kind) ?peer ?timeout key param

  let fetch kind ?peer ?timeout key param =
    Table.fetch (Kind.proj kind) ?peer ?timeout key param

  let clear_or_cancel kind key = Table.clear_or_cancel (Kind.proj kind) key

  let inject kind key value = Table.inject (Kind.proj kind) key value

  let pending kind key = Table.pending (Kind.proj kind) key

  let watch kind = Table.watch (Kind.proj kind)

end
|
|
|
|
(** Block headers of a chain, served from the chain's block-header
    table. *)
module Block_header = struct
  type t = Block_header.t
  include (Make (Raw_block_header.Table) (struct
             type t = chain_db
             let proj chain_db = chain_db.block_header_db.table
           end) : Distributed_db_functors.DISTRIBUTED_DB with type t := chain_db
                                                          and type key := Block_hash.t
                                                          and type value := Block_header.t
                                                          and type param := unit)
end
|
|
|
|
(** Operation-hash lists of a chain, served from the chain's
    operation-hashes table. *)
module Operation_hashes =
  Make (Raw_operation_hashes.Table) (struct
    type t = chain_db
    let proj chain_db = chain_db.operation_hashes_db.table
  end)
|
|
|
|
(** Operation lists of a chain, served from the chain's operations
    table. *)
module Operations =
  Make (Raw_operations.Table) (struct
    type t = chain_db
    let proj chain_db = chain_db.operations_db.table
  end)
|
|
|
|
(** The [Operation] module, extended with the distributed-DB operations
    over a chain's operation table. *)
module Operation = struct
  include Operation
  include (Make (Raw_operation.Table) (struct
             type t = chain_db
             let proj chain_db = chain_db.operation_db.table
           end) : Distributed_db_functors.DISTRIBUTED_DB with type t := chain_db
                                                          and type key := Operation_hash.t
                                                          and type value := Operation.t
                                                          and type param := unit)
end
|
|
|
|
(** Protocols, served from the global protocol table. *)
module Protocol = struct
  type t = Protocol.t
  include (Make (Raw_protocol.Table) (struct
             type t = db
             let proj global = global.protocol_db.table
           end) : Distributed_db_functors.DISTRIBUTED_DB with type t := db
                                                          and type key := Protocol_hash.t
                                                          and type value := Protocol.t
                                                          and type param := unit)
end
|
|
|
|
|
|
(** Best-effort send of [msg] on every active connection of the chain. *)
let broadcast chain_db msg =
  let p2p = chain_db.global_db.p2p in
  let send_to _peer_id reader =
    ignore (P2p.try_send p2p reader.conn msg) in
  P2p_peer.Table.iter send_to chain_db.active_connections
|
|
|
|
(** Best-effort send of [msg] to [peer_id]; a [Not_found] raised while
    looking the peer up or sending is silently ignored. *)
let try_send chain_db peer_id msg =
  try
    let reader = P2p_peer.Table.find chain_db.active_connections peer_id in
    let (_sent : bool) =
      P2p.try_send chain_db.global_db.p2p reader.conn msg in
    ()
  with Not_found -> ()
|
|
|
|
(** Sends [msg] to [peer] when given, and to every active connection of
    the chain otherwise. *)
let send chain_db ?peer msg =
  match peer with
  | None -> broadcast chain_db msg
  | Some peer_id -> try_send chain_db peer_id msg
|
|
|
|
(** Requests sent to one peer, or to every active peer of a chain. *)
module Request = struct

  (** Asks for the current head of the chain. *)
  let current_head chain_db ?peer () =
    let chain_id = State.Chain.id chain_db.chain_state in
    send chain_db ?peer (Get_current_head chain_id)

  (** Asks for the current branch of the chain. *)
  let current_branch chain_db ?peer () =
    let chain_id = State.Chain.id chain_db.chain_state in
    send chain_db ?peer (Get_current_branch chain_id)

end
|
|
|
|
(** Unsolicited announcements sent to one peer, or to every active peer
    of a chain. *)
module Advertise = struct

  (** [current_head chain_db ?peer ?mempool head] advertises [head]
      together with [mempool] (empty by default). Connections whose
      metadata has [disable_mempool] set receive the head with an empty
      mempool instead.

      When [peer] is given but is not an active connection of the chain,
      nothing is sent. This matches the empty-mempool path, which goes
      through [send] and ignores unknown peers; previously the non-empty
      path raised [Not_found] in that case. *)
  let current_head chain_db ?peer ?(mempool = Mempool.empty) head =
    let chain_id = State.Chain.id chain_db.chain_state in
    assert (Chain_id.equal chain_id (State.Block.chain_id head)) ;
    let msg_mempool =
      Message.Current_head (chain_id, State.Block.header head, mempool) in
    if mempool = Mempool.empty then
      (* Nothing to hide: the same message suits every peer. *)
      send chain_db ?peer msg_mempool
    else
      let msg_disable_mempool =
        Message.Current_head (chain_id, State.Block.header head, Mempool.empty) in
      (* Pick the message variant according to the metadata of the
         connection. *)
      let send_mempool state =
        let { Connection_metadata.disable_mempool } =
          P2p.connection_metadata chain_db.global_db.p2p state.conn in
        let msg = if disable_mempool then msg_disable_mempool else msg_mempool in
        ignore @@ P2p.try_send chain_db.global_db.p2p state.conn msg
      in
      match peer with
      | Some receiver_id ->
          begin
            match P2p_peer.Table.find chain_db.active_connections receiver_id with
            | state -> send_mempool state
            | exception Not_found -> ()
          end
      | None ->
          List.iter (fun (_receiver_id, state) -> send_mempool state)
            (P2p_peer.Table.fold (fun k v acc -> (k,v)::acc) chain_db.active_connections [])

  (** [current_branch ?peer chain_db] advertises the current branch of
      the chain. A locator is computed separately for each receiver, from
      a seed made of the receiver's id and the local peer id. *)
  let current_branch ?peer chain_db =
    let chain_id = State.Chain.id chain_db.chain_state in
    let chain_state = chain_state chain_db in
    let sender_id = my_peer_id chain_db in
    match peer with
    | Some receiver_id ->
        let seed = {
          Block_locator.receiver_id=receiver_id; sender_id } in
        (Chain.locator chain_state seed) >>= fun locator ->
        let msg = Message.Current_branch (chain_id, locator) in
        try_send chain_db receiver_id msg;
        Lwt.return_unit
    | None ->
        Lwt_list.iter_p
          (fun (receiver_id,state) ->
             let seed = {
               Block_locator.receiver_id=receiver_id; sender_id } in
             (Chain.locator chain_state seed) >>= fun locator ->
             let msg = Message.Current_branch (chain_id, locator) in
             ignore (P2p.try_send chain_db.global_db.p2p state.conn msg);
             Lwt.return_unit
          ) (P2p_peer.Table.fold (fun k v acc -> (k,v)::acc) chain_db.active_connections [])

end
|
|
|