From c175cd1c6536a1b1196afe7288144dbdd85422cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 21 Apr 2018 12:46:33 +0200 Subject: [PATCH] Shell/RPC: use query parameters for `/p2p` --- src/lib_base/p2p_peer.ml | 36 ++++++ src/lib_base/p2p_peer.mli | 13 +++ src/lib_base/p2p_point.ml | 42 +++++++ src/lib_base/p2p_point.mli | 14 +++ src/lib_p2p/p2p.ml | 30 ++--- src/lib_shell_services/p2p_services.ml | 145 +++++++++++++----------- src/lib_shell_services/p2p_services.mli | 60 +++++----- 7 files changed, 231 insertions(+), 109 deletions(-) diff --git a/src/lib_base/p2p_peer.ml b/src/lib_base/p2p_peer.ml index 1f81aa0ba..1bf01df93 100644 --- a/src/lib_base/p2p_peer.ml +++ b/src/lib_base/p2p_peer.ml @@ -13,6 +13,29 @@ module Table = Id.Table module Map = Id.Map module Set = Id.Set +module Filter = struct + + type t = + | Accepted + | Running + | Disconnected + + let rpc_arg = + RPC_arg.make + ~name:"p2p.point.state_filter" + ~destruct:(function + | "accepted" -> Ok Accepted + | "running" -> Ok Running + | "disconnected" -> Ok Disconnected + | s -> Error (Format.asprintf "Invalid state: %s" s)) + ~construct:(function + | Accepted -> "accepted" + | Running -> "running" + | Disconnected -> "disconnected") + () + +end + module State = struct type t = @@ -33,6 +56,19 @@ module State = struct "disconnected", Disconnected ; ] + let raw_filter (f : Filter.t) (s : t) = + match f, s with + | Accepted, Accepted -> true + | Accepted, (Running | Disconnected) + | (Running | Disconnected), Accepted -> false + | Running, Running -> true + | Disconnected, Disconnected -> true + | Running, Disconnected + | Disconnected, Running -> false + + let filter filters state = + List.exists (fun f -> raw_filter f state) filters + end module Info = struct diff --git a/src/lib_base/p2p_peer.mli b/src/lib_base/p2p_peer.mli index 08db0f1d2..d7ee932a7 100644 --- a/src/lib_base/p2p_peer.mli +++ b/src/lib_base/p2p_peer.mli @@ -13,6 +13,17 @@ module Map = Id.Map module Set = Id.Set module Table = Id.Table +module Filter : sig + + type t = + | Accepted + | Running + | Disconnected + + val rpc_arg : t RPC_arg.t + +end + module State : sig type t = @@ -23,6 +34,8 @@ module State : sig val pp_digram : Format.formatter -> t -> unit val encoding : t Data_encoding.t + val filter : Filter.t list -> t -> bool + end module Info : sig diff --git a/src/lib_base/p2p_point.ml b/src/lib_base/p2p_point.ml index 6d54a1332..ab186c9ca 100644 --- a/src/lib_base/p2p_point.ml +++ b/src/lib_base/p2p_point.ml @@ -107,6 +107,32 @@ module Map = Map.Make (Id) module Set = Set.Make (Id) module Table = Hashtbl.Make (Id) +module Filter = struct + + type t = + | Requested + | Accepted + | Running + | Disconnected + + let rpc_arg = + RPC_arg.make + ~name:"p2p.point.state_filter" + ~destruct:(function + | "requested" -> Ok Requested + | "accepted" -> Ok Accepted + | "running" -> Ok Running + | "disconnected" -> Ok Disconnected + | s -> Error (Format.asprintf "Invalid state: %s" s)) + ~construct:(function + | Requested -> "requested" + | Accepted -> "accepted" + | Running -> "running" + | Disconnected -> "disconnected") + () + +end + module State = struct type t = @@ -166,6 +192,22 @@ module State = struct (fun () -> Disconnected) ; ] + let raw_filter (f : Filter.t) (s : t) = + match f, s with + | Requested, Requested -> true + | Requested, (Accepted _ | Running _ | Disconnected) + | (Accepted | Running | Disconnected), Requested -> false + | Accepted, Accepted _-> true + | Accepted, (Running _ | Disconnected) + | (Running | Disconnected), Accepted _ -> false + | Running, Running _ -> true + | Disconnected, Disconnected -> true + | Running, Disconnected + | Disconnected, Running _ -> false + + let filter filters state = + List.exists (fun f -> raw_filter f state) filters + end module Info = struct diff --git a/src/lib_base/p2p_point.mli b/src/lib_base/p2p_point.mli index 2a8c966b5..916544c9e 100644 --- a/src/lib_base/p2p_point.mli +++ b/src/lib_base/p2p_point.mli @@ -30,6 +30,18 @@ module Map : Map.S with type key = Id.t module Set : Set.S with type elt = Id.t module Table : Hashtbl.S with type key = Id.t +module Filter : sig + + type t = + | Requested + | Accepted + | Running + | Disconnected + + val rpc_arg : t RPC_arg.t + +end + module State : sig type t = @@ -44,6 +56,8 @@ module State : sig val of_p2p_peer_id : t -> P2p_peer_id.t option val of_peerid_state : t -> P2p_peer_id.t option -> t + val filter : Filter.t list -> t -> bool + end module Info : sig diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index fff7f0682..ae2a44fa5 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -580,11 +580,11 @@ let build_rpc_directory net = end in let dir = - RPC_directory.register1 dir P2p_services.S.connect begin fun point () timeout -> + RPC_directory.register1 dir P2p_services.S.connect begin fun point q () -> match net.pool with | None -> failwith "The P2P layer is disabled." | Some pool -> - P2p_pool.connect ~timeout pool point >>=? fun _conn -> + P2p_pool.connect ~timeout:q#timeout pool point >>=? fun _conn -> return () end in @@ -602,13 +602,13 @@ let build_rpc_directory net = let dir = RPC_directory.lwt_register1 dir P2p_services.Connections.S.kick - begin fun peer_id () wait -> + begin fun peer_id q () -> match net.pool with | None -> Lwt.return_unit | Some pool -> match P2p_pool.Connection.find_by_peer_id pool peer_id with | None -> Lwt.return_unit - | Some conn -> P2p_pool.disconnect ~wait conn + | Some conn -> P2p_pool.disconnect ~wait:q#wait conn end in let dir = @@ -629,7 +629,7 @@ let build_rpc_directory net = let dir = RPC_directory.register0 dir P2p_services.Peers.S.list - begin fun () restrict -> + begin fun q () -> match net.pool with | None -> return [] | Some pool -> @@ -638,9 +638,10 @@ let build_rpc_directory net = ~init:[] ~f:begin fun peer_id i a -> let info = info_of_peer_info pool i in - match restrict with + match q#filters with | [] -> (peer_id, info) :: a - | _ when List.mem info.state restrict -> (peer_id, info) :: a + | filters when P2p_peer.State.filter filters info.state -> + (peer_id, info) :: a | _ -> a end end in @@ -658,7 +659,7 @@ let build_rpc_directory net = let dir = RPC_directory.gen_register1 dir P2p_services.Peers.S.events - begin fun peer_id () monitor -> + begin fun peer_id q () -> match net.pool with | None -> RPC_answer.not_found | Some pool -> @@ -670,7 +671,7 @@ let build_rpc_directory net = P2p_peer_state.Info.fold gi ~init:[] ~f:(fun a e -> e :: a) in let evts = (if rev then List.rev_sub else List.sub) evts max in - if not monitor then + if not q#monitor then RPC_answer.return evts else let stream, stopper = P2p_peer_state.Info.watch gi in @@ -722,7 +723,7 @@ let build_rpc_directory net = let dir = RPC_directory.register0 dir P2p_services.Points.S.list - begin fun () restrict -> + begin fun q () -> match net.pool with | None -> return [] | Some pool -> @@ -731,9 +732,10 @@ let build_rpc_directory net = pool ~init:[] ~f:begin fun point i a -> let info = info_of_point_info i in - match restrict with + match q#filters with | [] -> (point, info) :: a - | _ when List.mem info.state restrict -> (point, info) :: a + | filters when P2p_point.State.filter filters info.state -> + (point, info) :: a | _ -> a end end in @@ -752,7 +754,7 @@ let build_rpc_directory net = let dir = RPC_directory.gen_register1 dir P2p_services.Points.S.events - begin fun point_id () monitor -> + begin fun point_id q () -> match net.pool with | None -> RPC_answer.not_found | Some pool -> @@ -764,7 +766,7 @@ let build_rpc_directory net = P2p_point_state.Info.fold gi ~init:[] ~f:(fun a e -> e :: a) in let evts = (if rev then List.rev_sub else List.sub) evts max in - if not monitor then + if not q#monitor then RPC_answer.return evts else let stream, stopper = P2p_point_state.Info.watch gi in diff --git a/src/lib_shell_services/p2p_services.ml b/src/lib_shell_services/p2p_services.ml index c29b2260a..b04678047 100644 --- a/src/lib_shell_services/p2p_services.ml +++ b/src/lib_shell_services/p2p_services.ml @@ -7,39 +7,60 @@ (* *) (**************************************************************************) +let wait_query = + let open RPC_query in + query (fun wait -> object + method wait = wait + end) + |+ flag "wait" (fun t -> t#wait) + |> seal + +let monitor_query = + let open RPC_query in + query (fun monitor -> object + method monitor = monitor + end) + |+ flag "monitor" (fun t -> t#monitor) + |> seal + +let timeout_query = + let open RPC_query in + query (fun timeout -> object + method timeout = timeout + end) + |+ field "timeout" RPC_arg.float 10. (fun t -> t#timeout) + |> seal + module S = struct let versions = - RPC_service.post_service + RPC_service.get_service ~description:"Supported network layer versions." ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: (Data_encoding.list P2p_version.encoding) RPC_path.(root / "network" / "versions") let stat = - RPC_service.post_service + RPC_service.get_service ~description:"Global network bandwidth statistics in B/s." ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: P2p_stat.encoding RPC_path.(root / "network" / "stat") let events = - RPC_service.post_service + RPC_service.get_service ~description:"Stream of all network events" ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: P2p_connection.Pool_event.encoding RPC_path.(root / "network" / "log") let connect = - RPC_service.post_service + RPC_service.put_service ~description:"Connect to a peer" - ~query: RPC_query.empty - ~input: Data_encoding.(obj1 (dft "timeout" float 5.)) + ~query: timeout_query + ~input: Data_encoding.empty ~output: Data_encoding.empty - RPC_path.(root / "network" / "connect" /: P2p_point.Id.rpc_arg) + RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg) end @@ -48,9 +69,7 @@ let stat ctxt = make_call S.stat ctxt () () () let versions ctxt = make_call S.versions ctxt () () () let events ctxt = make_streamed_call S.events ctxt () () () let connect ctxt ~timeout peer_id = - make_call1 S.connect ctxt peer_id () timeout - -let monitor_encoding = Data_encoding.(obj1 (dft "monitor" bool false)) + make_call1 S.connect ctxt peer_id (object method timeout = timeout end) () module Connections = struct @@ -62,34 +81,32 @@ module Connections = struct module S = struct let list = - RPC_service.post_service + RPC_service.get_service ~description:"List the running P2P connection." ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: (Data_encoding.list connection_info_encoding) RPC_path.(root / "network" / "connections") let info = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: connection_info_encoding ~description:"Details about the current P2P connection to the given peer." RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg) let kick = - RPC_service.post_service - ~query: RPC_query.empty - ~input: Data_encoding.(obj1 (req "wait" bool)) + RPC_service.delete_service + ~query: wait_query ~output: Data_encoding.empty ~description:"Forced close of the current P2P connection to the given peer." - RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg / "kick") + RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg) end let list ctxt = make_call S.list ctxt () () () let info ctxt peer_id = make_call1 S.info ctxt peer_id () () - let kick ctxt ?(wait = false) peer_id = make_call1 S.kick ctxt peer_id () wait + let kick ctxt ?(wait = false) peer_id = + make_call1 S.kick ctxt peer_id (object method wait = wait end) () end @@ -98,29 +115,30 @@ module Points = struct module S = struct let info = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: P2p_point.Info.encoding ~description: "Details about a given `IP:addr`." RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg) let events = - RPC_service.post_service - ~query: RPC_query.empty - ~input: monitor_encoding + RPC_service.get_service + ~query: monitor_query ~output: (Data_encoding.list P2p_point.Pool_event.encoding) ~description: "Monitor network events related to an `IP:addr`." RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "log") let list = - let filter = - let open Data_encoding in - obj1 (dft "filter" (list P2p_point.State.encoding) []) in - RPC_service.post_service - ~query: RPC_query.empty - ~input: filter + let filter_query = + let open RPC_query in + query (fun filters -> object + method filters = filters + end) + |+ multi_field "filter" P2p_point.Filter.rpc_arg (fun t -> t#filters) + |> seal in + RPC_service.get_service + ~query: filter_query ~output: Data_encoding.(list (tup2 P2p_point.Id.encoding @@ -130,25 +148,22 @@ module Points = struct RPC_path.(root / "network" / "points") let forget = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Remove the given address from the whitelist/blacklist." RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "forget" ) let ban = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Blacklist the given address." RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "ban" ) let trust = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Trust a given address permanently. \ Connections from this address can still be closed \ @@ -156,9 +171,8 @@ module Points = struct RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "trust" ) let banned = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.bool ~description:"Check is a given address is blacklisted or \ greylisted." @@ -169,8 +183,10 @@ module Points = struct open RPC_context let info ctxt peer_id = make_call1 S.info ctxt peer_id () () let events ctxt point = - make_streamed_call S.events ctxt ((), point) () true - let list ?(filter = []) ctxt = make_call S.list ctxt () () filter + make_streamed_call S.events ctxt ((), point) + (object method monitor = true end) () + let list ?(filter = []) ctxt = make_call S.list ctxt () + (object method filters = filter end) () let forget ctxt peer_id = make_call1 S.forget ctxt peer_id () () let ban ctxt peer_id = make_call1 S.ban ctxt peer_id () () let trust ctxt peer_id = make_call1 S.trust ctxt peer_id () () @@ -183,17 +199,15 @@ module Peers = struct module S = struct let info = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: (P2p_peer.Info.encoding Connection_metadata.encoding) ~description:"Details about a given peer." RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg) let events = - RPC_service.post_service - ~query: RPC_query.empty - ~input: monitor_encoding + RPC_service.get_service + ~query: monitor_query ~output: (Data_encoding.list P2p_peer.Pool_event.encoding) ~description:"Monitor network events related to a given peer." @@ -201,11 +215,14 @@ module Peers = struct let list = let filter = - let open Data_encoding in - obj1 (dft "filter" (list P2p_peer.State.encoding) []) in - RPC_service.post_service - ~query: RPC_query.empty - ~input: filter + let open RPC_query in + query (fun filters -> object + method filters = filters + end) + |+ multi_field "filter" P2p_peer.Filter.rpc_arg (fun t -> t#filters) + |> seal in + RPC_service.get_service + ~query: filter ~output: Data_encoding.(list (tup2 P2p_peer.Id.encoding @@ -214,34 +231,30 @@ module Peers = struct RPC_path.(root / "network" / "peers") let forget = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Remove the given peer from the whitelist/blacklist." RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "forget" ) let ban = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Blacklist the given peer." RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "ban" ) let trust = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Trust a given peer permanently: the peer cannot \ be blocked (but its host IP still can)." RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "trust" ) let banned = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.bool ~description:"Check if a given peer is blacklisted or \ greylisted." @@ -251,8 +264,10 @@ module Peers = struct let info ctxt peer_id = make_call1 S.info ctxt peer_id () () let events ctxt point = - make_streamed_call S.events ctxt ((), point) () true - let list ?(filter = []) ctxt = make_call S.list ctxt () () filter + make_streamed_call S.events ctxt ((), point) + (object method monitor = true end) () + let list ?(filter = []) ctxt = + make_call S.list ctxt () (object method filters = filter end) () let forget ctxt point_id = make_call1 S.forget ctxt point_id () () let ban ctxt point_id = make_call1 S.ban ctxt point_id () () let trust ctxt point_id = make_call1 S.trust ctxt point_id () () @@ -265,12 +280,12 @@ module ACL = struct module S = struct let clear = - RPC_service.post_service + RPC_service.get_service ~query: RPC_query.empty - ~input: Data_encoding.empty ~output: Data_encoding.empty ~description:"Clear all greylists tables." RPC_path.(root / "network" / "greylist" / "clear" ) + end let clear ctxt = make_call S.clear ctxt () () diff --git a/src/lib_shell_services/p2p_services.mli b/src/lib_shell_services/p2p_services.mli index a3de8364d..9682d5d99 100644 --- a/src/lib_shell_services/p2p_services.mli +++ b/src/lib_shell_services/p2p_services.mli @@ -21,23 +21,23 @@ val connect: #simple -> timeout:float -> P2p_point.Id.t -> unit tzresult Lwt.t module S : sig val stat : - ([ `POST ], unit, + ([ `GET ], unit, unit, unit, unit, P2p_stat.t) RPC_service.t val versions : - ([ `POST ], unit, + ([ `GET ], unit, unit, unit, unit, P2p_version.t list) RPC_service.t val events : - ([ `POST ], unit, + ([ `GET ], unit, unit, unit, unit, P2p_connection.Pool_event.t) RPC_service.t val connect : - ([ `POST ], unit, - unit * P2p_point.Id.t, unit, float, + ([ `PUT ], unit, + unit * P2p_point.Id.t, < timeout: float >, unit, unit) RPC_service.t end @@ -57,18 +57,18 @@ module Connections : sig module S : sig val list : - ([ `POST ], unit, + ([ `GET ], unit, unit, unit, unit, connection_info list) RPC_service.t val info : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, connection_info) RPC_service.t val kick : - ([ `POST ], unit, - unit * P2p_peer.Id.t, unit, bool, + ([ `DELETE ], unit, + unit * P2p_peer.Id.t, < wait: bool >, unit, unit) RPC_service.t end @@ -79,7 +79,7 @@ end module Points : sig val list: - ?filter:(P2p_point.State.t list) -> + ?filter:(P2p_point.Filter.t list) -> #simple -> (P2p_point.Id.t * P2p_point.Info.t) list tzresult Lwt.t val info: #simple -> P2p_point.Id.t -> P2p_point.Info.t tzresult Lwt.t @@ -100,37 +100,37 @@ module Points : sig module S : sig val list : - ([ `POST ], unit, - unit, unit, P2p_point.State.t list, + ([ `GET ], unit, + unit, < filters: P2p_point.Filter.t list >, unit, (P2p_point.Id.t * P2p_point.Info.t) list) RPC_service.t val info : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_point.Id.t, unit, unit, P2p_point.Info.t) RPC_service.t val events : - ([ `POST ], unit, - unit * P2p_point.Id.t, unit, bool, + ([ `GET ], unit, + unit * P2p_point.Id.t, < monitor: bool>, unit, P2p_point.Pool_event.t list) RPC_service.t val forget : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_point.Id.t, unit, unit, unit) RPC_service.t val ban : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_point.Id.t, unit, unit, unit) RPC_service.t val trust : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_point.Id.t, unit, unit, unit) RPC_service.t val banned : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_point.Id.t, unit, unit, bool) RPC_service.t @@ -141,7 +141,7 @@ end module Peers : sig val list: - ?filter:(P2p_peer.State.t list) -> + ?filter:(P2p_peer.Filter.t list) -> #simple -> (P2p_peer.Id.t * Connection_metadata.t P2p_peer.Info.t) list tzresult Lwt.t @@ -164,37 +164,37 @@ module Peers : sig module S : sig val list : - ([ `POST ], unit, - unit, unit, P2p_peer.State.t list, + ([ `GET ], unit, + unit, < filters: P2p_peer.Filter.t list >, unit, (P2p_peer.Id.t * Connection_metadata.t P2p_peer.Info.t) list) RPC_service.t val info : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, Connection_metadata.t P2p_peer.Info.t) RPC_service.t val events : - ([ `POST ], unit, - unit * P2p_peer.Id.t, unit, bool, + ([ `GET ], unit, + unit * P2p_peer.Id.t, < monitor: bool>, unit, P2p_peer.Pool_event.t list) RPC_service.t val forget : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, unit) RPC_service.t val ban : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, unit) RPC_service.t val trust : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, unit) RPC_service.t val banned : - ([ `POST ], unit, + ([ `GET ], unit, unit * P2p_peer.Id.t, unit, unit, bool) RPC_service.t @@ -209,7 +209,7 @@ module ACL : sig module S : sig val clear : - ([ `POST ], unit, + ([ `GET ], unit, unit, unit, unit, unit) RPC_service.t