From b0ed3cefac25ea5fd4c4272432145a8ce45172ce Mon Sep 17 00:00:00 2001 From: Vincent Bernardoff Date: Fri, 17 Feb 2017 19:12:06 +0100 Subject: [PATCH] Shell: Add RPCs for introspecting the state of the P2P layer --- src/node/net/p2p.ml | 332 ++++++++++++++++++++- src/node/net/p2p.mli | 88 ++++++ src/node/net/p2p_connection_pool.ml | 200 +++++++++++-- src/node/net/p2p_connection_pool.mli | 76 ++++- src/node/net/p2p_connection_pool_types.ml | 75 ++++- src/node/net/p2p_connection_pool_types.mli | 10 + src/node/net/p2p_maintenance.ml | 6 +- src/node/shell/node.ml | 57 +++- src/node/shell/node.mli | 30 ++ src/node/shell/node_rpc.ml | 89 ++++++ src/node/shell/node_rpc_services.ml | 108 +++++++ src/node/shell/node_rpc_services.mli | 40 +++ src/node/shell/tezos_p2p.ml | 40 ++- src/node/shell/tezos_p2p.mli | 35 +++ 14 files changed, 1140 insertions(+), 46 deletions(-) diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index a41a5e43d..20de33019 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -222,7 +222,7 @@ module Real = struct | Error _ -> Lwt_utils.never_ending) :: acc end in Lwt.pick ( - ( P2p_connection_pool.Events.new_connection net.pool >>= fun () -> + ( P2p_connection_pool.PoolEvent.wait_new_connection net.pool >>= fun () -> Lwt.return_none ):: pipes) >>= function | None -> recv_any net () @@ -311,6 +311,7 @@ type ('msg, 'meta) t = { send : ('msg, 'meta) connection -> 'msg -> unit Lwt.t ; try_send : ('msg, 'meta) connection -> 'msg -> bool ; broadcast : 'msg -> unit ; + pool : ('msg, 'meta) P2p_connection_pool.t option ; } type ('msg, 'meta) net = ('msg, 'meta) t @@ -333,6 +334,7 @@ let create ~config ~limits meta_cfg msg_cfg = send = Real.send net ; try_send = Real.try_send net ; broadcast = Real.broadcast net ; + pool = Some net.pool ; } let faked_network = { @@ -352,6 +354,7 @@ let faked_network = { send = (fun _ _ -> Lwt_utils.never_ending) ; try_send = (fun _ _ -> false) ; broadcast = ignore ; + pool = None } let gid net = net.gid @@ -379,3 +382,330 @@ module Raw = struct | Disconnect let encoding = P2p_connection_pool.Message.encoding end + +module RPC = struct + + let stat net = + match net.pool with + | None -> Stat.empty + | Some pool -> P2p_connection_pool.pool_stat pool + + module Event = P2p_connection_pool.LogEvent + + let watch net = + match net.pool with + | None -> Watcher.create_fake_stream () + | Some pool -> P2p_connection_pool.watch pool + + let connect net point timeout = + match net.pool with + | None -> fail (Unclassified "fake net") + | Some pool -> + P2p_connection_pool.connect ~timeout pool point >>|? ignore + + module Connection = struct + let info net gid = + match net.pool with + | None -> None + | Some pool -> + map_option + (P2p_connection_pool.Gids.find_connection pool gid) + ~f:P2p_connection_pool.connection_info + + let kick net gid wait = + match net.pool with + | None -> Lwt.return_unit + | Some pool -> + match P2p_connection_pool.Gids.find_connection pool gid with + | None -> Lwt.return_unit + | Some conn -> P2p_connection_pool.disconnect ~wait conn + + let list net = + match net.pool with + | None -> [] + | Some pool -> + P2p_connection_pool.fold_connections + pool ~init:[] + ~f:begin fun _gid c acc -> + P2p_connection_pool.connection_info c :: acc + end + + let count net = + match net.pool with + | None -> 0 + | Some pool -> P2p_connection_pool.active_connections pool + end + + module Point = struct + type state = + | Requested + | Accepted + | Running + | Disconnected + + let state_encoding = + let open Data_encoding in + string_enum [ + "requested", Requested ; + "accepted", Accepted ; + "running", Running ; + "disconnected", Disconnected ; + ] + + type info = { + trusted : bool ; + greylisted_end : Time.t ; + state : state ; + gid : Gid.t option ; + last_failed_connection : Time.t option ; + last_rejected_connection : (Gid.t * Time.t) option ; + last_established_connection : (Gid.t * Time.t) option ; + last_disconnection : (Gid.t * Time.t) option ; + last_seen : (Gid.t * Time.t) option ; + last_miss : Time.t option ; + } + + let info_encoding = + let open Data_encoding in + conv + (fun { trusted ; greylisted_end ; state ; gid ; + last_failed_connection ; last_rejected_connection ; + last_established_connection ; last_disconnection ; + last_seen ; last_miss ; + } -> + (trusted, greylisted_end, state, gid, + last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection, + last_seen, last_miss) + ) + (fun (trusted, greylisted_end, state, gid, + last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection, + last_seen, last_miss) -> + { trusted ; greylisted_end ; state ; gid ; + last_failed_connection ; last_rejected_connection ; + last_established_connection ; last_disconnection ; + last_seen ; last_miss ; + } + ) + (obj10 + (req "trusted" bool) + (dft "greylisted_end" Time.encoding Time.epoch) + (req "state" state_encoding) + (opt "gid" Gid.encoding) + (opt "last_failed_connection" Time.encoding) + (opt "last_rejected_connection" (tup2 Gid.encoding Time.encoding)) + (opt "last_established_connection" (tup2 Gid.encoding Time.encoding)) + (opt "last_disconnection" (tup2 Gid.encoding Time.encoding)) + (opt "last_seen" (tup2 Gid.encoding Time.encoding)) + (opt "last_miss" Time.encoding)) + + let info_of_point_info i = + let open P2p_connection_pool in + let open P2p_connection_pool_types in + let state, gid = match Point_info.State.get i with + | Requested _ -> Requested, None + | Accepted { current_gid } -> Accepted, Some current_gid + | Running { current_gid } -> Running, Some current_gid + | Disconnected -> Disconnected, None in + Point_info.{ + trusted = trusted i ; + state ; gid ; + greylisted_end = greylisted_end i ; + last_failed_connection = last_failed_connection i ; + last_rejected_connection = last_rejected_connection i ; + last_established_connection = last_established_connection i ; + last_disconnection = last_disconnection i ; + last_seen = last_seen i ; + last_miss = last_miss i ; + } + + let info net point = + match net.pool with + | None -> None + | Some pool -> + map_option + (P2p_connection_pool.Points.info pool point) + ~f:info_of_point_info + + module Event = P2p_connection_pool_types.Point_info.Event + + let events ?(max=max_int) ?(rev=false) net point = + match net.pool with + | None -> [] + | Some pool -> + unopt_map + (P2p_connection_pool.Points.info pool point) + ~default:[] + ~f:begin fun pi -> + let evts = + P2p_connection_pool_types.Point_info.fold_events + pi ~init:[] ~f:(fun a e -> e :: a) in + (if rev then list_rev_sub else list_sub) evts max + end + + let watch net point = + match net.pool with + | None -> raise Not_found + | Some pool -> + match P2p_connection_pool.Points.info pool point with + | None -> raise Not_found + | Some pi -> P2p_connection_pool_types.Point_info.watch pi + + let infos ?(restrict=[]) net = + match net.pool with + | None -> [] + | Some pool -> + P2p_connection_pool.Points.fold_known + pool ~init:[] + ~f:begin fun point i a -> + let info = info_of_point_info i in + match restrict with + | [] -> (point, info) :: a + | _ when List.mem info.state restrict -> (point, info) :: a + | _ -> a + end + + end + + module Gid = struct + type state = + | Accepted + | Running + | Disconnected + + let state_encoding = + let open Data_encoding in + string_enum [ + "accepted", Accepted ; + "running", Running ; + "disconnected", Disconnected ; + ] + + type info = { + score : float ; + trusted : bool ; + state : state ; + id_point : Id_point.t option ; + stat : Stat.t ; + last_failed_connection : (Id_point.t * Time.t) option ; + last_rejected_connection : (Id_point.t * Time.t) option ; + last_established_connection : (Id_point.t * Time.t) option ; + last_disconnection : (Id_point.t * Time.t) option ; + last_seen : (Id_point.t * Time.t) option ; + last_miss : (Id_point.t * Time.t) option ; + } + + let info_encoding = + let open Data_encoding in + conv + (fun ( + { score ; trusted ; state ; id_point ; stat ; + last_failed_connection ; last_rejected_connection ; + last_established_connection ; last_disconnection ; + last_seen ; last_miss }) -> + ((score, trusted, state, id_point, stat), + (last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection, + last_seen, last_miss))) + (fun ((score, trusted, state, id_point, stat), + (last_failed_connection, last_rejected_connection, + last_established_connection, last_disconnection, + last_seen, last_miss)) -> + { score ; trusted ; state ; id_point ; stat ; + last_failed_connection ; last_rejected_connection ; + last_established_connection ; last_disconnection ; + last_seen ; last_miss }) + (merge_objs + (obj5 + (req "score" float) + (req "trusted" bool) + (req "state" state_encoding) + (opt "id_point" Id_point.encoding) + (req "stat" Stat.encoding)) + (obj6 + (opt "last_failed_connection" (tup2 Id_point.encoding Time.encoding)) + (opt "last_rejected_connection" (tup2 Id_point.encoding Time.encoding)) + (opt "last_established_connection" (tup2 Id_point.encoding Time.encoding)) + (opt "last_disconnection" (tup2 Id_point.encoding Time.encoding)) + (opt "last_seen" (tup2 Id_point.encoding Time.encoding)) + (opt "last_miss" (tup2 Id_point.encoding Time.encoding)))) + + let info_of_gid_info pool i = + let open P2p_connection_pool in + let open P2p_connection_pool_types in + let state, id_point = match Gid_info.State.get i with + | Accepted { current_point } -> Accepted, Some current_point + | Running { current_point } -> Running, Some current_point + | Disconnected -> Disconnected, None + in + let gid = Gid_info.gid i in + let meta = Gid_info.metadata i in + let score = P2p_connection_pool.score pool meta in + let stat = + match P2p_connection_pool.Gids.find_connection pool gid with + | None -> Stat.empty + | Some conn -> P2p_connection_pool.connection_stat conn + in Gid_info.{ + score ; + trusted = trusted i ; + state ; + id_point ; + stat ; + last_failed_connection = last_failed_connection i ; + last_rejected_connection = last_rejected_connection i ; + last_established_connection = last_established_connection i ; + last_disconnection = last_disconnection i ; + last_seen = last_seen i ; + last_miss = last_miss i ; + } + + let info net gid = + match net.pool with + | None -> None + | Some pool -> begin + match P2p_connection_pool.Gids.info pool gid with + | Some info -> Some (info_of_gid_info pool info) + | None -> None + end + + module Event = P2p_connection_pool_types.Gid_info.Event + + let events ?(max=max_int) ?(rev=false) net gid = + match net.pool with + | None -> [] + | Some pool -> + unopt_map + (P2p_connection_pool.Gids.info pool gid) + ~default:[] + ~f:begin fun gi -> + let evts = P2p_connection_pool_types.Gid_info.fold_events gi + ~init:[] ~f:(fun a e -> e :: a) in + (if rev then list_rev_sub else list_sub) evts max + end + + let watch net gid = + match net.pool with + | None -> raise Not_found + | Some pool -> + match P2p_connection_pool.Gids.info pool gid with + | None -> raise Not_found + | Some gi -> P2p_connection_pool_types.Gid_info.watch gi + + let infos ?(restrict=[]) net = + match net.pool with + | None -> [] + | Some pool -> + P2p_connection_pool.Gids.fold_known pool + ~init:[] + ~f:begin fun gid i a -> + let info = info_of_gid_info pool i in + match restrict with + | [] -> (gid, info) :: a + | _ when List.mem info.state restrict -> (gid, info) :: a + | _ -> a + end + + end + +end diff --git a/src/node/net/p2p.mli b/src/node/net/p2p.mli index 24967335d..f81469cd5 100644 --- a/src/node/net/p2p.mli +++ b/src/node/net/p2p.mli @@ -189,6 +189,94 @@ val try_send : (** Send a message to all peers *) val broadcast : ('msg, 'meta) net -> 'msg -> unit +module RPC : sig + + val stat : ('msg, 'meta) net -> Stat.t + + module Event = P2p_connection_pool.LogEvent + + val watch : ('msg, 'meta) net -> Event.t Lwt_stream.t * Watcher.stopper + val connect : ('msg, 'meta) net -> Point.t -> float -> unit tzresult Lwt.t + + module Connection : sig + val info : ('msg, 'meta) net -> Gid.t -> Connection_info.t option + val kick : ('msg, 'meta) net -> Gid.t -> bool -> unit Lwt.t + val list : ('msg, 'meta) net -> Connection_info.t list + val count : ('msg, 'meta) net -> int + end + + module Point : sig + + type state = + | Requested + | Accepted + | Running + | Disconnected + + val state_encoding : state Data_encoding.t + + type info = { + trusted : bool ; + greylisted_end : Time.t ; + state : state ; + gid : Gid.t option ; + last_failed_connection : Time.t option ; + last_rejected_connection : (Gid.t * Time.t) option ; + last_established_connection : (Gid.t * Time.t) option ; + last_disconnection : (Gid.t * Time.t) option ; + last_seen : (Gid.t * Time.t) option ; + last_miss : Time.t option ; + } + + val info_encoding : info Data_encoding.t + + module Event = P2p_connection_pool_types.Point_info.Event + + val info : + ('msg, 'meta) net -> Point.t -> info option + val infos : + ?restrict:state list -> ('msg, 'meta) net -> (Point.t * info) list + val events : + ?max:int -> ?rev:bool -> ('msg, 'meta) net -> Point.t -> Event.t list + val watch : + ('msg, 'meta) net -> Point.t -> Event.t Lwt_stream.t * Watcher.stopper + end + + module Gid : sig + + type state = + | Accepted + | Running + | Disconnected + + val state_encoding : state Data_encoding.t + + type info = { + score : float ; + trusted : bool ; + state : state ; + id_point : Id_point.t option ; + stat : Stat.t ; + last_failed_connection : (Id_point.t * Time.t) option ; + last_rejected_connection : (Id_point.t * Time.t) option ; + last_established_connection : (Id_point.t * Time.t) option ; + last_disconnection : (Id_point.t * Time.t) option ; + last_seen : (Id_point.t * Time.t) option ; + last_miss : (Id_point.t * Time.t) option ; + } + val info_encoding : info Data_encoding.t + + module Event = P2p_connection_pool_types.Gid_info.Event + + val info : ('msg, 'meta) net -> Gid.t -> info option + val infos : ?restrict:state list -> ('msg, 'meta) net -> (Gid.t * info) list + val events : ?max:int -> ?rev:bool -> ('msg, 'meta) net -> Gid.t -> Event.t list + val watch : ('msg, 'meta) net -> Gid.t -> Event.t Lwt_stream.t * Watcher.stopper + + end + +end + (**/**) module Raw : sig type 'a t = diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 408051a9d..a47a1538f 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -125,6 +125,124 @@ module Answerer = struct end +module LogEvent = struct + type t = + | Too_few_connections + | Too_many_connections + | New_point of Point.t + | New_peer of Gid.t + | Incoming_connection of Point.t + | Outgoing_connection of Point.t + | Authentication_failed of Point.t + | Accepting_request of Point.t * Id_point.t * Gid.t + | Rejecting_request of Point.t * Id_point.t * Gid.t + | Request_rejected of Point.t * (Id_point.t * Gid.t) option + | Connection_established of Id_point.t * Gid.t + | Disconnection of Gid.t + | External_disconnection of Gid.t + + | Gc_points + | Gc_gids + + let encoding = + let open Data_encoding in + let branch_encoding name obj = + conv (fun x -> (), x) (fun ((), x) -> x) + (merge_objs + (obj1 (req "event" (constant name))) obj) in + union ~tag_size:`Uint8 [ + case ~tag:0 (branch_encoding "too_few_connections" empty) + (function Too_few_connections -> Some () | _ -> None) + (fun () -> Too_few_connections) ; + case ~tag:1 (branch_encoding "too_many_connections" empty) + (function Too_many_connections -> Some () | _ -> None) + (fun () -> Too_many_connections) ; + case ~tag:2 (branch_encoding "new_point" + (obj1 (req "point" Point.encoding))) + (function New_point p -> Some p | _ -> None) + (fun p -> New_point p) ; + case ~tag:3 (branch_encoding "new_peer" + (obj1 (req "gid" Gid.encoding))) + (function New_peer p -> Some p | _ -> None) + (fun p -> New_peer p) ; + case ~tag:4 (branch_encoding "incoming_connection" + (obj1 (req "point" Point.encoding))) + (function Incoming_connection p -> Some p | _ -> None) + (fun p -> Incoming_connection p) ; + case ~tag:5 (branch_encoding "outgoing_connection" + (obj1 (req "point" Point.encoding))) + (function Outgoing_connection p -> Some p | _ -> None) + (fun p -> Outgoing_connection p) ; + case ~tag:6 (branch_encoding "authentication_failed" + (obj1 (req "point" Point.encoding))) + (function Authentication_failed p -> Some p | _ -> None) + (fun p -> Authentication_failed p) ; + case ~tag:7 (branch_encoding "accepting_request" + (obj3 + (req "point" Point.encoding) + (req "id_point" Id_point.encoding) + (req "gid" Gid.encoding))) + (function Accepting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None) + (fun (p, id_p, g) -> Accepting_request (p, id_p, g)) ; + case ~tag:8 (branch_encoding "rejecting_request" + (obj3 + (req "point" Point.encoding) + (req "id_point" Id_point.encoding) + (req "gid" Gid.encoding))) + (function Rejecting_request (p, id_p, g) -> Some (p, id_p, g) | _ -> None) + (fun (p, id_p, g) -> Rejecting_request (p, id_p, g)) ; + case ~tag:9 (branch_encoding "request_rejected" + (obj2 + (req "point" Point.encoding) + (opt "identity" (tup2 Id_point.encoding Gid.encoding)))) + (function Request_rejected (p, id) -> Some (p, id) | _ -> None) + (fun (p, id) -> Request_rejected (p, id)) ; + case ~tag:10 (branch_encoding "connection_established" + (obj2 + (req "id_point" Id_point.encoding) + (req "gid" Gid.encoding))) + (function Connection_established (id_p, g) -> Some (id_p, g) | _ -> None) + (fun (id_p, g) -> Connection_established (id_p, g)) ; + case ~tag:11 (branch_encoding "disconnection" + (obj1 (req "gid" Gid.encoding))) + (function Disconnection g -> Some g | _ -> None) + (fun g -> Disconnection g) ; + case ~tag:12 (branch_encoding "external_disconnection" + (obj1 (req "gid" Gid.encoding))) + (function External_disconnection g -> Some g | _ -> None) + (fun g -> External_disconnection g) ; + case ~tag:13 (branch_encoding "gc_points" empty) + (function Gc_points -> Some () | _ -> None) + (fun () -> Gc_points) ; + case ~tag:14 (branch_encoding "gc_gids" empty) + (function Gc_gids -> Some () | _ -> None) + (fun () -> Gc_gids) ; + ] + + let log watcher event = Watcher.notify watcher event + + let too_few_connections watcher = log watcher Too_few_connections + let too_many_connections watcher = log watcher Too_many_connections + let new_point watcher ~point = log watcher (New_point point) + let new_peer watcher ~gid = log watcher (New_peer gid) + let incoming_connection watcher ~point = log watcher (Incoming_connection point) + let outgoing_connection watcher ~point = log watcher (Outgoing_connection point) + let authentication_failed watcher ~point = log watcher (Authentication_failed point) + let accepting_request watcher ~id_point ~point ~gid = + log watcher (Accepting_request (point, id_point, gid)) + let rejecting_request watcher ~id_point ~point ~gid = + log watcher (Rejecting_request (point, id_point, gid)) + let request_rejected watcher ?credentials ~point = + log watcher (Request_rejected (point, credentials)) + let connection_established watcher ~id_point ~gid = + log watcher (Connection_established (id_point, gid)) + let disconnection watcher ~is_external ~gid = + log watcher (if is_external then External_disconnection gid + else Disconnection gid) + let gc_points watcher = log watcher Gc_points + let gc_gids watcher = log watcher Gc_gids +end + type config = { identity : Identity.t ; @@ -174,12 +292,14 @@ type ('msg, 'meta) t = { io_sched : P2p_io_scheduler.t ; encoding : 'msg Message.t Data_encoding.t ; events : events ; + watcher : LogEvent.t Watcher.input ; } + and events = { too_few_connections : unit Lwt_condition.t ; too_many_connections : unit Lwt_condition.t ; - new_point : unit Lwt_condition.t ; + new_peer : unit Lwt_condition.t ; new_connection : unit Lwt_condition.t ; } @@ -195,12 +315,25 @@ and ('msg, 'meta) connection = { type ('msg, 'meta) pool = ('msg, 'meta) t +module PoolEvent = struct + let wait_too_few_connections pool = + Lwt_condition.wait pool.events.too_few_connections + let wait_too_many_connections pool = + Lwt_condition.wait pool.events.too_many_connections + let wait_new_peer pool = + Lwt_condition.wait pool.events.new_peer + let wait_new_connection pool = + Lwt_condition.wait pool.events.new_connection +end + +let watch { watcher } = Watcher.create_stream watcher + module GcPointSet = Utils.Bounded(struct type t = Time.t * Point.t let compare (x, _) (y, _) = - (Time.compare x y) end) -let gc_points { config = { max_known_points } ; known_points } = +let gc_points ({ config = { max_known_points } ; known_points } as pool) = match max_known_points with | None -> () | Some (_, target) -> @@ -217,7 +350,8 @@ let gc_points { config = { max_known_points } ; known_points } = let to_remove = GcPointSet.get table in ListLabels.iter to_remove ~f:begin fun (_, p) -> Point.Table.remove known_points p - end + end ; + LogEvent.gc_points pool.watcher let register_point pool ?trusted (addr, port as point) = match Point.Table.find pool.known_points point with @@ -227,6 +361,7 @@ let register_point pool ?trusted (addr, port as point) = if Point.Table.length pool.known_points >= max then gc_points pool end ; Point.Table.add pool.known_points point pi ; + LogEvent.new_point pool.watcher point ; pi | pi -> pi @@ -245,9 +380,9 @@ module GcGidSet = Utils.Bounded(struct if score_cmp = 0 then Time.compare t t' else - score_cmp end) -let gc_gids { meta_config = { score } ; +let gc_gids ({ meta_config = { score } ; config = { max_known_gids } ; - known_gids ; } = + known_gids ; } as pool) = match max_known_gids with | None -> () | Some (_, target) -> @@ -260,17 +395,19 @@ let gc_gids { meta_config = { score } ; let to_remove = GcGidSet.get table in ListLabels.iter to_remove ~f:begin fun (_, _, gid) -> Gid.Table.remove known_gids gid - end + end ; + LogEvent.gc_gids pool.watcher let register_peer pool gid = match Gid.Table.find pool.known_gids gid with | exception Not_found -> - Lwt_condition.broadcast pool.events.new_point () ; + Lwt_condition.broadcast pool.events.new_peer () ; let peer = Gid_info.create gid ~metadata:pool.meta_config.initial in iter_option pool.config.max_known_gids ~f:begin fun (max, _) -> if Gid.Table.length pool.known_gids >= max then gc_gids pool end ; Gid.Table.add pool.known_gids gid peer ; + LogEvent.new_peer pool.watcher gid ; peer | peer -> peer @@ -329,9 +466,11 @@ let create_connection pool conn id_point pi gi _version = { conn ; point_info = pi ; gid_info = gi ; messages ; canceler ; answerer ; wait_close = false } in iter_option pi ~f:begin fun pi -> + let point = Point_info.point pi in Point_info.State.set_running pi gid conn ; - Point.Table.add pool.connected_points (Point_info.point pi) pi ; + Point.Table.add pool.connected_points point pi ; end ; + LogEvent.connection_established pool.watcher ~id_point ~gid ; Gid_info.State.set_running gi id_point conn ; Gid.Table.add pool.connected_gids gid gi ; Lwt_condition.broadcast pool.events.new_connection () ; @@ -339,17 +478,22 @@ let create_connection pool conn id_point pi gi _version = lwt_debug "Disconnect: %a (%a)" Gid.pp gid Id_point.pp id_point >>= fun () -> iter_option ~f:Point_info.State.set_disconnected pi; + LogEvent.disconnection pool.watcher ~is_external:false ~gid ; Gid_info.State.set_disconnected gi ; iter_option pi ~f:begin fun pi -> Point.Table.remove pool.connected_points (Point_info.point pi) ; end ; Gid.Table.remove pool.connected_gids gid ; - if pool.config.max_connections <= active_connections pool then + if pool.config.max_connections <= active_connections pool then begin Lwt_condition.broadcast pool.events.too_many_connections () ; + LogEvent.too_many_connections pool.watcher ; + end ; P2p_connection.close ~wait:conn.wait_close conn.conn end ; - if active_connections pool < pool.config.min_connections then + if active_connections pool < pool.config.min_connections then begin Lwt_condition.broadcast pool.events.too_few_connections () ; + LogEvent.too_few_connections pool.watcher ; + end ; conn let disconnect ?(wait = false) conn = @@ -379,18 +523,21 @@ let authenticate pool ?pi canceler fd point = ?listening_port:pool.config.listening_port pool.config.identity pool.message_config.versions end ~on_error: begin fun err -> + (* Authentication incorrect! *) (* TODO do something when the error is Not_enough_proof_of_work ?? *) lwt_debug "authenticate: %a%s -> failed %a" Point.pp point (if incoming then " incoming" else "") pp_print_error err >>= fun () -> may_register_my_id_point pool err ; + LogEvent.authentication_failed pool.watcher ~point ; if incoming then Point.Table.remove pool.incoming point else iter_option Point_info.State.set_disconnected pi ; Lwt.return (Error err) end >>=? fun (info, auth_fd) -> + (* Authentication correct! *) lwt_debug "authenticate: %a -> auth %a" Point.pp point Connection_info.pp info >>= fun () -> @@ -428,9 +575,12 @@ let authenticate pool ?pi canceler fd point = | Running _ -> false | Disconnected -> true in - if incoming then Point.Table.remove pool.incoming point ; + if incoming then + Point.Table.remove pool.incoming point ; match acceptable_versions with | Some version when acceptable_gid && acceptable_point -> begin + LogEvent.accepting_request pool.watcher + ~id_point:info.id_point ~point ~gid:info.gid ; iter_option connection_pi ~f:(fun pi -> Point_info.State.set_accepted pi info.gid canceler) ; Gid_info.State.set_accepted gi info.id_point canceler ; @@ -447,6 +597,9 @@ let authenticate pool ?pi canceler fd point = Connection_info.pp info >>= fun () -> Lwt.return conn end ~on_error: begin fun err -> + if incoming then + LogEvent.request_rejected pool.watcher + ~credentials:(info.id_point, info.gid) ~point ; lwt_debug "authenticate: %a -> rejected %a" Point.pp point Connection_info.pp info >>= fun () -> @@ -461,6 +614,8 @@ let authenticate pool ?pi canceler fd point = return (create_connection pool conn id_point connection_pi gi version) end | _ -> begin + LogEvent.rejecting_request pool.watcher + ~id_point:info.id_point ~point ~gid:info.gid ; lwt_debug "authenticate: %a -> kick %a point: %B gid: %B" Point.pp point Connection_info.pp info @@ -504,6 +659,7 @@ let raw_connect canceler pool point = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in lwt_debug "connect: %a" Point.pp point >>= fun () -> Lwt_utils.protect ~canceler begin fun () -> + LogEvent.outgoing_connection pool.watcher ~point ; Lwt_unix.connect fd uaddr >>= fun () -> return () end ~on_error: begin fun err -> @@ -530,6 +686,7 @@ let connect ~timeout pool point = end let accept pool fd point = + LogEvent.incoming_connection pool.watcher ~point ; if pool.config.max_incoming_connections <= Point.Table.length pool.incoming || pool.config.max_connections <= active_connections pool then Lwt.async (fun () -> Lwt_utils.safe_close fd) @@ -600,6 +757,10 @@ module Gids = struct try Some (Gid_info.metadata (Gid.Table.find pool.known_gids gid)) with Not_found -> None + let get_score pool gid = + try Some (pool.meta_config.score @@ Gid_info.metadata (Gid.Table.find pool.known_gids gid)) + with Not_found -> None + let set_metadata pool gid data = Gid_info.set_metadata (register_peer pool gid) data @@ -673,24 +834,14 @@ module Points = struct end -module Events = struct - let too_few_connections pool = - Lwt_condition.wait pool.events.too_few_connections - let too_many_connections pool = - Lwt_condition.wait pool.events.too_many_connections - let new_point pool = - Lwt_condition.wait pool.events.new_point - let new_connection pool = - Lwt_condition.wait pool.events.new_connection -end - - let connection_stat { conn } = P2p_connection.stat conn let pool_stat { io_sched } = P2p_io_scheduler.global_stat io_sched +let score { meta_config = { score }} meta = score meta + let connection_info { conn } = P2p_connection.info conn @@ -700,7 +851,7 @@ let create config meta_config message_config io_sched = let events = { too_few_connections = Lwt_condition.create () ; too_many_connections = Lwt_condition.create () ; - new_point = Lwt_condition.create () ; + new_peer = Lwt_condition.create () ; new_connection = Lwt_condition.create () ; } in let pool = { @@ -714,6 +865,7 @@ let create config meta_config message_config io_sched = io_sched ; encoding = Message.encoding message_config.encoding ; events ; + watcher = Watcher.create_input () ; } in List.iter (Points.set_trusted pool) config.trusted_points ; Gid_info.File.load config.peers_file meta_config.encoding >>= function diff --git a/src/node/net/p2p_connection_pool.mli b/src/node/net/p2p_connection_pool.mli index 62f4e67a2..5ef940626 100644 --- a/src/node/net/p2p_connection_pool.mli +++ b/src/node/net/p2p_connection_pool.mli @@ -66,12 +66,12 @@ type config = { min_connections : int ; (** Strict minimum number of connections - (triggers [Event.too_few_connections]). *) + (triggers [LogEvent.too_few_connections]). *) max_connections : int ; (** Max number of connections. If it's reached, [connect] and [accept] will fail, i.e. not add more connections - (also triggers [Event.too_many_connections]). *) + (also triggers [LogEvent.too_many_connections]). *) max_incoming_connections : int ; (** Max not-yet-authentified incoming connections. @@ -142,15 +142,76 @@ val pool_stat: ('msg, 'meta) pool -> Stat.t (** [pool_stat pool] is a snapshot of current bandwidth usage for the entire [pool]. *) +val score: ('msg, 'meta) pool -> 'meta -> float +(** [score pool meta] is the floating-point score of [meta] using + [pool]'s metrics. *) + (** {2 Pool events} *) -module Events : sig - val too_few_connections: ('msg, 'meta) pool -> unit Lwt.t - val too_many_connections: ('msg, 'meta) pool -> unit Lwt.t - val new_point: ('msg, 'meta) pool -> unit Lwt.t - val new_connection: ('msg, 'meta) pool -> unit Lwt.t +module PoolEvent : sig + val wait_too_few_connections: ('msg, 'meta) pool -> unit Lwt.t + (** [wait_too_few_connections pool] is determined when the number of + connections drops below the desired level. *) + + val wait_too_many_connections: ('msg, 'meta) pool -> unit Lwt.t + (** [wait_too_many_connections pool] is determined when the number of + connections exceeds the desired level. *) + + val wait_new_peer: ('msg, 'meta) pool -> unit Lwt.t + (** [wait_new_peer pool] is determined when a new peer + (i.e. authentication successful) gets added to the pool. *) + + val wait_new_connection: ('msg, 'meta) pool -> unit Lwt.t + (** [wait_new_connection pool] is determined when a new connection is + succesfully established in the pool. *) end +module LogEvent : sig + type t = + (** Pool-level events *) + + | Too_few_connections + | Too_many_connections + + | New_point of Point.t + | New_peer of Gid.t + + (** Connection-level events *) + + | Incoming_connection of Point.t + (** We accept(2)-ed an incoming connection *) + | Outgoing_connection of Point.t + (** We connect(2)-ed to a remote endpoint *) + | Authentication_failed of Point.t + (** Remote point failed authentication *) + + | Accepting_request of Point.t * Id_point.t * Gid.t + (** We accepted a connection after authentifying the remote peer. *) + | Rejecting_request of Point.t * Id_point.t * Gid.t + (** We rejected a connection after authentifying the remote peer. *) + | Request_rejected of Point.t * (Id_point.t * Gid.t) option + (** The remote peer rejected our connection. *) + + | Connection_established of Id_point.t * Gid.t + (** We succesfully established a authentified connection. *) + + | Disconnection of Gid.t + (** We decided to close the connection. *) + | External_disconnection of Gid.t + (** The connection was closed for external reason. *) + + | Gc_points + (** Garbage correction of known point table has been triggered. *) + | Gc_gids + (** Garbage correction of known gids table has been triggered. *) + + val encoding : t Data_encoding.t +end + +val watch: ('msg, 'meta) pool -> LogEvent.t Lwt_stream.t * Watcher.stopper +(** [watch pool] is a [stream, close] a [stream] of events and a + [close] function for this stream. *) + (** {1 Connections management} *) type ('msg, 'meta) connection @@ -241,6 +302,7 @@ module Gids : sig val get_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta option val set_metadata: ('msg, 'meta) pool -> Gid.t -> 'meta -> unit + val get_score: ('msg, 'meta) pool -> Gid.t -> float option val get_trusted: ('msg, 'meta) pool -> Gid.t -> bool val set_trusted: ('msg, 'meta) pool -> Gid.t -> unit diff --git a/src/node/net/p2p_connection_pool_types.ml b/src/node/net/p2p_connection_pool_types.ml index b550d4f87..8191dccee 100644 --- a/src/node/net/p2p_connection_pool_types.ml +++ b/src/node/net/p2p_connection_pool_types.ml @@ -30,11 +30,55 @@ module Point_info = struct | Disconnection of Gid.t | External_disconnection of Gid.t + let kind_encoding = + let open Data_encoding in + let branch_encoding name obj = + conv (fun x -> (), x) (fun ((), x) -> x) + (merge_objs + (obj1 (req "event" (constant name))) obj) in + union ~tag_size:`Uint8 [ + case ~tag:0 (branch_encoding "outgoing_request" empty) + (function Outgoing_request -> Some () | _ -> None) + (fun () -> Outgoing_request) ; + case ~tag:1 (branch_encoding "accepting_request" + (obj1 (req "gid" Gid.encoding))) + (function Accepting_request gid -> Some gid | _ -> None) + (fun gid -> Accepting_request gid) ; + case ~tag:2 (branch_encoding "rejecting_request" + (obj1 (req "gid" Gid.encoding))) + (function Rejecting_request gid -> Some gid | _ -> None) + (fun gid -> Rejecting_request gid) ; + case ~tag:3 (branch_encoding "request_rejected" + (obj1 (opt "gid" Gid.encoding))) + (function Request_rejected gid -> Some gid | _ -> None) + (fun gid -> Request_rejected gid) ; + case ~tag:4 (branch_encoding "rejecting_request" + (obj1 (req "gid" Gid.encoding))) + (function Connection_established gid -> Some gid | _ -> None) + (fun gid -> Connection_established gid) ; + case ~tag:5 (branch_encoding "rejecting_request" + (obj1 (req "gid" Gid.encoding))) + (function Disconnection gid -> Some gid | _ -> None) + (fun gid -> Disconnection gid) ; + case ~tag:6 (branch_encoding "rejecting_request" + (obj1 (req "gid" Gid.encoding))) + (function External_disconnection gid -> Some gid | _ -> None) + (fun gid -> External_disconnection gid) ; + ] + type t = { kind : kind ; timestamp : Time.t ; } + let encoding = + let open Data_encoding in + conv + (fun { kind ; timestamp ; } -> (kind, timestamp)) + (fun (kind, timestamp) -> { kind ; timestamp ; }) + (obj2 + (req "kind" kind_encoding) + (req "timestamp" Time.encoding)) end type greylisting_config = { @@ -55,6 +99,7 @@ module Point_info = struct mutable greylisting_delay : float ; mutable greylisting_end : Time.t ; events : Event.t Ring.t ; + watchers : Event.t Watcher.input ; } type 'data point_info = 'data t @@ -81,7 +126,8 @@ module Point_info = struct events = Ring.create log_size ; greylisting = greylisting_config ; greylisting_delay = 1. ; - greylisting_end = Time.now () ; + greylisting_end = Time.epoch ; + watchers = Watcher.create_input () ; } let point s = s.point @@ -94,6 +140,7 @@ module Point_info = struct let last_rejected_connection s = s.last_rejected_connection let greylisted ?(now = Time.now ()) s = Time.compare now s.greylisting_end <= 0 + let greylisted_end s = s.greylisting_end let recent a1 a2 = match a1, a2 with @@ -118,8 +165,12 @@ module Point_info = struct let fold_events { events } ~init ~f = Ring.fold events ~init ~f - let log { events } ?(timestamp = Time.now ()) kind = - Ring.add events { kind ; timestamp } + let watch { watchers } = Watcher.create_stream watchers + + let log { events ; watchers } ?(timestamp = Time.now ()) kind = + let event = { Event.kind ; timestamp } in + Ring.add events event ; + Watcher.notify watchers event let log_incoming_rejection ?timestamp point_info gid = log point_info ?timestamp (Rejecting_request gid) @@ -287,11 +338,12 @@ module Gid_info = struct mutable state : 'conn state ; mutable metadata : 'meta ; mutable trusted : bool ; - events : Event.t Ring.t ; mutable last_failed_connection : (Id_point.t * Time.t) option ; mutable last_rejected_connection : (Id_point.t * Time.t) option ; mutable last_established_connection : (Id_point.t * Time.t) option ; mutable last_disconnection : (Id_point.t * Time.t) option ; + events : Event.t Ring.t ; + watchers : Event.t Watcher.input ; } type ('conn, 'meta) gid_info = ('conn, 'meta) t @@ -310,6 +362,7 @@ module Gid_info = struct last_established_connection = None ; last_disconnection = None ; events = Ring.create log_size ; + watchers = Watcher.create_input () ; } let encoding metadata_encoding = @@ -327,12 +380,14 @@ module Gid_info = struct let info = create ~trusted ~metadata gid in let events = Ring.create log_size in Ring.add_list info.events event_list ; - { gid ; created ; state = Disconnected ; - trusted ; metadata ; events ; + { state = Disconnected ; + trusted ; gid ; metadata ; created ; last_failed_connection ; last_rejected_connection ; last_established_connection ; last_disconnection ; + events ; + watchers = Watcher.create_input () ; }) (obj9 (req "gid" Gid.encoding) @@ -373,8 +428,12 @@ module Gid_info = struct s.last_failed_connection (recent s.last_rejected_connection s.last_disconnection) - let log { events } ?(timestamp = Time.now ()) point kind = - Ring.add events { kind ; timestamp ; point } + let log { events ; watchers } ?(timestamp = Time.now ()) point kind = + let event = { Event.kind ; timestamp ; point } in + Ring.add events event ; + Watcher.notify watchers event + + let watch { watchers } = Watcher.create_stream watchers let log_incoming_rejection ?timestamp gid_info point = log gid_info ?timestamp point Rejecting_request diff --git a/src/node/net/p2p_connection_pool_types.mli b/src/node/net/p2p_connection_pool_types.mli index 2773c7743..bbfab729a 100644 --- a/src/node/net/p2p_connection_pool_types.mli +++ b/src/node/net/p2p_connection_pool_types.mli @@ -68,6 +68,8 @@ module Point_info : sig val greylisted : ?now:Time.t -> 'conn point_info -> bool + val greylisted_end : 'conn point_info -> Time.t + val point : 'conn point_info -> Point.t module State : sig @@ -130,11 +132,15 @@ module Point_info : sig timestamp : Time.t ; } + val encoding : t Data_encoding.t end val fold_events : 'conn point_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a + val watch : + 'conn point_info -> Event.t Lwt_stream.t * Watcher.stopper + val log_incoming_rejection : ?timestamp:Time.t -> 'conn point_info -> Gid.t -> unit @@ -252,11 +258,15 @@ module Gid_info : sig point : Id_point.t ; } + val encoding : t Data_encoding.t end val fold_events : ('conn, 'meta) gid_info -> init:'a -> f:('a -> Event.t -> 'a) -> 'a + val watch : + ('conn, 'meta) gid_info -> Event.t Lwt_stream.t * Watcher.stopper + val log_incoming_rejection : ?timestamp:Time.t -> ('conn, 'meta) gid_info -> Id_point.t -> unit diff --git a/src/node/net/p2p_maintenance.ml b/src/node/net/p2p_maintenance.ml index 0def0ede7..05b10fc81 100644 --- a/src/node/net/p2p_maintenance.ml +++ b/src/node/net/p2p_maintenance.ml @@ -125,7 +125,7 @@ and too_few_connections st n_connected = P2p_connection_pool.broadcast_bootstrap_msg pool ; Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt.pick [ - P2p_connection_pool.Events.new_point pool ; + P2p_connection_pool.PoolEvent.wait_new_peer pool ; Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? or wait for the existence of a non grey-listed peer ?? *) @@ -154,8 +154,8 @@ let rec worker_loop st = Lwt.pick [ Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_condition.wait st.please_maintain ; (* when asked *) - P2p_connection_pool.Events.too_few_connections pool ; (* limits *) - P2p_connection_pool.Events.too_many_connections pool + P2p_connection_pool.PoolEvent.wait_too_few_connections pool ; (* limits *) + P2p_connection_pool.PoolEvent.wait_too_many_connections pool ] >>= fun () -> return () end >>=? fun () -> diff --git a/src/node/shell/node.ml b/src/node/shell/node.ml index 9cfb2b335..6dc1b21a4 100644 --- a/src/node/shell/node.ml +++ b/src/node/shell/node.ml @@ -7,10 +7,9 @@ (* *) (**************************************************************************) +open Lwt.Infix open Logging.Node.Worker -let (>|=) = Lwt.(>|=) - let inject_operation validator ?force bytes = let t = match Store.Operation.of_bytes bytes with @@ -186,6 +185,7 @@ type t = { ?force:bool -> MBytes.t -> (Operation_hash.t * unit tzresult Lwt.t) Lwt.t ; inject_protocol: ?force:bool -> Store.protocol -> (Protocol_hash.t * unit tzresult Lwt.t) Lwt.t ; + p2p: Tezos_p2p.net ; (* For P2P RPCs *) shutdown: unit -> unit Lwt.t ; } @@ -290,6 +290,7 @@ let create { genesis ; store_root ; context_root ; inject_block = inject_block state validator ; inject_operation = inject_operation validator ; inject_protocol = inject_protocol state ; + p2p ; shutdown ; } @@ -593,4 +594,56 @@ module RPC = struct Validator.fetch_block net_v block >>=? fun _ -> return () + module Network = struct + let stat (node : t) = + Tezos_p2p.RPC.stat node.p2p + + let watch (node : t) = + Tezos_p2p.RPC.watch node.p2p + + let connect (node : t) = + Tezos_p2p.RPC.connect node.p2p + + module Connection = struct + let info (node : t) = + Tezos_p2p.RPC.Connection.info node.p2p + + let kick (node : t) = + Tezos_p2p.RPC.Connection.kick node.p2p + + let list (node : t) = + Tezos_p2p.RPC.Connection.list node.p2p + + let count (node : t) = + Tezos_p2p.RPC.Connection.count node.p2p + end + + module Point = struct + let info (node : t) = + Tezos_p2p.RPC.Point.info node.p2p + + let infos (node : t) restrict = + Tezos_p2p.RPC.Point.infos ~restrict node.p2p + + let events (node : t) = + Tezos_p2p.RPC.Point.events node.p2p + + let watch (node : t) = + Tezos_p2p.RPC.Point.watch node.p2p + end + + module Gid = struct + let info (node : t) = + Tezos_p2p.RPC.Gid.info node.p2p + + let infos (node : t) restrict = + Tezos_p2p.RPC.Gid.infos ~restrict node.p2p + + let events (node : t) = + Tezos_p2p.RPC.Gid.events node.p2p + + let watch (node : t) = + Tezos_p2p.RPC.Gid.watch node.p2p + end + end end diff --git a/src/node/shell/node.mli b/src/node/shell/node.mli index eb5c13163..41b686491 100644 --- a/src/node/shell/node.mli +++ b/src/node/shell/node.mli @@ -80,6 +80,36 @@ module RPC : sig val complete: t -> ?block:block -> string -> string list Lwt.t + module Network : sig + val stat : t -> P2p.Stat.t + val watch : t -> P2p.RPC.Event.t Lwt_stream.t * Watcher.stopper + val connect : t -> P2p.Point.t -> float -> unit tzresult Lwt.t + + module Connection : sig + val info : t -> P2p.Gid.t -> P2p.Connection_info.t option + val kick : t -> P2p.Gid.t -> bool -> unit Lwt.t + val list : t -> P2p.Connection_info.t list + val count : t -> int + end + + module Gid : sig + val infos : t -> + P2p.RPC.Gid.state list -> (P2p.Gid.t * P2p.RPC.Gid.info) list + val info : t -> P2p.Gid.t -> P2p.RPC.Gid.info option + val events : t -> P2p.Gid.t -> P2p.RPC.Gid.Event.t list + val watch : t -> P2p.Gid.t -> + P2p.RPC.Gid.Event.t Lwt_stream.t * Watcher.stopper + end + + module Point : sig + val infos : t -> + P2p.RPC.Point.state list -> (P2p.Point.t * P2p.RPC.Point.info) list + val info : t -> P2p.Point.t -> P2p.RPC.Point.info option + val events : t -> P2p.Point.t -> P2p.RPC.Point.Event.t list + val watch : t -> P2p.Point.t -> + P2p.RPC.Point.Event.t Lwt_stream.t * Watcher.stopper + end + end end val shutdown: t -> unit Lwt.t diff --git a/src/node/shell/node_rpc.ml b/src/node/shell/node_rpc.ml index efb3f9870..00c087911 100644 --- a/src/node/shell/node_rpc.ml +++ b/src/node/shell/node_rpc.ml @@ -442,6 +442,95 @@ let build_rpc_directory node = RPC.register2 dir Services.Blocks.complete (fun block s () -> Node.RPC.complete node ~block s >>= RPC.Answer.return) in + + (* Network : Global *) + + let dir = + let implementation () = + Node.RPC.Network.stat node |> RPC.Answer.return in + RPC.register0 dir Services.Network.stat implementation in + let dir = + let implementation () = + let stream, stopper = Node.RPC.Network.watch node in + let shutdown () = Watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + RPC.Answer.return_stream { next ; shutdown } in + RPC.register0 dir Services.Network.events implementation in + let dir = + let implementation point timeout = + Node.RPC.Network.connect node point timeout >>= RPC.Answer.return in + RPC.register1 dir Services.Network.connect implementation in + + (* Network : Connection *) + + let dir = + let implementation gid () = + Node.RPC.Network.Connection.info node gid |> RPC.Answer.return in + RPC.register1 dir Services.Network.Connection.info implementation in + let dir = + let implementation gid wait = + Node.RPC.Network.Connection.kick node gid wait >>= RPC.Answer.return in + RPC.register1 dir Services.Network.Connection.kick implementation in + let dir = + let implementation () = + Node.RPC.Network.Connection.list node |> RPC.Answer.return in + RPC.register0 dir Services.Network.Connection.list implementation in + + (* Network : Gid *) + + let dir = + let implementation state = + Node.RPC.Network.Gid.infos node state |> RPC.Answer.return in + RPC.register0 dir Services.Network.Gid.infos implementation in + let dir = + let implementation gid () = + Node.RPC.Network.Gid.info node gid |> RPC.Answer.return in + RPC.register1 dir Services.Network.Gid.info implementation in + let dir = + let implementation gid monitor = + if monitor then + let stream, stopper = Node.RPC.Network.Gid.watch node gid in + let shutdown () = Watcher.shutdown stopper in + let first_request = ref true in + let next () = + if not !first_request then begin + Lwt_stream.get stream >|= map_option ~f:(fun i -> [i]) + end else begin + first_request := false ; + Lwt.return_some @@ Node.RPC.Network.Gid.events node gid + end in + RPC.Answer.return_stream { next ; shutdown } + else + Node.RPC.Network.Gid.events node gid |> RPC.Answer.return in + RPC.register1 dir Services.Network.Gid.events implementation in + + (* Network : Point *) + + let dir = + let implementation state = + Node.RPC.Network.Point.infos node state |> RPC.Answer.return in + RPC.register0 dir Services.Network.Point.infos implementation in + let dir = + let implementation point () = + Node.RPC.Network.Point.info node point |> RPC.Answer.return in + RPC.register1 dir Services.Network.Point.info implementation in + let dir = + let implementation point monitor = + if monitor then + let stream, stopper = Node.RPC.Network.Point.watch node point in + let shutdown () = Watcher.shutdown stopper in + let first_request = ref true in + let next () = + if not !first_request then begin + Lwt_stream.get stream >|= map_option ~f:(fun i -> [i]) + end else begin + first_request := false ; + Lwt.return_some @@ Node.RPC.Network.Point.events node point + end in + RPC.Answer.return_stream { next ; shutdown } + else + Node.RPC.Network.Point.events node point |> RPC.Answer.return in + RPC.register1 dir Services.Network.Point.events implementation in let dir = RPC.register_describe_directory_service dir Services.describe in dir diff --git a/src/node/shell/node_rpc_services.ml b/src/node/shell/node_rpc_services.ml index 389f6df70..01eba12aa 100644 --- a/src/node/shell/node_rpc_services.ml +++ b/src/node/shell/node_rpc_services.ml @@ -484,6 +484,114 @@ module Protocols = struct RPC.Path.(root / "protocols") end +module Network = struct + open P2p_types + let (gid_arg : P2p_types.Gid.t RPC.Arg.arg) = + RPC.Arg.make + ~name:"gid" + ~descr:"A network global identifier, also known as an identity." + ~destruct:(fun s -> try + Ok (Crypto_box.Public_key_hash.of_b58check s) + with Failure msg -> Error msg) + ~construct:Crypto_box.Public_key_hash.to_b58check + () + + let point_arg = + RPC.Arg.make + ~name:"point" + ~descr:"A network point (ipv4:port or [ipv6]:port)." + ~destruct:Point.of_string + ~construct:Point.to_string + () + + let stat = + RPC.service + ~description:"Global network bandwidth statistics in B/s." + ~input: empty + ~output: P2p.Stat.encoding + RPC.Path.(root / "network" / "stat") + + let events = + RPC.service + ~description:"Stream of all network events" + ~input: empty + ~output: P2p.RPC.Event.encoding + RPC.Path.(root / "network" / "log") + + let connect = + RPC.service + ~description:"Connect to a peer" + ~input: (obj1 (dft "timeout" float 5.)) + ~output: (Error.wrap @@ empty) + RPC.Path.(root / "network" / "connect" /: point_arg) + + let monitor_encoding = obj1 (dft "monitor" bool false) + + module Connection = struct + let list = + RPC.service + ~input: empty + ~output: (list P2p.Connection_info.encoding) + RPC.Path.(root / "network" / "connection") + + let info = + RPC.service + ~input: empty + ~output: (option P2p.Connection_info.encoding) + RPC.Path.(root / "network" / "connection" /: gid_arg) + + let kick = + RPC.service + ~input: (obj1 (req "wait" bool)) + ~output: empty + RPC.Path.(root / "network" / "connection" /: gid_arg / "kick") + end + + module Point = struct + let infos = + let filter = + obj1 (dft "filter" (list P2p.RPC.Point.state_encoding) []) in + RPC.service + ~input: filter + ~output: (list (tup2 P2p.Point.encoding P2p.RPC.Point.info_encoding)) + RPC.Path.(root / "network" / "point") + + let info = + RPC.service + ~input: empty + ~output: (option P2p.RPC.Point.info_encoding) + RPC.Path.(root / "network" / "point" /: point_arg) + + let events = + RPC.service + ~input: monitor_encoding + ~output: (list P2p.RPC.Point.Event.encoding) + RPC.Path.(root / "network" / "point" /: point_arg / "log") + end + + module Gid = struct + let infos = + let filter = + obj1 (dft "filter" (list P2p.RPC.Gid.state_encoding) []) in + RPC.service + ~input: filter + ~output: (list (tup2 P2p.Gid.encoding P2p.RPC.Gid.info_encoding)) + RPC.Path.(root / "network" / "gid") + + let info = + RPC.service + ~input: empty + ~output: (option P2p.RPC.Gid.info_encoding) + RPC.Path.(root / "network" / "gid" /: gid_arg) + + let events = + RPC.service + ~input: monitor_encoding + ~output: (list P2p.RPC.Gid.Event.encoding) + RPC.Path.(root / "network" / "gid" /: gid_arg / "log") + end +end + let forge_block = RPC.service ~description: "Forge a block header" diff --git a/src/node/shell/node_rpc_services.mli b/src/node/shell/node_rpc_services.mli index 5ad9771a9..10e990557 100644 --- a/src/node/shell/node_rpc_services.mli +++ b/src/node/shell/node_rpc_services.mli @@ -119,6 +119,46 @@ module Protocols : sig list_param, (Protocol_hash.t * Store.protocol option) list) RPC.service end +module Network : sig + val stat : + (unit, unit, unit, P2p.Stat.t) RPC.service + + val events : + (unit, unit, unit, P2p.RPC.Event.t) RPC.service + + val connect : + (unit, unit * P2p.Point.t, float, unit tzresult) RPC.service + + module Connection : sig + val list : + (unit, unit, unit, P2p.Connection_info.t list) RPC.service + val info : + (unit, unit * P2p.Gid.t, unit, P2p.Connection_info.t option) RPC.service + val kick : + (unit, unit * P2p.Gid.t, bool, unit) RPC.service + end + + module Point : sig + val infos : + (unit, unit, P2p.RPC.Point.state list, + (P2p.Point.t * P2p.RPC.Point.info) list) RPC.service + val info : + (unit, unit * P2p.Point.t, unit, P2p.RPC.Point.info option) RPC.service + val events : + (unit, unit * P2p.Point.t, bool, P2p.RPC.Point.Event.t list) RPC.service + end + + module Gid : sig + val infos : + (unit, unit, P2p.RPC.Gid.state list, + (P2p.Gid.t * P2p.RPC.Gid.info) list) RPC.service + val info : + (unit, unit * P2p.Gid.t, unit, P2p.RPC.Gid.info option) RPC.service + val events : + (unit, unit * P2p.Gid.t, bool, P2p.RPC.Gid.Event.t list) RPC.service + end +end + val forge_block: (unit, unit, Updater.net_id option * Block_hash.t option * Time.t option * diff --git a/src/node/shell/tezos_p2p.ml b/src/node/shell/tezos_p2p.ml index a0c55cf7d..899d92e5e 100644 --- a/src/node/shell/tezos_p2p.ml +++ b/src/node/shell/tezos_p2p.ml @@ -97,7 +97,7 @@ end let meta_cfg : _ P2p.meta_config = { P2p.encoding = Metadata.encoding ; initial = Metadata.initial ; - score = Metadata.score + score = Metadata.score ; } and msg_cfg : _ P2p.message_config = { @@ -106,6 +106,7 @@ and msg_cfg : _ P2p.message_config = { } type net = (Message.t, Metadata.t) P2p.net +type pool = (Message.t, Metadata.t) P2p_connection_pool.t let create ~config ~limits = P2p.create ~config ~limits meta_cfg msg_cfg @@ -135,3 +136,40 @@ module Raw = struct let encoding = P2p.Raw.encoding msg_cfg.encoding let supported_versions = msg_cfg.versions end + +module RPC = struct + let stat net = P2p.RPC.stat net + + module Event = P2p.RPC.Event + + let watch = P2p.RPC.watch + + let connect = P2p.RPC.connect + + module Connection = struct + let info = P2p.RPC.Connection.info + let kick = P2p.RPC.Connection.kick + let list = P2p.RPC.Connection.list + let count = P2p.RPC.Connection.count + end + + module Point = struct + type info = P2p.RPC.Point.info + module Event = P2p_connection_pool_types.Point_info.Event + + let info = P2p.RPC.Point.info + let events = P2p.RPC.Point.events + let infos = P2p.RPC.Point.infos + let watch = P2p.RPC.Point.watch + end + + module Gid = struct + type info = P2p.RPC.Gid.info + module Event = P2p_connection_pool_types.Gid_info.Event + + let info = P2p.RPC.Gid.info + let events = P2p.RPC.Gid.events + let infos = P2p.RPC.Gid.infos + let watch = P2p.RPC.Gid.watch + end +end diff --git a/src/node/shell/tezos_p2p.mli b/src/node/shell/tezos_p2p.mli index 0f1111e40..7e6dc6d37 100644 --- a/src/node/shell/tezos_p2p.mli +++ b/src/node/shell/tezos_p2p.mli @@ -82,3 +82,38 @@ module Raw : sig val encoding: message Data_encoding.t val supported_versions: Version.t list end + +module RPC : sig + val stat : net -> Stat.t + + module Event = P2p_connection_pool.LogEvent + val watch : net -> Event.t Lwt_stream.t * Watcher.stopper + val connect : net -> Point.t -> float -> unit tzresult Lwt.t + + module Connection : sig + val info : net -> Gid.t -> Connection_info.t option + val kick : net -> Gid.t -> bool -> unit Lwt.t + val list : net -> Connection_info.t list + val count : net -> int + end + + module Point : sig + open P2p.RPC.Point + module Event = Event + + val info : net -> Point.t -> info option + val events : ?max:int -> ?rev:bool -> net -> Point.t -> Event.t list + val infos : ?restrict:state list -> net -> (Point.t * info) list + val watch : net -> Point.t -> Event.t Lwt_stream.t * Watcher.stopper + end + + module Gid : sig + open P2p.RPC.Gid + module Event = Event + + val info : net -> Gid.t -> info option + val events : ?max:int -> ?rev:bool -> net -> Gid.t -> Event.t list + val infos : ?restrict:state list -> net -> (Gid.t * info) list + val watch : net -> Gid.t -> Event.t Lwt_stream.t * Watcher.stopper + end +end