P2p: more debug traces.

This commit is contained in:
Grégoire Henry 2017-02-13 13:37:57 +01:00
parent 48da8299a6
commit 55e2429758
4 changed files with 87 additions and 21 deletions

View File

@ -9,6 +9,8 @@
include P2p_types include P2p_types
include Logging.Make(struct let name = "p2p" end)
type 'meta meta_config = 'meta P2p_connection_pool.meta_config = { type 'meta meta_config = 'meta P2p_connection_pool.meta_config = {
encoding : 'meta Data_encoding.t; encoding : 'meta Data_encoding.t;
initial : 'meta; initial : 'meta;
@ -195,7 +197,11 @@ module Real = struct
P2p_connection_pool.Gids.get_metadata pool conn P2p_connection_pool.Gids.get_metadata pool conn
let rec recv _net conn = let rec recv _net conn =
P2p_connection_pool.read conn P2p_connection_pool.read conn >>=? fun msg ->
lwt_debug "message read from %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () ->
return msg
let rec recv_any net () = let rec recv_any net () =
let pipes = let pipes =
@ -214,22 +220,48 @@ module Real = struct
| Some conn -> | Some conn ->
P2p_connection_pool.read conn >>= function P2p_connection_pool.read conn >>= function
| Ok msg -> | Ok msg ->
lwt_debug "message read from %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () ->
Lwt.return (conn, msg) Lwt.return (conn, msg)
| Error _ -> | Error _ ->
lwt_debug "error reading message from %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () ->
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
recv_any net () recv_any net ()
let send _net c m = let send _net conn m =
P2p_connection_pool.write c m >>= function P2p_connection_pool.write conn m >>= function
| Ok () -> Lwt.return_unit | Ok () ->
| Error _ -> Lwt.fail End_of_file (* temporary *) lwt_debug "message sent to %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () ->
Lwt.return_unit
| Error _ ->
lwt_debug "error sending message from %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn) >>= fun () ->
Lwt.fail End_of_file (* temporary *)
let try_send _net c v = let try_send _net conn v =
match P2p_connection_pool.write_now c v with match P2p_connection_pool.write_now conn v with
| Ok v -> v | Ok v ->
| Error _ -> false Lwt.ignore_result
(lwt_debug "message trysent to %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn)) ;
v
| Error _ ->
Lwt.ignore_result
(lwt_debug "error trysending message to %a"
Connection_info.pp
(P2p_connection_pool.connection_info conn)) ;
false
let broadcast { pool } msg = P2p_connection_pool.write_all pool msg let broadcast { pool } msg =
P2p_connection_pool.write_all pool msg ;
Lwt.ignore_result (lwt_debug "message broadcasted")
end end

View File

@ -228,6 +228,8 @@ module Reader = struct
Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf -> Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf ->
let size = 6 * (Sys.word_size / 8) + MBytes.length buf in let size = 6 * (Sys.word_size / 8) + MBytes.length buf in
lwt_debug "reading %d bytes from %a"
size Connection_info.pp st.conn.info >>= fun () ->
read_message st buf >>|? fun msg -> read_message st buf >>|? fun msg ->
size, msg size, msg
end >>= function end >>= function
@ -238,6 +240,8 @@ module Reader = struct
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
worker_loop st worker_loop st
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
lwt_debug "connection closed to %a"
Connection_info.pp st.conn.info >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Error _ as err -> | Error _ as err ->
Lwt_pipe.push st.messages err >>= fun () -> Lwt_pipe.push st.messages err >>= fun () ->
@ -290,6 +294,8 @@ module Writer = struct
Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt_pipe.pop st.messages >>= fun (msg, wakener) -> Lwt_pipe.pop st.messages >>= fun (msg, wakener) ->
encode_message st msg >>=? fun buf -> encode_message st msg >>=? fun buf ->
lwt_debug "writing %d bytes to %a"
(MBytes.length buf) Connection_info.pp st.conn.info >>= fun () ->
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res -> Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res ->
iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ;
Lwt.return res Lwt.return res
@ -297,10 +303,12 @@ module Writer = struct
| Ok () -> | Ok () ->
worker_loop st worker_loop st
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
lwt_debug "connection closed to %a"
Connection_info.pp st.conn.info >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Error err -> | Error err ->
lwt_log_error lwt_log_error
"@[<v 2>Error while writing to %a@ %a@]" "@[<v 2>error writing to %a@ %a@]"
Connection_info.pp st.conn.info pp_print_error err >>= fun () -> Connection_info.pp st.conn.info pp_print_error err >>= fun () ->
Canceler.cancel st.canceler >>= fun () -> Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit Lwt.return_unit

View File

@ -478,9 +478,12 @@ let accept pool fd point =
(***************************************************************************) (***************************************************************************)
let read { messages } = let read { messages ; conn } =
Lwt.catch Lwt.catch
(fun () -> Lwt_pipe.pop messages >>= fun ( _, msg) -> return msg) (fun () -> Lwt_pipe.pop messages >>= fun (s, msg) ->
lwt_debug "%d bytes message popped from queue %a\027[0m"
s Connection_info.pp (P2p_connection.info conn) >>= fun () ->
return msg)
(fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed) (fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed)
let is_readable { messages } = let is_readable { messages } =

View File

@ -59,14 +59,14 @@ module Scheduler(IO : IO) = struct
} }
let cancel (conn : connection) err = let cancel (conn : connection) err =
if not conn.closed then begin Lwt_utils.unless conn.closed begin fun () ->
lwt_debug "Connection closed (%d, %s) " conn.id IO.name >>= fun () ->
conn.closed <- true ; conn.closed <- true ;
Lwt.catch Lwt.catch
(fun () -> IO.close conn.out_param err) (fun () -> IO.close conn.out_param err)
(fun _ -> Lwt.return_unit) >>= fun () -> (fun _ -> Lwt.return_unit) >>= fun () ->
Canceler.cancel conn.canceler Canceler.cancel conn.canceler
end else end
Lwt.return_unit
let waiter st conn = let waiter st conn =
assert (Lwt.state conn.current_pop <> Sleep) ; assert (Lwt.state conn.current_pop <> Sleep) ;
@ -90,13 +90,15 @@ module Scheduler(IO : IO) = struct
if is_empty then Lwt_condition.wait st.readys else Lwt.return_unit if is_empty then Lwt_condition.wait st.readys else Lwt.return_unit
let check_quota st = let check_quota st =
if st.max_speed <> None && st.quota < 0 then if st.max_speed <> None && st.quota < 0 then begin
lwt_debug "scheduler.wait_quota(%s)" IO.name >>= fun () ->
Lwt_condition.wait st.quota_updated Lwt_condition.wait st.quota_updated
else end else
Lwt_unix.yield () Lwt_unix.yield ()
let rec worker_loop st = let rec worker_loop st =
check_quota st >>= fun () -> check_quota st >>= fun () ->
lwt_debug "scheduler.wait(%s)" IO.name >>= fun () ->
Lwt.pick [ Lwt.pick [
Canceler.cancelation st.canceler ; Canceler.cancelation st.canceler ;
wait_data st wait_data st
@ -116,10 +118,14 @@ module Scheduler(IO : IO) = struct
| Error ([Connection_closed | | Error ([Connection_closed |
Exn ( Lwt_pipe.Closed | Exn ( Lwt_pipe.Closed |
Unix.Unix_error (EBADF, _, _) )] as err) -> Unix.Unix_error (EBADF, _, _) )] as err) ->
lwt_debug "Connection closed (pop: %d, %s)"
conn.id IO.name >>= fun () ->
cancel conn err >>= fun () -> cancel conn err >>= fun () ->
worker_loop st worker_loop st
| Error err -> | Error err ->
lwt_debug "Error %a" pp_print_error err >>= fun () -> lwt_log_error
"@[Unexpected error in connection (pop: %d, %s):@ %a@]"
conn.id IO.name pp_print_error err >>= fun () ->
cancel conn err >>= fun () -> cancel conn err >>= fun () ->
worker_loop st worker_loop st
| Ok msg -> | Ok msg ->
@ -131,14 +137,19 @@ module Scheduler(IO : IO) = struct
| Error ([Connection_closed | | Error ([Connection_closed |
Exn (Unix.Unix_error (EBADF, _, _) | Exn (Unix.Unix_error (EBADF, _, _) |
Lwt_pipe.Closed)] as err) -> Lwt_pipe.Closed)] as err) ->
lwt_debug "Connection closed (push: %d, %s)"
conn.id IO.name >>= fun () ->
cancel conn err >>= fun () -> cancel conn err >>= fun () ->
return () return ()
| Error err -> | Error err ->
lwt_debug "Error %a" pp_print_error err >>= fun () -> lwt_log_error
"@[Unexpected error in connection (push: %d, %s):@ %a@]"
conn.id IO.name pp_print_error err >>= fun () ->
cancel conn err >>= fun () -> cancel conn err >>= fun () ->
Lwt.return (Error err) Lwt.return (Error err)
end ; end ;
let len = MBytes.length msg in let len = MBytes.length msg in
lwt_debug "Handle: %d (%d, %s)" len conn.id IO.name >>= fun () ->
Moving_average.add st.counter len ; Moving_average.add st.counter len ;
st.quota <- st.quota - len ; st.quota <- st.quota - len ;
Moving_average.add conn.counter len ; Moving_average.add conn.counter len ;
@ -164,6 +175,7 @@ module Scheduler(IO : IO) = struct
st st
let create_connection st in_param out_param canceler id = let create_connection st in_param out_param canceler id =
debug "scheduler(%s).create_connection (%d)" IO.name id ;
let conn = let conn =
{ id ; closed = false ; { id ; closed = false ;
canceler ; canceler ;
@ -177,6 +189,7 @@ module Scheduler(IO : IO) = struct
conn conn
let update_quota st = let update_quota st =
debug "scheduler(%s).update_quota" IO.name ;
iter_option st.max_speed ~f:begin fun quota -> iter_option st.max_speed ~f:begin fun quota ->
st.quota <- (min st.quota 0) + quota ; st.quota <- (min st.quota 0) + quota ;
Lwt_condition.broadcast st.quota_updated () Lwt_condition.broadcast st.quota_updated ()
@ -195,8 +208,12 @@ module Scheduler(IO : IO) = struct
end end
let shutdown st = let shutdown st =
lwt_debug "--> scheduler(%s).shutdown" IO.name >>= fun () ->
Canceler.cancel st.canceler >>= fun () -> Canceler.cancel st.canceler >>= fun () ->
st.worker st.worker >>= fun () ->
lwt_debug "<-- scheduler(%s).shutdown" IO.name >>= fun () ->
Lwt.return_unit
end end
@ -276,6 +293,7 @@ and t = {
} }
let reset_quota st = let reset_quota st =
debug "--> reset quota" ;
let { Moving_average.average = current_inflow } = let { Moving_average.average = current_inflow } =
Moving_average.stat st.read_scheduler.counter Moving_average.stat st.read_scheduler.counter
and { Moving_average.average = current_outflow } = and { Moving_average.average = current_outflow } =
@ -302,6 +320,7 @@ let create
?read_queue_size ?write_queue_size ?read_queue_size ?write_queue_size
~read_buffer_size ~read_buffer_size
() = () =
log_info "--> create" ;
let st = { let st = {
closed = false ; closed = false ;
connected = Inttbl.create 53 ; connected = Inttbl.create 53 ;
@ -361,6 +380,7 @@ let register =
partial_read = None ; partial_read = None ;
} in } in
Inttbl.add st.connected id conn ; Inttbl.add st.connected id conn ;
log_info "--> register (%d)" conn.id ;
conn conn
end end
@ -444,6 +464,7 @@ let stat { read_conn ; write_conn} =
convert ~rs ~ws convert ~rs ~ws
let close ?timeout conn = let close ?timeout conn =
lwt_log_info "--> close (%d)" conn.id >>= fun () ->
Inttbl.remove conn.sched.connected conn.id ; Inttbl.remove conn.sched.connected conn.id ;
Lwt_pipe.close conn.write_queue ; Lwt_pipe.close conn.write_queue ;
begin begin
@ -457,6 +478,7 @@ let close ?timeout conn =
end end
end >>=? fun _ -> end >>=? fun _ ->
conn.write_conn.current_push >>= fun res -> conn.write_conn.current_push >>= fun res ->
lwt_log_info "<-- close (%d)" conn.id >>= fun () ->
Lwt.return res Lwt.return res
let iter_connection { connected } f = let iter_connection { connected } f =
@ -471,4 +493,5 @@ let shutdown ?timeout st =
st.connected st.connected
Lwt.return_unit >>= fun () -> Lwt.return_unit >>= fun () ->
WriteScheduler.shutdown st.write_scheduler >>= fun () -> WriteScheduler.shutdown st.write_scheduler >>= fun () ->
lwt_log_info "<-- shutdown" >>= fun () ->
Lwt.return_unit Lwt.return_unit