P2p: (re)add local peer discovery

This commit is contained in:
Grégoire Henry 2019-02-20 17:52:47 +01:00
parent f51b8ad998
commit e3f9ae4578
No known key found for this signature in database
GPG Key ID: 827A020B224844F1
15 changed files with 524 additions and 59 deletions

View File

@ -169,7 +169,7 @@ usage() {
echo "Small script to initialize a client to a local and closed test network with a maximum of 9 nodes."
echo
echo "Usage: eval \`$0 <id>\`"
echo " where <id> should be an integer between 1 and 9."
echo " where <id> should be a positive integer."
}
main () {
@ -189,7 +189,7 @@ main () {
local_compiler="${local_compiler:-$(which tezos-protocol-compiler)}"
fi
if [ $# -lt 1 ] || [ "$1" -le 0 ] || [ 10 -le "$1" ]; then
if [ $# -lt 1 ] || [ "$1" -le 0 ] ; then
usage
exit 1
fi

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -30,8 +31,9 @@ let home =
with Not_found -> "/root"
let default_data_dir = home // ".tezos-node"
let default_p2p_port = 9732
let default_rpc_port = 8732
let default_p2p_port = 9732
let default_discovery_port = 10732
type t = {
data_dir : string ;
@ -45,6 +47,7 @@ and p2p = {
expected_pow : float ;
bootstrap_peers : string list ;
listen_addr : string option ;
discovery_addr : string option ;
private_mode : bool ;
limits : P2p.limits ;
disable_mempool : bool ;
@ -98,6 +101,7 @@ let default_p2p = {
expected_pow = 26. ;
bootstrap_peers = [] ;
listen_addr = Some ("[::]:" ^ string_of_int default_p2p_port) ;
discovery_addr = None ;
private_mode = false ;
limits = default_p2p_limits ;
disable_mempool = false ;
@ -241,15 +245,15 @@ let limit : P2p.limits Data_encoding.t =
let p2p =
let open Data_encoding in
conv
(fun { expected_pow ; bootstrap_peers ;
listen_addr ; private_mode ; limits ; disable_mempool } ->
( expected_pow, bootstrap_peers,
listen_addr, private_mode, limits, disable_mempool ))
(fun ( expected_pow, bootstrap_peers,
listen_addr, private_mode, limits, disable_mempool ) ->
{ expected_pow ; bootstrap_peers ;
listen_addr ; private_mode ; limits ; disable_mempool })
(obj6
(fun { expected_pow ; bootstrap_peers ; listen_addr ; discovery_addr ;
private_mode ; limits ; disable_mempool } ->
( expected_pow, bootstrap_peers, listen_addr, discovery_addr ,
private_mode, limits, disable_mempool ))
(fun ( expected_pow, bootstrap_peers, listen_addr, discovery_addr,
private_mode, limits, disable_mempool ) ->
{ expected_pow ; bootstrap_peers ; listen_addr ; discovery_addr ;
private_mode ; limits ; disable_mempool })
(obj7
(dft "expected-proof-of-work"
~description: "Floating point number between 0 and 256 that represents a \
difficulty, 24 signifies for example that at least 24 leading \
@ -264,6 +268,11 @@ let p2p =
specified, the default port 8732 will be \
assumed."
string)
(dft "discovery-addr"
~description: "Host for local peer discovery. If the port is not \
specified, the default port 10732 will be \
assumed."
(option string) default_p2p.discovery_addr)
(dft "private-mode"
~description: "Specify if the node is in private mode or \
not. A node in private mode rejects incoming \
@ -493,6 +502,7 @@ let update
?expected_pow
?bootstrap_peers
?listen_addr
?discovery_addr
?rpc_listen_addr
?(private_mode = false)
?(disable_mempool = false)
@ -544,6 +554,8 @@ let update
Option.unopt ~default:cfg.p2p.bootstrap_peers bootstrap_peers ;
listen_addr =
Option.first_some listen_addr cfg.p2p.listen_addr ;
discovery_addr =
Option.first_some discovery_addr cfg.p2p.discovery_addr ;
private_mode = cfg.p2p.private_mode || private_mode ;
limits ;
disable_mempool = cfg.p2p.disable_mempool || disable_mempool ;
@ -577,39 +589,61 @@ let update
in
return { data_dir ; p2p ; rpc ; log ; shell }
let resolve_addr ?default_port ?(passive = false) peer =
let resolve_addr ~default_addr ?default_port ?(passive = false) peer =
let addr, port = P2p_point.Id.parse_addr_port peer in
let node = if addr = "" || addr = "_" then "::" else addr
let node = if addr = "" || addr = "_" then default_addr else addr
and service =
match port, default_port with
| "", None ->
invalid_arg ""
| "", None -> invalid_arg ""
| "", Some default_port -> string_of_int default_port
| port, _ -> port in
Lwt_utils_unix.getaddrinfo ~passive ~node ~service
let resolve_addrs ?default_port ?passive peers =
let resolve_addrs ~default_addr ?default_port ?passive peers =
Lwt_list.fold_left_s begin fun a peer ->
resolve_addr ?default_port ?passive peer >>= fun points ->
resolve_addr ~default_addr ?default_port ?passive peer >>= fun points ->
Lwt.return (List.rev_append points a)
end [] peers
let resolve_discovery_addrs discovery_addr =
resolve_addr
~default_addr:Ipaddr.V4.(to_string broadcast)
~default_port:default_discovery_port
~passive:true
discovery_addr
>>= fun addrs ->
let rec to_ipv4 acc = function
| [] -> Lwt.return (List.rev acc)
| (ip, port) :: xs -> begin match Ipaddr.v4_of_v6 ip with
| Some v -> to_ipv4 ((v, port) :: acc) xs
| None ->
Format.eprintf
"Warning: failed to convert %S to an ipv4 address@."
(Ipaddr.V6.to_string ip) ;
to_ipv4 acc xs
end
in to_ipv4 [] addrs
let resolve_listening_addrs listen_addr =
resolve_addr
~default_addr:"::"
~default_port:default_p2p_port
~passive:true
listen_addr
let resolve_rpc_listening_addrs listen_addr =
resolve_addr
~default_addr:"::"
~default_port:default_rpc_port
~passive:true
listen_addr
let resolve_bootstrap_addrs peers =
resolve_addrs
~default_addr:"::"
~default_port:default_p2p_port
peers
let check_listening_addr config =
match config.p2p.listen_addr with
| None -> Lwt.return_unit
@ -628,6 +662,24 @@ let check_listening_addr config =
| exn -> Lwt.fail exn
end
let check_discovery_addr config =
match config.p2p.discovery_addr with
| None -> Lwt.return_unit
| Some addr ->
Lwt.catch begin fun () ->
resolve_discovery_addrs addr >>= function
| [] ->
Format.eprintf "Warning: failed to resolve %S\n@." addr ;
Lwt.return_unit
| _ :: _ ->
Lwt.return_unit
end begin function
| (Invalid_argument msg) ->
Format.eprintf "Warning: failed to parse %S:\ %s\n@." addr msg ;
Lwt.return_unit
| exn -> Lwt.fail exn
end
let check_rpc_listening_addr config =
match config.rpc.listen_addr with
| None -> Lwt.return_unit
@ -711,6 +763,7 @@ let check_connections config =
let check config =
check_listening_addr config >>= fun () ->
check_rpc_listening_addr config >>= fun () ->
check_discovery_addr config >>= fun () ->
check_bootstrap_peers config >>= fun () ->
check_connections config ;
Lwt.return_unit

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -35,6 +36,7 @@ and p2p = {
expected_pow : float ;
bootstrap_peers : string list ;
listen_addr : string option ;
discovery_addr : string option ;
private_mode : bool ;
limits : P2p.limits ;
disable_mempool : bool ;
@ -77,6 +79,7 @@ val update:
?expected_pow:float ->
?bootstrap_peers:string list ->
?listen_addr:string ->
?discovery_addr:string ->
?rpc_listen_addr:string ->
?private_mode:bool ->
?disable_mempool:bool ->
@ -92,6 +95,7 @@ val read: string -> t tzresult Lwt.t
val write: string -> t -> unit tzresult Lwt.t
val resolve_listening_addrs: string -> (P2p_addr.t * int) list Lwt.t
val resolve_discovery_addrs: string -> (Ipaddr.V4.t * int) list Lwt.t
val resolve_rpc_listening_addrs: string -> (P2p_addr.t * int) list Lwt.t
val resolve_bootstrap_addrs: string list -> (P2p_addr.t * int) list Lwt.t

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -118,6 +119,18 @@ let init_node ?sandbox ?checkpoint (config : Node_config_file.t) =
Lwt.return_some json
end >>= fun sandbox_param ->
(* TODO "WARN" when pow is below our expectation. *)
begin
match config.p2p.discovery_addr with
| None ->
lwt_log_notice "No local peer discovery." >>= fun () ->
return (None, None)
| Some addr ->
Node_config_file.resolve_discovery_addrs addr >>= function
| [] ->
failwith "Cannot resolve P2P discovery address: %S" addr
| (addr, port) :: _ ->
return (Some addr, Some port)
end >>=? fun (discovery_addr, discovery_port) ->
begin
match config.p2p.listen_addr with
| None ->
@ -149,6 +162,8 @@ let init_node ?sandbox ?checkpoint (config : Node_config_file.t) =
let p2p_config : P2p.config =
{ listening_addr ;
listening_port ;
discovery_addr ;
discovery_port ;
trusted_points ;
peers_file =
(config.data_dir // "peers.json") ;

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -42,6 +43,7 @@ type t = {
peers: string list ;
no_bootstrap_peers: bool ;
listen_addr: string option ;
discovery_addr: string option ;
rpc_listen_addr: string option ;
private_mode: bool ;
disable_mempool: bool ;
@ -56,7 +58,8 @@ let wrap
data_dir config_file
connections max_download_speed max_upload_speed binary_chunks_size
peer_table_size
listen_addr peers no_bootstrap_peers bootstrap_threshold private_mode disable_mempool
listen_addr discovery_addr peers no_bootstrap_peers
bootstrap_threshold private_mode disable_mempool
expected_pow rpc_listen_addr rpc_tls
cors_origins cors_headers log_output =
@ -100,6 +103,7 @@ let wrap
peers ;
no_bootstrap_peers ;
listen_addr ;
discovery_addr ;
rpc_listen_addr ;
private_mode ;
disable_mempool ;
@ -188,9 +192,7 @@ module Term = struct
let binary_chunks_size =
let doc =
Format.sprintf
"Size limit (in kB) of binary blocks that are sent to other peers."
in
"Size limit (in kB) of binary blocks that are sent to other peers." in
Arg.(value & opt (some int) None &
info ~docs ~doc ~docv:"NUM" ["binary-chunks-size"])
@ -207,6 +209,11 @@ module Term = struct
Arg.(value & opt (some string) None &
info ~docs ~doc ~docv:"ADDR:PORT" ["net-addr"])
let discovery_addr =
let doc = "The UDP address and port used for local peer discovery." in
Arg.(value & opt (some string) None &
info ~docs ~doc ~docv:"ADDR:PORT" ["discovery-addr"])
let no_bootstrap_peers =
let doc =
"Ignore the peers found in the config file (or the hard-coded \
@ -286,7 +293,8 @@ module Term = struct
$ connections
$ max_download_speed $ max_upload_speed $ binary_chunks_size
$ peer_table_size
$ listen_addr $ peers $ no_bootstrap_peers $ bootstrap_threshold
$ listen_addr $ discovery_addr $ peers $ no_bootstrap_peers
$ bootstrap_threshold
$ private_mode $ disable_mempool
$ expected_pow $ rpc_listen_addr $ rpc_tls
$ cors_origins $ cors_headers
@ -308,6 +316,7 @@ let read_and_patch_config_file ?(ignore_bootstrap_peers=false) args =
expected_pow ;
peers ; no_bootstrap_peers ;
listen_addr ; private_mode ;
discovery_addr ;
disable_mempool ;
rpc_listen_addr ; rpc_tls ;
cors_origins ; cors_headers ;
@ -325,6 +334,6 @@ let read_and_patch_config_file ?(ignore_bootstrap_peers=false) args =
?data_dir ?min_connections ?expected_connections ?max_connections
?max_download_speed ?max_upload_speed ?binary_chunks_size
?peer_table_size ?expected_pow
~bootstrap_peers ?listen_addr ?rpc_listen_addr ~private_mode
~bootstrap_peers ?listen_addr ?discovery_addr ?rpc_listen_addr ~private_mode
~disable_mempool ~cors_origins ~cors_headers ?rpc_tls ?log_output
?bootstrap_threshold cfg

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -37,6 +38,7 @@ type t = {
peers: string list ;
no_bootstrap_peers: bool ;
listen_addr: string option ;
discovery_addr: string option ;
rpc_listen_addr: string option ;
private_mode: bool ;
disable_mempool: bool ;

View File

@ -17,11 +17,8 @@ start_sandboxed_node() {
expected_connections="${expected_connections:-3}"
node_dir="$(mktemp -d -t tezos-node.XXXXXXXX)"
peers=("--no-bootstrap-peers")
for peer_port in $(seq 19730 $((19730 + max_peer_id))); do
peers+=("--peer")
peers+=("127.0.0.1:$peer_port")
done
peers+=("--private-mode")
# peers+=("--private-mode") ## Should we accept discovered peers as trusted nodes ?
node="${local_node}"
sandbox_param="--sandbox=$sandbox_file"
@ -41,6 +38,7 @@ EOF
--data-dir "$node_dir" \
--net-addr "127.0.0.1:$port" \
--rpc-addr "127.0.0.1:$rpc" \
--discovery-addr "127.255.255.255" \
--rpc-tls "${node_dir}/tezos.crt,${node_dir}/tezos.key" \
--expected-pow "$expected_pow" \
--connections "$expected_connections"
@ -169,6 +167,7 @@ EOF
--data-dir "$node_dir" \
--net-addr "127.0.0.1:$port" \
--rpc-addr "127.0.0.1:$rpc" \
--discovery-addr "127.255.255.255" \
--expected-pow "$expected_pow" \
--connections "$expected_connections"
fi
@ -197,11 +196,11 @@ main() {
sandbox_file="${sandbox_file:-sandbox.json}"
fi
if [ $# -lt 1 ] || [ "$1" -le 0 ] || [ 10 -le "$1" ]; then
if [ $# -lt 1 ] || [ "$1" -le 0 ]; then
echo "Small script to launch local and closed test network with a maximum of 9 nodes."
echo
echo "Usage: $0 <id>"
echo " where <id> should be an integer between 1 and 9."
echo " where <id> should be a positive integer."
exit 1
fi

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -55,6 +56,8 @@ type 'msg message_config = 'msg P2p_pool.message_config = {
type config = {
listening_port : P2p_addr.port option ;
listening_addr : P2p_addr.t option ;
discovery_port : P2p_addr.port option ;
discovery_addr : Ipaddr.V4.t option ;
trusted_points : P2p_point.Id.t list ;
peers_file : string ;
private_mode : bool ;
@ -138,6 +141,16 @@ let create_connection_pool config limits meta_cfg conn_meta_cfg msg_cfg io_sched
P2p_pool.create pool_cfg meta_cfg conn_meta_cfg msg_cfg io_sched in
pool
let may_create_discovery_worker _limits config pool =
match (config.listening_port, config.discovery_port, config.discovery_addr) with
| (Some listening_port, Some discovery_port, Some discovery_addr) ->
Some (P2p_discovery.create pool
config.identity.peer_id
~listening_port
~discovery_port ~discovery_addr)
| (_, _, _) ->
None
let bounds ~min ~expected ~max =
assert (min <= expected) ;
assert (expected <= max) ;
@ -151,14 +164,16 @@ let bounds ~min ~expected ~max =
max_threshold = max - step_max ;
}
let create_maintenance_worker limits pool =
let create_maintenance_worker limits pool config =
let bounds =
bounds
~min:limits.min_connections
~expected:limits.expected_connections
~max:limits.max_connections
in
P2p_maintenance.create bounds pool
let discovery =
may_create_discovery_worker limits config pool in
P2p_maintenance.create ?discovery bounds pool
let may_create_welcome_worker config limits pool =
match config.listening_port with
@ -188,7 +203,7 @@ module Real = struct
let io_sched = create_scheduler limits in
create_connection_pool
config limits meta_cfg conn_meta_cfg msg_cfg io_sched >>= fun pool ->
let maintenance = create_maintenance_worker limits pool in
let maintenance = create_maintenance_worker limits pool config in
may_create_welcome_worker config limits pool >>= fun welcome ->
return {
config ;

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -62,12 +63,20 @@ type config = {
listening_port : P2p_addr.port option;
(** Tells if incoming connections accepted, precising the TCP port
on which the peer can be reached *)
on which the peer can be reached (default: [9732])*)
listening_addr : P2p_addr.t option;
(** When incoming connections are accepted, precising on which
(** When incoming connections are accepted, precise on which
IP adddress the node listen (default: [[::]]). *)
discovery_port : P2p_addr.port option;
(** Tells if local peer discovery is enabled, precising the TCP port
on which the peer can be reached (default: [10732]) *)
discovery_addr : Ipaddr.V4.t option;
(** When local peer discovery is enabled, precise on which
IP address messages are broadcasted (default: [255.255.255.255]). *)
trusted_points : P2p_point.Id.t list ;
(** List of hard-coded known peers to bootstrap the network from. *)
@ -296,4 +305,3 @@ module Raw : sig
| Disconnect
val encoding: 'msg app_message_encoding list -> 'msg t Data_encoding.t
end

View File

@ -0,0 +1,272 @@
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
include Logging.Make (struct let name = "p2p.discovery" end)
type pool = Pool : ('msg, 'meta, 'meta_conn) P2p_pool.t -> pool
module Message = struct
let encoding =
Data_encoding.(tup3 (Fixed.string 10) P2p_peer.Id.encoding int16)
let length = Data_encoding.Binary.fixed_length_exn encoding
let key = "DISCOMAGIC"
let make peer_id port =
Data_encoding.Binary.to_bytes_exn encoding (key, peer_id, port)
end
module Answer = struct
type t = {
my_peer_id: P2p_peer.Id.t ;
pool: pool ;
discovery_port: int ;
canceler: Lwt_canceler.t ;
mutable worker: unit Lwt.t ;
}
let create_socket st =
Lwt.catch
begin fun () ->
let socket = Lwt_unix.socket PF_INET SOCK_DGRAM 0 in
Lwt_canceler.on_cancel st.canceler (fun () ->
Lwt_utils_unix.safe_close socket
) ;
Lwt_unix.setsockopt socket SO_BROADCAST true ;
Lwt_unix.setsockopt socket SO_REUSEADDR true ;
let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_any, st.discovery_port) in
Lwt_unix.bind socket addr >>= fun () ->
Lwt.return socket
end
begin fun exn ->
lwt_debug "Error creating a socket" >>= fun () ->
Lwt.fail exn
end
let loop st =
protect ~canceler:st.canceler begin fun () ->
create_socket st >>= fun socket ->
return socket
end >>=? fun socket ->
(* Infinite loop, should never exit. *)
let rec aux () =
let buf = MBytes.create Message.length in
protect ~canceler:st.canceler begin fun () ->
Lwt_bytes.recvfrom socket buf 0 Message.length [] >>= fun content ->
lwt_debug "Received discovery message..." >>= fun () ->
return content
end >>=? function
| (len, Lwt_unix.ADDR_INET (remote_addr, _))
when Compare.Int.equal len Message.length ->
begin match Data_encoding.Binary.of_bytes Message.encoding buf with
| Some (key, remote_peer_id, remote_port)
when Compare.String.equal key Message.key
&& not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) ->
let s_addr = Unix.string_of_inet_addr remote_addr in
begin match P2p_addr.of_string_opt s_addr with
| None ->
lwt_debug "Failed to parse %S\n@." s_addr >>= fun () ->
aux ()
| Some addr ->
let Pool pool = st.pool in
lwt_log_info "Registering new point %a:%d"
P2p_addr.pp addr remote_port >>= fun () ->
P2p_pool.register_new_point pool st.my_peer_id
(addr, remote_port) ;
aux ()
end
| _ -> aux ()
end
| _ -> aux ()
in aux ()
let worker_loop st =
loop st >>= function
| Error [ Canceled ] ->
Lwt.return_unit
| Error err ->
lwt_log_error
"@[<v 2>Unexpected error in answer worker@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| Ok () ->
lwt_log_error
"@[<v 2>Unexpected exit in answer worker@]" >>= fun () ->
Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
let create my_peer_id pool ~discovery_port = {
canceler = Lwt_canceler.create () ;
my_peer_id ;
discovery_port ;
pool = Pool pool ;
worker = Lwt.return_unit ;
}
let activate st =
st.worker <-
Lwt_utils.worker "discovery_answer"
~run:(fun () -> worker_loop st)
~cancel:(fun () -> Lwt_canceler.cancel st.canceler)
end
(* ************************************************************ *)
(* Sender *)
module Sender = struct
type t = {
canceler: Lwt_canceler.t ;
my_peer_id: P2p_peer.Id.t ;
listening_port: int ;
discovery_port: int ;
discovery_addr: Ipaddr.V4.t ;
pool: pool ;
restart_discovery: unit Lwt_condition.t ;
mutable worker: unit Lwt.t ;
}
module Config = struct
type t = {
delay: float;
loop: int;
}
let initial = {
delay = 0.1 ;
loop = 0 ;
}
let increase_delay config = { config with delay = 2.0 *. config.delay ; }
let max_loop = 10
end
let broadcast_message st =
let msg = Message.make st.my_peer_id st.listening_port in
Lwt.catch
begin fun () ->
let socket = Lwt_unix.(socket PF_INET SOCK_DGRAM 0) in
Lwt_canceler.on_cancel st.canceler (fun () ->
Lwt_utils_unix.safe_close socket
) ;
Lwt_unix.setsockopt socket Lwt_unix.SO_BROADCAST true ;
let broadcast_ipv4 = Ipaddr_unix.V4.to_inet_addr st.discovery_addr in
let addr = Lwt_unix.ADDR_INET (broadcast_ipv4, st.discovery_port) in
Lwt_unix.connect socket addr >>= fun () ->
lwt_debug "Broadcasting discovery message..." >>= fun () ->
Lwt_bytes.sendto socket msg 0 Message.length [] addr >>= fun _len ->
Lwt_utils_unix.safe_close socket
end
begin fun _exn ->
lwt_debug "Error broadcasting a discovery request" >>= fun () ->
Lwt.return_unit
end
let rec worker_loop sender_config st =
begin
protect ~canceler:st.canceler begin fun () ->
broadcast_message st >>= fun () ->
return_unit
end >>=? fun () ->
protect ~canceler:st.canceler begin fun () ->
Lwt.pick [
begin
Lwt_condition.wait st.restart_discovery >>= fun () ->
return Config.initial
end ;
begin
Lwt_unix.sleep sender_config.Config.delay >>= fun () ->
return { sender_config with Config.loop = succ sender_config.loop ; }
end ;
]
end
end >>= function
| Ok config when config.Config.loop = Config.max_loop ->
let new_sender_config = {
config with Config.loop = pred config.loop ;
} in
worker_loop new_sender_config st
| Ok config ->
let new_sender_config = Config.increase_delay config in
worker_loop new_sender_config st
| Error [ Canceled ] ->
Lwt.return_unit
| Error err ->
lwt_log_error
"@[<v 2>Unexpected error in sender worker@ %a@]"
pp_print_error err >>= fun () ->
Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
let create
my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = {
canceler = Lwt_canceler.create () ;
my_peer_id ;
listening_port ;
discovery_port ;
discovery_addr ;
restart_discovery = Lwt_condition.create () ;
pool = Pool pool ;
worker = Lwt.return_unit ;
}
let activate st =
st.worker <-
Lwt_utils.worker "discovery_sender"
~run:begin fun () -> worker_loop Config.initial st end
~cancel:begin fun () -> Lwt_canceler.cancel st.canceler end
end
(* ********************************************************************** *)
type t = {
answer: Answer.t ;
sender: Sender.t ;
}
let create ~listening_port ~discovery_port ~discovery_addr pool my_peer_id =
let answer = Answer.create my_peer_id pool ~discovery_port in
let sender =
Sender.create
my_peer_id pool ~listening_port ~discovery_port ~discovery_addr in
{ answer ; sender }
let activate { answer ; sender } =
Answer.activate answer ;
Sender.activate sender
let wakeup t = Lwt_condition.signal t.sender.restart_discovery ()
let shutdown t =
Lwt.join [
Lwt_canceler.cancel t.answer.canceler ;
Lwt_canceler.cancel t.sender.canceler ;
]

View File

@ -0,0 +1,57 @@
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
(** Local peer discovery.
This module manages the discovery of local peers through UDP broadcasting.
It is composed of two workers:
- The sender worker whose role is to broadcast discovery messages.
- The answer worker whose role is to listen discovery messages and register new
peers in the current pool.
Discovery messages are composed of an arbitrary key, the listening port and
the peer id of the current peer.
*)
(** Type of a discovery worker. *)
type t
(** [create ~listening_port ~discovery_port ~discovery_addr pool peer_id]
returns a discovery worker registering local peers to the [pool]
and broadcasting discovery messages with the [peer_id] and
the [listening_port] through the address [discovery_addr:discovery_port]. *)
val create : listening_port:int -> discovery_port:int ->
discovery_addr:Ipaddr.V4.t -> ('a, 'b, 'c) P2p_pool.t -> P2p_peer.Table.key ->
t
val activate : t -> unit
(** [wakeup t] sends a signal to the sender machine of [t], asking it
to immediately proceed to broadcasting. *)
val wakeup : t -> unit
(** [shutdown t] returns when [t] has completed shutdown. *)
val shutdown : t -> unit Lwt.t

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -38,6 +39,7 @@ type 'meta t = {
canceler: Lwt_canceler.t ;
bounds: bounds ;
pool: 'meta pool ;
discovery: P2p_discovery.t option ;
just_maintained: unit Lwt_condition.t ;
please_maintain: unit Lwt_condition.t ;
mutable maintain_worker: unit Lwt.t ;
@ -155,11 +157,14 @@ and too_few_connections st n_connected =
if success then begin
maintain st
end else begin
(* not enough contacts, ask the pals of our pals, and then wait *)
(* not enough contacts, ask the pals of our pals,
discover the local network and then wait *)
P2p_pool.broadcast_bootstrap_msg pool ;
Option.iter ~f:P2p_discovery.wakeup st.discovery ;
protect ~canceler:st.canceler begin fun () ->
Lwt.pick [
P2p_pool.Pool_event.wait_new_peer pool ;
P2p_pool.Pool_event.wait_new_point pool ;
Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
or wait for the existence of a
non grey-listed peer ?? *)
@ -189,7 +194,7 @@ let rec worker_loop st =
Lwt_unix.sleep 120. ; (* every two minutes *)
Lwt_condition.wait st.please_maintain ; (* when asked *)
P2p_pool.Pool_event.wait_too_few_connections pool ; (* limits *)
P2p_pool.Pool_event.wait_too_many_connections pool
P2p_pool.Pool_event.wait_too_many_connections pool ;
] >>= fun () ->
return_unit
end >>=? fun () ->
@ -206,9 +211,10 @@ let rec worker_loop st =
| Error [ Canceled ] -> Lwt.return_unit
| Error _ -> Lwt.return_unit
let create bounds pool = {
let create ?discovery bounds pool = {
canceler = Lwt_canceler.create () ;
bounds ;
discovery ;
pool = Pool pool ;
just_maintained = Lwt_condition.create () ;
please_maintain = Lwt_condition.create () ;
@ -219,7 +225,8 @@ let activate st =
st.maintain_worker <-
Lwt_utils.worker "maintenance"
~run:(fun () -> worker_loop st)
~cancel:(fun () -> Lwt_canceler.cancel st.canceler)
~cancel:(fun () -> Lwt_canceler.cancel st.canceler) ;
Option.iter st.discovery ~f:P2p_discovery.activate
let maintain { just_maintained ; please_maintain } =
let wait = Lwt_condition.wait just_maintained in
@ -228,10 +235,12 @@ let maintain { just_maintained ; please_maintain } =
let shutdown {
canceler ;
discovery ;
maintain_worker ;
just_maintained } =
just_maintained ;
} =
Lwt_canceler.cancel canceler >>= fun () ->
Lwt_utils.may ~f:P2p_discovery.shutdown discovery >>= fun () ->
maintain_worker >>= fun () ->
Lwt_condition.broadcast just_maintained () ;
Lwt.return_unit

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -52,9 +53,14 @@ type bounds = {
type 'meta t
(** Type of a maintenance worker. *)
val create : bounds -> ('msg, 'meta, 'meta_conn) P2p_pool.t -> 'meta t
(** [create ~greylist_timeout bounds pool] prepares a maintenance
worker for [pool] with connection targets specified in [bounds]. *)
val create:
?discovery:P2p_discovery.t ->
bounds ->
('msg, 'meta, 'meta_conn) P2p_pool.t ->
'meta t
(** [run ?discovery bounds pool] returns a maintenance worker, with
the [discovery] worker if present, for [pool] with connection targets
specified in [bounds]. *)
val activate: 'meta t -> unit
(** [activate t] start the worker that will maintain connections *)

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -260,6 +261,7 @@ and events = {
too_few_connections : unit Lwt_condition.t ;
too_many_connections : unit Lwt_condition.t ;
new_peer : unit Lwt_condition.t ;
new_point : unit Lwt_condition.t ;
new_connection : unit Lwt_condition.t ;
}
@ -285,6 +287,8 @@ module Pool_event = struct
Lwt_condition.wait pool.events.too_many_connections
let wait_new_peer pool =
Lwt_condition.wait pool.events.new_peer
let wait_new_point pool =
Lwt_condition.wait pool.events.new_point
let wait_new_connection pool =
Lwt_condition.wait pool.events.new_connection
end
@ -330,6 +334,7 @@ let register_point pool ?trusted _source_peer_id (addr, port as point) =
if P2p_point.Table.length pool.known_points >= max then gc_points pool
end ;
P2p_point.Table.add pool.known_points point point_info ;
Lwt_condition.broadcast pool.events.new_point () ;
log pool (New_point point) ;
point_info
| Some point_info -> point_info
@ -1028,9 +1033,9 @@ and register_new_points pool conn =
List.iter (register_new_point pool source_peer_id) points ;
Lwt.return_unit
and register_new_point pool _source_peer_id point =
and register_new_point pool source_peer_id point =
if not (P2p_point.Table.mem pool.my_id_points point) then
ignore (register_point pool _source_peer_id point)
ignore (register_point pool source_peer_id point)
and list_known_points ?(ignore_private = false) pool conn =
if Connection.private_node conn then
@ -1177,6 +1182,7 @@ let create config peer_meta_config conn_meta_config message_config io_sched =
too_few_connections = Lwt_condition.create () ;
too_many_connections = Lwt_condition.create () ;
new_peer = Lwt_condition.create () ;
new_point = Lwt_condition.create () ;
new_connection = Lwt_condition.create () ;
} in
let pool = {

View File

@ -2,6 +2,7 @@
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
@ -119,6 +120,7 @@ type config = {
known_peer_ids_history_size : int ;
(** Size of the known peer_ids log buffer (default: 50) *)
known_points_history_size : int ;
(** Size of the known points log buffer (default: 50) *)
@ -204,6 +206,10 @@ module Pool_event : sig
(** [wait_new_peer pool] is determined when a new peer
(i.e. authentication successful) gets added to the pool. *)
val wait_new_point: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [wait_new_point pool] is determined when a new point gets registered
to the pool. *)
val wait_new_connection: ('msg, 'peer_meta,'conn_meta) pool -> unit Lwt.t
(** [wait_new_connection pool] is determined when a new connection is
successfully established in the pool. *)
@ -230,7 +236,12 @@ val connect:
val accept:
('msg, 'peer_meta,'conn_meta) pool -> P2p_fd.t -> P2p_point.Id.t -> unit
(** [accept pool fd point] instructs [pool] to start the process of
accepting a connection from [fd]. Used by [P2p]. *)
accepting a connection from [fd]. Used by [P2p_welcome]. *)
val register_new_point:
('a, 'b, 'c) pool -> P2p_peer.Table.key -> P2p_point.Id.t -> unit
(** [register_new_point pool source_peer_id point] tries to register [point]
in pool's internal peer table. *)
val disconnect:
?wait:bool -> ('msg, 'peer_meta,'conn_meta) connection -> unit Lwt.t
@ -429,4 +440,3 @@ module Message : sig
val encoding: 'msg encoding list -> 'msg t Data_encoding.t
end