Shell: fix unhandled exception in io_scheduler
.
This commit is contained in:
parent
cf81497e9d
commit
7e1cc171cc
@ -236,21 +236,25 @@ module Reader = struct
|
|||||||
let size = 6 * (Sys.word_size / 8) + MBytes.length buf in
|
let size = 6 * (Sys.word_size / 8) + MBytes.length buf in
|
||||||
lwt_debug "reading %d bytes from %a"
|
lwt_debug "reading %d bytes from %a"
|
||||||
size Connection_info.pp st.conn.info >>= fun () ->
|
size Connection_info.pp st.conn.info >>= fun () ->
|
||||||
read_message st buf >>|? fun msg ->
|
read_message st buf >>=? fun msg ->
|
||||||
size, msg
|
match msg with
|
||||||
end >>= function
|
| None ->
|
||||||
| Ok (_, None) ->
|
|
||||||
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
|
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
|
||||||
worker_loop st
|
return false
|
||||||
| Ok (size, Some msg) ->
|
| Some msg ->
|
||||||
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
|
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
|
||||||
|
return true
|
||||||
|
end >>= function
|
||||||
|
| Ok true ->
|
||||||
worker_loop st
|
worker_loop st
|
||||||
|
| Ok false ->
|
||||||
|
Lwt.return_unit
|
||||||
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
|
||||||
lwt_debug "connection closed to %a"
|
lwt_debug "connection closed to %a"
|
||||||
Connection_info.pp st.conn.info >>= fun () ->
|
Connection_info.pp st.conn.info >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Error _ as err ->
|
| Error _ as err ->
|
||||||
Lwt_pipe.push st.messages err >>= fun () ->
|
Lwt_pipe.safe_push_now st.messages err ;
|
||||||
Canceler.cancel st.canceler >>= fun () ->
|
Canceler.cancel st.canceler >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
|
@ -108,6 +108,10 @@ exception Full
|
|||||||
let push_now_exn q elt =
|
let push_now_exn q elt =
|
||||||
if not (push_now q elt) then raise Full
|
if not (push_now q elt) then raise Full
|
||||||
|
|
||||||
|
let safe_push_now q elt =
|
||||||
|
try push_now_exn q elt
|
||||||
|
with _ -> ()
|
||||||
|
|
||||||
let rec pop ({ closed ; queue ; empty ; current_size } as q) =
|
let rec pop ({ closed ; queue ; empty ; current_size } as q) =
|
||||||
if not (Queue.is_empty queue) then
|
if not (Queue.is_empty queue) then
|
||||||
let (elt_size, elt) = Queue.pop queue in
|
let (elt_size, elt) = Queue.pop queue in
|
||||||
|
@ -45,6 +45,10 @@ val push_now_exn : 'a t -> 'a -> unit
|
|||||||
(** [push_now q v] adds [v] at the ends of [q] immediately or
|
(** [push_now q v] adds [v] at the ends of [q] immediately or
|
||||||
raise [Full] if [q] is currently full. *)
|
raise [Full] if [q] is currently full. *)
|
||||||
|
|
||||||
|
val safe_push_now : 'a t -> 'a -> unit
|
||||||
|
(** [safe_push_now q v] may adds [v] at the ends of [q]. It does
|
||||||
|
nothing if the queue is fulled or closed. *)
|
||||||
|
|
||||||
val pop_now : 'a t -> 'a option
|
val pop_now : 'a t -> 'a option
|
||||||
(** [pop_now q] maybe removes and returns the first element in [q] if
|
(** [pop_now q] maybe removes and returns the first element in [q] if
|
||||||
[q] contains at least one element. *)
|
[q] contains at least one element. *)
|
||||||
|
Loading…
Reference in New Issue
Block a user