diff --git a/src/utils/error_monad.ml b/src/utils/error_monad.ml index 17c70ec0a..37f669f51 100644 --- a/src/utils/error_monad.ml +++ b/src/utils/error_monad.ml @@ -286,6 +286,16 @@ module Make() = struct fold_right_s f t init >>=? fun acc -> f h acc + let rec join = function + | [] -> return () + | t :: ts -> + t >>= function + | Error _ as err -> + join ts >>=? fun () -> + Lwt.return err + | Ok () -> + join ts + let record_trace err result = match result with | Ok _ as res -> res diff --git a/src/utils/error_monad_sig.ml b/src/utils/error_monad_sig.ml index fa7680236..477a81991 100644 --- a/src/utils/error_monad_sig.ml +++ b/src/utils/error_monad_sig.ml @@ -144,4 +144,7 @@ module type S = sig val fold_right_s : ('a -> 'b -> 'b tzresult Lwt.t) -> 'a list -> 'b -> 'b tzresult Lwt.t + (** A {!Lwt.join} in the monad *) + val join : unit tzresult Lwt.t list -> unit tzresult Lwt.t + end diff --git a/test/lib/process.ml b/test/lib/process.ml index 979f741c1..586d4e830 100644 --- a/test/lib/process.ml +++ b/test/lib/process.ml @@ -7,41 +7,14 @@ (* *) (**************************************************************************) +let () = Lwt_unix.set_default_async_method Async_none + include Logging.Make (struct let name = "process" end) open Error_monad exception Exited of int -let detach ?(prefix = "") f = - Lwt_io.flush_all () >>= fun () -> - match Lwt_unix.fork () with - | 0 -> - Random.self_init () ; - let template = Format.asprintf "%s$(section): $(message)" prefix in - Lwt_main.run begin - Logging.init ~template Stderr >>= fun () -> - lwt_log_notice "PID: %d" (Unix.getpid ()) >>= fun () -> - f () - end ; - exit 0 - | pid -> - Lwt.catch - (fun () -> - Lwt_unix.waitpid [] pid >>= function - | (_,Lwt_unix.WEXITED 0) -> - Lwt.return_unit - | (_,Lwt_unix.WEXITED n) -> - Lwt.fail (Exited n) - | (_,Lwt_unix.WSIGNALED _) - | (_,Lwt_unix.WSTOPPED _) -> - Lwt.fail Exit) - (function - | Lwt.Canceled -> - Unix.kill pid Sys.sigkill ; - Lwt.return_unit - | exn -> Lwt.fail exn) - let handle_error f = Lwt.catch f @@ -51,27 +24,90 @@ let handle_error f = lwt_log_error "%a" Error_monad.pp_print_error err >>= fun () -> exit 1 -let rec wait processes = +module Channel = struct + type ('a, 'b) t = (Lwt_io.input_channel * Lwt_io.output_channel) + let push (_, outch) v = + Lwt.catch + (fun () -> Lwt_io.write_value outch v >>= return) + (fun exn -> fail (Exn exn)) + let pop (inch, _) = + Lwt.catch + (fun () -> Lwt_io.read_value inch >>= return) + (fun exn -> fail (Exn exn)) +end + +let wait pid = Lwt.catch (fun () -> - Lwt.nchoose_split processes >>= function - | (_, []) -> lwt_log_notice "All done!" - | (_, processes) -> wait processes) + Lwt_unix.waitpid [] pid >>= function + | (_,Lwt_unix.WEXITED 0) -> + return () + | (_,Lwt_unix.WEXITED n) -> + fail (Exn (Exited n)) + | (_,Lwt_unix.WSIGNALED _) + | (_,Lwt_unix.WSTOPPED _) -> + Lwt.fail Exit) (function - | Exited n -> - lwt_log_notice "Early error!" >>= fun () -> - List.iter Lwt.cancel processes ; - Lwt.catch - (fun () -> Lwt.join processes) - (fun _ -> Lwt.return_unit) >>= fun () -> - lwt_log_notice "A process finished with error %d !" n >>= fun () -> - Pervasives.exit n + | Lwt.Canceled -> + Unix.kill pid Sys.sigkill ; + return () | exn -> - lwt_log_notice "Unexpected error!%a" - Error_monad.pp_exn exn >>= fun () -> - List.iter Lwt.cancel processes ; - Lwt.catch - (fun () -> Lwt.join processes) - (fun _ -> Lwt.return_unit) >>= fun () -> - Pervasives.exit 2) + fail (Exn exn)) + +type ('a, 'b) t = { + termination: unit tzresult Lwt.t ; + channel: ('b, 'a) Channel.t ; +} + +let detach ?(prefix = "") f = + Lwt_io.flush_all () >>= fun () -> + let main_in, child_out = Lwt_io.pipe () in + let child_in, main_out = Lwt_io.pipe () in + match Lwt_unix.fork () with + | 0 -> + Logging.init Stderr >>= fun () -> + Random.self_init () ; + let template = Format.asprintf "%s$(message)" prefix in + Lwt_main.run begin + Lwt_io.close main_in >>= fun () -> + Lwt_io.close main_out >>= fun () -> + Logging.init ~template Stderr >>= fun () -> + lwt_log_info "PID: %d" (Unix.getpid ()) >>= fun () -> + handle_error (fun () -> f (child_in, child_out)) + end ; + exit 0 + | pid -> + let termination = wait pid in + Lwt_io.close child_in >>= fun () -> + Lwt_io.close child_out >>= fun () -> + Lwt.return { termination ; channel = (main_in, main_out) } + +let wait_all processes = + let rec loop processes = + match processes with + | [] -> Lwt.return_none + | processes -> + Lwt.nchoose_split processes >>= function + | (finished, remaining) -> + let rec handle = function + | [] -> loop remaining + | Ok () :: finished -> handle finished + | Error err :: _ -> + Lwt.return (Some (err, remaining)) in + handle finished in + loop (List.map (fun p -> p.termination) processes) >>= function + | None -> + lwt_log_info "All done!" >>= fun () -> + return () + | Some ([Exn (Exited n)], remaining) -> + lwt_log_error "Early error!" >>= fun () -> + List.iter Lwt.cancel remaining ; + join remaining >>= fun _ -> + failwith "A process finished with error %d !" n + | Some (err, remaining) -> + lwt_log_error "Unexpected error!%a" + pp_print_error err >>= fun () -> + List.iter Lwt.cancel remaining ; + join remaining >>= fun _ -> + failwith "A process finished with an unexpected error !" diff --git a/test/lib/process.mli b/test/lib/process.mli index c1933cc11..67dff1453 100644 --- a/test/lib/process.mli +++ b/test/lib/process.mli @@ -10,6 +10,20 @@ open Error_monad exception Exited of int -val detach: ?prefix:string -> (unit -> unit Lwt.t) -> unit Lwt.t -val handle_error: (unit -> (unit, error list) result Lwt.t) -> unit Lwt.t -val wait: unit Lwt.t list -> unit Lwt.t +module Channel : sig + type ('a, 'b) t + val push: ('a, 'b) t -> 'a -> unit tzresult Lwt.t + val pop: ('a, 'b) t -> 'b tzresult Lwt.t +end + +type ('a, 'b) t = { + termination: unit tzresult Lwt.t ; + channel: ('b, 'a) Channel.t ; +} + +val detach: + ?prefix:string -> + (('a, 'b) Channel.t -> unit tzresult Lwt.t) -> + ('a, 'b) t Lwt.t + +val wait_all: ('a, 'b) t list -> unit tzresult Lwt.t diff --git a/test/lib/test.ml b/test/lib/test.ml index ebd78d6a4..cfc648961 100644 --- a/test/lib/test.ml +++ b/test/lib/test.ml @@ -11,9 +11,6 @@ module Test = Kaputt.Abbreviations.Test let keep_dir = try ignore (Sys.getenv "KEEPDIR") ; true with _ -> false -let make_test ~title test = - Test.add_simple_test ~title (fun () -> Lwt_main.run (test ())) - let rec remove_dir dir = if Sys.file_exists dir then begin Array.iter (fun file -> diff --git a/test/p2p/Makefile b/test/p2p/Makefile index 0523a3392..e38f215d7 100644 --- a/test/p2p/Makefile +++ b/test/p2p/Makefile @@ -24,7 +24,7 @@ OPENED_MODULES := ${NODE_OPENED_MODULES} .PHONY:run-test-p2p-connection run-test-p2p-connection: @echo - ./test-p2p-connection + ./test-p2p-connection -v TEST_CONNECTION_IMPLS := \ test_p2p_connection.ml @@ -42,7 +42,7 @@ clean:: .PHONY:run-test-p2p-connection-pool run-test-p2p-connection-pool: @echo - ./test-p2p-connection-pool --clients 10 --repeat 5 + ./test-p2p-connection-pool --clients 10 --repeat 5 -v TEST_CONNECTION_POOL_IMPLS := \ test_p2p_connection_pool.ml diff --git a/test/p2p/test_p2p_connection.ml b/test/p2p/test_p2p_connection.ml index 0a14e6f8b..b2e199aa8 100644 --- a/test/p2p/test_p2p_connection.ml +++ b/test/p2p/test_p2p_connection.ml @@ -12,7 +12,9 @@ open Error_monad open P2p_types -include Logging.Make (struct let name = "test-p2p-connection" end) +include Logging.Make (struct let name = "test.p2p.connection" end) + +let default_addr = Ipaddr.V6.localhost let proof_of_work_target = Crypto_box.make_target 16. let id1 = Identity.generate proof_of_work_target @@ -44,6 +46,46 @@ let rec listen ?port addr = | exn -> Lwt.fail exn end +let sync ch = + Process.Channel.push ch () >>=? fun () -> + Process.Channel.pop ch >>=? fun () -> + return () + +let rec sync_nodes nodes = + iter_p + (fun { Process.channel } -> Process.Channel.pop channel) + nodes >>=? fun () -> + iter_p + (fun { Process.channel } -> Process.Channel.push channel ()) + nodes >>=? fun () -> + sync_nodes nodes + +let sync_nodes nodes = + sync_nodes nodes >>= function + | Ok () | Error (Exn End_of_file :: _) -> + return () + | Error e as err -> + Lwt.return err + +let run_nodes client server = + listen default_addr >>= fun (main_socket, port) -> + Process.detach ~prefix:"server: " begin fun channel -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + server channel sched main_socket >>=? fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + return () + end >>= fun server_node -> + Process.detach ~prefix:"client: " begin fun channel -> + Lwt_utils.safe_close main_socket >>= fun () -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + client channel sched default_addr port >>=? fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + return () + end >>= fun client_node -> + let nodes = [ server_node ; client_node ] in + Lwt.ignore_result (sync_nodes nodes) ; + Process.wait_all nodes + let raw_accept sched main_socket = Lwt_unix.accept main_socket >>= fun (fd, sockaddr) -> let fd = P2p_io_scheduler.register sched fd in @@ -73,20 +115,11 @@ let connect sched addr port id = P2p_connection.authenticate ~proof_of_work_target ~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) -> - assert (not info.incoming) ; - assert (Peer_id.compare info.peer_id id1.peer_id = 0) ; + _assert (not info.incoming) __LOC__ "" >>=? fun () -> + _assert (Peer_id.compare info.peer_id id1.peer_id = 0) + __LOC__ "" >>=? fun () -> return auth_fd -let simple_msg = - MBytes.create (1 lsl 1) - -let is_rejected = function - | Error [P2p_connection.Rejected] -> true - | Ok _ -> false - | Error err -> - log_notice "Error: %a" pp_print_error err ; - false - let is_connection_closed = function | Error [P2p_io_scheduler.Connection_closed] -> true | Ok _ -> false @@ -94,116 +127,193 @@ let is_connection_closed = function log_notice "Error: %a" pp_print_error err ; false -let bytes_encoding = Data_encoding.Variable.bytes +let is_decoding_error = function + | Error [P2p_connection.Decoding_error] -> true + | Ok _ -> false + | Error err -> + log_notice "Error: %a" pp_print_error err ; + false -let server main_socket = - let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in - (* Low-level test. *) - raw_accept sched main_socket >>= fun (fd, _point) -> - lwt_log_notice "Low_level" >>= fun () -> - P2p_io_scheduler.write fd simple_msg >>=? fun () -> - P2p_io_scheduler.close fd >>=? fun _ -> - lwt_log_notice "Low_level OK" >>= fun () -> - (* Kick the first connection. *) - accept sched main_socket >>=? fun (info, auth_fd) -> - lwt_log_notice "Kick" >>= fun () -> - assert (info.incoming) ; - assert (Peer_id.compare info.peer_id id2.peer_id = 0) ; - P2p_connection.kick auth_fd >>= fun () -> - lwt_log_notice "Kick OK" >>= fun () -> - (* Let's be rejected. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - P2p_connection.accept auth_fd bytes_encoding >>= fun conn -> - assert (is_rejected conn) ; - lwt_log_notice "Kicked OK" >>= fun () -> - (* Accept and send a single message. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - lwt_log_notice "Single" >>= fun () -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.write_sync conn simple_msg >>=? fun () -> - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Single OK" >>= fun () -> - (* Accept and send a single message, while the client expected 2. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - lwt_log_notice "Early close (read)" >>= fun () -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.write_sync conn simple_msg >>=? fun () -> - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (read) OK" >>= fun () -> - (* Accept and wait for the client to close the connection. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - lwt_log_notice "Early close (write)" >>= fun () -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (write) OK" >>= fun () -> - P2p_io_scheduler.shutdown sched >>= fun () -> - Lwt_unix.sleep 0.2 >>= fun () -> - lwt_log_notice "Success" >>= fun () -> - return () +module Low_level = struct -let client addr port = - let msg = MBytes.create (MBytes.length simple_msg) in - let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in - raw_connect sched addr port >>= fun fd -> - P2p_io_scheduler.read_full fd msg >>=? fun () -> - assert (MBytes.compare simple_msg msg = 0) ; - P2p_io_scheduler.close fd >>=? fun () -> - lwt_log_notice "Low_level OK" >>= fun () -> - (* let's be rejected. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>= fun conn -> - assert (is_rejected conn) ; - lwt_log_notice "Kick OK" >>= fun () -> - (* let's reject! *) - lwt_log_notice "Kicked" >>= fun () -> - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.kick auth_fd >>= fun () -> - (* let's exchange a simple message. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.read conn >>=? fun (_msg_size, msg) -> - assert (MBytes.compare simple_msg msg = 0) ; - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Simple OK" >>= fun () -> - (* let's detect a closed connection on `read`. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.read conn >>=? fun (_msg_size, msg) -> - assert (MBytes.compare simple_msg msg = 0) ; - P2p_connection.read conn >>= fun msg -> - assert (is_connection_closed msg) ; - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (read) OK" >>= fun () -> - (* let's detect a closed connection on `write`. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - Lwt_unix.sleep 0.1 >>= fun () -> - P2p_connection.write_sync conn simple_msg >>= fun unit -> - assert (is_connection_closed unit) ; - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (write) OK" >>= fun () -> - P2p_io_scheduler.shutdown sched >>= fun () -> - lwt_log_notice "Success" >>= fun () -> - return () + let simple_msg = MBytes.create (1 lsl 4) -let default_addr = Ipaddr.V6.localhost + let client _ch sched addr port = + let msg = MBytes.create (MBytes.length simple_msg) in + raw_connect sched addr port >>= fun fd -> + P2p_io_scheduler.read_full fd msg >>=? fun () -> + _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> + P2p_io_scheduler.close fd >>=? fun () -> + return () + + let server _ch sched socket = + raw_accept sched socket >>= fun (fd, point) -> + P2p_io_scheduler.write fd simple_msg >>=? fun () -> + P2p_io_scheduler.close fd >>=? fun _ -> + return () + + let run _dir = run_nodes client server + +end + +module Kick = struct + + let encoding = Data_encoding.bytes + + let is_rejected = function + | Error [P2p_connection.Rejected] -> true + | Ok _ -> false + | Error err -> + log_notice "Error: %a" pp_print_error err ; + false + + let server _ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + _assert (info.incoming) __LOC__ "" >>=? fun () -> + _assert (Peer_id.compare info.peer_id id2.peer_id = 0) + __LOC__ "" >>=? fun () -> + P2p_connection.kick auth_fd >>= fun () -> + return () + + let client _ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>= fun conn -> + _assert (is_rejected conn) __LOC__ "" >>=? fun () -> + return () + + let run _dir = run_nodes client server + +end + +module Kicked = struct + + let encoding = Data_encoding.bytes + + let server _ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>= fun conn -> + _assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () -> + return () + + let client _ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.kick auth_fd >>= fun () -> + return () + + let run _dir = run_nodes client server + +end + +module Simple_message = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 4) + let simple_msg2 = MBytes.create (1 lsl 4) + + let server ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg >>=? fun () -> + P2p_connection.read conn >>=? fun (_msg_size, msg) -> + _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> + sync ch >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let client ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg2 >>=? fun () -> + P2p_connection.read conn >>=? fun (_msg_size, msg) -> + _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> + sync ch >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + +module Close_on_read = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 4) + + let server _ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.close conn >>= fun _stat -> + return () + + let client _ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.read conn >>= fun err -> + _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + +module Close_on_write = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 4) + + let server ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.close conn >>= fun _stat -> + sync ch >>=? fun ()-> + return () + + let client ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + sync ch >>=? fun ()-> + P2p_connection.write_sync conn simple_msg >>= fun err -> + _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + +let spec = Arg.[ + + "-v", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection" Info) ; + Lwt_log_core.(add_rule "p2p.connection" Info)), + " Log up to info msgs" ; + + "-vv", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection" Debug) ; + Lwt_log_core.(add_rule "p2p.connection" Debug)), + " Log up to debug msgs"; + + ] let main () = - listen default_addr >>= fun (main_socket, port) -> - let server = - Process.detach ~prefix:"server " begin fun () -> - Process.handle_error begin fun () -> - server main_socket - end - end in - let client = - Process.detach ~prefix:"client " begin fun () -> - Lwt_utils.safe_close main_socket >>= fun () -> - Process.handle_error begin fun () -> - client default_addr port - end - end in - Process.wait [ server ; client ] + let open Utils in + let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in + let usage_msg = "Usage: %s.\nArguments are:" in + Arg.parse spec anon_fun usage_msg ; + Test.run "p2p-connection." [ + "low-level", Low_level.run ; + "kick", Kick.run ; + "kicked", Kicked.run ; + "simple-message", Simple_message.run ; + "close-on-read", Close_on_read.run ; + "close-on-write", Close_on_write.run ; + ] let () = - Lwt_main.run (main ()) + Sys.catch_break true ; + try main () + with _ -> () diff --git a/test/p2p/test_p2p_connection_pool.ml b/test/p2p/test_p2p_connection_pool.ml index 9d1206f17..e81a31399 100644 --- a/test/p2p/test_p2p_connection_pool.ml +++ b/test/p2p/test_p2p_connection_pool.ml @@ -9,7 +9,7 @@ open Error_monad open P2p_types -include Logging.Make (struct let name = "test-p2p-connection-pool" end) +include Logging.Make (struct let name = "test.p2p.connection-pool" end) type message = | Ping @@ -36,97 +36,29 @@ let meta_config : metadata P2p_connection_pool.meta_config = { score = fun () -> 0. ; } -let rec connect ~timeout pool point = - lwt_log_info "Connect to %a" Point.pp point >>= fun () -> - P2p_connection_pool.connect pool point ~timeout >>= function - | Error [P2p_connection_pool.Connected] -> begin - match P2p_connection_pool.Connection.find_by_point pool point with - | Some conn -> return conn - | None -> failwith "Woops..." - end - | Error ([ P2p_connection_pool.Connection_refused - | P2p_connection_pool.Pending_connection - | P2p_connection.Rejected - | Lwt_utils.Canceled - | Lwt_utils.Timeout - | P2p_connection_pool.Rejected _ - ] as err) -> - lwt_log_info "@[Connection to %a failed:@ %a@]" - Point.pp point pp_print_error err >>= fun () -> - Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () -> - connect ~timeout pool point - | Ok _ | Error _ as res -> Lwt.return res - -let connect_all ~timeout pool points = - map_p (connect ~timeout pool) points - -type error += Connect | Write | Read - -let write_all conns msg = - iter_p - (fun conn -> - trace Write @@ P2p_connection_pool.write_sync conn msg) - conns - -let read_all conns = - iter_p - (fun conn -> - trace Read @@ P2p_connection_pool.read conn >>=? fun Ping -> - return ()) - conns - -let rec connect_random pool total rem point n = - Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> - (trace Connect @@ connect ~timeout:2. pool point) >>=? fun conn -> - (trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ -> - (trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping -> - Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> - P2p_connection_pool.disconnect conn >>= fun () -> - begin - decr rem ; - if !rem mod total = 0 then - lwt_log_notice "Remaining: %d." (!rem / total) - else - Lwt.return () - end >>= fun () -> - if n > 1 then - connect_random pool total rem point (pred n) - else - return () - -let connect_random_all pool points n = - let total = List.length points in - let rem = ref (n * total) in - iter_p (fun point -> connect_random pool total rem point n) points - -let close_all conns = - Lwt_list.iter_p P2p_connection_pool.disconnect conns - - -let run_net config repeat points addr port = - Lwt_unix.sleep (Random.float 2.0) >>= fun () -> - let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in - P2p_connection_pool.create - config meta_config msg_config sched >>= fun pool -> - P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> - connect_all ~timeout:2. pool points >>=? fun conns -> - lwt_log_notice "Bootstrap OK" >>= fun () -> - write_all conns Ping >>=? fun () -> - lwt_log_notice "Sent all messages." >>= fun () -> - read_all conns >>=? fun () -> - lwt_log_notice "Read all messages." >>= fun () -> - close_all conns >>= fun () -> - lwt_log_notice "Begin random connections." >>= fun () -> - connect_random_all pool points repeat >>=? fun () -> - lwt_log_notice "Shutting down" >>= fun () -> - P2p_welcome.shutdown welcome >>= fun () -> - P2p_connection_pool.destroy pool >>= fun () -> - P2p_io_scheduler.shutdown sched >>= fun () -> - lwt_log_notice "Shutdown Ok" >>= fun () -> +let sync ch = + Process.Channel.push ch () >>=? fun () -> + Process.Channel.pop ch >>=? fun () -> return () -let make_net points repeat n = - let point, points = Utils.select n points in +let rec sync_nodes nodes = + iter_p + (fun { Process.channel } -> Process.Channel.pop channel) + nodes >>=? fun () -> + iter_p + (fun { Process.channel } -> Process.Channel.push channel ()) + nodes >>=? fun () -> + sync_nodes nodes + +let sync_nodes nodes = + sync_nodes nodes >>= function + | Ok () | Error (Exn End_of_file :: _) -> + return () + | Error e as err -> + Lwt.return err + +let detach_node f points n = + let (addr, port), points = Utils.select n points in let proof_of_work_target = Crypto_box.make_target 0. in let identity = Identity.generate proof_of_work_target in let nb_points = List.length points in @@ -136,7 +68,7 @@ let make_net points repeat n = trusted_points = points ; peers_file = "/dev/null" ; closed_network = true ; - listening_port = Some (snd point) ; + listening_port = Some port ; min_connections = nb_points ; max_connections = nb_points ; max_incoming_connections = nb_points ; @@ -151,17 +83,129 @@ let make_net points repeat n = swap_linger = 0. ; } in Process.detach - ~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id) - begin fun () -> - run_net config repeat points (fst point) (snd point) >>= function - | Ok () -> Lwt.return_unit - | Error err -> - lwt_log_error "@[Unexpected error: %d@ %a@]" - (List.length err) - pp_print_error err >>= fun () -> - exit 1 + ~prefix:(Format.asprintf "%a: " Peer_id.pp_short identity.peer_id) + begin fun channel -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + P2p_connection_pool.create + config meta_config msg_config sched >>= fun pool -> + P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> + lwt_log_info "Node ready (port: %d)" port >>= fun () -> + sync channel >>=? fun () -> + f channel pool points >>=? fun () -> + lwt_log_info "Shutting down..." >>= fun () -> + P2p_welcome.shutdown welcome >>= fun () -> + P2p_connection_pool.destroy pool >>= fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + lwt_log_info "Bye." >>= fun () -> + return () end +let detach_nodes ?(sync = 0) run_node points = + let open Utils in + let clients = List.length points in + Lwt_list.map_p + (detach_node run_node points) (0 -- (clients - 1)) >>= fun nodes -> + Lwt.ignore_result (sync_nodes nodes) ; + Process.wait_all nodes + +type error += Connect | Write | Read + +module Simple = struct + + let rec connect ~timeout pool point = + lwt_log_info "Connect to %a" Point.pp point >>= fun () -> + P2p_connection_pool.connect pool point ~timeout >>= function + | Error [P2p_connection_pool.Connected] -> begin + match P2p_connection_pool.Connection.find_by_point pool point with + | Some conn -> return conn + | None -> failwith "Woops..." + end + | Error ([ P2p_connection_pool.Connection_refused + | P2p_connection_pool.Pending_connection + | P2p_connection.Rejected + | Lwt_utils.Canceled + | Lwt_utils.Timeout + | P2p_connection_pool.Rejected _ + ] as err) -> + lwt_log_info "@[Connection to %a failed:@ %a@]" + Point.pp point pp_print_error err >>= fun () -> + Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () -> + connect ~timeout pool point + | Ok _ | Error _ as res -> Lwt.return res + + let connect_all ~timeout pool points = + map_p (connect ~timeout pool) points + + let write_all conns msg = + iter_p + (fun conn -> + trace Write @@ P2p_connection_pool.write_sync conn msg) + conns + + let read_all conns = + iter_p + (fun conn -> + trace Read @@ P2p_connection_pool.read conn >>=? fun Ping -> + return ()) + conns + + let close_all conns = + Lwt_list.iter_p P2p_connection_pool.disconnect conns + + let node channel pool points = + connect_all ~timeout:2. pool points >>=? fun conns -> + lwt_log_info "Bootstrap OK" >>= fun () -> + sync channel >>=? fun () -> + write_all conns Ping >>=? fun () -> + lwt_log_info "Sent all messages." >>= fun () -> + sync channel >>=? fun () -> + read_all conns >>=? fun () -> + lwt_log_info "Read all messages." >>= fun () -> + sync channel >>=? fun () -> + close_all conns >>= fun () -> + lwt_log_info "All connections successfully closed." >>= fun () -> + return () + + let run points = detach_nodes node points + +end + +module Random_connections = struct + + let rec connect_random pool total rem point n = + Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> + (trace Connect @@ Simple.connect ~timeout:2. pool point) >>=? fun conn -> + (trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ -> + (trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping -> + Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> + P2p_connection_pool.disconnect conn >>= fun () -> + begin + decr rem ; + if !rem mod total = 0 then + lwt_log_info "Remaining: %d." (!rem / total) + else + Lwt.return () + end >>= fun () -> + if n > 1 then + connect_random pool total rem point (pred n) + else + return () + + let connect_random_all pool points n = + let total = List.length points in + let rem = ref (n * total) in + iter_p (fun point -> connect_random pool total rem point n) points + + let node repeat channel pool points = + lwt_log_info "Begin random connections." >>= fun () -> + connect_random_all pool points repeat >>=? fun () -> + lwt_log_info "Random connections OK." >>= fun () -> + return () + + let run points repeat = detach_nodes (node repeat) points + +end + let addr = ref Ipaddr.V6.localhost let port = ref (1024 + Random.int 8192) let clients = ref 10 @@ -179,25 +223,32 @@ let spec = Arg.[ "--repeat", Set_int repeat_connections, " Number of connections/disconnections." ; - "-v", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Info)), + + "-v", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection-pool" Info) ; + Lwt_log_core.(add_rule "p2p.connection-pool" Info)), " Log up to info msgs" ; - "-vv", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Debug)), + "-vv", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection-pool" Debug) ; + Lwt_log_core.(add_rule "p2p.connection-pool" Debug)), " Log up to debug msgs"; ] let main () = let open Utils in - Logging.init Stderr >>= fun () -> - let anon_fun _num_peers = raise (Arg.Bad "No anonymous argument.") in + let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in let usage_msg = "Usage: %s .\nArguments are:" in Arg.parse spec anon_fun usage_msg ; let ports = !port -- (!port + !clients - 1) in let points = List.map (fun port -> !addr, port) ports in - Lwt_list.iter_p (make_net points !repeat_connections) (0 -- (!clients - 1)) + Test.run "p2p-connection-pool." [ + "simple", (fun _ -> Simple.run points) ; + "random", (fun _ -> Random_connections.run points !repeat_connections) ; + ] let () = Sys.catch_break true ; - try Lwt_main.run @@ main () + try main () with _ -> () diff --git a/test/p2p/test_p2p_io_scheduler.ml b/test/p2p/test_p2p_io_scheduler.ml index ce6ba7872..78114e673 100644 --- a/test/p2p/test_p2p_io_scheduler.ml +++ b/test/p2p/test_p2p_io_scheduler.ml @@ -142,24 +142,20 @@ let run addr port time n = Logging.init Stderr >>= fun () -> listen ?port addr >>= fun (main_socket, port) -> - let server = - Process.detach ~prefix:"server " begin fun () -> - Process.handle_error begin fun () -> - server - ?display_client_stat ?max_download_speed - ~read_buffer_size ?read_queue_size - main_socket n - end - end in + Process.detach ~prefix:"server: " begin fun _ -> + server + ?display_client_stat ?max_download_speed + ~read_buffer_size ?read_queue_size + main_socket n + end >>= fun server_node -> let client n = - let prefix = Printf.sprintf "client(%d) " n in - Process.detach ~prefix begin fun () -> + let prefix = Printf.sprintf "client(%d): " n in + Process.detach ~prefix begin fun _ -> Lwt_utils.safe_close main_socket >>= fun () -> - Process.handle_error begin fun () -> - client ?max_upload_speed ?write_queue_size addr port time n - end + client ?max_upload_speed ?write_queue_size addr port time n end in - Process.wait (server :: List.map client Utils.(1 -- n)) + Lwt_list.map_p client Utils.(1 -- n) >>= fun client_nodes -> + Process.wait_all (server_node :: client_nodes) let () = Random.self_init () @@ -221,12 +217,14 @@ let () = let () = Sys.catch_break true ; - Lwt_main.run - (run - ?display_client_stat:!display_client_stat - ?max_download_speed:!max_download_speed - ?max_upload_speed:!max_upload_speed - ~read_buffer_size:!read_buffer_size - ?read_queue_size:!read_queue_size - ?write_queue_size:!write_queue_size - !addr !port !delay !clients) + Test.run "p2p.io-scheduler." [ + "trivial-quota", (fun _dir -> + run + ?display_client_stat:!display_client_stat + ?max_download_speed:!max_download_speed + ?max_upload_speed:!max_upload_speed + ~read_buffer_size:!read_buffer_size + ?read_queue_size:!read_queue_size + ?write_queue_size:!write_queue_size + !addr !port !delay !clients) + ]