Merge branch 'p2p-refactoring' into 'master'

P2P: refactoring

* Use Lwt_pipe instead of Lwt_stream
* Use Error_monad
* Open only Lwt_infix
* Expose more stuff to net (gids)
* Add network traffic counters

See merge request !118
This commit is contained in:
Grégoire Henry 2016-12-01 14:05:27 +01:00
commit 1a1e17e1a0
10 changed files with 808 additions and 365 deletions

View File

@ -111,7 +111,9 @@ UTILS_LIB_INTFS := \
utils/error_monad.mli \ utils/error_monad.mli \
utils/logging.mli \ utils/logging.mli \
utils/lwt_utils.mli \ utils/lwt_utils.mli \
utils/lwt_pipe.mli \
utils/IO.mli \ utils/IO.mli \
utils/moving_average.mli \
UTILS_LIB_IMPLS := \ UTILS_LIB_IMPLS := \
utils/mBytes.ml \ utils/mBytes.ml \
@ -128,7 +130,9 @@ UTILS_LIB_IMPLS := \
utils/error_monad.ml \ utils/error_monad.ml \
utils/logging.ml \ utils/logging.ml \
utils/lwt_utils.ml \ utils/lwt_utils.ml \
utils/lwt_pipe.ml \
utils/IO.ml \ utils/IO.ml \
utils/moving_average.ml \
UTILS_PACKAGES := \ UTILS_PACKAGES := \
base64 \ base64 \

File diff suppressed because it is too large Load Diff

View File

