Client refactor: Move Client_node_rpcs.Network into P2p_services

This commit is contained in:
Grégoire Henry 2018-02-08 10:51:02 +01:00
parent 02c2035e93
commit 37e65d93e7
8 changed files with 302 additions and 226 deletions

View File

@ -93,6 +93,14 @@ module Id = struct
let encoding = let encoding =
Data_encoding.conv to_string of_string_exn Data_encoding.string Data_encoding.conv to_string of_string_exn Data_encoding.string
let rpc_arg =
RPC_arg.make
~name:"point"
~descr:"A network point (ipv4:port or [ipv6]:port)."
~destruct:of_string
~construct:to_string
()
end end
module Map = Map.Make (Id) module Map = Map.Make (Id)

View File

@ -23,6 +23,7 @@ module Id : sig
val is_global : t -> bool val is_global : t -> bool
val parse_addr_port : string -> string * string val parse_addr_port : string -> string * string
val rpc_arg : t RPC_arg.t
end end
module Map : Map.S with type key = Id.t module Map : Map.S with type key = Id.t

View File

@ -16,10 +16,10 @@ let commands () = [
command ~group ~desc: "show global network status" command ~group ~desc: "show global network status"
no_options no_options
(prefixes ["network" ; "stat"] stop) begin fun () (cctxt : Client_commands.full_context) -> (prefixes ["network" ; "stat"] stop) begin fun () (cctxt : Client_commands.full_context) ->
Client_node_rpcs.Network.stat cctxt >>=? fun stat -> P2p_services.stat cctxt >>=? fun stat ->
Client_node_rpcs.Network.connections cctxt >>=? fun conns -> P2p_services.Connections.list cctxt >>=? fun conns ->
Client_node_rpcs.Network.peers cctxt >>=? fun peers -> P2p_services.Peers.list cctxt >>=? fun peers ->
Client_node_rpcs.Network.points cctxt >>=? fun points -> P2p_services.Points.list cctxt >>=? fun points ->
cctxt#message "GLOBAL STATS" >>= fun () -> cctxt#message "GLOBAL STATS" >>= fun () ->
cctxt#message " %a" P2p_stat.pp stat >>= fun () -> cctxt#message " %a" P2p_stat.pp stat >>= fun () ->
cctxt#message "CONNECTIONS" >>= fun () -> cctxt#message "CONNECTIONS" >>= fun () ->

View File

@ -57,19 +57,3 @@ module Protocols = struct
{ contents; monitor = Some false } { contents; monitor = Some false }
end end
module Network = struct
let stat cctxt =
call_service0 cctxt P2p_services.stat ()
let connections cctxt =
call_service0 cctxt P2p_services.Connection.list ()
let peers cctxt =
call_service0 cctxt P2p_services.Peer_id.list []
let points cctxt =
call_service0 cctxt P2p_services.Point.list []
end

View File

@ -54,22 +54,6 @@ end
val bootstrapped: val bootstrapped:
#Client_rpcs.ctxt -> (Block_hash.t * Time.t) Lwt_stream.t tzresult Lwt.t #Client_rpcs.ctxt -> (Block_hash.t * Time.t) Lwt_stream.t tzresult Lwt.t
module Network : sig
val stat:
#Client_rpcs.ctxt -> P2p_stat.t tzresult Lwt.t
val connections:
#Client_rpcs.ctxt -> P2p_connection.Info.t list tzresult Lwt.t
val peers:
#Client_rpcs.ctxt -> (P2p_peer.Id.t * P2p_peer.Info.t) list tzresult Lwt.t
val points:
#Client_rpcs.ctxt -> (P2p_point.Id.t * P2p_point.Info.t) list tzresult Lwt.t
end
val complete: val complete:
#Client_rpcs.ctxt -> #Client_rpcs.ctxt ->
?block:Block_services.block -> string -> string list tzresult Lwt.t ?block:Block_services.block -> string -> string list tzresult Lwt.t

View File

@ -522,48 +522,52 @@ let build_rpc_directory node =
let dir = let dir =
let implementation () () = Node.RPC.Network.stat node |> RPC_answer.return in let implementation () () = Node.RPC.Network.stat node |> RPC_answer.return in
RPC_directory.register0 dir P2p_services.stat implementation in RPC_directory.register0 dir P2p_services.S.stat implementation in
let dir = let dir =
let implementation () () = let implementation () () =
RPC_answer.return Distributed_db.Raw.supported_versions in RPC_answer.return Distributed_db.Raw.supported_versions in
RPC_directory.register0 dir P2p_services.versions implementation in RPC_directory.register0 dir P2p_services.S.versions implementation in
let dir = let dir =
let implementation () () = let implementation () () =
let stream, stopper = Node.RPC.Network.watch node in let stream, stopper = Node.RPC.Network.watch node in
let shutdown () = Lwt_watcher.shutdown stopper in let shutdown () = Lwt_watcher.shutdown stopper in
let next () = Lwt_stream.get stream in let next () = Lwt_stream.get stream in
RPC_answer.return_stream { next ; shutdown } in RPC_answer.return_stream { next ; shutdown } in
RPC_directory.register0 dir P2p_services.events implementation in RPC_directory.register0 dir P2p_services.S.events implementation in
let dir = let dir =
let implementation point () timeout = let implementation point () timeout =
Node.RPC.Network.connect node point timeout >>= RPC_answer.return in Node.RPC.Network.connect node point timeout >>= RPC_answer.return in
RPC_directory.register1 dir P2p_services.connect implementation in RPC_directory.register1 dir P2p_services.S.connect implementation in
(* Network : Connection *) (* Network : Connection *)
let dir = let dir =
let implementation peer_id () () = let implementation peer_id () () =
Node.RPC.Network.Connection.info node peer_id |> RPC_answer.return in match Node.RPC.Network.Connection.info node peer_id with
RPC_directory.register1 dir P2p_services.Connection.info implementation in | None -> raise Not_found
| Some v -> RPC_answer.return v in
RPC_directory.register1 dir P2p_services.Connections.S.info implementation in
let dir = let dir =
let implementation peer_id () wait = let implementation peer_id () wait =
Node.RPC.Network.Connection.kick node peer_id wait >>= RPC_answer.return in Node.RPC.Network.Connection.kick node peer_id wait >>= RPC_answer.return in
RPC_directory.register1 dir P2p_services.Connection.kick implementation in RPC_directory.register1 dir P2p_services.Connections.S.kick implementation in
let dir = let dir =
let implementation () () = let implementation () () =
Node.RPC.Network.Connection.list node |> RPC_answer.return in Node.RPC.Network.Connection.list node |> RPC_answer.return in
RPC_directory.register0 dir P2p_services.Connection.list implementation in RPC_directory.register0 dir P2p_services.Connections.S.list implementation in
(* Network : Peer_id *) (* Network : Peer_id *)
let dir = let dir =
let implementation () state = let implementation () state =
Node.RPC.Network.Peer_id.list node ~restrict:state |> RPC_answer.return in Node.RPC.Network.Peer_id.list node ~restrict:state |> RPC_answer.return in
RPC_directory.register0 dir P2p_services.Peer_id.list implementation in RPC_directory.register0 dir P2p_services.Peers.S.list implementation in
let dir = let dir =
let implementation peer_id () () = let implementation peer_id () () =
Node.RPC.Network.Peer_id.info node peer_id |> RPC_answer.return in match Node.RPC.Network.Peer_id.info node peer_id with
RPC_directory.register1 dir P2p_services.Peer_id.info implementation in | None -> raise Not_found
| Some v -> RPC_answer.return v in
RPC_directory.register1 dir P2p_services.Peers.S.info implementation in
let dir = let dir =
let implementation peer_id () monitor = let implementation peer_id () monitor =
if monitor then if monitor then
@ -580,18 +584,20 @@ let build_rpc_directory node =
RPC_answer.return_stream { next ; shutdown } RPC_answer.return_stream { next ; shutdown }
else else
Node.RPC.Network.Peer_id.events node peer_id |> RPC_answer.return in Node.RPC.Network.Peer_id.events node peer_id |> RPC_answer.return in
RPC_directory.register1 dir P2p_services.Peer_id.events implementation in RPC_directory.register1 dir P2p_services.Peers.S.events implementation in
(* Network : Point *) (* Network : Point *)
let dir = let dir =
let implementation () state = let implementation () state =
Node.RPC.Network.Point.list node ~restrict:state |> RPC_answer.return in Node.RPC.Network.Point.list node ~restrict:state |> RPC_answer.return in
RPC_directory.register0 dir P2p_services.Point.list implementation in RPC_directory.register0 dir P2p_services.Points.S.list implementation in
let dir = let dir =
let implementation point () () = let implementation point () () =
Node.RPC.Network.Point.info node point |> RPC_answer.return in match Node.RPC.Network.Point.info node point with
RPC_directory.register1 dir P2p_services.Point.info implementation in | None -> raise Not_found
| Some v -> RPC_answer.return v in
RPC_directory.register1 dir P2p_services.Points.S.info implementation in
let dir = let dir =
let implementation point () monitor = let implementation point () monitor =
if monitor then if monitor then
@ -608,7 +614,7 @@ let build_rpc_directory node =
RPC_answer.return_stream { next ; shutdown } RPC_answer.return_stream { next ; shutdown }
else else
Node.RPC.Network.Point.events node point |> RPC_answer.return in Node.RPC.Network.Point.events node point |> RPC_answer.return in
RPC_directory.register1 dir P2p_services.Point.events implementation in RPC_directory.register1 dir P2p_services.Points.S.events implementation in
let dir = let dir =
RPC_directory.register_describe_directory_service dir Shell_services.describe in RPC_directory.register_describe_directory_service dir Shell_services.describe in
dir dir

View File

@ -7,143 +7,173 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
let point_arg = module S = struct
RPC_arg.make
~name:"point"
~descr:"A network point (ipv4:port or [ipv6]:port)."
~destruct:P2p_point.Id.of_string
~construct:P2p_point.Id.to_string
()
let versions = let versions =
RPC_service.post_service RPC_service.post_service
~description:"Supported network layer versions." ~description:"Supported network layer versions."
~query: RPC_query.empty ~query: RPC_query.empty
~input: Data_encoding.empty ~input: Data_encoding.empty
~output: (Data_encoding.list P2p_version.encoding) ~output: (Data_encoding.list P2p_version.encoding)
RPC_path.(root / "network" / "versions") RPC_path.(root / "network" / "versions")
let stat = let stat =
RPC_service.post_service RPC_service.post_service
~description:"Global network bandwidth statistics in B/s." ~description:"Global network bandwidth statistics in B/s."
~query: RPC_query.empty ~query: RPC_query.empty
~input: Data_encoding.empty ~input: Data_encoding.empty
~output: P2p_stat.encoding ~output: P2p_stat.encoding
RPC_path.(root / "network" / "stat") RPC_path.(root / "network" / "stat")
let events = let events =
RPC_service.post_service RPC_service.post_service
~description:"Stream of all network events" ~description:"Stream of all network events"
~query: RPC_query.empty ~query: RPC_query.empty
~input: Data_encoding.empty ~input: Data_encoding.empty
~output: P2p_connection.Pool_event.encoding ~output: P2p_connection.Pool_event.encoding
RPC_path.(root / "network" / "log") RPC_path.(root / "network" / "log")
let connect = let connect =
RPC_service.post_service RPC_service.post_service
~description:"Connect to a peer" ~description:"Connect to a peer"
~query: RPC_query.empty ~query: RPC_query.empty
~input: Data_encoding.(obj1 (dft "timeout" float 5.)) ~input: Data_encoding.(obj1 (dft "timeout" float 5.))
~output: (RPC_error.wrap Data_encoding.empty) ~output: (RPC_error.wrap Data_encoding.empty)
RPC_path.(root / "network" / "connect" /: point_arg) RPC_path.(root / "network" / "connect" /: P2p_point.Id.rpc_arg)
end
open RPC_context
let stat ctxt = make_call S.stat ctxt () () ()
let versions ctxt = make_call S.versions ctxt () () ()
let events ctxt = make_streamed_call S.events ctxt () () ()
let connect ctxt ~timeout peer_id =
make_err_call1 S.connect ctxt peer_id () timeout
let monitor_encoding = Data_encoding.(obj1 (dft "monitor" bool false)) let monitor_encoding = Data_encoding.(obj1 (dft "monitor" bool false))
module Connection = struct module Connections = struct
let list = module S = struct
RPC_service.post_service
~description:"List the running P2P connection."
~query: RPC_query.empty
~input: Data_encoding.empty
~output: (Data_encoding.list P2p_connection.Info.encoding)
RPC_path.(root / "network" / "connection")
let info = let list =
RPC_service.post_service RPC_service.post_service
~query: RPC_query.empty ~description:"List the running P2P connection."
~input: Data_encoding.empty ~query: RPC_query.empty
~output: (Data_encoding.option P2p_connection.Info.encoding) ~input: Data_encoding.empty
~description:"Details about the current P2P connection to the given peer." ~output: (Data_encoding.list P2p_connection.Info.encoding)
RPC_path.(root / "network" / "connection" /: P2p_peer.Id.rpc_arg) RPC_path.(root / "network" / "connections")
let kick = let info =
RPC_service.post_service RPC_service.post_service
~query: RPC_query.empty ~query: RPC_query.empty
~input: Data_encoding.(obj1 (req "wait" bool)) ~input: Data_encoding.empty
~output: Data_encoding.empty ~output: P2p_connection.Info.encoding
~description:"Forced close of the current P2P connection to the given peer." ~description:"Details about the current P2P connection to the given peer."
RPC_path.(root / "network" / "connection" /: P2p_peer.Id.rpc_arg / "kick") RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg)
let kick =
RPC_service.post_service
~query: RPC_query.empty
~input: Data_encoding.(obj1 (req "wait" bool))
~output: Data_encoding.empty
~description:"Forced close of the current P2P connection to the given peer."
RPC_path.(root / "network" / "connections" /: P2p_peer.Id.rpc_arg / "kick")
end
let list ctxt = make_call S.list ctxt () () ()
let info ctxt peer_id = make_call1 S.info ctxt peer_id () ()
let kick ctxt ?(wait = false) peer_id = make_call1 S.kick ctxt peer_id () wait
end end
module Point = struct module Points = struct
let info = module S = struct
RPC_service.post_service
~query: RPC_query.empty
~input: Data_encoding.empty
~output: (Data_encoding.option P2p_point.Info.encoding)
~description: "Details about a given `IP:addr`."
RPC_path.(root / "network" / "point" /: point_arg)
let events = let info =
RPC_service.post_service RPC_service.post_service
~query: RPC_query.empty ~query: RPC_query.empty
~input: monitor_encoding ~input: Data_encoding.empty
~output: (Data_encoding.list ~output: P2p_point.Info.encoding
P2p_point.Pool_event.encoding) ~description: "Details about a given `IP:addr`."
~description: "Monitor network events related to an `IP:addr`." RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg)
RPC_path.(root / "network" / "point" /: point_arg / "log")
let list = let events =
let filter = RPC_service.post_service
let open Data_encoding in ~query: RPC_query.empty
obj1 (dft "filter" (list P2p_point.State.encoding) []) in ~input: monitor_encoding
RPC_service.post_service ~output: (Data_encoding.list
~query: RPC_query.empty P2p_point.Pool_event.encoding)
~input: filter ~description: "Monitor network events related to an `IP:addr`."
~output: RPC_path.(root / "network" / "points" /: P2p_point.Id.rpc_arg / "log")
Data_encoding.(list (tup2
P2p_point.Id.encoding let list =
P2p_point.Info.encoding)) let filter =
~description:"List the pool of known `IP:port` \ let open Data_encoding in
used for establishing P2P connections ." obj1 (dft "filter" (list P2p_point.State.encoding) []) in
RPC_path.(root / "network" / "point") RPC_service.post_service
~query: RPC_query.empty
~input: filter
~output:
Data_encoding.(list (tup2
P2p_point.Id.encoding
P2p_point.Info.encoding))
~description:"List the pool of known `IP:port` \
used for establishing P2P connections ."
RPC_path.(root / "networks" / "point")
end
open RPC_context
let info ctxt peer_id = make_call1 S.info ctxt peer_id () ()
let events ctxt point =
make_streamed_call S.events ctxt ((), point) () true
let list ?(filter = []) ctxt = make_call S.list ctxt () () filter
end end
module Peer_id = struct module Peers = struct
let info = module S = struct
RPC_service.post_service
~query: RPC_query.empty
~input: Data_encoding.empty
~output: (Data_encoding.option P2p_peer.Info.encoding)
~description:"Details about a given peer."
RPC_path.(root / "network" / "peer_id" /: P2p_peer.Id.rpc_arg)
let events = let info =
RPC_service.post_service RPC_service.post_service
~query: RPC_query.empty ~query: RPC_query.empty
~input: monitor_encoding ~input: Data_encoding.empty
~output: (Data_encoding.list ~output: P2p_peer.Info.encoding
P2p_peer.Pool_event.encoding) ~description:"Details about a given peer."
~description:"Monitor network events related to a given peer." RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg)
RPC_path.(root / "network" / "peer_id" /: P2p_peer.Id.rpc_arg / "log")
let list = let events =
let filter = RPC_service.post_service
let open Data_encoding in ~query: RPC_query.empty
obj1 (dft "filter" (list P2p_peer.State.encoding) []) in ~input: monitor_encoding
RPC_service.post_service ~output: (Data_encoding.list
~query: RPC_query.empty P2p_peer.Pool_event.encoding)
~input: filter ~description:"Monitor network events related to a given peer."
~output: RPC_path.(root / "network" / "peers" /: P2p_peer.Id.rpc_arg / "log")
Data_encoding.(list (tup2
P2p_peer.Id.encoding let list =
P2p_peer.Info.encoding)) let filter =
~description:"List the peers the node ever met." let open Data_encoding in
RPC_path.(root / "network" / "peer_id") obj1 (dft "filter" (list P2p_peer.State.encoding) []) in
RPC_service.post_service
~query: RPC_query.empty
~input: filter
~output:
Data_encoding.(list (tup2
P2p_peer.Id.encoding
P2p_peer.Info.encoding))
~description:"List the peers the node ever met."
RPC_path.(root / "network" / "peers")
end
let info ctxt peer_id = make_call1 S.info ctxt peer_id () ()
let events ctxt point =
make_streamed_call S.events ctxt ((), point) () true
let list ?(filter = []) ctxt = make_call S.list ctxt () () filter
end end

