Shell: implement P2p_io_scheduler

This commit is contained in:
Grégoire Henry 2017-01-14 13:13:59 +01:00
parent 7f091b38b9
commit 2ed8bf2cfa
16 changed files with 1072 additions and 238 deletions

2
.gitignore vendored
View File

@ -39,7 +39,7 @@
/test/test-context
/test/test-basic
/test/test-data-encoding
/test/test-p2p
/test/test-p2p-io-scheduler
/test/LOG
*~

View File

@ -23,6 +23,7 @@ FLG -w -30
FLG -w -40
PKG base64
PKG calendar
PKG cmdliner
PKG cohttp
PKG compiler-libs.optcomp
PKG conduit
@ -33,12 +34,13 @@ PKG ezjsonm
PKG git
PKG irmin
PKG lwt
PKG mtime.os
PKG ocplib-endian
PKG ocplib-json-typed
PKG ocplib-ocamlres
PKG ocplib-resto.directory
PKG result
PKG sodium
PKG ssl
PKG unix
PKG zarith
PKG cmdliner

View File

@ -187,6 +187,7 @@ UTILS_PACKAGES := \
base64 \
calendar \
ezjsonm \
mtime.os \
sodium \
zarith \
$(COVERAGEPKG) \
@ -256,6 +257,8 @@ clean::
NODE_LIB_INTFS := \
\
node/net/p2p_types.mli \
node/net/p2p_io_scheduler.mli \
node/net/p2p.mli \
node/net/RPC_server.mli \
\
@ -286,7 +289,10 @@ NODE_LIB_IMPLS := \
\
compiler/node_compiler_main.ml \
\
node/net/p2p_types.ml \
node/net/p2p_io_scheduler.ml \
node/net/p2p.ml \
\
node/net/RPC_server.ml \
\
node/updater/fitness.ml \
@ -594,10 +600,8 @@ NO_DEPS := \
compiler/embedded_cmis.cmx compiler/embedded_cmis.cmi: OPENED_MODULES=
ifneq ($(MAKECMDGOALS),clean)
ifneq ($(MAKECMDGOALS),build-deps)
include .depend
endif
endif
DEPENDS := $(filter-out $(NO_DEPS), \
$(MINUTILS_LIB_INTFS) $(MINUTILS_LIB_IMPLS) \
$(UTILS_LIB_INTFS) $(UTILS_LIB_IMPLS) \

View File

@ -486,24 +486,12 @@ module Make (P: PARAMS) = struct
(* Them we can build the net object and launch the worker. *)
and connected buf local_nonce version gid public_key nonce listening_port =
let feed_ma ?(freq=1.) ma counter =
let rec inner old_received =
Lwt_unix.sleep freq >>= fun () ->
let received = !counter in
ma#add_int (received - old_received);
inner received in
Lwt.async (fun () -> Lwt.pick [cancelation (); inner !counter])
in
(* net object state *)
let last = ref (Unix.gettimeofday ()) in
let local_nonce = ref local_nonce in
let remote_nonce = ref nonce in
let received = ref 0 in
let sent = ref 0 in
let received_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in
let sent_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in
feed_ma received_ema received ;
feed_ma sent_ema sent ;
(* net object callbaks *)
let last_seen () = !last in
let get_nonce nonce =
@ -520,8 +508,8 @@ module Make (P: PARAMS) = struct
let reader = Lwt_pipe.create ~size:2 () in
let total_sent () = !sent in
let total_recv () = !received in
let current_inflow () = received_ema#get in
let current_outflow () = sent_ema#get in
let current_inflow () = 0. in
let current_outflow () = 0. in
(* net object construction *)
let peer = { gid ; public_key ; point = (addr, port) ;
listening_port ; version ; last_seen ;

View File

@ -0,0 +1,449 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(* TODO decide whether we need to preallocate buffers or not. *)
open P2p_types
include Logging.Make (struct let name = "p2p.io-scheduler" end)
module Inttbl = Hashtbl.Make(struct
type t = int
let equal (x: int) (y: int) = x = y
let hash = Hashtbl.hash
end)
let alpha = 0.2
module type IO = sig
val name: string
type in_param
val pop: in_param -> MBytes.t tzresult Lwt.t
type out_param
val push: out_param -> MBytes.t -> unit tzresult Lwt.t
val close: out_param -> error list -> unit Lwt.t
end
module Scheduler(IO : IO) = struct
type t = {
canceler: Canceler.t ;
mutable worker: unit Lwt.t ;
counter: Moving_average.t ;
max_speed: int option ;
mutable quota: int ;
quota_updated: unit Lwt_condition.t ;
readys: unit Lwt_condition.t ;
readys_high: (connection * MBytes.t tzresult) Queue.t ;
readys_low: (connection * MBytes.t tzresult) Queue.t ;
}
and connection = {
id: int ;
mutable closed: bool ;
canceler: Canceler.t ;
in_param: IO.in_param ;
out_param: IO.out_param ;
mutable current_pop: MBytes.t tzresult Lwt.t ;
mutable current_push: unit tzresult Lwt.t ;
counter: Moving_average.t ;
mutable quota: int ;
mutable last_quota: int ;
}
let cancel (conn : connection) err =
if not conn.closed then begin
conn.closed <- true ;
Lwt.catch
(fun () -> IO.close conn.out_param err)
(fun _ -> Lwt.return_unit) >>= fun () ->
Canceler.cancel conn.canceler
end else
Lwt.return_unit
let waiter st conn =
assert (Lwt.state conn.current_pop <> Sleep) ;
conn.current_pop <- IO.pop conn.in_param ;
Lwt.async begin fun () ->
conn.current_pop >>= fun res ->
conn.current_push >>= fun _ ->
let was_empty =
Queue.is_empty st.readys_high && Queue.is_empty st.readys_low in
if conn.quota > 0 then
Queue.push (conn, res) st.readys_high
else
Queue.push (conn, res) st.readys_low ;
if was_empty then Lwt_condition.broadcast st.readys () ;
Lwt.return_unit
end
let wait_data st =
let is_empty =
Queue.is_empty st.readys_high && Queue.is_empty st.readys_low in
if is_empty then Lwt_condition.wait st.readys else Lwt.return_unit
let check_quota st =
if st.max_speed <> None && st.quota < 0 then
Lwt_condition.wait st.quota_updated
else
Lwt_unix.yield ()
let rec worker_loop st =
check_quota st >>= fun () ->
Lwt.pick [
Canceler.cancelation st.canceler ;
wait_data st
] >>= fun () ->
if Canceler.canceled st.canceler then
Lwt.return_unit
else
let prio, (conn, msg) =
if not (Queue.is_empty st.readys_high) then
true, (Queue.pop st.readys_high)
else
false, (Queue.pop st.readys_low)
in
match msg with
| Error [Lwt_utils.Canceled] ->
worker_loop st
| Error ([Exn (Lwt_pipe.Closed |
Unix.Unix_error (EBADF, _, _))] as err) ->
cancel conn err >>= fun () ->
worker_loop st
| Error err ->
lwt_debug "Error %a" pp_print_error err >>= fun () ->
cancel conn err >>= fun () ->
worker_loop st
| Ok msg ->
conn.current_push <- begin
IO.push conn.out_param msg >>= function
| Ok ()
| Error [Lwt_utils.Canceled] ->
return ()
| Error ([Exn (Unix.Unix_error (EBADF, _, _) |
Lwt_pipe.Closed)] as err) ->
cancel conn err >>= fun () ->
return ()
| Error err ->
lwt_debug "Error %a" pp_print_error err >>= fun () ->
cancel conn err >>= fun () ->
Lwt.return (Error err)
end ;
let len = MBytes.length msg in
Moving_average.add st.counter len ;
st.quota <- st.quota - len ;
Moving_average.add conn.counter len ;
if prio then conn.quota <- conn.quota - len ;
waiter st conn ;
worker_loop st
let create max_speed =
let st = {
canceler = Canceler.create () ;
worker = Lwt.return_unit ;
counter = Moving_average.create ~init:0 ~alpha ;
max_speed ; quota = unopt 0 max_speed ;
quota_updated = Lwt_condition.create () ;
readys = Lwt_condition.create () ;
readys_high = Queue.create () ;
readys_low = Queue.create () ;
} in
st.worker <-
Lwt_utils.worker IO.name
(fun () -> worker_loop st)
(fun () -> Canceler.cancel st.canceler) ;
st
let create_connection st in_param out_param canceler id =
let conn =
{ id ; closed = false ;
canceler ;
in_param ; out_param ;
current_pop = Lwt.fail Not_found (* dummy *) ;
current_push = return () ;
counter = Moving_average.create ~init:0 ~alpha ;
quota = 0 ; last_quota = 0 ;
} in
waiter st conn ;
conn
let update_quota st =
iter_option st.max_speed ~f:begin fun quota ->
st.quota <- (min st.quota 0) + quota ;
Lwt_condition.broadcast st.quota_updated ()
end ;
if not (Queue.is_empty st.readys_low) then begin
let tmp = Queue.create () in
Queue.iter
(fun ((conn : connection), _ as msg) ->
if conn.quota > 0 then
Queue.push msg st.readys_high
else
Queue.push msg tmp)
st.readys_low ;
Queue.clear st.readys_low ;
Queue.transfer tmp st.readys_low ;
end
let shutdown st =
Canceler.cancel st.canceler >>= fun () ->
st.worker
end
type error += Connection_closed
module ReadScheduler = Scheduler(struct
let name = "io_scheduler(read)"
type in_param = Lwt_unix.file_descr * int
let pop (fd, maxlen) =
Lwt.catch
(fun () ->
let buf = MBytes.create maxlen in
Lwt_bytes.read fd buf 0 maxlen >>= fun len ->
if len = 0 then
fail Connection_closed
else
return (MBytes.sub buf 0 len) )
(function
| Unix.Unix_error(Unix.ECONNRESET, _, _) ->
fail Connection_closed
| exn ->
Lwt.return (error_exn exn))
type out_param = MBytes.t tzresult Lwt_pipe.t
let push p msg =
Lwt.catch
(fun () -> Lwt_pipe.push p (Ok msg) >>= return)
(fun exn -> fail (Exn exn))
let close p err =
Lwt.catch
(fun () -> Lwt_pipe.push p (Error err))
(fun _ -> Lwt.return_unit)
end)
module WriteScheduler = Scheduler(struct
let name = "io_scheduler(write)"
type in_param = MBytes.t Lwt_pipe.t
let pop p =
Lwt.catch
(fun () -> Lwt_pipe.pop p >>= return)
(fun _ -> fail (Exn Lwt_pipe.Closed))
type out_param = Lwt_unix.file_descr
let push fd buf =
Lwt.catch
(fun () ->
Lwt_utils.write_mbytes fd buf >>= return)
(function
| Unix.Unix_error(Unix.EPIPE, _, _)
| Lwt.Canceled
| End_of_file ->
fail Connection_closed
| exn ->
Lwt.return (error_exn exn))
let close _p _err = Lwt.return_unit
end)
type connection = {
id: int ;
sched: t ;
conn: Lwt_unix.file_descr ;
canceler: Canceler.t ;
read_conn: ReadScheduler.connection ;
read_queue: MBytes.t tzresult Lwt_pipe.t ;
write_conn: WriteScheduler.connection ;
write_queue: MBytes.t Lwt_pipe.t ;
mutable partial_read: MBytes.t option ;
}
and t = {
mutable closed: bool ;
connected: connection Inttbl.t ;
read_scheduler: ReadScheduler.t ;
write_scheduler: WriteScheduler.t ;
max_upload_speed: int option ; (* bytes per second. *)
max_download_speed: int option ;
read_buffer_size: int ;
read_queue_size: int option ;
write_queue_size: int option ;
}
let reset_quota st =
let { Moving_average.average = current_inflow } =
Moving_average.stat st.read_scheduler.counter
and { Moving_average.average = current_outflow } =
Moving_average.stat st.write_scheduler.counter in
let nb_conn = Inttbl.length st.connected in
if nb_conn > 0 then begin
let fair_read_quota = current_inflow / nb_conn
and fair_write_quota = current_outflow / nb_conn in
Inttbl.iter
(fun _id conn ->
conn.read_conn.last_quota <- fair_read_quota ;
conn.read_conn.quota <-
(min conn.read_conn.quota 0) + fair_read_quota ;
conn.write_conn.last_quota <- fair_write_quota ;
conn.write_conn.quota <-
(min conn.write_conn.quota 0) + fair_write_quota ; )
st.connected
end ;
ReadScheduler.update_quota st.read_scheduler ;
WriteScheduler.update_quota st.write_scheduler
let create
?max_upload_speed ?max_download_speed
?read_queue_size ?write_queue_size
~read_buffer_size
() =
let st = {
closed = false ;
connected = Inttbl.create 53 ;
read_scheduler = ReadScheduler.create max_download_speed ;
write_scheduler = WriteScheduler.create max_upload_speed ;
max_upload_speed ;
max_download_speed ;
read_buffer_size ;
read_queue_size ;
write_queue_size ;
} in
Moving_average.on_update (fun () -> reset_quota st) ;
st
exception Closed
let register =
let cpt = ref 0 in
fun st conn ->
if st.closed then begin
Lwt.async (fun () -> Lwt_utils.safe_close conn) ;
raise Closed
end else begin
let id = incr cpt; !cpt in
let canceler = Canceler.create () in
let read_queue = Lwt_pipe.create ?size:st.read_queue_size ()
and write_queue = Lwt_pipe.create ?size:st.write_queue_size () in
let read_conn =
ReadScheduler.create_connection
st.read_scheduler (conn, st.read_buffer_size) read_queue canceler id
and write_conn =
WriteScheduler.create_connection
st.write_scheduler write_queue conn canceler id in
Canceler.on_cancel canceler begin fun () ->
Inttbl.remove st.connected id ;
Moving_average.destroy read_conn.counter ;
Moving_average.destroy write_conn.counter ;
Lwt_pipe.close write_queue ;
Lwt_pipe.close read_queue ;
Lwt_utils.safe_close conn
end ;
let conn = {
sched = st ; id ; conn ; canceler ;
read_queue ; read_conn ;
write_queue ; write_conn ;
partial_read = None ;
} in
Inttbl.add st.connected id conn ;
conn
end
let write { write_queue } msg =
Lwt.catch
(fun () -> Lwt_pipe.push write_queue msg >>= return)
(fun _ -> fail Connection_closed)
let write_now { write_queue } msg = Lwt_pipe.push_now write_queue msg
let read_from conn ?pos ?len buf msg =
let maxlen = MBytes.length buf in
let pos = unopt 0 pos in
assert (0 <= pos && pos < maxlen) ;
let len = unopt (maxlen - pos) len in
assert (len <= maxlen - pos) ;
match msg with
| Ok msg ->
let msg_len = MBytes.length msg in
let read_len = min len msg_len in
MBytes.blit msg 0 buf pos read_len ;
if read_len < msg_len then
conn.partial_read <-
Some (MBytes.sub msg read_len (msg_len - read_len)) ;
Ok read_len
| Error _ ->
Error [Connection_closed]
let read_now conn ?pos ?len buf =
match conn.partial_read with
| Some msg ->
conn.partial_read <- None ;
Some (read_from conn ?pos ?len buf (Ok msg))
| None ->
try
map_option
(read_from conn ?pos ?len buf)
(Lwt_pipe.pop_now conn.read_queue)
with Lwt_pipe.Closed -> Some (Error [Connection_closed])
let read conn ?pos ?len buf =
match conn.partial_read with
| Some msg ->
conn.partial_read <- None ;
Lwt.return (read_from conn ?pos ?len buf (Ok msg))
| None ->
Lwt.catch
(fun () ->
Lwt_pipe.pop conn.read_queue >|= fun msg ->
read_from conn ?pos ?len buf msg)
(fun _ -> fail Connection_closed)
let read_full conn ?pos ?len buf =
let maxlen = MBytes.length buf in
let pos = unopt 0 pos in
let len = unopt (maxlen - pos) len in
assert (0 <= pos && pos < maxlen) ;
assert (len <= maxlen - pos) ;
let rec loop pos len =
if len = 0 then
return ()
else
read conn ~pos ~len buf >>=? fun read_len ->
loop (pos + read_len) (len - read_len) in
loop pos len
let convert ~ws ~rs =
{ Stat.total_sent = ws.Moving_average.total ;
total_recv = rs.Moving_average.total ;
current_outflow = ws.average ;
current_inflow = rs.average ;
}
let global_stat { read_scheduler ; write_scheduler } =
let rs = Moving_average.stat read_scheduler.counter
and ws = Moving_average.stat write_scheduler.counter in
convert ~rs ~ws
let stat { read_conn ; write_conn} =
let rs = Moving_average.stat read_conn.counter
and ws = Moving_average.stat write_conn.counter in
convert ~rs ~ws
let close conn =
Inttbl.remove conn.sched.connected conn.id ;
Lwt_pipe.close conn.write_queue ;
Canceler.cancelation conn.canceler >>= fun () ->
conn.write_conn.current_push >>= fun res ->
Lwt.return res
let iter_connection { connected } f =
Inttbl.iter f connected
let shutdown st =
st.closed <- true ;
ReadScheduler.shutdown st.read_scheduler >>= fun () ->
WriteScheduler.shutdown st.write_scheduler >>= fun () ->
Inttbl.fold
(fun _gid conn acc -> close conn >>= fun _ -> acc)
st.connected
Lwt.return_unit

View File

@ -0,0 +1,93 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
(** IO Scheduling. This module implements generic IO scheduling
between file descriptors. In order to use IO scheduling, the
[register] function must be used to make a file descriptor managed
by a [scheduler].. It will return a value of type [connection]
that must be used to perform IO on the managed file descriptor
using this module's dedicated IO functions (read, write, etc.).
Each connection is allowed a read (resp. write) quota, which is
for now fairly distributed among connections.
To each connection is associated a read (resp. write) queue where
data is copied to (resp. read from), at a rate of
max_download_speed / num_connections (resp. max_upload_speed /
num_connections).
*)
open P2p_types
type connection
(** Type of a connection. *)
type t
(** Type of an IO scheduler. *)
val create:
?max_upload_speed:int ->
?max_download_speed:int ->
?read_queue_size:int ->
?write_queue_size:int ->
read_buffer_size:int ->
unit -> t
(** [create ~max_upload_speed ~max_download_speed ~read_queue_size
~write_queue_size ()] is an IO scheduler with specified (global)
max upload (resp. download) speed, and specified read
(resp. write) queue sizes for connections. *)
val register: t -> Lwt_unix.file_descr -> connection
(** [register sched fd] is a [connection] managed by [sched]. *)
type error += Connection_closed
val write: connection -> MBytes.t -> unit tzresult Lwt.t
(** [write conn msg] returns [Ok ()] when [msg] has been added to
[conn]'s write queue, or fail with an error. *)
val write_now: connection -> MBytes.t -> bool
(** [write_now conn msg] is [true] iff [msg] has been (immediately)
added to [conn]'s write queue, [false] if it has been dropped. *)
val read_now:
connection -> ?pos:int -> ?len:int -> MBytes.t -> int tzresult option
(** [read_now conn ~pos ~len buf] blits at most [len] bytes from
[conn]'s read queue and returns the number of bytes written in
[buf] starting at [pos]. *)
val read:
connection -> ?pos:int -> ?len:int -> MBytes.t -> int tzresult Lwt.t
(** Like [read_now], but waits till [conn] read queue has at least one
element instead of failing. *)
val read_full:
connection -> ?pos:int -> ?len:int -> MBytes.t -> unit tzresult Lwt.t
(** Like [read], but blits exactly [len] bytes in [buf]. *)
val stat: connection -> Stat.t
(** [stat conn] is a snapshot of current bandwidth usage for
[conn]. *)
val global_stat: t -> Stat.t
(** [global_stat sched] is a snapshot of [sched]'s bandwidth usage
(sum of [stat conn] for each [conn] in [sched]). *)
val iter_connection: t -> (int -> connection -> unit) -> unit
(** [iter_connection sched f] applies [f] on each connection managed
by [sched]. *)
val close: connection -> unit tzresult Lwt.t
(** [close conn] cancels [conn] and returns after any pending data has
been sent. *)
val shutdown: t -> unit Lwt.t
(** [shutdown sched] returns after all connections managed by [sched]
have been closed and [sched]'s inner worker has successfully
canceled. *)

46
src/node/net/p2p_types.ml Normal file
View File

@ -0,0 +1,46 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Logging.Net
module Canceler = Lwt_utils.Canceler
module Stat = struct
type t = {
total_sent : int ;
total_recv : int ;
current_inflow : int ;
current_outflow : int ;
}
let print_size ppf sz =
let ratio n = (float_of_int sz /. float_of_int (1 lsl n)) in
if sz < 1 lsl 10 then
Format.fprintf ppf "%d B" sz
else if sz < 1 lsl 20 then
Format.fprintf ppf "%.2f kiB" (ratio 10)
else
Format.fprintf ppf "%.2f MiB" (ratio 20)
let pp ppf stat =
Format.fprintf ppf
"sent: %a (%a/s) recv: %a (%a/s)"
print_size stat.total_sent print_size stat.current_outflow
print_size stat.total_recv print_size stat.current_inflow
end
module Gid = struct
include Crypto_box.Public_key_hash
let pp = pp_short
module Map = Map.Make (Crypto_box.Public_key_hash)
module Set = Set.Make (Crypto_box.Public_key_hash)
module Table = Hash.Hash_table (Crypto_box.Public_key_hash)
end

View File

@ -0,0 +1,26 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
module Canceler = Lwt_utils.Canceler
(** Bandwidth usage statistics *)
module Stat : sig
type t = {
total_sent : int ;
total_recv : int ;
current_inflow : int ;
current_outflow : int ;
}
val pp: Format.formatter -> t -> unit
end

View File

@ -26,6 +26,7 @@ depends: [
"lwt" {>= "2.7.0" }
"lwt_ssl"
"menhir"
"mtime"
"ocp-ocamlres" {>= "dev" }
"ocplib-endian"
"ocplib-json-typed"

View File

@ -7,31 +7,80 @@
(* *)
(**************************************************************************)
class type ma = object
method add_float : float -> unit
method add_int : int -> unit
method get : float
end
open Lwt.Infix
class virtual base ?(init = 0.) () = object (self)
val mutable acc : float = init
method virtual add_float : float -> unit
method add_int x = self#add_float (float_of_int x)
method get = acc
end
module Inttbl = Hashtbl.Make(struct
type t = int
let equal (x: int) (y: int) = x = y
let hash = Hashtbl.hash
end)
class sma ?init () = object
inherit base ?init ()
val mutable i = match init with None -> 0 | _ -> 1
method add_float x =
acc <- (acc +. (x -. acc) /. (float_of_int @@ succ i)) ;
i <- succ i
end
type t = {
id: int;
alpha: int ;
mutable total: int ;
mutable current: int ;
mutable average: int ;
}
class ema ?init ~alpha () = object
inherit base ?init ()
val alpha = alpha
method add_float x =
acc <- alpha *. x +. (1. -. alpha) *. acc
end
let counters = Inttbl.create 51
let updated = Lwt_condition.create ()
let update_hook = ref []
let on_update f = update_hook := f :: !update_hook
let worker_loop () =
let prev = ref @@ Mtime.elapsed () in
let rec inner sleep =
sleep >>= fun () ->
let sleep = Lwt_unix.sleep 1. in
let now = Mtime.elapsed () in
let elapsed = int_of_float (Mtime.(to_ms now -. to_ms !prev)) in
prev := now;
Inttbl.iter
(fun _ c ->
c.average <-
(c.alpha * c.current) / elapsed + (1000 - c.alpha) * c.average / 1000;
c.current <- 0)
counters ;
List.iter (fun f -> f ()) !update_hook ;
Lwt_condition.broadcast updated () ;
inner sleep
in
inner (Lwt_unix.sleep 1.)
let worker =
lazy begin
Lwt.async begin fun () ->
let (_cancelation, cancel, _on_cancel) = Lwt_utils.canceler () in
Lwt_utils.worker "counter" ~run:worker_loop ~cancel
end
end
let create =
let cpt = ref 0 in
fun ~init ~alpha ->
Lazy.force worker ;
let id = !cpt in
incr cpt ;
assert (0. < alpha && alpha <= 1.) ;
let alpha = int_of_float (1000. *. alpha) in
let c = { id ; alpha ; total = 0 ; current = 0 ; average = init } in
Inttbl.add counters id c ;
c
let add c x =
c.total <- c.total + x ;
c.current <- c.current + x
let destroy c =
Inttbl.remove counters c.id
type stat = {
total: int ;
average: int ;
}
let stat ({ total ; average } : t) : stat =
{ total ; average }

View File

@ -7,28 +7,18 @@
(* *)
(**************************************************************************)
(** Moving averages. The formulas are from Wikipedia
[https://en.wikipedia.org/wiki/Moving_average] *)
type t
class type ma = object
method add_float : float -> unit
method add_int : int -> unit
method get : float
end
(** Common class type for objects computing a cumulative moving
average of some flavor. In a cumulative moving average, the data
arrive in an ordered datum stream, and the user would like to get
the average of all of the data up until the current datum
point. The method [add_float] and [add_int] are used to add the
next datum. The method [get] and [get_exn] are used to compute the
moving average up until the current datum point. *)
val create: init:int -> alpha:float -> t
val destroy: t -> unit
class sma : ?init:float -> unit -> ma
(** [sma ?init ()] is an object that computes the Simple Moving
Average of a datum stream. [SMA(n+1) = SMA(n) + (x_(n+1) / SMA(n))
/ (n+1)] *)
val add: t -> int -> unit
class ema : ?init:float -> alpha:float -> unit -> ma
(** [ema ?init ~alpha ()] is an object that computes the Exponential
Moving Average of a datum stream. [EMA(n+1) = alpha * x_(n+1) +
(1 - alpha) * x_n] *)
val on_update: (unit -> unit) -> unit
val updated: unit Lwt_condition.t
type stat = {
total: int ;
average: int ;
}
val stat: t -> stat

View File

@ -1,5 +1,9 @@
TESTS := data-encoding store context state basic basic.sh
TESTS := \
data-encoding \
store context state \
basic basic.sh \
p2p-io-scheduler
all: test
@ -36,6 +40,7 @@ PACKAGES := \
irmin.unix \
lwt \
lwt.unix \
mtime.os \
ocplib-endian \
ocplib-ocamlres \
ocplib-json-typed.bson \
@ -66,7 +71,7 @@ ${NODELIB} ${CLIENTLIB}:
${MAKE} -C ../src $@
.PHONY: build-test run-test test
build-test: ${addprefix build-test-,${TESTS}} test-p2p
build-test: ${addprefix build-test-,${TESTS}}
run-test:
@$(patsubst %,${MAKE} run-test-% ; , ${TESTS}) \
echo && echo "Success" && echo
@ -177,13 +182,35 @@ clean::
############################################################################
## p2p test program
TEST_P2P_INTFS =
.PHONY:build-test-io_schduler
build-test-p2p-io-scheduler: test-p2p-io-scheduler
run-test-p2p-io-scheduler:
./test-p2p-io-scheduler \
--delay 20 --clients 8 \
--max-upload-speed $$((1 << 18)) \
--max-download-speed $$((1 << 20))
TEST_P2P_IMPLS = \
test_p2p.ml
TEST_P2P_IO_SCHEDULER_IMPLS = \
lib/process.ml \
test_p2p_io_scheduler.ml
${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}: ${NODELIB}
test-p2p-io-scheduler: ${NODELIB} ${TEST_P2P_IO_SCHEDULER_IMPLS:.ml=.cmx}
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
clean::
rm -f test-p2p-io_scheduler
############################################################################
## lwt pipe test program
build-test-lwt-pipe: test-lwt-pipe
TEST_PIPE_IMPLS = \
test_lwt_pipe.ml
${TEST_BASIC_IMPLS:.ml=.cmx}: ${NODELIB}
test-p2p: ${NODELIB} ${TEST_P2P_IMPLS:.ml=.cmx}
test-lwt-pipe: ${NODELIB} ${TEST_PIPE_IMPLS:.ml=.cmx}
ocamlfind ocamlopt -linkall -linkpkg ${OCAMLFLAGS} -o $@ $^
clean::

79
test/lib/process.ml Normal file
View File

@ -0,0 +1,79 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
include Logging.Make (struct let name = "process" end)
open Error_monad
exception Exited of int
let detach ?(prefix = "") f =
Lwt_io.flush_all () >>= fun () ->
match Lwt_unix.fork () with
| 0 ->
Random.self_init () ;
let template = Format.asprintf "%s$(section): $(message)" prefix in
let logger =
Lwt_log.channel
~template ~close_mode:`Keep ~channel:Lwt_io.stderr () in
Logging.init (Manual logger) ;
Lwt_main.run begin
lwt_log_notice "PID: %d" (Unix.getpid ()) >>= fun () ->
f ()
end ;
exit 0
| pid ->
Lwt.catch
(fun () ->
Lwt_unix.waitpid [] pid >>= function
| (_,Lwt_unix.WEXITED 0) ->
Lwt.return_unit
| (_,Lwt_unix.WEXITED n) ->
Lwt.fail (Exited n)
| (_,Lwt_unix.WSIGNALED _)
| (_,Lwt_unix.WSTOPPED _) ->
Lwt.fail Exit)
(function
| Lwt.Canceled ->
Unix.kill pid Sys.sigkill ;
Lwt.return_unit
| exn -> Lwt.fail exn)
let handle_error f =
Lwt.catch
f
(fun exn -> Lwt.return (error_exn exn)) >>= function
| Ok () -> Lwt.return_unit
| Error err ->
lwt_log_error "%a" Error_monad.pp_print_error err >>= fun () ->
exit 1
let rec wait processes =
Lwt.catch
(fun () ->
Lwt.nchoose_split processes >>= function
| (_, []) -> lwt_log_notice "All done!"
| (_, processes) -> wait processes)
(function
| Exited n ->
lwt_log_notice "Early error!" >>= fun () ->
List.iter Lwt.cancel processes ;
Lwt.catch
(fun () -> Lwt.join processes)
(fun _ -> Lwt.return_unit) >>= fun () ->
lwt_log_notice "A process finished with error %d !" n >>= fun () ->
Pervasives.exit n
| exn ->
lwt_log_notice "Unexpected error!%a"
Error_monad.pp_exn exn >>= fun () ->
List.iter Lwt.cancel processes ;
Lwt.catch
(fun () -> Lwt.join processes)
(fun _ -> Lwt.return_unit) >>= fun () ->
Pervasives.exit 2)

15
test/lib/process.mli Normal file
View File

@ -0,0 +1,15 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2017. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Error_monad
exception Exited of int
val detach: ?prefix:string -> (unit -> unit Lwt.t) -> unit Lwt.t
val handle_error: (unit -> (unit, error list) result Lwt.t) -> unit Lwt.t
val wait: unit Lwt.t list -> unit Lwt.t

View File

@ -1,167 +0,0 @@
open Lwt.Infix
open P2p
include Logging.Make (struct let name = "test-p2p" end)
module Param = struct
let dump_encoding = Data_encoding.(Variable.list (tup2 string string))
type msg =
| Create of string * string
| Update of string * string
| Delete of string
| Dump of (string * string) list
let encodings = [
Encoding { tag = 0x10;
encoding = Data_encoding.(tup2 string string);
wrap = (function (k, v) -> Create (k, v));
unwrap = (function Create (k, v) -> Some (k, v) | _ -> None);
max_length = Some 0x400;
};
Encoding { tag = 0x11;
encoding = Data_encoding.(tup2 string string);
wrap = (function (k, v) -> Update (k, v));
unwrap = (function Create (k, v) -> Some (k, v) | _ -> None);
max_length = Some 0x400;
};
Encoding { tag = 0x12;
encoding = Data_encoding.string;
wrap = (function x -> Delete x);
unwrap = (function Delete x -> Some x | _ -> None);
max_length = Some 0x400;
};
Encoding { tag = 0x13;
encoding = dump_encoding;
wrap = (function x -> Dump x);
unwrap = (function Dump x -> Some x | _ -> None);
max_length = Some 0x10000;
};
]
type metadata = unit
let initial_metadata = ()
let metadata_encoding = Data_encoding.empty
let score () = 0.
let supported_versions = [ { name = "TEST"; major = 0; minor = 0; } ]
end
module Net = Make(Param)
let print_peer_info { Net.gid; addr; port; version = { name; major; minor } } =
Printf.sprintf "%s:%d (%s.%d.%d)" (Ipaddr.to_string addr) port name major minor
let string_of_gid gid = Format.asprintf "%a" pp_gid gid
let net_monitor config limits num_nets net =
let my_gid_str = string_of_gid @@ Net.gid net in
let send_msgs_to_neighbours neighbours =
Lwt_list.iter_p begin fun p ->
let { Net.gid } = Net.peer_info net p in
let remote_gid_str = string_of_gid gid in
Net.send net p (Create (my_gid_str, remote_gid_str)) >>= fun _ ->
lwt_log_notice "(%s) Done sending msg to %s" my_gid_str remote_gid_str
end neighbours >>= fun () ->
lwt_log_notice "(%s) Done sending all msgs." my_gid_str
in
let rec inner () =
let neighbours = Net.peers net in
let nb_neighbours = List.length neighbours in
if nb_neighbours < num_nets - 1 then begin
log_notice "(%s) I have %d peers" my_gid_str nb_neighbours;
Lwt_unix.sleep 1. >>= inner end
else begin
log_notice "(%s) I know all my %d peers" my_gid_str nb_neighbours;
Lwt.async (fun () -> send_msgs_to_neighbours neighbours);
let rec recv_peer_msgs acc =
if List.length acc = num_nets - 1 then begin
(* Print total sent/recv *)
let peers = Net.peers net in
ListLabels.iter peers ~f:begin fun p ->
let pi = Net.peer_info net p in
log_info "%a -> %a %d %d %.2f %.2f" pp_gid (Net.gid net) pp_gid pi.gid
pi.total_sent pi.total_recv pi.current_inflow pi.current_outflow;
end;
ListLabels.iter acc ~f:(fun (k, v) -> log_info "%s %s" k v);
Lwt.return_unit
end
else begin
lwt_log_notice "(%s) recv_peers_msgs: Got %d, need %d"
my_gid_str (List.length acc) (num_nets - 1) >>= fun () ->
Net.recv net >>= function
| p, (Create (their_gid, my_gid)) ->
lwt_log_notice "(%s) Got a message from %s" my_gid_str their_gid >>= fun () ->
recv_peer_msgs ((their_gid, my_gid) :: acc)
| _ -> assert false
end
in
recv_peer_msgs []
end
in inner ()
let range n =
let rec inner acc = function
| -1 -> acc
| n -> inner (n :: acc) (pred n)
in
if n < 0 then invalid_arg "range"
else inner [] (pred n)
let main () =
let incoming_port = ref @@ Some 11732 in
let discovery_port = ref @@ Some 10732 in
let closed_network = ref false in
let max_packet_size = ref 1024 in
let peer_answer_timeout = ref 10. in
let blacklist_time = ref 100. in
let num_networks = ref 0 in
let make_net nb_neighbours n =
let config = {
incoming_port = Utils.map_option !incoming_port ~f:(fun p -> p + n);
discovery_port = !discovery_port;
known_peers = [];
peers_file = "";
closed_network = !closed_network;
}
in
let limits = {
max_message_size = !max_packet_size;
peer_answer_timeout = !peer_answer_timeout;
expected_connections = nb_neighbours;
min_connections = nb_neighbours;
max_connections = nb_neighbours;
blacklist_time = !blacklist_time;
}
in
Net.bootstrap ~config ~limits >|= fun net ->
config, limits, net
in
let spec = Arg.[
"-start-port", Int (fun p -> incoming_port := Some p), " Incoming port";
"-dport", Int (fun p -> discovery_port := Some p), " Discovery port";
"-closed", Set closed_network, " Closed network mode";
"-max-packet-size", Set_int max_packet_size, "int Max size of packets";
"-peer-answer-timeout", Set_float peer_answer_timeout, "float Number of seconds";
"-blacklist-time", Set_float blacklist_time, "float Number of seconds";
"-v", Unit (fun () -> Lwt_log_core.(add_rule "*" Info)), " Log up to info msgs";
"-vv", Unit (fun () -> Lwt_log_core.(add_rule "*" Debug)), " Log up to debug msgs";
]
in
let anon_fun num_peers = num_networks := int_of_string num_peers in
let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg;
let nets = range !num_networks in
Lwt_list.map_p (make_net (pred !num_networks)) nets >>= fun nets ->
Lwt_list.iter_p (fun (cfg, limits, net) -> net_monitor cfg limits !num_networks net) nets >>= fun () ->
lwt_log_notice "All done!"
let () =
Sys.catch_break true;
try
Lwt_main.run @@ main ()
with _ -> ()

View File

@ -0,0 +1,232 @@
(**************************************************************************)
(* *)
(* Copyright (c) 2014 - 2016. *)
(* Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* All rights reserved. No warranty, explicit or implicit, provided. *)
(* *)
(**************************************************************************)
open Error_monad
open P2p_types
include Logging.Make (struct let name = "test-p2p-io-scheduler" end)
exception Error of error list
let rec listen ?port addr =
let tentative_port =
match port with
| None -> 1024 + Random.int 8192
| Some port -> port in
let uaddr = Ipaddr_unix.V6.to_inet_addr addr in
let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in
Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ;
Lwt.catch begin fun () ->
Lwt_unix.Versioned.bind_2 main_socket
(ADDR_INET (uaddr, tentative_port)) >>= fun () ->
Lwt_unix.listen main_socket 50 ;
Lwt.return (main_socket, tentative_port)
end begin function
| Unix.Unix_error
((Unix.EADDRINUSE | Unix.EADDRNOTAVAIL), _, _) when port = None ->
listen addr
| exn -> Lwt.fail exn
end
let accept main_socket =
Lwt_unix.accept main_socket >>= fun (fd, sockaddr) ->
return fd
let rec accept_n main_socket n =
if n <= 0 then
return []
else
accept_n main_socket (n-1) >>=? fun acc ->
accept main_socket >>=? fun conn ->
return (conn :: acc)
let connect addr port =
let fd = Lwt_unix.socket PF_INET6 SOCK_STREAM 0 in
let uaddr =
Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
Lwt_unix.connect fd uaddr >>= fun () ->
return fd
let simple_msgs =
[|
MBytes.create (1 lsl 6) ;
MBytes.create (1 lsl 7) ;
MBytes.create (1 lsl 8) ;
MBytes.create (1 lsl 9) ;
MBytes.create (1 lsl 10) ;
MBytes.create (1 lsl 11) ;
MBytes.create (1 lsl 12) ;
MBytes.create (1 lsl 13) ;
MBytes.create (1 lsl 14) ;
MBytes.create (1 lsl 15) ;
MBytes.create (1 lsl 16) ;
|]
let nb_simple_msgs = Array.length simple_msgs
let receive conn =
let buf = MBytes.create (1 lsl 16) in
let rec loop () =
P2p_io_scheduler.read conn buf >>= function
| Ok _ -> loop ()
| Error [P2p_io_scheduler.Connection_closed] ->
Lwt.return ()
| Error err -> Lwt.fail (Error err)
in
loop ()
let server
?(display_client_stat = true)
?max_download_speed ?read_queue_size ~read_buffer_size
main_socket n =
let sched =
P2p_io_scheduler.create
?max_download_speed
?read_queue_size
~read_buffer_size
() in
Moving_average.on_update begin fun () ->
log_notice "Stat: %a" Stat.pp (P2p_io_scheduler.global_stat sched) ;
if display_client_stat then
P2p_io_scheduler.iter_connection sched
(fun id conn ->
log_notice " client(%d) %a" id Stat.pp (P2p_io_scheduler.stat conn)) ;
end ;
(* Accept and read message until the connection is closed. *)
accept_n main_socket n >>=? fun conns ->
let conns = List.map (P2p_io_scheduler.register sched) conns in
Lwt.join (List.map receive conns) >>= fun () ->
iter_p P2p_io_scheduler.close conns >>=? fun () ->
log_notice "OK %a" Stat.pp (P2p_io_scheduler.global_stat sched) ;
return ()
let max_size ?max_upload_speed () =
match max_upload_speed with
| None -> nb_simple_msgs
| Some max_upload_speed ->
let rec loop n =
if n <= 1 then 1
else if MBytes.length simple_msgs.(n-1) <= max_upload_speed then n
else loop (n - 1)
in
loop nb_simple_msgs
let rec send conn nb_simple_msgs =
Lwt_main.yield () >>= fun () ->
let msg = simple_msgs.(Random.int nb_simple_msgs) in
P2p_io_scheduler.write conn msg >>=? fun () ->
send conn nb_simple_msgs
let client ?max_upload_speed ?write_queue_size addr port time n =
let sched =
P2p_io_scheduler.create
?max_upload_speed ?write_queue_size ~read_buffer_size:(1 lsl 12) () in
connect addr port >>=? fun conn ->
let conn = P2p_io_scheduler.register sched conn in
let nb_simple_msgs = max_size ?max_upload_speed () in
Lwt.pick [ send conn nb_simple_msgs ;
Lwt_unix.sleep time >>= return ] >>=? fun () ->
P2p_io_scheduler.close conn >>=? fun () ->
let stat = P2p_io_scheduler.stat conn in
lwt_log_notice "Client OK %a" Stat.pp stat >>= fun () ->
return ()
let run
?display_client_stat
?max_download_speed ?max_upload_speed
~read_buffer_size ?read_queue_size ?write_queue_size
addr port time n =
Logging.init Stderr ;
listen ?port addr >>= fun (main_socket, port) ->
let server =
Process.detach ~prefix:"server " begin fun () ->
Process.handle_error begin fun () ->
server
?display_client_stat ?max_download_speed
~read_buffer_size ?read_queue_size
main_socket n
end
end in
let client n =
let prefix = Printf.sprintf "client(%d) " n in
Process.detach ~prefix begin fun () ->
Lwt_utils.safe_close main_socket >>= fun () ->
Process.handle_error begin fun () ->
client ?max_upload_speed ?write_queue_size addr port time n
end
end in
Process.wait (server :: List.map client Utils.(1 -- n))
let () = Random.self_init ()
let addr = ref Ipaddr.V6.localhost
let port = ref None
let max_download_speed = ref None
let max_upload_speed = ref None
let read_buffer_size = ref (1 lsl 14)
let read_queue_size = ref (Some 1)
let write_queue_size = ref (Some 1)
let delay = ref 60.
let clients = ref 8
let display_client_stat = ref None
let spec =
Arg.[
"--port", Int (fun p -> port := Some p), " Listening port";
"--addr", String (fun p -> addr := Ipaddr.V6.of_string_exn p),
" Listening addr";
"--max-download-speed", Int (fun i -> max_download_speed := Some i),
" Max download speed in B/s (default: unbounded)";
"--max-upload-speed", Int (fun i -> max_upload_speed := Some i),
" Max upload speed in B/s (default: unbounded)";
"--read-buffer-size", Set_int read_buffer_size,
" Size of the read buffers";
"--read-queue-size", Int (fun i ->
read_queue_size := if i <= 0 then None else Some i),
" Size of the read queue (0=unbounded)";
"--write-queue-size", Int (fun i ->
write_queue_size := if i <= 0 then None else Some i),
" Size of the write queue (0=unbounded)";
"--delay", Set_float delay, " Client execution time.";
"--clients", Set_int clients, " Number of concurrent clients.";
"--hide-clients-stat", Unit (fun () -> display_client_stat := Some false),
" Hide the client bandwidth statistic." ;
"--display_clients_stat", Unit (fun () -> display_client_stat := Some true),
" Display the client bandwidth statistic." ;
]
let () =
let anon_fun num_peers = raise (Arg.Bad "No anonymous argument.") in
let usage_msg = "Usage: %s <num_peers>.\nArguments are:" in
Arg.parse spec anon_fun usage_msg
let () =
Sys.catch_break true ;
Lwt_main.run
(run
?display_client_stat:!display_client_stat
?max_download_speed:!max_download_speed
?max_upload_speed:!max_upload_speed
~read_buffer_size:!read_buffer_size
?read_queue_size:!read_queue_size
?write_queue_size:!write_queue_size
!addr !port !delay !clients)