Shell: Count sent and received bytes in P2p.

This commit is contained in:
Vincent Bernardoff 2016-11-28 23:57:13 +01:00 committed by Grégoire Henry
parent 56a58cc962
commit 158447416b

View File

@ -272,6 +272,10 @@ module Make (P: PARAMS) = struct
try_send : msg -> bool ; try_send : msg -> bool ;
reader : event Lwt_pipe.t ; reader : event Lwt_pipe.t ;
writer : msg Lwt_pipe.t ; writer : msg Lwt_pipe.t ;
total_sent : unit -> int ;
total_received : unit -> int ;
inflow : unit -> float ;
outflow : unit -> float ;
} }
type peer_info = { type peer_info = {
@ -401,10 +405,11 @@ module Make (P: PARAMS) = struct
(* a non exception-based cancelation mechanism *) (* a non exception-based cancelation mechanism *)
let cancelation, cancel, on_cancel = Lwt_utils.canceler () in let cancelation, cancel, on_cancel = Lwt_utils.canceler () in
(* a cancelable encrypted reception *) (* a cancelable encrypted reception *)
let recv ?uncrypt buf = let recv ?received ?uncrypt buf =
Lwt.pick [ recv_msg ?uncrypt socket buf ; Lwt.pick [ recv_msg ?uncrypt socket buf ;
(cancelation () >>= fun () -> Error_monad.fail Canceled) ] (cancelation () >>= fun () -> Error_monad.fail Canceled) ]
>>=? fun (_size, message) -> >>=? fun (size, message) ->
Utils.iter_option received ~f:(fun r -> r := !r + size) ;
return message in return message in
(* First step: send and receive credentials, makes no difference (* First step: send and receive credentials, makes no difference
whether we're trying to connect to a peer or checking an incoming whether we're trying to connect to a peer or checking an incoming
@ -480,12 +485,27 @@ module Make (P: PARAMS) = struct
debug "(%a) connection rejected (bad connection request) from %a:%d" debug "(%a) connection rejected (bad connection request) from %a:%d"
pp_gid my_gid Ipaddr.pp_hum addr port ; pp_gid my_gid Ipaddr.pp_hum addr port ;
cancel () cancel ()
(* Them we can build the net object and launch the worker. *) (* Them we can build the net object and launch the worker. *)
and connected buf local_nonce version gid public_key nonce listening_port = and connected buf local_nonce version gid public_key nonce listening_port =
let feed_ma ?(freq=1.) ma counter =
let rec inner old_received =
Lwt_unix.sleep freq >>= fun () ->
let received = !counter in
ma#add_int (received - old_received);
inner received in
Lwt.async (fun () -> inner !counter)
in
(* net object state *) (* net object state *)
let last = ref (Unix.gettimeofday ()) in let last = ref (Unix.gettimeofday ()) in
let local_nonce = ref local_nonce in let local_nonce = ref local_nonce in
let remote_nonce = ref nonce in let remote_nonce = ref nonce in
let received = ref 0 in
let sent = ref 0 in
let received_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in
let sent_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in
feed_ma received_ema received ;
feed_ma sent_ema sent ;
(* net object callbaks *) (* net object callbaks *)
let last_seen () = !last in let last_seen () = !last in
let get_nonce nonce = let get_nonce nonce =
@ -500,10 +520,15 @@ module Make (P: PARAMS) = struct
let send p = Lwt_pipe.push writer p in let send p = Lwt_pipe.push writer p in
let try_send p = Lwt_pipe.push_now writer p in let try_send p = Lwt_pipe.push_now writer p in
let reader = Lwt_pipe.create 2 in let reader = Lwt_pipe.create 2 in
let total_sent () = !sent in
let total_received () = !received in
let inflow () = received_ema#get in
let outflow () = sent_ema#get in
(* net object construction *) (* net object construction *)
let peer = { gid ; public_key ; point = (addr, port) ; let peer = { gid ; public_key ; point = (addr, port) ;
listening_port ; version ; last_seen ; listening_port ; version ; last_seen ;
disconnect ; send ; try_send ; reader ; writer } in disconnect ; send ; try_send ; reader ; writer ;
total_sent ; total_received ; inflow ; outflow } in
let uncrypt buf = let uncrypt buf =
let nonce = get_nonce local_nonce in let nonce = get_nonce local_nonce in
match Crypto_box.box_open my_secret_key public_key buf nonce with match Crypto_box.box_open my_secret_key public_key buf nonce with
@ -514,7 +539,7 @@ module Make (P: PARAMS) = struct
| Some _ as res -> res in | Some _ as res -> res in
(* The message reception loop. *) (* The message reception loop. *)
let rec receiver () = let rec receiver () =
recv ~uncrypt buf >>= function recv ~received ~uncrypt buf >>= function
| Error err -> | Error err ->
debug "(%a) error receiving: %a" debug "(%a) error receiving: %a"
pp_gid my_gid Error_monad.pp_print_error err ; pp_gid my_gid Error_monad.pp_print_error err ;