diff --git a/src/bin_client/tezos-init-sandboxed-client.sh b/src/bin_client/tezos-init-sandboxed-client.sh index 4250db669..52cd53f58 100755 --- a/src/bin_client/tezos-init-sandboxed-client.sh +++ b/src/bin_client/tezos-init-sandboxed-client.sh @@ -169,7 +169,7 @@ usage() { echo "Small script to initialize a client to a local and closed test network with a maximum of 9 nodes." echo echo "Usage: eval \`$0 \`" - echo " where should be an integer between 1 and 9." + echo " where should be a positive integer." } main () { @@ -189,7 +189,7 @@ main () { local_compiler="${local_compiler:-$(which tezos-protocol-compiler)}" fi - if [ $# -lt 1 ] || [ "$1" -le 0 ] || [ 10 -le "$1" ]; then + if [ $# -lt 1 ] || [ "$1" -le 0 ] ; then usage exit 1 fi diff --git a/src/bin_node/node_config_file.ml b/src/bin_node/node_config_file.ml index 0b8d9563c..f3a1f4d13 100644 --- a/src/bin_node/node_config_file.ml +++ b/src/bin_node/node_config_file.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -30,8 +31,9 @@ let home = with Not_found -> "/root" let default_data_dir = home // ".tezos-node" -let default_p2p_port = 9732 -let default_rpc_port = 8732 +let default_rpc_port = 8732 +let default_p2p_port = 9732 +let default_discovery_port = 10732 type t = { data_dir : string ; @@ -45,6 +47,7 @@ and p2p = { expected_pow : float ; bootstrap_peers : string list ; listen_addr : string option ; + discovery_addr : string option ; private_mode : bool ; limits : P2p.limits ; disable_mempool : bool ; @@ -97,8 +100,9 @@ let default_p2p_limits : P2p.limits = { let default_p2p = { expected_pow = 26. ; bootstrap_peers = [] ; - listen_addr = Some ("[::]:" ^ string_of_int default_p2p_port) ; - private_mode = false ; + listen_addr = Some ("[::]:" ^ string_of_int default_p2p_port) ; + discovery_addr = None ; + private_mode = false ; limits = default_p2p_limits ; disable_mempool = false ; } @@ -241,15 +245,15 @@ let limit : P2p.limits Data_encoding.t = let p2p = let open Data_encoding in conv - (fun { expected_pow ; bootstrap_peers ; - listen_addr ; private_mode ; limits ; disable_mempool } -> - ( expected_pow, bootstrap_peers, - listen_addr, private_mode, limits, disable_mempool )) - (fun ( expected_pow, bootstrap_peers, - listen_addr, private_mode, limits, disable_mempool ) -> - { expected_pow ; bootstrap_peers ; - listen_addr ; private_mode ; limits ; disable_mempool }) - (obj6 + (fun { expected_pow ; bootstrap_peers ; listen_addr ; discovery_addr ; + private_mode ; limits ; disable_mempool } -> + ( expected_pow, bootstrap_peers, listen_addr, discovery_addr , + private_mode, limits, disable_mempool )) + (fun ( expected_pow, bootstrap_peers, listen_addr, discovery_addr, + private_mode, limits, disable_mempool ) -> + { expected_pow ; bootstrap_peers ; listen_addr ; discovery_addr ; + private_mode ; limits ; disable_mempool }) + (obj7 (dft "expected-proof-of-work" ~description: "Floating point number between 0 and 256 that represents a \ difficulty, 24 signifies for example that at least 24 leading \ @@ -264,6 +268,11 @@ let p2p = specified, the default port 8732 will be \ assumed." string) + (dft "discovery-addr" + ~description: "Host for local peer discovery. If the port is not \ + specified, the default port 10732 will be \ + assumed." + (option string) default_p2p.discovery_addr) (dft "private-mode" ~description: "Specify if the node is in private mode or \ not. A node in private mode rejects incoming \ @@ -493,6 +502,7 @@ let update ?expected_pow ?bootstrap_peers ?listen_addr + ?discovery_addr ?rpc_listen_addr ?(private_mode = false) ?(disable_mempool = false) @@ -544,6 +554,8 @@ let update Option.unopt ~default:cfg.p2p.bootstrap_peers bootstrap_peers ; listen_addr = Option.first_some listen_addr cfg.p2p.listen_addr ; + discovery_addr = + Option.first_some discovery_addr cfg.p2p.discovery_addr ; private_mode = cfg.p2p.private_mode || private_mode ; limits ; disable_mempool = cfg.p2p.disable_mempool || disable_mempool ; @@ -577,39 +589,61 @@ let update in return { data_dir ; p2p ; rpc ; log ; shell } -let resolve_addr ?default_port ?(passive = false) peer = +let resolve_addr ~default_addr ?default_port ?(passive = false) peer = let addr, port = P2p_point.Id.parse_addr_port peer in - let node = if addr = "" || addr = "_" then "::" else addr + let node = if addr = "" || addr = "_" then default_addr else addr and service = match port, default_port with - | "", None -> - invalid_arg "" + | "", None -> invalid_arg "" | "", Some default_port -> string_of_int default_port | port, _ -> port in Lwt_utils_unix.getaddrinfo ~passive ~node ~service -let resolve_addrs ?default_port ?passive peers = +let resolve_addrs ~default_addr ?default_port ?passive peers = Lwt_list.fold_left_s begin fun a peer -> - resolve_addr ?default_port ?passive peer >>= fun points -> + resolve_addr ~default_addr ?default_port ?passive peer >>= fun points -> Lwt.return (List.rev_append points a) end [] peers +let resolve_discovery_addrs discovery_addr = + resolve_addr + ~default_addr:Ipaddr.V4.(to_string broadcast) + ~default_port:default_discovery_port + ~passive:true + discovery_addr + >>= fun addrs -> + let rec to_ipv4 acc = function + | [] -> Lwt.return (List.rev acc) + | (ip, port) :: xs -> begin match Ipaddr.v4_of_v6 ip with + | Some v -> to_ipv4 ((v, port) :: acc) xs + | None -> + Format.eprintf + "Warning: failed to convert %S to an ipv4 address@." + (Ipaddr.V6.to_string ip) ; + to_ipv4 acc xs + end + in to_ipv4 [] addrs + let resolve_listening_addrs listen_addr = resolve_addr + ~default_addr:"::" ~default_port:default_p2p_port ~passive:true listen_addr let resolve_rpc_listening_addrs listen_addr = resolve_addr + ~default_addr:"::" ~default_port:default_rpc_port ~passive:true listen_addr let resolve_bootstrap_addrs peers = resolve_addrs + ~default_addr:"::" ~default_port:default_p2p_port peers + let check_listening_addr config = match config.p2p.listen_addr with | None -> Lwt.return_unit @@ -628,6 +662,24 @@ let check_listening_addr config = | exn -> Lwt.fail exn end +let check_discovery_addr config = + match config.p2p.discovery_addr with + | None -> Lwt.return_unit + | Some addr -> + Lwt.catch begin fun () -> + resolve_discovery_addrs addr >>= function + | [] -> + Format.eprintf "Warning: failed to resolve %S\n@." addr ; + Lwt.return_unit + | _ :: _ -> + Lwt.return_unit + end begin function + | (Invalid_argument msg) -> + Format.eprintf "Warning: failed to parse %S:\ %s\n@." addr msg ; + Lwt.return_unit + | exn -> Lwt.fail exn + end + let check_rpc_listening_addr config = match config.rpc.listen_addr with | None -> Lwt.return_unit @@ -711,6 +763,7 @@ let check_connections config = let check config = check_listening_addr config >>= fun () -> check_rpc_listening_addr config >>= fun () -> + check_discovery_addr config >>= fun () -> check_bootstrap_peers config >>= fun () -> check_connections config ; Lwt.return_unit diff --git a/src/bin_node/node_config_file.mli b/src/bin_node/node_config_file.mli index 6ca3849c7..98b90e637 100644 --- a/src/bin_node/node_config_file.mli +++ b/src/bin_node/node_config_file.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -35,6 +36,7 @@ and p2p = { expected_pow : float ; bootstrap_peers : string list ; listen_addr : string option ; + discovery_addr : string option ; private_mode : bool ; limits : P2p.limits ; disable_mempool : bool ; @@ -77,6 +79,7 @@ val update: ?expected_pow:float -> ?bootstrap_peers:string list -> ?listen_addr:string -> + ?discovery_addr:string -> ?rpc_listen_addr:string -> ?private_mode:bool -> ?disable_mempool:bool -> @@ -92,6 +95,7 @@ val read: string -> t tzresult Lwt.t val write: string -> t -> unit tzresult Lwt.t val resolve_listening_addrs: string -> (P2p_addr.t * int) list Lwt.t +val resolve_discovery_addrs: string -> (Ipaddr.V4.t * int) list Lwt.t val resolve_rpc_listening_addrs: string -> (P2p_addr.t * int) list Lwt.t val resolve_bootstrap_addrs: string list -> (P2p_addr.t * int) list Lwt.t diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 578814edf..325c2f343 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -118,6 +119,18 @@ let init_node ?sandbox ?checkpoint (config : Node_config_file.t) = Lwt.return_some json end >>= fun sandbox_param -> (* TODO "WARN" when pow is below our expectation. *) + begin + match config.p2p.discovery_addr with + | None -> + lwt_log_notice "No local peer discovery." >>= fun () -> + return (None, None) + | Some addr -> + Node_config_file.resolve_discovery_addrs addr >>= function + | [] -> + failwith "Cannot resolve P2P discovery address: %S" addr + | (addr, port) :: _ -> + return (Some addr, Some port) + end >>=? fun (discovery_addr, discovery_port) -> begin match config.p2p.listen_addr with | None -> @@ -149,6 +162,8 @@ let init_node ?sandbox ?checkpoint (config : Node_config_file.t) = let p2p_config : P2p.config = { listening_addr ; listening_port ; + discovery_addr ; + discovery_port ; trusted_points ; peers_file = (config.data_dir // "peers.json") ; diff --git a/src/bin_node/node_shared_arg.ml b/src/bin_node/node_shared_arg.ml index a3d35b7f4..682583801 100644 --- a/src/bin_node/node_shared_arg.ml +++ b/src/bin_node/node_shared_arg.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -42,6 +43,7 @@ type t = { peers: string list ; no_bootstrap_peers: bool ; listen_addr: string option ; + discovery_addr: string option ; rpc_listen_addr: string option ; private_mode: bool ; disable_mempool: bool ; @@ -56,7 +58,8 @@ let wrap data_dir config_file connections max_download_speed max_upload_speed binary_chunks_size peer_table_size - listen_addr peers no_bootstrap_peers bootstrap_threshold private_mode disable_mempool + listen_addr discovery_addr peers no_bootstrap_peers + bootstrap_threshold private_mode disable_mempool expected_pow rpc_listen_addr rpc_tls cors_origins cors_headers log_output = @@ -100,6 +103,7 @@ let wrap peers ; no_bootstrap_peers ; listen_addr ; + discovery_addr ; rpc_listen_addr ; private_mode ; disable_mempool ; @@ -188,9 +192,7 @@ module Term = struct let binary_chunks_size = let doc = - Format.sprintf - "Size limit (in kB) of binary blocks that are sent to other peers." - in + "Size limit (in kB) of binary blocks that are sent to other peers." in Arg.(value & opt (some int) None & info ~docs ~doc ~docv:"NUM" ["binary-chunks-size"]) @@ -207,6 +209,11 @@ module Term = struct Arg.(value & opt (some string) None & info ~docs ~doc ~docv:"ADDR:PORT" ["net-addr"]) + let discovery_addr = + let doc = "The UDP address and port used for local peer discovery." in + Arg.(value & opt (some string) None & + info ~docs ~doc ~docv:"ADDR:PORT" ["discovery-addr"]) + let no_bootstrap_peers = let doc = "Ignore the peers found in the config file (or the hard-coded \ @@ -286,7 +293,8 @@ module Term = struct $ connections $ max_download_speed $ max_upload_speed $ binary_chunks_size $ peer_table_size - $ listen_addr $ peers $ no_bootstrap_peers $ bootstrap_threshold + $ listen_addr $ discovery_addr $ peers $ no_bootstrap_peers + $ bootstrap_threshold $ private_mode $ disable_mempool $ expected_pow $ rpc_listen_addr $ rpc_tls $ cors_origins $ cors_headers @@ -308,6 +316,7 @@ let read_and_patch_config_file ?(ignore_bootstrap_peers=false) args = expected_pow ; peers ; no_bootstrap_peers ; listen_addr ; private_mode ; + discovery_addr ; disable_mempool ; rpc_listen_addr ; rpc_tls ; cors_origins ; cors_headers ; @@ -325,6 +334,6 @@ let read_and_patch_config_file ?(ignore_bootstrap_peers=false) args = ?data_dir ?min_connections ?expected_connections ?max_connections ?max_download_speed ?max_upload_speed ?binary_chunks_size ?peer_table_size ?expected_pow - ~bootstrap_peers ?listen_addr ?rpc_listen_addr ~private_mode + ~bootstrap_peers ?listen_addr ?discovery_addr ?rpc_listen_addr ~private_mode ~disable_mempool ~cors_origins ~cors_headers ?rpc_tls ?log_output ?bootstrap_threshold cfg diff --git a/src/bin_node/node_shared_arg.mli b/src/bin_node/node_shared_arg.mli index f5afa2506..c2bf744f7 100644 --- a/src/bin_node/node_shared_arg.mli +++ b/src/bin_node/node_shared_arg.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -37,6 +38,7 @@ type t = { peers: string list ; no_bootstrap_peers: bool ; listen_addr: string option ; + discovery_addr: string option ; rpc_listen_addr: string option ; private_mode: bool ; disable_mempool: bool ; diff --git a/src/bin_node/tezos-sandboxed-node.sh b/src/bin_node/tezos-sandboxed-node.sh index 0f1304778..b0aac6bab 100755 --- a/src/bin_node/tezos-sandboxed-node.sh +++ b/src/bin_node/tezos-sandboxed-node.sh @@ -17,11 +17,8 @@ start_sandboxed_node() { expected_connections="${expected_connections:-3}" node_dir="$(mktemp -d -t tezos-node.XXXXXXXX)" peers=("--no-bootstrap-peers") - for peer_port in $(seq 19730 $((19730 + max_peer_id))); do - peers+=("--peer") - peers+=("127.0.0.1:$peer_port") - done - peers+=("--private-mode") +# peers+=("--private-mode") ## Should we accept discovered peers as trusted nodes ? + node="${local_node}" sandbox_param="--sandbox=$sandbox_file" @@ -41,6 +38,7 @@ EOF --data-dir "$node_dir" \ --net-addr "127.0.0.1:$port" \ --rpc-addr "127.0.0.1:$rpc" \ + --discovery-addr "127.255.255.255" \ --rpc-tls "${node_dir}/tezos.crt,${node_dir}/tezos.key" \ --expected-pow "$expected_pow" \ --connections "$expected_connections" @@ -169,6 +167,7 @@ EOF --data-dir "$node_dir" \ --net-addr "127.0.0.1:$port" \ --rpc-addr "127.0.0.1:$rpc" \ + --discovery-addr "127.255.255.255" \ --expected-pow "$expected_pow" \ --connections "$expected_connections" fi @@ -197,11 +196,11 @@ main() { sandbox_file="${sandbox_file:-sandbox.json}" fi - if [ $# -lt 1 ] || [ "$1" -le 0 ] || [ 10 -le "$1" ]; then + if [ $# -lt 1 ] || [ "$1" -le 0 ]; then echo "Small script to launch local and closed test network with a maximum of 9 nodes." echo echo "Usage: $0 " - echo " where should be an integer between 1 and 9." + echo " where should be a positive integer." exit 1 fi diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index 70bce3146..bcda18bf2 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -53,8 +54,10 @@ type 'msg message_config = 'msg P2p_pool.message_config = { } type config = { - listening_port : P2p_addr.port option; - listening_addr : P2p_addr.t option; + listening_port : P2p_addr.port option ; + listening_addr : P2p_addr.t option ; + discovery_port : P2p_addr.port option ; + discovery_addr : Ipaddr.V4.t option ; trusted_points : P2p_point.Id.t list ; peers_file : string ; private_mode : bool ; @@ -138,6 +141,16 @@ let create_connection_pool config limits meta_cfg conn_meta_cfg msg_cfg io_sched P2p_pool.create pool_cfg meta_cfg conn_meta_cfg msg_cfg io_sched in pool +let may_create_discovery_worker _limits config pool = + match (config.listening_port, config.discovery_port, config.discovery_addr) with + | (Some listening_port, Some discovery_port, Some discovery_addr) -> + Some (P2p_discovery.create pool + config.identity.peer_id + ~listening_port + ~discovery_port ~discovery_addr) + | (_, _, _) -> + None + let bounds ~min ~expected ~max = assert (min <= expected) ; assert (expected <= max) ; @@ -151,14 +164,16 @@ let bounds ~min ~expected ~max = max_threshold = max - step_max ; } -let create_maintenance_worker limits pool = +let create_maintenance_worker limits pool config = let bounds = bounds ~min:limits.min_connections ~expected:limits.expected_connections ~max:limits.max_connections in - P2p_maintenance.create bounds pool + let discovery = + may_create_discovery_worker limits config pool in + P2p_maintenance.create ?discovery bounds pool let may_create_welcome_worker config limits pool = match config.listening_port with @@ -188,7 +203,7 @@ module Real = struct let io_sched = create_scheduler limits in create_connection_pool config limits meta_cfg conn_meta_cfg msg_cfg io_sched >>= fun pool -> - let maintenance = create_maintenance_worker limits pool in + let maintenance = create_maintenance_worker limits pool config in may_create_welcome_worker config limits pool >>= fun welcome -> return { config ; @@ -212,7 +227,7 @@ module Real = struct | None -> () | Some w -> P2p_welcome.activate w end ; - P2p_maintenance.activate t.maintenance; + P2p_maintenance.activate t.maintenance ; Lwt.async (fun () -> P2p_maintenance.maintain t.maintenance) ; () diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index 9a810d860..d11009883 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -62,12 +63,20 @@ type config = { listening_port : P2p_addr.port option; (** Tells if incoming connections accepted, precising the TCP port - on which the peer can be reached *) + on which the peer can be reached (default: [9732])*) listening_addr : P2p_addr.t option; - (** When incoming connections are accepted, precising on which + (** When incoming connections are accepted, precise on which IP adddress the node listen (default: [[::]]). *) + discovery_port : P2p_addr.port option; + (** Tells if local peer discovery is enabled, precising the TCP port + on which the peer can be reached (default: [10732]) *) + + discovery_addr : Ipaddr.V4.t option; + (** When local peer discovery is enabled, precise on which + IP address messages are broadcasted (default: [255.255.255.255]). *) + trusted_points : P2p_point.Id.t list ; (** List of hard-coded known peers to bootstrap the network from. *) @@ -296,4 +305,3 @@ module Raw : sig | Disconnect val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t end - diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml new file mode 100644 index 000000000..c306e3c37 --- /dev/null +++ b/src/lib_p2p/p2p_discovery.ml @@ -0,0 +1,272 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +include Logging.Make (struct let name = "p2p.discovery" end) + +type pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> pool + +module Message = struct + + let encoding = + Data_encoding.(tup3 (Fixed.string 10) P2p_peer.Id.encoding int16) + + let length = Data_encoding.Binary.fixed_length_exn encoding + + let key = "DISCOMAGIC" + + let make peer_id port = + Data_encoding.Binary.to_bytes_exn encoding (key, peer_id, port) + +end + +module Answer = struct + + type t = { + my_peer_id: P2p_peer.Id.t ; + pool: pool ; + discovery_port: int ; + canceler: Lwt_canceler.t ; + mutable worker: unit Lwt.t ; + } + + let create_socket st = + Lwt.catch + begin fun () -> + let socket = Lwt_unix.socket PF_INET SOCK_DGRAM 0 in + Lwt_canceler.on_cancel st.canceler (fun () -> + Lwt_utils_unix.safe_close socket + ) ; + Lwt_unix.setsockopt socket SO_BROADCAST true ; + Lwt_unix.setsockopt socket SO_REUSEADDR true ; + let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_any, st.discovery_port) in + Lwt_unix.bind socket addr >>= fun () -> + Lwt.return socket + end + begin fun exn -> + lwt_debug "Error creating a socket" >>= fun () -> + Lwt.fail exn + end + + let loop st = + protect ~canceler:st.canceler begin fun () -> + create_socket st >>= fun socket -> + return socket + end >>=? fun socket -> + (* Infinite loop, should never exit. *) + let rec aux () = + let buf = MBytes.create Message.length in + protect ~canceler:st.canceler begin fun () -> + Lwt_bytes.recvfrom socket buf 0 Message.length [] >>= fun content -> + lwt_debug "Received discovery message..." >>= fun () -> + return content + end >>=? function + | (len, Lwt_unix.ADDR_INET (remote_addr, _)) + when Compare.Int.equal len Message.length -> + begin match Data_encoding.Binary.of_bytes Message.encoding buf with + | Some (key, remote_peer_id, remote_port) + when Compare.String.equal key Message.key + && not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) -> + let s_addr = Unix.string_of_inet_addr remote_addr in + begin match P2p_addr.of_string_opt s_addr with + | None -> + lwt_debug "Failed to parse %S\n@." s_addr >>= fun () -> + aux () + | Some addr -> + let Pool pool = st.pool in + lwt_log_info "Registering new point %a:%d" + P2p_addr.pp addr remote_port >>= fun () -> + P2p_pool.register_new_point pool st.my_peer_id + (addr, remote_port) ; + aux () + end + | _ -> aux () + end + | _ -> aux () + in aux () + + let worker_loop st = + loop st >>= function + | Error [ Canceled ] -> + Lwt.return_unit + | Error err -> + lwt_log_error + "@[Unexpected error in answer worker@ %a@]" + pp_print_error err >>= fun () -> + Lwt_canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + | Ok () -> + lwt_log_error + "@[Unexpected exit in answer worker@]" >>= fun () -> + Lwt_canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + + let create my_peer_id pool ~discovery_port = { + canceler = Lwt_canceler.create () ; + my_peer_id ; + discovery_port ; + pool = Pool pool ; + worker = Lwt.return_unit ; + } + + let activate st = + st.worker <- + Lwt_utils.worker "discovery_answer" + ~run:(fun () -> worker_loop st) + ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) + +end + +(* ************************************************************ *) +(* Sender *) + +module Sender = struct + + type t = { + canceler: Lwt_canceler.t ; + my_peer_id: P2p_peer.Id.t ; + listening_port: int ; + discovery_port: int ; + discovery_addr: Ipaddr.V4.t ; + pool: pool ; + restart_discovery: unit Lwt_condition.t ; + mutable worker: unit Lwt.t ; + } + + module Config = struct + type t = { + delay: float; + loop: int; + } + let initial = { + delay = 0.1 ; + loop = 0 ; + } + let increase_delay config = { config with delay = 2.0 *. config.delay ; } + let max_loop = 10 + end + + let broadcast_message st = + let msg = Message.make st.my_peer_id st.listening_port in + Lwt.catch + begin fun () -> + let socket = Lwt_unix.(socket PF_INET SOCK_DGRAM 0) in + Lwt_canceler.on_cancel st.canceler (fun () -> + Lwt_utils_unix.safe_close socket + ) ; + Lwt_unix.setsockopt socket Lwt_unix.SO_BROADCAST true ; + let broadcast_ipv4 = Ipaddr_unix.V4.to_inet_addr st.discovery_addr in + let addr = Lwt_unix.ADDR_INET (broadcast_ipv4, st.discovery_port) in + Lwt_unix.connect socket addr >>= fun () -> + lwt_debug "Broadcasting discovery message..." >>= fun () -> + Lwt_bytes.sendto socket msg 0 Message.length [] addr >>= fun _len -> + Lwt_utils_unix.safe_close socket + end + begin fun _exn -> + lwt_debug "Error broadcasting a discovery request" >>= fun () -> + Lwt.return_unit + end + + let rec worker_loop sender_config st = + begin + protect ~canceler:st.canceler begin fun () -> + broadcast_message st >>= fun () -> + return_unit + end >>=? fun () -> + protect ~canceler:st.canceler begin fun () -> + Lwt.pick [ + begin + Lwt_condition.wait st.restart_discovery >>= fun () -> + return Config.initial + end ; + begin + Lwt_unix.sleep sender_config.Config.delay >>= fun () -> + return { sender_config with Config.loop = succ sender_config.loop ; } + end ; + ] + end + end >>= function + | Ok config when config.Config.loop = Config.max_loop -> + let new_sender_config = { + config with Config.loop = pred config.loop ; + } in + worker_loop new_sender_config st + | Ok config -> + let new_sender_config = Config.increase_delay config in + worker_loop new_sender_config st + | Error [ Canceled ] -> + Lwt.return_unit + | Error err -> + lwt_log_error + "@[Unexpected error in sender worker@ %a@]" + pp_print_error err >>= fun () -> + Lwt_canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + + let create + my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = { + canceler = Lwt_canceler.create () ; + my_peer_id ; + listening_port ; + discovery_port ; + discovery_addr ; + restart_discovery = Lwt_condition.create () ; + pool = Pool pool ; + worker = Lwt.return_unit ; + } + + let activate st = + st.worker <- + Lwt_utils.worker "discovery_sender" + ~run:begin fun () -> worker_loop Config.initial st end + ~cancel:begin fun () -> Lwt_canceler.cancel st.canceler end + +end + +(* ********************************************************************** *) + +type t = { + answer: Answer.t ; + sender: Sender.t ; +} + +let create ~listening_port ~discovery_port ~discovery_addr pool my_peer_id = + let answer = Answer.create my_peer_id pool ~discovery_port in + let sender = + Sender.create + my_peer_id pool ~listening_port ~discovery_port ~discovery_addr in + { answer ; sender } + +let activate { answer ; sender } = + Answer.activate answer ; + Sender.activate sender + +let wakeup t = Lwt_condition.signal t.sender.restart_discovery () + +let shutdown t = + Lwt.join [ + Lwt_canceler.cancel t.answer.canceler ; + Lwt_canceler.cancel t.sender.canceler ; + ] diff --git a/src/lib_p2p/p2p_discovery.mli b/src/lib_p2p/p2p_discovery.mli new file mode 100644 index 000000000..27ad96de7 --- /dev/null +++ b/src/lib_p2p/p2p_discovery.mli @@ -0,0 +1,57 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + + +(** Local peer discovery. + + This module manages the discovery of local peers through UDP broadcasting. + It is composed of two workers: + - The sender worker whose role is to broadcast discovery messages. + - The answer worker whose role is to listen discovery messages and register new + peers in the current pool. + Discovery messages are composed of an arbitrary key, the listening port and + the peer id of the current peer. +*) + +(** Type of a discovery worker. *) +type t + +(** [create ~listening_port ~discovery_port ~discovery_addr pool peer_id] + returns a discovery worker registering local peers to the [pool] + and broadcasting discovery messages with the [peer_id] and + the [listening_port] through the address [discovery_addr:discovery_port]. *) +val create : listening_port:int -> discovery_port:int -> + discovery_addr:Ipaddr.V4.t -> ('a, 'b, 'c) P2p_pool.t -> P2p_peer.Table.key -> + t + +val activate : t -> unit + +(** [wakeup t] sends a signal to the sender machine of [t], asking it + to immediately proceed to broadcasting. *) +val wakeup : t -> unit + +(** [shutdown t] returns when [t] has completed shutdown. *) +val shutdown : t -> unit Lwt.t diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index 6e1155b28..617281aee 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -38,9 +39,10 @@ type 'meta t = { canceler: Lwt_canceler.t ; bounds: bounds ; pool: 'meta pool ; + discovery: P2p_discovery.t option ; just_maintained: unit Lwt_condition.t ; please_maintain: unit Lwt_condition.t ; - mutable maintain_worker : unit Lwt.t ; + mutable maintain_worker: unit Lwt.t ; } (** Select [expected] points among the disconnected known points. @@ -155,11 +157,14 @@ and too_few_connections st n_connected = if success then begin maintain st end else begin - (* not enough contacts, ask the pals of our pals, and then wait *) + (* not enough contacts, ask the pals of our pals, + discover the local network and then wait *) P2p_pool.broadcast_bootstrap_msg pool ; + Option.iter ~f:P2p_discovery.wakeup st.discovery ; protect ~canceler:st.canceler begin fun () -> Lwt.pick [ P2p_pool.Pool_event.wait_new_peer pool ; + P2p_pool.Pool_event.wait_new_point pool ; Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? or wait for the existence of a non grey-listed peer ?? *) @@ -189,7 +194,7 @@ let rec worker_loop st = Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_condition.wait st.please_maintain ; (* when asked *) P2p_pool.Pool_event.wait_too_few_connections pool ; (* limits *) - P2p_pool.Pool_event.wait_too_many_connections pool + P2p_pool.Pool_event.wait_too_many_connections pool ; ] >>= fun () -> return_unit end >>=? fun () -> @@ -206,9 +211,10 @@ let rec worker_loop st = | Error [ Canceled ] -> Lwt.return_unit | Error _ -> Lwt.return_unit -let create bounds pool = { - canceler = Lwt_canceler.create (); +let create ?discovery bounds pool = { + canceler = Lwt_canceler.create () ; bounds ; + discovery ; pool = Pool pool ; just_maintained = Lwt_condition.create () ; please_maintain = Lwt_condition.create () ; @@ -219,7 +225,8 @@ let activate st = st.maintain_worker <- Lwt_utils.worker "maintenance" ~run:(fun () -> worker_loop st) - ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) + ~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ; + Option.iter st.discovery ~f:P2p_discovery.activate let maintain { just_maintained ; please_maintain } = let wait = Lwt_condition.wait just_maintained in @@ -228,10 +235,12 @@ let maintain { just_maintained ; please_maintain } = let shutdown { canceler ; + discovery ; maintain_worker ; - just_maintained } = + just_maintained ; + } = Lwt_canceler.cancel canceler >>= fun () -> + Lwt_utils.may ~f:P2p_discovery.shutdown discovery >>= fun () -> maintain_worker >>= fun () -> Lwt_condition.broadcast just_maintained () ; Lwt.return_unit - diff --git a/src/lib_p2p/p2p_maintenance.mli b/src/lib_p2p/p2p_maintenance.mli index b5f6f942f..0c93e15e5 100644 --- a/src/lib_p2p/p2p_maintenance.mli +++ b/src/lib_p2p/p2p_maintenance.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -52,11 +53,16 @@ type bounds = { type 'meta t (** Type of a maintenance worker. *) -val create : bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t -(** [create ~greylist_timeout bounds pool] prepares a maintenance - worker for [pool] with connection targets specified in [bounds]. *) +val create: + ?discovery:P2p_discovery.t -> + bounds -> + ('msg, 'meta, 'meta_conn) P2p_pool.t -> + 'meta t +(** [run ?discovery bounds pool] returns a maintenance worker, with + the [discovery] worker if present, for [pool] with connection targets + specified in [bounds]. *) -val activate : 'meta t -> unit +val activate: 'meta t -> unit (** [activate t] start the worker that will maintain connections *) val maintain: 'meta t -> unit Lwt.t diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 8938e78cc..1c503651b 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -260,6 +261,7 @@ and events = { too_few_connections : unit Lwt_condition.t ; too_many_connections : unit Lwt_condition.t ; new_peer : unit Lwt_condition.t ; + new_point : unit Lwt_condition.t ; new_connection : unit Lwt_condition.t ; } @@ -285,6 +287,8 @@ module Pool_event = struct Lwt_condition.wait pool.events.too_many_connections let wait_new_peer pool = Lwt_condition.wait pool.events.new_peer + let wait_new_point pool = + Lwt_condition.wait pool.events.new_point let wait_new_connection pool = Lwt_condition.wait pool.events.new_connection end @@ -330,6 +334,7 @@ let register_point pool ?trusted _source_peer_id (addr, port as point) = if P2p_point.Table.length pool.known_points >= max then gc_points pool end ; P2p_point.Table.add pool.known_points point point_info ; + Lwt_condition.broadcast pool.events.new_point () ; log pool (New_point point) ; point_info | Some point_info -> point_info @@ -1028,9 +1033,9 @@ and register_new_points pool conn = List.iter (register_new_point pool source_peer_id) points ; Lwt.return_unit -and register_new_point pool _source_peer_id point = +and register_new_point pool source_peer_id point = if not (P2p_point.Table.mem pool.my_id_points point) then - ignore (register_point pool _source_peer_id point) + ignore (register_point pool source_peer_id point) and list_known_points ?(ignore_private = false) pool conn = if Connection.private_node conn then @@ -1177,6 +1182,7 @@ let create config peer_meta_config conn_meta_config message_config io_sched = too_few_connections = Lwt_condition.create () ; too_many_connections = Lwt_condition.create () ; new_peer = Lwt_condition.create () ; + new_point = Lwt_condition.create () ; new_connection = Lwt_condition.create () ; } in let pool = { diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index 238eb7791..ddd5e0598 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) +(* Copyright (c) 2019 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -119,6 +120,7 @@ type config = { known_peer_ids_history_size : int ; (** Size of the known peer_ids log buffer (default: 50) *) + known_points_history_size : int ; (** Size of the known points log buffer (default: 50) *) @@ -204,6 +206,10 @@ module Pool_event : sig (** [wait_new_peer pool] is determined when a new peer (i.e. authentication successful) gets added to the pool. *) + val wait_new_point: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t + (** [wait_new_point pool] is determined when a new point gets registered + to the pool. *) + val wait_new_connection: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t (** [wait_new_connection pool] is determined when a new connection is successfully established in the pool. *) @@ -230,7 +236,12 @@ val connect: val accept: ('msg, 'peer_meta,'conn_meta) pool -> P2p_fd.t -> P2p_point.Id.t -> unit (** [accept pool fd point] instructs [pool] to start the process of - accepting a connection from [fd]. Used by [P2p]. *) + accepting a connection from [fd]. Used by [P2p_welcome]. *) + +val register_new_point: + ('a, 'b, 'c) pool -> P2p_peer.Table.key -> P2p_point.Id.t -> unit +(** [register_new_point pool source_peer_id point] tries to register [point] + in pool's internal peer table. *) val disconnect: ?wait:bool -> ('msg, 'peer_meta,'conn_meta) connection -> unit Lwt.t @@ -429,4 +440,3 @@ module Message : sig val encoding: 'msg encoding list -> 'msg t Data_encoding.t end -