Client refactor: Move Lwt_utils_unix.protect into Error_monad

This commit is contained in:
Grégoire Henry 2018-02-08 10:51:01 +01:00
parent 7dc52dcf10
commit 24c6f4ea98
18 changed files with 115 additions and 145 deletions

View File

@ -93,7 +93,7 @@ let init_logger ?verbosity (log_config : Node_config_file.log) =
let init_node ?sandbox (config : Node_config_file.t) = let init_node ?sandbox (config : Node_config_file.t) =
let patch_context json ctxt = let patch_context json ctxt =
let module Proto = (val Registred_protocol.get_exn genesis.protocol) in let module Proto = (val Registred_protocol.get_exn genesis.protocol) in
Lwt_utils_unix.protect begin fun () -> protect begin fun () ->
Proto.configure_sandbox ctxt json Proto.configure_sandbox ctxt json
end >|= function end >|= function
| Error err -> | Error err ->

View File

@ -474,12 +474,6 @@ module Make() = struct
else else
Format.kasprintf (fun msg -> fail (Assert_error (loc, msg))) fmt Format.kasprintf (fun msg -> fail (Assert_error (loc, msg))) fmt
let protect ~on_error t =
t >>= function
| Ok res -> return res
| Error err -> on_error err
end end
include Make() include Make()
@ -501,15 +495,6 @@ let record_trace_exn exn f = record_trace (Exn exn) f
let failure fmt = let failure fmt =
Format.kasprintf (fun str -> Exn (Failure str)) fmt Format.kasprintf (fun str -> Exn (Failure str)) fmt
let protect ?on_error t =
Lwt.catch t (fun exn -> fail (Exn exn)) >>= function
| Ok res -> return res
| Error err ->
match on_error with
| Some f -> f err
| None -> Lwt.return (Error err)
let pp_exn ppf exn = pp ppf (Exn exn) let pp_exn ppf exn = pp ppf (Exn exn)
let () = let () =
@ -527,3 +512,49 @@ let () =
| Exn exn -> Some (Printexc.to_string exn) | Exn exn -> Some (Printexc.to_string exn)
| _ -> None) | _ -> None)
(fun msg -> Exn (Failure msg)) (fun msg -> Exn (Failure msg))
type error += Canceled
let protect ?on_error ?canceler t =
let cancelation =
match canceler with
| None -> Lwt_utils.never_ending
| Some canceler ->
(Lwt_canceler.cancelation canceler >>= fun () ->
fail Canceled ) in
let res =
Lwt.pick [ cancelation ;
Lwt.catch t (fun exn -> fail (Exn exn)) ] in
res >>= function
| Ok _ -> res
| Error err ->
let canceled =
Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in
let err = if canceled then [Canceled] else err in
match on_error with
| None -> Lwt.return (Error err)
| Some on_error ->
Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn))
type error += Timeout
let () =
register_error_kind
`Temporary
~id:"utils.Timeout"
~title:"Timeout"
~description:"Timeout"
Data_encoding.unit
(function Timeout -> Some () | _ -> None)
(fun () -> Timeout)
let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f =
let target = f canceler in
Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () ->
if Lwt.state target <> Lwt.Sleep then begin
Lwt.cancel timeout ;
target
end else begin
Lwt_canceler.cancel canceler >>= fun () ->
fail Timeout
end

View File

@ -28,10 +28,6 @@ val failwith :
('a, Format.formatter, unit, 'b tzresult Lwt.t) format4 -> ('a, Format.formatter, unit, 'b tzresult Lwt.t) format4 ->
'a 'a
val protect :
?on_error: (error list -> 'a tzresult Lwt.t) ->
(unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
val error_exn : exn -> 'a tzresult val error_exn : exn -> 'a tzresult
val record_trace_exn : exn -> 'a tzresult -> 'a tzresult val record_trace_exn : exn -> 'a tzresult -> 'a tzresult
val trace_exn : exn -> 'b tzresult Lwt.t -> 'b tzresult Lwt.t val trace_exn : exn -> 'b tzresult Lwt.t -> 'b tzresult Lwt.t
@ -45,6 +41,18 @@ val failure : ('a, Format.formatter, unit, error) format4 -> 'a
type error += Exn of exn type error += Exn of exn
type error += Unclassified of string type error += Unclassified of string
type error += Canceled
val protect :
?on_error:(error list -> 'a tzresult Lwt.t) ->
?canceler:Lwt_canceler.t ->
(unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
type error += Timeout
val with_timeout:
?canceler:Lwt_canceler.t ->
unit Lwt.t -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
module Make() : Error_monad_sig.S module Make() : Error_monad_sig.S
(**/**) (**/**)

View File

@ -109,10 +109,6 @@ module type S = sig
bool -> string -> bool -> string ->
('a, Format.formatter, unit, unit tzresult Lwt.t) format4 -> 'a ('a, Format.formatter, unit, unit tzresult Lwt.t) format4 -> 'a
val protect :
on_error: (error list -> 'a tzresult Lwt.t) ->
'a tzresult Lwt.t -> 'a tzresult Lwt.t
(** {2 In-monad list iterators} ********************************************) (** {2 In-monad list iterators} ********************************************)
(** A {!List.iter} in the monad *) (** A {!List.iter} in the monad *)

View File

@ -120,7 +120,7 @@ module Scheduler(IO : IO) = struct
false, (Queue.pop st.readys_low) false, (Queue.pop st.readys_low)
in in
match msg with match msg with
| Error [ Lwt_utils_unix.Canceled ] -> | Error [ Canceled ] ->
worker_loop st worker_loop st
| Error ([Connection_closed | | Error ([Connection_closed |
Exn ( Lwt_pipe.Closed | Exn ( Lwt_pipe.Closed |
@ -140,7 +140,7 @@ module Scheduler(IO : IO) = struct
conn.current_push <- begin conn.current_push <- begin
IO.push conn.out_param msg >>= function IO.push conn.out_param msg >>= function
| Ok () | Ok ()
| Error [ Lwt_utils_unix.Canceled ] -> | Error [ Canceled ] ->
return () return ()
| Error ([Connection_closed | | Error ([Connection_closed |
Exn (Unix.Unix_error (EBADF, _, _) | Exn (Unix.Unix_error (EBADF, _, _) |
@ -481,10 +481,10 @@ let close ?timeout conn =
| None -> | None ->
return (Lwt_canceler.cancelation conn.canceler) return (Lwt_canceler.cancelation conn.canceler)
| Some timeout -> | Some timeout ->
Lwt_utils_unix.with_timeout with_timeout
~canceler:conn.canceler timeout begin fun canceler -> ~canceler:conn.canceler
return (Lwt_canceler.cancelation canceler) (Lwt_unix.sleep timeout)
end (fun canceler -> return (Lwt_canceler.cancelation canceler))
end >>=? fun _ -> end >>=? fun _ ->
conn.write_conn.current_push >>= fun res -> conn.write_conn.current_push >>= fun res ->
lwt_log_info "<-- close (%d)" conn.id >>= fun () -> lwt_log_info "<-- close (%d)" conn.id >>= fun () ->

View File

@ -119,7 +119,7 @@ and too_few_connections st n_connected =
discover the local network and then wait *) discover the local network and then wait *)
Option.iter ~f:P2p_discovery.restart st.disco ; Option.iter ~f:P2p_discovery.restart st.disco ;
P2p_pool.broadcast_bootstrap_msg pool ; P2p_pool.broadcast_bootstrap_msg pool ;
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt.pick [ Lwt.pick [
P2p_pool.Pool_event.wait_new_peer pool ; P2p_pool.Pool_event.wait_new_peer pool ;
Lwt_unix.sleep 5.0 (* TODO exponential back-off ?? Lwt_unix.sleep 5.0 (* TODO exponential back-off ??
@ -146,7 +146,7 @@ and too_many_connections st n_connected =
let rec worker_loop st = let rec worker_loop st =
let Pool pool = st.pool in let Pool pool = st.pool in
begin begin
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt.pick [ Lwt.pick [
Lwt_unix.sleep 120. ; (* every two minutes *) Lwt_unix.sleep 120. ; (* every two minutes *)
Lwt_condition.wait st.please_maintain ; (* when asked *) Lwt_condition.wait st.please_maintain ; (* when asked *)
@ -165,7 +165,7 @@ let rec worker_loop st =
end end
end >>= function end >>= function
| Ok () -> worker_loop st | Ok () -> worker_loop st
| Error [ Lwt_utils_unix.Canceled ] -> Lwt.return_unit | Error [ Canceled ] -> Lwt.return_unit
| Error _ -> Lwt.return_unit | Error _ -> Lwt.return_unit
let run ~connection_timeout bounds pool disco = let run ~connection_timeout bounds pool disco =

View File

@ -87,7 +87,7 @@ module Answerer = struct
let rec worker_loop st = let rec worker_loop st =
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
P2p_socket.read st.conn P2p_socket.read st.conn
end >>= function end >>= function
| Ok (_, Bootstrap) -> begin | Ok (_, Bootstrap) -> begin
@ -122,7 +122,7 @@ module Answerer = struct
(* TODO: Penalize peer... *) (* TODO: Penalize peer... *)
Lwt_canceler.cancel st.canceler >>= fun () -> Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Error [Lwt_utils_unix.Canceled] -> | Error [ Canceled ] ->
Lwt.return_unit Lwt.return_unit
| Error err -> | Error err ->
lwt_log_error "@[Answerer unexpected error:@ %a@]" lwt_log_error "@[Answerer unexpected error:@ %a@]"
@ -568,7 +568,7 @@ let rec connect ~timeout pool point =
(active_connections pool <= pool.config.max_connections) (active_connections pool <= pool.config.max_connections)
Too_many_connections >>=? fun () -> Too_many_connections >>=? fun () ->
let canceler = Lwt_canceler.create () in let canceler = Lwt_canceler.create () in
Lwt_utils_unix.with_timeout ~canceler timeout begin fun canceler -> with_timeout ~canceler (Lwt_unix.sleep timeout) begin fun canceler ->
let point_info = let point_info =
register_point pool pool.config.identity.peer_id point in register_point pool pool.config.identity.peer_id point in
let addr, port as point = P2p_point_state.Info.point point_info in let addr, port as point = P2p_point_state.Info.point point_info in
@ -581,7 +581,7 @@ let rec connect ~timeout pool point =
let uaddr = let uaddr =
Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () -> lwt_debug "connect: %a" P2p_point.Id.pp point >>= fun () ->
Lwt_utils_unix.protect ~canceler begin fun () -> protect ~canceler begin fun () ->
log pool (Outgoing_connection point) ; log pool (Outgoing_connection point) ;
Lwt_unix.connect fd uaddr >>= fun () -> Lwt_unix.connect fd uaddr >>= fun () ->
return () return ()
@ -603,7 +603,7 @@ and authenticate pool ?point_info canceler fd point =
lwt_debug "authenticate: %a%s" lwt_debug "authenticate: %a%s"
P2p_point.Id.pp point P2p_point.Id.pp point
(if incoming then " incoming" else "") >>= fun () -> (if incoming then " incoming" else "") >>= fun () ->
Lwt_utils_unix.protect ~canceler begin fun () -> protect ~canceler begin fun () ->
P2p_socket.authenticate P2p_socket.authenticate
~proof_of_work_target:pool.config.proof_of_work_target ~proof_of_work_target:pool.config.proof_of_work_target
~incoming (P2p_io_scheduler.register pool.io_sched fd) point ~incoming (P2p_io_scheduler.register pool.io_sched fd) point
@ -612,7 +612,7 @@ and authenticate pool ?point_info canceler fd point =
end ~on_error: begin fun err -> end ~on_error: begin fun err ->
(* TODO do something when the error is Not_enough_proof_of_work ?? *) (* TODO do something when the error is Not_enough_proof_of_work ?? *)
begin match err with begin match err with
| [ Lwt_utils_unix.Canceled ] -> | [ Canceled ] ->
(* Currently only on time out *) (* Currently only on time out *)
lwt_debug "authenticate: %a%s -> canceled" lwt_debug "authenticate: %a%s -> canceled"
P2p_point.Id.pp point P2p_point.Id.pp point
@ -682,7 +682,7 @@ and authenticate pool ?point_info canceler fd point =
lwt_debug "authenticate: %a -> accept %a" lwt_debug "authenticate: %a -> accept %a"
P2p_point.Id.pp point P2p_point.Id.pp point
P2p_connection.Info.pp info >>= fun () -> P2p_connection.Info.pp info >>= fun () ->
Lwt_utils_unix.protect ~canceler begin fun () -> protect ~canceler begin fun () ->
P2p_socket.accept P2p_socket.accept
?incoming_message_queue_size:pool.config.incoming_message_queue_size ?incoming_message_queue_size:pool.config.incoming_message_queue_size
?outgoing_message_queue_size:pool.config.outgoing_message_queue_size ?outgoing_message_queue_size:pool.config.outgoing_message_queue_size
@ -881,7 +881,7 @@ and swap pool conn current_peer_id new_point =
pool.latest_accepted_swap <- pool.latest_succesfull_swap ; pool.latest_accepted_swap <- pool.latest_succesfull_swap ;
log pool (Swap_failure { source = source_peer_id }) ; log pool (Swap_failure { source = source_peer_id }) ;
match err with match err with
| [ Lwt_utils_unix.Timeout ] -> | [ Timeout ] ->
lwt_debug "Swap to %a was interupted: %a" lwt_debug "Swap to %a was interupted: %a"
P2p_point.Id.pp new_point pp_print_error err P2p_point.Id.pp new_point pp_print_error err
| _ -> | _ ->
@ -898,8 +898,8 @@ let accept pool fd point =
let canceler = Lwt_canceler.create () in let canceler = Lwt_canceler.create () in
P2p_point.Table.add pool.incoming point canceler ; P2p_point.Table.add pool.incoming point canceler ;
Lwt.async begin fun () -> Lwt.async begin fun () ->
Lwt_utils_unix.with_timeout with_timeout
~canceler pool.config.authentification_timeout ~canceler (Lwt_unix.sleep pool.config.authentification_timeout)
(fun canceler -> authenticate pool canceler fd point) (fun canceler -> authenticate pool canceler fd point)
end end

View File

@ -249,7 +249,7 @@ module Reader = struct
lwt_debug "[read_message] incremental decoding error" >>= fun () -> lwt_debug "[read_message] incremental decoding error" >>= fun () ->
return None return None
| Await decode_next_buf -> | Await decode_next_buf ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Crypto.read_chunk st.conn.fd st.conn.cryptobox_data Crypto.read_chunk st.conn.fd st.conn.cryptobox_data
end >>=? fun buf -> end >>=? fun buf ->
lwt_debug lwt_debug
@ -265,12 +265,12 @@ module Reader = struct
read_message st init_mbytes >>=? fun msg -> read_message st init_mbytes >>=? fun msg ->
match msg with match msg with
| None -> | None ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () -> Lwt_pipe.push st.messages (Error [Decoding_error]) >>= fun () ->
return None return None
end end
| Some (msg, size, rem_mbytes) -> | Some (msg, size, rem_mbytes) ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () -> Lwt_pipe.push st.messages (Ok (size, msg)) >>= fun () ->
return (Some rem_mbytes) return (Some rem_mbytes)
end end
@ -280,7 +280,7 @@ module Reader = struct
| Ok None -> | Ok None ->
Lwt_canceler.cancel st.canceler >>= fun () -> Lwt_canceler.cancel st.canceler >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> | Error [Canceled | Exn Lwt_pipe.Closed] ->
lwt_debug "connection closed to %a" lwt_debug "connection closed to %a"
P2p_connection.Info.pp st.conn.info >>= fun () -> P2p_connection.Info.pp st.conn.info >>= fun () ->
Lwt.return_unit Lwt.return_unit
@ -331,7 +331,7 @@ module Writer = struct
let rec loop = function let rec loop = function
| [] -> return () | [] -> return ()
| buf :: l -> | buf :: l ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf Crypto.write_chunk st.conn.fd st.conn.cryptobox_data buf
end >>=? fun () -> end >>=? fun () ->
lwt_debug "writing %d bytes to %a" lwt_debug "writing %d bytes to %a"
@ -345,10 +345,10 @@ module Writer = struct
let rec worker_loop st = let rec worker_loop st =
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt_pipe.pop st.messages >>= return Lwt_pipe.pop st.messages >>= return
end >>= function end >>= function
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> | Error [Canceled | Exn Lwt_pipe.Closed] ->
lwt_debug "connection closed to %a" lwt_debug "connection closed to %a"
P2p_connection.Info.pp st.conn.info >>= fun () -> P2p_connection.Info.pp st.conn.info >>= fun () ->
Lwt.return_unit Lwt.return_unit
@ -370,7 +370,7 @@ module Writer = struct
Lwt.wakeup_later u Lwt.wakeup_later u
(Error [P2p_io_scheduler.Connection_closed])) ; (Error [P2p_io_scheduler.Connection_closed])) ;
match err with match err with
| [ Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed ] -> | [ Canceled | Exn Lwt_pipe.Closed ] ->
lwt_debug "connection closed to %a" lwt_debug "connection closed to %a"
P2p_connection.Info.pp st.conn.info >>= fun () -> P2p_connection.Info.pp st.conn.info >>= fun () ->
Lwt.return_unit Lwt.return_unit
@ -453,7 +453,7 @@ let info { conn } = conn.info
let accept let accept
?incoming_message_queue_size ?outgoing_message_queue_size ?incoming_message_queue_size ?outgoing_message_queue_size
?binary_chunks_size (fd, info, cryptobox_data) encoding = ?binary_chunks_size (fd, info, cryptobox_data) encoding =
Lwt_utils_unix.protect begin fun () -> protect begin fun () ->
Ack.write fd cryptobox_data Ack >>=? fun () -> Ack.write fd cryptobox_data Ack >>=? fun () ->
Ack.read fd cryptobox_data Ack.read fd cryptobox_data
end ~on_error:begin fun err -> end ~on_error:begin fun err ->

View File

@ -21,7 +21,7 @@ type t = {
let rec worker_loop st = let rec worker_loop st =
let Pool pool = st.pool in let Pool pool = st.pool in
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
Lwt_utils_unix.protect ~canceler:st.canceler begin fun () -> protect ~canceler:st.canceler begin fun () ->
Lwt_unix.accept st.socket >>= return Lwt_unix.accept st.socket >>= return
end >>= function end >>= function
| Ok (fd, addr) -> | Ok (fd, addr) ->
@ -32,7 +32,7 @@ let rec worker_loop st =
(Ipaddr_unix.V6.of_inet_addr_exn addr, port) in (Ipaddr_unix.V6.of_inet_addr_exn addr, port) in
P2p_pool.accept pool fd point ; P2p_pool.accept pool fd point ;
worker_loop st worker_loop st
| Error [Lwt_utils_unix.Canceled] -> | Error [ Canceled ] ->
Lwt.return_unit Lwt.return_unit
| Error err -> | Error err ->
lwt_log_error "@[<v 2>Unexpected error in the Welcome worker@ %a@]" lwt_log_error "@[<v 2>Unexpected error in the Welcome worker@ %a@]"

View File

@ -122,8 +122,8 @@ module Simple = struct
| Error ([ P2p_pool.Connection_refused | Error ([ P2p_pool.Connection_refused
| P2p_pool.Pending_connection | P2p_pool.Pending_connection
| P2p_socket.Rejected | P2p_socket.Rejected
| Lwt_utils_unix.Canceled | Canceled
| Lwt_utils_unix.Timeout | Timeout
| P2p_pool.Rejected _ as err ]) -> | P2p_pool.Rejected _ as err ]) ->
lwt_log_info "Connection to %a failed (%a)" lwt_log_info "Connection to %a failed (%a)"
P2p_point.Id.pp point P2p_point.Id.pp point
@ -134,9 +134,9 @@ module Simple = struct
Format.fprintf ppf "pending connection" Format.fprintf ppf "pending connection"
| P2p_socket.Rejected -> | P2p_socket.Rejected ->
Format.fprintf ppf "rejected" Format.fprintf ppf "rejected"
| Lwt_utils_unix.Canceled -> | Canceled ->
Format.fprintf ppf "canceled" Format.fprintf ppf "canceled"
| Lwt_utils_unix.Timeout -> | Timeout ->
Format.fprintf ppf "timeout" Format.fprintf ppf "timeout"
| P2p_pool.Rejected peer -> | P2p_pool.Rejected peer ->
Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer Format.fprintf ppf "rejected (%a)" P2p_peer.Id.pp peer

View File

@ -241,7 +241,7 @@ let on_request
net_state header.shell.predecessor >>=? fun pred -> net_state header.shell.predecessor >>=? fun pred ->
get_proto pred hash >>=? fun proto -> get_proto pred hash >>=? fun proto ->
(* TODO also protect with [Worker.canceler w]. *) (* TODO also protect with [Worker.canceler w]. *)
Lwt_utils_unix.protect ?canceler begin fun () -> protect ?canceler begin fun () ->
apply_block apply_block
(Distributed_db.net_state net_db) (Distributed_db.net_state net_db)
pred proto hash header operations pred proto hash header operations
@ -264,7 +264,7 @@ let on_request
end end
(* TODO catch other temporary error (e.g. system errors) (* TODO catch other temporary error (e.g. system errors)
and do not 'commit' them on disk... *) and do not 'commit' them on disk... *)
| Error [Lwt_utils_unix.Canceled | Unavailable_protocol _] as err -> | Error [Canceled | Unavailable_protocol _] as err ->
return err return err
| Error errors -> | Error errors ->
Worker.protect w begin fun () -> Worker.protect w begin fun () ->

View File

@ -55,7 +55,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) =
lwt_debug "fetching block header %a from peer %a." lwt_debug "fetching block header %a from peer %a."
Block_hash.pp_short hash Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Block_header.fetch Distributed_db.Block_header.fetch
~timeout:pipeline.block_header_timeout ~timeout:pipeline.block_header_timeout
pipeline.net_db ~peer:pipeline.peer_id pipeline.net_db ~peer:pipeline.peer_id
@ -69,7 +69,7 @@ let fetch_step pipeline (step : Block_locator_iterator.step) =
fetch_loop [] step.block step.step >>=? fun headers -> fetch_loop [] step.block step.step >>=? fun headers ->
iter_s iter_s
begin fun header -> begin fun header ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_headers header >>= return Lwt_pipe.push pipeline.fetched_headers header >>= return
end end
end end
@ -87,7 +87,7 @@ let headers_fetch_worker_loop pipeline =
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_pipe.close pipeline.fetched_headers ; Lwt_pipe.close pipeline.fetched_headers ;
Lwt.return_unit Lwt.return_unit
| Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> | Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit Lwt.return_unit
| Error [ Distributed_db.Block_header.Timeout bh ] -> | Error [ Distributed_db.Block_header.Timeout bh ] ->
lwt_log_info "request for header %a from peer %a timed out." lwt_log_info "request for header %a from peer %a timed out."
@ -105,7 +105,7 @@ let headers_fetch_worker_loop pipeline =
let rec operations_fetch_worker_loop pipeline = let rec operations_fetch_worker_loop pipeline =
begin begin
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.pop pipeline.fetched_headers >>= return Lwt_pipe.pop pipeline.fetched_headers >>= return
end >>=? fun (hash, header) -> end >>=? fun (hash, header) ->
lwt_log_info "fetching operations of block %a from peer %a." lwt_log_info "fetching operations of block %a from peer %a."
@ -113,7 +113,7 @@ let rec operations_fetch_worker_loop pipeline =
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
map_p map_p
(fun i -> (fun i ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Distributed_db.Operations.fetch Distributed_db.Operations.fetch
~timeout:pipeline.block_operations_timeout ~timeout:pipeline.block_operations_timeout
pipeline.net_db ~peer:pipeline.peer_id pipeline.net_db ~peer:pipeline.peer_id
@ -123,14 +123,14 @@ let rec operations_fetch_worker_loop pipeline =
lwt_log_info "fetched operations of block %a from peer %a." lwt_log_info "fetched operations of block %a from peer %a."
Block_hash.pp_short hash Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.push pipeline.fetched_blocks Lwt_pipe.push pipeline.fetched_blocks
(hash, header, operations) >>= return (hash, header, operations) >>= return
end end
end >>= function end >>= function
| Ok () -> | Ok () ->
operations_fetch_worker_loop pipeline operations_fetch_worker_loop pipeline
| Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> | Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt_pipe.close pipeline.fetched_blocks ; Lwt_pipe.close pipeline.fetched_blocks ;
Lwt.return_unit Lwt.return_unit
| Error [ Distributed_db.Operations.Timeout (bh, n) ] -> | Error [ Distributed_db.Operations.Timeout (bh, n) ] ->
@ -149,13 +149,13 @@ let rec operations_fetch_worker_loop pipeline =
let rec validation_worker_loop pipeline = let rec validation_worker_loop pipeline =
begin begin
Lwt_unix.yield () >>= fun () -> Lwt_unix.yield () >>= fun () ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Lwt_pipe.pop pipeline.fetched_blocks >>= return Lwt_pipe.pop pipeline.fetched_blocks >>= return
end >>=? fun (hash, header, operations) -> end >>=? fun (hash, header, operations) ->
lwt_log_info "requesting validation for block %a from peer %a." lwt_log_info "requesting validation for block %a from peer %a."
Block_hash.pp_short hash Block_hash.pp_short hash
P2p_peer.Id.pp_short pipeline.peer_id >>= fun () -> P2p_peer.Id.pp_short pipeline.peer_id >>= fun () ->
Lwt_utils_unix.protect ~canceler:pipeline.canceler begin fun () -> protect ~canceler:pipeline.canceler begin fun () ->
Block_validator.validate Block_validator.validate
~canceler:pipeline.canceler ~canceler:pipeline.canceler
~notify_new_block:pipeline.notify_new_block ~notify_new_block:pipeline.notify_new_block
@ -168,7 +168,7 @@ let rec validation_worker_loop pipeline =
return () return ()
end >>= function end >>= function
| Ok () -> validation_worker_loop pipeline | Ok () -> validation_worker_loop pipeline
| Error [Exn Lwt.Canceled | Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> | Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt.return_unit Lwt.return_unit
| Error ([ Block_validator_errors.Invalid_block _ | Error ([ Block_validator_errors.Invalid_block _
| Block_validator_errors.Unavailable_protocol _ ] as err ) -> | Block_validator_errors.Unavailable_protocol _ ] as err ) ->

View File

@ -629,7 +629,7 @@ module P2p_reader = struct
end end
let rec worker_loop global_db state = let rec worker_loop global_db state =
Lwt_utils_unix.protect ~canceler:state.canceler begin fun () -> protect ~canceler:state.canceler begin fun () ->
P2p.recv global_db.p2p state.conn P2p.recv global_db.p2p state.conn
end >>= function end >>= function
| Ok msg -> | Ok msg ->

View File

@ -78,7 +78,7 @@ let () =
let rec worker_loop bv = let rec worker_loop bv =
begin begin
Lwt_utils_unix.protect ~canceler:bv.canceler begin fun () -> protect ~canceler:bv.canceler begin fun () ->
Lwt_pipe.pop bv.messages >>= return Lwt_pipe.pop bv.messages >>= return
end >>=? function Message (request, wakener) -> end >>=? function Message (request, wakener) ->
match request with match request with
@ -115,7 +115,7 @@ let rec worker_loop bv =
end >>= function end >>= function
| Ok () -> | Ok () ->
worker_loop bv worker_loop bv
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed] -> | Error [Canceled | Exn Lwt_pipe.Closed] ->
lwt_log_notice "terminating" >>= fun () -> lwt_log_notice "terminating" >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Error err -> | Error err ->

View File

@ -203,9 +203,6 @@ module Make
let trigger_shutdown w = let trigger_shutdown w =
Lwt.ignore_result (Lwt_canceler.cancel w.canceler) Lwt.ignore_result (Lwt_canceler.cancel w.canceler)
let protect { canceler } ?on_error f =
Lwt_utils_unix.protect ?on_error ~canceler f
let canceler { canceler } = canceler let canceler { canceler } = canceler
let log_event w evt = let log_event w evt =
@ -272,7 +269,7 @@ module Make
Lwt.return_unit in Lwt.return_unit in
let rec loop () = let rec loop () =
begin begin
Lwt_utils_unix.protect ~canceler:w.canceler begin fun () -> protect ~canceler:w.canceler begin fun () ->
pop w pop w
end >>=? function end >>=? function
| None -> Handlers.on_no_request w | None -> Handlers.on_no_request w
@ -303,7 +300,7 @@ module Make
end >>= function end >>= function
| Ok () -> | Ok () ->
loop () loop ()
| Error [Lwt_utils_unix.Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] -> | Error [Canceled | Exn Lwt_pipe.Closed | Exn Lwt_dropbox.Closed ] ->
Logger.lwt_log_info Logger.lwt_log_info
"[%a] worker terminated" "[%a] worker terminated"
Name.pp w.name >>= fun () -> Name.pp w.name >>= fun () ->
@ -413,4 +410,7 @@ module Make
(fun n w acc -> (n, w) :: acc) (fun n w acc -> (n, w) :: acc)
instances [] instances []
let protect { canceler } ?on_error f =
protect ?on_error ~canceler f
end end

View File

@ -34,7 +34,7 @@ let blocking_create
create_inner Unix.F_LOCK ~close_on_exec ~unlink_on_exit fn in create_inner Unix.F_LOCK ~close_on_exec ~unlink_on_exit fn in
match timeout with match timeout with
| None -> create () | None -> create ()
| Some duration -> Lwt_utils_unix.with_timeout duration (fun _ -> create ()) | Some duration -> with_timeout (Lwt_unix.sleep duration) (fun _ -> create ())
let is_locked fn = let is_locked fn =
if not @@ Sys.file_exists fn then return false else if not @@ Sys.file_exists fn then return false else
@ -50,7 +50,7 @@ let is_locked fn =
let get_pid fn = let get_pid fn =
let open Lwt_io in let open Lwt_io in
Lwt_utils_unix.protect begin fun () -> protect begin fun () ->
with_file ~mode:Input fn begin fun ic -> with_file ~mode:Input fn begin fun ic ->
read ic >>= fun content -> read ic >>= fun content ->
return (int_of_string content) return (int_of_string content)

View File

@ -116,53 +116,3 @@ let getaddrinfo ~passive ~node ~service =
(fun { ai_addr ; _ } -> of_sockaddr ai_addr) (fun { ai_addr ; _ } -> of_sockaddr ai_addr)
addr in addr in
Lwt.return points Lwt.return points
open Error_monad
type error += Canceled
let protect ?on_error ?canceler t =
let cancelation =
match canceler with
| None -> Lwt_utils.never_ending
| Some canceler ->
(Lwt_canceler.cancelation canceler >>= fun () ->
fail Canceled ) in
let res =
Lwt.pick [ cancelation ;
Lwt.catch t (fun exn -> fail (Exn exn)) ] in
res >>= function
| Ok _ -> res
| Error err ->
let canceled =
Option.unopt_map canceler ~default:false ~f:Lwt_canceler.canceled in
let err = if canceled then [Canceled] else err in
match on_error with
| None -> Lwt.return (Error err)
| Some on_error ->
Lwt.catch (fun () -> on_error err) (fun exn -> fail (Exn exn))
type error += Timeout
let () =
register_error_kind
`Temporary
~id:"utils.Timeout"
~title:"Timeout"
~description:"Timeout"
Data_encoding.unit
(function Timeout -> Some () | _ -> None)
(fun () -> Timeout)
let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f =
let timeout = Lwt_unix.sleep timeout in
let target = f canceler in
Lwt.choose [ timeout ; (target >|= fun _ -> ()) ] >>= fun () ->
Lwt_unix.yield () >>= fun () ->
if Lwt.state target <> Lwt.Sleep then begin
Lwt.cancel timeout ;
target
end else begin
Lwt_canceler.cancel canceler >>= fun () ->
fail Timeout
end

View File

@ -28,18 +28,3 @@ val getaddrinfo:
passive:bool -> passive:bool ->
node:string -> service:string -> node:string -> service:string ->
(Ipaddr.V6.t * int) list Lwt.t (Ipaddr.V6.t * int) list Lwt.t
open Error_monad
type error += Canceled
val protect :
?on_error:(error list -> 'a tzresult Lwt.t) ->
?canceler:Lwt_canceler.t ->
(unit -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
type error += Timeout
val with_timeout:
?canceler:Lwt_canceler.t ->
float -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t