@ -39,8 +39,8 @@ type config = {
(** Network capacities *) (** Network capacities *)
type limits = { type limits = {
(** Maximum length in bytes of network messages' payload *) (** Maximum length in bytes of network messages *)
max_packet_size : int ; max_message_size : int ;
(** Delay after which a non responding peer is considered dead *) (** Delay after which a non responding peer is considered dead *)
peer_answer_timeout : float ; peer_answer_timeout : float ;
(** Minimum number of connections to reach when staring / maitening *) (** Minimum number of connections to reach when staring / maitening *)
@ -55,6 +55,7 @@ type limits = {
(** A global identifier for a peer, a.k.a. an identity *) (** A global identifier for a peer, a.k.a. an identity *)
type gid type gid
val pp_gid : Format.formatter -> gid -> unit
type 'msg encoding = Encoding : { type 'msg encoding = Encoding : {
tag: int ; tag: int ;
@ -97,6 +98,9 @@ module Make (P : PARAMS) : sig
(** Main network initialisation function *) (** Main network initialisation function *)
val bootstrap : config:config -> limits:limits -> net Lwt.t val bootstrap : config:config -> limits:limits -> net Lwt.t
(** Return one's gid *)
val gid : net -> gid
(** A maintenance operation : try and reach the ideal number of peers *) (** A maintenance operation : try and reach the ideal number of peers *)
val maintain : net -> unit Lwt.t val maintain : net -> unit Lwt.t
@ -129,18 +133,18 @@ module Make (P : PARAMS) : sig
val get_metadata : net -> gid -> P.metadata option val get_metadata : net -> gid -> P.metadata option
val set_metadata : net -> gid -> P.metadata -> unit val set_metadata : net -> gid -> P.metadata -> unit
(** Wait for a payload from any peer in the network *) (** Wait for a message from any peer in the network *)
val recv : net -> (peer * P.msg) Lwt.t val recv : net -> (peer * P.msg) Lwt.t
(** Send a payload to a peer and wait for it to be in the tube *) (** [send net peer msg] is a thread that returns when [msg] has been
successfully enqueued in the send queue. *)
val send : net -> peer -> P.msg -> unit Lwt.t val send : net -> peer -> P.msg -> unit Lwt.t
(** Send a payload to a peer without waiting for the result. Return (** [try_send net peer msg] is [true] if [msg] has been added to the
[true] if the message can be enqueued in the peer's output queue send queue for [peer], [false] otherwise *)
or [false] otherwise. *)
val try_send : net -> peer -> P.msg -> bool val try_send : net -> peer -> P.msg -> bool
(** Send a payload to all peers *) (** Send a message to all peers *)
val broadcast : net -> P.msg -> unit val broadcast : net -> P.msg -> unit
(** Shutdown the connection to all peers at this address and stop the (** Shutdown the connection to all peers at this address and stop the

View File

@ -67,12 +67,12 @@ type msg =
(** Wait for a payload from any peer in the network *) (** Wait for a payload from any peer in the network *)
val recv : net -> (peer * msg) Lwt.t val recv : net -> (peer * msg) Lwt.t
(** Send a payload to a peer and wait for it to be in the tube *) (** [send net peer msg] is a thread that returns when [msg] has been
successfully enqueued in the send queue. *)
val send : net -> peer -> msg -> unit Lwt.t val send : net -> peer -> msg -> unit Lwt.t
(** Send a payload to a peer without waiting for the result. Return (** [try_send net peer msg] is [true] if [msg] has been added to the
[true] if the msg can be enqueued in the peer's output queue send queue for [peer], [false] otherwise *)
or [false] otherwise. *)
val try_send : net -> peer -> msg -> bool val try_send : net -> peer -> msg -> bool
(** Send a payload to all peers *) (** Send a payload to all peers *)

View File

@ -396,7 +396,7 @@ let init_node { sandbox ; sandbox_param ;
| Some _ -> None | Some _ -> None
| None -> | None ->
let limits = let limits =
{ max_packet_size = 10_000 ; { max_message_size = 10_000 ;
peer_answer_timeout = 5. ; peer_answer_timeout = 5. ;
expected_connections ; expected_connections ;
min_connections ; min_connections ;

105
src/utils/lwt_pipe.ml Normal file
View File

@ -0,0 +1,105 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Lwt.Infix
type 'a t =
{ queue : 'a Queue.t ;
size : int ;
mutable push_waiter : (unit Lwt.t * unit Lwt.u) option ;
mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option }
let create ~size =
{ queue = Queue.create () ;
size ;
push_waiter = None ;
pop_waiter = None }
let notify_push q =
match q.push_waiter with
| None -> ()
| Some (_, w) ->
q.push_waiter <- None ;
Lwt.wakeup_later w ()
let notify_pop q =
match q.pop_waiter with
| None -> ()
| Some (_, w) ->
q.pop_waiter <- None ;
Lwt.wakeup_later w ()
let wait_push q =
match q.push_waiter with
| Some (t, _) -> t
| None ->
let waiter, wakener = Lwt.wait () in
q.push_waiter <- Some (waiter, wakener) ;
waiter
let wait_pop q =
match q.pop_waiter with
| Some (t, _) -> t
| None ->
let waiter, wakener = Lwt.wait () in
q.pop_waiter <- Some (waiter, wakener) ;
waiter
let rec push ({ queue ; size } as q) elt =
if Queue.length queue < size then begin
Queue.push elt queue ;
notify_push q ;
Lwt.return_unit
end else
wait_pop q >>= fun () ->
push q elt
let rec push_now ({ queue; size } as q) elt =
Queue.length queue < size && begin
Queue.push elt queue ;
notify_push q ;
true
end
let rec pop ({ queue } as q) =
if not (Queue.is_empty queue) then
let elt = Queue.pop queue in
notify_pop q ;
Lwt.return elt
else
wait_push q >>= fun () ->
pop q
let rec peek ({ queue } as q) =
if not (Queue.is_empty queue) then
let elt = Queue.peek queue in
Lwt.return elt
else
wait_push q >>= fun () ->
peek q
let pop_now_exn ({ queue } as q) =
let elt = Queue.pop queue in
notify_pop q ;
elt
let pop_now q =
match pop_now_exn q with
| exception Queue.Empty -> None
| elt -> Some elt
let length { queue } = Queue.length queue
let is_empty { queue } = Queue.is_empty queue
let rec values_available q =
if is_empty q then
wait_push q >>= fun () ->
values_available q
else
Lwt.return_unit

54
src/utils/lwt_pipe.mli Normal file
View File

@ -0,0 +1,54 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** Data queues similar to the [Pipe] module in Jane Street's [Async]
library. They are implemented with [Queue]s, limited in size, and
use lwt primitives for concurrent access. *)
type 'a t
(** Type of queues holding values of type ['a]. *)
val create : size:int -> 'a t
(** [create ~size] is an empty queue that can hold max [size]
elements. *)
val push : 'a t -> 'a -> unit Lwt.t
(** [push q v] is a thread that blocks while [q] contains more
than [size] elements, then adds [v] at the end of [q]. *)
val pop : 'a t -> 'a Lwt.t
(** [pop q] is a thread that blocks while [q] is empty, then
removes and returns the first element in [q]. *)
val peek : 'a t -> 'a Lwt.t
(** [peek] is like [pop] except it does not removes the first
element. *)
val values_available : 'a t -> unit Lwt.t
(** [values_available] is like [peek] but it ignores the value
returned. *)
val push_now : 'a t -> 'a -> bool
(** [push_now q v] adds [v] at the ends of [q] immediately and returns
[false] if [q] is currently full, [true] otherwise. *)
val pop_now : 'a t -> 'a option
(** [pop_now q] maybe removes and returns the first element in [q] if
[q] contains at least one element. *)
val pop_now_exn : 'a t -> 'a
(** [pop_now_exn q] removes and returns the first element in [q] if
[q] contains at least one element, or raise [Empty] otherwise. *)
val length : 'a t -> int
(** [length q] is the number of elements in [q]. *)
val is_empty : 'a t -> bool
(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)

View File

@ -0,0 +1,37 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
class type ma = object
method add_float : float -> unit
method add_int : int -> unit
method get : float
end
class virtual base ?(init = 0.) () = object (self)
val mutable acc : float = init
method virtual add_float : float -> unit
method add_int x = self#add_float (float_of_int x)
method get = acc
end
class sma ?init () = object
inherit base ?init ()
val mutable i = match init with None -> 0 | _ -> 1
method add_float x =
acc <- (acc +. (x -. acc) /. (float_of_int @@ succ i)) ;
i <- succ i
end
class ema ?init ~alpha () = object
inherit base ?init ()
val alpha = alpha
method add_float x =
acc <- alpha *. x +. (1. -. alpha) *. acc
end

View File

@ -0,0 +1,34 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** Moving averages. The formulas are from Wikipedia
[https://en.wikipedia.org/wiki/Moving_average] *)
class type ma = object
method add_float : float -> unit
method add_int : int -> unit
method get : float
end
(** Common class type for objects computing a cumulative moving
average of some flavor. In a cumulative moving average, the data
arrive in an ordered datum stream, and the user would like to get
the average of all of the data up until the current datum
point. The method [add_float] and [add_int] are used to add the
next datum. The method [get] and [get_exn] are used to compute the
moving average up until the current datum point. *)
class sma : ?init:float -> unit -> ma
(** [sma ?init ()] is an object that computes the Simple Moving
Average of a datum stream. [SMA(n+1) = SMA(n) + (x_(n+1) / SMA(n))
/ (n+1)] *)
class ema : ?init:float -> alpha:float -> unit -> ma
(** [ema ?init ~alpha ()] is an object that computes the Exponential
Moving Average of a datum stream. [EMA(n+1) = alpha * x_(n+1) +
(1 - alpha) * x_n] *)

View File

@ -4,13 +4,41 @@ open P2p
include Logging.Make (struct let name = "test-p2p" end) include Logging.Make (struct let name = "test-p2p" end)
module Param = struct module Param = struct
type msg = unit
let encodings = [Encoding { tag = 0x10; let dump_encoding = Data_encoding.(Variable.list (tup2 string string))
encoding = Data_encoding.null;
wrap = (fun () -> ()); type msg =
unwrap = (fun () -> Some ()); | Create of string * string
max_length = Some 0; | Update of string * string
}] | Delete of string
| Dump of (string * string) list
let encodings = [
Encoding { tag = 0x10;
encoding = Data_encoding.(tup2 string string);
wrap = (function (k, v) -> Create (k, v));
unwrap = (function Create (k, v) -> Some (k, v) | _ -> None);
max_length = Some 0x400;
};
Encoding { tag = 0x11;
encoding = Data_encoding.(tup2 string string);
wrap = (function (k, v) -> Update (k, v));
unwrap = (function Create (k, v) -> Some (k, v) | _ -> None);
max_length = Some 0x400;
};
Encoding { tag = 0x12;
encoding = Data_encoding.string;
wrap = (function x -> Delete x);
unwrap = (function Delete x -> Some x | _ -> None);
max_length = Some 0x400;
};
Encoding { tag = 0x13;
encoding = dump_encoding;
wrap = (function x -> Dump x);
unwrap = (function Dump x -> Some x | _ -> None);
max_length = Some 0x10000;
};
]
type metadata = unit type metadata = unit
let initial_metadata = () let initial_metadata = ()
@ -20,82 +48,113 @@ module Param = struct
let supported_versions = [ { name = "TEST"; major = 0; minor = 0; } ] let supported_versions = [ { name = "TEST"; major = 0; minor = 0; } ]
end end
let peer_of_string peer = module Net = Make(Param)
let p = String.rindex peer ':' in
let addr = String.sub peer 0 p in
let port = String.(sub peer (p+1) (length peer - p - 1)) in
Ipaddr.of_string_exn addr, int_of_string port
include Make(Param) let print_peer_info { Net.gid; addr; port; version = { name; major; minor } } =
let print_peer_info { gid; addr; port; version = { name; major; minor } } =
Printf.sprintf "%s:%d (%s.%d.%d)" (Ipaddr.to_string addr) port name major minor Printf.sprintf "%s:%d (%s.%d.%d)" (Ipaddr.to_string addr) port name major minor
let peers_file = ref @@ Filename.temp_file "p2p-test" "" let string_of_gid gid = Format.asprintf "%a" pp_gid gid
let net_monitor config limits num_nets net =
let my_gid_str = string_of_gid @@ Net.gid net in
let send_msgs_to_neighbours neighbours =
Lwt_list.iter_p begin fun p ->
let { Net.gid } = Net.peer_info net p in
let remote_gid_str = string_of_gid gid in
Net.send net p (Create (my_gid_str, remote_gid_str)) >>= fun _ ->
lwt_log_notice "(%s) Done sending msg to %s" my_gid_str remote_gid_str
end neighbours >>= fun () ->
lwt_log_notice "(%s) Done sending all msgs." my_gid_str
in
let rec inner () =
let neighbours = Net.peers net in
let nb_neighbours = List.length neighbours in
if nb_neighbours < num_nets - 1 then begin
log_notice "(%s) I have %d peers" my_gid_str nb_neighbours;
Lwt_unix.sleep 1. >>= inner end
else begin
log_notice "(%s) I know all my %d peers" my_gid_str nb_neighbours;
Lwt.async (fun () -> send_msgs_to_neighbours neighbours);
let rec recv_peer_msgs acc =
if List.length acc = num_nets - 1 then begin
ListLabels.iter acc ~f:(fun (k, v) -> log_info "%s %s" k v);
Lwt.return_unit
end
else begin
lwt_log_notice "(%s) recv_peers_msgs: Got %d, need %d"
my_gid_str (List.length acc) (num_nets - 1) >>= fun () ->
Net.recv net >>= function
| p, (Create (their_gid, my_gid)) ->
lwt_log_notice "(%s) Got a message from %s" my_gid_str their_gid >>= fun () ->
recv_peer_msgs ((their_gid, my_gid) :: acc)
| _ -> assert false
end
in
recv_peer_msgs []
end
in inner ()
let range n =
let rec inner acc = function
| -1 -> acc
| n -> inner (n :: acc) (pred n)
in
if n < 0 then invalid_arg "range"
else inner [] (pred n)
let main () = let main () =
let incoming_port = ref @@ Some 11732 in let incoming_port = ref @@ Some 11732 in
let discovery_port = ref None in let discovery_port = ref @@ Some 10732 in
let known_peers = ref [] in
let closed_network = ref false in let closed_network = ref false in
let max_packet_size = ref 1024 in let max_packet_size = ref 1024 in
let peer_answer_timeout = ref 10. in let peer_answer_timeout = ref 10. in
let expected_connections = ref 1 in
let min_connections = ref 0 in
let max_connections = ref 10 in
let blacklist_time = ref 100. in let blacklist_time = ref 100. in
let num_networks = ref 0 in
let make_net nb_neighbours n =
let config = {
incoming_port = Utils.map_option !incoming_port ~f:(fun p -> p + n);
discovery_port = !discovery_port;
known_peers = [];
peers_file = "";
closed_network = !closed_network;
}
in
let limits = {
max_message_size = !max_packet_size;
peer_answer_timeout = !peer_answer_timeout;
expected_connections = nb_neighbours;
min_connections = nb_neighbours;
max_connections = nb_neighbours;
blacklist_time = !blacklist_time;
}
in
Net.bootstrap ~config ~limits >|= fun net ->
config, limits, net
in
let spec = Arg.[ let spec = Arg.[
"-iport", Int (fun p -> incoming_port := Some p), " Incoming port"; "-start-port", Int (fun p -> incoming_port := Some p), " Incoming port";
"-dport", Int (fun p -> discovery_port := Some p), " Discovery port"; "-dport", Int (fun p -> discovery_port := Some p), " Discovery port";
"-peers-file", Set_string peers_file, " Peers filepath";
"-closed", Set closed_network, " Closed network mode"; "-closed", Set closed_network, " Closed network mode";
"-max-packet-size", Set_int max_packet_size, "int Max size of packets"; "-max-packet-size", Set_int max_packet_size, "int Max size of packets";
"-peer-answer-timeout", Set_float peer_answer_timeout, "float Number of seconds"; "-peer-answer-timeout", Set_float peer_answer_timeout, "float Number of seconds";
"-expected-connections", Set_int expected_connections, "conns Expected connections";
"-min-connections", Set_int min_connections, "conns Minimal number of connections";
"-max-connections", Set_int max_connections, "conns num of connections";
"-blacklist-time", Set_float blacklist_time, "float Number of seconds"; "-blacklist-time", Set_float blacklist_time, "float Number of seconds";
"-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs"; "-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs";
"-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs"; "-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs";
] ]
in in
let anon_fun peer = known_peers := peer_of_string peer :: !known_peers in let anon_fun num_peers = num_networks := int_of_string num_peers in
let usage_msg = "Test P2p. Arguments are:" in let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg; Arg.parse spec anon_fun usage_msg;
let config = { let nets = range !num_networks in
incoming_port = !incoming_port; Lwt_list.map_p (make_net (pred !num_networks)) nets >>= fun nets ->
discovery_port = !discovery_port; Lwt_list.iter_p (fun (cfg, limits, net) -> net_monitor cfg limits !num_networks net) nets >>= fun () ->
known_peers = !known_peers; lwt_log_notice "All done!"
peers_file = !peers_file;
closed_network = !closed_network;
}
in
let limits = {
max_packet_size = !max_packet_size;
peer_answer_timeout = !peer_answer_timeout;
expected_connections = !expected_connections;
min_connections = !min_connections;
max_connections = !max_connections;
blacklist_time = !blacklist_time;
}
in
bootstrap ~config ~limits >>= fun net ->
let rec loop () =
ListLabels.iter (peers net) ~f:begin fun p ->
let pi = peer_info net p in
log_info "%s" (print_peer_info pi)
end;
Lwt_unix.sleep 3. >>=
loop
in
loop ()
let () = let () =
Sys.catch_break true; Sys.catch_break true;
try try
Lwt_main.run @@ main () Lwt_main.run @@ main ()
with _ -> with _ -> ()
Sys.remove !peers_file