View File

@ -7,75 +7,138 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
val stat : open RPC_context
([ `POST ], unit,
unit, unit, unit,
P2p_stat.t) RPC_service.t
val versions : val stat: #simple -> P2p_stat.t tzresult Lwt.t
([ `POST ], unit,
unit, unit, unit,
P2p_version.t list) RPC_service.t
val events : val versions: #simple -> P2p_version.t list tzresult Lwt.t
([ `POST ], unit,
unit, unit, unit,
P2p_connection.Pool_event.t) RPC_service.t
val connect : val events: #streamed ->
([ `POST ], unit, (P2p_connection.Pool_event.t Lwt_stream.t * stopper) tzresult Lwt.t
unit * P2p_point.Id.t, unit, float,
unit tzresult) RPC_service.t
module Connection : sig val connect: #simple -> timeout:float -> P2p_point.Id.t -> unit tzresult Lwt.t
val list : module S : sig
val stat :
([ `POST ], unit, ([ `POST ], unit,
unit, unit, unit, unit, unit, unit,
P2p_connection.Info.t list) RPC_service.t P2p_stat.t) RPC_service.t
val info : val versions :
([ `POST ], unit, ([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit, unit, unit, unit,
P2p_connection.Info.t option) RPC_service.t P2p_version.t list) RPC_service.t
val kick :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, bool,
unit) RPC_service.t
end
module Point : sig
val list :
([ `POST ], unit,
unit, unit, P2p_point.State.t list,
(P2p_point.Id.t * P2p_point.Info.t) list) RPC_service.t
val info :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, unit,
P2p_point.Info.t option) RPC_service.t
val events :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, bool,
P2p_point.Pool_event.t list) RPC_service.t
end
module Peer_id : sig
val list :
([ `POST ], unit,
unit, unit, P2p_peer.State.t list,
(P2p_peer.Id.t * P2p_peer.Info.t) list) RPC_service.t
val info :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
P2p_peer.Info.t option) RPC_service.t
val events : val events :
([ `POST ], unit, ([ `POST ], unit,
unit * P2p_peer.Id.t, unit, bool, unit, unit, unit,
P2p_peer.Pool_event.t list) RPC_service.t P2p_connection.Pool_event.t) RPC_service.t
val connect :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, float,
unit tzresult) RPC_service.t
end
module Connections : sig
open RPC_context
val list: #simple -> P2p_connection.Info.t list tzresult Lwt.t
val info: #simple -> P2p_peer.Id.t -> P2p_connection.Info.t tzresult Lwt.t
val kick: #simple -> ?wait:bool -> P2p_peer.Id.t -> unit tzresult Lwt.t
module S : sig
val list :
([ `POST ], unit,
unit, unit, unit,
P2p_connection.Info.t list) RPC_service.t
val info :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
P2p_connection.Info.t) RPC_service.t
val kick :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, bool,
unit) RPC_service.t
end
end
module Points : sig
val list:
?filter:(P2p_point.State.t list) ->
#simple -> (P2p_point.Id.t * P2p_point.Info.t) list tzresult Lwt.t
val info: #simple -> P2p_point.Id.t -> P2p_point.Info.t tzresult Lwt.t
val events:
#streamed ->
P2p_point.Id.t ->
(P2p_point.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t
module S : sig
val list :
([ `POST ], unit,
unit, unit, P2p_point.State.t list,
(P2p_point.Id.t * P2p_point.Info.t) list) RPC_service.t
val info :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, unit,
P2p_point.Info.t) RPC_service.t
val events :
([ `POST ], unit,
unit * P2p_point.Id.t, unit, bool,
P2p_point.Pool_event.t list) RPC_service.t
end
end
module Peers : sig
val list:
?filter:(P2p_peer.State.t list) ->
#simple ->
(P2p_peer.Id.t * P2p_peer.Info.t) list tzresult Lwt.t
val info: #simple -> P2p_peer.Id.t -> P2p_peer.Info.t tzresult Lwt.t
val events:
#streamed -> P2p_peer.Id.t ->
(P2p_peer.Pool_event.t list Lwt_stream.t * stopper) tzresult Lwt.t
module S : sig
val list :
([ `POST ], unit,
unit, unit, P2p_peer.State.t list,
(P2p_peer.Id.t * P2p_peer.Info.t) list) RPC_service.t
val info :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, unit,
P2p_peer.Info.t) RPC_service.t
val events :
([ `POST ], unit,
unit * P2p_peer.Id.t, unit, bool,
P2p_peer.Pool_event.t list) RPC_service.t
end
end end