2017-02-24 17:17:53 +01:00
(* *)
2018-02-05 21:17:03 +01:00
(* Copyright (c) 2014 - 2018. *)
2017-02-24 17:17:53 +01:00
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
module Message = Distributed_db_message
2018-05-15 13:16:08 +02:00
type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net
type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection
2017-02-24 17:17:53 +01:00
type 'a request_param = {
data: 'a ;
2018-01-24 12:48:25 +01:00
active: unit -> P2p_peer.Set.t ;
send: P2p_peer.Id.t -> Message.t -> unit ;
2017-02-24 17:17:53 +01:00
module Make_raw
2017-07-21 16:16:39 +02:00
(Hash : sig
type t
val name : string
val encoding : t Data_encoding.t
val pp : Format.formatter -> t -> unit
2017-11-13 16:34:00 +01:00
2017-03-28 13:31:41 +02:00
(Disk_table :
Distributed_db_functors.DISK_TABLE with type key := Hash.t)
(Memory_table :
Distributed_db_functors.MEMORY_TABLE with type key := Hash.t)
2017-02-24 17:17:53 +01:00
(Request_message : sig
type param
val forge : param -> Hash.t list -> Message.t
2017-03-28 13:31:41 +02:00
(Precheck : Distributed_db_functors.PRECHECK
with type key := Hash.t
and type value := Disk_table.value) = struct
2017-02-24 17:17:53 +01:00
module Request = struct
type param = Request_message.param request_param
let active { active } = active ()
let send { data ; send } gid keys =
send gid (Request_message.forge data keys)
module Scheduler =
(Hash) (Memory_table) (Request)
module Table =
2017-03-28 13:31:41 +02:00
(Hash) (Disk_table) (Memory_table) (Scheduler) (Precheck)
2017-02-24 17:17:53 +01:00
type t = {
scheduler: Scheduler.t ;
table: Table.t ;
let create ?global_input request_param param =
let scheduler = Scheduler.create request_param in
let table = Table.create ?global_input scheduler param in
{ scheduler ; table }
let shutdown { scheduler } =
Scheduler.shutdown scheduler
2017-04-19 21:46:10 +02:00
module Fake_operation_storage = struct
2018-02-16 01:26:24 +01:00
type store = State.Chain.t
2017-04-19 21:46:10 +02:00
type value = Operation.t
let known _ _ = Lwt.return_false
let read _ _ = Lwt.return (Error_monad.error_exn Not_found)
let read_opt _ _ = Lwt.return_none
let read_exn _ _ = raise Not_found
2017-03-28 13:31:41 +02:00
2017-02-24 17:17:53 +01:00
module Raw_operation =
2017-03-28 13:31:41 +02:00
2017-04-19 21:46:10 +02:00
2017-03-28 13:31:41 +02:00
2017-11-14 03:52:26 +01:00
type param = unit
let forge () keys = Message.Get_operations keys
2017-03-28 13:31:41 +02:00
2017-04-19 21:46:10 +02:00
type param = unit
type notified_value = Operation.t
let precheck _ _ v = Some v
module Block_header_storage = struct
2018-02-16 01:26:24 +01:00
type store = State.Chain.t
2017-04-19 21:46:10 +02:00
type value = Block_header.t
let known = State.Block.known_valid
2018-02-16 01:26:24 +01:00
let read chain_state h =
State.Block.read chain_state h >>=? fun b ->
2017-04-19 21:46:10 +02:00
return (State.Block.header b)
2018-02-16 01:26:24 +01:00
let read_opt chain_state h =
State.Block.read_opt chain_state h >>= fun b ->
2017-11-27 06:13:12 +01:00
Lwt.return (Option.map ~f:State.Block.header b)
2018-02-16 01:26:24 +01:00
let read_exn chain_state h =
State.Block.read_exn chain_state h >>= fun b ->
2017-04-19 21:46:10 +02:00
Lwt.return (State.Block.header b)
2017-02-24 17:17:53 +01:00
module Raw_block_header =
2017-03-28 13:31:41 +02:00
2017-04-19 21:46:10 +02:00
2017-03-28 13:31:41 +02:00
2017-11-14 03:52:26 +01:00
type param = unit
let forge () keys = Message.Get_block_headers keys
2017-03-28 13:31:41 +02:00
2017-04-19 21:46:10 +02:00
type param = unit
type notified_value = Block_header.t
let precheck _ _ v = Some v
module Operation_hashes_storage = struct
2018-02-16 01:26:24 +01:00
type store = State.Chain.t
2017-04-19 21:46:10 +02:00
type value = Operation_hash.t list
2018-02-16 01:26:24 +01:00
let known chain_state (h, _) = State.Block.known_valid chain_state h
let read chain_state (h, i) =
State.Block.read chain_state h >>=? fun b ->
2017-04-19 21:46:10 +02:00
State.Block.operation_hashes b i >>= fun (ops, _) ->
return ops
2018-02-16 01:26:24 +01:00
let read_opt chain_state (h, i) =
State.Block.read_opt chain_state h >>= function
2017-04-19 21:46:10 +02:00
| None -> Lwt.return_none
| Some b ->
State.Block.operation_hashes b i >>= fun (ops, _) ->
Lwt.return (Some ops)
2018-02-16 01:26:24 +01:00
let read_exn chain_state (h, i) =
State.Block.read_exn chain_state h >>= fun b ->
2017-04-19 21:46:10 +02:00
State.Block.operation_hashes b i >>= fun (ops, _) ->
Lwt.return ops
2017-03-28 13:31:41 +02:00
2017-04-19 21:46:10 +02:00
module Operations_table =
2017-03-28 13:31:41 +02:00
type t = Block_hash.t * int
let hash = Hashtbl.hash
let equal (b1, i1) (b2, i2) =
Block_hash.equal b1 b2 && i1 = i2
2017-02-24 17:17:53 +01:00
2017-04-19 21:46:10 +02:00
module Raw_operation_hashes = struct
2017-07-21 16:16:39 +02:00
type t = Block_hash.t * int
2017-09-29 18:43:13 +02:00
let name = "operation_hashes"
2017-07-21 16:16:39 +02:00
let pp ppf (h, n) = Format.fprintf ppf "%a:%d" Block_hash.pp h n
let encoding =
let open Data_encoding in
obj2 (req "block" Block_hash.encoding) (req "index" uint16)
2017-04-19 21:46:10 +02:00
2017-11-14 03:52:26 +01:00
type param = unit
let forge () keys =
Message.Get_operation_hashes_for_blocks keys
2017-04-19 21:46:10 +02:00
type param = Operation_list_list_hash.t
type notified_value =
Operation_hash.t list * Operation_list_list_hash.path
let precheck (_block, expected_ofs) expected_hash (ops, path) =
let received_hash, received_ofs =
Operation_list_list_hash.check_path path
(Operation_list_hash.compute ops) in
received_ofs = expected_ofs &&
Operation_list_list_hash.compare expected_hash received_hash = 0
Some ops
let inject_all table hash operations =
(fun i ops -> Table.inject table (hash, i) ops)
operations >>= Lwt_list.for_all_s (fun x -> Lwt.return x)
let read_all table hash n =
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
2017-06-09 17:54:08 +02:00
let clear_all table hash n =
2017-11-06 15:23:06 +01:00
List.iter (fun i -> Table.clear_or_cancel table (hash, i)) (0 -- (n-1))
2017-04-19 21:46:10 +02:00
module Operations_storage = struct
2018-02-16 01:26:24 +01:00
type store = State.Chain.t
2017-04-19 21:46:10 +02:00
type value = Operation.t list
2018-02-16 01:26:24 +01:00
let known chain_state (h, _) = State.Block.known_valid chain_state h
let read chain_state (h, i) =
State.Block.read chain_state h >>=? fun b ->
2017-04-19 21:46:10 +02:00
State.Block.operations b i >>= fun (ops, _) ->
return ops
2018-02-16 01:26:24 +01:00
let read_opt chain_state (h, i) =
State.Block.read_opt chain_state h >>= function
2017-04-19 21:46:10 +02:00
| None -> Lwt.return_none
| Some b ->
State.Block.operations b i >>= fun (ops, _) ->
Lwt.return (Some ops)
2018-02-16 01:26:24 +01:00
let read_exn chain_state (h, i) =
State.Block.read_exn chain_state h >>= fun b ->
2017-04-19 21:46:10 +02:00
State.Block.operations b i >>= fun (ops, _) ->
Lwt.return ops
module Raw_operations = struct
2017-07-21 16:16:39 +02:00
type t = Block_hash.t * int
2017-09-29 18:43:13 +02:00
let name = "operations"
2017-07-21 16:16:39 +02:00
let pp ppf (h, n) = Format.fprintf ppf "%a:%d" Block_hash.pp h n
let encoding =
let open Data_encoding in
obj2 (req "block" Block_hash.encoding) (req "index" uint16)
2017-04-19 21:46:10 +02:00
2017-11-14 03:52:26 +01:00
type param = unit
let forge () keys =
Message.Get_operations_for_blocks keys
2017-04-19 21:46:10 +02:00
type param = Operation_list_list_hash.t
type notified_value = Operation.t list * Operation_list_list_hash.path
let precheck (_block, expected_ofs) expected_hash (ops, path) =
let received_hash, received_ofs =
Operation_list_list_hash.check_path path
(List.map Operation.hash ops)) in
received_ofs = expected_ofs &&
Operation_list_list_hash.compare expected_hash received_hash = 0
Some ops
let inject_all table hash operations =
(fun i ops -> Table.inject table (hash, i) ops)
operations >>= Lwt_list.for_all_s (fun x -> Lwt.return x)
let read_all table hash n =
map_p (fun i -> Table.read table (hash, i)) (0 -- (n-1))
2017-06-09 17:54:08 +02:00
let clear_all table hash n =
2017-11-06 15:23:06 +01:00
List.iter (fun i -> Table.clear_or_cancel table (hash, i)) (0 -- (n-1))
2017-04-19 21:46:10 +02:00
module Protocol_storage = struct
type store = State.t
type value = Protocol.t
let known = State.Protocol.known
let read = State.Protocol.read
let read_opt = State.Protocol.read_opt
let read_exn = State.Protocol.read_exn
2017-03-30 13:16:21 +02:00
2017-02-24 17:17:53 +01:00
module Raw_protocol =
2017-03-28 13:31:41 +02:00
2017-04-19 21:46:10 +02:00
2017-03-28 13:31:41 +02:00
type param = unit
let forge () keys = Message.Get_protocols keys
2017-04-19 21:46:10 +02:00
type param = unit
type notified_value = Protocol.t
let precheck _ _ v = Some v
2017-02-24 17:17:53 +01:00
type callback = {
2017-11-11 03:34:12 +01:00
2018-01-24 12:48:25 +01:00
P2p_peer.Id.t -> Block_locator.t -> unit ;
2017-11-11 03:34:12 +01:00
2018-01-24 12:48:25 +01:00
P2p_peer.Id.t -> Block_header.t -> Mempool.t -> unit ;
disconnection: P2p_peer.Id.t -> unit ;
2017-02-24 17:17:53 +01:00
type db = {
p2p: p2p ;
2018-01-24 12:48:25 +01:00
p2p_readers: p2p_reader P2p_peer.Table.t ;
2017-02-24 17:17:53 +01:00
disk: State.t ;
2018-02-16 01:26:24 +01:00
active_chains: chain_db Chain_id.Table.t ;
2017-02-24 17:17:53 +01:00
protocol_db: Raw_protocol.t ;
2017-11-27 06:13:12 +01:00
block_input: (Block_hash.t * Block_header.t) Lwt_watcher.input ;
operation_input: (Operation_hash.t * Operation.t) Lwt_watcher.input ;
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
and chain_db = {
chain_state: State.Chain.t ;
2017-02-24 17:17:53 +01:00
global_db: db ;
operation_db: Raw_operation.t ;
block_header_db: Raw_block_header.t ;
2017-04-19 21:46:10 +02:00
operation_hashes_db: Raw_operation_hashes.t ;
operations_db: Raw_operations.t ;
2017-09-29 18:43:13 +02:00
mutable callback: callback ;
2018-01-24 12:48:25 +01:00
active_peers: P2p_peer.Set.t ref ;
active_connections: p2p_reader P2p_peer.Table.t ;
2017-02-24 17:17:53 +01:00
and p2p_reader = {
2018-01-24 12:48:25 +01:00
gid: P2p_peer.Id.t ;
2017-02-24 17:17:53 +01:00
conn: connection ;
2018-02-16 01:26:24 +01:00
peer_active_chains: chain_db Chain_id.Table.t ;
2017-11-27 06:13:12 +01:00
canceler: Lwt_canceler.t ;
2017-02-24 17:17:53 +01:00
mutable worker: unit Lwt.t ;
2017-09-29 18:43:13 +02:00
let noop_callback = {
2017-11-13 16:34:00 +01:00
notify_branch = begin fun _gid _locator -> () end ;
notify_head = begin fun _gid _block _ops -> () end ;
disconnection = begin fun _gid -> () end ;
2017-09-29 18:43:13 +02:00
2017-02-24 17:17:53 +01:00
type t = db
2017-11-09 11:26:25 +01:00
let state { disk } = disk
2018-02-16 01:26:24 +01:00
let chain_state { chain_state } = chain_state
2017-11-09 11:26:25 +01:00
let db { global_db } = global_db
2017-02-24 17:17:53 +01:00
2018-03-21 15:38:41 +01:00
let my_peer_id chain_db = P2p.peer_id chain_db.global_db.p2p
2017-11-14 03:52:26 +01:00
let read_block_header { disk } h =
State.read_block disk h >>= function
| Some b ->
2018-02-16 01:26:24 +01:00
Lwt.return_some (State.Block.chain_id b, State.Block.header b)
2017-11-14 03:52:26 +01:00
| None ->
2018-02-16 01:26:24 +01:00
let find_pending_block_header { peer_active_chains } h =
(fun _chain_id chain_db acc ->
2017-11-14 03:14:26 +01:00
match acc with
| Some _ -> acc
| None when Raw_block_header.Table.pending
2018-02-16 01:26:24 +01:00
chain_db.block_header_db.table h ->
Some chain_db
2017-11-14 03:14:26 +01:00
| None -> None)
2018-02-16 01:26:24 +01:00
2017-11-14 03:14:26 +01:00
2018-02-16 01:26:24 +01:00
let find_pending_operations { peer_active_chains } h i =
(fun _chain_id chain_db acc ->
2017-11-14 03:52:26 +01:00
match acc with
| Some _ -> acc
| None when Raw_operations.Table.pending
2018-02-16 01:26:24 +01:00
chain_db.operations_db.table (h, i) ->
Some chain_db
2017-11-14 03:52:26 +01:00
| None -> None)
2018-02-16 01:26:24 +01:00
2017-11-14 03:52:26 +01:00
2018-02-16 01:26:24 +01:00
let find_pending_operation_hashes { peer_active_chains } h i =
(fun _chain_id chain_db acc ->
2017-11-14 03:52:26 +01:00
match acc with
| Some _ -> acc
| None when Raw_operation_hashes.Table.pending
2018-02-16 01:26:24 +01:00
chain_db.operation_hashes_db.table (h, i) ->
Some chain_db
2017-11-14 03:52:26 +01:00
| None -> None)
2018-02-16 01:26:24 +01:00
2017-11-14 03:52:26 +01:00
2018-02-16 01:26:24 +01:00
let find_pending_operation { peer_active_chains } h =
(fun _chain_id chain_db acc ->
2017-11-14 02:41:37 +01:00
match acc with
| Some _ -> acc
2017-11-14 03:14:26 +01:00
| None when Raw_operation.Table.pending
2018-02-16 01:26:24 +01:00
chain_db.operation_db.table h ->
Some chain_db
2017-11-14 03:14:26 +01:00
| None -> None)
2018-02-16 01:26:24 +01:00
2017-11-14 02:41:37 +01:00
2018-02-16 01:26:24 +01:00
let read_operation { active_chains } h =
(fun chain_id chain_db acc ->
2017-11-14 03:52:26 +01:00
acc >>= function
| Some _ -> acc
| None ->
2018-02-16 01:26:24 +01:00
chain_db.operation_db.table h >>= function
2017-11-14 03:52:26 +01:00
| None -> Lwt.return_none
2018-02-16 01:26:24 +01:00
| Some bh -> Lwt.return_some (chain_id, bh))
2017-11-14 03:52:26 +01:00
2017-02-24 17:17:53 +01:00
module P2p_reader = struct
2018-02-16 01:26:24 +01:00
let may_activate global_db state chain_id f =
match Chain_id.Table.find state.peer_active_chains chain_id with
| chain_db ->
f chain_db
2017-02-24 17:17:53 +01:00
| exception Not_found ->
2018-02-16 01:26:24 +01:00
match Chain_id.Table.find global_db.active_chains chain_id with
| chain_db ->
chain_db.active_peers :=
P2p_peer.Set.add state.gid !(chain_db.active_peers) ;
P2p_peer.Table.add chain_db.active_connections
2017-02-24 17:17:53 +01:00
state.gid state ;
2018-02-16 01:26:24 +01:00
Chain_id.Table.add state.peer_active_chains chain_id chain_db ;
f chain_db
2017-02-24 17:17:53 +01:00
| exception Not_found ->
(* TODO decrease peer score. *)
2018-02-16 01:26:24 +01:00
let deactivate state chain_db =
chain_db.callback.disconnection state.gid ;
chain_db.active_peers :=
P2p_peer.Set.remove state.gid !(chain_db.active_peers) ;
P2p_peer.Table.remove chain_db.active_connections state.gid
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
let may_handle state chain_id f =
match Chain_id.Table.find state.peer_active_chains chain_id with
2017-02-24 17:17:53 +01:00
| exception Not_found ->
(* TODO decrease peer score *)
2018-02-16 01:26:24 +01:00
| chain_db ->
f chain_db
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
let may_handle_global global_db chain_id f =
match Chain_id.Table.find global_db.active_chains chain_id with
2017-02-24 17:17:53 +01:00
| exception Not_found ->
2018-02-16 01:26:24 +01:00
| chain_db ->
f chain_db
2017-02-24 17:17:53 +01:00
let handle_msg global_db state msg =
let open Message in
2017-09-29 18:43:13 +02:00
let module Logging =
Logging.Make(struct let name = "node.distributed_db.p2p_reader" end) in
let open Logging in
2017-02-24 17:17:53 +01:00
lwt_debug "Read message from %a: %a"
2018-01-24 12:48:25 +01:00
P2p_peer.Id.pp_short state.gid Message.pp_json msg >>= fun () ->
2017-02-24 17:17:53 +01:00
match msg with
2018-02-16 01:26:24 +01:00
| Get_current_branch chain_id ->
may_handle_global global_db chain_id @@ fun chain_db ->
if not (Chain_id.Table.mem state.peer_active_chains chain_id) then
2017-02-24 17:17:53 +01:00
@@ P2p.try_send global_db.p2p state.conn
2018-02-16 01:26:24 +01:00
@@ Get_current_branch chain_id ;
2018-03-21 15:38:41 +01:00
let seed = {
sender_id=(my_peer_id chain_db) } in
(Chain.locator chain_db.chain_state seed) >>= fun locator ->
2017-02-24 17:17:53 +01:00
@@ P2p.try_send global_db.p2p state.conn
2018-02-16 01:26:24 +01:00
@@ Current_branch (chain_id, locator) ;
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
| Current_branch (chain_id, locator) ->
may_activate global_db state chain_id @@ fun chain_db ->
2017-11-11 03:34:12 +01:00
let head, hist = (locator :> Block_header.t * Block_hash.t list) in
2017-04-19 21:46:10 +02:00
2018-02-16 01:26:24 +01:00
(State.Block.known_invalid chain_db.chain_state)
2017-11-11 03:34:12 +01:00
(Block_header.hash head :: hist) >>= fun known_invalid ->
2017-04-19 21:46:10 +02:00
if not known_invalid then
2018-02-22 15:35:50 +01:00
chain_db.callback.notify_branch state.gid locator
(* Kickban *)
2018-03-11 15:02:59 +01:00
P2p.greylist_peer global_db.p2p state.gid;
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
| Deactivate chain_id ->
may_handle state chain_id @@ fun chain_db ->
deactivate state chain_db ;
Chain_id.Table.remove state.peer_active_chains chain_id ;
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
| Get_current_head chain_id ->
may_handle state chain_id @@ fun chain_db ->
State.Current_mempool.get chain_db.chain_state >>= fun (head, mempool) ->
2017-11-13 14:33:39 +01:00
(* TODO bound the sent mempool size *)
2017-02-24 17:17:53 +01:00
@@ P2p.try_send global_db.p2p state.conn
2018-02-16 01:26:24 +01:00
@@ Current_head (chain_id, head, mempool) ;
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
| Current_head (chain_id, header, mempool) ->
may_handle state chain_id @@ fun chain_db ->
2017-11-11 03:34:12 +01:00
let head = Block_header.hash header in
2018-02-16 01:26:24 +01:00
State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid ->
2017-04-19 21:46:10 +02:00
if not known_invalid then
2018-02-22 15:35:50 +01:00
chain_db.callback.notify_head state.gid header mempool
(* Kickban *)
2018-03-11 15:02:59 +01:00
P2p.greylist_peer global_db.p2p state.gid ;
2017-02-24 17:17:53 +01:00
2017-11-14 03:52:26 +01:00
| Get_block_headers hashes ->
2017-02-24 17:17:53 +01:00
(fun hash ->
2017-11-14 03:52:26 +01:00
read_block_header global_db hash >>= function
| None ->
(* TODO: Blame request of unadvertised blocks ? *)
2018-02-16 01:26:24 +01:00
| Some (_chain_id, header) ->
2017-02-24 17:17:53 +01:00
ignore @@
2017-11-14 03:52:26 +01:00
P2p.try_send global_db.p2p state.conn (Block_header header) ;
2017-02-24 17:17:53 +01:00
2017-11-14 03:14:26 +01:00
| Block_header block -> begin
2017-04-19 19:21:23 +02:00
let hash = Block_header.hash block in
2017-11-14 03:52:26 +01:00
match find_pending_block_header state hash with
2017-11-14 03:14:26 +01:00
| None ->
(* TODO some penalty. *)
2018-02-16 01:26:24 +01:00
| Some chain_db ->
2017-11-14 03:14:26 +01:00
2018-02-16 01:26:24 +01:00
chain_db.block_header_db.table state.gid hash block >>= fun () ->
2017-11-14 03:14:26 +01:00
2017-02-24 17:17:53 +01:00
2017-11-14 03:52:26 +01:00
| Get_operations hashes ->
2017-02-24 17:17:53 +01:00
(fun hash ->
2017-11-14 03:52:26 +01:00
read_operation global_db hash >>= function
| None ->
(* TODO: Blame request of unadvertised operations ? *)
2018-02-16 01:26:24 +01:00
| Some (_chain_id, op) ->
2017-02-24 17:17:53 +01:00
ignore @@
2017-11-14 03:52:26 +01:00
P2p.try_send global_db.p2p state.conn (Operation op) ;
2017-02-24 17:17:53 +01:00
2017-11-14 02:41:37 +01:00
| Operation operation -> begin
2017-04-19 19:21:23 +02:00
let hash = Operation.hash operation in
2017-11-14 03:52:26 +01:00
match find_pending_operation state hash with
2017-11-14 02:41:37 +01:00
| None ->
(* TODO some penalty. *)
2018-02-16 01:26:24 +01:00
| Some chain_db ->
2017-11-14 02:41:37 +01:00
2018-02-16 01:26:24 +01:00
chain_db.operation_db.table state.gid hash operation >>= fun () ->
2017-11-14 02:41:37 +01:00
2017-02-24 17:17:53 +01:00
| Get_protocols hashes ->
(fun hash ->
2017-11-14 03:52:26 +01:00
State.Protocol.read_opt global_db.disk hash >>= function
| None ->
(* TODO: Blame request of unadvertised protocol ? *)
2017-02-24 17:17:53 +01:00
| Some p ->
ignore @@
2017-11-14 03:52:26 +01:00
P2p.try_send global_db.p2p state.conn (Protocol p) ;
2017-02-24 17:17:53 +01:00
| Protocol protocol ->
2017-04-19 19:21:23 +02:00
let hash = Protocol.hash protocol in
2017-02-24 17:17:53 +01:00
global_db.protocol_db.table state.gid hash protocol >>= fun () ->
2017-11-14 03:52:26 +01:00
| Get_operation_hashes_for_blocks blocks ->
2017-03-30 13:16:21 +02:00
2017-04-19 21:46:10 +02:00
(fun (hash, ofs) ->
2017-11-14 03:52:26 +01:00
State.read_block global_db.disk hash >>= function
2017-03-30 13:16:21 +02:00
| None -> Lwt.return_unit
2017-11-14 03:52:26 +01:00
| Some block ->
block ofs >>= fun (hashes, path) ->
2017-03-30 13:16:21 +02:00
ignore @@
2017-11-14 03:52:26 +01:00
P2p.try_send global_db.p2p state.conn @@
Operation_hashes_for_block (hash, ofs, hashes, path) ;
2017-03-30 13:16:21 +02:00
2017-04-19 21:46:10 +02:00
2017-03-30 13:16:21 +02:00
2017-11-14 03:52:26 +01:00
| Operation_hashes_for_block (block, ofs, ops, path) -> begin
match find_pending_operation_hashes state block ofs with
| None ->
(* TODO some penalty. *)
2018-02-16 01:26:24 +01:00
| Some chain_db ->
2017-11-14 03:52:26 +01:00
2018-02-16 01:26:24 +01:00
chain_db.operation_hashes_db.table state.gid
2017-11-14 03:52:26 +01:00
(block, ofs) (ops, path) >>= fun () ->
2017-04-19 21:46:10 +02:00
2017-11-14 03:52:26 +01:00
| Get_operations_for_blocks blocks ->
2017-04-19 21:46:10 +02:00
(fun (hash, ofs) ->
2017-11-14 03:52:26 +01:00
State.read_block global_db.disk hash >>= function
2017-04-19 21:46:10 +02:00
| None -> Lwt.return_unit
2017-11-14 03:52:26 +01:00
| Some block ->
block ofs >>= fun (ops, path) ->
2017-04-19 21:46:10 +02:00
ignore @@
2017-11-14 03:52:26 +01:00
P2p.try_send global_db.p2p state.conn @@
Operations_for_block (hash, ofs, ops, path) ;
2017-04-19 21:46:10 +02:00
2017-11-14 03:52:26 +01:00
| Operations_for_block (block, ofs, ops, path) -> begin
match find_pending_operations state block ofs with
| None ->
(* TODO some penalty. *)
2018-02-16 01:26:24 +01:00
| Some chain_db ->
2017-11-14 03:52:26 +01:00
2018-02-16 01:26:24 +01:00
chain_db.operations_db.table state.gid
2017-11-14 03:52:26 +01:00
(block, ofs) (ops, path) >>= fun () ->
2017-03-30 13:16:21 +02:00
2017-02-24 17:17:53 +01:00
let rec worker_loop global_db state =
2018-02-08 10:51:01 +01:00
protect ~canceler:state.canceler begin fun () ->
2017-02-24 17:17:53 +01:00
P2p.recv global_db.p2p state.conn
end >>= function
| Ok msg ->
handle_msg global_db state msg >>= fun () ->
worker_loop global_db state
| Error _ ->
2018-02-16 01:26:24 +01:00
2017-02-24 17:17:53 +01:00
(fun _ -> deactivate state)
2018-02-16 01:26:24 +01:00
state.peer_active_chains ;
2018-01-24 12:48:25 +01:00
P2p_peer.Table.remove global_db.p2p_readers state.gid ;
2017-02-24 17:17:53 +01:00
let run db gid conn =
2017-11-27 06:13:12 +01:00
let canceler = Lwt_canceler.create () in
2017-02-24 17:17:53 +01:00
let state = {
conn ; gid ; canceler ;
2018-02-16 01:26:24 +01:00
peer_active_chains = Chain_id.Table.create 17 ;
2017-02-24 17:17:53 +01:00
worker = Lwt.return_unit ;
} in
2018-02-16 01:26:24 +01:00
Chain_id.Table.iter (fun chain_id _chain_db ->
2017-02-24 17:17:53 +01:00
Lwt.async begin fun () ->
2018-02-16 01:26:24 +01:00
P2p.send db.p2p conn (Get_current_branch chain_id)
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
db.active_chains ;
2017-02-24 17:17:53 +01:00
state.worker <-
2017-11-11 03:34:12 +01:00
(Format.asprintf "db_network_reader.%a"
2018-01-24 12:48:25 +01:00
P2p_peer.Id.pp_short gid)
2017-02-24 17:17:53 +01:00
~run:(fun () -> worker_loop db state)
2017-11-27 06:13:12 +01:00
~cancel:(fun () -> Lwt_canceler.cancel canceler) ;
2018-01-24 12:48:25 +01:00
P2p_peer.Table.add db.p2p_readers gid state
2017-02-24 17:17:53 +01:00
let shutdown s =
2017-11-27 06:13:12 +01:00
Lwt_canceler.cancel s.canceler >>= fun () ->
2017-02-24 17:17:53 +01:00
let active_peer_ids p2p () =
(fun acc conn ->
2018-01-24 12:48:25 +01:00
let { P2p_connection.Info.peer_id } = P2p.connection_info p2p conn in
P2p_peer.Set.add peer_id acc)
2017-02-24 17:17:53 +01:00
(P2p.connections p2p)
let raw_try_send p2p peer_id msg =
match P2p.find_connection p2p peer_id with
| None -> ()
| Some conn -> ignore (P2p.try_send p2p conn msg : bool)
2018-03-21 15:38:41 +01:00
2017-02-24 17:17:53 +01:00
let create disk p2p =
let global_request =
{ data = () ;
active = active_peer_ids p2p ;
send = raw_try_send p2p ;
} in
let protocol_db = Raw_protocol.create global_request disk in
2018-02-16 01:26:24 +01:00
let active_chains = Chain_id.Table.create 17 in
2018-01-24 12:48:25 +01:00
let p2p_readers = P2p_peer.Table.create 17 in
2017-11-27 06:13:12 +01:00
let block_input = Lwt_watcher.create_input () in
let operation_input = Lwt_watcher.create_input () in
2017-02-24 17:17:53 +01:00
let db =
{ p2p ; p2p_readers ; disk ;
2018-02-16 01:26:24 +01:00
active_chains ; protocol_db ;
2017-02-24 17:17:53 +01:00
block_input ; operation_input } in
P2p.on_new_connection p2p (P2p_reader.run db) ;
P2p.iter_connections p2p (P2p_reader.run db) ;
2018-02-16 01:26:24 +01:00
let activate ({ p2p ; active_chains } as global_db) chain_state =
let chain_id = State.Chain.id chain_state in
match Chain_id.Table.find active_chains chain_id with
2017-02-24 17:17:53 +01:00
| exception Not_found ->
2018-01-24 12:48:25 +01:00
let active_peers = ref P2p_peer.Set.empty in
2017-02-24 17:17:53 +01:00
let p2p_request =
2017-11-14 03:52:26 +01:00
{ data = () ;
2017-02-24 17:17:53 +01:00
active = (fun () -> !active_peers) ;
send = raw_try_send p2p ;
} in
let operation_db =
2018-02-16 01:26:24 +01:00
~global_input:global_db.operation_input p2p_request chain_state in
2017-02-24 17:17:53 +01:00
let block_header_db =
2018-02-16 01:26:24 +01:00
~global_input:global_db.block_input p2p_request chain_state in
2017-04-19 21:46:10 +02:00
let operation_hashes_db =
2018-02-16 01:26:24 +01:00
Raw_operation_hashes.create p2p_request chain_state in
2017-04-19 21:46:10 +02:00
let operations_db =
2018-02-16 01:26:24 +01:00
Raw_operations.create p2p_request chain_state in
let chain = {
2017-04-19 21:46:10 +02:00
global_db ; operation_db ; block_header_db ;
operation_hashes_db ; operations_db ;
2018-02-16 01:26:24 +01:00
chain_state ; callback = noop_callback ; active_peers ;
2018-01-24 12:48:25 +01:00
active_connections = P2p_peer.Table.create 53 ;
2017-02-24 17:17:53 +01:00
} in
P2p.iter_connections p2p (fun _peer_id conn ->
2017-11-13 16:34:00 +01:00
Lwt.async begin fun () ->
2018-02-16 01:26:24 +01:00
P2p.send p2p conn (Get_current_branch chain_id)
2017-11-13 16:34:00 +01:00
end) ;
2018-02-16 01:26:24 +01:00
Chain_id.Table.add active_chains chain_id chain ;
| chain ->
let set_callback chain_db callback =
chain_db.callback <- callback
let deactivate chain_db =
let { active_chains ; p2p } = chain_db.global_db in
let chain_id = State.Chain.id chain_db.chain_state in
Chain_id.Table.remove active_chains chain_id ;
2018-01-24 12:48:25 +01:00
2017-02-24 17:17:53 +01:00
(fun _peer_id reader ->
2018-02-16 01:26:24 +01:00
P2p_reader.deactivate reader chain_db ;
2017-02-24 17:17:53 +01:00
Lwt.async begin fun () ->
2018-02-16 01:26:24 +01:00
P2p.send p2p reader.conn (Deactivate chain_id)
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
chain_db.active_connections ;
Raw_operation.shutdown chain_db.operation_db >>= fun () ->
Raw_block_header.shutdown chain_db.block_header_db >>= fun () ->
2017-11-13 16:34:00 +01:00
Lwt.return_unit >>= fun () ->
2017-02-24 17:17:53 +01:00
2018-02-16 01:26:24 +01:00
let get_chain { active_chains } chain_id =
try Some (Chain_id.Table.find active_chains chain_id)
2017-02-24 17:17:53 +01:00
with Not_found -> None
2018-03-11 15:02:59 +01:00
let greylist { global_db = { p2p } } peer_id =
Lwt.return (P2p.greylist_peer p2p peer_id)
2018-02-22 15:35:50 +01:00
2017-09-29 18:43:13 +02:00
let disconnect { global_db = { p2p } } peer_id =
match P2p.find_connection p2p peer_id with
| None -> Lwt.return_unit
| Some conn -> P2p.disconnect p2p conn
2018-02-16 01:26:24 +01:00
let shutdown { p2p ; p2p_readers ; active_chains } =
2018-01-24 12:48:25 +01:00
2017-02-24 17:17:53 +01:00
(fun _peer_id reader acc ->
P2p_reader.shutdown reader >>= fun () -> acc)
Lwt.return_unit >>= fun () ->
2018-02-16 01:26:24 +01:00
(fun _ chain_db acc ->
Raw_operation.shutdown chain_db.operation_db >>= fun () ->
Raw_block_header.shutdown chain_db.block_header_db >>= fun () ->
2017-11-13 16:34:00 +01:00
2018-02-16 01:26:24 +01:00
2017-02-24 17:17:53 +01:00
Lwt.return_unit >>= fun () ->
P2p.shutdown p2p >>= fun () ->
2018-02-16 01:26:24 +01:00
let clear_block chain_db hash n =
Raw_operations.clear_all chain_db.operations_db.table hash n ;
Raw_operation_hashes.clear_all chain_db.operation_hashes_db.table hash n ;
Raw_block_header.Table.clear_or_cancel chain_db.block_header_db.table hash
2017-11-11 03:34:12 +01:00
2018-02-16 01:26:24 +01:00
let commit_block chain_db hash header operations result =
2017-11-11 03:34:12 +01:00
assert (Block_hash.equal hash (Block_header.hash header)) ;
assert (List.length operations = header.shell.validation_passes) ;
2018-02-16 01:26:24 +01:00
State.Block.store chain_db.chain_state header operations result >>=? fun res ->
clear_block chain_db hash header.shell.validation_passes ;
2017-04-19 21:46:10 +02:00
return res
2018-02-16 01:26:24 +01:00
let commit_invalid_block chain_db hash header errors =
2017-11-11 03:34:12 +01:00
assert (Block_hash.equal hash (Block_header.hash header)) ;
2018-02-16 01:26:24 +01:00
State.Block.store_invalid chain_db.chain_state header errors >>=? fun res ->
clear_block chain_db hash header.shell.validation_passes ;
2017-11-11 03:34:12 +01:00
return res
2018-02-16 01:26:24 +01:00
let inject_operation chain_db h op =
2017-11-14 02:41:37 +01:00
assert (Operation_hash.equal h (Operation.hash op)) ;
2018-02-16 01:26:24 +01:00
Raw_operation.Table.inject chain_db.operation_db.table h op
2017-04-19 21:46:10 +02:00
2017-11-11 03:34:12 +01:00
let commit_protocol db h p =
2017-04-19 21:46:10 +02:00
State.Protocol.store db.disk p >>= fun res ->
2017-11-06 15:23:06 +01:00
Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
2017-04-19 21:46:10 +02:00
return (res <> None)
let watch_block_header { block_input } =
2017-11-27 06:13:12 +01:00
Lwt_watcher.create_stream block_input
2017-02-24 17:17:53 +01:00
let watch_operation { operation_input } =
2017-11-27 06:13:12 +01:00
Lwt_watcher.create_stream operation_input
2017-02-24 17:17:53 +01:00
module Raw = struct
let encoding = P2p.Raw.encoding Message.cfg.encoding
let supported_versions = Message.cfg.versions
2017-04-19 21:46:10 +02:00
module Make
(Table : Distributed_db_functors.DISTRIBUTED_DB)
(Kind : sig
type t
val proj: t -> Table.t
end) = struct
type key = Table.key
type value = Table.value
let known t k = Table.known (Kind.proj t) k
2017-09-29 18:43:13 +02:00
type error += Missing_data = Table.Missing_data
2017-11-08 12:42:09 +01:00
type error += Canceled = Table.Canceled
2017-11-08 12:52:10 +01:00
type error += Timeout = Table.Timeout
2017-04-19 21:46:10 +02:00
let read t k = Table.read (Kind.proj t) k
let read_opt t k = Table.read_opt (Kind.proj t) k
let read_exn t k = Table.read_exn (Kind.proj t) k
2017-11-10 20:30:29 +01:00
let prefetch t ?peer ?timeout k p =
Table.prefetch (Kind.proj t) ?peer ?timeout k p
2017-11-08 12:52:10 +01:00
let fetch t ?peer ?timeout k p =
Table.fetch (Kind.proj t) ?peer ?timeout k p
2017-11-06 15:23:06 +01:00
let clear_or_cancel t k = Table.clear_or_cancel (Kind.proj t) k
2017-04-19 21:46:10 +02:00
let inject t k v = Table.inject (Kind.proj t) k v
2018-02-14 17:27:14 +01:00
let pending t k = Table.pending (Kind.proj t) k
2017-04-19 21:46:10 +02:00
let watch t = Table.watch (Kind.proj t)
2017-11-13 19:06:30 +01:00
module Block_header = struct
type t = Block_header.t
include (Make (Raw_block_header.Table) (struct
2018-02-16 01:26:24 +01:00
type t = chain_db
let proj chain = chain.block_header_db.table
end) : Distributed_db_functors.DISTRIBUTED_DB with type t := chain_db
2018-02-14 17:27:14 +01:00
and type key := Block_hash.t
and type value := Block_header.t
and type param := unit)
2017-11-13 19:06:30 +01:00
2017-04-19 21:46:10 +02:00
module Operation_hashes =
Make (Raw_operation_hashes.Table) (struct
2018-02-16 01:26:24 +01:00
type t = chain_db
let proj chain = chain.operation_hashes_db.table
2017-04-19 21:46:10 +02:00
module Operations =
Make (Raw_operations.Table) (struct
2018-02-16 01:26:24 +01:00
type t = chain_db
let proj chain = chain.operations_db.table
2017-04-19 21:46:10 +02:00
2017-11-13 19:06:30 +01:00
module Operation = struct
2017-11-14 02:41:37 +01:00
include Operation
2017-11-13 19:06:30 +01:00
include (Make (Raw_operation.Table) (struct
2018-02-16 01:26:24 +01:00
type t = chain_db
let proj chain = chain.operation_db.table
end) : Distributed_db_functors.DISTRIBUTED_DB with type t := chain_db
2018-02-14 17:27:14 +01:00
and type key := Operation_hash.t
and type value := Operation.t
and type param := unit)
2017-11-13 19:06:30 +01:00
2017-04-19 21:46:10 +02:00
2017-11-13 19:06:30 +01:00
module Protocol = struct
type t = Protocol.t
include (Make (Raw_protocol.Table) (struct
type t = db
let proj db = db.protocol_db.table
2018-02-14 17:27:14 +01:00
end) : Distributed_db_functors.DISTRIBUTED_DB with type t := db
and type key := Protocol_hash.t
and type value := Protocol.t
and type param := unit)
2017-11-13 19:06:30 +01:00
2017-04-19 21:46:10 +02:00
2017-11-11 03:34:12 +01:00
2018-02-16 01:26:24 +01:00
let broadcast chain_db msg =
2018-01-24 12:48:25 +01:00
2017-11-11 03:34:12 +01:00
(fun _peer_id state ->
2018-02-16 01:26:24 +01:00
ignore (P2p.try_send chain_db.global_db.p2p state.conn msg))
2017-11-11 03:34:12 +01:00
2018-02-16 01:26:24 +01:00
let try_send chain_db peer_id msg =
2017-11-11 03:34:12 +01:00
2018-02-16 01:26:24 +01:00
let conn = P2p_peer.Table.find chain_db.active_connections peer_id in
ignore (P2p.try_send chain_db.global_db.p2p conn.conn msg : bool)
2017-11-11 03:34:12 +01:00
with Not_found -> ()
2018-02-16 01:26:24 +01:00
let send chain_db ?peer msg =
2017-11-11 03:34:12 +01:00
match peer with
2018-02-16 01:26:24 +01:00
| Some peer -> try_send chain_db peer msg
| None -> broadcast chain_db msg
2017-11-11 03:34:12 +01:00
module Request = struct
2018-02-16 01:26:24 +01:00
let current_head chain_db ?peer () =
let chain_id = State.Chain.id chain_db.chain_state in
send chain_db ?peer @@ Get_current_head chain_id
2017-11-11 03:34:12 +01:00
2018-02-16 01:26:24 +01:00
let current_branch chain_db ?peer () =
let chain_id = State.Chain.id chain_db.chain_state in
send chain_db ?peer @@ Get_current_branch chain_id
2017-11-11 03:34:12 +01:00
module Advertise = struct
2018-02-16 01:26:24 +01:00
let current_head chain_db ?peer ?(mempool = Mempool.empty) head =
let chain_id = State.Chain.id chain_db.chain_state in
assert (Chain_id.equal chain_id (State.Block.chain_id head)) ;
send chain_db ?peer @@
Current_head (chain_id, State.Block.header head, mempool)
2017-11-11 03:34:12 +01:00
2018-03-21 15:38:41 +01:00
let current_branch ?peer chain_db =
2018-02-16 01:26:24 +01:00
let chain_id = State.Chain.id chain_db.chain_state in
2018-03-21 15:38:41 +01:00
let chain_state = chain_state chain_db in
let sender_id = my_peer_id chain_db in
match peer with
| Some receiver_id ->
let seed = {
Block_locator.receiver_id=receiver_id; sender_id } in
(Chain.locator chain_state seed) >>= fun locator ->
let msg = Message.Current_branch (chain_id, locator) in
try_send chain_db receiver_id msg;
| None ->
(fun (receiver_id,state) ->
let seed = {
Block_locator.receiver_id=receiver_id; sender_id } in
(Chain.locator chain_state seed) >>= fun locator ->
let msg = Message.Current_branch (chain_id, locator) in
ignore (P2p.try_send chain_db.global_db.p2p state.conn msg);
) (P2p_peer.Table.fold (fun k v acc -> (k,v)::acc) chain_db.active_connections [])
2017-11-11 03:34:12 +01:00