diff --git a/src/lib_base/p2p_peer.ml b/src/lib_base/p2p_peer.ml index 7ac72408b..e59dde236 100644 --- a/src/lib_base/p2p_peer.ml +++ b/src/lib_base/p2p_peer.ml @@ -89,10 +89,11 @@ end module Info = struct - type 'conn_meta t = { + type ('peer_meta, 'conn_meta) t = { score : float ; trusted : bool ; - conn_metadata : 'conn_meta option; + conn_metadata : 'conn_meta option ; + peer_metadata : 'peer_meta ; state : State.t ; id_point : P2p_connection.Id.t option ; stat : P2p_stat.t ; @@ -104,31 +105,36 @@ module Info = struct last_miss : (P2p_connection.Id.t * Time.t) option ; } - let encoding conn_metadata_encoding = + let encoding peer_metadata_encoding conn_metadata_encoding = let open Data_encoding in conv (fun ( - { score ; trusted ; conn_metadata ; state ; id_point ; stat ; + { score ; trusted ; conn_metadata ; peer_metadata ; + state ; id_point ; stat ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; last_seen ; last_miss }) -> - ((score, trusted, conn_metadata, state, id_point, stat), + ((score, trusted, conn_metadata, peer_metadata, + state, id_point, stat), (last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection, last_seen, last_miss))) - (fun ((score, trusted, conn_metadata, state, id_point, stat), + (fun ((score, trusted, conn_metadata, peer_metadata, + state, id_point, stat), (last_failed_connection, last_rejected_connection, last_established_connection, last_disconnection, last_seen, last_miss)) -> - { score ; trusted ; conn_metadata ; state ; id_point ; stat ; + { score ; trusted ; conn_metadata ; peer_metadata ; + state ; id_point ; stat ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; last_seen ; last_miss }) (merge_objs - (obj6 + (obj7 (req "score" float) (req "trusted" bool) (opt "conn_metadata" conn_metadata_encoding) + (req "peer_metadata" peer_metadata_encoding) (req "state" State.encoding) (opt "reachable_at" P2p_connection.Id.encoding) (req "stat" P2p_stat.encoding)) diff --git a/src/lib_base/p2p_peer.mli b/src/lib_base/p2p_peer.mli index 4299ec99b..57d020051 100644 --- a/src/lib_base/p2p_peer.mli +++ b/src/lib_base/p2p_peer.mli @@ -56,10 +56,11 @@ end module Info : sig - type 'conn_meta t = { + type ('peer_meta, 'conn_meta) t = { score : float ; trusted : bool ; conn_metadata : 'conn_meta option ; + peer_metadata : 'peer_meta ; state : State.t ; id_point : P2p_connection.Id.t option ; stat : P2p_stat.t ; @@ -71,7 +72,8 @@ module Info : sig last_miss : (P2p_connection.Id.t * Time.t) option ; } - val encoding : 'conn_meta Data_encoding.t -> 'conn_meta t Data_encoding.t + val encoding : 'peer_meta Data_encoding.t -> + 'conn_meta Data_encoding.t -> ('peer_meta, 'conn_meta) t Data_encoding.t end diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index a7b1328dc..d20879c6d 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -27,7 +27,7 @@ include Logging.Make(struct let name = "p2p" end) type 'peer_meta peer_meta_config = 'peer_meta P2p_pool.peer_meta_config = { peer_meta_encoding : 'peer_meta Data_encoding.t ; - peer_meta_initial : 'peer_meta ; + peer_meta_initial : unit -> 'peer_meta ; score : 'peer_meta -> float ; } @@ -462,7 +462,7 @@ let faked_network peer_cfg faked_metadata = { connection_remote_metadata = (fun _ -> faked_metadata) ; connection_stat = (fun _ -> Fake.empty_stat) ; global_stat = (fun () -> Fake.empty_stat) ; - get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ; + get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial ()) ; set_peer_metadata = (fun _ _ -> ()) ; recv = (fun _ -> Lwt_utils.never_ending ()) ; recv_any = (fun () -> Lwt_utils.never_ending ()) ; @@ -556,6 +556,7 @@ let info_of_peer_info pool i = score ; trusted = trusted i ; conn_metadata = meta_opt ; + peer_metadata = peer_metadata i; state ; id_point ; stat ; diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index 05e0affe3..fc160f507 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -33,7 +33,7 @@ type 'peer_meta peer_meta_config = { peer_meta_encoding : 'peer_meta Data_encoding.t; - peer_meta_initial : 'peer_meta; + peer_meta_initial : unit -> 'peer_meta; score : 'peer_meta -> float ; } @@ -277,7 +277,7 @@ val on_new_connection : (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit val build_rpc_directory : - (_, _, Connection_metadata.t) t -> unit RPC_directory.t + (_, Peer_metadata.t , Connection_metadata.t) t -> unit RPC_directory.t val greylist_addr : ('msg, 'peer_meta, 'conn_meta) net -> P2p_addr.t -> unit val greylist_peer : ('msg, 'peer_meta, 'conn_meta) net -> P2p_peer.Id.t -> unit diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index ecbd2b69c..ef5ea25ee 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -217,7 +217,7 @@ type config = { type 'peer_meta peer_meta_config = { peer_meta_encoding : 'peer_meta Data_encoding.t ; - peer_meta_initial : 'peer_meta ; + peer_meta_initial : unit -> 'peer_meta ; score : 'peer_meta -> float ; } @@ -377,7 +377,7 @@ let register_peer pool peer_id = Lwt_condition.broadcast pool.events.new_peer () ; let peer = P2p_peer_state.Info.create peer_id - ~peer_metadata:pool.peer_meta_config.peer_meta_initial in + ~peer_metadata:(pool.peer_meta_config.peer_meta_initial ()) in Option.iter pool.config.max_known_peer_ids ~f:begin fun (max, _) -> if P2p_peer.Table.length pool.known_peer_ids >= max then gc_peer_ids pool end ; @@ -527,7 +527,7 @@ module Peers = struct let get_peer_metadata pool peer_id = try P2p_peer_state.Info.peer_metadata (P2p_peer.Table.find pool.known_peer_ids peer_id) - with Not_found -> pool.peer_meta_config.peer_meta_initial + with Not_found -> pool.peer_meta_config.peer_meta_initial () let get_score pool peer_id = pool.peer_meta_config.score (get_peer_metadata pool peer_id) @@ -1194,11 +1194,23 @@ let create config peer_meta_config conn_meta_config message_config io_sched = peer_ids ; Lwt.return pool | Error err -> - log_error "@[Failed to parsed peers file:@ %a@]" + log_error "@[Failed to parse peers file:@ %a@]" pp_print_error err ; Lwt.return pool -let destroy pool = +let destroy ({ config ; peer_meta_config } as pool) = + lwt_log_info "Saving metadata in %s" config.peers_file >>= fun () -> + begin + P2p_peer_state.Info.File.save + config.peers_file + peer_meta_config.peer_meta_encoding + (P2p_peer.Table.fold (fun _ a b -> a::b) pool.known_peer_ids []) >>= function + | Error err -> + log_error "@[Failed to save peers file:@ %a@]" + pp_print_error err; + Lwt.return_unit + | Ok ()-> Lwt.return_unit + end >>= fun () -> P2p_point.Table.fold (fun _point point_info acc -> match P2p_point_state.get point_info with | Requested { cancel } | Accepted { cancel } -> diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index 598fa34b7..1a0a6edc8 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -144,7 +144,7 @@ type config = { type 'peer_meta peer_meta_config = { peer_meta_encoding : 'peer_meta Data_encoding.t ; - peer_meta_initial : 'peer_meta ; + peer_meta_initial : unit -> 'peer_meta ; score : 'peer_meta -> float ; } diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index 3ab130451..b0cc3b21c 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -46,7 +46,7 @@ type metadata = unit let peer_meta_config : metadata P2p_pool.peer_meta_config = { peer_meta_encoding = Data_encoding.empty ; - peer_meta_initial = () ; + peer_meta_initial = (fun _ -> ()) ; score = fun () -> 0. ; } diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index 4ab7a44c8..b99f4cad0 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -29,6 +29,7 @@ type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection type 'a request_param = { + p2p : (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.t ; data: 'a ; active: unit -> P2p_peer.Set.t ; send: P2p_peer.Id.t -> Message.t -> unit ; @@ -63,7 +64,21 @@ module Make_raw let active { active } = active () let rec send state gid keys = let first_keys, keys = List.split_n Request_message.max_length keys in - state.send gid (Request_message.forge state.data first_keys) ; + let msg = (Request_message.forge state.data first_keys) in + state.send gid msg ; + let open Peer_metadata in + let (req : requests_kind) = match msg with + | Get_current_branch _ -> Branch + | Get_current_head _ -> Head + | Get_block_headers _ -> Block_header + | Get_operations _ -> Operations + | Get_protocols _ -> Protocols + | Get_operation_hashes_for_blocks _ -> Operation_hashes_for_block + | Get_operations_for_blocks _ -> Operations_for_block + | _ -> Other in + let meta = P2p.get_peer_metadata state.p2p gid in + Peer_metadata.incr meta @@ Scheduled_request req ; + (* TODO update peer_metadata *) if keys <> [] then send state gid keys end @@ -375,6 +390,8 @@ let db { global_db } = global_db let my_peer_id chain_db = P2p.peer_id chain_db.global_db.p2p +let get_peer_metadata chain_db = P2p.get_peer_metadata chain_db.global_db.p2p + let read_block_header { disk } h = State.read_block disk h >>= function | Some b -> @@ -459,7 +476,8 @@ module P2p_reader = struct Chain_id.Table.add state.peer_active_chains chain_id chain_db ; f chain_db | None -> - (* TODO decrease peer score. *) + let meta = P2p.get_peer_metadata global_db.p2p state.gid in + Peer_metadata.incr meta Unactivated_chain ; Lwt.return_unit let deactivate state chain_db = @@ -468,10 +486,12 @@ module P2p_reader = struct P2p_peer.Set.remove state.gid !(chain_db.active_peers) ; P2p_peer.Table.remove chain_db.active_connections state.gid - let may_handle state chain_id f = + (* check if the chain advertized by a peer is (still) active *) + let may_handle global_db state chain_id f = match Chain_id.Table.find_opt state.peer_active_chains chain_id with | None -> - (* TODO decrease peer score *) + let meta = P2p.get_peer_metadata global_db.p2p state.gid in + Peer_metadata.incr meta Inactive_chain ; Lwt.return_unit | Some chain_db -> f chain_db @@ -490,6 +510,7 @@ module P2p_reader = struct let open Message in let open Handle_msg_Logging in + let meta = P2p.get_peer_metadata global_db.p2p state.gid in lwt_debug Tag.DSL.(fun f -> f "Read message from %a: %a" @@ -498,20 +519,20 @@ module P2p_reader = struct -% a Message.Logging.tag msg) >>= fun () -> match msg with - | Get_current_branch chain_id -> + Peer_metadata.incr meta @@ Received_request Branch; may_handle_global global_db chain_id @@ fun chain_db -> if not (Chain_id.Table.mem state.peer_active_chains chain_id) then - ignore - @@ P2p.try_send global_db.p2p state.conn - @@ Get_current_branch chain_id ; + Peer_metadata.update_requests meta Branch @@ + P2p.try_send global_db.p2p state.conn @@ + Get_current_branch chain_id ; let seed = { Block_locator.receiver_id=state.gid; sender_id=(my_peer_id chain_db) } in (Chain.locator chain_db.chain_state seed) >>= fun locator -> - ignore - @@ P2p.try_send global_db.p2p state.conn - @@ Current_branch (chain_id, locator) ; + Peer_metadata.update_responses meta Branch @@ + P2p.try_send global_db.p2p state.conn @@ + Current_branch (chain_id, locator) ; Lwt.return_unit | Current_branch (chain_id, locator) -> @@ -521,11 +542,11 @@ module P2p_reader = struct (State.Block.known_invalid chain_db.chain_state) (Block_header.hash head :: hist) >>= fun known_invalid -> if known_invalid then begin - (* TODO Kick *) + P2p.disconnect global_db.p2p state.conn >>= fun () -> P2p.greylist_peer global_db.p2p state.gid ; Lwt.return_unit end else if Time.(add (now ()) 15L < head.shell.timestamp) then begin - (* TODO some penalty *) + Peer_metadata.incr meta Future_block ; lwt_log_notice Tag.DSL.(fun f -> f "Received future block %a from peer %a." -% t event "received_future_block" @@ -534,17 +555,21 @@ module P2p_reader = struct Lwt.return_unit end else begin chain_db.callback.notify_branch state.gid locator ; + (* TODO discriminate between received advertisements + and responses? *) + Peer_metadata.incr meta @@ Received_advertisement Branch ; Lwt.return_unit end | Deactivate chain_id -> - may_handle state chain_id @@ fun chain_db -> + may_handle global_db state chain_id @@ fun chain_db -> deactivate state chain_db ; Chain_id.Table.remove state.peer_active_chains chain_id ; Lwt.return_unit | Get_current_head chain_id -> - may_handle state chain_id @@ fun chain_db -> + may_handle global_db state chain_id @@ fun chain_db -> + Peer_metadata.incr meta @@ Received_request Head ; let { Connection_metadata.disable_mempool } = P2p.connection_remote_metadata chain_db.global_db.p2p state.conn in begin @@ -555,13 +580,13 @@ module P2p_reader = struct State.Current_mempool.get chain_db.chain_state end >>= fun (head, mempool) -> (* TODO bound the sent mempool size *) - ignore - @@ P2p.try_send global_db.p2p state.conn - @@ Current_head (chain_id, head, mempool) ; + Peer_metadata.update_responses meta Head @@ + P2p.try_send global_db.p2p state.conn @@ + Current_head (chain_id, head, mempool) ; Lwt.return_unit | Current_head (chain_id, header, mempool) -> - may_handle state chain_id @@ fun chain_db -> + may_handle global_db state chain_id @@ fun chain_db -> let head = Block_header.hash header in State.Block.known_invalid chain_db.chain_state head >>= fun known_invalid -> let { Connection_metadata.disable_mempool } = @@ -574,11 +599,11 @@ module P2p_reader = struct This should probably warrant a reduction of the sender's score. *) in if known_invalid then begin - (* TODO Kick *) + P2p.disconnect global_db.p2p state.conn >>= fun () -> P2p.greylist_peer global_db.p2p state.gid ; Lwt.return_unit end else if Time.(add (now ()) 15L < header.shell.timestamp) then begin - (* TODO some penalty *) + Peer_metadata.incr meta Future_block ; lwt_log_notice Tag.DSL.(fun f -> f "Received future block %a from peer %a." -% t event "received_future_block" @@ -587,44 +612,51 @@ module P2p_reader = struct Lwt.return_unit end else begin chain_db.callback.notify_head state.gid header mempool ; + (* TODO discriminate between received advertisements + and responses? *) + Peer_metadata.incr meta @@ Received_advertisement Head ; Lwt.return_unit end | Get_block_headers hashes -> + Peer_metadata.incr meta @@ Received_request Block_header ; Lwt_list.iter_p (fun hash -> read_block_header global_db hash >>= function | None -> - (* TODO: Blame request of unadvertised blocks ? *) + Peer_metadata.incr meta @@ Unadvertised Block ; Lwt.return_unit | Some (_chain_id, header) -> - ignore @@ - P2p.try_send global_db.p2p state.conn (Block_header header) ; + Peer_metadata.update_responses meta Block_header @@ + P2p.try_send global_db.p2p state.conn @@ + Block_header header ; Lwt.return_unit) hashes - | Block_header block -> begin let hash = Block_header.hash block in match find_pending_block_header state hash with | None -> - (* TODO some penalty. *) + Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> Raw_block_header.Table.notify chain_db.block_header_db.table state.gid hash block >>= fun () -> + Peer_metadata.incr meta @@ Received_response Block_header ; Lwt.return_unit end | Get_operations hashes -> + Peer_metadata.incr meta @@ Received_request Operations ; Lwt_list.iter_p (fun hash -> read_operation global_db hash >>= function | None -> - (* TODO: Blame request of unadvertised operations ? *) + Peer_metadata.incr meta @@ Unadvertised Operations ; Lwt.return_unit | Some (_chain_id, op) -> - ignore @@ - P2p.try_send global_db.p2p state.conn (Operation op) ; + Peer_metadata.update_responses meta Operations @@ + P2p.try_send global_db.p2p state.conn @@ + Operation op ; Lwt.return_unit) hashes @@ -632,24 +664,27 @@ module P2p_reader = struct let hash = Operation.hash operation in match find_pending_operation state hash with | None -> - (* TODO some penalty. *) + Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> Raw_operation.Table.notify chain_db.operation_db.table state.gid hash operation >>= fun () -> + Peer_metadata.incr meta @@ Received_response Operations ; Lwt.return_unit end | Get_protocols hashes -> + Peer_metadata.incr meta @@ Received_request Protocols ; Lwt_list.iter_p (fun hash -> State.Protocol.read_opt global_db.disk hash >>= function | None -> - (* TODO: Blame request of unadvertised protocol ? *) + Peer_metadata.incr meta @@ Unadvertised Protocol ; Lwt.return_unit | Some p -> - ignore @@ - P2p.try_send global_db.p2p state.conn (Protocol p) ; + Peer_metadata.update_responses meta Protocols @@ + P2p.try_send global_db.p2p state.conn @@ + Protocol p ; Lwt.return_unit) hashes @@ -657,9 +692,12 @@ module P2p_reader = struct let hash = Protocol.hash protocol in Raw_protocol.Table.notify global_db.protocol_db.table state.gid hash protocol >>= fun () -> + Peer_metadata.incr meta @@ Received_response Protocols ; Lwt.return_unit | Get_operation_hashes_for_blocks blocks -> + Peer_metadata.incr meta @@ + Received_request Operation_hashes_for_block ; Lwt_list.iter_p (fun (hash, ofs) -> State.read_block global_db.disk hash >>= function @@ -667,7 +705,8 @@ module P2p_reader = struct | Some block -> State.Block.operation_hashes block ofs >>= fun (hashes, path) -> - ignore @@ + Peer_metadata.update_responses meta + Operation_hashes_for_block @@ P2p.try_send global_db.p2p state.conn @@ Operation_hashes_for_block (hash, ofs, hashes, path) ; Lwt.return_unit) @@ -676,16 +715,20 @@ module P2p_reader = struct | Operation_hashes_for_block (block, ofs, ops, path) -> begin match find_pending_operation_hashes state block ofs with | None -> - (* TODO some penalty. *) + Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> Raw_operation_hashes.Table.notify chain_db.operation_hashes_db.table state.gid (block, ofs) (ops, path) >>= fun () -> + Peer_metadata.incr meta @@ + Received_response Operation_hashes_for_block ; Lwt.return_unit end | Get_operations_for_blocks blocks -> + Peer_metadata.incr meta @@ + Received_request Operations_for_block ; Lwt_list.iter_p (fun (hash, ofs) -> State.read_block global_db.disk hash >>= function @@ -693,7 +736,8 @@ module P2p_reader = struct | Some block -> State.Block.operations block ofs >>= fun (ops, path) -> - ignore @@ + Peer_metadata.update_responses meta + Operations_for_block @@ P2p.try_send global_db.p2p state.conn @@ Operations_for_block (hash, ofs, ops, path) ; Lwt.return_unit) @@ -702,12 +746,14 @@ module P2p_reader = struct | Operations_for_block (block, ofs, ops, path) -> begin match find_pending_operations state block ofs with | None -> - (* TODO some penalty. *) + Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> Raw_operations.Table.notify chain_db.operations_db.table state.gid (block, ofs) (ops, path) >>= fun () -> + Peer_metadata.incr meta @@ + Received_response Operations_for_block ; Lwt.return_unit end @@ -734,6 +780,8 @@ module P2p_reader = struct } in Chain_id.Table.iter (fun chain_id _chain_db -> Lwt.async begin fun () -> + let meta = P2p.get_peer_metadata db.p2p gid in + Peer_metadata.incr meta (Sent_request Branch) ; P2p.send db.p2p conn (Get_current_branch chain_id) end) db.active_chains ; @@ -767,7 +815,8 @@ let raw_try_send p2p peer_id msg = let create disk p2p = let global_request = - { data = () ; + { p2p ; + data = () ; active = active_peer_ids p2p ; send = raw_try_send p2p ; } in @@ -791,7 +840,8 @@ let activate ({ p2p ; active_chains } as global_db) chain_state = | None -> let active_peers = ref P2p_peer.Set.empty in let p2p_request = - { data = () ; + { p2p ; + data = () ; active = (fun () -> !active_peers) ; send = raw_try_send p2p ; } in @@ -997,10 +1047,22 @@ module Request = struct let current_head chain_db ?peer () = let chain_id = State.Chain.id chain_db.chain_state in + begin match peer with + |Some peer -> + let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in + Peer_metadata.incr meta (Sent_request Head) + |None -> () + end ; send chain_db ?peer @@ Get_current_head chain_id let current_branch chain_db ?peer () = let chain_id = State.Chain.id chain_db.chain_state in + begin match peer with + |Some peer -> + let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in + Peer_metadata.incr meta (Sent_request Head) + |None -> () + end ; send chain_db ?peer @@ Get_current_branch chain_id end @@ -1010,6 +1072,12 @@ module Advertise = struct let current_head chain_db ?peer ?(mempool = Mempool.empty) head = let chain_id = State.Chain.id chain_db.chain_state in assert (Chain_id.equal chain_id (State.Block.chain_id head)) ; + begin match peer with + | Some peer -> + let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in + Peer_metadata.incr meta (Sent_advertisement Head) + | None -> () + end ; let msg_mempool = Message.Current_head (chain_id, State.Block.header head, mempool) in if mempool = Mempool.empty then @@ -1035,6 +1103,13 @@ module Advertise = struct let chain_id = State.Chain.id chain_db.chain_state in let chain_state = chain_state chain_db in let sender_id = my_peer_id chain_db in + begin match peer with + | Some peer -> + let meta = P2p.get_peer_metadata chain_db.global_db.p2p peer in + Peer_metadata.incr meta (Sent_advertisement Branch) + | None -> () + end ; + match peer with | Some receiver_id -> let seed = { diff --git a/src/lib_shell/distributed_db.mli b/src/lib_shell/distributed_db.mli index 1a7caa333..cbf410689 100644 --- a/src/lib_shell/distributed_db.mli +++ b/src/lib_shell/distributed_db.mli @@ -79,6 +79,8 @@ val db: chain_db -> db (** Return the peer id of the node *) val my_peer_id: chain_db -> P2p_peer.Id.t +val get_peer_metadata: chain_db -> P2p_peer.Id.t -> Peer_metadata.t + (** {1 Sending messages} *) module Request : sig diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index c2a44930d..065ae2fa0 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -38,8 +38,8 @@ type t = { let peer_metadata_cfg : _ P2p.peer_meta_config = { peer_meta_encoding = Peer_metadata.encoding ; - peer_meta_initial = () ; - score = fun _ -> 0. ; + peer_meta_initial = Peer_metadata.empty ; + score = Peer_metadata.score ; } let connection_metadata_cfg cfg : _ P2p.conn_meta_config = { diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index 974e8d7cb..c608d4fd0 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -165,6 +165,8 @@ let validate_new_head w hash (header : Block_header.t) = Block_hash.pp_short hash P2p_peer.Id.pp_short pv.peer_id ; set_bootstrapped pv ; + let meta = Distributed_db.get_peer_metadata pv.parameters.chain_db pv.peer_id in + Peer_metadata.incr meta Valid_blocks; return_unit let only_if_fitness_increases w distant_header cont = @@ -180,6 +182,8 @@ let only_if_fitness_increases w distant_header cont = Block_hash.pp_short (Block_header.hash distant_header) P2p_peer.Id.pp_short pv.peer_id ; (* Don't download a branch that cannot beat the current head. *) + let meta = Distributed_db.get_peer_metadata pv.parameters.chain_db pv.peer_id in + Peer_metadata.incr meta Old_heads; return_unit end else cont () diff --git a/src/lib_shell_services/p2p_services.ml b/src/lib_shell_services/p2p_services.ml index 7be588eac..9cec46c3c 100644 --- a/src/lib_shell_services/p2p_services.ml +++ b/src/lib_shell_services/p2p_services.ml @@ -235,7 +235,8 @@ module Peers = struct let info = RPC_service.get_service ~query: RPC_query.empty - ~output: (P2p_peer.Info.encoding Connection_metadata.encoding) + ~output: (P2p_peer.Info.encoding Peer_metadata.encoding + Connection_metadata.encoding) ~description:"Details about a given peer." RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg) @@ -260,7 +261,8 @@ module Peers = struct ~output: Data_encoding.(list (tup2 P2p_peer.Id.encoding - (P2p_peer.Info.encoding Connection_metadata.encoding))) + (P2p_peer.Info.encoding Peer_metadata.encoding + Connection_metadata.encoding))) ~description:"List the peers the node ever met." RPC_path.(root / "network" / "peers") diff --git a/src/lib_shell_services/p2p_services.mli b/src/lib_shell_services/p2p_services.mli index 73ee47bbe..38b9d2913 100644 --- a/src/lib_shell_services/p2p_services.mli +++ b/src/lib_shell_services/p2p_services.mli @@ -173,11 +173,11 @@ module Peers : sig val list: ?filter:(P2p_peer.Filter.t list) -> #simple -> - (P2p_peer.Id.t * Connection_metadata.t P2p_peer.Info.t) list tzresult Lwt.t + (P2p_peer.Id.t * (Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t) list tzresult Lwt.t val info: #simple -> P2p_peer.Id.t -> - Connection_metadata.t P2p_peer.Info.t tzresult Lwt.t + (Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t tzresult Lwt.t val events: #streamed -> P2p_peer.Id.t -> @@ -198,12 +198,12 @@ module Peers : sig val list : ([ `GET ], unit, unit, < filters: P2p_peer.Filter.t list >, unit, - (P2p_peer.Id.t * Connection_metadata.t P2p_peer.Info.t) list) RPC_service.t + (P2p_peer.Id.t * (Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t) list) RPC_service.t val info : ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, - Connection_metadata.t P2p_peer.Info.t) RPC_service.t + (Peer_metadata.t, Connection_metadata.t) P2p_peer.Info.t) RPC_service.t val events : ([ `GET ], unit, diff --git a/src/lib_shell_services/peer_metadata.ml b/src/lib_shell_services/peer_metadata.ml index e7fd0df72..2ffdda4c1 100644 --- a/src/lib_shell_services/peer_metadata.ml +++ b/src/lib_shell_services/peer_metadata.ml @@ -23,5 +23,533 @@ (* *) (*****************************************************************************) -type t = unit -let encoding = Data_encoding.empty +type counter = int +let counter = Data_encoding.int31 + +(* Distributed DB peer metadata *) +type messages = + { + mutable branch: counter ; + mutable head: counter ; + mutable block_header: counter ; + mutable operations: counter ; + mutable protocols: counter ; + mutable operation_hashes_for_block: counter ; + mutable operations_for_block: counter ; + mutable other: counter ; + } + +let sent_requests_encoding = + let open Data_encoding in + (conv + (fun { branch ; head ; block_header ; + operations ; protocols ; + operation_hashes_for_block ; + operations_for_block ; + other ; } -> (branch, head, block_header, + operations, protocols, + operation_hashes_for_block, + operations_for_block, + other )) + (fun (branch, head, block_header, + operations, protocols, + operation_hashes_for_block, + operations_for_block, + other) -> { branch ; head ; block_header ; + operations ; protocols ; + operation_hashes_for_block ; + operations_for_block ; + other }) + ) + (obj8 + (req "branch" counter) + (req "head" counter) + (req "block_header" counter) + (req "operations" counter) + (req "protocols" counter) + (req "operation_hashes_for_block" counter) + (req "operations_for_block" counter) + (req "other" counter) + ) + +type requests_kind = + | Branch | Head | Block_header | Operations + | Protocols | Operation_hashes_for_block + | Operations_for_block | Other + + +type requests = { + sent : messages ; + (** p2p sent messages of type requests *) + received : messages ; + (** p2p received messages of type requests *) + failed : messages ; + (** p2p messages of type requests that we failed to send *) + scheduled : messages ; + (** p2p messages ent via request scheduler *) +} + +let requests_encoding = + let open Data_encoding in + (conv + (fun + { sent ; received ; + failed ; scheduled } -> (sent, received, + failed, scheduled)) + (fun (sent, received, + failed, scheduled) -> { sent ; received ; + failed ; scheduled }) + ) + (obj4 + (req "sent" sent_requests_encoding) + (req "received" sent_requests_encoding) + (req "failed" sent_requests_encoding) + (req "scheduled" sent_requests_encoding) + ) + + +(* Prevalidator peer metadata *) +type prevalidator_results = + { cannot_download : counter ; cannot_parse : counter ; + refused_by_prefilter : counter ; + refused_by_postfilter : counter ; + (* prevalidation results *) + applied : counter ; branch_delayed : counter ; + branch_refused : counter ; + refused : counter ; duplicate : counter ; outdated : counter } + +let prevalidator_results_encoding = + let open Data_encoding in + (conv + (fun { cannot_download ; + cannot_parse ; + refused_by_prefilter ; + refused_by_postfilter ; + applied ; branch_delayed; + branch_refused ; + refused ; duplicate ; + outdated } -> (cannot_download, cannot_parse, + refused_by_prefilter, + refused_by_postfilter, + applied, branch_delayed, + branch_refused, + refused, duplicate, outdated)) + (fun (cannot_download, + cannot_parse, + refused_by_prefilter, + refused_by_postfilter, + applied, branch_delayed, + branch_refused, + refused, duplicate, + outdated) -> { cannot_download ; cannot_parse ; + refused_by_prefilter ; + refused_by_postfilter ; + applied ; branch_delayed; + branch_refused ; + refused ; duplicate ; outdated } + + ) + (obj10 + (req "cannot_download" counter) + (req "cannot_parse" counter) + (req "refused_by_prefilter" counter) + (req "refused_by_postfilter" counter) + (req "applied" counter) + (req "branch_delayed" counter) + (req "branch_refused" counter) + (req "refused" counter) + (req "duplicate" counter) + (req "outdated" counter) + ) + ) + + +type resource_kind = + | Block | Operations | Protocol + +type advertisement = Head | Branch + +type metadata = + (* Distributed_db *) + | Received_request of requests_kind + | Sent_request of requests_kind + | Failed_request of requests_kind + | Scheduled_request of requests_kind + | Received_response of requests_kind + | Sent_response of requests_kind + | Unexpected_response + | Unactivated_chain + | Inactive_chain + | Future_block + | Unadvertised of resource_kind + | Sent_advertisement of advertisement + | Received_advertisement of advertisement + | Outdated_response (* TODO : unused *) + (* Peer validator *) + | Valid_blocks | Old_heads + (* Prevalidation *) + | Cannot_download | Cannot_parse + | Refused_by_prefilter + | Refused_by_postfilter + | Applied | Branch_delayed + | Branch_refused + | Refused | Duplicate | Outdated + + + + +type responses = { + mutable sent : messages ; + (** p2p sent messages of type responses *) + mutable failed : messages ; + (** p2p sent messages of type responses *) + mutable received : messages ; + (** p2p received responses *) + mutable unexpected : counter ; + (** p2p received responses that were unexpected *) + mutable outdated : counter ; + (** p2p received responses that are now outdated *) +} + + +let responses_encoding = + let open Data_encoding in + (conv + (fun + { sent ; failed ; received ; + unexpected ; outdated ; } -> (sent, failed, received, + unexpected, outdated)) + (fun + (sent, failed, received, + unexpected, outdated) -> { sent ; failed ; received ; + unexpected ; outdated }) + ) + (obj5 + (req "sent" sent_requests_encoding) + (req "failed" sent_requests_encoding) + (req "received" sent_requests_encoding) + (req "unexpected" counter) + (req "outdated" counter) + ) + + +type unadvertised = { + mutable block : counter ; + (** requests for unadvertised block *) + mutable operations : counter ; + (** requests for unadvertised operations *) + mutable protocol : counter ; + (** requests for unadvertised protocol *) +} + +let unadvertised_encoding = + let open Data_encoding in + (conv + (fun + { block ; operations ; protocol ; } -> (block, operations, protocol)) + (fun + (block, operations, protocol) -> { block ; operations ; protocol ; }) + ) + (obj3 + (req "block" counter) + (req "operations" counter) + (req "protocol" counter) + ) + + +type advertisements_kind = { + mutable head : counter ; + mutable branch : counter ; +} + +let advertisements_kind_encoding = + let open Data_encoding in + (conv + (fun + { head ; branch ; } -> (head, branch)) + (fun + (head, branch) -> { head ; branch ; }) + ) + (obj2 + (req "head" counter) + (req "branch" counter) + ) + +type advertisements = { + mutable sent: advertisements_kind ; + mutable received: advertisements_kind ; +} + + +let advertisements_encoding = + let open Data_encoding in + (conv + (fun + { sent ; received ; } -> (sent, received)) + (fun + (sent, received) -> { sent ; received ; }) + ) + (obj2 + (req "sent" advertisements_kind_encoding) + (req "received" advertisements_kind_encoding) + ) + +type t = { + mutable responses : responses ; + (** responses sent/received *) + mutable requests : requests ; + (** requests sent/received *) + mutable valid_blocks : counter ; + (** new valid blocks advertized by a peer *) + mutable old_heads : counter ; + (** previously validated blocks from a peer *) + mutable prevalidator_results : prevalidator_results ; + (** prevalidator metadata *) + mutable unactivated_chains : counter ; + (** requests from unactivated chains *) + mutable inactive_chains : counter ; + (** advertise inactive chains *) + mutable future_blocks_advertised : counter ; + (** future blocks *) + mutable unadvertised : unadvertised ; + (** requests for unadvertised resources *) + mutable advertisements : advertisements ; + (** advertisements sent *) +} + +let empty () = + let empty_request () = + { branch = 0 ; head = 0 ; block_header = 0 ; + operations = 0 ; protocols = 0 ; + operation_hashes_for_block = 0 ; + operations_for_block = 0 ; + other = 0 ; + } in + { + responses = { sent = empty_request () ; + failed = empty_request () ; + received = empty_request () ; + unexpected = 0 ; + outdated = 0 ; + } ; + requests = + { sent = empty_request () ; + failed = empty_request () ; + scheduled = empty_request () ; + received = empty_request () ; + } ; + valid_blocks = 0 ; + old_heads = 0 ; + prevalidator_results = + { cannot_download = 0 ; cannot_parse = 0 ; + refused_by_prefilter = 0 ; refused_by_postfilter = 0 ; + applied = 0 ; branch_delayed = 0 ; branch_refused = 0 ; + refused = 0 ; duplicate = 0 ; outdated = 0 + } ; + unactivated_chains = 0 ; + inactive_chains = 0 ; + future_blocks_advertised = 0 ; + unadvertised = {block = 0 ; operations = 0 ; protocol = 0 } ; + advertisements = { sent = { head = 0 ; branch = 0 ; } ; + received = { head = 0 ; branch = 0 ; } } + } + + +let encoding = + let open Data_encoding in + (conv + (fun { responses ; requests ; + valid_blocks ; old_heads ; + prevalidator_results ; + unactivated_chains ; + inactive_chains ; + future_blocks_advertised ; + unadvertised ; + advertisements } -> + ((responses, requests, + valid_blocks, old_heads, + prevalidator_results, + unactivated_chains, + inactive_chains, + future_blocks_advertised), + (unadvertised, + advertisements)) + ) + (fun ((responses, requests, + valid_blocks, old_heads, + prevalidator_results, + unactivated_chains, + inactive_chains, + future_blocks_advertised), + (unadvertised, + advertisements)) -> + { responses ; requests ; + valid_blocks ; old_heads ; + prevalidator_results ; + unactivated_chains ; + inactive_chains ; + future_blocks_advertised ; + unadvertised ; + advertisements ; } + ) + ) + (merge_objs + (obj8 + (req "responses" responses_encoding) + (req "requests" requests_encoding) + (req "valid_blocks" counter) + (req "old_heads" counter) + (req "prevalidator_results" prevalidator_results_encoding) + (req "unactivated_chains" counter) + (req "inactive_chains" counter) + (req "future_blocks_advertised" counter) + + ) + (obj2 + (req "unadvertised" unadvertised_encoding) + (req "advertisements" advertisements_encoding) + ) + ) + +let incr_requests (msgs : messages) (req : requests_kind) = + match req with + | Branch -> msgs.branch <- msgs.branch + 1 + | Head -> msgs.head <- msgs.head + 1 + | Block_header -> msgs.block_header <- msgs.block_header + 1 + | Operations -> msgs.operations <- msgs.operations + 1 + | Protocols -> msgs.protocols <- msgs.protocols + 1 + | Operation_hashes_for_block -> + msgs.operation_hashes_for_block <- msgs.operation_hashes_for_block + 1 + | Operations_for_block -> + msgs.operations_for_block <- msgs.operations_for_block + 1 + | Other -> + msgs.other <- msgs.other + 1 + + + +let incr_unadvertised { unadvertised = u ; _ } = function + | Block -> u.block <- u.block + 1 + | Operations -> u.operations <- u.operations + 1 + | Protocol -> u.protocol <- u.protocol + 1 + + +let incr ({responses = rsps ; requests = rqst ; _ } as m) metadata = + match metadata with + (* requests *) + | Received_request req -> + incr_requests rqst.received req + | Sent_request req -> + incr_requests rqst.sent req + | Scheduled_request req -> + incr_requests rqst.scheduled req + | Failed_request req -> + incr_requests rqst.failed req + (* responses *) + | Received_response req -> + incr_requests rsps.received req + | Sent_response req -> + incr_requests rsps.sent req + | Unexpected_response -> + rsps.unexpected <- rsps.unexpected + 1 + | Outdated_response -> + rsps.outdated <- rsps.outdated + 1 + (* Advertisements *) + | Sent_advertisement ad -> + begin match ad with + | Head -> + m.advertisements.sent.head <- m.advertisements.sent.head + 1 + | Branch -> + m.advertisements.sent.branch <- m.advertisements.sent.branch + 1 + end + | Received_advertisement ad -> + begin match ad with + | Head -> + m.advertisements.received.head <- m.advertisements.received.head + 1 + | Branch -> + m.advertisements.received.branch <- m.advertisements.received.branch + 1 + end + (* Unexpected erroneous msg *) + | Unactivated_chain -> + m.unactivated_chains <- m.unactivated_chains + 1 + | Inactive_chain -> + m.inactive_chains <- m.inactive_chains + 1 + | Future_block -> + m.future_blocks_advertised <- m.future_blocks_advertised + 1 + | Unadvertised u -> incr_unadvertised m u + (* Peer validator *) + | Valid_blocks -> + m.valid_blocks <- m.valid_blocks + 1 + | Old_heads -> + m.old_heads <- m.old_heads + 1 + (* prevalidation *) + | Cannot_download -> + m.prevalidator_results <- + { m.prevalidator_results with + cannot_download = m.prevalidator_results.cannot_download + 1 } + | Cannot_parse -> m.prevalidator_results <- + { m.prevalidator_results with + cannot_parse = m.prevalidator_results.cannot_parse + 1 } + | Refused_by_prefilter -> m.prevalidator_results <- + { m.prevalidator_results with + refused_by_prefilter = + m.prevalidator_results.refused_by_prefilter + 1 } + | Refused_by_postfilter -> m.prevalidator_results <- + { m.prevalidator_results with + refused_by_postfilter = + m.prevalidator_results.refused_by_postfilter + 1 } + | Applied -> + m.prevalidator_results <- + { m.prevalidator_results with + applied = m.prevalidator_results.applied + 1 } + | Branch_delayed -> + m.prevalidator_results <- + { m.prevalidator_results with + branch_delayed = m.prevalidator_results.branch_delayed + 1 } + | Branch_refused -> + m.prevalidator_results <- + { m.prevalidator_results with + branch_refused = m.prevalidator_results.branch_refused + 1 } + | Refused -> + m.prevalidator_results <- + { m.prevalidator_results with + refused = m.prevalidator_results.refused + 1 } + | Duplicate -> + m.prevalidator_results <- + { m.prevalidator_results with + duplicate = m.prevalidator_results.duplicate + 1 } + | Outdated -> + m.prevalidator_results <- + { m.prevalidator_results with + outdated = m.prevalidator_results.outdated + 1 } + + +(* shortcuts to update sent/failed requests/responses *) +let update_requests { requests = { sent ; failed ; _ } ; _ } kind = function + | true -> incr_requests sent kind + | false -> incr_requests failed kind + +let update_responses { responses = { sent ; failed ; _ } ; _ } kind = function + | true -> incr_requests sent kind + | false -> incr_requests failed kind + + +(* Scores computation *) +(* TODO: + - scores cannot be kept as integers (use big numbers?) + - they scores should probably be reset frequently (at each block/cycle?) + - we might still need to keep some kind of score history + - store only best/worst/last_value/mean/variance... ? + - do we need to keep "good" scores ? + - maybe "bad" scores are enough to reduce resources + allocated to misbehaving peers *) +let distributed_db_score _ = + (* TODO *) + 1.0 + +let prevalidation_score { prevalidator_results = _ ; _ } = + (* TODO *) + 1.0 + +let score _ = + (* TODO *) + 1.0 diff --git a/src/lib_shell_services/peer_metadata.mli b/src/lib_shell_services/peer_metadata.mli index 0a4970b5f..36bc1f585 100644 --- a/src/lib_shell_services/peer_metadata.mli +++ b/src/lib_shell_services/peer_metadata.mli @@ -25,5 +25,59 @@ (** All the (persistent) metadata associated to a peer. *) -type t = unit (* TODO *) +type t + val encoding: t Data_encoding.t +val empty : unit -> t + + + +(** the aggregate score function computed from + the metadata collected for a peer *) +val distributed_db_score : t -> float +val prevalidation_score : t -> float +val score : t -> float + +type requests_kind = + | Branch | Head | Block_header + | Operations | Protocols + | Operation_hashes_for_block | Operations_for_block + | Other + +type resource_kind = + | Block | Operations | Protocol + +type advertisement = Head | Branch + +type metadata = + (* Distributed_db *) + | Received_request of requests_kind + | Sent_request of requests_kind + | Failed_request of requests_kind + | Scheduled_request of requests_kind + | Received_response of requests_kind + | Sent_response of requests_kind + | Unexpected_response + | Unactivated_chain + | Inactive_chain + | Future_block + | Unadvertised of resource_kind + | Sent_advertisement of advertisement + | Received_advertisement of advertisement + | Outdated_response (* TODO : unused *) + (* Peer validator *) + | Valid_blocks | Old_heads + (* Prevalidation *) + | Cannot_download | Cannot_parse + | Refused_by_prefilter + | Refused_by_postfilter + | Applied | Branch_delayed + | Branch_refused + | Refused | Duplicate | Outdated + +(** incr score counters . Used to compute the final score for a peer *) +val incr : t -> metadata -> unit +val update_requests : t -> requests_kind -> bool -> unit +val update_responses : t -> requests_kind -> bool -> unit + +