Shell/P2p: propagate all errors to {raw_,}write_sync.

Those functions are only used in the testsuite.
This commit is contained in:
Grégoire Henry 2017-04-10 00:10:42 +02:00
parent c187a0b792
commit 413bddcd96
2 changed files with 40 additions and 9 deletions

View File

@ -303,15 +303,8 @@ module Writer = struct
let rec worker_loop st =
Lwt_unix.yield () >>= fun () ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Lwt_pipe.pop st.messages >>= fun (buf, wakener) ->
lwt_debug "writing %d bytes to %a"
(MBytes.length buf) Connection_info.pp st.conn.info >>= fun () ->
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf >>= fun res ->
iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ;
Lwt.return res
Lwt_pipe.pop st.messages >>= return
end >>= function
| Ok () ->
worker_loop st
| Error [Lwt_utils.Canceled | Exn Lwt_pipe.Closed] ->
lwt_debug "connection closed to %a"
Connection_info.pp st.conn.info >>= fun () ->
@ -322,6 +315,38 @@ module Writer = struct
Connection_info.pp st.conn.info pp_print_error err >>= fun () ->
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| Ok (buf, wakener) ->
lwt_debug "writing %d bytes to %a"
(MBytes.length buf) Connection_info.pp st.conn.info >>= fun () ->
Lwt_utils.protect ~canceler:st.canceler begin fun () ->
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf
end >>= fun res ->
match res with
| Ok () ->
iter_option wakener ~f:(fun u -> Lwt.wakeup_later u res) ;
worker_loop st
| Error err ->
iter_option wakener
~f:(fun u ->
Lwt.wakeup_later u
(Error [P2p_io_scheduler.Connection_closed])) ;
match err with
| [ Lwt_utils.Canceled | Exn Lwt_pipe.Closed ] ->
lwt_debug "connection closed to %a"
Connection_info.pp st.conn.info >>= fun () ->
Lwt.return_unit
| [ P2p_io_scheduler.Connection_closed ] ->
lwt_debug "connection closed to %a"
Connection_info.pp st.conn.info >>= fun () ->
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| err ->
lwt_log_error
"@[<v 2>error writing to %a@ %a@]"
Connection_info.pp st.conn.info
pp_print_error err >>= fun () ->
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
let run ?size conn encoding canceler =
let compute_size = function
@ -336,6 +361,11 @@ module Writer = struct
} in
Canceler.on_cancel st.canceler begin fun () ->
Lwt_pipe.close st.messages ;
while not (Lwt_pipe.is_empty st.messages) do
let _, w = Lwt_pipe.pop_now_exn st.messages in
iter_option w
~f:(fun u -> Lwt.wakeup_later u (Error [Exn Lwt_pipe.Closed]))
done ;
Lwt.return_unit
end ;
st.worker <-
@ -451,3 +481,4 @@ let close ?(wait = false) st =
Writer.shutdown st.writer >>= fun () ->
P2p_io_scheduler.close st.conn.fd >>= fun _ ->
Lwt.return_unit

View File

@ -117,7 +117,7 @@ module Answerer = struct
| Ok (size, Message msg) ->
st.callback.message size msg >>= fun () ->
worker_loop st
| Ok (_, Disconnect)| Error [P2p_io_scheduler.Connection_closed] ->
| Ok (_, Disconnect) | Error [P2p_io_scheduler.Connection_closed] ->
Canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit
| Error [P2p_connection.Decoding_error] ->