diff --git a/src/lib_base/p2p_point.ml b/src/lib_base/p2p_point.ml index db7a6c843..2b8299620 100644 --- a/src/lib_base/p2p_point.ml +++ b/src/lib_base/p2p_point.ml @@ -93,6 +93,14 @@ module Id = struct let encoding = Data_encoding.conv to_string of_string_exn Data_encoding.string + let rpc_arg = + RPC_arg.make + ~name:"point" + ~descr:"A network point (ipv4:port or [ipv6]:port)." + ~destruct:of_string + ~construct:to_string + () + end module Map = Map.Make (Id) diff --git a/src/lib_base/p2p_point.mli b/src/lib_base/p2p_point.mli index b806f532c..2a8c966b5 100644 --- a/src/lib_base/p2p_point.mli +++ b/src/lib_base/p2p_point.mli @@ -23,6 +23,7 @@ module Id : sig val is_global : t -> bool val parse_addr_port : string -> string * string + val rpc_arg : t RPC_arg.t end module Map : Map.S with type key = Id.t diff --git a/src/lib_client_base/client_network.ml b/src/lib_client_base/client_network.ml index 9dbcf006c..2bb1c815c 100644 --- a/src/lib_client_base/client_network.ml +++ b/src/lib_client_base/client_network.ml @@ -16,10 +16,10 @@ let commands () = [ command ~group ~desc: "show global network status" no_options (prefixes ["network" ; "stat"] stop) begin fun () (cctxt : Client_commands.full_context) -> - Client_node_rpcs.Network.stat cctxt >>=? fun stat -> - Client_node_rpcs.Network.connections cctxt >>=? fun conns -> - Client_node_rpcs.Network.peers cctxt >>=? fun peers -> - Client_node_rpcs.Network.points cctxt >>=? fun points -> + P2p_services.stat cctxt >>=? fun stat -> + P2p_services.Connections.list cctxt >>=? fun conns -> + P2p_services.Peers.list cctxt >>=? fun peers -> + P2p_services.Points.list cctxt >>=? fun points -> cctxt#message "GLOBAL STATS" >>= fun () -> cctxt#message " %a" P2p_stat.pp stat >>= fun () -> cctxt#message "CONNECTIONS" >>= fun () -> diff --git a/src/lib_client_base/client_node_rpcs.ml b/src/lib_client_base/client_node_rpcs.ml index 4c5683984..f702a4467 100644 --- a/src/lib_client_base/client_node_rpcs.ml +++ b/src/lib_client_base/client_node_rpcs.ml @@ -57,19 +57,3 @@ module Protocols = struct { contents; monitor = Some false } end - -module Network = struct - - let stat cctxt = - call_service0 cctxt P2p_services.stat () - - let connections cctxt = - call_service0 cctxt P2p_services.Connection.list () - - let peers cctxt = - call_service0 cctxt P2p_services.Peer_id.list [] - - let points cctxt = - call_service0 cctxt P2p_services.Point.list [] - -end diff --git a/src/lib_client_base/client_node_rpcs.mli b/src/lib_client_base/client_node_rpcs.mli index 8da7160c3..c60a5d109 100644 --- a/src/lib_client_base/client_node_rpcs.mli +++ b/src/lib_client_base/client_node_rpcs.mli @@ -54,22 +54,6 @@ end val bootstrapped: #Client_rpcs.ctxt -> (Block_hash.t * Time.t) Lwt_stream.t tzresult Lwt.t -module Network : sig - - val stat: - #Client_rpcs.ctxt -> P2p_stat.t tzresult Lwt.t - - val connections: - #Client_rpcs.ctxt -> P2p_connection.Info.t list tzresult Lwt.t - - val peers: - #Client_rpcs.ctxt -> (P2p_peer.Id.t * P2p_peer.Info.t) list tzresult Lwt.t - - val points: - #Client_rpcs.ctxt -> (P2p_point.Id.t * P2p_point.Info.t) list tzresult Lwt.t - -end - val complete: #Client_rpcs.ctxt -> ?block:Block_services.block -> string -> string list tzresult Lwt.t diff --git a/src/lib_shell/node_rpc.ml b/src/lib_shell/node_rpc.ml index 1905a7e0d..63dee2d4c 100644 --- a/src/lib_shell/node_rpc.ml +++ b/src/lib_shell/node_rpc.ml @@ -522,48 +522,52 @@ let build_rpc_directory node = let dir = let implementation () () = Node.RPC.Network.stat node |> RPC_answer.return in - RPC_directory.register0 dir P2p_services.stat implementation in + RPC_directory.register0 dir P2p_services.S.stat implementation in let dir = let implementation () () = RPC_answer.return Distributed_db.Raw.supported_versions in - RPC_directory.register0 dir P2p_services.versions implementation in + RPC_directory.register0 dir P2p_services.S.versions implementation in let dir = let implementation () () = let stream, stopper = Node.RPC.Network.watch node in let shutdown () = Lwt_watcher.shutdown stopper in let next () = Lwt_stream.get stream in RPC_answer.return_stream { next ; shutdown } in - RPC_directory.register0 dir P2p_services.events implementation in + RPC_directory.register0 dir P2p_services.S.events implementation in let dir = let implementation point () timeout = Node.RPC.Network.connect node point timeout >>= RPC_answer.return in - RPC_directory.register1 dir P2p_services.connect implementation in + RPC_directory.register1 dir P2p_services.S.connect implementation in (* Network : Connection *) let dir = let implementation peer_id () () = - Node.RPC.Network.Connection.info node peer_id |> RPC_answer.return in - RPC_directory.register1 dir P2p_services.Connection.info implementation in + match Node.RPC.Network.Connection.info node peer_id with + | None -> raise Not_found + | Some v -> RPC_answer.return v in + RPC_directory.register1 dir P2p_services.Connections.S.info implementation in let dir = let implementation peer_id () wait = Node.RPC.Network.Connection.kick node peer_id wait >>= RPC_answer.return in - RPC_directory.register1 dir P2p_services.Connection.kick implementation in + RPC_directory.register1 dir P2p_services.Connections.S.kick implementation in let dir = let implementation () () = Node.RPC.Network.Connection.list node |> RPC_answer.return in - RPC_directory.register0 dir P2p_services.Connection.list implementation in + RPC_directory.register0 dir P2p_services.Connections.S.list implementation in (* Network : Peer_id *) let dir = let implementation () state = Node.RPC.Network.Peer_id.list node ~restrict:state |> RPC_answer.return in - RPC_directory.register0 dir P2p_services.Peer_id.list implementation in + RPC_directory.register0 dir P2p_services.Peers.S.list implementation in let dir = let implementation peer_id () () = - Node.RPC.Network.Peer_id.info node peer_id |> RPC_answer.return in - RPC_directory.register1 dir P2p_services.Peer_id.info implementation in + match Node.RPC.Network.Peer_id.info node peer_id with + | None -> raise Not_found + | Some v -> RPC_answer.return v in + RPC_directory.register1 dir P2p_services.Peers.S.info implementation in let dir = let implementation peer_id () monitor = if monitor then @@ -580,18 +584,20 @@ let build_rpc_directory node = RPC_answer.return_stream { next ; shutdown } else Node.RPC.Network.Peer_id.events node peer_id |> RPC_answer.return in - RPC_directory.register1 dir P2p_services.Peer_id.events implementation in + RPC_directory.register1 dir P2p_services.Peers.S.events implementation in (* Network : Point *) let dir = let implementation () state = Node.RPC.Network.Point.list node ~restrict:state |> RPC_answer.return in - RPC_directory.register0 dir P2p_services.Point.list implementation in + RPC_directory.register0 dir P2p_services.Points.S.list implementation in let dir = let implementation point () () = - Node.RPC.Network.Point.info node point |> RPC_answer.return in - RPC_directory.register1 dir P2p_services.Point.info implementation in + match Node.RPC.Network.Point.info node point with + | None -> raise Not_found + | Some v -> RPC_answer.return v in + RPC_directory.register1 dir P2p_services.Points.S.info implementation in let dir = let implementation point () monitor = if monitor then @@ -608,7 +614,7 @@ let build_rpc_directory node = RPC_answer.return_stream { next ; shutdown } else Node.RPC.Network.Point.events node point |> RPC_answer.return in - RPC_directory.register1 dir P2p_services.Point.events implementation in + RPC_directory.register1 dir P2p_services.Points.S.events implementation in let dir = RPC_directory.register_describe_directory_service dir Shell_services.describe in dir diff --git a/src/lib_shell_services/p2p_services.ml b/src/lib_shell_services/p2p_services.ml index 009079ede..a96ccf6d1 100644 --- a/src/lib_shell_services/p2p_services.ml +++ b/src/lib_shell_services/p2p_services.ml @@ -7,143 +7,173 @@ (* *) (**************************************************************************) -let point_arg = - RPC_arg.make - ~name:"point" - ~descr:"A network point (ipv4:port or [ipv6]:port)." - ~destruct:P2p_point.Id.of_string - ~construct:P2p_point.Id.to_string - () +module S = struct -let versions = - RPC_service.post_service - ~description:"Supported network layer versions." - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: (Data_encoding.list P2p_version.encoding) - RPC_path.(root / "network" / "versions") + let versions = + RPC_service.post_service + ~description:"Supported network layer versions." + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: (Data_encoding.list P2p_version.encoding) + RPC_path.(root / "network" / "versions") -let stat = - RPC_service.post_service - ~description:"Global network bandwidth statistics in B/s." - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: P2p_stat.encoding - RPC_path.(root / "network" / "stat") + let stat = + RPC_service.post_service + ~description:"Global network bandwidth statistics in B/s." + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: P2p_stat.encoding + RPC_path.(root / "network" / "stat") -let events = - RPC_service.post_service - ~description:"Stream of all network events" - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: P2p_connection.Pool_event.encoding - RPC_path.(root / "network" / "log") + let events = + RPC_service.post_service + ~description:"Stream of all network events" + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: P2p_connection.Pool_event.encoding + RPC_path.(root / "network" / "log") -let connect = - RPC_service.post_service - ~description:"Connect to a peer" - ~query: RPC_query.empty - ~input: Data_encoding.(obj1 (dft "timeout" float 5.)) - ~output: (RPC_error.wrap Data_encoding.empty) - RPC_path.(root / "network" / "connect" /: point_arg) + let connect = + RPC_service.post_service + ~description:"Connect to a peer" + ~query: RPC_query.empty + ~input: Data_encoding.(obj1 (dft "timeout" float 5.)) + ~output: (RPC_error.wrap Data_encoding.empty) + RPC_path.(root / "network" / "connect" /: P2p_point.Id.rpc_arg) + +end + +open RPC_context +let stat ctxt = make_call S.stat ctxt () () () +let versions ctxt = make_call S.versions ctxt () () () +let events ctxt = make_streamed_call S.events ctxt () () () +let connect ctxt ~timeout peer_id = + make_err_call1 S.connect ctxt peer_id () timeout let monitor_encoding = Data_encoding.(obj1 (dft "monitor" bool false)) -module Connection = struct +module Connections = struct - let list = - RPC_service.post_service - ~description:"List the running P2P connection." - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: (Data_encoding.list P2p_connection.Info.encoding) - RPC_path.(root / "network" / "connection") + module S = struct - let info = - RPC_service.post_service - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: (Data_encoding.option P2p_connection.Info.encoding) - ~description:"Details about the current P2P connection to the given peer." - RPC_path.(root / "network" / "connection" /: P2p_peer.Id.rpc_arg) + let list = + RPC_service.post_service + ~description:"List the running P2P connection." + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: (Data_encoding.list P2p_connection.Info.encoding) + RPC_path.(root / "network" / "connections") - let kick = - RPC_service.post_service - ~query: RPC_query.empty - ~input: Data_encoding.(obj1 (req "wait" bool)) - ~output: Data_encoding.empty - ~description:"Forced close of the current P2P connection to the given peer." - RPC_path.(root / "network" / "connection" /: P2p_peer.Id.rpc_arg / "kick") + let info = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: P2p_connection.Info.encoding + ~description:"Details about the current P2P connection to the given peer." + RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg) + + let kick = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.(obj1 (req "wait" bool)) + ~output: Data_encoding.empty + ~description:"Forced close of the current P2P connection to the given peer." + RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg / "kick") + + end + + let list ctxt = make_call S.list ctxt () () () + let info ctxt peer_id = make_call1 S.info ctxt peer_id () () + let kick ctxt ?(wait = false) peer_id = make_call1 S.kick ctxt peer_id () wait end -module Point = struct +module Points = struct - let info = - RPC_service.post_service - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: (Data_encoding.option P2p_point.Info.encoding) - ~description: "Details about a given `IP:addr`." - RPC_path.(root / "network" / "point" /: point_arg) + module S = struct - let events = - RPC_service.post_service - ~query: RPC_query.empty - ~input: monitor_encoding - ~output: (Data_encoding.list - P2p_point.Pool_event.encoding) - ~description: "Monitor network events related to an `IP:addr`." - RPC_path.(root / "network" / "point" /: point_arg / "log") + let info = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: P2p_point.Info.encoding + ~description: "Details about a given `IP:addr`." + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg) - let list = - let filter = - let open Data_encoding in - obj1 (dft "filter" (list P2p_point.State.encoding) []) in - RPC_service.post_service - ~query: RPC_query.empty - ~input: filter - ~output: - Data_encoding.(list (tup2 - P2p_point.Id.encoding - P2p_point.Info.encoding)) - ~description:"List the pool of known `IP:port` \ - used for establishing P2P connections ." - RPC_path.(root / "network" / "point") + let events = + RPC_service.post_service + ~query: RPC_query.empty + ~input: monitor_encoding + ~output: (Data_encoding.list + P2p_point.Pool_event.encoding) + ~description: "Monitor network events related to an `IP:addr`." + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "log") + + let list = + let filter = + let open Data_encoding in + obj1 (dft "filter" (list P2p_point.State.encoding) []) in + RPC_service.post_service + ~query: RPC_query.empty + ~input: filter + ~output: + Data_encoding.(list (tup2 + P2p_point.Id.encoding + P2p_point.Info.encoding)) + ~description:"List the pool of known `IP:port` \ + used for establishing P2P connections ." + RPC_path.(root / "networks" / "point") + + end + + open RPC_context + let info ctxt peer_id = make_call1 S.info ctxt peer_id () () + let events ctxt point = + make_streamed_call S.events ctxt ((), point) () true + let list ?(filter = []) ctxt = make_call S.list ctxt () () filter end -module Peer_id = struct +module Peers = struct - let info = - RPC_service.post_service - ~query: RPC_query.empty - ~input: Data_encoding.empty - ~output: (Data_encoding.option P2p_peer.Info.encoding) - ~description:"Details about a given peer." - RPC_path.(root / "network" / "peer_id" /: P2p_peer.Id.rpc_arg) + module S = struct - let events = - RPC_service.post_service - ~query: RPC_query.empty - ~input: monitor_encoding - ~output: (Data_encoding.list - P2p_peer.Pool_event.encoding) - ~description:"Monitor network events related to a given peer." - RPC_path.(root / "network" / "peer_id" /: P2p_peer.Id.rpc_arg / "log") + let info = + RPC_service.post_service + ~query: RPC_query.empty + ~input: Data_encoding.empty + ~output: P2p_peer.Info.encoding + ~description:"Details about a given peer." + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg) - let list = - let filter = - let open Data_encoding in - obj1 (dft "filter" (list P2p_peer.State.encoding) []) in - RPC_service.post_service - ~query: RPC_query.empty - ~input: filter - ~output: - Data_encoding.(list (tup2 - P2p_peer.Id.encoding - P2p_peer.Info.encoding)) - ~description:"List the peers the node ever met." - RPC_path.(root / "network" / "peer_id") + let events = + RPC_service.post_service + ~query: RPC_query.empty + ~input: monitor_encoding + ~output: (Data_encoding.list + P2p_peer.Pool_event.encoding) + ~description:"Monitor network events related to a given peer." + RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "log") + + let list = + let filter = + let open Data_encoding in + obj1 (dft "filter" (list P2p_peer.State.encoding) []) in + RPC_service.post_service + ~query: RPC_query.empty + ~input: filter + ~output: + Data_encoding.(list (tup2 + P2p_peer.Id.encoding + P2p_peer.Info.encoding)) + ~description:"List the peers the node ever met." + RPC_path.(root / "network" / "peers") + + end + + let info ctxt peer_id = make_call1 S.info ctxt peer_id () () + let events ctxt point = + make_streamed_call S.events ctxt ((), point) () true + let list ?(filter = []) ctxt = make_call S.list ctxt () () filter end diff --git a/src/lib_shell_services/p2p_services.mli b/src/lib_shell_services/p2p_services.mli index 3a205e34d..827398a9b 100644 --- a/src/lib_shell_services/p2p_services.mli +++ b/src/lib_shell_services/p2p_services.mli @@ -7,75 +7,138 @@ (* *) (**************************************************************************) -val stat : - ([ `POST ], unit, - unit, unit, unit, - P2p_stat.t) RPC_service.t +open RPC_context -val versions : - ([ `POST ], unit, - unit, unit, unit, - P2p_version.t list) RPC_service.t +val stat: #simple -> P2p_stat.t tzresult Lwt.t -val events : - ([ `POST ], unit, - unit, unit, unit, - P2p_connection.Pool_event.t) RPC_service.t +val versions: #simple -> P2p_version.t list tzresult Lwt.t -val connect : - ([ `POST ], unit, - unit * P2p_point.Id.t, unit, float, - unit tzresult) RPC_service.t +val events: #streamed -> + (P2p_connection.Pool_event.t Lwt_stream.t * stopper) tzresult Lwt.t -module Connection : sig +val connect: #simple -> timeout:float -> P2p_point.Id.t -> unit tzresult Lwt.t - val list : +module S : sig + + val stat : ([ `POST ], unit, unit, unit, unit, - P2p_connection.Info.t list) RPC_service.t + P2p_stat.t) RPC_service.t - val info : + val versions : ([ `POST ], unit, - unit * P2p_peer.Id.t, unit, unit, - P2p_connection.Info.t option) RPC_service.t - - val kick : - ([ `POST ], unit, - unit * P2p_peer.Id.t, unit, bool, - unit) RPC_service.t - -end - -module Point : sig - val list : - ([ `POST ], unit, - unit, unit, P2p_point.State.t list, - (P2p_point.Id.t * P2p_point.Info.t) list) RPC_service.t - val info : - ([ `POST ], unit, - unit * P2p_point.Id.t, unit, unit, - P2p_point.Info.t option) RPC_service.t - val events : - ([ `POST ], unit, - unit * P2p_point.Id.t, unit, bool, - P2p_point.Pool_event.t list) RPC_service.t -end - -module Peer_id : sig - - val list : - ([ `POST ], unit, - unit, unit, P2p_peer.State.t list, - (P2p_peer.Id.t * P2p_peer.Info.t) list) RPC_service.t - - val info : - ([ `POST ], unit, - unit * P2p_peer.Id.t, unit, unit, - P2p_peer.Info.t option) RPC_service.t + unit, unit, unit, + P2p_version.t list) RPC_service.t val events : ([ `POST ], unit, - unit * P2p_peer.Id.t, unit, bool, - P2p_peer.Pool_event.t list) RPC_service.t + unit, unit, unit, + P2p_connection.Pool_event.t) RPC_service.t + + val connect : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, float, + unit tzresult) RPC_service.t + +end + +module Connections : sig + + open RPC_context + + val list: #simple -> P2p_connection.Info.t list tzresult Lwt.t + + val info: #simple -> P2p_peer.Id.t -> P2p_connection.Info.t tzresult Lwt.t + + val kick: #simple -> ?wait:bool -> P2p_peer.Id.t -> unit tzresult Lwt.t + + module S : sig + + val list : + ([ `POST ], unit, + unit, unit, unit, + P2p_connection.Info.t list) RPC_service.t + + val info : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, unit, + P2p_connection.Info.t) RPC_service.t + + val kick : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, bool, + unit) RPC_service.t + + end + +end + + +module Points : sig + + val list: + ?filter:(P2p_point.State.t list) -> + #simple -> (P2p_point.Id.t * P2p_point.Info.t) list tzresult Lwt.t + + val info: #simple -> P2p_point.Id.t -> P2p_point.Info.t tzresult Lwt.t + + val events: + #streamed -> + P2p_point.Id.t -> + (P2p_point.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t + + module S : sig + + val list : + ([ `POST ], unit, + unit, unit, P2p_point.State.t list, + (P2p_point.Id.t * P2p_point.Info.t) list) RPC_service.t + + val info : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, unit, + P2p_point.Info.t) RPC_service.t + + val events : + ([ `POST ], unit, + unit * P2p_point.Id.t, unit, bool, + P2p_point.Pool_event.t list) RPC_service.t + + end + +end + + +module Peers : sig + + val list: + ?filter:(P2p_peer.State.t list) -> + #simple -> + (P2p_peer.Id.t * P2p_peer.Info.t) list tzresult Lwt.t + + val info: #simple -> P2p_peer.Id.t -> P2p_peer.Info.t tzresult Lwt.t + + val events: + #streamed -> P2p_peer.Id.t -> + (P2p_peer.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t + + module S : sig + + val list : + ([ `POST ], unit, + unit, unit, P2p_peer.State.t list, + (P2p_peer.Id.t * P2p_peer.Info.t) list) RPC_service.t + + val info : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, unit, + P2p_peer.Info.t) RPC_service.t + + val events : + ([ `POST ], unit, + unit * P2p_peer.Id.t, unit, bool, + P2p_peer.Pool_event.t list) RPC_service.t + + end end