From 413bddcd96a2981046f79bf423ae0f6ddc5dac01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 10 Apr 2017 00:10:42 +0200 Subject: [PATCH] Shell/P2p: propagate all errors to `{raw_,}write_sync`. Those functions are only used in the testsuite. --- src/node/net/p2p_connection.ml | 47 ++++++++++++++++++++++++----- src/node/net/p2p_connection_pool.ml | 2 +- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index b4e439f9d..68a059932 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -303,15 +303,8 @@ module Writer = struct let rec worker_loop st = Lwt_unix.yield () >>= fun () -> Lwt_utils.protect ~canceler:st.canceler begin fun () -> - Lwt_pipe.pop st.messages >>= fun (buf, wakener) -> - lwt_debug "writing %d bytes to %a" - (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> - Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res -> - iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; - Lwt.return res + Lwt_pipe.pop st.messages >>= return end >>= function - | Ok () -> - worker_loop st | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" Connection_info.pp st.conn.info >>= fun () -> @@ -322,6 +315,38 @@ module Writer = struct Connection_info.pp st.conn.info pp_print_error err >>= fun () -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit + | Ok (buf, wakener) -> + lwt_debug "writing %d bytes to %a" + (MBytes.length buf) Connection_info.pp st.conn.info >>= fun () -> + Lwt_utils.protect ~canceler:st.canceler begin fun () -> + Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf + end >>= fun res -> + match res with + | Ok () -> + iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ; + worker_loop st + | Error err -> + iter_option wakener + ~f:(fun u -> + Lwt.wakeup_later u + (Error [P2p_io_scheduler.Connection_closed])) ; + match err with + | [ Lwt_utils.Canceled | Exn Lwt_pipe.Closed ] -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> + Lwt.return_unit + | [ P2p_io_scheduler.Connection_closed ] -> + lwt_debug "connection closed to %a" + Connection_info.pp st.conn.info >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit + | err -> + lwt_log_error + "@[error writing to %a@ %a@]" + Connection_info.pp st.conn.info + pp_print_error err >>= fun () -> + Canceler.cancel st.canceler >>= fun () -> + Lwt.return_unit let run ?size conn encoding canceler = let compute_size = function @@ -336,6 +361,11 @@ module Writer = struct } in Canceler.on_cancel st.canceler begin fun () -> Lwt_pipe.close st.messages ; + while not (Lwt_pipe.is_empty st.messages) do + let _, w = Lwt_pipe.pop_now_exn st.messages in + iter_option w + ~f:(fun u -> Lwt.wakeup_later u (Error [Exn Lwt_pipe.Closed])) + done ; Lwt.return_unit end ; st.worker <- @@ -451,3 +481,4 @@ let close ?(wait = false) st = Writer.shutdown st.writer >>= fun () -> P2p_io_scheduler.close st.conn.fd >>= fun _ -> Lwt.return_unit + diff --git a/src/node/net/p2p_connection_pool.ml b/src/node/net/p2p_connection_pool.ml index ad5abb600..22e4564a9 100644 --- a/src/node/net/p2p_connection_pool.ml +++ b/src/node/net/p2p_connection_pool.ml @@ -117,7 +117,7 @@ module Answerer = struct | Ok (size, Message msg) -> st.callback.message size msg >>= fun () -> worker_loop st - | Ok (_, Disconnect)| Error [P2p_io_scheduler.Connection_closed] -> + | Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] -> Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit | Error [P2p_connection.Decoding_error] ->