From 2ed8bf2cfa678f991544dd03d32c831de8b30bc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Sat, 14 Jan 2017 13:13:59 +0100 Subject: [PATCH] Shell: implement `P2p_io_scheduler` --- .gitignore | 2 +- src/.merlin | 4 +- src/Makefile | 10 +- src/node/net/p2p.ml | 16 +- src/node/net/p2p_io_scheduler.ml | 449 ++++++++++++++++++++++++++++++ src/node/net/p2p_io_scheduler.mli | 93 +++++++ src/node/net/p2p_types.ml | 46 +++ src/node/net/p2p_types.mli | 26 ++ src/tezos-deps.opam | 1 + src/utils/moving_average.ml | 97 +++++-- src/utils/moving_average.mli | 34 +-- test/Makefile | 39 ++- test/lib/process.ml | 79 ++++++ test/lib/process.mli | 15 + test/test_p2p.ml | 167 ----------- test/test_p2p_io_scheduler.ml | 232 +++++++++++++++ 16 files changed, 1072 insertions(+), 238 deletions(-) create mode 100644 src/node/net/p2p_io_scheduler.ml create mode 100644 src/node/net/p2p_io_scheduler.mli create mode 100644 src/node/net/p2p_types.ml create mode 100644 src/node/net/p2p_types.mli create mode 100644 test/lib/process.ml create mode 100644 test/lib/process.mli delete mode 100644 test/test_p2p.ml create mode 100644 test/test_p2p_io_scheduler.ml diff --git a/.gitignore b/.gitignore index ab918853d..dc5373a3c 100644 --- a/.gitignore +++ b/.gitignore @@ -39,7 +39,7 @@ /test/test-context /test/test-basic /test/test-data-encoding -/test/test-p2p +/test/test-p2p-io-scheduler /test/LOG *~ diff --git a/src/.merlin b/src/.merlin index d193250dc..d351d972f 100644 --- a/src/.merlin +++ b/src/.merlin @@ -23,6 +23,7 @@ FLG -w -30 FLG -w -40 PKG base64 PKG calendar +PKG cmdliner PKG cohttp PKG compiler-libs.optcomp PKG conduit @@ -33,12 +34,13 @@ PKG ezjsonm PKG git PKG irmin PKG lwt +PKG mtime.os PKG ocplib-endian PKG ocplib-json-typed PKG ocplib-ocamlres PKG ocplib-resto.directory PKG result PKG sodium +PKG ssl PKG unix PKG zarith -PKG cmdliner diff --git a/src/Makefile b/src/Makefile index 2bbba54b5..6d4ee8cfa 100644 --- a/src/Makefile +++ b/src/Makefile @@ -101,7 +101,7 @@ clean:: MINUTILS_LIB_INTFS := \ minutils/mBytes.mli \ - minutils/hex_encode.mli \ + minutils/hex_encode.mli \ minutils/utils.mli \ minutils/compare.mli \ minutils/data_encoding.mli \ @@ -187,6 +187,7 @@ UTILS_PACKAGES := \ base64 \ calendar \ ezjsonm \ + mtime.os \ sodium \ zarith \ $(COVERAGEPKG) \ @@ -256,6 +257,8 @@ clean:: NODE_LIB_INTFS := \ \ + node/net/p2p_types.mli \ + node/net/p2p_io_scheduler.mli \ node/net/p2p.mli \ node/net/RPC_server.mli \ \ @@ -286,7 +289,10 @@ NODE_LIB_IMPLS := \ \ compiler/node_compiler_main.ml \ \ + node/net/p2p_types.ml \ + node/net/p2p_io_scheduler.ml \ node/net/p2p.ml \ + \ node/net/RPC_server.ml \ \ node/updater/fitness.ml \ @@ -594,10 +600,8 @@ NO_DEPS := \ compiler/embedded_cmis.cmx compiler/embedded_cmis.cmi: OPENED_MODULES= ifneq ($(MAKECMDGOALS),clean) -ifneq ($(MAKECMDGOALS),build-deps) include .depend endif -endif DEPENDS := $(filter-out $(NO_DEPS), \ $(MINUTILS_LIB_INTFS) $(MINUTILS_LIB_IMPLS) \ $(UTILS_LIB_INTFS) $(UTILS_LIB_IMPLS) \ diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 08cd4c6f7..3d21775d5 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -486,24 +486,12 @@ module Make (P: PARAMS) = struct (* Them we can build the net object and launch the worker. *) and connected buf local_nonce version gid public_key nonce listening_port = - let feed_ma ?(freq=1.) ma counter = - let rec inner old_received = - Lwt_unix.sleep freq >>= fun () -> - let received = !counter in - ma#add_int (received - old_received); - inner received in - Lwt.async (fun () -> Lwt.pick [cancelation (); inner !counter]) - in (* net object state *) let last = ref (Unix.gettimeofday ()) in let local_nonce = ref local_nonce in let remote_nonce = ref nonce in let received = ref 0 in let sent = ref 0 in - let received_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in - let sent_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in - feed_ma received_ema received ; - feed_ma sent_ema sent ; (* net object callbaks *) let last_seen () = !last in let get_nonce nonce = @@ -520,8 +508,8 @@ module Make (P: PARAMS) = struct let reader = Lwt_pipe.create ~size:2 () in let total_sent () = !sent in let total_recv () = !received in - let current_inflow () = received_ema#get in - let current_outflow () = sent_ema#get in + let current_inflow () = 0. in + let current_outflow () = 0. in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; listening_port ; version ; last_seen ; diff --git a/src/node/net/p2p_io_scheduler.ml b/src/node/net/p2p_io_scheduler.ml new file mode 100644 index 000000000..0a0acb087 --- /dev/null +++ b/src/node/net/p2p_io_scheduler.ml @@ -0,0 +1,449 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(* TODO decide whether we need to preallocate buffers or not. *) + +open P2p_types +include Logging.Make (struct let name = "p2p.io-scheduler" end) + +module Inttbl = Hashtbl.Make(struct + type t = int + let equal (x: int) (y: int) = x = y + let hash = Hashtbl.hash + end) + +let alpha = 0.2 + +module type IO = sig + val name: string + type in_param + val pop: in_param -> MBytes.t tzresult Lwt.t + type out_param + val push: out_param -> MBytes.t -> unit tzresult Lwt.t + val close: out_param -> error list -> unit Lwt.t +end + +module Scheduler(IO : IO) = struct + + type t = { + canceler: Canceler.t ; + mutable worker: unit Lwt.t ; + counter: Moving_average.t ; + max_speed: int option ; + mutable quota: int ; + quota_updated: unit Lwt_condition.t ; + readys: unit Lwt_condition.t ; + readys_high: (connection * MBytes.t tzresult) Queue.t ; + readys_low: (connection * MBytes.t tzresult) Queue.t ; + } + + and connection = { + id: int ; + mutable closed: bool ; + canceler: Canceler.t ; + in_param: IO.in_param ; + out_param: IO.out_param ; + mutable current_pop: MBytes.t tzresult Lwt.t ; + mutable current_push: unit tzresult Lwt.t ; + counter: Moving_average.t ; + mutable quota: int ; + mutable last_quota: int ; + } + + let cancel (conn : connection) err = + if not conn.closed then begin + conn.closed <- true ; + Lwt.catch + (fun () -> IO.close conn.out_param err) + (fun _ -> Lwt.return_unit) >>= fun () -> + Canceler.cancel conn.canceler + end else + Lwt.return_unit + + let waiter st conn = + assert (Lwt.state conn.current_pop <> Sleep) ; + conn.current_pop <- IO.pop conn.in_param ; + Lwt.async begin fun () -> + conn.current_pop >>= fun res -> + conn.current_push >>= fun _ -> + let was_empty = + Queue.is_empty st.readys_high && Queue.is_empty st.readys_low in + if conn.quota > 0 then + Queue.push (conn, res) st.readys_high + else + Queue.push (conn, res) st.readys_low ; + if was_empty then Lwt_condition.broadcast st.readys () ; + Lwt.return_unit + end + + let wait_data st = + let is_empty = + Queue.is_empty st.readys_high && Queue.is_empty st.readys_low in + if is_empty then Lwt_condition.wait st.readys else Lwt.return_unit + + let check_quota st = + if st.max_speed <> None && st.quota < 0 then + Lwt_condition.wait st.quota_updated + else + Lwt_unix.yield () + + let rec worker_loop st = + check_quota st >>= fun () -> + Lwt.pick [ + Canceler.cancelation st.canceler ; + wait_data st + ] >>= fun () -> + if Canceler.canceled st.canceler then + Lwt.return_unit + else + let prio, (conn, msg) = + if not (Queue.is_empty st.readys_high) then + true, (Queue.pop st.readys_high) + else + false, (Queue.pop st.readys_low) + in + match msg with + | Error [Lwt_utils.Canceled] -> + worker_loop st + | Error ([Exn (Lwt_pipe.Closed | + Unix.Unix_error (EBADF, _, _))] as err) -> + cancel conn err >>= fun () -> + worker_loop st + | Error err -> + lwt_debug "Error %a" pp_print_error err >>= fun () -> + cancel conn err >>= fun () -> + worker_loop st + | Ok msg -> + conn.current_push <- begin + IO.push conn.out_param msg >>= function + | Ok () + | Error [Lwt_utils.Canceled] -> + return () + | Error ([Exn (Unix.Unix_error (EBADF, _, _) | + Lwt_pipe.Closed)] as err) -> + cancel conn err >>= fun () -> + return () + | Error err -> + lwt_debug "Error %a" pp_print_error err >>= fun () -> + cancel conn err >>= fun () -> + Lwt.return (Error err) + end ; + let len = MBytes.length msg in + Moving_average.add st.counter len ; + st.quota <- st.quota - len ; + Moving_average.add conn.counter len ; + if prio then conn.quota <- conn.quota - len ; + waiter st conn ; + worker_loop st + + let create max_speed = + let st = { + canceler = Canceler.create () ; + worker = Lwt.return_unit ; + counter = Moving_average.create ~init:0 ~alpha ; + max_speed ; quota = unopt 0 max_speed ; + quota_updated = Lwt_condition.create () ; + readys = Lwt_condition.create () ; + readys_high = Queue.create () ; + readys_low = Queue.create () ; + } in + st.worker <- + Lwt_utils.worker IO.name + (fun () -> worker_loop st) + (fun () -> Canceler.cancel st.canceler) ; + st + + let create_connection st in_param out_param canceler id = + let conn = + { id ; closed = false ; + canceler ; + in_param ; out_param ; + current_pop = Lwt.fail Not_found (* dummy *) ; + current_push = return () ; + counter = Moving_average.create ~init:0 ~alpha ; + quota = 0 ; last_quota = 0 ; + } in + waiter st conn ; + conn + + let update_quota st = + iter_option st.max_speed ~f:begin fun quota -> + st.quota <- (min st.quota 0) + quota ; + Lwt_condition.broadcast st.quota_updated () + end ; + if not (Queue.is_empty st.readys_low) then begin + let tmp = Queue.create () in + Queue.iter + (fun ((conn : connection), _ as msg) -> + if conn.quota > 0 then + Queue.push msg st.readys_high + else + Queue.push msg tmp) + st.readys_low ; + Queue.clear st.readys_low ; + Queue.transfer tmp st.readys_low ; + end + + let shutdown st = + Canceler.cancel st.canceler >>= fun () -> + st.worker + +end + +type error += Connection_closed + +module ReadScheduler = Scheduler(struct + let name = "io_scheduler(read)" + type in_param = Lwt_unix.file_descr * int + let pop (fd, maxlen) = + Lwt.catch + (fun () -> + let buf = MBytes.create maxlen in + Lwt_bytes.read fd buf 0 maxlen >>= fun len -> + if len = 0 then + fail Connection_closed + else + return (MBytes.sub buf 0 len) ) + (function + | Unix.Unix_error(Unix.ECONNRESET, _, _) -> + fail Connection_closed + | exn -> + Lwt.return (error_exn exn)) + type out_param = MBytes.t tzresult Lwt_pipe.t + let push p msg = + Lwt.catch + (fun () -> Lwt_pipe.push p (Ok msg) >>= return) + (fun exn -> fail (Exn exn)) + let close p err = + Lwt.catch + (fun () -> Lwt_pipe.push p (Error err)) + (fun _ -> Lwt.return_unit) + end) + +module WriteScheduler = Scheduler(struct + let name = "io_scheduler(write)" + type in_param = MBytes.t Lwt_pipe.t + let pop p = + Lwt.catch + (fun () -> Lwt_pipe.pop p >>= return) + (fun _ -> fail (Exn Lwt_pipe.Closed)) + type out_param = Lwt_unix.file_descr + let push fd buf = + Lwt.catch + (fun () -> + Lwt_utils.write_mbytes fd buf >>= return) + (function + | Unix.Unix_error(Unix.EPIPE, _, _) + | Lwt.Canceled + | End_of_file -> + fail Connection_closed + | exn -> + Lwt.return (error_exn exn)) + let close _p _err = Lwt.return_unit + end) + +type connection = { + id: int ; + sched: t ; + conn: Lwt_unix.file_descr ; + canceler: Canceler.t ; + read_conn: ReadScheduler.connection ; + read_queue: MBytes.t tzresult Lwt_pipe.t ; + write_conn: WriteScheduler.connection ; + write_queue: MBytes.t Lwt_pipe.t ; + mutable partial_read: MBytes.t option ; +} + +and t = { + mutable closed: bool ; + connected: connection Inttbl.t ; + read_scheduler: ReadScheduler.t ; + write_scheduler: WriteScheduler.t ; + max_upload_speed: int option ; (* bytes per second. *) + max_download_speed: int option ; + read_buffer_size: int ; + read_queue_size: int option ; + write_queue_size: int option ; +} + +let reset_quota st = + let { Moving_average.average = current_inflow } = + Moving_average.stat st.read_scheduler.counter + and { Moving_average.average = current_outflow } = + Moving_average.stat st.write_scheduler.counter in + let nb_conn = Inttbl.length st.connected in + if nb_conn > 0 then begin + let fair_read_quota = current_inflow / nb_conn + and fair_write_quota = current_outflow / nb_conn in + Inttbl.iter + (fun _id conn -> + conn.read_conn.last_quota <- fair_read_quota ; + conn.read_conn.quota <- + (min conn.read_conn.quota 0) + fair_read_quota ; + conn.write_conn.last_quota <- fair_write_quota ; + conn.write_conn.quota <- + (min conn.write_conn.quota 0) + fair_write_quota ; ) + st.connected + end ; + ReadScheduler.update_quota st.read_scheduler ; + WriteScheduler.update_quota st.write_scheduler + +let create + ?max_upload_speed ?max_download_speed + ?read_queue_size ?write_queue_size + ~read_buffer_size + () = + let st = { + closed = false ; + connected = Inttbl.create 53 ; + read_scheduler = ReadScheduler.create max_download_speed ; + write_scheduler = WriteScheduler.create max_upload_speed ; + max_upload_speed ; + max_download_speed ; + read_buffer_size ; + read_queue_size ; + write_queue_size ; + } in + Moving_average.on_update (fun () -> reset_quota st) ; + st + +exception Closed + +let register = + let cpt = ref 0 in + fun st conn -> + if st.closed then begin + Lwt.async (fun () -> Lwt_utils.safe_close conn) ; + raise Closed + end else begin + let id = incr cpt; !cpt in + let canceler = Canceler.create () in + let read_queue = Lwt_pipe.create ?size:st.read_queue_size () + and write_queue = Lwt_pipe.create ?size:st.write_queue_size () in + let read_conn = + ReadScheduler.create_connection + st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id + and write_conn = + WriteScheduler.create_connection + st.write_scheduler write_queue conn canceler id in + Canceler.on_cancel canceler begin fun () -> + Inttbl.remove st.connected id ; + Moving_average.destroy read_conn.counter ; + Moving_average.destroy write_conn.counter ; + Lwt_pipe.close write_queue ; + Lwt_pipe.close read_queue ; + Lwt_utils.safe_close conn + end ; + let conn = { + sched = st ; id ; conn ; canceler ; + read_queue ; read_conn ; + write_queue ; write_conn ; + partial_read = None ; + } in + Inttbl.add st.connected id conn ; + conn + end + +let write { write_queue } msg = + Lwt.catch + (fun () -> Lwt_pipe.push write_queue msg >>= return) + (fun _ -> fail Connection_closed) +let write_now { write_queue } msg = Lwt_pipe.push_now write_queue msg + +let read_from conn ?pos ?len buf msg = + let maxlen = MBytes.length buf in + let pos = unopt 0 pos in + assert (0 <= pos && pos < maxlen) ; + let len = unopt (maxlen - pos) len in + assert (len <= maxlen - pos) ; + match msg with + | Ok msg -> + let msg_len = MBytes.length msg in + let read_len = min len msg_len in + MBytes.blit msg 0 buf pos read_len ; + if read_len < msg_len then + conn.partial_read <- + Some (MBytes.sub msg read_len (msg_len - read_len)) ; + Ok read_len + | Error _ -> + Error [Connection_closed] + +let read_now conn ?pos ?len buf = + match conn.partial_read with + | Some msg -> + conn.partial_read <- None ; + Some (read_from conn ?pos ?len buf (Ok msg)) + | None -> + try + map_option + (read_from conn ?pos ?len buf) + (Lwt_pipe.pop_now conn.read_queue) + with Lwt_pipe.Closed -> Some (Error [Connection_closed]) + +let read conn ?pos ?len buf = + match conn.partial_read with + | Some msg -> + conn.partial_read <- None ; + Lwt.return (read_from conn ?pos ?len buf (Ok msg)) + | None -> + Lwt.catch + (fun () -> + Lwt_pipe.pop conn.read_queue >|= fun msg -> + read_from conn ?pos ?len buf msg) + (fun _ -> fail Connection_closed) + +let read_full conn ?pos ?len buf = + let maxlen = MBytes.length buf in + let pos = unopt 0 pos in + let len = unopt (maxlen - pos) len in + assert (0 <= pos && pos < maxlen) ; + assert (len <= maxlen - pos) ; + let rec loop pos len = + if len = 0 then + return () + else + read conn ~pos ~len buf >>=? fun read_len -> + loop (pos + read_len) (len - read_len) in + loop pos len + +let convert ~ws ~rs = + { Stat.total_sent = ws.Moving_average.total ; + total_recv = rs.Moving_average.total ; + current_outflow = ws.average ; + current_inflow = rs.average ; + } + +let global_stat { read_scheduler ; write_scheduler } = + let rs = Moving_average.stat read_scheduler.counter + and ws = Moving_average.stat write_scheduler.counter in + convert ~rs ~ws + +let stat { read_conn ; write_conn} = + let rs = Moving_average.stat read_conn.counter + and ws = Moving_average.stat write_conn.counter in + convert ~rs ~ws + +let close conn = + Inttbl.remove conn.sched.connected conn.id ; + Lwt_pipe.close conn.write_queue ; + Canceler.cancelation conn.canceler >>= fun () -> + conn.write_conn.current_push >>= fun res -> + Lwt.return res + +let iter_connection { connected } f = + Inttbl.iter f connected + +let shutdown st = + st.closed <- true ; + ReadScheduler.shutdown st.read_scheduler >>= fun () -> + WriteScheduler.shutdown st.write_scheduler >>= fun () -> + Inttbl.fold + (fun _gid conn acc -> close conn >>= fun _ -> acc) + st.connected + Lwt.return_unit diff --git a/src/node/net/p2p_io_scheduler.mli b/src/node/net/p2p_io_scheduler.mli new file mode 100644 index 000000000..9e5d20139 --- /dev/null +++ b/src/node/net/p2p_io_scheduler.mli @@ -0,0 +1,93 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +(** IO Scheduling. This module implements generic IO scheduling + between file descriptors. In order to use IO scheduling, the + [register] function must be used to make a file descriptor managed + by a [scheduler].. It will return a value of type [connection] + that must be used to perform IO on the managed file descriptor + using this module's dedicated IO functions (read, write, etc.). + + Each connection is allowed a read (resp. write) quota, which is + for now fairly distributed among connections. + + To each connection is associated a read (resp. write) queue where + data is copied to (resp. read from), at a rate of + max_download_speed / num_connections (resp. max_upload_speed / + num_connections). +*) + +open P2p_types + +type connection +(** Type of a connection. *) + +type t +(** Type of an IO scheduler. *) + +val create: + ?max_upload_speed:int -> + ?max_download_speed:int -> + ?read_queue_size:int -> + ?write_queue_size:int -> + read_buffer_size:int -> + unit -> t +(** [create ~max_upload_speed ~max_download_speed ~read_queue_size + ~write_queue_size ()] is an IO scheduler with specified (global) + max upload (resp. download) speed, and specified read + (resp. write) queue sizes for connections. *) + +val register: t -> Lwt_unix.file_descr -> connection +(** [register sched fd] is a [connection] managed by [sched]. *) + +type error += Connection_closed + +val write: connection -> MBytes.t -> unit tzresult Lwt.t +(** [write conn msg] returns [Ok ()] when [msg] has been added to + [conn]'s write queue, or fail with an error. *) + +val write_now: connection -> MBytes.t -> bool +(** [write_now conn msg] is [true] iff [msg] has been (immediately) + added to [conn]'s write queue, [false] if it has been dropped. *) + +val read_now: + connection -> ?pos:int -> ?len:int -> MBytes.t -> int tzresult option +(** [read_now conn ~pos ~len buf] blits at most [len] bytes from + [conn]'s read queue and returns the number of bytes written in + [buf] starting at [pos]. *) + +val read: + connection -> ?pos:int -> ?len:int -> MBytes.t -> int tzresult Lwt.t +(** Like [read_now], but waits till [conn] read queue has at least one + element instead of failing. *) + +val read_full: + connection -> ?pos:int -> ?len:int -> MBytes.t -> unit tzresult Lwt.t +(** Like [read], but blits exactly [len] bytes in [buf]. *) + +val stat: connection -> Stat.t +(** [stat conn] is a snapshot of current bandwidth usage for + [conn]. *) + +val global_stat: t -> Stat.t +(** [global_stat sched] is a snapshot of [sched]'s bandwidth usage + (sum of [stat conn] for each [conn] in [sched]). *) + +val iter_connection: t -> (int -> connection -> unit) -> unit +(** [iter_connection sched f] applies [f] on each connection managed + by [sched]. *) + +val close: connection -> unit tzresult Lwt.t +(** [close conn] cancels [conn] and returns after any pending data has + been sent. *) + +val shutdown: t -> unit Lwt.t +(** [shutdown sched] returns after all connections managed by [sched] + have been closed and [sched]'s inner worker has successfully + canceled. *) diff --git a/src/node/net/p2p_types.ml b/src/node/net/p2p_types.ml new file mode 100644 index 000000000..846ba75a3 --- /dev/null +++ b/src/node/net/p2p_types.ml @@ -0,0 +1,46 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Logging.Net + +module Canceler = Lwt_utils.Canceler + +module Stat = struct + + type t = { + total_sent : int ; + total_recv : int ; + current_inflow : int ; + current_outflow : int ; + } + + let print_size ppf sz = + let ratio n = (float_of_int sz /. float_of_int (1 lsl n)) in + if sz < 1 lsl 10 then + Format.fprintf ppf "%d B" sz + else if sz < 1 lsl 20 then + Format.fprintf ppf "%.2f kiB" (ratio 10) + else + Format.fprintf ppf "%.2f MiB" (ratio 20) + + let pp ppf stat = + Format.fprintf ppf + "sent: %a (%a/s) recv: %a (%a/s)" + print_size stat.total_sent print_size stat.current_outflow + print_size stat.total_recv print_size stat.current_inflow + +end + +module Gid = struct + include Crypto_box.Public_key_hash + let pp = pp_short + module Map = Map.Make (Crypto_box.Public_key_hash) + module Set = Set.Make (Crypto_box.Public_key_hash) + module Table = Hash.Hash_table (Crypto_box.Public_key_hash) +end diff --git a/src/node/net/p2p_types.mli b/src/node/net/p2p_types.mli new file mode 100644 index 000000000..27e596452 --- /dev/null +++ b/src/node/net/p2p_types.mli @@ -0,0 +1,26 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +module Canceler = Lwt_utils.Canceler + + +(** Bandwidth usage statistics *) + +module Stat : sig + + type t = { + total_sent : int ; + total_recv : int ; + current_inflow : int ; + current_outflow : int ; + } + + val pp: Format.formatter -> t -> unit + +end diff --git a/src/tezos-deps.opam b/src/tezos-deps.opam index 0637af7dc..c1914a775 100644 --- a/src/tezos-deps.opam +++ b/src/tezos-deps.opam @@ -26,6 +26,7 @@ depends: [ "lwt" {>= "2.7.0" } "lwt_ssl" "menhir" + "mtime" "ocp-ocamlres" {>= "dev" } "ocplib-endian" "ocplib-json-typed" diff --git a/src/utils/moving_average.ml b/src/utils/moving_average.ml index 00b79977c..eea6a4334 100644 --- a/src/utils/moving_average.ml +++ b/src/utils/moving_average.ml @@ -7,31 +7,80 @@ (* *) (**************************************************************************) -class type ma = object - method add_float : float -> unit - method add_int : int -> unit - method get : float -end +open Lwt.Infix -class virtual base ?(init = 0.) () = object (self) - val mutable acc : float = init - method virtual add_float : float -> unit - method add_int x = self#add_float (float_of_int x) - method get = acc -end +module Inttbl = Hashtbl.Make(struct + type t = int + let equal (x: int) (y: int) = x = y + let hash = Hashtbl.hash + end) -class sma ?init () = object - inherit base ?init () - val mutable i = match init with None -> 0 | _ -> 1 - method add_float x = - acc <- (acc +. (x -. acc) /. (float_of_int @@ succ i)) ; - i <- succ i -end +type t = { + id: int; + alpha: int ; + mutable total: int ; + mutable current: int ; + mutable average: int ; +} -class ema ?init ~alpha () = object - inherit base ?init () - val alpha = alpha - method add_float x = - acc <- alpha *. x +. (1. -. alpha) *. acc -end +let counters = Inttbl.create 51 +let updated = Lwt_condition.create () + +let update_hook = ref [] +let on_update f = update_hook := f :: !update_hook + +let worker_loop () = + let prev = ref @@ Mtime.elapsed () in + let rec inner sleep = + sleep >>= fun () -> + let sleep = Lwt_unix.sleep 1. in + let now = Mtime.elapsed () in + let elapsed = int_of_float (Mtime.(to_ms now -. to_ms !prev)) in + prev := now; + Inttbl.iter + (fun _ c -> + c.average <- + (c.alpha * c.current) / elapsed + (1000 - c.alpha) * c.average / 1000; + c.current <- 0) + counters ; + List.iter (fun f -> f ()) !update_hook ; + Lwt_condition.broadcast updated () ; + inner sleep + in + inner (Lwt_unix.sleep 1.) + +let worker = + lazy begin + Lwt.async begin fun () -> + let (_cancelation, cancel, _on_cancel) = Lwt_utils.canceler () in + Lwt_utils.worker "counter" ~run:worker_loop ~cancel + end + end + +let create = + let cpt = ref 0 in + fun ~init ~alpha -> + Lazy.force worker ; + let id = !cpt in + incr cpt ; + assert (0. < alpha && alpha <= 1.) ; + let alpha = int_of_float (1000. *. alpha) in + let c = { id ; alpha ; total = 0 ; current = 0 ; average = init } in + Inttbl.add counters id c ; + c + +let add c x = + c.total <- c.total + x ; + c.current <- c.current + x + +let destroy c = + Inttbl.remove counters c.id + +type stat = { + total: int ; + average: int ; +} + +let stat ({ total ; average } : t) : stat = + { total ; average } diff --git a/src/utils/moving_average.mli b/src/utils/moving_average.mli index a5768ee51..24acbe95b 100644 --- a/src/utils/moving_average.mli +++ b/src/utils/moving_average.mli @@ -7,28 +7,18 @@ (* *) (**************************************************************************) -(** Moving averages. The formulas are from Wikipedia - [https://en.wikipedia.org/wiki/Moving_average] *) +type t -class type ma = object - method add_float : float -> unit - method add_int : int -> unit - method get : float -end -(** Common class type for objects computing a cumulative moving - average of some flavor. In a cumulative moving average, the data - arrive in an ordered datum stream, and the user would like to get - the average of all of the data up until the current datum - point. The method [add_float] and [add_int] are used to add the - next datum. The method [get] and [get_exn] are used to compute the - moving average up until the current datum point. *) +val create: init:int -> alpha:float -> t +val destroy: t -> unit -class sma : ?init:float -> unit -> ma -(** [sma ?init ()] is an object that computes the Simple Moving - Average of a datum stream. [SMA(n+1) = SMA(n) + (x_(n+1) / SMA(n)) - / (n+1)] *) +val add: t -> int -> unit -class ema : ?init:float -> alpha:float -> unit -> ma -(** [ema ?init ~alpha ()] is an object that computes the Exponential - Moving Average of a datum stream. [EMA(n+1) = alpha * x_(n+1) + - (1 - alpha) * x_n] *) +val on_update: (unit -> unit) -> unit +val updated: unit Lwt_condition.t + +type stat = { + total: int ; + average: int ; +} +val stat: t -> stat diff --git a/test/Makefile b/test/Makefile index 429836541..36e6a356d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,5 +1,9 @@ -TESTS := data-encoding store context state basic basic.sh +TESTS := \ + data-encoding \ + store context state \ + basic basic.sh \ + p2p-io-scheduler all: test @@ -36,6 +40,7 @@ PACKAGES := \ irmin.unix \ lwt \ lwt.unix \ + mtime.os \ ocplib-endian \ ocplib-ocamlres \ ocplib-json-typed.bson \ @@ -66,7 +71,7 @@ ${NODELIB} ${CLIENTLIB}: ${MAKE} -C ../src $@ .PHONY: build-test run-test test -build-test: ${addprefix build-test-,${TESTS}} test-p2p +build-test: ${addprefix build-test-,${TESTS}} run-test: @$(patsubst %,${MAKE} run-test-% ; , ${TESTS}) \ echo && echo "Success" && echo @@ -177,13 +182,35 @@ clean:: ############################################################################ ## p2p test program -TEST_P2P_INTFS = +.PHONY:build-test-io_schduler +build-test-p2p-io-scheduler: test-p2p-io-scheduler +run-test-p2p-io-scheduler: + ./test-p2p-io-scheduler \ + --delay 20 --clients 8 \ + --max-upload-speed $$((1 << 18)) \ + --max-download-speed $$((1 << 20)) -TEST_P2P_IMPLS = \ - test_p2p.ml +TEST_P2P_IO_SCHEDULER_IMPLS = \ + lib/process.ml \ + test_p2p_io_scheduler.ml + +${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}: ${NODELIB} +test-p2p-io-scheduler: ${NODELIB} ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx} + ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ + +clean:: + rm -f test-p2p-io_scheduler + +############################################################################ +## lwt pipe test program + +build-test-lwt-pipe: test-lwt-pipe + +TEST_PIPE_IMPLS = \ + test_lwt_pipe.ml ${TEST_BASIC_IMPLS:.ml=.cmx}: ${NODELIB} -test-p2p: ${NODELIB} ${TEST_P2P_IMPLS:.ml=.cmx} +test-lwt-pipe: ${NODELIB} ${TEST_PIPE_IMPLS:.ml=.cmx} ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^ clean:: diff --git a/test/lib/process.ml b/test/lib/process.ml new file mode 100644 index 000000000..2a60b2bbc --- /dev/null +++ b/test/lib/process.ml @@ -0,0 +1,79 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +include Logging.Make (struct let name = "process" end) + +open Error_monad + +exception Exited of int + +let detach ?(prefix = "") f = + Lwt_io.flush_all () >>= fun () -> + match Lwt_unix.fork () with + | 0 -> + Random.self_init () ; + let template = Format.asprintf "%s$(section): $(message)" prefix in + let logger = + Lwt_log.channel + ~template ~close_mode:`Keep ~channel:Lwt_io.stderr () in + Logging.init (Manual logger) ; + Lwt_main.run begin + lwt_log_notice "PID: %d" (Unix.getpid ()) >>= fun () -> + f () + end ; + exit 0 + | pid -> + Lwt.catch + (fun () -> + Lwt_unix.waitpid [] pid >>= function + | (_,Lwt_unix.WEXITED 0) -> + Lwt.return_unit + | (_,Lwt_unix.WEXITED n) -> + Lwt.fail (Exited n) + | (_,Lwt_unix.WSIGNALED _) + | (_,Lwt_unix.WSTOPPED _) -> + Lwt.fail Exit) + (function + | Lwt.Canceled -> + Unix.kill pid Sys.sigkill ; + Lwt.return_unit + | exn -> Lwt.fail exn) + +let handle_error f = + Lwt.catch + f + (fun exn -> Lwt.return (error_exn exn)) >>= function + | Ok () -> Lwt.return_unit + | Error err -> + lwt_log_error "%a" Error_monad.pp_print_error err >>= fun () -> + exit 1 + +let rec wait processes = + Lwt.catch + (fun () -> + Lwt.nchoose_split processes >>= function + | (_, []) -> lwt_log_notice "All done!" + | (_, processes) -> wait processes) + (function + | Exited n -> + lwt_log_notice "Early error!" >>= fun () -> + List.iter Lwt.cancel processes ; + Lwt.catch + (fun () -> Lwt.join processes) + (fun _ -> Lwt.return_unit) >>= fun () -> + lwt_log_notice "A process finished with error %d !" n >>= fun () -> + Pervasives.exit n + | exn -> + lwt_log_notice "Unexpected error!%a" + Error_monad.pp_exn exn >>= fun () -> + List.iter Lwt.cancel processes ; + Lwt.catch + (fun () -> Lwt.join processes) + (fun _ -> Lwt.return_unit) >>= fun () -> + Pervasives.exit 2) diff --git a/test/lib/process.mli b/test/lib/process.mli new file mode 100644 index 000000000..c1933cc11 --- /dev/null +++ b/test/lib/process.mli @@ -0,0 +1,15 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2017. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Error_monad +exception Exited of int + +val detach: ?prefix:string -> (unit -> unit Lwt.t) -> unit Lwt.t +val handle_error: (unit -> (unit, error list) result Lwt.t) -> unit Lwt.t +val wait: unit Lwt.t list -> unit Lwt.t diff --git a/test/test_p2p.ml b/test/test_p2p.ml deleted file mode 100644 index bf1c44617..000000000 --- a/test/test_p2p.ml +++ /dev/null @@ -1,167 +0,0 @@ -open Lwt.Infix -open P2p - -include Logging.Make (struct let name = "test-p2p" end) - -module Param = struct - - let dump_encoding = Data_encoding.(Variable.list (tup2 string string)) - - type msg = - | Create of string * string - | Update of string * string - | Delete of string - | Dump of (string * string) list - - let encodings = [ - Encoding { tag = 0x10; - encoding = Data_encoding.(tup2 string string); - wrap = (function (k, v) -> Create (k, v)); - unwrap = (function Create (k, v) -> Some (k, v) | _ -> None); - max_length = Some 0x400; - }; - Encoding { tag = 0x11; - encoding = Data_encoding.(tup2 string string); - wrap = (function (k, v) -> Update (k, v)); - unwrap = (function Create (k, v) -> Some (k, v) | _ -> None); - max_length = Some 0x400; - }; - Encoding { tag = 0x12; - encoding = Data_encoding.string; - wrap = (function x -> Delete x); - unwrap = (function Delete x -> Some x | _ -> None); - max_length = Some 0x400; - }; - Encoding { tag = 0x13; - encoding = dump_encoding; - wrap = (function x -> Dump x); - unwrap = (function Dump x -> Some x | _ -> None); - max_length = Some 0x10000; - }; - ] - - type metadata = unit - let initial_metadata = () - let metadata_encoding = Data_encoding.empty - let score () = 0. - - let supported_versions = [ { name = "TEST"; major = 0; minor = 0; } ] -end - -module Net = Make(Param) - -let print_peer_info { Net.gid; addr; port; version = { name; major; minor } } = - Printf.sprintf "%s:%d (%s.%d.%d)" (Ipaddr.to_string addr) port name major minor - -let string_of_gid gid = Format.asprintf "%a" pp_gid gid - -let net_monitor config limits num_nets net = - let my_gid_str = string_of_gid @@ Net.gid net in - let send_msgs_to_neighbours neighbours = - Lwt_list.iter_p begin fun p -> - let { Net.gid } = Net.peer_info net p in - let remote_gid_str = string_of_gid gid in - Net.send net p (Create (my_gid_str, remote_gid_str)) >>= fun _ -> - lwt_log_notice "(%s) Done sending msg to %s" my_gid_str remote_gid_str - end neighbours >>= fun () -> - lwt_log_notice "(%s) Done sending all msgs." my_gid_str - in - let rec inner () = - let neighbours = Net.peers net in - let nb_neighbours = List.length neighbours in - if nb_neighbours < num_nets - 1 then begin - log_notice "(%s) I have %d peers" my_gid_str nb_neighbours; - Lwt_unix.sleep 1. >>= inner end - else begin - log_notice "(%s) I know all my %d peers" my_gid_str nb_neighbours; - Lwt.async (fun () -> send_msgs_to_neighbours neighbours); - let rec recv_peer_msgs acc = - if List.length acc = num_nets - 1 then begin - (* Print total sent/recv *) - let peers = Net.peers net in - ListLabels.iter peers ~f:begin fun p -> - let pi = Net.peer_info net p in - log_info "%a -> %a %d %d %.2f %.2f" pp_gid (Net.gid net) pp_gid pi.gid - pi.total_sent pi.total_recv pi.current_inflow pi.current_outflow; - end; - ListLabels.iter acc ~f:(fun (k, v) -> log_info "%s %s" k v); - Lwt.return_unit - end - else begin - lwt_log_notice "(%s) recv_peers_msgs: Got %d, need %d" - my_gid_str (List.length acc) (num_nets - 1) >>= fun () -> - Net.recv net >>= function - | p, (Create (their_gid, my_gid)) -> - lwt_log_notice "(%s) Got a message from %s" my_gid_str their_gid >>= fun () -> - recv_peer_msgs ((their_gid, my_gid) :: acc) - | _ -> assert false - end - in - recv_peer_msgs [] - end - in inner () - -let range n = - let rec inner acc = function - | -1 -> acc - | n -> inner (n :: acc) (pred n) - in - if n < 0 then invalid_arg "range" - else inner [] (pred n) - -let main () = - let incoming_port = ref @@ Some 11732 in - let discovery_port = ref @@ Some 10732 in - let closed_network = ref false in - - let max_packet_size = ref 1024 in - let peer_answer_timeout = ref 10. in - let blacklist_time = ref 100. in - let num_networks = ref 0 in - - let make_net nb_neighbours n = - let config = { - incoming_port = Utils.map_option !incoming_port ~f:(fun p -> p + n); - discovery_port = !discovery_port; - known_peers = []; - peers_file = ""; - closed_network = !closed_network; - } - in - let limits = { - max_message_size = !max_packet_size; - peer_answer_timeout = !peer_answer_timeout; - expected_connections = nb_neighbours; - min_connections = nb_neighbours; - max_connections = nb_neighbours; - blacklist_time = !blacklist_time; - } - in - Net.bootstrap ~config ~limits >|= fun net -> - config, limits, net - in - let spec = Arg.[ - "-start-port", Int (fun p -> incoming_port := Some p), " Incoming port"; - "-dport", Int (fun p -> discovery_port := Some p), " Discovery port"; - "-closed", Set closed_network, " Closed network mode"; - - "-max-packet-size", Set_int max_packet_size, "int Max size of packets"; - "-peer-answer-timeout", Set_float peer_answer_timeout, "float Number of seconds"; - "-blacklist-time", Set_float blacklist_time, "float Number of seconds"; - "-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs"; - "-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs"; - ] - in - let anon_fun num_peers = num_networks := int_of_string num_peers in - let usage_msg = "Usage: %s .\nArguments are:" in - Arg.parse spec anon_fun usage_msg; - let nets = range !num_networks in - Lwt_list.map_p (make_net (pred !num_networks)) nets >>= fun nets -> - Lwt_list.iter_p (fun (cfg, limits, net) -> net_monitor cfg limits !num_networks net) nets >>= fun () -> - lwt_log_notice "All done!" - -let () = - Sys.catch_break true; - try - Lwt_main.run @@ main () - with _ -> () diff --git a/test/test_p2p_io_scheduler.ml b/test/test_p2p_io_scheduler.ml new file mode 100644 index 000000000..e41fca204 --- /dev/null +++ b/test/test_p2p_io_scheduler.ml @@ -0,0 +1,232 @@ +(**************************************************************************) +(* *) +(* Copyright (c) 2014 - 2016. *) +(* Dynamic Ledger Solutions, Inc. *) +(* *) +(* All rights reserved. No warranty, explicit or implicit, provided. *) +(* *) +(**************************************************************************) + +open Error_monad +open P2p_types +include Logging.Make (struct let name = "test-p2p-io-scheduler" end) + +exception Error of error list + +let rec listen ?port addr = + let tentative_port = + match port with + | None -> 1024 + Random.int 8192 + | Some port -> port in + let uaddr = Ipaddr_unix.V6.to_inet_addr addr in + let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in + Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ; + Lwt.catch begin fun () -> + Lwt_unix.Versioned.bind_2 main_socket + (ADDR_INET (uaddr, tentative_port)) >>= fun () -> + Lwt_unix.listen main_socket 50 ; + Lwt.return (main_socket, tentative_port) + end begin function + | Unix.Unix_error + ((Unix.EADDRINUSE | Unix.EADDRNOTAVAIL), _, _) when port = None -> + listen addr + | exn -> Lwt.fail exn + end + +let accept main_socket = + Lwt_unix.accept main_socket >>= fun (fd, sockaddr) -> + return fd + +let rec accept_n main_socket n = + if n <= 0 then + return [] + else + accept_n main_socket (n-1) >>=? fun acc -> + accept main_socket >>=? fun conn -> + return (conn :: acc) + +let connect addr port = + let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in + let uaddr = + Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in + Lwt_unix.connect fd uaddr >>= fun () -> + return fd + +let simple_msgs = + [| + MBytes.create (1 lsl 6) ; + MBytes.create (1 lsl 7) ; + MBytes.create (1 lsl 8) ; + MBytes.create (1 lsl 9) ; + MBytes.create (1 lsl 10) ; + MBytes.create (1 lsl 11) ; + MBytes.create (1 lsl 12) ; + MBytes.create (1 lsl 13) ; + MBytes.create (1 lsl 14) ; + MBytes.create (1 lsl 15) ; + MBytes.create (1 lsl 16) ; + |] +let nb_simple_msgs = Array.length simple_msgs + +let receive conn = + let buf = MBytes.create (1 lsl 16) in + let rec loop () = + P2p_io_scheduler.read conn buf >>= function + | Ok _ -> loop () + | Error [P2p_io_scheduler.Connection_closed] -> + Lwt.return () + | Error err -> Lwt.fail (Error err) + in + loop () + +let server + ?(display_client_stat = true) + ?max_download_speed ?read_queue_size ~read_buffer_size + main_socket n = + let sched = + P2p_io_scheduler.create + ?max_download_speed + ?read_queue_size + ~read_buffer_size + () in + Moving_average.on_update begin fun () -> + log_notice "Stat: %a" Stat.pp (P2p_io_scheduler.global_stat sched) ; + if display_client_stat then + P2p_io_scheduler.iter_connection sched + (fun id conn -> + log_notice " client(%d) %a" id Stat.pp (P2p_io_scheduler.stat conn)) ; + end ; + (* Accept and read message until the connection is closed. *) + accept_n main_socket n >>=? fun conns -> + let conns = List.map (P2p_io_scheduler.register sched) conns in + Lwt.join (List.map receive conns) >>= fun () -> + iter_p P2p_io_scheduler.close conns >>=? fun () -> + log_notice "OK %a" Stat.pp (P2p_io_scheduler.global_stat sched) ; + return () + +let max_size ?max_upload_speed () = + match max_upload_speed with + | None -> nb_simple_msgs + | Some max_upload_speed -> + let rec loop n = + if n <= 1 then 1 + else if MBytes.length simple_msgs.(n-1) <= max_upload_speed then n + else loop (n - 1) + in + loop nb_simple_msgs + +let rec send conn nb_simple_msgs = + Lwt_main.yield () >>= fun () -> + let msg = simple_msgs.(Random.int nb_simple_msgs) in + P2p_io_scheduler.write conn msg >>=? fun () -> + send conn nb_simple_msgs + +let client ?max_upload_speed ?write_queue_size addr port time n = + let sched = + P2p_io_scheduler.create + ?max_upload_speed ?write_queue_size ~read_buffer_size:(1 lsl 12) () in + connect addr port >>=? fun conn -> + let conn = P2p_io_scheduler.register sched conn in + let nb_simple_msgs = max_size ?max_upload_speed () in + Lwt.pick [ send conn nb_simple_msgs ; + Lwt_unix.sleep time >>= return ] >>=? fun () -> + P2p_io_scheduler.close conn >>=? fun () -> + let stat = P2p_io_scheduler.stat conn in + lwt_log_notice "Client OK %a" Stat.pp stat >>= fun () -> + return () + +let run + ?display_client_stat + ?max_download_speed ?max_upload_speed + ~read_buffer_size ?read_queue_size ?write_queue_size + addr port time n = + Logging.init Stderr ; + listen ?port addr >>= fun (main_socket, port) -> + let server = + Process.detach ~prefix:"server " begin fun () -> + Process.handle_error begin fun () -> + server + ?display_client_stat ?max_download_speed + ~read_buffer_size ?read_queue_size + main_socket n + end + end in + let client n = + let prefix = Printf.sprintf "client(%d) " n in + Process.detach ~prefix begin fun () -> + Lwt_utils.safe_close main_socket >>= fun () -> + Process.handle_error begin fun () -> + client ?max_upload_speed ?write_queue_size addr port time n + end + end in + Process.wait (server :: List.map client Utils.(1 -- n)) + +let () = Random.self_init () + +let addr = ref Ipaddr.V6.localhost +let port = ref None + +let max_download_speed = ref None +let max_upload_speed = ref None + +let read_buffer_size = ref (1 lsl 14) +let read_queue_size = ref (Some 1) +let write_queue_size = ref (Some 1) + +let delay = ref 60. +let clients = ref 8 + +let display_client_stat = ref None + +let spec = + Arg.[ + + "--port", Int (fun p -> port := Some p), " Listening port"; + + "--addr", String (fun p -> addr := Ipaddr.V6.of_string_exn p), + " Listening addr"; + + "--max-download-speed", Int (fun i -> max_download_speed := Some i), + " Max download speed in B/s (default: unbounded)"; + + "--max-upload-speed", Int (fun i -> max_upload_speed := Some i), + " Max upload speed in B/s (default: unbounded)"; + + "--read-buffer-size", Set_int read_buffer_size, + " Size of the read buffers"; + + "--read-queue-size", Int (fun i -> + read_queue_size := if i <= 0 then None else Some i), + " Size of the read queue (0=unbounded)"; + + "--write-queue-size", Int (fun i -> + write_queue_size := if i <= 0 then None else Some i), + " Size of the write queue (0=unbounded)"; + + "--delay", Set_float delay, " Client execution time."; + "--clients", Set_int clients, " Number of concurrent clients."; + + "--hide-clients-stat", Unit (fun () -> display_client_stat := Some false), + " Hide the client bandwidth statistic." ; + + "--display_clients_stat", Unit (fun () -> display_client_stat := Some true), + " Display the client bandwidth statistic." ; + + ] + +let () = + let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in + let usage_msg = "Usage: %s .\nArguments are:" in + Arg.parse spec anon_fun usage_msg + +let () = + Sys.catch_break true ; + Lwt_main.run + (run + ?display_client_stat:!display_client_stat + ?max_download_speed:!max_download_speed + ?max_upload_speed:!max_upload_speed + ~read_buffer_size:!read_buffer_size + ?read_queue_size:!read_queue_size + ?write_queue_size:!write_queue_size + !addr !port !delay !clients)