Shell: first batch of statistics in the DistributedDB

Co-authored-by: Pietro Abate <pietro.abate@tezcore.com>
Co-authored-by: Mathias Bourgoin <mathias.bourgoin@tezcore.com>
This commit is contained in:
Mathias Bourgoin 2018-07-25 18:00:22 +02:00 committed by Grégoire Henry
parent ee640c8653
commit 1272b11ea2
No known key found for this signature in database
GPG Key ID: 50D984F20BD445D2
15 changed files with 757 additions and 71 deletions

View File

@ -89,10 +89,11 @@ end
module Info = struct
type 'conn_meta t = {
type ('peer_meta, 'conn_meta) t = {
score : float ;
trusted : bool ;
conn_metadata : 'conn_meta option;
conn_metadata : 'conn_meta option ;
peer_metadata : 'peer_meta ;
state : State.t ;
id_point : P2p_connection.Id.t option ;
stat : P2p_stat.t ;
@ -104,31 +105,36 @@ module Info = struct
last_miss : (P2p_connection.Id.t * Time.t) option ;
}
let encoding conn_metadata_encoding =
let encoding peer_metadata_encoding conn_metadata_encoding =
let open Data_encoding in
conv
(fun (
{ score ; trusted ; conn_metadata ; state ; id_point ; stat ;
{ score ; trusted ; conn_metadata ; peer_metadata ;
state ; id_point ; stat ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection ;
last_seen ; last_miss }) ->
((score, trusted, conn_metadata, state, id_point, stat),
((score, trusted, conn_metadata, peer_metadata,
state, id_point, stat),
(last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection,
last_seen, last_miss)))
(fun ((score, trusted, conn_metadata, state, id_point, stat),
(fun ((score, trusted, conn_metadata, peer_metadata,
state, id_point, stat),
(last_failed_connection, last_rejected_connection,
last_established_connection, last_disconnection,
last_seen, last_miss)) ->
{ score ; trusted ; conn_metadata ; state ; id_point ; stat ;
{ score ; trusted ; conn_metadata ; peer_metadata ;
state ; id_point ; stat ;
last_failed_connection ; last_rejected_connection ;
last_established_connection ; last_disconnection ;
last_seen ; last_miss })
(merge_objs
(obj6
(obj7
(req "score" float)
(req "trusted" bool)
(opt "conn_metadata" conn_metadata_encoding)
(req "peer_metadata" peer_metadata_encoding)
(req "state" State.encoding)
(opt "reachable_at" P2p_connection.Id.encoding)
(req "stat" P2p_stat.encoding))

View File

@ -56,10 +56,11 @@ end
module Info : sig
type 'conn_meta t = {
type ('peer_meta, 'conn_meta) t = {
score : float ;
trusted : bool ;
conn_metadata : 'conn_meta option ;
peer_metadata : 'peer_meta ;
state : State.t ;
id_point : P2p_connection.Id.t option ;
stat : P2p_stat.t ;
@ -71,7 +72,8 @@ module Info : sig
last_miss : (P2p_connection.Id.t * Time.t) option ;
}
val encoding : 'conn_meta Data_encoding.t -> 'conn_meta t Data_encoding.t
val encoding : 'peer_meta Data_encoding.t ->
'conn_meta Data_encoding.t -> ('peer_meta, 'conn_meta) t Data_encoding.t
end

View File

@ -27,7 +27,7 @@ include Logging.Make(struct let name = "p2p" end)
type 'peer_meta peer_meta_config = 'peer_meta P2p_pool.peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t ;
peer_meta_initial : 'peer_meta ;
peer_meta_initial : unit -> 'peer_meta ;
score : 'peer_meta -> float ;
}
@ -462,7 +462,7 @@ let faked_network peer_cfg faked_metadata = {
connection_remote_metadata = (fun _ -> faked_metadata) ;
connection_stat = (fun _ -> Fake.empty_stat) ;
global_stat = (fun () -> Fake.empty_stat) ;
get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ;
get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial ()) ;
set_peer_metadata = (fun _ _ -> ()) ;
recv = (fun _ -> Lwt_utils.never_ending ()) ;
recv_any = (fun () -> Lwt_utils.never_ending ()) ;
@ -556,6 +556,7 @@ let info_of_peer_info pool i =
score ;
trusted = trusted i ;
conn_metadata = meta_opt ;
peer_metadata = peer_metadata i;
state ;
id_point ;
stat ;

View File

@ -33,7 +33,7 @@
type 'peer_meta peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t;
peer_meta_initial : 'peer_meta;
peer_meta_initial : unit -> 'peer_meta;
score : 'peer_meta -> float ;
}
@ -277,7 +277,7 @@ val on_new_connection :
(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit
val build_rpc_directory :
(_, _, Connection_metadata.t) t -> unit RPC_directory.t
(_, Peer_metadata.t , Connection_metadata.t) t -> unit RPC_directory.t
val greylist_addr : ('msg, 'peer_meta, 'conn_meta) net -> P2p_addr.t -> unit
val greylist_peer : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> unit

View File

@ -217,7 +217,7 @@ type config = {
type 'peer_meta peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t ;
peer_meta_initial : 'peer_meta ;
peer_meta_initial : unit -> 'peer_meta ;
score : 'peer_meta -> float ;
}
@ -377,7 +377,7 @@ let register_peer pool peer_id =
Lwt_condition.broadcast pool.events.new_peer () ;
let peer =
P2p_peer_state.Info.create peer_id
~peer_metadata:pool.peer_meta_config.peer_meta_initial in
~peer_metadata:(pool.peer_meta_config.peer_meta_initial ()) in
Option.iter pool.config.max_known_peer_ids ~f:begin fun (max, _) ->
if P2p_peer.Table.length pool.known_peer_ids >= max then gc_peer_ids pool
end ;
@ -527,7 +527,7 @@ module Peers = struct
let get_peer_metadata pool peer_id =
try P2p_peer_state.Info.peer_metadata (P2p_peer.Table.find pool.known_peer_ids peer_id)
with Not_found -> pool.peer_meta_config.peer_meta_initial
with Not_found -> pool.peer_meta_config.peer_meta_initial ()
let get_score pool peer_id =
pool.peer_meta_config.score (get_peer_metadata pool peer_id)
@ -1194,11 +1194,23 @@ let create config peer_meta_config conn_meta_config message_config io_sched =
peer_ids ;
Lwt.return pool
| Error err ->
log_error "@[Failed to parsed peers file:@ %a@]"
log_error "@[Failed to parse peers file:@ %a@]"
pp_print_error err ;
Lwt.return pool
let destroy pool =
let destroy ({ config ; peer_meta_config } as pool) =
lwt_log_info "Saving metadata in %s" config.peers_file >>= fun () ->
begin
P2p_peer_state.Info.File.save
config.peers_file
peer_meta_config.peer_meta_encoding
(P2p_peer.Table.fold (fun _ a b -> a::b) pool.known_peer_ids []) >>= function
| Error err ->
log_error "@[Failed to save peers file:@ %a@]"
pp_print_error err;
Lwt.return_unit
| Ok ()-> Lwt.return_unit
end >>= fun () ->
P2p_point.Table.fold (fun _point point_info acc ->
match P2p_point_state.get point_info with
| Requested { cancel } | Accepted { cancel } ->

View File

@ -144,7 +144,7 @@ type config = {
type 'peer_meta peer_meta_config = {
peer_meta_encoding : 'peer_meta Data_encoding.t ;
peer_meta_initial : 'peer_meta ;
peer_meta_initial : unit -> 'peer_meta ;
score : 'peer_meta -> float ;
}

View File

@ -46,7 +46,7 @@ type metadata = unit
let peer_meta_config : metadata P2p_pool.peer_meta_config = {
peer_meta_encoding = Data_encoding.empty ;
peer_meta_initial = () ;
peer_meta_initial = (fun _ -> ()) ;
score = fun () -> 0. ;
}

View File

@ -29,6 +29,7 @@ type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net
type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection
type 'a request_param = {
p2p : (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.t ;
data: 'a ;
active: unit -> P2p_peer.Set.t ;
send: P2p_peer.Id.t -> Message.t -> unit ;
@ -63,7 +64,21 @@ module Make_raw
let active { active } = active ()
let rec send state gid keys =
let first_keys, keys = List.split_n Request_message.max_length keys in
state.send gid (Request_message.forge state.data first_keys) ;
let msg = (Request_message.forge state.data first_keys) in
state.send gid msg ;
let open Peer_metadata in
let (req : requests_kind) = match msg with
| Get_current_branch _ -> Branch
| Get_current_head _ -> Head
| Get_block_headers _ -> Block_header
| Get_operations _ -> Operations
| Get_protocols _ -> Protocols
| Get_operation_hashes_for_blocks _ -> Operation_hashes_for_block
| Get_operations_for_blocks _ -> Operations_for_block
| _ -> Other in
let meta = P2p.get_peer_metadata state.p2p gid in
Peer_metadata.incr meta @@ Scheduled_request req ;
(* TODO update peer_metadata *)
if keys <> [] then send state gid keys
end
@ -375,6 +390,8 @@ let db { global_db } = global_db
let my_peer_id chain_db = P2p.peer_id chain_db.global_db.p2p
let get_peer_metadata chain_db = P2p.get_peer_metadata chain_db.global_db.p2p
let read_block_header { disk } h =
State.read_block disk h >>= function
| Some b ->
@ -459,7 +476,8 @@ module P2p_reader = struct
Chain_id.Table.add state.peer_active_chains chain_id chain_db ;
f chain_db
| None ->
(* TODO decrease peer score. *)
let meta = P2p.get_peer_metadata global_db.p2p state.gid in
Peer_metadata.incr meta Unactivated_chain ;
Lwt.return_unit
let deactivate state chain_db =
@ -468,10 +486,12 @@ module P2p_reader = struct
P2p_peer.Set.remove state.gid !(chain_db.active_peers) ;
P2p_peer.Table.remove chain_db.active_connections state.gid
let may_handle state chain_id f =
(* check if the chain advertized by a peer is (still) active *)
let may_handle global_db state chain_id f =
match Chain_id.Table.find_opt state.peer_active_chains chain_id with
| None ->
(* TODO decrease peer score *)
let meta = P2p.get_peer_metadata global_db.p2p state.gid in
Peer_metadata.incr meta Inactive_chain ;
Lwt.return_unit
| Some chain_db ->
f chain_db
@ -490,6 +510,7 @@ module P2p_reader = struct
let open Message in
let open Handle_msg_Logging in
let meta = P2p.get_peer_metadata global_db.p2p state.gid in
lwt_debug Tag.DSL.(fun f ->
f "Read message from %a: %a"
@ -498,20 +519,20 @@ module P2p_reader = struct
-% a Message.Logging.tag msg) >>= fun () ->
match msg with
| Get_current_branch chain_id ->
Peer_metadata.incr meta @@ Received_request Branch;
may_handle_global global_db chain_id @@ fun chain_db ->
if not (Chain_id.Table.mem state.peer_active_chains chain_id) then
ignore
@@ P2p.try_send global_db.p2p state.conn
@@ Get_current_branch chain_id ;
Peer_metadata.update_requests meta Branch @@
P2p.try_send global_db.p2p state.conn @@
Get_current_branch chain_id ;
let seed = {
Block_locator.receiver_id=state.gid;
sender_id=(my_peer_id chain_db) } in
(Chain.locator chain_db.chain_state seed) >>= fun locator ->
ignore
@@ P2p.try_send global_db.p2p state.conn
@@ Current_branch (chain_id, locator) ;
Peer_metadata.update_responses meta Branch @@
P2p.try_send global_db.p2p state.conn @@
Current_branch (chain_id, locator) ;
Lwt.return_unit
| Current_branch (chain_id, locator) ->
@ -521,11 +542,11 @@ module P2p_reader = struct
(State.Block.known_invalid chain_db.chain_state)
(Block_header.hash head :: hist) >>= fun known_invalid ->
if known_invalid then begin
(* TODO Kick *)
P2p.disconnect global_db.p2p state.conn >>= fun () ->
P2p.greylist_peer global_db.p2p state.gid ;
Lwt.return_unit
end else if Time.(add (now ()) 15L < head.shell.timestamp) then begin
(* TODO some penalty *)
Peer_metadata.incr meta Future_block ;
lwt_log_notice Tag.DSL.(fun f ->
f "Received future block %a from peer %a."
-% t event "received_future_block"
@ -534,17 +555,21 @@ module P2p_reader = struct
Lwt.return_unit
end else begin
chain_db.callback.notify_branch state.gid locator ;
(* TODO discriminate between received advertisements
and responses? *)
Peer_metadata.incr meta @@ Received_advertisement Branch ;
Lwt.return_unit
end
| Deactivate chain_id ->
may_handle state chain_id @@ fun chain_db ->
may_handle global_db state chain_id @@ fun chain_db ->
deactivate state chain_db ;
Chain_id.Table.remove state.peer_active_chains chain_id ;
Lwt.return_unit
| Get_current_head chain_id ->
may_handle state chain_id @@ fun chain_db ->
may_handle global_db state chain_id @@ fun chain_db ->
Peer_metadata.incr meta @@ Received_request Head ;
let { Connection_metadata.disable_mempool } =
P2p.connection_remote_metadata chain_db.global_db.p2p state.conn in
begin
@ -555,13 +580,13 @@ module P2p_reader = struct
State.Current_mempool.get chain_db.chain_state
end >>= fun (head, mempool) ->
(* TODO bound the sent mempool size *)
ignore
@@ P2p.try_send global_db.p2p state.conn
@@ Current_head (chain_id, head, mempool) ;
Peer_metadata.update_responses meta Head @@
P2p.try_send global_db.p2p state.conn @@
Current_head (chain_id, head, mempool) ;
Lwt.return_unit
| Current_head (chain_id, header, mempool) ->
may_handle state chain_id @@ fun chain_db ->
may_handle global_db state chain_id @@ fun chain_db ->
let head = Block_header.hash header in
State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid ->
let { Connection_metadata.disable_mempool } =
@ -574,11 +599,11 @@ module P2p_reader = struct
This should probably warrant a reduction of the sender's score. *)
in
if known_invalid then begin
(* TODO Kick *)
P2p.disconnect global_db.p2p state.conn >>= fun () ->
P2p.greylist_peer global_db.p2p state.gid ;
Lwt.return_unit
end else if Time.(add (now ()) 15L < header.shell.timestamp) then begin
(* TODO some penalty *)
Peer_metadata.incr meta Future_block ;
lwt_log_notice Tag.DSL.(fun f ->
f "Received future block %a from peer %a."
-% t event "received_future_block"
@ -587,44 +612,51 @@ module P2p_reader = struct
Lwt.return_unit
end else begin
chain_db.callback.notify_head state.gid header mempool ;
(* TODO discriminate between received advertisements
and responses? *)
Peer_metadata.incr meta @@ Received_advertisement Head ;
Lwt.return_unit
end
| Get_block_headers hashes ->
Peer_metadata.incr meta @@ Received_request Block_header ;
Lwt_list.iter_p
(fun hash ->
read_block_header global_db hash >>= function
| None ->
(* TODO: Blame request of unadvertised blocks ? *)
Peer_metadata.incr meta @@ Unadvertised Block ;
Lwt.return_unit
| Some (_chain_id, header) ->
ignore @@
P2p.try_send global_db.p2p state.conn (Block_header header) ;
Peer_metadata.update_responses meta Block_header @@
P2p.try_send global_db.p2p state.conn @@
Block_header header ;
Lwt.return_unit)
hashes
| Block_header block -> begin
let hash = Block_header.hash block in
match find_pending_block_header state hash with
| None ->
(* TODO some penalty. *)
Peer_metadata.incr meta Unexpected_response ;
Lwt.return_unit
| Some chain_db ->
Raw_block_header.Table.notify
chain_db.block_header_db.table state.gid hash block >>= fun () ->
Peer_metadata.incr meta @@ Received_response Block_header ;
Lwt.return_unit
end
| Get_operations hashes ->
Peer_metadata.incr meta @@ Received_request Operations ;
Lwt_list.iter_p
(fun hash ->
read_operation global_db hash >>= function
| None ->
(* TODO: Blame request of unadvertised operations ? *)
Peer_metadata.incr meta @@ Unadvertised Operations ;
Lwt.return_unit
| Some (_chain_id, op) ->
ignore @@
P2p.try_send global_db.p2p state.conn (Operation op) ;
Peer_metadata.update_responses meta Operations @@
P2p.try_send global_db.p2p state.conn @@
Operation op ;
Lwt.return_unit)
hashes
@ -632,24 +664,27 @@ module P2p_reader = struct
let hash = Operation.hash operation in
match find_pending_operation state hash with
| None ->
(* TODO some penalty. *)
Peer_metadata.incr meta Unexpected_response ;
Lwt.return_unit
| Some chain_db ->
Raw_operation.Table.notify
chain_db.operation_db.table state.gid hash operation >>= fun () ->
Peer_metadata.incr meta @@ Received_response Operations ;
Lwt.return_unit
end
| Get_protocols hashes ->
Peer_metadata.incr meta @@ Received_request Protocols ;
Lwt_list.iter_p
(fun hash ->
State.Protocol.read_opt global_db.disk hash >>= function
| None ->
(* TODO: Blame request of unadvertised protocol ? *)
Peer_metadata.incr meta @@ Unadvertised Protocol ;
Lwt.return_unit
| Some p ->
ignore @@
P2p.try_send global_db.p2p state.conn (Protocol p) ;
Peer_metadata.update_responses meta Protocols @@
P2p.try_send global_db.p2p state.conn @@
Protocol p ;
Lwt.return_unit)
hashes
@ -657,9 +692,12 @@ module P2p_reader = struct
let hash = Protocol.hash protocol in
Raw_protocol.Table.notify
global_db.protocol_db.table state.gid hash protocol >>= fun () ->
Peer_metadata.incr meta @@ Received_response Protocols ;
Lwt.return_unit
| Get_operation_hashes_for_blocks blocks ->
Peer_metadata.incr meta @@
Received_request Operation_hashes_for_block ;
Lwt_list.iter_p
(fun (hash, ofs) ->
State.read_block global_db.disk hash >>= function
@ -667,7 +705,8 @@ module P2p_reader = struct
| Some block ->
State.Block.operation_hashes
block ofs >>= fun (hashes, path) ->
ignore @@
Peer_metadata.update_responses meta
Operation_hashes_for_block @@
P2p.try_send global_db.p2p state.conn @@
Operation_hashes_for_block (hash, ofs, hashes, path) ;
Lwt.return_unit)
@ -676,16 +715,20 @@ module P2p_reader = struct
| Operation_hashes_for_block (block, ofs, ops, path) -> begin
match find_pending_operation_hashes state block ofs with
| None ->
(* TODO some penalty. *)
Peer_metadata.incr meta Unexpected_response ;
Lwt.return_unit
| Some chain_db ->
Raw_operation_hashes.Table.notify
chain_db.operation_hashes_db.table state.gid
(block, ofs) (ops, path) >>= fun () ->
Peer_metadata.incr meta @@
Received_response Operation_hashes_for_block ;
Lwt.return_unit
end
| Get_operations_for_blocks blocks ->
Peer_metadata.incr meta @@
Received_request Operations_for_block ;
Lwt_list.iter_p
(fun (hash, ofs) ->
State.read_block global_db.disk hash >>= function
@ -693,7 +736,8 @@ module P2p_reader = struct
| Some block ->
State.Block.operations
block ofs >>= fun (ops, path) ->
ignore @@
Peer_metadata.update_responses meta
Operations_for_block @@
P2p.try_send global_db.p2p state.conn @@
Operations_for_block (hash, ofs, ops, path) ;
Lwt.return_unit)
@ -702,12 +746,14 @@ module P2p_reader = struct
| Operations_for_block (block, ofs, ops, path) -> begin
match find_pending_operations state block ofs with
| None ->
(* TODO some penalty. *)
Peer_metadata.incr meta Unexpected_response ;
Lwt.return_unit
| Some chain_db ->
Raw_operations.Table.notify
chain_db.operations_db.table state.gid
(block, ofs) (ops, path) >>= fun () ->
Peer_metadata.incr meta @@
Received_response Operations_for_block ;
Lwt.return_unit
end
@ -734,6 +780,8 @@ module P2p_reader = struct
} in
Chain_id.Table.iter (fun chain_id _chain_db ->
Lwt.async begin fun () ->
let meta = P2p.get_peer_metadata db.p2p gid in
Peer_metadata.incr meta (Sent_request Branch) ;
P2p.send db.p2p conn (Get_current_branch chain_id)
end)
db.active_chains ;
@ -767,7 +815,8 @@ let raw_try_send p2p peer_id msg =
let create disk p2p =
let global_request =
{ data = () ;
{ p2p ;
data = () ;
active = active_peer_ids p2p ;
send = raw_try_send p2p ;
} in
@ -791,7 +840,8 @@ let activate ({ p2p ; active_chains } as global_db) chain_state =
| None ->
let active_peers = ref P2p_peer.Set.empty in
let p2p_request =
{ data = () ;
{ p2p ;
data = () ;
active = (fun () -> !active_peers) ;
send = raw_try_send p2p ;
} in
@ -997,10 +1047,22 @@ module Request = struct
let current_head chain_db ?peer () =
let chain_id = State.Chain.id chain_db.chain_state in
begin match peer with
|Some peer ->
let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in
Peer_metadata.incr meta (Sent_request Head)
|None -> ()
end ;
send chain_db ?peer @@ Get_current_head chain_id
let current_branch chain_db ?peer () =
let chain_id = State.Chain.id chain_db.chain_state in
begin match peer with
|Some peer ->
let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in
Peer_metadata.incr meta (Sent_request Head)
|None -> ()
end ;
send chain_db ?peer @@ Get_current_branch chain_id
end
@ -1010,6 +1072,12 @@ module Advertise = struct
let current_head chain_db ?peer ?(mempool = Mempool.empty) head =
let chain_id = State.Chain.id chain_db.chain_state in
assert (Chain_id.equal chain_id (State.Block.chain_id head)) ;
begin match peer with
| Some peer ->
let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in
Peer_metadata.incr meta (Sent_advertisement Head)
| None -> ()
end ;
let msg_mempool =
Message.Current_head (chain_id, State.Block.header head, mempool) in
if mempool = Mempool.empty then
@ -1035,6 +1103,13 @@ module Advertise = struct
let chain_id = State.Chain.id chain_db.chain_state in
let chain_state = chain_state chain_db in
let sender_id = my_peer_id chain_db in
begin match peer with
| Some peer ->
let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in
Peer_metadata.incr meta (Sent_advertisement Branch)
| None -> ()
end ;
match peer with
| Some receiver_id ->
let seed = {

View File

@ -79,6 +79,8 @@ val db: chain_db -> db
(** Return the peer id of the node *)
val my_peer_id: chain_db -> P2p_peer.Id.t
val get_peer_metadata: chain_db -> P2p_peer.Id.t -> Peer_metadata.t
(** {1 Sending messages} *)
module Request : sig

View File

@ -38,8 +38,8 @@ type t = {
let peer_metadata_cfg : _ P2p.peer_meta_config = {
peer_meta_encoding = Peer_metadata.encoding ;
peer_meta_initial = () ;
score = fun _ -> 0. ;
peer_meta_initial = Peer_metadata.empty ;
score = Peer_metadata.score ;
}
let connection_metadata_cfg cfg : _ P2p.conn_meta_config = {

View File

@ -165,6 +165,8 @@ let validate_new_head w hash (header : Block_header.t) =
Block_hash.pp_short hash
P2p_peer.Id.pp_short pv.peer_id ;
set_bootstrapped pv ;
let meta = Distributed_db.get_peer_metadata pv.parameters.chain_db pv.peer_id in
Peer_metadata.incr meta Valid_blocks;
return_unit
let only_if_fitness_increases w distant_header cont =
@ -180,6 +182,8 @@ let only_if_fitness_increases w distant_header cont =
Block_hash.pp_short (Block_header.hash distant_header)
P2p_peer.Id.pp_short pv.peer_id ;
(* Don't download a branch that cannot beat the current head. *)
let meta = Distributed_db.get_peer_metadata pv.parameters.chain_db pv.peer_id in
Peer_metadata.incr meta Old_heads;
return_unit
end else cont ()

View File

@ -235,7 +235,8 @@ module Peers = struct
let info =
RPC_service.get_service
~query: RPC_query.empty
~output: (P2p_peer.Info.encoding Connection_metadata.encoding)
~output: (P2p_peer.Info.encoding Peer_metadata.encoding
Connection_metadata.encoding)
~description:"Details about a given peer."
RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg)
@ -260,7 +261,8 @@ module Peers = struct
~output:
Data_encoding.(list (tup2
P2p_peer.Id.encoding
(P2p_peer.Info.encoding Connection_metadata.encoding)))
(P2p_peer.Info.encoding Peer_metadata.encoding
Connection_metadata.encoding)))
~description:"List the peers the node ever met."
RPC_path.(root / "network" / "peers")

View File

@ -173,11 +173,11 @@ module Peers : sig
val list:
?filter:(P2p_peer.Filter.t list) ->
#simple ->
(P2p_peer.Id.t * Connection_metadata.t P2p_peer.Info.t) list tzresult Lwt.t
(P2p_peer.Id.t * (Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t) list tzresult Lwt.t
val info:
#simple -> P2p_peer.Id.t ->
Connection_metadata.t P2p_peer.Info.t tzresult Lwt.t
(Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t tzresult Lwt.t
val events:
#streamed -> P2p_peer.Id.t ->
@ -198,12 +198,12 @@ module Peers : sig
val list :
([ `GET ], unit,
unit, < filters: P2p_peer.Filter.t list >, unit,
(P2p_peer.Id.t * Connection_metadata.t P2p_peer.Info.t) list) RPC_service.t
(P2p_peer.Id.t * (Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t) list) RPC_service.t
val info :
([ `GET ], unit,
unit * P2p_peer.Id.t, unit, unit,
Connection_metadata.t P2p_peer.Info.t) RPC_service.t
(Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t) RPC_service.t
val events :
([ `GET ], unit,

View File

@ -23,5 +23,533 @@
(* *)
(*****************************************************************************)
type t = unit
let encoding = Data_encoding.empty
type counter = int
let counter = Data_encoding.int31
(* Distributed DB peer metadata *)
type messages =
{
mutable branch: counter ;
mutable head: counter ;
mutable block_header: counter ;
mutable operations: counter ;
mutable protocols: counter ;
mutable operation_hashes_for_block: counter ;
mutable operations_for_block: counter ;
mutable other: counter ;
}
let sent_requests_encoding =
let open Data_encoding in
(conv
(fun { branch ; head ; block_header ;
operations ; protocols ;
operation_hashes_for_block ;
operations_for_block ;
other ; } -> (branch, head, block_header,
operations, protocols,
operation_hashes_for_block,
operations_for_block,
other ))
(fun (branch, head, block_header,
operations, protocols,
operation_hashes_for_block,
operations_for_block,
other) -> { branch ; head ; block_header ;
operations ; protocols ;
operation_hashes_for_block ;
operations_for_block ;
other })
)
(obj8
(req "branch" counter)
(req "head" counter)
(req "block_header" counter)
(req "operations" counter)
(req "protocols" counter)
(req "operation_hashes_for_block" counter)
(req "operations_for_block" counter)
(req "other" counter)
)
type requests_kind =
| Branch | Head | Block_header | Operations
| Protocols | Operation_hashes_for_block
| Operations_for_block | Other
type requests = {
sent : messages ;
(** p2p sent messages of type requests *)
received : messages ;
(** p2p received messages of type requests *)
failed : messages ;
(** p2p messages of type requests that we failed to send *)
scheduled : messages ;
(** p2p messages ent via request scheduler *)
}
let requests_encoding =
let open Data_encoding in
(conv
(fun
{ sent ; received ;
failed ; scheduled } -> (sent, received,
failed, scheduled))
(fun (sent, received,
failed, scheduled) -> { sent ; received ;
failed ; scheduled })
)
(obj4
(req "sent" sent_requests_encoding)
(req "received" sent_requests_encoding)
(req "failed" sent_requests_encoding)
(req "scheduled" sent_requests_encoding)
)
(* Prevalidator peer metadata *)
type prevalidator_results =
{ cannot_download : counter ; cannot_parse : counter ;
refused_by_prefilter : counter ;
refused_by_postfilter : counter ;
(* prevalidation results *)
applied : counter ; branch_delayed : counter ;
branch_refused : counter ;
refused : counter ; duplicate : counter ; outdated : counter }
let prevalidator_results_encoding =
let open Data_encoding in
(conv
(fun { cannot_download ;
cannot_parse ;
refused_by_prefilter ;
refused_by_postfilter ;
applied ; branch_delayed;
branch_refused ;
refused ; duplicate ;
outdated } -> (cannot_download, cannot_parse,
refused_by_prefilter,
refused_by_postfilter,
applied, branch_delayed,
branch_refused,
refused, duplicate, outdated))
(fun (cannot_download,
cannot_parse,
refused_by_prefilter,
refused_by_postfilter,
applied, branch_delayed,
branch_refused,
refused, duplicate,
outdated) -> { cannot_download ; cannot_parse ;
refused_by_prefilter ;
refused_by_postfilter ;
applied ; branch_delayed;
branch_refused ;
refused ; duplicate ; outdated }
)
(obj10
(req "cannot_download" counter)
(req "cannot_parse" counter)
(req "refused_by_prefilter" counter)
(req "refused_by_postfilter" counter)
(req "applied" counter)
(req "branch_delayed" counter)
(req "branch_refused" counter)
(req "refused" counter)
(req "duplicate" counter)
(req "outdated" counter)
)
)
type resource_kind =
| Block | Operations | Protocol
type advertisement = Head | Branch
type metadata =
(* Distributed_db *)
| Received_request of requests_kind
| Sent_request of requests_kind
| Failed_request of requests_kind
| Scheduled_request of requests_kind
| Received_response of requests_kind
| Sent_response of requests_kind
| Unexpected_response
| Unactivated_chain
| Inactive_chain
| Future_block
| Unadvertised of resource_kind
| Sent_advertisement of advertisement
| Received_advertisement of advertisement
| Outdated_response (* TODO : unused *)
(* Peer validator *)
| Valid_blocks | Old_heads
(* Prevalidation *)
| Cannot_download | Cannot_parse
| Refused_by_prefilter
| Refused_by_postfilter
| Applied | Branch_delayed
| Branch_refused
| Refused | Duplicate | Outdated
type responses = {
mutable sent : messages ;
(** p2p sent messages of type responses *)
mutable failed : messages ;
(** p2p sent messages of type responses *)
mutable received : messages ;
(** p2p received responses *)
mutable unexpected : counter ;
(** p2p received responses that were unexpected *)
mutable outdated : counter ;
(** p2p received responses that are now outdated *)
}
let responses_encoding =
let open Data_encoding in
(conv
(fun
{ sent ; failed ; received ;
unexpected ; outdated ; } -> (sent, failed, received,
unexpected, outdated))
(fun
(sent, failed, received,
unexpected, outdated) -> { sent ; failed ; received ;
unexpected ; outdated })
)
(obj5
(req "sent" sent_requests_encoding)
(req "failed" sent_requests_encoding)
(req "received" sent_requests_encoding)
(req "unexpected" counter)
(req "outdated" counter)
)
type unadvertised = {
mutable block : counter ;
(** requests for unadvertised block *)
mutable operations : counter ;
(** requests for unadvertised operations *)
mutable protocol : counter ;
(** requests for unadvertised protocol *)
}
let unadvertised_encoding =
let open Data_encoding in
(conv
(fun
{ block ; operations ; protocol ; } -> (block, operations, protocol))
(fun
(block, operations, protocol) -> { block ; operations ; protocol ; })
)
(obj3
(req "block" counter)
(req "operations" counter)
(req "protocol" counter)
)
type advertisements_kind = {
mutable head : counter ;
mutable branch : counter ;
}
let advertisements_kind_encoding =
let open Data_encoding in
(conv
(fun
{ head ; branch ; } -> (head, branch))
(fun
(head, branch) -> { head ; branch ; })
)
(obj2
(req "head" counter)
(req "branch" counter)
)
type advertisements = {
mutable sent: advertisements_kind ;
mutable received: advertisements_kind ;
}
let advertisements_encoding =
let open Data_encoding in
(conv
(fun
{ sent ; received ; } -> (sent, received))
(fun
(sent, received) -> { sent ; received ; })
)
(obj2
(req "sent" advertisements_kind_encoding)
(req "received" advertisements_kind_encoding)
)
type t = {
mutable responses : responses ;
(** responses sent/received *)
mutable requests : requests ;
(** requests sent/received *)
mutable valid_blocks : counter ;
(** new valid blocks advertized by a peer *)
mutable old_heads : counter ;
(** previously validated blocks from a peer *)
mutable prevalidator_results : prevalidator_results ;
(** prevalidator metadata *)
mutable unactivated_chains : counter ;
(** requests from unactivated chains *)
mutable inactive_chains : counter ;
(** advertise inactive chains *)
mutable future_blocks_advertised : counter ;
(** future blocks *)
mutable unadvertised : unadvertised ;
(** requests for unadvertised resources *)
mutable advertisements : advertisements ;
(** advertisements sent *)
}
let empty () =
let empty_request () =
{ branch = 0 ; head = 0 ; block_header = 0 ;
operations = 0 ; protocols = 0 ;
operation_hashes_for_block = 0 ;
operations_for_block = 0 ;
other = 0 ;
} in
{
responses = { sent = empty_request () ;
failed = empty_request () ;
received = empty_request () ;
unexpected = 0 ;
outdated = 0 ;
} ;
requests =
{ sent = empty_request () ;
failed = empty_request () ;
scheduled = empty_request () ;
received = empty_request () ;
} ;
valid_blocks = 0 ;
old_heads = 0 ;
prevalidator_results =
{ cannot_download = 0 ; cannot_parse = 0 ;
refused_by_prefilter = 0 ; refused_by_postfilter = 0 ;
applied = 0 ; branch_delayed = 0 ; branch_refused = 0 ;
refused = 0 ; duplicate = 0 ; outdated = 0
} ;
unactivated_chains = 0 ;
inactive_chains = 0 ;
future_blocks_advertised = 0 ;
unadvertised = {block = 0 ; operations = 0 ; protocol = 0 } ;
advertisements = { sent = { head = 0 ; branch = 0 ; } ;
received = { head = 0 ; branch = 0 ; } }
}
let encoding =
let open Data_encoding in
(conv
(fun { responses ; requests ;
valid_blocks ; old_heads ;
prevalidator_results ;
unactivated_chains ;
inactive_chains ;
future_blocks_advertised ;
unadvertised ;
advertisements } ->
((responses, requests,
valid_blocks, old_heads,
prevalidator_results,
unactivated_chains,
inactive_chains,
future_blocks_advertised),
(unadvertised,
advertisements))
)
(fun ((responses, requests,
valid_blocks, old_heads,
prevalidator_results,
unactivated_chains,
inactive_chains,
future_blocks_advertised),
(unadvertised,
advertisements)) ->
{ responses ; requests ;
valid_blocks ; old_heads ;
prevalidator_results ;
unactivated_chains ;
inactive_chains ;
future_blocks_advertised ;
unadvertised ;
advertisements ; }
)
)
(merge_objs
(obj8
(req "responses" responses_encoding)
(req "requests" requests_encoding)
(req "valid_blocks" counter)
(req "old_heads" counter)
(req "prevalidator_results" prevalidator_results_encoding)
(req "unactivated_chains" counter)
(req "inactive_chains" counter)
(req "future_blocks_advertised" counter)
)
(obj2
(req "unadvertised" unadvertised_encoding)
(req "advertisements" advertisements_encoding)
)
)
let incr_requests (msgs : messages) (req : requests_kind) =
match req with
| Branch -> msgs.branch <- msgs.branch + 1
| Head -> msgs.head <- msgs.head + 1
| Block_header -> msgs.block_header <- msgs.block_header + 1
| Operations -> msgs.operations <- msgs.operations + 1
| Protocols -> msgs.protocols <- msgs.protocols + 1
| Operation_hashes_for_block ->
msgs.operation_hashes_for_block <- msgs.operation_hashes_for_block + 1
| Operations_for_block ->
msgs.operations_for_block <- msgs.operations_for_block + 1
| Other ->
msgs.other <- msgs.other + 1
let incr_unadvertised { unadvertised = u ; _ } = function
| Block -> u.block <- u.block + 1
| Operations -> u.operations <- u.operations + 1
| Protocol -> u.protocol <- u.protocol + 1
let incr ({responses = rsps ; requests = rqst ; _ } as m) metadata =
match metadata with
(* requests *)
| Received_request req ->
incr_requests rqst.received req
| Sent_request req ->
incr_requests rqst.sent req
| Scheduled_request req ->
incr_requests rqst.scheduled req
| Failed_request req ->
incr_requests rqst.failed req
(* responses *)
| Received_response req ->
incr_requests rsps.received req
| Sent_response req ->
incr_requests rsps.sent req
| Unexpected_response ->
rsps.unexpected <- rsps.unexpected + 1
| Outdated_response ->
rsps.outdated <- rsps.outdated + 1
(* Advertisements *)
| Sent_advertisement ad ->
begin match ad with
| Head ->
m.advertisements.sent.head <- m.advertisements.sent.head + 1
| Branch ->
m.advertisements.sent.branch <- m.advertisements.sent.branch + 1
end
| Received_advertisement ad ->
begin match ad with
| Head ->
m.advertisements.received.head <- m.advertisements.received.head + 1
| Branch ->
m.advertisements.received.branch <- m.advertisements.received.branch + 1
end
(* Unexpected erroneous msg *)
| Unactivated_chain ->
m.unactivated_chains <- m.unactivated_chains + 1
| Inactive_chain ->
m.inactive_chains <- m.inactive_chains + 1
| Future_block ->
m.future_blocks_advertised <- m.future_blocks_advertised + 1
| Unadvertised u -> incr_unadvertised m u
(* Peer validator *)
| Valid_blocks ->
m.valid_blocks <- m.valid_blocks + 1
| Old_heads ->
m.old_heads <- m.old_heads + 1
(* prevalidation *)
| Cannot_download ->
m.prevalidator_results <-
{ m.prevalidator_results with
cannot_download = m.prevalidator_results.cannot_download + 1 }
| Cannot_parse -> m.prevalidator_results <-
{ m.prevalidator_results with
cannot_parse = m.prevalidator_results.cannot_parse + 1 }
| Refused_by_prefilter -> m.prevalidator_results <-
{ m.prevalidator_results with
refused_by_prefilter =
m.prevalidator_results.refused_by_prefilter + 1 }
| Refused_by_postfilter -> m.prevalidator_results <-
{ m.prevalidator_results with
refused_by_postfilter =
m.prevalidator_results.refused_by_postfilter + 1 }
| Applied ->
m.prevalidator_results <-
{ m.prevalidator_results with
applied = m.prevalidator_results.applied + 1 }
| Branch_delayed ->
m.prevalidator_results <-
{ m.prevalidator_results with
branch_delayed = m.prevalidator_results.branch_delayed + 1 }
| Branch_refused ->
m.prevalidator_results <-
{ m.prevalidator_results with
branch_refused = m.prevalidator_results.branch_refused + 1 }
| Refused ->
m.prevalidator_results <-
{ m.prevalidator_results with
refused = m.prevalidator_results.refused + 1 }
| Duplicate ->
m.prevalidator_results <-
{ m.prevalidator_results with
duplicate = m.prevalidator_results.duplicate + 1 }
| Outdated ->
m.prevalidator_results <-
{ m.prevalidator_results with
outdated = m.prevalidator_results.outdated + 1 }
(* shortcuts to update sent/failed requests/responses *)
let update_requests { requests = { sent ; failed ; _ } ; _ } kind = function
| true -> incr_requests sent kind
| false -> incr_requests failed kind
let update_responses { responses = { sent ; failed ; _ } ; _ } kind = function
| true -> incr_requests sent kind
| false -> incr_requests failed kind
(* Scores computation *)
(* TODO:
- scores cannot be kept as integers (use big numbers?)
- they scores should probably be reset frequently (at each block/cycle?)
- we might still need to keep some kind of score history
- store only best/worst/last_value/mean/variance... ?
- do we need to keep "good" scores ?
- maybe "bad" scores are enough to reduce resources
allocated to misbehaving peers *)
let distributed_db_score _ =
(* TODO *)
1.0
let prevalidation_score { prevalidator_results = _ ; _ } =
(* TODO *)
1.0
let score _ =
(* TODO *)
1.0

View File

@ -25,5 +25,59 @@
(** All the (persistent) metadata associated to a peer. *)
type t = unit (* TODO *)
type t
val encoding: t Data_encoding.t
val empty : unit -> t
(** the aggregate score function computed from
the metadata collected for a peer *)
val distributed_db_score : t -> float
val prevalidation_score : t -> float
val score : t -> float
type requests_kind =
| Branch | Head | Block_header
| Operations | Protocols
| Operation_hashes_for_block | Operations_for_block
| Other
type resource_kind =
| Block | Operations | Protocol
type advertisement = Head | Branch
type metadata =
(* Distributed_db *)
| Received_request of requests_kind
| Sent_request of requests_kind
| Failed_request of requests_kind
| Scheduled_request of requests_kind
| Received_response of requests_kind
| Sent_response of requests_kind
| Unexpected_response
| Unactivated_chain
| Inactive_chain
| Future_block
| Unadvertised of resource_kind
| Sent_advertisement of advertisement
| Received_advertisement of advertisement
| Outdated_response (* TODO : unused *)
(* Peer validator *)
| Valid_blocks | Old_heads
(* Prevalidation *)
| Cannot_download | Cannot_parse
| Refused_by_prefilter
| Refused_by_postfilter
| Applied | Branch_delayed
| Branch_refused
| Refused | Duplicate | Outdated
(** incr score counters . Used to compute the final score for a peer *)
val incr : t -> metadata -> unit
val update_requests : t -> requests_kind -> bool -> unit
val update_responses : t -> requests_kind -> bool -> unit