diff --git a/src/node/net/p2p.ml b/src/node/net/p2p.ml index 2993d7dfe..49451ca57 100644 --- a/src/node/net/p2p.ml +++ b/src/node/net/p2p.ml @@ -9,6 +9,8 @@ include P2p_types +include Logging.Make(struct let name = "p2p" end) + type 'meta meta_config = 'meta P2p_connection_pool.meta_config = { encoding : 'meta Data_encoding.t; initial : 'meta; @@ -195,7 +197,11 @@ module Real = struct P2p_connection_pool.Gids.get_metadata pool conn let rec recv _net conn = - P2p_connection_pool.read conn + P2p_connection_pool.read conn >>=? fun msg -> + lwt_debug "message read from %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn) >>= fun () -> + return msg let rec recv_any net () = let pipes = @@ -214,22 +220,48 @@ module Real = struct | Some conn -> P2p_connection_pool.read conn >>= function | Ok msg -> + lwt_debug "message read from %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn) >>= fun () -> Lwt.return (conn, msg) | Error _ -> + lwt_debug "error reading message from %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn) >>= fun () -> Lwt_unix.yield () >>= fun () -> recv_any net () - let send _net c m = - P2p_connection_pool.write c m >>= function - | Ok () -> Lwt.return_unit - | Error _ -> Lwt.fail End_of_file (* temporary *) + let send _net conn m = + P2p_connection_pool.write conn m >>= function + | Ok () -> + lwt_debug "message sent to %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn) >>= fun () -> + Lwt.return_unit + | Error _ -> + lwt_debug "error sending message from %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn) >>= fun () -> + Lwt.fail End_of_file (* temporary *) - let try_send _net c v = - match P2p_connection_pool.write_now c v with - | Ok v -> v - | Error _ -> false + let try_send _net conn v = + match P2p_connection_pool.write_now conn v with + | Ok v -> + Lwt.ignore_result + (lwt_debug "message trysent to %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn)) ; + v + | Error _ -> + Lwt.ignore_result + (lwt_debug "error trysending message to %a" + Connection_info.pp + (P2p_connection_pool.connection_info conn)) ; + false - let broadcast { pool } msg = P2p_connection_pool.write_all pool msg + let broadcast { pool } msg = + P2p_connection_pool.write_all pool msg ; + Lwt.ignore_result (lwt_debug "message broadcasted") end diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index 35c521cdf..1fd08adea 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -228,6 +228,8 @@ module Reader = struct Lwt_utils.protect ~canceler:st.canceler begin fun () -> Crypto.read_chunk st.conn.fd st.conn.cryptobox_data >>=? fun buf -> let size = 6 * (Sys.word_size / 8) + MBytes.length buf in + lwt_debug "reading %d bytes from %a" + size Connection_info.pp st.conn.info >>= fun () -> read_message st buf >>|? fun msg -> size, msg end >>= function @@ -238,6 +240,8 @@ module Reader = struct Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> worker_loop st | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> Lwt.return_unit | Error _ as err -> Lwt_pipe.push st.messages err >>= fun () -> @@ -290,6 +294,8 @@ module Writer = struct Lwt_utils.protect ~canceler:st.canceler begin fun () -> Lwt_pipe.pop st.messages >>= fun (msg, wakener) -> encode_message st msg >>=? fun buf -> + lwt_debug "writing %d bytes to %a" + (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res -> iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; Lwt.return res @@ -297,10 +303,12 @@ module Writer = struct | Ok () -> worker_loop st | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> Lwt.return_unit | Error err -> lwt_log_error - "@[Error while writing to %a@ %a@]" + "@[error writing to %a@ %a@]" Connection_info.pp st.conn.info pp_print_error err >>= fun () -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index ef0852f74..68e4727a3 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -478,9 +478,12 @@ let accept pool fd point = (***************************************************************************) -let read { messages } = +let read { messages ; conn } = Lwt.catch - (fun () -> Lwt_pipe.pop messages >>= fun ( _, msg) -> return msg) + (fun () -> Lwt_pipe.pop messages >>= fun (s, msg) -> + lwt_debug "%d bytes message popped from queue %a\027[0m" + s Connection_info.pp (P2p_connection.info conn) >>= fun () -> + return msg) (fun _ (* Closed *) -> fail P2p_io_scheduler.Connection_closed) let is_readable { messages } = diff --git a/src/node/net/p2p_io_scheduler.ml b/src/node/net/p2p_io_scheduler.ml index 216cc4181..25ace0756 100644 --- a/src/node/net/p2p_io_scheduler.ml +++ b/src/node/net/p2p_io_scheduler.ml @@ -59,14 +59,14 @@ module Scheduler(IO : IO) = struct } let cancel (conn : connection) err = - if not conn.closed then begin + Lwt_utils.unless conn.closed begin fun () -> + lwt_debug "Connection closed (%d, %s) " conn.id IO.name >>= fun () -> conn.closed <- true ; Lwt.catch (fun () -> IO.close conn.out_param err) (fun _ -> Lwt.return_unit) >>= fun () -> Canceler.cancel conn.canceler - end else - Lwt.return_unit + end let waiter st conn = assert (Lwt.state conn.current_pop <> Sleep) ; @@ -90,13 +90,15 @@ module Scheduler(IO : IO) = struct if is_empty then Lwt_condition.wait st.readys else Lwt.return_unit let check_quota st = - if st.max_speed <> None && st.quota < 0 then + if st.max_speed <> None && st.quota < 0 then begin + lwt_debug "scheduler.wait_quota(%s)" IO.name >>= fun () -> Lwt_condition.wait st.quota_updated - else + end else Lwt_unix.yield () let rec worker_loop st = check_quota st >>= fun () -> + lwt_debug "scheduler.wait(%s)" IO.name >>= fun () -> Lwt.pick [ Canceler.cancelation st.canceler ; wait_data st @@ -116,10 +118,14 @@ module Scheduler(IO : IO) = struct | Error ([Connection_closed | Exn ( Lwt_pipe.Closed | Unix.Unix_error (EBADF, _, _) )] as err) -> + lwt_debug "Connection closed (pop: %d, %s)" + conn.id IO.name >>= fun () -> cancel conn err >>= fun () -> worker_loop st | Error err -> - lwt_debug "Error %a" pp_print_error err >>= fun () -> + lwt_log_error + "@[Unexpected error in connection (pop: %d, %s):@ %a@]" + conn.id IO.name pp_print_error err >>= fun () -> cancel conn err >>= fun () -> worker_loop st | Ok msg -> @@ -131,14 +137,19 @@ module Scheduler(IO : IO) = struct | Error ([Connection_closed | Exn (Unix.Unix_error (EBADF, _, _) | Lwt_pipe.Closed)] as err) -> + lwt_debug "Connection closed (push: %d, %s)" + conn.id IO.name >>= fun () -> cancel conn err >>= fun () -> return () | Error err -> - lwt_debug "Error %a" pp_print_error err >>= fun () -> + lwt_log_error + "@[Unexpected error in connection (push: %d, %s):@ %a@]" + conn.id IO.name pp_print_error err >>= fun () -> cancel conn err >>= fun () -> Lwt.return (Error err) end ; let len = MBytes.length msg in + lwt_debug "Handle: %d (%d, %s)" len conn.id IO.name >>= fun () -> Moving_average.add st.counter len ; st.quota <- st.quota - len ; Moving_average.add conn.counter len ; @@ -164,6 +175,7 @@ module Scheduler(IO : IO) = struct st let create_connection st in_param out_param canceler id = + debug "scheduler(%s).create_connection (%d)" IO.name id ; let conn = { id ; closed = false ; canceler ; @@ -177,6 +189,7 @@ module Scheduler(IO : IO) = struct conn let update_quota st = + debug "scheduler(%s).update_quota" IO.name ; iter_option st.max_speed ~f:begin fun quota -> st.quota <- (min st.quota 0) + quota ; Lwt_condition.broadcast st.quota_updated () @@ -195,8 +208,12 @@ module Scheduler(IO : IO) = struct end let shutdown st = + lwt_debug "--> scheduler(%s).shutdown" IO.name >>= fun () -> Canceler.cancel st.canceler >>= fun () -> - st.worker + st.worker >>= fun () -> + lwt_debug "<-- scheduler(%s).shutdown" IO.name >>= fun () -> + Lwt.return_unit + end @@ -276,6 +293,7 @@ and t = { } let reset_quota st = + debug "--> reset quota" ; let { Moving_average.average = current_inflow } = Moving_average.stat st.read_scheduler.counter and { Moving_average.average = current_outflow } = @@ -302,6 +320,7 @@ let create ?read_queue_size ?write_queue_size ~read_buffer_size () = + log_info "--> create" ; let st = { closed = false ; connected = Inttbl.create 53 ; @@ -361,6 +380,7 @@ let register = partial_read = None ; } in Inttbl.add st.connected id conn ; + log_info "--> register (%d)" conn.id ; conn end @@ -444,6 +464,7 @@ let stat { read_conn ; write_conn} = convert ~rs ~ws let close ?timeout conn = + lwt_log_info "--> close (%d)" conn.id >>= fun () -> Inttbl.remove conn.sched.connected conn.id ; Lwt_pipe.close conn.write_queue ; begin @@ -457,6 +478,7 @@ let close ?timeout conn = end end >>=? fun _ -> conn.write_conn.current_push >>= fun res -> + lwt_log_info "<-- close (%d)" conn.id >>= fun () -> Lwt.return res let iter_connection { connected } f = @@ -471,4 +493,5 @@ let shutdown ?timeout st = st.connected Lwt.return_unit >>= fun () -> WriteScheduler.shutdown st.write_scheduler >>= fun () -> + lwt_log_info "<-- shutdown" >>= fun () -> Lwt.return_unit