diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 22caf3542..80f87c90d 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -93,7 +93,7 @@ let init_logger ?verbosity (log_config : Node_config_file.log) = let init_node ?sandbox (config : Node_config_file.t) = let patch_context json ctxt = let module Proto = (val Registred_protocol.get_exn genesis.protocol) in - Lwt_utils_unix.protect begin fun () -> + protect begin fun () -> Proto.configure_sandbox ctxt json end >|= function | Error err -> diff --git a/src/lib_error_monad/error_monad.ml b/src/lib_error_monad/error_monad.ml index e5dcf76b3..fd66f785a 100644 --- a/src/lib_error_monad/error_monad.ml +++ b/src/lib_error_monad/error_monad.ml @@ -474,12 +474,6 @@ module Make() = struct else Format.kasprintf (fun msg -> fail (Assert_error (loc, msg))) fmt - - let protect ~on_error t = - t >>= function - | Ok res -> return res - | Error err -> on_error err - end include Make() @@ -501,15 +495,6 @@ let record_trace_exn exn f = record_trace (Exn exn) f let failure fmt = Format.kasprintf (fun str -> Exn (Failure str)) fmt - -let protect ?on_error t = - Lwt.catch t (fun exn -> fail (Exn exn)) >>= function - | Ok res -> return res - | Error err -> - match on_error with - | Some f -> f err - | None -> Lwt.return (Error err) - let pp_exn ppf exn = pp ppf (Exn exn) let () = @@ -527,3 +512,49 @@ let () = | Exn exn -> Some (Printexc.to_string exn) | _ -> None) (fun msg -> Exn (Failure msg)) + +type error += Canceled + +let protect ?on_error ?canceler t = + let cancelation = + match canceler with + | None -> Lwt_utils.never_ending + | Some canceler -> + (Lwt_canceler.cancelation canceler >>= fun () -> + fail Canceled ) in + let res = + Lwt.pick [ cancelation ; + Lwt.catch t (fun exn -> fail (Exn exn)) ] in + res >>= function + | Ok _ -> res + | Error err -> + let canceled = + Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in + let err = if canceled then [Canceled] else err in + match on_error with + | None -> Lwt.return (Error err) + | Some on_error -> + Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn)) + +type error += Timeout + +let () = + register_error_kind + `Temporary + ~id:"utils.Timeout" + ~title:"Timeout" + ~description:"Timeout" + Data_encoding.unit + (function Timeout -> Some () | _ -> None) + (fun () -> Timeout) + +let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f = + let target = f canceler in + Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () -> + if Lwt.state target <> Lwt.Sleep then begin + Lwt.cancel timeout ; + target + end else begin + Lwt_canceler.cancel canceler >>= fun () -> + fail Timeout + end diff --git a/src/lib_error_monad/error_monad.mli b/src/lib_error_monad/error_monad.mli index 4ebc9c960..b344504c3 100644 --- a/src/lib_error_monad/error_monad.mli +++ b/src/lib_error_monad/error_monad.mli @@ -28,10 +28,6 @@ val failwith : ('a, Format.formatter, unit, 'b tzresult Lwt.t) format4 -> 'a -val protect : - ?on_error: (error list -> 'a tzresult Lwt.t) -> - (unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t - val error_exn : exn -> 'a tzresult val record_trace_exn : exn -> 'a tzresult -> 'a tzresult val trace_exn : exn -> 'b tzresult Lwt.t -> 'b tzresult Lwt.t @@ -45,6 +41,18 @@ val failure : ('a, Format.formatter, unit, error) format4 -> 'a type error += Exn of exn type error += Unclassified of string +type error += Canceled + +val protect : + ?on_error:(error list -> 'a tzresult Lwt.t) -> + ?canceler:Lwt_canceler.t -> + (unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t + +type error += Timeout +val with_timeout: + ?canceler:Lwt_canceler.t -> + unit Lwt.t -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t + module Make() : Error_monad_sig.S (**/**) diff --git a/src/lib_error_monad/error_monad_sig.ml b/src/lib_error_monad/error_monad_sig.ml index fff53379e..420fa03c8 100644 --- a/src/lib_error_monad/error_monad_sig.ml +++ b/src/lib_error_monad/error_monad_sig.ml @@ -109,10 +109,6 @@ module type S = sig bool -> string -> ('a, Format.formatter, unit, unit tzresult Lwt.t) format4 -> 'a - val protect : - on_error: (error list -> 'a tzresult Lwt.t) -> - 'a tzresult Lwt.t -> 'a tzresult Lwt.t - (** {2 In-monad list iterators} ********************************************) (** A {!List.iter} in the monad *) diff --git a/src/lib_p2p/p2p_io_scheduler.ml b/src/lib_p2p/p2p_io_scheduler.ml index 41d0d248f..eb316110e 100644 --- a/src/lib_p2p/p2p_io_scheduler.ml +++ b/src/lib_p2p/p2p_io_scheduler.ml @@ -120,7 +120,7 @@ module Scheduler(IO : IO) = struct false, (Queue.pop st.readys_low) in match msg with - | Error [ Lwt_utils_unix.Canceled ] -> + | Error [ Canceled ] -> worker_loop st | Error ([Connection_closed | Exn ( Lwt_pipe.Closed | @@ -140,7 +140,7 @@ module Scheduler(IO : IO) = struct conn.current_push <- begin IO.push conn.out_param msg >>= function | Ok () - | Error [ Lwt_utils_unix.Canceled ] -> + | Error [ Canceled ] -> return () | Error ([Connection_closed | Exn (Unix.Unix_error (EBADF, _, _) | @@ -481,10 +481,10 @@ let close ?timeout conn = | None -> return (Lwt_canceler.cancelation conn.canceler) | Some timeout -> - Lwt_utils_unix.with_timeout - ~canceler:conn.canceler timeout begin fun canceler -> - return (Lwt_canceler.cancelation canceler) - end + with_timeout + ~canceler:conn.canceler + (Lwt_unix.sleep timeout) + (fun canceler -> return (Lwt_canceler.cancelation canceler)) end >>=? fun _ -> conn.write_conn.current_push >>= fun res -> lwt_log_info "<-- close (%d)" conn.id >>= fun () -> diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index 95946b54c..bf56abe70 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -119,7 +119,7 @@ and too_few_connections st n_connected = discover the local network and then wait *) Option.iter ~f:P2p_discovery.restart st.disco ; P2p_pool.broadcast_bootstrap_msg pool ; - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Lwt.pick [ P2p_pool.Pool_event.wait_new_peer pool ; Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? @@ -146,7 +146,7 @@ and too_many_connections st n_connected = let rec worker_loop st = let Pool pool = st.pool in begin - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Lwt.pick [ Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_condition.wait st.please_maintain ; (* when asked *) @@ -165,7 +165,7 @@ let rec worker_loop st = end end >>= function | Ok () -> worker_loop st - | Error [ Lwt_utils_unix.Canceled ] -> Lwt.return_unit + | Error [ Canceled ] -> Lwt.return_unit | Error _ -> Lwt.return_unit let run ~connection_timeout bounds pool disco = diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 9a880324e..13bca7333 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -87,7 +87,7 @@ module Answerer = struct let rec worker_loop st = Lwt_unix.yield () >>= fun () -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> P2p_socket.read st.conn end >>= function | Ok (_, Bootstrap) -> begin @@ -122,7 +122,7 @@ module Answerer = struct (* TODO: Penalize peer... *) Lwt_canceler.cancel st.canceler >>= fun () -> Lwt.return_unit - | Error [Lwt_utils_unix.Canceled] -> + | Error [ Canceled ] -> Lwt.return_unit | Error err -> lwt_log_error "@[Answerer unexpected error:@ %a@]" @@ -568,7 +568,7 @@ let rec connect ~timeout pool point = (active_connections pool <= pool.config.max_connections) Too_many_connections >>=? fun () -> let canceler = Lwt_canceler.create () in - Lwt_utils_unix.with_timeout ~canceler timeout begin fun canceler -> + with_timeout ~canceler (Lwt_unix.sleep timeout) begin fun canceler -> let point_info = register_point pool pool.config.identity.peer_id point in let addr, port as point = P2p_point_state.Info.point point_info in @@ -581,7 +581,7 @@ let rec connect ~timeout pool point = let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () -> - Lwt_utils_unix.protect ~canceler begin fun () -> + protect ~canceler begin fun () -> log pool (Outgoing_connection point) ; Lwt_unix.connect fd uaddr >>= fun () -> return () @@ -603,7 +603,7 @@ and authenticate pool ?point_info canceler fd point = lwt_debug "authenticate: %a%s" P2p_point.Id.pp point (if incoming then " incoming" else "") >>= fun () -> - Lwt_utils_unix.protect ~canceler begin fun () -> + protect ~canceler begin fun () -> P2p_socket.authenticate ~proof_of_work_target:pool.config.proof_of_work_target ~incoming (P2p_io_scheduler.register pool.io_sched fd) point @@ -612,7 +612,7 @@ and authenticate pool ?point_info canceler fd point = end ~on_error: begin fun err -> (* TODO do something when the error is Not_enough_proof_of_work ?? *) begin match err with - | [ Lwt_utils_unix.Canceled ] -> + | [ Canceled ] -> (* Currently only on time out *) lwt_debug "authenticate: %a%s -> canceled" P2p_point.Id.pp point @@ -682,7 +682,7 @@ and authenticate pool ?point_info canceler fd point = lwt_debug "authenticate: %a -> accept %a" P2p_point.Id.pp point P2p_connection.Info.pp info >>= fun () -> - Lwt_utils_unix.protect ~canceler begin fun () -> + protect ~canceler begin fun () -> P2p_socket.accept ?incoming_message_queue_size:pool.config.incoming_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size @@ -881,7 +881,7 @@ and swap pool conn current_peer_id new_point = pool.latest_accepted_swap <- pool.latest_succesfull_swap ; log pool (Swap_failure { source = source_peer_id }) ; match err with - | [ Lwt_utils_unix.Timeout ] -> + | [ Timeout ] -> lwt_debug "Swap to %a was interupted: %a" P2p_point.Id.pp new_point pp_print_error err | _ -> @@ -898,8 +898,8 @@ let accept pool fd point = let canceler = Lwt_canceler.create () in P2p_point.Table.add pool.incoming point canceler ; Lwt.async begin fun () -> - Lwt_utils_unix.with_timeout - ~canceler pool.config.authentification_timeout + with_timeout + ~canceler (Lwt_unix.sleep pool.config.authentification_timeout) (fun canceler -> authenticate pool canceler fd point) end diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index f6fa85bd3..bde328077 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -249,7 +249,7 @@ module Reader = struct lwt_debug "[read_message] incremental decoding error" >>= fun () -> return None | Await decode_next_buf -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Crypto.read_chunk st.conn.fd st.conn.cryptobox_data end >>=? fun buf -> lwt_debug @@ -265,12 +265,12 @@ module Reader = struct read_message st init_mbytes >>=? fun msg -> match msg with | None -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> return None end | Some (msg, size, rem_mbytes) -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> return (Some rem_mbytes) end @@ -280,7 +280,7 @@ module Reader = struct | Ok None -> Lwt_canceler.cancel st.canceler >>= fun () -> Lwt.return_unit - | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" P2p_connection.Info.pp st.conn.info >>= fun () -> Lwt.return_unit @@ -331,7 +331,7 @@ module Writer = struct let rec loop = function | [] -> return () | buf :: l -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf end >>=? fun () -> lwt_debug "writing %d bytes to %a" @@ -345,10 +345,10 @@ module Writer = struct let rec worker_loop st = Lwt_unix.yield () >>= fun () -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Lwt_pipe.pop st.messages >>= return end >>= function - | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Canceled | Exn Lwt_pipe.Closed] -> lwt_debug "connection closed to %a" P2p_connection.Info.pp st.conn.info >>= fun () -> Lwt.return_unit @@ -370,7 +370,7 @@ module Writer = struct Lwt.wakeup_later u (Error [P2p_io_scheduler.Connection_closed])) ; match err with - | [ Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed ] -> + | [ Canceled | Exn Lwt_pipe.Closed ] -> lwt_debug "connection closed to %a" P2p_connection.Info.pp st.conn.info >>= fun () -> Lwt.return_unit @@ -453,7 +453,7 @@ let info { conn } = conn.info let accept ?incoming_message_queue_size ?outgoing_message_queue_size ?binary_chunks_size (fd, info, cryptobox_data) encoding = - Lwt_utils_unix.protect begin fun () -> + protect begin fun () -> Ack.write fd cryptobox_data Ack >>=? fun () -> Ack.read fd cryptobox_data end ~on_error:begin fun err -> diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index a0b541591..213b7c9e6 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -21,7 +21,7 @@ type t = { let rec worker_loop st = let Pool pool = st.pool in Lwt_unix.yield () >>= fun () -> - Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> + protect ~canceler:st.canceler begin fun () -> Lwt_unix.accept st.socket >>= return end >>= function | Ok (fd, addr) -> @@ -32,7 +32,7 @@ let rec worker_loop st = (Ipaddr_unix.V6.of_inet_addr_exn addr, port) in P2p_pool.accept pool fd point ; worker_loop st - | Error [Lwt_utils_unix.Canceled] -> + | Error [ Canceled ] -> Lwt.return_unit | Error err -> lwt_log_error "@[Unexpected error in the Welcome worker@ %a@]" diff --git a/src/lib_p2p/test/test_p2p_pool.ml b/src/lib_p2p/test/test_p2p_pool.ml index b9ff90c0d..382ce61da 100644 --- a/src/lib_p2p/test/test_p2p_pool.ml +++ b/src/lib_p2p/test/test_p2p_pool.ml @@ -122,8 +122,8 @@ module Simple = struct | Error ([ P2p_pool.Connection_refused | P2p_pool.Pending_connection | P2p_socket.Rejected - | Lwt_utils_unix.Canceled - | Lwt_utils_unix.Timeout + | Canceled + | Timeout | P2p_pool.Rejected _ as err ]) -> lwt_log_info "Connection to %a failed (%a)" P2p_point.Id.pp point @@ -134,9 +134,9 @@ module Simple = struct Format.fprintf ppf "pending connection" | P2p_socket.Rejected -> Format.fprintf ppf "rejected" - | Lwt_utils_unix.Canceled -> + | Canceled -> Format.fprintf ppf "canceled" - | Lwt_utils_unix.Timeout -> + | Timeout -> Format.fprintf ppf "timeout" | P2p_pool.Rejected peer -> Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index 268b1f5ad..f7086c575 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -241,7 +241,7 @@ let on_request net_state header.shell.predecessor >>=? fun pred -> get_proto pred hash >>=? fun proto -> (* TODO also protect with [Worker.canceler w]. *) - Lwt_utils_unix.protect ?canceler begin fun () -> + protect ?canceler begin fun () -> apply_block (Distributed_db.net_state net_db) pred proto hash header operations @@ -264,7 +264,7 @@ let on_request end (* TODO catch other temporary error (e.g. system errors) and do not 'commit' them on disk... *) - | Error [Lwt_utils_unix.Canceled | Unavailable_protocol _] as err -> + | Error [Canceled | Unavailable_protocol _] as err -> return err | Error errors -> Worker.protect w begin fun () -> diff --git a/src/lib_shell/bootstrap_pipeline.ml b/src/lib_shell/bootstrap_pipeline.ml index 215a2bad0..891ff8672 100644 --- a/src/lib_shell/bootstrap_pipeline.ml +++ b/src/lib_shell/bootstrap_pipeline.ml @@ -55,7 +55,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) = lwt_debug "fetching block header %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Distributed_db.Block_header.fetch ~timeout:pipeline.block_header_timeout pipeline.net_db ~peer:pipeline.peer_id @@ -69,7 +69,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) = fetch_loop [] step.block step.step >>=? fun headers -> iter_s begin fun header -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.push pipeline.fetched_headers header >>= return end end @@ -87,7 +87,7 @@ let headers_fetch_worker_loop pipeline = P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> Lwt_pipe.close pipeline.fetched_headers ; Lwt.return_unit - | Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] -> Lwt.return_unit | Error [ Distributed_db.Block_header.Timeout bh ] -> lwt_log_info "request for header %a from peer %a timed out." @@ -105,7 +105,7 @@ let headers_fetch_worker_loop pipeline = let rec operations_fetch_worker_loop pipeline = begin Lwt_unix.yield () >>= fun () -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.pop pipeline.fetched_headers >>= return end >>=? fun (hash, header) -> lwt_log_info "fetching operations of block %a from peer %a." @@ -113,7 +113,7 @@ let rec operations_fetch_worker_loop pipeline = P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> map_p (fun i -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Distributed_db.Operations.fetch ~timeout:pipeline.block_operations_timeout pipeline.net_db ~peer:pipeline.peer_id @@ -123,14 +123,14 @@ let rec operations_fetch_worker_loop pipeline = lwt_log_info "fetched operations of block %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.push pipeline.fetched_blocks (hash, header, operations) >>= return end end >>= function | Ok () -> operations_fetch_worker_loop pipeline - | Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] -> Lwt_pipe.close pipeline.fetched_blocks ; Lwt.return_unit | Error [ Distributed_db.Operations.Timeout (bh, n) ] -> @@ -149,13 +149,13 @@ let rec operations_fetch_worker_loop pipeline = let rec validation_worker_loop pipeline = begin Lwt_unix.yield () >>= fun () -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Lwt_pipe.pop pipeline.fetched_blocks >>= return end >>=? fun (hash, header, operations) -> lwt_log_info "requesting validation for block %a from peer %a." Block_hash.pp_short hash P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> - Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> + protect ~canceler:pipeline.canceler begin fun () -> Block_validator.validate ~canceler:pipeline.canceler ~notify_new_block:pipeline.notify_new_block @@ -168,7 +168,7 @@ let rec validation_worker_loop pipeline = return () end >>= function | Ok () -> validation_worker_loop pipeline - | Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] -> Lwt.return_unit | Error ([ Block_validator_errors.Invalid_block _ | Block_validator_errors.Unavailable_protocol _ ] as err ) -> diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index d406e00d4..d312cca04 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -629,7 +629,7 @@ module P2p_reader = struct end let rec worker_loop global_db state = - Lwt_utils_unix.protect ~canceler:state.canceler begin fun () -> + protect ~canceler:state.canceler begin fun () -> P2p.recv global_db.p2p state.conn end >>= function | Ok msg -> diff --git a/src/lib_shell/protocol_validator.ml b/src/lib_shell/protocol_validator.ml index 9fbb78cd7..2bddd9181 100644 --- a/src/lib_shell/protocol_validator.ml +++ b/src/lib_shell/protocol_validator.ml @@ -78,7 +78,7 @@ let () = let rec worker_loop bv = begin - Lwt_utils_unix.protect ~canceler:bv.canceler begin fun () -> + protect ~canceler:bv.canceler begin fun () -> Lwt_pipe.pop bv.messages >>= return end >>=? function Message (request, wakener) -> match request with @@ -115,7 +115,7 @@ let rec worker_loop bv = end >>= function | Ok () -> worker_loop bv - | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> + | Error [Canceled | Exn Lwt_pipe.Closed] -> lwt_log_notice "terminating" >>= fun () -> Lwt.return_unit | Error err -> diff --git a/src/lib_shell/worker.ml b/src/lib_shell/worker.ml index 85f176565..73fb2bf87 100644 --- a/src/lib_shell/worker.ml +++ b/src/lib_shell/worker.ml @@ -203,9 +203,6 @@ module Make let trigger_shutdown w = Lwt.ignore_result (Lwt_canceler.cancel w.canceler) - let protect { canceler } ?on_error f = - Lwt_utils_unix.protect ?on_error ~canceler f - let canceler { canceler } = canceler let log_event w evt = @@ -272,7 +269,7 @@ module Make Lwt.return_unit in let rec loop () = begin - Lwt_utils_unix.protect ~canceler:w.canceler begin fun () -> + protect ~canceler:w.canceler begin fun () -> pop w end >>=? function | None -> Handlers.on_no_request w @@ -303,7 +300,7 @@ module Make end >>= function | Ok () -> loop () - | Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] -> + | Error [Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] -> Logger.lwt_log_info "[%a] worker terminated" Name.pp w.name >>= fun () -> @@ -413,4 +410,7 @@ module Make (fun n w acc -> (n, w) :: acc) instances [] + let protect { canceler } ?on_error f = + protect ?on_error ~canceler f + end diff --git a/src/lib_stdlib_lwt/lwt_lock_file.ml b/src/lib_stdlib_lwt/lwt_lock_file.ml index 7f86b50b7..df0999d39 100644 --- a/src/lib_stdlib_lwt/lwt_lock_file.ml +++ b/src/lib_stdlib_lwt/lwt_lock_file.ml @@ -34,7 +34,7 @@ let blocking_create create_inner Unix.F_LOCK ~close_on_exec ~unlink_on_exit fn in match timeout with | None -> create () - | Some duration -> Lwt_utils_unix.with_timeout duration (fun _ -> create ()) + | Some duration -> with_timeout (Lwt_unix.sleep duration) (fun _ -> create ()) let is_locked fn = if not @@ Sys.file_exists fn then return false else @@ -50,7 +50,7 @@ let is_locked fn = let get_pid fn = let open Lwt_io in - Lwt_utils_unix.protect begin fun () -> + protect begin fun () -> with_file ~mode:Input fn begin fun ic -> read ic >>= fun content -> return (int_of_string content) diff --git a/src/lib_stdlib_lwt/lwt_utils_unix.ml b/src/lib_stdlib_lwt/lwt_utils_unix.ml index 4cb7fc468..235c7f222 100644 --- a/src/lib_stdlib_lwt/lwt_utils_unix.ml +++ b/src/lib_stdlib_lwt/lwt_utils_unix.ml @@ -116,53 +116,3 @@ let getaddrinfo ~passive ~node ~service = (fun { ai_addr ; _ } -> of_sockaddr ai_addr) addr in Lwt.return points - -open Error_monad - -type error += Canceled - -let protect ?on_error ?canceler t = - let cancelation = - match canceler with - | None -> Lwt_utils.never_ending - | Some canceler -> - (Lwt_canceler.cancelation canceler >>= fun () -> - fail Canceled ) in - let res = - Lwt.pick [ cancelation ; - Lwt.catch t (fun exn -> fail (Exn exn)) ] in - res >>= function - | Ok _ -> res - | Error err -> - let canceled = - Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in - let err = if canceled then [Canceled] else err in - match on_error with - | None -> Lwt.return (Error err) - | Some on_error -> - Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn)) - -type error += Timeout - -let () = - register_error_kind - `Temporary - ~id:"utils.Timeout" - ~title:"Timeout" - ~description:"Timeout" - Data_encoding.unit - (function Timeout -> Some () | _ -> None) - (fun () -> Timeout) - -let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f = - let timeout = Lwt_unix.sleep timeout in - let target = f canceler in - Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () -> - Lwt_unix.yield () >>= fun () -> - if Lwt.state target <> Lwt.Sleep then begin - Lwt.cancel timeout ; - target - end else begin - Lwt_canceler.cancel canceler >>= fun () -> - fail Timeout - end diff --git a/src/lib_stdlib_lwt/lwt_utils_unix.mli b/src/lib_stdlib_lwt/lwt_utils_unix.mli index 7fc566739..ec5143e4e 100644 --- a/src/lib_stdlib_lwt/lwt_utils_unix.mli +++ b/src/lib_stdlib_lwt/lwt_utils_unix.mli @@ -28,18 +28,3 @@ val getaddrinfo: passive:bool -> node:string -> service:string -> (Ipaddr.V6.t * int) list Lwt.t - -open Error_monad - -type error += Canceled - -val protect : - ?on_error:(error list -> 'a tzresult Lwt.t) -> - ?canceler:Lwt_canceler.t -> - (unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t - -type error += Timeout -val with_timeout: - ?canceler:Lwt_canceler.t -> - float -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t -