From 4537c8780e38f66d3cfc4b0fb228bb1aa589b2bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 10 Apr 2017 00:09:01 +0200 Subject: [PATCH 1/7] Shell: introduce `Error_monad._assert`. --- src/utils/error_monad.ml | 42 ++++++++++++++++++++++++++++++++++++ src/utils/error_monad_sig.ml | 7 ++++++ 2 files changed, 49 insertions(+) diff --git a/src/utils/error_monad.ml b/src/utils/error_monad.ml index 85dd780c4..17c70ec0a 100644 --- a/src/utils/error_monad.ml +++ b/src/utils/error_monad.ml @@ -299,9 +299,15 @@ module Make() = struct let fail_unless cond exn = if cond then return () else fail exn + let fail_when cond exn = + if cond then fail exn else return () + let unless cond f = if cond then return () else f () + let _when cond f = + if cond then f () else return () + let pp_print_error ppf errors = match errors with | [] -> @@ -339,6 +345,42 @@ let () = error_kinds := Error_kind { id; from_error ; category; encoding_case ; pp } :: !error_kinds +type error += Assert_error of string * string + +let () = + let id = "" in + let category = `Permanent in + let to_error (loc, msg) = Assert_error (loc, msg) in + let from_error = function + | Assert_error (loc, msg) -> Some (loc, msg) + | _ -> None in + let title = "Assertion error" in + let description = "An fatal assertion" in + let encoding_case = + let open Data_encoding in + case + (describe ~title ~description @@ + conv (fun (x, y) -> ((), x, y)) (fun ((), x, y) -> (x, y)) @@ + (obj3 + (req "kind" (constant "assertion")) + (req "location" string) + (req "error" string))) + from_error to_error in + let pp ppf (loc, msg) = + Format.fprintf ppf + "Assert failure (%s)%s" + loc + (if msg = "" then "." else ": " ^ msg) in + error_kinds := + Error_kind { id; from_error ; category; encoding_case ; pp } :: !error_kinds + +let _assert b loc fmt = + if b then + Format.ikfprintf (fun _ -> return ()) Format.str_formatter fmt + else + Format.kasprintf (fun msg -> fail (Assert_error (loc, msg))) fmt + + let protect ~on_error t = t >>= function | Ok res -> return res diff --git a/src/utils/error_monad_sig.ml b/src/utils/error_monad_sig.ml index 493d3f000..fa7680236 100644 --- a/src/utils/error_monad_sig.ml +++ b/src/utils/error_monad_sig.ml @@ -99,8 +99,15 @@ module type S = sig (** Erroneous return on failed assertion *) val fail_unless : bool -> error -> unit tzresult Lwt.t + val fail_when : bool -> error -> unit tzresult Lwt.t val unless : bool -> (unit -> unit tzresult Lwt.t) -> unit tzresult Lwt.t + val _when : bool -> (unit -> unit tzresult Lwt.t) -> unit tzresult Lwt.t + + (* Usage: [_assert cond __LOC__ "" ...] *) + val _assert : + bool -> string -> + ('a, Format.formatter, unit, unit tzresult Lwt.t) format4 -> 'a val protect : on_error: (error list -> 'a tzresult Lwt.t) -> From 4523a67e7d18272d3a233fcc7757b8c86625fe24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 10 Apr 2017 00:16:17 +0200 Subject: [PATCH 2/7] Shell/P2p: fix connection close on read error --- src/node/net/p2p_connection.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 26ba918db..b00c7619b 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -248,6 +248,7 @@ module Reader = struct | Ok true -> worker_loop st | Ok false -> + Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" From c2a4db2d81737c8e1649d579ad87987de3d4492f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 10 Apr 2017 00:40:47 +0200 Subject: [PATCH 3/7] Shell/P2p: propagate disconnection errors through the `pool`. --- src/node/net/p2p_connection_pool.ml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 450f6e026..61bab5608 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -117,7 +117,11 @@ module Answerer = struct | Ok (size, Message msg) -> st.callback.message size msg >>= fun () -> worker_loop st - | Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] -> + | Ok (_, Disconnect)| Error [P2p_io_scheduler.Connection_closed] -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + | Error [P2p_connection.Decoding_error] -> + (* TODO: Penalize peer... *) Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Error [Lwt_utils.Canceled] -> @@ -916,6 +920,7 @@ and create_connection pool p2p_conn id_point point_info peer_info _version = Lwt_condition.broadcast pool.events.too_many_connections () ; log pool Too_many_connections ; end ; + Lwt_pipe.close messages ; P2p_connection.close ~wait:conn.wait_close conn.conn end ; List.iter (fun f -> f peer_id conn) pool.new_connection_hook ; From c187a0b792814499f2a69657fac2bb63292d4a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sun, 9 Apr 2017 19:05:56 +0200 Subject: [PATCH 4/7] Shell/P2p: implements `raw_write_sync`. For testing only: it allows to send 'gardled' messages to a peer. --- src/node/net/p2p_connection.ml | 31 +++++++++++++++++----------- src/node/net/p2p_connection.mli | 5 +++++ src/node/net/p2p_connection_pool.ml | 3 +++ src/node/net/p2p_connection_pool.mli | 4 ++++ 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index b00c7619b..b4e439f9d 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -292,19 +292,18 @@ module Writer = struct canceler: Canceler.t ; conn: connection ; encoding: 'msg Data_encoding.t ; - messages: ('msg * unit tzresult Lwt.u option) Lwt_pipe.t ; + messages: (MBytes.t * unit tzresult Lwt.u option) Lwt_pipe.t ; mutable worker: unit Lwt.t ; } let encode_message st msg = - try return (Data_encoding.Binary.to_bytes st.encoding msg) - with _ -> fail Encoding_error + try ok (Data_encoding.Binary.to_bytes st.encoding msg) + with _ -> error Encoding_error let rec worker_loop st = Lwt_unix.yield () >>= fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () -> - Lwt_pipe.pop st.messages >>= fun (msg, wakener) -> - encode_message st msg >>=? fun buf -> + Lwt_pipe.pop st.messages >>= fun (buf, wakener) -> lwt_debug "writing %d bytes to %a" (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res -> @@ -326,10 +325,8 @@ module Writer = struct let run ?size conn encoding canceler = let compute_size = function - | msg, None -> - 10 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg - | msg, Some _ -> - 18 * (Sys.word_size / 8) + Data_encoding.Binary.length encoding msg + | buf, None -> Sys.word_size + MBytes.length buf + | buf, Some _ -> 2 * Sys.word_size + MBytes.length buf in let size = map_option size ~f:(fun max -> max, compute_size) in let st = @@ -403,18 +400,28 @@ let catch_closed_pipe f = let write { writer } msg = catch_closed_pipe begin fun () -> - Lwt_pipe.push writer.messages (msg, None) >>= return + Lwt.return (Writer.encode_message writer msg) >>=? fun buf -> + Lwt_pipe.push writer.messages (buf, None) >>= return end let write_sync { writer } msg = catch_closed_pipe begin fun () -> let waiter, wakener = Lwt.wait () in - Lwt_pipe.push writer.messages (msg, Some wakener) >>= fun () -> + Lwt.return (Writer.encode_message writer msg) >>=? fun buf -> + Lwt_pipe.push writer.messages (buf, Some wakener) >>= fun () -> waiter end let write_now { writer } msg = - try Ok (Lwt_pipe.push_now writer.messages (msg, None)) + Writer.encode_message writer msg >>? fun buf -> + try Ok (Lwt_pipe.push_now writer.messages (buf, None)) with Lwt_pipe.Closed -> Error [P2p_io_scheduler.Connection_closed] +let raw_write_sync { writer } bytes = + catch_closed_pipe begin fun () -> + let waiter, wakener = Lwt.wait () in + Lwt_pipe.push writer.messages (bytes, Some wakener) >>= fun () -> + waiter + end + let is_readable { reader } = not (Lwt_pipe.is_empty reader.messages) let wait_readable { reader } = diff --git a/src/node/net/p2p_connection.mli b/src/node/net/p2p_connection.mli index 60aa6dbf5..4ec769413 100644 --- a/src/node/net/p2p_connection.mli +++ b/src/node/net/p2p_connection.mli @@ -112,3 +112,8 @@ val stat: 'msg t -> Stat.t [conn]. *) val close: ?wait:bool -> 'msg t -> unit Lwt.t + +(**/**) + +(** for testing only *) +val raw_write_sync: 'msg t -> MBytes.t -> unit tzresult Lwt.t diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index 61bab5608..ad5abb600 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -509,6 +509,9 @@ let write { conn } msg = let write_sync { conn } msg = P2p_connection.write_sync conn (Message msg) +let raw_write_sync { conn } buf = + P2p_connection.raw_write_sync conn buf + let write_now { conn } msg = P2p_connection.write_now conn (Message msg) diff --git a/src/node/net/p2p_connection_pool.mli b/src/node/net/p2p_connection_pool.mli index 43d13fafb..ce645156d 100644 --- a/src/node/net/p2p_connection_pool.mli +++ b/src/node/net/p2p_connection_pool.mli @@ -253,6 +253,10 @@ val write_sync: ('msg, 'meta) connection -> 'msg -> unit tzresult Lwt.t (** [write_sync conn msg] is [P2p_connection.write_sync conn' msg] where [conn'] is the internal [P2p_connection.t] inside [conn]. *) +(**/**) +val raw_write_sync: ('msg, 'meta) connection -> MBytes.t -> unit tzresult Lwt.t +(**/**) + val write_now: ('msg, 'meta) connection -> 'msg -> bool tzresult (** [write_now conn msg] is [P2p_connection.write_now conn' msg] where [conn'] is the internal [P2p_connection.t] inside [conn]. *) From 413bddcd96a2981046f79bf423ae0f6ddc5dac01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 10 Apr 2017 00:10:42 +0200 Subject: [PATCH 5/7] Shell/P2p: propagate all errors to `{raw_,}write_sync`. Those functions are only used in the testsuite. --- src/node/net/p2p_connection.ml | 47 ++++++++++++++++++++++++----- src/node/net/p2p_connection_pool.ml | 2 +- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index b4e439f9d..68a059932 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -303,15 +303,8 @@ module Writer = struct let rec worker_loop st = Lwt_unix.yield () >>= fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () -> - Lwt_pipe.pop st.messages >>= fun (buf, wakener) -> - lwt_debug "writing %d bytes to %a" - (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> - Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res -> - iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; - Lwt.return res + Lwt_pipe.pop st.messages >>= return end >>= function - | Ok () -> - worker_loop st | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" Connection_info.pp st.conn.info >>= fun () -> @@ -322,6 +315,38 @@ module Writer = struct Connection_info.pp st.conn.info pp_print_error err >>= fun () -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit + | Ok (buf, wakener) -> + lwt_debug "writing %d bytes to %a" + (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf + end >>= fun res -> + match res with + | Ok () -> + iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; + worker_loop st + | Error err -> + iter_option wakener + ~f:(fun u -> + Lwt.wakeup_later u + (Error [P2p_io_scheduler.Connection_closed])) ; + match err with + | [ Lwt_utils.Canceled | Exn Lwt_pipe.Closed ] -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> + Lwt.return_unit + | [ P2p_io_scheduler.Connection_closed ] -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + | err -> + lwt_log_error + "@[error writing to %a@ %a@]" + Connection_info.pp st.conn.info + pp_print_error err >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit let run ?size conn encoding canceler = let compute_size = function @@ -336,6 +361,11 @@ module Writer = struct } in Canceler.on_cancel st.canceler begin fun () -> Lwt_pipe.close st.messages ; + while not (Lwt_pipe.is_empty st.messages) do + let _, w = Lwt_pipe.pop_now_exn st.messages in + iter_option w + ~f:(fun u -> Lwt.wakeup_later u (Error [Exn Lwt_pipe.Closed])) + done ; Lwt.return_unit end ; st.worker <- @@ -451,3 +481,4 @@ let close ?(wait = false) st = Writer.shutdown st.writer >>= fun () -> P2p_io_scheduler.close st.conn.fd >>= fun _ -> Lwt.return_unit + diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index ad5abb600..22e4564a9 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -117,7 +117,7 @@ module Answerer = struct | Ok (size, Message msg) -> st.callback.message size msg >>= fun () -> worker_loop st - | Ok (_, Disconnect)| Error [P2p_io_scheduler.Connection_closed] -> + | Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Error [P2p_connection.Decoding_error] -> From e11e9c9ac5fc987c5e382e90a3bcaa6b69f0c98d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sun, 9 Apr 2017 22:52:24 +0200 Subject: [PATCH 6/7] Shell/P2p: Split the unit tests in smaller atom. And use more `Error_monad`... --- src/utils/error_monad.ml | 10 + src/utils/error_monad_sig.ml | 3 + test/lib/process.ml | 132 ++++++---- test/lib/process.mli | 20 +- test/lib/test.ml | 3 - test/p2p/Makefile | 4 +- test/p2p/test_p2p_connection.ml | 348 ++++++++++++++++++--------- test/p2p/test_p2p_connection_pool.ml | 263 ++++++++++++-------- test/p2p/test_p2p_io_scheduler.ml | 46 ++-- 9 files changed, 524 insertions(+), 305 deletions(-) diff --git a/src/utils/error_monad.ml b/src/utils/error_monad.ml index 17c70ec0a..37f669f51 100644 --- a/src/utils/error_monad.ml +++ b/src/utils/error_monad.ml @@ -286,6 +286,16 @@ module Make() = struct fold_right_s f t init >>=? fun acc -> f h acc + let rec join = function + | [] -> return () + | t :: ts -> + t >>= function + | Error _ as err -> + join ts >>=? fun () -> + Lwt.return err + | Ok () -> + join ts + let record_trace err result = match result with | Ok _ as res -> res diff --git a/src/utils/error_monad_sig.ml b/src/utils/error_monad_sig.ml index fa7680236..477a81991 100644 --- a/src/utils/error_monad_sig.ml +++ b/src/utils/error_monad_sig.ml @@ -144,4 +144,7 @@ module type S = sig val fold_right_s : ('a -> 'b -> 'b tzresult Lwt.t) -> 'a list -> 'b -> 'b tzresult Lwt.t + (** A {!Lwt.join} in the monad *) + val join : unit tzresult Lwt.t list -> unit tzresult Lwt.t + end diff --git a/test/lib/process.ml b/test/lib/process.ml index 979f741c1..586d4e830 100644 --- a/test/lib/process.ml +++ b/test/lib/process.ml @@ -7,41 +7,14 @@ (* *) (**************************************************************************) +let () = Lwt_unix.set_default_async_method Async_none + include Logging.Make (struct let name = "process" end) open Error_monad exception Exited of int -let detach ?(prefix = "") f = - Lwt_io.flush_all () >>= fun () -> - match Lwt_unix.fork () with - | 0 -> - Random.self_init () ; - let template = Format.asprintf "%s$(section): $(message)" prefix in - Lwt_main.run begin - Logging.init ~template Stderr >>= fun () -> - lwt_log_notice "PID: %d" (Unix.getpid ()) >>= fun () -> - f () - end ; - exit 0 - | pid -> - Lwt.catch - (fun () -> - Lwt_unix.waitpid [] pid >>= function - | (_,Lwt_unix.WEXITED 0) -> - Lwt.return_unit - | (_,Lwt_unix.WEXITED n) -> - Lwt.fail (Exited n) - | (_,Lwt_unix.WSIGNALED _) - | (_,Lwt_unix.WSTOPPED _) -> - Lwt.fail Exit) - (function - | Lwt.Canceled -> - Unix.kill pid Sys.sigkill ; - Lwt.return_unit - | exn -> Lwt.fail exn) - let handle_error f = Lwt.catch f @@ -51,27 +24,90 @@ let handle_error f = lwt_log_error "%a" Error_monad.pp_print_error err >>= fun () -> exit 1 -let rec wait processes = +module Channel = struct + type ('a, 'b) t = (Lwt_io.input_channel * Lwt_io.output_channel) + let push (_, outch) v = + Lwt.catch + (fun () -> Lwt_io.write_value outch v >>= return) + (fun exn -> fail (Exn exn)) + let pop (inch, _) = + Lwt.catch + (fun () -> Lwt_io.read_value inch >>= return) + (fun exn -> fail (Exn exn)) +end + +let wait pid = Lwt.catch (fun () -> - Lwt.nchoose_split processes >>= function - | (_, []) -> lwt_log_notice "All done!" - | (_, processes) -> wait processes) + Lwt_unix.waitpid [] pid >>= function + | (_,Lwt_unix.WEXITED 0) -> + return () + | (_,Lwt_unix.WEXITED n) -> + fail (Exn (Exited n)) + | (_,Lwt_unix.WSIGNALED _) + | (_,Lwt_unix.WSTOPPED _) -> + Lwt.fail Exit) (function - | Exited n -> - lwt_log_notice "Early error!" >>= fun () -> - List.iter Lwt.cancel processes ; - Lwt.catch - (fun () -> Lwt.join processes) - (fun _ -> Lwt.return_unit) >>= fun () -> - lwt_log_notice "A process finished with error %d !" n >>= fun () -> - Pervasives.exit n + | Lwt.Canceled -> + Unix.kill pid Sys.sigkill ; + return () | exn -> - lwt_log_notice "Unexpected error!%a" - Error_monad.pp_exn exn >>= fun () -> - List.iter Lwt.cancel processes ; - Lwt.catch - (fun () -> Lwt.join processes) - (fun _ -> Lwt.return_unit) >>= fun () -> - Pervasives.exit 2) + fail (Exn exn)) + +type ('a, 'b) t = { + termination: unit tzresult Lwt.t ; + channel: ('b, 'a) Channel.t ; +} + +let detach ?(prefix = "") f = + Lwt_io.flush_all () >>= fun () -> + let main_in, child_out = Lwt_io.pipe () in + let child_in, main_out = Lwt_io.pipe () in + match Lwt_unix.fork () with + | 0 -> + Logging.init Stderr >>= fun () -> + Random.self_init () ; + let template = Format.asprintf "%s$(message)" prefix in + Lwt_main.run begin + Lwt_io.close main_in >>= fun () -> + Lwt_io.close main_out >>= fun () -> + Logging.init ~template Stderr >>= fun () -> + lwt_log_info "PID: %d" (Unix.getpid ()) >>= fun () -> + handle_error (fun () -> f (child_in, child_out)) + end ; + exit 0 + | pid -> + let termination = wait pid in + Lwt_io.close child_in >>= fun () -> + Lwt_io.close child_out >>= fun () -> + Lwt.return { termination ; channel = (main_in, main_out) } + +let wait_all processes = + let rec loop processes = + match processes with + | [] -> Lwt.return_none + | processes -> + Lwt.nchoose_split processes >>= function + | (finished, remaining) -> + let rec handle = function + | [] -> loop remaining + | Ok () :: finished -> handle finished + | Error err :: _ -> + Lwt.return (Some (err, remaining)) in + handle finished in + loop (List.map (fun p -> p.termination) processes) >>= function + | None -> + lwt_log_info "All done!" >>= fun () -> + return () + | Some ([Exn (Exited n)], remaining) -> + lwt_log_error "Early error!" >>= fun () -> + List.iter Lwt.cancel remaining ; + join remaining >>= fun _ -> + failwith "A process finished with error %d !" n + | Some (err, remaining) -> + lwt_log_error "Unexpected error!%a" + pp_print_error err >>= fun () -> + List.iter Lwt.cancel remaining ; + join remaining >>= fun _ -> + failwith "A process finished with an unexpected error !" diff --git a/test/lib/process.mli b/test/lib/process.mli index c1933cc11..67dff1453 100644 --- a/test/lib/process.mli +++ b/test/lib/process.mli @@ -10,6 +10,20 @@ open Error_monad exception Exited of int -val detach: ?prefix:string -> (unit -> unit Lwt.t) -> unit Lwt.t -val handle_error: (unit -> (unit, error list) result Lwt.t) -> unit Lwt.t -val wait: unit Lwt.t list -> unit Lwt.t +module Channel : sig + type ('a, 'b) t + val push: ('a, 'b) t -> 'a -> unit tzresult Lwt.t + val pop: ('a, 'b) t -> 'b tzresult Lwt.t +end + +type ('a, 'b) t = { + termination: unit tzresult Lwt.t ; + channel: ('b, 'a) Channel.t ; +} + +val detach: + ?prefix:string -> + (('a, 'b) Channel.t -> unit tzresult Lwt.t) -> + ('a, 'b) t Lwt.t + +val wait_all: ('a, 'b) t list -> unit tzresult Lwt.t diff --git a/test/lib/test.ml b/test/lib/test.ml index ebd78d6a4..cfc648961 100644 --- a/test/lib/test.ml +++ b/test/lib/test.ml @@ -11,9 +11,6 @@ module Test = Kaputt.Abbreviations.Test let keep_dir = try ignore (Sys.getenv "KEEPDIR") ; true with _ -> false -let make_test ~title test = - Test.add_simple_test ~title (fun () -> Lwt_main.run (test ())) - let rec remove_dir dir = if Sys.file_exists dir then begin Array.iter (fun file -> diff --git a/test/p2p/Makefile b/test/p2p/Makefile index 0523a3392..e38f215d7 100644 --- a/test/p2p/Makefile +++ b/test/p2p/Makefile @@ -24,7 +24,7 @@ OPENED_MODULES := ${NODE_OPENED_MODULES} .PHONY:run-test-p2p-connection run-test-p2p-connection: @echo - ./test-p2p-connection + ./test-p2p-connection -v TEST_CONNECTION_IMPLS := \ test_p2p_connection.ml @@ -42,7 +42,7 @@ clean:: .PHONY:run-test-p2p-connection-pool run-test-p2p-connection-pool: @echo - ./test-p2p-connection-pool --clients 10 --repeat 5 + ./test-p2p-connection-pool --clients 10 --repeat 5 -v TEST_CONNECTION_POOL_IMPLS := \ test_p2p_connection_pool.ml diff --git a/test/p2p/test_p2p_connection.ml b/test/p2p/test_p2p_connection.ml index 0a14e6f8b..b2e199aa8 100644 --- a/test/p2p/test_p2p_connection.ml +++ b/test/p2p/test_p2p_connection.ml @@ -12,7 +12,9 @@ open Error_monad open P2p_types -include Logging.Make (struct let name = "test-p2p-connection" end) +include Logging.Make (struct let name = "test.p2p.connection" end) + +let default_addr = Ipaddr.V6.localhost let proof_of_work_target = Crypto_box.make_target 16. let id1 = Identity.generate proof_of_work_target @@ -44,6 +46,46 @@ let rec listen ?port addr = | exn -> Lwt.fail exn end +let sync ch = + Process.Channel.push ch () >>=? fun () -> + Process.Channel.pop ch >>=? fun () -> + return () + +let rec sync_nodes nodes = + iter_p + (fun { Process.channel } -> Process.Channel.pop channel) + nodes >>=? fun () -> + iter_p + (fun { Process.channel } -> Process.Channel.push channel ()) + nodes >>=? fun () -> + sync_nodes nodes + +let sync_nodes nodes = + sync_nodes nodes >>= function + | Ok () | Error (Exn End_of_file :: _) -> + return () + | Error e as err -> + Lwt.return err + +let run_nodes client server = + listen default_addr >>= fun (main_socket, port) -> + Process.detach ~prefix:"server: " begin fun channel -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + server channel sched main_socket >>=? fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + return () + end >>= fun server_node -> + Process.detach ~prefix:"client: " begin fun channel -> + Lwt_utils.safe_close main_socket >>= fun () -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + client channel sched default_addr port >>=? fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + return () + end >>= fun client_node -> + let nodes = [ server_node ; client_node ] in + Lwt.ignore_result (sync_nodes nodes) ; + Process.wait_all nodes + let raw_accept sched main_socket = Lwt_unix.accept main_socket >>= fun (fd, sockaddr) -> let fd = P2p_io_scheduler.register sched fd in @@ -73,20 +115,11 @@ let connect sched addr port id = P2p_connection.authenticate ~proof_of_work_target ~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) -> - assert (not info.incoming) ; - assert (Peer_id.compare info.peer_id id1.peer_id = 0) ; + _assert (not info.incoming) __LOC__ "" >>=? fun () -> + _assert (Peer_id.compare info.peer_id id1.peer_id = 0) + __LOC__ "" >>=? fun () -> return auth_fd -let simple_msg = - MBytes.create (1 lsl 1) - -let is_rejected = function - | Error [P2p_connection.Rejected] -> true - | Ok _ -> false - | Error err -> - log_notice "Error: %a" pp_print_error err ; - false - let is_connection_closed = function | Error [P2p_io_scheduler.Connection_closed] -> true | Ok _ -> false @@ -94,116 +127,193 @@ let is_connection_closed = function log_notice "Error: %a" pp_print_error err ; false -let bytes_encoding = Data_encoding.Variable.bytes +let is_decoding_error = function + | Error [P2p_connection.Decoding_error] -> true + | Ok _ -> false + | Error err -> + log_notice "Error: %a" pp_print_error err ; + false -let server main_socket = - let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in - (* Low-level test. *) - raw_accept sched main_socket >>= fun (fd, _point) -> - lwt_log_notice "Low_level" >>= fun () -> - P2p_io_scheduler.write fd simple_msg >>=? fun () -> - P2p_io_scheduler.close fd >>=? fun _ -> - lwt_log_notice "Low_level OK" >>= fun () -> - (* Kick the first connection. *) - accept sched main_socket >>=? fun (info, auth_fd) -> - lwt_log_notice "Kick" >>= fun () -> - assert (info.incoming) ; - assert (Peer_id.compare info.peer_id id2.peer_id = 0) ; - P2p_connection.kick auth_fd >>= fun () -> - lwt_log_notice "Kick OK" >>= fun () -> - (* Let's be rejected. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - P2p_connection.accept auth_fd bytes_encoding >>= fun conn -> - assert (is_rejected conn) ; - lwt_log_notice "Kicked OK" >>= fun () -> - (* Accept and send a single message. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - lwt_log_notice "Single" >>= fun () -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.write_sync conn simple_msg >>=? fun () -> - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Single OK" >>= fun () -> - (* Accept and send a single message, while the client expected 2. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - lwt_log_notice "Early close (read)" >>= fun () -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.write_sync conn simple_msg >>=? fun () -> - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (read) OK" >>= fun () -> - (* Accept and wait for the client to close the connection. *) - accept sched main_socket >>=? fun (_info, auth_fd) -> - lwt_log_notice "Early close (write)" >>= fun () -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (write) OK" >>= fun () -> - P2p_io_scheduler.shutdown sched >>= fun () -> - Lwt_unix.sleep 0.2 >>= fun () -> - lwt_log_notice "Success" >>= fun () -> - return () +module Low_level = struct -let client addr port = - let msg = MBytes.create (MBytes.length simple_msg) in - let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in - raw_connect sched addr port >>= fun fd -> - P2p_io_scheduler.read_full fd msg >>=? fun () -> - assert (MBytes.compare simple_msg msg = 0) ; - P2p_io_scheduler.close fd >>=? fun () -> - lwt_log_notice "Low_level OK" >>= fun () -> - (* let's be rejected. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>= fun conn -> - assert (is_rejected conn) ; - lwt_log_notice "Kick OK" >>= fun () -> - (* let's reject! *) - lwt_log_notice "Kicked" >>= fun () -> - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.kick auth_fd >>= fun () -> - (* let's exchange a simple message. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.read conn >>=? fun (_msg_size, msg) -> - assert (MBytes.compare simple_msg msg = 0) ; - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Simple OK" >>= fun () -> - (* let's detect a closed connection on `read`. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - P2p_connection.read conn >>=? fun (_msg_size, msg) -> - assert (MBytes.compare simple_msg msg = 0) ; - P2p_connection.read conn >>= fun msg -> - assert (is_connection_closed msg) ; - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (read) OK" >>= fun () -> - (* let's detect a closed connection on `write`. *) - connect sched addr port id2 >>=? fun auth_fd -> - P2p_connection.accept auth_fd bytes_encoding >>=? fun conn -> - Lwt_unix.sleep 0.1 >>= fun () -> - P2p_connection.write_sync conn simple_msg >>= fun unit -> - assert (is_connection_closed unit) ; - P2p_connection.close conn >>= fun _stat -> - lwt_log_notice "Early close (write) OK" >>= fun () -> - P2p_io_scheduler.shutdown sched >>= fun () -> - lwt_log_notice "Success" >>= fun () -> - return () + let simple_msg = MBytes.create (1 lsl 4) -let default_addr = Ipaddr.V6.localhost + let client _ch sched addr port = + let msg = MBytes.create (MBytes.length simple_msg) in + raw_connect sched addr port >>= fun fd -> + P2p_io_scheduler.read_full fd msg >>=? fun () -> + _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> + P2p_io_scheduler.close fd >>=? fun () -> + return () + + let server _ch sched socket = + raw_accept sched socket >>= fun (fd, point) -> + P2p_io_scheduler.write fd simple_msg >>=? fun () -> + P2p_io_scheduler.close fd >>=? fun _ -> + return () + + let run _dir = run_nodes client server + +end + +module Kick = struct + + let encoding = Data_encoding.bytes + + let is_rejected = function + | Error [P2p_connection.Rejected] -> true + | Ok _ -> false + | Error err -> + log_notice "Error: %a" pp_print_error err ; + false + + let server _ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + _assert (info.incoming) __LOC__ "" >>=? fun () -> + _assert (Peer_id.compare info.peer_id id2.peer_id = 0) + __LOC__ "" >>=? fun () -> + P2p_connection.kick auth_fd >>= fun () -> + return () + + let client _ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>= fun conn -> + _assert (is_rejected conn) __LOC__ "" >>=? fun () -> + return () + + let run _dir = run_nodes client server + +end + +module Kicked = struct + + let encoding = Data_encoding.bytes + + let server _ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>= fun conn -> + _assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () -> + return () + + let client _ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.kick auth_fd >>= fun () -> + return () + + let run _dir = run_nodes client server + +end + +module Simple_message = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 4) + let simple_msg2 = MBytes.create (1 lsl 4) + + let server ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg >>=? fun () -> + P2p_connection.read conn >>=? fun (_msg_size, msg) -> + _assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () -> + sync ch >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let client ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.write_sync conn simple_msg2 >>=? fun () -> + P2p_connection.read conn >>=? fun (_msg_size, msg) -> + _assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () -> + sync ch >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + +module Close_on_read = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 4) + + let server _ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.close conn >>= fun _stat -> + return () + + let client _ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.read conn >>= fun err -> + _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + +module Close_on_write = struct + + let encoding = Data_encoding.bytes + + let simple_msg = MBytes.create (1 lsl 4) + + let server ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.close conn >>= fun _stat -> + sync ch >>=? fun ()-> + return () + + let client ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + sync ch >>=? fun ()-> + P2p_connection.write_sync conn simple_msg >>= fun err -> + _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + +let spec = Arg.[ + + "-v", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection" Info) ; + Lwt_log_core.(add_rule "p2p.connection" Info)), + " Log up to info msgs" ; + + "-vv", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection" Debug) ; + Lwt_log_core.(add_rule "p2p.connection" Debug)), + " Log up to debug msgs"; + + ] let main () = - listen default_addr >>= fun (main_socket, port) -> - let server = - Process.detach ~prefix:"server " begin fun () -> - Process.handle_error begin fun () -> - server main_socket - end - end in - let client = - Process.detach ~prefix:"client " begin fun () -> - Lwt_utils.safe_close main_socket >>= fun () -> - Process.handle_error begin fun () -> - client default_addr port - end - end in - Process.wait [ server ; client ] + let open Utils in + let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in + let usage_msg = "Usage: %s.\nArguments are:" in + Arg.parse spec anon_fun usage_msg ; + Test.run "p2p-connection." [ + "low-level", Low_level.run ; + "kick", Kick.run ; + "kicked", Kicked.run ; + "simple-message", Simple_message.run ; + "close-on-read", Close_on_read.run ; + "close-on-write", Close_on_write.run ; + ] let () = - Lwt_main.run (main ()) + Sys.catch_break true ; + try main () + with _ -> () diff --git a/test/p2p/test_p2p_connection_pool.ml b/test/p2p/test_p2p_connection_pool.ml index 9d1206f17..e81a31399 100644 --- a/test/p2p/test_p2p_connection_pool.ml +++ b/test/p2p/test_p2p_connection_pool.ml @@ -9,7 +9,7 @@ open Error_monad open P2p_types -include Logging.Make (struct let name = "test-p2p-connection-pool" end) +include Logging.Make (struct let name = "test.p2p.connection-pool" end) type message = | Ping @@ -36,97 +36,29 @@ let meta_config : metadata P2p_connection_pool.meta_config = { score = fun () -> 0. ; } -let rec connect ~timeout pool point = - lwt_log_info "Connect to %a" Point.pp point >>= fun () -> - P2p_connection_pool.connect pool point ~timeout >>= function - | Error [P2p_connection_pool.Connected] -> begin - match P2p_connection_pool.Connection.find_by_point pool point with - | Some conn -> return conn - | None -> failwith "Woops..." - end - | Error ([ P2p_connection_pool.Connection_refused - | P2p_connection_pool.Pending_connection - | P2p_connection.Rejected - | Lwt_utils.Canceled - | Lwt_utils.Timeout - | P2p_connection_pool.Rejected _ - ] as err) -> - lwt_log_info "@[Connection to %a failed:@ %a@]" - Point.pp point pp_print_error err >>= fun () -> - Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () -> - connect ~timeout pool point - | Ok _ | Error _ as res -> Lwt.return res - -let connect_all ~timeout pool points = - map_p (connect ~timeout pool) points - -type error += Connect | Write | Read - -let write_all conns msg = - iter_p - (fun conn -> - trace Write @@ P2p_connection_pool.write_sync conn msg) - conns - -let read_all conns = - iter_p - (fun conn -> - trace Read @@ P2p_connection_pool.read conn >>=? fun Ping -> - return ()) - conns - -let rec connect_random pool total rem point n = - Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> - (trace Connect @@ connect ~timeout:2. pool point) >>=? fun conn -> - (trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ -> - (trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping -> - Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> - P2p_connection_pool.disconnect conn >>= fun () -> - begin - decr rem ; - if !rem mod total = 0 then - lwt_log_notice "Remaining: %d." (!rem / total) - else - Lwt.return () - end >>= fun () -> - if n > 1 then - connect_random pool total rem point (pred n) - else - return () - -let connect_random_all pool points n = - let total = List.length points in - let rem = ref (n * total) in - iter_p (fun point -> connect_random pool total rem point n) points - -let close_all conns = - Lwt_list.iter_p P2p_connection_pool.disconnect conns - - -let run_net config repeat points addr port = - Lwt_unix.sleep (Random.float 2.0) >>= fun () -> - let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in - P2p_connection_pool.create - config meta_config msg_config sched >>= fun pool -> - P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> - connect_all ~timeout:2. pool points >>=? fun conns -> - lwt_log_notice "Bootstrap OK" >>= fun () -> - write_all conns Ping >>=? fun () -> - lwt_log_notice "Sent all messages." >>= fun () -> - read_all conns >>=? fun () -> - lwt_log_notice "Read all messages." >>= fun () -> - close_all conns >>= fun () -> - lwt_log_notice "Begin random connections." >>= fun () -> - connect_random_all pool points repeat >>=? fun () -> - lwt_log_notice "Shutting down" >>= fun () -> - P2p_welcome.shutdown welcome >>= fun () -> - P2p_connection_pool.destroy pool >>= fun () -> - P2p_io_scheduler.shutdown sched >>= fun () -> - lwt_log_notice "Shutdown Ok" >>= fun () -> +let sync ch = + Process.Channel.push ch () >>=? fun () -> + Process.Channel.pop ch >>=? fun () -> return () -let make_net points repeat n = - let point, points = Utils.select n points in +let rec sync_nodes nodes = + iter_p + (fun { Process.channel } -> Process.Channel.pop channel) + nodes >>=? fun () -> + iter_p + (fun { Process.channel } -> Process.Channel.push channel ()) + nodes >>=? fun () -> + sync_nodes nodes + +let sync_nodes nodes = + sync_nodes nodes >>= function + | Ok () | Error (Exn End_of_file :: _) -> + return () + | Error e as err -> + Lwt.return err + +let detach_node f points n = + let (addr, port), points = Utils.select n points in let proof_of_work_target = Crypto_box.make_target 0. in let identity = Identity.generate proof_of_work_target in let nb_points = List.length points in @@ -136,7 +68,7 @@ let make_net points repeat n = trusted_points = points ; peers_file = "/dev/null" ; closed_network = true ; - listening_port = Some (snd point) ; + listening_port = Some port ; min_connections = nb_points ; max_connections = nb_points ; max_incoming_connections = nb_points ; @@ -151,17 +83,129 @@ let make_net points repeat n = swap_linger = 0. ; } in Process.detach - ~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id) - begin fun () -> - run_net config repeat points (fst point) (snd point) >>= function - | Ok () -> Lwt.return_unit - | Error err -> - lwt_log_error "@[Unexpected error: %d@ %a@]" - (List.length err) - pp_print_error err >>= fun () -> - exit 1 + ~prefix:(Format.asprintf "%a: " Peer_id.pp_short identity.peer_id) + begin fun channel -> + let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in + P2p_connection_pool.create + config meta_config msg_config sched >>= fun pool -> + P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome -> + lwt_log_info "Node ready (port: %d)" port >>= fun () -> + sync channel >>=? fun () -> + f channel pool points >>=? fun () -> + lwt_log_info "Shutting down..." >>= fun () -> + P2p_welcome.shutdown welcome >>= fun () -> + P2p_connection_pool.destroy pool >>= fun () -> + P2p_io_scheduler.shutdown sched >>= fun () -> + lwt_log_info "Bye." >>= fun () -> + return () end +let detach_nodes ?(sync = 0) run_node points = + let open Utils in + let clients = List.length points in + Lwt_list.map_p + (detach_node run_node points) (0 -- (clients - 1)) >>= fun nodes -> + Lwt.ignore_result (sync_nodes nodes) ; + Process.wait_all nodes + +type error += Connect | Write | Read + +module Simple = struct + + let rec connect ~timeout pool point = + lwt_log_info "Connect to %a" Point.pp point >>= fun () -> + P2p_connection_pool.connect pool point ~timeout >>= function + | Error [P2p_connection_pool.Connected] -> begin + match P2p_connection_pool.Connection.find_by_point pool point with + | Some conn -> return conn + | None -> failwith "Woops..." + end + | Error ([ P2p_connection_pool.Connection_refused + | P2p_connection_pool.Pending_connection + | P2p_connection.Rejected + | Lwt_utils.Canceled + | Lwt_utils.Timeout + | P2p_connection_pool.Rejected _ + ] as err) -> + lwt_log_info "@[Connection to %a failed:@ %a@]" + Point.pp point pp_print_error err >>= fun () -> + Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () -> + connect ~timeout pool point + | Ok _ | Error _ as res -> Lwt.return res + + let connect_all ~timeout pool points = + map_p (connect ~timeout pool) points + + let write_all conns msg = + iter_p + (fun conn -> + trace Write @@ P2p_connection_pool.write_sync conn msg) + conns + + let read_all conns = + iter_p + (fun conn -> + trace Read @@ P2p_connection_pool.read conn >>=? fun Ping -> + return ()) + conns + + let close_all conns = + Lwt_list.iter_p P2p_connection_pool.disconnect conns + + let node channel pool points = + connect_all ~timeout:2. pool points >>=? fun conns -> + lwt_log_info "Bootstrap OK" >>= fun () -> + sync channel >>=? fun () -> + write_all conns Ping >>=? fun () -> + lwt_log_info "Sent all messages." >>= fun () -> + sync channel >>=? fun () -> + read_all conns >>=? fun () -> + lwt_log_info "Read all messages." >>= fun () -> + sync channel >>=? fun () -> + close_all conns >>= fun () -> + lwt_log_info "All connections successfully closed." >>= fun () -> + return () + + let run points = detach_nodes node points + +end + +module Random_connections = struct + + let rec connect_random pool total rem point n = + Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> + (trace Connect @@ Simple.connect ~timeout:2. pool point) >>=? fun conn -> + (trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ -> + (trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping -> + Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () -> + P2p_connection_pool.disconnect conn >>= fun () -> + begin + decr rem ; + if !rem mod total = 0 then + lwt_log_info "Remaining: %d." (!rem / total) + else + Lwt.return () + end >>= fun () -> + if n > 1 then + connect_random pool total rem point (pred n) + else + return () + + let connect_random_all pool points n = + let total = List.length points in + let rem = ref (n * total) in + iter_p (fun point -> connect_random pool total rem point n) points + + let node repeat channel pool points = + lwt_log_info "Begin random connections." >>= fun () -> + connect_random_all pool points repeat >>=? fun () -> + lwt_log_info "Random connections OK." >>= fun () -> + return () + + let run points repeat = detach_nodes (node repeat) points + +end + let addr = ref Ipaddr.V6.localhost let port = ref (1024 + Random.int 8192) let clients = ref 10 @@ -179,25 +223,32 @@ let spec = Arg.[ "--repeat", Set_int repeat_connections, " Number of connections/disconnections." ; - "-v", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Info)), + + "-v", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection-pool" Info) ; + Lwt_log_core.(add_rule "p2p.connection-pool" Info)), " Log up to info msgs" ; - "-vv", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Debug)), + "-vv", Unit (fun () -> + Lwt_log_core.(add_rule "test.p2p.connection-pool" Debug) ; + Lwt_log_core.(add_rule "p2p.connection-pool" Debug)), " Log up to debug msgs"; ] let main () = let open Utils in - Logging.init Stderr >>= fun () -> - let anon_fun _num_peers = raise (Arg.Bad "No anonymous argument.") in + let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in let usage_msg = "Usage: %s .\nArguments are:" in Arg.parse spec anon_fun usage_msg ; let ports = !port -- (!port + !clients - 1) in let points = List.map (fun port -> !addr, port) ports in - Lwt_list.iter_p (make_net points !repeat_connections) (0 -- (!clients - 1)) + Test.run "p2p-connection-pool." [ + "simple", (fun _ -> Simple.run points) ; + "random", (fun _ -> Random_connections.run points !repeat_connections) ; + ] let () = Sys.catch_break true ; - try Lwt_main.run @@ main () + try main () with _ -> () diff --git a/test/p2p/test_p2p_io_scheduler.ml b/test/p2p/test_p2p_io_scheduler.ml index ce6ba7872..78114e673 100644 --- a/test/p2p/test_p2p_io_scheduler.ml +++ b/test/p2p/test_p2p_io_scheduler.ml @@ -142,24 +142,20 @@ let run addr port time n = Logging.init Stderr >>= fun () -> listen ?port addr >>= fun (main_socket, port) -> - let server = - Process.detach ~prefix:"server " begin fun () -> - Process.handle_error begin fun () -> - server - ?display_client_stat ?max_download_speed - ~read_buffer_size ?read_queue_size - main_socket n - end - end in + Process.detach ~prefix:"server: " begin fun _ -> + server + ?display_client_stat ?max_download_speed + ~read_buffer_size ?read_queue_size + main_socket n + end >>= fun server_node -> let client n = - let prefix = Printf.sprintf "client(%d) " n in - Process.detach ~prefix begin fun () -> + let prefix = Printf.sprintf "client(%d): " n in + Process.detach ~prefix begin fun _ -> Lwt_utils.safe_close main_socket >>= fun () -> - Process.handle_error begin fun () -> - client ?max_upload_speed ?write_queue_size addr port time n - end + client ?max_upload_speed ?write_queue_size addr port time n end in - Process.wait (server :: List.map client Utils.(1 -- n)) + Lwt_list.map_p client Utils.(1 -- n) >>= fun client_nodes -> + Process.wait_all (server_node :: client_nodes) let () = Random.self_init () @@ -221,12 +217,14 @@ let () = let () = Sys.catch_break true ; - Lwt_main.run - (run - ?display_client_stat:!display_client_stat - ?max_download_speed:!max_download_speed - ?max_upload_speed:!max_upload_speed - ~read_buffer_size:!read_buffer_size - ?read_queue_size:!read_queue_size - ?write_queue_size:!write_queue_size - !addr !port !delay !clients) + Test.run "p2p.io-scheduler." [ + "trivial-quota", (fun _dir -> + run + ?display_client_stat:!display_client_stat + ?max_download_speed:!max_download_speed + ?max_upload_speed:!max_upload_speed + ~read_buffer_size:!read_buffer_size + ?read_queue_size:!read_queue_size + ?write_queue_size:!write_queue_size + !addr !port !delay !clients) + ] From 3d00dcf19fc80afa163f0babb56d8b02faefc4c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 10 Apr 2017 00:44:10 +0200 Subject: [PATCH 7/7] Shell/P2p: add unit test with "garbled" messages. --- test/p2p/test_p2p_connection.ml | 28 ++++++++++++++++++++++++++ test/p2p/test_p2p_connection_pool.ml | 30 ++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/test/p2p/test_p2p_connection.ml b/test/p2p/test_p2p_connection.ml index b2e199aa8..6b5e7709d 100644 --- a/test/p2p/test_p2p_connection.ml +++ b/test/p2p/test_p2p_connection.ml @@ -285,6 +285,33 @@ module Close_on_write = struct end +module Garbled_data = struct + + let encoding = Data_encoding.bytes + + let garbled_msg = MBytes.create (1 lsl 4) + + let server ch sched socket = + accept sched socket >>=? fun (info, auth_fd) -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.raw_write_sync conn garbled_msg >>=? fun () -> + P2p_connection.read conn >>= fun err -> + _assert (is_connection_closed err) __LOC__ "" >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let client ch sched addr port = + connect sched addr port id2 >>=? fun auth_fd -> + P2p_connection.accept auth_fd encoding >>=? fun conn -> + P2p_connection.read conn >>= fun err -> + _assert (is_decoding_error err) __LOC__ "" >>=? fun () -> + P2p_connection.close conn >>= fun _stat -> + return () + + let run _dir = run_nodes client server + +end + let spec = Arg.[ "-v", Unit (fun () -> @@ -311,6 +338,7 @@ let main () = "simple-message", Simple_message.run ; "close-on-read", Close_on_read.run ; "close-on-write", Close_on_write.run ; + "garbled-data", Garbled_data.run ; ] let () = diff --git a/test/p2p/test_p2p_connection_pool.ml b/test/p2p/test_p2p_connection_pool.ml index e81a31399..f677f3796 100644 --- a/test/p2p/test_p2p_connection_pool.ml +++ b/test/p2p/test_p2p_connection_pool.ml @@ -206,6 +206,35 @@ module Random_connections = struct end +module Garbled = struct + + let is_connection_closed = function + | Error ((Write | Read) :: P2p_io_scheduler.Connection_closed :: _) -> true + | Ok _ -> false + | Error err -> + log_info "Unexpected error: %a" pp_print_error err ; + false + + let write_bad_all conns = + let bad_msg = MBytes.of_string (String.make 16 '\000') in + iter_p + (fun conn -> + trace Write @@ P2p_connection_pool.raw_write_sync conn bad_msg) + conns + + let node ch pool points = + Simple.connect_all ~timeout:2. pool points >>=? fun conns -> + sync ch >>=? fun () -> + begin + write_bad_all conns >>=? fun () -> + Simple.read_all conns + end >>= fun err -> + _assert (is_connection_closed err) __LOC__ "" + + let run points = detach_nodes node points + +end + let addr = ref Ipaddr.V6.localhost let port = ref (1024 + Random.int 8192) let clients = ref 10 @@ -246,6 +275,7 @@ let main () = Test.run "p2p-connection-pool." [ "simple", (fun _ -> Simple.run points) ; "random", (fun _ -> Random_connections.run points !repeat_connections) ; + "garbled", (fun _ -> Garbled.run points) ; ] let () =