diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 870ff96ba..0f9ac312f 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -272,6 +272,10 @@ module Make (P: PARAMS) = struct try_send : msg -> bool ; reader : event Lwt_pipe.t ; writer : msg Lwt_pipe.t ; + total_sent : unit -> int ; + total_received : unit -> int ; + inflow : unit -> float ; + outflow : unit -> float ; } type peer_info = { @@ -401,10 +405,11 @@ module Make (P: PARAMS) = struct (* a non exception-based cancelation mechanism *) let cancelation, cancel, on_cancel = Lwt_utils.canceler () in (* a cancelable encrypted reception *) - let recv ?uncrypt buf = + let recv ?received ?uncrypt buf = Lwt.pick [ recv_msg ?uncrypt socket buf ; (cancelation () >>= fun () -> Error_monad.fail Canceled) ] - >>=? fun (_size, message) -> + >>=? fun (size, message) -> + Utils.iter_option received ~f:(fun r -> r := !r + size) ; return message in (* First step: send and receive credentials, makes no difference whether we're trying to connect to a peer or checking an incoming @@ -480,12 +485,27 @@ module Make (P: PARAMS) = struct debug "(%a) connection rejected (bad connection request) from %a:%d" pp_gid my_gid Ipaddr.pp_hum addr port ; cancel () + (* Them we can build the net object and launch the worker. *) and connected buf local_nonce version gid public_key nonce listening_port = + let feed_ma ?(freq=1.) ma counter = + let rec inner old_received = + Lwt_unix.sleep freq >>= fun () -> + let received = !counter in + ma#add_int (received - old_received); + inner received in + Lwt.async (fun () -> inner !counter) + in (* net object state *) let last = ref (Unix.gettimeofday ()) in let local_nonce = ref local_nonce in let remote_nonce = ref nonce in + let received = ref 0 in + let sent = ref 0 in + let received_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in + let sent_ema = new Moving_average.ema ~init:0. ~alpha:0.2 () in + feed_ma received_ema received ; + feed_ma sent_ema sent ; (* net object callbaks *) let last_seen () = !last in let get_nonce nonce = @@ -500,10 +520,15 @@ module Make (P: PARAMS) = struct let send p = Lwt_pipe.push writer p in let try_send p = Lwt_pipe.push_now writer p in let reader = Lwt_pipe.create 2 in + let total_sent () = !sent in + let total_received () = !received in + let inflow () = received_ema#get in + let outflow () = sent_ema#get in (* net object construction *) let peer = { gid ; public_key ; point = (addr, port) ; listening_port ; version ; last_seen ; - disconnect ; send ; try_send ; reader ; writer } in + disconnect ; send ; try_send ; reader ; writer ; + total_sent ; total_received ; inflow ; outflow } in let uncrypt buf = let nonce = get_nonce local_nonce in match Crypto_box.box_open my_secret_key public_key buf nonce with @@ -514,7 +539,7 @@ module Make (P: PARAMS) = struct | Some _ as res -> res in (* The message reception loop. *) let rec receiver () = - recv ~uncrypt buf >>= function + recv ~received ~uncrypt buf >>= function | Error err -> debug "(%a) error receiving: %a" pp_gid my_gid Error_monad.pp_print_error err ;