Shell/P2p: Split the unit tests in smaller atom.

And use more `Error_monad`...
This commit is contained in:
Grégoire Henry 2017-04-09 22:52:24 +02:00
parent 413bddcd96
commit e11e9c9ac5
9 changed files with 524 additions and 305 deletions

View File

@ -286,6 +286,16 @@ module Make() = struct
fold_right_s f t init >>=? fun acc -> fold_right_s f t init >>=? fun acc ->
f h acc f h acc
let rec join = function
| [] -> return ()
| t :: ts ->
t >>= function
| Error _ as err ->
join ts >>=? fun () ->
Lwt.return err
| Ok () ->
join ts
let record_trace err result = let record_trace err result =
match result with match result with
| Ok _ as res -> res | Ok _ as res -> res

View File

@ -144,4 +144,7 @@ module type S = sig
val fold_right_s : val fold_right_s :
('a -> 'b -> 'b tzresult Lwt.t) -> 'a list -> 'b -> 'b tzresult Lwt.t ('a -> 'b -> 'b tzresult Lwt.t) -> 'a list -> 'b -> 'b tzresult Lwt.t
(** A {!Lwt.join} in the monad *)
val join : unit tzresult Lwt.t list -> unit tzresult Lwt.t
end end

View File

@ -7,41 +7,14 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
let () = Lwt_unix.set_default_async_method Async_none
include Logging.Make (struct let name = "process" end) include Logging.Make (struct let name = "process" end)
open Error_monad open Error_monad
exception Exited of int exception Exited of int
let detach ?(prefix = "") f =
Lwt_io.flush_all () >>= fun () ->
match Lwt_unix.fork () with
| 0 ->
Random.self_init () ;
let template = Format.asprintf "%s$(section): $(message)" prefix in
Lwt_main.run begin
Logging.init ~template Stderr >>= fun () ->
lwt_log_notice "PID: %d" (Unix.getpid ()) >>= fun () ->
f ()
end ;
exit 0
| pid ->
Lwt.catch
(fun () ->
Lwt_unix.waitpid [] pid >>= function
| (_,Lwt_unix.WEXITED 0) ->
Lwt.return_unit
| (_,Lwt_unix.WEXITED n) ->
Lwt.fail (Exited n)
| (_,Lwt_unix.WSIGNALED _)
| (_,Lwt_unix.WSTOPPED _) ->
Lwt.fail Exit)
(function
| Lwt.Canceled ->
Unix.kill pid Sys.sigkill ;
Lwt.return_unit
| exn -> Lwt.fail exn)
let handle_error f = let handle_error f =
Lwt.catch Lwt.catch
f f
@ -51,27 +24,90 @@ let handle_error f =
lwt_log_error "%a" Error_monad.pp_print_error err >>= fun () -> lwt_log_error "%a" Error_monad.pp_print_error err >>= fun () ->
exit 1 exit 1
let rec wait processes = module Channel = struct
type ('a, 'b) t = (Lwt_io.input_channel * Lwt_io.output_channel)
let push (_, outch) v =
Lwt.catch
(fun () -> Lwt_io.write_value outch v >>= return)
(fun exn -> fail (Exn exn))
let pop (inch, _) =
Lwt.catch
(fun () -> Lwt_io.read_value inch >>= return)
(fun exn -> fail (Exn exn))
end
let wait pid =
Lwt.catch Lwt.catch
(fun () -> (fun () ->
Lwt.nchoose_split processes >>= function Lwt_unix.waitpid [] pid >>= function
| (_, []) -> lwt_log_notice "All done!" | (_,Lwt_unix.WEXITED 0) ->
| (_, processes) -> wait processes) return ()
| (_,Lwt_unix.WEXITED n) ->
fail (Exn (Exited n))
| (_,Lwt_unix.WSIGNALED _)
| (_,Lwt_unix.WSTOPPED _) ->
Lwt.fail Exit)
(function (function
| Exited n -> | Lwt.Canceled ->
lwt_log_notice "Early error!" >>= fun () -> Unix.kill pid Sys.sigkill ;
List.iter Lwt.cancel processes ; return ()
Lwt.catch
(fun () -> Lwt.join processes)
(fun _ -> Lwt.return_unit) >>= fun () ->
lwt_log_notice "A process finished with error %d !" n >>= fun () ->
Pervasives.exit n
| exn -> | exn ->
lwt_log_notice "Unexpected error!%a" fail (Exn exn))
Error_monad.pp_exn exn >>= fun () ->
List.iter Lwt.cancel processes ; type ('a, 'b) t = {
Lwt.catch termination: unit tzresult Lwt.t ;
(fun () -> Lwt.join processes) channel: ('b, 'a) Channel.t ;
(fun _ -> Lwt.return_unit) >>= fun () -> }
Pervasives.exit 2)
let detach ?(prefix = "") f =
Lwt_io.flush_all () >>= fun () ->
let main_in, child_out = Lwt_io.pipe () in
let child_in, main_out = Lwt_io.pipe () in
match Lwt_unix.fork () with
| 0 ->
Logging.init Stderr >>= fun () ->
Random.self_init () ;
let template = Format.asprintf "%s$(message)" prefix in
Lwt_main.run begin
Lwt_io.close main_in >>= fun () ->
Lwt_io.close main_out >>= fun () ->
Logging.init ~template Stderr >>= fun () ->
lwt_log_info "PID: %d" (Unix.getpid ()) >>= fun () ->
handle_error (fun () -> f (child_in, child_out))
end ;
exit 0
| pid ->
let termination = wait pid in
Lwt_io.close child_in >>= fun () ->
Lwt_io.close child_out >>= fun () ->
Lwt.return { termination ; channel = (main_in, main_out) }
let wait_all processes =
let rec loop processes =
match processes with
| [] -> Lwt.return_none
| processes ->
Lwt.nchoose_split processes >>= function
| (finished, remaining) ->
let rec handle = function
| [] -> loop remaining
| Ok () :: finished -> handle finished
| Error err :: _ ->
Lwt.return (Some (err, remaining)) in
handle finished in
loop (List.map (fun p -> p.termination) processes) >>= function
| None ->
lwt_log_info "All done!" >>= fun () ->
return ()
| Some ([Exn (Exited n)], remaining) ->
lwt_log_error "Early error!" >>= fun () ->
List.iter Lwt.cancel remaining ;
join remaining >>= fun _ ->
failwith "A process finished with error %d !" n
| Some (err, remaining) ->
lwt_log_error "Unexpected error!%a"
pp_print_error err >>= fun () ->
List.iter Lwt.cancel remaining ;
join remaining >>= fun _ ->
failwith "A process finished with an unexpected error !"

View File

@ -10,6 +10,20 @@
open Error_monad open Error_monad
exception Exited of int exception Exited of int
val detach: ?prefix:string -> (unit -> unit Lwt.t) -> unit Lwt.t module Channel : sig
val handle_error: (unit -> (unit, error list) result Lwt.t) -> unit Lwt.t type ('a, 'b) t
val wait: unit Lwt.t list -> unit Lwt.t val push: ('a, 'b) t -> 'a -> unit tzresult Lwt.t
val pop: ('a, 'b) t -> 'b tzresult Lwt.t
end
type ('a, 'b) t = {
termination: unit tzresult Lwt.t ;
channel: ('b, 'a) Channel.t ;
}
val detach:
?prefix:string ->
(('a, 'b) Channel.t -> unit tzresult Lwt.t) ->
('a, 'b) t Lwt.t
val wait_all: ('a, 'b) t list -> unit tzresult Lwt.t

View File

@ -11,9 +11,6 @@ module Test = Kaputt.Abbreviations.Test
let keep_dir = try ignore (Sys.getenv "KEEPDIR") ; true with _ -> false let keep_dir = try ignore (Sys.getenv "KEEPDIR") ; true with _ -> false
let make_test ~title test =
Test.add_simple_test ~title (fun () -> Lwt_main.run (test ()))
let rec remove_dir dir = let rec remove_dir dir =
if Sys.file_exists dir then begin if Sys.file_exists dir then begin
Array.iter (fun file -> Array.iter (fun file ->

View File

@ -24,7 +24,7 @@ OPENED_MODULES := ${NODE_OPENED_MODULES}
.PHONY:run-test-p2p-connection .PHONY:run-test-p2p-connection
run-test-p2p-connection: run-test-p2p-connection:
@echo @echo
./test-p2p-connection ./test-p2p-connection -v
TEST_CONNECTION_IMPLS := \ TEST_CONNECTION_IMPLS := \
test_p2p_connection.ml test_p2p_connection.ml
@ -42,7 +42,7 @@ clean::
.PHONY:run-test-p2p-connection-pool .PHONY:run-test-p2p-connection-pool
run-test-p2p-connection-pool: run-test-p2p-connection-pool:
@echo @echo
./test-p2p-connection-pool --clients 10 --repeat 5 ./test-p2p-connection-pool --clients 10 --repeat 5 -v
TEST_CONNECTION_POOL_IMPLS := \ TEST_CONNECTION_POOL_IMPLS := \
test_p2p_connection_pool.ml test_p2p_connection_pool.ml

View File

@ -12,7 +12,9 @@
open Error_monad open Error_monad
open P2p_types open P2p_types
include Logging.Make (struct let name = "test-p2p-connection" end) include Logging.Make (struct let name = "test.p2p.connection" end)
let default_addr = Ipaddr.V6.localhost
let proof_of_work_target = Crypto_box.make_target 16. let proof_of_work_target = Crypto_box.make_target 16.
let id1 = Identity.generate proof_of_work_target let id1 = Identity.generate proof_of_work_target
@ -44,6 +46,46 @@ let rec listen ?port addr =
| exn -> Lwt.fail exn | exn -> Lwt.fail exn
end end
let sync ch =
Process.Channel.push ch () >>=? fun () ->
Process.Channel.pop ch >>=? fun () ->
return ()
let rec sync_nodes nodes =
iter_p
(fun { Process.channel } -> Process.Channel.pop channel)
nodes >>=? fun () ->
iter_p
(fun { Process.channel } -> Process.Channel.push channel ())
nodes >>=? fun () ->
sync_nodes nodes
let sync_nodes nodes =
sync_nodes nodes >>= function
| Ok () | Error (Exn End_of_file :: _) ->
return ()
| Error e as err ->
Lwt.return err
let run_nodes client server =
listen default_addr >>= fun (main_socket, port) ->
Process.detach ~prefix:"server: " begin fun channel ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
server channel sched main_socket >>=? fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
return ()
end >>= fun server_node ->
Process.detach ~prefix:"client: " begin fun channel ->
Lwt_utils.safe_close main_socket >>= fun () ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
client channel sched default_addr port >>=? fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
return ()
end >>= fun client_node ->
let nodes = [ server_node ; client_node ] in
Lwt.ignore_result (sync_nodes nodes) ;
Process.wait_all nodes
let raw_accept sched main_socket = let raw_accept sched main_socket =
Lwt_unix.accept main_socket >>= fun (fd, sockaddr) -> Lwt_unix.accept main_socket >>= fun (fd, sockaddr) ->
let fd = P2p_io_scheduler.register sched fd in let fd = P2p_io_scheduler.register sched fd in
@ -73,20 +115,11 @@ let connect sched addr port id =
P2p_connection.authenticate P2p_connection.authenticate
~proof_of_work_target ~proof_of_work_target
~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) -> ~incoming:false fd (addr, port) id versions >>=? fun (info, auth_fd) ->
assert (not info.incoming) ; _assert (not info.incoming) __LOC__ "" >>=? fun () ->
assert (Peer_id.compare info.peer_id id1.peer_id = 0) ; _assert (Peer_id.compare info.peer_id id1.peer_id = 0)
__LOC__ "" >>=? fun () ->
return auth_fd return auth_fd
let simple_msg =
MBytes.create (1 lsl 1)
let is_rejected = function
| Error [P2p_connection.Rejected] -> true
| Ok _ -> false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
let is_connection_closed = function let is_connection_closed = function
| Error [P2p_io_scheduler.Connection_closed] -> true | Error [P2p_io_scheduler.Connection_closed] -> true
| Ok _ -> false | Ok _ -> false
@ -94,116 +127,193 @@ let is_connection_closed = function
log_notice "Error: %a" pp_print_error err ; log_notice "Error: %a" pp_print_error err ;
false false
let bytes_encoding = Data_encoding.Variable.bytes let is_decoding_error = function
| Error [P2p_connection.Decoding_error] -> true
| Ok _ -> false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
let server main_socket = module Low_level = struct
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
(* Low-level test. *)
raw_accept sched main_socket >>= fun (fd, _point) ->
lwt_log_notice "Low_level" >>= fun () ->
P2p_io_scheduler.write fd simple_msg >>=? fun () ->
P2p_io_scheduler.close fd >>=? fun _ ->
lwt_log_notice "Low_level OK" >>= fun () ->
(* Kick the first connection. *)
accept sched main_socket >>=? fun (info, auth_fd) ->
lwt_log_notice "Kick" >>= fun () ->
assert (info.incoming) ;
assert (Peer_id.compare info.peer_id id2.peer_id = 0) ;
P2p_connection.kick auth_fd >>= fun () ->
lwt_log_notice "Kick OK" >>= fun () ->
(* Let's be rejected. *)
accept sched main_socket >>=? fun (_info, auth_fd) ->
P2p_connection.accept auth_fd bytes_encoding >>= fun conn ->
assert (is_rejected conn) ;
lwt_log_notice "Kicked OK" >>= fun () ->
(* Accept and send a single message. *)
accept sched main_socket >>=? fun (_info, auth_fd) ->
lwt_log_notice "Single" >>= fun () ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
P2p_connection.write_sync conn simple_msg >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Single OK" >>= fun () ->
(* Accept and send a single message, while the client expected 2. *)
accept sched main_socket >>=? fun (_info, auth_fd) ->
lwt_log_notice "Early close (read)" >>= fun () ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
P2p_connection.write_sync conn simple_msg >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Early close (read) OK" >>= fun () ->
(* Accept and wait for the client to close the connection. *)
accept sched main_socket >>=? fun (_info, auth_fd) ->
lwt_log_notice "Early close (write)" >>= fun () ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Early close (write) OK" >>= fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
Lwt_unix.sleep 0.2 >>= fun () ->
lwt_log_notice "Success" >>= fun () ->
return ()
let client addr port = let simple_msg = MBytes.create (1 lsl 4)
let msg = MBytes.create (MBytes.length simple_msg) in
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
raw_connect sched addr port >>= fun fd ->
P2p_io_scheduler.read_full fd msg >>=? fun () ->
assert (MBytes.compare simple_msg msg = 0) ;
P2p_io_scheduler.close fd >>=? fun () ->
lwt_log_notice "Low_level OK" >>= fun () ->
(* let's be rejected. *)
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd bytes_encoding >>= fun conn ->
assert (is_rejected conn) ;
lwt_log_notice "Kick OK" >>= fun () ->
(* let's reject! *)
lwt_log_notice "Kicked" >>= fun () ->
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.kick auth_fd >>= fun () ->
(* let's exchange a simple message. *)
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
P2p_connection.read conn >>=? fun (_msg_size, msg) ->
assert (MBytes.compare simple_msg msg = 0) ;
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Simple OK" >>= fun () ->
(* let's detect a closed connection on `read`. *)
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
P2p_connection.read conn >>=? fun (_msg_size, msg) ->
assert (MBytes.compare simple_msg msg = 0) ;
P2p_connection.read conn >>= fun msg ->
assert (is_connection_closed msg) ;
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Early close (read) OK" >>= fun () ->
(* let's detect a closed connection on `write`. *)
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd bytes_encoding >>=? fun conn ->
Lwt_unix.sleep 0.1 >>= fun () ->
P2p_connection.write_sync conn simple_msg >>= fun unit ->
assert (is_connection_closed unit) ;
P2p_connection.close conn >>= fun _stat ->
lwt_log_notice "Early close (write) OK" >>= fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
lwt_log_notice "Success" >>= fun () ->
return ()
let default_addr = Ipaddr.V6.localhost let client _ch sched addr port =
let msg = MBytes.create (MBytes.length simple_msg) in
raw_connect sched addr port >>= fun fd ->
P2p_io_scheduler.read_full fd msg >>=? fun () ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
P2p_io_scheduler.close fd >>=? fun () ->
return ()
let server _ch sched socket =
raw_accept sched socket >>= fun (fd, point) ->
P2p_io_scheduler.write fd simple_msg >>=? fun () ->
P2p_io_scheduler.close fd >>=? fun _ ->
return ()
let run _dir = run_nodes client server
end
module Kick = struct
let encoding = Data_encoding.bytes
let is_rejected = function
| Error [P2p_connection.Rejected] -> true
| Ok _ -> false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
let server _ch sched socket =
accept sched socket >>=? fun (info, auth_fd) ->
_assert (info.incoming) __LOC__ "" >>=? fun () ->
_assert (Peer_id.compare info.peer_id id2.peer_id = 0)
__LOC__ "" >>=? fun () ->
P2p_connection.kick auth_fd >>= fun () ->
return ()
let client _ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd encoding >>= fun conn ->
_assert (is_rejected conn) __LOC__ "" >>=? fun () ->
return ()
let run _dir = run_nodes client server
end
module Kicked = struct
let encoding = Data_encoding.bytes
let server _ch sched socket =
accept sched socket >>=? fun (info, auth_fd) ->
P2p_connection.accept auth_fd encoding >>= fun conn ->
_assert (Kick.is_rejected conn) __LOC__ "" >>=? fun () ->
return ()
let client _ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.kick auth_fd >>= fun () ->
return ()
let run _dir = run_nodes client server
end
module Simple_message = struct
let encoding = Data_encoding.bytes
let simple_msg = MBytes.create (1 lsl 4)
let simple_msg2 = MBytes.create (1 lsl 4)
let server ch sched socket =
accept sched socket >>=? fun (info, auth_fd) ->
P2p_connection.accept auth_fd encoding >>=? fun conn ->
P2p_connection.write_sync conn simple_msg >>=? fun () ->
P2p_connection.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
sync ch >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd encoding >>=? fun conn ->
P2p_connection.write_sync conn simple_msg2 >>=? fun () ->
P2p_connection.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
sync ch >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
return ()
let run _dir = run_nodes client server
end
module Close_on_read = struct
let encoding = Data_encoding.bytes
let simple_msg = MBytes.create (1 lsl 4)
let server _ch sched socket =
accept sched socket >>=? fun (info, auth_fd) ->
P2p_connection.accept auth_fd encoding >>=? fun conn ->
P2p_connection.close conn >>= fun _stat ->
return ()
let client _ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd encoding >>=? fun conn ->
P2p_connection.read conn >>= fun err ->
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
return ()
let run _dir = run_nodes client server
end
module Close_on_write = struct
let encoding = Data_encoding.bytes
let simple_msg = MBytes.create (1 lsl 4)
let server ch sched socket =
accept sched socket >>=? fun (info, auth_fd) ->
P2p_connection.accept auth_fd encoding >>=? fun conn ->
P2p_connection.close conn >>= fun _stat ->
sync ch >>=? fun ()->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_connection.accept auth_fd encoding >>=? fun conn ->
sync ch >>=? fun ()->
P2p_connection.write_sync conn simple_msg >>= fun err ->
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
P2p_connection.close conn >>= fun _stat ->
return ()
let run _dir = run_nodes client server
end
let spec = Arg.[
"-v", Unit (fun () ->
Lwt_log_core.(add_rule "test.p2p.connection" Info) ;
Lwt_log_core.(add_rule "p2p.connection" Info)),
" Log up to info msgs" ;
"-vv", Unit (fun () ->
Lwt_log_core.(add_rule "test.p2p.connection" Debug) ;
Lwt_log_core.(add_rule "p2p.connection" Debug)),
" Log up to debug msgs";
]
let main () = let main () =
listen default_addr >>= fun (main_socket, port) -> let open Utils in
let server = let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in
Process.detach ~prefix:"server " begin fun () -> let usage_msg = "Usage: %s.\nArguments are:" in
Process.handle_error begin fun () -> Arg.parse spec anon_fun usage_msg ;
server main_socket Test.run "p2p-connection." [
end "low-level", Low_level.run ;
end in "kick", Kick.run ;
let client = "kicked", Kicked.run ;
Process.detach ~prefix:"client " begin fun () -> "simple-message", Simple_message.run ;
Lwt_utils.safe_close main_socket >>= fun () -> "close-on-read", Close_on_read.run ;
Process.handle_error begin fun () -> "close-on-write", Close_on_write.run ;
client default_addr port ]
end
end in
Process.wait [ server ; client ]
let () = let () =
Lwt_main.run (main ()) Sys.catch_break true ;
try main ()
with _ -> ()

View File

@ -9,7 +9,7 @@
open Error_monad open Error_monad
open P2p_types open P2p_types
include Logging.Make (struct let name = "test-p2p-connection-pool" end) include Logging.Make (struct let name = "test.p2p.connection-pool" end)
type message = type message =
| Ping | Ping
@ -36,97 +36,29 @@ let meta_config : metadata P2p_connection_pool.meta_config = {
score = fun () -> 0. ; score = fun () -> 0. ;
} }
let rec connect ~timeout pool point = let sync ch =
lwt_log_info "Connect to %a" Point.pp point >>= fun () -> Process.Channel.push ch () >>=? fun () ->
P2p_connection_pool.connect pool point ~timeout >>= function Process.Channel.pop ch >>=? fun () ->
| Error [P2p_connection_pool.Connected] -> begin
match P2p_connection_pool.Connection.find_by_point pool point with
| Some conn -> return conn
| None -> failwith "Woops..."
end
| Error ([ P2p_connection_pool.Connection_refused
| P2p_connection_pool.Pending_connection
| P2p_connection.Rejected
| Lwt_utils.Canceled
| Lwt_utils.Timeout
| P2p_connection_pool.Rejected _
] as err) ->
lwt_log_info "@[Connection to %a failed:@ %a@]"
Point.pp point pp_print_error err >>= fun () ->
Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () ->
connect ~timeout pool point
| Ok _ | Error _ as res -> Lwt.return res
let connect_all ~timeout pool points =
map_p (connect ~timeout pool) points
type error += Connect | Write | Read
let write_all conns msg =
iter_p
(fun conn ->
trace Write @@ P2p_connection_pool.write_sync conn msg)
conns
let read_all conns =
iter_p
(fun conn ->
trace Read @@ P2p_connection_pool.read conn >>=? fun Ping ->
return ())
conns
let rec connect_random pool total rem point n =
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
(trace Connect @@ connect ~timeout:2. pool point) >>=? fun conn ->
(trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ ->
(trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping ->
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
P2p_connection_pool.disconnect conn >>= fun () ->
begin
decr rem ;
if !rem mod total = 0 then
lwt_log_notice "Remaining: %d." (!rem / total)
else
Lwt.return ()
end >>= fun () ->
if n > 1 then
connect_random pool total rem point (pred n)
else
return ()
let connect_random_all pool points n =
let total = List.length points in
let rem = ref (n * total) in
iter_p (fun point -> connect_random pool total rem point n) points
let close_all conns =
Lwt_list.iter_p P2p_connection_pool.disconnect conns
let run_net config repeat points addr port =
Lwt_unix.sleep (Random.float 2.0) >>= fun () ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
P2p_connection_pool.create
config meta_config msg_config sched >>= fun pool ->
P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome ->
connect_all ~timeout:2. pool points >>=? fun conns ->
lwt_log_notice "Bootstrap OK" >>= fun () ->
write_all conns Ping >>=? fun () ->
lwt_log_notice "Sent all messages." >>= fun () ->
read_all conns >>=? fun () ->
lwt_log_notice "Read all messages." >>= fun () ->
close_all conns >>= fun () ->
lwt_log_notice "Begin random connections." >>= fun () ->
connect_random_all pool points repeat >>=? fun () ->
lwt_log_notice "Shutting down" >>= fun () ->
P2p_welcome.shutdown welcome >>= fun () ->
P2p_connection_pool.destroy pool >>= fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
lwt_log_notice "Shutdown Ok" >>= fun () ->
return () return ()
let make_net points repeat n = let rec sync_nodes nodes =
let point, points = Utils.select n points in iter_p
(fun { Process.channel } -> Process.Channel.pop channel)
nodes >>=? fun () ->
iter_p
(fun { Process.channel } -> Process.Channel.push channel ())
nodes >>=? fun () ->
sync_nodes nodes
let sync_nodes nodes =
sync_nodes nodes >>= function
| Ok () | Error (Exn End_of_file :: _) ->
return ()
| Error e as err ->
Lwt.return err
let detach_node f points n =
let (addr, port), points = Utils.select n points in
let proof_of_work_target = Crypto_box.make_target 0. in let proof_of_work_target = Crypto_box.make_target 0. in
let identity = Identity.generate proof_of_work_target in let identity = Identity.generate proof_of_work_target in
let nb_points = List.length points in let nb_points = List.length points in
@ -136,7 +68,7 @@ let make_net points repeat n =
trusted_points = points ; trusted_points = points ;
peers_file = "/dev/null" ; peers_file = "/dev/null" ;
closed_network = true ; closed_network = true ;
listening_port = Some (snd point) ; listening_port = Some port ;
min_connections = nb_points ; min_connections = nb_points ;
max_connections = nb_points ; max_connections = nb_points ;
max_incoming_connections = nb_points ; max_incoming_connections = nb_points ;
@ -151,17 +83,129 @@ let make_net points repeat n =
swap_linger = 0. ; swap_linger = 0. ;
} in } in
Process.detach Process.detach
~prefix:(Format.asprintf "%a " Peer_id.pp identity.peer_id) ~prefix:(Format.asprintf "%a: " Peer_id.pp_short identity.peer_id)
begin fun () -> begin fun channel ->
run_net config repeat points (fst point) (snd point) >>= function let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
| Ok () -> Lwt.return_unit P2p_connection_pool.create
| Error err -> config meta_config msg_config sched >>= fun pool ->
lwt_log_error "@[<v 2>Unexpected error: %d@ %a@]" P2p_welcome.run ~backlog:10 pool ~addr port >>= fun welcome ->
(List.length err) lwt_log_info "Node ready (port: %d)" port >>= fun () ->
pp_print_error err >>= fun () -> sync channel >>=? fun () ->
exit 1 f channel pool points >>=? fun () ->
lwt_log_info "Shutting down..." >>= fun () ->
P2p_welcome.shutdown welcome >>= fun () ->
P2p_connection_pool.destroy pool >>= fun () ->
P2p_io_scheduler.shutdown sched >>= fun () ->
lwt_log_info "Bye." >>= fun () ->
return ()
end end
let detach_nodes ?(sync = 0) run_node points =
let open Utils in
let clients = List.length points in
Lwt_list.map_p
(detach_node run_node points) (0 -- (clients - 1)) >>= fun nodes ->
Lwt.ignore_result (sync_nodes nodes) ;
Process.wait_all nodes
type error += Connect | Write | Read
module Simple = struct
let rec connect ~timeout pool point =
lwt_log_info "Connect to %a" Point.pp point >>= fun () ->
P2p_connection_pool.connect pool point ~timeout >>= function
| Error [P2p_connection_pool.Connected] -> begin
match P2p_connection_pool.Connection.find_by_point pool point with
| Some conn -> return conn
| None -> failwith "Woops..."
end
| Error ([ P2p_connection_pool.Connection_refused
| P2p_connection_pool.Pending_connection
| P2p_connection.Rejected
| Lwt_utils.Canceled
| Lwt_utils.Timeout
| P2p_connection_pool.Rejected _
] as err) ->
lwt_log_info "@[Connection to %a failed:@ %a@]"
Point.pp point pp_print_error err >>= fun () ->
Lwt_unix.sleep (0.5 +. Random.float 2.) >>= fun () ->
connect ~timeout pool point
| Ok _ | Error _ as res -> Lwt.return res
let connect_all ~timeout pool points =
map_p (connect ~timeout pool) points
let write_all conns msg =
iter_p
(fun conn ->
trace Write @@ P2p_connection_pool.write_sync conn msg)
conns
let read_all conns =
iter_p
(fun conn ->
trace Read @@ P2p_connection_pool.read conn >>=? fun Ping ->
return ())
conns
let close_all conns =
Lwt_list.iter_p P2p_connection_pool.disconnect conns
let node channel pool points =
connect_all ~timeout:2. pool points >>=? fun conns ->
lwt_log_info "Bootstrap OK" >>= fun () ->
sync channel >>=? fun () ->
write_all conns Ping >>=? fun () ->
lwt_log_info "Sent all messages." >>= fun () ->
sync channel >>=? fun () ->
read_all conns >>=? fun () ->
lwt_log_info "Read all messages." >>= fun () ->
sync channel >>=? fun () ->
close_all conns >>= fun () ->
lwt_log_info "All connections successfully closed." >>= fun () ->
return ()
let run points = detach_nodes node points
end
module Random_connections = struct
let rec connect_random pool total rem point n =
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
(trace Connect @@ Simple.connect ~timeout:2. pool point) >>=? fun conn ->
(trace Write @@ P2p_connection_pool.write conn Ping) >>= fun _ ->
(trace Read @@ P2p_connection_pool.read conn) >>=? fun Ping ->
Lwt_unix.sleep (0.2 +. Random.float 1.0) >>= fun () ->
P2p_connection_pool.disconnect conn >>= fun () ->
begin
decr rem ;
if !rem mod total = 0 then
lwt_log_info "Remaining: %d." (!rem / total)
else
Lwt.return ()
end >>= fun () ->
if n > 1 then
connect_random pool total rem point (pred n)
else
return ()
let connect_random_all pool points n =
let total = List.length points in
let rem = ref (n * total) in
iter_p (fun point -> connect_random pool total rem point n) points
let node repeat channel pool points =
lwt_log_info "Begin random connections." >>= fun () ->
connect_random_all pool points repeat >>=? fun () ->
lwt_log_info "Random connections OK." >>= fun () ->
return ()
let run points repeat = detach_nodes (node repeat) points
end
let addr = ref Ipaddr.V6.localhost let addr = ref Ipaddr.V6.localhost
let port = ref (1024 + Random.int 8192) let port = ref (1024 + Random.int 8192)
let clients = ref 10 let clients = ref 10
@ -179,25 +223,32 @@ let spec = Arg.[
"--repeat", Set_int repeat_connections, "--repeat", Set_int repeat_connections,
" Number of connections/disconnections." ; " Number of connections/disconnections." ;
"-v", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Info)),
"-v", Unit (fun () ->
Lwt_log_core.(add_rule "test.p2p.connection-pool" Info) ;
Lwt_log_core.(add_rule "p2p.connection-pool" Info)),
" Log up to info msgs" ; " Log up to info msgs" ;
"-vv", Unit (fun () -> Lwt_log_core.(add_rule "p2p.connection-pool" Debug)), "-vv", Unit (fun () ->
Lwt_log_core.(add_rule "test.p2p.connection-pool" Debug) ;
Lwt_log_core.(add_rule "p2p.connection-pool" Debug)),
" Log up to debug msgs"; " Log up to debug msgs";
] ]
let main () = let main () =
let open Utils in let open Utils in
Logging.init Stderr >>= fun () -> let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in
let anon_fun _num_peers = raise (Arg.Bad "No anonymous argument.") in
let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg ; Arg.parse spec anon_fun usage_msg ;
let ports = !port -- (!port + !clients - 1) in let ports = !port -- (!port + !clients - 1) in
let points = List.map (fun port -> !addr, port) ports in let points = List.map (fun port -> !addr, port) ports in
Lwt_list.iter_p (make_net points !repeat_connections) (0 -- (!clients - 1)) Test.run "p2p-connection-pool." [
"simple", (fun _ -> Simple.run points) ;
"random", (fun _ -> Random_connections.run points !repeat_connections) ;
]
let () = let () =
Sys.catch_break true ; Sys.catch_break true ;
try Lwt_main.run @@ main () try main ()
with _ -> () with _ -> ()

View File

@ -142,24 +142,20 @@ let run
addr port time n = addr port time n =
Logging.init Stderr >>= fun () -> Logging.init Stderr >>= fun () ->
listen ?port addr >>= fun (main_socket, port) -> listen ?port addr >>= fun (main_socket, port) ->
let server = Process.detach ~prefix:"server: " begin fun _ ->
Process.detach ~prefix:"server " begin fun () -> server
Process.handle_error begin fun () -> ?display_client_stat ?max_download_speed
server ~read_buffer_size ?read_queue_size
?display_client_stat ?max_download_speed main_socket n
~read_buffer_size ?read_queue_size end >>= fun server_node ->
main_socket n
end
end in
let client n = let client n =
let prefix = Printf.sprintf "client(%d) " n in let prefix = Printf.sprintf "client(%d): " n in
Process.detach ~prefix begin fun () -> Process.detach ~prefix begin fun _ ->
Lwt_utils.safe_close main_socket >>= fun () -> Lwt_utils.safe_close main_socket >>= fun () ->
Process.handle_error begin fun () -> client ?max_upload_speed ?write_queue_size addr port time n
client ?max_upload_speed ?write_queue_size addr port time n
end
end in end in
Process.wait (server :: List.map client Utils.(1 -- n)) Lwt_list.map_p client Utils.(1 -- n) >>= fun client_nodes ->
Process.wait_all (server_node :: client_nodes)
let () = Random.self_init () let () = Random.self_init ()
@ -221,12 +217,14 @@ let () =
let () = let () =
Sys.catch_break true ; Sys.catch_break true ;
Lwt_main.run Test.run "p2p.io-scheduler." [
(run "trivial-quota", (fun _dir ->
?display_client_stat:!display_client_stat run
?max_download_speed:!max_download_speed ?display_client_stat:!display_client_stat
?max_upload_speed:!max_upload_speed ?max_download_speed:!max_download_speed
~read_buffer_size:!read_buffer_size ?max_upload_speed:!max_upload_speed
?read_queue_size:!read_queue_size ~read_buffer_size:!read_buffer_size
?write_queue_size:!write_queue_size ?read_queue_size:!read_queue_size
!addr !port !delay !clients) ?write_queue_size:!write_queue_size
!addr !port !delay !clients)
]