ligo/test/test_p2p_connection_pool.ml

197 lines
6.6 KiB
OCaml
Raw Normal View History

(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Error_monad
open P2p_types
include Logging.Make (struct let name = "test-p2p-connection-pool" end)
type message =
| Ping
let msg_config : message P2p_connection_pool.message_config = {
encoding = [
P2p_connection_pool.Encoding {
tag = 0x10 ;
encoding = Data_encoding.empty ;
wrap = (function () -> Ping) ;
unwrap = (function Ping -> Some ()) ;
max_length = None ;
} ;
] ;
versions = Version.[ { name = "TEST" ; major = 0 ; minor = 0 } ] ;
}
type metadata = unit
let meta_config : metadata P2p_connection_pool.meta_config = {
encoding = Data_encoding.empty ;
initial = () ;
}
let rec connect ~timeout pool point =
lwt_log_info "Connect to %a" Point.pp point >>= fun () ->
P2p_connection_pool.connect pool point ~timeout >>= function
| Error [P2p_connection_pool.Connected] -> begin
match P2p_connection_pool.Points.find_connection pool point with
| Some conn -> return conn
| None -> failwith "Woops..."
end
| Error ([ P2p_connection_pool.Connection_refused
| P2p_connection_pool.Pending_connection
| P2p_connection.Rejected
| Lwt_utils.Canceled
| Lwt_utils.Timeout
| P2p_connection_pool.Rejected _
] as err) ->
lwt_log_info "@[Connection to %a failed:@ %a@]"
Point.pp point pp_print_error err >>= fun () ->
Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () ->
connect ~timeout pool point
| Ok _ | Error _ as res -> Lwt.return res
let connect_all ~timeout pool points =
map_p (connect ~timeout pool) points
type error += Connect | Write | Read
let write_all conns msg =
iter_p
(fun conn ->
trace Write @@ P2p_connection_pool.write_sync conn msg)
conns
let read_all conns =
iter_p
(fun conn ->
trace Read @@ P2p_connection_pool.read conn >>=? fun Ping ->
return ())
conns
let rec connect_random pool total rem point n =
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
(trace Connect @@ connect ~timeout:2. pool point) >>=? fun conn ->
(trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ ->
(trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping ->
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
P2p_connection_pool.disconnect conn >>= fun () ->
begin
decr rem ;
if !rem mod total = 0 then
lwt_log_notice "Remaining: %d." (!rem / total)
else
Lwt.return ()
end >>= fun () ->
if n > 1 then
connect_random pool total rem point (pred n)
else
return ()
let connect_random_all pool points n =
let total = List.length points in
let rem = ref (n * total) in
iter_p (fun point -> connect_random pool total rem point n) points
let close_all conns =
Lwt_list.iter_p P2p_connection_pool.disconnect conns
let run_net config repeat points addr port =
Lwt_unix.sleep (Random.float 2.0) >>= fun () ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
P2p_connection_pool.create
config meta_config msg_config sched >>= fun pool ->
P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome ->
connect_all ~timeout:2. pool points >>=? fun conns ->
lwt_log_notice "Bootstrap OK" >>= fun () ->
write_all conns Ping >>=? fun () ->
lwt_log_notice "Sent all messages." >>= fun () ->
read_all conns >>=? fun () ->
lwt_log_notice "Read all messages." >>= fun () ->
close_all conns >>= fun () ->
lwt_log_notice "Begin random connections." >>= fun () ->
connect_random_all pool points repeat >>=? fun () ->
lwt_log_notice "Shutting down" >>= fun () ->
P2p_welcome.shutdown welcome >>= fun () ->
P2p_connection_pool.destroy pool >>= fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
lwt_log_notice "Shutdown Ok" >>= fun () ->
return ()
let make_net points repeat n =
let point, points = Utils.select n points in
let proof_of_work_target = Crypto_box.make_target [] in
let identity = Identity.generate proof_of_work_target in
let config = P2p_connection_pool.{
identity ;
proof_of_work_target ;
trusted_points = points ;
peers_file = "/dev/null" ;
closed_network = true ;
listening_port = Some (snd point) ;
min_connections = List.length points ;
max_connections = List.length points ;
max_incoming_connections = List.length points ;
authentification_timeout = 2. ;
incoming_app_message_queue_size = None ;
incoming_message_queue_size = None ;
outgoing_message_queue_size = None ;
} in
Process.detach
~prefix:(Format.asprintf "%a " Gid.pp identity.gid)
begin fun () ->
run_net config repeat points (fst point) (snd point) >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "@[<v 2>Unexpected error: %d@ %a@]"
(List.length err)
pp_print_error err >>= fun () ->
exit 1
end
let addr = ref Ipaddr.V6.localhost
let port = ref (1024 + Random.int 8192)
let clients = ref 10
2017-01-23 14:09:33 +04:00
let repeat_connections = ref 5
let spec = Arg.[
"--port", Int (fun p -> port := p), " Listening port of the first peer.";
"--addr", String (fun p -> addr := Ipaddr.V6.of_string_exn p),
" Listening addr";
"--clients", Set_int clients, " Number of concurrent clients." ;
2017-01-23 14:09:33 +04:00
"--repeat", Set_int repeat_connections,
" Number of connections/disconnections." ;
"-v", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Info)),
" Log up to info msgs" ;
"-vv", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Debug)),
" Log up to debug msgs";
]
let main () =
let open Utils in
2017-01-23 14:09:36 +04:00
Logging.init Stderr >>= fun () ->
let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in
let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg ;
let ports = !port -- (!port + !clients - 1) in
let points = List.map (fun port -> !addr, port) ports in
2017-01-23 14:09:33 +04:00
Lwt_list.iter_p (make_net points !repeat_connections) (0 -- (!clients - 1))
let () =
Sys.catch_break true ;
2017-01-23 14:09:36 +04:00
try Lwt_main.run @@ main ()
with _ -> ()