From 7e1cc171cc465ac66e0019c4ef67fe4a08ea505d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Mon, 27 Mar 2017 16:40:24 +0200 Subject: [PATCH] Shell: fix unhandled exception in `io_scheduler`. --- src/node/net/p2p_connection.ml | 20 ++++++++++++-------- src/utils/lwt_pipe.ml | 4 ++++ src/utils/lwt_pipe.mli | 4 ++++ 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/node/net/p2p_connection.ml b/src/node/net/p2p_connection.ml index bff35c40e..26ba918db 100644 --- a/src/node/net/p2p_connection.ml +++ b/src/node/net/p2p_connection.ml @@ -236,21 +236,25 @@ module Reader = struct let size = 6 * (Sys.word_size / 8) + MBytes.length buf in lwt_debug "reading %d bytes from %a" size Connection_info.pp st.conn.info >>= fun () -> - read_message st buf >>|? fun msg -> - size, msg + read_message st buf >>=? fun msg -> + match msg with + | None -> + Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> + return false + | Some msg -> + Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> + return true end >>= function - | Ok (_, None) -> - Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> - worker_loop st - | Ok (size, Some msg) -> - Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> + | Ok true -> worker_loop st + | Ok false -> + Lwt.return_unit | Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" Connection_info.pp st.conn.info >>= fun () -> Lwt.return_unit | Error _ as err -> - Lwt_pipe.push st.messages err >>= fun () -> + Lwt_pipe.safe_push_now st.messages err ; Canceler.cancel st.canceler >>= fun () -> Lwt.return_unit diff --git a/src/utils/lwt_pipe.ml b/src/utils/lwt_pipe.ml index 0fede0225..f50caf7a0 100644 --- a/src/utils/lwt_pipe.ml +++ b/src/utils/lwt_pipe.ml @@ -108,6 +108,10 @@ exception Full let push_now_exn q elt = if not (push_now q elt) then raise Full +let safe_push_now q elt = + try push_now_exn q elt + with _ -> () + let rec pop ({ closed ; queue ; empty ; current_size } as q) = if not (Queue.is_empty queue) then let (elt_size, elt) = Queue.pop queue in diff --git a/src/utils/lwt_pipe.mli b/src/utils/lwt_pipe.mli index cbeb995a5..a5d56a151 100644 --- a/src/utils/lwt_pipe.mli +++ b/src/utils/lwt_pipe.mli @@ -45,6 +45,10 @@ val push_now_exn : 'a t -> 'a -> unit (** [push_now q v] adds [v] at the ends of [q] immediately or raise [Full] if [q] is currently full. *) +val safe_push_now : 'a t -> 'a -> unit +(** [safe_push_now q v] may adds [v] at the ends of [q]. It does + nothing if the queue is fulled or closed. *) + val pop_now : 'a t -> 'a option (** [pop_now q] maybe removes and returns the first element in [q] if [q] contains at least one element. *)