From 322fc1e353951e747172c281933132e13059a792 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sun, 11 Feb 2018 21:10:25 +0100 Subject: [PATCH] RPC: move p2p services registration in `tezos-p2p` --- src/lib_p2p/jbuild | 6 +- src/lib_p2p/p2p.ml | 419 +++++++++++++++++++--------------- src/lib_p2p/p2p.mli | 59 +---- src/lib_p2p/tezos-p2p.opam | 1 + src/lib_rpc/RPC_directory.ml | 15 ++ src/lib_rpc/RPC_directory.mli | 43 ++++ src/lib_shell/node.ml | 62 +---- src/lib_shell/node.mli | 56 +---- src/lib_shell/node_rpc.ml | 100 +------- 9 files changed, 307 insertions(+), 454 deletions(-) diff --git a/src/lib_p2p/jbuild b/src/lib_p2p/jbuild index 7a82c5564..4a75b6d2f 100644 --- a/src/lib_p2p/jbuild +++ b/src/lib_p2p/jbuild @@ -4,11 +4,13 @@ ((name tezos_p2p) (public_name tezos-p2p) (libraries (tezos-base - tezos-stdlib-unix)) + tezos-stdlib-unix + tezos-shell-services)) (flags (:standard -w -9+27-30-32-40@8 -safe-string -open Tezos_base__TzPervasives - -open Tezos_stdlib_unix)))) + -open Tezos_stdlib_unix + -open Tezos_shell_services)))) (alias ((name runtest_indent) diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index b2de18f9a..4c076dce9 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -313,6 +313,7 @@ module Fake = struct end type ('msg, 'meta) t = { + versions : P2p_version.t list ; peer_id : P2p_peer.Id.t ; maintain : unit -> unit Lwt.t ; roll : unit -> unit Lwt.t ; @@ -380,6 +381,7 @@ let create ~config ~limits meta_cfg msg_cfg = check_limits limits >>=? fun () -> Real.create ~config ~limits meta_cfg msg_cfg >>=? fun net -> return { + versions = msg_cfg.versions ; peer_id = Real.peer_id net ; maintain = Real.maintain net ; roll = Real.roll net ; @@ -404,6 +406,7 @@ let create ~config ~limits meta_cfg msg_cfg = } let faked_network meta_config = { + versions = [] ; peer_id = Fake.id.peer_id ; maintain = Lwt.return ; roll = Lwt.return ; @@ -459,201 +462,249 @@ module Raw = struct let encoding = P2p_pool.Message.encoding end -module RPC = struct +let info_of_point_info i = + let open P2p_point.Info in + let open P2p_point.State in + let state = match P2p_point_state.get i with + | Requested _ -> Requested + | Accepted { current_peer_id ; _ } -> Accepted current_peer_id + | Running { current_peer_id ; _ } -> Running current_peer_id + | Disconnected -> Disconnected in + P2p_point_state.Info.{ + trusted = trusted i ; + state ; + greylisted_until = greylisted_until i ; + last_failed_connection = last_failed_connection i ; + last_rejected_connection = last_rejected_connection i ; + last_established_connection = last_established_connection i ; + last_disconnection = last_disconnection i ; + last_seen = last_seen i ; + last_miss = last_miss i ; + } - let stat net = - match net.pool with +let info_of_peer_info pool i = + let open P2p_peer.Info in + let open P2p_peer.State in + let state, id_point = match P2p_peer_state.get i with + | Accepted { current_point } -> Accepted, Some current_point + | Running { current_point } -> Running, Some current_point + | Disconnected -> Disconnected, None in + let peer_id = P2p_peer_state.Info.peer_id i in + let score = P2p_pool.Peers.get_score pool peer_id in + let stat = + match P2p_pool.Connection.find_by_peer_id pool peer_id with | None -> P2p_stat.empty - | Some pool -> P2p_pool.pool_stat pool + | Some conn -> P2p_pool.Connection.stat conn in + P2p_peer_state.Info.{ + score ; + trusted = trusted i ; + state ; + id_point ; + stat ; + last_failed_connection = last_failed_connection i ; + last_rejected_connection = last_rejected_connection i ; + last_established_connection = last_established_connection i ; + last_disconnection = last_disconnection i ; + last_seen = last_seen i ; + last_miss = last_miss i ; + } - let watch net = - match net.pool with - | None -> Lwt_watcher.create_fake_stream () - | Some pool -> P2p_pool.watch pool +let build_rpc_directory net = - let connect net point timeout = - match net.pool with - | None -> failwith "fake net" - | Some pool -> - P2p_pool.connect ~timeout pool point >>|? ignore + let dir = RPC_directory.empty in - module Connection = struct - let info net peer_id = + (* Network : Global *) + + let dir = + RPC_directory.register0 dir P2p_services.S.versions begin fun () () -> + return net.versions + end in + + let dir = + RPC_directory.register0 dir P2p_services.S.stat begin fun () () -> match net.pool with - | None -> None + | None -> return P2p_stat.empty + | Some pool -> return (P2p_pool.pool_stat pool) + end in + + let dir = + RPC_directory.gen_register0 dir P2p_services.S.events begin fun () () -> + let stream, stopper = + match net.pool with + | None -> Lwt_watcher.create_fake_stream () + | Some pool -> P2p_pool.watch pool in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + RPC_answer.return_stream { next ; shutdown } + end in + + let dir = + RPC_directory.register1 dir P2p_services.S.connect begin fun point () timeout -> + match net.pool with + | None -> failwith "The node has disable the P2P layer." | Some pool -> - Option.map + ignore (P2p_pool.connect ~timeout pool point : _ tzresult Lwt.t) ; + return () + end in + + (* Network : Connection *) + + let dir = + RPC_directory.opt_register1 dir P2p_services.Connections.S.info + begin fun peer_id () () -> + return @@ + Option.apply net.pool ~f: begin fun pool -> + Option.map ~f:P2p_pool.Connection.info (P2p_pool.Connection.find_by_peer_id pool peer_id) - ~f:P2p_pool.Connection.info - - let kick net peer_id wait = - match net.pool with - | None -> Lwt.return_unit - | Some pool -> - match P2p_pool.Connection.find_by_peer_id pool peer_id with - | None -> Lwt.return_unit - | Some conn -> P2p_pool.disconnect ~wait conn - - let list net = - match net.pool with - | None -> [] - | Some pool -> - P2p_pool.Connection.fold - pool ~init:[] - ~f:begin fun _peer_id c acc -> - P2p_pool.Connection.info c :: acc - end - - let count net = - match net.pool with - | None -> 0 - | Some pool -> P2p_pool.active_connections pool - end - - module Point = struct - - open P2p_point.Info - open P2p_point.State - - let info_of_point_info i = - let state = match P2p_point_state.get i with - | Requested _ -> Requested - | Accepted { current_peer_id ; _ } -> Accepted current_peer_id - | Running { current_peer_id ; _ } -> Running current_peer_id - | Disconnected -> Disconnected in - P2p_point_state.Info.{ - trusted = trusted i ; - state ; - greylisted_until = greylisted_until i ; - last_failed_connection = last_failed_connection i ; - last_rejected_connection = last_rejected_connection i ; - last_established_connection = last_established_connection i ; - last_disconnection = last_disconnection i ; - last_seen = last_seen i ; - last_miss = last_miss i ; - } - - let info net point = - match net.pool with - | None -> None - | Some pool -> - Option.map - (P2p_pool.Points.info pool point) - ~f:info_of_point_info - - let events ?(max=max_int) ?(rev=false) net point = - match net.pool with - | None -> [] - | Some pool -> - Option.unopt_map - (P2p_pool.Points.info pool point) - ~default:[] - ~f:begin fun pi -> - let evts = - P2p_point_state.Info.fold - pi ~init:[] ~f:(fun a e -> e :: a) in - (if rev then List.rev_sub else List.sub) evts max - end - - let watch net point = - match net.pool with - | None -> raise Not_found - | Some pool -> - match P2p_pool.Points.info pool point with - | None -> raise Not_found - | Some pi -> P2p_point_state.Info.watch pi - - let list ?(restrict=[]) net = - match net.pool with - | None -> [] - | Some pool -> - P2p_pool.Points.fold_known - pool ~init:[] - ~f:begin fun point i a -> - let info = info_of_point_info i in - match restrict with - | [] -> (point, info) :: a - | _ when List.mem info.state restrict -> (point, info) :: a - | _ -> a - end - - end - - module Peer_id = struct - - open P2p_peer.Info - open P2p_peer.State - - let info_of_peer_info pool i = - let state, id_point = match P2p_peer_state.get i with - | Accepted { current_point } -> Accepted, Some current_point - | Running { current_point } -> Running, Some current_point - | Disconnected -> Disconnected, None - in - let peer_id = P2p_peer_state.Info.peer_id i in - let score = P2p_pool.Peers.get_score pool peer_id in - let stat = - match P2p_pool.Connection.find_by_peer_id pool peer_id with - | None -> P2p_stat.empty - | Some conn -> P2p_pool.Connection.stat conn - in P2p_peer_state.Info.{ - score ; - trusted = trusted i ; - state ; - id_point ; - stat ; - last_failed_connection = last_failed_connection i ; - last_rejected_connection = last_rejected_connection i ; - last_established_connection = last_established_connection i ; - last_disconnection = last_disconnection i ; - last_seen = last_seen i ; - last_miss = last_miss i ; - } - - let info net peer_id = - match net.pool with - | None -> None - | Some pool -> begin - match P2p_pool.Peers.info pool peer_id with - | Some info -> Some (info_of_peer_info pool info) - | None -> None end + end in - let events ?(max=max_int) ?(rev=false) net peer_id = - match net.pool with - | None -> [] - | Some pool -> - Option.unopt_map - (P2p_pool.Peers.info pool peer_id) - ~default:[] - ~f:begin fun gi -> - let evts = P2p_peer_state.Info.fold gi - ~init:[] ~f:(fun a e -> e :: a) in - (if rev then List.rev_sub else List.sub) evts max - end + let dir = + RPC_directory.lwt_register1 dir P2p_services.Connections.S.kick + begin fun peer_id () wait -> + match net.pool with + | None -> Lwt.return_unit + | Some pool -> + match P2p_pool.Connection.find_by_peer_id pool peer_id with + | None -> Lwt.return_unit + | Some conn -> P2p_pool.disconnect ~wait conn + end in - let watch net peer_id = - match net.pool with - | None -> raise Not_found - | Some pool -> - match P2p_pool.Peers.info pool peer_id with - | None -> raise Not_found - | Some gi -> P2p_peer_state.Info.watch gi + let dir = + RPC_directory.register0 dir P2p_services.Connections.S.list + begin fun () () -> + match net.pool with + | None -> return [] + | Some pool -> + return @@ + P2p_pool.Connection.fold + pool ~init:[] + ~f:begin fun _peer_id c acc -> + P2p_pool.Connection.info c :: acc + end + end in - let list ?(restrict=[]) net = - match net.pool with - | None -> [] - | Some pool -> - P2p_pool.Peers.fold_known pool - ~init:[] - ~f:begin fun peer_id i a -> - let info = info_of_peer_info pool i in - match restrict with - | [] -> (peer_id, info) :: a - | _ when List.mem info.state restrict -> (peer_id, info) :: a - | _ -> a - end + (* Network : Peer_id *) - end + let dir = + RPC_directory.register0 dir P2p_services.Peers.S.list + begin fun () restrict -> + match net.pool with + | None -> return [] + | Some pool -> + return @@ + P2p_pool.Peers.fold_known pool + ~init:[] + ~f:begin fun peer_id i a -> + let info = info_of_peer_info pool i in + match restrict with + | [] -> (peer_id, info) :: a + | _ when List.mem info.state restrict -> (peer_id, info) :: a + | _ -> a + end + end in -end + let dir = + RPC_directory.opt_register1 dir P2p_services.Peers.S.info + begin fun peer_id () () -> + match net.pool with + | None -> return None + | Some pool -> + return @@ + Option.map ~f:(info_of_peer_info pool) + (P2p_pool.Peers.info pool peer_id) + end in + + let dir = + RPC_directory.gen_register1 dir P2p_services.Peers.S.events + begin fun peer_id () monitor -> + match net.pool with + | None -> RPC_answer.not_found + | Some pool -> + match P2p_pool.Peers.info pool peer_id with + | None -> RPC_answer.return [] + | Some gi -> + let rev = false and max = max_int in + let evts = + P2p_peer_state.Info.fold gi ~init:[] + ~f:(fun a e -> e :: a) in + let evts = (if rev then List.rev_sub else List.sub) evts max in + if not monitor then + RPC_answer.return evts + else + let stream, stopper = P2p_peer_state.Info.watch gi in + let shutdown () = Lwt_watcher.shutdown stopper in + let first_request = ref true in + let next () = + if not !first_request then begin + Lwt_stream.get stream >|= Option.map ~f:(fun i -> [i]) + end else begin + first_request := false ; + Lwt.return_some evts + end in + RPC_answer.return_stream { next ; shutdown } + end in + + (* Network : Point *) + + let dir = + RPC_directory.register0 dir P2p_services.Points.S.list + begin fun () restrict -> + match net.pool with + | None -> return [] + | Some pool -> + return @@ + P2p_pool.Points.fold_known + pool ~init:[] + ~f:begin fun point i a -> + let info = info_of_point_info i in + match restrict with + | [] -> (point, info) :: a + | _ when List.mem info.state restrict -> (point, info) :: a + | _ -> a + end + end in + + let dir = + RPC_directory.opt_register1 dir P2p_services.Points.S.info + begin fun point () () -> + match net.pool with + | None -> return None + | Some pool -> + return @@ + Option.map + (P2p_pool.Points.info pool point) + ~f:info_of_point_info + end in + + let dir = + RPC_directory.gen_register1 dir P2p_services.Points.S.events + begin fun point_id () monitor -> + match net.pool with + | None -> RPC_answer.not_found + | Some pool -> + match P2p_pool.Points.info pool point_id with + | None -> RPC_answer.return [] + | Some gi -> + let rev = false and max = max_int in + let evts = + P2p_point_state.Info.fold gi ~init:[] + ~f:(fun a e -> e :: a) in + let evts = (if rev then List.rev_sub else List.sub) evts max in + if not monitor then + RPC_answer.return evts + else + let stream, stopper = P2p_point_state.Info.watch gi in + let shutdown () = Lwt_watcher.shutdown stopper in + let first_request = ref true in + let next () = + if not !first_request then begin + Lwt_stream.get stream >|= Option.map ~f:(fun i -> [i]) + end else begin + first_request := false ; + Lwt.return_some evts + end in + RPC_answer.return_stream { next ; shutdown } + end in + + dir diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index cf686aefd..54fe15c81 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -183,63 +183,6 @@ val try_send : (** Send a message to all peers *) val broadcast : ('msg, 'meta) net -> 'msg -> unit -module RPC : sig - - val stat : ('msg, 'meta) net -> P2p_stat.t - - val watch : - ('msg, 'meta) net -> - P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper - val connect : ('msg, 'meta) net -> P2p_point.Id.t -> float -> unit tzresult Lwt.t - - module Connection : sig - val info : ('msg, 'meta) net -> P2p_peer.Id.t -> P2p_connection.Info.t option - val kick : ('msg, 'meta) net -> P2p_peer.Id.t -> bool -> unit Lwt.t - val list : ('msg, 'meta) net -> P2p_connection.Info.t list - val count : ('msg, 'meta) net -> int - end - - module Point : sig - - val info : - ('msg, 'meta) net -> P2p_point.Id.t -> P2p_point.Info.t option - - val list : - ?restrict: P2p_point.State.t list -> - ('msg, 'meta) net -> (P2p_point.Id.t * P2p_point.Info.t) list - - val events : - ?max:int -> ?rev:bool -> ('msg, 'meta) net -> P2p_point.Id.t -> - P2p_point.Pool_event.t list - - val watch : - ('msg, 'meta) net -> P2p_point.Id.t -> - P2p_point.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper - - end - - module Peer_id : sig - - val info : - ('msg, 'meta) net -> P2p_peer.Id.t -> P2p_peer.Info.t option - - val list : - ?restrict: P2p_peer.State.t list -> - ('msg, 'meta) net -> (P2p_peer.Id.t * P2p_peer.Info.t) list - - val events : - ?max: int -> ?rev: bool -> - ('msg, 'meta) net -> P2p_peer.Id.t -> - P2p_peer.Pool_event.t list - - val watch : - ('msg, 'meta) net -> P2p_peer.Id.t -> - P2p_peer.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper - - end - -end - val fold_connections : ('msg, 'meta) net -> init:'a -> f:(P2p_peer.Id.t -> ('msg, 'meta) connection -> 'a -> 'a) -> 'a @@ -252,6 +195,8 @@ val on_new_connection : ('msg, 'meta) net -> (P2p_peer.Id.t -> ('msg, 'meta) connection -> unit) -> unit +val build_rpc_directory : _ t -> unit RPC_directory.t + (**/**) module Raw : sig diff --git a/src/lib_p2p/tezos-p2p.opam b/src/lib_p2p/tezos-p2p.opam index 3dea88f12..59016613b 100644 --- a/src/lib_p2p/tezos-p2p.opam +++ b/src/lib_p2p/tezos-p2p.opam @@ -11,6 +11,7 @@ depends: [ "jbuilder" { build & >= "1.0+beta17" } "tezos-base" "tezos-stdlib-unix" + "tezos-shell-services" "alcotest-lwt" { test } ] build: [ diff --git a/src/lib_rpc/RPC_directory.ml b/src/lib_rpc/RPC_directory.ml index 6f9f935b6..516ae3cc3 100644 --- a/src/lib_rpc/RPC_directory.ml +++ b/src/lib_rpc/RPC_directory.ml @@ -32,6 +32,14 @@ let register dir service handler = | Ok o -> RPC_answer.return o | Error e -> RPC_answer.fail e) +let opt_register dir service handler = + gen_register dir service + (fun p q i -> + handler p q i >>= function + | Ok (Some o) -> RPC_answer.return o + | Ok None -> RPC_answer.not_found + | Error e -> RPC_answer.fail e) + let lwt_register dir service handler = gen_register dir service (fun p q i -> @@ -47,6 +55,13 @@ let register3 root s f = register root s (curry (S (S (S Z))) f) let register4 root s f = register root s (curry (S (S (S (S Z)))) f) let register5 root s f = register root s (curry (S (S (S (S (S Z))))) f) +let opt_register0 root s f = opt_register root s (curry Z f) +let opt_register1 root s f = opt_register root s (curry (S Z) f) +let opt_register2 root s f = opt_register root s (curry (S (S Z)) f) +let opt_register3 root s f = opt_register root s (curry (S (S (S Z))) f) +let opt_register4 root s f = opt_register root s (curry (S (S (S (S Z)))) f) +let opt_register5 root s f = opt_register root s (curry (S (S (S (S (S Z))))) f) + let gen_register0 root s f = gen_register root s (curry Z f) let gen_register1 root s f = gen_register root s (curry (S Z) f) let gen_register2 root s f = gen_register root s (curry (S (S Z)) f) diff --git a/src/lib_rpc/RPC_directory.mli b/src/lib_rpc/RPC_directory.mli index d5fdb9847..e32fe0004 100644 --- a/src/lib_rpc/RPC_directory.mli +++ b/src/lib_rpc/RPC_directory.mli @@ -18,6 +18,12 @@ val register: ('p -> 'q -> 'i -> 'o tzresult Lwt.t) -> 'prefix directory +val opt_register: + 'prefix directory -> + ([< Resto.meth ], 'prefix, 'p, 'q, 'i, 'o) RPC_service.t -> + ('p -> 'q -> 'i -> 'o option tzresult Lwt.t) -> + 'prefix directory + val gen_register: 'prefix directory -> ('meth, 'prefix, 'params, 'query, 'input, 'output) RPC_service.t -> @@ -69,6 +75,43 @@ val register5: 'prefix directory +val opt_register0: + unit directory -> + ('m, unit, unit, 'q, 'i, 'o) RPC_service.t -> + ('q -> 'i -> 'o option tzresult Lwt.t) -> + unit directory + +val opt_register1: + 'prefix directory -> + ('m, 'prefix, unit * 'a, 'q , 'i, 'o) RPC_service.t -> + ('a -> 'q -> 'i -> 'o option tzresult Lwt.t) -> + 'prefix directory + +val opt_register2: + 'prefix directory -> + ('m, 'prefix, (unit * 'a) * 'b, 'q , 'i, 'o) RPC_service.t -> + ('a -> 'b -> 'q -> 'i -> 'o option tzresult Lwt.t) -> + 'prefix directory + +val opt_register3: + 'prefix directory -> + ('m, 'prefix, ((unit * 'a) * 'b) * 'c, 'q , 'i, 'o) RPC_service.t -> + ('a -> 'b -> 'c -> 'q -> 'i -> 'o option tzresult Lwt.t) -> + 'prefix directory + +val opt_register4: + 'prefix directory -> + ('m, 'prefix, (((unit * 'a) * 'b) * 'c) * 'd, 'q , 'i, 'o) RPC_service.t -> + ('a -> 'b -> 'c -> 'd -> 'q -> 'i -> 'o option tzresult Lwt.t) -> + 'prefix directory + +val opt_register5: + 'prefix directory -> + ('m, 'prefix, ((((unit * 'a) * 'b) * 'c) * 'd) * 'e, 'q , 'i, 'o) RPC_service.t -> + ('a -> 'b -> 'c -> 'd -> 'e -> 'q -> 'i -> 'o option tzresult Lwt.t) -> + 'prefix directory + + val gen_register0: unit directory -> ('m, unit, unit, 'q, 'i, 'o) RPC_service.t -> diff --git a/src/lib_shell/node.ml b/src/lib_shell/node.ml index b1965c221..dc50483ce 100644 --- a/src/lib_shell/node.ml +++ b/src/lib_shell/node.ml @@ -667,65 +667,7 @@ module RPC = struct let shutdown () = Lwt_watcher.shutdown stopper in RPC_answer.{ next ; shutdown } - module Network = struct - - let stat (node : t) = - P2p.RPC.stat node.p2p - - let watch (node : t) = - P2p.RPC.watch node.p2p - - let connect (node : t) = - P2p.RPC.connect node.p2p - - module Connection = struct - - let info (node : t) = - P2p.RPC.Connection.info node.p2p - - let kick (node : t) = - P2p.RPC.Connection.kick node.p2p - - let list (node : t) = - P2p.RPC.Connection.list node.p2p - - let count (node : t) = - P2p.RPC.Connection.count node.p2p - - end - - module Point = struct - - let info (node : t) = - P2p.RPC.Point.info node.p2p - - let list ?restrict (node : t) = - P2p.RPC.Point.list ?restrict node.p2p - - let events ?max ?rev (node : t) = - P2p.RPC.Point.events node.p2p ?max ?rev - - let watch (node : t) = - P2p.RPC.Point.watch node.p2p - - end - - module Peer_id = struct - - let info (node : t) = - P2p.RPC.Peer_id.info node.p2p - - let list ?restrict (node : t) = - P2p.RPC.Peer_id.list ?restrict node.p2p - - let events ?max ?rev (node : t) = - P2p.RPC.Peer_id.events node.p2p ?max ?rev - - let watch (node : t) = - P2p.RPC.Peer_id.watch node.p2p - - end - - end + let build_p2p_rpc_directory (t : t) = + P2p.build_rpc_directory t.p2p end diff --git a/src/lib_shell/node.mli b/src/lib_shell/node.mli index d25658622..da3634a10 100644 --- a/src/lib_shell/node.mli +++ b/src/lib_shell/node.mli @@ -127,62 +127,8 @@ module RPC : sig val bootstrapped: t -> (Block_hash.t * Time.t) RPC_answer.stream - module Network : sig - val stat : t -> P2p_stat.t - - val watch : - t -> - P2p_connection.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper - val connect : t -> P2p_point.Id.t -> float -> unit tzresult Lwt.t - - module Connection : sig - val info : t -> P2p_peer.Id.t -> P2p_connection.Info.t option - val kick : t -> P2p_peer.Id.t -> bool -> unit Lwt.t - val list : t -> P2p_connection.Info.t list - val count : t -> int - end - - module Point : sig - - val info : - t -> P2p_point.Id.t -> P2p_point.Info.t option - - val list : - ?restrict: P2p_point.State.t list -> - t -> (P2p_point.Id.t * P2p_point.Info.t) list - - val events : - ?max:int -> ?rev:bool -> t -> P2p_point.Id.t -> - P2p_point.Pool_event.t list - - val watch : - t -> P2p_point.Id.t -> - P2p_point.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper - - end - - module Peer_id : sig - - val info : - t -> P2p_peer.Id.t -> P2p_peer.Info.t option - - val list : - ?restrict: P2p_peer.State.t list -> - t -> (P2p_peer.Id.t * P2p_peer.Info.t) list - - val events : - ?max: int -> ?rev: bool -> - t -> P2p_peer.Id.t -> - P2p_peer.Pool_event.t list - - val watch : - t -> P2p_peer.Id.t -> - P2p_peer.Pool_event.t Lwt_stream.t * Lwt_watcher.stopper - - end - - end + val build_p2p_rpc_directory: t -> unit RPC_directory.t end diff --git a/src/lib_shell/node_rpc.ml b/src/lib_shell/node_rpc.ml index 6f08df840..3ee6dc51b 100644 --- a/src/lib_shell/node_rpc.ml +++ b/src/lib_shell/node_rpc.ml @@ -509,103 +509,11 @@ let build_rpc_directory node = backlog = Net_validator.last_events w ; current_request = Net_validator.current_request w }) in - (* Network : Global *) + (* Network *) + let dir = RPC_directory.merge dir (Node.RPC.build_p2p_rpc_directory node) in let dir = - let implementation () () = Node.RPC.Network.stat node |> return in - RPC_directory.register0 dir P2p_services.S.stat implementation in - let dir = - let implementation () () = - return Distributed_db.Raw.supported_versions in - RPC_directory.register0 dir P2p_services.S.versions implementation in - let dir = - let implementation () () = - let stream, stopper = Node.RPC.Network.watch node in - let shutdown () = Lwt_watcher.shutdown stopper in - let next () = Lwt_stream.get stream in - RPC_answer.return_stream { next ; shutdown } in - RPC_directory.gen_register0 dir P2p_services.S.events implementation in - let dir = - let implementation point () timeout = - Node.RPC.Network.connect node point timeout in - RPC_directory.register1 dir P2p_services.S.connect implementation in + RPC_directory.register_describe_directory_service + dir RPC_service.description_service in - (* Network : Connection *) - - let dir = - let implementation peer_id () () = - match Node.RPC.Network.Connection.info node peer_id with - | None -> raise Not_found - | Some v -> return v in - RPC_directory.register1 dir P2p_services.Connections.S.info implementation in - let dir = - let implementation peer_id () wait = - Node.RPC.Network.Connection.kick node peer_id wait >>= return in - RPC_directory.register1 dir P2p_services.Connections.S.kick implementation in - let dir = - let implementation () () = - Node.RPC.Network.Connection.list node |> return in - RPC_directory.register0 dir P2p_services.Connections.S.list implementation in - - (* Network : Peer_id *) - - let dir = - let implementation () state = - Node.RPC.Network.Peer_id.list node ~restrict:state |> return in - RPC_directory.register0 dir P2p_services.Peers.S.list implementation in - let dir = - let implementation peer_id () () = - match Node.RPC.Network.Peer_id.info node peer_id with - | None -> raise Not_found - | Some v -> return v in - RPC_directory.register1 dir P2p_services.Peers.S.info implementation in - let dir = - let implementation peer_id () monitor = - if monitor then - let stream, stopper = Node.RPC.Network.Peer_id.watch node peer_id in - let shutdown () = Lwt_watcher.shutdown stopper in - let first_request = ref true in - let next () = - if not !first_request then begin - Lwt_stream.get stream >|= Option.map ~f:(fun i -> [i]) - end else begin - first_request := false ; - Lwt.return_some @@ Node.RPC.Network.Peer_id.events node peer_id - end in - RPC_answer.return_stream { next ; shutdown } - else - Node.RPC.Network.Peer_id.events node peer_id |> RPC_answer.return in - RPC_directory.gen_register1 dir P2p_services.Peers.S.events implementation in - - (* Network : Point *) - - let dir = - let implementation () state = - Node.RPC.Network.Point.list node ~restrict:state |> return in - RPC_directory.register0 dir P2p_services.Points.S.list implementation in - let dir = - let implementation point () () = - match Node.RPC.Network.Point.info node point with - | None -> raise Not_found - | Some v -> return v in - RPC_directory.register1 dir P2p_services.Points.S.info implementation in - let dir = - let implementation point () monitor = - if monitor then - let stream, stopper = Node.RPC.Network.Point.watch node point in - let shutdown () = Lwt_watcher.shutdown stopper in - let first_request = ref true in - let next () = - if not !first_request then begin - Lwt_stream.get stream >|= Option.map ~f:(fun i -> [i]) - end else begin - first_request := false ; - Lwt.return_some @@ Node.RPC.Network.Point.events node point - end in - RPC_answer.return_stream { next ; shutdown } - else - Node.RPC.Network.Point.events node point |> RPC_answer.return in - RPC_directory.gen_register1 dir P2p_services.Points.S.events implementation in - let dir = - RPC_directory.register_describe_directory_service dir RPC_service.description_service in dir